class Fluent::DetachProcessManager
Public Class Methods
new()
click to toggle source
# File lib/fluent/process.rb, line 32
# Starts a DRb service for the process that builds this manager and
# remembers the URI the service ended up on.
#
# The service is started on the URI produced by create_drb_uri, with a new
# Broker instance as its front object. The resulting DRb.uri is kept in
# @parent_uri.
def initialize
  # 'drb' is required here, inside the constructor, rather than at file load.
  require 'drb'

  service_uri = create_drb_uri
  DRb.start_service(service_uri, Broker.new)
  @parent_uri = DRb.uri
end
Public Instance Methods
fork(delegate_object)
click to toggle source
# File lib/fluent/process.rb, line 38
# Forks the current process and wires two pipes between parent and child.
#
# delegate_object is handed unchanged to process_parent / process_child.
#
# Returns a two-element array:
#   parent: [child pid, value returned by process_parent]
#   child:  [nil,       value returned by process_child]
def fork(delegate_object)
  # child Engine.emit_stream -> parent Engine.emit_stream
  to_parent_r, to_parent_w = IO.pipe
  # parent target.emit -> child target.emit
  to_child_r, to_child_w = IO.pipe

  child_pid = Process.fork

  unless child_pid
    # child process: keep only the ends it writes to / reads from
    to_parent_r.close
    to_child_w.close
    child_thread = process_child(to_parent_w, to_child_r, delegate_object)
    return nil, child_thread
  end

  # parent process: close the ends that now belong to the child
  to_parent_w.close
  to_child_r.close
  parent_thread = process_parent(to_parent_r, to_child_w, child_pid, delegate_object)
  [child_pid, parent_thread]
end
Private Instance Methods
create_drb_uri()
click to toggle source
# File lib/fluent/process.rb, line 70
# Returns the URI string passed to DRb.start_service by this class.
# Currently a fixed "drbunix:" prefix with nothing after it.
def create_drb_uri
  'drbunix:' # TODO
end
input_forward_main(ipr, pid)
click to toggle source
# File lib/fluent/process.rb, line 163
# Reads event streams from +ipr+ and passes each one to Engine.emit_stream.
#
# A failure while emitting a single stream is logged as a warning and the
# loop carries on with the next stream. A failure of the read loop itself
# is logged as an error and then re-raised to the caller.
#
# ipr - readable IO handed to read_event_stream.
# pid - accepted but not used by this method.
def input_forward_main(ipr, pid)
  read_event_stream(ipr) do |tag, es|
    # FIXME error handling
    begin
      Engine.emit_stream(tag, es)
    rescue => e
      $log.warn "failed to emit", :error => e.to_s, :pid => Process.pid
      $log.warn_backtrace
    end
  end
rescue => e
  $log.error "error on input process forwarding thread", :error => e.to_s, :pid => Process.pid
  $log.error_backtrace
  raise
end
new_forwarder(w, interval)
click to toggle source
# File lib/fluent/process.rb, line 212
# Builds the forwarder object that wraps the writable end +w+.
#
# An interval below 0.2 yields a plain Forwarder; anything else yields a
# DelayedForwarder that is also given the interval.
def new_forwarder(w, interval)
  # TODO interval
  return Forwarder.new(w) if interval < 0.2

  DelayedForwarder.new(w, interval)
end
output_forward_main(opr, target)
click to toggle source
# Replaces each delegatable method on +target+ with a singleton method that
# forwards the call to a DRbObject created from +child_uri+.
#
# The set of methods is whatever delegate_methods(target) returns. Arguments
# and any block are passed through unchanged.
def override_delegate_methods(target, child_uri)
  remote = DRbObject.new_with_uri(child_uri)

  delegate_methods(target).each do |method_name|
    target.define_singleton_method(method_name) do |*call_args, &call_block|
      remote.send(method_name, *call_args, &call_block)
    end
  end
end
# Lists the method names +target+ reports via #methods, minus every name
# that is a public instance method of Object.
def delegate_methods(target)
  inherited_from_object = Object.public_instance_methods
  reported = target.methods
  reported - inherited_from_object
end
# File lib/fluent/process.rb, line 147
# Reads event streams from +opr+ and passes each one to target.emit together
# with NullOutputChain.instance.
#
# A failure while emitting a single stream is logged as a warning and the
# loop carries on with the next stream. A failure of the read loop itself
# is logged as an error and then re-raised to the caller.
def output_forward_main(opr, target)
  read_event_stream(opr) do |tag, es|
    # FIXME error handling
    begin
      target.emit(tag, es, NullOutputChain.instance)
    rescue => e
      $log.warn "failed to emit", :error => e.to_s, :pid => Process.pid
      $log.warn_backtrace
    end
  end
rescue => e
  $log.error "error on output process forwarding thread", :error => e.to_s, :pid => Process.pid
  $log.error_backtrace
  raise
end
process_child(ipw, opr, delegate_object)
click to toggle source
# File lib/fluent/process.rb, line 75
# Child-side setup after a fork.
#
# Steps, in order:
#   1. start a DRb service with +delegate_object+ as the front object and
#      send its URI to the other end of +ipw+ as a length-prefixed header;
#   2. redefine Engine.emit_stream so it hands events to a forwarder that
#      wraps +ipw+;
#   3. start a thread running output_forward_main(opr, delegate_object);
#   4. call override_shared_methods with @parent_uri.
#
# Returns the thread started in step 3.
def process_child(ipw, opr, delegate_object)
  DRb.start_service(create_drb_uri, delegate_object)
  send_header(ipw, DRb.uri)

  # override target.emit_stream to write event stream to the pipe
  pipe_forwarder = new_forwarder(ipw, 0.5) # TODO interval
  Engine.define_singleton_method(:emit_stream) { |tag, es| pipe_forwarder.emit(tag, es) }

  # read event stream from the pipe and forward to target.emit
  reader_main = method(:output_forward_main)
  reader_thread = Thread.new(opr, delegate_object, &reader_main)

  # override global methods to call parent process
  override_shared_methods(@parent_uri)

  reader_thread
end
process_parent(ipr, opw, pid, delegate_object)
click to toggle source
# File lib/fluent/process.rb, line 113
# Parent-side setup after a fork.
#
# Reads the header from +ipr+, starts a thread running
# input_forward_main(ipr, pid), and builds a forwarder that wraps +opw+.
#
# Returns the thread, extended with a singleton #forwarder method that
# returns that forwarder.
#
# +delegate_object+ is accepted but only referenced by the disabled
# override below.
def process_parent(ipr, opw, pid, delegate_object)
  # The header must be read even though its value is currently unused:
  # it sits in the pipe ahead of the event stream data.
  child_uri = read_header(ipr)

  # read event stream from the pipe and forward to Engine.emit_stream
  reader_main = method(:input_forward_main)
  reader_thread = Thread.new(ipr, pid, &reader_main)

  # note: don't override methods in parent process
  #       because another process may fork after overriding
  #override_delegate_methods(delegate_object, child_uri)

  # return forwarder for DetachProcessMixin to
  # override target.emit and write event stream to the pipe
  pipe_forwarder = new_forwarder(opw, 0.5) # TODO interval

  # note: override emit method on DetachProcessMixin
  reader_thread.define_singleton_method(:forwarder) { pipe_forwarder }

  reader_thread
end
read_event_stream(r, &block)
click to toggle source
# File lib/fluent/process.rb, line 179
# Iterates over the [tag, payload] pairs that a MessagePack::Unpacker built
# on +r+ yields, wraps each payload in a MessagePackEventStream, and calls
# the given block with (tag, event_stream).
#
# An EOFError raised during iteration ends the loop quietly. Once the
# unpackers have been created, +r+ is closed on the way out, whether the
# loop finished, hit EOF, or raised.
#
# When $use_msgpack_5 is truthy no shared unpacker is passed to
# MessagePackEventStream (nil is passed instead); otherwise one
# MessagePack::Unpacker is created up front and reused for every stream.
def read_event_stream(r, &block)
  stream_unpacker = MessagePack::Unpacker.new(r)
  shared_unpacker = $use_msgpack_5 ? nil : MessagePack::Unpacker.new

  begin
    # A batching variant used to be sketched here in commented-out form:
    # readpartial 64 KiB chunks, feed_each them, merge payloads per tag in
    # a hash, then emit one stream per tag. It is not in use; records are
    # forwarded one at a time as the unpacker yields them.
    stream_unpacker.each do |tag, ms|
      event_stream = MessagePackEventStream.new(ms, shared_unpacker)
      block.call(tag, event_stream)
    end
  rescue EOFError
    # writer closed the pipe; nothing more to read
  ensure
    r.close
  end
end
read_header(ipr)
click to toggle source
# File lib/fluent/process.rb, line 59
# Reads one length-prefixed header from +ipr+ and returns its payload.
#
# Wire format (mirrors send_header): a 4-byte big-endian unsigned length
# ('N'), followed by that many bytes of payload.
#
# ipr - readable IO positioned at the start of a header.
#
# Returns the payload String (empty when the encoded length is 0).
# Raises EOFError when the stream ends before a complete header arrived.
# Without this check, a nil or short length prefix either fails with
# NoMethodError on nil or reads to EOF and returns unrelated bytes, and a
# short payload is returned as if it were complete.
def read_header(ipr)
  length_prefix = ipr.read(4)
  if length_prefix.nil? || length_prefix.bytesize < 4
    raise EOFError, "stream ended before the 4-byte header length was read"
  end

  payload_size = length_prefix.unpack('N')[0]
  payload = ipr.read(payload_size)
  if payload.nil? || payload.bytesize < payload_size
    raise EOFError, "stream ended before the #{payload_size}-byte header payload was read"
  end

  payload
end
send_header(ipw, data)
click to toggle source
# File lib/fluent/process.rb, line 64
# Writes +data+ to +ipw+ as a length-prefixed header and flushes.
#
# The prefix is data.bytesize packed as a 4-byte big-endian unsigned
# integer ('N'); the bytes of +data+ follow in a second write.
#
# Returns whatever ipw.flush returns.
def send_header(ipw, data)
  length_prefix = [data.bytesize].pack('N')
  ipw.write(length_prefix)
  ipw.write(data)
  ipw.flush
end