Class: Concurrent::MVar
- Inherits:
-
Synchronization::Object
- Object
- Synchronization::AbstractObject
- Synchronization::Object
- Concurrent::MVar
- Includes:
- Concern::Dereferenceable
- Defined in:
- lib/concurrent-ruby/concurrent/mvar.rb
Overview
An MVar
is a synchronized single element container. They are empty or
contain one item. Taking a value from an empty MVar
blocks, as does
putting a value into a full one. You can either think of them as blocking
queue of length one, or a special kind of mutable variable.
On top of the fundamental #put
and #take
operations, we also provide a
#mutate
that is atomic with respect to operations on the same instance.
These operations all support timeouts.
We also support non-blocking operations #try_put!
and #try_take!
, a
#set!
that ignores existing values, a #value
that returns the value
without removing it or returns MVar::EMPTY
, and a #modify!
that yields
MVar::EMPTY
if the MVar
is empty and can be used to set MVar::EMPTY
.
You shouldn't use these operations in the first instance.
MVar
is a Dereferenceable.
MVar
is related to M-structures in Id, MVar
in Haskell and SyncVar
in Scala.
Note that unlike the original Haskell paper, our #take
is blocking. This is how
Haskell and Scala do it today.
Copy Options
Object references in Ruby are mutable. This can lead to serious
problems when the Concern::Dereferenceable#value of an object is a mutable reference. Which
is always the case unless the value is a Fixnum
, Symbol
, or similar
"primitive" data type. Each instance can be configured with a few
options that can help protect the program from potentially dangerous
operations. Each of these options can be optionally set when the object
instance is created:
:dup_on_deref
When true the object will call the#dup
method on thevalue
object every time the#value
method is called (default: false):freeze_on_deref
When true the object will call the#freeze
method on thevalue
object every time the#value
method is called (default: false):copy_on_deref
When given aProc
object theProc
will be run every time the#value
method is called. TheProc
will be given the currentvalue
as its only argument and the result returned by the block will be the return value of the#value
call. Whennil
this option will be ignored (default: nil)
When multiple deref options are set the order of operations is strictly defined. The order of deref operations is:
:copy_on_deref
:dup_on_deref
:freeze_on_deref
Because of this ordering there is no need to #freeze
an object created by a
provided :copy_on_deref
block. Simply set :freeze_on_deref
to true
.
Setting both :dup_on_deref
to true
and :freeze_on_deref
to true
is
as close to the behavior of a "pure" functional language (like Erlang, Clojure,
or Haskell) as we are likely to get in Ruby.
See Also
P. Barth, R. Nikhil, and Arvind. M-Structures: Extending a parallel, non- strict, functional language with state. In Proceedings of the 5th ACM Conference on Functional Programming Languages and Computer Architecture (FPCA), 1991.
S. Peyton Jones, A. Gordon, and S. Finne. Concurrent Haskell. In Proceedings of the 23rd Symposium on Principles of Programming Languages (PoPL), 1996.
Constant Summary collapse
- EMPTY =
Unique value that represents that an
MVar
was empty ::Object.new
- TIMEOUT =
Unique value that represents that an
MVar
timed out before it was able to produce a value. ::Object.new
Instance Method Summary collapse
-
#borrow(timeout = nil) ⇒ Object
acquires lock on the from an
MVAR
, yields the value to provided block, and release lock. -
#empty? ⇒ Boolean
Returns if the
MVar
is currently empty. -
#full? ⇒ Boolean
Returns if the
MVar
currently contains a value. -
#initialize(value = EMPTY, opts = {}) ⇒ MVar
constructor
Create a new
MVar
, either empty or with an initial value. -
#modify(timeout = nil) ⇒ Object
Atomically
take
, yield the value to a block for transformation, and thenput
the transformed value. -
#modify! ⇒ undocumented
Non-blocking version of
modify
that will yield withEMPTY
if there is no value yet. -
#put(value, timeout = nil) ⇒ Object
Put a value into an
MVar
, blocking if there is already a value until it is empty. -
#set!(value) ⇒ undocumented
Non-blocking version of
put
that will overwrite an existing value. -
#take(timeout = nil) ⇒ Object
Remove the value from an
MVar
, leaving it empty, and blocking if there isn't a value. -
#try_put!(value) ⇒ undocumented
Non-blocking version of
put
, that returns whether or not it was successful. -
#try_take! ⇒ undocumented
Non-blocking version of
take
, that returnsEMPTY
instead of blocking. -
#value ⇒ Object
(also: #deref)
included
from Concern::Dereferenceable
Return the value this object represents after applying the options specified by the
#set_deref_options
method.
Constructor Details
#initialize(value = EMPTY, opts = {}) ⇒ MVar
Create a new MVar
, either empty or with an initial value.
54 55 56 57 58 59 60 |
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 54 def initialize(value = EMPTY, opts = {}) @value = value @mutex = Mutex.new @empty_condition = ConditionVariable.new @full_condition = ConditionVariable.new (opts) end |
Instance Method Details
#borrow(timeout = nil) ⇒ Object
acquires lock on the from an MVAR
, yields the value to provided block,
and release lock. A timeout can be set to limit the time spent blocked,
in which case it returns TIMEOUT
if the time is exceeded.
86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 86 def borrow(timeout = nil) @mutex.synchronize do wait_for_full(timeout) # if we timeoud out we'll still be empty if unlocked_full? yield @value else TIMEOUT end end end |
#empty? ⇒ Boolean
Returns if the MVar
is currently empty.
195 196 197 |
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 195 def empty? @mutex.synchronize { @value == EMPTY } end |
#full? ⇒ Boolean
Returns if the MVar
currently contains a value.
200 201 202 |
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 200 def full? !empty? end |
#modify(timeout = nil) ⇒ Object
Atomically take
, yield the value to a block for transformation, and then
put
the transformed value. Returns the transformed value. A timeout can
be set to limit the time spent blocked, in which case it returns TIMEOUT
if the time is exceeded.
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 123 def modify(timeout = nil) raise ArgumentError.new('no block given') unless block_given? @mutex.synchronize do wait_for_full(timeout) # If we timed out we'll still be empty if unlocked_full? value = @value @value = yield value @full_condition.signal (value) else TIMEOUT end end end |
#modify! ⇒ undocumented
Non-blocking version of modify
that will yield with EMPTY
if there is no value yet.
179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 179 def modify! raise ArgumentError.new('no block given') unless block_given? @mutex.synchronize do value = @value @value = yield value if unlocked_empty? @empty_condition.signal else @full_condition.signal end (value) end end |
#put(value, timeout = nil) ⇒ Object
Put a value into an MVar
, blocking if there is already a value until
it is empty. A timeout can be set to limit the time spent blocked, in
which case it returns TIMEOUT
if the time is exceeded.
103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 103 def put(value, timeout = nil) @mutex.synchronize do wait_for_empty(timeout) # If we timed out we won't be empty if unlocked_empty? @value = value @full_condition.signal (value) else TIMEOUT end end end |
#set!(value) ⇒ undocumented
Non-blocking version of put
that will overwrite an existing value.
169 170 171 172 173 174 175 176 |
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 169 def set!(value) @mutex.synchronize do old_value = @value @value = value @full_condition.signal (old_value) end end |
#take(timeout = nil) ⇒ Object
Remove the value from an MVar
, leaving it empty, and blocking if there
isn't a value. A timeout can be set to limit the time spent blocked, in
which case it returns TIMEOUT
if the time is exceeded.
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 66 def take(timeout = nil) @mutex.synchronize do wait_for_full(timeout) # If we timed out we'll still be empty if unlocked_full? value = @value @value = EMPTY @empty_condition.signal (value) else TIMEOUT end end end |
#try_put!(value) ⇒ undocumented
Non-blocking version of put
, that returns whether or not it was successful.
156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 156 def try_put!(value) @mutex.synchronize do if unlocked_empty? @value = value @full_condition.signal true else false end end end |
#try_take! ⇒ undocumented
Non-blocking version of take
, that returns EMPTY
instead of blocking.
142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/concurrent-ruby/concurrent/mvar.rb', line 142 def try_take! @mutex.synchronize do if unlocked_full? value = @value @value = EMPTY @empty_condition.signal (value) else EMPTY end end end |
#value ⇒ Object Also known as: deref Originally defined in module Concern::Dereferenceable
Return the value this object represents after applying the options specified
by the #set_deref_options
method.