class BigRecord::ConnectionAdapters::HbaseAdapter

Constants

CHARSET

string charset

LOST_CONNECTION_ERROR_MESSAGES
NULL

utility constants

TYPE_BINARY

TYPE_MAP = 0x05; # delegate to YAML
TYPE_DATETIME = 0x06; # delegate to YAML

TYPE_BOOLEAN

TYPE_INTEGER = 0x02; # delegate to YAML
TYPE_FLOAT = 0x03; # fixed 1 byte

TYPE_NULL

data types

TYPE_STRING

Public Class Methods

new(connection, logger, connection_options, config) click to toggle source

TRUE = "001"
FALSE = "000"

Calls superclass method
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 58
# Builds the adapter and configures the driver connection right away.
#
# connection         - driver connection, forwarded to the superclass
# logger             - logger, forwarded to the superclass
# connection_options - kept in @connection_options
# config             - kept in @config (read by +connect+ and +configuration+)
def initialize(connection, logger, connection_options, config)
  super(connection, logger)
  @connection_options = connection_options
  @config = config

  connect
end

Public Instance Methods

active?() click to toggle source

CONNECTION MANAGEMENT ====================================

# File lib/big_record/connection_adapters/hbase_adapter.rb, line 79
# Pings the driver connection.
#
# Returns whatever @connection.ping returns, or false when the ping raises
# BigRecord::Driver::DriverError.
def active?
  begin
    @connection.ping
  rescue BigRecord::Driver::DriverError
    false
  end
end
add_column_family(table_name, column_name, options = {}) click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 251
# Adds a column family to a table.
#
# table_name  - name of the table to alter
# column_name - family name; converted with +to_s+ for the descriptor
# options     - passed unchanged to BigRecord::Driver::ColumnDescriptor.new
#
# The driver call runs inside +log+; the returned value is the driver's
# result (this relies on +log+ returning the value of its block).
def add_column_family(table_name, column_name, options = {})
  family = BigRecord::Driver::ColumnDescriptor.new(column_name.to_s, options)
  statement = "ADD COLUMN TABLE #{table_name} COLUMN #{column_name} (#{options.inspect});"

  log(statement) { @connection.add_column(table_name, family) }
end
Also aliased as: add_family
add_family(table_name, column_name, options = {})
Alias for: add_column_family
build_serialized_value(type, value) click to toggle source

Serialize an object in a given type

# File lib/big_record/connection_adapters/hbase_adapter.rb, line 295
# Prefixes +value+ with a one-character type header.
#
# type  - object responding to +chr+; the result of +chr+ becomes the header
# value - string payload appended after the header
#
# Returns the header followed by the payload.
def build_serialized_value(type, value)
  header = type.chr
  header + value
end
configuration() click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 65
# Returns a clone of @config (made with +clone+, so nested objects are shared
# with the original).
def configuration
  snapshot = @config.clone
  snapshot
end
create_table(table_name, options = {}) { |table_definition| ... } click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 227
# Creates a table from a block-built definition.
#
# table_name - name of the table to create
# options    - when options[:force] is truthy and the table already exists,
#              the table is dropped first
#
# Yields a new HbaseAdapterTable to the block (if one is given) so the caller
# can declare column families. The driver call runs inside +log+; the returned
# value is the driver's result (this relies on +log+ returning the value of
# its block).
def create_table(table_name, options = {})
  definition = HbaseAdapterTable.new
  yield definition if block_given?

  drop_table(table_name) if options[:force] && table_exists?(table_name)

  statement = "CREATE TABLE #{table_name} (#{definition.column_families_list});"
  log(statement) { @connection.create_table(table_name, definition.to_adapter_format) }
end
delete(table_name, row, timestamp = nil) click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 185
# Deletes a row.
#
# table_name - table holding the row
# row        - row key
# timestamp  - passed to the driver; when nil/false it defaults to
#              Time.now.to_bigrecord_timestamp
#
# The driver call runs inside +log+; the returned value is the driver's
# result (this relies on +log+ returning the value of its block).
def delete(table_name, row, timestamp = nil)
  effective_timestamp = timestamp || Time.now.to_bigrecord_timestamp

  log("DELETE FROM #{table_name} WHERE ROW=#{row};") do
    @connection.delete(table_name, row, effective_timestamp)
  end
end
deserialize(str) click to toggle source

Deserialize the given string. This method supports both the pure YAML format and the type header format.

# File lib/big_record/connection_adapters/hbase_adapter.rb, line 301
# Deserializes a stored value.
#
# Two formats are supported:
# * pure YAML (the old serialization code, and what +serialize+ still emits
#   for non-string values);
# * the type header format, handled by +deserialize_with_header+.
#
# A value is treated as YAML when it starts with the YAML document marker.
# Both "--- " and "---\n" are accepted: the latter is how documents whose root
# is a sequence or mapping begin when they are emitted by Psych, and it cannot
# collide with the header format handled here as long as the header byte is a
# small type code rather than "-".
#
# str - the serialized string, or nil
#
# Returns nil for nil input, otherwise the deserialized value.
def deserialize(str)
  return unless str

  if str.start_with?("--- ", "---\n")
    # NOTE(review): YAML.load on stored data can instantiate objects on Ruby
    # versions where it is not safe by default; consider YAML.safe_load if the
    # store may contain untrusted values.
    YAML.load(str)
  else
    deserialize_with_header(str)
  end
end
deserialize_with_header(data) click to toggle source

Deserialize the given string assumed to be in the type header format.

# File lib/big_record/connection_adapters/hbase_adapter.rb, line 315
# Deserializes a string assumed to be in the type header format: one leading
# byte holding the type code, followed by the payload.
#
# data - the serialized string, or nil
#
# Returns:
# * nil when +data+ is nil or shorter than two characters (so a header with an
#   empty payload reads back as nil);
# * nil for TYPE_NULL;
# * the payload (everything after the first character) for TYPE_STRING and
#   TYPE_BINARY;
# * +data+ unchanged for any other type code.
def deserialize_with_header(data)
  return unless data && data.size >= 2

  # The type is encoded in the first byte. Read it as an Integer with getbyte:
  # data[0] is a one-character String on Ruby >= 1.9 and would never match
  # the integer TYPE_* codes (the header is written with type.chr).
  type = data.getbyte(0)

  case type
  when TYPE_NULL then nil
  when TYPE_STRING, TYPE_BINARY then data[1..-1]
  else data
  end
end
disconnect!() click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 90
# Closes the driver connection on a best-effort basis.
#
# Returns the result of @connection.close, or nil when closing raises a
# StandardError (the error is deliberately ignored).
def disconnect!
  begin
    @connection.close
  rescue StandardError
    nil
  end
end
drop_table(table_name) click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 243
# Drops a table.
#
# The driver call runs inside +log+; the returned value is the driver's
# result (this relies on +log+ returning the value of its block).
def drop_table(table_name)
  log("DROP TABLE #{table_name};") { @connection.drop_table(table_name) }
end
get(table_name, row, column, options={}) click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 121
# Reads one column of a row and deserializes it.
#
# Fetches the raw value with +get_raw+. When the raw result is an Array each
# element is deserialized individually; otherwise the single value is
# deserialized.
def get(table_name, row, column, options={})
  raw = get_raw(table_name, row, column, options)
  return raw.map { |element| deserialize(element) } if raw.is_a?(Array)

  deserialize(raw)
end
get_all_schema_versions() click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 215
# Lists the "attribute:version" value of every row of the schema migrations
# table (the table name comes from BigRecord::Migrator).
def get_all_schema_versions
  version_column = "attribute:version"
  table = BigRecord::Migrator.schema_migrations_table_name

  rows = get_consecutive_rows(table, nil, nil, [version_column])
  rows.map { |row| row[version_column] }
end
get_columns(table_name, row, columns, options={}) click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 140
# Reads several columns of a row and deserializes them.
#
# Fetches the raw columns with +get_columns_raw+. Returns nil when nothing
# comes back; otherwise a Hash keyed like the raw result in which every value
# is deserialized except the one stored under 'id', which is kept as-is.
def get_columns(table_name, row, columns, options={})
  raw_columns = get_columns_raw(table_name, row, columns, options)
  return nil unless raw_columns

  raw_columns.inject({}) do |decoded, (key, col)|
    decoded[key] = key == 'id' ? col : deserialize(col)
    decoded
  end
end
get_columns_raw(table_name, row, columns, options={}) click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 132
# Reads several columns of a row without deserializing them.
#
# columns - must respond to +join+ (used to build the log line)
#
# The driver call runs inside +log+; the returned value is the driver's
# result (this relies on +log+ returning the value of its block).
def get_columns_raw(table_name, row, columns, options={})
  statement = "SELECT (#{columns.join(", ")}) FROM #{table_name} WHERE ROW=#{row};"

  log(statement) { @connection.get_columns(table_name, row, columns, options) }
end
get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil) click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 164
# Scans consecutive rows and deserializes their columns.
#
# Fetches the raw rows with +get_consecutive_rows_raw+ and returns one Hash
# per row. Every value is deserialized except the one stored under 'id',
# which is kept as-is.
#
# A column whose value cannot be deserialized is reported on stdout and left
# out of the row's Hash; the scan continues. Only StandardError is treated
# that way, so Interrupt, SystemExit, NoMemoryError and the like propagate to
# the caller instead of being silently swallowed.
def get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil)
  rows = get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row)

  rows.map do |row_cols|
    cols = {}
    row_cols.each do |key, col|
      begin
        cols[key] = key == 'id' ? col : deserialize(col)
      rescue StandardError
        puts "Could not load column value #{key} for row=#{row_cols['id']}"
      end
    end
    cols
  end
end
get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row = nil) click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 156
# Scans consecutive rows without deserializing them.
#
# columns - must respond to +join+ (used to build the log line)
#
# The driver call runs inside +log+; the returned value is the driver's
# result (this relies on +log+ returning the value of its block).
def get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row = nil)
  statement = "SCAN (#{columns.join(", ")}) FROM #{table_name} WHERE START_ROW=#{start_row} AND STOP_ROW=#{stop_row} LIMIT=#{limit};"

  log(statement) do
    @connection.get_consecutive_rows(table_name, start_row, limit, columns, stop_row)
  end
end
get_raw(table_name, row, column, options={}) click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 113
# Reads one column of a row without deserializing it.
#
# The driver call runs inside +log+; the returned value is the driver's
# result (this relies on +log+ returning the value of its block).
def get_raw(table_name, row, column, options={})
  statement = "SELECT (#{column}) FROM #{table_name} WHERE ROW=#{row};"

  log(statement) { @connection.get(table_name, row, column, options) }
end
initialize_schema_migrations_table() click to toggle source

SCHEMA STATEMENTS ========================================

# File lib/big_record/connection_adapters/hbase_adapter.rb, line 205
# Creates the schema migrations table when it does not exist yet.
#
# The table name comes from BigRecord::Migrator; the table gets a single
# :attribute family declared with :versions => 1.
#
# Returns nil when the table already exists, otherwise the result of
# +create_table+.
def initialize_schema_migrations_table
  migrations_table = BigRecord::Migrator.schema_migrations_table_name
  return if table_exists?(migrations_table)

  create_table(migrations_table) do |definition|
    definition.family :attribute, :versions => 1
  end
end
modify_column_family(table_name, column_name, options = {}) click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 273
# Modifies a column family of a table.
#
# table_name  - name of the table to alter
# column_name - family name; converted with +to_s+ for the descriptor
# options     - passed unchanged to BigRecord::Driver::ColumnDescriptor.new
#
# The driver call runs inside +log+; the returned value is the driver's
# result (this relies on +log+ returning the value of its block).
def modify_column_family(table_name, column_name, options = {})
  family = BigRecord::Driver::ColumnDescriptor.new(column_name.to_s, options)
  statement = "MODIFY COLUMN TABLE #{table_name} COLUMN #{column_name} (#{options.inspect});"

  log(statement) { @connection.modify_column(table_name, family) }
end
Also aliased as: modify_family
modify_family(table_name, column_name, options = {})
Alias for: modify_column_family
reconnect!() click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 85
# Re-establishes the connection: calls +disconnect!+ and then +connect+.
# Returns the result of +connect+.
def reconnect!
  disconnect!
  connect
end
remove_column_family(table_name, column_name) click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 263
# Removes a column family from a table.
#
# The driver call runs inside +log+; the returned value is the driver's
# result (this relies on +log+ returning the value of its block).
def remove_column_family(table_name, column_name)
  statement = "REMOVE COLUMN TABLE #{table_name} COLUMN #{column_name};"

  log(statement) { @connection.remove_column(table_name, column_name) }
end
Also aliased as: remove_family
remove_family(table_name, column_name)
Alias for: remove_column_family
serialize(value) click to toggle source

Serialize the given value

# File lib/big_record/connection_adapters/hbase_adapter.rb, line 286
# Serializes a value for storage.
#
# * nil     -> the NULL constant
# * String  -> type header format, built by +build_serialized_value+ with
#              TYPE_STRING
# * other   -> YAML, via +to_yaml+
def serialize(value)
  return NULL if NilClass === value
  return build_serialized_value(TYPE_STRING, value) if String === value

  value.to_yaml
end
table_exists?(table_name) click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 221
# Asks the driver whether a table exists.
#
# The driver call runs inside +log+ and this method returns whatever +log+
# returns.
def table_exists?(table_name)
  statement = "TABLE EXISTS? #{table_name};"

  log(statement) { @connection.table_exists?(table_name) }
end
truncate_table(table_name) click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 194
# Truncates a table.
#
# The driver call runs inside +log+; the returned value is the driver's
# result (this relies on +log+ returning the value of its block).
def truncate_table(table_name)
  log("TRUNCATE TABLE #{table_name}") { @connection.truncate_table(table_name) }
end
update(table_name, row, values, timestamp) click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 105
# Writes column values to a row after serializing them.
#
# values - enumerable of column/value pairs; each value goes through
#          +serialize+ and the resulting Hash is handed to +update_raw+
#
# Returns the result of +update_raw+.
def update(table_name, row, values, timestamp)
  serialized = values.inject({}) do |collected, (column, value)|
    collected[column] = serialize(value)
    collected
  end

  update_raw(table_name, row, serialized, timestamp)
end
update_raw(table_name, row, values, timestamp) click to toggle source

DATABASE STATEMENTS ======================================

# File lib/big_record/connection_adapters/hbase_adapter.rb, line 97
# Writes already-serialized column values to a row.
#
# values - passed unchanged to the driver; its +inspect+ output is included
#          in the log line unless it is nil/false
#
# The driver call runs inside +log+; the returned value is the driver's
# result (this relies on +log+ returning the value of its block).
def update_raw(table_name, row, values, timestamp)
  rendered_values = values.inspect if values
  statement = "UPDATE #{table_name} SET #{rendered_values} WHERE ROW=#{row};"

  log(statement) { @connection.update(table_name, row, values, timestamp) }
end

Protected Instance Methods

format_log_entry(message, dump = nil) click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 374
# Formats one log line.
#
# message - leading part of the entry
# dump    - optional detail appended after the message
#
# Without BigRecord::Base.colorize_logging the result is simply
# "message  dump". With it, the message and the dump are wrapped in ANSI
# escape sequences whose colours alternate from one call to the next; the
# alternation state is kept in @@row_even. A String dump is rendered with %s,
# anything else with %p.
def format_log_entry(message, dump = nil)
  return "%s  %s" % [message, dump] unless BigRecord::Base.colorize_logging

  # Flip the stripe state first, then pick the colours for the previous state.
  was_even = @@row_even
  @@row_even = !was_even
  message_color, dump_color = was_even ? ["4;36;1", "0;1"] : ["4;35;1", "0"]

  entry = "  \e[#{message_color}m#{message}\e[0m   "
  if dump
    directive = String === dump ? 's' : 'p'
    entry << "\e[#{dump_color}m%#{directive}\e[0m" % dump
  end
  entry
end
log(str, name = nil) { || ... } click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 338
# Logs a statement, optionally timing the block that executes it.
#
# str  - statement text
# name - optional label forwarded to +log_info+
#
# Without a block the statement is logged with a runtime of 0 and nil is
# returned. With a block the block's value is returned; when a logger is set
# and its level is at or below Logger::INFO the block is timed, the elapsed
# seconds are added to @runtime and the statement is logged with that runtime.
#
# Any exception raised along the way is logged together with the statement
# and then re-raised.
def log(str, name = nil)
  unless block_given?
    log_info(str, name, 0)
    return nil
  end

  return yield unless @logger && @logger.level <= Logger::INFO

  outcome = nil
  elapsed = Benchmark.realtime { outcome = yield }
  @runtime += elapsed
  log_info(str, name, elapsed)
  outcome
rescue Exception => e
  # Log the failure and re-raise it.
  # Reset last_verification to 0 so that the connection gets verified
  # upon reentering the request loop.
  @last_verification = 0
  message = "#{e.class.name}: #{e.message}: #{str}"
  log_info(message, name, 0)
  raise e
end
log_info(str, name, runtime) click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 363
# Sends one formatted entry to the logger at debug level.
#
# str     - statement text; runs of spaces are collapsed to a single space
# name    - label for the entry; "HBASE" is used when it is nil
# runtime - number rendered with "%f" next to the label
#
# Does nothing (returns nil) when no logger is set.
def log_info(str, name, runtime)
  return unless @logger

  label = name.nil? ? "HBASE" : name
  heading = "#{label} (#{format("%f", runtime)})"
  @logger.debug(format_log_entry(heading, str.squeeze(" ")))
end

Private Instance Methods

connect() click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 330
# Configures the driver connection with @config.
#
# Raises BigRecord::ConnectionFailed, naming the :drb_host and :drb_port
# entries of @config, when the driver call raises DRb::DRbConnError.
def connect
  @connection.configure(@config)
rescue DRb::DRbConnError
  host = @config[:drb_host]
  port = @config[:drb_port]
  raise BigRecord::ConnectionFailed, "Failed to connect to the DRb server (jruby) at #{host}:#{port}."
end