Generic init method needed by a13g (ActiveMessaging).
# File lib/activemessaging/adapters/asqs.rb, line 31
# Generic init method needed by a13g: builds the adapter state from a config hash.
#
# cfg - Hash of settings. :access_key_id and :secret_access_key are required;
#       a RuntimeError is raised when either one is nil or empty. Every other
#       key falls back to the default written next to it below.
#
# The last step calls +queues+, which is defined outside this excerpt
# (presumably to load the queue list up front -- confirm).
def initialize(cfg)
  key_id = cfg[:access_key_id]
  raise "Must specify a access_key_id" if key_id.nil? || key_id.empty?

  secret = cfg[:secret_access_key]
  raise "Must specify a secret_access_key" if secret.nil? || secret.empty?

  # Boolean options default to true only when the key is nil; an explicit
  # false supplied by the caller is kept.
  flag = lambda { |key| cfg[key].nil? ? true : cfg[key] }

  @access_key_id       = key_id
  @secret_access_key   = secret
  @request_expires     = cfg[:requestExpires] || 10
  @request_retry_count = cfg[:requestRetryCount] || 5
  @aws_version         = cfg[:aws_version] || '2008-01-01'
  @content_type        = cfg[:content_type] || 'text/plain'
  @host                = cfg[:host] || 'queue.amazonaws.com'
  @port                = cfg[:port] || 80
  @protocol            = cfg[:protocol] || 'http'
  @poll_interval       = cfg[:poll_interval] || 1
  @reconnect_delay     = cfg[:reconnectDelay] || 5

  # Base URL used when a request is not aimed at a specific queue URL.
  @aws_url = "#{@protocol}://#{@host}"

  @cache_queue_list = flag.call(:cache_queue_list)
  @reliable         = flag.call(:reliable)

  # initialize the subscriptions and queues
  @subscriptions = {}
  @queues_by_priority = {}
  @current_subscription = 0
  queues
end
# File lib/activemessaging/adapters/asqs.rb, line 58
# No-op disconnect. Each operation is an HTTP request, so there is no
# connection to tear down; the method simply reports success.
#
# Returns true, always.
def disconnect
  true
end
New receive implementation that respects queue priorities.
# File lib/activemessaging/adapters/asqs.rb, line 113
# Fetch at most one message, visiting subscribed queues in priority order.
#
# options - Hash; when options[:priorities] is set, only priorities whose
#           +to_i+ value is included in that list are polled.
#
# Priorities are walked in ascending sort order. Within one priority the
# queue names are shuffled on every call. A queue name with no entry in
# +queues+ or no subscription is skipped.
#
# Returns the first message retrieved, or nil when no queue produced one.
def receive(options={})
  picked = nil
  wanted = options[:priorities]

  @queues_by_priority.keys.sort.each do |level|
    # skip this priority if there is a list, and it is not in the list
    next if wanted && !wanted.include?(level.to_i)

    @queues_by_priority[level].shuffle.each do |name|
      queue = queues[name]
      sub = @subscriptions[name]
      next if queue.nil? || sub.nil?

      # Ask for a single message, using the subscription's visibility timeout.
      batch = retrieve_messsages(queue, 1, sub.headers[:visibility_timeout])
      next unless batch && !batch.empty?

      picked = batch[0]
      return picked if picked
    end
  end

  picked
end
# NOTE(review): this looks like an older round-robin variant of receive (it cycles @current_subscription over the sorted subscription keys and sleeps poll_interval after a full pass, with no priority handling). It shares its name with receive(options={}) -- confirm it is meant to stay disabled before restoring it.
# receive a single message from any of the subscribed queues # check each queue once, then sleep for poll_interval def receive
raise "No subscriptions to receive messages from." if (@subscriptions.nil? || @subscriptions.empty?) start = @current_subscription while true # puts "calling receive..." @current_subscription = ((@current_subscription < @subscriptions.length-1) ? @current_subscription + 1 : 0) sleep poll_interval if (@current_subscription == start) queue_name = @subscriptions.keys.sort[@current_subscription] queue = queues[queue_name] subscription = @subscriptions[queue_name] unless queue.nil? messages = retrieve_messsages queue, 1, subscription.headers[:visibility_timeout] return messages[0] unless (messages.nil? or messages.empty? or messages[0].nil?) end end
end
# File lib/activemessaging/adapters/asqs.rb, line 167
# Acknowledge a processed message by deleting it via +delete_message+.
#
# message - the message to delete.
# headers - accepted for interface compatibility; not used here.
#
# Ordinary failures (StandardError and subclasses) are logged through
# +logger+ and then ignored. Only StandardError is rescued so that
# Interrupt, SystemExit and other non-standard exceptions still reach the
# caller instead of being swallowed.
#
# Returns the result of +delete_message+, or of the last +logger.error+
# call when the delete failed.
def received(message, headers={})
  delete_message message
rescue StandardError => exception
  # NOTE(review): "AmazonSWS" in this message may be a typo for the SQS
  # adapter's module name -- left untouched because it is runtime output.
  logger.error "Exception in ActiveMessaging::Adapters::AmazonSWS::Connection.received() logged and ignored: "
  logger.error exception
end
Parameters: queue_name (string), body (string), headers (hash). Sends a single message to a queue.
# File lib/activemessaging/adapters/asqs.rb, line 93
# Send a single message to a queue.
#
# queue_name      - name handed to +get_or_create_queue+ to obtain the queue.
# message_body    - body passed on to +send_messsage+.
# message_headers - accepted for interface compatibility; not used here.
#
# Returns whatever +send_messsage+ returns.
def send(queue_name, message_body, message_headers={})
  target = get_or_create_queue(queue_name)
  send_messsage(target, message_body)
end
Parameters: queue_name (string), headers (hash). For SQS, make sure the queue exists, creating it if necessary, then add it to the list of polled queues.
# File lib/activemessaging/adapters/asqs.rb, line 65
# Register interest in a queue so that it gets polled.
#
# queue_name      - name handed to +get_or_create_queue+.
# message_headers - headers passed to Subscription.new for a first-time
#                   subscription.
#
# An existing subscription has +add+ called on it; otherwise a new
# Subscription is stored under the queue's name. The queue name is then
# listed once under the subscription's priority in @queues_by_priority.
def subscribe(queue_name, message_headers={})
  # look at the existing queues, create any that are missing
  name = get_or_create_queue(queue_name).name

  if @subscriptions.key?(name)
    @subscriptions[name].add
  else
    @subscriptions[name] = Subscription.new(name, message_headers)
  end

  priority = @subscriptions[name].priority
  @queues_by_priority[priority] = [] unless @queues_by_priority.key?(priority)

  bucket = @queues_by_priority[priority]
  bucket << name unless bucket.include?(name)
end
Do nothing: because the message is not deleted, it will eventually become visible again.
# File lib/activemessaging/adapters/asqs.rb, line 177
# Deliberate no-op: the message is not deleted, so it will eventually
# become visible again.
#
# message - ignored.
# headers - ignored.
#
# Returns true, always.
def unreceive(message, headers={})
  true
end
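Parameters: queue_name (string), headers (hash). For SQS, attempt to delete the queue; this won't work if the queue is not empty, and that's OK. (Note: the code shown below only removes the subscription bookkeeping; it does not issue a queue delete.)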
# File lib/activemessaging/adapters/asqs.rb, line 81
# Drop one subscription reference for a queue.
#
# queue_name      - key to look up in @subscriptions.
# message_headers - accepted for interface compatibility; not used here.
#
# Calls +remove+ on the subscription. Once its +count+ is zero or less, the
# subscription is deleted and the queue name is taken out of its priority
# bucket. A missing bucket is tolerated instead of raising NoMethodError on
# nil.
#
# NOTE(review): subscribe stores entries under queue.name while this method
# looks them up by the queue_name argument -- confirm the two always match.
#
# Returns nil when nothing was removed from a bucket, otherwise the result
# of Array#delete on the bucket.
def unsubscribe(queue_name, message_headers={})
  subscription = @subscriptions[queue_name]
  return unless subscription

  subscription.remove
  return unless subscription.count <= 0

  removed = @subscriptions.delete(queue_name)
  bucket = @queues_by_priority[removed.priority]
  bucket.delete(queue_name) if bucket
end
# File lib/activemessaging/adapters/asqs.rb, line 308
# Validate a response object before it is handed back to a caller.
#
# response - object responding to +errors?+ and +errors+.
#
# Raises "http response was nil" for a nil response, and raises
# +response.errors+ when +response.errors?+ is true.
#
# Returns the response unchanged when neither condition applies.
def check_errors(response)
  if response.nil?
    raise "http response was nil"
  elsif response && response.errors?
    raise response.errors
  end

  response
end
# File lib/activemessaging/adapters/asqs.rb, line 183
# Create a queue through the 'CreateQueue' action.
#
# name - queue name; checked by +validate_new_queue+ first.
#
# Returns the result of +add_queue+ called with the text of the response's
# //QueueUrl node, or nil when +make_request+ returned nil.
def create_queue(name)
  validate_new_queue(name)

  reply = make_request('CreateQueue', nil, {'QueueName'=>name})
  return if reply.nil?

  add_queue(reply.get_text("//QueueUrl"))
end
# File lib/activemessaging/adapters/asqs.rb, line 249
# Delete a message through the 'DeleteMessage' action.
#
# message - object exposing +queue.queue_url+ and +receipt_handle+.
#
# The request goes to the message's own queue URL (converted to a string)
# with the receipt handle as the 'ReceiptHandle' parameter.
#
# Returns whatever +make_request+ returns.
def delete_message(message)
  target_url = message.queue.queue_url.to_s
  make_request('DeleteMessage', target_url, {'ReceiptHandle'=>message.receipt_handle})
end
# File lib/activemessaging/adapters/asqs.rb, line 189
# Delete a queue through the 'DeleteQueue' action.
#
# queue - object exposing +queue_url+; checked by +validate_queue+ first.
#
# Returns whatever +make_request+ returns.
def delete_queue(queue)
  validate_queue(queue)
  make_request('DeleteQueue', queue.queue_url.to_s)
end
# File lib/activemessaging/adapters/asqs.rb, line 201
# Read queue attributes through the 'GetQueueAttributes' action.
#
# queue     - object exposing +queue_url+.
# attribute - attribute name to request; defaults to 'All'. Checked by
#             +validate_get_queue_attribute+ first.
#
# The requested attribute name is sent as the 'AttributeName' parameter.
# Each Attribute node of the response contributes one Name => Value pair
# to the result.
#
# Returns the whole Hash for 'All', the single value for any other
# attribute, or nil when +make_request+ returned no response.
def get_queue_attributes(queue, attribute='All')
  validate_get_queue_attribute(attribute)

  # The attribute name must actually reach the request, otherwise the
  # service is never told which attribute is wanted.
  params = {'AttributeName'=>attribute}
  response = make_request('GetQueueAttributes', "#{queue.queue_url}", params)
  return nil if response.nil?

  attributes = {}
  response.each_node('/GetQueueAttributesResponse/GetQueueAttributesResult/Attribute') do |node|
    # Keep the node and the extracted name in separate variables so the
    # Value lookup still runs against the node.
    name = node.elements['Name'].text
    value = node.elements['Value'].text
    attributes[name] = value
  end

  attribute == 'All' ? attributes : attributes[attribute]
end
I wrap this so I can move to a different client, or easily mock for testing
# File lib/activemessaging/adapters/asqs.rb, line 296
# Thin wrapper around Net::HTTP, kept separate so the HTTP client can be
# swapped out or mocked in tests.
#
# h - host passed to Net::HTTP.start.
# p - port passed to Net::HTTP.start.
# r - request object handed to the session's +request+ method.
#
# Returns the value of +http.request(r)+.
def http_request(h, p, r)
  Net::HTTP.start(h, p) do |http|
    http.request(r)
  end
end
# File lib/activemessaging/adapters/asqs.rb, line 194
# List queues through the 'ListQueues' action.
#
# queue_name_prefix - optional prefix; when given it is checked by
#                     +validate_queue_name+ and sent as "QueueNamePrefix".
#
# Returns an Array holding the result of +add_queue+ for every //QueueUrl
# node in the response, or [] when +make_request+ returned nil.
def list_queues(queue_name_prefix=nil)
  params = {}
  unless queue_name_prefix.nil?
    validate_queue_name(queue_name_prefix)
    params["QueueNamePrefix"] = queue_name_prefix
  end

  reply = make_request('ListQueues', nil, params)
  return [] if reply.nil?

  reply.nodes("//QueueUrl").collect { |url_node| add_queue(url_node.text) }
end
# File lib/activemessaging/adapters/asqs.rb, line 253
# Build, sign and send one GET request, retrying on failure.
#
# action - value of the 'Action' parameter.
# url    - base URL for the request; defaults to @aws_url.
# params - Hash of extra query parameters. It is copied, never modified,
#          so a caller can safely reuse the hash it passed in.
#
# Signing (SignatureVersion '1'): parameters are sorted by lower-cased key,
# each key and value are concatenated into one string, and that string is
# HMAC-SHA1 signed with @secret_access_key and base64 encoded.
#
# Up to @request_retry_count attempts are made. When +reliable+ is false
# the first failure is re-raised; otherwise the method waits
# @reconnect_delay seconds between attempts. Only StandardError is treated
# as retryable, so Interrupt and SystemExit propagate immediately.
#
# Returns the SQSResponse that passed +check_errors+, or nil once every
# attempt has failed.
def make_request(action, url=nil, params = {})
  url ||= @aws_url

  # Work on a copy so the caller's hash is left untouched.
  params = (params || {}).merge(
    'Action'           => action,
    'Version'          => @aws_version,
    'AWSAccessKeyId'   => @access_key_id,
    'Expires'          => (Time.now + @request_expires).gmtime.iso8601,
    'SignatureVersion' => '1'
  )

  # Sign the string
  string_to_sign = params.sort_by { |key, _value| key.downcase }.
                          collect { |key, value| key.to_s + value.to_s }.join
  # OpenSSL::Digest::Digest is gone from current openssl; OpenSSL::Digest
  # is the equivalent class.
  digest = OpenSSL::Digest.new('sha1')
  hmac = OpenSSL::HMAC.digest(digest, @secret_access_key, string_to_sign)
  params['Signature'] = Base64.encode64(hmac).chomp

  # Construct request
  query_params = params.collect { |key, value| key + "=" + CGI.escape(value.to_s) }.join("&")
  request = Net::HTTP::Get.new("#{url}?#{query_params}")

  max_attempts = @request_retry_count.to_i
  attempt = 0
  while attempt < max_attempts
    attempt += 1
    begin
      response = SQSResponse.new(http_request(host, port, request))
      check_errors(response)
      return response
    rescue StandardError => ex
      raise ex unless reliable
      # No point waiting when there is no further attempt to wait for.
      sleep(@reconnect_delay) if attempt < max_attempts
    end
  end

  nil
end
# File lib/activemessaging/adapters/asqs.rb, line 237
# Fetch messages from a queue through the 'ReceiveMessage' action.
#
# queue        - object exposing +queue_url+; checked by +validate_queue+.
# num_messages - sent as 'MaxNumberOfMessages'; checked by
#                +validate_number_of_messages+. Defaults to 1.
# timeout      - optional; when truthy it is checked by +validate_timeout+
#                and sent as 'VisibilityTimeout'.
#
# Returns an Array with one Message.from_element result per //Message node,
# or nil when +make_request+ returned nil.
def retrieve_messsages(queue, num_messages=1, timeout=nil)
  validate_queue(queue)
  validate_number_of_messages(num_messages)
  validate_timeout(timeout) if timeout

  params = {'MaxNumberOfMessages'=>num_messages.to_s}
  params['VisibilityTimeout'] = timeout.to_s if timeout

  reply = make_request('ReceiveMessage', queue.queue_url.to_s, params)
  return nil if reply.nil?

  reply.nodes("//Message").collect do |element|
    Message.from_element(element, reply, queue)
  end
end
in progress
# File lib/activemessaging/adapters/asqs.rb, line 230
# Post one message to a queue through the 'SendMessage' action.
#
# queue   - object exposing +queue_url+; checked by +validate_queue+.
# message - sent as 'MessageBody'; checked by +validate_message+.
#
# Returns the text of the response's //MessageId node, or nil when
# +make_request+ returned nil.
def send_messsage(queue, message)
  validate_queue(queue)
  validate_message(message)

  reply = make_request('SendMessage', queue.queue_url, {'MessageBody'=>message})
  return nil if reply.nil?

  reply.get_text("//MessageId")
end
# File lib/activemessaging/adapters/asqs.rb, line 218
# Set one queue attribute through the 'SetQueueAttributes' action.
#
# queue     - object exposing +queue_url+.
# attribute - sent as 'Attribute.Name'; checked by
#             +validate_set_queue_attribute+ first.
# value     - sent as 'Attribute.Value' after +to_s+.
#
# Returns whatever +make_request+ returns.
def set_queue_attribute(queue, attribute, value)
  validate_set_queue_attribute(attribute)

  settings = {'Attribute.Name'=>attribute, 'Attribute.Value'=>value.to_s}
  make_request('SetQueueAttributes', queue.queue_url.to_s, settings)
end
Generated with the Darkfish Rdoc Generator 2.