class Fluent::Logger::FluentLogger

Constants

BUFFER_LIMIT
RECONNECT_WAIT
RECONNECT_WAIT_INCR_RATE
RECONNECT_WAIT_MAX
RECONNECT_WAIT_MAX_COUNT

Attributes

last_error[R]
limit[RW]
log_reconnect_error_threshold[RW]
logger[RW]

Public Class Methods

new(tag_prefix = nil, *args) click to toggle source
Calls superclass method
# File lib/fluent/logger/fluent_logger.rb, line 37
def initialize(tag_prefix = nil, *args)
  super()

  options = {
    :host => 'localhost',
    :port => 24224
  }

  case args.first
  when String, Symbol
    # backward compatible
    options[:host] = args[0]
    options[:port] = args[1] if args[1]
  when Hash
    options.update args.first
  end

  @tag_prefix = tag_prefix
  @host = options[:host]
  @port = options[:port]

  @mon = Monitor.new
  @pending = nil
  @connect_error_history = []

  @limit = options[:buffer_limit] || BUFFER_LIMIT
  @log_reconnect_error_threshold = options[:log_reconnect_error_threshold] ||  RECONNECT_WAIT_MAX_COUNT

  @buffer_overflow_handler = options[:buffer_overflow_handler]

  if logger = options[:logger]
    @logger = logger
  else
    @logger = ::Logger.new(STDERR)
    if options[:debug]
      @logger.level = ::Logger::DEBUG
    else
      @logger.level = ::Logger::INFO
    end
  end

  @last_error = {}

  begin
    connect!
  rescue => e
    set_last_error(e)
    @logger.error "Failed to connect fluentd: #{$!}"
    @logger.error "Connection will be retried."
  end

  at_exit { close }
end

Public Instance Methods

close() click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 104
def close
  @mon.synchronize {
    if @pending
      begin
        send_data(@pending)
      rescue => e
        set_last_error(e)
        @logger.error("FluentLogger: Can't send logs to #{@host}:#{@port}: #{$!}")
        call_buffer_overflow_handler(@pending)
      end
    end
    @con.close if connect?
    @con = nil
    @pending = nil
  }
  self
end
connect?() click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 122
def connect?
  @con && !@con.closed?
end
post_with_time(tag, map, time) click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 98
def post_with_time(tag, map, time)
  @logger.debug { "event: #{tag} #{map.to_json}" rescue nil } if @logger.debug?
  tag = "#{@tag_prefix}.#{tag}" if @tag_prefix
  write [tag, time.to_i, map]
end

Private Instance Methods

call_buffer_overflow_handler(pending) click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 224
def call_buffer_overflow_handler(pending)
  if @buffer_overflow_handler
    @buffer_overflow_handler.call(pending)
  end
rescue Exception => e
  @logger.error("FluentLogger: Can't call buffer overflow handler: #{$!}")
end
connect!() click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 205
def connect!
  @con = TCPSocket.new(@host, @port)
  @con.sync = true
  @connect_error_history.clear
  @logged_reconnect_error = false
rescue => e
  @connect_error_history << Time.now.to_i
  if @connect_error_history.size > RECONNECT_WAIT_MAX_COUNT
    @connect_error_history.shift
  end

  if @connect_error_history.size >= @log_reconnect_error_threshold && !@logged_reconnect_error
    log_reconnect_error
    @logged_reconnect_error = true
  end

  raise e
end
log_reconnect_error() click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 232
def log_reconnect_error
  @logger.error("FluentLogger: Can't connect to #{@host}:#{@port}(#{@connect_error_history.size} retried): #{$!}")
end
send_data(data) click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 184
def send_data(data)
  unless connect?
    connect!
  end
  @con.write data
  #while true
  #  puts "sending #{data.length} bytes"
  #  if data.length > 32*1024
  #    n = @con.syswrite(data[0..32*1024])
  #  else
  #    n = @con.syswrite(data)
  #  end
  #  puts "sent #{n}"
  #  if n >= data.bytesize
  #    break
  #  end
  #  data = data[n..-1]
  #end
  true
end
set_last_error(e) click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 236
def set_last_error(e)
  # TODO: Check non GVL env
  @last_error[Thread.current.object_id] = e
end
suppress_sec() click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 135
def suppress_sec
  if (sz = @connect_error_history.size) < RECONNECT_WAIT_MAX_COUNT
    RECONNECT_WAIT * (RECONNECT_WAIT_INCR_RATE ** (sz - 1))
  else
    RECONNECT_WAIT_MAX
  end
end
to_msgpack(msg) click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 127
def to_msgpack(msg)
  begin
    msg.to_msgpack
  rescue NoMethodError
    JSON.parse(JSON.generate(msg)).to_msgpack
  end
end
write(msg) click to toggle source
# File lib/fluent/logger/fluent_logger.rb, line 143
def write(msg)
  begin
    data = to_msgpack(msg)
  rescue => e
    set_last_error(e)
    @logger.error("FluentLogger: Can't convert to msgpack: #{msg.inspect}: #{$!}")
    return false
  end

  @mon.synchronize {
    if @pending
      @pending << data
    else
      @pending = data
    end

    # suppress reconnection burst
    if !@connect_error_history.empty? && @pending.bytesize <= @limit
      if Time.now.to_i - @connect_error_history.last < suppress_sec
        return false
      end
    end

    begin
      send_data(@pending)
      @pending = nil
      true
    rescue => e
      set_last_error(e)
      if @pending.bytesize > @limit
        @logger.error("FluentLogger: Can't send logs to #{@host}:#{@port}: #{$!}")
        call_buffer_overflow_handler(@pending)
        @pending = nil
      end
      @con.close if connect?
      @con = nil
      false
    end
  }
end