Wednesday, July 10, 2013

Apache Oozie - Part 9c: Coordinator job - dataset availability triggered


1.0. What's covered in the blog?

1) Apache documentation on coordinator jobs that execute workflows upon availability of datasets
2) A sample program that includes the components of an Oozie dataset-availability-initiated coordinator job - scripts/code, sample dataset and commands; Oozie actions covered: hdfs action, email action, sqoop action (mysql database);

Version:
Oozie 3.3.0;

Related blogs:

Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11: Oozie Java API for interfacing with oozie workflows
Blog 12: Oozie workflow - shell action +passing output from one action to another
Blog 13: Oozie workflow - SSH action


2.0. Apache documentation on dataset availability triggered coordinator jobs

http://oozie.apache.org/docs/3.3.0/CoordinatorFunctionalSpec.html

3.0. Sample coordinator application


Highlights:
The coordinator application has a start time; once the start time condition is met, it transitions to a waiting state in which it polls for the availability of a dataset. Once the dataset is available, it runs the workflow specified.
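A dataset-availability trigger boils down to three pieces in the coordinator definition: a dataset, an input event bound to it, and the workflow to run. A minimal sketch, using the same property names as the sample further down (values elided with "..." are placeholders; the complete, working files appear below):

<coordinator-app name="..." frequency="${coord:days(1)}"
                 start="${startTime}" end="${endTime}" timezone="${timeZoneDef}"
                 xmlns="uri:oozie:coordinator:0.1">
  <datasets>
    <!-- the dataset whose availability Oozie polls for -->
    <dataset name="inputDS" frequency="${coord:days(1)}"
             initial-instance="${startTime}" timezone="${timeZoneDef}">
      <uri-template>${triggerDatasetDir}</uri-template>
    </dataset>
  </datasets>
  <input-events>
    <!-- the materialized action stays in WAITING until this instance is available -->
    <data-in name="input" dataset="inputDS">
      <instance>${startTime}</instance>
    </data-in>
  </input-events>
  <action>
    <workflow>
      <app-path>${sqoopWorkflowAppPath}</app-path>
    </workflow>
  </action>
</coordinator-app>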


Sample application - pictorial overview:

[Diagram: pictorial overview of the coordinator application - see the link under 'Pictorial overview of job' below]


Coordinator application components:

[Diagram: coordinator application components]

Coordinator application details:

This gist includes the components of an Oozie dataset-availability-initiated coordinator job -
scripts/code, sample data and commands; Oozie actions covered: hdfs action, email action,
sqoop action (mysql database); Oozie controls covered: decision;
Use case
--------
Pipe report data available in HDFS to a mysql database;
Pictorial overview of job:
--------------------------
http://hadooped.blogspot.com/p/ooziecooridnatorjobdatasetdep-pix.html
Includes:
---------
Data and script download: 02-DataAndScriptDownload
Data load commands: 03-HdfsLoadCommands
Mysql database setup: 04-mysqlDBSetup
Sqoop command - test: 05-SqoopStandaloneTryout
Oozie configuration for email 06-OozieSMTPconfiguration
Oozie coordinator properties file 07-OozieCoordinatorProperties
Oozie coordinator conf file 08-OozieCoordinatorXML
Sqoop workflow conf file 09-SqoopWorkflowXML
Oozie commands 10-OozieJobExecutionCommands
Output in mysql 11-Output
Oozie web console - screenshots 12-OozieWebConsoleScreenshots
*********************************
Data download
*********************************
Github:
https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you encounter any issues.
*********************************
Directory structure
*********************************
oozieProject
  sampleCoordinatorJobDatasetDep
    coordinatorConf/
      coordinator.properties
      coordinator.xml
    sqoopWorkflowApp
      workflow.xml
  datasetGeneratorApp
    outputDir
      part-r-00000
      _SUCCESS
      _logs
        history
          cdh-jt01_1372261353326_job_201306261042_0536_conf.xml
          job_201306261042_0536_1373407670448_akhanolk_Syslog+Event+Rollup
---------------------------------------------------------------------------------------
The sampleCoordinatorJobDatasetDep directory
--------------------------------------------
This holds the coordinator application - the coordinator configuration and the sqoop workflow app.
The datasetGeneratorApp directory
---------------------------------
This is essentially what we will use to trigger the coordinator job. While the _logs are not important for the simulation, the presence of the _SUCCESS file is required; without it, the job will not get triggered.
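By default, Oozie deems a directory-based dataset instance available only once it contains a _SUCCESS flag file; the optional <done-flag> element of the dataset definition controls this. A hedged sketch (the sample's coordinator.xml relies on the default and therefore omits the element):

<dataset name="inputDS" frequency="${coord:days(1)}"
         initial-instance="${startTime}" timezone="${timeZoneDef}">
  <uri-template>${triggerDatasetDir}</uri-template>
  <!-- If omitted, Oozie waits for _SUCCESS in the directory (the default);
       if specified but empty, the directory's existence alone suffices. -->
  <done-flag>_SUCCESS</done-flag>
</dataset>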
*********************************
Hdfs data load commands
*********************************
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/* oozieProject/
Run the command below to validate the load against the expected directory structure shown in section 02-DataAndScriptDownload:
$ hadoop fs -ls -R oozieProject/sampleCoordinatorJobDatasetDep | awk '{print $8}'
Remove the dataset directory - we will load it later, when we want to trigger the job:
$ hadoop fs -rm -R oozieProject/datasetGeneratorApp
*********************************
Mysql database setup tasks
*********************************
a) Create database:
mysql>
create database airawat;
b) Switch to database created:
mysql>
use airawat;
c) Create destination table for sqoop export from hdfs:
mysql>
CREATE TABLE IF NOT EXISTS Logged_Process_Count_By_Year (
  year_and_process VARCHAR(100),
  occurrence INTEGER
);
d) Ensure your sqoop user has access to database created:
mysql>
grant all on airawat.* to myUser@'myMachine';
*************************************************************
Sqoop command - try out the command to see if it works
*************************************************************
Prerequisites:
1. The dataset to be exported must exist on HDFS.
2. The mysql table that is the destination of the export must exist.
Command:
--Run on the node that acts as the sqoop client;
$ sqoop export \
--connect jdbc:mysql://cdh-dev01/airawat \
--username devUser \
--password myPwd \
--table Logged_Process_Count_By_Year \
--direct \
--export-dir "oozieProject/datasetGeneratorApp/outputDir" \
--fields-terminated-by "\t"
*********************************
Results in mysql
*********************************
mysql> select * from Logged_Process_Count_By_Year order by occurrence desc;
+----------------------------+------------+
| year_and_process           | occurrence |
+----------------------------+------------+
| 2013-ntpd_initres          |       4133 |
| 2013-kernel                |        810 |
| 2013-init                  |        166 |
| 2013-pulseaudio            |         18 |
| 2013-spice-vdagent         |         15 |
| 2013-gnome-session         |         11 |
| 2013-sudo                  |          8 |
| 2013-polkit-agent-helper-1 |          8 |
| 2013-console-kit-daemon    |          7 |
| 2013-NetworkManager        |          7 |
| 2013-udevd                 |          6 |
| 2013-sshd                  |          6 |
| 2013-nm-dispatcher.action  |          4 |
| 2013-login                 |          2 |
+----------------------------+------------+
14 rows in set (0.00 sec)
*************************************************
--Cleanup
*************************************************
mysql>
delete from Logged_Process_Count_By_Year;
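For a table of any size, truncating is typically faster than an unfiltered delete and achieves the same cleanup:
mysql>
truncate table Logged_Process_Count_By_Year;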
*************************
Oozie SMTP configuration
*************************
The following needs to be added to oozie-site.xml, after updating it for your environment and configuration:
<!-- SMTP params -->
<property>
  <name>oozie.email.smtp.host</name>
  <value>cdh-dev01</value>
</property>
<property>
  <name>oozie.email.smtp.port</name>
  <value>25</value>
</property>
<property>
  <name>oozie.email.from.address</name>
  <value>oozie@cdh-dev01</value>
</property>
<property>
  <name>oozie.email.smtp.auth</name>
  <value>false</value>
</property>
<property>
  <name>oozie.email.smtp.username</name>
  <value></value>
</property>
<property>
  <name>oozie.email.smtp.password</name>
  <value></value>
</property>
***************************************************************************
Oozie coordinator properties file: coordinator.properties
***************************************************************************
nameNode=hdfs://cdh-nn01.hadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
appRoot=${oozieProjectRoot}/sampleCoordinatorJobDatasetDep
oozie.coord.application.path=${appRoot}/coordinatorConf
sqoopWorkflowAppPath=${appRoot}/sqoopWorkflowApp
oozieLibPath=${nameNode}/user/oozie/share/lib
oozie.libpath=${oozieLibPath}
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
triggerDatasetDir=${oozieProjectRoot}/datasetGeneratorApp/outputDir
triggerDataFiles=${triggerDatasetDir}/part*
mysqlServer=cdh-dev01
mysqlServerDB=airawat
mysqlServerDBUID=myUID
mysqlServerDBPwd=myPWD
toEmailAddress=akhanolk@cdh-dev01
startTime=2013-07-11T00:00Z
endTime=2013-07-15T00:00Z
timeZoneDef=UTC
sqoopInputRecordCount=`cat ${triggerDataFiles} | wc -l`
minRequiredRecordCount=1
<!--***************************************************************************
Oozie coordinator xml file: coordinator.xml
*****************************************************************************-->
<coordinator-app name="AirawatCoordJobDataTrig"
                 frequency="${coord:days(1)}"
                 start="${startTime}"
                 end="${endTime}"
                 timezone="${timeZoneDef}"
                 xmlns="uri:oozie:coordinator:0.1"
                 xmlns:sla="uri:oozie:sla:0.1">
  <controls>
    <timeout>20</timeout>
    <concurrency>6</concurrency>
    <execution>FIFO</execution>
  </controls>
  <datasets>
    <dataset name="inputDS" frequency="${coord:days(1)}" initial-instance="${startTime}" timezone="${timeZoneDef}">
      <uri-template>${triggerDatasetDir}</uri-template>
    </dataset>
  </datasets>
  <input-events>
    <data-in name="AirawatCoordTrigDepInput" dataset="inputDS">
      <instance>${startTime}</instance>
    </data-in>
  </input-events>
  <action>
    <workflow>
      <app-path>${sqoopWorkflowAppPath}</app-path>
    </workflow>
  </action>
</coordinator-app>
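Note that the uri-template above points at a fixed directory, so every materialized action waits on the same path. More commonly, the template is parameterized with the instance's nominal time, so that each daily instance resolves to its own directory; a hedged sketch with an illustrative path:

<dataset name="dailyLogs" frequency="${coord:days(1)}"
         initial-instance="${startTime}" timezone="${timeZoneDef}">
  <!-- ${YEAR}/${MONTH}/${DAY} are resolved from each instance's nominal time -->
  <uri-template>${oozieProjectRoot}/logs/${YEAR}/${MONTH}/${DAY}</uri-template>
</dataset>

The corresponding input event would then reference an instance such as <instance>${coord:current(0)}</instance> rather than a fixed timestamp.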
<!--***************************************************************************
Oozie workflow xml file: workflow.xml
*****************************************************************************-->
<workflow-app name="AirawatSampleCoordJobDSDep" xmlns="uri:oozie:workflow:0.1">
  <start to="inputAvailableCheckDecision"/>
  <decision name="inputAvailableCheckDecision">
    <switch>
      <case to="sqoopAction">
        ${sqoopInputRecordCount gt minRequiredRecordCount}
      </case>
      <default to="end"/>
    </switch>
  </decision>
  <action name="sqoopAction">
    <sqoop xmlns="uri:oozie:sqoop-action:0.2">
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>oozie.libpath</name>
          <value>${oozieLibPath}</value>
        </property>
      </configuration>
      <command>export --connect jdbc:mysql://${mysqlServer}/${mysqlServerDB} --username ${mysqlServerDBUID} --password ${mysqlServerDBPwd} --table Logged_Process_Count_By_Year --direct --export-dir ${triggerDatasetDir} --fields-terminated-by "\t"</command>
    </sqoop>
    <ok to="end"/>
    <error to="sendErrorEmail"/>
  </action>
  <action name="sendErrorEmail">
    <email xmlns="uri:oozie:email-action:0.1">
      <to>${toEmailAddress}</to>
      <subject>Status of workflow ${wf:id()}</subject>
      <body>The workflow ${wf:name()} with id ${wf:id()} had issues and will be killed; the error logged is: ${wf:errorMessage(wf:lastErrorNode())}</body>
    </email>
    <ok to="killJobAction"/>
    <error to="killJobAction"/>
  </action>
  <kill name="killJobAction">
    <message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
  </kill>
  <end name="end"/>
</workflow-app>
****************************************
Oozie job commands
****************************************
a) Prep
Modify the start-end time of the job in the coordinator properties file, as needed.
Then run the following command.
b) Submit job
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/sampleCoordinatorJobDatasetDep/coordinatorConf/coordinator.properties -run
A job ID is displayed.
The job should not trigger until the dataset is loaded.
It should be in waiting state - see oozie web console screenshots in the last section.
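To inspect the coordinator and the state of its materialized actions from the command line, substitute the job ID returned at submission:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info <<Job ID>>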
If you need to kill the job...
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill <<Job ID>>
c) Publish trigger
Ideally, this dataset would be generated upon completion of some upstream map-reduce job.
For simplicity, I have provided the output of one of the jobs from one of my blogs/gists.
$ hadoop fs -put oozieProject/datasetGeneratorApp/ oozieProject/
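The provided outputDir already contains the _SUCCESS flag file. If you substitute trigger data of your own and it lacks the flag, you can create it by hand (assuming the default done-flag of _SUCCESS):
$ hadoop fs -touchz oozieProject/datasetGeneratorApp/outputDir/_SUCCESS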
****************************************
Output - data export from hdfs to mysql
****************************************
mysql> select * from Logged_Process_Count_By_Year order by occurrence desc;
+----------------------------+------------+
| year_and_process           | occurrence |
+----------------------------+------------+
| 2013-ntpd_initres          |       4133 |
| 2013-kernel                |        810 |
| 2013-init                  |        166 |
| 2013-pulseaudio            |         18 |
| 2013-spice-vdagent         |         15 |
| 2013-gnome-session         |         11 |
| 2013-sudo                  |          8 |
| 2013-polkit-agent-helper-1 |          8 |
| 2013-console-kit-daemon    |          7 |
| 2013-NetworkManager        |          7 |
| 2013-udevd                 |          6 |
| 2013-sshd                  |          6 |
| 2013-nm-dispatcher.action  |          4 |
| 2013-login                 |          2 |
+----------------------------+------------+
14 rows in set (0.00 sec)
Oozie web console output:
Screenshots from the execution of the sample program are available at:
http://hadooped.blogspot.com/p/ooziecooridnatorjobtimedep-pix_10.html

[Screenshots: coordinator job in waiting state]

Upon availability of the dataset...

[Screenshots: coordinator job triggered]