Monday, June 17, 2013

Apache Oozie - Part 6: Oozie workflow with java main action


What's covered in the blog?

1. Documentation on the Oozie java action
2. A sample workflow that includes an Oozie java action to process some syslog-generated log files. Instructions on loading the sample data and running the workflow are provided, along with some notes based on my experience.

Versions covered:
Oozie 3.3.0

Related blogs:
Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11a: Oozie Java API for interfacing with oozie workflows
Blog 11b: Oozie Web Service API for interfacing with oozie workflows


Your thoughts/updates:
If you want to share your thoughts/updates, email me at airawat.blog@gmail.com.



About the Oozie java main action

Excerpt from Apache Oozie documentation...
The java action will execute the public static void main(String[] args) method of the specified main Java class. Java applications are executed in the Hadoop cluster as a map-reduce job with a single mapper task. The workflow job will wait until the java application completes its execution before continuing to the next action. The java action has to be configured with the job-tracker, name-node, main Java class, JVM options and arguments.

To indicate an ok action transition, the main Java class must complete the main method invocation gracefully. To indicate an error action transition, the main Java class must throw an exception. The main Java class must not call System.exit(int n), as this will make the java action do an error transition regardless of the exit code used.
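
In other words, the main class signals "ok" by returning from main, and "error" by throwing. A minimal sketch (doWork is a hypothetical stand-in for the application logic):

public class MyOozieMain {
    public static void main(String[] args) throws Exception {
        if (!doWork(args)) {
            // Throwing triggers the <error> transition; returning normally
            // (without calling System.exit) yields the <ok> transition.
            throw new RuntimeException("Processing failed");
        }
    }

    // Hypothetical application logic; returns true on success.
    private static boolean doWork(String[] args) {
        return args.length > 0;
    }
}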

A java action can be configured to perform HDFS file/directory cleanup before starting the Java application. This capability enables Oozie to retry a Java application after a transient or non-transient failure, and can be used to clean up any temporary data the Java application may have created before it failed.

A java action can create a Hadoop configuration. The Hadoop configuration is made available as a local file to the Java application in its running directory; the file name is oozie-action.conf.xml. As with map-reduce and pig actions, it is possible to refer to a job.xml file and to use inline configuration properties. For repeated configuration properties, later values override earlier ones.
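
For instance, the main class can load this file explicitly to read the action's configuration. A minimal sketch (only the file name oozie-action.conf.xml comes from the spec above; the rest is illustrative):

import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class ActionConfExample {
    public static void main(String[] args) {
        // Oozie materializes the action configuration in the task's
        // working directory under this fixed name.
        File actionConfFile = new File("oozie-action.conf.xml");
        Configuration conf = new Configuration(false);
        if (actionConfFile.exists()) {
            conf.addResource(new Path(actionConfFile.toURI()));
        }
        System.out.println("queue = " + conf.get("mapred.job.queue.name"));
    }
}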

Inline property values can be parameterized (templatized) using EL expressions.

The Hadoop mapred.job.tracker (job-tracker) and fs.default.name (name-node) properties must not be present in the job-xml or in the inline configuration.

As with map-reduce and pig actions, it is possible to add files and archives to be made available to the Java application. Refer to the section "Adding Files and Archives for the Job" in the Oozie workflow specification.

The capture-output element can be used to propagate values back into the Oozie context, where they can be accessed via EL functions. The values must be written out as a Java properties file; the file name is obtained via a system property specified by the constant JavaMainMapper.OOZIE_JAVA_MAIN_CAPTURE_OUTPUT_FILE.
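
A minimal sketch of emitting capture-output values, assuming the system property name is oozie.action.output.properties (the value of the constant mentioned above); the property key and value are hypothetical:

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Properties;

public class CaptureOutputExample {
    public static void main(String[] args) throws Exception {
        // Oozie tells the java action where to write its output
        // via this system property.
        String fileName = System.getProperty("oozie.action.output.properties");
        Properties props = new Properties();
        props.setProperty("processedRecords", "42");  // hypothetical value
        OutputStream os = new FileOutputStream(new File(fileName));
        try {
            props.store(os, "Values propagated back to the Oozie context");
        } finally {
            os.close();
        }
    }
}

A downstream action can then read such a value with an EL function, e.g. ${wf:actionData('javaMainAction')['processedRecords']}.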

IMPORTANT: Because the Java application is run from within a map-reduce job, from Hadoop 0.20 onwards a queue must be assigned to it. The queue name must be specified as a configuration property (the sample workflow below sets mapred.job.queue.name for this reason).

Syntax:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="[NODE-NAME]">
        <java>
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
               <delete path="[PATH]"/>
               ...
               <mkdir path="[PATH]"/>
               ...
            </prepare>
            <job-xml>[JOB-XML]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <main-class>[MAIN-CLASS]</main-class>
            <java-opts>[JAVA-STARTUP-OPTS]</java-opts>
            <arg>ARGUMENT</arg>
            ...
            <file>[FILE-PATH]</file>
            ...
            <archive>[FILE-PATH]</archive>
            ...
            <capture-output />
        </java>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>
The prepare element, if present, indicates a list of paths to delete before starting the Java application. This should be used exclusively for directory cleanup for the Java application to be executed.

The java-opts element, if present, contains the command-line parameters used to start the JVM that will execute the Java application. Using this element is equivalent to using the mapred.child.java.opts configuration property.

The arg elements, if present, contain arguments for the main function. The value of each arg element is considered a single argument, and they are passed to the main method in the same order.

All the above elements can be parameterized (templatized) using EL expressions.
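
For example, with <arg>${inputDir}</arg> followed by <arg>${outputDir}</arg> (as in the sample workflow below), the main method receives the resolved values positionally; a trivial sketch:

public class ArgEcho {
    public static void main(String[] args) {
        // args[0] = value of the first <arg>, args[1] = the second, and so on
        for (int i = 0; i < args.length; i++) {
            System.out.println("arg[" + i + "] = " + args[i]);
        }
    }
}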


Apache Oozie documentation:
http://oozie.apache.org/docs/3.3.0/WorkflowFunctionalSpec.html#a3.2.7_Java_Action


Sample workflow application

Highlights:
For this exercise, I have loaded some syslog-generated logs to HDFS and am running a Java map-reduce program through Oozie as a java action (not a map-reduce action) to produce a report on the logs.

Pictorial representation of the workflow:

Components of workflow application:

Workflow application:

This gist includes the components of an Oozie workflow: scripts/code, sample
data, and commands. Oozie actions covered: java main action; Oozie controls
covered: start, kill, end. The java program uses a regex to parse the logs,
and also extracts part of the mapper input file path and includes it in the
emitted key.
Usecase
-------
Parse Syslog generated log files to generate reports;
Pictorial overview of job:
--------------------------
http://hadooped.blogspot.com/2013/06/apache-oozie-part-6-oozie-workflow-with.html
Includes:
---------
Sample data and structure: 01-SampleDataAndStructure
Data and script download: 02-DataAndScriptDownload
Data load commands: 03-HdfsLoadCommands
Java MR - Mapper code: 04A-MapperJavaCode
Java MR - Reducer code: 04B-ReducerJavaCode
Java MR - Driver code: 04C-DriverJavaCode
Command to test Java MR program: 04D-CommandTestJavaMRProg
Oozie job properties file: 05-OozieJobProperties
Oozie workflow file: 06-OozieWorkflowXML
Oozie commands: 07-OozieJobExecutionCommands
Output - Report1: 08-OutputOfJavaProgram
Oozie web console - screenshots: 09-OozieWebConsoleScreenshots
01a. Sample data
-----------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
01b. Structure
---------------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
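To see how the mapper's regex (shown in full further down) maps a raw line onto this structure, here is a quick standalone check:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ParseCheck {
    public static void main(String[] args) {
        String line = "May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal";
        Pattern p = Pattern.compile("(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)");
        Matcher m = p.matcher(line);
        if (m.matches()) {
            System.out.println("Month   = " + m.group(1));  // May
            System.out.println("Day     = " + m.group(2));  // 3
            System.out.println("Time    = " + m.group(3));  // 11:52:54
            System.out.println("Node    = " + m.group(4));  // cdh-dn03
            System.out.println("Process = " + m.group(5));  // init:
            System.out.println("Log msg = " + m.group(6));
        }
    }
}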
02a. Data download
-------------------
gitHub:
https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you encounter any issues.
Directory structure applicable for this blog/gist:
--------------------------------------------------
oozieProject
    data
        airawat-syslog
            <<Node-Name>>
                <<Year>>
                    <<Month>>
                        messages
    workflowJavaMainAction
        workflow.xml
        job.properties
        lib
            LogEventCount.jar
03. HDFS load commands
-----------------------
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/* oozieProject/
// Source code for Mapper
//-----------------------------------------------------------
// LogEventCountMapper.java
//-----------------------------------------------------------
// Java program that parses logs using regex
// The program counts the number of processes logged by year.
// E.g. Key=2013-ntpd; Value=1;
package Airawat.Oozie.Samples;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class LogEventCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    String strLogEntryPattern = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)";
    public static final int NUM_FIELDS = 6;
    Text strEvent = new Text("");

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String strLogEntryLine = value.toString();
        Pattern objPtrn = Pattern.compile(strLogEntryPattern);
        Matcher objPatternMatcher = objPtrn.matcher(strLogEntryLine);

        if (!objPatternMatcher.matches() || NUM_FIELDS != objPatternMatcher.groupCount()) {
            System.err.println("Bad log entry (or problem with RE?):");
            System.err.println(strLogEntryLine);
            return;
        }

        // Groups: (1) Month_Name (2) Day (3) Time (4) Node (5) Process (6) LogMessage

        // Build the key as <year>-<process>. The year is carved out of the
        // input file path (.../<year>/<month>/messages; characters 16 to 12
        // from the end), and the process name is group(5) with the trailing
        // ":" or "[pid]:" suffix stripped.
        String strPath = ((FileSplit) context.getInputSplit()).getPath().toString();
        String strYear = strPath.substring(strPath.length() - 16, strPath.length() - 12);

        String strProcess = objPatternMatcher.group(5);
        int intBracketIndex = strProcess.indexOf("[");
        strProcess = (intBracketIndex == -1)
            ? strProcess.substring(0, strProcess.length() - 1)
            : strProcess.substring(0, intBracketIndex);

        strEvent.set(strYear + "-" + strProcess);
        context.write(strEvent, new IntWritable(1));
    }
}
// Source code for reducer
//--------------------------
// LogEventCountReducer.java
//--------------------------
package Airawat.Oozie.Samples;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class LogEventCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int intEventCount = 0;
        for (IntWritable value : values) {
            intEventCount += value.get();
        }
        context.write(key, new IntWritable(intEventCount));
    }
}
// Source code for driver
//--------------------------
// LogEventCount.java
//--------------------------
package Airawat.Oozie.Samples;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
public class LogEventCount {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.printf(
                "Usage: Airawat.Oozie.Samples.LogEventCount <input dir> <output dir>\n");
            System.exit(-1);
        }

        //Instantiate a Job object for your job's configuration.
        Job job = new Job();

        //Job jar file
        job.setJarByClass(LogEventCount.class);

        //Job name
        job.setJobName("Syslog Event Rollup");

        //Paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //Mapper and reducer classes
        job.setMapperClass(LogEventCountMapper.class);
        job.setReducerClass(LogEventCountReducer.class);

        //Job's output key and value classes
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //Number of reduce tasks
        job.setNumReduceTasks(3);

        //Start the MapReduce job, wait for it to finish.
        boolean success = job.waitForCompletion(true);

        //Note: the Oozie spec quoted earlier advises against calling
        //System.exit from a java action; when launched through Oozie,
        //returning from main (or throwing on failure) is the safer choice.
        System.exit(success ? 0 : 1);
    }
}
Commands to test the java program
---------------------------------
a) Command to run the program
$ hadoop jar oozieProject/workflowJavaMainAction/lib/LogEventCount.jar Airawat.Oozie.Samples.LogEventCount "oozieProject/data/*/*/*/*/*" "oozieProject/workflowJavaMainAction/myCLIOutput"
b) Command to view results
$ hadoop fs -cat oozieProject/workflowJavaMainAction/myCLIOutput/part* | sort
c) Results
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
#*****************************
# job.properties
#*****************************
nameNode=hdfs://cdh-nn01.chuntikhadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
appPath=${oozieProjectRoot}/workflowJavaMainAction
oozie.wf.application.path=${appPath}
inputDir=${oozieProjectRoot}/data/*/*/*/*/*
outputDir=${appPath}/output
<!--*************************************************-->
<!--*******06-workflow.xml***************************-->
<!--*************************************************-->
<workflow-app name="WorkflowJavaMainAction" xmlns="uri:oozie:workflow:0.1">
    <start to="javaMainAction"/>
    <action name="javaMainAction">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <main-class>Airawat.Oozie.Samples.LogEventCount</main-class>
            <arg>${inputDir}</arg>
            <arg>${outputDir}</arg>
        </java>
        <ok to="end"/>
        <error to="killJobAction"/>
    </action>
    <kill name="killJobAction">
        <message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
    </kill>
    <end name="end"/>
</workflow-app>
07. Oozie commands
-------------------
Note: Replace the Oozie server and port with your cluster-specific values.
1) Submit job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowJavaMainAction/job.properties -submit
job: 0000012-130712212133144-oozie-oozi-W
2) Run job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -start 0000014-130712212133144-oozie-oozi-W
3) Check the status:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info 0000014-130712212133144-oozie-oozi-W
4) Suspend workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -suspend 0000014-130712212133144-oozie-oozi-W
5) Resume workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -resume 0000014-130712212133144-oozie-oozi-W
6) Re-run workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowJavaMainAction/job.properties -rerun 0000014-130712212133144-oozie-oozi-W
7) Should you need to kill the job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill 0000014-130712212133144-oozie-oozi-W
8) View server logs:
$ oozie job -oozie http://cdh-dev01:11000/oozie -logs 0000014-130712212133144-oozie-oozi-W
Logs are available at:
/var/log/oozie on the Oozie server.
08-Workflow application output
-------------------------------
$ hadoop fs -ls -R oozieProject/workflowJavaMainAction/output/part* | awk '{print $8}'
oozieProject/workflowJavaMainAction/output/part-r-00000
$ hadoop fs -cat oozieProject/workflowJavaMainAction/output/part-r-00000
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
09 - Oozie web console - screenshots
-------------------------------------
Available at-
http://hadooped.blogspot.com/2013/06/apache-oozie-part-6-oozie-workflow-with.html


References


Map reduce cookbook
https://cwiki.apache.org/OOZIE/map-reduce-cookbook.html

How to use a sharelib in Oozie
http://blog.cloudera.com/blog/2012/12/how-to-use-the-sharelib-in-apache-oozie/

Everything you wanted to know (but were afraid to ask) about Oozie
http://www.slideshare.net/ChicagoHUG/everything-you-wanted-to-know-but-were-afraid-to-ask-about-oozie

Oozie workflow use cases
https://github.com/yahoo/oozie/wiki/Oozie-WF-use-cases 





Comments:

  1. This tutorial is GREEEAAAAT. Excellent layout, data sample and educational instructions. I have adapted your code to run with Hortonworks HDP 2.1 (Hadoop 2.4.0), with a few changes from your implementation: 1) gzip all syslog files into a single .gz file for the mapper input (instead of the 37 files and 20 folders in the original uncompressed sample); 2) simplify the map output key (one can no longer extract 2013 from the FileSplit, as the input is now a single .gz file).

    BTW, in the map method, to remove [nnnn] in the event name (example: "NetworkManager[1459]" becomes "NetworkManager"), I think this is shorter than testing for "[" and doing a substring: objPatternMatcher.group(5).replaceAll("\\[\\d+\\]", "")
  2. Hi Anagha,
    Thanks a lot for the wonderful post!! Your blogs are easy to understand and the code examples are very helpful. Great stuff.

    I need one suggestion regarding logging in the map-reduce driver class: when I call the driver class as a java action, everything runs fine, but the logger statements are not visible in the Oozie Job Log tab. Is there anything to configure in order to see these logs? Any suggestions/hints?

    Thanks