Spark on HBase with Spark shell

A few minor adjustments are needed to access HBase tables from a Spark context.

Let’s first quickly create a sample HBase table “t1” with 40 rows:


[root@sandbox ~]# cat hbase_load.txt
create 't1', 'f1'
for i in '1'..'10' do \
for j in '1'..'2' do \
for k in '1'..'2' do \
rnd=(0...64).map { (65 + rand(26)).chr }.join
put 't1', "#{i}-#{j}-#{k}", "f1:#{j}#{k}", "#{rnd}"
end \
end \
end
[root@sandbox ~]# cat hbase_load.txt |hbase shell
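As a quick sanity check on the figure of 40, the loops can be replayed in plain Ruby, without any HBase connection. This is only a sketch: sample_cells is a hypothetical helper that collects what the script would put, instead of writing to HBase.

```ruby
# Plain Ruby sketch (no HBase needed): replay the loops of hbase_load.txt
# and collect the cells the script would write with `put`.
# `sample_cells` is a hypothetical helper name, not part of the HBase shell.
def sample_cells
  cells = []
  ('1'..'10').each do |i|
    ('1'..'2').each do |j|
      ('1'..'2').each do |k|
        # 64 random uppercase letters, as in the load script
        rnd = (0...64).map { (65 + rand(26)).chr }.join
        cells << { row: "#{i}-#{j}-#{k}", column: "f1:#{j}#{k}", value: rnd }
      end
    end
  end
  cells
end

cells = sample_cells
puts "cells written : #{cells.length}"
puts "distinct rows : #{cells.map { |c| c[:row] }.uniq.length}"
puts "first cell    : #{cells.first[:row]} / #{cells.first[:column]}"
```

Every row key i-j-k is unique, so the table ends up with 40 rows holding one cell each.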

You need to adjust your Spark classpath (Guava 14 is needed, so I included the first one I found):

[root@sandbox ~]# export SPARK_CLASSPATH=/usr/hdp/current/spark-client/lib/hbase-common.jar:/usr/hdp/current/spark-client/lib/hbase-client.jar:/usr/hdp/current/spark-client/lib/hbase-protocol.jar:/usr/hdp/current/spark-client/lib/hbase-server.jar:/etc/hbase/conf:/usr/hdp/

[root@sandbox ~]# spark-shell --master yarn-client

As a side note, SPARK_CLASSPATH is deprecated in Spark 1.5.x+, so you should use this instead:
[root@sandbox ~]# spark-shell --master yarn-client --driver-class-path=/usr/hdp/current/spark-client/lib/hbase-common.jar:/usr/hdp/current/spark-client/lib/hbase-client.jar:/usr/hdp/current/spark-client/lib/hbase-protocol.jar:/usr/hdp/current/spark-client/lib/hbase-hadoop2-compat.jar:/usr/hdp/current/spark-client/lib/hbase-server.jar:/etc/hbase/conf:/usr/hdp/

I ran into bugs with that second form ([…] Caused by: java.lang.IllegalStateException: unread block data), so I went back to the first version (using SPARK_CLASSPATH).
Now it’s time for some Scala!

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val tableName = "t1"
// Picks up hbase-site.xml from the classpath (/etc/hbase/conf above)
val hconf = HBaseConfiguration.create()
hconf.set(TableInputFormat.INPUT_TABLE, tableName)

// Each record is a (row key, Result) pair read through TableInputFormat
val hBaseRDD = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
println("Number of Records found : " + hBaseRDD.count())

2016-04-07 18:44:40,553 INFO [main] scheduler.DAGScheduler: Job 0 finished: count at <console>:30, took 2.092481 s
Number of Records found : 40
If you want to use the HBase admin API to list tables, take snapshots, or run any other admin-related operation, use:
scala> val admin = new HBaseAdmin(hconf)

And if you want to create a table, start with its descriptor:
val tableDesc = new HTableDescriptor(tableName)

HBase sample table

Let’s create a simple HBase table from scratch!

There are many ways to create an HBase table and populate it: bulk load, hbase shell, Hive with HBaseStorageHandler, etc.
Here we’ll use the ImportTsv class, which parses a .tsv file and inserts its content into an existing HBase table.

First, let’s grab some data!

Download access.tsv to any machine in your cluster: it comes as a 2 GB gzipped file of sample tab-separated data with the columns rowkey, date, refer-url and http-code. Unzip it and put it on HDFS:

[root@sandbox ~]# gunzip access.tsv.gz
[root@sandbox ~]# hdfs dfs -copyFromLocal ./access.tsv /tmp/

Now we have to create the table in the HBase shell; it will contain only one column family for this example:

[root@sandbox ~]# hbase shell
hbase(main):001:0> create 'access_demo','cf1'
0 row(s) in 14.2610 seconds

Then start the import with the ImportTsv class and select the columns. Don’t forget HBASE_ROW_KEY: it can be any of the columns; here it is the first one.

Note that you can specify another separator with ‘-Dimporttsv.separator=,’ and that you can of course spread the fields over several column families, e.g. cf1:field1,cf1:field2,cf2:field3,cf2:field4 (each family must exist in the target table).

[root@sandbox ~]# hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,cf1:date,cf1:refer-url,cf1:http-code access_demo /tmp/access.tsv

2015-05-21 19:55:38,144 INFO [main] mapreduce.Job: Job job_1432235700898_0002 running in uber mode : false
2015-05-21 19:55:38,151 INFO [main] mapreduce.Job: map 0% reduce 0%
2015-05-21 19:56:00,718 INFO [main] mapreduce.Job: map 7% reduce 0%
2015-05-21 19:56:03,742 INFO [main] mapreduce.Job: map 21% reduce 0%
2015-05-21 19:56:06,785 INFO [main] mapreduce.Job: map 65% reduce 0%
2015-05-21 19:56:10,846 INFO [main] mapreduce.Job: map 95% reduce 0%
2015-05-21 19:56:11,855 INFO [main] mapreduce.Job: map 100% reduce 0%
2015-05-21 19:56:13,948 INFO [main] mapreduce.Job: Job job_1432235700898_0002 completed successfully
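
To make the column mapping more concrete, here is a simplified model in plain Ruby of how one TSV line is turned into a row key and a set of cells according to the importtsv.columns value. It is a sketch, not ImportTsv’s actual code: map_tsv_line is a hypothetical helper and the sample line is made up for illustration.

```ruby
# Simplified model of the importtsv.columns mapping (not ImportTsv's real code).
# `map_tsv_line` is a hypothetical helper; the sample line below is made up.
def map_tsv_line(line, columns_spec, separator = "\t")
  columns = columns_spec.split(',')
  fields  = line.chomp.split(separator, -1)
  unless fields.length == columns.length
    raise ArgumentError, 'field count does not match importtsv.columns'
  end

  key_index = columns.index('HBASE_ROW_KEY')
  raise ArgumentError, 'HBASE_ROW_KEY is missing' if key_index.nil?

  # Every column except the row key becomes a family:qualifier => value cell
  cells = {}
  columns.each_with_index do |column, idx|
    cells[column] = fields[idx] unless idx == key_index
  end
  { row: fields[key_index], cells: cells }
end

spec = 'HBASE_ROW_KEY,cf1:date,cf1:refer-url,cf1:http-code'
line = "w/some-page\t2008-01-25 16:20:50\thttp://example.com/\t200"
p map_tsv_line(line, spec)
```

Moving HBASE_ROW_KEY to another position in the spec simply picks the row key from that field instead of the first one.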

Let’s check:

[root@sandbox ~]# hbase shell
hbase(main):001:0> list
3 row(s) in 9.7180 seconds

=> ["access_demo", "iemployee", "sales_data"]
hbase(main):002:0> scan 'access_demo'
# rowkey column=cf1:date, timestamp=1432238079103, value=date
# rowkey column=cf1:http-code, timestamp=1432238079103, value=http-code
# rowkey column=cf1:refer-url, timestamp=1432238079103, value=refer-url
 column=cf1:date, timestamp=1432238079103, value=2008-01-25 16:20:50
w/80343522-eckhart-tolle column=cf1:http-code, timestamp=1432238079103, value=200
w/80343522-eckhart-tolle column=cf1:refer-url, timestamp=1432238079103,
w/80343522-eckhart-tolle column=cf1:date, timestamp=1432238079103, value=2008-01-25 19:35:50
 column=cf1:http-code, timestamp=1432238079103, value=200

That’s it!

HBase regions merge

HBase writes data to multiple servers, called Region Servers.

Each region server hosts one or several regions, and data is distributed across these regions; HBase decides which region server serves which region(s).

The number of regions can be set when the table is created:

[hbase@gw vagrant]$ kinit -kt /etc/security/keytabs/hbase.headless.keytab hbase
[hbase@gw vagrant]$ hbase shell
hbase(main):001:0> create 'table2', 'columnfamily1', {NUMREGIONS => 5, SPLITALGO => 'HexStringSplit'}

We determined beforehand that 5 regions would be appropriate, given the number of region servers and the desired region size. Two basic split algorithms are supplied, HexStringSplit and UniformSplit (but you can add your own).
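
To give an idea of what a hex-based split does, the following plain Ruby sketch cuts the 8-digit hexadecimal key space 00000000..FFFFFFFF into equal slices and prints the boundaries. It only approximates the idea behind HexStringSplit under that assumption; hex_split_points is a hypothetical helper, not HBase code.

```ruby
# Sketch of an even split of the 8-hex-digit key space.
# Assumption: keys range from 00000000 to FFFFFFFF and are cut into equal slices.
# `hex_split_points` is a hypothetical helper, not an HBase API.
def hex_split_points(num_regions, first = 0x00000000, last = 0xFFFFFFFF)
  range = last - first + 1
  step  = range / num_regions
  # n regions are delimited by n - 1 split points
  (1...num_regions).map { |i| format('%08x', first + step * i) }
end

puts hex_split_points(5).inspect
# → ["33333333", "66666666", "99999999", "cccccccc"]
```

With 5 regions, the first region covers keys below 33333333, the last one keys from cccccccc upwards.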

You can also provide your own split points (n split points give n + 1 regions, so 4 regions here):

hbase(main):001:0> create 'table2', 'columnfamily1', {SPLITS => ['a', 'b', 'c']}

So table2 has been created with our 5 regions. Let’s go to the HBase web UI to see what it looks like:

We do have our 5 regions and can see how the keys are distributed. The region names follow the pattern table_name,start_key,timestamp.ENCODED_REGIONNAME.

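Now, if we want to merge regions, we can use the merge_region command in the hbase shell.
The regions have to be adjacent (merge_region accepts true as a third argument to force the merge of non-adjacent regions).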
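
The adjacency rule can be pictured with a toy model in plain Ruby, where a region is just a pair of start and end keys. This is a sketch of the bookkeeping only: the helper names are hypothetical, the split points are assumed values for a 5-region table, and nothing here talks to HBase.

```ruby
# Toy model: a region is a [start_key, end_key] pair, '' meaning "open-ended".
# All helper names are hypothetical; this mimics the bookkeeping, not HBase itself.
def regions_from_splits(split_points)
  ([''] + split_points + ['']).each_cons(2).to_a
end

# True when `left` ends exactly where `right` starts.
def follows?(left, right)
  !left[1].empty? && left[1] == right[0]
end

def adjacent?(a, b)
  follows?(a, b) || follows?(b, a)
end

def merge(a, b)
  raise ArgumentError, 'regions are not adjacent' unless adjacent?(a, b)
  left, right = follows?(a, b) ? [a, b] : [b, a]
  [left[0], right[1]]
end

# Assumed split points for a table with 5 regions
regions = regions_from_splits(%w[33333333 66666666 99999999 cccccccc])
puts "regions: #{regions.length}"
puts "0 and 1 adjacent? #{adjacent?(regions[0], regions[1])}"
puts "0 and 2 adjacent? #{adjacent?(regions[0], regions[2])}"
# Merge neighbours one pair at a time until a single region is left
puts "after merging everything: #{regions.reduce { |acc, r| merge(acc, r) }.inspect}"
```

Merging neighbours pair by pair eventually leaves a single region whose start and end keys are both open.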

hbase(main):010:0> merge_region '234a12e83e203f2e3158c39e1da6b6e7', '89dd2d5a88e1b2b9787e3254b85b91d3'
0 row(s) in 0.0140 seconds


Note that the resulting region gets a new ENCODED_REGIONNAME.

hbase(main):012:0> merge_region 'bfad503057fca37bd60b5a83109f7dc6','e37d7ab5513e06268459c76d5e7335e4'
0 row(s) in 0.0040 seconds

Finally, let’s merge all the remaining regions (some HBase versions only accept two regions per merge_region call; there, repeat the pairwise merge instead)!

hbase(main):013:0> merge_region '0f5fc22bf0beacbf83c1ad562324c778','af6d7af861f577ba456cff88bf5e5e38','3f1e029afd907bc62f5e5fb8b6e1b5cf'
0 row(s) in 0.0290 seconds

Then we can see that only one region remains:



For the record, you can create a pre-split HBase table if you know how your keys are distributed: either by passing SPLITS, or by providing a SPLITS_FILE that contains the split points (so number of lines = number of regions - 1).
Mind the argument order: putting SPLITS_FILE before {…} won’t work.
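
The “lines = regions - 1” rule can be checked with a small plain Ruby sketch that reads a splits file (one split point per line) and lists the key ranges it delimits. The helper ranges_from_splits_file is hypothetical and the file is a temporary one created for the example; HBase itself is not involved.

```ruby
require 'tempfile'

# Hypothetical helper: read split points (one per line), as in a SPLITS_FILE,
# and return the [start, end) key ranges they delimit (nil = open-ended).
def ranges_from_splits_file(path)
  points = File.readlines(path, chomp: true).reject(&:empty?)
  ([nil] + points + [nil]).each_cons(2).to_a
end

Tempfile.create('splits') do |f|
  f.write("a\nb\nc\n")
  f.flush
  ranges = ranges_from_splits_file(f.path)
  puts "regions: #{ranges.length}"
  ranges.each { |lo, hi| puts "[#{lo || 'start'}, #{hi || 'end'})" }
end
```

Three split points a, b and c therefore give four regions: everything before a, a to b, b to c, and everything from c onwards.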

[hbase@gw vagrant]$ printf 'a\nb\nc\n' > /tmp/splits.txt
[hbase@gw vagrant]$ kinit -kt /etc/security/keytabs/hbase.headless.keytab hbase
[hbase@gw vagrant]$ hbase shell
hbase(main):011:0> create 'test_split', { NAME=> 'cf', VERSIONS => 1, TTL => 69200 }, SPLITS_FILE => '/tmp/splits.txt'

And the result :