Parent

Class/Module Index [+]

Quicksearch

Bunny::Transport

@private

Constants

DEFAULT_CONNECTION_TIMEOUT

API

DEFAULT_TLS_PROTOCOL

same as in RabbitMQ Java client

Attributes

connect_timeout[R]
disconnect_timeout[R]
host[R]
port[R]
read_write_timeout[R]
session[R]
socket[R]
tls_context[R]

Public Class Methods

new(session, host, port, opts) click to toggle source
# File lib/bunny/transport.rb, line 30
def initialize(session, host, port, opts)
  @session        = session
  @session_thread = opts[:session_thread]
  @host    = host
  @port    = port
  @opts    = opts

  @logger                = session.logger
  @tls_enabled           = tls_enabled?(opts)
  @tls_certificate_path  = tls_certificate_path_from(opts)
  @tls_key_path          = tls_key_path_from(opts)
  @tls_certificate       = opts[:tls_certificate] || opts[:ssl_cert_string]
  @tls_key               = opts[:tls_key]         || opts[:ssl_key_string]
  @tls_certificate_store = opts[:tls_certificate_store]
  @tls_ca_certificates   = opts.fetch(:tls_ca_certificates, [])
  @verify_peer           = opts[:verify_ssl] || opts[:verify_peer]

  @read_write_timeout = opts[:socket_timeout] || 3
  @read_write_timeout = nil if @read_write_timeout == 0
  @connect_timeout    = self.timeout_from(opts)
  @connect_timeout    = nil if @connect_timeout == 0
  @disconnect_timeout = @read_write_timeout || @connect_timeout

  @writes_mutex       = @session.mutex_impl.new

  maybe_initialize_socket
  prepare_tls_context if @tls_enabled
end
ping!(host, port, timeout) click to toggle source
# File lib/bunny/transport.rb, line 239
def self.ping!(host, port, timeout)
  raise ConnectionTimeout.new("#{host}:#{port} is unreachable") if !reacheable?(host, port, timeout)
end
reacheable?(host, port, timeout) click to toggle source
# File lib/bunny/transport.rb, line 226
def self.reacheable?(host, port, timeout)
  begin
    s = Bunny::Socket.open(host, port,
                           :socket_timeout => timeout)

    true
  rescue SocketError, Timeout::Error => e
    false
  ensure
    s.close if s
  end
end

Public Instance Methods

close(reason = nil) click to toggle source
# File lib/bunny/transport.rb, line 173
def close(reason = nil)
  @socket.close if open?
end
closed?() click to toggle source
# File lib/bunny/transport.rb, line 181
def closed?
  !open?
end
configure_socket(&block) click to toggle source
# File lib/bunny/transport.rb, line 92
def configure_socket(&block)
  block.call(@socket) if @socket
end
configure_tls_context(&block) click to toggle source
# File lib/bunny/transport.rb, line 96
def configure_tls_context(&block)
  block.call(@tls_context) if @tls_context
end
connect() click to toggle source
# File lib/bunny/transport.rb, line 75
def connect
  if uses_ssl?
    @socket.connect
    @socket.post_connection_check(host) if uses_tls? && @verify_peer

    @status = :connected

    @socket
  else
    # no-op
  end
end
connected?() click to toggle source
# File lib/bunny/transport.rb, line 88
def connected?
  :not_connected == @status && open?
end
flush() click to toggle source
# File lib/bunny/transport.rb, line 185
def flush
  @socket.flush if @socket
end
hostname() click to toggle source
# File lib/bunny/transport.rb, line 60
def hostname
  @host
end
initialize_socket() click to toggle source
# File lib/bunny/transport.rb, line 243
def initialize_socket
  begin
    @socket = Bunny::Timer.timeout(@connect_timeout, ConnectionTimeout) do
      Bunny::Socket.open(@host, @port,
                         :keepalive      => @opts[:keepalive],
                         :socket_timeout => @connect_timeout)
    end
  rescue StandardError, ConnectionTimeout => e
    @status = :not_connected
    raise Bunny::TCPConnectionFailed.new(e, self.hostname, self.port)
  end

  @socket
end
maybe_initialize_socket() click to toggle source
# File lib/bunny/transport.rb, line 258
def maybe_initialize_socket
  initialize_socket if !@socket || closed?
end
open?() click to toggle source
# File lib/bunny/transport.rb, line 177
def open?
  @socket && !@socket.closed?
end
post_initialize_socket() click to toggle source
# File lib/bunny/transport.rb, line 262
def post_initialize_socket
  @socket = if uses_tls?
              wrap_in_tls_socket(@socket)
            else
              @socket
            end
end
read_fully(*args) click to toggle source
# File lib/bunny/transport.rb, line 189
def read_fully(*args)
  @socket.read_fully(*args)
end
read_next_frame(opts = {}) click to toggle source

Exposed primarily for Bunny::Channel @private

# File lib/bunny/transport.rb, line 201
def read_next_frame(opts = {})
  header    = @socket.read_fully(7)
  # TODO: network issues here will sometimes cause
  #       the socket method return an empty string. We need to log
  #       and handle this better.
  # type, channel, size = begin
  #                         AMQ::Protocol::Frame.decode_header(header)
  #                       rescue AMQ::Protocol::EmptyResponseError => e
  #                         puts "Got AMQ::Protocol::EmptyResponseError, header is #{header.inspect}"
  #                       end
  type, channel, size = AMQ::Protocol::Frame.decode_header(header)
  payload   = @socket.read_fully(size)
  frame_end = @socket.read_fully(1)

  # 1) the size is miscalculated
  if payload.bytesize != size
    raise BadLengthError.new(size, payload.bytesize)
  end

  # 2) the size is OK, but the string doesn't end with FINAL_OCTET
  raise NoFinalOctetError.new if frame_end != AMQ::Protocol::Frame::FINAL_OCTET
  AMQ::Protocol::Frame.new(type, payload, channel)
end
read_ready?(timeout = nil) click to toggle source
# File lib/bunny/transport.rb, line 193
def read_ready?(timeout = nil)
  io = IO.select([@socket].compact, nil, nil, timeout)
  io && io[0].include?(@socket)
end
send_frame(frame) click to toggle source

Sends frame to the peer.

@raise [ConnectionClosedError] @private

# File lib/bunny/transport.rb, line 152
def send_frame(frame)
  if closed?
    @session.handle_network_failure(ConnectionClosedError.new(frame))
  else
    write(frame.encode)
  end
end
send_frame_without_timeout(frame) click to toggle source

Sends frame to the peer without timeout control.

@raise [ConnectionClosedError] @private

# File lib/bunny/transport.rb, line 164
def send_frame_without_timeout(frame)
  if closed?
    @session.handle_network_failure(ConnectionClosedError.new(frame))
  else
    write_without_timeout(frame.encode)
  end
end
ssl?() click to toggle source
Alias for: uses_ssl?
tls?() click to toggle source
Alias for: uses_tls?
uses_ssl?() click to toggle source
# File lib/bunny/transport.rb, line 69
def uses_ssl?
  @tls_enabled
end
Also aliased as: ssl?
uses_tls?() click to toggle source
# File lib/bunny/transport.rb, line 64
def uses_tls?
  @tls_enabled
end
Also aliased as: tls?
write(data) click to toggle source

Writes data to the socket. If read/write timeout was specified, Bunny::ClientTimeout will be raised if the operation times out.

@raise [ClientTimeout]

# File lib/bunny/transport.rb, line 104
def write(data)
  begin
    if @read_write_timeout
      Bunny::Timer.timeout(@read_write_timeout, Bunny::ClientTimeout) do
        if open?
          @writes_mutex.synchronize { @socket.write(data) }
          @socket.flush
        end
      end
    else
      if open?
        @writes_mutex.synchronize { @socket.write(data) }
        @socket.flush
      end
    end
  rescue SystemCallError, Bunny::ClientTimeout, Bunny::ConnectionError, IOError => e
    @logger.error "Got an exception when sending data: #{e.message} (#{e.class.name})"
    close
    @status = :not_connected

    if @session.automatically_recover?
      @session.handle_network_failure(e)
    else
      @session_thread.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
    end
  end
end
write_without_timeout(data) click to toggle source

Writes data to the socket without timeout checks

# File lib/bunny/transport.rb, line 133
def write_without_timeout(data)
  begin
    @writes_mutex.synchronize { @socket.write(data) }
    @socket.flush
  rescue SystemCallError, Bunny::ConnectionError, IOError => e
    close

    if @session.automatically_recover?
      @session.handle_network_failure(e)
    else
      @session_thread.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
    end
  end
end

Protected Instance Methods

check_local_path!(s) click to toggle source
# File lib/bunny/transport.rb, line 299
def check_local_path!(s)
  raise ArgumentError, "cannot read TLS certificate or key from #{s}" unless File.file?(s) && File.readable?(s)
end
initialize_tls_certificate_store(certs) click to toggle source
# File lib/bunny/transport.rb, line 341
def initialize_tls_certificate_store(certs)
  OpenSSL::X509::Store.new.tap do |store|
    certs.each { |path| store.add_file(path) }
  end
end
initialize_tls_context(ctx) click to toggle source
# File lib/bunny/transport.rb, line 314
def initialize_tls_context(ctx)
  ctx.cert       = OpenSSL::X509::Certificate.new(@tls_certificate) if @tls_certificate
  ctx.key        = OpenSSL::PKey::RSA.new(@tls_key) if @tls_key
  ctx.cert_store = if @tls_certificate_store
                     @tls_certificate_store
                   else
                     initialize_tls_certificate_store(@tls_ca_certificates)
                   end

  if !@tls_certificate
    @logger.warn         Using TLS but no client certificate is provided! If RabbitMQ is configured to verify peer        certificate, connection upgrade will fail!
  end
  if @tls_certificate && !@tls_key
    @logger.warn "Using TLS but no client private key is provided!"
  end

  # setting TLS/SSL version only works correctly when done
  # vis set_params. MK.
  ctx.set_params(:ssl_version => @opts.fetch(:tls_protocol, DEFAULT_TLS_PROTOCOL))
  ctx.set_params(:verify_mode => OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT) if @verify_peer

  ctx
end
prepare_tls_context() click to toggle source
# File lib/bunny/transport.rb, line 284
def prepare_tls_context
  read_tls_keys!

  @tls_context = initialize_tls_context(OpenSSL::SSL::SSLContext.new)
end
read_tls_keys!() click to toggle source
# File lib/bunny/transport.rb, line 303
def read_tls_keys!
  if @tls_certificate_path
    check_local_path!(@tls_certificate_path)
    @tls_certificate = File.read(@tls_certificate_path)
  end
  if @tls_key_path
    check_local_path!(@tls_key_path)
    @tls_key         = File.read(@tls_key_path)
  end
end
timeout_from(options) click to toggle source
# File lib/bunny/transport.rb, line 347
def timeout_from(options)
  options[:connect_timeout] || options[:connection_timeout] || options[:timeout] || DEFAULT_CONNECTION_TIMEOUT
end
tls_certificate_path_from(opts) click to toggle source
# File lib/bunny/transport.rb, line 276
def tls_certificate_path_from(opts)
  opts[:tls_cert] || opts[:ssl_cert] || opts[:tls_cert_path] || opts[:ssl_cert_path] || opts[:tls_certificate_path] || opts[:ssl_certificate_path]
end
tls_enabled?(opts) click to toggle source
# File lib/bunny/transport.rb, line 272
def tls_enabled?(opts)
  opts[:tls] || opts[:ssl] || (opts[:port] == AMQ::Protocol::TLS_PORT) || false
end
tls_key_path_from(opts) click to toggle source
# File lib/bunny/transport.rb, line 280
def tls_key_path_from(opts)
  opts[:tls_key] || opts[:ssl_key] || opts[:tls_key_path] || opts[:ssl_key_path]
end
wrap_in_tls_socket(socket) click to toggle source
# File lib/bunny/transport.rb, line 290
def wrap_in_tls_socket(socket)
  raise ArgumentError, "cannot wrap nil into TLS socket, @tls_context is nil. This is a Bunny bug." unless socket
  raise "cannot wrap a socket into TLS socket, @tls_context is nil. This is a Bunny bug." unless @tls_context

  s = Bunny::SSLSocket.new(socket, @tls_context)
  s.sync_close = true
  s
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.