class Fluent::DetachProcessManager

Public Class Methods

new() click to toggle source
# File lib/fluent/process.rb, line 32
# Starts a DRb service in the current process that exposes a Broker
# instance, and remembers the URI the service ended up bound to in
# @parent_uri.
def initialize
  # 'drb' is required here rather than at file load time.
  require 'drb'

  listen_uri = create_drb_uri
  broker = Broker.new
  DRb.start_service(listen_uri, broker)

  # DRb.uri reports the URI of the service that was just started.
  @parent_uri = DRb.uri
end

Public Instance Methods

fork(delegate_object) click to toggle source
# File lib/fluent/process.rb, line 38
# Forks the current process and wires the two sides together with a pair
# of pipes.
#
# delegate_object - object handed to process_parent / process_child.
#
# Returns a two-element array:
#   in the parent: [pid of the child, thread returned by process_parent]
#   in the child:  [nil, thread returned by process_child]
def fork(delegate_object)
  # child Engine.emit_stream -> parent Engine.emit_stream
  input_reader, input_writer = IO.pipe
  # parent target.emit -> child target.emit
  output_reader, output_writer = IO.pipe

  child_pid = Process.fork

  unless child_pid
    # child process: drop the pipe ends that belong to the parent
    input_reader.close
    output_writer.close
    child_thread = process_child(input_writer, output_reader, delegate_object)
    return [nil, child_thread]
  end

  # parent process: drop the pipe ends that belong to the child
  input_writer.close
  output_reader.close
  parent_thread = process_parent(input_reader, output_writer, child_pid, delegate_object)
  [child_pid, parent_thread]
end

Private Instance Methods

create_drb_uri() click to toggle source
# File lib/fluent/process.rb, line 70
# Returns the URI string passed to DRb.start_service by this class.
# Only the bare "drbunix:" scheme is returned.
def create_drb_uri
  # TODO
  'drbunix:'
end
input_forward_main(ipr, pid) click to toggle source
# File lib/fluent/process.rb, line 163
# Body of the forwarding thread started by process_parent: reads event
# streams from +ipr+ and passes each one to Engine.emit_stream.
#
# ipr - readable IO handed to read_event_stream.
# pid - accepted for the caller's benefit; not used in this method.
#
# A failure while emitting a single stream is logged as a warning and
# reading continues. Any other StandardError is logged as an error and
# re-raised.
def input_forward_main(ipr, pid)
  read_event_stream(ipr) do |tag, es|
    # FIXME error handling
    begin
      Engine.emit_stream(tag, es)
    rescue => emit_error
      $log.warn("failed to emit", error: emit_error.to_s, pid: Process.pid)
      $log.warn_backtrace
    end
  end
rescue => thread_error
  $log.error("error on input process forwarding thread", error: thread_error.to_s, pid: Process.pid)
  $log.error_backtrace
  raise
end
new_forwarder(w, interval) click to toggle source
# File lib/fluent/process.rb, line 212
# Builds the forwarder that writes to +w+.
#
# w        - object passed through to the forwarder constructor.
# interval - numeric; values below 0.2 select Forwarder, everything else
#            selects DelayedForwarder (which also receives the interval).
#
# Returns the newly created forwarder.
def new_forwarder(w, interval)
  # TODO interval
  return DelayedForwarder.new(w, interval) unless interval < 0.2

  Forwarder.new(w)
end
output_forward_main(opr, target) click to toggle source

# Replaces every method listed by delegate_methods(target) with a
# singleton method on +target+ that forwards the call (arguments and
# block included) to a DRbObject created from +child_uri+.
#
# target    - object whose methods are overridden in place.
# child_uri - URI given to DRbObject.new_with_uri.
def override_delegate_methods(target, child_uri)
  child_proxy = DRbObject.new_with_uri(child_uri)

  delegate_methods(target).each do |method_name|
    target.define_singleton_method(method_name) do |*args, &block|
      child_proxy.send(method_name, *args, &block)
    end
  end
end

# Lists the method names reported by target.methods that are not public
# instance methods of Object.
#
# target - any object.
#
# Returns an Array of Symbols.
def delegate_methods(target)
  candidates = target.methods
  inherited = Object.public_instance_methods

  candidates.reject { |method_name| inherited.include?(method_name) }
end

# File lib/fluent/process.rb, line 147
# Body of the forwarding thread started by process_child: reads event
# streams from +opr+ and passes each one to target.emit together with
# NullOutputChain.instance.
#
# opr    - readable IO handed to read_event_stream.
# target - object that receives emit(tag, es, chain).
#
# A failure while emitting a single stream is logged as a warning and
# reading continues. Any other StandardError is logged as an error and
# re-raised.
def output_forward_main(opr, target)
  read_event_stream(opr) do |tag, es|
    # FIXME error handling
    begin
      target.emit(tag, es, NullOutputChain.instance)
    rescue => emit_error
      $log.warn("failed to emit", error: emit_error.to_s, pid: Process.pid)
      $log.warn_backtrace
    end
  end
rescue => thread_error
  $log.error("error on output process forwarding thread", error: thread_error.to_s, pid: Process.pid)
  $log.error_backtrace
  raise
end
override_shared_methods(parent_uri) click to toggle source
# File lib/fluent/process.rb, line 96
# For every [broker_accessor, target, name] triple returned by
# shared_methods, defines a singleton method +name+ on +target+ that
# forwards the call to the object obtained by sending +broker_accessor+
# to a DRbObject created from +parent_uri+.
#
# parent_uri - URI given to DRbObject.new_with_uri.
def override_shared_methods(parent_uri)
  parent_broker = DRbObject.new_with_uri(parent_uri)

  shared_methods.each do |entry|
    broker_accessor, target, name = entry

    # Looked up once per entry; the defined method closes over it.
    remote_object = parent_broker.send(broker_accessor)
    target.define_singleton_method(name) do |*args, &block|
      remote_object.send(name, *args, &block)
    end
  end
end
process_child(ipw, opr, delegate_object) click to toggle source
# File lib/fluent/process.rb, line 75
# Child-side setup, called from fork in the forked process.
#
# ipw             - writable pipe end (child -> parent).
# opr             - readable pipe end (parent -> child).
# delegate_object - object exposed through DRb and used as the emit
#                   target for streams read from +opr+.
#
# Returns the thread running output_forward_main.
def process_child(ipw, opr, delegate_object)
  # expose the delegate object over DRb and report its URI as the
  # header on the child -> parent pipe
  DRb.start_service(create_drb_uri, delegate_object)
  send_header(ipw, DRb.uri)

  # override Engine.emit_stream to write event stream to the pipe
  pipe_forwarder = new_forwarder(ipw, 0.5)  # TODO interval
  Engine.define_singleton_method(:emit_stream) do |tag, es|
    pipe_forwarder.emit(tag, es)
  end

  # read event stream from the pipe and forward to target.emit
  reader_thread = Thread.new(opr, delegate_object) do |reader, target|
    output_forward_main(reader, target)
  end

  # override global methods to call parent process
  override_shared_methods(@parent_uri)

  reader_thread
end
process_parent(ipr, opw, pid, delegate_object) click to toggle source
# File lib/fluent/process.rb, line 113
# Parent-side setup, called from fork in the original process.
#
# ipr             - readable pipe end (child -> parent).
# opw             - writable pipe end (parent -> child).
# pid             - pid of the forked child, passed on to the thread.
# delegate_object - not used here (see the note on the disabled
#                   override below).
#
# Returns the thread running input_forward_main; the thread additionally
# responds to #forwarder, which returns the forwarder writing to +opw+.
def process_parent(ipr, opw, pid, delegate_object)
  # the header sent by the child is read first; its value is only
  # needed by the disabled override below
  child_uri = read_header(ipr)

  # read event stream from the pipe and forward to Engine.emit_stream
  reader_thread = Thread.new(ipr, pid) do |reader, child_pid|
    input_forward_main(reader, child_pid)
  end

  # note: don't override methods in parent process
  #       because another process may fork after overriding
  #override_delegate_methods(delegate_object, child_uri)

  # return forwarder for DetachProcessMixin to
  # override target.emit and write event stream to the pipe
  pipe_forwarder = new_forwarder(opw, 0.5)  # TODO interval
  # note: override emit method on DetachProcessMixin
  reader_thread.define_singleton_method(:forwarder) { pipe_forwarder }

  reader_thread
end
read_event_stream(r, &block) click to toggle source
# File lib/fluent/process.rb, line 179
# Unpacks [tag, ms] pairs from +r+ with MessagePack and calls +block+
# with the tag and a MessagePackEventStream built from +ms+.
#
# r     - readable IO; it is closed before this method returns, whether
#         unpacking finished, hit EOFError, or raised something else.
# block - called as block.call(tag, event_stream) for every pair.
#
# EOFError raised while unpacking is swallowed; other errors propagate.
def read_event_stream(r, &block)
  stream_unpacker = MessagePack::Unpacker.new(r)

  # A second unpacker is handed to each event stream unless
  # $use_msgpack_5 is set, in which case nil is passed instead.
  cached_unpacker = nil
  cached_unpacker = MessagePack::Unpacker.new unless $use_msgpack_5

  begin
    # Disabled alternative that batches streams per tag:
    #buf = ''
    #map = {}
    #while true
    #  r.readpartial(64*1024, buf)
    #  u.feed_each(buf) {|tag,ms|
    #    if msbuf = map[tag]
    #      msbuf << ms
    #    else
    #      map[tag] = ms
    #    end
    #  }
    #  unless map.empty?
    #    map.each_pair {|tag,ms|
    #      es = MessagePackEventStream.new(ms, cached_unpacker)
    #      block.call(tag, es)
    #    }
    #    map.clear
    #  end
    #end
    stream_unpacker.each do |tag, ms|
      block.call(tag, MessagePackEventStream.new(ms, cached_unpacker))
    end
  rescue EOFError
    # EOFError is not treated as a failure; fall through to the close
  ensure
    r.close
  end
end
read_header(ipr) click to toggle source
# File lib/fluent/process.rb, line 59
# Reads one length-prefixed header from +ipr+: a 4-byte big-endian
# unsigned length (pack format 'N') followed by that many bytes of
# payload. This is the framing written by send_header.
#
# ipr - readable IO.
#
# Returns the payload String.
# Raises EOFError when the stream ends before the 4-byte length or the
# full payload could be read (for example because the writer exited
# before sending its header).
def read_header(ipr)
  # IO#read returns nil (or a short string) at end of stream, so the
  # result must be checked before it is unpacked.
  length_field = ipr.read(4)
  if length_field.nil? || length_field.bytesize < 4
    raise EOFError, "end of stream while reading header length"
  end

  sz = length_field.unpack('N')[0]

  payload = ipr.read(sz)
  if payload.nil? || payload.bytesize < sz
    raise EOFError, "end of stream while reading header payload (expected #{sz} bytes)"
  end

  payload
end
send_header(ipw, data) click to toggle source
# File lib/fluent/process.rb, line 64
# Writes one length-prefixed header to +ipw+: the byte size of +data+ as
# a 4-byte big-endian unsigned integer (pack format 'N'), then +data+
# itself, and finally flushes the IO.
#
# ipw  - writable IO.
# data - String payload.
def send_header(ipw, data)
  length_prefix = [data.bytesize].pack('N')

  # Prefix and payload are written separately rather than concatenated.
  ipw.write(length_prefix)
  ipw.write(data)
  ipw.flush
end
shared_methods() click to toggle source
# File lib/fluent/process.rb, line 106
# Returns the list of [broker_accessor, target, name] triples consumed by
# override_shared_methods. Every entry is currently commented out, so the
# result is an empty Array.
def shared_methods
  entries = []
  #entries << [:engine, Engine, :flush!]
  #entries << [:engine, Engine, :stop]
  entries
end