Tuesday, July 16, 2013

Apache Oozie - Part 8: Subworkflow


1.0. What's covered in the blog?

1) Apache documentation on sub-workflows
2) A sample program that includes components of an Oozie workflow application with a java main action and a sub-workflow containing a sqoop action. Scripts/code, sample dataset and commands are included. Oozie actions covered: java main action, sqoop action (MySQL database).

Versions:
Oozie 3.3.0, Sqoop 1.4.2, MySQL 5.1.69

Related blogs:
Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11a: Oozie Java API for interfacing with oozie workflows
Blog 12: Oozie workflow - shell action +passing output from one action to another


2.0. Apache documentation on sub-workflows


The sub-workflow action runs a child workflow job; the child workflow job can be in the same Oozie system or in another Oozie system. The parent workflow job waits until the child workflow job has completed.

Syntax (reproduced from the Apache documentation linked below):

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="[NODE-NAME]">
        <sub-workflow>
            <app-path>[WF-APPLICATION-PATH]</app-path>
            <propagate-configuration/>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
        </sub-workflow>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>
The child workflow job runs in the same Oozie system instance where the parent workflow job is running.
The app-path element specifies the path to the workflow application of the child workflow job.
The propagate-configuration flag, if present, indicates that the workflow job configuration should be propagated to the child workflow.

The configuration section can be used to specify the job properties that are required to run the child workflow job.  The configuration of the sub-workflow action can be parameterized (templatized) using EL expressions.
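As an illustration, here is a minimal sketch of a sub-workflow action that propagates the parent's configuration and additionally passes one parameterized property; the action, property and variable names (childWfAction, childInputDir, parentOutputDir) are hypothetical:

<action name="childWfAction">
    <sub-workflow>
        <app-path>${nameNode}/user/${wf:user()}/apps/childApp</app-path>
        <propagate-configuration/>
        <configuration>
            <property>
                <!-- hypothetical property, parameterized with an EL expression -->
                <name>childInputDir</name>
                <value>${parentOutputDir}</value>
            </property>
        </configuration>
    </sub-workflow>
    <ok to="nextAction"/>
    <error to="killJob"/>
</action>

With propagate-configuration present, the child already sees the parent's job configuration; the explicit configuration section is only needed for values the parent wants to add or override.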

Link to Apache documentation:
http://oozie.apache.org/docs/3.3.0/WorkflowFunctionalSpec.html#a3.2.6_Sub-workflow_Action

Note:
For a typical on-demand workflow, the core components are job.properties and workflow.xml. For a sub-workflow, you need yet another workflow.xml that defines the activities to occur in the sub-workflow; the parent workflow merely references it. To keep things neat, it is best to have a sub-directory hold the sub-workflow's core components, as in the snippet below. A single job.properties at the parent level is sufficient.
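For instance, in the sample application below, the parent's workflow.xml references the sub-directory holding the child's workflow.xml (the subWorkflowCodeDir property in job.properties resolves to this path):

<sub-workflow>
    <app-path>${appPath}/dataExporterSubWorkflowApp</app-path>
    <propagate-configuration/>
</sub-workflow>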

3.0. Sample workflow application

Highlights:
The workflow has two actions - one is a java main action and the other is a sub-workflow action.

The java main action parses log files on HDFS and generates a report.
The sub-workflow action executes on success of the java main action, and exports the report from HDFS to a MySQL database.


Pictorial overview:
[Workflow diagram omitted]

Components of such a workflow application:
[Component-layout image omitted; the directory structure is listed under "02a. Data and code download" below.]
Application details:

This gist includes components of an Oozie workflow application - scripts/code, sample data
and commands. Oozie actions covered: sub-workflow, java main action,
sqoop action (to MySQL); Oozie controls covered: decision.
Pictorial overview:
--------------------
http://hadooped.blogspot.com/2013/07/apache-oozie-part-8-subworkflow.html
Usecase:
--------
Parse syslog-generated log files to create reports; export the reports to an RDBMS.
Includes:
---------
Sample data definition and structure: 01-SampleDataAndStructure
Data and script download: 02-DataAndScriptDownload
Data load commands: 03-HdfsLoadCommands
Mysql database setup: 04-mysqlDBSetup
Sqoop task - standalone tryout: 05-SqoopStandAloneTryout
App job properties file: 06-JobProperties
Workflow definition - Parent: 07-WorkflowXMLMain
Independent test of LogParser jar: 08-LogParserStandaloneTestHowTo
Workflow definition - DataExporter: 09-SubWorkflowXMLDataExporter
Oozie commands: 10-OozieJobExecutionCommands
Output of LogParser: 11a-OutputLogParser
Output in mysql: 11b-OutputDataExporter
Oozie web console - screenshots: 12-OozieWebConsoleScreenshots
Java LogParser code: 13-JavaCodeHyperlink
01a. Sample data (log files)
----------------------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
01b. Structure
---------------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
02a. Data and code download
----------------------------
Github:
https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
oozieProject
|-- data
|   `-- airawat-syslog
|       `-- <<Node-Name>>
|           `-- <<Year>>
|               `-- <<Month>>
|                   `-- messages
`-- workflowWithSubworkflow
    |-- job.properties
    |-- workflow.xml
    |-- lib
    |   `-- LogEventCount.jar
    `-- dataExporterSubWorkflowApp
        `-- workflow.xml
03. HDFS load commands
----------------------
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/* oozieProject/
*********************************
Mysql database setup tasks
*********************************
a) Create database:
mysql>
create database airawat;
b) Switch to database created:
mysql>
use airawat;
c) Create destination table for sqoop export from hdfs:
mysql>
CREATE TABLE IF NOT EXISTS Logged_Process_Count_By_Year(
year_and_process varchar(100),
occurrence INTEGER);
d) Ensure your sqoop user has access to database created:
mysql>
grant all on airawat.* to myUser@'myMachine';
Try out the sqoop task - outside of the workflow
-------------------------------------------
Use the dataset from my gist:
https://gist.github.com/airawat/5970026
*********************************
Sqoop command
*********************************
Pre-requisites:
1. Dataset to be exported should exist on HDFS
2. MySQL table that is the destination for the export should exist
Command:
--Run on node that acts as sqoop client;
$ sqoop export \
--connect jdbc:mysql://cdh-dev01/airawat \
--username devUser \
--password myPwd \
--table Logged_Process_Count_By_Year \
--direct \
--export-dir "oozieProject/datasetGeneratorApp/outputDir" \
--fields-terminated-by "\t"
*********************************
Results in mysql
*********************************
mysql> select * from Logged_Process_Count_By_Year order by occurrence desc;
+----------------------------+------------+
| year_and_process           | occurrence |
+----------------------------+------------+
| 2013-ntpd_initres          |       4133 |
| 2013-kernel                |        810 |
| 2013-init                  |        166 |
| 2013-pulseaudio            |         18 |
| 2013-spice-vdagent         |         15 |
| 2013-gnome-session         |         11 |
| 2013-sudo                  |          8 |
| 2013-polkit-agent-helper-1 |          8 |
| 2013-console-kit-daemon    |          7 |
| 2013-NetworkManager        |          7 |
| 2013-udevd                 |          6 |
| 2013-sshd                  |          6 |
| 2013-nm-dispatcher.action  |          4 |
| 2013-login                 |          2 |
+----------------------------+------------+
14 rows in set (0.00 sec)
#*************************************************
# job.properties
#*************************************************
nameNode=hdfs://cdh-nn01.chuntikhadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
oozieLibPath=${nameNode}/user/oozie/share/lib
oozie.libpath=${oozieLibPath}
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
# Paths
#------
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
appPath=${oozieProjectRoot}/workflowWithSubworkflow
oozie.wf.application.path=${appPath}
# For logParserAction (java main action)
#---------------------------------------
inputDir=${oozieProjectRoot}/data/*/*/*/*/*
outputDir=${appPath}/output
inputDirRecordCount=`cat ${inputDir} | wc -l`
minRequiredRecordCount=1
# For dataExporterSubWorkflow (having sqoop action)
#---------------------------------------------------
subWorkflowCodeDir=${appPath}/dataExporterSubWorkflowApp
mysqlServer=cdh-dev01
mysqlServerDB=airawat
mysqlServerDBUID=devUser
mysqlServerDBPwd=myPwd
triggerDatasetDir=${outputDir}
triggerDataFiles=${triggerDatasetDir}/part*
sqoopInputRecordCount=`cat ${triggerDataFiles} | wc -l`
<!-- =========================================================== -->
<!-- Workflow definition file - workflow.xml (parent)             -->
<!-- =========================================================== -->
<workflow-app name="SubWorkflow-Parent" xmlns="uri:oozie:workflow:0.1">
<start to="inputAvailableCheckDecision"/>
<decision name="inputAvailableCheckDecision">
<switch>
<case to="logParserAction">
${inputDirRecordCount gt minRequiredRecordCount}
</case>
<default to="end"/>
</switch>
</decision>
<action name="logParserAction">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${outputDir}"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>Airawat.Oozie.Samples.LogEventCount</main-class>
<arg>${inputDir}</arg>
<arg>${outputDir}</arg>
</java>
<ok to="dataExporterSubWorkflow"/>
<error to="killJob"/>
</action>
<action name='dataExporterSubWorkflow'>
<sub-workflow>
<app-path>${subWorkflowCodeDir}</app-path>
<propagate-configuration/>
</sub-workflow>
<ok to="end"/>
<error to="killJob" />
</action>
<kill name="killJob">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<end name="end" />
</workflow-app>
#*******************************************
# LogParser program - standalone test
#*******************************************
Commands to test the java program
a) Command to run the program
$ hadoop jar oozieProject/workflowWithSubworkflow/lib/LogEventCount.jar Airawat.Oozie.Samples.LogEventCount "oozieProject/data/*/*/*/*/*" "oozieProject/workflowWithSubworkflow/myCLIOutput"
b) Command to view results
$ hadoop fs -cat oozieProject/workflowWithSubworkflow/myCLIOutput/part* | sort
c) Results
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
<!-- =========================================================== -->
<!-- Sub-workflow definition file - workflow.xml                  -->
<!-- =========================================================== -->
<workflow-app name="SubworkflowApp-SubWf-DataExporter" xmlns="uri:oozie:workflow:0.1">
<start to="inputAvailableCheckDecision"/>
<decision name="inputAvailableCheckDecision">
<switch>
<case to="sqoopAction">
${sqoopInputRecordCount gt minRequiredRecordCount}
</case>
<default to="end"/>
</switch>
</decision>
<action name="sqoopAction">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.libpath</name>
<value>${oozieLibPath}</value>
</property>
</configuration>
<command>export --connect jdbc:mysql://${mysqlServer}/${mysqlServerDB} --username ${mysqlServerDBUID} --password ${mysql
ServerDBPwd} --table Logged_Process_Count_By_Year --direct --export-dir ${triggerDatasetDir} --fields-terminated-by "\t"</command>
</sqoop>
<ok to="end"/>
<error to="killJob"/>
</action>
<kill name="killJob">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<end name="end" />
</workflow-app>
****************************************
10. Oozie job commands
****************************************
Note: Replace the Oozie server and port with your cluster-specific values.
1) Submit job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowWithSubworkflow/job.properties -submit
job: 0000012-130712212133144-oozie-oozi-W
2) Run job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -start 0000014-130712212133144-oozie-oozi-W
3) Check the status:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info 0000014-130712212133144-oozie-oozi-W
4) Suspend workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -suspend 0000014-130712212133144-oozie-oozi-W
5) Resume workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -resume 0000014-130712212133144-oozie-oozi-W
6) Re-run workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowWithSubworkflow/job.properties -rerun 0000014-130712212133144-oozie-oozi-W
7) Should you need to kill the job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill 0000014-130712212133144-oozie-oozi-W
8) View server logs:
$ oozie job -oozie http://cdh-dev01:11000/oozie -logs 0000014-130712212133144-oozie-oozi-W
Logs are available at:
/var/log/oozie on the Oozie server.
****************************************
Output - Log Parser program
****************************************
$ hadoop fs -cat oozieProject/workflowWithSubworkflow/output/part*
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
****************************************
Output - data export from hdfs to mysql
****************************************
mysql> select * from Logged_Process_Count_By_Year order by occurrence desc;
+----------------------------+------------+
| year_and_process           | occurrence |
+----------------------------+------------+
| 2013-ntpd_initres          |       4133 |
| 2013-kernel                |        810 |
| 2013-init                  |        166 |
| 2013-pulseaudio            |         18 |
| 2013-spice-vdagent         |         15 |
| 2013-gnome-session         |         11 |
| 2013-sudo                  |          8 |
| 2013-polkit-agent-helper-1 |          8 |
| 2013-console-kit-daemon    |          7 |
| 2013-NetworkManager        |          7 |
| 2013-udevd                 |          6 |
| 2013-sshd                  |          6 |
| 2013-nm-dispatcher.action  |          4 |
| 2013-login                 |          2 |
+----------------------------+------------+
14 rows in set (0.00 sec)
Screenshots from Oozie web console:
-----------------------------------
Available at:
http://hadooped.blogspot.com/2013/07/apache-oozie-part-8-subworkflow.html



1 comment:

  1. Thanks for the well-detailed tutorials!! Just one query - is it possible to capture output from a sub-workflow? I have a java action that finds an HDFS path to be used by subsequent actions, and this action is used in multiple workflows. Is it possible to make it a sub-workflow and retrieve the path in the main workflows?
