class BigRecord::ConnectionAdapters::HbaseRestAdapter
Constants
- CHARSET
string charset
- LOST_CONNECTION_ERROR_MESSAGES
- NULL
utility constants
- TYPE_BINARY
- TYPE_BOOLEAN
- TYPE_NULL
data types
- TYPE_STRING
Public Class Methods
new(connection, logger, connection_options, config)
click to toggle source
Calls superclass method
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 47
# Builds the adapter: passes the connection and logger to the superclass,
# stores the connection options and the configuration, then calls #connect.
def initialize(connection, logger, connection_options, config)
  super(connection, logger)
  @connection_options = connection_options
  @config = config
  connect
end
Public Instance Methods
active?()
click to toggle source
CONNECTION MANAGEMENT ====================================
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 68
# Always reports the connection as active.
def active?
  true
end
add_column_family(table_name, column_name, options = {})
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 255
# No-op in this adapter: the body is empty, so the call returns nil.
def add_column_family(table_name, column_name, options = {})
end
Also aliased as: add_family
build_serialized_value(type, value)
click to toggle source
Serialize a value by prefixing it with the character for the given type code.
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 280
# Prefixes +value+ with the character obtained from +type.chr+ and returns
# the resulting string.
def build_serialized_value(type, value)
  header = type.chr
  header + value
end
columns_to_hbase_format(data = {})
click to toggle source
DATABASE STATEMENTS ======================================
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 81
# Converts a column => content mapping into the hbase-ruby format: an array
# of hashes with a :name (the column converted with to_s) and a :value.
#
# The former trailing +compact+ was removed: the block always returns a
# Hash, so there was never a nil to strip.
#
# TODO: Add this function to hbase-ruby instead.
def columns_to_hbase_format(data = {})
  data.map { |col, content| { :name => col.to_s, :value => content } }
end
configuration()
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 54
# Returns a clone of the stored configuration rather than the stored object
# itself.
def configuration
  snapshot = @config.clone
  snapshot
end
create_table(table_name, options = {}) { |table_definition| ... }
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 231
# Creates +table_name+. A fresh HbaseRestAdapterTable is yielded to the
# optional block so the caller can describe the table. When options[:force]
# is set and the table already exists, it is dropped first. Returns whatever
# the connection's create_table call returns.
def create_table(table_name, options = {})
  definition = HbaseRestAdapterTable.new
  yield definition if block_given?

  drop_table(table_name) if options[:force] && table_exists?(table_name)

  outcome = nil
  log("CREATE TABLE #{table_name} (#{definition.column_families_list});") do
    outcome = @connection.create_table(table_name, *definition.to_adapter_format)
  end
  outcome
end
delete(table_name, row, timestamp = nil)
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 194
# Deletes +row+ from +table_name+ through the connection's delete_row call.
# When no timestamp is given, Time.now.to_bigrecord_timestamp is used.
# Returns the result of delete_row.
def delete(table_name, row, timestamp = nil)
  timestamp = Time.now.to_bigrecord_timestamp unless timestamp

  outcome = nil
  log("DELETE FROM #{table_name} WHERE ROW=#{row};") do
    outcome = @connection.delete_row(table_name, row, timestamp)
  end
  outcome
end
deserialize(str)
click to toggle source
Deserialize the given string. This method supports both the pure YAML format and the type header format.
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 286
# Deserializes +str+, returning nil when +str+ is nil.
#
# Stays compatible with the old serialization code: YAML documents start
# with "--- ", so a value beginning with that sequence is treated as
# serialized YAML; anything else is handed to #deserialize_with_header.
def deserialize(str)
  return unless str

  if str[0..3] == "--- "
    # NOTE(review): on Psych versions before 4, YAML.load can build arbitrary
    # Ruby objects. Confirm the stored values are trusted, or move to a
    # restricted loader.
    YAML::load(str)
  else
    deserialize_with_header(str)
  end
end
deserialize_with_header(data)
click to toggle source
Deserialize the given string assumed to be in the type header format.
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 300
# Deserializes +data+, assumed to be in the type header format: the first
# byte encodes the type and the remainder is the payload.
#
# Returns nil for nil input, for input shorter than two bytes, for TYPE_NULL
# and for unrecognised type codes. Returns the payload (everything after the
# first byte) for TYPE_STRING and TYPE_BINARY.
#
# NOTE(review): because of the size guard, a serialized empty string (header
# byte only) comes back as nil - confirm whether that is intended.
def deserialize_with_header(data)
  return unless data && data.size >= 2

  # data[0] is an Integer on Ruby 1.8 but a one-character String on 1.9+,
  # which never matches the type codes below. unpack('C') gives the numeric
  # byte on every version.
  # Assumes the TYPE_* constants are Integer byte codes (the original
  # comparison against data[0] only works under that assumption).
  type = data.unpack('C').first

  case type
  when TYPE_NULL then nil
  when TYPE_STRING, TYPE_BINARY then data[1..-1]
  else nil
  end
end
disconnect!()
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 75
# No-op in this adapter: the body is empty, so the call returns nil.
def disconnect!
end
drop_table(table_name)
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 247
# Drops +table_name+ through the connection's destroy_table call and returns
# that call's result.
def drop_table(table_name)
  outcome = nil
  log("DROP TABLE #{table_name};") { outcome = @connection.destroy_table(table_name) }
  outcome
end
get(table_name, row, column, options={})
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 118
# Fetches a column through #get_raw and deserializes what comes back. An
# Array result is deserialized element by element; anything else is
# deserialized as a single value.
def get(table_name, row, column, options={})
  raw = get_raw(table_name, row, column, options)
  return raw.map { |entry| deserialize(entry) } if raw.is_a?(Array)

  deserialize(raw)
end
get_all_schema_versions()
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 219
# Scans the schema migrations table (name taken from BigRecord::Migrator)
# and returns the "attribute:version" value of every row.
def get_all_schema_versions
  version_column = "attribute:version"
  sm_table = BigRecord::Migrator.schema_migrations_table_name
  rows = get_consecutive_rows(sm_table, nil, nil, [version_column])
  rows.map { |entry| entry[version_column] }
end
get_columns(table_name, row, columns, options={})
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 143
# Fetches the requested columns of +row+ through #get_columns_raw and returns
# a hash of deserialized values. The 'id' entry is copied as-is. Returns nil
# when #get_columns_raw returns nothing.
#
# A column whose value cannot be deserialized is reported on stdout and left
# out of the result (best effort).
def get_columns(table_name, row, columns, options={})
  row_cols = get_columns_raw(table_name, row, columns, options)
  return nil unless row_cols

  result = {}
  row_cols.each do |key, value|
    begin
      result[key] = (key == 'id') ? value : deserialize(value)
    rescue StandardError, ScriptError
      # ScriptError is included because some YAML parser versions raise a
      # SyntaxError subclass. Interrupt and SystemExit are deliberately no
      # longer swallowed.
      # +row+ is the row key passed in by the caller, not a row object, so it
      # is interpolated directly (calling row.name here raised from inside
      # the handler).
      puts "Could not load column value #{key} for row=#{row}"
    end
  end
  result
end
get_columns_raw(table_name, row, columns = nil, options={})
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 129
# Fetches +row+ from +table_name+ through the connection's show_row call and
# returns a hash of raw (still serialized) values: 'id' holds the fetched
# row's name and every returned column is stored under its own name.
#
# +columns+ may be nil (the default). It is passed to show_row unchanged and
# only wrapped with Array() for the log message, which used to raise on nil.
# options[:timestamp], when present, is forwarded to show_row.
def get_columns_raw(table_name, row, columns = nil, options={})
  result = {}
  timestamp = options[:timestamp] || nil # a false timestamp is normalised to nil
  log "SELECT (#{Array(columns).join(", ")}) FROM #{table_name} WHERE ROW=#{row};" do
    # Separate local names: the +row+ and +columns+ arguments are left alone.
    fetched = @connection.show_row(table_name, row, timestamp, columns, options)
    result['id'] = fetched.name
    fetched.columns.each { |col| result[col.name] = col.value }
  end
  result
end
get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil)
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 174
# Scans rows through #get_consecutive_rows_raw and returns one hash per row:
# 'id' holds the row's name and each column name maps to its deserialized
# value.
#
# A column whose value cannot be deserialized is reported on stdout and left
# out of that row's hash (best effort).
def get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil)
  rows = get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row)
  rows.map do |row|
    cols = { 'id' => row.name }
    row.columns.each do |col|
      key = col.name
      value = col.value
      begin
        cols[key] = deserialize(value)
      rescue StandardError, ScriptError
        # ScriptError is included because some YAML parser versions raise a
        # SyntaxError subclass. Interrupt and SystemExit are deliberately no
        # longer swallowed.
        puts "Could not load column value #{key} for row=#{row.name}"
      end
    end
    cols
  end
end
get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row = nil)
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 163
# Opens a scanner on +table_name+ (start/end row and columns as given, batch
# size 200), reads up to +limit+ rows through the connection's get_rows call
# and returns them undeserialized.
#
# The scanner is closed in an +ensure+ so it is released even when get_rows
# raises. +columns+ is wrapped with Array() for the log message only, so a
# nil value no longer raises there.
def get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row = nil)
  result = nil
  log "SCAN (#{Array(columns).join(", ")}) FROM #{table_name} WHERE START_ROW=#{start_row} AND STOP_ROW=#{stop_row} LIMIT=#{limit};" do
    options = { :start_row => start_row, :end_row => stop_row, :columns => columns, :batch => 200 }
    scanner = @connection.open_scanner(table_name, options)
    begin
      result = @connection.get_rows(scanner, limit)
    ensure
      @connection.close_scanner(scanner)
    end
  end
  result
end
get_raw(table_name, row, column, options={})
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 107
# Fetches +column+ of +row+ through the connection's show_row call without
# deserializing. When exactly one column comes back its value is returned
# directly; otherwise an array of all returned values is returned.
def get_raw(table_name, row, column, options={})
  timestamp = options[:timestamp] || nil
  value = nil
  log("SELECT (#{column}) FROM #{table_name} WHERE ROW=#{row};") do
    cells = @connection.show_row(table_name, row, timestamp, column, options).columns
    value = if cells.size == 1
      cells.first.value
    else
      cells.map(&:value)
    end
  end
  value
end
initialize_schema_migrations_table()
click to toggle source
SCHEMA STATEMENTS ========================================
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 209
# Creates the schema migrations table (name taken from BigRecord::Migrator)
# with a single :attribute family keeping one version, unless the table
# already exists.
def initialize_schema_migrations_table
  sm_table = BigRecord::Migrator.schema_migrations_table_name
  return if table_exists?(sm_table)

  create_table(sm_table) { |t| t.family :attribute, :versions => 1 }
end
modify_column_family(table_name, column_name, options = {})
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 265
# No-op in this adapter: the body is empty, so the call returns nil.
def modify_column_family(table_name, column_name, options = {})
end
Also aliased as: modify_family
reconnect!()
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 72
# No-op in this adapter: the body is empty, so the call returns nil.
def reconnect!
end
remove_column_family(table_name, column_name)
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 260
# No-op in this adapter: the body is empty, so the call returns nil.
def remove_column_family(table_name, column_name)
end
Also aliased as: remove_family
serialize(value)
click to toggle source
Serialize the given value
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 271
# Serializes +value+ for storage: nil becomes the NULL constant, a String is
# prefixed with the TYPE_STRING header via #build_serialized_value, and any
# other object is dumped with to_yaml.
def serialize(value)
  serialized =
    case value
    when nil
      NULL
    when String
      build_serialized_value(TYPE_STRING, value)
    else
      value.to_yaml
    end
  serialized
end
table_exists?(table_name)
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 225
# Tells whether +table_name+ is among the names of the tables listed by the
# connection. The value is whatever #log returns for the block.
def table_exists?(table_name)
  log("TABLE EXISTS? #{table_name};") do
    known_names = @connection.list_tables.map { |table| table.name }
    known_names.include?(table_name)
  end
end
truncate_table(table_name)
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 203
# No-op in this adapter: the body is empty, so the call returns nil.
def truncate_table(table_name)
end
update(table_name, row, values, timestamp)
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 99
# Serializes every value of +values+ with #serialize and writes the resulting
# column => serialized value hash through #update_raw.
def update(table_name, row, values, timestamp)
  serialized = values.inject({}) do |acc, (column, value)|
    acc[column] = serialize(value)
    acc
  end
  update_raw(table_name, row, serialized, timestamp)
end
update_raw(table_name, row, values, timestamp)
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 87
# Writes already-serialized +values+ to +row+ of +table_name+ through the
# connection's create_row call. When no timestamp is given,
# Time.now.to_bigrecord_timestamp is used.
#
# Returns the result of create_row. Previously +result+ was declared but
# never assigned, so the method always returned nil, unlike #delete,
# #drop_table and #create_table.
def update_raw(table_name, row, values, timestamp)
  result = nil
  columns = columns_to_hbase_format(values)
  timestamp ||= Time.now.to_bigrecord_timestamp
  log "UPDATE #{table_name} SET #{values.inspect if values} WHERE ROW=#{row};" do
    result = @connection.create_row(table_name, row, timestamp, columns)
  end
  result
end
Protected Instance Methods
format_log_entry(message, dump = nil)
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 357
# Builds one log line from +message+ and an optional +dump+.
#
# Without BigRecord::Base.colorize_logging the two are simply joined with a
# space. With it, ANSI colour codes are added and the colour pair alternates
# on every call through the @@row_even flag. A String dump is formatted with
# %s, any other dump with %p.
def format_log_entry(message, dump = nil)
  return "%s %s" % [message, dump] unless BigRecord::Base.colorize_logging

  # Flip the flag first, then pick the colours from its previous state.
  was_even = @@row_even
  @@row_even = !was_even
  message_color, dump_color = was_even ? ["4;36;1", "0;1"] : ["4;35;1", "0"]

  log_entry = " \e[#{message_color}m#{message}\e[0m "
  if dump
    directive = String === dump ? 's' : 'p'
    log_entry << ("\e[#{dump_color}m%#{directive}\e[0m" % dump)
  end
  log_entry
end
log(str, name = nil) { || ... }
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 321
# Logs +str+ and, when a block is given, runs it and returns its value.
#
# * No block: the message is logged with a runtime of 0 and nil is returned.
# * Block, with a logger whose level is INFO or lower: the block is timed,
#   the elapsed seconds are added to @runtime and logged with the message.
# * Block otherwise: the block is simply run.
#
# Any exception is logged together with +str+ and then re-raised.
def log(str, name = nil)
  unless block_given?
    log_info(str, name, 0)
    return nil
  end

  return yield unless @logger && @logger.level <= Logger::INFO

  result = nil
  seconds = Benchmark.realtime { result = yield }
  @runtime += seconds
  log_info(str, name, seconds)
  result
rescue Exception => e
  # Log message and raise exception.
  # Set last_verification to 0, so that connection gets verified
  # upon reentering the request loop
  @last_verification = 0
  message = "#{e.class.name}: #{e.message}: #{str}"
  log_info(message, name, 0)
  raise e
end
log_info(str, name, runtime)
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 346
# Sends one formatted entry to the logger at debug level; does nothing when
# there is no logger. The entry header is +name+ (or "HBASE" when name is
# nil) followed by the runtime in parentheses; runs of spaces in +str+ are
# collapsed to a single space.
def log_info(str, name, runtime)
  return unless @logger

  label = name.nil? ? "HBASE" : name
  header = "#{label} (#{sprintf("%f", runtime)})"
  body = str.gsub(/ +/, " ")
  @logger.debug(format_log_entry(header, body))
end
Private Instance Methods
connect()
click to toggle source
# File lib/big_record/connection_adapters/hbase_rest_adapter.rb, line 316
# No-op in this adapter: the body is empty, so the call returns nil.
def connect
end