class Fluent::ExecInput
Constants
- SUPPORTED_FORMAT
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Input.new
# File lib/fluent/plugin/in_exec.rb, line 22
# Constructs the plugin through the superclass constructor, then requires
# the two libraries listed below. The requires are issued at instantiation
# time rather than when this file is loaded.
def initialize
  super
  %w[fluent/plugin/exec_util fluent/timezone].each { |feature| require feature }
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Input#configure
# File lib/fluent/plugin/in_exec.rb, line 49
# Applies the configuration: resolves the localtime/utc/timezone settings,
# checks that a tag source exists, prepares the time-parsing proc and picks
# the output parser that matches @format.
#
# conf - configuration element; read here through conf['localtime'],
#        conf['utc'] and conf['timezone'].
#
# Raises ConfigError when neither 'tag' nor 'tag_key' is set, or when the
# tsv format is selected with an empty keys list.
def configure(conf)
  super

  # 'localtime' is checked first; when neither key is present @localtime
  # keeps whatever value it already had.
  if conf['localtime']
    @localtime = true
  elsif conf['utc']
    @localtime = false
  end

  if conf['timezone']
    @timezone = conf['timezone']
    Fluent::Timezone.validate!(@timezone)
  end

  unless @tag || @tag_key
    raise ConfigError, "'tag' or 'tag_key' option is required on exec input"
  end

  if @time_key
    if @time_format
      # Capture the format in a local so the proc does not depend on later
      # changes to @time_format.
      f = @time_format
      @time_parse_proc = Proc.new {|str| Time.strptime(str, f).to_i }
    else
      @time_parse_proc = Proc.new {|str| str.to_i }
    end
  end

  # Every parser is handed on_message as its per-record callback.
  case @format
  when :tsv
    if @keys.empty?
      raise ConfigError, "keys option is required on exec input for tsv format"
    end
    @parser = ExecUtil::TSVParser.new(@keys, method(:on_message))
  when :json
    @parser = ExecUtil::JSONParser.new(method(:on_message))
  when :msgpack
    @parser = ExecUtil::MessagePackParser.new(method(:on_message))
  end
end
run()
click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 121
# Thread body for the non-periodic mode: hands the pipe stored in @io
# (opened by #start) to the configured parser.
def run
  pipe = @io
  @parser.call(pipe)
end
run_periodic()
click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 125
# Thread body for the periodic mode. Until @finished becomes true it
# sleeps @run_interval seconds, runs @command and feeds the command's
# output pipe to the parser.
#
# Any StandardError raised during an iteration is logged and the loop
# carries on with the next iteration.
def run_periodic
  until @finished
    begin
      sleep @run_interval
      # The block form of IO.popen closes the pipe and waits for the child
      # when the block exits, including when the parser raises, so neither
      # a file descriptor nor an unreaped child is left behind.
      IO.popen(@command, "r") do |io|
        @parser.call(io)
      end
    rescue => e
      log.error "exec failed to run or shutdown child process", :error => e.to_s, :error_class => e.class.to_s
      log.warn_backtrace e.backtrace
    end
  end
end
shutdown()
click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 100
# Stops the worker thread.
#
# Periodic mode: sets @finished and waits for the thread to end.
# Otherwise: sends TERM to the child process, waits up to 60 seconds for
# the thread, and if it is still running sends KILL and waits without a
# time limit.
def shutdown
  if @run_interval
    @finished = true
    return @thread.join
  end

  # Signal delivery is best-effort: any StandardError from Process.kill
  # is deliberately ignored.
  deliver = lambda do |signal|
    begin
      Process.kill(signal, @pid)
    rescue
      # Errno::ECHILD, Errno::ESRCH, Errno::EPERM
    end
  end

  deliver.call(:TERM)
  return if @thread.join(60) # TODO wait time

  deliver.call(:KILL)
  @thread.join
end
start()
click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 89
# Launches the worker thread.
#
# With @run_interval set, the thread runs #run_periodic. Otherwise the
# command is spawned here, its pipe and pid are kept in @io and @pid, and
# the thread runs #run.
def start
  worker =
    if @run_interval
      @finished = false
      :run_periodic
    else
      @io = IO.popen(@command, "r")
      @pid = @io.pid
      :run
    end

  @thread = Thread.new(&method(worker))
end
Private Instance Methods
on_message(record)
click to toggle source
# File lib/fluent/plugin/in_exec.rb, line 141
# Parser callback: emits one record through the router.
#
# The tag is taken from (and removed from) the record under @tag_key,
# falling back to @tag. The time is taken from (and removed from) the
# record under @time_key and converted with @time_parse_proc, falling back
# to Engine.now.
#
# Any StandardError is logged together with the tag and a Yajl dump of the
# record instead of being raised.
def on_message(record)
  tag = record.delete(@tag_key) || @tag

  raw_time = record.delete(@time_key)
  time = raw_time ? @time_parse_proc.call(raw_time) : Engine.now

  router.emit(tag, time, record)
rescue => e
  log.error "exec failed to emit", :error => e.to_s, :error_class => e.class.to_s, :tag => tag, :record => Yajl.dump(record)
end