Class: Concurrent::Throttle

Inherits:
Synchronization::Object show all
Includes:
Promises::FactoryMethods
Defined in:
lib/concurrent-ruby-edge/concurrent/edge/throttle.rb

Overview

Note:

Edge Features are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.
  • Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
  • Edge features may also lack tests and documentation.
  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

A tool managing concurrency level of tasks. The maximum capacity is set in constructor. Each acquire will lower the available capacity and release will increase it. When there is no available capacity the current thread may either be blocked or an event is returned which will be resolved when capacity becomes available.

The more common usage of the Throttle is with a proxy executor a_throttle.on(Concurrent.global_io_executor). Anything executed on the proxy executor will be throttled and execute on the given executor. There can be more than one proxy executors. All abstractions which execute tasks have option to specify executor, therefore the proxy executor can be injected to any abstraction throttling its concurrency level.

Examples

Limiting concurrency level of a concurrently executed block to two

max_two = Concurrent::Throttle.new 2
# => #

# used to track concurrency level
concurrency_level = Concurrent::AtomicFixnum.new
# => #
job = -> do
  # increase the current level at the beginning of the throttled block    
  concurrency_level.increment
  # work, takes some time
  do_stuff
  # read the current concurrency level 
  current_concurrency_level = concurrency_level.value
  # decrement the concurrency level back at the end of the block            
  concurrency_level.decrement
  # return the observed concurrency level 
  current_concurrency_level
end 

# create 10 threads running concurrently the jobs
Array.new(10) do  
  Thread.new do
    max_two.acquire(&job)   
  end
# wait for all the threads to finish and read the observed 
# concurrency level in each of them   
end.map(&:value)                         # => [2, 2, 1, 1, 1, 2, 2, 2, 2, 1]

Notice that the returned array has no number bigger than 2 therefore the concurrency level of the block with the do_stuff was never bigger than 2.

# runs a block, and returns he observed concurrency level during the execution
def monitor_concurrency_level(concurrency_level, &block)
  concurrency_level.increment
  block.call
  current_concurrency_level = concurrency_level.value
  concurrency_level.decrement
  # return the observed concurrency level
  return current_concurrency_level 
end 

throttle = Concurrent::Throttle.new 3
# => #
concurrency_level = Concurrent::AtomicFixnum.new 
# => #

Array.new(10) do |i|
  # create throttled future
  throttle.future(i) do |arg|
    monitor_concurrency_level(concurrency_level) { do_stuff arg }  
    # fulfill with the observed concurrency level
  end
# collect observed concurrency levels   
end.map(&:value!)                        # => [3, 2, 1, 2, 1, 3, 3, 1, 2, 1]

The concurrency level does not rise above 3.

It works by setting the executor of the future created from the throttle. The executor is a proxy executor for the Concurrent::Promises.default_executor which can be obtained using #on method. Therefore the above example could be instead more explicitly written as follows

# ...
Array.new(10) do |i|
  # create throttled future
  Concurrent::Promises.future_on(throttle.on(Concurrent::Promises.default_executor)) do
    # ...
  end
end.map(&:value!) 

Anything executed on the proxy executor is throttled. A throttle can have more proxy executors for different executors, all jobs share the same capacity provided by the throttle.

Since the proxy executor becomes the executor of the future, any chained futures will also be throttled. It can be changed by using different executor. It the following example the first 2 futures in the chain are throttled, the last is not.

concurrency_level_throttled   = Concurrent::AtomicFixnum.new 
concurrency_level_unthrottled = Concurrent::AtomicFixnum.new 
Array.new(10) do |i|
  throttle.future(i) do 
    monitor_concurrency_level(concurrency_level_throttled) { do_stuff } 
  end.then do |v|
    [v, monitor_concurrency_level(concurrency_level_throttled) { do_stuff }]
  end.then_on(:io) do |l1, l2|
    [l1, l2, monitor_concurrency_level(concurrency_level_unthrottled) { 5.times { do_stuff } }]
  end
end.map(&:value!) 
# => [[3, 3, 7],
#     [3, 2, 9],
#     [3, 3, 10],
#     [3, 3, 6],
#     [3, 3, 5],
#     [3, 3, 8],
#     [3, 3, 3],
#     [3, 3, 4],
#     [3, 2, 2],
#     [3, 1, 1]]

In the output you can see that the first 2 columns do not cross the 3 capacity limit and the last column which is untroubled does.

TODO (pitr-ch 20-Dec-2018): example with virtual throttled executor, throttling only part of promises chain.

Other abstraction

The proxy executor created with throttle can be used with other abstractions as well and combined.

concurrency_level = Concurrent::AtomicFixnum.new 
futures = Array.new(5) do |i|
  # create throttled future
  throttle.future(i) do |arg|
    monitor_concurrency_level(concurrency_level) { do_stuff arg }  
    # fulfill with the observed concurrency level
  end
end 
agents = Array.new(5) do |i|
  agent = Concurrent::Agent.new 0
  # execute agent update on throttled executor
  agent.send_via(throttle.on(:io)) { monitor_concurrency_level(concurrency_level_throttled) { do_stuff } }
  agent 
end 
futures.map(&:value!)                    # => [3, 3, 3, 2, 1]
agents.each { |a| a.await }.map(&:value) 
# => [3, 2, 3, 3, 1]

There is no observed concurrency level above 3.

Instance Method Summary collapse

Constructor Details

#initialize(capacity) ⇒ Throttle

Create throttle.

Parameters:

  • capacity (Integer)

    How many tasks using this throttle can run at the same time.



33
34
35
36
37
38
39
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 33

def initialize(capacity)
  super()
  @MaxCapacity            = capacity
  @Queue                  = LockFreeQueue.new
  @executor_cache         = [nil, nil]
  self.capacity = capacity
end

Instance Method Details

#acquire(timeout = nil) { ... } ⇒ Object, self, true, false

Blocks current thread until there is capacity available in the throttle. The acquired capacity has to be returned to the throttle by calling #release. If block is passed then the block is called after the capacity is acquired and it is automatically released after the block is executed.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

Yields:

  • [] block to execute after the capacity is acquired

Returns:

  • (Object, self, true, false)
    • When no timeout and no block it returns self
    • When no timeout and with block it returns the result of the block
    • When with timeout and no block it returns true when acquired and false when timed out
    • When with timeout and with block it returns the result of the block of nil on timing out

See Also:



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 59

def acquire(timeout = nil, &block)
  event = acquire_or_event
  if event
    within_timeout = event.wait(timeout)
    # release immediately when acquired later after the timeout since it is unused
    event.on_resolution!(self, &:release) unless within_timeout
  else
    within_timeout = true
  end

  called = false
  if timeout
    if block
      if within_timeout
        called = true
        block.call
      else
        nil
      end
    else
      within_timeout
    end
  else
    if block
      called = true
      block.call
    else
      self
    end
  end
ensure
  release if called
end

#available_capacityInteger

Returns The available capacity.

Returns:

  • (Integer)

    The available capacity.



26
27
28
29
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 26

def available_capacity
  current_capacity = capacity
  current_capacity >= 0 ? current_capacity : 0
end

#default_executorExecutorService

Uses executor provided by #on therefore all events and futures created using factory methods on this object will be throttled. Overrides Promises::FactoryMethods::Configuration#default_executor.

Returns:

  • (ExecutorService)

See Also:



179
180
181
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 179

def default_executor
  on(super)
end

#max_capacityInteger

Returns The maximum capacity.

Returns:

  • (Integer)

    The maximum capacity.



42
43
44
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 42

def max_capacity
  @MaxCapacity
end

#on(executor = Promises::FactoryMethods.default_executor) ⇒ ExecutorService

Returns An executor which wraps given executor and allows to post tasks only as available capacity in the throttle allows.

Examples:

throttling future

a_future.then_on(a_throttle.on(:io)) { a_throttled_task }

Parameters:

  • executor (ExecutorService) (defaults to: Promises::FactoryMethods.default_executor)

Returns:

  • (ExecutorService)

    An executor which wraps given executor and allows to post tasks only as available capacity in the throttle allows.



158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 158

def on(executor = Promises::FactoryMethods.default_executor)
  current_executor, current_cache = @executor_cache
  return current_cache if current_executor == executor && current_cache

  if current_executor.nil?
    # cache first proxy
    proxy_executor  = ProxyExecutor.new(self, Concurrent.executor(executor))
    @executor_cache = [executor, proxy_executor]
    return proxy_executor
  else
    # do not cache more than 1 executor
    ProxyExecutor.new(self, Concurrent.executor(executor))
  end
end

#releaseself

Releases previously acquired capacity back to Throttle. Has to be called exactly once for each acquired capacity.

Returns:

  • (self)

See Also:

  • #acquire, #try_acquire


114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 114

def release
  while true
    current_capacity = capacity
    if compare_and_set_capacity current_capacity, current_capacity + 1
      if current_capacity < 0
        # release called after trigger which pushed a trigger, busy wait is ok
        Thread.pass until (trigger = @Queue.pop)
        trigger.resolve
      end
      return self
    end
  end
end

#to_sString Also known as: inspect

Returns Short string representation.

Returns:

  • (String)

    Short string representation.



129
130
131
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 129

def to_s
  format '%s capacity available %d of %d>', super[0..-2], capacity, @MaxCapacity
end

#try_acquiretrue, false

Tries to acquire capacity from the throttle. Returns true when there is capacity available. The acquired capacity has to be returned to the throttle by calling #release.

Returns:

  • (true, false)

See Also:



98
99
100
101
102
103
104
105
106
107
108
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 98

def try_acquire
  while true
    current_capacity = capacity
    if current_capacity > 0
      return true if compare_and_set_capacity(
          current_capacity, current_capacity - 1)
    else
      return false
    end
  end
end

#any_event(*futures_and_or_events) ⇒ Future Originally defined in module Promises::FactoryMethods

Shortcut of #any_event_on with default :io executor supplied.

Returns:

See Also:

#any_event_on(default_executor, *futures_and_or_events) ⇒ Event Originally defined in module Promises::FactoryMethods

Creates new event which becomes resolved after first of the futures_and_or_events resolves. If resolved it does not propagate AbstractEventFuture#touch, leaving delayed futures un-executed if they are not required any more.

Parameters:

Returns:

#any_fulfilled_future(*futures_and_or_events) ⇒ Future Originally defined in module Promises::FactoryMethods

Shortcut of #any_fulfilled_future_on with default :io executor supplied.

Returns:

See Also:

#any_fulfilled_future_on(default_executor, *futures_and_or_events) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates new future which is resolved after first of futures_and_or_events is fulfilled. Its result equals result of the first resolved future or if all futures_and_or_events reject, it has reason of the last resolved future. If resolved it does not propagate AbstractEventFuture#touch, leaving delayed futures un-executed if they are not required any more. If event is supplied, which does not have value and can be only resolved, it's represented as :fulfilled with value nil.

Parameters:

Returns:

#any_resolved_future(*futures_and_or_events) ⇒ Future Also known as: any Originally defined in module Promises::FactoryMethods

Shortcut of #any_resolved_future_on with default :io executor supplied.

Returns:

See Also:

#any_resolved_future_on(default_executor, *futures_and_or_events) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates new future which is resolved after first futures_and_or_events is resolved. Its result equals result of the first resolved future. If resolved it does not propagate AbstractEventFuture#touch, leaving delayed futures un-executed if they are not required any more. If event is supplied, which does not have value and can be only resolved, it's represented as :fulfilled with value nil.

Parameters:

Returns:

#delay(*args, &task) ⇒ Future, Event Originally defined in module Promises::FactoryMethods

Shortcut of #delay_on with default :io executor supplied.

Returns:

See Also:

#delay_on(default_executor, *args) {|*args| ... } ⇒ Future #delay_on(default_executor) ⇒ Event Originally defined in module Promises::FactoryMethods

Creates new event or future which is resolved only after it is touched, see AbstractEventFuture#touch.

Overloads:

  • #delay_on(default_executor, *args) {|*args| ... } ⇒ Future

    If task is provided it returns a Concurrent::Promises::Future representing the result of the task.

    Parameters:

    • args (Object)

      arguments which are passed to the task when it's executed. (It might be prepended with other arguments, see the @yeild section).

    Yields:

    • (*args)

      to the task.

    Yield Returns:

    Returns:

  • #delay_on(default_executor) ⇒ Event

    If no task is provided, it returns an Event

    Returns:

Parameters:

  • default_executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

#fulfilled_future(value, default_executor = self.default_executor) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates resolved future with will be fulfilled with the given value.

Parameters:

  • default_executor (Executor, :io, :fast) (defaults to: self.default_executor)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

  • value (Object)

Returns:

#future(*args, &task) ⇒ Future Originally defined in module Promises::FactoryMethods

Shortcut of #future_on with default :io executor supplied.

Returns:

See Also:

#future_on(default_executor, *args) {|*args| ... } ⇒ Future Originally defined in module Promises::FactoryMethods

Constructs new Future which will be resolved after block is evaluated on default executor. Evaluation begins immediately.

Parameters:

  • default_executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

  • args (Object)

    arguments which are passed to the task when it's executed. (It might be prepended with other arguments, see the @yeild section).

Yields:

  • (*args)

    to the task.

Yield Returns:

Returns:

#make_future(nil, default_executor = self.default_executor) ⇒ Event #make_future(a_future, default_executor = self.default_executor) ⇒ Future #make_future(an_event, default_executor = self.default_executor) ⇒ Event #make_future(exception, default_executor = self.default_executor) ⇒ Future #make_future(value, default_executor = self.default_executor) ⇒ Future Originally defined in module Promises::FactoryMethods

General constructor. Behaves differently based on the argument's type. It's provided for convenience but it's better to be explicit.

Overloads:

  • #make_future(nil, default_executor = self.default_executor) ⇒ Event

    Returns resolved event.

    Parameters:

    • nil (nil)

    Returns:

    • (Event)

      resolved event.

  • #make_future(a_future, default_executor = self.default_executor) ⇒ Future

    Returns a future which will be resolved when a_future is.

    Parameters:

    Returns:

    • (Future)

      a future which will be resolved when a_future is.

  • #make_future(an_event, default_executor = self.default_executor) ⇒ Event

    Returns an event which will be resolved when an_event is.

    Parameters:

    Returns:

    • (Event)

      an event which will be resolved when an_event is.

  • #make_future(exception, default_executor = self.default_executor) ⇒ Future

    Returns a rejected future with the exception as its reason.

    Parameters:

    • exception (Exception)

    Returns:

    • (Future)

      a rejected future with the exception as its reason.

  • #make_future(value, default_executor = self.default_executor) ⇒ Future

    Returns a fulfilled future with the value.

    Parameters:

    • value (Object)

      when none of the above overloads fits

    Returns:

    • (Future)

      a fulfilled future with the value.

Parameters:

  • default_executor (Executor, :io, :fast) (defaults to: self.default_executor)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

Returns:

See Also:

  • resolved_event, fulfilled_future

#rejected_future(reason, default_executor = self.default_executor) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates resolved future with will be rejected with the given reason.

Parameters:

  • default_executor (Executor, :io, :fast) (defaults to: self.default_executor)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

  • reason (Object)

Returns:

#resolvable_eventResolvableEvent Originally defined in module Promises::FactoryMethods

Shortcut of #resolvable_event_on with default :io executor supplied.

Returns:

See Also:

#resolvable_event_on(default_executor = self.default_executor) ⇒ ResolvableEvent Originally defined in module Promises::FactoryMethods

Created resolvable event, user is responsible for resolving the event once by ResolvableEvent#resolve.

Parameters:

  • default_executor (Executor, :io, :fast) (defaults to: self.default_executor)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

Returns:

#resolvable_futureResolvableFuture Originally defined in module Promises::FactoryMethods

Shortcut of #resolvable_future_on with default :io executor supplied.

#resolvable_future_on(default_executor = self.default_executor) ⇒ ResolvableFuture Originally defined in module Promises::FactoryMethods

Creates resolvable future, user is responsible for resolving the future once by ResolvableFuture#resolve, ResolvableFuture#fulfill, or ResolvableFuture#reject

Parameters:

  • default_executor (Executor, :io, :fast) (defaults to: self.default_executor)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

Returns:

#resolved_event(default_executor = self.default_executor) ⇒ Event Originally defined in module Promises::FactoryMethods

Creates resolved event.

Parameters:

  • default_executor (Executor, :io, :fast) (defaults to: self.default_executor)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

Returns:

#resolved_future(fulfilled, value, reason, default_executor = self.default_executor) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates resolved future with will be either fulfilled with the given value or rejection with the given reason.

Parameters:

  • fulfilled (true, false)
  • value (Object)
  • reason (Object)
  • default_executor (Executor, :io, :fast) (defaults to: self.default_executor)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

Returns:

#schedule(intended_time, *args, &task) ⇒ Future, Event Originally defined in module Promises::FactoryMethods

Shortcut of #schedule_on with default :io executor supplied.

Returns:

See Also:

#schedule_on(default_executor, intended_time, *args) {|*args| ... } ⇒ Future #schedule_on(default_executor, intended_time) ⇒ Event Originally defined in module Promises::FactoryMethods

Creates new event or future which is resolved in intended_time.

Overloads:

  • #schedule_on(default_executor, intended_time, *args) {|*args| ... } ⇒ Future

    If task is provided it returns a Concurrent::Promises::Future representing the result of the task.

    Parameters:

    • args (Object)

      arguments which are passed to the task when it's executed. (It might be prepended with other arguments, see the @yeild section).

    Yields:

    • (*args)

      to the task.

    Yield Returns:

    Returns:

  • #schedule_on(default_executor, intended_time) ⇒ Event

    If no task is provided, it returns an Event

    Returns:

Parameters:

  • default_executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

  • intended_time (Numeric, Time)

    Numeric means to run in intended_time seconds. Time means to run on intended_time.

#zip_events(*futures_and_or_events) ⇒ Event Originally defined in module Promises::FactoryMethods

Shortcut of #zip_events_on with default :io executor supplied.

Returns:

See Also:

#zip_events_on(default_executor, *futures_and_or_events) ⇒ Event Originally defined in module Promises::FactoryMethods

Creates new event which is resolved after all futures_and_or_events are resolved. (Future is resolved when fulfilled or rejected.)

Parameters:

Returns:

#zip_futures(*futures_and_or_events) ⇒ Future Also known as: zip Originally defined in module Promises::FactoryMethods

Shortcut of #zip_futures_on with default :io executor supplied.

Returns:

See Also:

#zip_futures_on(default_executor, *futures_and_or_events) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates new future which is resolved after all futures_and_or_events are resolved. Its value is array of zipped future values. Its reason is array of reasons for rejection. If there is an error it rejects. If event is supplied, which does not have value and can be only resolved, it's represented as :fulfilled with value nil.

Parameters:

Returns:

#zip_futures_over(enumerable, &future_factory) ⇒ Future Originally defined in module Promises::FactoryMethods

Note:

Edge Features are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.
  • Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
  • Edge features may also lack tests and documentation.
  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

Shortcut of #zip_futures_over_on with default :io executor supplied.

Returns:

See Also:

#zip_futures_over_on(default_executor, enumerable) {|element| ... } ⇒ Future Originally defined in module Promises::FactoryMethods

Note:

Edge Features are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.
  • Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
  • Edge features may also lack tests and documentation.
  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

Creates new future which is resolved after all the futures created by future_factory from enumerable elements are resolved. Simplified it does: zip(*enumerable.map { |e| future e, &future_factory })

Examples:

# `#succ` calls are executed in parallel
zip_futures_over_on(:io, [1, 2], &:succ).value! # => [2, 3]

Parameters:

  • default_executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. Default executor propagates to chained futures unless overridden with executor parameter or changed with AbstractEventFuture#with_default_executor.

  • enumerable (Enumerable)

Yields:

  • a task to be executed in future

Yield Parameters:

  • element (Object)

    from enumerable

Yield Returns:

  • (Object)

    a value of the future

Returns: