Class: Concurrent::Channel::Buffer::Buffered
- Inherits:
-
Base
- Object
- Synchronization::LockableObject
- Base
- Concurrent::Channel::Buffer::Buffered
- Defined in:
- lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb
Overview
A buffer with a fixed internal capacity. Items can be put onto the buffer without blocking until the internal capacity is reached. Once the buffer is at capacity, subsequent calls to #put will block until an item is removed from the buffer, creating spare capacity.
Instance Method Summary collapse
-
#next ⇒ Object, Boolean
Take the next "item" from the buffer and also return a boolean indicating if "more" items can be taken.
-
#offer(item) ⇒ Boolean
Put an item onto the buffer if possible.
-
#poll ⇒ Object
Take the next item from the buffer if one is available else return immediately.
-
#put(item) ⇒ Boolean
Put an item onto the buffer if possible.
-
#take ⇒ Object
Take an item from the buffer if one is available.
Constructor Details
This class inherits a constructor from Concurrent::Channel::Buffer::Base
Instance Method Details
#next ⇒ Object, Boolean
Take the next "item" from the buffer and also return a boolean indicating if "more" items can be taken. Used for iterating over a buffer until it is closed and empty.
If the buffer is open but no items remain the calling thread will
block until an item is available. The second of the two return
values, "more" (a boolean), will always be true
when the buffer is
open. The "more" value will be false
when the channel has been
closed and all values have already been received. When "more" is
false the returned item will be Concurrent::NULL
.
Note that when multiple threads access the same channel a race
condition can occur when using this method. A call to next
from
one thread may return true
for the second return value, but
another thread may take
the last value before the original
thread makes another call. Code which iterates over a channel
must be programmed to properly handle these race conditions.
56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 56 def next loop do synchronize do if ns_closed? && ns_empty? return Concurrent::NULL, false elsif !ns_empty? item = buffer.shift return item, true end end Thread.pass end end |
#offer(item) ⇒ Boolean
Put an item onto the buffer if possible. If the buffer is open but
unable to add an item, probably due to being full, the method will
return immediately. Similarly, the method will return immediately
when the buffer is closed. A return value of false
does not
necessarily indicate that the buffer is closed, just that the item
could not be added.
New items can be put onto the buffer until the number of items in the buffer reaches the Concurrent::Channel::Buffer::Base#size value specified during initialization.
38 39 40 41 42 43 44 45 46 47 |
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 38 def offer(item) synchronize do if ns_closed? || ns_full? return false else ns_put_onto_buffer(item) return true end end end |
#poll ⇒ Object
Take the next item from the buffer if one is available else return immediately. Failing to return a value does not necessarily indicate that the buffer is closed, just that it is empty.
71 72 73 74 75 76 77 78 79 |
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 71 def poll synchronize do if ns_empty? Concurrent::NULL else buffer.shift end end end |
#put(item) ⇒ Boolean
Put an item onto the buffer if possible. If the buffer is open but not able to accept the item the calling thread will block until the item can be put onto the buffer.
New items can be put onto the buffer until the number of items in the buffer reaches the Concurrent::Channel::Buffer::Base#size value specified during initialization.
19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 19 def put(item) loop do synchronize do if ns_closed? return false elsif !ns_full? ns_put_onto_buffer(item) return true end end Thread.pass end end |
#take ⇒ Object
Take an item from the buffer if one is available. If the buffer is open and no item is available the calling thread will block until an item is available. If the buffer is closed but items are available the remaining items can still be taken. Once the buffer closes, no remaining items can be taken.
50 51 52 53 |
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb', line 50 def take item, _ = self.next item end |