Tuesday, July 16, 2013

Apache Oozie - Part 8: Subworkflow


1.0. What's covered in the blog?

1) Apache documentation on sub-workflows
2) A sample program that includes components of an Oozie workflow application with a java main action and a sub-workflow containing a sqoop action.  Scripts/code, sample dataset and commands are included;  Oozie actions covered: java action, sqoop action (mysql database); 

Versions:
Oozie 3.3.0, Sqoop (1.4.2) with Mysql (5.1.69)

Related blogs:
Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11a: Oozie Java API for interfacing with oozie workflows
Blog 12: Oozie workflow - shell action +passing output from one action to another


2.0. Apache documentation on sub-workflows


The sub-workflow action runs a child workflow job; the child workflow job can be in the same Oozie system or in another Oozie system.  The parent workflow job will wait until the child workflow job has completed.

Syntax:
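The sub-workflow action element, per the Apache workflow functional spec, has the following shape (bracketed values are placeholders):

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="[NODE-NAME]">
        <sub-workflow>
            <app-path>[WF-APPLICATION-PATH]</app-path>
            <propagate-configuration/>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
        </sub-workflow>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>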
The child workflow job runs in the same Oozie system instance where the parent workflow job is running.
The app-path element specifies the path to the workflow application of the child workflow job.
The propagate-configuration flag, if present, indicates that the workflow job configuration should be propagated to the child workflow.

The configuration section can be used to specify the job properties that are required to run the child workflow job.  The configuration of the sub-workflow action can be parameterized (templatized) using EL expressions.
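For example (illustrative only - the sample application below relies on propagate-configuration rather than an explicit configuration block), a parent property could be handed down to the child like this:

<sub-workflow>
    <app-path>${subWorkflowCodeDir}</app-path>
    <configuration>
        <property>
            <name>mysqlServer</name>
            <value>${mysqlServer}</value>
        </property>
    </configuration>
</sub-workflow>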

Link to Apache documentation:
http://oozie.apache.org/docs/3.3.0/WorkflowFunctionalSpec.html#a3.2.6_Sub-workflow_Action

Note:
For a typical on-demand workflow, you have the core components - job.properties and workflow.xml.  For a sub-workflow, you need yet another workflow.xml that clearly defines the activities to occur in the sub-workflow.  In the parent workflow, the sub-workflow is referenced.  To keep things neat, it is best to have a sub-directory to hold the sub-workflow core components.  Also, a single job.properties is sufficient.
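For instance, the sample application in the next section uses the layout below; the parent and the sub-workflow share one job.properties, and the sub-workflow definition sits in its own sub-directory:

oozieProject/workflowWithSubworkflow
  job.properties                  (shared by parent and sub-workflow)
  workflow.xml                    (parent workflow: java main action + sub-workflow action)
  lib/
    LogEventCount.jar
  dataExporterSubWorkflowApp/
    workflow.xml                  (sub-workflow: sqoop action)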

3.0. Sample workflow application

Highlights:
The workflow has two actions - one is a java main action and the other is a sub-workflow action.

The java main action parses log files on hdfs and generates a report.
The sub-workflow action executes after success of the java main action, and pipes the report in HDFS to a mysql database.


Pictorial overview:





Components of such a workflow application:

Application details:

This gist includes components of an Oozie workflow application - scripts/code, sample data
and commands; Oozie actions covered: sub-workflow, email java main action,
sqoop action (to mysql); Oozie controls covered: decision;
Pictorial overview:
--------------------
http://hadooped.blogspot.com/2013/07/apache-oozie-part-8-subworkflow.html
Usecase:
--------
Parse Syslog generated log files to generate reports; Export reports to RDBMS;
Includes:
---------
Sample data definition and structure 01-SampleDataAndStructure
Data and script download: 02-DataAndScriptDownload
Data load commands: 03-HdfsLoadCommands
Mysql database setup: 04-mysqlDBSetup
Sqoop task -standalone tryout: 05-SqoopStandAloneTryout
App job properties file: 06-JobProperties
Workflow definition -Parent: 07-WorkflowXMLMain
Independent test of LogParser jar: 08-LogParserStandaloneTestHowTo
Workflow definition -DataExporter: 09-SubWorkflowXMLDataExporter
Oozie commands: 10-OozieJobExecutionCommands
Output of LogParser: 11a-OutputLogParser
Output in mysql: 11b-OutputDataExporter
Oozie web console - screenshots: 12-OozieWebConsoleScreenshots
Java LogParser code: 13-JavaCodeHyperlink
01a. Sample data (log files)
----------------------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
01b. Structure
---------------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
02a. Data and code download
----------------------------
Github:
https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
oozieProject
data
airawat-syslog
<<Node-Name>>
<<Year>>
<<Month>>
messages
workflowWithSubworkflow
job.properties
workflow.xml
lib
LogEventCount.jar
dataExporterSubWorkflowApp
workflow.xml
03-Hdfs load commands
----------------------
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/* oozieProject/
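To validate the load against the expected directory structure (the same check used in the coordinator posts further down), a recursive listing works:

$ hadoop fs -ls -R oozieProject/workflowWithSubworkflow | awk '{print $8}'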
*********************************
Mysql database setup tasks
*********************************
a) Create database:
mysql>
create database airawat;
b) Switch to database created:
mysql>
use airawat;
c) Create destination table for sqoop export from hdfs:
mysql>
CREATE TABLE IF NOT EXISTS Logged_Process_Count_By_Year(
year_and_process varchar(100),
occurrence INTEGER);
d) Ensure your sqoop user has access to database created:
mysql>
grant all on airawat.* to myUser@'myMachine';
Try out the sqoop task - outside of the workflow
-------------------------------------------
Use the dataset from my gist-
https://gist.github.com/airawat/5970026
*********************************
Sqoop command
*********************************
Prerequisites:
1. Dataset to be exported should exist on HDFS
2. mySql table that is the destination for the export should exist
Command:
--Run on node that acts as sqoop client;
$ sqoop export \
--connect jdbc:mysql://cdh-dev01/airawat \
--username devUser \
--password myPwd \
--table Logged_Process_Count_By_Year \
--direct \
--export-dir "oozieProject/datasetGeneratorApp/outputDir" \
--fields-terminated-by "\t"
*********************************
Results in mysql
*********************************
mysql> select * from Logged_Process_Count_By_Year order by occurrence desc;
+----------------------------+------------+
| year_and_process | occurrence |
+----------------------------+------------+
| 2013-ntpd_initres | 4133 |
| 2013-kernel | 810 |
| 2013-init | 166 |
| 2013-pulseaudio | 18 |
| 2013-spice-vdagent | 15 |
| 2013-gnome-session | 11 |
| 2013-sudo | 8 |
| 2013-polkit-agent-helper-1 | 8 |
| 2013-console-kit-daemon | 7 |
| 2013-NetworkManager | 7 |
| 2013-udevd | 6 |
| 2013-sshd | 6 |
| 2013-nm-dispatcher.action | 4 |
| 2013-login | 2 |
+----------------------------+------------+
14 rows in set (0.00 sec)
#*************************************************
# job.properties
#*************************************************
nameNode=hdfs://cdh-nn01.chuntikhadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
oozieLibPath=${nameNode}/user/oozie/share/lib
oozie.libpath=${oozieLibPath}
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
# Paths
#------
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
appPath=${oozieProjectRoot}/workflowWithSubworkflow
oozie.wf.application.path=${appPath}
# For logParserAction (java main action)
#---------------------------------------
inputDir=${oozieProjectRoot}/data/*/*/*/*/*
outputDir=${appPath}/output
inputDirRecordCount=`cat ${inputDir} | wc -l`
minRequiredRecordCount=1
# For dataExporterSubWorkflow (having sqoop action)
#---------------------------------------------------
subWorkflowCodeDir=${appPath}/dataExporterSubWorkflowApp
mysqlServer=cdh-dev01
mysqlServerDB=airawat
mysqlServerDBUID=devUser
mysqlServerDBPwd=myPwd
triggerDatasetDir=${outputDir}
triggerDataFiles=${triggerDatasetDir}/part*
sqoopInputRecordCount=`cat ${triggerDataFiles} | wc -l`
<!------------------------------------------------------------->
<!-----Workflow definition file - workflow.xml ----------------->
<!------------------------------------------------------------->
<workflow-app name="SubWorkflow-Parent" xmlns="uri:oozie:workflow:0.1">
<start to="inputAvailableCheckDecision"/>
<decision name="inputAvailableCheckDecision">
<switch>
<case to="logParserAction">
${inputDirRecordCount gt minRequiredRecordCount}
</case>
<default to="end"/>
</switch>
</decision>
<action name="logParserAction">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${outputDir}"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>Airawat.Oozie.Samples.LogEventCount</main-class>
<arg>${inputDir}</arg>
<arg>${outputDir}</arg>
</java>
<ok to="dataExporterSubWorkflow"/>
<error to="killJob"/>
</action>
<action name='dataExporterSubWorkflow'>
<sub-workflow>
<app-path>${subWorkflowCodeDir}</app-path>
<propagate-configuration/>
</sub-workflow>
<ok to="end"/>
<error to="killJob" />
</action>
<kill name="killJob">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<end name="end" />
</workflow-app>
#*******************************************
# LogParser program - standalone test
#*******************************************
Commands to test the java program
a) Command to run the program
$ hadoop jar oozieProject/workflowWithSubworkflow/lib/LogEventCount.jar Airawat.Oozie.Samples.LogEventCount "oozieProject/data/*/*/*/*/*" "oozieProject/workflowWithSubworkflow/myCLIOutput"
b) Command to view results
$ hadoop fs -cat oozieProject/workflowWithSubworkflow/myCLIOutput/part* | sort
c) Results
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-NetworkManager 7
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
<!------------------------------------------------------------->
<!--Sub-Workflow definition file - workflow.xml ---------------->
<!------------------------------------------------------------->
<workflow-app name="SubworkflowApp-SubWf-DataExporter" xmlns="uri:oozie:workflow:0.1">
<start to="inputAvailableCheckDecision"/>
<decision name="inputAvailableCheckDecision">
<switch>
<case to="sqoopAction">
${sqoopInputRecordCount gt minRequiredRecordCount}
</case>
<default to="end"/>
</switch>
</decision>
<action name="sqoopAction">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.libpath</name>
<value>${oozieLibPath}</value>
</property>
</configuration>
<command>export --connect jdbc:mysql://${mysqlServer}/${mysqlServerDB} --username ${mysqlServerDBUID} --password ${mysqlServerDBPwd} --table Logged_Process_Count_By_Year --direct --export-dir ${triggerDatasetDir} --fields-terminated-by "\t"</command>
</sqoop>
<ok to="end"/>
<error to="killJob"/>
</action>
<kill name="killJob">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<end name="end" />
</workflow-app>
****************************************
10. Oozie job commands
****************************************
Note: Replace the Oozie server and port with your cluster-specific values.
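Tip: to avoid repeating the -oozie option on every command, the Oozie CLI also honors the OOZIE_URL environment variable, for example:

$ export OOZIE_URL=http://cdh-dev01:11000/oozie
$ oozie job -config oozieProject/workflowWithSubworkflow/job.properties -submit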
1) Submit job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowWithSubworkflow/job.properties -submit
job: 0000012-130712212133144-oozie-oozi-W
2) Run job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -start 0000014-130712212133144-oozie-oozi-W
3) Check the status:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info 0000014-130712212133144-oozie-oozi-W
4) Suspend workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -suspend 0000014-130712212133144-oozie-oozi-W
5) Resume workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -resume 0000014-130712212133144-oozie-oozi-W
6) Re-run workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowWithSubworkflow/job.properties -rerun 0000014-130712212133144-oozie-oozi-W
7) Should you need to kill the job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill 0000014-130712212133144-oozie-oozi-W
8) View server logs:
$ oozie job -oozie http://cdh-dev01:11000/oozie -logs 0000014-130712212133144-oozie-oozi-W
Logs are available at:
/var/log/oozie on the Oozie server.
****************************************
Output - Log Parser program
****************************************
$ hadoop fs -cat oozieProject/workflowWithSubworkflow/output/part*
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-NetworkManager 7
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
****************************************
Output - data export from hdfs to mysql
****************************************
mysql> select * from Logged_Process_Count_By_Year order by occurrence desc;
+----------------------------+------------+
| year_and_process | occurrence |
+----------------------------+------------+
| 2013-ntpd_initres | 4133 |
| 2013-kernel | 810 |
| 2013-init | 166 |
| 2013-pulseaudio | 18 |
| 2013-spice-vdagent | 15 |
| 2013-gnome-session | 11 |
| 2013-sudo | 8 |
| 2013-polkit-agent-helper-1 | 8 |
| 2013-console-kit-daemon | 7 |
| 2013-NetworkManager | 7 |
| 2013-udevd | 6 |
| 2013-sshd | 6 |
| 2013-nm-dispatcher.action | 4 |
| 2013-login | 2 |
+----------------------------+------------+
14 rows in set (0.00 sec)
Screenshots from Oozie web console:
-----------------------------------
Available at:
http://hadooped.blogspot.com/2013/07/apache-oozie-part-8-subworkflow.html



Oozie web console - screenshots:
Thursday, July 11, 2013

Apache Oozie - Part 10: Bundle jobs



1.0. What's covered in the blog?

1) Apache documentation on bundle jobs
2) A sample bundle application with two coordinator apps - one that is time triggered, another that is dataset availability triggered. Oozie actions covered: hdfs action, email action, java main action, sqoop action (mysql database); Includes oozie job property files, workflow xml files, sample data (syslog generated files), java program (jar) for log parsing, commands;  

Version:
Oozie 3.3.0;

Related blogs:
Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered, with sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11a: Oozie Java API for interfacing with oozie workflows
Blog 11b: Oozie Web Service API for interfacing with oozie workflows


2.0. About Oozie bundle jobs

Excerpt from Apache documentation-
Bundle is a higher-level oozie abstraction that will batch a set of coordinator applications. The user will be able to start/stop/suspend/resume/rerun at the bundle level, resulting in better and easier operational control.
More specifically, the oozie Bundle system allows the user to define and execute a bunch of coordinator applications, often called a data pipeline. There is no explicit dependency among the coordinator applications in a bundle. However, a user could use the data dependency of coordinator applications to create an implicit data application pipeline.

Apache documentation:
http://oozie.apache.org/docs/3.3.0/BundleFunctionalSpec.html#a1._Bundle_Overview


A bundle job can have one to many coordinator jobs.
A coordinator job can have one to many workflows.
A workflow can have one to many actions.
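Schematically, a bundle definition just names its coordinator applications and, optionally, a kick-off time; a minimal skeleton (bracketed values are placeholders) looks like the one below - the actual bundleConfiguration.xml for this sample appears further down:

<bundle-app name='[BUNDLE-NAME]' xmlns='uri:oozie:bundle:0.2'>
    <controls>
        <kick-off-time>[DATETIME]</kick-off-time>
    </controls>
    <coordinator name='[COORD-APP-1]'>
        <app-path>[PATH-TO-COORD-APP-1]</app-path>
    </coordinator>
    <coordinator name='[COORD-APP-2]'>
        <app-path>[PATH-TO-COORD-APP-2]</app-path>
    </coordinator>
</bundle-app>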


3.0. Sample program

Highlights:
The sample bundle application is time triggered.  The start time is defined in the bundle job.properties file.  The bundle application starts two coordinator applications - as defined in the bundle definition file, bundleConfiguration.xml.

The first coordinator job is time triggered.  The start time is defined in the bundle job.properties file.  It runs a workflow that includes a java main action.  The java program parses some log files and generates a report.  The output of the java action is a dataset (the report) which is the trigger for the next coordinator job.

The second coordinator job gets triggered upon availability of the file _SUCCESS in the output directory of the workflow application of the first coordinator application.  It executes a workflow that has a sqoop action; the sqoop action pipes the output of the first coordinator job to a mysql database.


Pictorial overview of the job:
Components of the bundle application:


Bundle application details:
Introduction
-------------
This gist includes sample data, application components, and components to execute a bundle application.
The sample bundle application is time triggered. The start time is defined in the bundle job.properties
file. The bundle application starts two coordinator applications- as defined in the bundle definition file -
bundleConfiguration.xml.
The first coordinator job is time triggered. The start time is defined in the bundle job.properties file.
It runs a workflow, that includes a java main action. The java program parses some log files and generates
a report. The output of the java action is a dataset (the report) which is the trigger for the next
coordinator job.
The second coordinator job gets triggered upon availability of the file _SUCCESS in the output directory
of the workflow application of the first coordinator application. It executes a workflow that has a
sqoop action; The sqoop action pipes the output of the first coordinator job to a mysql database.
Pictorial overview of the bundle application:
---------------------------------------------
http://hadooped.blogspot.com/2013/07/apache-oozie-part-10-bundle-jobs.html
Includes:
---------
Sample data definition and structure 01-SampleDataAndStructure
Data and script download: 02-DataAndScriptDownload
Data load commands: 03-HdfsLoadCommands
Mysql database setup: 04-mysqlDBSetup
Sqoop task -standalone tryout: 05-SqoopStandAloneTryout
Oozie configuration for email: 06-OozieSMTPconfiguration
Bundle job properties file: 07-BundleJobProperties
Bundle definition file: 08-BundleXML
Coordinator definition -LogParser: 09-CoordinatorXMLLogParser
Workflow definition -LogParser: 10-WorkflowXMLLogParser
Independent test of LogParser jar: 11-LogParserStandaloneTestHowTo
Coordinator definition -DataExporter: 12-CoordinatorXMLDataExporter
Workflow definition -DataExporter: 13-WorkflowXMLDataExporter
Oozie commands: 14-OozieJobExecutionCommands
Output of LogParser: 15a-OutputLogParser
Output in mysql: 15b-OutputDataExporter
Oozie web console - screenshots: 16-OozieWebConsoleScreenshots
Java LogParser code: 17-JavaCodeHyperlink
01a. Sample data
-----------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
01b. Structure
---------------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
02a. Data download
-------------------
Github:
https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
oozieProject
data
airawat-syslog
<<Node-Name>>
<<Year>>
<<Month>>
messages
bundleApplication
job.properties
bundleConfiguration.xml
coordAppLogParser
coordinator.xml
workflowAppLogParser
workflow.xml
lib
LogEventCount.jar
coordAppDataExporter
coordinator.xml
workflowAppDataExporter
workflow.xml
03-Hdfs load commands
----------------------
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/* oozieProject/
*********************************
Mysql database setup tasks
*********************************
a) Create database:
mysql>
create database airawat;
b) Switch to database created:
mysql>
use airawat;
c) Create destination table for sqoop export from hdfs:
mysql>
CREATE TABLE IF NOT EXISTS Logged_Process_Count_By_Year(
year_and_process varchar(100),
occurrence INTEGER);
d) Ensure your sqoop user has access to database created:
mysql>
grant all on airawat.* to myUser@'myMachine';
Try out the sqoop task - outside of the workflow
-------------------------------------------
Use the dataset from my gist-
https://gist.github.com/airawat/5970026
*********************************
Sqoop command
*********************************
Prerequisites:
1. Dataset to be exported should exist on HDFS
2. mySql table that is the destination for the export should exist
Command:
--Run on node that acts as sqoop client;
$ sqoop export \
--connect jdbc:mysql://cdh-dev01/airawat \
--username devUser \
--password myPwd \
--table Logged_Process_Count_By_Year \
--direct \
--export-dir "oozieProject/datasetGeneratorApp/outputDir" \
--fields-terminated-by "\t"
*********************************
Results in mysql
*********************************
mysql> select * from Logged_Process_Count_By_Year order by occurrence desc;
+----------------------------+------------+
| year_and_process | occurrence |
+----------------------------+------------+
| 2013-ntpd_initres | 4133 |
| 2013-kernel | 810 |
| 2013-init | 166 |
| 2013-pulseaudio | 18 |
| 2013-spice-vdagent | 15 |
| 2013-gnome-session | 11 |
| 2013-sudo | 8 |
| 2013-polkit-agent-helper-1 | 8 |
| 2013-console-kit-daemon | 7 |
| 2013-NetworkManager | 7 |
| 2013-udevd | 6 |
| 2013-sshd | 6 |
| 2013-nm-dispatcher.action | 4 |
| 2013-login | 2 |
+----------------------------+------------+
14 rows in set (0.00 sec)
*************************
Oozie SMTP configuration
*************************
The following needs to be added to oozie-site.xml - after updating per your environment and configuration;
<!-- SMTP params-->
<property>
<name>oozie.email.smtp.host</name>
<value>cdh-dev01</value>
</property>
<property>
<name>oozie.email.smtp.port</name>
<value>25</value>
</property>
<property>
<name>oozie.email.from.address</name>
<value>oozie@cdh-dev01</value>
</property>
<property>
<name>oozie.email.smtp.auth</name>
<value>false</value>
</property>
<property>
<name>oozie.email.smtp.username</name>
<value></value>
</property>
<property>
<name>oozie.email.smtp.password</name>
<value></value>
</property>
#*************************************************
# job.properties of bundle app
#*************************************************
#Bundle job properties file
# Environment
#-----------
nameNode=hdfs://cdh-nn01.chuntikhadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
# Oozie related
#---------------------------------
oozieLibPath=${nameNode}/user/oozie/share/lib
oozie.libpath=${oozieLibPath}
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
# Application paths
#------------------
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
appRoot=${oozieProjectRoot}/bundleApplication
oozie.bundle.application.path=${appRoot}/bundleConfiguration.xml
coordAppPathDataExporter=${appRoot}/coordAppDataExporter
coordAppPathLogParser=${appRoot}/coordAppLogParser
# Log parser app specific
#-----------------------------------------
workflowAppLogParserPath=${coordAppPathLogParser}/workflowAppLogParser
logParserInputDir=${oozieProjectRoot}/data/*/*/*/*/
logParserOutputDir=${workflowAppLogParserPath}/output
# Data exporter app specific
#-------------------------------
workflowAppDataExporterPath=${coordAppPathDataExporter}/workflowAppDataExporter
triggerDatasetDir=${logParserOutputDir}
triggerDataFiles=${triggerDatasetDir}/part*
sqoopInputRecordCount=`cat ${triggerDataFiles} | wc -l`
mysqlServer=cdh-dev01
mysqlServerDB=airawat
mysqlServerDBUID=devUser
mysqlServerDBPwd=myPassword
# Bundle app specific
#--------------------------
toEmailAddress=akhanolk@cdh-dev01
startTime=2013-07-16T00:30Z
endTime=2013-07-17T00:00Z
timeZoneDef=UTC
minRequiredRecordCount=1
<!------------------------------------------------------------->
<!-----Bundle definition file - bundleConfiguration.xml -------->
<!------------------------------------------------------------->
<bundle-app name='BundleApp' xmlns='uri:oozie:bundle:0.2'>
<controls>
<kick-off-time>${startTime}</kick-off-time>
</controls>
<coordinator name='CoordApp-LogParser' >
<app-path>${coordAppPathLogParser}</app-path>
</coordinator>
<coordinator name='CoordApp-DataExporter' >
<app-path>${coordAppPathDataExporter}</app-path>
</coordinator>
</bundle-app>
<!---------------------------------------------------------------------------->
<!--Coordinator definition file for LogParser app - coordinator.xml ----------->
<!---------------------------------------------------------------------------->
<coordinator-app name="CoordApp-LogParser"
frequency="${coord:days(1)}"
start="${startTime}"
end="${endTime}"
timezone="${timeZoneDef}"
xmlns="uri:oozie:coordinator:0.2">
<controls>
<timeout>20</timeout>
<concurrency>6</concurrency>
<execution>FIFO</execution>
</controls>
<action>
<workflow>
<app-path>${workflowAppLogParserPath}</app-path>
</workflow>
</action>
</coordinator-app>
<!---------------------------------------------------------------------------->
<!--------Workflow definition file for LogParser app - workflow.xml ----------->
<!---------------------------------------------------------------------------->
<workflow-app name="WorkflowApp-LogParser" xmlns="uri:oozie:workflow:0.2">
<start to="javaAction"/>
<action name="javaAction">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${logParserOutputDir}"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>Airawat.Oozie.Samples.LogEventCount</main-class>
<arg>${logParserInputDir}</arg>
<arg>${logParserOutputDir}</arg>
</java>
<ok to="end"/>
<error to="sendErrorEmail"/>
</action>
<action name="sendErrorEmail">
<email xmlns="uri:oozie:email-action:0.1">
<to>${toEmailAddress}</to>
<subject>Status of workflow ${wf:id()}</subject>
<body>The workflow ${wf:name()} with id -${wf:id()}, had issues and will be killed; The error logged is: ${wf:errorMessage(wf:lastErrorNode())}</body>
</email>
<ok to="killJobAction"/>
<error to="killJobAction"/>
</action>
<kill name="killJobAction">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<end name="end" />
</workflow-app>
#*******************************************
# LogParser program - standalone test
#*******************************************
Commands to test the java program
a) Command to run the program
$ hadoop jar oozieProject/bundleApplication/coordAppLogParser/workflowAppLogParser/lib/LogEventCount.jar Airawat.Oozie.Samples.LogEventCount "oozieProject/data/*/*/*/*/*" "oozieProject/bundleApplication/coordAppLogParser/workflowAppLogParser/myCLIOutput"
b) Command to view results
$ hadoop fs -cat oozieProject/bundleApplication/coordAppLogParser/workflowAppLogParser/myCLIOutput/part* | sort
c) Results
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-NetworkManager 7
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
<!------------------------------------------------------------------------------->
<!--Coordinator definition file for DataExporter app - coordinator.xml ----------->
<!------------------------------------------------------------------------------->
<coordinator-app name="CoordApp-DataExporter"
frequency="${coord:days(1)}"
start="${startTime}"
end="${endTime}"
timezone="${timeZoneDef}"
xmlns="uri:oozie:coordinator:0.2">
<controls>
<timeout>20</timeout>
<concurrency>6</concurrency>
<execution>FIFO</execution>
</controls>
<datasets>
<dataset name="inputDS" frequency="${coord:days(1)}" initial-instance="${startTime}" timezone="${timeZoneDef}">
<uri-template>${triggerDatasetDir}</uri-template>
</dataset>
</datasets>
<input-events>
<data-in name="CoordAppTrigDepInput" dataset="inputDS">
<instance>${startTime}</instance>
</data-in>
</input-events>
<action>
<workflow>
<app-path>${workflowAppDataExporterPath}</app-path>
</workflow>
</action>
</coordinator-app>
<!------------------------------------------------------------------------------->
<!--------Workflow definition file for DataExporter app - workflow.xml ----------->
<!------------------------------------------------------------------------------->
<workflow-app name="WorkflowApp-SqoopAction" xmlns="uri:oozie:workflow:0.2">
<start to="inputAvailableCheckDecision"/>
<decision name="inputAvailableCheckDecision">
<switch>
<case to="sqoopAction">
${sqoopInputRecordCount gt minRequiredRecordCount}
</case>
<default to="end"/>
</switch>
</decision>
<action name="sqoopAction">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.libpath</name>
<value>${oozieLibPath}</value>
</property>
</configuration>
<command>export --connect jdbc:mysql://${mysqlServer}/${mysqlServerDB} --username ${mysqlServerDBUID} --password ${mysqlServerDBPwd} --table Logged_Process_Count_By_Year --direct --export-dir ${triggerDatasetDir} --fields-terminated-by "\t"</command>
</sqoop>
<ok to="end"/>
<error to="sendErrorEmail"/>
</action>
<action name="sendErrorEmail">
<email xmlns="uri:oozie:email-action:0.1">
<to>${toEmailAddress}</to>
<subject>Status of workflow ${wf:id()}</subject>
<body>The workflow ${wf:name()} with id -${wf:id()}, had issues and will be killed; The error logged is: ${wf:errorMessage(wf:lastErrorNode())}</body>
</email>
<ok to="killJobAction"/>
<error to="killJobAction"/>
</action>
<kill name="killJobAction">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<end name="end" />
</workflow-app>
****************************************
14. Oozie job commands
****************************************
Note: Replace the Oozie server and port with your cluster-specific values.
1) Submit job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/bundleApplication/job.properties -submit
job: 0000012-130712212133144-oozie-oozi-W
2) Run job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -start 0000014-130712212133144-oozie-oozi-W
3) Check the status:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info 0000014-130712212133144-oozie-oozi-W
4) Suspend workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -suspend 0000014-130712212133144-oozie-oozi-W
5) Resume workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -resume 0000014-130712212133144-oozie-oozi-W
6) Re-run workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/bundleApplication/job.properties -rerun 0000014-130712212133144-oozie-oozi-W
7) Should you need to kill the job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill 0000014-130712212133144-oozie-oozi-W
8) View server logs:
$ oozie job -oozie http://cdh-dev01:11000/oozie -logs 0000014-130712212133144-oozie-oozi-W
Logs are available at:
/var/log/oozie on the Oozie server.
****************************************
Output - Log Parser program
****************************************
$ hadoop fs -cat oozieProject/bundleApplication/coordAppLogParser/workflowAppLogParser/output/part*
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-NetworkManager 7
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
****************************************
Output - data export from hdfs to mysql
****************************************
mysql> select * from Logged_Process_Count_By_Year order by occurrence desc;
+----------------------------+------------+
| year_and_process | occurrence |
+----------------------------+------------+
| 2013-ntpd_initres | 4133 |
| 2013-kernel | 810 |
| 2013-init | 166 |
| 2013-pulseaudio | 18 |
| 2013-spice-vdagent | 15 |
| 2013-gnome-session | 11 |
| 2013-sudo | 8 |
| 2013-polkit-agent-helper-1 | 8 |
| 2013-console-kit-daemon | 7 |
| 2013-NetworkManager | 7 |
| 2013-udevd | 6 |
| 2013-sshd | 6 |
| 2013-nm-dispatcher.action | 4 |
| 2013-login | 2 |
+----------------------------+------------+
14 rows in set (0.00 sec)
Oozie web console - screenshots
--------------------------------
Available at:
http://hadooped.blogspot.com/2013/07/apache-oozie-part-10-bundle-jobs.html



Oozie web console - screenshots:







Wednesday, July 10, 2013

Apache Oozie - Part 9c: Coordinator job - dataset availability triggered


1.0. What's covered in the blog?

1) Apache documentation on coordinator jobs that execute workflows upon availability of datasets
2) A sample program that includes components of an Oozie, dataset availability initiated, coordinator job - scripts/code, sample dataset and commands;  Oozie actions covered: hdfs action, email action, sqoop action (mysql database); 

Version:
Oozie 3.3.0;

Related blogs:

Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11: Oozie Java API for interfacing with oozie workflows
Blog 12: Oozie workflow - shell action +passing output from one action to another
Blog 13: Oozie workflow - SSH action


2.0. Apache documentation on dataset availability triggered coordinator jobs

http://oozie.apache.org/docs/3.3.0/CoordinatorFunctionalSpec.html

3.0. Sample coordinator application


Highlights:
The coordinator application has a start time, and when the start time condition is met, it transitions to a waiting state where it looks for the availability of a dataset.  Once the dataset is available, it runs the workflow specified.
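Note: by default, Oozie treats a dataset instance as available when a _SUCCESS flag file shows up in the resolved directory; the dataset definition can name a different flag file with a done-flag element (check the coordinator spec for your schema version), roughly like this:

<dataset name="inputDS" frequency="${coord:days(1)}" initial-instance="${startTime}" timezone="${timeZoneDef}">
    <uri-template>${triggerDatasetDir}</uri-template>
    <!-- optional; _SUCCESS is the default done flag -->
    <done-flag>_SUCCESS</done-flag>
</dataset>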


Sample application - pictorial overview:


Coordinator application components:
Coordinator application details:

This gist includes components of an Oozie, dataset availability initiated, coordinator job -
scripts/code, sample data and commands; Oozie actions covered: hdfs action, email action,
sqoop action (mysql database); Oozie controls covered: decision;
Usecase
-------
Pipe report data available in HDFS, to mysql database;
Pictorial overview of job:
--------------------------
http://hadooped.blogspot.com/p/ooziecooridnatorjobdatasetdep-pix.html
Includes:
---------
Data and script download: 02-DataAndScriptDownload
Data load commands: 03-HdfsLoadCommands
Mysql database setup: 04-mysqlDBSetup
Sqoop command - test: 05-SqoopStandaloneTryout
Oozie configuration for email 06-OozieSMTPconfiguration
Oozie coordinator properties file 07-OozieCoordinatorProperties
Oozie coordinator conf file 08-OozieCoordinatorXML
Sqoop workflow conf file 09-SqoopWorkflowXML
Oozie commands 10-OozieJobExecutionCommands
Output in mysql 11-Output
Oozie web console - screenshots 12-OozieWebConsoleScreenshots
*********************************
Data download
*********************************
Github:
https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you encounter any issues.
*********************************
Directory structure
*********************************
oozieProject
sampleCoordinatorJobDatasetDep
coordinatorConf/
coordinator.properties
coordinator.xml
sqoopWorkflowApp
workflow.xml
datasetGeneratorApp
outputDir
part-r-00000
_SUCCESS
_logs
history
cdh-jt01_1372261353326_job_201306261042_0536_conf.xml
job_201306261042_0536_1373407670448_akhanolk_Syslog+Event+Rollup
---------------------------------------------------------------------------------------
Lines 14-20 (of the directory structure above): the coordinator application.
Lines 21-30: the datasetGeneratorApp is essentially what we will use to trigger the coordinator job. While the logs are not important for the simulation, the presence of _SUCCESS is needed, failing which the job will not get triggered.
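If you need to simulate the trigger without re-running a MapReduce job, dropping an empty _SUCCESS flag into the dataset directory (once the data files are in place) should be enough, for example:

$ hadoop fs -touchz oozieProject/datasetGeneratorApp/outputDir/_SUCCESS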
*********************************
Hdfs data load commands
*********************************
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/* oozieProject/
Run command below to validate load against expected directory structure in section 02-DataAndScriptDownload
$ hadoop fs -ls -R oozieProject/sampleCoordinatorJobDatasetDep | awk '{print $8}'
Remove the dataset directory - we will load it when we want to trigger the job
$ hadoop fs -rm -R oozieProject/datasetGeneratorApp
*********************************
Mysql database setup tasks
*********************************
a) Create database:
mysql>
create database airawat;
b) Switch to database created:
mysql>
use airawat;
c) Create destination table for sqoop export from hdfs:
mysql>
CREATE TABLE IF NOT EXISTS Logged_Process_Count_By_Year(
year_and_process varchar(100),
occurrence INTEGER);
d) Ensure your sqoop user has access to database created:
mysql>
grant all on airawat.* to myUser@'myMachine';
*************************************************************
Sqoop command - try out the command to see if it works
*************************************************************
Prerequisites:
1. Dataset to be exported should exist on HDFS
2. mySql table that is the destination for the export should exist
Command:
--Run on node that acts as sqoop client;
$ sqoop export \
--connect jdbc:mysql://cdh-dev01/airawat \
--username devUser \
--password myPwd \
--table Logged_Process_Count_By_Year \
--direct \
--export-dir "oozieProject/datasetGeneratorApp/outputDir" \
--fields-terminated-by "\t"
*********************************
Results in mysql
*********************************
mysql> select * from Logged_Process_Count_By_Year order by occurrence desc;
+----------------------------+------------+
| year_and_process | occurrence |
+----------------------------+------------+
| 2013-ntpd_initres | 4133 |
| 2013-kernel | 810 |
| 2013-init | 166 |
| 2013-pulseaudio | 18 |
| 2013-spice-vdagent | 15 |
| 2013-gnome-session | 11 |
| 2013-sudo | 8 |
| 2013-polkit-agent-helper-1 | 8 |
| 2013-console-kit-daemon | 7 |
| 2013-NetworkManager | 7 |
| 2013-udevd | 6 |
| 2013-sshd | 6 |
| 2013-nm-dispatcher.action | 4 |
| 2013-login | 2 |
+----------------------------+------------+
14 rows in set (0.00 sec)
*************************************************
--Cleanup
*************************************************
mysql>
delete from Logged_Process_Count_By_Year;
*************************
Oozie SMTP configuration
*************************
The following needs to be added to oozie-site.xml - after updating per your environment and configuration;
<!-- SMTP params-->
<property>
<name>oozie.email.smtp.host</name>
<value>cdh-dev01</value>
</property>
<property>
<name>oozie.email.smtp.port</name>
<value>25</value>
</property>
<property>
<name>oozie.email.from.address</name>
<value>oozie@cdh-dev01</value>
</property>
<property>
<name>oozie.email.smtp.auth</name>
<value>false</value>
</property>
<property>
<name>oozie.email.smtp.username</name>
<value></value>
</property>
<property>
<name>oozie.email.smtp.password</name>
<value></value>
</property>
***************************************************************************
Oozie coordinator properties file: coordinator.properties
***************************************************************************
nameNode=hdfs://cdh-nn01.hadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
appRoot=${oozieProjectRoot}/sampleCoordinatorJobDatasetDep
oozie.coord.application.path=${appRoot}/coordinatorConf
sqoopWorkflowAppPath=${appRoot}/sqoopWorkflowApp
oozieLibPath=${nameNode}/user/oozie/share/lib
oozie.libpath=${oozieLibPath}
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
triggerDatasetDir=${oozieProjectRoot}/datasetGeneratorApp/outputDir
triggerDataFiles=${triggerDatasetDir}/part*
mysqlServer=cdh-dev01
mysqlServerDB=airawat
mysqlServerDBUID=myUID
mysqlServerDBPwd=myPWD
toEmailAddress=akhanolk@cdh-dev01
startTime=2013-07-11T00:00Z
endTime=2013-07-15T00:00Z
timeZoneDef=UTC
sqoopInputRecordCount=`cat ${triggerDataFiles} | wc -l`
minRequiredRecordCount=1
<!--***************************************************************************
Oozie coordinator xml file: coordinator.xml
*****************************************************************************-->
<coordinator-app name="AirawatCoordJobDataTrig"
frequency="${coord:days(1)}"
start="${startTime}"
end="${endTime}"
timezone="${timeZoneDef}"
xmlns="uri:oozie:coordinator:0.1"
xmlns:sla="uri:oozie:sla:0.1">
<controls>
<timeout>20</timeout>
<concurrency>6</concurrency>
<execution>FIFO</execution>
</controls>
<datasets>
<dataset name="inputDS" frequency="${coord:days(1)}" initial-instance="${startTime}" timezone="${timeZoneDef}">
<uri-template>${triggerDatasetDir}</uri-template>
</dataset>
</datasets>
<input-events>
<data-in name="AirawatCoordTrigDepInput" dataset="inputDS">
<instance>${startTime}</instance>
</data-in>
</input-events>
<action>
<workflow>
<app-path>${sqoopWorkflowAppPath}</app-path>
</workflow>
</action>
</coordinator-app>
<!--***************************************************************************
Oozie workflow xml file: workflow.xml
*****************************************************************************-->
<workflow-app name="AirawatSampleCoordJobDSDep" xmlns="uri:oozie:workflow:0.1">
<start to="inputAvailableCheckDecision"/>
<decision name="inputAvailableCheckDecision">
<switch>
<case to="sqoopAction">
${sqoopInputRecordCount gt minRequiredRecordCount}
</case>
<default to="end"/>
</switch>
</decision>
<action name="sqoopAction">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.libpath</name>
<value>${oozieLibPath}</value>
</property>
</configuration>
<command>export --connect jdbc:mysql://${mysqlServer}/${mysqlServerDB} --username ${mysqlServerDBUID} --password ${mysqlServerDBPwd} --table Logged_Process_Count_By_Year --direct --export-dir ${triggerDatasetDir} --fields-terminated-by "\t"</command>
</sqoop>
<ok to="end"/>
<error to="sendErrorEmail"/>
</action>
<action name="sendErrorEmail">
<email xmlns="uri:oozie:email-action:0.1">
<to>${toEmailAddress}</to>
<subject>Status of workflow ${wf:id()}</subject>
<body>The workflow ${wf:name()} with id -${wf:id()}, had issues and will be killed; The error logged is: ${wf:errorMessage(wf:lastErrorNode())}</body>
</email>
<ok to="killJobAction"/>
<error to="killJobAction"/>
</action>
<kill name="killJobAction">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<end name="end" />
</workflow-app>
****************************************
Oozie job commands
****************************************
a) Prep
Modify the start-end time of the job in the coordinator properties file, as needed.
Then run the following command.
b) Submit job
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/sampleCoordinatorJobDatasetDep/coordinatorConf/coordinator.properties -run
A job ID is displayed.
The job should not trigger until the dataset is loaded.
It should be in waiting state - see oozie web console screenshots in the last section.
If you need to kill the job...
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill <<Job ID>>
c) Publish trigger
Ideally, this would be generated after some map reduce job completed.
For simplicity, I have provided the output of one of the jobs from one of my blogs/gists.
$ hadoop fs -put oozieProject/datasetGeneratorApp/ oozieProject/
****************************************
Output - data export from hdfs to mysql
****************************************
mysql> select * from Logged_Process_Count_By_Year order by occurrence desc;
+----------------------------+------------+
| year_and_process | occurrence |
+----------------------------+------------+
| 2013-ntpd_initres | 4133 |
| 2013-kernel | 810 |
| 2013-init | 166 |
| 2013-pulseaudio | 18 |
| 2013-spice-vdagent | 15 |
| 2013-gnome-session | 11 |
| 2013-sudo | 8 |
| 2013-polkit-agent-helper-1 | 8 |
| 2013-console-kit-daemon | 7 |
| 2013-NetworkManager | 7 |
| 2013-udevd | 6 |
| 2013-sshd | 6 |
| 2013-nm-dispatcher.action | 4 |
| 2013-login | 2 |
+----------------------------+------------+
14 rows in set (0.00 sec)
http://hadooped.blogspot.com/p/ooziecooridnatorjobtimedep-pix_10.html


Oozie web console output:
Screenshots from the execution of the sample program..
Upon availability of the dataset...









Tuesday, July 9, 2013

Apache Oozie - Part 9b: Coordinator jobs - (trigger) file triggered


1.0. What's covered in the blog?

A sample application that includes components of an Oozie (trigger) file triggered coordinator job - scripts/code, sample data (Syslog generated log files) and commands;  Oozie actions covered: hdfs action, email action, java main action, hive action;  Oozie controls covered: decision, fork-join; The workflow includes a sub-workflow that runs two hive actions concurrently.  The hive table is partitioned; Parsing - hive-regex, and Java-regex.  Also, the java mapper gets the input directory path and includes part of it in the key.

Version:
Oozie 3.3.0;

Related blogs:

Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11: Oozie Java API for interfacing with oozie workflows
Blog 12: Oozie workflow - shell action +passing output from one action to another
Blog 13: Oozie workflow - SSH action

Your thoughts/updates:
If you want to share your thoughts/updates, email me at airawat.blog@gmail.com.

2.0. Sample coordinator application

Highlights:
The coordinator application starts executing upon availability of the trigger file defined and initiates the two workflows.  Both workflows generate reports off of data in hdfs.
The java main action parses log files and generates a report.  
The hive actions in the hive sub-workflow run reports off of hive tables against the same log files in hdfs.
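The two hive actions run concurrently by way of a fork/join pair in the hive sub-workflow; schematically (node names below are placeholders, not the actual ones from the sample):

<fork name="forkHiveReports">
    <path start="hiveActionReport2"/>
    <path start="hiveActionReport3"/>
</fork>
...
<join name="joinHiveReports" to="end"/>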

Pictorial overview of coordinator application:





Components:
Coordinator application details:
This gist includes components of an Oozie (trigger file initiated) coordinator job -
scripts/code, sample data and commands; Oozie actions covered: hdfs action, email action,
java main action, hive action; Oozie controls covered: decision, fork-join; The workflow
includes a sub-workflow that runs two hive actions concurrently. The hive table is
partitioned; Parsing uses hive-regex serde, and Java-regex. Also, the java mapper gets
the input directory path and includes part of it in the key.
Usecase
-------
Parse Syslog generated log files to generate reports;
Pictorial overview of job:
--------------------------
http://hadooped.blogspot.com/p/ooziecooridnatorjobtrigfiledep-pix.html
Includes:
---------
Sample data and structure: 01-SampleDataAndStructure
Data and script download: 02-DataAndScriptDownload
Data load commands: 03-HdfsLoadCommands
Java MR - Mapper code: 04A-MapperJavaCode
Java MR - Reducer code: 04B-ReducerJavaCode
Java MR - Driver code: 04C-DriverJavaCode
Command to test Java MR program: 04D-CommandTestJavaMRProg
Hive -create log table command 05A-HiveCreateTable
Hive -load partitions 05B-HiveLoadPartitions
Hive commands to test data loaded 05C-HiveDataLoadTestCommands
Hive QL script for report 2 05D-HiveQLReport2
Hive QL script for report 3 05E-HiveQLReport3
Oozie configuration for email 06-OozieSMTPconfiguration
Oozie coordinator properties file 07-OozieCoordinatorProperties
Oozie coordinator conf file 08-OozieCoordinatorXML
Oozie workflow conf file 09-OozieWorkflowXML
Oozie sub-workflow conf file 10-OozieSubWorkflowXML
Oozie commands 11-OozieJobExecutionCommands
Output -Report1 12A-Rpt1-JavaMainProgramOutput
Output -Report2 12B-Rpt2-HiveProgramOutputIssuesByMonth
Output -Report3 12C-Rpt3-HiveProgramOutputTop3Issues
Oozie web console - screenshots 13-OozieWebConsoleScreenshots
Sample data
------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
Structure
----------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
Data download
-------------
Github:
https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
oozieProject
data
airawat-syslog
<<Node-Name>>
<<Year>>
<<Month>>
messages
sampleCoordinatorJobTrigFileDep
triggerDir
trigger.dat
coordinatorConf/
coordinator.properties
coordinator.xml
workflowApp
workflow.xml
hiveSubWorkflowApp
hive-site.xml
hiveConsolidated-Year-Month-Report.hql
hiveTop3Processes-Year-Report.hql
workflow.xml
lib
LogEventCount.jar
Hdfs load commands
------------------
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/data oozieProject/
$ hadoop fs -put oozieProject/sampleCoordinatorJobTrigFileDep oozieProject/
Run command below to validate load against expected directory structure in section 02-DataAndScriptDownload
$ hadoop fs -ls -R oozieProject/sampleCoordinatorJobTrigFileDep | awk '{print $8}'
Remove the trigger file directory - we will load it when we want to execute the job
$ hadoop fs -rm -R oozieProject/sampleCoordinatorJobTrigFileDep/triggerDir/
// Source code for Mapper
//-----------------------------------------------------------
// LogEventCountMapper.java
//-----------------------------------------------------------
// Java program that parses logs using regex
// The program counts the number of processes logged by year.
// E.g. Key=2013-ntpd; Value=1;
package Airawat.Oozie.Samples;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class LogEventCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
String strLogEntryPattern = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)";
public static final int NUM_FIELDS = 6;
Text strEvent = new Text("");
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String strLogEntryLine = value.toString();
Pattern objPtrn = Pattern.compile(strLogEntryPattern);
Matcher objPatternMatcher = objPtrn.matcher(strLogEntryLine);
if (!objPatternMatcher.matches() || NUM_FIELDS != objPatternMatcher.groupCount()) {
System.err.println("Bad log entry (or problem with RE?):");
System.err.println(strLogEntryLine);
return;
}
/*
System.out.println("Month_Name: " + objPatternMatcher.group(1));
System.out.println("Day: " + objPatternMatcher.group(2));
System.out.println("Time: " + objPatternMatcher.group(3));
System.out.println("Node: " + objPatternMatcher.group(4));
System.out.println("Process: " + objPatternMatcher.group(5));
System.out.println("LogMessage: " + objPatternMatcher.group(6));
*/
// Build the key as "<year>-<process>":
// the year is pulled from the /YYYY/ directory portion of the input file path
// (characters 16 through 13 from the end), and the process name is group 5,
// stripped of any "[pid]" suffix or the trailing colon.
String strPath = ((FileSplit) context.getInputSplit()).getPath().toString();
String strYear = strPath.substring(strPath.length() - 16, strPath.length() - 12);
String strProcess = objPatternMatcher.group(5);
int intBracketIndex = strProcess.indexOf("[");
strProcess = (intBracketIndex == -1)
? strProcess.substring(0, strProcess.length() - 1)
: strProcess.substring(0, intBracketIndex);
strEvent.set(strYear + "-" + strProcess);
context.write(strEvent, new IntWritable(1));
}
}
// Source code for reducer
//--------------------------
// LogEventCountReducer.java
//--------------------------
package Airawat.Oozie.Samples;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class LogEventCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int intEventCount = 0;
for (IntWritable value : values) {
intEventCount += value.get();
}
context.write(key, new IntWritable(intEventCount));
}
}
// Source code for the driver
//--------------------------
// LogEventCount.java
//--------------------------
package Airawat.Oozie.Samples;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
public class LogEventCount {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.out.printf(
"Usage: Airawat.Oozie.Samples.LogEventCount <input dir> <output dir>\n");
System.exit(-1);
}
//Instantiate a Job object for your job's configuration.
Job job = new Job();
//Job jar file
job.setJarByClass(LogEventCount.class);
//Job name
job.setJobName("Syslog Event Rollup");
//Paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//Mapper and reducer classes
job.setMapperClass(LogEventCountMapper.class);
job.setReducerClass(LogEventCountReducer.class);
//Job's output key and value classes
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//Number of reduce tasks
job.setNumReduceTasks(3);
//Start the MapReduce job, wait for it to finish.
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}
Commands to test the java program
---------------------------------
a) Command to run the program
$ hadoop jar oozieProject/sampleCoordinatorJobTrigFileDep/workflowApp/lib/LogEventCount.jar Airawat.Oozie.Samples.LogEventCount "oozieProject/sampleCoordinatorJobTrigFileDep/data/*/*/*/*/*" "oozieProject/sampleCoordinatorJobTrigFileDep/myCLIOutput"
b) Command to view results
$ hadoop fs -cat oozieProject/sampleCoordinatorJobTrigFileDep/myCLIOutput/part*
c) Results
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
Hive script to create table for logs
-------------------------------------
hive>
CREATE EXTERNAL TABLE SysLogEvents(
month_name STRING,
day STRING,
time STRING,
host STRING,
event STRING,
log STRING)
PARTITIONED BY(node string,year int, month int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)"
)
stored as textfile;
Hive scripts to create and load partitions
-------------------------------------------
Note: Replace my user ID "akhanolk" with yours
hive >
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dev01",year=2013, month=04)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dev01/2013/04/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dev01",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dev01/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn01",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn01/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn02",year=2013, month=04)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn02/2013/04/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn02",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn02/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn03",year=2013, month=04)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn03/2013/04/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn03",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn03/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-jt01",year=2013, month=04)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-jt01/2013/04/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-jt01",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-jt01/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-nn01",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-nn01/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-vms",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-vms/2013/05/';
Hive ql to test data loaded
----------------------------
hive>
--Print headers
set hive.cli.print.header=true;
--Need to add this jar for MR to work..your env may not need it
add jar hadoop-lib/hive-contrib-0.10.0-cdh4.2.0.jar;
--Sample query
select * from SysLogEvents limit 2;
--Hive QL script: Generates report
--File name: hiveConsolidated-Year-Month-Report.hql
---------------------------------------------------
use default;
drop table consolidated_YM_report;
CREATE TABLE IF NOT EXISTS consolidated_YM_report(
process string,
node string,
year int,
month int,
occurrence int)
ROW FORMAT DELIMITED
FIELDS TERMINATED by ','
LINES TERMINATED by '\n';
INSERT OVERWRITE TABLE consolidated_YM_report
select case locate('[',event,1) when 0 then case locate(':',event,1) when 0 then event else substr(event,1,(locate(':',event,1))-1) end
       else substr(event,1,(locate('[',event,1))-1) end process,
       Node, Year, Month, Count(*) Occurrence
from SysLogEvents
group by node, year, month,
       case locate('[',event,1) when 0 then case locate(':',event,1) when 0 then event else substr(event,1,(locate(':',event,1))-1) end
       else substr(event,1,(locate('[',event,1))-1) end
order by process asc, node asc, year, month;
--Hive QL script: Generates report
--File name: hiveTop3Processes-Year-Report.hql
---------------------------------------------------
use default;
drop table top3_process_by_year_report;
CREATE TABLE IF NOT EXISTS top3_process_by_year_report(
process string,
year int,
occurrence int)
ROW FORMAT DELIMITED
FIELDS TERMINATED by ','
LINES TERMINATED by '\n';
INSERT OVERWRITE TABLE top3_process_by_year_report
select process, year, occurrence from
  (select case locate('[',event,1) when 0 then case locate(':',event,1) when 0 then event else substr(event,1,(locate(':',event,1))-1) end
          else substr(event,1,(locate('[',event,1))-1) end process,
          Year, Count(*) Occurrence
   from SysLogEvents
   group by year, case locate('[',event,1) when 0 then case locate(':',event,1) when 0 then event else substr(event,1,(locate(':',event,1))-1) end
          else substr(event,1,(locate('[',event,1))-1) end
   order by process asc, year, Occurrence desc) X
where process is not null order by occurrence desc limit 3;
Oozie SMTP configuration
------------------------
The following needs to be added to oozie-site.xml - after updating per your environment and configuration;
<!-- SMTP params-->
<property>
<name>oozie.email.smtp.host</name>
<value>cdh-dev01</value>
</property>
<property>
<name>oozie.email.smtp.port</name>
<value>25</value>
</property>
<property>
<name>oozie.email.from.address</name>
<value>oozie@cdh-dev01</value>
</property>
<property>
<name>oozie.email.smtp.auth</name>
<value>false</value>
</property>
<property>
<name>oozie.email.smtp.username</name>
<value></value>
</property>
<property>
<name>oozie.email.smtp.password</name>
<value></value>
</property>
#------------------------------------------------------------
# Oozie coordinator properties file
# Filename: coordinator.properties
#------------------------------------------------------------
#Coordinator job properties file
nameNode=hdfs://cdh-nn01.chuntikhadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
appRoot=${oozieProjectRoot}/sampleCoordinatorJobTrigFileDep
oozie.coord.application.path=${appRoot}/coordinatorConf
workflowAppPath=${appRoot}/workflowApp
subWorkflowAppPath=${workflowAppPath}/hiveSubWorkflowApp
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
inputDir=${oozieProjectRoot}/data/*/*/*/*/*
outputDirJavaMain=${appRoot}/output-JavaMain
triggerFileDir=${appRoot}/triggerDir
toEmailAddress=akhanolk@cdh-dev01
startTime=2013-07-09T15:55Z
endTime=2013-07-09T15:57Z
timeZoneDef=UTC
inputDirRecordCount=`cat ${inputDir} | wc -l`
minRequiredRecordCount=1
<!------------------------------------------>
<!--Coordinator xml file: coordinator.xml -->
<!------------------------------------------>
<coordinator-app name="AirawatCoordJobTrigDep"
frequency="${coord:days(1)}"
start="${startTime}"
end="${endTime}"
timezone="${timeZoneDef}"
xmlns="uri:oozie:coordinator:0.1"
xmlns:sla="uri:oozie:sla:0.1">
<controls>
<timeout>20</timeout>
<concurrency>6</concurrency>
<execution>FIFO</execution>
</controls>
<datasets>
<dataset name="inputDS" frequency="${coord:days(1)}" initial-instance="${startTime}" timezone="${timeZoneDef}">
<uri-template>${triggerFileDir}</uri-template>
<done-flag>trigger.dat</done-flag>
</dataset>
</datasets>
<input-events>
<data-in name="AirawatCoordTrigDepInput" dataset="inputDS">
<instance>${startTime}</instance>
</data-in>
</input-events>
<action>
<workflow>
<app-path>${workflowAppPath}</app-path>
</workflow>
</action>
</coordinator-app>
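Note: although the coordinator materializes an action once a day, the workflow is held back until the input event is satisfied - that is, until the done-flag file trigger.dat exists under ${triggerFileDir}. Publishing the trigger directory (Step 3 of the execution commands further below) is what satisfies this predicate and lets the workflow run.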
<!------------------------------------------>
<!--Workflow xml file: workflow.xml -->
<!------------------------------------------>
<workflow-app name="AirawatSampleCoordJob-Parent" xmlns="uri:oozie:workflow:0.1">
<start to="inputAvailableCheckDecision"/>
<decision name="inputAvailableCheckDecision">
<switch>
<case to="startTaskFork">
${inputDirRecordCount gt minRequiredRecordCount}
</case>
<default to="end"/>
</switch>
</decision>
<fork name="startTaskFork">
<path start="javaMainAction"/>
<path start="hiveSubWorkflow"/>
</fork>
<action name="javaMainAction">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${outputDirJavaMain}"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>Airawat.Oozie.Samples.LogEventCount</main-class>
<arg>${inputDir}</arg>
<arg>${outputDirJavaMain}</arg>
</java>
<ok to="joiningControl-P"/>
<error to="sendErrorEmail"/>
</action>
<action name='hiveSubWorkflow'>
<sub-workflow>
<app-path>${subWorkflowAppPath}</app-path>
<propagate-configuration/>
</sub-workflow>
<ok to="joiningControl-P" />
<error to="sendErrorEmail" />
</action>
<join name="joiningControl-P" to="cleanUp"/>
<action name='cleanUp'>
<fs>
<delete path="${triggerFileDir}"/>
</fs>
<ok to="end" />
<error to="sendErrorEmail" />
</action>
<action name="sendErrorEmail">
<email xmlns="uri:oozie:email-action:0.1">
<to>${toEmailAddress}</to>
<subject>Status of workflow ${wf:id()}</subject>
<body>The workflow ${wf:name()} with id ${wf:id()} had issues and will be killed; The error logged is: ${wf:errorMessage(wf:lastErrorNode())}</body>
</email>
<ok to="killJobAction"/>
<error to="killJobAction"/>
</action>
<kill name="killJobAction">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<end name="end" />
</workflow-app>
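In this parent workflow, the decision node transitions straight to the end node unless the record-count predicate holds; otherwise the fork runs the java main action and the hive sub-workflow concurrently, the join waits for both paths, and the cleanUp action then deletes the trigger directory so that a subsequent coordinator run waits for a fresh trigger file.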
<!------------------------------------------------->
<!--Sub-workflow xml file: workflow.xml ----------->
<!------------------------------------------------->
<workflow-app name="AirawatSampleCoordJob-Child" xmlns="uri:oozie:workflow:0.1">
<start to="startConcurrentHiveTasksFork"/>
<fork name="startConcurrentHiveTasksFork">
<path start="hiveActionIssuesByYM"/>
<path start="hiveActionTop3Issues"/>
</fork>
<action name="hiveActionIssuesByYM">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>${subWorkflowAppPath}/hive-site.xml</job-xml>
<script>${subWorkflowAppPath}/hiveConsolidated-Year-Month-Report.hql</script>
</hive>
<ok to="joiningControl-C"/>
<error to="sendErrorEmail"/>
</action>
<action name="hiveActionTop3Issues">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>${subWorkflowAppPath}/hive-site.xml</job-xml>
<script>${subWorkflowAppPath}/hiveTop3Processes-Year-Report.hql</script>
</hive>
<ok to="joiningControl-C"/>
<error to="sendErrorEmail"/>
</action>
<kill name="killJobAction">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<action name="sendErrorEmail">
<email xmlns="uri:oozie:email-action:0.1">
<to>${toEmailAddress}</to>
<subject>Status of child workflow ${wf:id()}</subject>
<body>The workflow ${wf:id()} had issues and will be killed; The error logged is: ${wf:errorMessage(wf:lastErrorNode())}</body>
</email>
<ok to="killJobAction"/>
<error to="killJobAction"/>
</action>
<join name="joiningControl-C" to="end"/>
<end name="end" />
</workflow-app>
Executing the oozie coordinator job
------------------------------------
Step 1) Modify coordinator.properties file
Set the start and end time to be in the future, in UTC, so you can see the job in a waiting state before the start-time condition is met; the following entries need to be changed.
startTime=2013-07-09T03:45Z
endTime=2013-07-09T03:47Z
Step 2) Submit the coordinator job
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/sampleCoordinatorJobTrigFileDep/coordinatorConf/coordinator.properties -run
Step 3) Publish trigger file to run job
$ hadoop fs -put oozieProject/sampleCoordinatorJobTrigFileDep/triggerDir oozieProject/sampleCoordinatorJobTrigFileDep
Replace cdh-dev01 with your oozie server, and 11000 with the associated port number;
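To check on the coordinator job while it waits for the trigger file, the standard status command can be used (substitute the job id returned at submission):
$ oozie job -oozie http://cdh-dev01:11000/oozie -info <coordinator-job-id>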
Output of java program:
------------------------
$ hadoop fs -ls -R oozieProject/sampleCoordinatorJobTrigFileDep/out*/part* | awk '{print $8}' | xargs hadoop fs -cat
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
Results of report 2, from execution of hiveConsolidated-Year-Month-Report.hql
------------------------------------------------------------------------------
hive>
set hive.cli.print.header=true;
hive> select * from consolidated_YM_report;
OK
process node year month occurrence
NULL cdh-dev01 2013 5 19
NULL cdh-vms 2013 5 6
NetworkManager cdh-dev01 2013 5 7
console-kit-daemon cdh-dev01 2013 5 7
gnome-session cdh-dev01 2013 5 11
init cdh-dev01 2013 5 38
init cdh-dn01 2013 5 17
init cdh-dn02 2013 5 17
init cdh-dn03 2013 5 23
init cdh-jt01 2013 5 17
init cdh-nn01 2013 5 29
init cdh-vms 2013 5 25
kernel cdh-dev01 2013 5 203
kernel cdh-dn01 2013 5 67
kernel cdh-dn02 2013 5 58
kernel cdh-dn03 2013 5 58
kernel cdh-jt01 2013 5 76
kernel cdh-nn01 2013 5 172
kernel cdh-vms 2013 5 176
login cdh-vms 2013 5 2
nm-dispatcher.action cdh-dev01 2013 5 4
ntpd_initres cdh-dev01 2013 5 57
ntpd_initres cdh-dn01 2013 5 803
ntpd_initres cdh-dn02 2013 5 804
ntpd_initres cdh-dn03 2013 5 792
ntpd_initres cdh-jt01 2013 5 804
ntpd_initres cdh-nn01 2013 5 834
ntpd_initres cdh-vms 2013 5 39
polkit-agent-helper-1 cdh-dev01 2013 5 8
pulseaudio cdh-dev01 2013 4 1
pulseaudio cdh-dev01 2013 5 17
spice-vdagent cdh-dev01 2013 4 1
spice-vdagent cdh-dev01 2013 5 14
sshd cdh-dev01 2013 5 6
sudo cdh-dn02 2013 4 1
sudo cdh-dn02 2013 5 1
sudo cdh-dn03 2013 4 1
sudo cdh-dn03 2013 5 1
sudo cdh-jt01 2013 4 3
sudo cdh-jt01 2013 5 1
udevd cdh-dn01 2013 5 1
udevd cdh-dn02 2013 5 1
udevd cdh-dn03 2013 5 1
udevd cdh-jt01 2013 5 1
udevd cdh-vms 2013 5 2
Time taken: 5.841 seconds
Results of report 3, from execution of hiveTop3Processes-Year-Report.hql
------------------------------------------------------------------------
--Get top3 issues logged by year
hive>
set hive.cli.print.header=true;
hive>
select * from top3_process_by_year_report;
process year occurrence
ntpd_initres 2013 4133
kernel 2013 810
init 2013 166
Time taken: 0.385 seconds
http://hadooped.blogspot.com/p/ooziecooridnatorjobtrigfiledep-pix-oozie.html

Oozie web console - screenshots:

Thursday, July 4, 2013

Apache Oozie - Part 9a: Coordinator jobs - time triggered; fork-join and decision controls

1.0. What's covered in the blog?

1. Oozie documentation on coordinator job, sub workflow, fork-join, and decision controls
2. A sample application that includes components of an Oozie time-triggered coordinator job - scripts/code, sample data
and commands; Oozie actions covered: hdfs action, email action, java main action, hive action; Oozie controls covered: decision, fork-join. The workflow includes a sub-workflow that runs two hive actions concurrently. The hive table is partitioned; parsing uses hive-regex and Java-regex. Also, the java mapper gets the input directory path and includes part of it in the key.

Version:
Oozie 3.3.0;

Related blogs:

Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11: Oozie Java API for interfacing with oozie workflows
Blog 12: Oozie workflow - shell action +passing output from one action to another
Blog 13: Oozie workflow - SSH action


Your thoughts/updates:
If you want to share your thoughts/updates, email me at airawat.blog@gmail.com.

2.0. Oozie sub-workflow

The sub-workflow action runs a child workflow job, the child workflow job can be in the same Oozie system or in another Oozie system.  The parent workflow job will wait until the child workflow job has completed.

Syntax:
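A minimal sketch of the sub-workflow action, per the Oozie 3.3.0 workflow spec (node names, the app path and the property are placeholders; the propagate-configuration flag and the configuration section are optional):

<action name="[NODE-NAME]">
    <sub-workflow>
        <app-path>[CHILD-WORKFLOW-APP-PATH]</app-path>
        <propagate-configuration/>
        <configuration>
            <property>
                <name>[PROPERTY-NAME]</name>
                <value>[PROPERTY-VALUE]</value>
            </property>
        </configuration>
    </sub-workflow>
    <ok to="[NODE-NAME]"/>
    <error to="[NODE-NAME]"/>
</action>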

The child workflow job runs in the same Oozie system instance where the parent workflow job is running.
The app-path element specifies the path to the workflow application of the child workflow job.
The propagate-configuration flag, if present, indicates that the workflow job configuration should be propagated to the child workflow.

The configuration section can be used to specify the job properties that are required to run the child workflow job.  The configuration of the sub-workflow action can be parameterized (templatized) using EL expressions.

Link to Apache documentation:
http://oozie.apache.org/docs/3.3.0/WorkflowFunctionalSpec.html#a3.2.6_Sub-workflow_Action

Note:
For a typical on-demand workflow, you have core components - job.properties and workflow.xml.  For a sub workflow, you need yet another workflow.xml that clearly defines activities to occur in the sub-workflow.  In the parent workflow, the sub-workflow is referenced.  To keep it neat, best to have a sub-directory to hold the sub-workflow core components.  Also, a single job.properties is sufficient. 

E.g.
workflowAppPath
    workflow.xml
    job.properties
    Any other lib/archives/files etc

    subWorkflowAppPath
        workflow.xml

       

3.0. Coordinator job

Users typically run map-reduce, hadoop-streaming, hdfs and/or Pig jobs on the grid. Several such jobs can be combined to form a workflow job. Oozie, the Hadoop Workflow System, defines a workflow system that runs such jobs.

Commonly, workflow jobs are run based on regular time intervals and/or data availability. And, in some cases, they can be triggered by an external event. The condition(s) that trigger a workflow job can be modeled as a predicate that has to be satisfied.

The workflow job is started after the predicate is satisfied. A predicate can reference data, time and/or external events. In the future, the model can be extended to support additional event types.
It is also necessary to connect workflow jobs that run regularly, but at different time intervals. The outputs of multiple subsequent runs of a workflow become the input to the next workflow. For example, the outputs of the last 4 runs of a workflow that runs every 15 minutes become the input of another workflow that runs every 60 minutes. Chaining these workflows together is referred to as a data application pipeline.

The Oozie Coordinator system allows the user to define and execute recurrent and interdependent workflow jobs (data application pipelines).  Real world data application pipelines have to account for reprocessing, late processing, catchup, partial processing, monitoring, notification and SLAs.

Link to Apache documentation:
http://oozie.apache.org/docs/3.3.0/CoordinatorFunctionalSpec.html
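For orientation, a minimal time-triggered coordinator definition looks roughly like the following (a sketch only - element names follow the coordinator spec, bracketed values are placeholders; the full coordinator.xml used by this sample appears later in this post):

<coordinator-app name="[APP-NAME]" frequency="${coord:days(1)}"
                 start="[START-TIME]" end="[END-TIME]" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.1">
    <action>
        <workflow>
            <app-path>[WORKFLOW-APP-PATH]</app-path>
        </workflow>
    </action>
</coordinator-app>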


4.0. Decision control

A decision node enables a workflow to make a selection on the execution path to follow. The behavior of a decision node can be seen as a switch-case statement.

A decision node consists of a list of predicate-transition pairs plus a default transition. Predicates are evaluated in order of appearance until one of them evaluates to true and the corresponding transition is taken. If none of the predicates evaluates to true, the default transition is taken.

Predicates are JSP Expression Language (EL) expressions (refer to section 4.2 of this document) that resolve into a boolean value, true or false.  For example:
${fs:fileSize('/usr/foo/myinputdir') gt 10 * GB}

Syntax:
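A minimal sketch of the decision node, per the Oozie workflow spec (node names and predicates are placeholders):

<decision name="[NODE-NAME]">
    <switch>
        <case to="[NODE-NAME]">[PREDICATE]</case>
        <case to="[NODE-NAME]">[PREDICATE]</case>
        <default to="[NODE-NAME]"/>
    </switch>
</decision>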
The name attribute in the decision node is the name of the decision node.
Each case element contains a predicate and a transition name. The predicate ELs are evaluated in order until one returns true and the corresponding transition is taken.

The default element indicates the transition to take if none of the predicates evaluates to true.
All decision nodes must have a default element to avoid bringing the workflow into an error state if none of the predicates evaluates to true.

Link to Apache documentation:
http://oozie.apache.org/docs/3.3.0/WorkflowFunctionalSpec.html#a3.1.4_Decision_Control_Node


5.0. Fork-Join controls

A fork node splits one path of execution into multiple concurrent paths of execution.
A join node waits until every concurrent execution path of a previous fork node arrives at it.
The fork and join nodes must be used in pairs.
The join node assumes all concurrent execution paths are children of the same fork node.

Syntax:
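A minimal sketch of a fork-join pair, per the Oozie workflow spec (node names are placeholders):

<fork name="[FORK-NODE-NAME]">
    <path start="[NODE-NAME]"/>
    <path start="[NODE-NAME]"/>
</fork>
...
<join name="[JOIN-NODE-NAME]" to="[NODE-NAME]"/>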


The name attribute in the fork node is the name of the workflow fork node. The start attribute in the path elements in the fork node indicates the name of the workflow node that will be part of the concurrent execution paths.

The name attribute in the join node is the name of the workflow join node. The to attribute in the join node indicates the name of the workflow node that will be executed after all concurrent execution paths of the corresponding fork arrive at the join node.

Link to Apache documentation:
http://oozie.apache.org/docs/3.3.0/WorkflowFunctionalSpec.html#a3.1.5_Fork_and_Join_Control_Nodes

6.0. Helpful sites

7.0. Sample coordinator application
Highlights:
The sample application includes components of an Oozie (time-triggered) coordinator application - scripts/code, sample data and commands; Oozie actions covered: hdfs action, email action, java main action, hive action; Oozie controls covered: decision, fork-join. The workflow includes a sub-workflow that runs two hive actions concurrently. The hive table is partitioned; parsing uses hive-regex and Java-regex. Also, the java mapper gets the input directory path and includes part of it in the key.

Pictorial overview of application:

Components of application:


Application details:
This gist includes components of an Oozie (time initiated) coordinator application - scripts/code, sample data
and commands; Oozie actions covered: hdfs action, email action, java main action,
hive action; Oozie controls covered: decision, fork-join; The workflow includes a
sub-workflow that runs two hive actions concurrently. The hive table is partitioned;
Parsing uses hive-regex serde, and Java-regex. Also, the java mapper gets the input
directory path and includes part of it in the key.
Usecase: Parse Syslog generated log files to generate reports;
Pictorial overview of job:
--------------------------
http://hadooped.blogspot.com/p/ooziecooridnatorjobtimedepparent.html
Includes:
---------
Sample data and structure: 01-SampleDataAndStructure
Data and script download: 02-DataAndScriptDownload
Data load commands: 03-HdfsLoadCommands
Java MR - Mapper code: 04A-MapperJavaCode
Java MR - Reducer code: 04B-ReducerJavaCode
Java MR - Driver code: 04C-DriverJavaCode
Command to test Java MR program: 04D-CommandTestJavaMRProg
Hive -create log table command 05A-HiveCreateTable
Hive -load partitions 05B-HiveLoadPartitions
Hive commands to test data loaded 05C-HiveDataLoadTestCommands
Hive QL script for report 2 05D-HiveQLReport2
Hive QL script for report 3 05E-HiveQLReport3
Oozie configuration for email 06-OozieSMTPconfiguration
Oozie coordinator properties file 07-OozieCoordinatorProperties
Oozie coordinator conf file 08-OozieCoordinatorXML
Oozie workflow conf file 09-OozieWorkflowXML
Oozie sub-workflow conf file 10-OozieSubWorkflowXML
Oozie commands 11-OozieJobExecutionCommands
Output -Report1 12A-Rpt1-JavaMainProgramOutput
Output -Report2 12B-Rpt2-HiveProgramOutputIssuesByMonth
Output -Report3 12C-Rpt3-HiveProgramOutputTop3Issues
Oozie web console 13-OozieWebConsoleScreenshots
Sample data
------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
Structure
----------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
Data download
-------------
Github:
https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
oozieProject
    data
        airawat-syslog
            <<Node-Name>>
                <<Year>>
                    <<Month>>
                        messages
    sampleCoordinatorJobTimeDep
        coordinatorConf/
            coordinator.properties
            coordinator.xml
        workflowApp
            workflow.xml
            hiveSubWorkflowApp
                workflow.xml
                hive-site.xml
                hiveConsolidated-Year-Month-Report.hql
                hiveTop3Processes-Year-Report.hql
            lib
                LogEventCount.jar
Hdfs load commands
------------------
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/data oozieProject/
$ hadoop fs -put oozieProject/sampleCoordinatorJobTimeDep oozieProject
Run command below to validate load against expected directory structure in section 02-DataAndScriptDownload
$ hadoop fs -ls -R oozieProject/sampleCoordinatorJobTimeDep | awk '{print $8}'
// Source code for Mapper
//-----------------------------------------------------------
// LogEventCountMapper.java
//-----------------------------------------------------------
// Java program that parses logs using regex
// The program counts the number of processes logged by year.
// E.g. Key=2013-ntpd; Value=1;
package Airawat.Oozie.Samples;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class LogEventCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
String strLogEntryPattern = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)";
public static final int NUM_FIELDS = 6;
Text strEvent = new Text("");
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String strLogEntryLine = value.toString();
Pattern objPtrn = Pattern.compile(strLogEntryPattern);
Matcher objPatternMatcher = objPtrn.matcher(strLogEntryLine);
if (!objPatternMatcher.matches() || NUM_FIELDS != objPatternMatcher.groupCount()) {
System.err.println("Bad log entry (or problem with RE?):");
System.err.println(strLogEntryLine);
return;
}
/*
System.out.println("Month_Name: " + objPatternMatcher.group(1));
System.out.println("Day: " + objPatternMatcher.group(2));
System.out.println("Time: " + objPatternMatcher.group(3));
System.out.println("Node: " + objPatternMatcher.group(4));
System.out.println("Process: " + objPatternMatcher.group(5));
System.out.println("LogMessage: " + objPatternMatcher.group(6));
*/
//TODO: Move this NOT so pretty chunk of code to the mapper setup method so its executed only once
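//The next line builds the map output key as "<year>-<process>": the year is the 4 characters
//ending 12 characters before the end of the input file path (.../<year>/<month>/messages),
//and the process name is regex group 5 with its trailing ':' (or '[pid]' suffix) stripped.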
strEvent.set(((FileSplit)context.getInputSplit()).getPath().toString().substring((((FileSplit)context.getInputSplit()).getPath().toString().length()-16), (((FileSplit)context.getInputSplit()).getPath().toString().length()-12)) + "-" + ((objPatternMatcher.group(5).toString().indexOf("[")) == -1 ? (objPatternMatcher.group(5).toString().substring(0,(objPatternMatcher.group(5).length()-1))) : (objPatternMatcher.group(5).toString().substring(0,(objPatternMatcher.group(5).toString().indexOf("["))))));
context.write(strEvent, new IntWritable(1));
}
}
// Source code for reducer
//--------------------------
// LogEventCountReducer.java
//--------------------------
package Airawat.Oozie.Samples;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class LogEventCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int intEventCount = 0;
for (IntWritable value : values) {
intEventCount += value.get();
}
context.write(key, new IntWritable(intEventCount));
}
}
// Source code for the driver
//--------------------------
// LogEventCount.java
//--------------------------
package Airawat.Oozie.Samples;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
public class LogEventCount {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.out.printf(
"Usage: Airawat.Oozie.Samples.LogEventCount <input dir> <output dir>\n");
System.exit(-1);
}
//Instantiate a Job object for your job's configuration.
Job job = new Job();
//Job jar file
job.setJarByClass(LogEventCount.class);
//Job name
job.setJobName("Syslog Event Rollup");
//Paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//Mapper and reducer classes
job.setMapperClass(LogEventCountMapper.class);
job.setReducerClass(LogEventCountReducer.class);
//Job's output key and value classes
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//Number of reduce tasks
job.setNumReduceTasks(3);
//Start the MapReduce job, wait for it to finish.
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}
Commands to test the java program
---------------------------------
a) Command to run the program
$ hadoop jar oozieProject/sampleCoordinatorJobTimeDep/workflowApp/lib/LogEventCount.jar Airawat.Oozie.Samples.LogEventCount "oozieProject/sampleCoordinatorJobTimeDep/data/*/*/*/*/*" "oozieProject/sampleCoordinatorJobTimeDep/myCLIOutput"
b) Command to view results
$ hadoop fs -cat oozieProject/sampleCoordinatorJobTimeDep/myCLIOutput/part*
c) Results
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
Hive script to create table for logs
-------------------------------------
hive>
CREATE EXTERNAL TABLE SysLogEvents(
month_name STRING,
day STRING,
time STRING,
host STRING,
event STRING,
log STRING)
PARTITIONED BY(node string,year int, month int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)"
)
stored as textfile;
Hive scripts to create and load partitions
-------------------------------------------
Note: Replace my user ID "akhanolk" with yours
hive >
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dev01",year=2013, month=04)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dev01/2013/04/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dev01",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dev01/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn01",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn01/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn02",year=2013, month=04)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn02/2013/04/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn02",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn02/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn03",year=2013, month=04)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn03/2013/04/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn03",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn03/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-jt01",year=2013, month=04)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-jt01/2013/04/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-jt01",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-jt01/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-nn01",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-nn01/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-vms",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-vms/2013/05/';
Hive ql to test data loaded
----------------------------
hive>
--Print headers
set hive.cli.print.header=true;
--Need to add this jar for MR to work..your env may not need it
add jar hadoop-lib/hive-contrib-0.10.0-cdh4.2.0.jar;
--Sample query
select * from SysLogEvents limit 2;
--Hive QL script: Generates report
--File name: hiveConsolidated-Year-Month-Report.hql
---------------------------------------------------
use default;
drop table consolidated_YM_report;
CREATE TABLE IF NOT EXISTS consolidated_YM_report(
process string,
node string,
year int,
month int,
occurrence int)
ROW FORMAT DELIMITED
FIELDS TERMINATED by ','
LINES TERMINATED by '\n';
INSERT OVERWRITE TABLE consolidated_YM_report
select case locate('[',event,1) when 0 then case locate(':',event,1) when 0 then event else substr(event,1,(locate(':',event,1))-1) end
       else substr(event,1,(locate('[',event,1))-1) end process,
       Node, Year, Month, Count(*) Occurrence
from SysLogEvents
group by node, year, month,
       case locate('[',event,1) when 0 then case locate(':',event,1) when 0 then event else substr(event,1,(locate(':',event,1))-1) end
       else substr(event,1,(locate('[',event,1))-1) end
order by process asc, node asc, year, month;
--Hive QL script: Generates report
--File name: hiveTop3Processes-Year-Report.hql
---------------------------------------------------
use default;
drop table top3_process_by_year_report;
CREATE TABLE IF NOT EXISTS top3_process_by_year_report(
process string,
year int,
occurrence int)
ROW FORMAT DELIMITED
FIELDS TERMINATED by ','
LINES TERMINATED by '\n';
INSERT OVERWRITE TABLE top3_process_by_year_report
select process, year, occurrence from
  (select case locate('[',event,1) when 0 then case locate(':',event,1) when 0 then event else substr(event,1,(locate(':',event,1))-1) end
          else substr(event,1,(locate('[',event,1))-1) end process,
          Year, Count(*) Occurrence
   from SysLogEvents
   group by year, case locate('[',event,1) when 0 then case locate(':',event,1) when 0 then event else substr(event,1,(locate(':',event,1))-1) end
          else substr(event,1,(locate('[',event,1))-1) end
   order by process asc, year, Occurrence desc) X
where process is not null order by occurrence desc limit 3;
Oozie SMTP configuration
------------------------
The following needs to be added to oozie-site.xml - after updating per your environment and configuration;
<!-- SMTP params-->
<property>
<name>oozie.email.smtp.host</name>
<value>cdh-dev01</value>
</property>
<property>
<name>oozie.email.smtp.port</name>
<value>25</value>
</property>
<property>
<name>oozie.email.from.address</name>
<value>oozie@cdh-dev01</value>
</property>
<property>
<name>oozie.email.smtp.auth</name>
<value>false</value>
</property>
<property>
<name>oozie.email.smtp.username</name>
<value></value>
</property>
<property>
<name>oozie.email.smtp.password</name>
<value></value>
</property>
#------------------------------------------------------------
# Oozie coordinator properties file
# Filename: coordinator.properties
#------------------------------------------------------------
#Coordinator job properties file - coordinator.properties
nameNode=hdfs://cdh-nn01.hadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
appRoot=${nameNode}/user/${user.name}/oozieProject/sampleCoordinatorJobTimeDep
oozie.coord.application.path=${appRoot}/coordinatorConf
workflowAppPath=${appRoot}/workflowApp
subWorkflowAppPath=${workflowAppPath}/hiveSubWorkflowApp
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
inputDir=${appRoot}/data/*/*/*/*/*
outputDirJavaMain=${appRoot}/output-JavaMain
triggerFileDir=${appRoot}/triggerDir
toEmailAddress=akhanolk@cdh-dev01
startTime=2013-07-09T03:45Z
endTime=2013-07-09T03:47Z
timeZoneDef=UTC
inputDirRecordCount=`cat ${inputDir} | wc -l`
minRequiredRecordCount=1
<!------------------------------------------>
<!--Coordinator xml file: coordinator.xml -->
<!------------------------------------------>
<coordinator-app name="AirawatCoordJobTimeDep"
frequency="${coord:days(1)}"
start="${startTime}"
end="${endTime}"
timezone="${timeZoneDef}"
xmlns="uri:oozie:coordinator:0.1">
<controls>
<timeout>20</timeout>
<concurrency>6</concurrency>
<execution>FIFO</execution>
</controls>
<action>
<workflow>
<app-path>${workflowAppPath}</app-path>
</workflow>
</action>
</coordinator-app>
<!------------------------------------------>
<!--Workflow xml file: workflow.xml -->
<!------------------------------------------>
<workflow-app name="AirawatSampleCoordJob-Parent" xmlns="uri:oozie:workflow:0.1">
<start to="inputAvailableCheckDecision"/>
<decision name="inputAvailableCheckDecision">
<switch>
<case to="startTaskFork">
${inputDirRecordCount gt minRequiredRecordCount}
</case>
<default to="end"/>
</switch>
</decision>
<fork name="startTaskFork">
<path start="javaMainAction"/>
<path start="hiveSubWorkflow"/>
</fork>
<action name="javaMainAction">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${outputDirJavaMain}"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>Airawat.Oozie.Samples.LogEventCount</main-class>
<arg>${inputDir}</arg>
<arg>${outputDirJavaMain}</arg>
</java>
<ok to="joiningControl-P"/>
<error to="sendErrorEmail"/>
</action>
<action name='hiveSubWorkflow'>
<sub-workflow>
<app-path>${subWorkflowAppPath}</app-path>
<propagate-configuration/>
</sub-workflow>
<ok to="joiningControl-P" />
<error to="sendErrorEmail" />
</action>
<join name="joiningControl-P" to="end"/>
<action name="sendErrorEmail">
<email xmlns="uri:oozie:email-action:0.1">
<to>${toEmailAddress}</to>
<subject>Status of workflow ${wf:id()}</subject>
<body>The workflow ${wf:name()} with id ${wf:id()} had issues and will be killed; The error logged is: ${wf:errorMessage(wf:lastErrorNode())}</body>
</email>
<ok to="killJobAction"/>
<error to="killJobAction"/>
</action>
<kill name="killJobAction">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<end name="end" />
</workflow-app>
<!------------------------------------------------->
<!--Sub-workflow xml file: workflow.xml ----------->
<!------------------------------------------------->
<workflow-app name="AirawatSampleCoordJob-Child" xmlns="uri:oozie:workflow:0.1">
<start to="startConcurrentHiveTasksFork"/>
<fork name="startConcurrentHiveTasksFork">
<path start="hiveActionIssuesByYM"/>
<path start="hiveActionTop3Issues"/>
</fork>
<action name="hiveActionIssuesByYM">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>${subWorkflowAppPath}/hive-site.xml</job-xml>
<script>${subWorkflowAppPath}/hiveConsolidated-Year-Month-Report.hql</script>
</hive>
<ok to="joiningControl-C"/>
<error to="sendErrorEmail"/>
</action>
<action name="hiveActionTop3Issues">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>${subWorkflowAppPath}/hive-site.xml</job-xml>
<script>${subWorkflowAppPath}/hiveTop3Processes-Year-Report.hql</script>
</hive>
<ok to="joiningControl-C"/>
<error to="sendErrorEmail"/>
</action>
<kill name="killJobAction">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<action name="sendErrorEmail">
<email xmlns="uri:oozie:email-action:0.1">
<to>${toEmailAddress}</to>
<subject>Status of child workflow ${wf:id()}</subject>
<body>The workflow ${wf:id()} had issues and will be killed; The error logged is: ${wf:errorMessage(wf:lastErrorNode())}</body>
</email>
<ok to="killJobAction"/>
<error to="killJobAction"/>
</action>
<join name="joiningControl-C" to="end"/>
<end name="end" />
</workflow-app>
Executing the oozie coordinator job
------------------------------------
Step 1) Modify coordinator.properties file
Set the start and end time to be in the future, in UTC, so you can see the job in a waiting state before the start-time condition is met; the following entries need to be changed.
startTime=2013-07-09T03:45Z
endTime=2013-07-09T03:47Z
Step 2) Submit the coordinator job
oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/sampleCoordinatorJobTimeDep/coordinatorConf/coordinator.properties -submit
Step 3) Job execution
The job will be in waiting state and execute when the time predicate is met
Replace cdh-dev01 with your oozie server, and 11000 with the associated port number;
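While the coordinator waits for the time predicate, its status (and that of its materialized actions) can be checked with the standard status command (substitute the job id returned at submission):
$ oozie job -oozie http://cdh-dev01:11000/oozie -info <coordinator-job-id>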
Output of java program:
------------------------
$ hadoop fs -ls -R oozieProject/sampleCoordinatorJobTimeDep/out*/part* | awk '{print $8}' | xargs hadoop fs -cat
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
Results of report 2, from execution of hiveConsolidated-Year-Month-Report.hql
------------------------------------------------------------------------------
hive>
set hive.cli.print.header=true;
hive> select * from consolidated_YM_report;
OK
process node year month occurrence
NULL cdh-dev01 2013 5 19
NULL cdh-vms 2013 5 6
NetworkManager cdh-dev01 2013 5 7
console-kit-daemon cdh-dev01 2013 5 7
gnome-session cdh-dev01 2013 5 11
init cdh-dev01 2013 5 38
init cdh-dn01 2013 5 17
init cdh-dn02 2013 5 17
init cdh-dn03 2013 5 23
init cdh-jt01 2013 5 17
init cdh-nn01 2013 5 29
init cdh-vms 2013 5 25
kernel cdh-dev01 2013 5 203
kernel cdh-dn01 2013 5 67
kernel cdh-dn02 2013 5 58
kernel cdh-dn03 2013 5 58
kernel cdh-jt01 2013 5 76
kernel cdh-nn01 2013 5 172
kernel cdh-vms 2013 5 176
login cdh-vms 2013 5 2
nm-dispatcher.action cdh-dev01 2013 5 4
ntpd_initres cdh-dev01 2013 5 57
ntpd_initres cdh-dn01 2013 5 803
ntpd_initres cdh-dn02 2013 5 804
ntpd_initres cdh-dn03 2013 5 792
ntpd_initres cdh-jt01 2013 5 804
ntpd_initres cdh-nn01 2013 5 834
ntpd_initres cdh-vms 2013 5 39
polkit-agent-helper-1 cdh-dev01 2013 5 8
pulseaudio cdh-dev01 2013 4 1
pulseaudio cdh-dev01 2013 5 17
spice-vdagent cdh-dev01 2013 4 1
spice-vdagent cdh-dev01 2013 5 14
sshd cdh-dev01 2013 5 6
sudo cdh-dn02 2013 4 1
sudo cdh-dn02 2013 5 1
sudo cdh-dn03 2013 4 1
sudo cdh-dn03 2013 5 1
sudo cdh-jt01 2013 4 3
sudo cdh-jt01 2013 5 1
udevd cdh-dn01 2013 5 1
udevd cdh-dn02 2013 5 1
udevd cdh-dn03 2013 5 1
udevd cdh-jt01 2013 5 1
udevd cdh-vms 2013 5 2
Time taken: 5.841 seconds
Results of report 3, from execution of hiveTop3Processes-Year-Report.hql
------------------------------------------------------------------------
--Get top3 issues logged by year
hive>
set hive.cli.print.header=true;
hive>
select * from top3_process_by_year_report;
process year occurrence
ntpd_initres 2013 4133
kernel 2013 810
init 2013 166
Time taken: 0.385 seconds
http://hadooped.blogspot.com/p/ooziecooridnatorjobtimedep-pix.html


Oozie web console:
Screenshots from execution of sample program