Importing a MySQL table into Hive with Sqoop

Sqoop (SQL-to-Hadoop) is a very useful tool for importing SQL data (MySQL, PostgreSQL, Teradata, Oracle, etc.) into Hadoop, or exporting it back out.

It’s widely used in the ecosystem, since basic use cases usually start with offloading data from an RDBMS to Hive.

Let’s walk through a quick example of importing a MySQL table into Hive using the HDP 2.4 sandbox.

First, let’s create an employees table in MySQL:

mysql> create database db_source;
mysql> use db_source;
mysql> CREATE TABLE employees (
-> id mediumint(8) unsigned NOT NULL auto_increment,
-> name varchar(255) default NULL,
-> email varchar(255) default NULL,
-> PRIMARY KEY (id)
-> ) AUTO_INCREMENT=1;

mysql> INSERT INTO employees (name,email) VALUES ("Ann","elit@massa.ca"),("Hyacinth","non@scelerisquesedsapien.com"),("Preston","erat.vel@tellusfaucibusleo.co.uk"),("Quentin","arcu@lectuspedeultrices.org"),("Iola","eu.odio.tristique@dui.net"),("Pamela","quis.urna@imperdietnec.com"),("Rae","faucibus@Inlorem.co.uk"),("Olympia","ante.iaculis@etmalesuadafames.com"),("Uriah","Vivamus.molestie@felisadipiscing.org"),("Yeo","tempor.est@orciin.co.uk");

mysql> INSERT INTO employees (name,email) VALUES ("Zorita","et@nibhDonec.org"),("Constance","malesuada@utquam.co.uk"),("Gisela","non@fermentumconvallis.net"),("Galena","luctus.Curabitur.egestas@Morbiaccumsan.edu"),("Petra","euismod.ac@velitQuisquevarius.org"),("Laurel","dolor@a.co.uk"),("Iona","elit.pharetra.ut@lobortis.org"),("Nola","ut.sem@convallisconvallisdolor.edu"),("Emily","ipsum.Phasellus.vitae@ridiculusmusProin.ca"),("Scarlett","tristique@nonenim.net");
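We now have 20 rows; a quick sanity check in MySQL before importing (the count should come back as 20):

mysql> SELECT COUNT(*) FROM employees;
+----------+
| COUNT(*) |
+----------+
|       20 |
+----------+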

Now let’s try to import that data into Hive! Without any specific parameters, Sqoop will look for the table’s primary key and build a MapReduce job that splits on that key to parallelize the transfer, but you can specify which column Sqoop uses with the --split-by parameter.
You can also transfer only specific columns with --columns, specify the target directory (if you don’t want the table to be in the default warehouse dir) with --target-dir, etc.
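For example, a variant using those flags might look like this (a sketch; /user/root/employees_subset is an arbitrary example path, and without --hive-import the result lands as plain files in HDFS rather than a Hive table):

[root@sandbox ~]# sqoop import \
--connect jdbc:mysql://localhost:3306/db_source \
--username root \
--table employees \
--columns "id,name" \
--split-by id \
--target-dir /user/root/employees_subset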

All parameters are listed in the Sqoop documentation.

[root@sandbox ~]# sqoop import \
--connect jdbc:mysql://localhost:3306/db_source \
--username root \
--table employees \
--fields-terminated-by "," \
--hive-import \
--create-hive-table \
--hive-table employees

On first execution, you may hit an error message like:

16/10/23 13:48:11 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
16/10/23 13:48:12 ERROR manager.SqlManager: Error reading from database: java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@38009ade is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@38009ade is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:934)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:931)
at com.mysql.jdbc.MysqlIO.checkForOutstandingStreamingData(MysqlIO.java:2735)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1899)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2151)

This is most likely caused by an old version of the mysql-connector JAR. Upgrade to a 5.1.30+ version (it should be available in /usr/share/java); if you already have that version in /usr/share/java, then make the Hadoop lib a symbolic link to it.

You can find your mysql-connector versions with md5sum:

[root@sandbox ~]# find / -xdev -type f -name "mysql-connector*jar" | xargs md5sum

To make a symbolic link:

[root@sandbox ~]# rm -rf /usr/hdp/2.4.0.0-169/hadoop/lib/mysql-connector-java.jar
[root@sandbox ~]# ln -s /usr/share/java/mysql-connector-java-5.1.31-bin.jar /usr/hdp/2.4.0.0-169/hadoop/lib/mysql-connector-java.jar
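You can then check that the link points at the right JAR (the ls output should show an arrow to the 5.1.31 file):

[root@sandbox ~]# ls -l /usr/hdp/2.4.0.0-169/hadoop/lib/mysql-connector-java.jar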

A quick tip to check which version is actually inside a .jar file:

[root@sandbox ~]# unzip -p /usr/share/java/mysql-connector-java.jar META-INF/INDEX.LIST
JarIndex-Version: 1.0

mysql-connector-java-5.1.17-SNAPSHOT-bin.jar
[...]

Now, if we relaunch the Sqoop import, we should see a MapReduce job running:

[root@sandbox ~]# sqoop import \
--connect jdbc:mysql://localhost:3306/db_source \
--username root \
--table employees \
--fields-terminated-by "," \
--hive-import \
--create-hive-table \
--hive-table employees
16/10/23 14:03:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
16/10/23 14:03:48 WARN manager.MySQLManager: It looks like you are importing from mysql.
16/10/23 14:03:48 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
16/10/23 14:03:48 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
16/10/23 14:03:48 INFO mapreduce.ImportJobBase: Beginning import of employees

Note: you can see the query that selects the boundaries, which determines how the splits are made to parallelize the import.
Here it sees ids going from 1 to 20 and decides (by default) to use 4 splits, so the first map task will copy ids 1 to 5, the second ids 6 to 10, and so on (a sketch of the per-split queries follows the job output).

16/10/23 14:04:10 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`id`), MAX(`id`) FROM `employees`
16/10/23 14:04:11 INFO mapreduce.JobSubmitter: number of splits:4
16/10/23 14:04:20 INFO impl.YarnClientImpl: Submitted application application_1477220352113_0004
16/10/23 14:04:20 INFO mapreduce.Job: The url to track the job: http://sandbox.hortonworks.com:8088/proxy/application_1477220352113_0004/
16/10/23 14:05:35 INFO mapreduce.Job: map 0% reduce 0%
16/10/23 14:06:24 INFO mapreduce.Job: map 100% reduce 0%
16/10/23 14:06:33 INFO mapreduce.Job: Job job_1477220352113_0004 completed successfully
16/10/23 14:06:34 INFO mapreduce.ImportJobBase: Retrieved 20 records.
16/10/23 14:06:35 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
16/10/23 14:06:35 INFO hive.HiveImport: Loading uploaded data into Hive

Time taken: 62.339 seconds
Loading data to table default.employees
Table default.employees stats: [numFiles=4, totalSize=746]
OK
Time taken: 41.574 seconds
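
Under the hood, each of the 4 mappers runs a range query against MySQL based on those boundaries. Roughly, they look like this (a sketch for illustration; Sqoop’s actual generated split conditions may differ slightly):

SELECT `id`, `name`, `email` FROM `employees` WHERE `id` >= 1 AND `id` <= 5
SELECT `id`, `name`, `email` FROM `employees` WHERE `id` >= 6 AND `id` <= 10
SELECT `id`, `name`, `email` FROM `employees` WHERE `id` >= 11 AND `id` <= 15
SELECT `id`, `name`, `email` FROM `employees` WHERE `id` >= 16 AND `id` <= 20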

Nice! Let’s check the import:

hive> show create table employees;
CREATE TABLE `employees`(
`id` bigint,
`name` string,
`email` string)
COMMENT 'Imported by sqoop on 2016/10/23 14:06:35'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/employees'
TBLPROPERTIES (
'COLUMN_STATS_ACCURATE'='true',
'numFiles'='4',
'totalSize'='746',
'transient_lastDdlTime'='1477231712')

So we can see that we have 4 files (corresponding to the 4 mappers), the table is named employees, and it sits in the default warehouse. We can list those files directly on HDFS, as shown below.
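A minimal check, using the LOCATION shown in the DDL above (you should see four part-m-0000* files):

[root@sandbox ~]# hdfs dfs -ls /apps/hive/warehouse/employees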

hive> select count(*) from employees;
20
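
And a quick peek at a few rows (row order may vary since there is no ORDER BY; ids 1 to 3 come from the first INSERT above):

hive> select * from employees limit 3;
1	Ann	elit@massa.ca
2	Hyacinth	non@scelerisquesedsapien.com
3	Preston	erat.vel@tellusfaucibusleo.co.uk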

So, what do you think?