Thursday, July 11, 2013

Apache Oozie - Part 10: Bundle jobs



1.0. What's covered in the blog?

1) Apache documentation on bundle jobs
2) A sample bundle application with two coordinator apps - one that is time triggered, another that is dataset-availability triggered. Oozie actions covered: hdfs action, email action, java main action, sqoop action (mysql database). Includes oozie job property files, workflow xml files, sample data (syslog generated files), java program (jar) for log parsing, and commands.

Version:
Oozie 3.3.0;

Related blogs:
Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered, with sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11a: Oozie Java API for interfacing with oozie workflows
Blog 11b: Oozie Web Service API for interfacing with oozie workflows


2.0. About Oozie bundle jobs

Excerpt from Apache documentation-
Bundle is a higher-level oozie abstraction that batches a set of coordinator applications. The user is able to start/stop/suspend/resume/rerun at the bundle level, resulting in better and easier operational control.
More specifically, the oozie Bundle system allows the user to define and execute a bunch of coordinator applications, often called a data pipeline. There is no explicit dependency among the coordinator applications in a bundle. However, a user could use the data dependency of coordinator applications to create an implicit data application pipeline.

Apache documentation:
http://oozie.apache.org/docs/3.3.0/BundleFunctionalSpec.html#a1._Bundle_Overview


A bundle job can have one to many coordinator jobs.
A coordinator job can have one to many workflows.
A workflow can have one to many actions.
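
As a rough sketch of that bundle-level operational control, the usual Oozie CLI verbs can be pointed at the bundle job ID itself. The server URL below matches the one used in the commands section later in this post, <bundleJobId> is a placeholder, and the -coordinator option on bundle rerun is assumed to be available in this Oozie release:

$ oozie job -oozie http://cdh-dev01:11000/oozie -suspend <bundleJobId>
$ oozie job -oozie http://cdh-dev01:11000/oozie -resume <bundleJobId>
# Rerun only one of the coordinators in the bundle, by coordinator name
$ oozie job -oozie http://cdh-dev01:11000/oozie -rerun <bundleJobId> -coordinator CoordApp-LogParser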


3.0. Sample program

Highlights:
The sample bundle application is time triggered.  The start time is defined in the bundle job.properties file.  The bundle application starts two coordinator applications, as defined in the bundle definition file - bundleConfiguration.xml.

The first coordinator job is time triggered.  The start time is defined in the bundle job.properties file.  It runs a workflow that includes a java main action.  The java program parses some log files and generates a report.  The output of the java action is a dataset (the report), which is the trigger for the next coordinator job.

The second coordinator job gets triggered upon availability of the file _SUCCESS in the output directory of the workflow application of the first coordinator application.  It executes a workflow that has a sqoop action; the sqoop action pipes the output of the first coordinator job to a mysql database.
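
In other words, the second coordinator polls the first one's output directory and fires only when Oozie's default done-flag file, _SUCCESS, shows up there. A quick manual check of that trigger (path per the job.properties further down; a sketch, not part of the application itself):

$ hadoop fs -ls oozieProject/bundleApplication/coordAppLogParser/workflowAppLogParser/output
$ hadoop fs -test -e oozieProject/bundleApplication/coordAppLogParser/workflowAppLogParser/output/_SUCCESS && echo "Trigger dataset is available"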


Pictorial overview of the job:
(Diagram of the bundle application pipeline appeared here.)

Components of the bundle application:
(Diagram of the application components appeared here.)

Bundle application details:
Introduction
-------------
This gist includes sample data, application components, and components to execute a bundle application.
The sample bundle application is time triggered. The start time is defined in the bundle job.properties
file. The bundle application starts two coordinator applications- as defined in the bundle definition file -
bundleConfiguration.xml.
The first coordinator job is time triggered. The start time is defined in the bundle job.properties file.
It runs a workflow that includes a java main action. The java program parses some log files and generates
a report. The output of the java action is a dataset (the report) which is the trigger for the next
coordinator job.
The second coordinator job gets triggered upon availability of the file _SUCCESS in the output directory
of the workflow application of the first coordinator application. It executes a workflow that has a
sqoop action; The sqoop action pipes the output of the first coordinator job to a mysql database.
Pictorial overview of the bundle application:
---------------------------------------------
http://hadooped.blogspot.com/2013/07/apache-oozie-part-10-bundle-jobs.html
Includes:
---------
Sample data definition and structure: 01-SampleDataAndStructure
Data and script download: 02-DataAndScriptDownload
Data load commands: 03-HdfsLoadCommands
Mysql database setup: 04-mysqlDBSetup
Sqoop task -standalone tryout: 05-SqoopStandAloneTryout
Oozie configuration for email: 06-OozieSMTPconfiguration
Bundle job properties file: 07-BundleJobProperties
Bundle definition file: 08-BundleXML
Coordinator definition -LogParser: 09-CoordinatorXMLLogParser
Workflow definition -LogParser: 10-WorkflowXMLLogParser
Independent test of LogParser jar: 11-LogParserStandaloneTestHowTo
Coordinator definition -DataExporter: 12-CoordinatorXMLDataExporter
Workflow definition -DataExporter: 13-WorkflowXMLDataExporter
Oozie commands: 14-OozieJobExecutionCommands
Output of LogParser: 15a-OutputLogParser
Output in mysql: 15b-OutputDataExporter
Oozie web console - screenshots: 16-OozieWebConsoleScreenshots
Java LogParser code: 17-JavaCodeHyperlink
01a. Sample data
-----------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
01b. Structure
---------------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
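To see how each syslog line maps onto those fields, split a sample line on whitespace (an illustrative one-liner, not part of the application):
$ echo "May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal" | \
  awk '{print "Month="$1, "Day="$2, "Time="$3, "Node="$4, "Process="$5}'
Month=May Day=3 Time=11:52:54 Node=cdh-dn03 Process=init: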
02a. Data download
-------------------
Github:
https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
oozieProject
    data
        airawat-syslog
            <<Node-Name>>
                <<Year>>
                    <<Month>>
                        messages
    bundleApplication
        job.properties
        bundleConfiguration.xml
        coordAppLogParser
            coordinator.xml
            workflowAppLogParser
                workflow.xml
                lib
                    LogEventCount.jar
        coordAppDataExporter
            coordinator.xml
            workflowAppDataExporter
                workflow.xml
03-Hdfs load commands
----------------------
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/* oozieProject/
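Optional check that the upload landed as expected:
$ hadoop fs -ls oozieProject
$ hadoop fs -ls oozieProject/bundleApplication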
*********************************
Mysql database setup tasks
*********************************
a) Create database:
mysql>
create database airawat;
b) Switch to database created:
mysql>
use airawat;
c) Create destination table for sqoop export from hdfs:
mysql>
CREATE TABLE IF NOT EXISTS Logged_Process_Count_By_Year(
year_and_process varchar(100),
occurrence INTEGER);
d) Ensure your sqoop user has access to database created:
mysql>
grant all on airawat.* to myUser@'myMachine';
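e) Optionally, verify the setup from a shell (host and user below follow this sample's environment; adjust to yours):
$ mysql -h cdh-dev01 -u myUser -p airawat -e "show tables; describe Logged_Process_Count_By_Year;"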
Tryout the sqoop task- outside of workflow
-------------------------------------------
Use the dataset from my gist-
https://gist.github.com/airawat/5970026
*********************************
Sqoop command
*********************************
Pre-requisites:
1. Dataset to be exported should exist on HDFS
2. mySql table that is the destination for the export should exist
Command:
--Run on node that acts as sqoop client;
$ sqoop export \
--connect jdbc:mysql://cdh-dev01/airawat \
--username devUser \
--password myPwd \
--table Logged_Process_Count_By_Year \
--direct \
--export-dir "oozieProject/datasetGeneratorApp/outputDir" \
--fields-terminated-by "\t"
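A quick cross-check of the export (host, user and paths follow the command above; a sketch):
$ hadoop fs -cat oozieProject/datasetGeneratorApp/outputDir/part* | wc -l
$ mysql -h cdh-dev01 -u devUser -p airawat -e "select count(*) from Logged_Process_Count_By_Year;"
The two counts should match once the export completes.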
*********************************
Results in mysql
*********************************
mysql> select * from Logged_Process_Count_By_Year order by occurrence desc;
+----------------------------+------------+
| year_and_process           | occurrence |
+----------------------------+------------+
| 2013-ntpd_initres          |       4133 |
| 2013-kernel                |        810 |
| 2013-init                  |        166 |
| 2013-pulseaudio            |         18 |
| 2013-spice-vdagent         |         15 |
| 2013-gnome-session         |         11 |
| 2013-sudo                  |          8 |
| 2013-polkit-agent-helper-1 |          8 |
| 2013-console-kit-daemon    |          7 |
| 2013-NetworkManager        |          7 |
| 2013-udevd                 |          6 |
| 2013-sshd                  |          6 |
| 2013-nm-dispatcher.action  |          4 |
| 2013-login                 |          2 |
+----------------------------+------------+
14 rows in set (0.00 sec)
*************************
Oozie SMTP configuration
*************************
The following needs to be added to oozie-site.xml, after updating it per your environment and configuration.
<!-- SMTP params-->
<property>
  <name>oozie.email.smtp.host</name>
  <value>cdh-dev01</value>
</property>
<property>
  <name>oozie.email.smtp.port</name>
  <value>25</value>
</property>
<property>
  <name>oozie.email.from.address</name>
  <value>oozie@cdh-dev01</value>
</property>
<property>
  <name>oozie.email.smtp.auth</name>
  <value>false</value>
</property>
<property>
  <name>oozie.email.smtp.username</name>
  <value></value>
</property>
<property>
  <name>oozie.email.smtp.password</name>
  <value></value>
</property>
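Optionally, verify that the Oozie server can reach the SMTP host and port configured above (assumes nc is available on that host):
$ nc -vz cdh-dev01 25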
#*************************************************
# job.properties of bundle app
#*************************************************
#Bundle job properties file
# Environment
#-----------
nameNode=hdfs://cdh-nn01.chuntikhadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
# Oozie related
#---------------------------------
oozieLibPath=${nameNode}/user/oozie/share/lib
oozie.libpath=${oozieLibPath}
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
# Application paths
#------------------
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
appRoot=${oozieProjectRoot}/bundleApplication
oozie.bundle.application.path=${appRoot}/bundleConfiguration.xml
coordAppPathDataExporter=${appRoot}/coordAppDataExporter
coordAppPathLogParser=${appRoot}/coordAppLogParser
# Log parser app specific
#-----------------------------------------
workflowAppLogParserPath=${coordAppPathLogParser}/workflowAppLogParser
logParserInputDir=${oozieProjectRoot}/data/*/*/*/*/
logParserOutputDir=${workflowAppLogParserPath}/output
# Data exporter app specific
#-------------------------------
workflowAppDataExporterPath=${coordAppPathDataExporter}/workflowAppDataExporter
triggerDatasetDir=${logParserOutputDir}
triggerDataFiles=${triggerDatasetDir}/part*
sqoopInputRecordCount=`cat ${triggerDataFiles} | wc -l`
mysqlServer=cdh-dev01
mysqlServerDB=airawat
mysqlServerDBUID=devUser
mysqlServerDBPwd=myPassword
# Bundle app specific
#--------------------------
toEmailAddress=akhanolk@cdh-dev01
startTime=2013-07-16T00:30Z
endTime=2013-07-17T00:00Z
timeZoneDef=UTC
minRequiredRecordCount=1
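# Before submitting, it helps to confirm that the HDFS paths these properties
# resolve to actually exist (relative paths below assume the submitting user's
# home directory; a sketch):
$ hadoop fs -ls oozieProject/bundleApplication/bundleConfiguration.xml
$ hadoop fs -ls oozieProject/bundleApplication/coordAppLogParser
$ hadoop fs -ls oozieProject/bundleApplication/coordAppDataExporter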
<!------------------------------------------------------------->
<!-----Bundle definition file - bundleConfiguration.xml -------->
<!------------------------------------------------------------->
<bundle-app name='BundleApp' xmlns='uri:oozie:bundle:0.2'>
  <controls>
    <kick-off-time>${startTime}</kick-off-time>
  </controls>
  <coordinator name='CoordApp-LogParser'>
    <app-path>${coordAppPathLogParser}</app-path>
  </coordinator>
  <coordinator name='CoordApp-DataExporter'>
    <app-path>${coordAppPathDataExporter}</app-path>
  </coordinator>
</bundle-app>
<!---------------------------------------------------------------------------->
<!--Coordinator definition file for LogParser app - coordinator.xml ----------->
<!---------------------------------------------------------------------------->
<coordinator-app name="CoordApp-LogParser"
frequency="${coord:days(1)}"
start="${startTime}"
end="${endTime}"
timezone="${timeZoneDef}"
xmlns="uri:oozie:coordinator:0.2">
<controls>
<timeout>20</timeout>
<concurrency>6</concurrency>
<execution>FIFO</execution>
</controls>
<action>
<workflow>
<app-path>${workflowAppLogParserPath}</app-path>
</workflow>
</action>
</coordinator-app>
<!---------------------------------------------------------------------------->
<!--------Workflow definition file for LogParser app - workflow.xml ----------->
<!---------------------------------------------------------------------------->
<workflow-app name="WorkflowApp-LogParser" xmlns="uri:oozie:workflow:0.2">
<start to="javaAction"/>
<action name="javaAction">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${logParserOutputDir}"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>Airawat.Oozie.Samples.LogEventCount</main-class>
<arg>${logParserInputDir}</arg>
<arg>${logParserOutputDir}</arg>
</java>
<ok to="end"/>
<error to="sendErrorEmail"/>
</action>
<action name="sendErrorEmail">
<email xmlns="uri:oozie:email-action:0.1">
<to>${toEmailAddress}</to>
<subject>Status of workflow ${wf:id()}</subject>
<body>The workflow ${wf:name()} with id -${wf:id()}, had issues and will be killed; The error logged is: ${wf:errorMessage(wf:lastErrorNode());
}</body>
</email>
<ok to="killJobAction"/>
<error to="killJobAction"/>
</action>
<kill name="killJobAction">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<end name="end" />
</workflow-app>
#*******************************************
# LogParser program - standalone test
#*******************************************
Commands to test the java program
a) Command to run the program
$ hadoop jar oozieProject/bundleApplication/coordAppLogParser/workflowAppLogParser/lib/LogEventCount.jar Airawat.Oozie.Samples.LogEventCount "oozieProject/data/*/*/*/*/*" "oozieProject/bundleApplication/coordAppLogParser/workflowAppLogParser/myCLIOutput"
b) Command to view results
$ hadoop fs -cat oozieProject/bundleApplication/coordAppLogParser/workflowAppLogParser/myCLIOutput/part* | sort
c) Results
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-NetworkManager 7
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
<!------------------------------------------------------------------------------->
<!--Coordinator definition file for DataExporter app - coordinator.xml ----------->
<!------------------------------------------------------------------------------->
<coordinator-app name="CoordApp-DataExporter"
frequency="${coord:days(1)}"
start="${startTime}"
end="${endTime}"
timezone="${timeZoneDef}"
xmlns="uri:oozie:coordinator:0.2">
<controls>
<timeout>20</timeout>
<concurrency>6</concurrency>
<execution>FIFO</execution>
</controls>
<datasets>
<dataset name="inputDS" frequency="${coord:days(1)}" initial-instance="${startTime}" timezone="${timeZoneDef}">
<uri-template>${triggerDatasetDir}</uri-template>
</dataset>
</datasets>
<input-events>
<data-in name="CoordAppTrigDepInput" dataset="inputDS">
<instance>${startTime}</instance>
</data-in>
</input-events>
<action>
<workflow>
<app-path>${workflowAppDataExporterPath}</app-path>
</workflow>
</action>
</coordinator-app>
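While this coordinator is waiting on the trigger dataset, its status can be checked from the CLI; <coordJobId> below is a placeholder for the CoordApp-DataExporter coordinator job ID:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info <coordJobId>
Its actions stay in WAITING until the _SUCCESS file appears in ${triggerDatasetDir}.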
<!------------------------------------------------------------------------------->
<!--------Workflow definition file for DataExporter app - workflow.xml ----------->
<!------------------------------------------------------------------------------->
<workflow-app name="WorkflowApp-SqoopAction" xmlns="uri:oozie:workflow:0.2">
<start to="inputAvailableCheckDecision"/>
<decision name="inputAvailableCheckDecision">
<switch>
<case to="sqoopAction">
${sqoopInputRecordCount gt minRequiredRecordCount}
</case>
<default to="end"/>
</switch>
</decision>
<action name="sqoopAction">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.libpath</name>
<value>${oozieLibPath}</value>
</property>
</configuration>
<command>export --connect jdbc:mysql://${mysqlServer}/${mysqlServerDB} --username ${mysqlServerDBUID} --password ${mysqlServerDBPwd} --table Log
ged_Process_Count_By_Year --direct --export-dir ${triggerDatasetDir} --fields-terminated-by "\t"</command>
</sqoop>
<ok to="end"/>
<error to="sendErrorEmail"/>
</action>
<action name="sendErrorEmail">
<email xmlns="uri:oozie:email-action:0.1">
<to>${toEmailAddress}</to>
<subject>Status of workflow ${wf:id()}</subject>
<body>The workflow ${wf:name()} with id -${wf:id()}, had issues and will be killed; The error logged is: ${wf:errorMessage(wf:lastErrorNode());
}</body>
</email>
<ok to="killJobAction"/>
<error to="killJobAction"/>
</action>
<kill name="killJobAction">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<end name="end" />
</workflow-app>
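The decision node above guards the sqoop action with ${sqoopInputRecordCount gt minRequiredRecordCount} (minRequiredRecordCount is 1 per job.properties). The equivalent manual check of the trigger dataset's record count (path per job.properties; a sketch, not part of the workflow):
$ hadoop fs -cat oozieProject/bundleApplication/coordAppLogParser/workflowAppLogParser/output/part* | wc -l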
****************************************
14. Oozie job commands
****************************************
Note: Replace the Oozie server and port with your cluster-specific values.
1) Submit job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/bundleApplication/job.properties -submit
job: 0000012-130712212133144-oozie-oozi-W
2) Run job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -start 0000014-130712212133144-oozie-oozi-W
3) Check the status:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info 0000014-130712212133144-oozie-oozi-W
4) Suspend workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -suspend 0000014-130712212133144-oozie-oozi-W
5) Resume workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -resume 0000014-130712212133144-oozie-oozi-W
6) Re-run workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/bundleApplication/job.properties -rerun 0000014-130712212133144-oozie-oozi-W
7) Should you need to kill the job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill 0000014-130712212133144-oozie-oozi-W
8) View server logs:
$ oozie job -oozie http://cdh-dev01:11000/oozie -logs 0000014-130712212133144-oozie-oozi-W
Logs are available at:
/var/log/oozie on the Oozie server.
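9) Bundle-oriented variants of the above (a sketch; the -jobtype value 'bundle' is assumed to be supported by this Oozie version, and <bundleJobId> is a placeholder):
$ oozie jobs -oozie http://cdh-dev01:11000/oozie -jobtype bundle
$ oozie job -oozie http://cdh-dev01:11000/oozie -info <bundleJobId>
The -info output for a bundle job lists the coordinator jobs it kicked off; each of those can in turn be inspected with -info on its own ID.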
****************************************
Output - Log Parser program
****************************************
$ hadoop fs -cat oozieProject/bundleApplication/coordAppLogParser/workflowAppLogParser/output/part*
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-NetworkManager 7
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
****************************************
Output - data export from hdfs to mysql
****************************************
mysql> select * from Logged_Process_Count_By_Year order by occurrence desc;
+----------------------------+------------+
| year_and_process           | occurrence |
+----------------------------+------------+
| 2013-ntpd_initres          |       4133 |
| 2013-kernel                |        810 |
| 2013-init                  |        166 |
| 2013-pulseaudio            |         18 |
| 2013-spice-vdagent         |         15 |
| 2013-gnome-session         |         11 |
| 2013-sudo                  |          8 |
| 2013-polkit-agent-helper-1 |          8 |
| 2013-console-kit-daemon    |          7 |
| 2013-NetworkManager        |          7 |
| 2013-udevd                 |          6 |
| 2013-sshd                  |          6 |
| 2013-nm-dispatcher.action  |          4 |
| 2013-login                 |          2 |
+----------------------------+------------+
14 rows in set (0.00 sec)
Oozie web console - screenshots
--------------------------------
Available at:
http://hadooped.blogspot.com/2013/07/apache-oozie-part-10-bundle-jobs.html



Oozie web console - screenshots:
(Screenshots of the bundle job, its coordinator jobs, and the associated workflow jobs in the Oozie web console appeared here.)
