Class: Concurrent::ProcessingActor
- Inherits: Synchronization::Object (Object → Synchronization::Object → Concurrent::ProcessingActor)
- Defined in: lib-edge/concurrent/edge/processing_actor.rb
Overview
Edge Features are under active development and may change frequently.
- Deprecations are not added before incompatible changes.
- Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
- Edge features may also lack tests and documentation.
- Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.
A new actor implementation that also simulates a process, so it can be used in the same way as Erlang's actors but without occupying a thread. Tens of thousands of ProcessingActors can run at the same time, sharing a thread pool.
Class Method Summary
- .act(*args, &process) ⇒ ProcessingActor
  Creates an actor.
- .act_listening(channel, *args) {|actor, *args| ... } ⇒ ProcessingActor
  Creates an actor listening to a specified channel (mailbox).
Instance Method Summary
- #ask_op(answer = Promises.resolvable_future, &message_provider) ⇒ undocumented
  actor.ask_op { |a| [:count, a] }
- #mailbox ⇒ Promises::Channel
  Actor's mailbox.
- #receive(channel = mailbox) ⇒ undocumented
  Receives a message when available, used in the actor's process.
- #tell!(message) ⇒ self
  Tells a message to the actor.
- #tell_op(message) ⇒ Promises::Future(ProcessingActor)
  Tells a message to the actor.
- #termination ⇒ Promises::Future(Object)
  A future which is resolved when the actor ends its processing.
- #to_ary ⇒ undocumented
- #to_s ⇒ String (also: #inspect)
  String representation.
Class Method Details
.act(*args, &process) ⇒ ProcessingActor
Creates an actor.
    # File 'lib-edge/concurrent/edge/processing_actor.rb', line 50
    def self.act(*args, &process)
      act_listening Promises::Channel.new, *args, &process
    end
.act_listening(channel, *args) {|actor, *args| ... } ⇒ ProcessingActor
Creates an actor listening to a specified channel (mailbox).
    # File 'lib-edge/concurrent/edge/processing_actor.rb', line 63
    def self.act_listening(channel, *args, &process)
      ProcessingActor.new channel, *args, &process
    end
Instance Method Details
#ask_op(answer = Promises.resolvable_future, &message_provider) ⇒ undocumented
Usage: actor.ask_op { |a| [:count, a] }
    # File 'lib-edge/concurrent/edge/processing_actor.rb', line 153
    def ask_op(answer = Promises.resolvable_future, &message_provider)
      # TODO (pitr-ch 12-Dec-2018): is it ok to let the answers be unanswered when the actor terminates
      tell_op(message_provider.call(answer)).then(answer) { |_, a| a }

      # answer.chain { |v| [true, v] } | @Terminated.then
    end
#mailbox ⇒ Promises::Channel
Returns actor's mailbox.
    # File 'lib-edge/concurrent/edge/processing_actor.rb', line 25
    def mailbox
      @Mailbox
    end
#receive(channel = mailbox) ⇒ undocumented
Receives a message when available, used in the actor's process.
Returns a Promises::Future(Object) which will be fulfilled with a message from the mailbox when it is available.
A multi-channel variant also appears in the docstring, with an open TODO:
    def receive(*channels)
      channels = [@Mailbox] if channels.empty?
      Promises::Channel.select(*channels)
      # TODO (pitr-ch 27-Dec-2016): support patterns
      #   - put any received message aside if it does not match
      #   - on each receive call check the messages put aside
      #   - track where the message came from, cannot later receive m from other channel only because it matches
    end
    # File 'lib-edge/concurrent/edge/processing_actor.rb', line 79
    def receive(channel = mailbox)
      channel.pop_op
    end
#tell!(message) ⇒ self
Tells a message to the actor. May block the current thread if the mailbox is full. #tell_op is a better option since it does not block; #tell! is usually used to integrate with threading code.
    # File 'lib-edge/concurrent/edge/processing_actor.rb', line 95
    def tell!(message)
      @Mailbox.push(message)
      self
    end
#tell_op(message) ⇒ Promises::Future(ProcessingActor)
Tells a message to the actor.
    # File 'lib-edge/concurrent/edge/processing_actor.rb', line 104
    def tell_op(message)
      @Mailbox.push_op(message).then(self) { |_ch, actor| actor }
    end
#termination ⇒ Promises::Future(Object)
Returns a future which is resolved when the actor ends its processing. It is either fulfilled with a value when the actor ends normally, or rejected with a reason (exception) when the actor fails.
    # File 'lib-edge/concurrent/edge/processing_actor.rb', line 32
    def termination
      @Terminated.with_hidden_resolvable
    end
#to_ary ⇒ undocumented
    # File 'lib-edge/concurrent/edge/processing_actor.rb', line 167
    def to_ary
      [@Mailbox, @Terminated]
    end
#to_s ⇒ String (also: #inspect)
Returns string representation.
    # File 'lib-edge/concurrent/edge/processing_actor.rb', line 161
    def to_s
      format '%s termination: %s>', super[0..-2], termination.state
    end