Class: Concurrent::Cancellation
- Inherits:
-
Synchronization::Object
- Object
- Synchronization::AbstractObject
- Synchronization::Object
- Concurrent::Cancellation
- Defined in:
- lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb
Overview
Edge Features are under active development and may change frequently.
- Deprecations are not added before incompatible changes.
- Edge version: major is always 0, minor bump means incompatible change, patch bump means compatible change.
- Edge features may also lack tests and documentation.
- Features developed in
concurrent-ruby-edge
are expected to move toconcurrent-ruby
when finalised.
The Cancellation abstraction provides cooperative cancellation.
The standard methods Thread#raise
of Thread#kill
available in Ruby
are very dangerous (see linked the blog posts bellow).
Therefore concurrent-ruby provides an alternative.
- https://jvns.ca/blog/2015/11/27/why-rubys-timeout-is-dangerous-and-thread-dot-raise-is-terrifying/
- http://www.mikeperham.com/2015/05/08/timeout-rubys-most-dangerous-api/
- http://blog.headius.com/2008/02/rubys-threadraise-threadkill-timeoutrb.html
It provides an object which represents a task which can be executed, the task has to get the reference to the object and periodically cooperatively check that it is not cancelled. Good practices to make tasks cancellable:
- check cancellation every cycle of a loop which does significant work,
- do all blocking actions in a loop with a timeout then on timeout check cancellation and if ok block again with the timeout
The idea was inspired by https://msdn.microsoft.com/en-us/library/dd537607(v=vs.110).aspx
Examples
Run async task until cancelled
Create cancellation and then run work in a background thread until it is cancelled.
cancellation, origin = Concurrent::Cancellation.new
# => #
# - origin is used for cancelling, resolve it to cancel
# - cancellation is passed down to tasks for cooperative cancellation
async_task = Concurrent::Promises.future(cancellation) do |cancellation|
# Do work repeatedly until it is cancelled
do_stuff until cancellation.canceled?
:stopped_gracefully
end
# => #
sleep 0.01 # => 0
# Wait a bit then stop the thread by resolving the origin of the cancellation
origin.resolve
# => #
async_task.value! # => :stopped_gracefully
Or let it raise an error.
cancellation, origin = Concurrent::Cancellation.new
# => #
async_task = Concurrent::Promises.future(cancellation) do |cancellation|
# Do work repeatedly until it is cancelled
while true
cancellation.check!
do_stuff
end
end
# => #
sleep 0.01 # => 0
# Wait a bit then stop the thread by resolving the origin of the cancellation
origin.resolve
# => #
async_task.result
# => [false,
# nil,
# #]
Run additional tasks on Cancellation
Cancellation can also be used to log or plan re-execution.
cancellation.origin.chain do
# This block is executed after the Cancellation is cancelled
# It can then log cancellation or e.g. plan new re-execution
end
# => #
Run only for limited time – Timeout replacement
Execute task for a given time then finish. Instead of letting Cancellation crate its own origin, it can be passed in as argument. The passed in origin is scheduled to be resolved in given time which then cancels the Cancellation.
timeout = Concurrent::Cancellation.new Concurrent::Promises.schedule(0.02)
# => #
# or using shortcut helper method
timeout = Concurrent::Cancellation.timeout 0.02
# => #
count = Concurrent::AtomicFixnum.new
# => #
Concurrent.global_io_executor.post(timeout) do |timeout|
# do stuff until cancelled
count.increment until timeout.canceled?
end
timeout.origin.wait
# => #
count.value # => 177576
Parallel background processing with single cancellation
Each task tries to count to 1000 but there is a randomly failing test. The tasks share single cancellation, when one of them fails it cancels the others. The failing tasks ends with an error, the other tasks are gracefully cancelled.
cancellation, origin = Concurrent::Cancellation.new
# => #
tasks = 4.times.map do |i|
Concurrent::Promises.future(cancellation, origin, i) do |cancellation, origin, i|
count = 0
100.times do
break count = :cancelled if cancellation.canceled?
count += 1
sleep 0.001
if rand > 0.95
origin.resolve # cancel
raise 'random error'
end
count
end
end
end
# => [#,
# #,
# #,
# #]
Concurrent::Promises.zip(*tasks).result
# => [false,
# [:cancelled, nil, :cancelled, :cancelled],
# [nil, #, nil, nil]]
Without the randomly failing part it produces following.
cancellation, origin = Concurrent::Cancellation.new
# => #
tasks = 4.times.map do |i|
Concurrent::Promises.future(cancellation, origin, i) do |cancellation, origin, i|
count = 0
100.times do
break count = :cancelled if cancellation.canceled?
count += 1
sleep 0.001
# if rand > 0.95
# origin.resolve
# raise 'random error'
# end
count
end
end
end
# => [#,
# #,
# #,
# #]
Concurrent::Promises.zip(*tasks).result
# => [true, [100, 100, 100, 100], nil]
Combine cancellations
The combination created by joining two cancellations cancels when the first or the other does.
cancellation_a, origin_a = Concurrent::Cancellation.new
# => #
cancellation_b, origin_b = Concurrent::Cancellation.new
# => #
combined_cancellation = cancellation_a.join(cancellation_b)
# => #
origin_a.resolve
# => #
cancellation_a.canceled? # => true
cancellation_b.canceled? # => false
combined_cancellation.canceled? # => true
If a different rule for joining is needed, the source can be combined manually. The manually created cancellation cancels when both the first and the other cancels.
cancellation_a, origin_a = Concurrent::Cancellation.new
# => #
cancellation_b, origin_b = Concurrent::Cancellation.new
# => #
# cancels only when both a and b is cancelled
combined_cancellation = Concurrent::Cancellation.new origin_a & origin_b
# => #
origin_a.resolve
# => #
cancellation_a.canceled? #=> true
cancellation_b.canceled? #=> false
combined_cancellation.canceled? #=> false
origin_b.resolve
# => #
combined_cancellation.canceled? #=> true
Class Method Summary collapse
-
.timeout(intended_time) ⇒ Cancellation
Create Cancellation which will cancel itself in given time.
Instance Method Summary collapse
-
#canceled? ⇒ true, false
Is the cancellation cancelled? Respective, was the origin of the cancellation resolved.
-
#check!(error = CancelledOperationError) ⇒ self
Raise error when cancelled.
-
#initialize(origin = Promises.resolvable_event) ⇒ Cancellation
constructor
Creates the cancellation object.
-
#join(*cancellations) ⇒ Cancellation
Creates a new Cancellation which is cancelled when first of the supplied cancellations or self is cancelled.
-
#origin ⇒ Promises::Future, Promises::Event
The event or future which is the origin of the cancellation.
-
#to_ary ⇒ Array(Cancellation, Promises::Future), Array(Cancellation, Promises::Event)
Allow to multi-assign the Cancellation object.
-
#to_s ⇒ String
(also: #inspect)
Short string representation.
Constructor Details
#initialize(origin = Promises.resolvable_event) ⇒ Cancellation
Creates the cancellation object.
54 55 56 57 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb', line 54 def initialize(origin = Promises.resolvable_event) super() @Origin = origin end |
Class Method Details
.timeout(intended_time) ⇒ Cancellation
Create Cancellation which will cancel itself in given time
43 44 45 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb', line 43 def self.timeout(intended_time) new Concurrent::Promises.schedule(intended_time) end |
Instance Method Details
#canceled? ⇒ true, false
Is the cancellation cancelled? Respective, was the origin of the cancellation resolved.
77 78 79 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb', line 77 def canceled? @Origin.resolved? end |
#check!(error = CancelledOperationError) ⇒ self
Raise error when cancelled
85 86 87 88 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb', line 85 def check!(error = CancelledOperationError) raise error if canceled? self end |
#join(*cancellations) ⇒ Cancellation
Creates a new Cancellation which is cancelled when first of the supplied cancellations or self is cancelled.
95 96 97 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb', line 95 def join(*cancellations) Cancellation.new Promises.any_event(*[@Origin, *cancellations.map(&:origin)]) end |
#origin ⇒ Promises::Future, Promises::Event
The event or future which is the origin of the cancellation
70 71 72 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb', line 70 def origin @Origin end |
#to_ary ⇒ Array(Cancellation, Promises::Future), Array(Cancellation, Promises::Event)
Allow to multi-assign the Cancellation object
64 65 66 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb', line 64 def to_ary [self, @Origin] end |
#to_s ⇒ String Also known as: inspect
Short string representation.
101 102 103 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb', line 101 def to_s format '%s %s>', super[0..-2], canceled? ? 'canceled' : 'pending' end |