Class: Concurrent::ProcessingActor

Inherits:
Synchronization::Object show all
Defined in:
lib-edge/concurrent/edge/processing_actor.rb

Overview

Note:

Edge Features are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.
  • Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
  • Edge features may also lack tests and documentation.
  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

A new implementation of actor which also simulates the process, therefore it can be used in the same way as Erlang's actors but without occupying thread. A tens of thousands ProcessingActors can run at the same time sharing a thread pool.

Examples:

# Runs on a pool, does not consume 50_000 threads
actors = 50_000.times.map do |i|
  Concurrent::ProcessingActor.act(i) { |a, i| a.receive.then_on(:fast, i) { |m, i| m + i } }
end

actors.each { |a| a.tell 1 }
values = actors.map(&:termination).map(&:value)
values[0,5]                                        # => [1, 2, 3, 4, 5]
values[-5, 5]                                      # => [49996, 49997, 49998, 49999, 50000]

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.act(*args, &process) ⇒ ProcessingActor

Creates an actor.

Examples:

actor = Concurrent::ProcessingActor.act do |actor|
  actor.receive.then do |message|
    # the actor ends normally with message
    message
  end
end

actor.tell :a_message
    # => <#Concurrent::ProcessingActor:0x7fff11280560 termination:pending>
actor.termination.value! # => :a_message

Returns:

See Also:

  • Behaves the same way, but does not take mailbox as a first argument.


50
51
52
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 50

def self.act(*args, &process)
  act_listening Promises::Channel.new, *args, &process
end

.act_listening(channel, *args) {|actor, *args| ... } ⇒ ProcessingActor

Creates an actor listening to a specified channel (mailbox).

Parameters:

  • args (Object)

    Arguments passed to the process.

  • channel (Promises::Channel)

    which serves as mailing box. The channel can have limited size to achieve backpressure.

Yields:

  • (actor, *args)

    to the process to get back a future which represents the actors execution.

Yield Parameters:

Yield Returns:

Returns:



63
64
65
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 63

def self.act_listening(channel, *args, &process)
  ProcessingActor.new channel, *args, &process
end

Instance Method Details

#ask_op(answer = Promises.resolvable_future, &message_provider) ⇒ undocumented

actor.ask2 { |a| [:count, a] }



153
154
155
156
157
158
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 153

def ask_op(answer = Promises.resolvable_future, &message_provider)
  # TODO (pitr-ch 12-Dec-2018): is it ok to let the answers be unanswered when the actor terminates
  tell_op(message_provider.call(answer)).then(answer) { |_, a| a }

  # answer.chain { |v| [true, v] } | @Terminated.then
end

#mailboxPromises::Channel

Returns actor's mailbox.

Returns:



25
26
27
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 25

def mailbox
  @Mailbox
end

#receive(channel = mailbox) ⇒ undocumented

Receives a message when available, used in the actor's process.

@return [Promises::Future(Object)] a future which will be fulfilled with a message from

mailbox when it is available.

def receive(*channels) channels = [@Mailbox] if channels.empty? Promises::Channel.select(*channels) # TODO (pitr-ch 27-Dec-2016): support patterns # - put any received message aside if it does not match # - on each receive call check the messages put aside # - track where the message came from, cannot later receive m form other channel only because it matches end



79
80
81
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 79

def receive(channel = mailbox)
  channel.pop_op
end

#tell!(message) ⇒ self

Tells a message to the actor. May block current thread if the mailbox is full. #tell_op is a better option since it does not block. It's usually used to integrate with threading code.

Examples:

Thread.new(actor) do |actor|
  # ...
  actor.tell! :a_message # blocks until the message is told
  #   (there is a space for it in the channel)
  # ...
end

Parameters:

  • message (Object)

Returns:

  • (self)


95
96
97
98
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 95

def tell!(message)
  @Mailbox.push(message)
  self
end

#tell_op(message) ⇒ Promises::Future(ProcessingActor)

Tells a message to the actor.

Parameters:

  • message (Object)

Returns:



104
105
106
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 104

def tell_op(message)
  @Mailbox.push_op(message).then(self) { |_ch, actor| actor }
end

#terminationPromises::Future(Object)

Returns a future which is resolved when the actor ends its processing. It can either be fulfilled with a value when actor ends normally or rejected with a reason (exception) when actor fails.

Returns:

  • (Promises::Future(Object))

    a future which is resolved when the actor ends its processing. It can either be fulfilled with a value when actor ends normally or rejected with a reason (exception) when actor fails.



32
33
34
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 32

def termination
  @Terminated.with_hidden_resolvable
end

#to_aryundocumented



167
168
169
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 167

def to_ary
  [@Mailbox, @Terminated]
end

#to_sString Also known as: inspect

Returns string representation.

Returns:

  • (String)

    string representation.



161
162
163
# File 'lib-edge/concurrent/edge/processing_actor.rb', line 161

def to_s
  format '%s termination: %s>', super[0..-2], termination.state
end