Tuesday, December 31, 2013

Log parsing in Hadoop - Part 5: Cascading

1.0. What's in this post?

This post is part of a series focused on log parsing in Hadoop with Java MapReduce, Pig, Hive, Python, and more. This one covers a simple log parser in Cascading, and includes a sample program, data and commands.

Documentation on Cascading:
http://www.cascading.org/documentation/

Other related blogs:
Log parsing in Hadoop - Part 1: Java
Log parsing in Hadoop - Part 2: Hive
Log parsing in Hadoop - Part 3: Pig
Log parsing in Hadoop - Part 4: Python
Log parsing in Hadoop - Part 5: Cascading
Log parsing in Hadoop - Part 6: Morphlines


2.0. Sample program


2.0.1. What the program does
a) It reads syslog-generated logs stored in HDFS
b) Parses them with a regular expression
c) Writes successfully parsed records to files in HDFS
d) Writes records that don't match the pattern to HDFS
e) Writes a report to HDFS that contains the count of distinct processes logged.

2.0.2. Sample log data
Sample log data
---------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
Data structure
--------------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
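
The parser in section 2.0.4 extracts these six fields with a single regular expression. As a quick sanity check, the same pattern can be exercised against one of the sample lines above using plain java.util.regex (a small standalone sketch, separate from the Cascading job):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SyslogRegexCheck {
    public static void main(String[] args) {
        // Same pattern as the RegexParser in section 2.0.4
        String regex = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)";
        String line = "May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal";
        Matcher m = Pattern.compile(regex).matcher(line);
        if (m.matches()) {
            String[] fields = { "month", "day", "time", "node", "process", "message" };
            for (int i = 1; i <= m.groupCount(); i++) {
                System.out.println(fields[i - 1] + " = " + m.group(i));
            }
        } else {
            System.out.println("No match - this line would end up in the trap tap");
        }
    }
}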


2.0.3. Directory structure of log files
Directory structure of logs
---------------------------
cascadingSamples
data
syslogs
<<Node-Name>>
<<Year>>
<<Month>>
messages
Specifically...
LogParser/data/syslogs/cdh-dev01/2013/04/messages
LogParser/data/syslogs/cdh-dev01/2013/05/messages
LogParser/data/syslogs/cdh-dn01/2013/05/messages
LogParser/data/syslogs/cdh-dn02/2013/04/messages
LogParser/data/syslogs/cdh-dn02/2013/05/messages
LogParser/data/syslogs/cdh-dn03/2013/04/messages
LogParser/data/syslogs/cdh-dn03/2013/05/messages
LogParser/data/syslogs/cdh-jt01/2013/04/messages
LogParser/data/syslogs/cdh-jt01/2013/05/messages
LogParser/data/syslogs/cdh-nn01/2013/05/messages
LogParser/data/syslogs/cdh-vms/2013/05/messages


2.0.4. Log parser in Cascading
package logparser;
import java.util.Properties;
import java.util.Collections;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.tap.hadoop.GlobHfs;
import cascading.tap.MultiSourceTap;
import cascading.tap.hadoop.Hfs;
import cascading.operation.regex.RegexParser;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFunction;
public class LogParser {
public static void main(String[] args) {
// {{
// INSTANTIATE/INITIALIZE
// Set the current job jar
Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, LogParser.class );
HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);
// Arguments
String inputPath = args[ 0 ];
String outputPath = args[ 1 ];
String errorPath = args[ 2 ];
String reportPath = args[ 3 ];
// Scheme for sinks
TextLine sinkTextLineScheme = new TextLine();
// Define what the input file looks like, "offset" is bytes from beginning
TextLine sourceTextLineScheme = new TextLine( new Fields( "offset", "line" ) );
// The inputPath is a file glob, and so, GlobHfs is used below
GlobHfs sourceFilesGlob = new GlobHfs( sourceTextLineScheme, inputPath );
// Create SOURCE tap to read a resource from the HDFS glob
Tap sourceSyslogTap = new MultiSourceTap(sourceFilesGlob);
// Create a SINK tap to write parsed logs to HDFS
sinkTextLineScheme.setNumSinkParts(2);
Tap sinkParsedLogTap = new Hfs( sinkTextLineScheme, outputPath, SinkMode.REPLACE);
// Create a SINK tap to write reports to HDFS
sinkTextLineScheme.setNumSinkParts(1);
Tap sinkReportTap = new Hfs(sinkTextLineScheme, reportPath, SinkMode.REPLACE );
// Create a TRAP tap to write records that failed parsing
sinkTextLineScheme.setNumSinkParts(1);
Tap sinkTrapTap = new Hfs( sinkTextLineScheme, errorPath , SinkMode.REPLACE );
// }}
// {{
// EXTRACT/PARSE
// Declare the field names
Fields sysLogFields = new Fields( "month", "day", "time", "node", "process", "message" );
// Define the regex pattern to parse the log file with
String sysLogRegex = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)";
// Declare the groups from the above regex we want to keep. Each regex group will be given
// a field name from 'sysLogFields', above, respectively
int[] keepParsedGroups = {1, 2, 3, 4, 5, 6};
// Create the parser
RegexParser parser = new RegexParser( sysLogFields, sysLogRegex, keepParsedGroups );
// Import & parse pipe
// Create the import pipe element, with the name 'import', and with the input argument named "line"
// Replace the incoming tuple with the parser results
// "line" -> parser -> "ts"
Pipe importAndParsePipe = new Each( "import", new Fields( "line" ), parser, Fields.RESULTS );
// }}
// {{
// TRANSFORM
// Transform the process field - remove process ID if found, for better reporting on logs
// Also, convert to lowercase
// E.g. Change "ntpd[1302]" to "ntpd"
String expression = "process.substring(0, (process.indexOf('[') == -1 ? process.length()-1 : process.indexOf('[') )).toLowerCase()";
Fields fieldProcess = new Fields( "process" );
ExpressionFunction expFunc =
new ExpressionFunction( fieldProcess, expression, String.class );
// Pipe for transformed data
Pipe scrubbedDataPipe = new Each( importAndParsePipe, fieldProcess, expFunc, Fields.REPLACE );
// }}
// {{
// REPORT/ANALYZE
// Capture counts by process, as a report, grouped and ordered by process
// ------------------------------------------------------------
// process count()
// E.g. sshd 4
Pipe reportPipe = new Pipe("reportByProcess", scrubbedDataPipe);
Fields keyFields = new Fields("process");
Fields groupByFields = new Fields( "process");
Fields countField = new Fields( "countOfEvents" );
Fields sortByFields = new Fields( "process");
reportPipe = new GroupBy(reportPipe, groupByFields);
reportPipe = new Every(reportPipe, keyFields,
new Count(countField), Fields.ALL);
reportPipe = new GroupBy(reportPipe,
keyFields,
countField,
false); //true=descending order
//End of reports
//}}
// {{
// EXECUTE
// Connect the taps, pipes, etc., into a flow & execute
FlowDef flowDef = FlowDef.flowDef()
.setName( "Log parser" )
.addSource( importAndParsePipe, sourceSyslogTap )
.addTailSink( scrubbedDataPipe, sinkParsedLogTap )
.addTailSink(reportPipe,sinkReportTap)
.addTrap( importAndParsePipe, sinkTrapTap );
Flow flow = flowConnector.connect(flowDef);
flow.complete();
// }}
}
}
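
The expression string handed to ExpressionFunction above (compiled by Cascading at runtime) is terse; for readability, here is the same scrubbing logic written as ordinary Java (a standalone sketch, not part of the flow):

public class ProcessScrubberDemo {
    // Mirrors the ExpressionFunction: strips a trailing "[pid]" or ":" and lowercases
    static String scrubProcess(String process) {
        int bracket = process.indexOf('[');
        // No "[pid]" suffix: drop only the trailing colon; otherwise cut at the bracket
        int end = (bracket == -1) ? process.length() - 1 : bracket;
        return process.substring(0, end).toLowerCase();
    }

    public static void main(String[] args) {
        System.out.println(scrubProcess("ntpd_initres[1705]:")); // ntpd_initres
        System.out.println(scrubProcess("init:"));               // init
        System.out.println(scrubProcess("NetworkManager:"));     // networkmanager
    }
}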


2.0.5. build.gradle file
Gradle documentation is available at- http://www.gradle.org
Here is the build.gradle...
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'eclipse'
archivesBaseName = 'logparser-cascading'
repositories {
mavenLocal()
mavenCentral()
mavenRepo name: 'conjars', url: 'http://conjars.org/repo/'
}
ext.cascadingVersion = '2.5.1'
dependencies {
compile( group: 'cascading', name: 'cascading-core', version: cascadingVersion )
compile( group: 'cascading', name: 'cascading-hadoop', version: cascadingVersion )
}
jar {
description = "Assembles a Hadoop ready jar file"
doFirst {
into( 'lib' ) {
from configurations.compile
} }
manifest {
attributes( "Main-Class": "logparser/LogParser" )
}
}

2.0.6. Data and code download 

Code and data download
=======================
Github:
https://github.com/airawat/LogParser/tree/master/cascadingSamples
Directories relevant to this post:
===================================
$ tree -if --noreport LogParser
LogParser
LogParser/cascadingSamples
LogParser/cascadingSamples/jars
LogParser/cascadingSamples/jars/logparser-cascading.jar
LogParser/cascadingSamples/src
LogParser/cascadingSamples/src/main
LogParser/cascadingSamples/src/main/java
LogParser/cascadingSamples/src/main/java/logparser
LogParser/cascadingSamples/src/main/java/logparser/LogParser.java
LogParser/data
LogParser/data/syslogs
LogParser/data/syslogs/cdh-dev01
LogParser/data/syslogs/cdh-dev01/2013
LogParser/data/syslogs/cdh-dev01/2013/04
LogParser/data/syslogs/cdh-dev01/2013/04/messages
LogParser/data/syslogs/cdh-dev01/2013/05
LogParser/data/syslogs/cdh-dev01/2013/05/messages
LogParser/data/syslogs/cdh-dn01
LogParser/data/syslogs/cdh-dn01/2013
LogParser/data/syslogs/cdh-dn01/2013/05
LogParser/data/syslogs/cdh-dn01/2013/05/messages
LogParser/data/syslogs/cdh-dn02
LogParser/data/syslogs/cdh-dn02/2013
LogParser/data/syslogs/cdh-dn02/2013/04
LogParser/data/syslogs/cdh-dn02/2013/04/messages
LogParser/data/syslogs/cdh-dn02/2013/05
LogParser/data/syslogs/cdh-dn02/2013/05/messages
LogParser/data/syslogs/cdh-dn03
LogParser/data/syslogs/cdh-dn03/2013
LogParser/data/syslogs/cdh-dn03/2013/04
LogParser/data/syslogs/cdh-dn03/2013/04/messages
LogParser/data/syslogs/cdh-dn03/2013/05
LogParser/data/syslogs/cdh-dn03/2013/05/messages
LogParser/data/syslogs/cdh-jt01
LogParser/data/syslogs/cdh-jt01/2013
LogParser/data/syslogs/cdh-jt01/2013/04
LogParser/data/syslogs/cdh-jt01/2013/04/messages
LogParser/data/syslogs/cdh-jt01/2013/05
LogParser/data/syslogs/cdh-jt01/2013/05/messages
LogParser/data/syslogs/cdh-nn01
LogParser/data/syslogs/cdh-nn01/2013
LogParser/data/syslogs/cdh-nn01/2013/05
LogParser/data/syslogs/cdh-nn01/2013/05/messages
LogParser/data/syslogs/cdh-vms
LogParser/data/syslogs/cdh-vms/2013
LogParser/data/syslogs/cdh-vms/2013/05
LogParser/data/syslogs/cdh-vms/2013/05/messages



2.0.7. Commands (load data, execute program)
Gradle
=================
$ gradle clean jar
This should generate a jar with its dependencies bundled under lib/ (per the jar task in build.gradle).
Load data to HDFS
==================
$ hadoop fs -mkdir cascadingSamples
$ cd ~
$ hadoop fs -put cascadingSamples/data cascadingSamples
$ hadoop fs -put cascadingSamples/jars cascadingSamples
Run program
============
hadoop jar cascadingSamples/jars/logparser-cascading.jar "cascadingSamples/data/syslogs/*/*/*/" "cascadingSamples/Output-LogParser" "cascadingSamples/Output-LogParser/traps" "cascadingSamples/Output-LogParser/reports"


2.0.8. Results 
Output files
=============
$ hadoop fs -ls -R cascadingSamples |grep 'part*' | awk '{print $8}'
cascadingSamples/Output-LogParser/part-00000
cascadingSamples/Output-LogParser/part-00001
cascadingSamples/Output-LogParser/part-00002
cascadingSamples/Output-LogParser/part-00003
cascadingSamples/Output-LogParser/part-00004
cascadingSamples/Output-LogParser/part-00005
cascadingSamples/Output-LogParser/part-00006
cascadingSamples/Output-LogParser/part-00007
cascadingSamples/Output-LogParser/part-00008
cascadingSamples/Output-LogParser/part-00009
cascadingSamples/Output-LogParser/part-00010
cascadingSamples/Output-LogParser/reports/part-00000
cascadingSamples/Output-LogParser/traps/part-m-00001-00006
cascadingSamples/Output-LogParser/traps/part-m-00002-00006
Parsed log
===========
$ hadoop fs -cat cascadingSamples/Output-LogParser/part-00003 | less
May 3 11:51:50 cdh-dn02 init tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:52:26 cdh-dn02 kernel nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:52:51 cdh-dn02 kernel hrtimer: interrupt took 6222750 ns
May 3 11:52:53 cdh-dn02 ntpd_initres host name not found: 0.rhel.pool.ntp.org
Report
===========
$ hadoop fs -cat cascadingSamples/Output-LogParser/reports/part-00000 | less
console-kit-daemon 7
gnome-session 11
init 166
kernel 810
login 2
networkmanager 7
nm-dispatcher.action 4
ntpd_initres 4133
polkit-agent-helper-1 8
pulseaudio 18
spice-vdagent 15
sshd 6
sudo 8
udevd 6
Records that failed parsing
============================
$ hadoop fs -cat cascadingSamples/Output-LogParser/traps/part*
May 7 00:40:53 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
May 7 00:42:13 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
May 7 00:43:38 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
May 7 00:45:01 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
May 7 00:47:18 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
May 7 00:47:41 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
Record count of input [syslogs]
===============================
$ hadoop fs -cat cascadingSamples/data/syslogs/*/*/*/messages | wc -l
5207
Record count of output [parsed logs + records that failed parsing]
===================================================================
$ echo $((`hadoop fs -cat cascadingSamples/Output-LogParser/part* | wc -l`+`hadoop fs -cat cascadingSamples/Output-LogParser/traps/part* | wc -l`))
5207






Wednesday, November 13, 2013

UDFs Part 2: Custom GenericUDF in Hive (NVL2)


1.0. What's in this blog?


In my previous blog on creating custom UDFs in Hive, I covered a basic UDF. This blog covers generic UDF creation, mimicking the same NVL2 functionality covered in the previous blog. It includes sample data, Java code for creating the UDF, expected results, commands to execute, and the output.
[Hive 0.10]

About UDFs:
UDF stands for User-Defined Function. In Hive, there are (a) reusable functions shipped as part of core Hive (out of the box) that can be used in Hive queries; they are called UDFs even though they are not user-defined. And then there are (b) functions that one can create in Java, also called UDFs, and use in Hive queries. The focus of this blog is custom UDFs (b), specifically generic UDFs.

About generic UDF:
Custom UDFs in Hive extend either the UDF or the GenericUDF class. GenericUDFs perform better, as they use short-circuit and lazy evaluation, compared to simple UDFs, which rely on reflection. GenericUDFs also support non-primitive Hive types like arrays, structs and maps, in addition to primitive types, unlike simple UDFs, which support only primitive types.

About NVL2:
NVL2 takes three parameters, which we will refer to as expr1, expr2 and expr3.
NVL2 lets you determine the value returned by a query based on whether a specified expression is null or not null. If expr1 is not null, then NVL2 returns expr2. If expr1 is null, then NVL2 returns expr3.


2.0. NVL2 generic UDF in Hive


1: Create the test data file for a Hive external table
**************************
Input data
**************************
1. Execute locally on the node you are running Hive client from
-----------------------------------------------------------------
Create input file/data to use for the demo.
Since this gist is merely for instructional purposes, the dataset is small.
cd ~
mkdir hiveProject
cd hiveProject
vi Departments_UDFTest
Paste the following, ensuring the fields are delimited by tabs and each record ends with a newline.
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008
d009 Customer Service
2. Hadoop commands
--------------------
hadoop fs -mkdir hiveProject
hadoop fs -put hiveProject/Departments_UDFTest hiveProject

2: Create the Hive table
*********************************************
Setting up the Hive table
*********************************************
In hive shell....
a) Create table:
CREATE EXTERNAL TABLE IF NOT EXISTS departments_UDFTest
(
deptNo String,
deptName String
)
Row format delimited
fields terminated by '\t'
LOCATION '/user/akhanolk/hiveProject';
b) Quick test:
Select * from departments_UDFTest;
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 NULL
d009 Customer Service

3: Create the UDF in Java
package khanolkar.HiveUDFs;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@UDFType(deterministic = true)
@Description(name = "NVL2", value = "NVL2(expr1,expr2,expr3) returns expr3, if expr1 is null, otherwise returns expr2;", extended = "NVL2(expr1,expr2,expr3) returns expr3, if expr1 is null, otherwise retruns expr2")
public class NVL2GenericUDF extends GenericUDF {
private GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
private ObjectInspector[] argumentOIs;
@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentException {
argumentOIs = arguments;
// First check - do we have the right number of arguments?
if (arguments.length != 3) {
throw new UDFArgumentLengthException(
"The operator 'NVL2' accepts 3 arguments.");
}
// Second check - throw exception if any complex types have been passed
// as parameters
if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE
|| arguments[1].getCategory() != ObjectInspector.Category.PRIMITIVE
|| arguments[2].getCategory() != ObjectInspector.Category.PRIMITIVE)
throw new UDFArgumentTypeException(0,
"Only primitive type arguments are accepted");
// Third check - throw exception if the data types across parameters are
// different
// Note: the resolver must be instantiated before it is used to reconcile types
returnOIResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(
true);
if (!(returnOIResolver.update(arguments[0]) && returnOIResolver
.update(arguments[1]))
|| !(returnOIResolver.update(arguments[1]) && returnOIResolver
.update(arguments[2]))) {
throw new UDFArgumentTypeException(2,
"The arguments of function NVL2 should have the same type, "
+ "but they are different: \""
+ arguments[0].getTypeName() + "\" and \""
+ arguments[1].getTypeName() + "\" and \""
+ arguments[2].getTypeName() + "\"");
}
return returnOIResolver.get();
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
// The NVL2 functionality
Object retVal = returnOIResolver.convertIfNecessary(arguments[0].get(),
argumentOIs[0]);
if (retVal == null) {
retVal = returnOIResolver.convertIfNecessary(arguments[2].get(),
argumentOIs[2]);
} else {
retVal = returnOIResolver.convertIfNecessary(arguments[1].get(),
argumentOIs[1]);
}
return retVal;
}
@Override
public String getDisplayString(String[] children) {
StringBuilder sb = new StringBuilder();
sb.append("if ");
sb.append(children[0]);
sb.append(" is null ");
sb.append("returns");
sb.append(children[2]);
sb.append("else ");
sb.append("returns");
sb.append(children[1]);
return sb.toString();
}
}

Concepts
-------------------------
There are three methods-
1. initialize() - called once, at first. The goal of this method is to determine the return type
from the arguments. The user can also throw an Exception to signal that bad types are being sent
to the method. The returnOIResolver is a built-in class that determines the return type by finding
the type of non-null variables and using that type. The ObjectInspector is used to transform
raw records into objects that Hive can access. The initialize() method is passed an
ObjectInspector for each argument.
2. evaluate() - where the logic for the function should be written.
The evaluate method has access to the values passed to the method stored in an array of DeferredObject
values. The returnOIResolver created in the initialize method is used to get values from the
DeferredObjects.
3. getDisplayString() - The final method to override, getDisplayString(), is used inside the Hadoop
tasks to display debugging information when the function is being used.
Annotations:
@UDFType(deterministic = true) annotation: Indicates that the UDF returns the same value whenever it is
called with the same arguments
@Description(...) annotation: Includes information that is displayed when you do a describe on the UDF
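
If you want to exercise the initialize()/evaluate() contract outside Hive (say, in a quick test) before deploying the jar, a minimal sketch is below. It is written against Hive 0.10's DeferredObject interface, which declares only get(); newer Hive releases add a prepare() method and ship a GenericUDF.DeferredJavaObject helper that can be used instead of the inline stub. The test class and the values passed in are illustrative only.

package khanolkar.HiveUDFs;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class NVL2GenericUDFSmokeTest {

    // Minimal stand-in for the DeferredObjects Hive passes at query time (Hive 0.10 interface)
    static DeferredObject wrap(final Object value) {
        return new DeferredObject() {
            public Object get() throws HiveException {
                return value;
            }
        };
    }

    public static void main(String[] args) throws HiveException {
        NVL2GenericUDF udf = new NVL2GenericUDF();
        ObjectInspector stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;

        // initialize() is called once, with one ObjectInspector per argument
        udf.initialize(new ObjectInspector[] { stringOI, stringOI, stringOI });

        // evaluate() is then called once per row
        Object notNullCase = udf.evaluate(new DeferredObject[] {
                wrap("Sales"), wrap("Sales"), wrap("Procrastrination") });
        Object nullCase = udf.evaluate(new DeferredObject[] {
                wrap(null), wrap(null), wrap("Procrastrination") });

        System.out.println(notNullCase); // expect: Sales
        System.out.println(nullCase);    // expect: Procrastrination
    }
}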

4: Expected results
****************************
Expected results
****************************
Query:
select deptNo,NVL2(deptName,deptName,'Procrastrination') from departments_UDFTest;
The null department name for department d008 should be returned as "Procrastrination".
For the rest of the records, the query should return the data in Hive, as is.

5: Try out the UDF
*******************************
Testing the UDF
*******************************
1) Add jar
hive> add jar hiveProject/jars/NVL2GenericUDF.jar;
2) Create alias for the function
hive> CREATE TEMPORARY FUNCTION NVL2
AS 'khanolkar.HiveUDFs.NVL2GenericUDF';
3) Test the description provided
hive> DESCRIBE FUNCTION NVL2;
OK
NVL2(expr1,expr2,expr3) returns expr3, if expr1 is null, otherwise returns expr2;
4) Test if there are checks in place for number of parameters
hive> select deptNo,NVL2(deptName,deptName) from departments_UDFTest;
FAILED: SemanticException [Error 10015]: Line 1:14 Arguments length mismatch 'deptName': The operator 'NVL2' accepts 3 arguments.
hive> select deptNo,NVL2(deptName,deptName,123,1) from departments_UDFTest;
FAILED: SemanticException [Error 10015]: Line 1:14 Arguments length mismatch '1': The operator 'NVL2' accepts 3 arguments.
5) Results
hive> select deptNo,NVL2(deptName,deptName,'Procrastrination') from departments_UDFTest;
OK
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Procrastrination
d009 Customer Service



3.0. Making the UDF permanently available when you launch the hive shell

There are several ways to make a custom UDF available when you launch the Hive shell, bypassing the need to execute the "add jar..." statement before using a custom UDF.  I have listed a couple of them.

Option 1:
From "Programming Hive"

Your function may also be added permanently to Hive, however this requires a small modification to a Hive Java file and then rebuilding Hive.
Inside the Hive source code, a one-line change is required to the FunctionRegistry class found at ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java. Then you rebuild Hive following the instructions that come with the source distribution.
While it is recommended that you redeploy the entire new build, only the hive-exec-*.jar, where * is the version number, needs to be replaced.
Option 2:
Add the "add jar" and "create temporary function" statements to the .hiverc file on each node from which Hive queries will be run.


4.0. References

Apache documentation:
http://hive.apache.org/docs/r0.10.0/api/org/apache/hadoop/hive/ql/udf/generic/GenericUDF.html
https://cwiki.apache.org/confluence/display/Hive/OperatorsAndFunctions

A good article on creating a UDF that involves non-primitive types - link

Programming Hive - from O'Reilly

That's it for this blog.  Do share any additional insights with me.
Cheers!


Monday, November 4, 2013

UDFs Part 1: Custom simple eval UDFs in Pig and Hive (NVL2)


1.0. What's in this blog?


A demonstration of creating a custom simple eval UDF, in Pig and Hive, to mimic NVL2 functionality from the DBMS world. It includes sample data, Java code for creating the UDF, expected results, commands to execute, and the output.

About NVL2:
NVL2 takes three parameters, which we will refer to as expr1, expr2 and expr3.
NVL2 lets you determine the value returned by a query based on whether a specified expression is null or not null. If expr1 is not null, then NVL2 returns expr2. If expr1 is null, then NVL2 returns expr3.

2.0. NVL2 UDF in Hive


1: Create the test data file for a Hive external table
**************************
Input data
**************************
1. Execute locally on the node you are running Hive client from
-----------------------------------------------------------------
Create input file/data to use for the demo.
Since this gist is merely for instructional purposes, the dataset is small.
cd ~
mkdir hiveProject
cd hiveProject
vi Departments_UDFTest
Paste the following, ensuring the fields are delimited by tabs and each record ends with a newline.
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008
d009 Customer Service
2. Hadoop commands
--------------------
hadoop fs -mkdir hiveProject
hadoop fs -put hiveProject/Departments_UDFTest hiveProject

2: Create the Hive table
*********************************************
Setting up the Hive table
*********************************************
In hive shell....
a) Create table:
CREATE EXTERNAL TABLE IF NOT EXISTS departments_UDFTest
(
deptNo String,
deptName String
)
Row format delimited
fields terminated by '\t'
LOCATION '/user/akhanolk/hiveProject';
b) Quick test:
Select * from departments_UDFTest;
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 NULL
d009 Customer Service

3: Create the UDF in Java
//------------------------------------------------------------------
// Filename: NVL2.java
//------------------------------------------------------------------
package khanolkar.HiveUDFs;
import java.io.IOException;
import org.apache.hadoop.hive.ql.exec.UDF;
public class NVL2 extends UDF {
String expr1, expr2, expr3;
public NVL2() {
}
public String evaluate(String pExpr1, String pExpr2, String pExpr3)
throws IOException {
try {
expr1 = (String) pExpr1;
expr2 = (String) pExpr2;
expr3 = (String) pExpr3;
return (expr1 != null ? expr2 : expr3);
} catch (Exception e) {
// Cause task failure
throw new IOException("Error with Hive UDF, NVL2!", e);
}
}
}

4: Expected results
****************************
Expected results
****************************
Query:
select deptNo,NVL2(deptName,deptName,'Procrastrination') from departments_UDFTest;
The null department name for department d008 should be returned as "Procrastrination".
For the rest of the records, the query should return the data in Hive, as is.

5: Test the UDF
*******************************
Testing the UDF
*******************************
hive> add jar hiveProject/jars/NVL2.jar;
hive> CREATE TEMPORARY FUNCTION NVL2
AS 'khanolkar.HiveUDFs.NVL2';
hive> select deptNo,NVL2(deptName,deptName) from departments_UDFTest;
FAILED: SemanticException [Error 10014]: Line 1:14 Wrong arguments 'deptName': No matching method for class khanolkar.HiveUDFs.NVL2 with (string, string). Possible choices: _FUNC_(string, string, string)
hive> select deptNo,NVL2(deptName,deptName,'Procrastrination') from departments_UDFTest;
OK
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Procrastrination
d009 Customer Service

3.0. NVL2 UDF in Pig


We will reuse data from section 2.
1: Create the UDF in Java
package khanolkar.pigUDFs;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
// Custom UDF
// Name: NVL2
// Parameters: Tuple with three Strings
// Purpose: Facilitates handling nulls + replacing non-null values
// If the first parameter is null, returns the third parameter,
// otherwise returns the second parameter
// E.g. NVL2(null,"Busy bee","Sloth") = "Sloth"
// E.g. NVL2("Anagha","Busy bee","Sloth") = "Busy bee"
// Returns: Null if tuple is empty
// Null if the three input parameters are not in the tuple
// Otherwise, Result of applying NVL2 logic
public class NVL2 extends EvalFunc<String> {
public String exec(Tuple input) throws IOException {
if (input == null || input.size() == 0)
return null;
try {
if (input.size() == 3) {
String expr1 = (String) input.get(0);
String expr2 = (String) input.get(1);
String expr3 = (String) input.get(2);
return (expr1 != null ? expr2 : expr3);
} else {
return null;
}
} catch (Exception e) {
// Cause task failure
throw new IOException("Error with UDF, NVL2!", e);
}
}
}
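
Optionally, the UDF could also declare its return schema so that downstream Pig relations see the result explicitly as a chararray. A minimal sketch of an outputSchema() override one could add to the NVL2 class above (outputSchema, Schema and DataType are standard Pig APIs; the override itself is not in the original code):

// Additional imports needed at the top of NVL2.java:
// import org.apache.pig.data.DataType;
// import org.apache.pig.impl.logicalLayer.schema.Schema;

@Override
public Schema outputSchema(Schema input) {
    // Declare that this UDF returns a single chararray field
    return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY));
}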

2: Create the pig script
--------------------------------------------------------------------------------------
-- Pig Script
-- NVL2UDFDemo.pig
--------------------------------------------------------------------------------------
register NVL2.jar;
define NVL2 khanolkar.pigUDFs.NVL2;
rawDS = load 'departments' using PigStorage() as (deptNo:chararray, deptName:chararray);
transformedDS = foreach rawDS generate $0, NVL2($1,$1,'Procrastination');
dump transformedDS;

3: Test the UDF
[Modify the path of the data file between local and HDFS locations in the pig script; better still, make it a parameter]
#---------------------------
# Command to test
#---------------------------
On the cluster
$ pig pigProject/evalFunc/NVL2/NVL2UDFDemo.pig
Locally
$ pig -x local pigProject/evalFunc/NVL2/NVL2UDFDemo.pig

4: Results
#---------------------------
# Output data
#---------------------------
(d001,Marketing)
(d002,Finance)
(d003,Human Resources)
(d004,Production)
(d005,Development)
(d006,Quality Management)
(d007,Sales)
(d008,Procrastination)
(d009,Customer Service)



Do share any additional insights/comments.
Cheers!


Wednesday, October 30, 2013

Apache Oozie - Part 13: Oozie SSH Action


1.0. What's covered in the blog?

1. Documentation on the Oozie SSH action
2. Sample Oozie workflow application that demonstrates the SSH action - it SSHes to a specific node as a specified user and executes a local shell script that loads a local file to HDFS.

It was tricky getting this action working - and the solution is not something covered in the Apache documentation.  Issues and resolution are documented below.  

Version:
Oozie 3.3.0

Related blogs:
Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11: Oozie Java API for interfacing with oozie workflows
Blog 12: Oozie workflow - shell action +passing output from one action to another
Blog 13: Oozie workflow - SSH action


2.0. Documentation on the Oozie SSH Action


Apache documentation is available at - http://oozie.apache.org/docs/3.3.0/WorkflowFunctionalSpec.html#a3.2.5_Ssh_Action

Note: This functionality was slated for eventual removal, but it was later decided that it would remain.
So, disregard any mention of deprecation.


3.0. Sample workflow application


3.0.1. Highlights:
Oozie server is running on node cdh-dev01 in my environment.
With the sample workflow application, I am going to submit an Oozie job while logged in as myself (akhanolk), on this machine (Oozie server - cdh-dev01) from the CLI.
The workflow executes a shell script on cdh-dn01 as user akhanolk.  The shell script loads a local file to HDFS.  If the file load completes successfully, the workflow sends an email to me.

3.0.2. Pictorial overview:

3.0.3. SSH setup:
1. Passphrase-less SSH for akhanolk from cdh-dev01 (Oozie server) to cdh-dn01 (remote node) and vice versa
2.  Passphrase-less SSH for oozie user ID (oozie in my case) on cdh-dev01 to cdh-dn01 as akhanolk
[Running ps -ef | grep oozie on Oozie server will give you the configured Oozie user ID]

3.0.4. Workflow application components:
workflow definition (workflow.xml - in HDFS)
job properties file (job.properties from node submitting job)
Shell script (uploadFile.sh) on remote node (cdh-dn01; At /home/akhanolk/scripts)
Data file (employees_data) on remote node (cdh-dn01; At /home/akhanolk/data)

3.0.5. Desired result:
Upon execution of the workflow, the employees_data file on cdh-dn01 should get moved to a specified directory in HDFS.

3.0.6. Subsequent sections cover-

  1. Data and script download
  2. Oozie job properties file        
  3. Oozie workflow  file
  4. Shell script - uploadFile.sh
  5. Data load commands                
  6. Oozie SMTP configuration
  7. SSH setup          
  8. Oozie commands                    
  9. Output in HDFS
  10. Output email                      
  11. Oozie web console - screenshots
  12. Issues encountered and resolution



3.0.7. Data and script download:
************************************
*Data and code/application download
************************************
Data and code:
--------------
Github:
https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure of application download
--------------------------------------------
oozieProject
workflowSshAction
job.properties
workflow.xml
scripts
uploadFile.sh
data
employees_data



3.0.8. Oozie job.properties file:
#*************************************************
# job.properties
#*************************************************
nameNode=hdfs://cdh-nn01.chuntikhadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
appPath=${oozieProjectRoot}/workflowSshAction
oozie.wf.application.path=${appPath}
inputDir=${oozieProjectRoot}/data
focusNodeLogin=akhanolk@cdh-dn01
shellScriptPath=~/scripts/uploadFile.sh
emailToAddress=akhanolk@cdh-dev01



3.0.9. Oozie workflow.xml:
<!--******************************************-->
<!--workflow.xml -->
<!--******************************************-->
<workflow-app name="WorkFlowForSshAction" xmlns="uri:oozie:workflow:0.1">
<start to="sshAction"/>
<action name="sshAction">
<ssh xmlns="uri:oozie:ssh-action:0.1">
<host>${focusNodeLogin}</host>
<command>${shellScriptPath}</command>
<capture-output/>
</ssh>
<ok to="sendEmail"/>
<error to="killAction"/>
</action>
<action name="sendEmail">
<email xmlns="uri:oozie:email-action:0.1">
<to>${emailToAddress}</to>
<subject>Output of workflow ${wf:id()}</subject>
<body>Status of the file move: ${wf:actionData('sshAction')['STATUS']}</body>
</email>
<ok to="end"/>
<error to="end"/>
</action>
<kill name="killAction">
<message>"Killed job due to error"</message>
</kill>
<end name="end"/>
</workflow-app>



3.0.10. Shell script (uploadFile.sh):
#!/bin/bash
#################################
# Name: uploadFile.sh
# Location: remote node where we
#           want to run an
#           operation
#################################
hadoop fs -rm -R oozieProject/results-sshAction/*
hadoop fs -put ~/data/* oozieProject/results-sshAction/
status=$?
if [ $status = 0 ]; then
echo "STATUS=SUCCESS"
else
echo "STATUS=FAIL"
fi



3.0.11. HDFS load commands:
*****************************************
Location of files/scripts & commands
*****************************************
I have pasted information specific to my environment; Modify as required.
1) Node (cdh-dev01) where the Oozie CLI will be used to submit/run Oozie workflow:
Structure/Path:
~/oozieProject/workflowSshAction/job.properties
2) HDFS:
Workflow directory structure:
/user/akhanolk/oozieProject/workflowSshAction/workflow.xml
Commands to load:
hadoop fs -mkdir oozieProject
hadoop fs -mkdir oozieProject/workflowSshAction
hadoop fs -put ~/oozieProject/workflowSshAction/workflow.xml oozieProject/workflowSshAction
Output directory structure:
/user/akhanolk/oozieProject/results-sshAction
Command:
hadoop fs -mkdir oozieProject/results-sshAction
3) Remote node (cdh-dn01) where we want to run a shell script:
Directory structure/Path:
~/scripts/uploadFile.sh
~/data/employees_data



3.0.12. Oozie SMTP configuration:
Oozie SMTP configuration
------------------------
Add the following to the oozie-site.xml, and restart oozie.
Replace the values with ones specific to your environment.
<!-- SMTP params-->
<property>
<name>oozie.email.smtp.host</name>
<value>cdh-dev01</value>
</property>
<property>
<name>oozie.email.smtp.port</name>
<value>25</value>
</property>
<property>
<name>oozie.email.from.address</name>
<value>oozie@cdh-dev01</value>
</property>
<property>
<name>oozie.email.smtp.auth</name>
<value>false</value>
</property>
<property>
<name>oozie.email.smtp.username</name>
<value></value>
</property>
<property>
<name>oozie.email.smtp.password</name>
<value></value>
</property>



3.0.13. Oozie SSH setup:
************************
SSH setup
************************
Issues:
Review my section on issues encountered to see all the issues and fixes I had to make
to get the workflow application to work.
------------------------------------------------------------------------------------------------------
Oozie documentation:
To run SSH Testcases and for easier Hadoop start/stop configure SSH to localhost to be passphrase-less.
Create your SSH keys without a passphrase and add the public key to the authorized file:
$ ssh-keygen -t dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys2
Test that you can ssh without password:
$ ssh localhost
------------------------------------------------------------------------------------------------------
SSH tutorial:
Setup ssh - https://www.digitalocean.com/community/articles/how-to-set-up-ssh-keys--2



3.0.14. Oozie commands:
Oozie commands
---------------
Note: Replace the Oozie server and port with the values specific to your cluster.
1) Submit job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowSshAction/job.properties -submit
job: 0000012-130712212133144-oozie-oozi-W
2) Run job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -start 0000014-130712212133144-oozie-oozi-W
3) Check the status:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info 0000014-130712212133144-oozie-oozi-W
4) Suspend workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -suspend 0000014-130712212133144-oozie-oozi-W
5) Resume workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -resume 0000014-130712212133144-oozie-oozi-W
6) Re-run workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowSshAction/job.properties -rerun 0000014-130712212133144-oozie-oozi-W
7) Should you need to kill the job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill 0000014-130712212133144-oozie-oozi-W
8) View server logs:
$ oozie job -oozie http://cdh-dev01:11000/oozie -logs 0000014-130712212133144-oozie-oozi-W
Logs are available at:
/var/log/oozie on the Oozie server.
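
The same lifecycle can also be driven from Java via the Oozie client API (the subject of Blog 11 in the related blogs list). A minimal sketch, assuming the same server URL and the property values from the job.properties file shown earlier:

import java.util.Properties;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;

public class SubmitSshWorkflow {
    public static void main(String[] args) throws OozieClientException, InterruptedException {
        OozieClient client = new OozieClient("http://cdh-dev01:11000/oozie");

        // Mirror the relevant entries from job.properties
        Properties conf = client.createConfiguration();
        conf.setProperty(OozieClient.APP_PATH,
                "hdfs://cdh-nn01.chuntikhadoop.com:8020/user/akhanolk/oozieProject/workflowSshAction");
        conf.setProperty("nameNode", "hdfs://cdh-nn01.chuntikhadoop.com:8020");
        conf.setProperty("jobTracker", "cdh-jt01:8021");
        conf.setProperty("queueName", "default");
        conf.setProperty("oozie.use.system.libpath", "true");
        conf.setProperty("focusNodeLogin", "akhanolk@cdh-dn01");
        conf.setProperty("shellScriptPath", "~/scripts/uploadFile.sh");
        conf.setProperty("emailToAddress", "akhanolk@cdh-dev01");

        // Submit and start in one call, then poll until the workflow leaves RUNNING
        String jobId = client.run(conf);
        while (client.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
            Thread.sleep(10 * 1000);
        }
        System.out.println("Workflow " + jobId + " finished with status: "
                + client.getJobInfo(jobId).getStatus());
    }
}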



3.0.15. Output in HDFS:
************************
Output
************************
[akhanolk@cdh-dev01 ~]$ hadoop fs -ls oozieProject/res*
Found 1 items
-rw-r--r-- 3 akhanolk akhanolk 13821993 2013-10-30 20:59 oozieProject/results-sshAction/employees_data



3.0.16. Output email:
********************
Output email
********************
From akhanolk@cdh-dev01.localdomain Wed Oct 30 22:59:16 2013
Return-Path: <akhanolk@cdh-dev01.localdomain>
X-Original-To: akhanolk@cdh-dev01
Delivered-To: akhanolk@cdh-dev01.localdomain
From: akhanolk@cdh-dev01.localdomain
To: akhanolk@cdh-dev01.localdomain
Subject: Output of workflow 0000003-131029234028597-oozie-oozi-W
Content-Type: text/plain; charset=us-ascii
Date: Wed, 30 Oct 2013 22:59:16 -0500 (CDT)
Status: R
Status of the file move: SUCCESS



3.0.17. Issues encountered:
*************************
Issues encountered
*************************
Permissions denied error:
-------------------------
....
2013-10-29 16:13:25,949 WARN org.apache.oozie.command.wf.ActionStartXCommand:
USER[akhanolk] GROUP[-] TOKEN[] APP[WorkFlowForSshAction] JOB[0000002-
131029144918199-oozie-oozi-W] ACTION[0000002-131029144918199-oozie-oozi-
W@sshAction] Error starting action [sshAction]. ErrorType [NON_TRANSIENT],
ErrorCode [AUTH_FAILED], Message [AUTH_FAILED: Not able to perform operation
[ssh -o PasswordAuthentication=no -o KbdInteractiveDevices=no -o
StrictHostKeyChecking=no -o ConnectTimeout=20 akhanolk@cdh-dn01
mkdir -p oozie-oozi/0000002-131029144918199-oozie-oozi-W/sshAction--ssh/ ]
| ErrorStream: Permission denied (publickey,gssapi-keyex,gssapi-with-mic,password).
Steps taken to resolve:
-----------------------
a)
Tried running the command in square brackets, above, manually from cdh-dev01 (Oozie server),
when logged in as akhanolk. It worked! But the workflow in Oozie didn't.
b)
Tried running as Oozie-
sudo -u oozie ssh -o PasswordAuthentication=no -o KbdInteractiveDevices=no -o
StrictHostKeyChecking=no -o ConnectTimeout=20 akhanolk@cdh-dn01 mkdir
-p oozie-oozi/0000001-1310081859355-oozie-oozi-W/action1--ssh/
Got the error
Permission denied (publickey,gssapi-keyex,gssapi-with-mic,password).
c)
Googled - and chanced upon this-
http://stackoverflow.com/questions/19272430/oozie-ssh-action
So, performed the necessary actions detailed below to allow oozie to ssh to cdh-dn01 as akhanolk
On cdh-dev01 (my Oozie server), located the oozie home directory and ran ssh keygen
Appended the public key to the authorized_keys file /home/akhanolk/.ssh/authorized_keys on cdh-dev01
Appended the same public key to the authorized_keys file on cdh-dn01 (remote node) at
/home/akhanolk/.ssh/authorized_keys
Issue resolved!!



3.0.18. Oozie web console - screenshots:

Any additional insights are greatly appreciated.
Cheers!

New Impala e-Book from O’Reilly Media - Free

Folks,
Check this out...
http://blog.cloudera.com/blog/2013/10/download-the-new-impala-e-book-from-oreilly-media/

Download location:
http://www.cloudera.com/content/cloudera/en/resources/library/aboutcloudera/cloudera-impala-ebook.html

Thanks to Manish Verma for emailing me the link.

Cheers!


Tuesday, September 24, 2013

Reduce-side joins in Java map-reduce

1.0. About reduce-side joins

Joins of datasets done in the reduce phase are called reduce-side joins. Reduce-side joins are easier to implement, as they are less stringent than map-side joins, which require the data to be sorted and partitioned the same way. They are less efficient than map-side joins because the datasets have to go through the sort and shuffle phase.

What's involved..
1.  The key of the map output, of datasets being joined, has to be the join key - so they reach the same reducer
2.  Each dataset has to be tagged with its identity, in the mapper- to help differentiate between the datasets in the reducer, so they can be processed accordingly.
3.  In each reducer, the data values from both datasets, for keys assigned to the reducer, are available, to be processed as required.
4.  A secondary sort needs to be done to ensure the ordering of the values sent to the reducer
5.  If the input files are of different formats, we would need separate mappers, and we would need to use the MultipleInputs class in the driver to add the inputs and associate each with its specific mapper (sketched below).
[MultipleInputs.addInputPath( job, (input path n), (inputformat class), (mapper class n));]
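
Here is a rough sketch of the MultipleInputs wiring referred to in point 5, for the case where the employee and salary files have different formats and therefore need different mappers. EmployeeMapper and SalaryMapper are hypothetical placeholders; the sample program in this post actually uses a single mapper and derives the source index from the file name instead.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultiFormatJoinDriver {

    // Hypothetical per-format mappers; each would tag its records with its own source index
    static class EmployeeMapper extends Mapper<LongWritable, Text, Text, Text> { }
    static class SalaryMapper extends Mapper<LongWritable, Text, Text, Text> { }

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "reduce-side join, multiple input formats");
        job.setJarByClass(MultiFormatJoinDriver.class);

        // One call per dataset: input path, its input format, and the mapper that understands it
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, EmployeeMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, SalaryMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        // ... reducer, key/value classes, partitioner and comparators would be set here as usual
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}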

Note:  The join between the datasets (employee, current salary - cardinality of 1..1) in the sample program below has been demonstrated in my blog on map-side joins of large datasets as well. I have used the same datasets here, as the purpose of this blog is to demonstrate the concept. Whenever possible, reduce-side joins should be avoided.

[Update - 10/15/2013]
I have added a pig equivalent in the final section.

2.0. Sample datasets used in this gist

The datasets used are employees and salaries. For salary data, there are two files - one file with current salary (1..1), and one with historical salary data (1..many). Then there is the department data, a small reference dataset, that we will add to the distributed cache and look up in the reducer.


3.0. Implementing a reduce-side join

The sample code is common for a 1..1 as well as 1..many join for the sample datasets.
The mapper is common for both datasets, as the format is the same.

3.0.1. Components/steps/tasks:

1.  Map output key
The key will be the empNo as it is the join key for the datasets employee and salary
[Implementation: in the mapper]

2.  Tagging the data with the dataset identity
Add an attribute called srcIndex to tag the identity of the data (1=employee, 2=salary, 3=salary history)
[Implementation: in the mapper]

3.  Discarding unwanted attributes
[Implementation: in the mapper]

4. Composite key
Make the map output key a composite of empNo and srcIndex
[Implementation: create custom writable]

5.  Partitioner
Partition the data on natural key of empNo
[Implementation: create custom partitioner class]

6.  Sorting
Sort the data on empNo first, and then source index
[Implementation: create custom sorting comparator class]

7.  Grouping
Group the data based on natural key
[Implementation: create custom grouping comparator class]

8.  Joining
Iterate through the values for a key and complete the join for employee and salary data, perform lookup of department to include department name in the output
[Implementation: in the reducer]

3.0.2a. Data pipeline for cardinality of 1..1 between employee and salary data:

3.0.2b. Data pipeline for cardinality of 1..many between employee and salary data:

3.0.3. The Composite key

The composite key is a combination of the join key (empNo) and the source index (1=employee file, 2=salary file, ...)
//********************************************************************************
//Class: CompositeKeyWritableRSJ
//Purpose: Custom Writable that serves as composite key
// with attributes joinKey and sourceIndex
//Author: Anagha Khanolkar
//*********************************************************************************
package khanolkar.mapreduce.join.samples.reducesidejoin;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
public class CompositeKeyWritableRSJ implements Writable,
WritableComparable<CompositeKeyWritableRSJ> {
// Data members
private String joinKey;// EmployeeID
private int sourceIndex;// 1=Employee data; 2=Salary (current) data; 3=Salary historical data
public CompositeKeyWritableRSJ() {
}
public CompositeKeyWritableRSJ(String joinKey, int sourceIndex) {
this.joinKey = joinKey;
this.sourceIndex = sourceIndex;
}
@Override
public String toString() {
return (new StringBuilder().append(joinKey).append("\t")
.append(sourceIndex)).toString();
}
public void readFields(DataInput dataInput) throws IOException {
joinKey = WritableUtils.readString(dataInput);
sourceIndex = WritableUtils.readVInt(dataInput);
}
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeString(dataOutput, joinKey);
WritableUtils.writeVInt(dataOutput, sourceIndex);
}
public int compareTo(CompositeKeyWritableRSJ objKeyPair) {
int result = joinKey.compareTo(objKeyPair.joinKey);
if (0 == result) {
result = Double.compare(sourceIndex, objKeyPair.sourceIndex);
}
return result;
}
public String getjoinKey() {
return joinKey;
}
public void setjoinKey(String joinKey) {
this.joinKey = joinKey;
}
public int getsourceIndex() {
return sourceIndex;
}
public void setsourceIndex(int sourceIndex) {
this.sourceIndex = sourceIndex;
}
}


3.0.4. The mapper

In the setup method of the mapper-
1. Get the filename from the input split and cross-reference it against the configuration (set in the driver) to derive the source index. [Driver code: add configuration entries [key=filename of employee file, value=1], [key=filename of current salary dataset, value=2], [key=filename of historical salary dataset, value=3]; a sketch follows the note below]
2. Build a list of attributes we want to emit as map output for each data entity

The setup method is called only once, at the beginning of a map task. So it is the logical place to identify the source index.

In the map method of the mapper:
3. Build the map output based on attributes required, as specified in the list from #2

Note:  For salary data, we are including the "effective till" date, even though it is not required in the final output because this is common code for a 1..1 as well as 1..many join to salary data.  If the salary data is historical, we want the current salary only, that is "effective till date= 9999-01-01".
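
The driver-side configuration referenced in step 1 is not shown in this excerpt; the idea is simply a file-name-to-source-index lookup stored in the job Configuration. A minimal standalone sketch of the round trip (the paths here are hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class SourceIndexConfigDemo {
    public static void main(String[] args) {
        // In the real driver this would be job.getConfiguration()
        Configuration conf = new Configuration();

        // Driver side: key = file name of each input, value = source index
        conf.set(new Path("/user/akhanolk/joinProject/data/employees/part-e").getName(), "1");         // employee
        conf.set(new Path("/user/akhanolk/joinProject/data/salaries_active/part-sc").getName(), "2");  // current salary
        conf.set(new Path("/user/akhanolk/joinProject/data/salaries_history/part-sh").getName(), "3"); // historical salary

        // Mapper side (setup method): look up the split's file name to get the source index
        String splitFileName = "part-sc"; // stands in for fsFileSplit.getPath().getName()
        int srcIndex = Integer.parseInt(conf.get(splitFileName));
        System.out.println("Source index for " + splitFileName + " = " + srcIndex); // 2
    }
}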

//********************************************************************************
//Class: MapperRSJ
//Purpose: Mapper
//Author: Anagha Khanolkar
//*********************************************************************************
package khanolkar.mapreduce.join.samples.reducesidejoin;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class MapperRSJ extends
Mapper<LongWritable, Text, CompositeKeyWritableRSJ, Text> {
CompositeKeyWritableRSJ ckwKey = new CompositeKeyWritableRSJ();
Text txtValue = new Text("");
int intSrcIndex = 0;
StringBuilder strMapValueBuilder = new StringBuilder("");
List<Integer> lstRequiredAttribList = new ArrayList<Integer>();
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
// {{
// Get the source index; (employee = 1, salary = 2)
// Added as configuration in driver
FileSplit fsFileSplit = (FileSplit) context.getInputSplit();
intSrcIndex = Integer.parseInt(context.getConfiguration().get(
fsFileSplit.getPath().getName()));
// }}
// {{
// Initialize the list of fields to emit as output based on
// intSrcIndex (1=employee, 2=current salary, 3=historical salary)
if (intSrcIndex == 1) // employee
{
lstRequiredAttribList.add(2); // FName
lstRequiredAttribList.add(3); // LName
lstRequiredAttribList.add(4); // Gender
lstRequiredAttribList.add(6); // DeptNo
} else // salary
{
lstRequiredAttribList.add(1); // Salary
lstRequiredAttribList.add(3); // Effective-to-date (Value of
// 9999-01-01 indicates current
// salary)
}
// }}
}
private String buildMapValue(String arrEntityAttributesList[]) {
// This method returns csv list of values to emit based on data entity
strMapValueBuilder.setLength(0);// Initialize
// Build list of attributes to output based on source - employee/salary
for (int i = 1; i < arrEntityAttributesList.length; i++) {
// If the field is in the list of required output
// append to stringbuilder
if (lstRequiredAttribList.contains(i)) {
strMapValueBuilder.append(arrEntityAttributesList[i]).append(
",");
}
}
if (strMapValueBuilder.length() > 0) {
// Drop last comma
strMapValueBuilder.setLength(strMapValueBuilder.length() - 1);
}
return strMapValueBuilder.toString();
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
if (value.toString().length() > 0) {
String arrEntityAttributes[] = value.toString().split(",");
ckwKey.setjoinKey(arrEntityAttributes[0].toString());
ckwKey.setsourceIndex(intSrcIndex);
txtValue.set(buildMapValue(arrEntityAttributes));
context.write(ckwKey, txtValue);
}
}
}

3.0.5. The partitioner

Even though the map output key is composite, we want to partition by the natural join key of empNo; therefore, a custom partitioner is in order.
//********************************************************************************
//Class: PartitionerRSJ
//Purpose: Custom partitioner
//Author: Anagha Khanolkar
//*********************************************************************************
package khanolkar.mapreduce.join.samples.reducesidejoin;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PartitionerRSJ extends Partitioner<CompositeKeyWritableRSJ, Text> {
@Override
public int getPartition(CompositeKeyWritableRSJ key, Text value,
int numReduceTasks) {
// Partitions on joinKey (EmployeeID); mask the sign bit so the
// partition number is never negative for negative hash codes
return (key.getjoinKey().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}


3.0.6. The sort comparator

To ensure that the input to the reducer is sorted on empNo, then on sourceIndex, we need a sort comparator.  This will guarantee that the employee data is the first set in the values list for a key, then the salary data.
package khanolkar.mapreduce.join.samples.reducesidejoin;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//********************************************************************************
//Class: SortingComparatorRSJ
//Purpose: Sorting comparator
//Author: Anagha Khanolkar
//*********************************************************************************
public class SortingComparatorRSJ extends WritableComparator {
protected SortingComparatorRSJ() {
super(CompositeKeyWritableRSJ.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
// Sort on all attributes of composite key
CompositeKeyWritableRSJ key1 = (CompositeKeyWritableRSJ) w1;
CompositeKeyWritableRSJ key2 = (CompositeKeyWritableRSJ) w2;
int cmpResult = key1.getjoinKey().compareTo(key2.getjoinKey());
if (cmpResult == 0)// same joinKey
{
return Double.compare(key1.getsourceIndex(), key2.getsourceIndex());
}
return cmpResult;
}
}


3.0.7. The grouping comparator

This class is needed to indicate the group by attribute - the natural join key of empNo
package khanolkar.mapreduce.join.samples.reducesidejoin;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//********************************************************************************
//Class: GroupingComparatorRSJ
//Purpose: For use as grouping comparator
//Author: Anagha Khanolkar
//*********************************************************************************
public class GroupingComparatorRSJ extends WritableComparator {
protected GroupingComparatorRSJ() {
super(CompositeKeyWritableRSJ.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
// The grouping comparator is the joinKey (Employee ID)
CompositeKeyWritableRSJ key1 = (CompositeKeyWritableRSJ) w1;
CompositeKeyWritableRSJ key2 = (CompositeKeyWritableRSJ) w2;
return key1.getjoinKey().compareTo(key2.getjoinKey());
}
}
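
The partitioner, sort comparator and grouping comparator above take effect only when they are registered in the driver, which is not included in this excerpt. A minimal sketch of that wiring, assuming a Job instance named job that is otherwise configured as usual:

// In package khanolkar.mapreduce.join.samples.reducesidejoin, so the classes above resolve
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class DriverWiringSketch {
    // Registers the custom join classes on the job
    static void configureJoinJob(Job job) {
        job.setMapperClass(MapperRSJ.class);
        job.setMapOutputKeyClass(CompositeKeyWritableRSJ.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(PartitionerRSJ.class);
        job.setSortComparatorClass(SortingComparatorRSJ.class);
        job.setGroupingComparatorClass(GroupingComparatorRSJ.class);
        job.setReducerClass(ReducerRSJ.class);
    }
}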


3.0.8. The reducer

In the setup method of the reducer (called only once for the task)-
We check whether the side data, a map file with department data, is in the distributed cache and, if found, initialize the map file reader

In the reduce method -
While iterating through the value list -
1. If the data is employee data (sourceIndex=1), we are looking up the department name in the map file with the deptNo, which is the last attribute in the employee data, and appending the department name to the employee data.
2. If the data is historical salary data, we are only emitting salary where the last attribute is '9999-01-01'.

Key point -
We have set the sort comparator to sort on empNo and then sourceIndex.
The sourceIndex of employee data is less than that of the salary data - as set in the driver.
Therefore, we are assured that the employee data always comes first, followed by the salary data.
So for each distinct empNo, we iterate through the values, appending them, and emit the result as output.

package khanolkar.mapreduce.join.samples.reducesidejoin;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
//********************************************************************************
//Class: ReducerRSJ
//Purpose: Reducer
//Author: Anagha Khanolkar
//*********************************************************************************
public class ReducerRSJ extends
Reducer<CompositeKeyWritableRSJ, Text, NullWritable, Text> {
StringBuilder reduceValueBuilder = new StringBuilder("");
NullWritable nullWritableKey = NullWritable.get();
Text reduceOutputValue = new Text("");
String strSeparator = ",";
private MapFile.Reader deptMapReader = null;
Text txtMapFileLookupKey = new Text("");
Text txtMapFileLookupValue = new Text("");
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
// {{
// Get side data from the distributed cache
Path[] cacheFilesLocal = DistributedCache.getLocalCacheArchives(context
.getConfiguration());
for (Path eachPath : cacheFilesLocal) {
if (eachPath.getName().toString().trim()
.equals("departments_map.tar.gz")) {
URI uriUncompressedFile = new File(eachPath.toString()
+ "/departments_map").toURI();
initializeDepartmentsMap(uriUncompressedFile, context);
}
}
// }}
}
@SuppressWarnings("deprecation")
private void initializeDepartmentsMap(URI uriUncompressedFile, Context context)
throws IOException {
// {{
// Initialize the reader of the map file (side data)
FileSystem dfs = FileSystem.get(context.getConfiguration());
try {
deptMapReader = new MapFile.Reader(dfs,
uriUncompressedFile.toString(), context.getConfiguration());
} catch (Exception e) {
e.printStackTrace();
}
// }}
}
private StringBuilder buildOutputValue(CompositeKeyWritableRSJ key,
StringBuilder reduceValueBuilder, Text value) {
if (key.getsourceIndex() == 1) {
// Employee data
// {{
// Get the department name from the MapFile in distributedCache
// Insert the joinKey (empNo) to beginning of the stringBuilder
reduceValueBuilder.append(key.getjoinKey()).append(strSeparator);
String arrEmpAttributes[] = value.toString().split(",");
txtMapFileLookupKey.set(arrEmpAttributes[3].toString());
try {
deptMapReader.get(txtMapFileLookupKey, txtMapFileLookupValue);
} catch (Exception e) {
txtMapFileLookupValue.set("");
} finally {
// Note: Text.equals(null) and Text.equals("") are never true here;
// check the underlying string instead, and default to NOT-FOUND if empty
if (txtMapFileLookupValue.toString().isEmpty()) {
txtMapFileLookupValue.set("NOT-FOUND");
}
}
// }}
// {{
// Append the department name to the map values to form a complete
// CSV of employee attributes
reduceValueBuilder.append(value.toString()).append(strSeparator)
.append(txtMapFileLookupValue.toString())
.append(strSeparator);
// }}
} else if (key.getsourceIndex() == 2) {
// Current recent salary data (1..1 on join key)
// Salary data; Just append the salary, drop the effective-to-date
String arrSalAttributes[] = value.toString().split(",");
reduceValueBuilder.append(arrSalAttributes[0].toString()).append(
strSeparator);
} else // key.getsourceIndex() == 3; Historical salary data
{
// {{
// Get the salary data but extract only current salary
// (to_date='9999-01-01')
String arrSalAttributes[] = value.toString().split(",");
if (arrSalAttributes[1].toString().equals("9999-01-01")) {
// Salary data; Just append
reduceValueBuilder.append(arrSalAttributes[0].toString())
.append(strSeparator);
}
// }}
}
// {{
// Reset
txtMapFileLookupKey.set("");
txtMapFileLookupValue.set("");
// }}
return reduceValueBuilder;
}
@Override
public void reduce(CompositeKeyWritableRSJ key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
// Iterate through values; First set is csv of employee data
// second set is salary data; The data is already ordered
// by virtue of secondary sort; Append each value;
for (Text value : values) {
buildOutputValue(key, reduceValueBuilder, value);
}
// Drop last comma, set value, and emit output
if (reduceValueBuilder.length() > 1) {
reduceValueBuilder.setLength(reduceValueBuilder.length() - 1);
// Emit output
reduceOutputValue.set(reduceValueBuilder.toString());
context.write(nullWritableKey, reduceOutputValue);
} else {
System.out.println("Key=" + key.getjoinKey() + "src="
+ key.getsourceIndex());
}
// Reset variables
reduceValueBuilder.setLength(0);
reduceOutputValue.set("");
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
deptMapReader.close();
}
}


3.0.9. The driver

Besides the usual driver code, we are-
1. Adding the side data (department lookup data, in map file format, in HDFS) to the distributed cache
2. Adding key-value pairs to the configuration, each pair being a filename and its source index.
This is used by the mapper to tag records with the sourceIndex.
3. Lastly, associating all the classes we created with the job.
package khanolkar.mapreduce.join.samples.reducesidejoin;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//********************************************************************************
//Class: DriverRSJ
//Purpose: Driver for Reduce Side Join of two datasets
// with a 1..1 or 1..many cardinality on join key
//Author: Anagha Khanolkar
//*********************************************************************************
public class DriverRSJ extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
// {{
// Exit job if required arguments have not been provided
if (args.length != 3) {
System.out
.printf("Three parameters are required for DriverRSJ- <input dir1> <input dir2> <output dir>\n");
return -1;
}
// }}
// {{
// Job instantiation
Job job = new Job(getConf());
Configuration conf = job.getConfiguration();
job.setJarByClass(DriverRSJ.class);
job.setJobName("ReduceSideJoin");
// }}
// {{
// Add side data to distributed cache
DistributedCache
.addCacheArchive(
new URI(
"/user/akhanolk/joinProject/data/departments_map.tar.gz"),
conf);
// }}
// {
// Set sourceIndex for input files;
// sourceIndex is an attribute of the compositeKey,
// to drive order, and reference source
// Can be done dynamically; Hard-coded file names for simplicity
conf.setInt("part-e", 1);// Set Employee file to 1
conf.setInt("part-sc", 2);// Set Current salary file to 2
conf.setInt("part-sh", 3);// Set Historical salary file to 3
// }
// {
// Build csv list of input files
StringBuilder inputPaths = new StringBuilder();
inputPaths.append(args[0].toString()).append(",")
.append(args[1].toString());
// }
// {{
// Configure remaining aspects of the job
FileInputFormat.setInputPaths(job, inputPaths.toString());
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.setMapperClass(MapperRSJ.class);
job.setMapOutputKeyClass(CompositeKeyWritableRSJ.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(PartitionerRSJ.class);
job.setSortComparatorClass(SortingComparatorRSJ.class);
job.setGroupingComparatorClass(GroupingComparatorRSJ.class);
job.setNumReduceTasks(4);
job.setReducerClass(ReducerRSJ.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
// }}
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(), new DriverRSJ(),
args);
System.exit(exitCode);
}
}



4.0. The Pig equivalent



Pig script-version 1:
/*************************************
Joining datasets in Pig
Employee..Salary = 1..many
Displaying most recent salary
Without using any join optimizations
**************************************/
rawEmpDS = load '/user/akhanolk/joinProject/data/employees_active/part-e' using PigStorage(',') as (empNo:chararray,dOB:chararray,lName:chararray,fName:chararray,gender:chararray,hireDate:chararray,deptNo:chararray);
empDS = foreach rawEmpDS generate empNo,fName,lName,gender,deptNo;
rawSalDS = load '/user/akhanolk/joinProject/data/salaries_history/part-sh' using PigStorage(',') as (empNo:chararray,salary:long,fromDate:chararray,toDate:chararray);
filteredSalDS = filter rawSalDS by toDate == '9999-01-01';
salDS = foreach filteredSalDS generate empNo, salary;
joinedDS = join empDS by empNo, salDS by empNo;
finalDS = foreach joinedDS generate empDS::empNo,empDS::fName,empDS::lName,empDS::gender,empDS::deptNo,salDS::salary;
store finalDS into '/user/akhanolk/joinProject/output/pig-RSJ';



Pig script-version 2 - eliminating the reduce-side join:
In this script, we filter on the most recent salary and then use Pig's merge join optimization (map-side), which can be leveraged when the inputs to the join are sorted.
rawEmpDS = load '/user/akhanolk/joinProject/data/employees_active/part-e' using PigStorage(',') as (empNo:chararray,dOB:chararray,lName:chararray,fName:chararray,gender:chararray,hireDate:chararray,deptNo:chararray);
empDS = foreach rawEmpDS generate empNo,fName,lName,gender,deptNo;
sortedEmpDS = ORDER empDS by empNo;
rawSalDS = load '/user/akhanolk/joinProject/data/salaries_history/part-sh' using PigStorage(',') as (empNo:chararray,salary:long,fromDate:chararray,toDate:chararray);
filteredSalDS = filter rawSalDS by toDate == '9999-01-01';
salDS = foreach filteredSalDS generate empNo, salary;
sortedSalDS = ORDER salDS by empNo;
joinedDS = join sortedEmpDS by empNo, sortedSalDS by empNo using 'merge';
finalDS = foreach joinedDS generate sortedEmpDS::empNo,sortedEmpDS::fName,sortedEmpDS::lName,sortedEmpDS::gender,sortedEmpDS::deptNo,sortedSalDS::salary;
store finalDS into '/user/akhanolk/joinProject/output/pig-RSJ';


Output:
**********************
Output of pig script
**********************
$ hadoop fs -cat joinProject/output/pig-RSJ/part* | less
10001 Facello Georgi M d005 88958
10002 Simmel Bezalel F d007 72527
10003 Bamford Parto M d004 43311
10004 Koblick Chirstian M d004 74057
.........

Sunday, September 22, 2013

Map-side join of large datasets using CompositeInputFormat

This post covers map-side join of large datasets using CompositeInputFormat; it has links to Apache documentation, my notes on the topic, and a sample program demonstrating the functionality. Hive and Pig rock and rule at joining datasets, but it helps to know how to perform joins in Java.

Update [10/15/2013]
I have added the pig equivalent at the very bottom of the gist.

Feel free to share any insights or constructive criticism. Cheers!!

Related blogs:
1. Map-side join sample in Java using reference data (text file) from distributed cache - Part 1
2. Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2
3. Map-side join sample in Java of two large datasets, leveraging CompositeInputFormat

Sample program:
**********************
**Gist
**********************
This gist details how to inner join two large datasets on the map-side, leveraging the join capability
in mapreduce. Such a join makes sense if both input datasets are too large to qualify for distribution
through the distributed cache, and can be implemented if both datasets can be joined by the join key
and both are sorted in the same order, by the join key.
There are two critical pieces to engaging the join behavior:
- the input format must be set to CompositeInputFormat.class, and
- the key mapred.join.expr must have a value that is a valid join specification (illustrated below).
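For illustration, the value that CompositeInputFormat.compose builds for mapred.join.expr is roughly of
the form below (a sketch of the expression syntax, not output copied from a run):
inner(tbl(KeyValueLongInputFormat,"/user/akhanolk/joinProject/data/employees_sorted/part-e"),
tbl(KeyValueLongInputFormat,"/user/akhanolk/joinProject/data/salaries_sorted/part-s"))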
Sample program:
Covers inner join of employee and salary data with employee ID as join key in a map-only program
Inner join:
The inner join is a traditional database-style inner join. The map method will be called with a key/value
set only if every dataset in the join contains the key. The TupleWritable value will contain a value for
every dataset in the join, join key excluded.
Key code in the sample program:
conf.setInputFormat(CompositeInputFormat.class);
String strJoinStmt = CompositeInputFormat.compose("inner",
KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData);
conf.set("mapred.join.expr", strJoinStmt);
conf.setOutputFormat(TextOutputFormat.class);
TextOutputFormat.setOutputPath(conf, dirOutput);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
Old API:
I ended up using the old API as the new API does not include CompositeInputFormat in the version of
Hadoop I am running.
*******************************
*Data and code download
*******************************
Data and code:
--------------
gitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
joinProject
data
employees_sorted
part-e
salaries_sorted
part-s
MapSideJoinLargeDatasets
src
KeyValueLongInputFormat.java
KeyValueLongLineRecordReader.java
MapperMapSideJoinLargeDatasets.java
DriverMapSideJoinLargeDatasets.java
jar
MapSideJoinLgDsOAPI.jar
********************************
Data Structure Review
********************************
Datasets:
The two datasets are employee and salary datasets.
Join key:
The join key is EmpNo/employee number
Location of join key:
The join key is the first field in both datasets
Sorting:
The data is sorted by the join key "EmpNo" in ascending order.
Sorting is crucial for accuracy of joins
File format:
The files are in text format, with comma as a separator
Cardinality:
Is 1..1 on join key; Both datasets have the same number of records
Employee data [joinProject/data/employees_sorted/part-e]
--------------------------------------------------------
[EmpNo,DOB,FName,LName,Gender,HireDate,DeptNo]
10001,1953-09-02,Georgi,Facello,M,1986-06-26,d005
10002,1964-06-02,Bezalel,Simmel,F,1985-11-21,d007
10003,1959-12-03,Parto,Bamford,M,1986-08-28,d004
10004,1954-05-01,Chirstian,Koblick,M,1986-12-01,d004
10005,1955-01-21,Kyoichi,Maliniak,M,1989-09-12,d003
10006,1953-04-20,Anneke,Preusig,F,1989-06-02,d005
.....
Salary data [joinProject/data/salaries_sorted/part-s]
------------------------------------------------------
[EmpNo,Salary,FromDate,ToDate]
10001,88958,2002-06-22,9999-01-01
10002,72527,2001-08-02,9999-01-01
10003,43311,2001-12-01,9999-01-01
10004,74057,2001-11-27,9999-01-01
10005,94692,2001-09-09,9999-01-01
..........
************************************
Expected Results - tab separated
************************************
[EmpNo FName LName Salary]
10001 Georgi Facello 88958
10002 Bezalel Simmel 72527
10003 Parto Bamford 43311
10004 Chirstian Koblick 74057
10005 Kyoichi Maliniak 94692
10006 Anneke Preusig 59755
10009 Sumant Peac 94409
10010 Duangkaew Piveteau 80324
........
******************************
Observations
******************************
Setting the input format to KeyValueTextInputFormat resulted in only
part of the data getting joined. I attributed this to the fact that
EmpNo is numeric and the sort was not working right with the attribute
set as Text. Found that others had encountered the same issue, and one
individual had created a custom format - KeyValueLongInputFormat - and an associated record
reader. This gist uses the same code, with minor modifications.
http://stackoverflow.com/questions/13415359/hadoop-compositeinputformat-not-joining-all-data
/**********************************
*KeyValueLongLineRecordReader.java
*Custom record reader
**********************************/
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
public class KeyValueLongLineRecordReader implements
RecordReader<LongWritable, Text> {
private final LineRecordReader lineRecordReader;
private byte separator = (byte) ',';
private LongWritable dummyKey;
private Text innerValue;
public Class getKeyClass() {
return LongWritable.class;
}
public LongWritable createKey() {
return new LongWritable();
}
public Text createValue() {
return new Text();
}
public KeyValueLongLineRecordReader(Configuration job, FileSplit split)
throws IOException {
lineRecordReader = new LineRecordReader(job, split);
dummyKey = lineRecordReader.createKey();
innerValue = lineRecordReader.createValue();
String sepStr = job.get("key.value.separator.in.input.line", ",");
this.separator = (byte) sepStr.charAt(0);
}
public static int findSeparator(byte[] utf, int start, int length, byte sep) {
for (int i = start; i < (start + length); i++) {
if (utf[i] == sep) {
return i;
}
}
return -1;
}
/** Read key/value pair in a line. */
public synchronized boolean next(LongWritable key, Text value)
throws IOException {
LongWritable tKey = key;
Text tValue = value;
byte[] line = null;
int lineLen = -1;
if (lineRecordReader.next(dummyKey, innerValue)) {
line = innerValue.getBytes();
lineLen = innerValue.getLength();
} else {
return false;
}
if (line == null)
return false;
int pos = findSeparator(line, 0, lineLen, this.separator);
if (pos == -1) {
tKey.set(Long.valueOf(new String(line, 0, lineLen)));
tValue.set("");
} else {
int keyLen = pos;
byte[] keyBytes = new byte[keyLen];
System.arraycopy(line, 0, keyBytes, 0, keyLen);
int valLen = lineLen - keyLen - 1;
byte[] valBytes = new byte[valLen];
System.arraycopy(line, pos + 1, valBytes, 0, valLen);
tKey.set(Long.valueOf(new String(keyBytes)));
tValue.set(valBytes);
}
return true;
}
public float getProgress() {
return lineRecordReader.getProgress();
}
public synchronized long getPos() throws IOException {
return lineRecordReader.getPos();
}
public synchronized void close() throws IOException {
lineRecordReader.close();
}
}
/**********************************
*KeyValueLongInputFormat.java
*Custom key value format
**********************************/
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
public class KeyValueLongInputFormat extends
FileInputFormat<LongWritable, Text> implements JobConfigurable {
private CompressionCodecFactory compressionCodecs = null;
@Override
public void configure(JobConf conf) {
compressionCodecs = new CompressionCodecFactory(conf);
}
protected boolean isSplitable(FileSystem fs, Path file) {
return compressionCodecs.getCodec(file) == null;
}
@Override
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit, JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
return new KeyValueLongLineRecordReader(job, (FileSplit) genericSplit);
}
}
/**********************************
*MapperMapSideJoinLargeDatasets.java
*Mapper
**********************************/
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.join.TupleWritable;
public class MapperMapSideJoinLargeDatasets extends MapReduceBase implements
Mapper<LongWritable, TupleWritable, Text, Text> {
Text txtKey = new Text("");
Text txtValue = new Text("");
@Override
public void map(LongWritable key, TupleWritable value,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
if (value.toString().length() > 0) {
txtKey.set(key.toString());
// The TupleWritable holds one value per joined dataset (join key excluded):
// index 0 = employee attributes, index 1 = salary attributes
String arrEmpAttributes[] = value.get(0).toString().split(",");
String arrSalAttributes[] = value.get(1).toString().split(",");
txtValue.set(arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t"
+ arrSalAttributes[0].toString());
output.collect(txtKey, txtValue);
}
}
}
/**********************************
*DriverMapSideJoinLargeDatasets
*Driver
**********************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class DriverMapSideJoinLargeDatasets {
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf("DriverMapSideJoinLargeDatasets");
conf.setJarByClass(DriverMapSideJoinLargeDatasets.class);
String[] jobArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
Path dirEmployeesData = new Path(jobArgs[0]);
Path dirSalaryData = new Path(jobArgs[1]);
Path dirOutput = new Path(jobArgs[2]);
conf.setMapperClass(MapperMapSideJoinLargeDatasets.class);
conf.setInputFormat(CompositeInputFormat.class);
String strJoinStmt = CompositeInputFormat.compose("inner",
KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData);
conf.set("mapred.join.expr", strJoinStmt);
conf.setNumReduceTasks(0);
conf.setOutputFormat(TextOutputFormat.class);
TextOutputFormat.setOutputPath(conf, dirOutput);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
RunningJob job = JobClient.runJob(conf);
while (!job.isComplete()) {
Thread.sleep(1000);
}
System.exit(job.isSuccessful() ? 0 : 2);
}
}
**************************
HDFS data load commands
**************************
hadoop fs -mkdir joinProject
hadoop fs -put joinProject/* joinProject/
**************************
Command to run program
**************************
hadoop jar ~/Blog/joinProject/MapSideJoinLargeDatasets/jar/MapSideJoinLgDsOAPI.jar DriverMapSideJoinLargeDatasets /user/akhanolk/joinProject/data/employees_sorted/part-e /user/akhanolk/joinProject/data/salaries_sorted/part-s /user/akhanolk/joinProject/output/output-MapSideJoinLargeDatasets
**************************
Results
**************************
...
13/09/22 13:11:17 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/09/22 13:11:17 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/09/22 13:11:17 INFO mapred.JobClient: Map-Reduce Framework
13/09/22 13:11:17 INFO mapred.JobClient: Map input records=224683000
13/09/22 13:11:17 INFO mapred.JobClient: Map output records=224683000
...
$ hadoop fs -cat joinProject/output/output-MapSideJoinLargeDatasets/part* | less
10001 Georgi Facello 88958
10002 Bezalel Simmel 72527
10003 Parto Bamford 43311
10004 Chirstian Koblick 74057
10005 Kyoichi Maliniak 94692
10006 Anneke Preusig 59755
10009 Sumant Peac 94409
10010 Duangkaew Piveteau 80324
.....
**************************
References
**************************
Concepts:
Pro Hadoop
Hadoop the Definitive Guide
Data-Intensive Text Processing with MapReduce
Code:
http://stackoverflow.com/questions/13415359/hadoop-compositeinputformat-not-joining-all-data
Data:
The data in this solution is from mysql - http://dev.mysql.com/doc/employee/en.index.html
*********************************
Pig - map-side join of datasets
with a cardinality of 1..1
Using 'replicated', or 'merge' if sorted
*********************************
rawEmpDS = load '/user/akhanolk/joinProject/data/employees_active/part-e' using PigStorage(',') as (empNo:chararray,dOB:chararray,lName:chararray,fName:chararray,gender:chararray,hireDate:chararray,deptNo:chararray);
empDS = foreach rawEmpDS generate empNo,fName,lName,gender,deptNo;
rawSalDS = load '/user/akhanolk/joinProject/data/salaries_active/part-sc' using PigStorage(',') as (empNo:chararray,salary:long,fromDate:chararray,toDate:chararray);
salDS = foreach rawSalDS generate empNo, salary;
joinedDS = join empDS by empNo, salDS by empNo using 'replicated';
finalDS = foreach joinedDS generate empDS::empNo,empDS::fName,empDS::lName,empDS::gender,empDS::deptNo,salDS::salary;
store finalDS into '/user/akhanolk/joinProject/output/pig-RSJ1To1';

Friday, September 20, 2013

Handling small files using CombineFileInputFormat in Java MapReduce

This post covers CombineFileInputFormat; it has links to Apache documentation, my notes on the topic, and a sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

*************************
Gist
*************************
One more gist related to controlling the number of mappers in a mapreduce task.
Background on Inputsplits
--------------------------
An inputsplit is a chunk of the input data allocated to a map task for processing. FileInputFormat
generates inputsplits (and divides the same into records) - one inputsplit for each file, unless the
file spans more than an HDFS block, at which point it factors in the configured values of minimum split
size, maximum split size and block size in determining the split size.
Here's the formula, from Hadoop the definitive guide-
Split size = max( minimumSplitSize, min( maximumSplitSize, HDFSBlockSize))
So, if we go with the default values, the split size = HDFSBlockSize for files spanning more than an
HDFS block.
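For example, with the default values (a minimum split size of 1 byte and a maximum split size of
Long.MAX_VALUE) and a 128 MB HDFS block:
Split size = max(1 byte, min(Long.MAX_VALUE, 128 MB)) = 128 MB
i.e. one split per HDFS block.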
Problem with mapreduce processing of small files
-------------------------------------------------
We all know that Hadoop works best with large files; but the reality is that we still have to deal
with small files. When you want to process many small files in a mapreduce job, by default, each file
is processed by its own map task (so 1000 small files = 1000 map tasks). Having too many tasks that
finish in a matter of seconds is inefficient.
Increasing the minimum split size to reduce the number of map tasks is not the right solution for this
situation, as it comes at the potential cost of data locality.
Solution
---------
CombineFileInputFormat packs many files into a split, providing more data for a map task to process.
It factors in node and rack locality so performance is not compromised.
Sample program
---------------
The sample program demonstrates that, using CombineFileInputFormat, we can process multiple small files
(each smaller than an HDFS block) in a single map task.
Old API
--------
The new API in the version of Hadoop I am running does not include CombineFileInputFormat.
Will write another gist with the program using the new API, shortly.
Key aspects of the program
----------------------------
1. CombineFileInputFormat is an abstract class; we have to create a subclass that extends it and
implement the getRecordReader method. This implementation is in the class ExtendedCombineFileInputFormat.java
(courtesy - http://stackoverflow.com/questions/14270317/implementation-for-combinefileinputformat-hadoop-0-20-205)
2. In the driver, set the value of mapred.max.split.size
3. In the driver, set the input format to the subclass of CombineFileInputFormat
*******************************
*Data and code download
*******************************
Data and code:
--------------
gitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
formatProject
data
employees_partFiles
employees_part1
employees_part2
employees_part3
employees_part4
employees_part5
formatCombineFileInputFormat
src
MapperCombineFileInputFormat.java
DriverCombineFileInputFormat.java
ExtendedCombineFileInputFormat.java
jar
formatCombineFileInputFormatOAPI.jar
*******************************
Data Structure
*******************************
[EmpNo DOB FName LName Gender HireDate DeptNo]
10001 1953-09-02 Georgi Facello M 1986-06-26 d005
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
.......
.......
*******************************
Expected Results
*******************************
Key goal of demonstration: Process 5 small files in one map task
Emit a subset of the input dataset.
[EmpNo FName LName]
10001 Georgi Facello
10002 Bezalel Simmel
10003 Parto Bamford
/********************************************
*File: MapperCombineFileInputFormat.java
*Usage: Mapper
********************************************/
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MapperCombineFileInputFormat extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text> {
Text txtKey = new Text("");
Text txtValue = new Text("");
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
if (value.toString().length() > 0) {
String[] arrEmpAttributes = value.toString().split("\\t");
txtKey.set(arrEmpAttributes[0].toString());
txtValue.set(arrEmpAttributes[2].toString() + "\t"
+ arrEmpAttributes[3].toString());
output.collect(txtKey, txtValue);
}
}
}
/********************************************
*File: DriverCombineFileInputFormat.java
*Usage: Driver
********************************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class DriverCombineFileInputFormat {
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf("DriverCombineFileInputFormat");
conf.set("mapred.max.split.size", "134217728");//128 MB
conf.setJarByClass(DriverCombineFileInputFormat.class);
String[] jobArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
conf.setMapperClass(MapperCombineFileInputFormat.class);
conf.setInputFormat(ExtendedCombineFileInputFormat.class);
ExtendedCombineFileInputFormat.addInputPath(conf, new Path(jobArgs[0]));
conf.setNumReduceTasks(0);
conf.setOutputFormat(TextOutputFormat.class);
TextOutputFormat.setOutputPath(conf, new Path(jobArgs[1]));
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
RunningJob job = JobClient.runJob(conf);
while (!job.isComplete()) {
Thread.sleep(1000);
}
System.exit(job.isSuccessful() ? 0 : 2);
}
}
/********************************************
*File: ExtendedCombineFileInputFormat.java
*Usage: Sub-class implementation of abstract
class CombineFileInputFormat
********************************************/
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
@SuppressWarnings("deprecation")
public class ExtendedCombineFileInputFormat extends
CombineFileInputFormat<LongWritable, Text> {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
JobConf conf, Reporter reporter) throws IOException {
return new CombineFileRecordReader(conf, (CombineFileSplit) split,
reporter, (Class) myCombineFileRecordReader.class);
}
public static class myCombineFileRecordReader implements
RecordReader<LongWritable, Text> {
private final LineRecordReader linerecord;
public myCombineFileRecordReader(CombineFileSplit split,
Configuration conf, Reporter reporter, Integer index)
throws IOException {
FileSplit filesplit = new FileSplit(split.getPath(index),
split.getOffset(index), split.getLength(index),
split.getLocations());
linerecord = new LineRecordReader(conf, filesplit);
}
@Override
public void close() throws IOException {
linerecord.close();
}
@Override
public LongWritable createKey() {
return linerecord.createKey();
}
@Override
public Text createValue() {
return linerecord.createValue();
}
@Override
public long getPos() throws IOException {
return linerecord.getPos();
}
@Override
public float getProgress() throws IOException {
return linerecord.getProgress();
}
@Override
public boolean next(LongWritable key, Text value) throws IOException {
return linerecord.next(key, value);
}
}
}
*****************************
*HDFS command to load data
*****************************
hadoop fs -mkdir formatProject
hadoop fs -put formatProject/data formatProject/
*****************************
*Run program
*****************************
hadoop jar ~/Blog/formatProject/formatCombineFileInputFormat/jar/formatCombineFileInputFormatOAPI.jar DriverCombineFileInputFormat /user/akhanolk/formatProject/data/employees_partFiles /user/akhanolk/formatProject/output/output-CombineFileInputFormat
*****************************
*Results
*****************************
....
13/09/22 17:16:31 INFO mapred.JobClient: Launched map tasks=1
13/09/22 17:16:31 INFO mapred.JobClient: Data-local map tasks=1
13/09/22 17:16:31 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=17885
...
$ hadoop fs -ls -R formatProject/output/output-CombineFileInputFormat/part* | awk '{print $8}'
formatProject/output/output-CombineFileInputFormat/part-00000
$ hadoop fs -cat formatProject/output/output-CombineFileInputFormat/part-00000
10001 Georgi Facello
10002 Bezalel Simmel
10003 Parto Bamford
10004 Chirstian Koblick
10005 Kyoichi Maliniak
.....
**************************
References
**************************
Apache documentation:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/lib/CombineFileInputFormat.html
Concepts:
Hadoop the Definitive Guide
Code:
http://stackoverflow.com/questions/14270317/implementation-for-combinefileinputformat-hadoop-0-20-205
Data:
The data in this solution is from mysql - http://dev.mysql.com/doc/employee/en.index.html

NLineInputFormat in Java MapReduce - use case, code sample

This post covers NLineInputFormat; it has links to Apache documentation, my notes on the topic, and a sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

**********************
Gist
**********************
A common interview question for a Hadoop developer position is whether we can control the number of
mappers for a job. We can - there are a few ways of controlling the number of mappers, as needed.
Using NLineInputFormat is one way.
About NLineInputFormat
----------------------
With this functionality, you can specify exactly how many lines should go to a mapper.
E.g. if your file has 500 lines and you set the number of lines per mapper to 10, you get 50 mappers
(instead of one - assuming the file is smaller than an HDFS block).
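As a minimal sketch (assuming a Job instance named job inside a driver's run() method), wiring this up
takes two lines; the NLineInputFormat source included at the end of this post also exposes a
setNumLinesPerSplit() helper that sets the same property:
job.setInputFormatClass(NLineInputFormat.class);
job.getConfiguration().setInt("mapreduce.input.lineinputformat.linespermap", 10);
// or, equivalently, using the helper on the NLineInputFormat class included below:
// NLineInputFormat.setNumLinesPerSplit(job, 10);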
When would you use NLineInputFormat?
------------------------------------
Some examples from Hadoop the definitive guide-
1. In applications that take a small amount of input data and run an extensive (that is, CPU-intensive)
computation for it, then emit their output.
2. Another example...you create a “seed” input file that lists the data sources, one per line. Then
each mapper is allocated a single data source, and it loads the data from that source into HDFS.
Sample program
---------------
The sample program below demonstrates the functionality.
The mapper merely emits the input key-value pairs.
The input is a file with ~224,000 records.
The output is files containing 10,000 records each (so a total of 23 files).
*******************************
*Data and code download
*******************************
Data and code:
--------------
gitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
formatProject
data
employees_tsv
employees_tsv
formatNLineInputFormat
src
NLineInputFormat.java //Original Apache source code
MapperNLineInputFormat.java //Mapper
DriverNLineInputFormat.java //Driver
jar
formatNLineInputFormat.jar
*******************************
*Sample Data
*******************************
EmpID DOB FName LName Gender Hire date DeptID
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
.....
/*******************************************************************
* Mapper
* MapperNLineInputFormat.java
*******************************************************************/
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MapperNLineInputFormat extends
Mapper<LongWritable, Text, LongWritable, Text> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(key, value);
}
}
/*******************************************************************
* Driver
* DriverNLineInputFormat.java
*******************************************************************/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DriverNLineInputFormat extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required for DriverNLineInputFormat- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
job.setJobName("NLineInputFormat example");
job.setJarByClass(DriverNLineInputFormat.class);
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.addInputPath(job, new Path(args[0]));
job.getConfiguration().setInt(
"mapreduce.input.lineinputformat.linespermap", 10000);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MapperNLineInputFormat.class);
job.setNumReduceTasks(0);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new DriverNLineInputFormat(), args);
System.exit(exitCode);
}
}
***********************************************
** Commands to load data
***********************************************
$ hadoop fs -mkdir formatProject
$ hadoop fs -put formatProject/data formatProject/
***********************************************
** Commands to run the program
***********************************************
hadoop jar ~/Blog/formatProject/formatNLineInputFormat/jar/formatNLineInputFormat.jar DriverNLineInputFormat /user/akhanolk/sortProject/data/employees/employees_tsv /user/akhanolk/formatProject/data/output-formatNLineInputFormat
***********************************************
** Results
***********************************************
$ for filename in `hadoop fs -ls -R formatProject/data/output-formatNLineInputFormat/part* | awk '{print $8}'`
do
echo "Filename: " $filename " [Record count:" `hadoop fs -cat $filename | wc -l` "]"
done
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00000 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00001 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00002 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00003 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00004 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00005 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00006 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00007 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00008 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00009 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00010 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00011 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00012 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00013 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00014 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00015 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00016 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00017 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00018 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00019 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00020 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00021 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00022 [Record count: 4683 ]
$ hadoop fs -cat formatProject/data/output-formatNLineInputFormat/part-* | wc -l
224683
$ hadoop fs -cat formatProject/data/output-formatNLineInputFormat/part-m-00022
...
11474355 499977 1956-06-05 Martial Weisert F 1996-09-17 d002
11474407 499979 1962-10-29 Prasadram Waleschkowski M 1994-01-04 d005
11474467 499980 1959-06-28 Gino Usery M 1991-02-11 d007
..
/******************************************************
* NLineInputFormat.java
* Had to add this to the project, as the version of
* Hadoop I have does not include the NLineInputFormat
* functionality as part of the new API
*****************************************************/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
//import org.apache.hadoop.classification.InterfaceAudience;
//import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.util.LineReader;
/**
* NLineInputFormat which splits N lines of input as one split.
*
* In many "pleasantly" parallel applications, each process/mapper processes the
* same input file (s), but with computations are controlled by different
* parameters.(Referred to as "parameter sweeps"). One way to achieve this, is
* to specify a set of parameters (one set per line) as input in a control file
* (which is the input path to the map-reduce application, where as the input
* dataset is specified via a config variable in JobConf.).
*
* The NLineInputFormat can be used in such applications, that splits the input
* file such that by default, one line is fed as a value to one map task, and
* key is the offset. i.e. (k,v) is (LongWritable, Text). The location hints
* will span the whole mapred cluster.
*/
// @InterfaceAudience.Public
// @InterfaceStability.Stable
public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {
public static final String LINES_PER_MAP = "mapreduce.input.lineinputformat.linespermap";
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit genericSplit, TaskAttemptContext context)
throws IOException {
context.setStatus(genericSplit.toString());
return new LineRecordReader();
}
/**
* Logically splits the set of input files for the job, splits N lines of
* the input as one split.
*
* @see FileInputFormat#getSplits(JobContext)
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
int numLinesPerSplit = getNumLinesPerSplit(job);
for (FileStatus status : listStatus(job)) {
splits.addAll(getSplitsForFile(status, job.getConfiguration(),
numLinesPerSplit));
}
return splits;
}
public static List<FileSplit> getSplitsForFile(FileStatus status,
Configuration conf, int numLinesPerSplit) throws IOException {
List<FileSplit> splits = new ArrayList<FileSplit>();
Path fileName = status.getPath();
if (status.isDir()) {
throw new IOException("Not a file: " + fileName);
}
FileSystem fs = fileName.getFileSystem(conf);
LineReader lr = null;
try {
FSDataInputStream in = fs.open(fileName);
lr = new LineReader(in, conf);
Text line = new Text();
int numLines = 0;
long begin = 0;
long length = 0;
int num = -1;
while ((num = lr.readLine(line)) > 0) {
numLines++;
length += num;
if (numLines == numLinesPerSplit) {
// NLineInputFormat uses LineRecordReader, which always
// reads
// (and consumes) at least one character out of its upper
// split
// boundary. So to make sure that each mapper gets N lines,
// we
// move back the upper split limits of each split
// by one character here.
if (begin == 0) {
splits.add(new FileSplit(fileName, begin, length - 1,
new String[] {}));
} else {
splits.add(new FileSplit(fileName, begin - 1, length,
new String[] {}));
}
begin += length;
length = 0;
numLines = 0;
}
}
if (numLines != 0) {
splits.add(new FileSplit(fileName, begin, length,
new String[] {}));
}
} finally {
if (lr != null) {
lr.close();
}
}
return splits;
}
/**
* Set the number of lines per split
*
* @param job
* the job to modify
* @param numLines
* the number of lines per split
*/
public static void setNumLinesPerSplit(Job job, int numLines) {
job.getConfiguration().setInt(LINES_PER_MAP, numLines);
}
/**
* Get the number of lines per split
*
* @param job
* the job
* @return the number of lines per split
*/
public static int getNumLinesPerSplit(JobContext job) {
return job.getConfiguration().getInt(LINES_PER_MAP, 1);
}
}

Thursday, September 19, 2013

MultipleOutputs in Java MapReduce

This post covers MultipleOutputs; it has links to Apache documentation, my notes on the topic, and a sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

********************************
Gist
********************************
Motivation
-----------
The typical mapreduce job creates files with the prefix "part-", then "m" or "r" depending
on whether it is a map or a reduce output, and then the part number. There are scenarios where we
may want to create separate files based on criteria such as the data keys and/or values. Enter the
"MultipleOutputs" functionality.
More about MultipleOutputs
---------------------------
Here's the write-up from Hadoop the definitive guide-
"MultipleOutputs allows you to write data to files whose names are derived from the output keys and
values, or in fact from an arbitrary string. This allows each reducer (or mapper in a map-only job)
to create more than a single file. Filenames are of the form name-m-nnnnn for map outputs and
name-r-nnnnn for reduce outputs, where name is an arbitrary name that is set by the program,
and nnnnn is an integer designating the part number, starting from zero. The part number
ensures that outputs written from different partitions (mappers or reducers) do not collide in the
case of the same name."
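As a minimal sketch of the two common usage patterns (assumptions: Text keys and values, a Job named job,
and a hypothetical named output called "depts"; the sample reducer below uses the second form, where the
file name is derived from the key):
// In the driver - declare a named output (needed only for the first form)
MultipleOutputs.addNamedOutput(job, "depts", TextOutputFormat.class, Text.class, Text.class);
// In the reducer, after mos = new MultipleOutputs(context) in setup()
mos.write("depts", key, value); // files named depts-r-nnnnn
mos.write(key, value, key.toString()); // file names derived from the key, e.g. d001-r-nnnnn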
About LazyOutputFormat
-----------------------
A typical mapreduce program can produce output files that are empty, depending on your implementation.
If you want to suppress the creation of empty files, you need to leverage LazyOutputFormat.
Two lines in your driver will do the trick-
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
&
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
Sample program
---------------
This gist includes a sample program that demonstrates the MultipleOutputs functionality.
The input is a file with employee data, a key attribute being the department number.
The expected output is a file for each department, containing the employees belonging to that department.
The program also suppresses the creation of empty files.
*******************************
*Data and code download
*******************************
Data and code:
--------------
gitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
formatProject
data
employees_tsv
employees_tsv
formatMultipleOutputs
src
MapperFormatMultiOutput.java
ReducerFormatMultiOutput.java
DriverFormatMultiOutput.java
jar
formatMultiOutput.jar
*******************************
*Sample Data
*******************************
EmpID DOB FName LName Gender Hire date DeptID
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
.....
*******************************
*Expected results
*******************************
One file for each department.
Within each file, the following employee attributes are required-
DeptNo LName FName EmpNo
***************************************
**Mapper - MapperFormatMultiOutput.java
***************************************
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MapperFormatMultiOutput extends
Mapper<LongWritable, Text, Text, Text> {
private Text txtKey = new Text("");
private Text txtValue = new Text("");
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
if (value.toString().length() > 0) {
String arrEmpAttributes[] = value.toString().split("\\t");
txtKey.set(arrEmpAttributes[6].toString());
txtValue.set(arrEmpAttributes[3].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t"
+ arrEmpAttributes[0].toString());
context.write(txtKey, txtValue);
}
}
}
*******************************************
**Reducer - ReducerFormatMultiOutput.java
*******************************************
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class ReducerFormatMultiOutput extends Reducer<Text, Text, Text, Text> {
private MultipleOutputs mos;
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
mos = new MultipleOutputs(context);
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
mos.write(key, value, key.toString());
}
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
mos.close();
}
}
*******************************************
**Driver - DriverFormatMultiOutput.java
*******************************************
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DriverFormatMultiOutput extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required for DriverFormatMultiOutput- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
job.setJobName("MultipleOutputs example");
job.setJarByClass(DriverFormatMultiOutput.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MapperFormatMultiOutput.class);
job.setMapOutputKeyClass(Text.class);
job.setReducerClass(ReducerFormatMultiOutput.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(4);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new DriverFormatMultiOutput(), args);
System.exit(exitCode);
}
}
*******************************************
**Commands to load data
*******************************************
$ hadoop fs -mkdir formatProject
$ hadoop fs -put formatProject/data formatProject/
*******************************************
**Command to run program
*******************************************
hadoop jar ~/Blog/formatProject/formatMultiOutputFormat/jar/formatMultiOutput.jar DriverFormatMultiOutput /user/akhanolk/sortProject/data/employees/employees_tsv /user/akhanolk/formatProject/data/output-formatMultiOutput
********************************
**Results
********************************
$ hadoop fs -ls -R formatProject/data/output-formatMultiOutput/d00* | awk '{print $8, $5}'
formatProject/data/output-formatMultiOutput/d001-r-00002 401857
formatProject/data/output-formatMultiOutput/d002-r-00003 336632
formatProject/data/output-formatMultiOutput/d003-r-00000 348770
formatProject/data/output-formatMultiOutput/d004-r-00001 1442822
formatProject/data/output-formatMultiOutput/d005-r-00002 1662566
formatProject/data/output-formatMultiOutput/d006-r-00003 394272
formatProject/data/output-formatMultiOutput/d007-r-00000 1020167
formatProject/data/output-formatMultiOutput/d009-r-00002 475747
$ hadoop fs -cat formatProject/data/output-formatMultiOutput/d001-r-0000 | less
d001 Yetto Lucian 39682
d001 Cooke Padma 49634
d001 Marrevee Giap 49632
..
**********************
References:
**********************
- Hadoop the definitive guide, 3rd edition
- Apache documentation on MultipleOutputs
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html
- Apache documentation on LazyOutputFormat
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.html
**********************
Credits:
**********************
The data in this solution is from mysql - http://dev.mysql.com/doc/employee/en.index.html

Wednesday, September 18, 2013

Secondary sort in Java MapReduce

This post covers secondary sort in Java MapReduce, with links to the Apache documentation, my notes on the topic, and a sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

Secondary sort in Mapreduce
In the MapReduce framework, the keys are sorted, but the values associated with each key
are not. To get the values sorted as well, we need to write code that performs what is
referred to as a secondary sort. The sample code in this gist demonstrates such a sort.
The input to the program is a set of employee records.
The output required is department number (deptNo) in ascending order, with employee last name,
first name and employee ID in descending order within each department.
The recipe to get the effect of sorting by value is:
1) Make the key a composite of the natural key (deptNo) and the natural value (lName, fName and empNo).
2) The sort comparator should order by the composite key, that is, by both the natural key and the
natural value.
3) The partitioner and grouping comparator for the composite key should consider only the natural
key for partitioning and grouping.
*******************************
*Data and code download
*******************************
Data and code:
--------------
gitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
sortProject
data
employees_tsv
employees_tsv
SecondarySortBasic
src
CompositeKeyWritable.java
SecondarySortBasicMapper.java
SecondarySortBasicPartitioner.java
SecondarySortBasicCompKeySortComparator.java
SecondarySortBasicGroupingComparator.java
SecondarySortBasicReducer.java
SecondarySortBasicDriver.java
jar
SecondarySortBasic.jar
*******************************
*Sample Data
*******************************
EmpID DOB FName LName Gender Hire date DeptID
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
....
*******************************
*Expected results
*******************************
Sort order: [DeptID asc, {LName,FName,EmpID} desc]
DeptID LName FName EmpID
d001 Zykh Sudhanshu 205927
d001 Zykh Nidapan 452738
..
d001 Yoshimura Alenka 463297
d001 Yeung Yuguang 483161
..
d001 Acton Basim 105207
d001 Aamodt Sreekrishna 493601
..
d002 Aamodt Yakkov 43290
..
d003 Acton Idoia 211583
..
d004 dAstous Candido 59201
d004 dAstous Berhard 427930
..
d005 Zizka Aamer 409151
d005 Zirintsis Xiaoqiang 52246
....
/***************************************************************
*CustomWritable for the composite key: CompositeKeyWritable
****************************************************************/
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
/**
*
* @author akhanolkar
*
* Purpose: A custom writable (composite key) with two attributes - deptNo and
* lNameEmpIDPair;
*/
public class CompositeKeyWritable implements Writable,
WritableComparable<CompositeKeyWritable> {
private String deptNo;
private String lNameEmpIDPair;
public CompositeKeyWritable() {
}
public CompositeKeyWritable(String deptNo, String lNameEmpIDPair) {
this.deptNo = deptNo;
this.lNameEmpIDPair = lNameEmpIDPair;
}
@Override
public String toString() {
return (new StringBuilder().append(deptNo).append("\t")
.append(lNameEmpIDPair)).toString();
}
public void readFields(DataInput dataInput) throws IOException {
deptNo = WritableUtils.readString(dataInput);
lNameEmpIDPair = WritableUtils.readString(dataInput);
}
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeString(dataOutput, deptNo);
WritableUtils.writeString(dataOutput, lNameEmpIDPair);
}
public int compareTo(CompositeKeyWritable objKeyPair) {
// TODO:
/*
* Note: This code will work as it stands; but when CompositeKeyWritable
* is used as a key in a map-reduce program, each key is de-serialized into an
* object before compareTo() can be invoked.
*
* To do: To optimize for speed, implement a raw comparator that compares
* the serialized representations directly (see the sketch after this class).
*/
int result = deptNo.compareTo(objKeyPair.deptNo);
if (0 == result) {
result = lNameEmpIDPair.compareTo(objKeyPair.lNameEmpIDPair);
}
return result;
}
public String getDeptNo() {
return deptNo;
}
public void setDeptNo(String deptNo) {
this.deptNo = deptNo;
}
public String getLNameEmpIDPair() {
return lNameEmpIDPair;
}
public void setLNameEmpIDPair(String lNameEmpIDPair) {
this.lNameEmpIDPair = lNameEmpIDPair;
}
}
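Following up on the TODO in compareTo() above, here is a minimal sketch of a raw comparator that compares keys in their serialized form, avoiding per-key deserialization during the sort. The class name CompositeKeyRawComparator is my own illustration (it is not part of the original gist); it assumes the format produced by write() above - each field written by WritableUtils.writeString() as a vint byte-length followed by UTF-8 bytes - and it reproduces the ordering of SecondarySortBasicCompKeySortComparator (deptNo ascending, lNameEmpIDPair descending) for ASCII data such as this.
import java.io.IOException;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

public class CompositeKeyRawComparator extends WritableComparator {

    public CompositeKeyRawComparator() {
        super(CompositeKeyWritable.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
            // First field (deptNo): a vint byte-length followed by UTF-8 bytes
            int vintLen1 = WritableUtils.decodeVIntSize(b1[s1]);
            int vintLen2 = WritableUtils.decodeVIntSize(b2[s2]);
            int deptLen1 = readVInt(b1, s1);
            int deptLen2 = readVInt(b2, s2);
            int cmp = compareBytes(b1, s1 + vintLen1, deptLen1, b2, s2 + vintLen2, deptLen2);
            if (cmp != 0) {
                return cmp; // deptNo ascending
            }
            // Second field (lNameEmpIDPair) starts right after the first; negate for descending order
            int o1 = s1 + vintLen1 + deptLen1;
            int o2 = s2 + vintLen2 + deptLen2;
            int vintLen3 = WritableUtils.decodeVIntSize(b1[o1]);
            int vintLen4 = WritableUtils.decodeVIntSize(b2[o2]);
            return -compareBytes(b1, o1 + vintLen3, readVInt(b1, o1),
                    b2, o2 + vintLen4, readVInt(b2, o2));
        } catch (IOException e) {
            throw new IllegalArgumentException("Malformed CompositeKeyWritable bytes", e);
        }
    }
}
If used, it would be registered in the driver with job.setSortComparatorClass(CompositeKeyRawComparator.class) in place of SecondarySortBasicCompKeySortComparator.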
/***************************************************************
*Mapper: SecondarySortBasicMapper
***************************************************************/
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SecondarySortBasicMapper extends
Mapper<LongWritable, Text, CompositeKeyWritable, NullWritable> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
if (value.toString().length() > 0) {
String arrEmpAttributes[] = value.toString().split("\\t");
context.write(
new CompositeKeyWritable(
arrEmpAttributes[6].toString(),
(arrEmpAttributes[3].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t" + arrEmpAttributes[0]
.toString())), NullWritable.get());
}
}
}
/***************************************************************
*Partitioner: SecondarySortBasicPartitioner
***************************************************************/
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class SecondarySortBasicPartitioner extends
Partitioner<CompositeKeyWritable, NullWritable> {
@Override
public int getPartition(CompositeKeyWritable key, NullWritable value,
int numReduceTasks) {
// Mask the sign bit so a negative hashCode() cannot yield a negative (invalid) partition number
return (key.getDeptNo().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
/***************************************************************
*SortComparator: SecondarySortBasicCompKeySortComparator
*****************************************************************/
import org.apache.hadoop.io.WritableComparator;
public class SecondarySortBasicCompKeySortComparator extends WritableComparator {
protected SecondarySortBasicCompKeySortComparator() {
super(CompositeKeyWritable.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
CompositeKeyWritable key1 = (CompositeKeyWritable) w1;
CompositeKeyWritable key2 = (CompositeKeyWritable) w2;
int cmpResult = key1.getDeptNo().compareTo(key2.getDeptNo());
if (cmpResult == 0)// same deptNo
{
return -key1.getLNameEmpIDPair()
.compareTo(key2.getLNameEmpIDPair());
//If the minus is taken out, the values will be in
//ascending order
}
return cmpResult;
}
}
/***************************************************************
*GroupingComparator: SecondarySortBasicGroupingComparator
***************************************************************/
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class SecondarySortBasicGroupingComparator extends WritableComparator {
protected SecondarySortBasicGroupingComparator() {
super(CompositeKeyWritable.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
CompositeKeyWritable key1 = (CompositeKeyWritable) w1;
CompositeKeyWritable key2 = (CompositeKeyWritable) w2;
return key1.getDeptNo().compareTo(key2.getDeptNo());
}
}
/***************************************
*Reducer: SecondarySortBasicReducer
***************************************/
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class SecondarySortBasicReducer
extends
Reducer<CompositeKeyWritable, NullWritable, CompositeKeyWritable, NullWritable> {
@Override
public void reduce(CompositeKeyWritable key, Iterable<NullWritable> values,
Context context) throws IOException, InterruptedException {
for (NullWritable value : values) {
context.write(key, NullWritable.get());
}
}
}
/***************************************
*Driver: SecondarySortBasicDriver
***************************************/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SecondarySortBasicDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required for SecondarySortBasicDriver- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
job.setJobName("Secondary sort example");
job.setJarByClass(SecondarySortBasicDriver.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(SecondarySortBasicMapper.class);
job.setMapOutputKeyClass(CompositeKeyWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setPartitionerClass(SecondarySortBasicPartitioner.class);
job.setSortComparatorClass(SecondarySortBasicCompKeySortComparator.class);
job.setGroupingComparatorClass(SecondarySortBasicGroupingComparator.class);
job.setReducerClass(SecondarySortBasicReducer.class);
job.setOutputKeyClass(CompositeKeyWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(8);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new SecondarySortBasicDriver(), args);
System.exit(exitCode);
}
}
*******************************
*Command to run the program
*******************************
hadoop jar ~/Blog/sortProject/secondarySortBasic/jar/secondarySortBasic.jar SecondarySortBasicDriver /user/akhanolk/sortProject/data/employees/employees_tsv /user/akhanolk/sortProject/data/output-secondarySortBasic
*******************************
*Results
*******************************
--Source record count
hadoop fs -cat sortProject/data/employees/employees_tsv | wc -l
2246830
--Results record count
hadoop fs -cat sortProject/data/output-secondarySortBasic/part* | wc -l
2246830
--Files generated
hadoop fs -ls -R sortProject/data/output-secondarySortBasic/part* | awk '{print $8}'
sortProject/data/output-secondarySortBasic/part-r-00000
sortProject/data/output-secondarySortBasic/part-r-00001
sortProject/data/output-secondarySortBasic/part-r-00002
sortProject/data/output-secondarySortBasic/part-r-00003
sortProject/data/output-secondarySortBasic/part-r-00004
sortProject/data/output-secondarySortBasic/part-r-00005
sortProject/data/output-secondarySortBasic/part-r-00006
sortProject/data/output-secondarySortBasic/part-r-00007
--Output
hadoop fs -cat sortProject/data/output-secondarySortBasic/part*
d001 Zykh Sudhanshu 205927
d001 Zykh Nidapan 452738
..
d001 Yoshimura Alenka 463297
d001 Yeung Yuguang 483161
..
d001 Acton Basim 105207
d001 Aamodt Sreekrishna 493601
..
d002 Aamodt Yakkov 43290
..
d003 Acton Idoia 211583
..
d004 dAstous Candido 59201
d004 dAstous Berhard 427930
..
d005 Zizka Aamer 409151
d005 Zirintsis Xiaoqiang 52246
....
**********************
Reference:
**********************
Hadoop the definitive guide, 3rd edition
**********************
Credits:
**********************
Data from mysql - http://dev.mysql.com/doc/employee/en.index.html

Tuesday, September 17, 2013

Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2

This post covers map-side joins in Java MapReduce, with links to the Apache documentation, my notes on the topic, and a sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!


What's in this blog?

A sample MapReduce program in Java that joins two datasets on the map side - an employee dataset and a department dataset - with the department number as the join key.  The department dataset is a very small dataset in MapFile format; it resides in HDFS and is added to the distributed cache.  The MapFile is referenced in the map method of the mapper to look up the department name, and the employee records are emitted with the department name included.

Apache documentation on DistributedCache:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html

Related blogs:
1. Map-side join sample in Java using reference data (text file) from distributed cache - Part 1
2. Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2

Data used in this blog:
http://dev.mysql.com/doc/employee/en.index.html

Pig and Hive for joins:
Pig and Hive have join capabilities built in, and are optimized for them.  Joins hand-coded in Java can be more performant, but are time-consuming to code, test and support - and in some companies are considered an anti-pattern.

Sample program

This gist demonstrates how to do a map-side join, joining a MapFile from distributedcache
with a larger dataset in HDFS.
Includes:
---------
1. Input data and script download
2. Dataset structure review
3. Expected results
4. Mapper code
5. Driver code
6. Data load commands
7. Command to run Java program
8. Results of the program
01. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
gitHub:
<<To be added>>
Directory structure
-------------------
joinProject
data
employees_tsv
employees_tsv
departments_map.tar.gz
MapSideJoin-DistCacheMapFile
src
MapperMapSideJoinDCacheMapFile.java
DriverMapSideJoinDCacheMapFile
jar
MapSideJoinDCacheMapFile.jar
********************************************
*Data structure
********************************************
a) Small dataset (departments_map)
[DeptNo DeptName] - MapFile
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service
b) Large dataset (employees_tsv)
[Emp_no DOB FName LName Gender HireDate DeptNo] - Tab separated
10001 1953-09-02 Georgi Facello M 1986-06-26 d005
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
10006 1953-04-20 Anneke Preusig F 1989-06-02 d005
10009 1952-04-19 Sumant Peac F 1985-02-18 d006
...
********************************************
*Expected Results
********************************************
Each record in the employees_tsv file, followed by a tab and the department name (from the department dataset)
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
......
/********************************************
*Mapper
*MapperMapSideJoinDCacheMapFile
********************************************/
import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MapperMapSideJoinDCacheMapFile extends
Mapper<LongWritable, Text, Text, Text> {
private MapFile.Reader deptMapReader = null;
private Text txtMapOutputKey = new Text("");
private Text txtMapOutputValue = new Text("");
private Text txtMapLookupKey = new Text("");
private Text txtMapLookupValue = new Text("");
enum MYCOUNTER {
RECORD_COUNT, FILE_EXISTS, LOAD_MAP_ERROR
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
Path[] cacheFilesLocal = DistributedCache.getLocalCacheArchives(context
.getConfiguration());
for (Path eachPath : cacheFilesLocal) {
if (eachPath.getName().toString().trim()
.equals("departments_map.tar.gz")) {
URI uriUncompressedFile = new File(eachPath.toString()
+ "/departments_map").toURI();
context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
loadDepartmentsMap(uriUncompressedFile, context);
}
}
}
@SuppressWarnings("deprecation")
private void loadDepartmentsMap(URI uriUncompressedFile, Context context)
throws IOException {
FileSystem dfs = FileSystem.get(context.getConfiguration());
try {
deptMapReader = new MapFile.Reader(dfs,
uriUncompressedFile.toString(), context.getConfiguration());
} catch (Exception e) {
context.getCounter(MYCOUNTER.LOAD_MAP_ERROR).increment(1);
e.printStackTrace();
}
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);
if (value.toString().length() > 0) {
String arrEmpAttributes[] = value.toString().split("\\t");
txtMapLookupKey.set(arrEmpAttributes[6].toString());
try {
deptMapReader.get(txtMapLookupKey, txtMapLookupValue);
} finally {
// MapFile.Reader.get() leaves the value untouched when the key is absent
// (and Text.equals(null) is always false), so test for an empty value instead
if (txtMapLookupValue.getLength() == 0) {
txtMapLookupValue.set("NOT-FOUND");
}
}
txtMapOutputKey.set(arrEmpAttributes[0].toString());
txtMapOutputValue.set(arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t"
+ arrEmpAttributes[3].toString() + "\t"
+ arrEmpAttributes[4].toString() + "\t"
+ arrEmpAttributes[5].toString() + "\t"
+ arrEmpAttributes[6].toString() + "\t"
+ txtMapLookupValue.toString());
}
context.write(txtMapOutputKey, txtMapOutputValue);
txtMapLookupValue.set("");
txtMapLookupKey.set("");
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
deptMapReader.close();
}
}
/********************************************
*Driver
*DriverMapSideJoinDCacheMapFile
********************************************/
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DriverMapSideJoinDCacheMapFile extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required for DriverMapSideJoinDCacheMapFile- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
Configuration conf = job.getConfiguration();
job.setJobName("Map-side join with mapfile in DCache");
DistributedCache
.addCacheArchive(
new URI(
"/user/akhanolk/joinProject/data/departments_map.tar.gz"),
conf);
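// The archive added to the distributed cache is unpacked on each task node;
// the mapper locates the extracted departments_map MapFile in its setup() method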
job.setJarByClass(DriverMapSideJoinDCacheMapFile.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MapperMapSideJoinDCacheMapFile.class);
job.setNumReduceTasks(0);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new DriverMapSideJoinDCacheMapFile(), args);
System.exit(exitCode);
}
}
********************************************
*HDFS load commands
********************************************
hadoop fs -mkdir joinProject
hadoop fs -mkdir joinProject/data
hadoop fs -put joinProject/data/* joinProject/data/
********************************************
*Job run commands
********************************************
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheMapFile/jar/MapSideJoinDCacheMapFile.jar DriverMapSideJoinDCacheMapFile /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideMapFileLookUpDistCache
********************************************
*Program Output
********************************************
hadoop fs -cat /user/akhanolk/joinProject/data/output-MapSideMapFileLookUpDistCache/part* | less
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
10004 1954-05-01 1954-05-01 Chirstian Koblick M 1986-12-01 d004 Production
10005 1955-01-21 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003 Human Resources
10006 1953-04-20 1953-04-20 Anneke Preusig F 1989-06-02 d005 Development
10009 1952-04-19 1952-04-19 Sumant Peac F 1985-02-18 d006 Quality Management
..

Monday, September 16, 2013

Map-side join sample in Java using reference data (text file) from distributed cache - Part 1

This post covers map-side joins in Java MapReduce, with links to the Apache documentation, my notes on the topic, and a sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

1.0. What's in this blog?

A sample MapReduce program in Java that joins two datasets on the map side - an employee dataset and a department dataset - with the department number as the join key.  The department dataset is a very small reference dataset; it resides in HDFS and is added to the distributed cache.  The mapper retrieves the department data from the distributed cache and loads it into a HashMap in its setup() method; the HashMap is then referenced in the map method to get the department name, and the employee records are emitted with the department name included.

Section 2 demonstrates a solution where a file in HDFS is added to the distributed cache in the driver code, and accessed in the mapper's setup method through the DistributedCache.getLocalCacheFiles method.

Section 3 demonstrates a solution where a local file is added to the distributed cache at the command line, and accessed in the mapper setup method.

Apache documentation on DistributedCache:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html

Related blogs:
1. Map-side join sample using reference data (text file) from distributed cache - Part 1
2. Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2

Data used in this blog:
http://dev.mysql.com/doc/employee/en.index.html

Pig and Hive for joins:
Pig and Hive have join capabilities built in, and are optimized for them.  Joins hand-coded in Java can be more performant, but are time-consuming to code, test and support - and in some companies are considered an anti-pattern.

2.0. Sample program

In this program, the side data exists in HDFS, is added to the distributed cache in the driver code, and is referenced in the mapper using the DistributedCache.getLocalCacheFiles method.


This gist demonstrates how to do a map-side join, loading one small dataset from DistributedCache into a HashMap
in memory, and joining with a larger dataset.
Includes:
---------
1. Input data and script download
2. Dataset structure review
3. Expected results
4. Mapper code
5. Driver code
6. Data load commands
7. Command to run Java program
8. Results of the program
01. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
gitHub:
<<To be added>>
Directory structure
-------------------
joinProject
data
employees_tsv
employees_tsv
departments_sorted
departments_txt
MapSideJoin-DistCacheTxtFile
src
MapperMapSideJoinDCacheTextFile.java
DriverMapSideJoinDCacheTxtFile
jar
MapSideJoinDCacheTextFile.jar
********************************************
*Data structure
********************************************
a) Small dataset (departments_txt)
[DeptNo DeptName] - Tab separated
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service
b) Large dataset (employees_tsv)
[Emp_no DOB FName LName Gender HireDate DeptNo] - Tab separated
10001 1953-09-02 Georgi Facello M 1986-06-26 d005
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
10006 1953-04-20 Anneke Preusig F 1989-06-02 d005
10009 1952-04-19 Sumant Peac F 1985-02-18 d006
...
********************************************
*Expected Results
********************************************
Each record in the employees_tsv file, followed by a tab and the department name (from the department dataset)
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
......
/********************************************
*Mapper
*MapperMapSideJoinDCacheTextFile
********************************************/
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MapperMapSideJoinDCacheTextFile extends
Mapper<LongWritable, Text, Text, Text> {
private static HashMap<String, String> DepartmentMap = new HashMap<String, String>();
private BufferedReader brReader;
private String strDeptName = "";
private Text txtMapOutputKey = new Text("");
private Text txtMapOutputValue = new Text("");
enum MYCOUNTER {
RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
Path[] cacheFilesLocal = DistributedCache.getLocalCacheFiles(context
.getConfiguration());
for (Path eachPath : cacheFilesLocal) {
if (eachPath.getName().toString().trim().equals("departments_txt")) {
context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
loadDepartmentsHashMap(eachPath, context);
}
}
}
private void loadDepartmentsHashMap(Path filePath, Context context)
throws IOException {
String strLineRead = "";
try {
brReader = new BufferedReader(new FileReader(filePath.toString()));
// Read each line, split and load to HashMap
while ((strLineRead = brReader.readLine()) != null) {
String deptFieldArray[] = strLineRead.split("\\t");
DepartmentMap.put(deptFieldArray[0].trim(),
deptFieldArray[1].trim());
}
} catch (FileNotFoundException e) {
e.printStackTrace();
context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
} catch (IOException e) {
context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
e.printStackTrace();
}finally {
if (brReader != null) {
brReader.close();
}
}
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);
if (value.toString().length() > 0) {
String arrEmpAttributes[] = value.toString().split("\\t");
try {
strDeptName = DepartmentMap.get(arrEmpAttributes[6].toString());
} finally {
// HashMap.get() returns null for a missing department; equals(null) on a null reference
// would throw a NullPointerException, so check for null explicitly
strDeptName = ((strDeptName == null || strDeptName.isEmpty()) ? "NOT-FOUND" : strDeptName);
}
txtMapOutputKey.set(arrEmpAttributes[0].toString());
txtMapOutputValue.set(arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t"
+ arrEmpAttributes[3].toString() + "\t"
+ arrEmpAttributes[4].toString() + "\t"
+ arrEmpAttributes[5].toString() + "\t"
+ arrEmpAttributes[6].toString() + "\t" + strDeptName);
}
context.write(txtMapOutputKey, txtMapOutputValue);
strDeptName = "";
}
}
/********************************************
*Driver
*DriverMapSideJoinDCacheTxtFile
********************************************/
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DriverMapSideJoinDCacheTxtFile extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
Configuration conf = job.getConfiguration();
job.setJobName("Map-side join with text lookup file in DCache");
DistributedCache
.addCacheFile(
new URI(
"/user/akhanolk/joinProject/data/departments_sorted/departments_txt"),
conf);
job.setJarByClass(DriverMapSideJoinDCacheTxtFile.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MapperMapSideJoinDCacheTextFile.class);
job.setNumReduceTasks(0);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new DriverMapSideJoinDCacheTxtFile(), args);
System.exit(exitCode);
}
}
********************************************
*HDFS load commands
********************************************
hadoop fs -mkdir joinProject
hadoop fs -mkdir joinProject/data
hadoop fs -put joinProject/data/* joinProject/data/
********************************************
*Job run commands
********************************************
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheTxtFile/jar/MapSideJoinDCacheTextFile.jar DriverMapSideJoinDCacheTxtFile /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideTextFileLookUpDistCache
********************************************
*Program Output
********************************************
hadoop fs -cat joinProject/data/output-MapSideTextFileLookUpDistCache/part* | less
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
10004 1954-05-01 1954-05-01 Chirstian Koblick M 1986-12-01 d004 Production
10005 1955-01-21 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003 Human Resources
10006 1953-04-20 1953-04-20 Anneke Preusig F 1989-06-02 d005 Development
10009 1952-04-19 1952-04-19 Sumant Peac F 1985-02-18 d006 Quality Management
10010 1963-06-01 1963-06-01 Duangkaew Piveteau F 1989-08-24 d006 Quality Management
10012 1960-10-04 1960-10-04 Patricio Bridgland M 1992-12-18 d005 Development
10013 1963-06-07 1963-06-07 Eberhardt Terkki M 1985-10-20 d003 Human Resources
.....

3.0. Variation 

As a variation on the code in section 2.0, this section demonstrates how to add side data that is not in HDFS to the distributed cache from the command line, leveraging GenericOptionsParser (the -files option).

This gist is part of a series of gists related to Map-side joins in Java map-reduce.
In the gist - https://gist.github.com/airawat/6597557, we added the reference data available
in HDFS to the distributed cache from the driver code.
This gist demonstrates adding a local file via command line to distributed cache.
Refer to the gist at https://gist.github.com/airawat/6597557 for-
1. Data samples and structure
2. Expected results
3. Commands to load data to HDFS
The listing below includes:
4. Data and code download location
5. Mapper code
6. Driver code
7. Command to run the program
8. Results
04. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
gitHub:
<<To be added>>
Directory structure
-------------------
joinProject
data
employees_tsv
employees_tsv
departments_sorted
departments_txt
MapSideJoin-DistCacheTxtFileGOP
src
MapperMapSideJoinDCacheTextFileGOP.java
DriverMapSideJoinDCacheTxtFileGOP.java
jar
MapSideJoin-DistCacheTxtFileGOP.jar
/********************************************
*Mapper
*MapperMapSideJoinDCacheTextFileGOP
********************************************/
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MapperMapSideJoinDCacheTextFileGOP extends
Mapper<LongWritable, Text, Text, Text> {
private static HashMap<String, String> DepartmentMap = new HashMap<String, String>();
private BufferedReader brReader;
private String strDeptName = "";
private Text txtMapOutputKey = new Text("");
private Text txtMapOutputValue = new Text("");
enum MYCOUNTER {
RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
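// A file passed on the command line with -files (GenericOptionsParser) is placed in the
// distributed cache and symlinked into the task's working directory under its base name,
// so it can be opened here directly by name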
File lookupFile = new File("departments_txt");
String strLineRead = "";
try {
brReader = new BufferedReader(new FileReader(lookupFile));
// Read each line, split and load to HashMap
while ((strLineRead = brReader.readLine()) != null) {
String deptFieldArray[] = strLineRead.split("\\t");
DepartmentMap.put(deptFieldArray[0].trim(),
deptFieldArray[1].trim());
}
} catch (FileNotFoundException e) {
e.printStackTrace();
context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
} catch (IOException e) {
context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
e.printStackTrace();
}
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);
if (value.toString().length() > 0) {
String arrEmpAttributes[] = value.toString().split("\\t");
try {
strDeptName = DepartmentMap.get(arrEmpAttributes[6].toString());
} finally {
// HashMap.get() returns null for a missing department; equals(null) on a null reference
// would throw a NullPointerException, so check for null explicitly
strDeptName = ((strDeptName == null || strDeptName.isEmpty()) ? "NOT-FOUND" : strDeptName);
}
txtMapOutputKey.set(arrEmpAttributes[0].toString());
txtMapOutputValue.set(arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t"
+ arrEmpAttributes[3].toString() + "\t"
+ arrEmpAttributes[4].toString() + "\t"
+ arrEmpAttributes[5].toString() + "\t"
+ arrEmpAttributes[6].toString() + "\t" + strDeptName);
}
context.write(txtMapOutputKey, txtMapOutputValue);
strDeptName = "";
}
}
/********************************************
*Driver
*DriverMapSideJoinDCacheTxtFileGOP
********************************************/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DriverMapSideJoinDCacheTxtFileGOP extends Configured implements
Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required for DriverMapSideJoinDCacheTxtFileGOP- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
job.setJobName("Map-side join with text lookup file in DCache-GenericOptionsParser");
job.setJarByClass(DriverMapSideJoinDCacheTxtFileGOP.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MapperMapSideJoinDCacheTextFileGOP.class);
job.setNumReduceTasks(0);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new DriverMapSideJoinDCacheTxtFileGOP(), args);
System.exit(exitCode);
}
}
********************************************
*Job run commands
********************************************
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheTxtFileGOP/jar/MapSideJoin-DistCacheTxtFileGOP.jar DriverMapSideJoinDCacheTxtFileGOP -files /home/akhanolk/Blog/joinProject/data/departments_sorted/departments_txt /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideTxtFileLookUpDistCacheGOP
********************************************
*Program Output
********************************************
See - https://gist.github.com/airawat/6597557


   

Friday, September 13, 2013

Sequence File - construct, usage, code samples

This post covers the sequence file format, with links to the Apache documentation, my notes on the topic, and a sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

1.0. What's in this blog?


1.  Introduction to sequence file format
2.  Sample code to create a sequence file (compressed and uncompressed) from a text file in a MapReduce program, and to read a sequence file.

2.0. What's a Sequence File?


2.0.1. About sequence files:
A sequence file is a persistent data structure for binary key-value pairs.

2.0.2. Construct:
Sequence files have sync points included after every few records; these align with record boundaries and allow a reader to resynchronize with the stream.  The sync points support splitting of files for MapReduce operations.  Sequence files support record-level and block-level compression.

Apache documentation: http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/SequenceFile.html

Excerpts from Hadoop the definitive guide...
"A sequence file consists of a header followed by one or more records. The first three bytes of a sequence file are the bytes SEQ, which acts as a magic number, followed by a single byte representing the version number. The header contains other fields, including the names of the key and value classes, compression details, user-defined metadata, and the sync marker. 
Structure of a sequence file with and without record compression - [figure omitted]
The format for record compression is almost identical to no compression, except the value bytes are compressed using the codec defined in the header. Note that keys are not compressed.
Structure of a sequence file with and without block compression - [figure omitted]
Block compression compresses multiple records at once; it is therefore more compact than and should generally be preferred over record compression because it has the opportunity to take advantage of similarities between records. A sync marker is written before the start of every block. The format of a block is a field indicating the number of records in the block, followed by four compressed fields: the key lengths, the keys, the value lengths, and the values."

The uncompressed, record-compressed and block-compressed sequence files, share the same header.  Details are below, from the Apache documentation, on sequence files.

SequenceFile Header
  • version - 3 bytes of magic header SEQ, followed by 1 byte of actual version number (e.g. SEQ4 or SEQ6)
  • keyClassName -key class
  • valueClassName - value class
  • compression - A boolean which specifies if compression is turned on for keys/values in this file.
  • blockCompression - A boolean which specifies if block-compression is turned on for keys/values in this file.
  • compression codec - CompressionCodec class which is used for compression of keys and/or values (if compression is enabled).
  • metadata - SequenceFile.Metadata for this file.
  • sync - A sync marker to denote end of the header.
Uncompressed SequenceFile Format
  • Header
  • Record
  • Record length
  • Key length
  • Key
  • Value
  • A sync-marker every few 100 bytes or so.
Record-Compressed SequenceFile Format
  • Header
  • Record
  • Record length
  • Key length
  • Key
  • Compressed Value
  • A sync-marker every few 100 bytes or so.
Block-Compressed SequenceFile Format
  • Header
  • Record Block
  • Uncompressed number of records in the block
  • Compressed key-lengths block-size
  • Compressed key-lengths block
  • Compressed keys block-size
  • Compressed keys block
  • Compressed value-lengths block-size
  • Compressed value-lengths block
  • Compressed values block-size
  • Compressed values block
  • A sync-marker every block.
2.0.3. Datatypes: 
The keys and values need not be instances of Writable; they just need to be types supported by the configured serialization framework.

2.0.4. Creating sequence files: 
Uncompressed: create an instance of SequenceFile.Writer and call append() to add key-value pairs, in order.  For record- and block-compressed files, refer to the Apache documentation.  When creating compressed files, the compression algorithm used for keys and/or values is specified via the appropriate CompressionCodec.

2.0.5. Reading data in sequence files: 
Create an instance of SequenceFile.Reader, and iterate through the entries using reader.next(key,value).
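To make 2.0.4 and 2.0.5 concrete, here is a minimal standalone sketch (mine, not part of the gists that follow) that writes a few key-value pairs to a sequence file and reads them back using the SequenceFile API directly, outside of MapReduce. The path scratch/departments.seq is made up for illustration; the calls use the Hadoop 1.x-style API used elsewhere in this post.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileScratchPad {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("scratch/departments.seq"); // hypothetical path

        // Write: append key-value pairs, in order
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
                LongWritable.class, Text.class);
        try {
            writer.append(new LongWritable(1), new Text("d001\tMarketing"));
            writer.append(new LongWritable(2), new Text("d002\tFinance"));
        } finally {
            writer.close();
        }

        // Read: iterate with reader.next(key, value) until it returns false
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            LongWritable key = new LongWritable();
            Text value = new Text();
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
        } finally {
            reader.close();
        }
    }
}
For a compressed file, one of the SequenceFile.createWriter() overloads that takes a CompressionType and a CompressionCodec can be used; the MapReduce route, which is what the gists below use, is shown in section 3.0.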

2.0.6. Usage
- Data storage for key-value type data
- Container for other files
- Efficient from a storage perspective (binary format), and from a MapReduce processing perspective (supports compression and splitting)

3.0. Creating a sequence file

This gist demonstrates how to create a sequence file (compressed and uncompressed), from a text file.
Includes:
---------
1. Input data and script download
2. Input data-review
3. Data load commands
4. Mapper code
5. Driver code to create the sequence file out of a text file in HDFS
6. Command to run Java program
7. Results of the program run to create sequence file
8. Java program to read a sequence file, and convert to text file
9. Command to run program from #8, with results
10. Note on creating compressed sequence files
11. Driver code to create a compressed sequence file
12. Command to run program in #11 with results
01. Data and code download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
gitHub:
<<To be added>>
Directory structure
-------------------
formatProject
data
departments_sorted
part-m-00000
formatConverterTextToSequence
src
FormatConverterMapper.java
FormatConverterTextToSequenceDriver.java
FormatConverterSequenceToTextDriver.java
jars
formatConverterTextToSequence.jar
formatConverterSequenceToText.jar
**************************************************
Input text file - departments_sorted/part-m-00000
**************************************************
$ more formatProject/data/departments_sorted/part-m-00000
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service
**********************************************
hdfs load commands
**********************************************
# Load data
$ hadoop fs -put formatProject/
# Remove unnecessary files
$ hadoop fs -rm -R formatProject/formatConverterTextToSequence/
$ hadoop fs -rm -R formatProject/formatConverterTextToMap/
/*********************************************************************************************************
** Mapper
** formatProject/FormatConverterTextToSequence/src/FormatConverterMapper.java
** Reads text file and emits the contents out as key-value pairs
*********************************************************************************************************/
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FormatConverterMapper extends
Mapper<LongWritable, Text, LongWritable, Text> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(key, value);
}
}
/*********************************************************************************************************
** Driver
** formatProject/FormatConverterTextToSequence/src/FormatConverterTextToSequenceDriver.java
*********************************************************************************************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FormatConverterTextToSequenceDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out.printf("Two parameters are required for FormatConverterTextToSequenceDriver-<input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
job.setJarByClass(FormatConverterTextToSequenceDriver.class);
job.setJobName("Create Sequence File, from text file");
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(FormatConverterMapper.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setNumReduceTasks(0);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(), new FormatConverterTextToSequenceDriver(), args);
System.exit(exitCode);
}
}
************************************************
**Command to create sequence file from text file
************************************************
$ hadoop jar formatProject/formatConverterTextToSequence/jars/formatConverterTextToSequence.jar FormatConverterTextToSequenceDriver formatProject/data/departments_sorted/part-m-00000 formatProject/data/departments_sequence
.
.
.
.
$ hadoop fs -ls -R formatProject/data/departments_sequence | awk '{print $8}'
formatProject/data/departments_sequence/_SUCCESS
formatProject/data/departments_sequence/_logs
formatProject/data/departments_sequence/_logs/history
formatProject/data/departments_sequence/_logs/history/cdh-jt01_1376335706356_job_201308121428_0116_conf.xml
formatProject/data/departments_sequence/_logs/history/job_201308121428_0116_1379087496898_akhanolk_Create+Sequence+File%2C+from+text+file
formatProject/data/departments_sequence/part-m-00000
************************************************
**Results
************************************************
$ hadoop fs -text formatProject/data/departments_sequence/part-m-00000
0 d001 Marketing
15 d002 Finance
28 d003 Human Resources
49 d004 Production
65 d005 Development
82 d006 Quality Management
106 d007 Sales
117 d008 Research
131 d009 Customer Service
/*********************************************************************************************************
** Driver
** formatProject/FormatConverterTextToSequence/src/FormatConverterSequenceToTextDriver.java
*********************************************************************************************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FormatConverterSequenceToTextDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters need to be supplied - <input dir> and <output dir>\n");
return -1;
}
Job job = new Job(getConf());
job.setJarByClass(FormatConverterSequenceToTextDriver.class);
job.setJobName("Convert Sequence File and Output as Text");
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapperClass(FormatConverterMapper.class);
job.setNumReduceTasks(0);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(), new FormatConverterSequenceToTextDriver(), args);
System.exit(exitCode);
}
}
**************************************************************
**Command to create text file from sequence file & results
**************************************************************
$ hadoop jar formatProject/formatConverterTextToSequence/jars/formatConverterSequenceToText.jar FormatConverterSequenceToTextDriver formatProject/data/departments_sequence/part-m-00000 formatProject/data/departments_text
$ hadoop fs -ls -R formatProject/data/departments_text | awk '{print $8}'
formatProject/data/departments_text/_SUCCESS
formatProject/data/departments_text/_logs
formatProject/data/departments_text/_logs/history
formatProject/data/departments_text/_logs/history/cdh-jt01_1376335706356_job_201308121428_0118_conf.xml
formatProject/data/departments_text/_logs/history/job_201308121428_0118_1379089420495_akhanolk_Convert+Sequence+File+and+Output+as+Text
formatProject/data/departments_text/part-m-00000
$ hadoop fs -cat formatProject/data/departments_text/part-m-00000
0 d001 Marketing
15 d002 Finance
28 d003 Human Resources
49 d004 Production
65 d005 Development
82 d006 Quality Management
106 d007 Sales
117 d008 Research
131 d009 Customer Service
**************************************************************
** Compression and sequence files
**************************************************************
To create a compressed sequence file (block compression is the recommended option), only minor additions are needed to the driver code [formatProject/FormatConverterTextToSequence/src/FormatConverterTextToSequenceDriver.java].
The sample code here uses SnappyCodec, and block compression.
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job,CompressionType.BLOCK);
The next section includes the code.
/*************************************************************************************************************
** Driver
** formatProject/FormatConverterTextToSequence/src/FormatConverterTextToBlckCompSequenceDriver.java
*************************************************************************************************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FormatConverterTextToBlckCompSequenceDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out.printf("Two parameters are required for FormatConverterTextToBlckCompSequenceDriver-<input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
job.setJarByClass(FormatConverterTextToBlckCompSequenceDriver.class);
job.setJobName("Create block compressed Sequence File, from text file");
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
job.setMapperClass(FormatConverterMapper.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputCompressionType(job,CompressionType.BLOCK);
job.setNumReduceTasks(0);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(), new FormatConverterTextToBlckCompSequenceDriver(), args);
System.exit(exitCode);
}
}
*************************************************************************************
**Command to create block compressed(snappy) sequence file from text file + output
*************************************************************************************
$ hadoop jar formatProject/formatConverterTextToSequence/jars/formatConverterTextToBlkCompSequence.jar FormatConverterTextToBlckCompSequenceDriver formatProject/data/departments_sorted/part-m-00000 formatProject/data/departments_sequence_blckcmp
.
$ hadoop fs -ls -R formatProject/data/departments_sequence_blckcmp | awk '{print $8}'
formatProject/data/departments_sequence_blckcmp/_SUCCESS
formatProject/data/departments_sequence_blckcmp/_logs
formatProject/data/departments_sequence_blckcmp/_logs/history
formatProject/data/departments_sequence_blckcmp/_logs/history/cdh-jt01_1376335706356_job_201308121428_0120_conf.xml
formatProject/data/departments_sequence_blckcmp/_logs/history/job_201308121428_0120_1379091181653_akhanolk_Create+block+compressed+Sequence+File%2C+from+text+f
formatProject/data/departments_sequence_blckcmp/part-m-00000
$ hadoop fs -text formatProject/data/departments_sequence_blckcmp/part-m-00000
13/09/13 11:55:38 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
13/09/13 11:55:38 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
13/09/13 11:55:38 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
13/09/13 11:55:38 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
0 d001 Marketing
15 d002 Finance
28 d003 Human Resources
49 d004 Production
65 d005 Development
82 d006 Quality Management
106 d007 Sales
117 d008 Research
131 d009 Customer Service

4.0. Reading a sequence file

Covered already in the gist under section 3.

5.0. Any thoughts/comments

Any constructive criticism and/or additions/insights are much appreciated.

Cheers!!





Thursday, September 12, 2013

Map File - construct, usage, code samples

This post covers the map file format, with links to the Apache documentation, my notes on the topic, and a sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

1.0. What's in this blog?

1.  Introduction to map file
2.  Sample code to convert a text file to a map file
3.  Sample code to read a map file

2.0. What's a Map File?

2.0.1. Definition:
From Hadoop the Definitive Guide..
A MapFile is a sorted SequenceFile with an index to permit lookups by key. MapFile can be thought of as a persistent form of java.util.Map (although it doesn’t implement this interface), which is able to grow beyond the size of a Map that is kept in memory.
Apache documentation:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/MapFile.html 

2.0.2. Datatypes: 
The keys must be instances of WritableComparable, and the values, Writable.

2.0.3. Creating map files: 
Create an instance of MapFile.Writer and call append() to add key-value pairs, in sorted key order (append() rejects out-of-order keys).

2.0.4. Looking up data in map files: 
Create an instance of MapFile.Reader, and call get(key,value).
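Putting 2.0.3 and 2.0.4 together, here is a minimal sketch (not the program from the gist in section 3; the class name and the tmp/departments_map_demo path are illustrative, and it assumes the older MapFile constructors that take a FileSystem, as used later in this post):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileQuickstart {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    String mapFileDir = "tmp/departments_map_demo"; // illustrative output directory

    // Write: keys must be appended in sorted order
    MapFile.Writer writer = new MapFile.Writer(conf, fs, mapFileDir, Text.class, Text.class);
    try {
      writer.append(new Text("d001"), new Text("Marketing"));
      writer.append(new Text("d002"), new Text("Finance"));
    } finally {
      writer.close();
    }

    // Read: get() fills 'value' and returns it, or returns null if the key is absent
    MapFile.Reader reader = new MapFile.Reader(fs, mapFileDir, conf);
    try {
      Text value = new Text();
      if (reader.get(new Text("d002"), value) != null) {
        System.out.println("d002 -> " + value); // prints: d002 -> Finance
      }
    } finally {
      reader.close();
    }
  }
}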

2.0.5. Construct
The map file is actually a directory.  Within it are two files - a "data" file and an "index" file.
The data file is a sequence file containing the keys and their associated values.
The index file is smaller; it holds key-value pairs where the key is a key from the data file and the value is that key's starting byte offset in the data file.  The index contains only a fraction of the keys, controlled by the index interval (MapFile.Writer.setIndexInterval(), default 128).

2.0.5.1. Directory structure:
$ hadoop fs -ls formatProject/data/departments_map | awk '{print $8}'
formatProject/data/departments_map/data
formatProject/data/departments_map/index

2.0.5.2. Content of the file 'data':
$ hadoop fs -text formatProject/data/departments_map/data
d001  Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service

2.0.5.3. Content of the file 'index':
$ hadoop fs -text formatProject/data/departments_map/index
d001 121
d002 152
d003 181
d004 218
d005 250
d006 283
d007 323
d008 350
d009 380

2.0.6. Behind the scenes of a look up
The index file is read into memory and binary-searched for the largest key less than or equal to the key being looked up; the reader then seeks to that entry's offset in the data file and scans forward until it finds the key (or passes the position where it would be).  If found, the value associated with the key is returned; otherwise get() returns null.

If the index itself is too large to hold in memory, the reader can be configured to keep only a subset of the index entries (the io.map.index.skip property), trading memory for slightly longer scans.
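A hedged illustration of that knob - in the Hadoop 1.x/CDH4 line the property is io.map.index.skip, and the class name below is illustrative. A value of n tells the reader to skip n index entries between each one it keeps, so roughly 1/(n+1) of the index is held in memory, at the cost of a slightly longer scan in the data file per lookup.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileLowMemoryLookup {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Keep only every 8th index entry in memory (skip 7 between retained entries)
    conf.setInt("io.map.index.skip", 7);
    FileSystem fs = FileSystem.get(conf);

    // args[0] = map file directory, args[1] = key to look up
    MapFile.Reader reader = new MapFile.Reader(fs, args[0], conf);
    try {
      Text value = new Text();
      if (reader.get(new Text(args[1]), value) != null) {
        System.out.println(args[1] + " -> " + value);
      } else {
        System.out.println(args[1] + " not found");
      }
    } finally {
      reader.close();
    }
  }
}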

2.0.7. Usage
Fast lookups - in joins, among others.
Can also be used as a container for small files, with the filename as the key.

3.0. Creating a map file

This gist demonstrates how to create a map file from a text file.
Includes:
---------
1. Input data and script download
2. Input data-review
3. Data load commands
4. Java program to create the map file out of a text file in HDFS
5. Command to run Java program
6. Results of the program run to create map file
7. Java program to lookup data in map file
8. Command to run program to do a lookup
01. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
gitHub:
<<To be added>>
Directory structure
-------------------
formatProject
data
departments_sorted
part-m-00000
formatConverterTextToMap
src
FormatConverterTextToMap.java
MapFileLookup.java
jars
formatConverterTextToMap.jar
**************************************************
Input text file - departments_sorted/part-m-00000
**************************************************
$ more formatProject/data/departments_sorted/part-m-00000
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service
**********************************************
hdfs load commands
**********************************************
# Load data
$ hadoop fs -put formatProject/
# Remove unnecessary files
$ hadoop fs -rm -R formatProject/formatConverterTextToMap/
/******************************************
* FormatConverterTextToMap.java
* ****************************************/
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
public class FormatConverterTextToMap {
@SuppressWarnings("deprecation")
public static void main(String[] args) throws IOException{
Configuration conf = new Configuration();
FileSystem fs;
try {
fs = FileSystem.get(conf);
Path inputFile = new Path(args[0]);
Path outputFile = new Path(args[1]);
Text txtKey = new Text();
Text txtValue = new Text();
String strLineInInputFile = "";
String lstKeyValuePair[] = null;
MapFile.Writer writer = null;
FSDataInputStream inputStream = fs.open(inputFile);
try {
writer = new MapFile.Writer(conf, fs, outputFile.toString(),
txtKey.getClass(), txtValue.getClass());
writer.setIndexInterval(1);//Need this as the default is 128, and my data is just 9 records
while (inputStream.available() > 0) {
strLineInInputFile = inputStream.readLine();
lstKeyValuePair = strLineInInputFile.split("\\t");
txtKey.set(lstKeyValuePair[0]);
txtValue.set(lstKeyValuePair[1]);
writer.append(txtKey, txtValue);
}
} finally {
// Close streams even on failure
IOUtils.closeStream(writer);
IOUtils.closeStream(inputStream);
}
System.out.println("Map file created successfully!!");
} catch (IOException e) {
e.printStackTrace();
}
}
}
******************************************************************
**Command to run program that creates a map file from text file
******************************************************************
$ hadoop jar formatProject/formatConverterTextToMap/jars/formatConverterTextToMap.jar FormatConverterTextToMap formatProject/data/departments_sorted/part-m-00000 formatProject/data/departments_map
13/09/12 22:05:21 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
13/09/12 22:05:21 INFO compress.CodecPool: Got brand-new compressor [.deflate]
13/09/12 22:05:21 INFO compress.CodecPool: Got brand-new compressor [.deflate]
Map file created successfully!!
************************************************
**Results
************************************************
$ hadoop fs -ls formatProject/data/departments_map | awk '{print $8}'
formatProject/data/departments_map/data
formatProject/data/departments_map/index
$ hadoop fs -text formatProject/data/departments_map/data
13/09/12 22:44:34 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
13/09/12 22:44:34 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service
$ hadoop fs -text formatProject/data/departments_map/index
13/09/12 22:44:56 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
13/09/12 22:44:56 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
13/09/12 22:44:56 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
13/09/12 22:44:56 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
13/09/12 22:44:56 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
d001 121
d002 152
d003 181
d004 218
d005 250
d006 283
d007 323
d008 350
d009 380
/****************************************
* MapFileLookup.java
* **************************************/
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
public class MapFileLookup {
/*
This program looks up a map file for a given key and prints the associated value
Parameters:
param 1: Path to map file
param 2: Key for which we want to get the value from the map file
Output: Prints the key and its value to standard output
Sample call: hadoop jar MapFileLookup.jar MapFileLookup <map-file-directory> <key>
*/
@SuppressWarnings("deprecation")
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = null;
Text txtKey = new Text(args[1]);
Text txtValue = new Text();
MapFile.Reader reader = null;
try {
fs = FileSystem.get(conf);
try {
reader = new MapFile.Reader(fs, args[0].toString(), conf);
reader.get(txtKey, txtValue);
} catch (IOException e) {
e.printStackTrace();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if(reader != null)
reader.close();
}
System.out.println("The key is " + txtKey.toString()
+ " and the value is " + txtValue.toString());
}
}
**************************************************************************
**Commands to run program to look up a key in a map file from text file
**************************************************************************
$ hadoop jar formatProject/formatConverterTextToMap/jars/MapFileLookup.jar MapFileLookup formatProject/data/departments_map "d009"
13/09/12 22:53:08 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
13/09/12 22:53:08 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
13/09/12 22:53:08 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
13/09/12 22:53:08 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
13/09/12 22:53:08 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
13/09/12 22:53:08 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
The key is d009 and the value is Customer Service

4.0. Looking up a key in a map file

Covered already in the gist under section 3.
The plan is to use the map file in a map-side join in a subsequent blog.

5.0. Any thoughts/comments

Any constructive criticism and/or additions/insights are much appreciated.

Cheers!!

Wednesday, September 11, 2013

Apache Oozie - Part 12: Oozie Shell Action + Passing output from one Oozie action to another

I had read about the Oozie capability to pass output from one action to another and had forgotten about it; sure enough, it came up at an interview.  Here's some sample code...


1.0. What's covered in the blog?

1. Documentation on the Oozie shell action
2. A sample Oozie workflow that includes a shell script action that echoes a count of the number of lines in a file glob, and an email action that captures the output of the shell action and emails it.

Version:
Oozie 3.3.0; Pig 0.10.0

Related blogs:
Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11a: Oozie Java API for interfacing with oozie workflows
Blog 12: Oozie shell action + passing output from one action to another


2.0. Documentation on the Oozie Shell Action


Apache documentation is available at - http://oozie.apache.org/docs/3.3.0/DG_ShellActionExtension.html


3.0. Sample program


This gist includes components of an Oozie workflow - scripts/code, sample data
and commands; Oozie actions covered: shell action, email action
Action 1: The shell action executes a shell script that does a line count for files in a
glob provided, and writes the line count to standard output
Action 2: The email action emails the output of action 1
Pictorial overview of job:
--------------------------
<<To be added>>
Includes:
---------
Data and script download: 01-DataAndScriptDownload
Data load commands: 02-HdfsLoadCommands
Shell Script: 03-ShellScript
Oozie job properties file: 04-OozieJobProperties
Oozie workflow file: 05-OozieWorkflowXML
Oozie SMTP Configuration: 06-OozieSMTPConfig
Oozie commands: 07-OozieJobExecutionCommands
Output email: 08-OutputOfProgram
Oozie web console - screenshots: 09-OozieWebConsoleScreenshots
01. Data and script download
-----------------------------
Github:
https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
oozieProject
data
airawat-syslog
<<Node-Name>>
<<Year>>
<<Month>>
messages
workflowShellAction
workflow.xml
job.properties
lineCount.sh
02-Hdfs load commands
----------------------
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/* oozieProject/
#*************************************************
#lineCount.sh
#*************************************************
#!/bin/bash -e
echo "NumberOfLines=`hadoop fs -cat $1 | wc -l`"
#*************************************************
# job.properties
#*************************************************
nameNode=hdfs://cdh-nn01.chuntikhadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
appPath=${oozieProjectRoot}/workflowShellAction
oozie.wf.application.path=${appPath}
inputDir=${oozieProjectRoot}/data/*/*/*/*/*
lineCountShScriptPath=${appPath}/lineCount.sh
lineCountShellScript=lineCount.sh
emailToAddress=akhanolk@cdh-dev01
<!--******************************************-->
<!--workflow.xml -->
<!--******************************************-->
<workflow-app name="WorkFlowForShellActionWithCaptureOutput" xmlns="uri:oozie:workflow:0.1">
<start to="shellAction"/>
<action name="shellAction">
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<exec>${lineCountShellScript}</exec>
<argument>${inputDir}</argument>
<file>${lineCountShScriptPath}#${lineCountShellScript}</file>
<capture-output/>
</shell>
<ok to="sendEmail"/>
<error to="killAction"/>
</action>
<action name="sendEmail">
<email xmlns="uri:oozie:email-action:0.1">
<to>${emailToAddress}</to>
<subject>Output of workflow ${wf:id()}</subject>
<body>Results from line count: ${wf:actionData('shellAction')['NumberOfLines']}</body>
</email>
<ok to="end"/>
<error to="end"/>
</action>
<kill name="killAction">
<message>"Killed job due to error"</message>
</kill>
<end name="end"/>
</workflow-app>
Oozie SMTP configuration
------------------------
Add the following to oozie-site.xml, and restart Oozie.
Replace the values with ones specific to your environment.
<!-- SMTP params-->
<property>
<name>oozie.email.smtp.host</name>
<value>cdh-dev01</value>
</property>
<property>
<name>oozie.email.smtp.port</name>
<value>25</value>
</property>
<property>
<name>oozie.email.from.address</name>
<value>oozie@cdh-dev01</value>
</property>
<property>
<name>oozie.email.smtp.auth</name>
<value>false</value>
</property>
<property>
<name>oozie.email.smtp.username</name>
<value></value>
</property>
<property>
<name>oozie.email.smtp.password</name>
<value></value>
</property>
06. Oozie commands
-------------------
Note: Replace the Oozie server and port with your cluster-specific values.
1) Submit job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowShellAction/job.properties -submit
job: 0000012-130712212133144-oozie-oozi-W
2) Run job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -start 0000014-130712212133144-oozie-oozi-W
3) Check the status:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info 0000014-130712212133144-oozie-oozi-W
4) Suspend workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -suspend 0000014-130712212133144-oozie-oozi-W
5) Resume workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -resume 0000014-130712212133144-oozie-oozi-W
6) Re-run workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowShellAction/job.properties -rerun 0000014-130712212133144-oozie-oozi-W
7) Should you need to kill the job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill 0000014-130712212133144-oozie-oozi-W
8) View server logs:
$ oozie job -oozie http://cdh-dev01:11000/oozie -logs 0000014-130712212133144-oozie-oozi-W
Logs are available at:
/var/log/oozie on the Oozie server.
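The same submission can also be driven from Java through the Oozie client API (the subject of Blog 11a). Below is a minimal, hedged sketch; it assumes the oozie-client library (org.apache.oozie.client.OozieClient) on the classpath, and the class name and the /user/akhanolk path are illustrative rather than taken from the gist.

import java.util.Properties;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;

public class SubmitShellActionWorkflow {
  public static void main(String[] args) throws Exception {
    // Oozie server URL - same as the -oozie argument used on the command line above
    OozieClient client = new OozieClient("http://cdh-dev01:11000/oozie");

    // Programmatic equivalent of job.properties; only the core entries are shown -
    // the remaining entries (inputDir, lineCountShScriptPath, lineCountShellScript,
    // emailToAddress) would be set the same way before calling run()
    Properties conf = client.createConfiguration();
    String nameNode = "hdfs://cdh-nn01.chuntikhadoop.com:8020";
    conf.setProperty("nameNode", nameNode);
    conf.setProperty("jobTracker", "cdh-jt01:8021");
    conf.setProperty("queueName", "default");
    conf.setProperty("oozie.use.system.libpath", "true");
    // Adjust the user directory to your environment
    conf.setProperty(OozieClient.APP_PATH,
        nameNode + "/user/akhanolk/oozieProject/workflowShellAction");

    // run() submits and starts the workflow; poll until it is no longer RUNNING
    String jobId = client.run(conf);
    while (client.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
      Thread.sleep(10 * 1000);
    }
    System.out.println("Workflow " + jobId + " completed with status: "
        + client.getJobInfo(jobId).getStatus());
  }
}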
########################
#Program output
########################
From akhanolk@cdh-dev01.localdomain Thu Sep 12 00:51:00 2013
Return-Path: <akhanolk@cdh-dev01.localdomain>
X-Original-To: akhanolk@cdh-dev01
Delivered-To: akhanolk@cdh-dev01.localdomain
From: akhanolk@cdh-dev01.localdomain
To: akhanolk@cdh-dev01.localdomain
Subject: Output of workflow 0000009-130911235633916-oozie-oozi-W
Content-Type: text/plain; charset=us-ascii
Date: Thu, 12 Sep 2013 00:51:00 -0500 (CDT)
Status: R
Results from line count: 5207

4.0. Oozie web console screenshots

(Screenshots not reproduced here.)

Wednesday, August 28, 2013

Apache Hive: The .hiverc file

What is .hiverc file?

It is a file that is executed when you launch the hive shell - making it an ideal place for adding any hive configuration/customization you want set, on start of the hive shell. This could be:
- Setting column headers to be visible in query results
- Making the current database name part of the hive prompt
- Adding any jars or files
- Registering UDFs

.hiverc file location

The file is loaded from the hive conf directory.
I have the CDH4.2 distribution and the location is: /etc/hive/conf.cloudera.hive1
If the file does not exist, you can create it.
It needs to be deployed to every node from where you might launch the Hive shell.
[Note: I had to create the file; the distribution did not come with it.]

Sample .hiverc

add jar /home/airawat/hadoop-lib/hive-contrib-0.10.0-cdh4.2.0.jar;
set hive.exec.mode.local.auto=true;
set hive.cli.print.header=true;
set hive.cli.print.current.db=true;
set hive.auto.convert.join=true;
set hive.mapjoin.smalltable.filesize=30000000;








Tuesday, July 16, 2013

Apache Oozie - Part 8: Subworkflow


1.0. What's covered in the blog?

1) Apache documentation on sub-workflows
2) A sample program that includes components of an Oozie workflow application with a Java main action and a sub-workflow containing a sqoop action.  Scripts/code, sample dataset and commands are included.  Oozie actions covered: Java main action, sqoop action (MySQL database).

Versions:
Oozie 3.3.0, Sqoop (1.4.2) with Mysql (5.1.69)

Related blogs:
Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11a: Oozie Java API for interfacing with oozie workflows
Blog 12: Oozie workflow - shell action +passing output from one action to another


2.0. Apache documentation on sub-workflows


The sub-workflow action runs a child workflow job; the child workflow job can be in the same Oozie system or in another Oozie system.  The parent workflow job will wait until the child workflow job has completed.

Syntax:

(The XML syntax block from the Apache documentation is not reproduced here; see the documentation link below, and the dataExporterSubWorkflow action in the parent workflow.xml later in this post for a concrete example.)

The child workflow job runs in the same Oozie system instance where the parent workflow job is running.
The app-path element specifies the path to the workflow application of the child workflow job.
The propagate-configuration flag, if present, indicates that the workflow job configuration should be propagated to the child workflow.

The configuration section can be used to specify the job properties that are required to run the child workflow job.  The configuration of the sub-workflow action can be parameterized (templatized) using EL expressions.

Link to Apache documentation:
http://oozie.apache.org/docs/3.3.0/WorkflowFunctionalSpec.html#a3.2.6_Sub-workflow_Action

Note:
For a typical on-demand workflow, you have two core components - job.properties and workflow.xml.  For a sub-workflow, you need yet another workflow.xml that defines the activities to occur in the sub-workflow; the parent workflow then references the sub-workflow.  To keep it neat, it is best to have a sub-directory to hold the sub-workflow's core components.  A single job.properties is sufficient for both.

3.0. Sample workflow application

Highlights:
The workflow has two actions - one is a Java main action and the other is a sub-workflow action.

The Java main action parses log files on HDFS and generates a report.
The sub-workflow action executes after the Java main action succeeds, and exports the report from HDFS to a MySQL database.


Pictorial overview:

(Diagram not reproduced here; see the blog link under the application details below.)

Components of such a workflow application:

(Diagram not reproduced here.)

Application details:

This gist includes components of an Oozie workflow application - scripts/code, sample data
and commands; Oozie actions covered: sub-workflow, java main action,
sqoop action (to mysql); Oozie controls covered: decision;
Pictorial overview:
--------------------
http://hadooped.blogspot.com/2013/07/apache-oozie-part-8-subworkflow.html
Usecase:
--------
Parse Syslog generated log files to generate reports; Export reports to RDBMS;
Includes:
---------
Sample data definition and structure: 01-SampleDataAndStructure
Data and script download: 02-DataAndScriptDownload
Data load commands: 03-HdfsLoadCommands
Mysql database setup: 04-mysqlDBSetup
Sqoop task -standalone tryout: 05-SqoopStandAloneTryout
App job properties file: 06-JobProperties
Workflow definition - Parent: 07-WorkflowXMLMain
Independent test of LogParser jar: 08-LogParserStandaloneTestHowTo
Workflow definition - DataExporter: 09-SubWorkflowXMLDataExporter
Oozie commands: 10-OozieJobExecutionCommands
Output of LogParser: 11a-OutputLogParser
Output in mysql: 11b-OutputDataExporter
Oozie web console - screenshots: 12-OozieWebConsoleScreenshots
Java LogParser code: 13-JavaCodeHyperlink
01a. Sample data (log files)
----------------------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
01b. Structure
---------------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
02a. Data and code download
----------------------------
Github:
https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
oozieProject
data
airawat-syslog
<<Node-Name>>
<<Year>>
<<Month>>
messages
workflowWithSubworkflow
job.properties
workflow.xml
lib
LogEventCount.jar
dataExporterSubWorkflowApp
workflow.xml
03-Hdfs load commands
----------------------
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/* oozieProject/
*********************************
Mysql database setup tasks
*********************************
a) Create database:
mysql>
create database airawat;
b) Switch to database created:
mysql>
use airawat;
c) Create destination table for sqoop export from hdfs:
mysql>
CREATE TABLE IF NOT EXISTS Logged_Process_Count_By_Year(
year_and_process varchar(100),
occurrence INTEGER);
d) Ensure your sqoop user has access to database created:
mysql>
grant all on airawat.* to myUser@'myMachine';
Tryout the sqoop task- outside of workflow
-------------------------------------------
Use the dataset from my gist-
https://gist.github.com/airawat/5970026
*********************************
Sqoop command
*********************************
Pre-requisites:
1. Dataset to be exported should exist on HDFS
2. mySql table that is the destination for the export should exist
Command:
--Run on node that acts as sqoop client;
$ sqoop export \
--connect jdbc:mysql://cdh-dev01/airawat \
--username devUser \
--password myPwd \
--table Logged_Process_Count_By_Year \
--direct \
--export-dir "oozieProject/datasetGeneratorApp/outputDir" \
--fields-terminated-by "\t"
*********************************
Results in mysql
*********************************
mysql> select * from Logged_Process_Count_By_Year order by occurrence desc;
+----------------------------+------------+
| year_and_process | occurrence |
+----------------------------+------------+
| 2013-ntpd_initres | 4133 |
| 2013-kernel | 810 |
| 2013-init | 166 |
| 2013-pulseaudio | 18 |
| 2013-spice-vdagent | 15 |
| 2013-gnome-session | 11 |
| 2013-sudo | 8 |
| 2013-polkit-agent-helper-1 | 8 |
| 2013-console-kit-daemon | 7 |
| 2013-NetworkManager | 7 |
| 2013-udevd | 6 |
| 2013-sshd | 6 |
| 2013-nm-dispatcher.action | 4 |
| 2013-login | 2 |
+----------------------------+------------+
14 rows in set (0.00 sec)
#*************************************************
# job.properties
#*************************************************
nameNode=hdfs://cdh-nn01.chuntikhadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
oozieLibPath=${nameNode}/user/oozie/share/lib
oozie.libpath=${oozieLibPath}
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
# Paths
#------
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
appPath=${oozieProjectRoot}/workflowWithSubworkflow
oozie.wf.application.path=${appPath}
# For logParserAction (java main action)
#---------------------------------------
inputDir=${oozieProjectRoot}/data/*/*/*/*/*
outputDir=${appPath}/output
inputDirRecordCount=`cat ${inputDir} | wc -l`
minRequiredRecordCount=1
# For dataExporterSubWorkflow (having sqoop action)
#---------------------------------------------------
subWorkflowCodeDir=${appPath}/dataExporterSubWorkflowApp
mysqlServer=cdh-dev01
mysqlServerDB=airawat
mysqlServerDBUID=devUser
mysqlServerDBPwd=myPwd
triggerDatasetDir=${outputDir}
triggerDataFiles=${triggerDatasetDir}/part*
sqoopInputRecordCount=`cat ${triggerDataFiles} | wc -l`
<!------------------------------------------------------------->
<!-----Workflow definition file - workflow.xml ---------------->
<!------------------------------------------------------------->
<workflow-app name="SubWorkflow-Parent" xmlns="uri:oozie:workflow:0.1">
<start to="inputAvailableCheckDecision"/>
<decision name="inputAvailableCheckDecision">
<switch>
<case to="logParserAction">
${inputDirRecordCount gt minRequiredRecordCount}
</case>
<default to="end"/>
</switch>
</decision>
<action name="logParserAction">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${outputDir}"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>Airawat.Oozie.Samples.LogEventCount</main-class>
<arg>${inputDir}</arg>
<arg>${outputDir}</arg>
</java>
<ok to="dataExporterSubWorkflow"/>
<error to="killJob"/>
</action>
<action name='dataExporterSubWorkflow'>
<sub-workflow>
<app-path>${subWorkflowCodeDir}</app-path>
<propagate-configuration/>
</sub-workflow>
<ok to="end"/>
<error to="killJob" />
</action>
<kill name="killJob">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<end name="end" />
</workflow-app>
#*******************************************
# LogParser program - standalone test
#*******************************************
Commands to test the java program
a) Command to run the program
$ hadoop jar oozieProject/workflowWithSubworkflow/lib/LogEventCount.jar Airawat.Oozie.Samples.LogEventCount "oozieProject/data/*/*/*/*/*" "oozieProject/workflowWithSubworkflow/myCLIOutput"
b) Command to view results
$ hadoop fs -cat oozieProject/workflowWithSubworkflow/myCLIOutput/part* | sort
c) Results
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-NetworkManager 7
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
<!------------------------------------------------------------->
<!--Sub-Workflow definition file - workflow.xml --------------->
<!------------------------------------------------------------->
<workflow-app name="SubworkflowApp-SubWf-DataExporter" xmlns="uri:oozie:workflow:0.1">
<start to="inputAvailableCheckDecision"/>
<decision name="inputAvailableCheckDecision">
<switch>
<case to="sqoopAction">
${sqoopInputRecordCount gt minRequiredRecordCount}
</case>
<default to="end"/>
</switch>
</decision>
<action name="sqoopAction">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.libpath</name>
<value>${oozieLibPath}</value>
</property>
</configuration>
<command>export --connect jdbc:mysql://${mysqlServer}/${mysqlServerDB} --username ${mysqlServerDBUID} --password ${mysqlServerDBPwd} --table Logged_Process_Count_By_Year --direct --export-dir ${triggerDatasetDir} --fields-terminated-by "\t"</command>
</sqoop>
<ok to="end"/>
<error to="killJob"/>
</action>
<kill name="killJob">
<message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
</kill>
<end name="end" />
</workflow-app>
****************************************
10. Oozie job commands
****************************************
Note: Replace the Oozie server and port with your cluster-specific values.
1) Submit job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowWithSubworkflow/job.properties -submit
job: 0000012-130712212133144-oozie-oozi-W
2) Run job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -start 0000014-130712212133144-oozie-oozi-W
3) Check the status:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info 0000014-130712212133144-oozie-oozi-W
4) Suspend workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -suspend 0000014-130712212133144-oozie-oozi-W
5) Resume workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -resume 0000014-130712212133144-oozie-oozi-W
6) Re-run workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowWithSubworkflow/job.properties -rerun 0000014-130712212133144-oozie-oozi-W
7) Should you need to kill the job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill 0000014-130712212133144-oozie-oozi-W
8) View server logs:
$ oozie job -oozie http://cdh-dev01:11000/oozie -logs 0000014-130712212133144-oozie-oozi-W
Logs are available at:
/var/log/oozie on the Oozie server.
****************************************
Output - Log Parser program
****************************************
$ hadoop fs -cat oozieProject/workflowWithSubworkflow/output/part*
2013-NetworkManager 7
2013-console-kit-daemon 7
2013-gnome-session 11
2013-init 166
2013-kernel 810
2013-login 2
2013-NetworkManager 7
2013-nm-dispatcher.action 4
2013-ntpd_initres 4133
2013-polkit-agent-helper-1 8
2013-pulseaudio 18
2013-spice-vdagent 15
2013-sshd 6
2013-sudo 8
2013-udevd 6
****************************************
Output - data export from hdfs to mysql
****************************************
mysql> select * from Logged_Process_Count_By_Year order by occurrence desc;
+----------------------------+------------+
| year_and_process | occurrence |
+----------------------------+------------+
| 2013-ntpd_initres | 4133 |
| 2013-kernel | 810 |
| 2013-init | 166 |
| 2013-pulseaudio | 18 |
| 2013-spice-vdagent | 15 |
| 2013-gnome-session | 11 |
| 2013-sudo | 8 |
| 2013-polkit-agent-helper-1 | 8 |
| 2013-console-kit-daemon | 7 |
| 2013-NetworkManager | 7 |
| 2013-udevd | 6 |
| 2013-sshd | 6 |
| 2013-nm-dispatcher.action | 4 |
| 2013-login | 2 |
+----------------------------+------------+
14 rows in set (0.00 sec)
Screenshots from Oozie web console:
-----------------------------------
Available at:
http://hadooped.blogspot.com/2013/07/apache-oozie-part-8-subworkflow.html



Oozie web console - screenshots: