Parent

Class/Module Index [+]

Quicksearch

Fluent::ForwardInput

Public Class Methods

new() click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 24
# Builds the plugin instance: runs the parent initializer first, then
# requires the socket utility library.
def initialize
  super()
  # Required here, at instantiation time, rather than at file load.
  require 'fluent/plugin/socket_util'
end

Public Instance Methods

configure(conf) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 40
# Applies the given configuration by handing it straight to the parent
# implementation; this class adds no configuration handling of its own here.
#
# conf - the configuration object passed through to the parent.
def configure(conf)
  super(conf)
end
listen() click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 74
# Creates the TCP server for @bind:@port and returns it.
#
# The server is constructed with Handler, @linger_timeout, the logger and the
# #on_message callback. When @backlog is set (non-nil), it is applied to the
# server via #listen before the server is returned.
def listen
  log.info "listening fluent socket on #{@bind}:#{@port}"
  Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message)).tap do |server|
    server.listen(@backlog) unless @backlog.nil?
  end
end
run() click to toggle source

config_param :path, :string, :default => DEFAULT_SOCKET_PATH
def listen

if File.exist?(@path)
  File.unlink(@path)
end
FileUtils.mkdir_p File.dirname(@path)
log.debug "listening fluent socket on #{@path}"
Coolio::UNIXServer.new(@path, Handler, method(:on_message))

end

# File lib/fluent/plugin/in_forward.rb, line 91
# Runs the event loop until it stops.
#
# When the loop's #run accepts a timeout (see #support_blocking_timeout?),
# @blocking_timeout is passed to it; otherwise #run is called without arguments.
# Any StandardError raised while running is logged together with its backtrace
# instead of propagating.
def run
  return @loop.run unless support_blocking_timeout?

  @loop.run(@blocking_timeout)
rescue => e
  log.error "unexpected error", error: e, error_class: e.class
  log.error_backtrace
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 60
# Tears the input down: detaches every watcher from the event loop, stops the
# loop, closes the UDP socket, waits for the loop thread to finish and finally
# closes the listen socket.
def shutdown
  @loop.watchers.each(&:detach)
  @loop.stop
  @usock.close
  unless support_blocking_timeout?
    # Connect to our own listen socket purely to make the event loop stop.
    # A better approach would be e.g. a pipe, or fixing cool.io with a timeout.
    wake_host = if @bind == '0.0.0.0'
                  '127.0.0.1'
                else
                  @bind
                end
    TCPSocket.open(wake_host, @port) { |_sock| } # FIXME @thread.join blocks without this line
  end
  @thread.join
  @lsock.close
end
start() click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 44
# Starts the input: creates the event loop, attaches the listener returned by
# #listen and a heartbeat handler built on a UDP socket, then runs #run on a
# new thread.
def start
  @loop = Coolio::Loop.new

  # Whatever #listen returns is kept in @lsock and attached to the loop.
  @lsock = listen
  @loop.attach(@lsock)

  # UDP socket bound to @bind/@port and switched to non-blocking mode.
  @usock = SocketUtil.create_udp_socket(@bind)
  @usock.bind(@bind, @port)
  @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
  # NOTE(review): presumably the handler invokes #on_heartbeat_request for
  # datagrams arriving on @usock — confirm against HeartbeatRequestHandler.
  @hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request))
  @loop.attach(@hbr)

  # The loop itself runs in #run on its own thread.
  @thread = Thread.new(&method(:run))
  # A MessagePack::Unpacker is created here unless $use_msgpack_5 is set,
  # in which case @cached_unpacker stays nil.
  @cached_unpacker = $use_msgpack_5 ? nil : MessagePack::Unpacker.new
end

Protected Instance Methods

on_heartbeat_request(host, port, msg) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 270
# Answers a heartbeat request by sending a single NUL byte to host:port
# through @usock.
#
# host - destination address for the reply.
# port - destination port for the reply.
# msg  - the request payload; not inspected.
#
# Returns the result of the send, or nil when the send fails with
# EAGAIN/EWOULDBLOCK/EINTR (those failures are intentionally ignored).
def on_heartbeat_request(host, port, msg)
  #log.trace "heartbeat request from #{host}:#{port}"
  @usock.send "\0", 0, host, port
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
  # Best effort: a reply that cannot be sent right now is simply dropped.
end
on_message(msg, chunk_size, source) click to toggle source

message Entry {

1: long time
2: object record

}

message Forward {

1: string tag
2: list<Entry> entries

}

message PackedForward {

1: string tag
2: raw entries  # msgpack stream of Entry

}

message Message {

1: string tag
2: long? time
3: object record

}

# File lib/fluent/plugin/in_forward.rb, line 128
# Handles one decoded message and emits its events through Engine.
#
# msg        - the decoded message array; msg[0] is the tag. The shape of the
#              rest selects the mode:
#                * msg[1] is a String -> PackedForward (packed entry stream)
#                * msg[1] is an Array -> Forward (list of [time, record])
#                * anything else      -> Message ([tag, time, record])
#              nil is accepted and ignored.
# chunk_size - size of the received chunk, compared against
#              @chunk_size_limit / @chunk_size_warn_limit when those are set.
# source     - description of the sender, used only in log messages.
#
# Entries whose record is nil are skipped. A time that is missing (nil) or
# zero is replaced by Engine.now.
def on_message(msg, chunk_size, source)
  # for future TCP heartbeat_request
  return if msg.nil?

  # TODO format error
  tag = msg[0].to_s
  entries = msg[1]

  if @chunk_size_limit && (chunk_size > @chunk_size_limit)
    log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, source: source, limit: @chunk_size_limit, size: chunk_size
    return
  elsif @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit)
    log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, source: source, limit: @chunk_size_warn_limit, size: chunk_size
  end

  case entries
  when String
    # PackedForward
    es = MessagePackEventStream.new(entries, @cached_unpacker)
    Engine.emit_stream(tag, es)

  when Array
    # Forward
    es = MultiEventStream.new
    # Declared outside the block so the fallback timestamp is fetched once
    # and shared by every zero-time entry of this chunk.
    now = nil
    entries.each do |e|
      record = e[1]
      next if record.nil?
      time = e[0].to_i
      time = (now ||= Engine.now) if time == 0
      es.add(time, record)
    end
    Engine.emit_stream(tag, es)

  else
    # Message
    record = msg[2]
    return if record.nil?
    time = msg[1]
    # to_i makes a missing (nil) time fall back to the current time as well.
    time = Engine.now if time.to_i == 0
    Engine.emit(tag, time, record)
  end
end
support_blocking_timeout?() click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 104
def support_blocking_timeout?
  @loop.method(:run).arity.nonzero?
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.