Module: Concurrent::Concern::Observable

Included in:
Agent, Atom, IVar, TimerTask
Defined in:
lib/concurrent-ruby/concurrent/concern/observable.rb

Overview

The observer pattern is one of the most useful design patterns.

The workflow is very simple:

  • an observer can register itself to a subject via a callback
  • many observers can be registered to the same subject
  • the subject notifies all registered observers when its status changes
  • an observer can deregister itself when is no more interested to receive event notifications

In a single threaded environment the whole pattern is very easy: the subject can use a simple data structure to manage all its subscribed observers and every observer can react directly to every event without caring about synchronization.

In a multi threaded environment things are more complex. The subject must synchronize the access to its data structure and to do so currently we're using two specialized ObserverSet: CopyOnWriteObserverSet and CopyOnNotifyObserverSet.

When implementing and observer there's a very important rule to remember: there are no guarantees about the thread that will execute the callback

Let's take this example

class Observer
  def initialize
    @count = 0
  end

  def update
    @count += 1
  end
end

obs = Observer.new
[obj1, obj2, obj3, obj4].each { |o| o.add_observer(obs) }
# execute [obj1, obj2, obj3, obj4]

obs is wrong because the variable @count can be accessed by different threads at the same time, so it should be synchronized (using either a Mutex or an AtomicFixum)

Instance Method Summary collapse

Instance Method Details

#add_observer(observer = nil, func = :update, &block) ⇒ Object

Adds an observer to this set. If a block is passed, the observer will be created by this method and no other params should be passed.

Parameters:

  • observer (Object) (defaults to: nil)

    the observer to add

  • func (Symbol) (defaults to: :update)

    the function to call on the observer during notification. Default is :update

Returns:

  • (Object)

    the added observer



61
62
63
# File 'lib/concurrent-ruby/concurrent/concern/observable.rb', line 61

def add_observer(observer = nil, func = :update, &block)
  observers.add_observer(observer, func, &block)
end

#count_observersInteger

Return the number of observers associated with this object.

Returns:

  • (Integer)

    the observers count



101
102
103
# File 'lib/concurrent-ruby/concurrent/concern/observable.rb', line 101

def count_observers
  observers.count_observers
end

#delete_observer(observer) ⇒ Object

Remove observer as an observer on this object so that it will no longer receive notifications.

Parameters:

  • observer (Object)

    the observer to remove

Returns:

  • (Object)

    the deleted observer



82
83
84
# File 'lib/concurrent-ruby/concurrent/concern/observable.rb', line 82

def delete_observer(observer)
  observers.delete_observer(observer)
end

#delete_observersObservable

Remove all observers associated with this object.

Returns:



91
92
93
94
# File 'lib/concurrent-ruby/concurrent/concern/observable.rb', line 91

def delete_observers
  observers.delete_observers
  self
end

#with_observer(observer = nil, func = :update, &block) ⇒ Observable

As #add_observer but can be used for chaining.

Parameters:

  • observer (Object) (defaults to: nil)

    the observer to add

  • func (Symbol) (defaults to: :update)

    the function to call on the observer during notification.

Returns:



70
71
72
73
# File 'lib/concurrent-ruby/concurrent/concern/observable.rb', line 70

def with_observer(observer = nil, func = :update, &block)
  add_observer(observer, func, &block)
  self
end