Typical Stomp client class. A listener thread receives frames from the server; any thread can send.
All receives happen in that one listener thread, so avoid heavy processing in that thread if message volume is high.
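One way to keep the listener thread light is to have the callback only enqueue work and let other threads do the slow part. The following is a minimal sketch of that idea using Ruby's standard Queue; the helper name process_off_listener_thread, the worker count, and the upcase "processing" are all hypothetical stand-ins, and no broker or Stomp::Client is involved.

```ruby
require 'thread'

# Hypothetical helper: `bodies` stands in for message bodies arriving on the
# listener thread; `worker_count` worker threads do the slow part.
def process_off_listener_thread(bodies, worker_count = 2)
  work = Queue.new
  results = Queue.new

  workers = Array.new(worker_count) do
    Thread.new do
      while (body = work.pop) != :stop
        results << body.upcase # stand-in for expensive processing
      end
    end
  end

  # What a subscribe block would do: enqueue and return immediately.
  on_message = lambda { |body| work << body }
  bodies.each { |body| on_message.call(body) }

  # Shut the pool down and collect whatever the workers produced.
  worker_count.times { work << :stop }
  workers.each(&:join)
  Array.new(results.size) { results.pop }
end

processed = process_off_listener_thread(%w[a b c])
```

Result order depends on thread scheduling, so callers that need ordering would have to carry a sequence number with each item.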
A new Client object can be initialized using three forms:
Hash (this is the recommended Client initialization method):
hash = {
  :hosts => [
    {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
    {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
  ],
  :reliable => true,
  :initial_reconnect_delay => 0.01,
  :max_reconnect_delay => 30.0,
  :use_exponential_back_off => true,
  :back_off_multiplier => 2,
  :max_reconnect_attempts => 0,
  :randomize => false,
  :connect_timeout => 0,
  :connect_headers => {},
  :parse_timeout => 5,
  :logger => nil,
  :dmh => false,
  :closed_check => true,
  :hbser => false,
  :stompconn => false,
  :usecrlf => false,
  :max_hbread_fails => 0,
  :max_hbrlck_fails => 0,
  :fast_hbs_adjust => 0.0,
  :connread_timeout => 0,
  :tcp_nodelay => true,
  :start_timeout => 10,
}

e.g. c = Stomp::Client.new(hash)
Positional parameters:
login (String, default : '')
passcode (String, default : '')
host (String, default : 'localhost')
port (Integer, default : 61613)
reliable (Boolean, default : false)

e.g. c = Stomp::Client.new('login', 'passcode', 'localhost', 61613, true)
Stomp URL:
A Stomp URL must begin with 'stomp://' and can be in one of the following forms:
stomp://host:port
stomp://host.domain.tld:port
stomp://login:passcode@host:port
stomp://login:passcode@host.domain.tld:port

e.g. c = Stomp::Client.new(urlstring)
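To make the four URL forms concrete, here is a rough sketch of how such a string could be split into its parts. The pattern STOMP_URL and the method parse_stomp_url_sketch are hypothetical and written only for illustration; the gem's own parsing is not shown on this page and may differ.

```ruby
# Hypothetical pattern, written for illustration; not the gem's own regex.
STOMP_URL = %r{\Astomp://(?:([^:@/]+):([^:@/]+)@)?([\w.\-]+):(\d+)\z}

# Returns a hash of the URL's parts, or nil when the string does not match.
def parse_stomp_url_sketch(url)
  m = STOMP_URL.match(url)
  return nil unless m
  {
    :login    => m[1] || '',
    :passcode => m[2] || '',
    :host     => m[3],
    :port     => m[4].to_i,
  }
end

plain  = parse_stomp_url_sketch('stomp://localhost:61613')
authed = parse_stomp_url_sketch('stomp://login:passcode@host.domain.tld:61613')
bad    = parse_stomp_url_sketch('http://localhost:61613')
```

In this sketch a URL without credentials falls back to empty login and passcode strings, matching the positional defaults listed above.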
# File lib/stomp/client.rb, line 77
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false)
  parse_hash_params(login) ||
  parse_stomp_url(login) ||
  parse_failover_url(login) ||
  parse_positional_params(login, passcode, host, port, reliable)

  @logger = @parameters[:logger] ||= Stomp::NullLogger.new
  @start_timeout = @parameters[:start_timeout] || 10.0
  check_arguments!()

  begin
    timeout(@start_timeout) {
      create_error_handler
      create_connection(autoflush)
      start_listeners()
    }
  rescue TimeoutError
    ex = Stomp::Error::StartTimeoutException.new(@start_timeout)
    raise ex
  end
end
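The reconnect parameters in the hash form (:initial_reconnect_delay, :use_exponential_back_off, :back_off_multiplier, :max_reconnect_delay) suggest a capped exponential back-off. The sketch below shows one plausible reading of how those values interact; the helper reconnect_delays is hypothetical, and the exact schedule used by the connection code is an assumption, since that code is not shown here.

```ruby
# Hypothetical helper: lists the delays implied by the reconnect parameters.
# Assumption: each failed attempt multiplies the delay, capped at the maximum.
def reconnect_delays(params, attempts)
  delay = params[:initial_reconnect_delay]
  Array.new(attempts) do
    current = delay
    if params[:use_exponential_back_off]
      delay = [delay * params[:back_off_multiplier], params[:max_reconnect_delay]].min
    end
    current
  end
end

params = {
  :initial_reconnect_delay => 0.01,
  :max_reconnect_delay => 30.0,
  :use_exponential_back_off => true,
  :back_off_multiplier => 2,
}
delays = reconnect_delays(params, 5)
```

With the values from the hash example, the delay doubles from 0.01 seconds until it reaches the 30.0 second ceiling and then stays there.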
open is syntactic sugar for ‘Client.new’, see ‘initialize’ for usage.
# File lib/stomp/client.rb, line 123
def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  Client.new(login, passcode, host, port, reliable)
end
Abort aborts work in a transaction by name.
# File lib/stomp/client.rb, line 139
def abort(name, headers = {})
  @connection.abort(name, headers)
  # replay any ack'd messages in this transaction
  replay_list = @replay_messages_by_txn[name]
  if replay_list
    replay_list.each do |message|
      find_listener(message) # find_listener also calls the listener
    end
  end
end
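The replay bookkeeping shared by ack, abort, and commit can be pictured with a small stand-alone model. The class ReplayLedger below is hypothetical and mirrors only the per-transaction list handling; it takes the transaction id directly instead of reading headers, and its delivered array stands in for calling the subscription listener.

```ruby
# Hypothetical stand-in that mirrors only the replay bookkeeping.
class ReplayLedger
  attr_reader :delivered

  def initialize
    @replay_messages_by_txn = {}
    @delivered = [] # records what the listener callback would see
  end

  # Remember a message acknowledged inside a transaction.
  def ack(message, txn_id)
    (@replay_messages_by_txn[txn_id] ||= []) << message
  end

  # Abort: hand every remembered message back to the listener.
  def abort(txn_id)
    (@replay_messages_by_txn[txn_id] || []).each { |m| @delivered << m }
  end

  # Commit: the messages are final, so forget them.
  def commit(txn_id)
    @replay_messages_by_txn.delete(txn_id)
  end
end

ledger = ReplayLedger.new
ledger.ack('m1', 'tx1')
ledger.ack('m2', 'tx1')
ledger.abort('tx1') # both messages are replayed to the listener
```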
Acknowledge a message. Used when a subscription has specified client acknowledgement, e.g. connection.subscribe("/queue/a", {:ack => 'client'}). Accepts a transaction header ( :transaction => 'some_transaction_id' ).
# File lib/stomp/client.rb, line 184
def ack(message, headers = {})
  txn_id = headers[:transaction]
  if txn_id
    # lets keep around messages ack'd in this transaction in case we rollback
    replay_list = @replay_messages_by_txn[txn_id]
    if replay_list.nil?
      replay_list = []
      @replay_messages_by_txn[txn_id] = replay_list
    end
    replay_list << message
  end
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  context = ack_context_for(message, headers)
  @connection.ack context[:message_id], context[:headers]
end
# File lib/stomp/client.rb, line 212
def ack_context_for(message, headers)
  id = case protocol
       when Stomp::SPL_12
         'ack'
       when Stomp::SPL_11
         headers.merge!(:subscription => message.headers['subscription'])
         'message-id'
       else
         'message-id'
       end
  {:message_id => message.headers[id], :headers => headers}
end
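The per-protocol choice made by ack_context_for can be tried in isolation. In the sketch below the constants SPL_10, SPL_11, and SPL_12 are local stand-ins whose string values are an assumption, the message is represented by a plain hash of headers, and ack_context_sketch is a hypothetical name. It uses a non-mutating merge, unlike the merge! in the source above.

```ruby
# Stand-ins for the protocol constants referenced in the source above;
# the string values are an assumption for this sketch.
SPL_10 = '1.0'
SPL_11 = '1.1'
SPL_12 = '1.2'

# `message_headers` is a plain hash standing in for a MESSAGE frame's headers.
def ack_context_sketch(protocol, message_headers, headers = {})
  id = case protocol
       when SPL_12
         'ack' # 1.2: acknowledge by the frame's ack header
       when SPL_11
         # 1.1: message-id plus the subscription the message arrived on
         headers = headers.merge(:subscription => message_headers['subscription'])
         'message-id'
       else
         'message-id' # 1.0: message-id alone
       end
  { :message_id => message_headers[id], :headers => headers }
end

msg = { 'message-id' => 'm-1', 'subscription' => 'sub-1', 'ack' => 'a-1' }
ctx_12 = ack_context_sketch(SPL_12, msg)
ctx_11 = ack_context_sketch(SPL_11, msg)
ctx_10 = ack_context_sketch(SPL_10, msg)
```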
autoflush returns the current connection’s autoflush setting.
# File lib/stomp/client.rb, line 336
def autoflush()
  @connection.autoflush()
end
autoflush= sets the current connection’s autoflush setting.
# File lib/stomp/client.rb, line 331
def autoflush=(af)
  @connection.autoflush = af
end
Begin starts work in a transaction by name.
# File lib/stomp/client.rb, line 134
def begin(name, headers = {})
  @connection.begin(name, headers)
end
close frees resources in use by this client. The listener thread is terminated, and disconnect on the connection is called.
# File lib/stomp/client.rb, line 268
def close(headers={})
  @listener_thread.exit
  @connection.disconnect(headers)
end
closed? tests if this client connection is closed.
# File lib/stomp/client.rb, line 257
def closed?()
  @connection.closed?()
end
Commit commits work in a transaction by name.
# File lib/stomp/client.rb, line 152
def commit(name, headers = {})
  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id)
  @connection.commit(name, headers)
end
Return the broker’s CONNECTED frame to the client. Misnamed.
# File lib/stomp/client.rb, line 242
def connection_frame()
  @connection.connection_frame
end
# File lib/stomp/client.rb, line 99
def create_error_handler
  client_thread = Thread.current
  @error_listener = lambda do |error|
    exception = case error.body
                when /ResourceAllocationException/
                  Stomp::Error::ProducerFlowControlException.new(error)
                when /ProtocolException/
                  Stomp::Error::ProtocolException.new(error)
                else
                  Stomp::Error::BrokerException.new(error)
                end
    client_thread.raise exception
  end
end
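The error handler above does two things: it picks an exception class from the text of the ERROR frame body, and it raises that exception in the thread that created the client. The sketch below reproduces both steps with stand-in classes (BrokerError, FlowControlError, ProtocolError) and a hypothetical classify method; the sample body text is invented for illustration.

```ruby
# Stand-in exception classes; the real ones live under Stomp::Error.
class BrokerError < StandardError; end
class FlowControlError < BrokerError; end
class ProtocolError < BrokerError; end

# Pick an exception class from the text of an ERROR frame body.
def classify(body)
  case body
  when /ResourceAllocationException/ then FlowControlError
  when /ProtocolException/           then ProtocolError
  else                                    BrokerError
  end
end

# A thread standing in for the client's creating thread.
target = Thread.new do
  begin
    sleep
  rescue BrokerError => e
    e.class
  end
end
sleep 0.01 until target.status == 'sleep'

# Raise the classified exception inside that thread via Thread#raise.
error_class = classify('ResourceAllocationException: queue full')
target.raise error_class.new('flow control')
raised = target.value
```

Because Thread#raise interrupts the target thread wherever it happens to be, code running in the creating thread should expect these exceptions at arbitrary points.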
Return any RECEIPT frame received by DISCONNECT.
# File lib/stomp/client.rb, line 247
def disconnect_receipt()
  @connection.disconnect_receipt
end
hbrecv_count returns the current connection's heartbeat receive count.
# File lib/stomp/client.rb, line 320
def hbrecv_count()
  @connection.hbrecv_count()
end
hbrecv_interval returns the connection's heartbeat receive interval.
# File lib/stomp/client.rb, line 310
def hbrecv_interval()
  @connection.hbrecv_interval()
end
hbsend_count returns the current connection's heartbeat send count.
# File lib/stomp/client.rb, line 315
def hbsend_count()
  @connection.hbsend_count()
end
hbsend_interval returns the connection's heartbeat send interval.
# File lib/stomp/client.rb, line 305
def hbsend_interval()
  @connection.hbsend_interval()
end
join the listener thread for this client, generally used to wait for a quit signal.
# File lib/stomp/client.rb, line 129
def join(limit = nil)
  @listener_thread.join(limit)
end
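Since join delegates to Thread#join, the limit argument follows the standard Ruby behaviour: the call returns nil if the limit expires first and the thread itself once the thread has ended. A short sketch with a plain thread standing in for the listener thread; wait_for is a hypothetical helper.

```ruby
# Hypothetical helper: wait up to `limit` seconds and report what happened.
def wait_for(thread, limit)
  thread.join(limit) ? :finished : :timed_out
end

listener = Thread.new { sleep }   # stand-in for an idle listener thread
first = wait_for(listener, 0.05)  # limit expires, join returned nil
listener.kill                     # stand-in for a quit signal
second = wait_for(listener, 1)    # thread has ended, join returned it
```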
jruby? tests if the connection has detected a JRuby environment.
# File lib/stomp/client.rb, line 262
def jruby?()
  @connection.jruby
end
Stomp 1.1+ NACK.
# File lib/stomp/client.rb, line 206
def nack(message, headers = {})
  context = ack_context_for(message, headers)
  @connection.nack context[:message_id], context[:headers]
end
open? tests if this client connection is open.
# File lib/stomp/client.rb, line 252
def open?
  @connection.open?()
end
Poll for asynchronous messages issued by the broker. Returns nil if no message is available, otherwise the message.
# File lib/stomp/client.rb, line 326
def poll()
  @connection.poll()
end
protocol returns the current client’s protocol level.
# File lib/stomp/client.rb, line 285
def protocol()
  @connection.protocol()
end
Publishes message to destination. If a block is given, a receipt is requested and passed to the block when it arrives. Accepts a transaction header ( :transaction => 'some_transaction_id' ).
# File lib/stomp/client.rb, line 234
def publish(destination, message, headers = {})
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  @connection.publish(destination, message, headers)
end
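The receipt mechanism used by publish and ack pairs a generated receipt id with a callback. The implementation of register_receipt_listener is not shown on this page, so the sketch below is only one plausible shape: the class ReceiptRegistry, the use of SecureRandom for ids, and dropping the callback after one use are all assumptions.

```ruby
require 'securerandom'

# Hypothetical registry; the gem's register_receipt_listener is not shown above.
class ReceiptRegistry
  def initialize
    @receipt_listeners = {}
  end

  # Store the callback under a fresh id and return the id for the header.
  def register(listener)
    id = SecureRandom.uuid
    @receipt_listeners[id] = listener
    id
  end

  # Called when a RECEIPT frame arrives: run the callback once and drop it.
  # Returns true when a callback was found for the id.
  def dispatch(receipt_id)
    listener = @receipt_listeners.delete(receipt_id)
    listener.call(receipt_id) if listener
    !listener.nil?
  end
end

registry = ReceiptRegistry.new
seen = []
headers = {}
headers['receipt'] = registry.register(lambda { |r| seen << r })
registry.dispatch(headers['receipt']) # simulates the broker's RECEIPT
```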
running checks if the thread was created and is not dead.
# File lib/stomp/client.rb, line 274
def running()
  @listener_thread && !!@listener_thread.status
end
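The expression in running relies on how Thread#status reports thread state in Ruby: a String while the thread is alive, false once it has exited, and nil if it died from an unhandled exception. The sketch below applies the same expression to a plain thread; running_sketch is a hypothetical name.

```ruby
# Same expression as in the source above, applied to any thread.
# A nil thread yields nil; a live thread yields true; a dead one yields false.
def running_sketch(thread)
  thread && !!thread.status
end

alive = Thread.new { sleep }
sleep 0.01 until alive.status == 'sleep'
while_alive = running_sketch(alive)

alive.kill
alive.join
after_kill = running_sketch(alive)

never_started = running_sketch(nil)
```

Note that the result is nil rather than false when no listener thread was ever created, so callers should treat the value as truthy or falsy.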
set_logger identifies a new callback logger.
# File lib/stomp/client.rb, line 279
def set_logger(logger)
  @logger = logger
  @connection.set_logger(logger)
end
sha1 returns a SHA1 sum of a given string.
# File lib/stomp/client.rb, line 295
def sha1(data)
  @connection.sha1(data)
end
Subscribe to a destination. A block must be given; it is used as the callback listener. Accepts a transaction header ( :transaction => 'some_transaction_id' ).
# File lib/stomp/client.rb, line 161
def subscribe(destination, headers = {})
  raise "No listener given" unless block_given?
  # use subscription id to correlate messages to subscription. As described in
  # the SUBSCRIPTION section of the protocol: http://stomp.github.com/.
  # If no subscription id is provided, generate one.
  set_subscription_id_if_missing(destination, headers)
  if @listeners[headers[:id]]
    raise "attempting to subscribe to a queue with a previous subscription"
  end
  @listeners[headers[:id]] = lambda {|msg| yield msg}
  @connection.subscribe(destination, headers)
end
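The listener table that subscribe maintains can be modelled on its own. In the sketch below the class ListenerRegistry and its deliver method are hypothetical, and deriving a missing id from a SHA1 of the destination is an assumption about what set_subscription_id_if_missing does; only the duplicate check and error messages are taken from the source above.

```ruby
require 'digest/sha1'

# Hypothetical registry mirroring the duplicate check in subscribe.
class ListenerRegistry
  def initialize
    @listeners = {}
  end

  def subscribe(destination, headers = {}, &block)
    raise "No listener given" unless block
    # Assumption: a missing id is derived from the destination name.
    headers[:id] ||= Digest::SHA1.hexdigest(destination)
    if @listeners[headers[:id]]
      raise "attempting to subscribe to a queue with a previous subscription"
    end
    @listeners[headers[:id]] = block
    headers[:id]
  end

  # Route a message to the listener registered under its subscription id.
  def deliver(subscription_id, msg)
    listener = @listeners[subscription_id]
    listener.call(msg) if listener
  end
end

registry = ListenerRegistry.new
received = []
sub_id = registry.subscribe('/queue/a') { |msg| received << msg }
registry.deliver(sub_id, 'hello')
```

Under this assumption, subscribing twice to the same destination without an explicit :id raises, while passing a distinct :id allows a second subscription.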
Unreceive a message, sending it back to its queue or to the DLQ.
# File lib/stomp/client.rb, line 226
def unreceive(message, options = {})
  @connection.unreceive(message, options)
end
Unsubscribe from a subscription by name.
# File lib/stomp/client.rb, line 175
def unsubscribe(name, headers = {})
  set_subscription_id_if_missing(name, headers)
  @connection.unsubscribe(name, headers)
  @listeners[headers[:id]] = nil
end