class ActiveMessaging::Adapters::ReliableMsgConnection
Constants
- QUEUE_PARAMS
- THREAD_OLD_TXS
- TOPIC_PARAMS
Attributes
current_subscription[RW]
configurable params
destinations[RW]
configurable params
poll_interval[RW]
configurable params
subscriptions[RW]
configurable params
tx_timeout[RW]
configurable params
Public Class Methods
new(cfg)
click to toggle source
generic init method needed by a13g
# File lib/activemessaging/adapters/reliable_msg.rb, line 33
# Generic init method needed by a13g.
#
# cfg is indexed with symbol keys; the keys read here are:
#   :poll_interval - stored in @poll_interval, defaults to 1
#   :reliable      - stored in @reliable, defaults to true; an explicit
#                    false is honoured
#   :tx_timeout    - stored in @tx_timeout, defaults to
#                    ::ReliableMsg::Client::DEFAULT_TX_TIMEOUT
#
# Subscriptions and destinations start out empty and the round-robin
# position (@current_subscription) starts at 0.
def initialize(cfg)
  @poll_interval = cfg[:poll_interval] || 1

  # `cfg[:reliable] || true` evaluates to true even for an explicit false,
  # so only fall back to the default when the option is absent (nil).
  reliable_opt = cfg[:reliable]
  @reliable = reliable_opt.nil? ? true : reliable_opt

  @tx_timeout = cfg[:tx_timeout] || ::ReliableMsg::Client::DEFAULT_TX_TIMEOUT
  @subscriptions = {}
  @destinations = {}
  @current_subscription = 0
end
Public Instance Methods
disconnect()
click to toggle source
called to cleanly get rid of connection
# File lib/activemessaging/adapters/reliable_msg.rb, line 44
# Called to cleanly get rid of the connection.
# This implementation does nothing and returns nil.
def disconnect; end
get_or_create_destination(destination_name, message_headers={})
click to toggle source
# File lib/activemessaging/adapters/reliable_msg.rb, line 80
# Returns the destination cached under destination_name, creating and
# caching it on first use.
#
# destination_name - must look like '/queue/<name>' or '/topic/<name>'
# message_headers  - hash of headers. NOTE: the "id" key is deleted from the
#                    hash that was passed in, so the caller sees that change.
#                    Only entries whose key is listed in QUEUE_PARAMS (for a
#                    queue) or TOPIC_PARAMS (for a topic) are handed to the
#                    destination constructor.
#
# Raises ArgumentError when destination_name does not match the expected
# pattern.
def get_or_create_destination(destination_name, message_headers = {})
  return destinations[destination_name] if destinations.key?(destination_name)

  dd = /^\/(queue|topic)\/(.*)$/.match(destination_name)
  # Without this guard a malformed name fails further down with an opaque
  # NoMethodError on nil.
  if dd.nil?
    raise ArgumentError,
          "invalid destination name #{destination_name.inspect}, expected '/queue/<name>' or '/topic/<name>'"
  end

  rm_class = dd[1].titleize
  message_headers.delete("id")

  # The whitelist depends only on the destination type, so pick it once.
  allowed_params = rm_class == 'Queue' ? QUEUE_PARAMS : TOPIC_PARAMS
  dest_headers = message_headers.reject { |k, _v| !allowed_params.include?(k) }

  rm_dest = "ReliableMsg::#{rm_class}".constantize.new(dd[2], dest_headers)
  destinations[destination_name] = rm_dest
end
receive(options={})
click to toggle source
Receive a single message from any of the subscribed destinations: check each destination once, then sleep for #poll_interval.
# File lib/activemessaging/adapters/reliable_msg.rb, line 92
# Receive a single message from any of the subscribed destinations.
#
# Subscriptions are visited in rotation over their sorted names, starting
# with the one after current_subscription. Each time the rotation comes back
# to the position this call started from, it sleeps for poll_interval before
# checking again. The loop only ends by returning a message or raising.
#
# options - accepted but not used here
#
# Returns a Message built from the received object, id and headers, the
# destination name and the still-open transaction ({:qm, :tid}). The commit
# or abort of that transaction is left to received / unreceive.
#
# Raises RuntimeError when there are no subscriptions, or when the current
# thread already has a reliable-msg transaction. When `reliable` is falsy,
# an error raised while getting a message is re-raised after the transaction
# has been aborted.
def receive(options = {})
  raise "No subscriptions to receive messages from." if subscriptions.nil? || subscriptions.empty?

  tx_key = ::ReliableMsg::Client::THREAD_CURRENT_TX
  start = current_subscription

  # `while true` rather than `loop do`: Kernel#loop would swallow a
  # StopIteration that is meant to propagate to the caller.
  while true
    self.current_subscription = current_subscription < subscriptions.length - 1 ? current_subscription + 1 : 0
    sleep poll_interval if current_subscription == start

    destination_name = subscriptions.keys.sort[current_subscription]
    destination = destinations[destination_name]
    next if destination.nil?

    # from the way we use this, assume this is the start of a transaction,
    # there should be no current transaction
    ctx = Thread.current[tx_key]
    raise "There should not be an existing reliable-msg transaction. #{ctx.inspect}" if ctx

    # start a new transaction
    @tx = { :qm => destination.queue_manager }
    @tx[:tid] = @tx[:qm].begin @tx_timeout
    Thread.current[tx_key] = @tx

    reliable_msg = nil
    begin
      # now call a get on the destination - it will use the transaction.
      # When a message is returned, the commit or the abort will occur in
      # the received or unreceive methods.
      reliable_msg = destination.get subscriptions[destination_name].headers[:selector]
      @tx[:qm].commit(@tx[:tid]) if reliable_msg.nil?
    rescue Object => err
      # Abort the transaction on error. The thread marker is cleared even if
      # the abort itself raises or the error is re-raised below; otherwise
      # every later receive on this thread would fail the "existing
      # transaction" check above.
      begin
        @tx[:qm].abort(@tx[:tid])
      ensure
        Thread.current[tx_key] = nil
      end
      raise err unless reliable
      puts "receive failed, will retry in #{@poll_interval} seconds"
      sleep poll_interval
    end

    if reliable_msg
      return Message.new(reliable_msg.object, reliable_msg.id, reliable_msg.headers, destination_name, @tx)
    end

    # nothing received: the transaction is finished, forget it
    Thread.current[tx_key] = nil
  end
end
received(message, headers={})
click to toggle source
called after a message is successfully received and processed
# File lib/activemessaging/adapters/reliable_msg.rb, line 134
# Called after a message is successfully received and processed: commits
# the transaction recorded on the message.
#
# A failure while committing is reported on stdout instead of being raised.
# In every case the current thread's reliable-msg transaction marker is
# reset to nil afterwards.
#
# message - responds to #transaction with a hash holding :qm and :tid
# headers - accepted but not used here
def received(message, headers = {})
  tx = message.transaction
  tx[:qm].commit(tx[:tid])
rescue Object => ex
  puts "received failed: #{ex.message}"
ensure
  Thread.current[::ReliableMsg::Client::THREAD_CURRENT_TX] = nil
end
send(destination_name, message_body, message_headers={})
click to toggle source
destination_name string, body string, headers hash. Send a single message to a destination.
# File lib/activemessaging/adapters/reliable_msg.rb, line 69
# Send a single message to a destination.
#
# destination_name - string, resolved through get_or_create_destination
# message_body     - passed to the destination's #put
# message_headers  - hash, passed to the destination's #put
#
# Returns whatever the destination's #put returns.
#
# When `reliable` is falsy, an error raised by #put is re-raised. When it is
# truthy, the failure is reported on stdout and the put is attempted again
# after sleeping @poll_interval, repeatedly until it succeeds. Because the
# put is repeated, a put that failed after partially succeeding may be
# delivered more than once.
def send(destination_name, message_body, message_headers = {})
  dest = get_or_create_destination(destination_name)
  begin
    dest.put message_body, message_headers
  rescue Object => err
    raise err unless reliable
    puts "send failed, will retry in #{@poll_interval} seconds"
    sleep @poll_interval
    # Actually retry, as the message above promises; without this the
    # message was silently dropped after the sleep.
    retry
  end
end
subscribe(destination_name, message_headers={})
click to toggle source
destination_name string, headers hash. Subscribe to listen on a destination. Use the '/destination-type/name' convention, like stomp.
# File lib/activemessaging/adapters/reliable_msg.rb, line 51
# Subscribe to listen on a destination, named with the
# '/destination-type/name' convention.
#
# The destination is looked up (or created) first. A destination that
# already has a subscription gets #add called on that subscription;
# otherwise a new Subscription is stored under the destination name.
#
# destination_name - string
# message_headers  - hash, handed to get_or_create_destination and to a
#                    newly created Subscription
def subscribe(destination_name, message_headers = {})
  get_or_create_destination(destination_name, message_headers)

  unless subscriptions.key?(destination_name)
    return subscriptions[destination_name] = Subscription.new(destination_name, message_headers)
  end

  subscriptions[destination_name].add
end
unreceive(message, headers={})
click to toggle source
called to abort the transaction a message was received in (the counterpart of received, which commits it)
# File lib/activemessaging/adapters/reliable_msg.rb, line 146
# Aborts the transaction recorded on the message (the counterpart of
# received, which commits it).
#
# A failure while aborting is reported on stdout instead of being raised.
# In every case the current thread's reliable-msg transaction marker is
# reset to nil afterwards.
#
# message - responds to #transaction with a hash holding :qm and :tid
# headers - accepted but not used here
def unreceive(message, headers = {})
  tx = message.transaction
  tx[:qm].abort(tx[:tid])
rescue Object => ex
  puts "unreceive failed: #{ex.message}"
ensure
  Thread.current[::ReliableMsg::Client::THREAD_CURRENT_TX] = nil
end
unsubscribe(destination_name, message_headers={})
click to toggle source
destination_name string, headers hash. Unsubscribe from listening on a destination.
# File lib/activemessaging/adapters/reliable_msg.rb, line 62
# Unsubscribe from listening on a destination.
#
# Calls #remove on the subscription stored for destination_name and drops
# the entry from subscriptions once its count is zero or less.
#
# destination_name - string
# message_headers  - accepted but not used here
#
# Returns the removed subscription when the entry was dropped, nil
# otherwise (including when there is no subscription for the destination).
def unsubscribe(destination_name, message_headers = {})
  subscription = subscriptions[destination_name]
  # Nothing to do for a destination that has no subscription; without this
  # guard the call fails with NoMethodError on nil.
  return if subscription.nil?

  subscription.remove
  subscriptions.delete(destination_name) if subscription.count <= 0
end