class ActiveMessaging::Adapters::Amqp::Connection

Constants

DEFAULT_QUEUE_CONFIG
SERVER_RETRY_MAX_ATTEMPTS

Public Class Methods

new(config = {}) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 39
def initialize config = {}
  @connect_options = {
    :user  => config[:user]  || 'guest',
    :pass  => config[:pass]  || 'guest',
    :host  => config[:host]  || 'localhost',
    :port  => config[:port]  || (config[:ssl] ? 5671 : 5672),
    :vhost => config[:vhost] || nil,
    :ssl   => config[:ssl]   || false,
    :ssl_verify => config[:ssl_verify] || OpenSSL::SSL::VERIFY_PEER,
  }
  
  @debug = config[:debug].to_i rescue 0
  
  Carrot.logging = true unless @debug < 5
  
  @auto_generated_queue = false
  unless config[:queue_name]
    @queue_name = Digest::MD5.hexdigest Time.now.to_s
    @auto_generated_queue = true
  else
    @queue_name = config[:queue_name]
  end

  @queue_config = DEFAULT_QUEUE_CONFIG
  unless @auto_generated_queue
    @queue_config.merge!({
      :durable     => !!config[:queue_durable],
      :auto_delete => !!config[:queue_auto_delete],
      :exclusive   => !!config[:queue_exclusive]
    })
  end
end

Public Instance Methods

disconnect(headers={}) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 140
def disconnect(headers={})
  @client.stop
end
receive(options={}) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 82
def receive(options={})
  while true 
    message = queue.pop(:ack => true)
    unless message.nil?
      message = AmqpMessage.decode(message).stamp_received! unless message.nil?
      message.delivery_tag = queue.delivery_tag
      puts "RECEIVE: #{message.inspect}" if @debug 
      return message
    end
    sleep 0.2
  end
end
received(message, headers = {}) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 72
def received message, headers = {}
  puts "Received Message - ACK'ing with delivery_tag '#{message.headers[:delivery_tag]}'" if @debug > 0
  client.server.send_frame(::Carrot::AMQP::Protocol::Basic::Ack.new(:delivery_tag => message.headers[:delivery_tag]))
end
send(queue_name, data, headers = {}) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 95
def send queue_name, data, headers = {}
  headers[:routing_key] ||= queue_name
  message = AmqpMessage.new({:headers => headers, :data => data}, queue_name)
  
  if @debug > 0
    puts "Sending the following message: "; pp message
  end
  
  begin
    exchange(*exchange_info(headers)).publish(message.stamp_sent!.encode, :key => headers[:routing_key])
  rescue ::Carrot::AMQP::Server::ServerDown
    retry_attempts = retry_attempts.nil? ? 1 : retry_attempts + 1
    sleep(retry_attempts * 0.25)
    retry unless retry_attempts >= SERVER_RETRY_MAX_ATTEMPTS
    raise e
  end
end
subscribe(queue_name, headers = {}) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 113
def subscribe queue_name, headers = {}, subId = nil
  if @debug > 1
    puts "Begin Subscribe Request:"
    puts "    Queue Name: #{queue_name.inspect}"
    puts "       Headers: #{headers.inspect}"
    puts "         subId: #{subId.inspect}"
    puts "     EXCH INFO: #{exchange_info(headers).inspect}"
    puts "End Subscribe Request."
  end
  
  routing_key = headers[:routing_key] || queue_name
  queue.bind(exchange(*exchange_info(headers)), :key => routing_key)
end
unreceive(message, headers = {}) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 77
def unreceive message, headers = {}
  puts "Un-Receiving Message - REJECTing with delivery_tag '#{message.headers[:delivery_tag]}'" if @debug > 0
  client.server.send_frame(::Carrot::AMQP::Protocol::Basic::Reject.new(:delivery_tag => message.headers[:delivery_tag]))
end
unsubscribe(queue_name, headers={}, subId=nil) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 127
def unsubscribe(queue_name, headers={}, subId=nil)
  if @debug > 1
    puts "Begin UNsubscribe Request:"
    puts "    Queue Name: #{queue_name.inspect}"
    puts "    Headers:    #{headers.inspect}"
    puts "    subId:      #{subId.inspect}"
    puts "End UNsubscribe Request."
  end
  
  routing_key = headers[:routing_key] || queue_name
  queue.unbind(exchange(*exchange_info(headers)), :key => routing_key)
end

Private Instance Methods

client() click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 167
def client
  return @client unless @client.nil?
  puts "Client [amqp]: #{@connect_options.inspect}" if @debug > 0
  @client ||= Carrot.new(@connect_options)
end
exchange(type, name, *args) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 150
def exchange type, name, *args
  type = type.to_sym rescue nil
  unless [:topic, :fanout, :direct].include? type
    raise InvalidExchangeType, "The carrot library does not support an exchange type of '#{type.inspect}'"
  end

  name ||= "amq.#{type}"
  puts "Exchange [#{type}::#{name}]: #{args.inspect}" if @debug > 3
  (@exchanges||={})[name] ||= ::Carrot::AMQP::Exchange.new client, type, name, *args
end
exchange_info(headers) click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 146
def exchange_info headers
  [ (headers[:exchange_type].to_sym rescue nil) || :direct, headers[:exchange_name] || nil]
end
queue() click to toggle source
# File lib/activemessaging/adapters/amqp.rb, line 161
def queue
  return @queue unless @queue.nil?
  puts "Queue [#{@queue_name}]: #{@queue_config.inspect}" if @debug > 0
  @queue ||= client.queue(@queue_name, @queue_config)
end