class ActiveMessaging::Adapters::Amqp::Connection
Constants
- DEFAULT_QUEUE_CONFIG
- SERVER_RETRY_MAX_ATTEMPTS
Public Class Methods
new(config = {})
click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 39
# Builds the connection options, debug level and queue settings from +config+.
# Nothing is connected here; this method only records settings.
#
# Recognised +config+ keys (all optional):
#   :user, :pass  - credentials, each defaulting to 'guest'
#   :host         - defaults to 'localhost'
#   :port         - defaults to 5671 when :ssl is set, otherwise 5672
#   :vhost        - defaults to nil
#   :ssl          - defaults to false
#   :ssl_verify   - defaults to OpenSSL::SSL::VERIFY_PEER
#   :debug        - converted with to_i, 0 when it cannot be converted;
#                   Carrot logging is enabled at 5 and above
#   :queue_name   - when absent, a name is generated from an MD5 digest of the
#                   current time and the queue is flagged as auto-generated
#   :queue_durable, :queue_auto_delete, :queue_exclusive
#                 - coerced to booleans; applied only to explicitly named queues
def initialize(config = {})
  @connect_options = {
    :user       => config[:user] || 'guest',
    :pass       => config[:pass] || 'guest',
    :host       => config[:host] || 'localhost',
    :port       => config[:port] || (config[:ssl] ? 5671 : 5672),
    :vhost      => config[:vhost] || nil,
    :ssl        => config[:ssl] || false,
    :ssl_verify => config[:ssl_verify] || OpenSSL::SSL::VERIFY_PEER,
  }

  debug_level = config[:debug]
  @debug = debug_level.respond_to?(:to_i) ? debug_level.to_i : 0
  Carrot.logging = true if @debug >= 5

  if config[:queue_name]
    @queue_name = config[:queue_name]
    @auto_generated_queue = false
    # Non-destructive merge: DEFAULT_QUEUE_CONFIG is shared by every
    # connection, so it must never be modified in place.
    @queue_config = DEFAULT_QUEUE_CONFIG.merge(
      :durable     => !!config[:queue_durable],
      :auto_delete => !!config[:queue_auto_delete],
      :exclusive   => !!config[:queue_exclusive]
    )
  else
    @queue_name = Digest::MD5.hexdigest(Time.now.to_s)
    @auto_generated_queue = true
    # Private copy so later changes to this hash cannot reach the constant.
    @queue_config = DEFAULT_QUEUE_CONFIG.dup
  end
end
Public Instance Methods
disconnect(headers={})
click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 140
# Stops the underlying client, if one has been created.
#
# The client is only built on first use, so a connection that never sent or
# received anything has no client to stop; in that case this does nothing and
# returns nil instead of raising NoMethodError.
#
# headers - accepted but not used.
#
# Returns whatever the client's +stop+ returns, or nil when there is no client.
def disconnect(headers = {})
  @client.stop unless @client.nil?
end
receive(options={})
click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 82
# Polls the queue until a message arrives, then returns it.
#
# Each pass pops one message with :ack => true. When nothing is available the
# method sleeps 0.2 seconds and tries again, so it does not return until a
# message has been received.
#
# options - accepted but not used.
#
# Returns the decoded AmqpMessage, stamped as received and carrying the
# queue's current delivery tag.
def receive(options = {})
  loop do
    raw = queue.pop(:ack => true)
    if raw.nil?
      sleep 0.2
      next
    end

    message = AmqpMessage.decode(raw).stamp_received!
    message.delivery_tag = queue.delivery_tag
    # @debug is an Integer and 0 is truthy in Ruby, so compare explicitly;
    # a bare `if @debug` would print at every debug level.
    puts "RECEIVE: #{message.inspect}" if @debug > 0
    return message
  end
end
received(message, headers = {})
click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 72
# Acknowledges +message+ by sending a Basic::Ack frame that carries the
# delivery tag stored in the message's headers.
#
# message - object whose +headers+ hash holds :delivery_tag.
# headers - accepted but not used.
def received(message, headers = {})
  if @debug > 0
    puts "Received Message - ACK'ing with delivery_tag '#{message.headers[:delivery_tag]}'"
  end

  server = client.server
  ack_frame = ::Carrot::AMQP::Protocol::Basic::Ack.new(:delivery_tag => message.headers[:delivery_tag])
  server.send_frame(ack_frame)
end
send(queue_name, data, headers = {})
click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 95
# Publishes +data+ to the exchange selected by +headers+.
#
# queue_name - used as the routing key when headers[:routing_key] is not set,
#              and passed to AmqpMessage.new.
# data       - message payload.
# headers    - message headers; exchange_info(headers) picks the exchange.
#              Note that headers[:routing_key] is filled in on the hash the
#              caller passed.
#
# When the publish raises Carrot::AMQP::Server::ServerDown, the method sleeps
# for 0.25 seconds times the attempt number and tries again. After
# SERVER_RETRY_MAX_ATTEMPTS failures the ServerDown error is re-raised.
def send(queue_name, data, headers = {})
  headers[:routing_key] ||= queue_name
  message = AmqpMessage.new({:headers => headers, :data => data}, queue_name)

  if @debug > 0
    puts "Sending the following message: "
    pp message
  end

  retry_attempts = 0
  begin
    exchange(*exchange_info(headers)).publish(message.stamp_sent!.encode, :key => headers[:routing_key])
  rescue ::Carrot::AMQP::Server::ServerDown
    retry_attempts += 1
    sleep(retry_attempts * 0.25)
    retry if retry_attempts < SERVER_RETRY_MAX_ATTEMPTS
    # Bare raise re-raises the ServerDown being handled. The previous
    # `raise e` referenced an undefined local and surfaced as NameError.
    raise
  end
end
subscribe(queue_name, headers = {}, subId = nil)
click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 113
# Binds this connection's queue to the exchange selected by +headers+.
#
# queue_name - used as the routing key when headers[:routing_key] is not set.
# headers    - may carry :routing_key; also passed to exchange_info to choose
#              the exchange.
# subId      - only echoed in the debug output.
#
# At debug levels above 1 the request details are printed first.
def subscribe(queue_name, headers = {}, subId = nil)
  if @debug > 1
    puts "Begin Subscribe Request:",
         " Queue Name: #{queue_name.inspect}",
         " Headers: #{headers.inspect}",
         " subId: #{subId.inspect}"
    # Printed separately so the lines above are already out before
    # exchange_info is evaluated.
    puts " EXCH INFO: #{exchange_info(headers).inspect}",
         "End Subscribe Request."
  end

  binding_key = headers[:routing_key] || queue_name
  bound_queue = queue
  bound_queue.bind(exchange(*exchange_info(headers)), :key => binding_key)
end
unreceive(message, headers = {})
click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 77
# Rejects +message+ by sending a Basic::Reject frame that carries the
# delivery tag stored in the message's headers.
#
# message - object whose +headers+ hash holds :delivery_tag.
# headers - accepted but not used.
def unreceive(message, headers = {})
  if @debug > 0
    puts "Un-Receiving Message - REJECTing with delivery_tag '#{message.headers[:delivery_tag]}'"
  end

  server = client.server
  reject_frame = ::Carrot::AMQP::Protocol::Basic::Reject.new(:delivery_tag => message.headers[:delivery_tag])
  server.send_frame(reject_frame)
end
unsubscribe(queue_name, headers={}, subId=nil)
click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 127
# Removes the binding between this connection's queue and the exchange
# selected by +headers+.
#
# queue_name - used as the routing key when headers[:routing_key] is not set.
# headers    - may carry :routing_key; also passed to exchange_info to choose
#              the exchange.
# subId      - only echoed in the debug output.
#
# At debug levels above 1 the request details are printed first.
def unsubscribe(queue_name, headers={}, subId=nil)
  if @debug > 1
    puts "Begin UNsubscribe Request:",
         " Queue Name: #{queue_name.inspect}",
         " Headers: #{headers.inspect}",
         " subId: #{subId.inspect}",
         "End UNsubscribe Request."
  end

  binding_key = headers[:routing_key] || queue_name
  bound_queue = queue
  bound_queue.unbind(exchange(*exchange_info(headers)), :key => binding_key)
end
Private Instance Methods
client()
click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 167
# Returns the Carrot client, building it from @connect_options the first time
# it is needed. The connect options are printed when @debug is above 0.
def client
  if @client.nil?
    puts "Client [amqp]: #{@connect_options.inspect}" if @debug > 0
    @client = Carrot.new(@connect_options)
  end
  @client
end
exchange(type, name, *args)
click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 150
# Returns the exchange called +name+, creating and caching it on first use.
#
# type - exchange type; converted with to_sym and required to be one of
#        :topic, :fanout or :direct.
# name - exchange name; when nil, "amq.<type>" is used.
# args - extra arguments forwarded to Carrot::AMQP::Exchange.new.
#
# Exchanges are cached by name only, so a later call with the same name
# returns the cached object regardless of +type+ or +args+.
#
# Raises InvalidExchangeType when +type+ is not a supported type.
def exchange(type, name, *args)
  requested_type = type
  # Convert only what can be converted, rather than rescuing every error.
  type = requested_type.respond_to?(:to_sym) ? requested_type.to_sym : nil

  unless [:topic, :fanout, :direct].include?(type)
    # When conversion failed, report the value that was passed in, not nil.
    rejected = type.nil? ? requested_type : type
    raise InvalidExchangeType, "The carrot library does not support an exchange type of '#{rejected.inspect}'"
  end

  name ||= "amq.#{type}"
  puts "Exchange [#{type}::#{name}]: #{args.inspect}" if @debug > 3
  (@exchanges ||= {})[name] ||= ::Carrot::AMQP::Exchange.new(client, type, name, *args)
end
exchange_info(headers)
click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 146
# Extracts the exchange type and name from +headers+.
#
# headers - hash that may contain :exchange_type and :exchange_name.
#
# Returns a two-element array [type, name]. The type is
# headers[:exchange_type] converted to a symbol, or :direct when it is missing
# or cannot be converted. The name is headers[:exchange_name], or nil.
def exchange_info(headers)
  raw_type = headers[:exchange_type]
  # Explicit capability check instead of a blanket inline rescue.
  type = raw_type.respond_to?(:to_sym) ? raw_type.to_sym : nil
  [type || :direct, headers[:exchange_name]]
end
queue()
click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 161
# Returns the queue for this connection, asking the client for it (with
# @queue_name and @queue_config) the first time it is needed. The queue
# settings are printed when @debug is above 0.
def queue
  if @queue.nil?
    puts "Queue [#{@queue_name}]: #{@queue_config.inspect}" if @debug > 0
    @queue = client.queue(@queue_name, @queue_config)
  end
  @queue
end