Monday, June 3, 2013

Apache Sqoop - Part 2: Import from MySQL into Hive

What's in this blog?

This is part two of a series of blogs on Apache Sqoop.
This blog covers my notes on importing data from MySQL into Hive.

Note: Part 1 of this series covers MySQL setup and loading the sample data.

Versions covered:
Sqoop (1.4.2) with MySQL (5.1.69)


Your thoughts/updates:
If you want to share your thoughts/updates, email me at airawat.blog@gmail.com.

A) Import data into Hive

A1. Basic import

Here is some data in the employees database that we will use:

mysql> select * from departments;
+---------+--------------------+
| dept_no | dept_name          |
+---------+--------------------+
| d009    | Customer Service   |
| d005    | Development        |
| d002    | Finance            |
| d003    | Human Resources    |
| d001    | Marketing          |
| d004    | Production         |
| d006    | Quality Management |
| d008    | Research           |
| d007    | Sales              |
+---------+--------------------+
9 rows in set (0.00 sec)

Sqoop import command:
sqoop import \
--connect jdbc:mysql://airawat-mySqlServer/employees \
--username myUID \
--password myPWD \
--table departments \
--direct \
-m 1 \
--hive-import \
--create-hive-table \
--hive-table departments_mysql \
--target-dir /user/hive/warehouse/ \
--enclosed-by '\"' \
--fields-terminated-by ',' \
--escaped-by \\


File created in HDFS:
$ hadoop fs -ls -R /user/hive/warehouse | grep /part* | awk '{print $8}'

/user/hive/warehouse/departments_mysql/part-m-00000

Validate the number of records:

$ hadoop fs -cat /user/hive/warehouse/departments_mysql/part-m-00000 | wc -l

9
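
The same count can be cross-checked directly against the source (a quick sketch using the mysql command-line client with the same host and credentials as above; it was not part of the original run and should return 9):

$ mysql -h airawat-mySqlServer -u myUID -pmyPWD -N -e "select count(*) from employees.departments"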

Check the imported data in HDFS:
$ hadoop fs -cat /user/hive/warehouse/departments_mysql/part-m-00000 | more

"d009","Customer Service"
"d005","Development"
"d002","Finance"
"d003","Human Resources"
"d001","Marketing"
"d004","Production"
"d006","Quality Management"
"d008","Research"
"d007","Sales"

Validate results in Hive:
$ hive

hive> show tables;

departments_mysql


hive> select * from departments_mysql;


"d009"           "Customer Service"
"d005"           "Development"
"d002"           "Finance"
"d003"           "Human Resources"
"d001"           "Marketing"
"d004"           "Production"
"d006"           "Quality Management"
"d008"           "Research"
"d007"           "Sales"

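To double-check the column types Sqoop generated for the Hive table, one can also run the following (a quick verification step, not part of the original post; output omitted):

hive> describe departments_mysql;
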
A2. Importing into Hive with partitions

To try this functionality out, I decided on gender as my partition criterion.


mysql> select gender, count(*) from employees group by gender;  
+--------+----------+
| gender | count(*) |
+--------+----------+
| M      |   179973 |
| F      |   120051 |
+--------+----------+

Import gender="M"

$ sqoop import \
--connect jdbc:mysql://airawat-mySqlServer-node/employees \
--username myUID \
--password myPwd \
--query 'select EMP_NO,birth_date,first_name,last_name,hire_date from employees where gender="M" AND $CONDITIONS'  \
--direct \
-m 6 \
--split-by EMP_NO \
--hive-import \
--create-hive-table \
--hive-table employees_import_parts \
--target-dir /user/hive/warehouse/employee-parts \
--hive-partition-key gender \
--hive-partition-value 'M' \
--enclosed-by '\"' \
--fields-terminated-by ',' \
--escaped-by \\



Note 1: The gender column should not be included in the query; its value is supplied through the partition arguments instead.
The two --hive-partition arguments (--hive-partition-key and --hive-partition-value) are required.
The query is wrapped in single quotes so the shell does not expand $CONDITIONS; Sqoop replaces that token with the split condition for each mapper.
Also, note that I have added a where clause to filter on just gender="M".
More later on dynamic partitioning.

Note 2: If the column emp_no is listed in lower case in the query, only NULL is retrieved.  If we switch the case of just this one field to EMP_NO, it works fine.

See if files were created:

$ hadoop fs -ls -R /user/hive/warehouse/employees_import_parts | grep /part* | awk '{print $8}'

/user/hive/warehouse/employees_import_parts/gender=M/part-m-00000
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00001
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00002
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00003
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00004
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00005


Do a line count:
$ hadoop fs -cat /user/hive/warehouse/employees_import_parts/gender=M/* | wc -l

179973

Open a file to see if it is formatted right:
$ hadoop fs -cat /user/hive/warehouse/employees_import_parts/gender=M/part-m-00005 | more

"418333","1954-11-10","Jackson","Simmen","1993-11-14"
"418334","1954-04-12","Jingling","Orlowski","1985-06-19"
"418335","1963-09-09","Kwok","Dalton","1986-07-28"
"418337","1961-08-31","Eberhardt","Ramras","1988-02-25"

Note: gender is not stored in the data file; it is captured in the partition directory name (gender=M).

Check if the table got created:
hive> show tables;

employees_import_parts

Display column headers:
hive> set hive.cli.print.header=true;

Validate record count:
hive> select gender, count(*) from employees_import_parts group by gender;   

gender _c1
M 179973

The count is accurate.

Review one record for accuracy:
hive> select * from employees_import_parts limit 1;


emp_no birth_date first_name last_name hire_date gender
NULL "1953-09-02" "Georgi" "Facello" "1986-06-26" M

Note: Need to troubleshoot why emp_no is showing up as NULL. A likely cause is that every field was imported enclosed in double quotes, so the quoted value cannot be parsed into the int column emp_no.
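
If that is indeed the cause, one possible workaround (a minimal sketch, not something done in this post; it assumes Sqoop created emp_no as an int column) is to read the column as a string and strip the quotes at query time:

hive> ALTER TABLE employees_import_parts CHANGE emp_no emp_no STRING;
hive> SELECT CAST(regexp_replace(emp_no, '"', '') AS INT) AS emp_no FROM employees_import_parts LIMIT 1;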

Validate that the table is partitioned:

hive> show partitions employees_import_parts;

partition
gender=F
gender=M

Import gender="F"

$ sqoop import \
--connect jdbc:mysql://airawat-mySqlServer-node/employees \
--username myUID \
--password myPWD \
--query 'select emp_no,birth_date,first_name,last_name,hire_date from employees where gender="F" AND $CONDITIONS'  \
--direct \
-m 6 \
--split-by emp_no \
--hive-import \
--hive-overwrite \
--hive-table employees_import_parts \
--target-dir /user/hive/warehouse/employee-parts_F \
--hive-partition-key gender \
--hive-partition-value 'F' \
--enclosed-by '\"' \
--fields-terminated-by , \
--escaped-by \\ \

Files generated:

$ hadoop fs -ls -R /user/hive/warehouse/employees_import_parts | grep /part* | awk '{print $8}'

/user/hive/warehouse/employees_import_parts/gender=F/part-m-00000
/user/hive/warehouse/employees_import_parts/gender=F/part-m-00001
/user/hive/warehouse/employees_import_parts/gender=F/part-m-00002
/user/hive/warehouse/employees_import_parts/gender=F/part-m-00003
/user/hive/warehouse/employees_import_parts/gender=F/part-m-00004
/user/hive/warehouse/employees_import_parts/gender=F/part-m-00005
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00000
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00001
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00002
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00003
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00004
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00005


Record count for gender=F:

$ hadoop fs -cat /user/hive/warehouse/employees_import_parts/gender=F/part* | wc -l

120051

The count is accurate.


Record count for employees in total:
Expected: 300024


$ hadoop fs -cat /user/hive/warehouse/employees_import_parts/*/part* | wc -l

300024


Validate a bunch of records for accuracy of format:


$ hadoop fs -cat /user/hive/warehouse/employees_import_parts/gender=F/part-m-00005 | more

"418330","1953-06-13","Pranas","McFarlan","1989-12-23"
"418331","1954-04-07","Chaosheng","Greenaway","1996-05-21"
"418332","1961-04-19","Koichi","Cannard","1986-01-21"
"418336","1954-02-14","Georgy","Thimonier","1994-03-21"


Validate count in Hive:
hive> select gender, count(*) from employees_import_parts group by gender;   


gender _c1
F 120051
M 179973


The counts are accurate.

Validate records in Hive:

hive> select * from employees_import_parts where gender='F' limit 2;

emp_no birth_date first_name last_name hire_date gender
NULL "1964-06-02" "Bezalel" "Simmel" "1985-11-21" F
NULL "1953-04-20" "Anneke" "Preusig" "1989-06-02" F


Note: As with gender=M, emp_no shows up as NULL; see the note and the possible workaround sketched in section A2 above.


A3. Sqoop and dynamic partitioning in Hive

Currently, Sqoop does not support dynamic partitioning in Hive in a single command.
The partition values have to be inferred in one step, and a Sqoop statement built and executed in the next, iteratively for each inferred partition (a scripted sketch of this approach follows below).

In order for this to work, the Hive environment should allow dynamic partitions, which is disabled by default.  It can be enabled permanently by updating the Hive configuration (hive-site.xml) or just for the session by executing the following command:

hive> set hive.exec.dynamic.partition=true;
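
Here is a minimal sketch of that inferred-partition loop. It reuses the server, credentials, and table names from the examples above, but it is illustrative only and was not run as part of this post:

#!/bin/bash
# Step 1: infer the distinct partition values (gender) from MySQL.
# -N suppresses the column header so the loop sees only the values.
genders=$(mysql -h airawat-mySqlServer-node -u myUID -pmyPWD -N -e \
  "SELECT DISTINCT gender FROM employees.employees")

# Step 2: one Sqoop import per inferred value, each loaded as a static Hive partition.
# $CONDITIONS is escaped so the shell passes it through to Sqoop untouched.
for g in $genders
do
  sqoop import \
  --connect jdbc:mysql://airawat-mySqlServer-node/employees \
  --username myUID \
  --password myPWD \
  --query "select EMP_NO,birth_date,first_name,last_name,hire_date from employees where gender='$g' AND \$CONDITIONS" \
  -m 6 \
  --split-by EMP_NO \
  --hive-import \
  --hive-table employees_import_parts \
  --hive-partition-key gender \
  --hive-partition-value "$g" \
  --target-dir /user/hive/warehouse/employee-parts_$g
done

Note that --create-hive-table is deliberately left out, so the second and later iterations add partitions to the already existing table instead of failing.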


A4. Output line formatting options

Straight out of the Apache Sqoop documentation:

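In brief, the main output line formatting arguments are the following (descriptions paraphrased from the Sqoop 1.4.x user guide; several of them were used in the commands above):

--enclosed-by <char>              Sets a required field enclosing character
--escaped-by <char>               Sets the escape character
--fields-terminated-by <char>     Sets the field separator character
--lines-terminated-by <char>      Sets the end-of-line character
--mysql-delimiters                Uses MySQL's default delimiter set: fields , lines \n escaped-by \ optionally-enclosed-by '
--optionally-enclosed-by <char>   Sets a field enclosing character used only when needed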

That's it for this blog; the next blog will be on exporting out of HDFS/Hive into MySQL.


Comments:

  1. You mentioned that
    "Currently Sqoop does not support dynamic partitioning in Hive in a single command."
    But you also mentioned that
    In order for this to work, I have to set
    hive.exec.dynamic.partition = true;

    So, will Sqoop support dynamic partitioning in Hive if I set that property? If yes, how would I specify --hive-partition-value? How do I let Sqoop know the date field in the mysql transaction table will map to 'created_date' partition in Hive transaction table?

    If Sqoop doesn't support dynamic partitioning in Hive, I have to make lots of Sqoop commands. That won't be pretty.

  2. With Sqoop 1.4.2, dynamic partitioning is not supported.
    The partitions have to be inferred and plugged into sqoop statements (static partitions).
    But for the sqoop statement to generate the (static) partition behind the scenes, the dynamic partition setting should be set to true (hive.exec.dynamic.partition = true;). This has been my experience. Any additional insights are appreciated.

  3. Update - 03/13/2014:
    From the Sqoop mailing list-
    In Sqoop 1.4.4 there is Hcatalog Integration. This allows importing data into hive partition tables with multiple partition keys (but only static partitioning keys). See http://sqoop.apache.org/docs/1.4.4/SqoopUserGuide.html#_sqoop_hcatalog_integration

    There is a presentation done at Hadoop World on Hcatalog integration that covers details -
    https://cwiki.apache.org/confluence/download/attachments/27361435/SqoopHCatIntegration-HadoopWorld2013.pptx

  4. How do I migrate multiple tables at a time into Hive using Sqoop?
    Also, please share your view on the "best strategy to migrate mysql to mongodb using hadoop".

  5. Hey. Have you tried Sqoop with Hive dynamic partitions (using the HCatalog integration)? I could not find any examples for it online. Thanks in advance.

  6. Great article. Way above the average Bigdata blogs.
