Included Modules

Class/Module Index [+]

Quicksearch

Fluent::BasicBuffer

Public Class Methods

new() click to toggle source
# File lib/fluent/buffer.rb, line 119
# Set up buffer state on top of the parent's initialization (parent class
# not visible here). @parallel_pop lets pop take any unlocked queued chunk
# rather than strictly the head, enabling concurrent flushing; it can be
# turned off via enable_parallel.
def initialize
  super
  @parallel_pop = true
end

Public Instance Methods

clear!() click to toggle source
# File lib/fluent/buffer.rb, line 299
# Discard every queued chunk: purge each one (releasing its backing
# storage) and leave the flush queue empty.
def clear!
  @queue.each(&:purge)
  @queue.clear
end
configure(conf) click to toggle source
# File lib/fluent/buffer.rb, line 139
# Apply buffer configuration from +conf+. This class adds no options of
# its own; all handling is delegated to the parent's configure (defined
# outside this view).
def configure(conf)
  super
end
emit(key, data, chain) click to toggle source
# File lib/fluent/buffer.rb, line 165
# Append +data+ (an already-serialized payload) to the active chunk for
# +key+, rotating chunks when full. +chain+ is a commit chain: chain.next
# is called only once the data is safely appended, so a failure before
# that point leaves the buffer unchanged. Returns true when the caller
# should wake a flusher (the queue transitioned from empty to non-empty),
# false otherwise. Raises BufferQueueLimitError when the queue is full.
def emit(key, data, chain)
  key = key.to_s

  synchronize do
    top = (@map[key] ||= new_chunk(key))  # TODO generate unique chunk id

    if storable?(top, data)
      # Fast path: data fits in the current chunk. Commit, append, done.
      chain.next
      top << data
      return false

      ## FIXME
      #elsif data.bytesize > @buffer_chunk_limit
      #  # TODO
      #  raise BufferChunkLimitError, "received data too large"

    elsif @queue.size >= @buffer_queue_limit
      raise BufferQueueLimitError, "queue size exceeds limit"
    end

    # Oversized single payload: warn but still accept it (rejecting it is
    # commented out under the FIXME above).
    if data.bytesize > @buffer_chunk_limit
      $log.warn "Size of the emitted data exceeds buffer_chunk_limit."
      $log.warn "This may occur problems in the output plugins ``at this server.``"
      $log.warn "To avoid problems, set a smaller number to the buffer_chunk_limit"
      $log.warn "in the forward output ``at the log forwarding server.``"
    end

    nc = new_chunk(key)  # TODO generate unique chunk id
    ok = false

    begin
      nc << data
      chain.next

      # Rotate: the full chunk (top) moves to the flush queue and the new
      # chunk (nc) becomes active for this key. flush_trigger is true when
      # the queue was empty just before the push.
      flush_trigger = false
      @queue.synchronize {
        enqueue(top)
        flush_trigger = @queue.empty?
        @queue << top
        @map[key] = nc
      }

      ok = true
      return flush_trigger
    ensure
      nc.purge unless ok  # roll back the new chunk if anything above failed
    end

  end  # synchronize
end
enable_parallel(b=true) click to toggle source
# File lib/fluent/buffer.rb, line 124
# Turn parallel popping on (default) or off. When off, pop only ever
# flushes the head of the queue (strict FIFO).
def enable_parallel(enabled = true)
  @parallel_pop = enabled
end
keys() click to toggle source
# File lib/fluent/buffer.rb, line 216
# Keys of chunks still being filled (not yet moved to the flush queue).
def keys
  @map.each_key.to_a
end
pop(out) click to toggle source
# File lib/fluent/buffer.rb, line 261
# Flush one queued chunk to +out+ and remove it from the queue.
# Returns true when more chunks remain (caller should pop again),
# false when the queue is empty or no chunk lock could be acquired.
def pop(out)
  chunk = nil
  @queue.synchronize do
    if @parallel_pop
      # Take any chunk whose monitor we can grab without blocking, so
      # multiple threads can flush different chunks concurrently.
      chunk = @queue.find {|c| c.try_mon_enter }
      return false unless chunk
    else
      # Strict FIFO: only ever flush the head of the queue.
      chunk = @queue.first
      return false unless chunk
      return false unless chunk.try_mon_enter
    end
  end

  begin
    if !chunk.empty?
      write_chunk(chunk, out)
    end

    # write_chunk did not raise, so the chunk is delivered: drop it.
    # Matching by object_id avoids relying on chunk equality semantics.
    empty = false
    @queue.synchronize do
      @queue.delete_if {|c|
        c.object_id == chunk.object_id
      }
      empty = @queue.empty?
    end

    chunk.purge

    return !empty
  ensure
    chunk.mon_exit  # always release the monitor acquired above
  end
end
push(key) click to toggle source

def enqueue(chunk) end

# File lib/fluent/buffer.rb, line 244
# Move the chunk registered under +key+ out of the in-progress map and
# onto the flush queue. Returns true when a chunk was enqueued, false
# when the key has no chunk or only an empty one.
def push(key)
  synchronize do
    chunk = @map[key]
    return false if !chunk || chunk.empty?

    @queue.synchronize do
      enqueue(chunk)
      @queue << chunk
      @map.delete(key)
    end

    true
  end
end
queue_size() click to toggle source
# File lib/fluent/buffer.rb, line 220
# Number of chunks currently waiting on the flush queue.
def queue_size
  @queue.length
end
shutdown() click to toggle source
# File lib/fluent/buffer.rb, line 148
# Close every chunk before exit: drain the flush queue under its monitor,
# then close the chunks still being filled in @map.
def shutdown
  synchronize do
    @queue.synchronize do
      @queue.shift.close until @queue.empty?
    end
    @map.each_value(&:close)
  end
end
start() click to toggle source
# File lib/fluent/buffer.rb, line 143
# Restore persisted buffer state. resume (defined elsewhere in this
# class hierarchy) returns the flush queue and the key=>chunk map of
# partially filled chunks. The queue gets a monitor so emit/pop/push can
# lock it via @queue.synchronize.
def start
  @queue, @map = resume
  @queue.extend(MonitorMixin)
end
storable?(chunk, data) click to toggle source
# File lib/fluent/buffer.rb, line 161
# true when appending +data+ keeps +chunk+ within @buffer_chunk_limit.
def storable?(chunk, data)
  projected_size = chunk.size + data.bytesize
  projected_size <= @buffer_chunk_limit
end
total_queued_chunk_size() click to toggle source
# File lib/fluent/buffer.rb, line 224
# Sum of the byte sizes of all chunks: both those still being filled
# (@map) and those waiting on the flush queue (@queue).
def total_queued_chunk_size
  filling = @map.each_value.inject(0) { |sum, chunk| sum + chunk.size }
  @queue.inject(filling) { |sum, chunk| sum + chunk.size }
end
write_chunk(chunk, out) click to toggle source
# File lib/fluent/buffer.rb, line 295
# Deliver a single chunk to +out+; called from pop while the chunk's
# monitor is held. Presumably overridden by subclasses needing custom
# serialization -- confirm against concrete buffer implementations.
def write_chunk(chunk, out)
  out.write(chunk)
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.