Module: Concurrent

Defined in:
lib/concurrent-ruby/concurrent.rb,
lib/concurrent-ruby/concurrent/map.rb,
lib/concurrent-ruby/concurrent/set.rb,
lib/concurrent-ruby/concurrent/atom.rb,
lib/concurrent-ruby/concurrent/hash.rb,
lib/concurrent-ruby/concurrent/ivar.rb,
lib/concurrent-ruby/concurrent/mvar.rb,
lib/concurrent-ruby/concurrent/tvar.rb,
lib/concurrent-ruby/concurrent/agent.rb,
lib/concurrent-ruby/concurrent/array.rb,
lib/concurrent-ruby/concurrent/async.rb,
lib/concurrent-ruby/concurrent/delay.rb,
lib/concurrent-ruby/concurrent/maybe.rb,
lib/concurrent-ruby/concurrent/tuple.rb,
lib/concurrent-ruby/concurrent/errors.rb,
lib/concurrent-ruby/concurrent/future.rb,
lib/concurrent-ruby/concurrent/options.rb,
lib/concurrent-ruby/concurrent/promise.rb,
lib/concurrent-ruby/concurrent/version.rb,
lib/concurrent-ruby/concurrent/dataflow.rb,
lib/concurrent-ruby/concurrent/promises.rb,
lib/concurrent-ruby/concurrent/constants.rb,
lib/concurrent-ruby/concurrent/exchanger.rb,
lib/concurrent-ruby/concurrent/re_include.rb,
lib/concurrent-ruby/concurrent/timer_task.rb,
lib/concurrent-ruby/concurrent/atomic/event.rb,
lib/concurrent-ruby/concurrent/configuration.rb,
lib/concurrent-ruby/concurrent/mutable_struct.rb,
lib/concurrent-ruby/concurrent/scheduled_task.rb,
lib/concurrent-ruby/concurrent/utility/engine.rb,
lib/concurrent-ruby/concurrent/concern/logging.rb,
lib/concurrent-ruby/concurrent/settable_struct.rb,
lib/concurrent-ruby/concurrent/synchronization.rb,
lib/concurrent-ruby/concurrent/atomic/semaphore.rb,
lib/concurrent-ruby/concurrent/immutable_struct.rb,
lib/concurrent-ruby/concurrent/thread_safe/util.rb,
lib/concurrent-ruby/concurrent/concern/obligation.rb,
lib/concurrent-ruby/concurrent/concern/observable.rb,
lib/concurrent-ruby/concurrent/executor/timer_set.rb,
lib/concurrent-ruby/concurrent/concern/deprecation.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_fixnum.rb,
lib/concurrent-ruby/concurrent/synchronization/lock.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_boolean.rb,
lib/concurrent-ruby/concurrent/atomic/cyclic_barrier.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_semaphore.rb,
lib/concurrent-ruby/concurrent/atomic/read_write_lock.rb,
lib/concurrent-ruby/concurrent/synchronization/object.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/adder.rb,
lib/concurrent-ruby/concurrent/utility/monotonic_time.rb,
lib/concurrent-ruby/concurrent/utility/native_integer.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_reference.rb,
lib/concurrent-ruby/concurrent/atomic/count_down_latch.rb,
lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb,
lib/concurrent-ruby/concurrent/concern/dereferenceable.rb,
lib/concurrent-ruby/concurrent/synchronization/volatile.rb,
lib/concurrent-ruby/concurrent/executor/executor_service.rb,
lib/concurrent-ruby/concurrent/synchronization/condition.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/volatile.rb,
lib/concurrent-ruby/concurrent/utility/processor_counter.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_atomic_fixnum.rb,
lib/concurrent-ruby/concurrent/collection/lock_free_stack.rb,
lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb,
lib/concurrent-ruby/concurrent/synchronization/mri_object.rb,
lib/concurrent-ruby/concurrent/synchronization/rbx_object.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/striped64.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_atomic_boolean.rb,
lib/concurrent-ruby/concurrent/executor/cached_thread_pool.rb,
lib/concurrent-ruby/concurrent/executor/immediate_executor.rb,
lib/concurrent-ruby/concurrent/executor/safe_task_executor.rb,
lib/concurrent-ruby/concurrent/atomic/java_count_down_latch.rb,
lib/concurrent-ruby/concurrent/atomic/java_thread_local_var.rb,
lib/concurrent-ruby/concurrent/atomic/ruby_thread_local_var.rb,
lib/concurrent-ruby/concurrent/synchronization/jruby_object.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_count_down_latch.rb,
lib/concurrent-ruby/concurrent/atomic_reference/mutex_atomic.rb,
lib/concurrent-ruby/concurrent/executor/serialized_execution.rb,
lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb,
lib/concurrent-ruby/concurrent/collection/map/mri_map_backend.rb,
lib/concurrent-ruby/concurrent/executor/java_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/ruby_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/single_thread_executor.rb,
lib/concurrent-ruby/concurrent/synchronization/abstract_object.rb,
lib/concurrent-ruby/concurrent/synchronization/abstract_struct.rb,
lib/concurrent-ruby/concurrent/synchronization/lockable_object.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/cheap_lockable.rb,
lib/concurrent-ruby/concurrent/utility/native_extension_loader.rb,
lib/concurrent-ruby/concurrent/atomic/abstract_thread_local_var.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_markable_reference.rb,
lib/concurrent-ruby/concurrent/atomic/reentrant_read_write_lock.rb,
lib/concurrent-ruby/concurrent/executor/serial_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/simple_executor_service.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/xor_shift_random.rb,
lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb,
lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb,
lib/concurrent-ruby/concurrent/synchronization/truffleruby_object.rb,
lib/concurrent-ruby/concurrent/thread_safe/synchronized_delegator.rb,
lib/concurrent-ruby/concurrent/synchronization/rbx_lockable_object.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/power_of_two_tuple.rb,
lib/concurrent-ruby/concurrent/atomic_reference/numeric_cas_wrapper.rb,
lib/concurrent-ruby/concurrent/executor/indirect_immediate_executor.rb,
lib/concurrent-ruby/concurrent/executor/java_single_thread_executor.rb,
lib/concurrent-ruby/concurrent/executor/ruby_single_thread_executor.rb,
lib/concurrent-ruby/concurrent/collection/copy_on_write_observer_set.rb,
lib/concurrent-ruby/concurrent/synchronization/jruby_lockable_object.rb,
lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb,
lib/concurrent-ruby/concurrent/collection/copy_on_notify_observer_set.rb,
lib/concurrent-ruby/concurrent/collection/map/truffleruby_map_backend.rb,
lib/concurrent-ruby/concurrent/collection/map/synchronized_map_backend.rb,
lib/concurrent-ruby/concurrent/executor/serialized_execution_delegator.rb,
lib/concurrent-ruby/concurrent/collection/non_concurrent_priority_queue.rb,
lib/concurrent-ruby/concurrent/synchronization/abstract_lockable_object.rb,
lib/concurrent-ruby/concurrent/collection/map/non_concurrent_map_backend.rb,
lib/concurrent-ruby/concurrent/collection/map/atomic_reference_map_backend.rb,
lib/concurrent-ruby/concurrent/collection/java_non_concurrent_priority_queue.rb,
lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb,
lib/concurrent-ruby-edge/concurrent/edge.rb,
lib/concurrent-ruby-edge/concurrent/actor.rb,
lib/concurrent-ruby-edge/concurrent/channel.rb,
lib/concurrent-ruby-edge/concurrent/actor/core.rb,
lib/concurrent-ruby-edge/concurrent/actor/root.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils.rb,
lib/concurrent-ruby-edge/concurrent/actor/errors.rb,
lib/concurrent-ruby-edge/concurrent/channel/tick.rb,
lib/concurrent-ruby-edge/concurrent/edge/channel.rb,
lib/concurrent-ruby-edge/concurrent/edge/version.rb,
lib/concurrent-ruby-edge/concurrent/actor/context.rb,
lib/concurrent-ruby-edge/concurrent/edge/promises.rb,
lib/concurrent-ruby-edge/concurrent/edge/throttle.rb,
lib/concurrent-ruby-edge/concurrent/lazy_register.rb,
lib/concurrent-ruby-edge/concurrent/actor/envelope.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb,
lib/concurrent-ruby-edge/concurrent/actor/reference.rb,
lib/concurrent-ruby-edge/concurrent/actor/type_check.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/pool.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector.rb,
lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb,
lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/ad_hoc.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/balancer.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/timer.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_queue.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/broadcast.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/ticker.rb,
lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/awaits.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/sliding.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/linking.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/pausing.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/dropping.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/abstract.rb,
lib/concurrent-ruby-edge/concurrent/actor/public_delegations.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set.rb,
lib/concurrent-ruby-edge/concurrent/actor/internal_delegations.rb,
lib/concurrent-ruby-edge/concurrent/executor/wrapping_executor.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/supervising.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/termination.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/put_clause.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/sets_results.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/take_clause.rb,
lib/concurrent-ruby-edge/concurrent/edge/old_channel_integration.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/removes_child.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/after_clause.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/error_clause.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set/node.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/default_clause.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/executes_context.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set/window.rb,
lib/concurrent-ruby-edge/concurrent/actor/default_dead_letter_handler.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/errors_on_unknown_message.rb

Overview

Note:

Edge Features are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.
  • Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
  • Edge features may also lack tests and documentation.
  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

Defined Under Namespace

Modules: Actor, Async, Concern, Edge, ErlangActor, ImmutableStruct, MutableStruct, Promises, ReInclude, SettableStruct, Synchronization, Utility

Classes: Agent, Array, Atom, AtomicBoolean, AtomicFixnum, AtomicMarkableReference, AtomicReference, CachedThreadPool, Cancellation, Channel, ConcurrentUpdateError, CountDownLatch, CyclicBarrier, Delay, Event, Exchanger, FixedThreadPool, Future, Hash, IVar, ImmediateExecutor, IndirectImmediateExecutor, LazyRegister, LockFreeStack, MVar, Map, Maybe, MultipleAssignmentError, MultipleErrors, ProcessingActor, Promise, ReadWriteLock, ReentrantReadWriteLock, SafeTaskExecutor, ScheduledTask, Semaphore, SerializedExecution, SerializedExecutionDelegator, Set, SimpleExecutorService, SingleThreadExecutor, TVar, ThreadLocalVar, ThreadPoolExecutor, Throttle, TimerSet, TimerTask, Transaction, Tuple, WrappingExecutor

Constant Summary

Error = Class.new(StandardError)

ConfigurationError = Class.new(Error)
  Raised when errors occur during configuration.

CancelledOperationError = Class.new(Error)
  Raised when an asynchronous operation is cancelled before execution.

LifecycleError = Class.new(Error)
  Raised when a lifecycle method (such as stop) is called in an improper sequence or when the object is in an inappropriate state.

ImmutabilityError = Class.new(Error)
  Raised when an attempt is made to violate an immutability guarantee.

IllegalOperationError = Class.new(Error)
  Raised when an operation is attempted which is not legal given the receiver's current state.

InitializationError = Class.new(Error)
  Raised when an object's methods are called when it has not been properly initialized.

MaxRestartFrequencyError = Class.new(Error)
  Raised when an object with a start/stop lifecycle has been started an excessive number of times. Often used in conjunction with a restart policy or strategy.

RejectedExecutionError = Class.new(Error)
  Raised by an Executor when it is unable to process a given task, possibly because of a reject policy or other internal error.

ResourceLimitError = Class.new(Error)
  Raised when any finite resource, such as a lock counter, exceeds its maximum limit/threshold.

TimeoutError = Class.new(Error)
  Raised when an operation times out.

PromiseExecutionError = Class.new(StandardError)

VERSION = '1.1.10'

NULL_LOGGER = lambda { |level, progname, message = nil, &block| }
  Suppresses all output when used for logging.

EDGE_VERSION = '0.6.0'

Class Method Summary

Instance Method Summary

Class Method Details

.abort_transaction ⇒ undocumented

Abort a currently running transaction - see Concurrent::atomically.



# File 'lib/concurrent-ruby/concurrent/tvar.rb', line 139

def abort_transaction
  raise Transaction::AbortError.new
end

.atomically ⇒ undocumented

Run a block that reads and writes TVars as a single atomic transaction. With respect to the value of TVar objects, the transaction is atomic (it either happens or it does not), consistent (the TVar objects involved will never enter an illegal state), and isolated (transactions never interfere with each other). You may recognise these properties from database transactions.

There are some very important and unusual semantics that you must be aware of:

  • Most importantly, the block that you pass to atomically may be executed more than once. In most cases your code should be free of side-effects, except for those made through TVars.

  • If an exception escapes an atomically block it will abort the transaction.

  • It is undefined behaviour to use callcc or Fiber with atomically.

  • If you create a new thread within an atomically block, it will not be part of the transaction. Creating a thread counts as a side-effect.

Transactions within transactions are flattened to a single transaction.

Examples:

a = Concurrent::TVar.new(100_000)
b = Concurrent::TVar.new(100)

Concurrent::atomically do
  a.value -= 10
  b.value += 10
end

Raises:

  • (ArgumentError)


# File 'lib/concurrent-ruby/concurrent/tvar.rb', line 82

def atomically
  raise ArgumentError.new('no block given') unless block_given?

  # Get the current transaction

  transaction = Transaction::current

  # Are we not already in a transaction (not nested)?

  if transaction.nil?
    # New transaction

    begin
      # Retry loop

      loop do

        # Create a new transaction

        transaction = Transaction.new
        Transaction::current = transaction

        # Run the block, aborting on exceptions

        begin
          result = yield
        rescue Transaction::AbortError => e
          transaction.abort
          result = Transaction::ABORTED
        rescue Transaction::LeaveError => e
          transaction.abort
          break result
        rescue => e
          transaction.abort
          raise e
        end
        # If we can commit, break out of the loop

        if result != Transaction::ABORTED
          if transaction.commit
            break result
          end
        end
      end
    ensure
      # Clear the current transaction

      Transaction::current = nil
    end
  else
    # Nested transaction - flatten it and just run the block

    yield
  end
end

.call_dataflow(method, executor, *inputs, &block) ⇒ undocumented

Raises:

  • (ArgumentError)


# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 56

def call_dataflow(method, executor, *inputs, &block)
  raise ArgumentError.new('an executor must be provided') if executor.nil?
  raise ArgumentError.new('no block given') unless block_given?
  unless inputs.all? { |input| input.is_a? IVar }
    raise ArgumentError.new("Not all dependencies are IVars.\nDependencies: #{ inputs.inspect }")
  end

  result = Future.new(executor: executor) do
    values = inputs.map { |input| input.send(method) }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    counter = DependencyCounter.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer counter
    end
  end

  result
end

.create_simple_logger(level = Logger::FATAL, output = $stderr) ⇒ Logger

Returns Logger with provided level and output.

Returns:

  • (Logger)

    Logger with provided level and output.



# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 20

def self.create_simple_logger(level = Logger::FATAL, output = $stderr)
  # TODO (pitr-ch 24-Dec-2016): figure out why it had to be replaced, stdlogger was deadlocking
  lambda do |severity, progname, message = nil, &block|
    return false if severity < level

    message           = block ? block.call : message
    formatted_message = case message
                        when String
                          message
                        when Exception
                          format "%s (%s)\n%s",
                                 message.message, message.class, (message.backtrace || []).join("\n")
                        else
                          message.inspect
                        end

    output.print format "[%s] %5s -- %s: %s\n",
                        Time.now.strftime('%Y-%m-%d %H:%M:%S.%L'),
                        Logger::SEV_LABEL[severity],
                        progname,
                        formatted_message
    true
  end
end

.create_stdlib_logger(level = Logger::FATAL, output = $stderr) ⇒ Logger

Deprecated.

Returns Logger with provided level and output.

Returns:

  • (Logger)

    Logger with provided level and output.



# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 52

def self.create_stdlib_logger(level = Logger::FATAL, output = $stderr)
  logger           = Logger.new(output)
  logger.level     = level
  logger.formatter = lambda do |severity, datetime, progname, msg|
    formatted_message = case msg
                        when String
                          msg
                        when Exception
                          format "%s (%s)\n%s",
                                 msg.message, msg.class, (msg.backtrace || []).join("\n")
                        else
                          msg.inspect
                        end
    format "[%s] %5s -- %s: %s\n",
           datetime.strftime('%Y-%m-%d %H:%M:%S.%L'),
           severity,
           progname,
           formatted_message
  end

  lambda do |loglevel, progname, message = nil, &block|
    logger.add loglevel, message, progname, &block
  end
end

.dataflow(*inputs) {|inputs| ... } ⇒ Object

Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available. Data dependencies are Future values. The dataflow task itself is also a Future value, so you can build up a graph of these tasks, each of which is run when all the data and other tasks it depends on are available or completed.

Our syntax is somewhat related to that of Akka's flow and Habanero Java's DataDrivenFuture. However, unlike Akka, we don't schedule a task at all until it is ready to run, and unlike Habanero Java, we pass the data values into the task instead of dereferencing them again in the task.

The theory of dataflow goes back to the 70s. In the terminology of the literature, our implementation is coarse-grained, in that each task can be many instructions, and dynamic in that you can create more tasks within other tasks.

Example

A dataflow task is created with the dataflow method, passing in a block.

task = Concurrent::dataflow { 14 }

This produces a simple Future value. The task will run immediately, as it has no dependencies. We can also specify Future values that must be available before a task will run. When we do this we get the value of those futures passed to our block.

a = Concurrent::dataflow { 1 }
b = Concurrent::dataflow { 2 }
c = Concurrent::dataflow(a, b) { |av, bv| av + bv }

Using the dataflow method you can build up a directed acyclic graph (DAG) of tasks that depend on each other, and have the tasks run as soon as their dependencies are ready and there is CPU capacity to schedule them. This can help you create a program that uses more of the CPU resources available to you.

Derivation

This section describes how we could derive dataflow from other primitives in this library.

Consider a naive fibonacci calculator.

def fib(n)
  if n < 2
    n
  else
    fib(n - 1) + fib(n - 2)
  end
end

puts fib(14) #=> 377

We could modify this to use futures.

def fib(n)
  if n < 2
    Concurrent::Future.new { n }
  else
    n1 = fib(n - 1).execute
    n2 = fib(n - 2).execute
    Concurrent::Future.new { n1.value + n2.value }
  end
end

f = fib(14)
f.execute

sleep(0.5)

puts f.value #=> 377

One of the drawbacks of this approach is that all the futures start, and then most of them immediately block on their dependencies. We know that there's no point executing those futures until their dependencies are ready, so let's not execute each future until all their dependencies are ready.

To do this we'll create an object that counts the number of times it observes a future finishing before it does something - and for us that something will be to execute the next future.

class CountingObserver

  def initialize(count, &block)
    @count = count
    @block = block
  end

  def update(time, value, reason)
    @count -= 1

    if @count <= 0
      @block.call()
    end
  end

end

def fib(n)
  if n < 2
    Concurrent::Future.new { n }.execute
  else
    n1 = fib(n - 1)
    n2 = fib(n - 2)

    result = Concurrent::Future.new { n1.value + n2.value }

    barrier = CountingObserver.new(2) { result.execute }
    n1.add_observer barrier
    n2.add_observer barrier

    n1.execute
    n2.execute

    result
  end
end

We can wrap this up in a dataflow utility.

f = fib(14)

sleep(0.5)

puts f.value #=> 377

def dataflow(*inputs, &block)
  result = Concurrent::Future.new(&block)

  if inputs.empty?
    result.execute
  else
    barrier = CountingObserver.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer barrier
    end
  end

  result
end

def fib(n)
  if n < 2
    dataflow { n }
  else
    n1 = fib(n - 1)
    n2 = fib(n - 2)
    dataflow(n1, n2) { n1.value + n2.value }
  end
end

f = fib(14)

sleep(0.5)

puts f.value #=> 377

Since we know that the futures the dataflow computation depends on are already going to be available when the future is executed, we might as well pass the values into the block so we don't have to reference the futures inside the block. This allows us to write the dataflow block as straight non-concurrent code without reference to futures.

def dataflow(*inputs, &block)
  result = Concurrent::Future.new do
    values = inputs.map { |input| input.value }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    barrier = CountingObserver.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer barrier
    end
  end

  result
end

def fib(n)
  if n < 2
    Concurrent::dataflow { n }
  else
    n1 = fib(n - 1)
    n2 = fib(n - 2)
    Concurrent::dataflow(n1, n2) { |v1, v2| v1 + v2 }
  end
end

f = fib(14)

sleep(0.5)

puts f.value #=> 377

Parameters:

  • inputs (Future)

    zero or more Future operations that this dataflow depends upon

Yields:

  • The operation to perform once all the dependencies are met

Yield Parameters:

  • inputs (Future)

    each of the Future inputs to the dataflow

Yield Returns:

  • (Object)

    the result of the block operation

Returns:

  • (Object)

    the result of all the operations

Raises:

  • (ArgumentError)

    if no block is given

  • (ArgumentError)

    if any of the inputs are not IVars



# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 34

def dataflow(*inputs, &block)
  dataflow_with(Concurrent.global_io_executor, *inputs, &block)
end

.dataflow!(*inputs, &block) ⇒ undocumented



# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 44

def dataflow!(*inputs, &block)
  dataflow_with!(Concurrent.global_io_executor, *inputs, &block)
end

.dataflow_with(executor, *inputs, &block) ⇒ undocumented



# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 39

def dataflow_with(executor, *inputs, &block)
  call_dataflow(:value, executor, *inputs, &block)
end
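
For illustration, a minimal sketch of running a small dataflow graph on an explicitly supplied executor (the pool here is just an example):

require 'concurrent'

pool = Concurrent::FixedThreadPool.new(2)

a = Concurrent.dataflow_with(pool) { 1 }
b = Concurrent.dataflow_with(pool) { 2 }
c = Concurrent.dataflow_with(pool, a, b) { |x, y| x + y }

puts c.value #=> 3

pool.shutdown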

.dataflow_with!(executor, *inputs, &block) ⇒ undocumented



# File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 49

def dataflow_with!(executor, *inputs, &block)
  call_dataflow(:value!, executor, *inputs, &block)
end

.disable_at_exit_handlers! ⇒ undocumented

Deprecated.

Has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841.

Note:

This option should only be needed because of at_exit ordering issues, which may arise when running some testing frameworks. For example, Minitest runs its test suite in an at_exit callback, which executes after the pools are already terminated. In that case auto-termination needs to be disabled and invoked manually after the test suite ends.

Note:

This method should never be called from within a gem. It should only be used from within the main application and even then it should be used only when necessary.

Disables AtExit handlers, including pool auto-termination handlers. When disabled it is the application programmer's responsibility to ensure that the handlers are shut down properly prior to application exit by calling the AtExit.run method.



# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 131

def self.disable_at_exit_handlers!
  deprecated "Method #disable_at_exit_handlers! has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841."
end

.executor(executor_identifier) ⇒ Executor

General access point to global executors.

Parameters:

  • executor_identifier (Symbol, Executor)

    one of :fast, :io, or :immediate to select the corresponding global executor, or an executor instance to be used directly

Returns:

  • (Executor)


# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 166

def self.executor(executor_identifier)
  Options.executor(executor_identifier)
end
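
A brief usage sketch, assuming the standard global identifiers described above:

require 'concurrent'

pool = Concurrent.executor(:io) # the same pool returned by Concurrent.global_io_executor
pool.post { puts 'running on the global IO pool' }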

.global_fast_executor ⇒ ThreadPoolExecutor

Global thread pool optimized for short, fast operations.

Returns:

  • (ThreadPoolExecutor)

# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 138

def self.global_fast_executor
  GLOBAL_FAST_EXECUTOR.value
end

.global_immediate_executor ⇒ undocumented



# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 149

def self.global_immediate_executor
  GLOBAL_IMMEDIATE_EXECUTOR
end

.global_io_executor ⇒ ThreadPoolExecutor

Global thread pool optimized for long, blocking (IO) tasks.

Returns:

  • (ThreadPoolExecutor)

# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 145

def self.global_io_executor
  GLOBAL_IO_EXECUTOR.value
end
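
A minimal sketch of posting a blocking task to the global IO pool:

require 'concurrent'

Concurrent.global_io_executor.post do
  sleep(0.1) # stands in for long, blocking IO work
end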

.global_logger ⇒ undocumented



# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 92

def self.global_logger
  GLOBAL_LOGGER.value
end

.global_logger=(value) ⇒ undocumented



# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 96

def self.global_logger=(value)
  GLOBAL_LOGGER.value = value
end
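
As a sketch, the global logger can be swapped out, for example to silence concurrent-ruby's internal logging using the NULL_LOGGER constant listed above:

require 'concurrent'

Concurrent.global_logger = Concurrent::NULL_LOGGER # discard all log output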

.global_timer_set ⇒ Concurrent::TimerSet

Global thread pool used for global timers.

Returns:

  • (Concurrent::TimerSet)

# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 156

def self.global_timer_set
  GLOBAL_TIMER_SET.value
end

.leave_transaction ⇒ undocumented

Leave a transaction without committing or aborting - see Concurrent::atomically.



# File 'lib/concurrent-ruby/concurrent/tvar.rb', line 144

def leave_transaction
  raise Transaction::LeaveError.new
end
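
A minimal sketch of bailing out of a transaction early without committing (the flag TVar is only illustrative):

require 'concurrent'

flag = Concurrent::TVar.new(false)

Concurrent.atomically do
  # give up without committing anything if the flag is already set
  Concurrent.leave_transaction if flag.value
  flag.value = true
end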

.monotonic_time(unit = :float_second) ⇒ undocumented



# File 'lib/concurrent-ruby/concurrent/utility/monotonic_time.rb', line 19

def monotonic_time(unit = :float_second)
  Process.clock_gettime(Process::CLOCK_MONOTONIC, unit)
end
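
A typical usage sketch, timing a piece of work with the monotonic clock so the measurement is unaffected by wall-clock adjustments:

require 'concurrent'

start = Concurrent.monotonic_time
sleep(0.25) # stands in for real work
elapsed = Concurrent.monotonic_time - start

puts format('took %.3f seconds', elapsed)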

.new_fast_executor(opts = {}) ⇒ undocumented



# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 170

def self.new_fast_executor(opts = {})
  FixedThreadPool.new(
      [2, Concurrent.processor_count].max,
      auto_terminate:  opts.fetch(:auto_terminate, true),
      idletime:        60, # 1 minute
      max_queue:       0, # unlimited
      fallback_policy: :abort, # shouldn't matter -- 0 max queue
      name:            "fast"
  )
end

.new_io_executor(opts = {}) ⇒ undocumented



# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 181

def self.new_io_executor(opts = {})
  CachedThreadPool.new(
      auto_terminate:  opts.fetch(:auto_terminate, true),
      fallback_policy: :abort, # shouldn't matter -- 0 max queue
      name:            "io"
  )
end

.physical_processor_count ⇒ undocumented



# File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 127

def self.physical_processor_count
  processor_counter.physical_processor_count
end

.processor_count ⇒ undocumented



# File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 123

def self.processor_count
  processor_counter.processor_count
end
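
A quick sketch of both counters:

require 'concurrent'

puts Concurrent.processor_count          # logical processors visible to the Ruby VM
puts Concurrent.physical_processor_count # physical cores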

.use_simple_logger(level = Logger::FATAL, output = $stderr) ⇒ undocumented

Use the logger created by #create_simple_logger to log concurrent-ruby messages.



# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 46

def self.use_simple_logger(level = Logger::FATAL, output = $stderr)
  Concurrent.global_logger = create_simple_logger level, output
end
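
A usage sketch, routing concurrent-ruby's internal messages to standard output at WARN level:

require 'concurrent'
require 'logger'

Concurrent.use_simple_logger(Logger::WARN, $stdout)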

.use_stdlib_logger(level = Logger::FATAL, output = $stderr) ⇒ undocumented

Deprecated.

Use the logger created by #create_stdlib_logger to log concurrent-ruby messages.



# File 'lib/concurrent-ruby/concurrent/configuration.rb', line 79

def self.use_stdlib_logger(level = Logger::FATAL, output = $stderr)
  Concurrent.global_logger = create_stdlib_logger level, output
end

Instance Method Details

#exchange(value, timeout = nil) ⇒ Object

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return. The timeout value indicates the approximate number of seconds the method should block while waiting for the exchange. When the timeout value is nil the method will block indefinitely.

In some edge cases when a timeout is given a return value of nil may be ambiguous. Specifically, if nil is a valid value in the exchange it will be impossible to tell whether nil is the actual return value or if it signifies timeout. When nil is a valid value in the exchange consider using #exchange! or #try_exchange instead.
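
For illustration, a minimal sketch (not from the library docs) of two threads meeting at the exchange point:

exchanger = Concurrent::Exchanger.new

t1 = Thread.new { exchanger.exchange(:foo) } # blocks until a partner arrives
t2 = Thread.new { exchanger.exchange(:bar) } # pairs up with the first thread

puts t1.value #=> :bar
puts t2.value #=> :foo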

Parameters:

  • value (Object)

    the value to exchange with another thread

  • timeout (Numeric, nil) (defaults to: nil)

    in seconds, nil blocks indefinitely

Returns:

  • (Object)

    the value exchanged by the other thread or nil on timeout



# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 340

#exchange!(value, timeout = nil) ⇒ Object

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return. The timeout value indicates the approximate number of seconds the method should block while waiting for the exchange. When the timeout value is nil the method will block indefinitely.

On timeout a TimeoutError exception will be raised.
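
For illustration, a sketch of handling the timeout case (the 0.1 second timeout is arbitrary):

exchanger = Concurrent::Exchanger.new

begin
  exchanger.exchange!(:foo, 0.1) # no partner arrives in time
rescue Concurrent::TimeoutError
  puts 'no partner arrived within 0.1 seconds'
end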

Parameters:

  • value (Object)

    the value to exchange with another thread

  • timeout (Numeric, nil) (defaults to: nil)

    in seconds, nil blocks indefinitely

Returns:

  • (Object)

    the value exchanged by the other thread

Raises:

  • (Concurrent::TimeoutError)

    on timeout

# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 344

#initialize(opts = {}) ⇒ undocumented

Create a new thread pool.

Options Hash (opts):

  • :fallback_policy (Symbol) — default: :discard

    the policy for handling new tasks that are received when the queue size has reached max_queue or the executor has shut down

Raises:

  • (ArgumentError)

    if :fallback_policy is not one of the values specified in FALLBACK_POLICIES

See Also:



# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 337

#try_exchange(value, timeout = nil) ⇒ Concurrent::Maybe

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return. The timeout value indicates the approximate number of seconds the method should block while waiting for the exchange. When the timeout value is nil the method will block indefinitely.

The return value will be a Maybe set to Just on success or Nothing on timeout.

Examples:


exchanger = Concurrent::Exchanger.new

result = exchanger.try_exchange(:foo, 0.5)

if result.just?
  puts result.value #=> :bar
else
  puts 'timeout'
end

Parameters:

  • value (Object)

    the value to exchange with another thread

  • timeout (Numeric, nil) (defaults to: nil)

    in seconds, nil blocks indefinitely

Returns:

  • (Concurrent::Maybe)

    on success a Just maybe will be returned with the item exchanged by the other thread as #value; on timeout a Nothing maybe will be returned with TimeoutError as #reason



# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 348