# Opens a new streaming connection and registers its callbacks.
#
# Any previous stream is shut down first through +stop_stream+. Default
# connection options are built from +config+ (OAuth credentials) and from the
# +http_proxy+ environment variable; entries in +options+ take precedence over
# those defaults.
#
# @param options [Hash] options passed on to ::Twitter::JSONStream.connect;
#   keys given here (including :oauth and :proxy) override the defaults.
# @return [Object] the result of the final +on_max_reconnects+ registration.
def start_stream(options)
  stop_stream

  credentials = config.slice(:consumer_key, :consumer_secret).merge(
    :access_key => config[:token],
    :access_secret => config[:secret]
  )
  defaults = { :oauth => credentials }

  # The proxy is a connection-level setting, not an OAuth credential, so it
  # is passed next to :oauth instead of inside it.
  # NOTE(review): twitter-stream is expected to read :proxy from the top-level
  # options -- confirm against the installed gem version.
  # It is only set when http_proxy is non-blank, so an unset variable cannot
  # override a proxy default supplied by the library itself.
  proxy = ENV['http_proxy']
  defaults[:proxy] = proxy unless proxy.nil? || proxy.empty?

  @stream = ::Twitter::JSONStream.connect(defaults.merge(options))

  @stream.each_item do |item|
    # Remember when data last arrived, then queue the decoded item.
    @last_data_received_at = Time.now
    item_queue << JSON.parse(item)
  end

  @stream.on_error do |message|
    notify "error: #{message}"
  end

  @stream.on_reconnect do |timeout, _retries|
    notify "reconnecting in: #{timeout} seconds"
  end

  @stream.on_max_reconnects do |_timeout, retries|
    notify "Failed after #{retries} failed reconnects"
  end
end