class Fluent::StreamInput::Handler
Public Class Methods
new(io, log, on_message)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_stream.rb, line 116
#
# Wraps an accepted connection and remembers the logger and message callback.
#
# io         - the accepted socket; forwarded unchanged to the superclass.
# log        - logger; must respond to #trace taking a block.
# on_message - callable stored in @on_message for the read handlers.
def initialize(io, log, on_message)
  super(io)

  if io.is_a?(TCPSocket)
    # Packed as the C linger struct: { int l_onoff; int l_linger; }
    # NOTE(review): @timeout is read here but never assigned in this method;
    # if it is nil, nil.to_i makes the linger interval 0 -- confirm where it is set.
    linger = [1, @timeout.to_i].pack('I!I!')
    io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)
  end

  @on_message = on_message
  @log = log

  # The message is built inside the block so the logger decides whether to
  # evaluate it. The peer lookup is best-effort: the rescue modifier swallows
  # any failure and the address parts then interpolate as empty strings.
  # NOTE(review): @_io is presumably set by the superclass constructor -- confirm.
  @log.trace do
    remote_port, remote_addr = *Socket.unpack_sockaddr_in(@_io.getpeername) rescue nil
    "accepted fluent socket from '#{remote_addr}:#{remote_port}': object_id=#{self.object_id}"
  end
end
Public Instance Methods
on_close()
click to toggle source
# File lib/fluent/plugin/in_stream.rb, line 166
#
# Close callback: hands the logger a trace-level message identifying this
# handler by object_id. The message is produced inside the block, so it is
# only built if the logger chooses to call the block.
def on_close
  @log.trace do
    "closed fluent socket object_id=#{self.object_id}"
  end
end
on_connect()
click to toggle source
# File lib/fluent/plugin/in_stream.rb, line 130
#
# Connect callback. Intentionally does nothing and returns nil.
def on_connect
  nil
end
on_read(data)
click to toggle source
# File lib/fluent/plugin/in_stream.rb, line 133
#
# Handles the first chunk read from the connection and picks the decoder.
#
# If the chunk starts with '{' or '[', a Yajl parser is created with
# @on_message as its completion callback and on_read_json becomes the reader.
# Anything else (including an empty chunk) creates a MessagePack unpacker and
# selects on_read_msgpack.
#
# The chosen reader then replaces on_read on this object's singleton class,
# so later chunks go straight to it without repeating the detection.
#
# data - the chunk just read.
#
# Returns whatever the chosen reader returns for this chunk.
def on_read(data)
  reader =
    case data[0]
    when '{', '['
      json_reader = method(:on_read_json)
      @y = Yajl::Parser.new
      @y.on_parse_complete = @on_message
      json_reader
    else
      msgpack_reader = method(:on_read_msgpack)
      @u = MessagePack::Unpacker.new
      msgpack_reader
    end

  # Per-instance override: only this connection's on_read is rebound.
  define_singleton_method(:on_read, reader)

  reader.call(data)
end
on_read_json(data)
click to toggle source
# File lib/fluent/plugin/in_stream.rb, line 150
#
# Feeds a chunk to the JSON parser held in @y.
#
# Any StandardError raised while feeding is logged (message plus backtrace)
# and the connection is closed instead of letting the error propagate.
#
# data - the chunk to append to the parser.
#
# Returns the result of @y << data, or the result of close after a failure.
def on_read_json(data)
  begin
    @y << data
  rescue => e
    @log.error "unexpected error", :error => e.to_s
    @log.error_backtrace
    close
  end
end
on_read_msgpack(data)
click to toggle source
# File lib/fluent/plugin/in_stream.rb, line 158
#
# Feeds a chunk to the MessagePack unpacker held in @u, passing @on_message
# as the block that receives what the unpacker yields.
#
# Any StandardError raised while feeding (or inside the callback) is logged
# (message plus backtrace) and the connection is closed instead of letting
# the error propagate.
#
# data - the chunk to feed to the unpacker.
#
# Returns the result of feed_each, or the result of close after a failure.
def on_read_msgpack(data)
  begin
    @u.feed_each(data, &@on_message)
  rescue => e
    @log.error "unexpected error", :error => e.to_s
    @log.error_backtrace
    close
  end
end