Tuesday, June 18, 2013

Apache Oozie - Part 11: Java API for interfacing with oozie workflows


What's covered in the blog?

1. Documentation on the Oozie java API
2. Code of a sample java program that calls an Oozie workflow with a java action to process some syslog-generated log files.  Instructions on loading sample data, workflow files and running the workflow are provided, along with some notes based on my learnings.

Version:
Oozie 3.3.0;

Related blogs:

Your thoughts/updates:
If you want to share your thoughts/updates, email me at airawat.blog@gmail.com.

1.0. About the Oozie java API

Oozie provides a Java Client API that simplifies integrating Oozie with Java applications. The Java Client API is a convenience wrapper around the Oozie Web Services API.
The following code snippet shows how to submit an Oozie job using the Java Client API.

import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;

import java.util.Properties;

    ...

    // get an OozieClient for the local Oozie server
    OozieClient wc = new OozieClient("http://bar:11000/oozie");

    // create a workflow job configuration and set the workflow application path
    Properties conf = wc.createConfiguration();
    conf.setProperty(OozieClient.APP_PATH, "hdfs://foo:8020/usr/tucu/my-wf-app");

    // setting workflow parameters
    conf.setProperty("jobTracker", "foo:8021");
    conf.setProperty("inputDir", "/usr/tucu/inputdir");
    conf.setProperty("outputDir", "/usr/tucu/outputdir");
    ...

    // submit and start the workflow job
    String jobId = wc.run(conf);
    System.out.println("Workflow job submitted");

    // wait until the workflow job finishes, printing the status every 10 seconds
    while (wc.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
        System.out.println("Workflow job running ...");
        Thread.sleep(10 * 1000);
    }

    // print the final status of the workflow job
    System.out.println("Workflow job completed ...");
    System.out.println(wc.getJobInfo(jobId));
    ...

Source of the documentation, above:
http://archive.cloudera.com/cdh/3/oozie/DG_Examples.html#Java_API_Example


2.0. Exercise

The java program below calls the workflow built in my blog 6, which includes the java code, workflow-related files and sample data.

2.0.1. Sample data and sample workflow


2.0.2. Sample Java program to call workflow

Note: Ensure you replace the cluster-specific configuration values (Oozie URL, job tracker, name node and HDFS paths) with those for your cluster.

import java.util.Properties;

import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;

public class myOozieWorkflowJavaAPICall {

public static void main(String[] args) {
    OozieClient wc = new OozieClient("http://cdh-dev01:11000/oozie");

    Properties conf = wc.createConfiguration();

    conf.setProperty(OozieClient.APP_PATH, "hdfs://cdh-nn01.hadoop.com:8020/user/airawat/oozieProject/javaApplication/workflow.xml");
    conf.setProperty("jobTracker", "cdh-jt01:8021");
    conf.setProperty("nameNode", "hdfs://cdh-nn01.hadoop.com:8020");
    conf.setProperty("queueName", "default");
    conf.setProperty("airawatOozieRoot", "hdfs://cdh-nn01.hadoop.com:8020/user/airawat/oozieProject/javaApplication");
    conf.setProperty("oozie.libpath", "hdfs://cdh-nn01.hadoop.com:8020/user/oozie/share/lib");
    conf.setProperty("oozie.use.system.libpath", "true");
    conf.setProperty("oozie.wf.rerun.failnodes", "true");

    try {
        String jobId = wc.run(conf);
        System.out.println("Workflow job, " + jobId + " submitted");

        while (wc.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
            System.out.println("Workflow job running ...");
            Thread.sleep(10 * 1000);
        }
        System.out.println("Workflow job completed ...");
        System.out.println(wc.getJobInfo(jobId));
    } catch (Exception r) {
        System.out.println("Errors " + r.getLocalizedMessage());
    }
}
}
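
The same OozieClient instance can also be used to manage the job after it has been submitted. Below is a minimal sketch using standard OozieClient methods (suspend, resume, kill, getJobInfo); verify the exact behavior against your Oozie version:

// assumes wc (OozieClient) and jobId (String) from the program above
wc.suspend(jobId);                                      // pause the running workflow
wc.resume(jobId);                                       // resume a suspended workflow
System.out.println(wc.getJobInfo(jobId).getStatus());   // current WorkflowJob.Status
wc.kill(jobId);                                         // kill the workflow if needed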

2.0.3. Program output

Workflow job, 0000081-130613112811513-oozie-oozi-W submitted
Workflow job running ...
Workflow job running ...
Workflow job running ...
Workflow job running ...
Workflow job running ...
Workflow job running ...
Workflow job running ...
Workflow job completed ...
Workflow id[0000081-130613112811513-oozie-oozi-W] status[SUCCEEDED]

2.0.4. Oozie web console

http://YourOozieServer:TypicallyPort11000/oozie


Monday, June 17, 2013

Apache Oozie - Part 6: Oozie workflow with java main action


What's covered in the blog?

1. Documentation on the Oozie java action
2. A sample workflow that includes an Oozie java action to process some syslog-generated log files.  Instructions on loading sample data and running the workflow are provided, along with some notes based on my learnings.

Versions covered:
Oozie 3.3.0;

Related blogs:
Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11a: Oozie Java API for interfacing with oozie workflows
Blog 11b: Oozie Web Service API for interfacing with oozie workflows


Your thoughts/updates:
If you want to share your thoughts/updates, email me at airawat.blog@gmail.com.



About the Oozie java main action

Excerpt from Apache Oozie documentation...
The java action will execute the public static void main(String[] args) method of the specified main Java class.  Java applications are executed in the Hadoop cluster as a map-reduce job with a single mapper task.  The workflow job will wait until the java application completes its execution before continuing to the next action.  The java action has to be configured with the job-tracker, name-node, main Java class, JVM options and arguments.

To indicate an ok action transition, the main Java class must complete the main method invocation gracefully.  To indicate an error action transition, the main Java class must throw an exception.  The main Java class must not call System.exit(int n), as this will make the java action do an error transition regardless of the exit code used.
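
To make this concrete, here is a minimal sketch of a main class written for a java action (a hypothetical class, not part of the sample application below); it signals failure by throwing an exception rather than calling System.exit:

public class MyJavaMainAction {
    public static void main(String[] args) throws Exception {
        boolean ok = doWork(args);   // whatever the action is meant to do
        if (!ok) {
            // throwing drives the <error to=...> transition
            throw new RuntimeException("Processing failed: expected at least one argument");
        }
        // returning normally drives the <ok to=...> transition
    }

    private static boolean doWork(String[] args) {
        return args.length > 0;      // placeholder logic for illustration
    }
}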

A java action can be configured to perform HDFS files/directories cleanup before starting the Java application. This capability enables Oozie to retry a Java application in the situation of a transient or non-transient failure (This can be used to cleanup any temporary data which may have been created by the Java application in case of failure).

A java action can create a Hadoop configuration. The Hadoop configuration is made available to the Java application as a local file, oozie-action.conf.xml, in its running directory. As with map-reduce and pig actions, it is possible to refer to a job.xml file and to use inline configuration properties. For repeated configuration properties, later values override earlier ones.

Inline property values can be parameterized (templatized) using EL expressions.

The Hadoop mapred.job.tracker (job-tracker) and fs.default.name (name-node) properties must not be present in the job-xml or in the inline configuration.

As with map-reduce and pig actions, it is possible to add files and archives to be made available to the Java application. Refer to the "Adding Files and Archives for the Job" section of the Oozie documentation.

The capture-output element can be used to propagate values back into the Oozie context, which can then be accessed via EL functions. The values need to be written out as a Java properties file; the file name is obtained via a system property specified by the constant JavaMainMapper.OOZIE_JAVA_MAIN_CAPTURE_OUTPUT_FILE.
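
A minimal sketch of what that looks like inside the java action's main class; this assumes the constant resolves to the system property oozie.action.output.properties (true for the Oozie releases I have used - confirm for your version), and recordCount is a made-up key for illustration:

// inside main(), after the real work is done
java.util.Properties captured = new java.util.Properties();
captured.setProperty("recordCount", "42");   // value to expose to the workflow
java.io.File outFile = new java.io.File(System.getProperty("oozie.action.output.properties"));
java.io.OutputStream os = new java.io.FileOutputStream(outFile);
try {
    captured.store(os, "values propagated via <capture-output/>");
} finally {
    os.close();
}
// downstream in the workflow, read it back with ${wf:actionData('yourJavaActionName')['recordCount']}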

IMPORTANT: Because the Java application is run from within a map-reduce job, from Hadoop 0.20 onwards a queue must be assigned to it. The queue name must be specified as a configuration property.

Syntax:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="[NODE-NAME]">
        <java>
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
               <delete path="[PATH]"/>
               ...
               <mkdir path="[PATH]"/>
               ...
            </prepare>
            <job-xml>[JOB-XML]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <main-class>[MAIN-CLASS]</main-class>
            <java-opts>[JAVA-STARTUP-OPTS]</java-opts>
            <arg>ARGUMENT</arg>
            ...
            <file>[FILE-PATH]</file>
            ...
            <archive>[FILE-PATH]</archive>
            ...
            <capture-output />
        </java>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>
The prepare element, if present, indicates a list of paths to delete before starting the Java application. This should be used exclusively for directory cleanup for the Java application to be executed.

The java-opts element, if present, contains the command-line parameters to be used to start the JVM that will execute the Java application. Using this element is equivalent to using the mapred.child.java.opts configuration property.

The arg elements, if present, contain arguments for the main function. The value of each arg element is treated as a single argument, and they are passed to the main method in the same order.

All the above elements can be parameterized (templatized) using EL expressions.


Apache Oozie documentation:
http://oozie.apache.org/docs/3.3.0/WorkflowFunctionalSpec.html#a3.2.7_Java_Action


Sample workflow application

Highlights:
For this exercise, I have loaded some syslog-generated logs to HDFS and am running a Java map-reduce program through Oozie as a java action (not a map-reduce action) to produce a report on the logs.

Pictorial representation of the workflow:

Components of workflow application:



Workflow application:

This gist includes components of an Oozie workflow - scripts/code, sample data
and commands; Oozie actions covered: java main action; Oozie controls
covered: start, kill, end; The java program uses regex to parse the logs, and
also extracts part of the mapper input directory path and includes it in the
key emitted.
Usecase
-------
Parse Syslog generated log files to generate reports;
Pictorial overview of job:
--------------------------
http://hadooped.blogspot.com/2013/06/apache-oozie-part-6-oozie-workflow-with.html
Includes:
---------
Sample data and structure: 01-SampleDataAndStructure
Data and script download: 02-DataAndScriptDownload
Data load commands: 03-HdfsLoadCommands
Java MR - Mapper code: 04A-MapperJavaCode
Java MR - Reducer code: 04B-ReducerJavaCode
Java MR - Driver code: 04C-DriverJavaCode
Command to test Java MR program: 04D-CommandTestJavaMRProg
Oozie job properties file: 05-OozieJobProperties
Oozie workflow file: 06-OozieWorkflowXML
Oozie commands: 07-OozieJobExecutionCommands
Output - Report 1: 08-OutputOfJavaProgram
Oozie web console - screenshots: 09-OozieWebConsoleScreenshots
01a. Sample data
-----------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
01b. Structure
---------------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
02a. Data download
-------------------
gitHub:
https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you encounter any issues.
Directory structure applicable for this blog/gist:
--------------------------------------------------
oozieProject
    data
        airawat-syslog
            <<Node-Name>>
                <<Year>>
                    <<Month>>
                        messages
    workflowJavaMainAction
        workflow.xml
        job.properties
        lib
            LogEventCount.jar
03-Hdfs load commands
----------------------
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/* oozieProject/
// Source code for Mapper
//-----------------------------------------------------------
// LogEventCountMapper.java
//-----------------------------------------------------------
// Java program that parses logs using regex
// The program counts the number of processes logged by year.
// E.g. Key=2013-ntpd; Value=1;
package Airawat.Oozie.Samples;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class LogEventCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    String strLogEntryPattern = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)";
    public static final int NUM_FIELDS = 6;
    Text strEvent = new Text("");

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String strLogEntryLine = value.toString();
        Pattern objPtrn = Pattern.compile(strLogEntryPattern);
        Matcher objPatternMatcher = objPtrn.matcher(strLogEntryLine);

        if (!objPatternMatcher.matches() || NUM_FIELDS != objPatternMatcher.groupCount()) {
            System.err.println("Bad log entry (or problem with RE?):");
            System.err.println(strLogEntryLine);
            return;
        }
        /*
        System.out.println("Month_Name: " + objPatternMatcher.group(1));
        System.out.println("Day: " + objPatternMatcher.group(2));
        System.out.println("Time: " + objPatternMatcher.group(3));
        System.out.println("Node: " + objPatternMatcher.group(4));
        System.out.println("Process: " + objPatternMatcher.group(5));
        System.out.println("LogMessage: " + objPatternMatcher.group(6));
        */

        //Oh what a pretty chunk of code ;)
        strEvent.set(((FileSplit) context.getInputSplit()).getPath().toString().substring(
                (((FileSplit) context.getInputSplit()).getPath().toString().length() - 16),
                (((FileSplit) context.getInputSplit()).getPath().toString().length() - 12))
                + "-"
                + ((objPatternMatcher.group(5).toString().indexOf("[")) == -1
                        ? (objPatternMatcher.group(5).toString().substring(0, (objPatternMatcher.group(5).length() - 1)))
                        : (objPatternMatcher.group(5).toString().substring(0, (objPatternMatcher.group(5).toString().indexOf("["))))));

        context.write(strEvent, new IntWritable(1));
    }
}
// Source code for reducer
//--------------------------
// LogEventCountReducer.java
//--------------------------
package Airawat.Oozie.Samples;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LogEventCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int intEventCount = 0;
        for (IntWritable value : values) {
            intEventCount += value.get();
        }
        context.write(key, new IntWritable(intEventCount));
    }
}
// Source code for driver
//--------------------------
// LogEventCount.java
//--------------------------
package Airawat.Oozie.Samples;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogEventCount {

    public static void main(String[] args) throws Exception {

        if (args.length != 2) {
            System.out.printf(
                "Usage: Airawat.Oozie.Samples.LogEventCount <input dir> <output dir>\n");
            System.exit(-1);
        }

        //Instantiate a Job object for your job's configuration.
        Job job = new Job();

        //Job jar file
        job.setJarByClass(LogEventCount.class);

        //Job name
        job.setJobName("Syslog Event Rollup");

        //Paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //Mapper and reducer classes
        job.setMapperClass(LogEventCountMapper.class);
        job.setReducerClass(LogEventCountReducer.class);

        //Job's output key and value classes
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //Number of reduce tasks
        job.setNumReduceTasks(3);

        //Start the MapReduce job, wait for it to finish.
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
Commands to test the java program
---------------------------------
a) Command to run the program
$ hadoop jar oozieProject/workflowJavaMainAction/lib/LogEventCount.jar Airawat.Oozie.Samples.LogEventCount "oozieProject/data/*/*/*/*/*" "oozieProject/workflowJavaMainAction/myCLIOutput"
b) Command to view results
$ hadoop fs -cat oozieProject/workflowJavaMainAction/myCLIOutput/part* | sort
c) Results
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-NetworkManager 7
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
#*****************************
# job.properties
#*****************************
nameNode=hdfs://cdh-nn01.chuntikhadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
appPath=${oozieProjectRoot}/workflowJavaMainAction
oozie.wf.application.path=${appPath}
inputDir=${oozieProjectRoot}/data/*/*/*/*/*
outputDir=${appPath}/output
<!--*************************************************-->
<!--*******06-workflow.xml***************************-->
<!--*************************************************-->
<workflow-app name="WorkflowJavaMainAction" xmlns="uri:oozie:workflow:0.1">
<start to="javaMainAction"/>
<action name="javaMainAction">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${outputDir}"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>Airawat.Oozie.Samples.LogEventCount</main-class>
<arg>${inputDir}</arg>
<arg>${outputDir}</arg>
</java>
<ok to="end"/>
<error to="killJobAction"/>
</action>
<kill name="killJobAction">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<end name="end" />
</workflow-app>
07. Oozie commands
-------------------
Note: Replace the Oozie server and port with your cluster-specific values.
1) Submit job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowJavaMainAction/job.properties -submit
job: 0000012-130712212133144-oozie-oozi-W
2) Run job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -start 0000014-130712212133144-oozie-oozi-W
3) Check the status:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info 0000014-130712212133144-oozie-oozi-W
4) Suspend workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -suspend 0000014-130712212133144-oozie-oozi-W
5) Resume workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -resume 0000014-130712212133144-oozie-oozi-W
6) Re-run workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowJavaMainAction/job.properties -rerun 0000014-130712212133144-oozie-oozi-W
7) Should you need to kill the job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill 0000014-130712212133144-oozie-oozi-W
8) View server logs:
$ oozie job -oozie http://cdh-dev01:11000/oozie -logs 0000014-130712212133144-oozie-oozi-W
Logs are available at:
/var/log/oozie on the Oozie server.
08-Workflow application output
-------------------------------
$ hadoop fs -ls -R oozieProject/workflowJavaMainAction/output/part* | awk '{print $8}'
oozieProject/workflowJavaMainAction/output/part-r-00000
$ hadoop fs -cat oozieProject/workflowJavaMainAction/output/part-r-00000
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
09 - Oozie web console - screenshots
-------------------------------------
Available at-
http://hadooped.blogspot.com/2013/06/apache-oozie-part-6-oozie-workflow-with.html



Oozie web console - screenshots:


References


Map reduce cookbook
https://cwiki.apache.org/OOZIE/map-reduce-cookbook.html

How to use a sharelib in Oozie
http://blog.cloudera.com/blog/2012/12/how-to-use-the-sharelib-in-apache-oozie/

Everything-you-wanted-to-know-but-were-afraid-to-ask-about-oozie
http://www.slideshare.net/ChicagoHUG/everything-you-wanted-to-know-but-were-afraid-to-ask-about-oozie

Oozie workflow use cases
https://github.com/yahoo/oozie/wiki/Oozie-WF-use-cases 





Apache Oozie - Part 4: Oozie workflow with java mapreduce action


What's covered in the blog?

1. Documentation on the Oozie map reduce action
2. A sample workflow that includes an Oozie map-reduce action to process some syslog-generated log files.  Instructions on loading sample data and running the workflow are provided, along with some notes based on my learnings.

Versions covered:
Oozie 3.3.0; Map reduce new API

Related blogs:

Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11: Oozie Java API for interfacing with oozie workflows
Blog 12: Oozie workflow - shell action +passing output from one action to another
Blog 13: Oozie workflow - SSH action

Your thoughts/updates:
If you want to share your thoughts/updates, email me at airawat.blog@gmail.com.

About the Oozie MapReduce action
Excerpt from Apache Oozie documentation...

The map-reduce action starts a Hadoop map/reduce job from a workflow. Hadoop jobs can be Java Map/Reduce jobs or streaming jobs.

A map-reduce action can be configured to perform file system cleanup and directory creation before starting the map reduce job. This capability enables Oozie to retry a Hadoop job in the situation of a transient failure (Hadoop checks the non-existence of the job output directory and then creates it when the Hadoop job is starting, thus a retry without cleanup of the job output directory would fail).

The workflow job will wait until the Hadoop map/reduce job completes before continuing to the next action in the workflow execution path.

The counters of the Hadoop job and the job exit status (FAILED, KILLED or SUCCEEDED) must be available to the workflow job after the Hadoop job ends. This information can be used from within decision nodes and other actions' configurations.

The map-reduce action has to be configured with all the necessary Hadoop JobConf properties to run the Hadoop map/reduce job.

Hadoop JobConf properties can be specified in a JobConf XML file bundled with the workflow application or they can be indicated inline in the map-reduce action configuration.

The configuration properties are loaded in the following order: streaming, job-xml and configuration; later values override earlier ones.

Streaming and inline property values can be parameterized (templatized) using EL expressions.

The Hadoop mapred.job.tracker and fs.default.name properties must not be present in the job-xml and inline configuration.


Apache Oozie documentation:
http://oozie.apache.org/docs/3.3.0/WorkflowFunctionalSpec.html#a3.2.2_Map-Reduce_Action


Components of a workflow with java map reduce action:



Sample workflow

Highlights

The sample workflow application runs a java map reduce program that parses log files (syslog generated) in HDFS and generates a report on the same.

The following is a pictorial representation of the workflow.


Workflow application details

This gist includes components of an Oozie workflow - scripts/code, sample data
and commands; Oozie actions covered: java mapreduce action; Oozie controls
covered: start, kill, end; The java program uses regex to parse the logs, and
also extracts part of the mapper input directory path and includes it in the
key emitted.
Note: The reducer can be specified as a combiner as well (see the sketch after the driver code below).
Usecase
-------
Parse Syslog generated log files to generate reports;
Pictorial overview of job:
--------------------------
http://hadooped.blogspot.com/2013/06/apache-oozie-part-4-oozie-workflow-with.html
Includes:
---------
Sample data and structure: 01-SampleDataAndStructure
Data and script download: 02-DataAndScriptDownload
Data load commands: 03-HdfsLoadCommands
Java MR - Mapper code: 04A-MapperJavaCode
Java MR - Reducer code: 04B-ReducerJavaCode
Java MR - Driver code: 04C-DriverJavaCode
Command to test Java MR program: 04D-CommandTestJavaMRProg
Oozie job properties file: 05-OozieJobProperties
Oozie workflow file: 06-OozieWorkflowXML
Oozie commands: 07-OozieJobExecutionCommands
Output - Report 1: 08-OutputOfJavaProgram
Oozie web console - screenshots: 09-OozieWebConsoleScreenshots
01a. Sample data
-----------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
01b. Structure
---------------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
02a. Data download
-------------------
gitHub:
https://github.com/airawat/OozieSamples
Directory structure
-------------------
oozieProject
    data
        airawat-syslog
            <<Node-Name>>
                <<Year>>
                    <<Month>>
                        messages
    workflowJavaMapReduceAction
        workflow.xml
        job.properties
        lib
            LogEventCount.jar
03-Hdfs load commands
----------------------
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/* oozieProject/
// Source code for Mapper
//-----------------------------------------------------------
// LogEventCountMapper.java
//-----------------------------------------------------------
// Java program that parses logs using regex
// The program counts the number of processes logged by year.
// E.g. Key=2013-ntpd; Value=1;
package airawat.oozie.samples;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class LogEventCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    String strLogEntryPattern = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)";
    static final int NUM_FIELDS = 6;
    Text strEvent = new Text("");

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String strLogEntryLine = value.toString();
        Pattern objPtrn = Pattern.compile(strLogEntryPattern);
        Matcher objPatternMatcher = objPtrn.matcher(strLogEntryLine);

        if (!objPatternMatcher.matches() || NUM_FIELDS != objPatternMatcher.groupCount()) {
            System.err.println("Bad log entry (or problem with RE?):");
            System.err.println(strLogEntryLine);
            return;
        }
        /*
        System.out.println("Month_Name: " + objPatternMatcher.group(1));
        System.out.println("Day: " + objPatternMatcher.group(2));
        System.out.println("Time: " + objPatternMatcher.group(3));
        System.out.println("Node: " + objPatternMatcher.group(4));
        System.out.println("Process: " + objPatternMatcher.group(5));
        System.out.println("LogMessage: " + objPatternMatcher.group(6));
        */

        //TODO: Move the derivation of the filename (below) into an instance variable set from
        //the file split in the setup() method of the mapper - so it is a one-time task
        //- instead of the inefficient code below.
        strEvent.set(((FileSplit) context.getInputSplit()).getPath().toString().substring(
                (((FileSplit) context.getInputSplit()).getPath().toString().length() - 16),
                (((FileSplit) context.getInputSplit()).getPath().toString().length() - 12))
                + "-"
                + ((objPatternMatcher.group(5).toString().indexOf("[")) == -1
                        ? (objPatternMatcher.group(5).toString().substring(0, (objPatternMatcher.group(5).length() - 1)))
                        : (objPatternMatcher.group(5).toString().substring(0, (objPatternMatcher.group(5).toString().indexOf("["))))));

        context.write(strEvent, new IntWritable(1));
    }
}
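The TODO in the mapper above suggests deriving the year from the input split just once, in setup(), instead of on every call to map(). A minimal sketch of that refactor (my own illustration, not part of the gist):

// Hypothetical additions to LogEventCountMapper
private String strYearFromPath;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    String strFilePath = ((FileSplit) context.getInputSplit()).getPath().toString();
    // paths end in .../<year>/<month>/messages, so the year sits 16 to 12 characters from the end
    strYearFromPath = strFilePath.substring(strFilePath.length() - 16, strFilePath.length() - 12);
}

// map() can then build its key as strYearFromPath + "-" + <process name>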
// Source code for reducer
//--------------------------
// LogEventCountReducer.java
//--------------------------
package airawat.oozie.samples;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LogEventCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int intEventCount = 0;
        for (IntWritable value : values) {
            intEventCount += value.get();
        }
        context.write(key, new IntWritable(intEventCount));
    }
}
// Source code for driver
//--------------------------
// LogEventCount.java
//--------------------------
package airawat.oozie.samples;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogEventCount {

    public static void main(String[] args) throws Exception {

        if (args.length != 2) {
            System.out.printf(
                "Usage: airawat.oozie.samples.LogEventCount <input dir> <output dir>\n");
            System.exit(-1);
        }

        //Instantiate a Job object for your job's configuration.
        Job job = new Job();

        //Job jar file
        job.setJarByClass(LogEventCount.class);

        //Job name
        job.setJobName("Syslog Event Rollup");

        //Paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //Mapper and reducer classes
        job.setMapperClass(LogEventCountMapper.class);
        job.setReducerClass(LogEventCountReducer.class);

        //Job's output key and value classes
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //Number of reduce tasks
        job.setNumReduceTasks(3);

        //Start the MapReduce job, wait for it to finish.
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
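As noted earlier, the reducer can be specified as a combiner as well, since its input and output key/value types match. A minimal sketch of the one-line addition to the driver (my illustration, not part of the gist):

// in the driver, alongside setMapperClass/setReducerClass
job.setCombinerClass(LogEventCountReducer.class);   // pre-aggregate the counts on the map side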
Commands to test the java program in isolation
-----------------------------------------------
a) Command to run the program
$ hadoop jar oozieProject/workflowJavaMapReduceAction/lib/LogEventCount.jar airawat.oozie.samples.LogEventCount "oozieProject/data/*/*/*/*/*" "oozieProject/workflowJavaMapReduceAction/myCLIOutput"
b) Command to view results
$ hadoop fs -cat oozieProject/workflowJavaMapReduceAction/myCLIOutput/part* | sort
c) Results
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-NetworkManager 7
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
#*****************************
# job.properties
#*****************************
nameNode=hdfs://cdh-nn01.chuntikhadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
appPath=${oozieProjectRoot}/workflowJavaMapReduceAction
oozie.wf.application.path=${appPath}
inputDir=${oozieProjectRoot}/data/*/*/*/*/*
outputDir=${appPath}/output
<!--*************************************************-->
<!--*******06-workflow.xml***************************-->
<!--*************************************************-->
<workflow-app name="WorkFlowJavaMapReduceAction" xmlns="uri:oozie:workflow:0.1">
<start to="mapReduceAction"/>
<action name="mapReduceAction">
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${outputDir}"/>
</prepare>
<configuration>
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<property>
<name>mapreduce.map.class</name>
<value>airawat.oozie.samples.LogEventCountMapper</value>
</property>
<property>
<name>mapreduce.reduce.class</name>
<value>airawat.oozie.samples.LogEventCountReducer</value>
</property>
<property>
<name>mapred.mapoutput.key.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapred.mapoutput.value.class</name>
<value>org.apache.hadoop.io.IntWritable</value>
</property>
<property>
<name>mapred.output.key.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapred.output.value.class</name>
<value>org.apache.hadoop.io.IntWritable</value>
</property>
<property>
<name>mapred.input.dir</name>
<value>${inputDir}</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${outputDir}</value>
</property>
<property>
<name>mapreduce.job.acl-view-job</name>
<value>*</value>
</property>
<property>
<name>oozie.launcher.mapreduce.job.acl-view-job</name>
<value>*</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>false</value>
</property>
<property>
<name>oozie.libpath</name>
<value>${appPath}/lib</value>
</property>
</configuration>
</map-reduce>
<ok to="end"/>
<error to="killJob"/>
</action>
<kill name="killJob">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<end name="end" />
</workflow-app>
07. Oozie commands
-------------------
Note: Replace the Oozie server and port with your cluster-specific values.
1) Submit job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowJavaMapReduceAction/job.properties -submit
job: 0000012-130712212133144-oozie-oozi-W
2) Run job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -start 0000014-130712212133144-oozie-oozi-W
3) Check the status:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info 0000014-130712212133144-oozie-oozi-W
4) Suspend workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -suspend 0000014-130712212133144-oozie-oozi-W
5) Resume workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -resume 0000014-130712212133144-oozie-oozi-W
6) Re-run workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowJavaMapReduceAction/job.properties -rerun 0000014-130712212133144-oozie-oozi-W
7) Should you need to kill the job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill 0000014-130712212133144-oozie-oozi-W
8) View server logs:
$ oozie job -oozie http://cdh-dev01:11000/oozie -logs 0000014-130712212133144-oozie-oozi-W
Logs are available at:
/var/log/oozie on the Oozie server.
08-Workflow application output
-------------------------------
$ hadoop fs -ls -R oozieProject/workflowJavaMapReduceAction/output/part* | awk '{print $8}'
oozieProject/workflowJavaMapReduceAction/output/part-r-00000
oozieProject/workflowJavaMapReduceAction/output/part-r-00001
$ hadoop fs -cat oozieProject/workflowJavaMapReduceAction/output/part*
2013-init 166
2013-polkit-agent-helper-1 8
2013-spice-vdagent 15
2013-sshd 6
2013-udevd 6
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-kernel 810
2013-login 2
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-pulseaudio 18
2013-sudo 8
09 - Oozie web console - screenshots
-------------------------------------
Available at-
http://hadooped.blogspot.com/2013/06/apache-oozie-part-4-oozie-workflow-with.html

Oozie web GUI - screenshots

http://YourOozieServer:TypicallyPort11000/oozie/






Do share if you have any additional insights that can be added to the blog.

References


Map reduce cookbook
https://cwiki.apache.org/OOZIE/map-reduce-cookbook.html

How to use a sharelib in Oozie
http://blog.cloudera.com/blog/2012/12/how-to-use-the-sharelib-in-apache-oozie/

Everything-you-wanted-to-know-but-were-afraid-to-ask-about-oozie

http://www.slideshare.net/ChicagoHUG/everything-you-wanted-to-know-but-were-afraid-to-ask-about-oozie

Oozie workflow use cases

https://github.com/yahoo/oozie/wiki/Oozie-WF-use-cases 






Apache Oozie - Part 3: Workflow with sqoop action (hive to mysql)

What's covered in the blog?

I have covered this topic in my blog-
Apache Sqoop - Part 5: Scheduling Sqoop jobs in Oozie
[Versions: Oozie 3.3.0, Sqoop (1.4.2) with Mysql (5.1.69)]

It includes:
1. Documentation on the Oozie sqoop action
2. A sample workflow (against syslog-generated logs) that includes an Oozie sqoop action (export from hive to mysql).  Instructions on loading sample data and running the workflow are provided, along with some notes based on my learnings.


Related blogs

Oozie:

Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11: Oozie Java API for interfacing with oozie workflows
Blog 12: Oozie workflow - shell action +passing output from one action to another

Sqoop:

Your thoughts/updates:
If you want to share your thoughts/updates, email me at airawat.blog@gmail.com.




Friday, June 14, 2013

Apache Sqoop - Part 5: Scheduling Sqoop jobs in Oozie

What's covered in the blog?

1. Documentation on the Oozie sqoop action
2. A sample workflow (against syslog-generated logs) that includes an Oozie sqoop action (export from hive to mysql).  Instructions on loading sample data and running the workflow are provided, along with some notes based on my learnings.

For scheduling an Oozie workflow containing a Sqoop action to be event driven - time or data availability driven - read my blog on Oozie coordinator jobs.

Versions covered:
Oozie 3.3.0; Sqoop (1.4.2) with Mysql (5.1.69 ) 

My blogs on Sqoop:

Blog 1: Import from mysql into HDFS
Blog 2: Import from mysql into Hive
Blog 3: Export from HDFS and Hive into mysql
Blog 4: Sqoop best practices
Blog 5: Scheduling of Sqoop tasks using Oozie
Blog 6: Sqoop2

My blogs on Oozie:

Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11a: Oozie Java API for interfacing with oozie workflows
Blog 11b: Oozie Web Service API for interfacing with oozie workflows

Your thoughts/updates:
If you want to share your thoughts/updates, email me at airawat.blog@gmail.com.


About the oozie sqoop action

Apache Oozie documentation on Sqoop action:
http://archive.cloudera.com/cdh/3/oozie/DG_SqoopActionExtension.html

Salient features of the sqoop action:
Excerpt from Apache documentation..
- The sqoop action runs a Sqoop job synchronously.
- The information to be included in the oozie sqoop action is the job-tracker, the name-node, and Sqoop command or arg elements, as well as configuration.
- A prepare node can be included to do any prep work, including hdfs actions.  This will be executed prior to execution of the sqoop job.
- Sqoop configuration can be specified with a file, using the job-xml element, and inline, using the configuration elements.
- Oozie EL expressions can be used in the inline configuration. Property values specified in the configuration element override values specified in the job-xml file.
Note that Hadoop mapred.job.tracker and fs.default.name properties must not be present in the inline configuration.
As with Hadoop map-reduce jobs, it is possible to add files and archives in order to make them available to the Sqoop job. 


Sqoop command:
The Sqoop command can be specified either using the command element or multiple arg elements.
- When using the command element, Oozie will split the command on every space into multiple arguments.
- When using the arg elements, Oozie will pass each argument value as an argument to Sqoop.  The arg variant should be used when there are spaces within a single argument.
- All the above elements can be parameterized (templatized) using EL expressions.


Components of a workflow with sqoop action:


Sample application

Highlights:
For this exercise, I have loaded some syslog-generated logs to HDFS and created a Hive table.
I have also created a table in mysql that will be the destination of a report (Hive query) we will run.


Pictorial representation of the workflow:

Sample program:

This gist includes components of a simple workflow application (oozie 3.3.0) that
pipes data in a Hive table to mysql;
The sample application includes:
--------------------------------
1. Oozie actions: sqoop action
2. Oozie workflow controls: start, end, and kill.
3. Workflow components: job.properties and workflow.xml
4. Sample data
5. Prep tasks in Hive
6. Commands to deploy workflow, submit and run workflow
7. Oozie web console - screenshots from sample program execution
Pictorial overview of workflow:
-------------------------------
Available at:
http://hadooped.blogspot.com/2013/06/apache-sqoop-part-5-scheduling-sqoop.html
Includes:
---------
01-WorkflowComponents
02a-DataAndCodeDownload
02b-DataSampleAndStructure
03-HdfsLoadCommands
04a-HiveSetupTasks
04b-HiveReport-SourceForSqoop
05-mySqlSetUp
06-SqoopTaskStandAloneTest
07-JobPropertiesFile
08-WorkflowXMLFile
09-Oozie commands
10-OutputInMysql
11-Oozie web console screenshots
01. Workflow Components:
------------------------
1. job.properties
File containing:
a) parameter and value declarations that are referenced in the workflows, and
b) environment information referenced by Oozie to run the workflow including name node, job tracker, workflow application path etc
2. workflow.xml
Workflow definition file
02a.1. Download location:
-------------------------
Github-
https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you have access issues.
02a.2. Directory structure applicable for this gist:
----------------------------------------------------
oozieProject
    data
        airawat-syslog
            <<node>>
                <<year>>
                    <<month>>
                        messages
    workflowSqoopAction
        job.properties
        workflow.xml
        hive-site.xml
02b.1. Sample data
--------------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
02b.2. Structure
-----------------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
03. Hdfs commands
-------------------
1) Load the data and workflow application to hadoop
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/* oozieProject/
2) Validate load
$ hadoop fs -ls -R oozieProject |awk '{print $8}'
..should match directory listing in section 2, above.
************************
**04a. Hive setup tasks
************************
a) Create table:
hive>
CREATE EXTERNAL TABLE SysLogEvents(
month_name STRING,
day STRING,
time STRING,
host STRING,
event STRING,
log STRING)
PARTITIONED BY(node string,year int, month int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)"
)
stored as textfile;
b) Create and load partitions:
Note: Replace my user ID "akhanolk" with yours
hive >
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dev01",year=2013, month=04)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dev01/2013/04/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dev01",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dev01/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn01",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn01/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn02",year=2013, month=04)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn02/2013/04/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn02",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn02/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn03",year=2013, month=04)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn03/2013/04/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn03",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn03/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-jt01",year=2013, month=04)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-jt01/2013/04/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-jt01",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-jt01/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-nn01",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-nn01/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-vms",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-vms/2013/05/';
c) Hive ql to test data loaded:
hive>
--Print headers
set hive.cli.print.header=true;
--Need to add this jar for MR to work..your env may not need it
add jar hadoop-lib/hive-contrib-0.10.0-cdh4.2.0.jar;
--Sample query
hive> select * from SysLogEvents limit 2;
OK
month_name day time host event log node year month
Apr 23 16:14:10 cdh-dev01 spice-vdagent[5657]: Missing virtio device '/dev/virtio-ports/com.redhat.spice.0': No such file or directory cdh-dev01 2013 04
Apr 23 16:14:12 cdh-dev01 pulseaudio[5705]: pid.c: Daemon already running. cdh-dev01 2013 04
Time taken: 13.241
*********************************
**04b-HiveReport-SourceForSqoop
*********************************
--Run the following in Hive; It creates and populates the source table for the sqoop action;
use default;
drop table if exists eventsgranularreport;
CREATE TABLE IF NOT EXISTS eventsgranularreport(
year int,
month int,
day int,
event STRING,
occurrence int)
ROW FORMAT DELIMITED
FIELDS TERMINATED by ','
LINES TERMINATED by '\n';
INSERT OVERWRITE TABLE eventsgranularreport
select Year,Month,Day,Event,Count(*) Occurrence from SysLogEvents group by year,month,day,event order by event asc,year,month,day desc;
****************************
**Prep tasks - mysql
****************************
mysql> create database airawat;
mysql> use airawat;
mysql> CREATE TABLE IF NOT EXISTS eventsgranularreport(
year INTEGER,
month INTEGER,
day INTEGER,
event VARCHAR(100),
occurrence INTEGER);
mysql> create user 'devUser'@'machineName' identified by 'myPwd';
GRANT ALL PRIVILEGES ON airawat.* TO 'devUser'@'machineName' WITH GRANT OPTION;
**************************************
**Sqoop command - standalone test
**************************************
a) Command run on Sqoop client:
sqoop export \
--connect jdbc:mysql://cdh-dev01/airawat \
--username devUser \
--password myPwd \
--table eventsgranularreport \
--direct \
--enclosed-by '\"' \
--export-dir /user/hive/warehouse/eventsgranularreport
b) Output in mysql:
mysql>
select * from eventsgranularreport;
mysql> select * from eventsgranularreport;
+------+-------+------+------------------------------+------------+
| year | month | day | event | occurrence |
+------+-------+------+------------------------------+------------+
| 2013 | 5 | NULL | NULL | 25 |
| 2013 | 5 | 3 | NetworkManager[1232]: | 1 |
| 2013 | 5 | 7 | NetworkManager[1243]: | 1 |
| 2013 | 5 | 7 | NetworkManager[1284]: | 1 |
.....
| 2013 | 5 | 7 | pulseaudio[2074]: | 1 |
| 2013 | 5 | 7 | pulseaudio[2076]: | 1 |
| 2013 | 5 | 7 | pulseaudio[2106]: | 1 |
| 2013 | 5 | 7 | pulseaudio[2116]: | 1 |
| 2013 | 5 | 7 | pulseaudio[2135]: | 1 |
+------+-------+------+------------------------------+------------+
104 rows in set (0.08 sec)
mysql> delete from eventsgranularreport;
Query OK, 104 rows affected (0.02 sec)
#*****************************
# 07: job.properties
#*****************************
nameNode=hdfs://cdh-nn01.chuntikhadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
appPath=${oozieProjectRoot}/workflowSqoopAction
oozie.wf.application.path=${appPath}
emailToAddress=akhanolk@cdh-dev01
#**********end****************************
- Note: The way the name node and job tracker information is specified in job.properties should match the oozie-site.xml properties for oozie.service.HadoopAccessorService.nameNode.whitelist and oozie.service.HadoopAccessorService.jobTracker.whitelist respectively.
Read up on the Oozie sharelib and install it if you don't already have it.
http://blog.cloudera.com/blog/2012/12/how-to-use-the-sharelib-in-apache-oozie/
<!--******************************************-->
<!--08. workflow.xml -->
<!--******************************************-->
<workflow-app name="WorkflowWithSqoopAction" xmlns="uri:oozie:workflow:0.1">
<start to="sqoopAction"/>
<action name="sqoopAction">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<command>export --connect jdbc:mysql://cdh-dev01/airawat --username devUser --password myPwd --table eventsgranularreport --direct --enclosed-by " --export-dir /user/hive/warehouse/eventsgranularreport</command>
</sqoop>
<ok to="end"/>
<error to="killJob"/>
</action>
<kill name="killJob">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<end name="end" />
</workflow-app>
09. Oozie commands
-------------------
Note: Replace the Oozie server and port with your cluster-specific values.
1) Submit job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowSqoopAction/job.properties -submit
job: 0000012-130712212133144-oozie-oozi-W
2) Run job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -start 0000012-130712212133144-oozie-oozi-W
3) Check the status:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info 0000012-130712212133144-oozie-oozi-W
4) Suspend workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -suspend 0000012-130712212133144-oozie-oozi-W
5) Resume workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -resume 0000012-130712212133144-oozie-oozi-W
6) Re-run workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowSqoopAction/job.properties -rerun 0000012-130712212133144-oozie-oozi-W
7) Should you need to kill the job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill 0000012-130712212133144-oozie-oozi-W
8) View server logs:
$ oozie job -oozie http://cdh-dev01:11000/oozie -logs 0000012-130712212133144-oozie-oozi-W
Logs are available at:
/var/log/oozie on the Oozie server.
*******************
***Output in mysql
*******************
mysql>
select * from eventsgranularreport;
mysql> select * from eventsgranularreport;
+------+-------+------+------------------------------+------------+
| year | month | day | event | occurrence |
+------+-------+------+------------------------------+------------+
| 2013 | 5 | NULL | NULL | 25 |
| 2013 | 5 | 3 | NetworkManager[1232]: | 1 |
| 2013 | 5 | 7 | NetworkManager[1243]: | 1 |
| 2013 | 5 | 7 | NetworkManager[1284]: | 1 |
.....
| 2013 | 5 | 7 | pulseaudio[2074]: | 1 |
| 2013 | 5 | 7 | pulseaudio[2076]: | 1 |
| 2013 | 5 | 7 | pulseaudio[2106]: | 1 |
| 2013 | 5 | 7 | pulseaudio[2116]: | 1 |
| 2013 | 5 | 7 | pulseaudio[2135]: | 1 |
+------+-------+------+------------------------------+------------+
104 rows in set (0.08 sec)
11. Available at:
-----------------
http://hadooped.blogspot.com/2013/06/apache-sqoop-part-5-scheduling-sqoop.html

Oozie web console:

Screenshots..

Monday, June 10, 2013

Apache Oozie - Part 2: Workflow - hive action

What's covered in the blog?

1. Documentation on the Oozie hive action
2. A sample workflow that includes fs action, email action, and hive action (query against some syslog generated log files).  

Version: 
Oozie 3.3.0

My other blogs on Oozie:

Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11: Oozie Java API for interfacing with oozie workflows
Blog 12: Oozie workflow - shell action +passing output from one action to another
Blog 13: Oozie workflow - SSH action


Your thoughts/updates:
If you want to share your thoughts/updates, email me at airawat.blog@gmail.com.

About the Hive action

http://archive.cloudera.com/cdh4/cdh/4/oozie/DG_HiveActionExtension.html

Salient features of the hive action:

- Runs the specified hive job synchronously (the workflow job will wait until the Hive job completes before continuing to the next action).
- Can be configured to create or delete HDFS directories before starting the Hive job.
- Supports Hive scripts with parameter variables; their syntax is ${VARIABLES}.
- Hive configuration needs to be specified as part of the job submission
Oozie EL expressions can be used in the inline configuration. Property values specified in the configuration element override values specified in the job-xml file.
Note that Hadoop mapred.job.tracker and fs.default.name properties must not be present in the inline configuration.
As with Hadoop map-reduce jobs, it is possible to add files and archives in order to make them available to the Hive job. 
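
A minimal sketch of a hive action (hive action schema 0.2) that exercises these features - a job-xml, an inline configuration property, and parameterized script variables. The parameter names SOURCE_TABLE and TARGET_TABLE are illustrative assumptions only; the actual workflow in section 05 below does not use params or an inline configuration.

<action name="hiveAction">
    <hive xmlns="uri:oozie:hive-action:0.2">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <job-xml>${appPath}/hive-site.xml</job-xml>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <script>${appPath}/runHiveLogReport.hql</script>
        <param>SOURCE_TABLE=SysLogEvents</param>
        <param>TARGET_TABLE=eventsgranularreport</param>
    </hive>
    <ok to="sendEmailSuccess"/>
    <error to="sendEmailKillHive"/>
</action>

Inside the script, such parameters would be referenced as ${SOURCE_TABLE} and ${TARGET_TABLE}.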

Components of a workflow with hive action:

For a workflow with (just a) hive action, the following are required:
1.  workflow.xml
2.  job.properties
3.  Any files, archives, jars you want to add
4.  hive-site.xml
5.  Hive query scripts

Refer to the sample program below.

Sample program

Highlights:

The workflow application runs a report on data in Hive.  The input is syslog-generated log data in a Hive table; the output is a Hive table containing the report results.

Pictorial overview of application:

Application:

This gist includes components of a simple workflow application (Oozie 3.3.0) that generates
a report off of data in a Hive table; emails are sent out to notify designated users of the
success/failure of the workflow. The data is syslog-generated log files; the Hive table row
format serde is RegexSerDe.
The sample application includes:
--------------------------------
1. Oozie actions: hive action and email action
2. Oozie workflow controls: start, end, and kill.
3. Workflow components: job.properties and workflow.xml
4. Sample data
5. Commands to deploy workflow, submit and run workflow
6. Oozie web console - screenshots from sample program execution
Pictorial overview of workflow:
-------------------------------
Available at:
http://hadooped.blogspot.com/2013/06/apache-oozie-part-2-workflow-hive-action.html
01. Workflow Components:
------------------------
1. job.properties
File containing:
a) parameter and value declarations that are referenced in the workflows, and
b) environment information referenced by Oozie to run the workflow including name node, job tracker, workflow application path etc
2. workflow.xml
Workflow definition file
3. hive-site.xml
Hive configuration file
4. Report query file in hive query language
02a.1. Download location:
-----------------------
Github:
https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you have access issues.
02a.2. Directory structure applicable for this gist/blog/post:
---------------------------------------------------------------
oozieProject
    data
        airawat-syslog
            <<node>>
                <<year>>
                    <<month>>
                        messages
    workflowHiveAction
        job.properties
        workflow.xml
        hive-site.xml
        runHiveLogReport.hql
02b.1. Sample data
-------------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
02b.2. Structure
-----------------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
03. Oozie SMTP configuration
----------------------------
Add the following to the oozie-site.xml, and restart oozie.
Replace values with the same specific to your environment.
<!-- SMTP params-->
<property>
<name>oozie.email.smtp.host</name>
<value>cdh-dev01</value>
</property>
<property>
<name>oozie.email.smtp.port</name>
<value>25</value>
</property>
<property>
<name>oozie.email.from.address</name>
<value>oozie@cdh-dev01</value>
</property>
<property>
<name>oozie.email.smtp.auth</name>
<value>false</value>
</property>
<property>
<name>oozie.email.smtp.username</name>
<value></value>
</property>
<property>
<name>oozie.email.smtp.password</name>
<value></value>
</property>
#*****************************
# 04. job.properties
#*****************************
nameNode=hdfs://cdh-nn01.chuntikhadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
appPath=${oozieProjectRoot}/workflowHiveAction
oozie.wf.application.path=${appPath}
emailToAddress=akhanolk@cdh-dev01
#*******End************************
Note: The line "oozie.wf.rerun.failnodes=true" is needed if you want to re-run the workflow; alternatively, there is a property (oozie.wf.rerun.skip.nodes) that specifies which nodes to skip on re-run. Review the Apache Oozie documentation for the same.
<!--******************************************-->
<!--05. workflow.xml -->
<!--******************************************-->
<workflow-app name="WorkflowWithHiveAction" xmlns="uri:oozie:workflow:0.1">
<start to="hiveAction"/>
<action name="hiveAction">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>${appPath}/hive-site.xml</job-xml>
<script>${appPath}/runHiveLogReport.hql</script>
</hive>
<ok to="sendEmailSuccess"/>
<error to="sendEmailKillHive"/>
</action>
<action name="sendEmailSuccess">
<email xmlns="uri:oozie:email-action:0.1">
<to>${emailToAddress}</to>
<subject>Status of workflow ${wf:id()}</subject>
<body>The workflow ${wf:id()} completed successfully</body>
</email>
<ok to="end"/>
<error to="end"/>
</action>
<action name="sendEmailKillHive">
<email xmlns="uri:oozie:email-action:0.1">
<to>${emailToAddress}</to>
<subject>Status of workflow ${wf:id()}</subject>
<body>The workflow ${wf:id()} had issues with the hive action and was killed. The error logged is: ${wf:errorMessage(wf:lastErrorNode())}</body>
</email>
<ok to="killJobAction"/>
<error to="killJobAction"/>
</action>
<kill name="killJobAction">
<message>"Killed job due to error in FS Action"</message>
</kill>
<end name="end" />
</workflow-app>
06a. Hdfs commands
-------------------
1) Load the data and workflow application to hadoop
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/* oozieProject/
2) Validate load
$ hadoop fs -ls -R oozieProject |awk '{print $8}'
..should match the directory listing in section 02a.2, above.
************************
**06b. Hive setup tasks
************************
a) Create table:
hive>
CREATE EXTERNAL TABLE SysLogEvents(
month_name STRING,
day STRING,
time STRING,
host STRING,
event STRING,
log STRING)
PARTITIONED BY(node string,year int, month int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)"
)
stored as textfile;
b) Create and load partitions:
Note: Replace my user ID "akhanolk" with yours
hive >
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dev01",year=2013, month=04)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dev01/2013/04/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dev01",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dev01/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn01",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn01/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn02",year=2013, month=04)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn02/2013/04/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn02",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn02/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn03",year=2013, month=04)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn03/2013/04/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-dn03",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-dn03/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-jt01",year=2013, month=04)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-jt01/2013/04/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-jt01",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-jt01/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-nn01",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-nn01/2013/05/';
Alter table SysLogEvents Add IF NOT EXISTS partition(node="cdh-vms",year=2013, month=05)
location '/user/akhanolk/oozieProject/data/airawat-syslog/cdh-vms/2013/05/';
c) Hive ql to test data loaded:
hive>
--Print headers
set hive.cli.print.header=true;
--Need to add this jar for MR to work..your env may not need it
add jar hadoop-lib/hive-contrib-0.10.0-cdh4.2.0.jar;
--Sample query
hive> select * from SysLogEvents limit 2;
OK
month_name day time host event log node year month
Apr 23 16:14:10 cdh-dev01 spice-vdagent[5657]: Missing virtio device '/dev/virtio-ports/com.redhat.spice.0': No such file or directory cdh-dev01 2013 04
Apr 23 16:14:12 cdh-dev01 pulseaudio[5705]: pid.c: Daemon already running. cdh-dev01 2013 04
Time taken: 13.241 seconds
************************************************
**06c. Hive QL - runHiveLogReport.hql
************************************************
use default;
drop table if exists eventsgranularreport;
CREATE TABLE IF NOT EXISTS eventsgranularreport(
year int,
month int,
day int,
event STRING,
occurrence int)
ROW FORMAT DELIMITED
FIELDS TERMINATED by ','
LINES TERMINATED by '\n';
INSERT OVERWRITE TABLE eventsgranularreport
select Year,Month,Day,Event,Count(*) Occurrence from SysLogEvents group by year,month,day,event order by event asc,year,month,day desc;
07. Oozie commands
-------------------
Note: Replace the Oozie server and port with those specific to your cluster.
1) Submit job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowHiveAction/job.properties -submit
job: 0000007-130712212133144-oozie-oozi-W
2) Run job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -start 0000007-130712212133144-oozie-oozi-W
3) Check the status:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info 0000007-130712212133144-oozie-oozi-W
4) Suspend workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -suspend 0000007-130712212133144-oozie-oozi-W
5) Resume workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -resume 0000007-130712212133144-oozie-oozi-W
6) Re-run workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowHiveAction/job.properties -rerun 0000007-130712212133144-oozie-oozi-W
7) Should you need to kill the job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill 0000007-130712212133144-oozie-oozi-W
8) View server logs:
$ oozie job -oozie http://cdh-dev01:11000/oozie -logs 0000007-130712212133144-oozie-oozi-W
Logs are available at:
/var/log/oozie on the Oozie server.
08. Output
-----------
hive> select * from eventsgranularreport;
OK
year month day event occurrence
2013 5 NULL NULL 25
2013 5 3 NetworkManager[1232]: 1
2013 5 7 NetworkManager[1243]: 1
2013 5 7 NetworkManager[1284]: 1
2013 5 7 NetworkManager[1292]: 1
2013 5 7 NetworkManager[1300]: 1
2013 5 3 NetworkManager[1342]: 1
2013 5 7 NetworkManager[1422]: 1
2013 5 3 console-kit-daemon[1580]: 1
2013 5 3 console-kit-daemon[1779]: 4
2013 5 7 console-kit-daemon[1861]: 1
2013 5 7 console-kit-daemon[1964]: 1
2013 5 7 gnome-session[1902]: 1
2013 5 7 gnome-session[1980]: 3
2013 5 7 gnome-session[2009]: 3
2013 5 7 gnome-session[2010]: 3
2013 5 7 gnome-session[2033]: 1
2013 5 7 init: 104
2013 5 6 init: 12
2013 5 3 init: 50
2013 5 7 kernel: 653
2013 5 6
...
....
.....
09-Sample email
---------------
From akhanolk@cdh-dev01.localdomain Mon Jul 15 00:34:39 2013
Return-Path: <akhanolk@cdh-dev01.localdomain>
X-Original-To: akhanolk@cdh-dev01
Delivered-To: akhanolk@cdh-dev01.localdomain
From: akhanolk@cdh-dev01.localdomain
To: akhanolk@cdh-dev01.localdomain
Subject: Status of workflow 0000011-130712212133144-oozie-oozi-W
Content-Type: text/plain; charset=us-ascii
Date: Mon, 15 Jul 2013 00:34:39 -0500 (CDT)
Status: R
The workflow 0000011-130712212133144-oozie-oozi-W completed successfully
10. Available at:
-----------------
http://hadooped.blogspot.com/2013/06/apache-oozie-part-2-workflow-hive-action.html

Oozie web console:

Screenshots of application execution:




Sunday, June 9, 2013

Apache Oozie - Part 1: Workflow with hdfs and email actions

What's covered in this blog?

Apache Oozie documentation (version 3.3.0) on - workflow, hdfs action, email action, and a sample application that moves files in hdfs (move and delete operations) and sends emails notifying the status of workflow execution.  Sample data, commands, and output are also detailed.

My other blogs on Oozie:

Your thoughts/updates:
If you want to share your thoughts/updates, email me at airawat.blog@gmail.com.

1.0. About Apache Oozie

1.0.1. What is Apache Oozie?  

It is an extensible, scalable, and data-aware service to orchestrate Hadoop jobs, manage job dependencies, and execute jobs based on event triggers such as time and data availability.  

There are three types of jobs in Oozie:
1.  Oozie workflow jobs
DAGs of actions, which are jobs such as shell scripts, MapReduce, Sqoop, Streaming, Pipes, Pig, Hive etc.
2.  Oozie coordinator jobs
Invoke Oozie workflow jobs based on specified event triggers - date/time, data availability.
3.  Oozie bundle jobs
Related Oozie coordinator jobs managed as a single job

- An Oozie bundle job can have one to many coordinator jobs
- An Oozie coordinator job can have one to many workflow jobs
- An Oozie workflow can have one to many actions
- An Oozie workflow can have zero to many sub-workflows
   

1.0.2. Glossary of Oozie terminology

(From Apache Oozie documentation)
Action
An execution/computation task (Map-Reduce job, Pig job, a shell command). It can also be referred to as a task or an 'action node'.
Workflow
A collection of actions arranged in a control dependency DAG (Directed Acyclic Graph). A "control dependency" from one action to another means that the second action can't run until the first action has completed.
Workflow Definition
A programmatic description of a workflow that can be executed.
Workflow Definition Language
The language used to define a Workflow Definition.
Workflow Job
An executable instance of a workflow definition.
Workflow Engine
A system that executes workflow jobs. It can also be referred to as a DAG engine.


1.0.3. Oozie Architecture

Oozie is a Java Web-Application that runs in a Java servlet-container (Tomcat) and uses a database to store:
1.  Definitions of Oozie jobs - workflow/coordinator/bundle
2.  Currently running workflow instances, including instance states and variables

Oozie works with HSQL, Derby, MySQL, Oracle or PostgreSQL databases.  By default, Oozie is configured to use Embedded Derby.  Oozie bundles the JDBC drivers for HSQL, Embedded Derby and PostgreSQL.

For information about the different kinds of configuration such as User authentication, logging etc, refer:
http://oozie.apache.org/docs/3.3.0/AG_Install.html#Oozie_Configuration  


This diagram is from a Yahoo deck on Oozie..


2.0. Oozie Workflow

2.0.1 What is an Oozie workflow?

An Oozie workflow is a DAG of hadoop computation/processing tasks (referred to as "actions") and flow "controls" to coordinate the tasks and manage dependencies of actions and their results.  
2.0.1.1. Actions:
Oozie workflow actions start jobs on remote nodes; upon completion, the processes executing the jobs call back Oozie to notify completion, in response to which Oozie starts the next action.  Actions can be hadoop fs, ssh, map-reduce, hive, pig, sqoop, distcp, http, email commands or custom actions.

2.0.1.2. Controls:
Controls manage the execution path of actions and include start, fork, join, decision and end.

2.0.1.3. Parameterizing actions and decisions:
Actions and decisions can be parameterized with job properties, action outputs (e.g., Hadoop counters) and file information (file exists, file size, etc). Formal parameters are expressed in the workflow definition as ${VAR} variables.

2.0.1.4. Workflow application:
A workflow application is an instance of a workflow, and is essentially a zip file containing everything needed to execute the actions within the workflows -  the workflow definition (an XML file), JARs for Map/Reduce jobs, shells for streaming Map/Reduce jobs, native libraries, Hive/Pig/Sqoop scripts, files for distributed cache and other resource files.

2.0.1.5. Workflow definition:
A workflow definition is a DAG with control flow nodes and action nodes expressed in the XML based workflow definition language called hPDL (Hadoop Process Definition Language).

2.0.1.6. Workflow nodes:
In hPDL, nodes encompassing actions are called action nodes, nodes encompassing controls are called control flow nodes, and together they are referred to as workflow nodes.


2.0.2. Oozie control flow functionality

[Straight from Apache Oozie documentation]

2.0.2.1. Start control node
The start node is the entry point for a workflow job; it indicates the first workflow node the workflow job must transition to.
When a workflow is started, it automatically transitions to the node specified in the start node.
A workflow definition must have one start node.

2.0.2.2. End control node
The end node is the end of a workflow job; it indicates that the workflow job has completed successfully.  When a workflow job reaches the end node, it finishes successfully (SUCCEEDED).  If one or more actions started by the workflow job are executing when the end node is reached, the actions will be killed. In this scenario the workflow job is still considered to have run successfully.  A workflow definition must have one end node.

2.0.2.3. Kill control node
The kill node allows a workflow job to kill itself.  When a workflow job reaches the kill node, it finishes in error (KILLED).  If one or more actions started by the workflow job are executing when the kill node is reached, the actions will be killed.  A workflow definition may have zero or more kill nodes.

2.0.2.4. Decision node
A decision node enables a workflow to make a selection on the execution path to follow.  The behavior of a decision node can be seen as a switch-case statement.
A decision node consists of a list of predicate-transition pairs plus a default transition. Predicates are evaluated in order of appearance until one of them evaluates to true, and the corresponding transition is taken. If none of the predicates evaluates to true, the default transition is taken.  Predicates are JSP Expression Language (EL) expressions that resolve to a boolean value, true or false.  The default element in the decision node indicates the transition to take if none of the predicates evaluates to true.  All decision nodes must have a default element to avoid bringing the workflow into an error state if none of the predicates evaluates to true.
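
As an illustration only (the node and action names here are hypothetical and not part of any workflow in this post), a decision node that routes to a report action only if an input directory exists and is non-empty could be sketched as:

<decision name="checkInput">
    <switch>
        <case to="runReportAction">
            ${fs:exists(concat(oozieProjectRoot, '/data')) and fs:dirSize(concat(oozieProjectRoot, '/data')) gt 0}
        </case>
        <default to="end"/>
    </switch>
</decision>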

2.0.2.5. Fork/join control nodes
A fork node splits one path of execution into multiple concurrent paths of execution.  A join node waits until every concurrent execution path of a previous fork node arrives at it.  The fork and join nodes must be used in pairs. The join node assumes concurrent execution paths are children of the same fork node.
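
A skeletal sketch of a paired fork and join (the action names are hypothetical; each forked action would transition to the join on success):

<fork name="forkActions">
    <path start="hiveAction"/>
    <path start="pigAction"/>
</fork>

<!-- ...hiveAction and pigAction defined here, each with <ok to="joinActions"/>... -->

<join name="joinActions" to="nextAction"/>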


3.0. Oozie actions

Only two action types are covered in this blog.  More in subsequent blogs on oozie.

3.0.1. About the FS (hdfs) action

"The fs action allows to manipulate files and directories in HDFS from a workflow application. The supported commands are move , delete , mkdir , chmod , touchz and chgrp .
The FS commands are executed synchronously from within the FS action, the workflow job will wait until the specified file commands are completed before continuing to the next action.  Path names specified in the fs action can be parameterized (templatized) using EL expressions.  Each file path must specify the file system URI, for move operations, the target must not specified the system URI.

IMPORTANT: All the commands within fs action do not happen atomically, if a fs action fails half way in the commands being executed, successfully executed commands are not rolled back. The fs action, before executing any command must check that source paths exist and target paths don't exist (constraint regarding target relaxed for the move action. See below for details), thus failing before executing any command. Therefore the validity of all paths specified in one fs action are evaluated before any of the file operation are executed. Thus there is less chance of an error occurring while the fs action executes."
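
For illustration, a hedged sketch of an fs action combining several of these commands (the action name and paths are hypothetical; note that the move target is given without the file system URI):

<action name="hdfsHousekeeping">
    <fs>
        <delete path='${nameNode}/user/${wf:user()}/oozieProject/staging'/>
        <mkdir path='${nameNode}/user/${wf:user()}/oozieProject/staging'/>
        <move source='${nameNode}/user/${wf:user()}/oozieProject/inbound'
              target='/user/${wf:user()}/oozieProject/staging/inbound'/>
        <chmod path='${nameNode}/user/${wf:user()}/oozieProject/staging' permissions='755' dir-files='true'/>
    </fs>
    <ok to="nextAction"/>
    <error to="killJobAction"/>
</action>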

3.0.2. About the email action


The email action allows sending emails from an Oozie workflow application. An email action must provide 'to' addresses, optional 'cc' addresses, a subject and a body. Multiple recipients of an email can be provided as comma-separated addresses.  The email action is executed synchronously, and the workflow job will wait until the specified emails are sent before continuing to the next action.

Apache documentation:
http://oozie.apache.org/docs/3.3.0/DG_EmailActionExtension.html
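
Neither of the email actions later in this post uses a cc list or multiple recipients; a hedged sketch of what that would look like (the additional addresses are placeholders, not accounts from this cluster):

<action name="sendEmailNotification">
    <email xmlns="uri:oozie:email-action:0.1">
        <to>akhanolk@cdh-dev01,ops@cdh-dev01</to>
        <cc>oncall@cdh-dev01</cc>
        <subject>Status of workflow ${wf:id()}</subject>
        <body>The workflow ${wf:id()} has completed; see the Oozie web console for details.</body>
    </email>
    <ok to="end"/>
    <error to="end"/>
</action>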



4.0. Building and executing an Oozie workflow with HDFS action and Email action

Pictorial overview

Sample program specifics

This gist includes components of a simple workflow application that creates a directory and moves files within
HDFS to this directory; emails are sent out to notify designated users of the success/failure of the workflow.
There is a prepare section to allow re-run of the action; the prepare essentially negates the move done by a
potential prior run of the action. Sample data is also included.
The sample application includes:
--------------------------------
1. Oozie actions: hdfs action and email action
2. Oozie workflow controls: start, end, and kill.
3. Workflow components: job.properties and workflow.xml
4. Sample data
5. Commands to deploy workflow, submit and run workflow
6. Oozie web console - screenshots from sample program execution
Pictorial overview of workflow:
-------------------------------
Available at:
http://hadooped.blogspot.com/2013/06/apache-oozie-part-1-workflow-with-hdfs.html
Workflow Components:
--------------------
1. job.properties
File containing:
a) parameter and value declarations that are referenced in the workflows, and
b) environment information referenced by Oozie to run the workflow including name node, job tracker, workflow application path etc
2. workflow.xml
Workflow definition file
Download location:
------------------
GitHub - https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you have access issues.
Directory structure applicable for this post/gist/blog:
-------------------------------------------------------
oozieProject
    logs
        airawat-syslog
            <<node>>
                <<year>>
                    <<month>>
                        messages
    workflowHdfsAndEmailActions
        job.properties
        workflow.xml
Oozie SMTP configuration
------------------------
Add the following to the oozie-site.xml, and restart oozie.
Replace values with the same specific to your environment.
<!-- SMTP params-->
<property>
<name>oozie.email.smtp.host</name>
<value>cdh-dev01</value>
</property>
<property>
<name>oozie.email.smtp.port</name>
<value>25</value>
</property>
<property>
<name>oozie.email.from.address</name>
<value>oozie@cdh-dev01</value>
</property>
<property>
<name>oozie.email.smtp.auth</name>
<value>false</value>
</property>
<property>
<name>oozie.email.smtp.username</name>
<value></value>
</property>
<property>
<name>oozie.email.smtp.password</name>
<value></value>
</property>
#*****************************
# job.properties
#*****************************
nameNode=hdfs://cdh-nn01.hadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
oozie.wf.application.path=${oozieProjectRoot}/workflowHdfsAndEmailActions
dataInputDirectoryAbsPath=${oozieProjectRoot}/logs/airawat-syslog
makeDirectoryAbsPath=${oozieProjectRoot}/dataDump
dataDestinationDirectoryRelativePath=oozieProject/dataDump
emailToAddress=akhanolk@cdh-dev01
#*******End************************
Note: The line "oozie.wf.rerun.failnodes=true" is needed if you want to re-run the workflow; alternatively, there is a property (oozie.wf.rerun.skip.nodes) that specifies which nodes to skip on re-run. Review the Apache Oozie documentation for the same.
<!--******************************************-->
<!--workflow.xml -->
<!--******************************************-->
<workflow-app name="WorkFlowForHDFSAndEmailActions" xmlns="uri:oozie:workflow:0.1">
<start to="hdfsCommands"/>
<action name="hdfsCommands">
<fs>
<mkdir path='${makeDirectoryAbsPath}'/>
<move source='${dataInputDirectoryAbsPath}' target='${dataDestinationDirectoryRelativePath}'/>
</fs>
<ok to="sendEmailSuccess"/>
<error to="sendEmailKill"/>
</action>
<action name="sendEmailSuccess">
<email xmlns="uri:oozie:email-action:0.1">
<to>${emailToAddress}</to>
<subject>Status of workflow ${wf:id()}</subject>
<body>The workflow ${wf:id()} completed successfully</body>
</email>
<ok to="end"/>
<error to="end"/>
</action>
<action name="sendEmailKill">
<email xmlns="uri:oozie:email-action:0.1">
<to>${emailToAddress}</to>
<subject>Status of workflow ${wf:id()}</subject>
<body>The workflow ${wf:id()} had issues and was killed. The error message is: ${wf:errorMessage(wf:lastErrorNode())}</body>
</email>
<ok to="killJobFSAction"/>
<error to="killJobFSAction"/>
</action>
<kill name="killJobFSAction">
<message>"Killed job due to error in FS Action"</message>
</kill>
<end name="end"/>
</workflow-app>
Commands to load data
----------------------
a) Load data
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/* oozieProject/
b) Validate load
$ hadoop fs -ls -R oozieProject | awk '{print $8}'
You should see...
oozieProject/logs/airawat-syslog/<<node>>/<<year>>/<<month>>/messages
oozieProject/workflowHdfsAndEmailActions/job.properties
oozieProject/workflowHdfsAndEmailActions/workflow.xml
$ hadoop fs -rm -R oozieProject/data
Oozie commands
--------------
Note: Replace the Oozie server and port with those specific to your cluster.
1) Submit job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowHdfsAndEmailActions/job.properties -submit
job: 0000001-130712212133144-oozie-oozi-W
2) Run job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -start 0000001-130712212133144-oozie-oozi-W
3) Check the status:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info 0000001-130712212133144-oozie-oozi-W
4) Suspend workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -suspend 0000001-130712212133144-oozie-oozi-W
5) Resume workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -resume 0000001-130712212133144-oozie-oozi-W
6) Re-run workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowHdfsAndEmailActions/job.properties -rerun 0000001-130712212133144-oozie-oozi-W
7) Should you need to kill the job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill 0000001-130712212133144-oozie-oozi-W
8) View server logs:
$ oozie job -oozie http://cdh-dev01:11000/oozie -logs 0000001-130712212133144-oozie-oozi-W
Logs are available at:
/var/log/oozie on the Oozie server.
Program output:
---------------
Expected result:
1) The data in the logs directory should now be in the directory named dataDump under the oozieProject directory.
2) The directory 'logs' should be deleted.
3) An email indicating success/failure of the application
1)
$ hadoop fs -ls -R oozieProject | awk '{print $8}'
oozieProject/dataDump/airawat-syslog
oozieProject/dataDump/airawat-syslog/cdh-dev01
oozieProject/dataDump/airawat-syslog/cdh-dev01/2013
oozieProject/dataDump/airawat-syslog/cdh-dev01/2013/04
oozieProject/dataDump/airawat-syslog/cdh-dev01/2013/04/messages
oozieProject/dataDump/airawat-syslog/cdh-dev01/2013/05
oozieProject/dataDump/airawat-syslog/cdh-dev01/2013/05/messages
oozieProject/dataDump/airawat-syslog/cdh-dn01
oozieProject/dataDump/airawat-syslog/cdh-dn01/2013
oozieProject/dataDump/airawat-syslog/cdh-dn01/2013/05
oozieProject/dataDump/airawat-syslog/cdh-dn01/2013/05/messages
oozieProject/dataDump/airawat-syslog/cdh-dn02
oozieProject/dataDump/airawat-syslog/cdh-dn02/2013
oozieProject/dataDump/airawat-syslog/cdh-dn02/2013/04
oozieProject/dataDump/airawat-syslog/cdh-dn02/2013/04/messages
oozieProject/dataDump/airawat-syslog/cdh-dn02/2013/05
oozieProject/dataDump/airawat-syslog/cdh-dn02/2013/05/messages
oozieProject/dataDump/airawat-syslog/cdh-dn03
oozieProject/dataDump/airawat-syslog/cdh-dn03/2013
oozieProject/dataDump/airawat-syslog/cdh-dn03/2013/04
oozieProject/dataDump/airawat-syslog/cdh-dn03/2013/04/messages
oozieProject/dataDump/airawat-syslog/cdh-dn03/2013/05
oozieProject/dataDump/airawat-syslog/cdh-dn03/2013/05/messages
oozieProject/dataDump/airawat-syslog/cdh-jt01
oozieProject/dataDump/airawat-syslog/cdh-jt01/2013
oozieProject/dataDump/airawat-syslog/cdh-jt01/2013/04
oozieProject/dataDump/airawat-syslog/cdh-jt01/2013/04/messages
oozieProject/dataDump/airawat-syslog/cdh-jt01/2013/05
oozieProject/dataDump/airawat-syslog/cdh-jt01/2013/05/messages
oozieProject/dataDump/airawat-syslog/cdh-nn01
oozieProject/dataDump/airawat-syslog/cdh-nn01/2013
oozieProject/dataDump/airawat-syslog/cdh-nn01/2013/05
oozieProject/dataDump/airawat-syslog/cdh-nn01/2013/05/messages
oozieProject/dataDump/airawat-syslog/cdh-vms
oozieProject/dataDump/airawat-syslog/cdh-vms/2013
oozieProject/dataDump/airawat-syslog/cdh-vms/2013/05
oozieProject/dataDump/airawat-syslog/cdh-vms/2013/05/messages
oozieProject/workflowHdfsAndEmailActions/job.properties
oozieProject/workflowHdfsAndEmailActions/workflow.xml
Email from the program
-----------------------
From akhanolk@cdh-dev01.localdomain Sun Jul 14 23:08:46 2013
Return-Path: <akhanolk@cdh-dev01.localdomain>
X-Original-To: akhanolk@cdh-dev01
Delivered-To: akhanolk@cdh-dev01.localdomain
From: akhanolk@cdh-dev01.localdomain
To: akhanolk@cdh-dev01.localdomain
Subject: Status of workflow 0000006-130712212133144-oozie-oozi-W
Content-Type: text/plain; charset=us-ascii
Date: Sun, 14 Jul 2013 23:08:46 -0500 (CDT)
Status: R
The workflow 0000006-130712212133144-oozie-oozi-W completed successfully
Screenshots of the Oozie web console are available at:
------------------------------------------------------
http://hadooped.blogspot.com/2013/06/apache-oozie-part-1-workflow-with-hdfs.html

Oozie web console
Screenshot of entry of sample application-


This concludes this blog.  Happy hadooping!