Parent

Methods

Qrack::Subscription

Subscription ancestor class @deprecated

Attributes

ack[RW]
client[R]
consumer_tag[RW]
delivery_tag[RW]
exclusive[RW]
message_count[R]
message_max[RW]
queue[R]
timeout[RW]

Public Class Methods

new(client, queue, opts = {}) click to toggle source
# File lib/qrack/subscription.rb, line 15
def initialize(client, queue, opts = {})
  @client = client
  @queue = queue

  # Get timeout value
  @timeout = opts[:timeout] || nil

  # Get maximum amount of messages to process
  @message_max = opts[:message_max] || nil

  # If a consumer tag is not passed in the server will generate one
  @consumer_tag = opts[:consumer_tag] || nil

  # Ignore the :nowait option if passed, otherwise program will hang waiting for a
  # response from the server causing an error.
  opts.delete(:nowait)

  # Do we want to have to provide an acknowledgement?
  @ack = opts[:ack] || nil

  # Does this consumer want exclusive use of the queue?
  @exclusive = opts[:exclusive] || false

  # Initialize message counter
  @message_count = 0

  # Store cancellator
  @cancellator = opts[:cancellator]

  # Store options
  @opts = opts
end

Public Instance Methods

start(&blk) click to toggle source
# File lib/qrack/subscription.rb, line 48
def start(&blk)
  # Do not process any messages if zero message_max
  if message_max == 0
    return
  end

  # Notify server about new consumer
  setup_consumer

  # We need to keep track of three possible subscription states
  # :subscribed, :pending, and :unsubscribed
  # 'pending' occurs because of network latency, where we tried to unsubscribe but were already given a message
  subscribe_state = :subscribed

  # Start subscription loop
  loop do

    begin
      method = client.next_method(:timeout => timeout, :cancellator => @cancellator)
    rescue Qrack::FrameTimeout
      begin
        queue.unsubscribe
        subscribe_state = :unsubscribed

        break
      rescue Bunny::ProtocolError
        # Unsubscribe failed because we actually got a message, so we're in a weird state.
        # We have to keep processing the message or else it may be lost...
        # ...and there is also a CancelOk method floating around that we need to consume from the socket

        method = client.last_method
        subscribe_state = :pending
      end
    end

    # Increment message counter
    @message_count += 1

    # get delivery tag to use for acknowledge
    queue.delivery_tag = method.delivery_tag if @ack
    header = client.next_payload

    # The unsubscribe ok may be sprinked into the payload
    if subscribe_state == :pending and header.is_a?(Qrack::Protocol::Basic::CancelOk)
      # We popped off the CancelOk, so we don't have to keep looking for it
      subscribe_state = :unsubscribed

      # Get the actual header now
      header = client.next_payload
    end

    # If maximum frame size is smaller than message payload body then message
    # will have a message header and several message bodies
    msg = ''
    while msg.length < header.size
      message = client.next_payload

      # The unsubscribe ok may be sprinked into the payload
      if subscribe_state == :pending and message.is_a?(Qrack::Protocol::Basic::CancelOk)
        # We popped off the CancelOk, so we don't have to keep looking for it
        subscribe_state = :unsubscribed
        next
      end

      msg << message
    end

    # If block present, pass the message info to the block for processing
    blk.call({:header => header, :payload => msg, :delivery_details => method.arguments}) if !blk.nil?

    # Unsubscribe if we've encountered the maximum number of messages
    if subscribe_state == :subscribed and !message_max.nil? and message_count == message_max
      queue.unsubscribe
      subscribe_state = :unsubscribed
    end

    # Exit the loop if we've unsubscribed
    if subscribe_state != :subscribed
      # We still haven't found the CancelOk, so it's the next method
      if subscribe_state == :pending
        method = client.next_method
        client.check_response(method, Qrack::Protocol::Basic::CancelOk, "Error unsubscribing from queue #{queue.name}, got #{method.class}")

        subscribe_state = :unsubscribed
      end

      # Acknowledge receipt of the final message
      queue.ack() if @ack

      # Quit the loop
      break
    end

    # Have to do the ack here because the ack triggers the release of messages from the server
    # if you are using Client#qos prefetch and you will get extra messages sent through before
    # the unsubscribe takes effect to stop messages being sent to this consumer unless the ack is
    # deferred.
    queue.ack() if @ack
  end
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.