Class: Concurrent::Cancellation

Inherits:
Synchronization::Object show all
Defined in:
lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb

Overview

Note:

Edge Features are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.
  • Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
  • Edge features may also lack tests and documentation.
  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

The Cancellation abstraction provides cooperative cancellation.

The standard methods Thread#raise of Thread#kill available in Ruby are very dangerous (see linked the blog posts bellow). Therefore concurrent-ruby provides an alternative.

It provides an object which represents a task which can be executed, the task has to get the reference to the object and periodically cooperatively check that it is not cancelled. Good practices to make tasks cancellable:

  • check cancellation every cycle of a loop which does significant work,
  • do all blocking actions in a loop with a timeout then on timeout check cancellation and if ok block again with the timeout

The idea was inspired by https://msdn.microsoft.com/en-us/library/dd537607(v=vs.110).aspx

Examples

Run async task until cancelled

Create cancellation and then run work in a background thread until it is cancelled.

cancellation, origin = Concurrent::Cancellation.new
# => #
# - origin is used for cancelling, resolve it to cancel 
# - cancellation is passed down to tasks for cooperative cancellation
async_task = Concurrent::Promises.future(cancellation) do |cancellation|
  # Do work repeatedly until it is cancelled
  do_stuff until cancellation.canceled?
  :stopped_gracefully
end
# => #

sleep 0.01                               # => 0
# Wait a bit then stop the thread by resolving the origin of the cancellation
origin.resolve 
# => #
async_task.value!                        # => :stopped_gracefully

Or let it raise an error.

cancellation, origin = Concurrent::Cancellation.new
# => #
async_task = Concurrent::Promises.future(cancellation) do |cancellation|
  # Do work repeatedly until it is cancelled
  while true
    cancellation.check!     
    do_stuff 
  end
end
# => #

sleep 0.01                               # => 0
# Wait a bit then stop the thread by resolving the origin of the cancellation
origin.resolve 
# => #
async_task.result
# => [false,
#     nil,
#     #]

Run additional tasks on Cancellation

Cancellation can also be used to log or plan re-execution.

cancellation.origin.chain do
  # This block is executed after the Cancellation is cancelled  
  # It can then log cancellation or e.g. plan new re-execution
end
# => #

Run only for limited time – Timeout replacement

Execute task for a given time then finish. Instead of letting Cancellation crate its own origin, it can be passed in as argument. The passed in origin is scheduled to be resolved in given time which then cancels the Cancellation.

timeout = Concurrent::Cancellation.new Concurrent::Promises.schedule(0.02)
# => #
# or using shortcut helper method
timeout = Concurrent::Cancellation.timeout 0.02 
# => #
count   = Concurrent::AtomicFixnum.new
# => #
Concurrent.global_io_executor.post(timeout) do |timeout|
  # do stuff until cancelled  
  count.increment until timeout.canceled?
end 

timeout.origin.wait
# => #
count.value                              # => 177576

Parallel background processing with single cancellation

Each task tries to count to 1000 but there is a randomly failing test. The tasks share single cancellation, when one of them fails it cancels the others. The failing tasks ends with an error, the other tasks are gracefully cancelled.

cancellation, origin = Concurrent::Cancellation.new
# => #
tasks = 4.times.map do |i|
  Concurrent::Promises.future(cancellation, origin, i) do |cancellation, origin, i|
    count = 0
    100.times do
      break count = :cancelled if cancellation.canceled?
      count += 1
      sleep 0.001
      if rand > 0.95
        origin.resolve # cancel
        raise 'random error'
      end
      count
    end
  end
end
# => [#,
#     #,
#     #,
#     #]
Concurrent::Promises.zip(*tasks).result 
# => [false,
#     [:cancelled, nil, :cancelled, :cancelled],
#     [nil, #, nil, nil]]

Without the randomly failing part it produces following.

cancellation, origin = Concurrent::Cancellation.new
# => #
tasks = 4.times.map do |i|
  Concurrent::Promises.future(cancellation, origin, i) do |cancellation, origin, i|
    count = 0
    100.times do
      break count = :cancelled if cancellation.canceled?
      count += 1
      sleep 0.001
      # if rand > 0.95
      #   origin.resolve
      #   raise 'random error'
      # end
      count
    end
  end
end
# => [#,
#     #,
#     #,
#     #]
Concurrent::Promises.zip(*tasks).result
# => [true, [100, 100, 100, 100], nil]

Combine cancellations

The combination created by joining two cancellations cancels when the first or the other does.

cancellation_a, origin_a = Concurrent::Cancellation.new
# => #
cancellation_b, origin_b = Concurrent::Cancellation.new
# => #
combined_cancellation    = cancellation_a.join(cancellation_b)
# => #

origin_a.resolve
# => #

cancellation_a.canceled?                 # => true
cancellation_b.canceled?                 # => false
combined_cancellation.canceled?          # => true

If a different rule for joining is needed, the source can be combined manually. The manually created cancellation cancels when both the first and the other cancels.

cancellation_a, origin_a = Concurrent::Cancellation.new
# => #
cancellation_b, origin_b = Concurrent::Cancellation.new
# => #
# cancels only when both a and b is cancelled
combined_cancellation    = Concurrent::Cancellation.new origin_a & origin_b
# => #

origin_a.resolve
# => #

cancellation_a.canceled?        #=> true
cancellation_b.canceled?        #=> false
combined_cancellation.canceled? #=> false

origin_b.resolve
# => #
combined_cancellation.canceled? #=> true

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(origin = Promises.resolvable_event) ⇒ Cancellation

Creates the cancellation object.

Examples:

cancellation, origin = Concurrent::Cancellation.new

Parameters:

  • origin (Promises::Future, Promises::Event) (defaults to: Promises.resolvable_event)

    of the cancellation. When it is resolved the cancellation is canceled.

See Also:



52
53
54
55
# File 'lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb', line 52

def initialize(origin = Promises.resolvable_event)
  super()
  @Origin = origin
end

Class Method Details

.timeout(intended_time) ⇒ Cancellation

Create Cancellation which will cancel itself in given time

Parameters:

  • intended_time (Numeric, Time)

    Numeric means to run in intended_time seconds. Time means to run on intended_time.

Returns:



41
42
43
# File 'lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb', line 41

def self.timeout(intended_time)
  new Concurrent::Promises.schedule(intended_time)
end

Instance Method Details

#canceled?true, false

Is the cancellation cancelled? Respective, was the origin of the cancellation resolved.

Returns:

  • (true, false)


75
76
77
# File 'lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb', line 75

def canceled?
  @Origin.resolved?
end

#check!(error = CancelledOperationError) ⇒ self

Raise error when cancelled

Parameters:

  • error (#exception) (defaults to: CancelledOperationError)

    to be risen

Returns:

  • (self)

Raises:

  • the error



83
84
85
86
# File 'lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb', line 83

def check!(error = CancelledOperationError)
  raise error if canceled?
  self
end

#join(*cancellations) ⇒ Cancellation

Creates a new Cancellation which is cancelled when first of the supplied cancellations or self is cancelled.

Parameters:

Returns:



93
94
95
# File 'lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb', line 93

def join(*cancellations)
  Cancellation.new Promises.any_event(*[@Origin, *cancellations.map(&:origin)])
end

#originPromises::Future, Promises::Event

The event or future which is the origin of the cancellation



68
69
70
# File 'lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb', line 68

def origin
  @Origin
end

#to_aryArray(Cancellation, Promises::Future), Array(Cancellation, Promises::Event)

Allow to multi-assign the Cancellation object

Examples:

cancellation         = Concurrent::Cancellation.new
cancellation, origin = Concurrent::Cancellation.new

Returns:



62
63
64
# File 'lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb', line 62

def to_ary
  [self, @Origin]
end

#to_sString Also known as: inspect

Short string representation.

Returns:

  • (String)


99
100
101
# File 'lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb', line 99

def to_s
  format '%s %s>', super[0..-2], canceled? ? 'canceled' : 'pending'
end