Class: Concurrent::Channel::Buffer::Unbuffered
- Inherits:
-
Base
- Object
- Synchronization::LockableObject
- Base
- Concurrent::Channel::Buffer::Unbuffered
- Defined in:
- lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb
Overview
A blocking buffer with a size of zero. An item can only be put onto the buffer when a thread is waiting to take. Similarly, an item can only be put onto the buffer when a thread is waiting to put. When either #put or #take is called and there is no corresponding call in progress, the call will block indefinitely. Any other calls to the same method will queue behind the first call and block as well. As soon as a corresponding put/take call is made an exchange will occur and the first blocked call will return.
Instance Method Summary collapse
-
#empty? ⇒ Boolean
Predicate indicating if the buffer is empty.
-
#full? ⇒ Boolean
Predicate indicating if the buffer is full.
-
#next ⇒ Object, Boolean
Take the next "item" from the buffer and also return a boolean indicating if "more" items can be taken.
-
#offer(item) ⇒ Boolean
Put an item onto the buffer if possible.
-
#poll ⇒ Object
Take the next item from the buffer if one is available else return immediately.
-
#put(item) ⇒ Boolean
Put an item onto the buffer if possible.
-
#size ⇒ undocumented
The number of items currently in the buffer.
-
#take ⇒ Object
Take an item from the buffer if one is available.
Constructor Details
This class inherits a constructor from Concurrent::Channel::Buffer::Base
Instance Method Details
#empty? ⇒ Boolean
Predicate indicating if the buffer is empty.
27 28 29 |
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 27 def empty? size == 0 end |
#full? ⇒ Boolean
Predicate indicating if the buffer is full.
32 33 34 |
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 32 def full? !empty? end |
#next ⇒ Object, Boolean
Take the next "item" from the buffer and also return a boolean indicating if "more" items can be taken. Used for iterating over a buffer until it is closed and empty.
If the buffer is open but no items remain the calling thread will
block until an item is available. The second of the two return
values, "more" (a boolean), will always be true
when the buffer is
open. The "more" value will be false
when the channel has been
closed and all values have already been received. When "more" is
false the returned item will be Concurrent::NULL
.
Note that when multiple threads access the same channel a race
condition can occur when using this method. A call to next
from
one thread may return true
for the second return value, but
another thread may take
the last value before the original
thread makes another call. Code which iterates over a channel
must be programmed to properly handle these race conditions.
Items can only be taken from the buffer when one or more threads are waiting to #put items onto the buffer. This method exhibits the same blocking behavior as #take.
135 136 137 138 139 |
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 135 def next item = take more = (item != Concurrent::NULL) return item, more end |
#offer(item) ⇒ Boolean
Put an item onto the buffer if possible. If the buffer is open but
unable to add an item, probably due to being full, the method will
return immediately. Similarly, the method will return immediately
when the buffer is closed. A return value of false
does not
necessarily indicate that the buffer is closed, just that the item
could not be added.
Items can only be put onto the buffer when one or more threads are
waiting to #take items off the buffer. When there is a thread
waiting to take an item this method will give its item and return
true
immediately. When there are no threads waiting to take or the
buffer is closed, this method will return false
immediately.
71 72 73 74 75 76 77 78 79 |
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 71 def offer(item) synchronize do return false if ns_closed? || taking.empty? taken = taking.shift taken.value = item true end end |
#poll ⇒ Object
Take the next item from the buffer if one is available else return immediately. Failing to return a value does not necessarily indicate that the buffer is closed, just that it is empty.
Items can only be taken off the buffer when one or more threads are
waiting to #put items onto the buffer. When there is a thread
waiting to put an item this method will take the item and return
it immediately. When there are no threads waiting to put or the
buffer is closed, this method will return Concurrent::NULL
immediately.
117 118 119 120 121 122 123 124 125 126 |
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 117 def poll synchronize do return Concurrent::NULL if putting.empty? put = putting.shift value = put.value put.value = nil value end end |
#put(item) ⇒ Boolean
Put an item onto the buffer if possible. If the buffer is open but not able to accept the item the calling thread will block until the item can be put onto the buffer.
Items can only be put onto the buffer when one or more threads are
waiting to #take items off the buffer. When there is a thread
waiting to take an item this method will give its item and return
immediately. When there are no threads waiting to take, this method
will block. As soon as a thread calls take
the exchange will
occur and this method will return.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 44 def put(item) mine = synchronize do return false if ns_closed? ref = Concurrent::AtomicReference.new(item) if taking.empty? putting.push(ref) else taken = taking.shift taken.value = item ref.value = nil end ref end loop do return true if mine.value.nil? Thread.pass end end |
#size ⇒ undocumented
The number of items currently in the buffer.
20 21 22 23 24 |
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 20 def size synchronize do putting.empty? ? 0 : 1 end end |
#take ⇒ Object
Take an item from the buffer if one is available. If the buffer is open and no item is available the calling thread will block until an item is available. If the buffer is closed but items are available the remaining items can still be taken. Once the buffer closes, no remaining items can be taken.
Items can only be taken from the buffer when one or more threads are
waiting to #put items onto the buffer. When there is a thread
waiting to put an item this method will take that item and return it
immediately. When there are no threads waiting to put, this method
will block. As soon as a thread calls pur
the exchange will occur
and this method will return.
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 89 def take mine = synchronize do return Concurrent::NULL if ns_closed? && putting.empty? ref = Concurrent::AtomicReference.new(nil) if putting.empty? taking.push(ref) else put = putting.shift ref.value = put.value put.value = nil end ref end loop do item = mine.value return item if item Thread.pass end end |