Class: Concurrent::ErlangActor::Environment

Inherits:
Synchronization::Object
  • Object
Defined in:
lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb

Overview

A class providing the environment and methods in which actor bodies run.


Instance Method Details

#default_executor ⇒ ExecutorService

Returns the default executor, which #spawn uses when no executor is given.

Returns:

  • (ExecutorService)

    the default executor, which #spawn uses when no executor is given



# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 465

def default_executor
  @DefaultExecutor
end

#demonitor(reference, *options) ⇒ true, false

If reference is a reference which the calling actor obtained by calling #monitor, this monitoring is turned off. If the monitoring is already turned off, nothing happens.

Once demonitor has returned, it is guaranteed that no DownSignal message due to the monitor will be placed in the caller's message queue in the future. A DownSignal message might have been placed in the caller's message queue prior to the call, though. Therefore, in most cases, it is advisable to remove such a DownSignal message from the message queue after monitoring has been stopped. demonitor(reference, :flush) can be used if this cleanup is wanted.

The behavior of this method can be viewed as two combined operations: asynchronously send a "demonitor signal" to the monitored actor and ignore any future results of the monitor.

Failure: It is an error if reference refers to a monitoring started by another actor. In that case it may raise an ArgumentError or go unnoticed.

Options:

  • :flush - Remove (one) DownSignal message, if there is one, from the caller's message queue after monitoring has been stopped. Calling demonitor(reference, :flush) is equivalent to the following, but more efficient:

    demonitor(reference)
    receive on(And[DownSignal, -> d { d.reference == reference }], true), timeout: 0, timeout_value: true
    
  • :info - The returned value is one of the following:

    • true - The monitor was found and removed. In this case no DownSignal message due to this monitor has been or will be placed in the message queue of the caller.
    • false - The monitor was not found and could not be removed. This is probably because a DownSignal message corresponding to this monitor has already been placed in the caller's message queue.

    If the :info option is combined with the :flush option, false will be returned if a flush was needed; otherwise, true.
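
The return values for the flush case can be illustrated with a toy model in plain Ruby. This is only a sketch: it does not use the library, the mailbox is a plain Array, and `ToyDownSignal` and `toy_flush` are hypothetical names introduced here for illustration.

```ruby
# Toy model only: ToyDownSignal is a local Struct, not the library's DownSignal,
# and the mailbox is a plain Array rather than an actor's message queue.
ToyDownSignal = Struct.new(:from, :reference, :info)

# Removes at most one ToyDownSignal carrying the given reference.
# Returns true when nothing had to be flushed and false when a message was
# removed, mirroring the documented result of combining the two options.
def toy_flush(mailbox, reference)
  index = mailbox.index { |m| m.is_a?(ToyDownSignal) && m.reference == reference }
  return true if index.nil?

  mailbox.delete_at(index)
  false
end
```

With a mailbox that holds a `ToyDownSignal` for the reference, `toy_flush` removes that one message and returns false; a second call finds nothing to remove and returns true.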

Parameters:

  • reference (Reference)
  • options (:flush, :info)

Returns:

  • (true, false)


# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 332

def demonitor(reference, *options)
  @Actor.demonitor(reference, *options)
end

#link(pid) ⇒ true

Creates a link between the calling actor and another actor, if there is no such link already. If an actor attempts to create a link to itself, nothing is done. Returns true.

If pid does not exist, the behavior of the method depends on whether the calling actor is trapping exits (see #trap):

  • If the calling actor is not trapping exits, link raises NoActor.
  • Otherwise, if the calling actor is trapping exits, link returns true, but an exit signal with reason noproc is sent to the calling actor.
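
The two cases for a missing pid can be summarised as a small decision function. The sketch below is plain Ruby and does not call the library; `ToyNoActor`, `toy_link_to_missing`, and the `[:terminated, :noproc]` message shape are hypothetical stand-ins chosen for illustration.

```ruby
# Hypothetical stand-in for the library's NoActor error.
class ToyNoActor < StandardError; end

# Models what the caller observes when it links to a pid that does not exist.
# `trapping` is the caller's trap flag; `mailbox` is an Array standing in for
# the caller's message queue.
def toy_link_to_missing(trapping, mailbox)
  raise ToyNoActor, "no such actor" unless trapping

  # A trapping caller receives the exit signal as an ordinary message.
  mailbox << [:terminated, :noproc]
  true
end
```

A non-trapping caller sees the error raised, while a trapping caller gets true back and finds the exit notification in its mailbox.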

Returns:

  • (true)

Raises:

  • (NoActor)


# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 236

def link(pid)
  @Actor.link(pid)
end

#monitor(pid) ⇒ Reference

The calling actor starts monitoring the actor with the given pid.

A DownSignal message will be sent to the monitoring actor if the actor with the given pid dies, or if no actor with the given pid exists.

The monitoring is turned off either when the DownSignal message is sent, or when #demonitor is called.

Making several calls to monitor for the same pid is not an error; it results in as many, completely independent, monitorings.
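
The independence of repeated monitorings can be pictured with a toy registry. This sketch is plain Ruby and not the library's bookkeeping; `ToyMonitors` and its methods are hypothetical names used only to show that each call yields its own reference.

```ruby
# Toy registry: maps each monitor reference to the pid it watches.
class ToyMonitors
  def initialize
    @by_reference = {}
  end

  # Every call returns a fresh reference, even for a pid already monitored.
  def monitor(pid)
    reference = Object.new
    @by_reference[reference] = pid
    reference
  end

  # Turns off exactly one monitoring; returns true if it was still active.
  def demonitor(reference)
    return false unless @by_reference.key?(reference)

    @by_reference.delete(reference)
    true
  end

  # Number of active monitorings for the given pid.
  def count(pid)
    @by_reference.count { |_reference, watched| watched == pid }
  end
end
```

Monitoring the same pid twice gives two distinct references, and demonitoring one of them leaves the other active.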

Returns:

  • (Reference)

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 285

def monitor(pid)
  @Actor.monitor(pid)
end

#name ⇒ #to_s

Returns the name of the actor, if one was provided to the spawn method.

Returns:

  • (#to_s)

    the name of the actor, if one was provided to the spawn method



# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 154

def name
  pid.name
end

#on(matcher, value = nil, &block) ⇒ undocumented

Helper for constructing #receive rules.

Examples:

receive on(Numeric) { |v| v.succ },
        on(ANY) { terminate :bad_message }
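
Since matchers are objects responding to `#===`, rule selection can be sketched as an ordered scan over matcher and handler pairs. The code below is a plain-Ruby toy, not the library's dispatcher; `toy_dispatch`, `TOY_ANY`, and `TOY_RULES` are hypothetical names, and the first-match-wins order is an assumption of this sketch.

```ruby
# Toy dispatcher: each rule is a [matcher, handler] pair, tried in order.
# A matcher is anything responding to #===, e.g. a Class, Range, Regexp or Proc.
def toy_dispatch(message, rules)
  rules.each do |matcher, handler|
    return handler.call(message) if matcher === message
  end
  :no_match
end

# Hypothetical catch-all matcher standing in for ANY; Proc#=== calls the proc.
TOY_ANY = ->(_message) { true }

TOY_RULES = [
  [Numeric, ->(v) { v.succ }],
  [TOY_ANY, ->(_message) { :bad_message }]
].freeze
```

With these rules, an Integer is handled by the first pair and anything else falls through to the catch-all.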

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 185

def on(matcher, value = nil, &block)
  @Actor.on matcher, value, &block
end

#pid ⇒ Pid

Returns the pid of this actor.

Returns:

  • (Pid)

    the pid of this actor



# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 149

def pid
  @Actor.pid
end

#receive(*rules, timeout: nil, timeout_value: nil, **options) {|message| ... } ⇒ Object, nothing

Receive a message.

Parameters:

  • rules (::Array(), ::Array(#===), ::Array<::Array(#===, Proc)>)
    • No rule - receive, receive {|m| m.to_s}
    • or single rule which can be combined with the supplied block - receive(Numeric), receive(Numeric) {|v| v.succ}
    • or array of matcher-proc pairs - receive on(Numeric) { |v| v*2 }, on(Symbol) { |c| do_command c }
  • timeout (Numeric) (defaults to: nil)

    how long it should wait for the message

  • timeout_value (Object) (defaults to: nil)

    the value returned when the timeout elapses and no on(TIMEOUT) { do_something } rule is specified.

  • options (Hash)

    other options specific to the type of the actor

Options Hash (**options):

  • :keep (true, false)

    Keep the rules and repeatedly call the associated blocks, until receive is called again.

Yields:

  • (message)

    block to process the message if single matcher is supplied

Yield Parameters:

  • message (Object)

    the received message

Returns:

  • (Object, nothing)

    depends on the type of the actor. An on-thread actor blocks until a message is available, then returns the message (or the result of the called block). An on-pool actor stops executing and continues with the given block when a message becomes available.

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 218

def receive(*rules, timeout: nil, timeout_value: nil, **options, &block)
  @Actor.receive(*rules, timeout: timeout, timeout_value: timeout_value, **options, &block)
end

#reply(value) ⇒ true, false

Shortcut for fulfilling the reply, same as reply_resolution true, value, nil.

Examples:

actor = Concurrent::ErlangActor.spawn(:on_thread) { reply receive * 2 }
actor.ask 2 #=> 4

Parameters:

  • value (Object)

Returns:

  • (true, false)

    true if the sender asked and the reply was resolved



# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 405

def reply(value)
  # TODO (pitr-ch 08-Feb-2019): consider adding reply? which returns true,false if success, reply method will always return value
  reply_resolution true, value, nil
end

#reply_resolution(fulfilled = true, value = nil, reason = nil) ⇒ true, false

Reply to the sender of the message currently being processed if the actor was asked instead of told. The reply is stored in a Promises::ResolvableFuture so the arguments are same as for Promises::ResolvableFuture#resolve method.

The reply may time out, in which case this method returns false.

Examples:

actor = Concurrent::ErlangActor.spawn(:on_thread) { reply_resolution true, receive * 2, nil }
actor.ask 2 #=> 4

Parameters:

  • fulfilled (true, false) (defaults to: true)
  • value (Object) (defaults to: nil)
  • reason (Object) (defaults to: nil)

Returns:

  • (true, false)

    true if the sender asked and the reply was resolved before it timed out



# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 426

def reply_resolution(fulfilled = true, value = nil, reason = nil)
  @Actor.reply_resolution(fulfilled, value, reason)
end

#spawn(*args, type: @Actor.class, channel: Promises::Channel.new, environment: Environment, name: nil, executor: default_executor, link: false, monitor: false) {|*args| ... } ⇒ Pid, ::Array(Pid, Reference)

Creates an actor.

Parameters:

  • args (Object)

    arguments for the actor body

  • type (:on_thread, :on_pool) (defaults to: @Actor.class)

    the type of the actor to be created.

  • channel (Channel) (defaults to: Promises::Channel.new)

    The mailbox of the actor; by default it has unlimited capacity. Creating the actor with a bounded queue is useful to create backpressure. The channel can be shared with other abstractions, but the actor has to be the only consumer, otherwise internal signals could be lost.

  • environment (Environment, Module) (defaults to: Environment)

    A class which is used to run the body of the actor in. It can either be a child of Concurrent::ErlangActor::Environment or a module. A module is extended onto a new instance of the environment, therefore if there are many actors with this module it is better to create a class and use it instead.

  • name (#to_s) (defaults to: nil)

    the name of the actor. Available via Pid#name or #name, and part of Pid#to_s.

  • link (true, false) (defaults to: false)

    the created actor is atomically created and linked with the calling actor

  • monitor (true, false) (defaults to: false)

    the created actor is atomically created and monitored by the calling actor

  • executor (ExecutorService) (defaults to: default_executor)

    The executor service to use to execute the actor on. Applies only to :on_pool actor type.
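
The backpressure remark for the channel parameter can be illustrated without the library. The sketch below uses SizedQueue from Ruby's standard library as a stand-in for a bounded mailbox; it is not Promises::Channel, and `toy_try_tell` is a hypothetical helper introduced here.

```ruby
# Stand-in only: a SizedQueue plays the role of a bounded mailbox.
# Returns true if the message was accepted, false if the mailbox is full.
def toy_try_tell(mailbox, message)
  mailbox.push(message, true) # non-blocking push raises ThreadError when full
  true
rescue ThreadError
  false
end
```

With `SizedQueue.new(2)` as the mailbox, the first two messages are accepted and the third is rejected until the consumer pops one. A blocking push would instead make the producer wait, which is the backpressure effect the parameter description refers to.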

Yields:

  • (*args)

    the body of the actor. When the actor is spawned, this block is evaluated until it terminates. The on-thread actor requires a block. The on-pool actor has a default -> { start }, therefore if no block is given it executes a #start method, which needs to be provided by the environment.

Returns:

  • (Pid, ::Array(Pid, Reference))

    a pid or a pid-reference pair when monitor is true

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 378

def spawn(*args,
          type: @Actor.class,
          channel: Promises::Channel.new,
          environment: Environment,
          name: nil,
          executor: default_executor,
          link: false,
          monitor: false,
          &body)

  @Actor.spawn(*args,
               type:        type,
               channel:     channel,
               environment: environment,
               name:        name,
               executor:    executor,
               link:        link,
               monitor:     monitor,
               &body)
end

#terminate(pid = nil, reason, value: nil) ⇒ nothing

If pid is not provided stops the execution of the calling actor with the exit reason.

If pid is provided, it sends an exit signal with exit reason to the actor identified by pid.

The following behavior applies if reason is any object except :normal or :kill. If pid is not trapping exits, pid itself will exit with exit reason. If pid is trapping exits, the exit signal is transformed into a message Terminated and delivered to the message queue of pid.

If reason is the Symbol :normal, pid will not exit. If it is trapping exits, the exit signal is transformed into a message Terminated and delivered to its message queue.

If reason is the Symbol :kill, that is, if terminate(pid, :kill) is called, an untrappable exit signal is sent to pid, which will unconditionally exit with exit reason :killed.

Because calling this method without pid terminates the calling actor, it has no return value.
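
The rules for an exit signal sent to another actor can be condensed into a decision table. The function below is a plain-Ruby sketch of the prose above, not library code; `toy_exit_signal_outcome` and its return shapes are hypothetical.

```ruby
# Toy decision table for an exit signal sent to a target actor.
# Returns [:exits, reason], [:message, reason] or [:ignored].
def toy_exit_signal_outcome(reason, target_traps)
  case reason
  when :kill
    [:exits, :killed] # untrappable, regardless of the trap flag
  when :normal
    target_traps ? [:message, :normal] : [:ignored]
  else
    target_traps ? [:message, reason] : [:exits, reason]
  end
end
```

For example, `toy_exit_signal_outcome(:boom, false)` models a non-trapping target exiting with :boom, while the same signal to a trapping target is delivered as a message.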

Parameters:

  • pid (Pid) (defaults to: nil)
  • reason (Object, :normal, :kill)
  • value (Object) (defaults to: nil)

Returns:

  • (nothing)

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 460

def terminate(pid = nil, reason, value: nil)
  @Actor.terminate pid, reason, value: value
end

#terminated ⇒ Promises::Future

Returns a future which is resolved with the final result of the actor, which is either the reason for termination or a value if terminated normally.

Returns:

  • (Promises::Future)

    a future which is resolved with the final result of the actor, which is either the reason for termination or a value if terminated normally.



# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 144

def terminated
  @Actor.terminated
end

#trap(value = true) ⇒ true, false

When trap is set to true, exit signals arriving at an actor are converted to Terminated messages, which can be received as ordinary messages. If trap is set to false, the actor exits if it receives an exit signal other than normal, and the exit signal is propagated to its linked actors. Application actors should normally not trap exits.

Parameters:

  • value (true, false) (defaults to: true)

Returns:

  • (true, false)

    the old value of the flag

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 176

def trap(value = true)
  @Actor.trap(value)
end

#traps? ⇒ true, false

Returns whether this actor traps exit messages.

Returns:

  • (true, false)

    whether this actor traps exit messages

# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 160

def traps?
  @Actor.traps?
end

#unlink(pid) ⇒ true

Removes the link, if there is one, between the calling actor and the actor referred to by pid.

Returns true and does not fail, even if there is no link to pid, or if pid does not exist.

Once unlink(pid) has returned, it is guaranteed that the link between the caller and the actor referred to by pid has no effect on the caller in the future (unless the link is set up again). If the caller is trapping exits, a Terminated message due to the link might have been placed in the caller's message queue prior to the call, though.

Note that the Terminated message can be the result of the link, but can also be the result of calling the #terminate method externally. Therefore, when trapping exits, it may be appropriate to clean up the message queue after the call to unlink, as follows:

receive on(And[Terminated, -> e { e.pid == pid }], true), timeout: 0

Returns:

  • (true)


# File 'lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb', line 262

def unlink(pid)
  @Actor.unlink(pid)
end