Module: Concurrent

Extended by:
Logging
Defined in:
lib/concurrent.rb,
lib/concurrent/mvar.rb,
lib/concurrent/ivar.rb,
lib/concurrent/tvar.rb,
lib/concurrent/agent.rb,
lib/concurrent/actor.rb,
lib/concurrent/async.rb,
lib/concurrent/delay.rb,
lib/extension_helper.rb,
lib/concurrent/future.rb,
lib/concurrent/errors.rb,
lib/concurrent/logging.rb,
lib/concurrent/promise.rb,
lib/concurrent/version.rb,
lib/concurrent/dataflow.rb,
lib/concurrent/exchanger.rb,
lib/concurrent/observable.rb,
lib/concurrent/actor/core.rb,
lib/concurrent/actor/root.rb,
lib/concurrent/obligation.rb,
lib/concurrent/timer_task.rb,
lib/concurrent/actor/utils.rb,
lib/concurrent/actor/errors.rb,
lib/concurrent/atomic/event.rb,
lib/concurrent/configuration.rb,
lib/concurrent/actor/context.rb,
lib/concurrent/utility/timer.rb,
lib/concurrent/lazy_register.rb,
lib/concurrent/lazy_reference.rb,
lib/concurrent/actor/envelope.rb,
lib/concurrent/scheduled_task.rb,
lib/concurrent/channel/channel.rb,
lib/concurrent/actor/reference.rb,
lib/concurrent/actor/behaviour.rb,
lib/concurrent/dereferenceable.rb,
lib/concurrent/utility/timeout.rb,
lib/concurrent/actor/utils/pool.rb,
lib/concurrent/atomic/semaphore.rb,
lib/concurrent/atomic/condition.rb,
lib/concurrent/actor/type_check.rb,
lib/concurrent/executor/executor.rb,
lib/concurrent/actor/utils/ad_hoc.rb,
lib/concurrent/executor/timer_set.rb,
lib/concurrent/atomic/atomic_fixnum.rb,
lib/concurrent/actor/utils/balancer.rb,
lib/concurrent/atomic_reference/rbx.rb,
lib/concurrent/atomic/cyclic_barrier.rb,
lib/concurrent/atomic_reference/ruby.rb,
lib/concurrent/atomic/atomic_boolean.rb,
lib/concurrent/channel/waitable_list.rb,
lib/concurrent/actor/utils/broadcast.rb,
lib/concurrent/actor/behaviour/buffer.rb,
lib/concurrent/actor/behaviour/awaits.rb,
lib/concurrent/collection/ring_buffer.rb,
lib/concurrent/utility/monotonic_time.rb,
lib/concurrent/atomic/synchronization.rb,
lib/concurrent/atomic/read_write_lock.rb,
lib/concurrent/atomic_reference/jruby.rb,
lib/concurrent/atomic/thread_local_var.rb,
lib/concurrent/actor/behaviour/pausing.rb,
lib/concurrent/actor/behaviour/linking.rb,
lib/concurrent/utility/processor_count.rb,
lib/concurrent/atomic/count_down_latch.rb,
lib/concurrent/channel/buffered_channel.rb,
lib/concurrent/actor/public_delegations.rb,
lib/concurrent/actor/behaviour/abstract.rb,
lib/concurrent/collection/priority_queue.rb,
lib/concurrent/executor/executor_options.rb,
lib/concurrent/actor/behaviour/supervised.rb,
lib/concurrent/executor/fixed_thread_pool.rb,
lib/concurrent/actor/internal_delegations.rb,
lib/concurrent/channel/unbuffered_channel.rb,
lib/concurrent/executor/immediate_executor.rb,
lib/concurrent/executor/cached_thread_pool.rb,
lib/concurrent/actor/behaviour/termination.rb,
lib/concurrent/actor/behaviour/supervising.rb,
lib/concurrent/executor/safe_task_executor.rb,
lib/concurrent/actor/behaviour/sets_results.rb,
lib/concurrent/executor/per_thread_executor.rb,
lib/concurrent/actor/behaviour/removes_child.rb,
lib/concurrent/executor/serialized_execution.rb,
lib/concurrent/atomic_reference/mutex_atomic.rb,
lib/concurrent/executor/thread_pool_executor.rb,
lib/concurrent/atomic_reference/direct_update.rb,
lib/concurrent/executor/java_fixed_thread_pool.rb,
lib/concurrent/executor/ruby_fixed_thread_pool.rb,
lib/concurrent/collection/blocking_ring_buffer.rb,
lib/concurrent/executor/single_thread_executor.rb,
lib/concurrent/executor/ruby_cached_thread_pool.rb,
lib/concurrent/executor/ruby_thread_pool_worker.rb,
lib/concurrent/actor/behaviour/executes_context.rb,
lib/concurrent/executor/java_cached_thread_pool.rb,
lib/concurrent/actor/default_dead_letter_handler.rb,
lib/concurrent/atomic/copy_on_write_observer_set.rb,
lib/concurrent/atomic/copy_on_notify_observer_set.rb,
lib/concurrent/executor/java_thread_pool_executor.rb,
lib/concurrent/executor/ruby_thread_pool_executor.rb,
lib/concurrent/actor/behaviour/terminates_children.rb,
lib/concurrent/atomic_reference/numeric_cas_wrapper.rb,
lib/concurrent/executor/java_single_thread_executor.rb,
lib/concurrent/executor/ruby_single_thread_executor.rb,
lib/concurrent/executor/indirect_immediate_executor.rb,
lib/concurrent/atomic_reference/concurrent_update_error.rb,
lib/concurrent/actor/behaviour/errors_on_unknown_message.rb

Overview

Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns.

The design goals of this gem are:

  • Stay true to the spirit of the languages providing inspiration
  • But implement in a way that makes sense for Ruby
  • Keep the semantics as idiomatic Ruby as possible
  • Support features that make sense in Ruby
  • Exclude features that don't make sense in Ruby
  • Be small, lean, and loosely coupled

Defined Under Namespace

Modules: Actor, Async, AtomicDirectUpdate, AtomicNumericCompareAndSetWrapper, Channel, Dereferenceable, Executor, JavaExecutor, Logging, Obligation, Observable, RubyExecutor, SerialExecutor, Synchronization

Classes: AbstractThreadLocalVar, Agent, Atomic, AtomicBoolean, AtomicFixnum, BlockingRingBuffer, BufferedChannel, CAtomic, CAtomicBoolean, CAtomicFixnum, CachedThreadPool, ConcurrentUpdateError, Condition, Configuration, CopyOnNotifyObserverSet, CopyOnWriteObserverSet, CountDownLatch, CyclicBarrier, Delay, Event, Exchanger, FixedThreadPool, Future, IVar, ImmediateExecutor, IndirectImmediateExecutor, JavaAtomic, JavaCachedThreadPool, JavaCountDownLatch, JavaFixedThreadPool, JavaPriorityQueue, JavaSingleThreadExecutor, JavaThreadPoolExecutor, LazyReference, LazyRegister, MVar, MutexAtomic, MutexAtomicBoolean, MutexAtomicFixnum, MutexCountDownLatch, MutexPriorityQueue, MutexSemaphore, PerThreadExecutor, PriorityQueue, ProcessorCounter, Promise, RbxAtomic, ReadWriteLock, RingBuffer, RubyCachedThreadPool, RubyFixedThreadPool, RubySingleThreadExecutor, RubyThreadPoolExecutor, SafeTaskExecutor, ScheduledTask, Semaphore, SerializedExecution, SerializedExecutionDelegator, SingleThreadExecutor, TVar, ThreadLocalVar, ThreadPoolExecutor, TimerSet, TimerTask, Transaction, UnbufferedChannel, WaitableList

Constant Summary

ConfigurationError = Class.new(StandardError)
  Raised when errors occur during configuration.

LifecycleError = Class.new(StandardError)
  Raised when a lifecycle method (such as stop) is called in an improper sequence or when the object is in an inappropriate state.

InitializationError = Class.new(StandardError)
  Raised when an object's methods are called before it has been properly initialized.

MaxRestartFrequencyError = Class.new(StandardError)
  Raised when an object with a start/stop lifecycle has been started an excessive number of times. Often used in conjunction with a restart policy or strategy.

MultipleAssignmentError = Class.new(StandardError)
  Raised when an attempt is made to modify an immutable object (such as an IVar) after its final state has been set.

RejectedExecutionError = Class.new(StandardError)
  Raised by an Executor when it is unable to process a given task, possibly because of a reject policy or other internal error.

ResourceLimitError = Class.new(StandardError)
  Raised when any finite resource, such as a lock counter, exceeds its maximum limit/threshold.

TimeoutError = Class.new(StandardError)
  Raised when an operation times out.

PromiseExecutionError = Class.new(StandardError)

VERSION = '0.8.0'

NULL_LOGGER = lambda { |level, progname, message = nil, &block| }
  Suppresses all output when used for logging.

Class Method Summary

Class Method Details

+ (Object) abort_transaction

Abort a currently running transaction - see Concurrent::atomically.



# File 'lib/concurrent/tvar.rb', line 143

def abort_transaction
  raise Transaction::AbortError.new
end
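
A minimal sketch of aborting from within a transaction (the variable names are illustrative); per the atomically implementation below, aborting rolls back the transaction's TVar writes and the block is retried from the beginning:

flag    = Concurrent::TVar.new(false)
counter = Concurrent::TVar.new(0)

Thread.new { sleep(0.1); Concurrent::atomically { flag.value = true } }

Concurrent::atomically do
  counter.value += 1
  # Roll back the increment and retry until the flag has been set elsewhere.
  Concurrent::abort_transaction unless flag.value
end

counter.value #=> 1 (the rolled-back increments were never committed)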

+ (Object) atomically

Run a block that reads and writes TVars as a single atomic transaction. With respect to the value of TVar objects, the transaction is atomic, in that it either happens or it does not, consistent, in that the TVar objects involved will never enter an illegal state, and isolated, in that transactions never interfere with each other. You may recognise these properties from database transactions.

There are some very important and unusual semantics that you must be aware of:

  • Most importantly, the block that you pass to atomically may be executed more than once. In most cases your code should be free of side-effects, except for via TVar.

  • If an exception escapes an atomically block it will abort the transaction.

  • It is undefined behaviour to use callcc or Fiber with atomically.

  • If you create a new thread within an atomically, it will not be part of the transaction. Creating a thread counts as a side-effect.

Transactions within transactions are flattened to a single transaction.

Examples:

a = Concurrent::TVar.new(100_000)
b = Concurrent::TVar.new(100)

Concurrent::atomically do
  a.value -= 10
  b.value += 10
end

Raises:

  • (ArgumentError)


# File 'lib/concurrent/tvar.rb', line 89

def atomically
  raise ArgumentError.new('no block given') unless block_given?

  # Get the current transaction
  transaction = Transaction::current

  # Are we not already in a transaction (not nested)?
  if transaction.nil?
    # New transaction
    begin
      # Retry loop
      loop do

        # Create a new transaction
        transaction = Transaction.new
        Transaction::current = transaction

        # Run the block, aborting on exceptions
        begin
          result = yield
        rescue Transaction::AbortError => e
          transaction.abort
          result = Transaction::ABORTED
        rescue => e
          transaction.abort
          raise e
        end

        # If we can commit, break out of the loop
        if result != Transaction::ABORTED
          if transaction.commit
            break result
          end
        end
      end
    ensure
      # Clear the current transaction
      Transaction::current = nil
    end
  else
    # Nested transaction - flatten it and just run the block
    yield
  end
end

+ (Boolean) auto_terminate_all_executors?

Note:

Only change this option if you know what you are doing! When this is set to true (the default), at_exit handlers will be registered automatically for all thread pools to ensure that they are shut down when the application ends. When changed to false, the at_exit handlers will be circumvented for all Concurrent Ruby thread pools running within the application, even those created within other gems the application uses. This method should never be called from within a gem; it should only be used from within the main application, and even then only when necessary.

Defines if ALL executors should be auto-terminated with an at_exit callback. When set to false, it will be the application programmer's responsibility to ensure that all thread pools, including the global thread pools, are shut down properly prior to application exit.

Returns:

  • (Boolean)

    true when all thread pools will auto-terminate on application exit using an at_exit handler; false when no auto-termination will occur.



# File 'lib/concurrent/configuration.rb', line 131

def self.auto_terminate_all_executors?
  @@auto_terminate_all_executors.value
end

+ (Boolean) auto_terminate_global_executors?

Note:

Only change this option if you know what you are doing! When this is set to true (the default), at_exit handlers will be registered automatically for the global thread pools to ensure that they are shut down when the application ends. When changed to false, the at_exit handlers will be circumvented for all global thread pools. This method should never be called from within a gem; it should only be used from within the main application, and even then only when necessary.

Defines if global executors should be auto-terminated with an at_exit callback. When set to false, it will be the application programmer's responsibility to ensure that the global thread pools are shut down properly prior to application exit.

Returns:

  • (Boolean)

    true when global thread pools will auto-terminate on application exit using an at_exit handler; false when no auto-termination will occur.



# File 'lib/concurrent/configuration.rb', line 87

def self.auto_terminate_global_executors?
  @@auto_terminate_global_executors.value
end

+ (Object) call_dataflow(method, executor, *inputs, &block)

Raises:

  • (ArgumentError)


# File 'lib/concurrent/dataflow.rb', line 56

def call_dataflow(method, executor, *inputs, &block)
  raise ArgumentError.new('an executor must be provided') if executor.nil?
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('not all dependencies are IVars') unless inputs.all? { |input| input.is_a? IVar }

  result = Future.new(executor: executor) do
    values = inputs.map { |input| input.send(method) }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    counter = DependencyCounter.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer counter
    end
  end

  result
end

+ (Configuration) configuration

Returns:



# File 'lib/concurrent/configuration.rb', line 296

def self.configuration
  @configuration.value
end

+ (Object) configure {|the| ... }

Perform gem-level configuration.

Yields:

  • the configuration commands

Yield Parameters:



# File 'lib/concurrent/configuration.rb', line 304

def self.configure
  yield(configuration)
end

+ (Object) dataflow(*inputs) {|inputs| ... }

Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available. Data dependencies are Future values. The dataflow task itself is also a Future value, so you can build up a graph of these tasks, each of which is run when all the data and other tasks it depends on are available or completed.

Our syntax is somewhat related to that of Akka's flow and Habanero Java's DataDrivenFuture. However unlike Akka we don't schedule a task at all until it is ready to run, and unlike Habanero Java we pass the data values into the task instead of dereferencing them again in the task.

The theory of dataflow goes back to the 70s. In the terminology of the literature, our implementation is coarse-grained, in that each task can be many instructions, and dynamic in that you can create more tasks within other tasks.

Example

A dataflow task is created with the dataflow method, passing in a block.

task = Concurrent::dataflow { 14 }

This produces a simple Future value. The task will run immediately, as it has no dependencies. We can also specify Future values that must be available before a task will run. When we do this we get the value of those futures passed to our block.

a = Concurrent::dataflow { 1 }
b = Concurrent::dataflow { 2 }
c = Concurrent::dataflow(a, b) { |av, bv| av + bv }

Using the dataflow method you can build up a directed acyclic graph (DAG) of tasks that depend on each other, and have the tasks run as soon as their dependencies are ready and there is CPU capacity to schedule them. This can help you create a program that uses more of the CPU resources available to you.

Derivation

This section describes how we could derive dataflow from other primitives in this library.

Consider a naive fibonacci calculator.

def fib(n)
  if n < 2
    n
  else
    fib(n - 1) + fib(n - 2)
  end
end

puts fib(14) #=> 377

We could modify this to use futures.

def fib(n)
  if n < 2
    Concurrent::Future.new { n }
  else
    n1 = fib(n - 1).execute
    n2 = fib(n - 2).execute
    Concurrent::Future.new { n1.value + n2.value }
  end
end

f = fib(14) #=> #<Concurrent::Future:0x000001019ef5a0 ...
f.execute   #=> #<Concurrent::Future:0x000001019ef5a0 ...

sleep(0.5)

puts f.value #=> 377

One of the drawbacks of this approach is that all the futures start, and then most of them immediately block on their dependencies. We know that there's no point executing those futures until their dependencies are ready, so let's not execute each future until all their dependencies are ready.

To do this we'll create an object that counts the number of times it observes a future finishing before it does something - and for us that something will be to execute the next future.

class CountingObserver

  def initialize(count, &block)
    @count = count
    @block = block
  end

  def update(time, value, reason)
    @count -= 1

    if @count <= 0
      @block.call()
    end
  end

end

def fib(n)
  if n < 2
    Concurrent::Future.new { n }.execute
  else
    n1 = fib(n - 1)
    n2 = fib(n - 2)

    result = Concurrent::Future.new { n1.value + n2.value }

    barrier = CountingObserver.new(2) { result.execute }
    n1.add_observer barrier
    n2.add_observer barrier

    n1.execute
    n2.execute

    result
  end
end

We can wrap this up in a dataflow utility.

f = fib(14) #=> #<Concurrent::Future:0x00000101fca308 ...
sleep(0.5)

puts f.value #=> 377

def dataflow(*inputs, &block)
  result = Concurrent::Future.new(&block)

  if inputs.empty?
    result.execute
  else
    barrier = CountingObserver.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer barrier
    end
  end

  result
end

def fib(n)
  if n < 2
    dataflow { n }
  else
    n1 = fib(n - 1)
    n2 = fib(n - 2)
    dataflow(n1, n2) { n1.value + n2.value }
  end
end

f = fib(14) #=> #<Concurrent::Future:0x00000101fca308 ...
sleep(0.5)

puts f.value #=> 377

Since we know that the futures the dataflow computation depends on are already going to be available when the future is executed, we might as well pass the values into the block so we don't have to reference the futures inside the block. This allows us to write the dataflow block as straight non-concurrent code without reference to futures.

def dataflow(*inputs, &block)
  result = Concurrent::Future.new do
    values = inputs.map { |input| input.value }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    barrier = CountingObserver.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer barrier
    end
  end

  result
end

def fib(n)
  if n < 2
    Concurrent::dataflow { n }
  else
    n1 = fib(n - 1)
    n2 = fib(n - 2)
    Concurrent::dataflow(n1, n2) { |v1, v2| v1 + v2 }
  end
end

f = fib(14) #=> #<Concurrent::Future:0x000001019a26d8 ...
sleep(0.5)

puts f.value #=> 377

Parameters:

  • inputs (Future)

    zero or more Future operations that this dataflow depends upon

Yields:

  • The operation to perform once all the dependencies are met

Yield Parameters:

  • inputs (Future)

    each of the Future inputs to the dataflow

Yield Returns:

  • (Object)

    the result of the block operation

Returns:

  • (Object)

    the result of all the operations

Raises:

  • (ArgumentError)

    if no block is given

  • (ArgumentError)

    if any of the inputs are not IVars



# File 'lib/concurrent/dataflow.rb', line 34

def dataflow(*inputs, &block)
  dataflow_with(Concurrent.global_io_executor, *inputs, &block)
end

+ (Object) dataflow!(*inputs, &block)



# File 'lib/concurrent/dataflow.rb', line 44

def dataflow!(*inputs, &block)
  dataflow_with!(Concurrent.global_io_executor, *inputs, &block)
end

+ (Object) dataflow_with(executor, *inputs, &block)



# File 'lib/concurrent/dataflow.rb', line 39

def dataflow_with(executor, *inputs, &block)
  call_dataflow(:value, executor, *inputs, &block)
end

+ (Object) dataflow_with!(executor, *inputs, &block)



# File 'lib/concurrent/dataflow.rb', line 49

def dataflow_with!(executor, *inputs, &block)
  call_dataflow(:value!, executor, *inputs, &block)
end
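
A brief sketch of running a dataflow task on an explicitly supplied executor instead of the global IO executor (the pool here is illustrative). dataflow_with! and dataflow! behave the same way but read their dependencies with value!, so an exception in a dependency propagates into the task rather than being passed through as nil:

pool = Concurrent::FixedThreadPool.new(2)

a = Concurrent::dataflow { 1 }
b = Concurrent::dataflow { 2 }
sum = Concurrent::dataflow_with(pool, a, b) { |x, y| x + y }

sleep(0.5)

puts sum.value #=> 3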

+ (Object) disable_auto_termination_of_all_executors!

Note:

Only change this option if you know what you are doing! When this is set to true (the default), at_exit handlers will be registered automatically for all thread pools to ensure that they are shut down when the application ends. When changed to false, the at_exit handlers will be circumvented for all Concurrent Ruby thread pools running within the application, even those created within other gems the application uses. This method should never be called from within a gem; it should only be used from within the main application, and even then only when necessary.

Defines if ALL executors should be auto-terminated with an at_exit callback. When set to false, it will be the application programmer's responsibility to ensure that all thread pools, including the global thread pools, are shut down properly prior to application exit.



# File 'lib/concurrent/configuration.rb', line 107

def self.disable_auto_termination_of_all_executors!
  @@auto_terminate_all_executors.make_false
end
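
A sketch of opting out (main application only); once disabled, the application owns shutdown of every pool, for example via the global-executor helpers documented on this page (the 10-second grace period is illustrative):

Concurrent.disable_auto_termination_of_all_executors!

at_exit do
  Concurrent.shutdown_global_executors
  Concurrent.wait_for_global_executors_termination(10)
end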

+ (Object) disable_auto_termination_of_global_executors!

Note:

Only change this option if you know what you are doing! When this is set to true (the default), at_exit handlers will be registered automatically for the global thread pools to ensure that they are shut down when the application ends. When changed to false, the at_exit handlers will be circumvented for all global thread pools. This method should never be called from within a gem; it should only be used from within the main application, and even then only when necessary.

Defines if global executors should be auto-terminated with an at_exit callback. When set to false, it will be the application programmer's responsibility to ensure that the global thread pools are shut down properly prior to application exit.



# File 'lib/concurrent/configuration.rb', line 66

def self.disable_auto_termination_of_global_executors!
  @@auto_terminate_global_executors.make_false
end

+ (ThreadPoolExecutor) global_fast_executor

Global thread pool optimized for short, fast operations.

Returns:



# File 'lib/concurrent/configuration.rb', line 138

def self.global_fast_executor
  @@global_fast_executor.value
end

+ (ThreadPoolExecutor) global_io_executor

Global thread pool optimized for long, blocking (IO) tasks.

Returns:



# File 'lib/concurrent/configuration.rb', line 145

def self.global_io_executor
  @@global_io_executor.value
end

+ (Object) global_logger



# File 'lib/concurrent/configuration.rb', line 44

def self.global_logger
  @@global_logger.value
end

+ (Object) global_logger=(value)



# File 'lib/concurrent/configuration.rb', line 48

def self.global_logger=(value)
  @@global_logger.value = value
end
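
A sketch of routing the gem's logging to a standard Ruby Logger; the lambda mirrors the signature shown for NULL_LOGGER above, and the level argument is one of the usual Logger::Severity constants:

require 'logger'

logger = Logger.new($stderr)

Concurrent.global_logger = lambda do |level, progname, message = nil, &block|
  logger.add(level, message, progname, &block)
end

# Or silence the gem entirely:
Concurrent.global_logger = Concurrent::NULL_LOGGER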

+ (Concurrent::TimerSet) global_timer_set

Global thread pool used for global timers.

Returns:

See Also:

  • timer


# File 'lib/concurrent/configuration.rb', line 154

def self.global_timer_set
  @@global_timer_set.value
end

+ (Object) kill_global_executors



# File 'lib/concurrent/configuration.rb', line 164

def self.kill_global_executors
  global_fast_executor.kill
  global_io_executor.kill
  global_timer_set.kill
end

+ (Object) log(level, progname, message = nil, &block) Originally defined in module Logging

Logs through Configuration#logger; it can be overridden by setting @logger.

Parameters:

  • level (Integer)

    one of Logger::Severity constants

  • progname (String)

    e.g. a path of an Actor

  • message (String, nil) (defaults to: nil)

when nil, the block is used to generate the message

Yield Returns:

  • (String)

    a message

+ (Float) monotonic_interval { ... }

Note:

Time calculations on all platforms and languages are sensitive to changes to the system clock. To alleviate the potential problems associated with changing the system clock while an application is running, most modern operating systems provide a monotonic clock that operates independently of the system clock. A monotonic clock cannot be used to determine human-friendly clock times. A monotonic clock is used exclusively for calculating time intervals. Not all Ruby platforms provide access to an operating system monotonic clock. On these platforms a pure-Ruby monotonic clock will be used as a fallback. An operating system monotonic clock is both faster and more reliable than the pure-Ruby implementation. The pure-Ruby implementation should be fast and reliable enough for most non-realtime operations. At this time the common Ruby platforms that provide access to an operating system monotonic clock are MRI 2.1 and above and JRuby (all versions).

Runs the given block and returns the number of seconds that elapsed.

Yields:

  • the block to run and time

Returns:

  • (Float)

    the number of seconds the block took to run

Raises:

  • (ArgumentError)

    when no block given

See Also:



# File 'lib/concurrent/utility/monotonic_time.rb', line 67

def monotonic_interval
  raise ArgumentError.new('no block given') unless block_given?
  start_time = GLOBAL_MONOTONIC_CLOCK.get_time
  yield
  GLOBAL_MONOTONIC_CLOCK.get_time - start_time
end
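
For example, timing a block this way is unaffected by the system clock being adjusted while it runs:

elapsed = Concurrent.monotonic_interval do
  sleep(0.25) # any work to be timed
end

elapsed #=> roughly 0.25 (seconds, as a Float)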

+ (Float) monotonic_time

Note:

Time calculations on all platforms and languages are sensitive to changes to the system clock. To alleviate the potential problems associated with changing the system clock while an application is running, most modern operating systems provide a monotonic clock that operates independently of the system clock. A monotonic clock cannot be used to determine human-friendly clock times. A monotonic clock is used exclusively for calculating time intervals. Not all Ruby platforms provide access to an operating system monotonic clock. On these platforms a pure-Ruby monotonic clock will be used as a fallback. An operating system monotonic clock is both faster and more reliable than the pure-Ruby implementation. The pure-Ruby implementation should be fast and reliable enough for most non-realtime operations. At this time the common Ruby platforms that provide access to an operating system monotonic clock are MRI 2.1 and above and JRuby (all versions).

Returns the current time as tracked by the application monotonic clock.

Returns:

  • (Float)

The current monotonic time.

See Also:



# File 'lib/concurrent/utility/monotonic_time.rb', line 54

def monotonic_time
  GLOBAL_MONOTONIC_CLOCK.get_time
end

+ (Object) new_fast_executor(opts = {})



# File 'lib/concurrent/configuration.rb', line 178

def self.new_fast_executor(opts = {})
  FixedThreadPool.new(
    [2, Concurrent.processor_count].max,
    stop_on_exit:    opts.fetch(:stop_on_exit, true),
    idletime:        60,          # 1 minute
    max_queue:       0,           # unlimited
    fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
  )
end

+ (Object) new_io_executor(opts = {})



# File 'lib/concurrent/configuration.rb', line 188

def self.new_io_executor(opts = {})
  ThreadPoolExecutor.new(
    min_threads: [2, Concurrent.processor_count].max,
    max_threads: Concurrent.processor_count * 100,
    stop_on_exit:    opts.fetch(:stop_on_exit, true),
    idletime:        60,          # 1 minute
    max_queue:       0,           # unlimited
    fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
  )
end
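
A sketch of creating a private IO pool and handing it to a Future explicitly, following the Future.new(executor: ...) form used elsewhere on this page (the variable names are illustrative):

io_pool = Concurrent.new_io_executor(stop_on_exit: true)

f = Concurrent::Future.new(executor: io_pool) do
  # long, blocking IO work here
  :done
end
f.execute

sleep(0.5)

puts f.value #=> :done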

+ (Object) physical_processor_count



# File 'lib/concurrent/utility/processor_count.rb', line 150

def self.physical_processor_count
  processor_counter.physical_processor_count
end

+ (Object) processor_count



# File 'lib/concurrent/utility/processor_count.rb', line 146

def self.processor_count
  processor_counter.processor_count
end
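
Both helpers delegate to a shared ProcessorCounter; the values below are illustrative:

Concurrent.processor_count          #=> 8 (logical processors)
Concurrent.physical_processor_count #=> 4 (physical cores)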

+ (Object) shutdown_global_executors



# File 'lib/concurrent/configuration.rb', line 158

def self.shutdown_global_executors
  global_fast_executor.shutdown
  global_io_executor.shutdown
  global_timer_set.shutdown
end

+ (Object) timeout(seconds)

Note:

Time calculations on all platforms and languages are sensitive to changes to the system clock. To alleviate the potential problems associated with changing the system clock while an application is running, most modern operating systems provide a monotonic clock that operates independently of the system clock. A monotonic clock cannot be used to determine human-friendly clock times. A monotonic clock is used exclusively for calculating time intervals. Not all Ruby platforms provide access to an operating system monotonic clock. On these platforms a pure-Ruby monotonic clock will be used as a fallback. An operating system monotonic clock is both faster and more reliable than the pure-Ruby implementation. The pure-Ruby implementation should be fast and reliable enough for most non-realtime operations. At this time the common Ruby platforms that provide access to an operating system monotonic clock are MRI 2.1 and above and JRuby (all versions).

Wait the given number of seconds for the block operation to complete. Intended to be a simpler and more reliable replacement for the Ruby standard library Timeout::timeout method.

Parameters:

  • seconds (Integer)

    The number of seconds to wait

Returns:

  • (Object)

    The result of the block operation

Raises:

See Also:



# File 'lib/concurrent/utility/timeout.rb', line 22

def timeout(seconds)

  thread = Thread.new do
    Thread.current[:result] = yield
  end
  success = thread.join(seconds)

  if success
    return thread[:result]
  else
    raise TimeoutError
  end
ensure
  Thread.kill(thread) unless thread.nil?
end
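
For example (slow_operation is a hypothetical stand-in for any long-running call):

begin
  result = Concurrent.timeout(2) { slow_operation }
rescue Concurrent::TimeoutError
  result = :timed_out
end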

+ (Boolean) timer(seconds, *args) { ... }

Note:

Time calculations on all platforms and languages are sensitive to changes to the system clock. To alleviate the potential problems associated with changing the system clock while an application is running, most modern operating systems provide a monotonic clock that operates independently of the system clock. A monotonic clock cannot be used to determine human-friendly clock times. A monotonic clock is used exclusively for calculating time intervals. Not all Ruby platforms provide access to an operating system monotonic clock. On these platforms a pure-Ruby monotonic clock will be used as a fallback. An operating system monotonic clock is both faster and more reliable than the pure-Ruby implementation. The pure-Ruby implementation should be fast and reliable enough for most non-realtime operations. At this time the common Ruby platforms that provide access to an operating system monotonic clock are MRI 2.1 and above and JRuby (all versions).

Perform the given operation asynchronously after the given number of seconds.

Parameters:

  • seconds (Fixnum)

    the interval in seconds to wait before executing the task

Yields:

  • the task to execute

Returns:

  • (Boolean)

    true

Raises:

  • (ArgumentError)

See Also:



# File 'lib/concurrent/utility/timer.rb', line 15

def timer(seconds, *args, &block)
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('interval must be greater than or equal to zero') if seconds < 0

  Concurrent.configuration.global_timer_set.post(seconds, *args, &block)
  true
end
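
For example, scheduling a one-shot task on the global timer set; the call returns immediately and the block runs asynchronously:

Concurrent.timer(2) { puts 'fired two seconds later' } #=> true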

+ (Object) wait_for_global_executors_termination(timeout = nil)



# File 'lib/concurrent/configuration.rb', line 170

def self.wait_for_global_executors_termination(timeout = nil)
  latch = CountDownLatch.new(3)
  [ global_fast_executor, global_io_executor, global_timer_set ].each do |executor|
    Thread.new{ executor.wait_for_termination(timeout); latch.count_down }
  end
  latch.wait(timeout)
end
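
Taken together, the global-executor helpers support an orderly shutdown sequence; a sketch (the 10-second grace period is illustrative):

Concurrent.shutdown_global_executors                        # stop accepting new work
unless Concurrent.wait_for_global_executors_termination(10) # wait up to 10 seconds
  Concurrent.kill_global_executors                          # force-stop any stragglers
end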