class Celluloid::Supervision::Container::Pool

Manages a fixed-size pool of actors Delegates work (i.e. methods) and supervises actors Don't use this class directly. Instead use MyKlass.pool

Attributes

actors[R]
size[R]

Public Class Methods

new(options={}) click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 15
def initialize(options={})
  @idle = []
  @busy = []
  @klass = options[:actors]
  @actors = Set.new
  @mutex = Mutex.new

  @size = options[:size] || [Celluloid.cores || 2, 2].max
  @args = options[:args] ? Array(options[:args]) : []

  # Do this last since it can suspend and/or crash
  @idle = @size.times.map { __spawn_actor__ }
end
pooling_options(config={}, mixins={}) click to toggle source
# File lib/celluloid/supervision/container/behavior/pool.rb, line 50
def pooling_options(config={}, mixins={})
  combined = {type: Celluloid::Supervision::Container::Pool}.merge(config).merge(mixins)
  combined[:args] = [[:block, :actors, :size, :args].inject({}) { |e, p| e[p] = combined.delete(p) if combined[p]; e }]
  combined
end

Public Instance Methods

__busy() click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 121
def __busy
  @mutex.synchronize { @busy }
end
__busy?(actor) click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 117
def __busy?(actor)
  @mutex.synchronize { @busy.include? actor }
end
__crash_handler__(actor, reason) click to toggle source

Spawn a new worker for every crashed one

# File lib/celluloid/supervision/container/pool.rb, line 160
def __crash_handler__(actor, reason)
  @busy.delete actor
  @idle.delete actor
  @actors.delete actor
  return unless reason
  @idle << __spawn_actor__
  signal :respawn_complete
end
__idle() click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 125
def __idle
  @mutex.synchronize { @idle }
end
__idle?(actor) click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 113
def __idle?(actor)
  @mutex.synchronize { @idle.include? actor }
end
__provision_actor__() click to toggle source

Provision a new actor ( take it out of idle, move it into busy, and avail it )

# File lib/celluloid/supervision/container/pool.rb, line 144
def __provision_actor__
  Task.current.guard_warnings = true
  @mutex.synchronize do
    while @idle.empty?
      # Wait for responses from one of the busy actors
      response = exclusive { receive { |msg| msg.is_a?(Internals::Response) } }
      Thread.current[:celluloid_actor].handle_message(response)
    end

    actor = @idle.shift
    @busy << actor
    actor
  end
end
__shutdown__() click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 29
def __shutdown__
  return unless defined?(@actors) && @actors
  # TODO: these can be nil if initializer crashes
  terminators = @actors.map do |actor|
    begin
      actor.future(:terminate)
    rescue DeadActorError
    end
  end

  terminators.compact.each { |terminator| terminator.value rescue nil }
end
__spawn_actor__() click to toggle source

Instantiate an actor, add it to the actor Set, and return it

# File lib/celluloid/supervision/container/pool.rb, line 136
def __spawn_actor__
  actor = @klass.new_link(*@args)
  @mutex.synchronize { @actors.add(actor) }
  @actors.add(actor)
  actor
end
__state(actor) click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 129
def __state(actor)
  return :busy if __busy?(actor)
  return :idle if __idle?(actor)
  :missing
end
_send_(method, *args, &block) click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 42
def _send_(method, *args, &block)
  actor = __provision_actor__
  begin
    actor._send_ method, *args, &block
  rescue DeadActorError # if we get a dead actor out of the pool
    wait :respawn_complete
    actor = __provision_actor__
    retry
  rescue ::Exception => ex
    abort ex
  ensure
    if actor.alive?
      @idle << actor
      @busy.delete actor

      # Broadcast that actor is done processing and
      # waiting idle
      signal :actor_idle
    end
  end
end
busy_size() click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 105
def busy_size
  @mutex.synchronize { @busy.length }
end
idle_size() click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 109
def idle_size
  @mutex.synchronize { @idle.length }
end
inspect() click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 84
def inspect
  _send_ :inspect
end
is_a?(klass) click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 68
def is_a?(klass)
  _send_ :is_a?, klass
end
kind_of?(klass) click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 72
def kind_of?(klass)
  _send_ :kind_of?, klass
end
method(meth) click to toggle source

Since Pool allocates worker objects only just before calling them, we can still help Celluloid::Call detect passing invalid parameters to async methods by checking for those methods on the worker class

Calls superclass method
# File lib/celluloid/supervision/container/pool.rb, line 202
def method(meth)
  super
rescue NameError
  @klass.instance_method(meth.to_sym)
end
method_missing(method, *args, &block) click to toggle source
Calls superclass method
# File lib/celluloid/supervision/container/pool.rb, line 191
def method_missing(method, *args, &block)
  if respond_to?(method)
    _send_ method, *args, &block
  else
    super
  end
end
methods(include_ancestors = true) click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 76
def methods(include_ancestors = true)
  _send_ :methods, include_ancestors
end
name() click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 64
def name
  _send_ @mailbox, :name
end
respond_to?(meth, include_private = false) click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 169
def respond_to?(meth, include_private = false)
  # NOTE: use method() here since this class
  # shouldn't be used directly, and method() is less
  # likely to be "reimplemented" inconsistently
  # with other Object.*method* methods.

  found = method(meth)
  if include_private
    found ? true : false
  else
    if found.is_a?(UnboundMethod)
      found.owner.public_instance_methods.include?(meth) ||
        found.owner.protected_instance_methods.include?(meth)
    else
      found.receiver.public_methods.include?(meth) ||
        found.receiver.protected_methods.include?(meth)
    end
  end
rescue NameError
  false
end
size=(new_size) click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 88
def size=(new_size)
  new_size = [0, new_size].max
  if new_size > size
    delta = new_size - size
    delta.times { @idle << __spawn_actor__ }
  else
    (size - new_size).times do
      actor = __provision_actor__
      unlink actor
      @busy.delete actor
      @actors.delete actor
      actor.terminate
    end
  end
  @size = new_size
end
to_s() click to toggle source
# File lib/celluloid/supervision/container/pool.rb, line 80
def to_s
  _send_ :to_s
end