
importing a MySQL table into Hive with Sqoop

Sqoop (SQL to Hadoop) is a very useful tool for moving SQL data (MySQL, PostgreSQL, Teradata, Oracle, etc.) into and out of Hadoop.

It's widely used in the ecosystem, since basic use cases usually start with offloading data from an RDBMS into Hive.

Let's walk through a quick example of importing a MySQL table into Hive using the HDP 2.4 sandbox.

First, let's create an employees table in MySQL:

mysql> create database db_source;
mysql> use db_source;
mysql> CREATE TABLE employees (
-> id mediumint(8) unsigned NOT NULL auto_increment,
-> name varchar(255) default NULL,
-> email varchar(255) default NULL,
-> PRIMARY KEY (id)
-> ) AUTO_INCREMENT=1;

mysql> INSERT INTO employees (name,email) VALUES ("Ann","elit@massa.ca"),("Hyacinth","non@scelerisquesedsapien.com"),("Preston","erat.vel@tellusfaucibusleo.co.uk"),("Quentin","arcu@lectuspedeultrices.org"),("Iola","eu.odio.tristique@dui.net"),("Pamela","quis.urna@imperdietnec.com"),("Rae","faucibus@Inlorem.co.uk"),("Olympia","ante.iaculis@etmalesuadafames.com"),("Uriah","Vivamus.molestie@felisadipiscing.org"),("Yeo","tempor.est@orciin.co.uk");

mysql> INSERT INTO employees (name,email) VALUES ("Zorita","et@nibhDonec.org"),("Constance","malesuada@utquam.co.uk"),("Gisela","non@fermentumconvallis.net"),("Galena","luctus.Curabitur.egestas@Morbiaccumsan.edu"),("Petra","euismod.ac@velitQuisquevarius.org"),("Laurel","dolor@a.co.uk"),("Iona","elit.pharetra.ut@lobortis.org"),("Nola","ut.sem@convallisconvallisdolor.edu"),("Emily","ipsum.Phasellus.vitae@ridiculusmusProin.ca"),("Scarlett","tristique@nonenim.net");
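
A quick optional sanity check that the 20 rows are in place:

mysql> SELECT COUNT(*) FROM employees;    -- should return 20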

Now let's try to import that data into Hive! Without any specific parameters, Sqoop looks for the table's primary key and builds a MapReduce job that splits on that key to parallelize the transfer; you can choose the split column yourself with the --split-by parameter.
You can also transfer only specific columns with --columns, set the target directory (if you don't want the files in the default warehouse dir) with --target-dir, and so on; a sketch using these options follows.
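
For instance, here is a sketch of a plain HDFS import combining those options (the column list and the /tmp/employees_csv target directory are only illustrative choices, not part of the run below):

[root@sandbox ~]# sqoop import \
--connect jdbc:mysql://localhost:3306/db_source \
--username root \
--table employees \
--columns "id,name" \
--split-by id \
--target-dir /tmp/employees_csv  # illustrative path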

All parameters can be found in the Sqoop documentation. For our import into Hive, the full command is:

[root@sandbox ~]# sqoop import \
--connect jdbc:mysql://localhost:3306/db_source \
--username root \
--table employees \
--fields-terminated-by "," \
--hive-import \
--create-hive-table \
--hive-table employees
 

On first execution, you may hit an error message like this:

16/10/23 13:48:11 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
16/10/23 13:48:12 ERROR manager.SqlManager: Error reading from database: java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@38009ade is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@38009ade is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:934)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:931)
at com.mysql.jdbc.MysqlIO.checkForOutstandingStreamingData(MysqlIO.java:2735)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1899)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2151)
 

That is most likely caused by an old version of the MySQL JDBC connector. Upgrade to a 5.1.30+ version (it should be available in /usr/share/java); if you already have that version in /usr/share/java, then simply make the Hadoop lib jar a symbolic link to it.

You can locate the mysql-connector jars on your system (and compare them with md5sum):

[root@sandbox ~]# find / -xdev -type f -name "mysql-connector*jar" | xargs md5sum
 

To make a symbolic link:

[root@sandbox ~]# rm -rf /usr/hdp/2.4.0.0-169/hadoop/lib/mysql-connector-java.jar
[root@sandbox ~]# ln -s /usr/share/java/mysql-connector-java-5.1.31-bin.jar /usr/hdp/2.4.0.0-169/hadoop/lib/mysql-connector-java.jar
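
A quick way to confirm the link now points where you expect:

[root@sandbox ~]# ls -l /usr/hdp/2.4.0.0-169/hadoop/lib/mysql-connector-java.jar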

Quick tip on how to check which version is actually bundled inside a .jar file:

[root@sandbox ~]# unzip -p /usr/share/java/mysql-connector-java.jar META-INF/INDEX.LIST
JarIndex-Version: 1.0

mysql-connector-java-5.1.17-SNAPSHOT-bin.jar
[...]

Now, if we relaunch the Sqoop import, we should see a MapReduce job running:

[root@sandbox ~]# sqoop import \
--connect jdbc:mysql://localhost:3306/db_source \
--username root \
--table employees \
--fields-terminated-by "," \
--hive-import \
--create-hive-table \
--hive-table employees
16/10/23 14:03:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
16/10/23 14:03:48 WARN manager.MySQLManager: It looks like you are importing from mysql.
16/10/23 14:03:48 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
16/10/23 14:03:48 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
16/10/23 14:03:48 INFO mapreduce.ImportJobBase: Beginning import of employees

Comment: you can see the query that selects the boundaries, which determines the splits used to parallelize the import.
Here it sees ids going from 1 to 20 and decides (by default) to use 4 splits, so the first map task copies ids 1 to 5, the second 6 to 10, and so on.

16/10/23 14:04:10 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`id`), MAX(`id`) FROM `employees`
16/10/23 14:04:11 INFO mapreduce.JobSubmitter: number of splits:4
16/10/23 14:04:20 INFO impl.YarnClientImpl: Submitted application application_1477220352113_0004
16/10/23 14:04:20 INFO mapreduce.Job: The url to track the job: http://sandbox.hortonworks.com:8088/proxy/application_1477220352113_0004/
16/10/23 14:05:35 INFO mapreduce.Job: map 0% reduce 0%
16/10/23 14:06:24 INFO mapreduce.Job: map 100% reduce 0%
16/10/23 14:06:33 INFO mapreduce.Job: Job job_1477220352113_0004 completed successfully
16/10/23 14:06:34 INFO mapreduce.ImportJobBase: Retrieved 20 records.
16/10/23 14:06:35 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
16/10/23 14:06:35 INFO hive.HiveImport: Loading uploaded data into Hive

Time taken: 62.339 seconds
Loading data to table default.employees
Table default.employees stats: [numFiles=4, totalSize=746]
OK
Time taken: 41.574 seconds
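
Each mapper then pulls its own id range, roughly equivalent to running queries like these directly against MySQL (illustrative only, not Sqoop's exact generated SQL); the default of 4 mappers can be changed with --num-mappers:

mysql> SELECT id, name, email FROM employees WHERE id >= 1 AND id <= 5;   -- roughly what mapper 1 covers
mysql> SELECT id, name, email FROM employees WHERE id >= 6 AND id <= 10;  -- mapper 2, and so on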

Nice! Let's check the import:

hive> show create table employees;
CREATE TABLE `employees`(
`id` bigint,
`name` string,
`email` string)
COMMENT 'Imported by sqoop on 2016/10/23 14:06:35'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/employees'
TBLPROPERTIES (
'COLUMN_STATS_ACCURATE'='true',
'numFiles'='4',
'totalSize'='746',
'transient_lastDdlTime'='1477231712')
 

So we can see that we have 4 files (one per mapper), the table is named employees and it sits in the default warehouse.

hive> select count(*) from employees;
20
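
You can also look at the files behind the table on HDFS, using the LOCATION shown above; you should see four part-m-* files, one per mapper:

[root@sandbox ~]# hdfs dfs -ls /apps/hive/warehouse/employees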
 

 


HBase sample table

Let's create a simple HBase table from scratch!

There are many ways of creating an HBase table and populating it: bulk load, the HBase shell, Hive with the HBaseStorageHandler, etc.
Here we're going to use the ImportTsv class, which parses a .tsv file and inserts its rows into an existing HBase table.

First, let's grab some data!

Download access.tsv to any machine of your cluster: this is a 2 GB gzipped file of sample tab-separated data, with the columns rowkey, date, refer-url and http-code. Unzip it and put it on HDFS:

[root@sandbox ~]# gunzip access.tsv.gz
[root@sandbox ~]# hdfs dfs -copyFromLocal ./access.tsv /tmp/

Now we have to create the table in the HBase shell; it will contain only one column family for this example:

[root@sandbox ~]# hbase shell
hbase(main):001:0> create 'access_demo','cf1'
0 row(s) in 14.2610 seconds

Now start the import with the ad hoc class, listing the columns in order (don't forget HBASE_ROW_KEY, which can be any of the columns; here it's the first one).
The syntax is: hbase JAVA_CLASS -DPARAMETERS TABLE_NAME FILE

Notice that you can specify the tsv separator with '-Dimporttsv.separator=,' and that you can of course use several column families, e.g. cf1:field1,cf1:field2,cf2:field3,cf2:field4 (a sketch of such a variant follows the job output below).

[root@sandbox ~]# hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,cf1:date,cf1:refer-url,cf1:http-code access_demo /tmp/access.tsv

2015-05-21 19:55:38,144 INFO [main] mapreduce.Job: Job job_1432235700898_0002 running in uber mode : false
2015-05-21 19:55:38,151 INFO [main] mapreduce.Job: map 0% reduce 0%
2015-05-21 19:56:00,718 INFO [main] mapreduce.Job: map 7% reduce 0%
2015-05-21 19:56:03,742 INFO [main] mapreduce.Job: map 21% reduce 0%
2015-05-21 19:56:06,785 INFO [main] mapreduce.Job: map 65% reduce 0%
2015-05-21 19:56:10,846 INFO [main] mapreduce.Job: map 95% reduce 0%
2015-05-21 19:56:11,855 INFO [main] mapreduce.Job: map 100% reduce 0%
2015-05-21 19:56:13,948 INFO [main] mapreduce.Job: Job job_1432235700898_0002 completed successfully
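
As mentioned earlier, a comma-separated file with two column families would be loaded the same way. A purely hypothetical sketch (some_table would need both cf1 and cf2 to exist, and the file name is made up):

[root@sandbox ~]# hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
'-Dimporttsv.separator=,' \
-Dimporttsv.columns=HBASE_ROW_KEY,cf1:field1,cf1:field2,cf2:field3,cf2:field4 \
some_table /tmp/some_file.csv  # hypothetical table and file names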

Let's check:

[root@sandbox ~]# hbase shell
hbase(main):001:0> list
TABLE
access_demo
iemployee
sales_data
3 row(s) in 9.7180 seconds

=> ["access_demo", "iemployee", "sales_data"]
hbase(main):002:0> scan 'access_demo'
ROW COLUMN+CELL
# rowkey column=cf1:date, timestamp=1432238079103, value=date
# rowkey column=cf1:http-code, timestamp=1432238079103, value=http-code
# rowkey column=cf1:refer-url, timestamp=1432238079103, value=refer-url
74.201.80.25/san-rafael-ca/events/sho column=cf1:date, timestamp=1432238079103, value=2008-01-25 16:20:50
w/80343522-eckhart-tolle
74.201.80.25/san-rafael-ca/events/sho column=cf1:http-code, timestamp=1432238079103, value=200
w/80343522-eckhart-tolle
74.201.80.25/san-rafael-ca/events/sho column=cf1:refer-url, timestamp=1432238079103, value=www.google.com/search
w/80343522-eckhart-tolle
calendar.boston.com/ column=cf1:date, timestamp=1432238079103, value=2008-01-25 19:35:50
calendar.boston.com/ column=cf1:http-code, timestamp=1432238079103, value=200
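
For a quick row count from the shell (count scans the whole table, so it can be slow on big ones):

hbase(main):003:0> count 'access_demo'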

This is it!


sample table in Hive

When you want to test some “real” stuff, it can be useful to have a Hive table with some “big” data in it.

Let's build a table based on Wikipedia traffic stats. It will contain page counts for every page since 2008, which also lets us play with partitions.

These stats are available on dumps.wikimedia.org (see the URL in the download loop below), so let's grab those files first.
That's a lot of files; here we're getting only the 2008/01 files, which are each between 20 and 30 MB.

I installed axel, a download accelerator, available in the EPEL repo or directly as an RPM:

[vagrant@gw ~]$ sudo rpm -ivh http://pkgs.repoforge.org/axel/axel-2.4-1.el6.rf.x86_64.rpm
[vagrant@gw pagecounts]$ for YEAR in {2008..2008}; do
  for MONTH in {01..01}; do
    for DAY in {01..08}; do
      for HOUR in {00..23}; do
        axel -a http://dumps.wikimedia.org/other/pagecounts-raw/${YEAR}/${YEAR}-${MONTH}/pagecounts-${YEAR}${MONTH}${DAY}-${HOUR}0000.gz
      done
    done
  done
done

These files contain lines like:

fr.b Special:Recherche/Achille_Baraguey_d%5C%27Hilliers 1 624
fr.b Special:Recherche/Acteurs_et_actrices_N 1 739
fr.b Special:Recherche/Agrippa_d/%27Aubign%C3%A9 1 743

fr.b is the project name (French books), followed by the title of the page, the number of requests and finally the size of the content returned.
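
If you want to peek at one of the downloaded files yourself, the filename simply follows the pattern from the download loop above:

[vagrant@gw pagecounts]$ zcat pagecounts-20080101-000000.gz | head -3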

Let's copy that data to HDFS:

[vagrant@gw pagecounts]$ for DAY in {01..08}; do
sudo -u hdfs hadoop fs -mkdir -p /apps/hive/warehouse/pagecounts/dt=2008-01-${DAY};
sudo -u hdfs hadoop fs -put pagecounts-200801${DAY}-* /apps/hive/warehouse/pagecounts/dt=2008-01-${DAY};
done
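
A quick listing to confirm the layout (one directory per dt partition):

[vagrant@gw pagecounts]$ sudo -u hdfs hadoop fs -ls /apps/hive/warehouse/pagecounts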

Now we can create the Hive table

hive> CREATE EXTERNAL TABLE pagecounts (project String, title String, requests Int, size Int) PARTITIONED BY (dt String) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LINES TERMINATED BY '\n' STORED AS TEXTFILE;

Then you can add the partitions manually:
hive> ALTER TABLE pagecounts ADD IF NOT EXISTS PARTITION(dt="2008-01-01") LOCATION "/apps/hive/warehouse/pagecounts/dt=2008-01-01";

or let Hive do it for you automagically:

hive> MSCK REPAIR TABLE pagecounts;
OK
Partitions not in metastore: pagecounts:dt=2008-01-02 pagecounts:dt=2008-01-03 pagecounts:dt=2008-01-04 pagecounts:dt=2008-01-05 pagecounts:dt=2008-01-06 pagecounts:dt=2008-01-07 pagecounts:dt=2008-01-08 pagecounts:dt=2008-01-09 pagecounts:dt=2008-01-10
Repair: Added partition to metastore pagecounts:dt=2008-01-02
Repair: Added partition to metastore pagecounts:dt=2008-01-03
Repair: Added partition to metastore pagecounts:dt=2008-01-04
Repair: Added partition to metastore pagecounts:dt=2008-01-05
Repair: Added partition to metastore pagecounts:dt=2008-01-06
Repair: Added partition to metastore pagecounts:dt=2008-01-07
Repair: Added partition to metastore pagecounts:dt=2008-01-08
Repair: Added partition to metastore pagecounts:dt=2008-01-09
Repair: Added partition to metastore pagecounts:dt=2008-01-10
Time taken: 0.762 seconds, Fetched: 9 row(s)
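
To check that everything is wired up, a couple of quick sanity queries (the exact counts will depend on which files you downloaded):

hive> SHOW PARTITIONS pagecounts;
hive> SELECT dt, COUNT(*) FROM pagecounts GROUP BY dt;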