class Fluent::StreamInput
obsolete
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Input.new
# File lib/fluent/plugin/in_stream.rb, line 21 def initialize require 'socket' require 'yajl' super end
Public Instance Methods
run()
click to toggle source
def listen end
# File lib/fluent/plugin/in_stream.rb, line 45 def run if support_blocking_timeout? @loop.run(0.5) else @loop.run end rescue log.error "unexpected error", :error=>$!.to_s log.error_backtrace end
shutdown()
click to toggle source
# File lib/fluent/plugin/in_stream.rb, line 35 def shutdown @loop.watchers.each {|w| w.detach } @loop.stop @lsock.close @thread.join end
start()
click to toggle source
# File lib/fluent/plugin/in_stream.rb, line 27 def start @loop = Coolio::Loop.new @lsock = listen @loop.attach(@lsock) @thread = Thread.new(&method(:run)) @cached_unpacker = $use_msgpack_5 ? nil : MessagePack::Unpacker.new end
Protected Instance Methods
on_message(msg)
click to toggle source
message Entry {
1: long time 2: object record
}
message Forward {
1: string tag 2: list<Entry> entries
}
message PackedForward {
1: string tag 2: raw entries # msgpack stream of Entry
}
message Message {
1: string tag 2: long? time 3: object record
}
# File lib/fluent/plugin/in_stream.rb, line 82 def on_message(msg) # TODO format error tag = msg[0].to_s entries = msg[1] if entries.class == String # PackedForward es = MessagePackEventStream.new(entries, @cached_unpacker) router.emit_stream(tag, es) elsif entries.class == Array # Forward es = MultiEventStream.new entries.each {|e| record = e[1] next if record.nil? time = e[0].to_i time = (now ||= Engine.now) if time == 0 es.add(time, record) } router.emit_stream(tag, es) else # Message record = msg[2] return if record.nil? time = msg[1] time = Engine.now if time == 0 router.emit(tag, time, record) end end
support_blocking_timeout?()
click to toggle source
# File lib/fluent/plugin/in_stream.rb, line 58 def support_blocking_timeout? @loop.method(:run).arity.nonzero? end