class BigRecord::ConnectionAdapters::HbaseAdapter
Constants
Public Class Methods
new(connection, logger, connection_options, config)
click to toggle source
TRUE = "001" FALSE = "000"
Calls superclass method
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 58
# Builds the adapter: forwards the connection and logger to the superclass,
# stores the connection options and configuration, then calls #connect.
def initialize(connection, logger, connection_options, config)
  super(connection, logger)
  @connection_options = connection_options
  @config = config
  connect
end
Public Instance Methods
active?()
click to toggle source
CONNECTION MANAGEMENT ====================================
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 79
# Pings the underlying connection.
# Returns the value of @connection.ping, or false when the ping raises
# BigRecord::Driver::DriverError.
def active?
  begin
    @connection.ping
  rescue BigRecord::Driver::DriverError
    false
  end
end
add_column_family(table_name, column_name, options = {})
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 251
# Wraps the column name and options in a BigRecord::Driver::ColumnDescriptor
# and asks the connection to add it to +table_name+, logging the statement.
# Returns whatever @connection.add_column returns.
def add_column_family(table_name, column_name, options = {})
  descriptor = BigRecord::Driver::ColumnDescriptor.new(column_name.to_s, options)
  outcome = nil
  log("ADD COLUMN TABLE #{table_name} COLUMN #{column_name} (#{options.inspect});") do
    outcome = @connection.add_column(table_name, descriptor)
  end
  outcome
end
Also aliased as: add_family
build_serialized_value(type, value)
click to toggle source
Serialize an object in a given type
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 295
# Prefixes +value+ with the character form of +type+ (type.chr) and returns
# the concatenation.
def build_serialized_value(type, value)
  header = type.chr
  header + value
end
configuration()
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 65
# Returns a clone of the configuration given at construction time, so the
# caller does not receive the adapter's own @config object.
def configuration
  @config.clone
end
create_table(table_name, options = {}) { |table_definition| ... }
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 227
# Creates +table_name+ from an HbaseAdapterTable definition.
# The definition is yielded to the optional block so the caller can populate
# it. With options[:force] set, an existing table of the same name is
# dropped first. Returns whatever @connection.create_table returns.
def create_table(table_name, options = {})
  definition = HbaseAdapterTable.new
  yield definition if block_given?

  drop_table(table_name) if options[:force] && table_exists?(table_name)

  outcome = nil
  log("CREATE TABLE #{table_name} (#{definition.column_families_list});") do
    outcome = @connection.create_table(table_name, definition.to_adapter_format)
  end
  outcome
end
delete(table_name, row, timestamp = nil)
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 185
# Deletes +row+ from +table_name+ through the connection, logging the
# statement. When no timestamp is supplied, Time.now.to_bigrecord_timestamp
# is used. Returns whatever @connection.delete returns.
def delete(table_name, row, timestamp = nil)
  timestamp = Time.now.to_bigrecord_timestamp unless timestamp
  outcome = nil
  log("DELETE FROM #{table_name} WHERE ROW=#{row};") do
    outcome = @connection.delete(table_name, row, timestamp)
  end
  outcome
end
deserialize(str)
click to toggle source
Deserialize the given string. This method supports both the pure YAML format and the type header format.
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 301
# Decodes a stored value. Returns nil for a nil/false input.
#
# Values written by the old serialization code are plain YAML documents,
# which start with "--- "; anything else is treated as the newer format
# carrying a type header and is handed to #deserialize_with_header.
#
# NOTE(review): YAML::load may instantiate arbitrary objects depending on
# the Ruby/Psych version in use -- confirm the stored data is trusted.
def deserialize(str)
  return unless str

  legacy_yaml = str[0..3] == "--- "
  legacy_yaml ? YAML::load(str) : deserialize_with_header(str)
end
deserialize_with_header(data)
click to toggle source
Deserialize the given string assumed to be in the type header format.
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 315
# Decodes a value stored in the type header format, where the first element
# of +data+ identifies the type and the rest is the payload.
#
# Returns nil when +data+ is nil or shorter than two elements, nil for
# TYPE_NULL, the payload (data[1..-1]) for TYPE_STRING and TYPE_BINARY, and
# +data+ unchanged for any other header.
def deserialize_with_header(data)
  return unless data && data.size >= 2

  # String#[] with an integer index yields an Integer on Ruby 1.8 but a
  # one-character String on Ruby 1.9+. Comparing the raw header against the
  # TYPE_* constants therefore never matched on modern Rubies and the header
  # leaked into the result. Normalising both sides with #chr (which the
  # serializer already applies to these constants) makes the comparison work
  # whichever representation is in play.
  case data[0].chr
  when TYPE_NULL.chr   then nil
  when TYPE_STRING.chr then data[1..-1]
  when TYPE_BINARY.chr then data[1..-1]
  else data
  end
end
disconnect!()
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 90
# Closes the connection on a best-effort basis: any StandardError raised by
# @connection.close is swallowed and nil is returned instead.
def disconnect!
  @connection.close
rescue StandardError
  nil
end
drop_table(table_name)
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 243
# Drops +table_name+ through the connection, logging the statement.
# Returns whatever @connection.drop_table returns.
def drop_table(table_name)
  outcome = nil
  log("DROP TABLE #{table_name};") { outcome = @connection.drop_table(table_name) }
  outcome
end
get(table_name, row, column, options={})
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 121
# Reads a column through #get_raw and decodes it with #deserialize.
# When the raw result is an Array, each element is decoded and an Array is
# returned; otherwise the single decoded value is returned.
def get(table_name, row, column, options={})
  raw = get_raw(table_name, row, column, options)
  return raw.map { |item| deserialize(item) } if raw.is_a?(Array)

  deserialize(raw)
end
get_all_schema_versions()
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 215
# Scans the schema migrations table (name supplied by BigRecord::Migrator)
# for the "attribute:version" column and returns that column's value for
# every row returned by #get_consecutive_rows.
def get_all_schema_versions
  table = BigRecord::Migrator.schema_migrations_table_name
  rows = get_consecutive_rows(table, nil, nil, ["attribute:version"])
  rows.map { |row| row["attribute:version"] }
end
get_columns(table_name, row, columns, options={})
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 140
# Reads several columns of one row through #get_columns_raw and decodes them.
# Returns nil when the raw lookup yields nothing; otherwise a Hash with the
# same keys, where every value except the one under 'id' has been passed
# through #deserialize.
def get_columns(table_name, row, columns, options={})
  raw_columns = get_columns_raw(table_name, row, columns, options)
  return nil unless raw_columns

  raw_columns.each_with_object({}) do |(key, col), decoded|
    decoded[key] = key == 'id' ? col : deserialize(col)
  end
end
get_columns_raw(table_name, row, columns, options={})
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 132
# Fetches the given columns of one row straight from the connection, without
# decoding, and logs the statement. Returns whatever @connection.get_columns
# returns (the local starts out as an empty Hash).
def get_columns_raw(table_name, row, columns, options={})
  fetched = {}
  log("SELECT (#{columns.join(", ")}) FROM #{table_name} WHERE ROW=#{row};") do
    fetched = @connection.get_columns(table_name, row, columns, options)
  end
  fetched
end
get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil)
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 164
# Scans rows through #get_consecutive_rows_raw and decodes every column.
#
# Returns an Array with one Hash per row. The value under 'id' is copied
# as-is; every other value goes through #deserialize. A column whose value
# cannot be decoded is reported on standard output and left out of that
# row's Hash, so one bad cell does not abort the whole scan.
def get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil)
  rows = get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row)

  rows.map do |row_cols|
    row_cols.each_with_object({}) do |(key, col), cols|
      begin
        cols[key] = key == 'id' ? col : deserialize(col)
      rescue StandardError
        # Only ordinary decoding failures are tolerated. Rescuing Exception
        # here would also swallow Interrupt, SystemExit and NoMemoryError.
        puts "Could not load column value #{key} for row=#{row_cols['id']}"
      end
    end
  end
end
get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row = nil)
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 156
# Asks the connection for consecutive rows without decoding them, logging
# the scan. Returns whatever @connection.get_consecutive_rows returns.
def get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row = nil)
  scanned = nil
  log("SCAN (#{columns.join(", ")}) FROM #{table_name} WHERE START_ROW=#{start_row} AND STOP_ROW=#{stop_row} LIMIT=#{limit};") do
    scanned = @connection.get_consecutive_rows(table_name, start_row, limit, columns, stop_row)
  end
  scanned
end
get_raw(table_name, row, column, options={})
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 113
# Fetches one column of one row straight from the connection, without
# decoding, and logs the statement. Returns whatever @connection.get returns.
def get_raw(table_name, row, column, options={})
  fetched = nil
  log("SELECT (#{column}) FROM #{table_name} WHERE ROW=#{row};") do
    fetched = @connection.get(table_name, row, column, options)
  end
  fetched
end
initialize_schema_migrations_table()
click to toggle source
SCHEMA STATEMENTS ========================================
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 205
# Ensures the schema migrations table (name supplied by BigRecord::Migrator)
# exists. When it is missing, it is created with a single :attribute family
# configured with :versions => 1. Returns nil when the table already exists.
def initialize_schema_migrations_table
  table = BigRecord::Migrator.schema_migrations_table_name
  return if table_exists?(table)

  create_table(table) { |t| t.family :attribute, :versions => 1 }
end
modify_column_family(table_name, column_name, options = {})
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 273
# Wraps the column name and options in a BigRecord::Driver::ColumnDescriptor
# and asks the connection to modify that column on +table_name+, logging the
# statement. Returns whatever @connection.modify_column returns.
def modify_column_family(table_name, column_name, options = {})
  descriptor = BigRecord::Driver::ColumnDescriptor.new(column_name.to_s, options)
  outcome = nil
  log("MODIFY COLUMN TABLE #{table_name} COLUMN #{column_name} (#{options.inspect});") do
    outcome = @connection.modify_column(table_name, descriptor)
  end
  outcome
end
Also aliased as: modify_family
reconnect!()
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 85
# Re-establishes the connection: first #disconnect!, then #connect.
# Returns the value of #connect.
def reconnect!
  disconnect!
  connect
end
remove_column_family(table_name, column_name)
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 263
# Asks the connection to remove +column_name+ from +table_name+, logging the
# statement. Returns whatever @connection.remove_column returns.
def remove_column_family(table_name, column_name)
  outcome = nil
  log("REMOVE COLUMN TABLE #{table_name} COLUMN #{column_name};") do
    outcome = @connection.remove_column(table_name, column_name)
  end
  outcome
end
Also aliased as: remove_family
serialize(value)
click to toggle source
Serialize the given value
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 286
# Encodes +value+ for storage:
# * nil     -> the NULL constant
# * String  -> the string prefixed with the TYPE_STRING header
# * other   -> its YAML representation (value.to_yaml)
def serialize(value)
  return NULL if value.nil?
  return build_serialized_value(TYPE_STRING, value) if value.is_a?(String)

  value.to_yaml
end
table_exists?(table_name)
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 221
# Asks the connection whether +table_name+ exists, logging the check.
# Returns the value of the #log call wrapping @connection.table_exists?.
def table_exists?(table_name)
  log("TABLE EXISTS? #{table_name};") { @connection.table_exists?(table_name) }
end
truncate_table(table_name)
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 194
# Truncates +table_name+ through the connection, logging the statement.
# Returns whatever @connection.truncate_table returns.
def truncate_table(table_name)
  outcome = nil
  log("TRUNCATE TABLE #{table_name}") { outcome = @connection.truncate_table(table_name) }
  outcome
end
update(table_name, row, values, timestamp)
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 105
# Encodes every value of +values+ with #serialize (keys are kept unchanged)
# and writes the result through #update_raw. Returns #update_raw's value.
def update(table_name, row, values, timestamp)
  encoded = values.each_with_object({}) do |(column, value), acc|
    acc[column] = serialize(value)
  end
  update_raw(table_name, row, encoded, timestamp)
end
update_raw(table_name, row, values, timestamp)
click to toggle source
DATABASE STATEMENTS ======================================
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 97
# Writes already-encoded +values+ to +row+ of +table_name+ through the
# connection, logging the statement (the values are only inspected for the
# log when present). Returns whatever @connection.update returns.
def update_raw(table_name, row, values, timestamp)
  outcome = nil
  log("UPDATE #{table_name} SET #{values.inspect if values} WHERE ROW=#{row};") do
    outcome = @connection.update(table_name, row, values, timestamp)
  end
  outcome
end
Protected Instance Methods
format_log_entry(message, dump = nil)
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 374
# Builds the text of one log line from a +message+ and an optional +dump+.
#
# Without BigRecord::Base.colorize_logging the result is simply
# "message dump". With it, the message and dump are wrapped in ANSI escape
# sequences, and the colour pair alternates on every call via the
# @@row_even toggle. A String dump is rendered with %s, anything else
# with %p.
def format_log_entry(message, dump = nil)
  return "%s %s" % [message, dump] unless BigRecord::Base.colorize_logging

  if @@row_even
    message_color, dump_color = "4;36;1", "0;1"
  else
    message_color, dump_color = "4;35;1", "0"
  end
  # Flip the toggle so the next entry uses the other colour pair.
  @@row_even = !@@row_even

  entry = " \e[#{message_color}m#{message}\e[0m "
  if dump
    directive = dump.is_a?(String) ? 's' : 'p'
    entry << "\e[#{dump_color}m%#{directive}\e[0m" % dump
  end
  entry
end
log(str, name = nil) { || ... }
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 338
# Runs the given block and records +str+ through #log_info.
#
# * Without a block: logs +str+ with a runtime of 0 and returns nil.
# * With a block and a logger whose level is at most Logger::INFO: times the
#   block, adds the elapsed seconds to @runtime, logs, and returns the
#   block's value.
# * With a block otherwise: just returns the block's value.
#
# Any exception is logged together with +str+ and then re-raised.
def log(str, name = nil)
  unless block_given?
    log_info(str, name, 0)
    return nil
  end

  return yield unless @logger and @logger.level <= Logger::INFO

  result = nil
  seconds = Benchmark.realtime { result = yield }
  @runtime += seconds
  log_info(str, name, seconds)
  result
rescue Exception => e
  # Log message and raise exception.
  # Set last_verification to 0, so that connection gets verified
  # upon reentering the request loop
  @last_verification = 0
  message = "#{e.class.name}: #{e.message}: #{str}"
  log_info(message, name, 0)
  raise e
end
log_info(str, name, runtime)
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 363
# Sends one formatted entry to the logger at debug level; does nothing when
# no logger is set. The entry's heading is +name+ (or "HBASE" when name is
# nil) followed by the runtime rendered with "%f"; runs of spaces in +str+
# are collapsed to a single space before formatting.
def log_info(str, name, runtime)
  return unless @logger

  label = name.nil? ? "HBASE" : name
  heading = "#{label} (#{format('%f', runtime)})"
  @logger.debug(format_log_entry(heading, str.gsub(/ +/, " ")))
end
Private Instance Methods
connect()
click to toggle source
# File lib/big_record/connection_adapters/hbase_adapter.rb, line 330
# Configures the connection with @config and returns the result.
# A DRb::DRbConnError is translated into BigRecord::ConnectionFailed with a
# message naming the configured :drb_host and :drb_port.
def connect
  @connection.configure(@config)
rescue DRb::DRbConnError
  host = @config[:drb_host]
  port = @config[:drb_port]
  raise BigRecord::ConnectionFailed,
        "Failed to connect to the DRb server (jruby) at #{host}:#{port}."
end