Class: Concurrent::Actor::Utils::Broadcast

Inherits:
RestartingContext show all
Defined in:
lib/concurrent-ruby-edge/concurrent/actor/utils/broadcast.rb

Overview

Makes it easy to build publish/subscribe (pub/sub) messaging.

Examples:

news

# Spawn a Broadcast actor named :news; it serves as the channel.
news_channel = Concurrent::Actor::Utils::Broadcast.spawn :news

# Spawn two listeners. Each one sends :subscribe to the channel from inside
# its own spawn block.
2.times do |i|
  Concurrent::Actor::Utils::AdHoc.spawn "listener-#{i}" do
    news_channel << :subscribe
    # The block evaluates to this lambda — presumably AdHoc uses it as the
    # listener's message handler (confirm against AdHoc's documentation).
    -> message { puts message }
  end
end

# Any message other than the control symbols (:subscribe, :unsubscribe,
# :subscribed?) is forwarded to the subscribed receivers.
news_channel << 'Ruby rocks!'
# Expected output: Ruby rocks! is printed twice, once per listener.

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Broadcast

Returns a new instance of Broadcast.



22
23
24
# File 'lib/concurrent-ruby-edge/concurrent/actor/utils/broadcast.rb', line 22

# Creates a broadcaster that starts out with no subscribers.
#
# Subscribers are kept in a Set, so the same sender is stored at most once
# no matter how many times it subscribes.
def initialize
  @receivers = Set[]
end

Instance Method Details

#filtered_receivers ⇒ undocumented

Override to define different behaviour, such as filtering which receivers get a message.



45
46
47
# File 'lib/concurrent-ruby-edge/concurrent/actor/utils/broadcast.rb', line 45

# Hook that selects which subscribers receive a broadcast message.
#
# The default implementation returns the subscriber collection held in
# @receivers (the same object, not a copy). Override this method to filter
# or otherwise change the recipients.
#
# @return [Set] the current subscribers (by default)
def filtered_receivers
  @receivers
end

#on_message(message) ⇒ undocumented



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/concurrent-ruby-edge/concurrent/actor/utils/broadcast.rb', line 26

# Handles one incoming message.
#
# Three control symbols manage the subscription of the current message's
# sender (+envelope.sender+); every other message is forwarded unchanged to
# each receiver returned by +filtered_receivers+.
#
# @param message [Object] a control symbol (:subscribe, :unsubscribe,
#   :subscribed?) or an arbitrary payload to broadcast
# @return [Boolean] for the control symbols:
#   - :subscribe   -> true when the sender is a Reference (it is added to the
#     receivers), false otherwise
#   - :unsubscribe -> truthiness of the collection's +delete+ result
#   - :subscribed? -> whether the sender is currently a receiver
# @return [Object] for any other message, the collection that was iterated
#   (the result of +each+)
def on_message(message)
  case message
  when :subscribe
    subscriber = envelope.sender
    # Only actor references can be delivered to later, so reject anything else.
    return false unless subscriber.is_a?(Reference)

    @receivers << subscriber
    true
  when :unsubscribe
    # Set#delete returns the set itself, so with the default Set this reply is
    # true even when the sender was not subscribed.
    @receivers.delete(envelope.sender) ? true : false
  when :subscribed?
    @receivers.member?(envelope.sender)
  else
    filtered_receivers.each { |receiver| receiver << message }
  end
end