Class: Concurrent::Channel::Buffer::Base
- Inherits:
-
Synchronization::LockableObject
- Object
- Synchronization::LockableObject
- Concurrent::Channel::Buffer::Base
- Defined in:
- lib-edge/concurrent/channel/buffer/base.rb
Overview
Abstract base class for all Channel buffers.
Concurrent::Channel objects maintain an internal, queue-like object called a buffer. It's the storage bin for values put onto or taken from the channel. Different buffer types have different characteristics. Subsequently, the behavior of any given channel is highly dependent uping the type of its buffer. This is the base class which defines the common buffer interface. Any class intended to be used as a channel buffer should extend this class.
Direct Known Subclasses
Instance Attribute Summary collapse
-
#capacity ⇒ undocumented
readonly
The maximum number of values which can be #put onto the buffer it becomes full.
Instance Method Summary collapse
-
#blocking? ⇒ Boolean
Predicate indicating if this buffer will block #put operations once it reaches its maximum capacity.
-
#close ⇒ Boolean
Close the buffer, preventing new items from being added.
-
#closed? ⇒ Boolea
Predicate indicating is this buffer closed.
-
#empty? ⇒ Boolean
Predicate indicating if the buffer is empty.
-
#full? ⇒ Boolean
Predicate indicating if the buffer is full.
-
#initialize(*args) ⇒ Base
constructor
Creates a new buffer.
-
#next ⇒ Object, Boolean
Take the next "item" from the buffer and also return a boolean indicating if "more" items can be taken.
-
#offer(item) ⇒ Boolean
Put an item onto the buffer if possible.
-
#poll ⇒ Object
Take the next item from the buffer if one is available else return immediately.
-
#put(item) ⇒ Boolean
Put an item onto the buffer if possible.
-
#size ⇒ undocumented
The number of items currently in the buffer.
-
#take ⇒ Object
Take an item from the buffer if one is available.
Constructor Details
#initialize(*args) ⇒ Base
Creates a new buffer.
27 28 29 30 31 32 33 34 35 36 |
# File 'lib-edge/concurrent/channel/buffer/base.rb', line 27 def initialize(*args) super() synchronize do @closed = false @size = 0 @capacity = 0 @buffer = nil ns_initialize(*args) end end |
Instance Attribute Details
#capacity ⇒ undocumented
The maximum number of values which can be #put onto the buffer it becomes full.
22 23 24 |
# File 'lib-edge/concurrent/channel/buffer/base.rb', line 22 def capacity @capacity end |
Instance Method Details
#blocking? ⇒ Boolean
Predicate indicating if this buffer will block #put operations once it reaches its maximum capacity.
44 45 46 |
# File 'lib-edge/concurrent/channel/buffer/base.rb', line 44 def blocking? true end |
#close ⇒ Boolean
Close the buffer, preventing new items from being added. Once a buffer is closed it cannot be opened again.
176 177 178 179 180 |
# File 'lib-edge/concurrent/channel/buffer/base.rb', line 176 def close synchronize do @closed ? false : @closed = true end end |
#closed? ⇒ Boolea
Predicate indicating is this buffer closed.
187 188 189 |
# File 'lib-edge/concurrent/channel/buffer/base.rb', line 187 def closed? synchronize { ns_closed? } end |
#empty? ⇒ Boolean
Predicate indicating if the buffer is empty.
62 63 64 |
# File 'lib-edge/concurrent/channel/buffer/base.rb', line 62 def empty? synchronize { ns_empty? } end |
#full? ⇒ Boolean
Predicate indicating if the buffer is full.
73 74 75 |
# File 'lib-edge/concurrent/channel/buffer/base.rb', line 73 def full? synchronize { ns_full? } end |
#next ⇒ Object, Boolean
Take the next "item" from the buffer and also return a boolean indicating if "more" items can be taken. Used for iterating over a buffer until it is closed and empty.
If the buffer is open but no items remain the calling thread will
block until an item is available. The second of the two return
values, "more" (a boolean), will always be true
when the buffer is
open. The "more" value will be false
when the channel has been
closed and all values have already been received. When "more" is
false the returned item will be Concurrent::NULL
.
Note that when multiple threads access the same channel a race
condition can occur when using this method. A call to next
from
one thread may return true
for the second return value, but
another thread may take
the last value before the original
thread makes another call. Code which iterates over a channel
must be programmed to properly handle these race conditions.
151 152 153 |
# File 'lib-edge/concurrent/channel/buffer/base.rb', line 151 def next raise NotImplementedError end |
#offer(item) ⇒ Boolean
Put an item onto the buffer if possible. If the buffer is open but
unable to add an item, probably due to being full, the method will
return immediately. Similarly, the method will return immediately
when the buffer is closed. A return value of false
does not
necessarily indicate that the buffer is closed, just that the item
could not be added.
106 107 108 |
# File 'lib-edge/concurrent/channel/buffer/base.rb', line 106 def offer(item) raise NotImplementedError end |
#poll ⇒ Object
Take the next item from the buffer if one is available else return immediately. Failing to return a value does not necessarily indicate that the buffer is closed, just that it is empty.
165 166 167 |
# File 'lib-edge/concurrent/channel/buffer/base.rb', line 165 def poll raise NotImplementedError end |
#put(item) ⇒ Boolean
Put an item onto the buffer if possible. If the buffer is open but not able to accept the item the calling thread will block until the item can be put onto the buffer.
88 89 90 |
# File 'lib-edge/concurrent/channel/buffer/base.rb', line 88 def put(item) raise NotImplementedError end |
#size ⇒ undocumented
The number of items currently in the buffer.
51 52 53 |
# File 'lib-edge/concurrent/channel/buffer/base.rb', line 51 def size synchronize { ns_size } end |
#take ⇒ Object
Take an item from the buffer if one is available. If the buffer is open and no item is available the calling thread will block until an item is available. If the buffer is closed but items are available the remaining items can still be taken. Once the buffer closes, no remaining items can be taken.
122 123 124 |
# File 'lib-edge/concurrent/channel/buffer/base.rb', line 122 def take raise NotImplementedError end |