class Statsd::Mongo

Attributes

database[RW]
flush_interval[RW]
hostname[RW]
retentions[RW]

Public Class Methods

aggregate(current_bucket) click to toggle source

For each coarse granularity of retention:

Look up the previous bucket.
  If it holds no data, fill it by aggregating the finest-granularity stats.

TODO consider doing this inside Mongo with M/R

# File lib/statsd/mongo.rb, line 86
# Rolls the finest-granularity stats up into each coarser retention.
#
# Sorts +retentions+ in place by 'seconds' (ascending); the first entry is
# then used as the source ("fine") collection and every remaining entry is
# treated as a coarse collection. For each coarse retention the window
# [previous_coarse_bucket, current_coarse_bucket) is derived from
# +current_bucket+. If the coarse collection holds no document whose :ts
# equals the window start, the fine documents inside the window are grouped
# by "stat" and one aggregated document per stat is inserted.
#
# current_bucket - Integer epoch-seconds timestamp of the current fine bucket
#                  (flush_stats passes its ts_bucket).
#
# Returns the Array of coarse retentions that were iterated.
# Raises RuntimeError if the retention config is empty or a row has an
# unknown "type".
def self.aggregate(current_bucket)
  raise 'Invalid retention config' if retentions.empty?
  connection = ::Mongo::Connection.new(hostname)
  db = connection.db(database)
  retentions.sort_by! { |r| r['seconds'] }
  # Use the finest granularity as the source of every roll-up for now
  fine_stats_collection = db.collection(retentions.first['name'])
  retentions[1..-1].each do |retention|
    coarse_stats_collection = db.collection(retention['name'])
    puts "Aggregating #{retention['name']}"
    step = retention['seconds']
    # Start of the coarse bucket preceding the one that contains
    # current_bucket; the window being filled ends here.
    current_coarse_bucket = current_bucket / step * step - step
    previous_coarse_bucket = current_coarse_bucket - step
    puts "#{Time.at(previous_coarse_bucket)}..#{Time.at(current_coarse_bucket)}"
    # Look up previous bucket; skip this retention if it is already filled
    next unless coarse_stats_collection.find({:ts => previous_coarse_bucket}).count == 0

    # Aggregate
    print '.'
    rows = fine_stats_collection.find(
      {:ts => {"$gte" => previous_coarse_bucket, "$lt" => current_coarse_bucket}}).to_a
    # Built fresh for each retention so that documents computed for one
    # coarse collection are never inserted into another.
    docs = rows.group_by { |r| r["stat"] }.map do |name, stats|
      case stats.first['type']
      when 'timer'
        values = stats.map { |stat| stat['values'] }
        upper_key = stats.first['values'].keys.find { |k| k =~ /upper_/ }
        # Unweighted mean of the per-bucket means (same formula as before).
        aggregated = {
          :mean => values.map { |v| v['mean'] }.inject(0) { |s, x| s + x } / stats.count,
          :max => values.map { |v| v['max'] }.max,
          :min => values.map { |v| v['min'] }.min
        }
        # Rows without an upper_* entry are aggregated without one
        # instead of failing on nil.to_sym.
        aggregated[upper_key.to_sym] = values.map { |v| v[upper_key] }.compact.max if upper_key
        aggregated[:count] = values.map { |v| v['count'] }.inject(0) { |s, x| s + x }
        {:stat => name, :values => aggregated, :type => "timer", :ts => previous_coarse_bucket}
      when 'counter'
        {:stat => name,
          :value => stats.map { |stat| stat['value'] }.inject(0) { |s, x| s + x },
          :ts => previous_coarse_bucket,
          :type => "counter"
        }
      else
        raise "unknown type #{stats.first['type']}"
      end
    end
    coarse_stats_collection.insert(docs) unless docs.empty?
  end
ensure
  # Release the connection opened above, including on error paths.
  connection.close if connection
end
flush_stats(counters, timers) click to toggle source
# File lib/statsd/mongo.rb, line 9
# Writes the current counters and timers to MongoDB at the finest
# retention, resets them in place, then triggers aggregation.
#
# Each counter becomes a "counter" document and its entry in +counters+ is
# reset to 0. Each non-empty timer becomes a "timer" document holding min,
# max, count, and the mean / upper bound of the samples left after dropping
# the top (100 - 90)% of the sorted values; its entry in +timers+ is reset
# to []. All documents carry :ts set to the current time rounded down to a
# multiple of the finest retention's 'seconds'.
#
# counters - Hash of stat name => numeric value (mutated: values reset to 0).
# timers   - Hash of stat name => Array of timings (mutated: sorted, then
#            replaced by []).
#
# Returns nil (the result of the final puts).
# Raises RuntimeError when the retention config is empty.
def self.flush_stats(counters, timers)
  raise 'Invalid retention config' if retentions.empty?
  print "#{Time.now} Flushing #{counters.count} counters and #{timers.count} timers to MongoDB"
  time = ::Benchmark.realtime do
    retention = retentions.first # always write at the finest granularity
    step = retention['seconds'].to_i
    ts_bucket = Time.now.to_i / step * step
    pct_threshold = 90
    docs = []

    # connect to store before touching the stats, so a failed connection
    # leaves counters and timers untouched
    connection = ::Mongo::Connection.new(hostname)
    begin
      coll = connection.db(database).collection(retention['name'])

      # store counters
      counters.each_pair do |key, value|
        docs.push({:stat => key, :value => value, :ts => ts_bucket, :type => "counter"})
        counters[key] = 0
      end

      # store timers
      timers.each_pair do |key, values|
        next if values.empty?

        values.sort!
        count = values.count
        min = values.first
        max = values.last

        # With a single sample there is nothing to strip.
        mean = min
        max_at_threshold = max

        if count > 1
          # strip off the top 100-threshold; keep the lowest
          # (count - threshold_index) samples. A negative-index range
          # would collapse to one element when threshold_index is 0.
          threshold_index = (((100 - pct_threshold) / 100.0) * count).round
          kept = values.first(count - threshold_index)
          max_at_threshold = kept.last

          # average the remaining timings (Integer timings give an
          # Integer, i.e. truncated, mean)
          mean = kept.inject(0) { |s, x| s + x } / kept.count
        end

        timers[key] = []

        # Flush Values to Store
        docs.push({:stat => key,
          :values => {
            :mean => mean,
            :max => max,
            :min => min,
            "upper_#{pct_threshold}".to_sym => max_at_threshold,
            :count => count
          },
          :type => "timer",
          :ts => ts_bucket
        })
      end

      # Same guard aggregate uses: never send an empty batch.
      coll.insert(docs) unless docs.empty?
    ensure
      connection.close
    end

    aggregate(ts_bucket)
  end
  puts "complete. (#{time.round(3)}s)"
end