importing a MySQL table to Hive with Sqoop

Sqoop (SQL to Hadoop) is a very useful tool for importing SQL data (MySQL, PostgreSQL, Teradata, Oracle, etc.) into Hadoop, or exporting it back.

It's widely used in the ecosystem, since basic use cases usually start with offloading data from an RDBMS to Hive.

Let's walk through a quick example of how to import a MySQL table into Hive using the HDP 2.4 sandbox.

First, let's create an employees table in MySQL:

mysql> create database db_source;
mysql> use db_source;
mysql> CREATE TABLE employees (
-> id mediumint(8) unsigned NOT NULL auto_increment,
-> name varchar(255) default NULL,
-> email varchar(255) default NULL,
-> PRIMARY KEY (id)
-> ) AUTO_INCREMENT=1;

mysql> INSERT INTO employees (name,email) VALUES ("Ann","elit@massa.ca"),("Hyacinth","non@scelerisquesedsapien.com"),("Preston","erat.vel@tellusfaucibusleo.co.uk"),("Quentin","arcu@lectuspedeultrices.org"),("Iola","eu.odio.tristique@dui.net"),("Pamela","quis.urna@imperdietnec.com"),("Rae","faucibus@Inlorem.co.uk"),("Olympia","ante.iaculis@etmalesuadafames.com"),("Uriah","Vivamus.molestie@felisadipiscing.org"),("Yeo","tempor.est@orciin.co.uk");

mysql> INSERT INTO employees (name,email) VALUES ("Zorita","et@nibhDonec.org"),("Constance","malesuada@utquam.co.uk"),("Gisela","non@fermentumconvallis.net"),("Galena","luctus.Curabitur.egestas@Morbiaccumsan.edu"),("Petra","euismod.ac@velitQuisquevarius.org"),("Laurel","dolor@a.co.uk"),("Iona","elit.pharetra.ut@lobortis.org"),("Nola","ut.sem@convallisconvallisdolor.edu"),("Emily","ipsum.Phasellus.vitae@ridiculusmusProin.ca"),("Scarlett","tristique@nonenim.net");

Now let's import that data into Hive! Without any specific parameters, Sqoop looks for the primary key and builds a MapReduce job that splits on that key to parallelize the transfer, but you can specify which column Sqoop uses with the --split-by parameter.
You can also transfer only specific columns with --columns, specify the target directory (if you don't want the table in the default warehouse dir) with --target-dir, and so on.

All parameters can be found in the Sqoop documentation.
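For illustration only, a variant using those optional flags could look like the following (the column list and target directory here are made up for the example); the actual import we will run is right below:

[root@sandbox ~]# sqoop import \
--connect jdbc:mysql://localhost:3306/db_source \
--username root \
--table employees \
--columns "id,name" \
--split-by id \
--target-dir /user/root/employees_import \
-m 4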

[root@sandbox ~]# sqoop import \
--connect jdbc:mysql://localhost:3306/db_source \
--username root \
--table employees \
--fields-terminated-by "," \
--hive-import \
--create-hive-table \
--hive-table employees
 

On first execution, you may get an error message like:

16/10/23 13:48:11 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
16/10/23 13:48:12 ERROR manager.SqlManager: Error reading from database: java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@38009ade is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@38009ade is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:934)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:931)
at com.mysql.jdbc.MysqlIO.checkForOutstandingStreamingData(MysqlIO.java:2735)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1899)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2151)
 

This is most likely caused by an old version of the mysql-connector. Upgrade to a 5.1.30+ version (it should be available in /usr/share/java); if you already have that version in /usr/share/java, then make the Hadoop lib a symbolic link to it.

You can find your mysql-connector versions with md5sum :

[root@sandbox ~]# find / -xdev -type f -name "mysql-connector*jar" | xargs md5sum
 

To make a symbolic link :

[root@sandbox ~]# rm -rf /usr/hdp/2.4.0.0-169/hadoop/lib/mysql-connector-java.jar
[root@sandbox ~]# ln -s /usr/share/java/mysql-connector-java-5.1.31-bin.jar /usr/hdp/2.4.0.0-169/hadoop/lib/mysql-connector-java.jar

Quick tip on how to check which version is embedded in a .jar file:

[root@sandbox ~]# unzip -p /usr/share/java/mysql-connector-java.jar META-INF/INDEX.LIST
JarIndex-Version: 1.0

mysql-connector-java-5.1.17-SNAPSHOT-bin.jar
[...]

Now, if we relaunch the sqoop import, we should see a MapReduce job running:

[root@sandbox ~]# sqoop import \
--connect jdbc:mysql://localhost:3306/db_source \
--username root \
--table employees \
--fields-terminated-by "," \
--hive-import \
--create-hive-table \
--hive-table employees
16/10/23 14:03:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
16/10/23 14:03:48 WARN manager.MySQLManager: It looks like you are importing from mysql.
16/10/23 14:03:48 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
16/10/23 14:03:48 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
16/10/23 14:03:48 INFO mapreduce.ImportJobBase: Beginning import of employees

Comment: you can see the query selecting the boundaries, which determines the splits used to parallelize the import.
Here it sees ids going from 1 to 20 and decides (by default) to use 4 splits, so the first map task copies ids 1 to 5, the second 6 to 10, and so on.

16/10/23 14:04:10 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`id`), MAX(`id`) FROM `employees`
16/10/23 14:04:11 INFO mapreduce.JobSubmitter: number of splits:4
16/10/23 14:04:20 INFO impl.YarnClientImpl: Submitted application application_1477220352113_0004
16/10/23 14:04:20 INFO mapreduce.Job: The url to track the job: http://sandbox.hortonworks.com:8088/proxy/application_1477220352113_0004/
16/10/23 14:05:35 INFO mapreduce.Job: map 0% reduce 0%
16/10/23 14:06:24 INFO mapreduce.Job: map 100% reduce 0%
16/10/23 14:06:33 INFO mapreduce.Job: Job job_1477220352113_0004 completed successfully
16/10/23 14:06:34 INFO mapreduce.ImportJobBase: Retrieved 20 records.
16/10/23 14:06:35 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
16/10/23 14:06:35 INFO hive.HiveImport: Loading uploaded data into Hive

Time taken: 62.339 seconds
Loading data to table default.employees
Table default.employees stats: [numFiles=4, totalSize=746]
OK
Time taken: 41.574 seconds

Nice ! Let’s check the import:

hive> show create table employees;
CREATE TABLE `employees`(
`id` bigint,
`name` string,
`email` string)
COMMENT 'Imported by sqoop on 2016/10/23 14:06:35'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/employees'
TBLPROPERTIES (
'COLUMN_STATS_ACCURATE'='true',
'numFiles'='4',
'totalSize'='746',
'transient_lastDdlTime'='1477231712')
 

So we can see that we have 4 files (corresponding to the 4 mappers), and that the table is named employees and lives in the default warehouse.

hive> select count(*) from employees;
20
 

 


accessing Hive tables with curl and webHCat

For quick and easy access, you can consider using WebHCat, a REST interface for accessing HCatalog and, through it, Hive.

Let's assume we're on a kerberized cluster (you wouldn't run an unkerberized cluster, remember…).

First, we check which port is used (the default is 50111) in webhcat-site.xml (or in the Hive configuration within the Ambari interface):

[screenshot: templeton (webHCat) port]
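Since the screenshot isn't reproduced here: the property to look for in webhcat-site.xml is templeton.port, and the entry typically looks like this (50111 being the default):

<property>
  <name>templeton.port</name>
  <value>50111</value>
</property>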

templeton is the former name for WebHCat.

Let's try a curl against WebHCat to see the DDL of the default database:

[root@sandbox ~]# curl -i --negotiate -u: "http://sandbox.hortonworks.com:50111/templeton/v1/ddl/database/default"
HTTP/1.1 401 Authentication required
WWW-Authenticate: Negotiate
Set-Cookie: hadoop.auth=; Path=/; Expires=Thu, 01-Jan-1970 00:00:00 GMT; HttpOnly
Cache-Control: must-revalidate,no-cache,no-store
Content-Type: text/html;charset=ISO-8859-1
Content-Length: 1328
Server: Jetty(7.6.0.v20120127)

<html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/>
<title>Error 401 Authentication required</title>
</head>
<body>
<h2>HTTP ERROR: 401</h2>
<p>Problem accessing /templeton/v1/ddl/database/default. Reason:
<pre> Authentication required</pre></p>
<hr /><i><small>Powered by Jetty://</small></i>

Hmmm, obviously: we have to kinit before being able to access HCatalog.

[root@sandbox ~]# kinit -kt /etc/security/keytabs/hdfs.headless.keytab hdfs
[root@sandbox ~]# curl -i --negotiate -u: "http://sandbox.hortonworks.com:50111/templeton/v1/ddl/database/default"
HTTP/1.1 401 Authentication required
WWW-Authenticate: Negotiate
Set-Cookie: hadoop.auth=; Path=/; Expires=Thu, 01-Jan-1970 00:00:00 GMT; HttpOnly
Cache-Control: must-revalidate,no-cache,no-store
Content-Type: text/html;charset=ISO-8859-1
Content-Length: 1328
Server: Jetty(7.6.0.v20120127)

HTTP/1.1 500 Server Error
Set-Cookie: hadoop.auth="u=hdfs&p=hdfs@HORTONWORKS.COM&t=kerberos&e=1475885113041&s=p+38gIJagH2o1pTkoGK+af3a6Ks="; Path=/; Expires=Sat, 08-Oct-2016 00:05:13 GMT; HttpOnly
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(7.6.0.v20120127)

{"error":"User: HTTP/sandbox.hortonworks.com@HORTONWORKS.COM is not allowed to impersonate hdfs"}

 

This is a fairly common message: since you're calling a REST API, your request is authenticated with the so-called SPNego token, which you can think of as "Kerberos for HTTP".

You must then be able to authenticate with a SPNego token, but the HTTP principal must also be able to impersonate you (meaning HTTP will perform the request on behalf of your username).

Those proxyuser parameters can be found in the HDFS core-site.xml:

[screenshot: HTTP proxyuser configuration]
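As the screenshot isn't reproduced here, the relevant entries in core-site.xml typically look like this:

<!-- which groups the HTTP principal may impersonate (here the group "users", as discussed below) -->
<property>
  <name>hadoop.proxyuser.HTTP.groups</name>
  <value>users</value>
</property>
<!-- from which hosts it may do so (the value here is an assumption; often the WebHCat host or *) -->
<property>
  <name>hadoop.proxyuser.HTTP.hosts</name>
  <value>*</value>
</property>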

So here, we can see that HTTP can only impersonate users belonging to the group users.

[root@sandbox ~]# id hdfs
uid=505(hdfs) gid=501(hadoop) groups=501(hadoop),503(hdfs)
[root@sandbox ~]# id ambari-qa
uid=1001(ambari-qa) gid=501(hadoop) groups=501(hadoop),100(users)

That's right, hdfs doesn't belong to that group. However, ambari-qa does! Let's kinit as ambari-qa.

[root@sandbox ~]# kinit -kt /etc/security/keytabs/smokeuser.headless.keytab ambari-qa
[root@sandbox ~]# curl -i --negotiate -u: "http://sandbox.hortonworks.com:50111/templeton/v1/ddl/database/default"
HTTP/1.1 401 Authentication required
WWW-Authenticate: Negotiate
Set-Cookie: hadoop.auth=; Path=/; Expires=Thu, 01-Jan-1970 00:00:00 GMT; HttpOnly
Cache-Control: must-revalidate,no-cache,no-store
Content-Type: text/html;charset=ISO-8859-1
Content-Length: 1328
Server: Jetty(7.6.0.v20120127)

HTTP/1.1 200 OK
Set-Cookie: hadoop.auth="u=ambari-qa&p=ambari-qa@HORTONWORKS.COM&t=kerberos&e=1475885666292&s=/WGJZIe4BRKBoI4UmxfHUv8r7MU="; Path=/; Expires=Sat, 08-Oct-2016 00:14:26 GMT; HttpOnly
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(7.6.0.v20120127)

{"location":"hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse","ownerType":"ROLE","owner":"public","comment":"Default Hive database","database":"default"}

That’s it, you got your DDL !

 


check file content with ASCII codes

If you want to inspect a file in depth (check that spaces really are spaces, look at commas, quotes, etc.) you can examine the ASCII codes with the hexdump command:

$ hexdump -C /etc/passwd
00000000 72 6f 6f 74 3a 78 3a 30 3a 30 3a 72 6f 6f 74 3a |root:x:0:0:root:|
00000010 2f 72 6f 6f 74 3a 2f 62 69 6e 2f 62 61 73 68 0a |/root:/bin/bash.|
00000020 64 61 65 6d 6f 6e 3a 78 3a 31 3a 31 3a 64 61 65 |daemon:x:1:1:dae|
00000030 6d 6f 6e 3a 2f 75 73 72 2f 73 62 69 6e 3a 2f 62 |mon:/usr/sbin:/b|
00000040 69 6e 2f 73 68 0a 62 69 6e 3a 78 3a 32 3a 32 3a |in/sh.bin:x:2:2:|
00000050 62 69 6e 3a 2f 62 69 6e 3a 2f 62 69 6e 2f 73 68 |bin:/bin:/bin/sh|

test your Hadoop mapping rules

You may hit impersonation issues because of wrong auth-to-local rules.

These rules translate your principal to a user short name, and you may want to be sure that, for example, hive/worker01@REALM correctly translates to hive.

To do that:

[root@worker ~]# hadoop org.apache.hadoop.security.HadoopKerberosName \
HTTP/worker01fqdn@REALM

Name: HTTP/worker01fqdn@REALM to HTTP
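These mapping rules live in the hadoop.security.auth_to_local property of core-site.xml. As a rough sketch (the REALM and the rule below are illustrative, not taken from a real cluster), a rule mapping hive/worker01@REALM to hive would look like:

<property>
  <name>hadoop.security.auth_to_local</name>
  <value>
    RULE:[2:$1@$0](hive@REALM)s/.*/hive/
    DEFAULT
  </value>
</property>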

HDFS ls and Out of Memory (GC Overhead limit)

If you get an error when doing an ls, such as:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.StringBuffer.toString(StringBuffer.java:561)
at java.net.URI.toString(URI.java:1926)
at java.net.URI.<init>(URI.java:749)
at org.apache.hadoop.fs.Path.makeQualified(Path.java:467)
...

You can increase the client memory:

HADOOP_CLIENT_OPTS="-Xmx4g" hdfs dfs -ls -R /
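If you need this regularly, you can make the setting persistent on the client node, for instance in hadoop-env.sh (the path below is the usual HDP location; adjust it for your distribution):

# in /etc/hadoop/conf/hadoop-env.sh
export HADOOP_CLIENT_OPTS="-Xmx4g ${HADOOP_CLIENT_OPTS}"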

Oozie 101 : your first workflow in 5 minutes

Oozie is the de facto scheduler for Hadoop, bundled in the main Hadoop distributions.

Its key concepts are not always easy to grasp, so let's build our first Oozie workflow.

For now, you only need to know that Oozie clients submit workflows to the Oozie server.

 

Some examples are bundled in oozie-examples.tar.gz, so let's untar it into our local directory:

[root@sandbox ~]# su - ambari-qa
[ambari-qa@sandbox ~]$ tar -xzf /usr/hdp/2.2.4.2-2/oozie/doc/oozie-examples.tar.gz

We'll submit our first workflow, a shell action example, but first we have to modify some parameters in the job.properties file.
The 2 main (and mandatory) files are job.properties and workflow.xml: the former holds the parameters, the latter the workflow definition itself.
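As a reference point, a minimal shell-action workflow.xml looks roughly like this (a simplified sketch; the bundled example has more options):

<workflow-app xmlns="uri:oozie:workflow:0.4" name="shell-wf">
    <start to="shell-node"/>
    <action name="shell-node">
        <shell xmlns="uri:oozie:shell-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <exec>echo</exec>
            <argument>hello-oozie</argument>
        </shell>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Shell action failed</message>
    </kill>
    <end name="end"/>
</workflow-app>

The bundled example already ships its own workflow.xml, so the only file we need to touch here is job.properties: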

[ambari-qa@sandbox ~]$ cat examples/apps/shell/job.properties
...
nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
examplesRoot=examples

oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/shell

Here we have to modify the jobTracker to point to the ResourceManager, so it goes from localhost:8021 to sandbox.hortonworks.com:8050.

The nameNode goes from hdfs://localhost:8020 to hdfs://sandbox.hortonworks.com:8020; the other parameters don't need any change.
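After those two changes, job.properties looks like:

nameNode=hdfs://sandbox.hortonworks.com:8020
jobTracker=sandbox.hortonworks.com:8050
queueName=default
examplesRoot=examples

oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/shell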

To be submitted, the workflow.xml needs to be put on HDFS, because the Oozie server works only with files on HDFS (this is an important point that can trip you up later: for example, a custom hdfs-site.xml or hive-site.xml will need to be put somewhere on HDFS for Oozie to see it).

In the job.properties example we set that path to NN/user/ambari-qa/examples/apps/shell, so let's upload the files there:

[ambari-qa@sandbox ~]$ hdfs dfs -put examples/

Now let’s run that job !

[ambari-qa@sandbox ~]$ oozie job -oozie http://sandbox.hortonworks.com:11000/oozie -config examples/apps/shell/job.properties -run

You may have noticed that job.properties is a local file: it's submitted by the Oozie client, and any client could use its own job.properties file.

A common tip is to export the OOZIE_URL env variable so you don't have to pass it on every Oozie command:

[ambari-qa@sandbox ~]$ export OOZIE_URL=http://sandbox.hortonworks.com:11000/oozie

That replaces the -oozie http://sandbox.hortonworks.com:11000/oozie part in every following command.

Now let's submit our workflow:

[ambari-qa@sandbox ~]$ oozie job -config examples/apps/shell/job.properties -run
job: 0000010-160617094106166-oozie-oozi-W

Now that we have the workflowID, let’s check its status:

[ambari-qa@sandbox ~]$ oozie job -info 0000010-160617094106166-oozie-oozi-W

Oozie RUNNING

and finally :

[ambari-qa@sandbox ~]$ oozie job -info 0000010-160617094106166-oozie-oozi-W

Oozie COMPLETE

That's all good! You can now dig into Oozie and look at every action type (shell, Hive, etc.) and more advanced features (forks, conditions, etc.).


Sqoop Teradata password file extra control character

Sqoop is a great tool to move SQL data from/to Hadoop.
When using it with Teradata, you have the option of using a password file instead of a plaintext password:
   <arg>--password-file</arg>
  <arg>hdfs://NAMENODE/teradata.password</arg>
You may end up with an error like:
3737 [main] ERROR org.apache.sqoop.teradata.TeradataSqoopExportHelper  - Exception running Teradata export job
com.teradata.connector.common.exception.ConnectorException: java.sql.SQLException: [Teradata Database] [TeraJDBC 15.00.00.20] [Error 8017] [SQLState 28000] The UserId, Password or Account is invalid.
But you're 100% sure of the password? If you set your "Password" password with vi, the file will end with a line feed control character.
To find if there’s a LF ending the password file :
[root@localhost ~]# od -c teradata.password
0000000    P   a   s   s   w   o   r   d  \n
0000011
So you’ll have to delete your newline control character using tr :
[root@localhost ~]# tr -d '\n' < teradata.password > teradata.password.new
[root@localhost ~]# od -c teradata.password.new
0000000    P   a   s   s   w   o   r   d
0000010
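Alternatively, you can create the password file without a trailing newline in the first place, for example with printf, then push it to the HDFS location referenced by --password-file:

[root@localhost ~]# printf 'Password' > teradata.password
[root@localhost ~]# hdfs dfs -put -f teradata.password /teradata.password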


Spark on HBase with Spark shell

Some minor adjustments are needed to access HBase tables from a Spark context.

Let's first quickly create a 't1' HBase sample table with 40 rows.

 

[root@sandbox ~]# cat hbase_load.txt
create 't1', 'f1'
for i in '1'..'10' do \
for j in '1'..'2' do \
for k in '1'..'2' do \
rnd=(0...64).map { (65 + rand(26)).chr }.join
put 't1', "#{i}-#{j}-#{k}", "f1:#{j}#{k}", "#{rnd}"
end \
end \
end
[root@sandbox ~]# cat hbase_load.txt |hbase shell

You need to adjust your Spark classpath (guava 14 is needed, so I included the first one I found):

[root@sandbox ~]# export SPARK_CLASSPATH=/usr/hdp/current/spark-client/lib/hbase-common.jar:/usr/hdp/current/spark-client/lib/hbase-client.jar:/usr/hdp/current/spark-client/lib/hbase-protocol.jar:/usr/hdp/current/spark-client/lib/hbase-server.jar:/etc/hbase/conf:/usr/hdp/2.3.2.0-2950/oozie/share/lib/spark/guava-14.0.1.jar

[root@sandbox ~]# spark-shell --master yarn-client

As a side note, SPARK_CLASSPATH is deprecated in Spark 1.5.x+, so you should use --driver-class-path instead:
[root@sandbox ~]# spark-shell --master yarn-client --driver-class-path=/usr/hdp/current/spark-client/lib/hbase-common.jar:/usr/hdp/current/spark-client/lib/hbase-client.jar:/usr/hdp/current/spark-client/lib/hbase-protocol.jar:/usr/hdp/current/spark-client/lib/hbase-hadoop2-compat.jar:/usr/hdp/current/spark-client/lib/hbase-server.jar:/etc/hbase/conf:/usr/hdp/2.3.2.0-2950/oozie/share/lib/spark/guava-14.0.1.jar

I ran into bugs with the latter ([...] Caused by: java.lang.IllegalStateException: unread block data), so I used the first version (with SPARK_CLASSPATH).
Now it's Scala time!
scala>

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val tableName = "t1"
val hconf = HBaseConfiguration.create()
hconf.set(TableInputFormat.INPUT_TABLE, "t1")

val hBaseRDD = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
println("records found : " + hBaseRDD.count())


[...]
2016-04-07 18:44:40,553 INFO [main] scheduler.DAGScheduler: Job 0 finished: count at <console>:30, took 2.092481 s
Number of Records found : 40
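From there you can go further than a simple count. For instance, here is a quick sketch that pulls the row keys out of the RDD as plain strings (copyBytes() is used because the writable objects are reused by the input format):

import org.apache.hadoop.hbase.util.Bytes

// map each (rowkey, result) pair to the row key as a string
val rowKeys = hBaseRDD.map { case (key, _) => Bytes.toString(key.copyBytes()) }
rowKeys.take(5).foreach(println)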
If you want to use HBaseAdmin to list tables, take snapshots, or perform any admin-related operation, you'll use:
scala> val admin = new HBaseAdmin(hconf)

admin.listTables
And if you want to create a table :
val tableDesc = new HTableDescriptor(tableName)
admin.createTable(tableDesc)


set date and time in VirtualBox

If you set the date manually in your VirtualBox VM, the datetime resets back to the host date.

This behaviour is caused by VirtualBox Guest Additions, so you first need to stop that service :

sudo service vboxadd-service stop

You’ll then be able to change the date

date --set="8 Apr 2016 18:00:00"


Hadoop log compression on the fly with log4j

Hadoop logs are as verbose and useful as they are heavy. With that last point in mind, some want to zip their logs so they can keep their /var/log partition under its warning thresholds.

Thanks to log4j, you can achieve that in 2 ways :

1. use the log4j-extras package

2. use the log4j2 package, which supports compression natively (see the sketch at the end of this section)

Here I'll use the first option, applied to Hive logging:

hive.root.logger=INFO,request

log4j.appender.request=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.request.File=${hive.log.dir}/${hive.log.file}
log4j.appender.request.RollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
log4j.appender.request.RollingPolicy.ActiveFileName=${hive.log.dir}/${hive.log.file}
log4j.appender.request.RollingPolicy.FileNamePattern=${hive.log.dir}/${hive.log.file}.%d{yyyyMMdd}.log.gz
log4j.appender.request.layout = org.apache.log4j.PatternLayout
log4j.appender.request.layout.ConversionPattern=%d{ISO8601} %-5p [%t]: %c{2} (%F:%M(%L)) - %m%n

Remember to disable the existing DRFA appender by commenting out or deleting its lines.

Restart the components, and you have gzipped logs rolled on a daily basis (yyyyMMdd).
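For the record, the second option (log4j2) achieves the same thing natively. A rough sketch in log4j2 properties syntax (appender name and system properties are illustrative; adapt them to the component you're configuring):

appender.daily.type = RollingFile
appender.daily.name = daily
appender.daily.fileName = ${sys:hive.log.dir}/${sys:hive.log.file}
appender.daily.filePattern = ${sys:hive.log.dir}/${sys:hive.log.file}.%d{yyyyMMdd}.gz
appender.daily.layout.type = PatternLayout
appender.daily.layout.pattern = %d{ISO8601} %-5p [%t]: %c{2} (%F:%M(%L)) - %m%n
appender.daily.policies.type = Policies
appender.daily.policies.time.type = TimeBasedTriggeringPolicy
rootLogger.level = INFO
rootLogger.appenderRef.daily.ref = daily

The .gz suffix in filePattern is what triggers compression on rollover.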

 

