Class: Concurrent::Atom

Inherits:
Synchronization::Object
  • Object
Includes:
Concern::Observable
Defined in:
lib/concurrent-ruby/concurrent/atom.rb

Overview

Atoms provide a way to manage shared, synchronous, independent state.

An atom is initialized with an initial value and an optional validation proc. At any time the value of the atom can be synchronously and safely changed. If a validator is given at construction then any new value will be checked against the validator and will be rejected if the validator returns false or raises an exception.

There are two ways to change the value of an atom: #compare_and_set and #swap. The former will set the new value if and only if it validates and the current value matches the expected old value. The latter will atomically set the new value to the result of running the given block if and only if that value validates.

Example

require 'concurrent'

def next_fibonacci(set = nil)
  return [0, 1] if set.nil?
  set + [set[-2..-1].reduce{|sum,x| sum + x }]
end

# create an atom with an initial value
atom = Concurrent::Atom.new(next_fibonacci)

# send a few update requests
5.times do
  atom.swap{|set| next_fibonacci(set) }
end

# get the current value
atom.value #=> [0, 1, 1, 2, 3, 5, 8]

Observation

Atoms support observers through the Observable mixin module. Observers are notified every time the value of the Atom changes. Each observer receives three arguments: time, old_value, and new_value. The time argument is the time at which the value change occurred. The old_value is the value of the Atom when the change began. The new_value is the value to which the Atom was set when the change completed. Note that old_value and new_value may be the same. This is not an error; it simply means that the change operation returned the same value.

Unlike in Clojure, Atom cannot participate in TVar transactions.

Thread-safe Variable Classes

Each of the thread-safe variable classes is designed to solve a different problem. In general:

  • Agent: Shared, mutable variable providing independent, uncoordinated, asynchronous change of individual values. Best used when the value will undergo frequent, complex updates. Suitable when the result of an update does not need to be known immediately.
  • Atom: Shared, mutable variable providing independent, uncoordinated, synchronous change of individual values. Best used when the value will undergo frequent reads but only occasional, though complex, updates. Suitable when the result of an update must be known immediately.
  • AtomicReference: A simple object reference that can be updated atomically. Updates are synchronous but fast. Best used when updates are simple set operations. Not suitable when updates are complex. AtomicBoolean and AtomicFixnum are similar but optimized for the given data type.
  • Exchanger: Shared, stateless synchronization point. Used when two or more threads need to exchange data. The threads will pair then block on each other until the exchange is complete.
  • MVar: Shared synchronization point. Used when one thread must give a value to another, which must take the value. The threads will block on each other until the exchange is complete.
  • ThreadLocalVar: Shared, mutable, isolated variable which holds a different value for each thread which has access. Often used as an instance variable in objects which must maintain different state for different threads.
  • TVar: Shared, mutable variables which provide coordinated, synchronous change of many states. Used when multiple values must change together, in an all-or-nothing transaction.


Constructor Details

#initialize(value, opts = {}) ⇒ Atom

Create a new atom with the given initial value.

Parameters:

  • value (Object)

    The initial value

  • opts (Hash) (defaults to: {})

    The options used to configure the atom

Options Hash (opts):

  • :validator (Proc) — default: nil

    Optional proc used to validate new values. It must accept exactly one argument, which will be the intended new value. The validator should return true if the new value is acceptable; otherwise it should return false (preferably) or raise an exception.

  • :dup_on_deref (Boolean) — default: false

    Call #dup before returning the data from #value

  • :freeze_on_deref (Boolean) — default: false

    Call #freeze before returning the data from #value

  • :copy_on_deref (Proc) — default: nil

    When calling the #value method, call the given proc passing the internal value as the sole argument then return the new value returned from the proc.

Raises:

  • (ArgumentError)

    if the validator is not a Proc (when given)



# File 'lib/concurrent-ruby/concurrent/atom.rb', line 121

def initialize(value, opts = {})
  super()
  @Validator     = opts.fetch(:validator, -> v { true })
  self.observers = Collection::CopyOnNotifyObserverSet.new
  self.value     = value
end

Instance Method Details

#compare_and_set(old_value, new_value) ⇒ Boolean

Atomically sets the value of atom to the new value if and only if the current value of the atom is identical to the old value and the new value successfully validates against the (optional) validator given at construction.

Parameters:

  • old_value (Object)

    The expected current value.

  • new_value (Object)

    The intended new value.

Returns:

  • (Boolean)

    True if the value is changed else false.



# File 'lib/concurrent-ruby/concurrent/atom.rb', line 181

def compare_and_set(old_value, new_value)
  if valid?(new_value) && compare_and_set_value(old_value, new_value)
    observers.notify_observers(Time.now, old_value, new_value)
    true
  else
    false
  end
end

#reset(new_value) ⇒ Object

Atomically sets the value of atom to the new value without regard for the current value so long as the new value successfully validates against the (optional) validator given at construction.

Parameters:

  • new_value (Object)

    The intended new value.

Returns:

  • (Object)

    The final value of the atom after all operations and validations are complete.



# File 'lib/concurrent-ruby/concurrent/atom.rb', line 198

def reset(new_value)
  old_value = value
  if valid?(new_value)
    self.value = new_value
    observers.notify_observers(Time.now, old_value, new_value)
    new_value
  else
    old_value
  end
end

#swap(*args) {|value, args| ... } ⇒ Object

Note:

The given block may be called multiple times, and thus should be free of side effects.

Atomically swaps the value of atom using the given block. The current value will be passed to the block, as will any arguments passed as arguments to the function. The new value will be validated against the (optional) validator proc given at construction. If validation fails the value will not be changed.

Internally, #swap reads the current value, applies the block to it, and attempts to compare-and-set it in. Since another thread may have changed the value in the intervening time, it may have to retry, and does so in a spin loop. The net effect is that the value will always be the result of the application of the supplied block to a current value, atomically. However, because the block might be called multiple times, it must be free of side effects.

Parameters:

  • args (Object)

    Zero or more arguments passed to the block.

Yields:

  • (value, args)

    Calculates a new value for the atom based on the current value and any supplied arguments.

Yield Parameters:

  • value (Object)

    The current value of the atom.

  • args (Object)

    All arguments passed to the function, in order.

Yield Returns:

  • (Object)

    The intended new value of the atom.

Returns:

  • (Object)

    The final value of the atom after all operations and validations are complete.

Raises:

  • (ArgumentError)

    When no block is given.



# File 'lib/concurrent-ruby/concurrent/atom.rb', line 157

def swap(*args)
  raise ArgumentError.new('no block given') unless block_given?

  loop do
    old_value = value
    new_value = yield(old_value, *args)
    begin
      break old_value unless valid?(new_value)
      break new_value if compare_and_set(old_value, new_value)
    rescue
      break old_value
    end
  end
end

#value ⇒ Object Also known as: deref

The current value of the atom.

Returns:

  • (Object)

    The current value.



# File 'lib/concurrent-ruby/concurrent/atom.rb', line 104

#add_observer(observer = nil, func = :update, &block) ⇒ Object Originally defined in module Concern::Observable

Adds an observer to this set. If a block is passed, the observer will be created by this method and no other params should be passed.

Parameters:

  • observer (Object) (defaults to: nil)

    the observer to add

  • func (Symbol) (defaults to: :update)

    the function to call on the observer during notification. Default is :update

Returns:

  • (Object)

    the added observer

#count_observers ⇒ Integer Originally defined in module Concern::Observable

Return the number of observers associated with this object.

Returns:

  • (Integer)

    the observers count

#delete_observer(observer) ⇒ Object Originally defined in module Concern::Observable

Remove observer as an observer on this object so that it will no longer receive notifications.

Parameters:

  • observer (Object)

    the observer to remove

Returns:

  • (Object)

    the deleted observer

#delete_observers ⇒ Observable Originally defined in module Concern::Observable

Remove all observers associated with this object.

Returns:

  • (Observable)

    self

#with_observer(observer = nil, func = :update, &block) ⇒ Observable Originally defined in module Concern::Observable

As #add_observer but can be used for chaining.

Parameters:

  • observer (Object) (defaults to: nil)

    the observer to add

  • func (Symbol) (defaults to: :update)

    the function to call on the observer during notification.

Returns:

  • (Observable)

    self