Tuesday, September 24, 2013

Reduce-side joins in Java map-reduce

1.0. About reduce-side joins

Joins of datasets done in the reduce phase are called reduce-side joins.  Reduce-side joins are easier to implement than map-side joins, which require the input data to be sorted and partitioned the same way.  They are less efficient, however, because the datasets have to go through the sort and shuffle phase.

What's involved:
1.  The map output key for each dataset being joined has to be the join key - so that matching records reach the same reducer.
2.  Each dataset has to be tagged with its identity in the mapper, to help differentiate between the datasets in the reducer so they can be processed accordingly.
3.  In each reducer, the values from both datasets, for the keys assigned to that reducer, are available to be processed as required.
4.  A secondary sort needs to be done to ensure the ordering of the values sent to the reducer.
5.  If the input files are of different formats, we need separate mappers, and we need to use the MultipleInputs class in the driver to add each input path and associate it with its specific mapper, as shown in the sketch after this list.
[MultipleInputs.addInputPath( job, (input path n), (inputformat class), (mapper class n));]
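Below is a minimal driver sketch for step 5, assuming two hypothetical mapper classes (EmployeeMapper and SalaryMapper) for two differently formatted text inputs; the sample program in this post does not need it, since both datasets share the same format and a single mapper.

// Hypothetical sketch - not part of the sample program below
// Requires: org.apache.hadoop.fs.Path,
//           org.apache.hadoop.mapreduce.lib.input.MultipleInputs,
//           org.apache.hadoop.mapreduce.lib.input.TextInputFormat
MultipleInputs.addInputPath(job, new Path(args[0]),
        TextInputFormat.class, EmployeeMapper.class); // dataset 1
MultipleInputs.addInputPath(job, new Path(args[1]),
        TextInputFormat.class, SalaryMapper.class);   // dataset 2
// With MultipleInputs, FileInputFormat.setInputPaths is not called separately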

Note:  The join between the datasets (employee and current salary - cardinality of 1..1) in the sample program below has also been demonstrated in my blog on map-side joins of large datasets.  I have used the same datasets here, as the purpose of this post is to demonstrate the concept.  Whenever possible, reduce-side joins should be avoided.

[Update - 10/15/2013]
I have added a Pig equivalent in the final section.

2.0. Sample datasets used in this gist

The datasets used are employees and salaries.  For the salary data, there are two files: one with the current salary (1..1) and one with historical salary data (1..many). There is also department data, a small reference dataset, which we will add to the distributed cache and look up in the reducer.


3.0. Implementing a reduce-side join 

The sample code is common for a 1..1 as well as 1..many join for the sample datasets.
The mapper is common for both datasets, as the format is the same.

3.0.1. Components/steps/tasks:

1.  Map output key
The key will be the empNo, as it is the join key for the employee and salary datasets
[Implementation: in the mapper]

2.  Tagging the data with the dataset identity
Add an attribute called srcIndex to tag the identity of the data (1=employee, 2=salary, 3=salary history)
[Implementation: in the mapper]

3.  Discarding unwanted attributes
[Implementation: in the mapper]

4. Composite key
Make the map output key a composite of empNo and srcIndex
[Implementation: create custom writable]

5.  Partitioner
Partition the data on natural key of empNo
[Implementation: create custom partitioner class]

6.  Sorting
Sort the data on empNo first, and then source index
[Implementation: create custom sorting comparator class]

7.  Grouping
Group the data based on natural key
[Implementation: create custom grouping comparator class]

8. Joining
Iterate through the values for a key and complete the join of the employee and salary data; perform a lookup of the department data to include the department name in the output
[Implementation: in the reducer]

3.0.2a. Data pipeline for cardinality of 1..1 between employee and salary data:

3.0.2b. Data pipeline for cardinality of 1..many between employee and salary data:

3.0.3. The Composite key

The composite key is a combination of the join key (empNo) and the source index (1=employee file, 2=current salary file, 3=historical salary file).
//********************************************************************************
//Class: CompositeKeyWritableRSJ
//Purpose: Custom Writable that serves as composite key
// with attributes joinKey and sourceIndex
//Author: Anagha Khanolkar
//*********************************************************************************
package khanolkar.mapreduce.join.samples.reducesidejoin;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
public class CompositeKeyWritableRSJ implements Writable,
WritableComparable<CompositeKeyWritableRSJ> {
// Data members
private String joinKey;// EmployeeID
private int sourceIndex;// 1=Employee data; 2=Salary (current) data; 3=Salary historical data
public CompositeKeyWritableRSJ() {
}
public CompositeKeyWritableRSJ(String joinKey, int sourceIndex) {
this.joinKey = joinKey;
this.sourceIndex = sourceIndex;
}
@Override
public String toString() {
return (new StringBuilder().append(joinKey).append("\t")
.append(sourceIndex)).toString();
}
public void readFields(DataInput dataInput) throws IOException {
joinKey = WritableUtils.readString(dataInput);
sourceIndex = WritableUtils.readVInt(dataInput);
}
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeString(dataOutput, joinKey);
WritableUtils.writeVInt(dataOutput, sourceIndex);
}
public int compareTo(CompositeKeyWritableRSJ objKeyPair) {
int result = joinKey.compareTo(objKeyPair.joinKey);
if (0 == result) {
result = Double.compare(sourceIndex, objKeyPair.sourceIndex);
}
return result;
}
public String getjoinKey() {
return joinKey;
}
public void setjoinKey(String joinKey) {
this.joinKey = joinKey;
}
public int getsourceIndex() {
return sourceIndex;
}
public void setsourceIndex(int sourceIndex) {
this.sourceIndex = sourceIndex;
}
}


3.0.4. The mapper

In the setup method of the mapper:
1.  Get the filename from the input split and cross-reference it against the configuration (set in the driver) to derive the source index.  [Driver code: add configuration entries - key=employee filename, value=1; key=current salary filename, value=2; key=historical salary filename, value=3]
2.  Build a list of attributes we want to emit as map output for each data entity

The setup method is called only once, at the beginning of a map task, so it is the logical place to identify the source index.

In the map method of the mapper:
3. Build the map output based on attributes required, as specified in the list from #2

Note:  For salary data, we are including the "effective till" date even though it is not required in the final output, because this code is common to the 1..1 and 1..many joins to the salary data.  If the salary data is historical, we want only the current salary, that is, records with effective-till date = 9999-01-01.

//********************************************************************************
//Class: MapperRSJ
//Purpose: Mapper
//Author: Anagha Khanolkar
//*********************************************************************************
package khanolkar.mapreduce.join.samples.reducesidejoin;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class MapperRSJ extends
Mapper<LongWritable, Text, CompositeKeyWritableRSJ, Text> {
CompositeKeyWritableRSJ ckwKey = new CompositeKeyWritableRSJ();
Text txtValue = new Text("");
int intSrcIndex = 0;
StringBuilder strMapValueBuilder = new StringBuilder("");
List<Integer> lstRequiredAttribList = new ArrayList<Integer>();
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
// {{
// Get the source index (1=employee, 2=current salary, 3=historical salary)
// Added as configuration in driver
FileSplit fsFileSplit = (FileSplit) context.getInputSplit();
intSrcIndex = Integer.parseInt(context.getConfiguration().get(
fsFileSplit.getPath().getName()));
// }}
// {{
// Initialize the list of fields to emit as output based on
// intSrcIndex (1=employee, 2=current salary, 3=historical salary)
if (intSrcIndex == 1) // employee
{
lstRequiredAttribList.add(2); // FName
lstRequiredAttribList.add(3); // LName
lstRequiredAttribList.add(4); // Gender
lstRequiredAttribList.add(6); // DeptNo
} else // salary
{
lstRequiredAttribList.add(1); // Salary
lstRequiredAttribList.add(3); // Effective-to-date (Value of
// 9999-01-01 indicates current
// salary)
}
// }}
}
private String buildMapValue(String arrEntityAttributesList[]) {
// This method returns csv list of values to emit based on data entity
strMapValueBuilder.setLength(0);// Initialize
// Build list of attributes to output based on source - employee/salary
for (int i = 1; i < arrEntityAttributesList.length; i++) {
// If the field is in the list of required output
// append to stringbuilder
if (lstRequiredAttribList.contains(i)) {
strMapValueBuilder.append(arrEntityAttributesList[i]).append(
",");
}
}
if (strMapValueBuilder.length() > 0) {
// Drop last comma
strMapValueBuilder.setLength(strMapValueBuilder.length() - 1);
}
return strMapValueBuilder.toString();
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
if (value.toString().length() > 0) {
String arrEntityAttributes[] = value.toString().split(",");
ckwKey.setjoinKey(arrEntityAttributes[0].toString());
ckwKey.setsourceIndex(intSrcIndex);
txtValue.set(buildMapValue(arrEntityAttributes));
context.write(ckwKey, txtValue);
}
}
}

3.0.5. The partitioner

Even though the map output key is composite, we want to partition on the natural join key (empNo); therefore, a custom partitioner is in order.
//********************************************************************************
//Class: PartitionerRSJ
//Purpose: Custom partitioner
//Author: Anagha Khanolkar
//*********************************************************************************
package khanolkar.mapreduce.join.samples.reducesidejoin;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PartitionerRSJ extends Partitioner<CompositeKeyWritableRSJ, Text> {
@Override
public int getPartition(CompositeKeyWritableRSJ key, Text value,
int numReduceTasks) {
// Partition on joinKey (EmployeeID); mask the sign bit so the
// partition number is non-negative even when hashCode() is negative
return (key.getjoinKey().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}


3.0.6. The sort comparator

To ensure that the input to the reducer is sorted on empNo and then on sourceIndex, we need a sort comparator.  This guarantees that, for each key, the employee data appears first in the values list, followed by the salary data.
package khanolkar.mapreduce.join.samples.reducesidejoin;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//********************************************************************************
//Class: SortingComparatorRSJ
//Purpose: Sorting comparator
//Author: Anagha Khanolkar
//*********************************************************************************
public class SortingComparatorRSJ extends WritableComparator {
protected SortingComparatorRSJ() {
super(CompositeKeyWritableRSJ.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
// Sort on all attributes of composite key
CompositeKeyWritableRSJ key1 = (CompositeKeyWritableRSJ) w1;
CompositeKeyWritableRSJ key2 = (CompositeKeyWritableRSJ) w2;
int cmpResult = key1.getjoinKey().compareTo(key2.getjoinKey());
if (cmpResult == 0)// same joinKey
{
return Double.compare(key1.getsourceIndex(), key2.getsourceIndex());
}
return cmpResult;
}
}


3.0.7. The grouping comparator

This class is needed to indicate the group-by attribute - the natural join key (empNo).
package khanolkar.mapreduce.join.samples.reducesidejoin;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//********************************************************************************
//Class: GroupingComparatorRSJ
//Purpose: For use as grouping comparator
//Author: Anagha Khanolkar
//*********************************************************************************
public class GroupingComparatorRSJ extends WritableComparator {
protected GroupingComparatorRSJ() {
super(CompositeKeyWritableRSJ.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
// The grouping comparator is the joinKey (Employee ID)
CompositeKeyWritableRSJ key1 = (CompositeKeyWritableRSJ) w1;
CompositeKeyWritableRSJ key2 = (CompositeKeyWritableRSJ) w2;
return key1.getjoinKey().compareTo(key2.getjoinKey());
}
}


3.0.8. The reducer

In the setup method of the reducer (called only once per task):
We check whether the side data - a map file with department data - is in the distributed cache and, if found, initialize the map file reader.

In the reduce method:
While iterating through the value list -
1. If the data is employee data (sourceIndex=1), we look up the department name in the map file using the deptNo, which is the last attribute in the employee data, and append the department name to the employee data.
2. If the data is historical salary data, we emit the salary only where the last attribute is '9999-01-01'.

Key point:
We have set the sort comparator to sort on empNo and then on sourceIndex.
The sourceIndex of the employee data is lower than that of the salary data - as set in the driver.
Therefore, we are assured that the employee data always arrives first, followed by the salary data.
So for each distinct empNo, we iterate through the values, appending them to build the joined record, and emit it as output.

package khanolkar.mapreduce.join.samples.reducesidejoin;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
//********************************************************************************
//Class: ReducerRSJ
//Purpose: Reducer
//Author: Anagha Khanolkar
//*********************************************************************************
public class ReducerRSJ extends
Reducer<CompositeKeyWritableRSJ, Text, NullWritable, Text> {
StringBuilder reduceValueBuilder = new StringBuilder("");
NullWritable nullWritableKey = NullWritable.get();
Text reduceOutputValue = new Text("");
String strSeparator = ",";
private MapFile.Reader deptMapReader = null;
Text txtMapFileLookupKey = new Text("");
Text txtMapFileLookupValue = new Text("");
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
// {{
// Get side data from the distributed cache
Path[] cacheFilesLocal = DistributedCache.getLocalCacheArchives(context
.getConfiguration());
for (Path eachPath : cacheFilesLocal) {
if (eachPath.getName().toString().trim()
.equals("departments_map.tar.gz")) {
URI uriUncompressedFile = new File(eachPath.toString()
+ "/departments_map").toURI();
initializeDepartmentsMap(uriUncompressedFile, context);
}
}
// }}
}
@SuppressWarnings("deprecation")
private void initializeDepartmentsMap(URI uriUncompressedFile, Context context)
throws IOException {
// {{
// Initialize the reader of the map file (side data)
FileSystem dfs = FileSystem.get(context.getConfiguration());
try {
deptMapReader = new MapFile.Reader(dfs,
uriUncompressedFile.toString(), context.getConfiguration());
} catch (Exception e) {
e.printStackTrace();
}
// }}
}
private StringBuilder buildOutputValue(CompositeKeyWritableRSJ key,
StringBuilder reduceValueBuilder, Text value) {
if (key.getsourceIndex() == 1) {
// Employee data
// {{
// Get the department name from the MapFile in distributedCache
// Insert the joinKey (empNo) to beginning of the stringBuilder
reduceValueBuilder.append(key.getjoinKey()).append(strSeparator);
String arrEmpAttributes[] = value.toString().split(",");
txtMapFileLookupKey.set(arrEmpAttributes[3].toString());
try {
deptMapReader.get(txtMapFileLookupKey, txtMapFileLookupValue);
} catch (Exception e) {
txtMapFileLookupValue.set("");
} finally {
// Default to NOT-FOUND if the lookup returned an empty value
txtMapFileLookupValue
.set(txtMapFileLookupValue.toString().isEmpty() ? "NOT-FOUND"
: txtMapFileLookupValue.toString());
}
// }}
// {{
// Append the department name to the map values to form a complete
// CSV of employee attributes
reduceValueBuilder.append(value.toString()).append(strSeparator)
.append(txtMapFileLookupValue.toString())
.append(strSeparator);
// }}
} else if (key.getsourceIndex() == 2) {
// Current salary data (1..1 on join key)
// Salary data; Just append the salary, drop the effective-to-date
String arrSalAttributes[] = value.toString().split(",");
reduceValueBuilder.append(arrSalAttributes[0].toString()).append(
strSeparator);
} else // key.getsourceIndex() == 3; Historical salary data
{
// {{
// Get the salary data but extract only current salary
// (to_date='9999-01-01')
String arrSalAttributes[] = value.toString().split(",");
if (arrSalAttributes[1].toString().equals("9999-01-01")) {
// Salary data; Just append
reduceValueBuilder.append(arrSalAttributes[0].toString())
.append(strSeparator);
}
// }}
}
// {{
// Reset
txtMapFileLookupKey.set("");
txtMapFileLookupValue.set("");
// }}
return reduceValueBuilder;
}
@Override
public void reduce(CompositeKeyWritableRSJ key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
// Iterate through values; First set is csv of employee data
// second set is salary data; The data is already ordered
// by virtue of secondary sort; Append each value;
for (Text value : values) {
buildOutputValue(key, reduceValueBuilder, value);
}
// Drop last comma, set value, and emit output
if (reduceValueBuilder.length() > 1) {
reduceValueBuilder.setLength(reduceValueBuilder.length() - 1);
// Emit output
reduceOutputValue.set(reduceValueBuilder.toString());
context.write(nullWritableKey, reduceOutputValue);
} else {
System.out.println("Key=" + key.getjoinKey() + "src="
+ key.getsourceIndex());
}
// Reset variables
reduceValueBuilder.setLength(0);
reduceOutputValue.set("");
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
deptMapReader.close();
}
}


3.0.9. The driver

Besides the usual driver code, we are:
1. Adding the side data (department lookup data, in map file format, in HDFS) to the distributed cache
2. Adding key-value pairs to the configuration, each pair being a filename and its source index;
this is used by the mapper to tag the data with its sourceIndex
3. Associating all the classes we created with the job
package khanolkar.mapreduce.join.samples.reducesidejoin;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//********************************************************************************
//Class: DriverRSJ
//Purpose: Driver for Reduce Side Join of two datasets
// with a 1..1 or 1..many cardinality on join key
//Author: Anagha Khanolkar
//*********************************************************************************
public class DriverRSJ extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
// {{
// Exit job if required arguments have not been provided
if (args.length != 3) {
System.out
.printf("Three parameters are required for DriverRSJ- <input dir1> <input dir2> <output dir>\n");
return -1;
}
// }{
// {{
// Job instantiation
Job job = new Job(getConf());
Configuration conf = job.getConfiguration();
job.setJarByClass(DriverRSJ.class);
job.setJobName("ReduceSideJoin");
// }}
// {{
// Add side data to distributed cache
DistributedCache
.addCacheArchive(
new URI(
"/user/akhanolk/joinProject/data/departments_map.tar.gz"),
conf);
// }}
// {
// Set sourceIndex for input files;
// sourceIndex is an attribute of the compositeKey,
// to drive order, and reference source
// Can be done dynamically; Hard-coded file names for simplicity
conf.setInt("part-e", 1);// Set Employee file to 1
conf.setInt("part-sc", 2);// Set Current salary file to 2
conf.setInt("part-sh", 3);// Set Historical salary file to 3
// }
// {
// Build csv list of input files
StringBuilder inputPaths = new StringBuilder();
inputPaths.append(args[0].toString()).append(",")
.append(args[1].toString());
// }
// {{
// Configure remaining aspects of the job
FileInputFormat.setInputPaths(job, inputPaths.toString());
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.setMapperClass(MapperRSJ.class);
job.setMapOutputKeyClass(CompositeKeyWritableRSJ.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(PartitionerRSJ.class);
job.setSortComparatorClass(SortingComparatorRSJ.class);
job.setGroupingComparatorClass(GroupingComparatorRSJ.class);
job.setNumReduceTasks(4);
job.setReducerClass(ReducerRSJ.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
// }}
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(), new DriverRSJ(),
args);
System.exit(exitCode);
}
}



4.0. The pig equivalent



Pig script-version 1:
/*************************************
Joining datasets in Pig
Employee..Salary = 1..many
Displaying most recent salary
Without using any join optimizations
**************************************/
rawEmpDS = load '/user/akhanolk/joinProject/data/employees_active/part-e' using PigStorage(',') as (empNo:chararray,dOB:chararray,lName:chararray,fName:chararray,gender:chararray,hireDate:chararray,deptNo:chararray);
empDS = foreach rawEmpDS generate empNo,fName,lName,gender,deptNo;
rawSalDS = load '/user/akhanolk/joinProject/data/salaries_history/part-sh' using PigStorage(',') as (empNo:chararray,salary:long,fromDate:chararray,toDate:chararray);
filteredSalDS = filter rawSalDS by toDate == '9999-01-01';
salDS = foreach filteredSalDS generate empNo, salary;
joinedDS = join empDS by empNo, salDS by empNo;
finalDS = foreach joinedDS generate empDS::empNo,empDS::fName,empDS::lName,empDS::gender,empDS::deptNo,salDS::salary;
store finalDS into '/user/akhanolk/joinProject/output/pig-RSJ';



Pig script-version 2 - eliminating the reduce-side join:
In this script, we filter on the most recent salary and then use Pig's merge join optimization (map-side), which can be leveraged when the inputs to the join are sorted.
rawEmpDS = load '/user/akhanolk/joinProject/data/employees_active/part-e' using PigStorage(',') as (empNo:chararray,dOB:chararray,lName:chararray,fName:chararray,gender:chararray,hireDate:chararray,deptNo:chararray);
empDS = foreach rawEmpDS generate empNo,fName,lName,gender,deptNo;
sortedEmpDS = ORDER empDS by empNo;
rawSalDS = load '/user/akhanolk/joinProject/data/salaries_history/part-sh' using PigStorage(',') as (empNo:chararray,salary:long,fromDate:chararray,toDate:chararray);
filteredSalDS = filter rawSalDS by toDate == '9999-01-01';
salDS = foreach filteredSalDS generate empNo, salary;
sortedSalDS = ORDER salDS by empNo;
joinedDS = join sortedEmpDS by empNo, sortedSalDS by empNo using 'merge';
finalDS = foreach joinedDS generate sortedEmpDS::empNo,sortedEmpDS::fName,sortedEmpDS::lName,sortedEmpDS::gender,sortedEmpDS::deptNo,sortedSalDS::salary;
store finalDS into '/user/akhanolk/joinProject/output/pig-RSJ';


Output:
**********************
Output of pig script
**********************
$ hadoop fs -cat joinProject/output/pig-RSJ/part* | less
10001 Facello Georgi M d005 88958
10002 Simmel Bezalel F d007 72527
10003 Bamford Parto M d004 43311
10004 Koblick Chirstian M d004 74057
.........

Sunday, September 22, 2013

Map-side join of large datasets using CompositeInputFormat

This post covers map-side joins of large datasets using CompositeInputFormat; it has links to Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Hive and Pig rock and rule at joining datasets, but it helps to know how to perform joins in Java.

Update [10/15/2013]
I have added the Pig equivalent at the very bottom of the gist.

Feel free to share any insights or constructive criticism. Cheers!!

Related blogs:
1. Map-side join sample in Java using reference data (text file) from distributed cache - Part 1
2. Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2
3. Map-side join sample in Java of two large datasets, leveraging CompositeInputFormat

Sample program:
**********************
**Gist
**********************
This gist details how to inner join two large datasets on the map-side, leveraging the join capability
in mapreduce. Such a join makes sense if both input datasets are too large to qualify for distribution
through the distributed cache, and can be implemented if both input datasets can be joined by the join key
and both are sorted in the same order on that key.
There are two critical pieces to engaging the join behavior:
- the input format must be set to CompositeInputFormat.class, and
- the key mapred.join.expr must have a value that is a valid join specification.
Sample program:
Covers inner join of employee and salary data with employee ID as join key in a map-only program
Inner join:
The inner join is a traditional database-style inner join. The map method will be called with a key/value
set only if every dataset in the join contains the key. The TupleWritable value will contain a value for
every dataset in the join, join key excluded.
Key code in the sample program:
conf.setInputFormat(CompositeInputFormat.class);
String strJoinStmt = CompositeInputFormat.compose("inner",
KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData);
conf.set("mapred.join.expr", strJoinStmt);
conf.setOutputFormat(TextOutputFormat.class);
TextOutputFormat.setOutputPath(conf, dirOutput);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
Old API:
I ended up using the old API as the new API does not include CompositeInputFormat in the version of
Hadoop I am running.
*******************************
*Data and code download
*******************************
Data and code:
--------------
gitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
joinProject
    data
        employees_sorted
            part-e
        salaries_sorted
            part-s
    MapSideJoinLargeDatasets
        src
            KeyValueLongInputFormat.java
            KeyValueLongLineRecordReader.java
            MapperMapSideJoinLargeDatasets.java
            DriverMapSideJoinLargeDatasets.java
        jar
            MapSideJoinLgDsOAPI.jar
********************************
Data Structure Review
********************************
Datasets:
The two datasets are employee and salary datasets.
Join key:
The join key is EmpNo/employee number
Location of join key:
The join key is the first field in both datasets
Sorting:
The data is sorted by the join key "EmpNo" in ascending order.
Sorting is crucial for accuracy of joins
File format:
The files are in text format, with comma as a separator
Cardinality:
Is 1..1 on join key; Both datasets have the same number of records
Employee data [joinProject/data/employees_sorted/part-e]
--------------------------------------------------------
[EmpNo,DOB,FName,LName,Gender,HireDate,DeptNo]
10001,1953-09-02,Georgi,Facello,M,1986-06-26,d005
10002,1964-06-02,Bezalel,Simmel,F,1985-11-21,d007
10003,1959-12-03,Parto,Bamford,M,1986-08-28,d004
10004,1954-05-01,Chirstian,Koblick,M,1986-12-01,d004
10005,1955-01-21,Kyoichi,Maliniak,M,1989-09-12,d003
10006,1953-04-20,Anneke,Preusig,F,1989-06-02,d005
.....
Salary data [joinProject/data/salaries_sorted/part-s]
------------------------------------------------------
[EmpNo,Salary,FromDate,ToDate]
10001,88958,2002-06-22,9999-01-01
10002,72527,2001-08-02,9999-01-01
10003,43311,2001-12-01,9999-01-01
10004,74057,2001-11-27,9999-01-01
10005,94692,2001-09-09,9999-01-01
..........
************************************
Expected Results - tab separated
************************************
[EmpNo FName LName Salary]
10001 Georgi Facello 88958
10002 Bezalel Simmel 72527
10003 Parto Bamford 43311
10004 Chirstian Koblick 74057
10005 Kyoichi Maliniak 94692
10006 Anneke Preusig 59755
10009 Sumant Peac 94409
10010 Duangkaew Piveteau 80324
........
******************************
Observations
******************************
Setting the input format to KeyValueTextInputFormat resulted in only
part of the data getting joined. I attributed this to the fact that
the EmpNo is numeric and the sort was not working right with the attribute
treated as Text (lexicographically, "10010" sorts before "9999"). I found that others had
encountered the same issue, and one individual had created a custom format - KeyValueLongInputFormat -
and an associated record reader. This gist uses the same code, with minor modifications.
http://stackoverflow.com/questions/13415359/hadoop-compositeinputformat-not-joining-all-data
/**********************************
*KeyValueLongLineRecordReader.java
*Custom record reader
**********************************/
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
public class KeyValueLongLineRecordReader implements
RecordReader<LongWritable, Text> {
private final LineRecordReader lineRecordReader;
private byte separator = (byte) ',';
private LongWritable dummyKey;
private Text innerValue;
public Class getKeyClass() {
return LongWritable.class;
}
public LongWritable createKey() {
return new LongWritable();
}
public Text createValue() {
return new Text();
}
public KeyValueLongLineRecordReader(Configuration job, FileSplit split)
throws IOException {
lineRecordReader = new LineRecordReader(job, split);
dummyKey = lineRecordReader.createKey();
innerValue = lineRecordReader.createValue();
String sepStr = job.get("key.value.separator.in.input.line", ",");
this.separator = (byte) sepStr.charAt(0);
}
public static int findSeparator(byte[] utf, int start, int length, byte sep) {
for (int i = start; i < (start + length); i++) {
if (utf[i] == sep) {
return i;
}
}
return -1;
}
/** Read key/value pair in a line. */
public synchronized boolean next(LongWritable key, Text value)
throws IOException {
LongWritable tKey = key;
Text tValue = value;
byte[] line = null;
int lineLen = -1;
if (lineRecordReader.next(dummyKey, innerValue)) {
line = innerValue.getBytes();
lineLen = innerValue.getLength();
} else {
return false;
}
if (line == null)
return false;
int pos = findSeparator(line, 0, lineLen, this.separator);
if (pos == -1) {
tKey.set(Long.valueOf(new String(line, 0, lineLen)));
tValue.set("");
} else {
int keyLen = pos;
byte[] keyBytes = new byte[keyLen];
System.arraycopy(line, 0, keyBytes, 0, keyLen);
int valLen = lineLen - keyLen - 1;
byte[] valBytes = new byte[valLen];
System.arraycopy(line, pos + 1, valBytes, 0, valLen);
tKey.set(Long.valueOf(new String(keyBytes)));
tValue.set(valBytes);
}
return true;
}
public float getProgress() {
return lineRecordReader.getProgress();
}
public synchronized long getPos() throws IOException {
return lineRecordReader.getPos();
}
public synchronized void close() throws IOException {
lineRecordReader.close();
}
}
/**********************************
*KeyValueLongInputFormat.java
*Custom key value format
**********************************/
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
public class KeyValueLongInputFormat extends
FileInputFormat<LongWritable, Text> implements JobConfigurable {
private CompressionCodecFactory compressionCodecs = null;
@Override
public void configure(JobConf conf) {
compressionCodecs = new CompressionCodecFactory(conf);
}
protected boolean isSplitable(FileSystem fs, Path file) {
return compressionCodecs.getCodec(file) == null;
}
@Override
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit, JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
return new KeyValueLongLineRecordReader(job, (FileSplit) genericSplit);
}
}
/**********************************
*MapperMapSideJoinLargeDatasets.java
*Mapper
**********************************/
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.join.TupleWritable;
public class MapperMapSideJoinLargeDatasets extends MapReduceBase implements
Mapper<LongWritable, TupleWritable, Text, Text> {
Text txtKey = new Text("");
Text txtValue = new Text("");
@Override
public void map(LongWritable key, TupleWritable value,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
if (value.toString().length() > 0) {
txtKey.set(key.toString());
String arrEmpAttributes[] = value.get(0).toString().split(",");
String arrDeptAttributes[] = value.get(1).toString().split(",");
txtValue.set(arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t"
+ arrDeptAttributes[0].toString());
output.collect(txtKey, txtValue);
}
}
}
/**********************************
*DriverMapSideJoinLargeDatasets
*Driver
**********************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class DriverMapSideJoinLargeDatasets {
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf("DriverMapSideJoinLargeDatasets");
conf.setJarByClass(DriverMapSideJoinLargeDatasets.class);
String[] jobArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
Path dirEmployeesData = new Path(jobArgs[0]);
Path dirSalaryData = new Path(jobArgs[1]);
Path dirOutput = new Path(jobArgs[2]);
conf.setMapperClass(MapperMapSideJoinLargeDatasets.class);
conf.setInputFormat(CompositeInputFormat.class);
String strJoinStmt = CompositeInputFormat.compose("inner",
KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData);
conf.set("mapred.join.expr", strJoinStmt);
conf.setNumReduceTasks(0);
conf.setOutputFormat(TextOutputFormat.class);
TextOutputFormat.setOutputPath(conf, dirOutput);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
RunningJob job = JobClient.runJob(conf);
while (!job.isComplete()) {
Thread.sleep(1000);
}
System.exit(job.isSuccessful() ? 0 : 2);
}
}
**************************
HDFS data load commands
**************************
hadoop fs -mkdir joinProject
hadoop fs -put joinProject/* joinProject/
**************************
Command to run program
**************************
hadoop jar ~/Blog/joinProject/MapSideJoinLargeDatasets/jar/MapSideJoinLgDsOAPI.jar DriverMapSideJoinLargeDatasets /user/akhanolk/joinProject/data/employees_sorted/part-e /user/akhanolk/joinProject/data/salaries_sorted/part-s /user/akhanolk/joinProject/output/output-MapSideJoinLargeDatasets
**************************
Results
**************************
...
13/09/22 13:11:17 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/09/22 13:11:17 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/09/22 13:11:17 INFO mapred.JobClient: Map-Reduce Framework
13/09/22 13:11:17 INFO mapred.JobClient: Map input records=224683000
13/09/22 13:11:17 INFO mapred.JobClient: Map output records=224683000
...
$ hadoop fs -cat joinProject/output/output-MapSideJoinLargeDatasets/part* | less
10001 Georgi Facello 88958
10002 Bezalel Simmel 72527
10003 Parto Bamford 43311
10004 Chirstian Koblick 74057
10005 Kyoichi Maliniak 94692
10006 Anneke Preusig 59755
10009 Sumant Peac 94409
10010 Duangkaew Piveteau 80324
.....
**************************
References
**************************
Concepts:
Pro Hadoop
Hadoop the Definitive Guide
Data-Intensive Text Processing with MapReduce
Code:
http://stackoverflow.com/questions/13415359/hadoop-compositeinputformat-not-joining-all-data
Data:
The data in this solution is from mysql - http://dev.mysql.com/doc/employee/en.index.html
*******************************
Pig - map-side join
of datasets with cardinality
of 1..1
Using 'replicated'
or
'merge', if sorted
*********************************
rawEmpDS = load '/user/akhanolk/joinProject/data/employees_active/part-e' using PigStorage(',') as (empNo:chararray,dOB:chararray,lName:chararray,fName:chararray,gender:chararray,hireDate:chararray,deptNo:chararray);
empDS = foreach rawEmpDS generate empNo,fName,lName,gender,deptNo;
rawSalDS = load '/user/akhanolk/joinProject/data/salaries_active/part-sc' using PigStorage(',') as (empNo:chararray,salary:long,fromDate:chararray,toDate:chararray);
salDS = foreach rawSalDS generate empNo, salary;
joinedDS = join empDS by empNo, salDS by empNo using 'replicated';
finalDS = foreach joinedDS generate empDS::empNo,empDS::fName,empDS::lName,empDS::gender,empDS::deptNo,salDS::salary;
store finalDS into '/user/akhanolk/joinProject/output/pig-RSJ1To1';

Friday, September 20, 2013

Handling small files using CombineFileInputFormat in Java MapReduce

This post covers CombineFileInputFormat; it has links to Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

*************************
Gist
*************************
One more gist related to controlling the number of mappers in a mapreduce task.
Background on Inputsplits
--------------------------
An inputsplit is a chunk of the input data allocated to a map task for processing. FileInputFormat
generates inputsplits (and divides them into records) - one inputsplit for each file, unless the
file spans more than an HDFS block, at which point it factors in the configured values of minimum split
size, maximum split size and block size in determining the split size.
Here's the formula, from Hadoop the definitive guide-
Split size = max( minimumSplitSize, min( maximumSplitSize, HDFSBlockSize))
So, if we go with the default values, the split size = HDFSBlockSize for files spanning more than an
HDFS block.
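As a quick illustration, a minimal sketch of that arithmetic with the default values and an assumed 128 MB block size (this is just the formula, not Hadoop API code):
// Split-size formula with default values; the 128 MB block size is an assumption
long minimumSplitSize = 1L;              // default minimum split size
long maximumSplitSize = Long.MAX_VALUE;  // default maximum split size
long hdfsBlockSize = 128L * 1024 * 1024; // assumed HDFS block size
long splitSize = Math.max(minimumSplitSize,
        Math.min(maximumSplitSize, hdfsBlockSize));
// splitSize == hdfsBlockSize (128 MB) with the defaults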
Problem with mapreduce processing of small files
-------------------------------------------------
We all know that Hadoop works best with large files, but the reality is that we still have to deal
with small files. When you want to process many small files in a mapreduce job, by default, each file
is processed by its own map task (so 1000 small files = 1000 map tasks). Having too many tasks that
finish in a matter of seconds is inefficient.
Increasing the minimum split size to reduce the number of map tasks is not the right solution
for such a situation, as it comes at the potential cost of data locality.
Solution
---------
CombineFileInputFormat packs many files into a split, providing more data for a map task to process.
It factors in node and rack locality so performance is not compromised.
Sample program
---------------
The sample program demonstrates that, using CombineFileInputFormat, we can process multiple small files (each
smaller than an HDFS block) in a single map task.
Old API
--------
The new API in the version of Hadoop I am running does not include CombineFileInputFormat.
I will write another gist with the program using the new API shortly.
Key aspects of the program
----------------------------
1. CombineFileInputFormat is an abstract class; We have to create a subclass that extends it, and
implement the getRecordReader method. This implementation is in the class -ExtendedCombineFileInputFormat.java
(courtesy - http://stackoverflow.com/questions/14270317/implementation-for-combinefileinputformat-hadoop-0-20-205)
2. In the driver, set the value of mapred.max.split.size
3. In the driver, set the input format to the subclass of CombineFileInputFormat
*******************************
*Data and code download
*******************************
Data and code:
--------------
gitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
formatProject
    data
        employees_partFiles
            employees_part1
            employees_part2
            employees_part3
            employees_part4
            employees_part5
    formatCombineFileInputFormat
        src
            MapperCombineFileInputFormat.java
            DriverCombineFileInputFormat.java
            ExtendedCombineFileInputFormat.java
        jar
            formatCombineFileInputFormatOAPI.jar
*******************************
Data Structure
*******************************
[EmpNo DOB FName LName Gender HireDate DeptNo]
10001 1953-09-02 Georgi Facello M 1986-06-26 d005
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
.......
.......
*******************************
Expected Results
*******************************
Key goal of demonstration: Process 5 small files in one map task
Emit a subset of the input dataset.
[EmpNo FName LName]
10001 Georgi Facello
10002 Bezalel Simmel
10003 Parto Bamford
/********************************************
*File: MapperCombineFileInputFormat.java
*Usage: Mapper
********************************************/
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MapperCombineFileInputFormat extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text> {
Text txtKey = new Text("");
Text txtValue = new Text("");
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
if (value.toString().length() > 0) {
String[] arrEmpAttributes = value.toString().split("\\t");
txtKey.set(arrEmpAttributes[0].toString());
txtValue.set(arrEmpAttributes[2].toString() + "\t"
+ arrEmpAttributes[3].toString());
output.collect(txtKey, txtValue);
}
}
}
/********************************************
*File: DriverCombineFileInputFormat.java
*Usage: Driver
********************************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class DriverCombineFileInputFormat {
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf("DriverCombineFileInputFormat");
conf.set("mapred.max.split.size", "134217728");//128 MB
conf.setJarByClass(DriverCombineFileInputFormat.class);
String[] jobArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
conf.setMapperClass(MapperCombineFileInputFormat.class);
conf.setInputFormat(ExtendedCombineFileInputFormat.class);
ExtendedCombineFileInputFormat.addInputPath(conf, new Path(jobArgs[0]));
conf.setNumReduceTasks(0);
conf.setOutputFormat(TextOutputFormat.class);
TextOutputFormat.setOutputPath(conf, new Path(jobArgs[1]));
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
RunningJob job = JobClient.runJob(conf);
while (!job.isComplete()) {
Thread.sleep(1000);
}
System.exit(job.isSuccessful() ? 0 : 2);
}
}
/********************************************
*File: ExtendedCombineFileInputFormat.java
*Usage: Sub-class implementation of abstract
class CombineFileInputFormat
********************************************/
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
@SuppressWarnings("deprecation")
public class ExtendedCombineFileInputFormat extends
CombineFileInputFormat<LongWritable, Text> {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
JobConf conf, Reporter reporter) throws IOException {
return new CombineFileRecordReader(conf, (CombineFileSplit) split,
reporter, (Class) myCombineFileRecordReader.class);
}
public static class myCombineFileRecordReader implements
RecordReader<LongWritable, Text> {
private final LineRecordReader linerecord;
public myCombineFileRecordReader(CombineFileSplit split,
Configuration conf, Reporter reporter, Integer index)
throws IOException {
FileSplit filesplit = new FileSplit(split.getPath(index),
split.getOffset(index), split.getLength(index),
split.getLocations());
linerecord = new LineRecordReader(conf, filesplit);
}
@Override
public void close() throws IOException {
linerecord.close();
}
@Override
public LongWritable createKey() {
return linerecord.createKey();
}
@Override
public Text createValue() {
return linerecord.createValue();
}
@Override
public long getPos() throws IOException {
return linerecord.getPos();
}
@Override
public float getProgress() throws IOException {
return linerecord.getProgress();
}
@Override
public boolean next(LongWritable key, Text value) throws IOException {
return linerecord.next(key, value);
}
}
}
*****************************
*HDFS command to load data
*****************************
hadoop fs -mkdir formatProject
hadoop fs -put formatProject/data formatProject/
*****************************
*Run program
*****************************
hadoop jar ~/Blog/formatProject/formatCombineFileInputFormat/jar/formatCombineFileInputFormatOAPI.jar DriverCombineFileInputFormat /user/akhanolk/formatProject/data/employees_partFiles /user/akhanolk/formatProject/output/output-CombineFileInputFormat
*****************************
*Results
*****************************
....
13/09/22 17:16:31 INFO mapred.JobClient: Launched map tasks=1
13/09/22 17:16:31 INFO mapred.JobClient: Data-local map tasks=1
13/09/22 17:16:31 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=17885
...
$ hadoop fs -ls -R formatProject/output/output-CombineFileInputFormat/part* | awk '{print $8}'
formatProject/output/output-CombineFileInputFormat/part-00000
$ hadoop fs -cat formatProject/output/output-CombineFileInputFormat/part-00000
10001 Georgi Facello
10002 Bezalel Simmel
10003 Parto Bamford
10004 Chirstian Koblick
10005 Kyoichi Maliniak
.....
**************************
References
**************************
Apache documentation:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/lib/CombineFileInputFormat.html
Concepts:
Hadoop the Definitive Guide
Code:
http://stackoverflow.com/questions/14270317/implementation-for-combinefileinputformat-hadoop-0-20-205
Data:
The data in this solution is from mysql - http://dev.mysql.com/doc/employee/en.index.html

NLineInputFormat in Java MapReduce - use case, code sample

This post covers NLineInputFormat; it has links to Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

**********************
Gist
**********************
A common interview question for a Hadoop developer position is whether we can control the number of
mappers for a job. We can - there are a few ways of controlling the number of mappers, as needed.
Using NLineInputFormat is one way.
About NLineInputFormat
----------------------
With this functionality, you can specify exactly how many lines should go to a mapper.
E.g., if your file has 500 lines and you set the number of lines per mapper to 10, you get 50 mappers
(instead of one - assuming the file is smaller than an HDFS block).
When would you use NLineInputFormat?
------------------------------------
Some examples from Hadoop the definitive guide-
1. In applications that take a small amount of input data and run an extensive (that is, CPU-intensive)
computation for it, then emit their output.
2. Another example...you create a “seed” input file that lists the data sources, one per line. Then
each mapper is allocated a single data source, and it loads the data from that source into HDFS.
Sample program
---------------
The sample program below demonstrates the functionality.
The mapper merely emits the input key-value pairs.
The input is a file with ~224,000 records.
The output is files containing 10,000 records each (so a total of 23 files).
*******************************
*Data and code download
*******************************
Data and code:
--------------
gitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
formatProject
    data
        employees_tsv
            employees_tsv
    formatNLineInputFormat
        src
            NLineInputFormat.java //Original Apache source code
            MapperNLineInputFormat.java //Mapper
            DriverNLineInputFormat.java //Driver
        jar
            formatNLineInputFormat.jar
*******************************
*Sample Data
*******************************
EmpID DOB FName LName Gender Hire date DeptID
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
.....
/*******************************************************************
* Mapper
* MapperNLineInputFormat.java
*******************************************************************/
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MapperNLineInputFormat extends
Mapper<LongWritable, Text, LongWritable, Text> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(key, value);
}
}
/*******************************************************************
* Driver
* DriverNLineInputFormat.java
*******************************************************************/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DriverNLineInputFormat extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required for DriverNLineInputFormat- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
job.setJobName("NLineInputFormat example");
job.setJarByClass(DriverNLineInputFormat.class);
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.addInputPath(job, new Path(args[0]));
job.getConfiguration().setInt(
"mapreduce.input.lineinputformat.linespermap", 10000);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MapperNLineInputFormat.class);
job.setNumReduceTasks(0);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new DriverNLineInputFormat(), args);
System.exit(exitCode);
}
}
***********************************************
** Commands to load data
***********************************************
$ hadoop fs -mkdir formatProject
$ hadoop fs -put formatProject/data formatProject/
***********************************************
** Commands to run the program
***********************************************
hadoop jar ~/Blog/formatProject/formatNLineInputFormat/jar/formatNLineInputFormat.jar DriverNLineInputFormat /user/akhanolk/sortProject/data/employees/employees_tsv /user/akhanolk/formatProject/data/output-formatNLineInputFormat
***********************************************
** Results
***********************************************
$ for filename in `hadoop fs -ls -R formatProject/data/output-formatNLineInputFormat/part* | awk '{print $8}'`
do
echo "Filename: " $filename " [Record count:" `hadoop fs -cat $filename | wc -l` "]"
done
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00000 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00001 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00002 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00003 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00004 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00005 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00006 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00007 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00008 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00009 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00010 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00011 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00012 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00013 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00014 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00015 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00016 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00017 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00018 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00019 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00020 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00021 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00022 [Record count: 4683 ]
$ hadoop fs -cat formatProject/data/output-formatNLineInputFormat/part-* | wc -l
224683
$ hadoop fs -cat formatProject/data/output-formatNLineInputFormat/part-m-00022
...
11474355 499977 1956-06-05 Martial Weisert F 1996-09-17 d002
11474407 499979 1962-10-29 Prasadram Waleschkowski M 1994-01-04 d005
11474467 499980 1959-06-28 Gino Usery M 1991-02-11 d007
..
/******************************************************
* NLineInputFormat.java
* Had to add this to the project, as the version of
* Hadoop I have does not include the NLineInputFormat
* functionality as part of the new API
*****************************************************/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
//import org.apache.hadoop.classification.InterfaceAudience;
//import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.util.LineReader;
/**
* NLineInputFormat which splits N lines of input as one split.
*
* In many "pleasantly" parallel applications, each process/mapper processes the
* same input file (s), but with computations are controlled by different
* parameters.(Referred to as "parameter sweeps"). One way to achieve this, is
* to specify a set of parameters (one set per line) as input in a control file
* (which is the input path to the map-reduce application, where as the input
* dataset is specified via a config variable in JobConf.).
*
* The NLineInputFormat can be used in such applications, that splits the input
* file such that by default, one line is fed as a value to one map task, and
* key is the offset. i.e. (k,v) is (LongWritable, Text). The location hints
* will span the whole mapred cluster.
*/
// @InterfaceAudience.Public
// @InterfaceStability.Stable
public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {
public static final String LINES_PER_MAP = "mapreduce.input.lineinputformat.linespermap";
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit genericSplit, TaskAttemptContext context)
throws IOException {
context.setStatus(genericSplit.toString());
return new LineRecordReader();
}
/**
* Logically splits the set of input files for the job, splits N lines of
* the input as one split.
*
* @see FileInputFormat#getSplits(JobContext)
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
int numLinesPerSplit = getNumLinesPerSplit(job);
for (FileStatus status : listStatus(job)) {
splits.addAll(getSplitsForFile(status, job.getConfiguration(),
numLinesPerSplit));
}
return splits;
}
public static List<FileSplit> getSplitsForFile(FileStatus status,
Configuration conf, int numLinesPerSplit) throws IOException {
List<FileSplit> splits = new ArrayList<FileSplit>();
Path fileName = status.getPath();
if (status.isDir()) {
throw new IOException("Not a file: " + fileName);
}
FileSystem fs = fileName.getFileSystem(conf);
LineReader lr = null;
try {
FSDataInputStream in = fs.open(fileName);
lr = new LineReader(in, conf);
Text line = new Text();
int numLines = 0;
long begin = 0;
long length = 0;
int num = -1;
while ((num = lr.readLine(line)) > 0) {
numLines++;
length += num;
if (numLines == numLinesPerSplit) {
// NLineInputFormat uses LineRecordReader, which always reads
// (and consumes) at least one character out of its upper split
// boundary. So to make sure that each mapper gets N lines, we
// move back the upper split limits of each split by one
// character here.
if (begin == 0) {
splits.add(new FileSplit(fileName, begin, length - 1,
new String[] {}));
} else {
splits.add(new FileSplit(fileName, begin - 1, length,
new String[] {}));
}
begin += length;
length = 0;
numLines = 0;
}
}
if (numLines != 0) {
splits.add(new FileSplit(fileName, begin, length,
new String[] {}));
}
} finally {
if (lr != null) {
lr.close();
}
}
return splits;
}
/**
* Set the number of lines per split
*
* @param job
* the job to modify
* @param numLines
* the number of lines per split
*/
public static void setNumLinesPerSplit(Job job, int numLines) {
job.getConfiguration().setInt(LINES_PER_MAP, numLines);
}
/**
* Get the number of lines per split
*
* @param job
* the job
* @return the number of lines per split
*/
public static int getNumLinesPerSplit(JobContext job) {
return job.getConfiguration().getInt(LINES_PER_MAP, 1);
}
}

Thursday, September 19, 2013

MultipleOutputs in Java MapReduce

This post covers MultipleOutputs; it has links to the Apache documentation, my notes on the topic and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

********************************
Gist
********************************
Motivation
-----------
A typical mapreduce job creates output files with the prefix "part-", followed by "m" or "r" depending
on whether it is a map or a reduce output, and then the part number. There are scenarios where we
may want to create separate files based on some criteria - data keys and/or values. Enter the "MultipleOutputs"
functionality.
More about MultipleOutputs
---------------------------
Here's the write-up from Hadoop the definitive guide-
"MultipleOutputs allows you to write data to files whose names are derived from the output keys and
values, or in fact from an arbitrary string. This allows each reducer (or mapper in a map-only job)
to create more than a single file. Filenames are of the form name-m-nnnnn for map outputs and
name-r-nnnnn for reduce outputs, where name is an arbitrary name that is set by the program,
and nnnnn is an integer designating the part number, starting from zero. The part number
ensures that outputs written from different partitions (mappers or reducers) do not collide in the
case of the same name."
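The reducer in the sample further below uses the write(key, value, baseOutputPath) flavor, which is what names the output files after the department. MultipleOutputs also supports named outputs declared up front in the driver; here is a minimal sketch of that flavor - the output name "deptOut" and the class name are made up for illustration and are not part of the sample program.
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class NamedOutputReducerSketch extends Reducer<Text, Text, Text, Text> {

    // Driver-side declaration (once, while setting up the Job):
    // MultipleOutputs.addNamedOutput(job, "deptOut", TextOutputFormat.class,
    //     Text.class, Text.class);

    private MultipleOutputs<Text, Text> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // Records land in files named deptOut-r-nnnnn instead of part-r-nnnnn
            mos.write("deptOut", key, value);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}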
About LazyOutputFormat
-----------------------
A typical mapreduce program can produce output files that are empty, depending on your implementation.
If you want to suppress creation of empty files, you need to leverage LazyOutputFormat.
Two lines in your driver will do the trick-
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
&
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
Sample program
---------------
This gist includes a sample program that demonstrates the MultipleOutputs functionality.
The input is a file with employee data, a key attribute being the department number.
The expected output is one file per department, containing the employees that belong to it.
The program also suppresses creation of empty files.
*******************************
*Data and code download
*******************************
Data and code:
--------------
gitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
formatProject
data
employees_tsv
employees_tsv
formatMultipleOutputs
src
MapperFormatMultiOutput.java
ReducerFormatMultiOutput.java
DriverFormatMultiOutput.java
jar
formatMultiOutput.jar
*******************************
*Sample Data
*******************************
EmpID DOB FName LName Gender Hire date DeptID
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
.....
*******************************
*Expected results
*******************************
One file for each department.
Within each file, the following employee attributes are required-
DeptNo LName FName EmpNo
***************************************
**Mapper - MapperFormatMultiOutput.java
***************************************
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MapperFormatMultiOutput extends
Mapper<LongWritable, Text, Text, Text> {
private Text txtKey = new Text("");
private Text txtValue = new Text("");
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
if (value.toString().length() > 0) {
String arrEmpAttributes[] = value.toString().split("\\t");
txtKey.set(arrEmpAttributes[6].toString());
txtValue.set(arrEmpAttributes[3].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t"
+ arrEmpAttributes[0].toString());
context.write(txtKey, txtValue);
}
}
}
*******************************************
**Reducer - ReducerFormatMultiOutput.java
*******************************************
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class ReducerFormatMultiOutput extends Reducer<Text, Text, Text, Text> {
private MultipleOutputs mos;
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
mos = new MultipleOutputs(context);
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
mos.write(key, value, key.toString());
}
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
mos.close();
}
}
*******************************************
**Driver - DriverFormatMultiOutput.java
*******************************************
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DriverFormatMultiOutput extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required for DriverFormatMultiOutput- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
job.setJobName("MultipleOutputs example");
job.setJarByClass(DriverFormatMultiOutput.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MapperFormatMultiOutput.class);
job.setMapOutputKeyClass(Text.class);
job.setReducerClass(ReducerFormatMultiOutput.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(4);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new DriverFormatMultiOutput(), args);
System.exit(exitCode);
}
}
*******************************************
**Commands to load data
*******************************************
$ hadoop fs -mkdir formatProject
$ hadoop fs -put formatProject/data formatProject/
*******************************************
**Command to run program
*******************************************
hadoop jar ~/Blog/formatProject/formatMultiOutputFormat/jar/formatMultiOutput.jar DriverFormatMultiOutput /user/akhanolk/sortProject/data/employees/employees_tsv /user/akhanolk/formatProject/data/output-formatMultiOutput
********************************
**Results
********************************
$ hadoop fs -ls -R formatProject/data/output-formatMultiOutput/d00* | awk '{print $8, $5}'
formatProject/data/output-formatMultiOutput/d001-r-00002 401857
formatProject/data/output-formatMultiOutput/d002-r-00003 336632
formatProject/data/output-formatMultiOutput/d003-r-00000 348770
formatProject/data/output-formatMultiOutput/d004-r-00001 1442822
formatProject/data/output-formatMultiOutput/d005-r-00002 1662566
formatProject/data/output-formatMultiOutput/d006-r-00003 394272
formatProject/data/output-formatMultiOutput/d007-r-00000 1020167
formatProject/data/output-formatMultiOutput/d009-r-00002 475747
$ hadoop fs -cat formatProject/data/output-formatMultiOutput/d001-r-00002 | less
d001 Yetto Lucian 39682
d001 Cooke Padma 49634
d001 Marrevee Giap 49632
..
**********************
References:
**********************
- Hadoop the definitive guide, 3rd edition
- Apache documentation on MultipleOutputs
http://www.google.com/url?sa=t&rct=j&q=&esrc=s&source=web&cd=1&cad=rja&ved=0CCsQFjAA&url=http%3A%2F%2Fhadoop.apache.org%2Fdocs%2Fcurrent%2Fapi%2Forg%2Fapache%2Fhadoop%2Fmapred%2Flib%2FMultipleOutputs.html&ei=fV08Upq1KcifyQGbvIHoBg&usg=AFQjCNFST21nx1BLBEo4100dwm6bjt3CyA&bvm=bv.52434380,d.aWc
- Apache documentation on LazyOutputFormat
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.html
**********************
Credits:
**********************
The data in this solution is from mysql - http://dev.mysql.com/doc/employee/en.index.html

Wednesday, September 18, 2013

Secondary sort in Java MapReduce

This post covers secondary sort in Java mapreduce; it has links to the Apache documentation, my notes on the topic and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

Secondary sort in Mapreduce
With the mapreduce framework, the keys are sorted but the values associated with each key
are not. In order for the values to be sorted, we need to write code to perform what is
referred to as a secondary sort. The sample code in this gist demonstrates such a sort.
The input to the program is a bunch of employee attributes.
The output required is department number (deptNo) in ascending order, and the employee last name,
first name and employee ID in descending order.
The recipe to get the effect of sorting by value is:
1) Make the key a composite of the natural key (deptNo) and the natural value (lName, fName and empNo).
2) The sort comparator should order by the composite key, that is, the natural key and natural
value.
3) The partitioner and grouping comparator for the composite key should consider only the natural
key for partitioning and grouping.
*******************************
*Data and code download
*******************************
Data and code:
--------------
gitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
sortProject
data
employees_tsv
employees_tsv
SecondarySortBasic
src
CompositeKeyWritable.java
SecondarySortBasicMapper.java
SecondarySortBasicPartitioner.java
SecondarySortBasicCompKeySortComparator.java
SecondarySortBasicGroupingComparator.java
SecondarySortBasicReducer.java
SecondarySortBasicDriver.java
jar
SecondarySortBasic.jar
*******************************
*Sample Data
*******************************
EmpID DOB FName LName Gender Hire date DeptID
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
....
*******************************
*Expected results
*******************************
Sort order: [DeptID asc, {LName,FName,EmpID} desc]
DeptID LName FName EmpID
d001 Zykh Sudhanshu 205927
d001 Zykh Nidapan 452738
..
d001 Yoshimura Alenka 463297
d001 Yeung Yuguang 483161
..
d001 Acton Basim 105207
d001 Aamodt Sreekrishna 493601
..
d002 Aamodt Yakkov 43290
..
d003 Acton Idoia 211583
..
d004 dAstous Candido 59201
d004 dAstous Berhard 427930
..
d005 Zizka Aamer 409151
d005 Zirintsis Xiaoqiang 52246
....
/***************************************************************
*CustomWritable for the composite key: CompositeKeyWritable
****************************************************************/
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
/**
*
* @author akhanolkar
*
* Purpose: A custom writable with two attributes - deptNo and
* lNameEmpIDPair;
*/
public class CompositeKeyWritable implements Writable,
WritableComparable<CompositeKeyWritable> {
private String deptNo;
private String lNameEmpIDPair;
public CompositeKeyWritable() {
}
public CompositeKeyWritable(String deptNo, String lNameEmpIDPair) {
this.deptNo = deptNo;
this.lNameEmpIDPair = lNameEmpIDPair;
}
@Override
public String toString() {
return (new StringBuilder().append(deptNo).append("\t")
.append(lNameEmpIDPair)).toString();
}
public void readFields(DataInput dataInput) throws IOException {
deptNo = WritableUtils.readString(dataInput);
lNameEmpIDPair = WritableUtils.readString(dataInput);
}
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeString(dataOutput, deptNo);
WritableUtils.writeString(dataOutput, lNameEmpIDPair);
}
public int compareTo(CompositeKeyWritable objKeyPair) {
// TODO:
/*
* Note: This code will work as it stands; but when CompositeKeyWritable
* is used as a key in a map-reduce program, it is de-serialized into an
* object for the compareTo() method to be invoked;
*
* To do: To optimize for speed, implement a raw comparator - will
* support comparison of serialized representations
*/
int result = deptNo.compareTo(objKeyPair.deptNo);
if (0 == result) {
result = lNameEmpIDPair.compareTo(objKeyPair.lNameEmpIDPair);
}
return result;
}
public String getDeptNo() {
return deptNo;
}
public void setDeptNo(String deptNo) {
this.deptNo = deptNo;
}
public String getLNameEmpIDPair() {
return lNameEmpIDPair;
}
public void setLNameEmpIDPair(String lNameEmpIDPair) {
this.lNameEmpIDPair = lNameEmpIDPair;
}
}
/***************************************************************
*Mapper: SecondarySortBasicMapper
***************************************************************/
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SecondarySortBasicMapper extends
Mapper<LongWritable, Text, CompositeKeyWritable, NullWritable> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
if (value.toString().length() > 0) {
String arrEmpAttributes[] = value.toString().split("\\t");
context.write(
new CompositeKeyWritable(
arrEmpAttributes[6].toString(),
(arrEmpAttributes[3].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t" + arrEmpAttributes[0]
.toString())), NullWritable.get());
}
}
}
/***************************************************************
*Partitioner: SecondarySortBasicPartitioner
***************************************************************/
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class SecondarySortBasicPartitioner extends
Partitioner<CompositeKeyWritable, NullWritable> {
@Override
public int getPartition(CompositeKeyWritable key, NullWritable value,
int numReduceTasks) {
// Mask the sign bit so a negative hashCode cannot produce a negative partition number
return ((key.getDeptNo().hashCode() & Integer.MAX_VALUE) % numReduceTasks);
}
}
/***************************************************************
*SortComparator: SecondarySortBasicCompKeySortComparator
*****************************************************************/
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class SecondarySortBasicCompKeySortComparator extends WritableComparator {
protected SecondarySortBasicCompKeySortComparator() {
super(CompositeKeyWritable.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
CompositeKeyWritable key1 = (CompositeKeyWritable) w1;
CompositeKeyWritable key2 = (CompositeKeyWritable) w2;
int cmpResult = key1.getDeptNo().compareTo(key2.getDeptNo());
if (cmpResult == 0)// same deptNo
{
return -key1.getLNameEmpIDPair()
.compareTo(key2.getLNameEmpIDPair());
//If the minus is taken out, the values will be in
//ascending order
}
return cmpResult;
}
}
/***************************************************************
*GroupingComparator: SecondarySortBasicGroupingComparator
***************************************************************/
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class SecondarySortBasicGroupingComparator extends WritableComparator {
protected SecondarySortBasicGroupingComparator() {
super(CompositeKeyWritable.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
CompositeKeyWritable key1 = (CompositeKeyWritable) w1;
CompositeKeyWritable key2 = (CompositeKeyWritable) w2;
return key1.getDeptNo().compareTo(key2.getDeptNo());
}
}
/***************************************
*Reducer: SecondarySortBasicReducer
***************************************/
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class SecondarySortBasicReducer
extends
Reducer<CompositeKeyWritable, NullWritable, CompositeKeyWritable, NullWritable> {
@Override
public void reduce(CompositeKeyWritable key, Iterable<NullWritable> values,
Context context) throws IOException, InterruptedException {
for (NullWritable value : values) {
context.write(key, NullWritable.get());
}
}
}
/***************************************
*Driver: SecondarySortBasicDriver
***************************************/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SecondarySortBasicDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required for SecondarySortBasicDriver- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
job.setJobName("Secondary sort example");
job.setJarByClass(SecondarySortBasicDriver.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(SecondarySortBasicMapper.class);
job.setMapOutputKeyClass(CompositeKeyWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setPartitionerClass(SecondarySortBasicPartitioner.class);
job.setSortComparatorClass(SecondarySortBasicCompKeySortComparator.class);
job.setGroupingComparatorClass(SecondarySortBasicGroupingComparator.class);
job.setReducerClass(SecondarySortBasicReducer.class);
job.setOutputKeyClass(CompositeKeyWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(8);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new SecondarySortBasicDriver(), args);
System.exit(exitCode);
}
}
*******************************
*Command to run the program
*******************************
hadoop jar ~/Blog/sortProject/secondarySortBasic/jar/secondarySortBasic.jar SecondarySortBasicDriver /user/akhanolk/sortProject/data/employees/employees_tsv /user/akhanolk/sortProject/data/output-secondarySortBasic
*******************************
*Results
*******************************
--Source record count
hadoop fs -cat sortProject/data/employees/employees_tsv | wc -l
2246830
--Results record count
hadoop fs -cat sortProject/data/output-secondarySortBasic/part* | wc -l
2246830
--Files generated
hadoop fs -ls -R sortProject/data/output-secondarySortBasic/part* | awk '{print $8}'
sortProject/data/output-secondarySortBasic/part-r-00000
sortProject/data/output-secondarySortBasic/part-r-00001
sortProject/data/output-secondarySortBasic/part-r-00002
sortProject/data/output-secondarySortBasic/part-r-00003
sortProject/data/output-secondarySortBasic/part-r-00004
sortProject/data/output-secondarySortBasic/part-r-00005
sortProject/data/output-secondarySortBasic/part-r-00006
sortProject/data/output-secondarySortBasic/part-r-00007
--Output
hadoop fs -cat sortProject/data/output-secondarySortBasic/part*
d001 Zykh Sudhanshu 205927
d001 Zykh Nidapan 452738
..
d001 Yoshimura Alenka 463297
d001 Yeung Yuguang 483161
..
d001 Acton Basim 105207
d001 Aamodt Sreekrishna 493601
..
d002 Aamodt Yakkov 43290
..
d003 Acton Idoia 211583
..
d004 dAstous Candido 59201
d004 dAstous Berhard 427930
..
d005 Zizka Aamer 409151
d005 Zirintsis Xiaoqiang 52246
....
**********************
Reference:
**********************
Hadoop the definitive guide, 3rd edition
**********************
Credits:
**********************
Data from mysql - http://dev.mysql.com/doc/employee/en.index.html

Tuesday, September 17, 2013

Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2

This post covers map-side joins in Java map-reduce; it has links to the Apache documentation, my notes on the topic and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!


What's in this blog?

A sample map-reduce program in Java that joins two datasets, on the map-side - an employee dataset and a department dataset, with the department number as join key.  The department dataset is a very small dataset in MapFile format, is in HDFS, and is added to the distributed cache.  The MapFile is referenced in the map method of the mapper to look up the department name, and emit the employee dataset with department name included.

Apache documentation on DistributedCache:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html

Related blogs:
1. Map-side join sample in Java using reference data (text file) from distributed cache - Part 1
2. Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2

Data used in this blog:
http://dev.mysql.com/doc/employee/en.index.html

Pig and Hive for joins:
Pig and Hive have join capabilities built in, and are optimized for joins.  Programs with joins written in Java are more performant, but time-consuming to code, test and support - and in some companies are considered an anti-pattern for joins.

Sample program

This gist demonstrates how to do a map-side join, joining a MapFile from distributedcache
with a larger dataset in HDFS.
Includes:
---------
1. Input data and script download
2. Dataset structure review
3. Expected results
4. Mapper code
5. Driver code
6. Data load commands
7. Command to run Java program
8. Results of the program
01. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
gitHub:
<<To be added>>
Directory structure
-------------------
joinProject
data
employees_tsv
employees_tsv
departments_map.tar.gz
MapSideJoin-DistCacheMapFile
src
MapperMapSideJoinDCacheMapFile.java
DriverMapSideJoinDCacheMapFile
jar
MapSideJoinDCacheMapFile.jar
********************************************
*Data structure
********************************************
a) Small dataset (departments_map)
[DeptNo DeptName] - MapFile
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service
b) Large dataset (employees_tsv)
[Emp_no DOB FName LName Gender HireDate DeptNo] - Tab separated
10001 1953-09-02 Georgi Facello M 1986-06-26 d005
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
10006 1953-04-20 Anneke Preusig F 1989-06-02 d005
10009 1952-04-19 Sumant Peac F 1985-02-18 d006
...
********************************************
*Expected Results
********************************************
Everything in the employees_tsv file, followed by a tab and the department name (from the department file)
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
......
/********************************************
*Mapper
*MapperMapSideJoinDCacheMapFile
********************************************/
import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MapperMapSideJoinDCacheMapFile extends
Mapper<LongWritable, Text, Text, Text> {
private MapFile.Reader deptMapReader = null;
private Text txtMapOutputKey = new Text("");
private Text txtMapOutputValue = new Text("");
private Text txtMapLookupKey = new Text("");
private Text txtMapLookupValue = new Text("");
enum MYCOUNTER {
RECORD_COUNT, FILE_EXISTS, LOAD_MAP_ERROR
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
Path[] cacheFilesLocal = DistributedCache.getLocalCacheArchives(context
.getConfiguration());
for (Path eachPath : cacheFilesLocal) {
if (eachPath.getName().toString().trim()
.equals("departments_map.tar.gz")) {
URI uriUncompressedFile = new File(eachPath.toString()
+ "/departments_map").toURI();
context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
loadDepartmentsMap(uriUncompressedFile, context);
}
}
}
@SuppressWarnings("deprecation")
private void loadDepartmentsMap(URI uriUncompressedFile, Context context)
throws IOException {
FileSystem dfs = FileSystem.get(context.getConfiguration());
try {
deptMapReader = new MapFile.Reader(dfs,
uriUncompressedFile.toString(), context.getConfiguration());
} catch (Exception e) {
// TODO Auto-generated catch block
context.getCounter(MYCOUNTER.LOAD_MAP_ERROR).increment(1);
e.printStackTrace();
}
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);
if (value.toString().length() > 0) {
String arrEmpAttributes[] = value.toString().split("\\t");
txtMapLookupKey.set(arrEmpAttributes[6].toString());
try {
deptMapReader.get(txtMapLookupKey, txtMapLookupValue);
} finally {
// Text.equals(null) is always false and a Text never equals a String;
// test the looked-up value's string contents instead
txtMapLookupValue.set(txtMapLookupValue.toString().isEmpty() ? "NOT-FOUND"
: txtMapLookupValue.toString());
}
txtMapOutputKey.set(arrEmpAttributes[0].toString());
txtMapOutputValue.set(arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t"
+ arrEmpAttributes[3].toString() + "\t"
+ arrEmpAttributes[4].toString() + "\t"
+ arrEmpAttributes[5].toString() + "\t"
+ arrEmpAttributes[6].toString() + "\t"
+ txtMapLookupValue.toString());
}
context.write(txtMapOutputKey, txtMapOutputValue);
txtMapLookupValue.set("");
txtMapLookupKey.set("");
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
deptMapReader.close();
}
}
/********************************************
*Driver
*DriverMapSideJoinDCacheMapFile
********************************************/
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DriverMapSideJoinDCacheMapFile extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required for DriverMapSideJoinDCacheMapFile- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
Configuration conf = job.getConfiguration();
job.setJobName("Map-side join with mapfile in DCache");
DistributedCache
.addCacheArchive(
new URI(
"/user/akhanolk/joinProject/data/departments_map.tar.gz"),
conf);
job.setJarByClass(DriverMapSideJoinDCacheMapFile.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MapperMapSideJoinDCacheMapFile.class);
job.setNumReduceTasks(0);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new DriverMapSideJoinDCacheMapFile(), args);
System.exit(exitCode);
}
}
********************************************
*HDFS load commands
********************************************
hadoop fs -mkdir joinProject
hadoop fs -mkdir joinProject/data
hadoop fs -put joinProject/data/* joinProject/data/
********************************************
*Job run commands
********************************************
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheMapFile/jar/MapSideJoinDCacheMapFile.jar DriverMapSideJoinDCacheMapFile /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideMapFileLookUpDistCache
********************************************
*Program Output
********************************************
hadoop fs -cat /user/akhanolk/joinProject/data/output-MapSideMapFileLookUpDistCache/part* | less
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
10004 1954-05-01 1954-05-01 Chirstian Koblick M 1986-12-01 d004 Production
10005 1955-01-21 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003 Human Resources
10006 1953-04-20 1953-04-20 Anneke Preusig F 1989-06-02 d005 Development
10009 1952-04-19 1952-04-19 Sumant Peac F 1985-02-18 d006 Quality Management
..

Monday, September 16, 2013

Map-side join sample in Java using reference data (text file) from distributed cache - Part 1

This post covers map-side joins in Java map-reduce; it has links to the Apache documentation, my notes on the topic and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

1.0. What's in this blog?

A sample map-reduce program in Java that joins two datasets on the map-side - an employee dataset and a department dataset, with the department number as the join key.  The department dataset is a very small reference dataset, is in HDFS, and is added to the distributed cache.  The mapper retrieves the department data available through the distributed cache and loads it into a HashMap in the setup() method of the mapper; the HashMap is referenced in the map method to get the department name, and emit the employee records with the department name included.

Section 2 demonstrates a solution where a file in HDFS is added to the distributed cache in the driver code, and accessed in the mapper setup method through the DistributedCache.getLocalCacheFiles() method.

Section 3 demonstrates a solution where a local file is added to the distributed cache at the command line, and accessed in the mapper setup method.

Apache documentation on DistributedCache:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html

Related blogs:
1. Map-side join sample using reference data (text file) from distributed cache - Part 1
2. Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2

Data used in this blog:
http://dev.mysql.com/doc/employee/en.index.html

Pig and Hive for joins:
Pig and Hive have join capabilities built in, and are optimized for joins.  Programs with joins written in Java are more performant, but time-consuming to code, test and support - and in some companies are considered an anti-pattern for joins.

2.0. Sample program

In this program, the side data exists in HDFS, is added to the distributed cache in the driver code, and is referenced in the mapper using the DistributedCache.getLocalCacheFiles() method.


This gist demonstrates how to do a map-side join, loading one small dataset from DistributedCache into a HashMap
in memory, and joining with a larger dataset.
Includes:
---------
1. Input data and script download
2. Dataset structure review
3. Expected results
4. Mapper code
5. Driver code
6. Data load commands
7. Command to run Java program
8. Results of the program
01. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
gitHub:
<<To be added>>
Directory structure
-------------------
joinProject
data
employees_tsv
employees_tsv
departments_sorted
departments_txt
MapSideJoin-DistCacheTxtFile
src
MapperMapSideJoinDCacheTextFile.java
DriverMapSideJoinDCacheTxtFile
jar
MapSideJoinDCacheTextFile.jar
********************************************
*Data structure
********************************************
a) Small dataset (departments_txt)
[DeptNo DeptName] - Tab separated
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service
b) Large dataset (employees_tsv)
[Emp_no DOB FName LName Gender HireDate DeptNo] - Tab separated
10001 1953-09-02 Georgi Facello M 1986-06-26 d005
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
10006 1953-04-20 Anneke Preusig F 1989-06-02 d005
10009 1952-04-19 Sumant Peac F 1985-02-18 d006
...
********************************************
*Expected Results
********************************************
Everything in the employees_tsv file, followed by a tab and the department name (from the department file)
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
......
/********************************************
*Mapper
*MapperMapSideJoinDCacheTextFile
********************************************/
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MapperMapSideJoinDCacheTextFile extends
Mapper<LongWritable, Text, Text, Text> {
private static HashMap<String, String> DepartmentMap = new HashMap<String, String>();
private BufferedReader brReader;
private String strDeptName = "";
private Text txtMapOutputKey = new Text("");
private Text txtMapOutputValue = new Text("");
enum MYCOUNTER {
RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
Path[] cacheFilesLocal = DistributedCache.getLocalCacheFiles(context
.getConfiguration());
for (Path eachPath : cacheFilesLocal) {
if (eachPath.getName().toString().trim().equals("departments_txt")) {
context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
loadDepartmentsHashMap(eachPath, context);
}
}
}
private void loadDepartmentsHashMap(Path filePath, Context context)
throws IOException {
String strLineRead = "";
try {
brReader = new BufferedReader(new FileReader(filePath.toString()));
// Read each line, split and load to HashMap
while ((strLineRead = brReader.readLine()) != null) {
String deptFieldArray[] = strLineRead.split("\\t");
DepartmentMap.put(deptFieldArray[0].trim(),
deptFieldArray[1].trim());
}
} catch (FileNotFoundException e) {
e.printStackTrace();
context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
} catch (IOException e) {
context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
e.printStackTrace();
}finally {
if (brReader != null) {
brReader.close();
}
}
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);
if (value.toString().length() > 0) {
String arrEmpAttributes[] = value.toString().split("\\t");
try {
strDeptName = DepartmentMap.get(arrEmpAttributes[6].toString());
} finally {
// HashMap.get() returns null on a miss; checking with == null avoids the
// NullPointerException that strDeptName.equals(null) would throw
strDeptName = ((strDeptName == null || strDeptName.isEmpty()) ? "NOT-FOUND" : strDeptName);
}
txtMapOutputKey.set(arrEmpAttributes[0].toString());
txtMapOutputValue.set(arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t"
+ arrEmpAttributes[3].toString() + "\t"
+ arrEmpAttributes[4].toString() + "\t"
+ arrEmpAttributes[5].toString() + "\t"
+ arrEmpAttributes[6].toString() + "\t" + strDeptName);
}
context.write(txtMapOutputKey, txtMapOutputValue);
strDeptName = "";
}
}
/********************************************
*Driver
*DriverMapSideJoinDCacheTxtFile
********************************************/
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DriverMapSideJoinDCacheTxtFile extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
Configuration conf = job.getConfiguration();
job.setJobName("Map-side join with text lookup file in DCache");
DistributedCache
.addCacheFile(
new URI(
"/user/akhanolk/joinProject/data/departments_sorted/departments_txt"),
conf);
job.setJarByClass(DriverMapSideJoinDCacheTxtFile.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MapperMapSideJoinDCacheTextFile.class);
job.setNumReduceTasks(0);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new DriverMapSideJoinDCacheTxtFile(), args);
System.exit(exitCode);
}
}
********************************************
*HDFS load commands
********************************************
hadoop fs -mkdir joinProject
hadoop fs -mkdir joinProject/data
hadoop fs -put joinProject/data/* joinProject/data/
********************************************
*Job run commands
********************************************
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheTxtFile/jar/MapSideJoinDCacheTextFile.jar DriverMapSideJoinDCacheTxtFile /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideTextFileLookUpDistCache
********************************************
*Program Output
********************************************
hadoop fs -cat joinProject/data/output-MapSideTextFileLookUpDistCache/part* | less
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
10004 1954-05-01 1954-05-01 Chirstian Koblick M 1986-12-01 d004 Production
10005 1955-01-21 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003 Human Resources
10006 1953-04-20 1953-04-20 Anneke Preusig F 1989-06-02 d005 Development
10009 1952-04-19 1952-04-19 Sumant Peac F 1985-02-18 d006 Quality Management
10010 1963-06-01 1963-06-01 Duangkaew Piveteau F 1989-08-24 d006 Quality Management
10012 1960-10-04 1960-10-04 Patricio Bridgland M 1992-12-18 d005 Development
10013 1963-06-07 1963-06-07 Eberhardt Terkki M 1985-10-20 d003 Human Resources
.....

3.0. Variation 

As a variation on the code in section 2.0, this section demonstrates how to add side data that is not in HDFS to the distributed cache through the command line, leveraging GenericOptionsParser.

This gist is part of a series of gists related to Map-side joins in Java map-reduce.
In the gist - https://gist.github.com/airawat/6597557, we added the reference data available
in HDFS to the distributed cache from the driver code.
This gist demonstrates adding a local file via command line to distributed cache.
Refer gist at https://gist.github.com/airawat/6597557 for-
1. Data samples and structure
2. Expected results
3. Commands to load data to HDFS
The listing below includes:
4. Data and code download location
5. Mapper code
6. Driver code
7. Command to run the program
8. Results
04. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
gitHub:
<<To be added>>
Directory structure
-------------------
joinProject
data
employees_tsv
employees_tsv
departments_sorted
departments_txt
MapSideJoin-DistCacheTxtFileGOP
src
MapperMapSideJoinDCacheTextFileGOP.java
DriverMapSideJoinDCacheTxtFileGOP.java
jar
MapSideJoin-DistCacheTxtFileGOP.jar
/********************************************
*Mapper
*MapperMapSideJoinDCacheTextFileGOP
********************************************/
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MapperMapSideJoinDCacheTextFileGOP extends
Mapper<LongWritable, Text, Text, Text> {
private static HashMap<String, String> DepartmentMap = new HashMap<String, String>();
private BufferedReader brReader;
private String strDeptName = "";
private Text txtMapOutputKey = new Text("");
private Text txtMapOutputValue = new Text("");
enum MYCOUNTER {
RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
File lookupFile = new File("departments_txt");
String strLineRead = "";
try {
brReader = new BufferedReader(new FileReader(lookupFile));
// Read each line, split and load to HashMap
while ((strLineRead = brReader.readLine()) != null) {
String deptFieldArray[] = strLineRead.split("\\t");
DepartmentMap.put(deptFieldArray[0].trim(),
deptFieldArray[1].trim());
}
} catch (FileNotFoundException e) {
e.printStackTrace();
context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
} catch (IOException e) {
context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
e.printStackTrace();
} finally {
// Close the reader once the lookup HashMap has been loaded
if (brReader != null) {
brReader.close();
}
}
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);
if (value.toString().length() > 0) {
String arrEmpAttributes[] = value.toString().split("\\t");
try {
strDeptName = DepartmentMap.get(arrEmpAttributes[6].toString());
} finally {
// HashMap.get() returns null on a miss; checking with == null avoids the
// NullPointerException that strDeptName.equals(null) would throw
strDeptName = ((strDeptName == null || strDeptName.isEmpty()) ? "NOT-FOUND" : strDeptName);
}
txtMapOutputKey.set(arrEmpAttributes[0].toString());
txtMapOutputValue.set(arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t"
+ arrEmpAttributes[3].toString() + "\t"
+ arrEmpAttributes[4].toString() + "\t"
+ arrEmpAttributes[5].toString() + "\t"
+ arrEmpAttributes[6].toString() + "\t" + strDeptName);
}
context.write(txtMapOutputKey, txtMapOutputValue);
strDeptName = "";
}
}
/********************************************
*Driver
*DriverMapSideJoinDCacheTxtFileGOP
********************************************/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DriverMapSideJoinDCacheTxtFileGOP extends Configured implements
Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required for DriverMapSideJoinDCacheTxtFileGOP- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
job.setJobName("Map-side join with text lookup file in DCache-GenericOptionsParser");
job.setJarByClass(DriverMapSideJoinDCacheTxtFileGOP.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MapperMapSideJoinDCacheTextFileGOP.class);
job.setNumReduceTasks(0);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new DriverMapSideJoinDCacheTxtFileGOP(), args);
System.exit(exitCode);
}
}
********************************************
*Job run commands
********************************************
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheTxtFileGOP/jar/MapSideJoin-DistCacheTxtFileGOP.jar DriverMapSideJoinDCacheTxtFileGOP -files /home/akhanolk/Blog/joinProject/data/departments_sorted/departments_txt /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideTxtFileLookUpDistCacheGOP
********************************************
*Program Output
********************************************
See - https://gist.github.com/airawat/6597557


   

Friday, September 13, 2013

Sequence File - construct, usage, code samples

This post covers the sequence file format; it has links to the Apache documentation, my notes on the topic and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

1.0. What's in this blog?


1.  Introduction to sequence file format
2.  Sample code to create a sequence file (compressed and uncompressed), from a text file, in a map reduce program, and to read a sequence file.

2.0. What's a Sequence File?


2.0.1. About sequence files:
A sequence file is a persistent data structure for binary key-value pairs.

2.0.2. Construct:
Sequence files include sync points after every few records; these align with record boundaries and let a reader re-synchronize to the stream.  The sync points also make the files splittable for mapreduce processing.  Sequence files support record-level and block-level compression.

Apache documentation: http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/SequenceFile.html

Excerpts from Hadoop the definitive guide...
"A sequence file consists of a header followed by one or more records. The first three bytes of a sequence file are the bytes SEQ, which acts as a magic number, followed by a single byte representing the version number. The header contains other fields, including the names of the key and value classes, compression details, user-defined metadata, and the sync marker. 
Structure of sequence file with and without record compression-

The format for record compression is almost identical to no compression, except the value bytes are compressed using the codec defined in the header. Note that keys are not compressed.
Structure of sequence file with and without block compression-

Block compression compresses multiple records at once; it is therefore more compact than and should generally be preferred over record compression because it has the opportunity to take advantage of similarities between records. A sync marker is written before the start of every block. The format of a block is a field indicating the number of records in the block, followed by four compressed fields: the key lengths, the keys, the value lengths, and the values."

The uncompressed, record-compressed and block-compressed sequence files share the same header.  Details are below, from the Apache documentation on sequence files.

SequenceFile Header
  • version - 3 bytes of magic header SEQ, followed by 1 byte of actual version number (e.g. SEQ4 or SEQ6)
  • keyClassName -key class
  • valueClassName - value class
  • compression - A boolean which specifies if compression is turned on for keys/values in this file.
  • blockCompression - A boolean which specifies if block-compression is turned on for keys/values in this file.
  • compression codec - CompressionCodec class which is used for compression of keys and/or values (if compression is enabled).
  • metadata - SequenceFile.Metadata for this file.
  • sync - A sync marker to denote end of the header.
Uncompressed SequenceFile Format
  • Header
  • Record
  • Record length
  • Key length
  • Key
  • Value
  • A sync-marker every few 100 bytes or so.
Record-Compressed SequenceFile Format
  • Header
  • Record
  • Record length
  • Key length
  • Key
  • Compressed Value
  • A sync-marker every few 100 bytes or so.
Block-Compressed SequenceFile Format
  • Header
  • Record Block
  • Uncompressed number of records in the block
  • Compressed key-lengths block-size
  • Compressed key-lengths block
  • Compressed keys block-size
  • Compressed keys block
  • Compressed value-lengths block-size
  • Compressed value-lengths block
  • Compressed values block-size
  • Compressed values block
  • A sync-marker every block.
2.0.3. Datatypes: 
The keys and values need not be instances of Writable; they just need to support serialization.

2.0.4. Creating sequence files: 
Uncompressed: Create an instance of SequenceFile.Writer and call append() to add key-value pairs, in order.  For record- and block-compressed files, refer to the Apache documentation.  When creating compressed files, the actual compression algorithm used to compress keys and/or values can be specified by using the appropriate CompressionCodec.
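To make this concrete, here is a minimal standalone sketch (separate from the map-reduce based converter covered below) that writes a small sequence file, first uncompressed and then block-compressed via GzipCodec; the paths and sample records are made up for illustration.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileWriteSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Uncompressed: the key/value classes are recorded in the file header
        Path plainPath = new Path("formatProject/data/departments-plain.seq"); // made-up path
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, plainPath,
                Text.class, Text.class);
        try {
            writer.append(new Text("d001"), new Text("Marketing"));
            writer.append(new Text("d002"), new Text("Finance"));
        } finally {
            writer.close();
        }

        // Block-compressed: same append() calls; compression details go into the header
        Path blockPath = new Path("formatProject/data/departments-block.seq"); // made-up path
        CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
        SequenceFile.Writer blockWriter = SequenceFile.createWriter(fs, conf, blockPath,
                Text.class, Text.class, SequenceFile.CompressionType.BLOCK, codec);
        try {
            blockWriter.append(new Text("d001"), new Text("Marketing"));
            blockWriter.append(new Text("d002"), new Text("Finance"));
        } finally {
            blockWriter.close();
        }
    }
}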

2.0.5. Reading data in sequence files: 
Create an instance of SequenceFile.Reader, and iterate through the entries using reader.next(key,value).
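A matching sketch that reads a sequence file back, instantiating key and value objects of the classes recorded in the header (again, the path is made up for illustration):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReadSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("formatProject/data/departments-plain.seq"); // made-up path

        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, path, conf);
            // The header records the key/value classes; instantiate matching objects
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
        } finally {
            if (reader != null) {
                reader.close();
            }
        }
    }
}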

2.0.6. Usage
- Data storage for key-value type data
- Container for other files
- Efficient from a storage perspective (binary) and from a mapreduce processing perspective (supports compression and splitting)

3.0. Creating a sequence file

This gist demonstrates how to create a sequence file (compressed and uncompressed) from a text file.
Includes:
---------
1. Input data and script download
2. Input data-review
3. Data load commands
4. Mapper code
5. Driver code to create the sequence file out of a text file in HDFS
6. Command to run Java program
7. Results of the program run to create sequence file
8. Java program to read a sequence file, and convert to text file
9. Command to run program from #8, with results
10. Note on creating compressed sequence files
11. Driver code to create a compressed sequence file
12. Command to run program in #11 with results
01. Data and code download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
gitHub:
<<To be added>>
Directory structure
-------------------
formatProject
data
departments_sorted
part-m-00000
formatConverterTextToSequence
src
FormatConverterMapper.java
FormatConverterTextToSequenceDriver.java
FormatConverterSequenceToTextDriver.java
jars
formatConverterTextToSequence.jar
formatConverterSequenceToText.jar
**************************************************
Input text file - departments_sorted/part-m-00000
**************************************************
$ more formatProject/data/departments_sorted/part-m-00000
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service
**********************************************
hdfs load commands
**********************************************
# Load data
$ hadoop fs -put formatProject/
# Remove unnecessary files
$ hadoop fs -rm -R formatProject/formatConverterTextToSequence/
$ hadoop fs -rm -R formatProject/formatConverterTextToMap/
/*********************************************************************************************************
** Mapper
** formatProject/FormatConverterTextToSequence/src/FormatConverterMapper.java
** Reads text file and emits the contents out as key-value pairs
*********************************************************************************************************/
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FormatConverterMapper extends
    Mapper<LongWritable, Text, LongWritable, Text> {

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    context.write(key, value);
  }
}
/*********************************************************************************************************
** Driver
** formatProject/FormatConverterTextToSequence/src/FormatConverterTextToSequenceDriver.java
*********************************************************************************************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FormatConverterTextToSequenceDriver extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.out.printf("Two parameters are required for FormatConverterTextToSequenceDriver-<input dir> <output dir>\n");
      return -1;
    }

    Job job = new Job(getConf());
    job.setJarByClass(FormatConverterTextToSequenceDriver.class);
    job.setJobName("Create Sequence File, from text file");

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(FormatConverterMapper.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    job.setNumReduceTasks(0);

    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new FormatConverterTextToSequenceDriver(), args);
    System.exit(exitCode);
  }
}
************************************************
**Command to create sequence file from text file
************************************************
$ hadoop jar formatProject/formatConverterTextToSequence/jars/formatConverterTextToSequence.jar FormatConverterTextToSequenceDriver formatProject/data/departments_sorted/part-m-00000 formatProject/data/departments_sequence
.
.
.
.
$ hadoop fs -ls -R formatProject/data/departments_sequence | awk '{print $8}'
formatProject/data/departments_sequence/_SUCCESS
formatProject/data/departments_sequence/_logs
formatProject/data/departments_sequence/_logs/history
formatProject/data/departments_sequence/_logs/history/cdh-jt01_1376335706356_job_201308121428_0116_conf.xml
formatProject/data/departments_sequence/_logs/history/job_201308121428_0116_1379087496898_akhanolk_Create+Sequence+File%2C+from+text+file
formatProject/data/departments_sequence/part-m-00000
************************************************
**Results
************************************************
$ hadoop fs -text formatProject/data/departments_sequence/part-m-00000
0 d001 Marketing
15 d002 Finance
28 d003 Human Resources
49 d004 Production
65 d005 Development
82 d006 Quality Management
106 d007 Sales
117 d008 Research
131 d009 Customer Service
/*********************************************************************************************************
** Driver
** formatProject/FormatConverterTextToSequence/src/FormatConverterSequenceToTextDriver.java
*********************************************************************************************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FormatConverterSequenceToTextDriver extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.out.printf("Two parameters need to be supplied - <input dir> and <output dir>\n");
      return -1;
    }

    Job job = new Job(getConf());
    job.setJarByClass(FormatConverterSequenceToTextDriver.class);
    job.setJobName("Convert Sequence File and Output as Text");

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapperClass(FormatConverterMapper.class);

    job.setNumReduceTasks(0);

    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new FormatConverterSequenceToTextDriver(), args);
    System.exit(exitCode);
  }
}
**************************************************************
**Command to create text file from sequence file & results
**************************************************************
$ hadoop jar formatProject/formatConverterTextToSequence/jars/formatConverterSequenceToText.jar FormatConverterSequenceToTextDriver formatProject/data/departments_sequence/part-m-00000 formatProject/data/departments_text
$ hadoop fs -ls -R formatProject/data/departments_text | awk '{print $8}'
formatProject/data/departments_text/_SUCCESS
formatProject/data/departments_text/_logs
formatProject/data/departments_text/_logs/history
formatProject/data/departments_text/_logs/history/cdh-jt01_1376335706356_job_201308121428_0118_conf.xml
formatProject/data/departments_text/_logs/history/job_201308121428_0118_1379089420495_akhanolk_Convert+Sequence+File+and+Output+as+Text
formatProject/data/departments_text/part-m-00000
$ hadoop fs -cat formatProject/data/departments_text/part-m-00000
0 d001 Marketing
15 d002 Finance
28 d003 Human Resources
49 d004 Production
65 d005 Development
82 d006 Quality Management
106 d007 Sales
117 d008 Research
131 d009 Customer Service
**************************************************************
** Compression and sequence files
**************************************************************
To create a compressed sequence file (block compression is the recommended option), only minor additions are needed to the driver code in [formatProject/FormatConverterTextToSequence/src/FormatConverterTextToSequenceDriver.java].
The sample code here uses SnappyCodec and block compression.
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job,CompressionType.BLOCK);
The next section includes the code.
/*************************************************************************************************************
** Driver
** formatProject/FormatConverterTextToSequence/src/FormatConverterTextToBlckCompSequenceDriver.java
*************************************************************************************************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FormatConverterTextToBlckCompSequenceDriver extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.out.printf("Two parameters are required for FormatConverterTextToBlckCompSequenceDriver-<input dir> <output dir>\n");
      return -1;
    }

    Job job = new Job(getConf());
    job.setJarByClass(FormatConverterTextToBlckCompSequenceDriver.class);
    job.setJobName("Create block compressed Sequence File, from text file");

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);

    job.setMapperClass(FormatConverterMapper.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

    job.setNumReduceTasks(0);

    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new FormatConverterTextToBlckCompSequenceDriver(), args);
    System.exit(exitCode);
  }
}
*************************************************************************************
**Command to create block compressed(snappy) sequence file from text file + output
*************************************************************************************
$ hadoop jar formatProject/formatConverterTextToSequence/jars/formatConverterTextToBlkCompSequence.jar FormatConverterTextToBlckCompSequenceDriver formatProject/data/departments_sorted/part-m-00000 formatProject/data/departments_sequence_blckcmp
.
$ hadoop fs -ls -R formatProject/data/departments_sequence_blckcmp | awk '{print $8}'
formatProject/data/departments_sequence_blckcmp/_SUCCESS
formatProject/data/departments_sequence_blckcmp/_logs
formatProject/data/departments_sequence_blckcmp/_logs/history
formatProject/data/departments_sequence_blckcmp/_logs/history/cdh-jt01_1376335706356_job_201308121428_0120_conf.xml
formatProject/data/departments_sequence_blckcmp/_logs/history/job_201308121428_0120_1379091181653_akhanolk_Create+block+compressed+Sequence+File%2C+from+text+f
formatProject/data/departments_sequence_blckcmp/part-m-00000
$ hadoop fs -text formatProject/data/departments_sequence_blckcmp/part-m-00000
13/09/13 11:55:38 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
13/09/13 11:55:38 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
13/09/13 11:55:38 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
13/09/13 11:55:38 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
0 d001 Marketing
15 d002 Finance
28 d003 Human Resources
49 d004 Production
65 d005 Development
82 d006 Quality Management
106 d007 Sales
117 d008 Research
131 d009 Customer Service

4.0. Reading a sequence file

Covered already in the gist under section 3.

5.0. Any thoughts/comments

Any constructive criticism and/or additions/insights are much appreciated.

Cheers!!





Thursday, September 12, 2013

Map File - construct, usage, code samples

This post covers the map file format, links to the Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

1.0. What's in this blog?

1.  Introduction to map file
2.  Sample code to convert a text file to a map file
3.  Sample code to read a map file

2.0. What's a Map File?

2.0.1. Definition:
From Hadoop the Definitive Guide..
A MapFile is a sorted SequenceFile with an index to permit lookups by key. MapFile can be thought of as a persistent form of java.util.Map (although it doesn’t implement this interface), which is able to grow beyond the size of a Map that is kept in memory.
Apache documentation:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/MapFile.html 

2.0.2. Datatypes: 
The keys must be instances of WritableComparable, and the values, Writable.

2.0.3. Creating map files: 
Create an instance of MapFile.Writer and call append() to add key-value pairs; keys must be appended in sorted order.
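
A minimal sketch of the write path (mine, not part of the gist; it uses the same older MapFile.Writer constructor as the full program in section 3.0, and the output directory name is made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileWriteSketch {
  @SuppressWarnings("deprecation")
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    MapFile.Writer writer = null;
    try {
      // The output path is a directory; append() expects keys in sorted order
      writer = new MapFile.Writer(conf, fs, "formatProject/data/departments_map_demo",
          Text.class, Text.class);
      writer.append(new Text("d001"), new Text("Marketing"));
      writer.append(new Text("d002"), new Text("Finance"));
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}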

2.0.4. Looking up data in map files: 
Create an instance of MapFile.Reader, and call get(key,value).
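
A minimal lookup sketch under the same assumptions (the full lookup program, MapFileLookup.java, appears in section 3.0):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class MapFileGetSketch {
  @SuppressWarnings("deprecation")
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    MapFile.Reader reader = null;
    try {
      reader = new MapFile.Reader(fs, "formatProject/data/departments_map_demo", conf);
      Text value = new Text();
      // get() fills 'value' and returns it; it returns null if the key is not present
      Writable result = reader.get(new Text("d001"), value);
      System.out.println(result == null ? "not found" : value.toString());
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }
}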

2.0.5. Construct
The map file is actually a directory.  Within it are two files: an "index" file and a "data" file.
The data file is a sequence file containing the keys and their associated values.
The index file is smaller; it holds key/offset pairs - the key being a key from the data file, and the value being that key's byte offset within the data file.  The index contains only a fraction of the keys, as determined by the writer's index interval (MapFile.Writer.setIndexInterval()/getIndexInterval()).

2.0.5.1. Directory structure:
$ hadoop fs -ls formatProject/data/departments_map | awk '{print $8}'
formatProject/data/departments_map/data
formatProject/data/departments_map/index

2.0.5.2. Content of the file 'data':
$ hadoop fs -text formatProject/data/departments_map/data
d001  Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service

2.0.5.3. Content of the file 'index':
$ hadoop fs -text formatProject/data/departments_map/index
d001 121
d002 152
d003 181
d004 218
d005 250
d006 283
d007 323
d008 350
d009 380

2.0.6. Behind the scenes of a look up
The index file is read into memory, and a binary search finds the largest key that is less than or equal to the key being looked up.  The reader then seeks to that key's offset in the data file and scans forward to the key being looked up, extracting and returning the associated value.  A null is returned if the key is not found.

If the index is too large to fit in memory, the reader can be configured to load only a fraction of the index entries by skipping keys.
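
For example - and this is just a sketch of the knob as I recall it, so verify the property name (io.map.index.skip) against your Hadoop version - the reader can be told to keep only every (skip + 1)-th index entry in memory.  A fragment, assuming the same imports as the lookup program later in this post:

Configuration conf = new Configuration();
conf.setInt("io.map.index.skip", 7); // keep roughly 1 of every 8 index entries in memory
FileSystem fs = FileSystem.get(conf);
MapFile.Reader reader = new MapFile.Reader(fs, "formatProject/data/departments_map", conf);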

2.0.7. Usage
Fast lookups - in joins, among others.
Can also be used as a container for small files, with the filename as the key.

3.0. Creating a map file

This gist demonstrates how to create a map file from a text file.
Includes:
---------
1. Input data and script download
2. Input data-review
3. Data load commands
4. Java program to create the map file out of a text file in HDFS
5. Command to run Java program
6. Results of the program run to create map file
7. Java program to lookup data in map file
8. Command to run program to do a lookup
01. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
gitHub:
<<To be added>>
Directory structure
-------------------
formatProject
data
departments_sorted
part-m-00000
formatConverterTextToMap
src
FormatConverterTextToMap.java
MapFileLookup.java
jars
formatConverterTextToMap.jar
**************************************************
Input text file - departments_sorted/part-m-00000
**************************************************
$ more formatProject/data/departments_sorted/part-m-00000
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service
**********************************************
hdfs load commands
**********************************************
# Load data
$ hadoop fs -put formatProject/
# Remove unnecessary files
$ hadoop fs -rm -R formatProject/formatConverterTextToMap/
/******************************************
* FormatConverterTextToMap.java
* ****************************************/
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
public class FormatConverterTextToMap {

  @SuppressWarnings("deprecation")
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs;
    try {
      fs = FileSystem.get(conf);

      Path inputFile = new Path(args[0]);
      Path outputFile = new Path(args[1]);

      Text txtKey = new Text();
      Text txtValue = new Text();

      String strLineInInputFile = "";
      String lstKeyValuePair[] = null;
      MapFile.Writer writer = null;

      FSDataInputStream inputStream = fs.open(inputFile);

      try {
        writer = new MapFile.Writer(conf, fs, outputFile.toString(),
            txtKey.getClass(), txtValue.getClass());
        writer.setIndexInterval(1); // Need this as the default is 128, and my data is just 9 records
        while (inputStream.available() > 0) {
          strLineInInputFile = inputStream.readLine();
          lstKeyValuePair = strLineInInputFile.split("\\t");
          txtKey.set(lstKeyValuePair[0]);
          txtValue.set(lstKeyValuePair[1]);
          writer.append(txtKey, txtValue);
        }
      } finally {
        IOUtils.closeStream(writer);
        System.out.println("Map file created successfully!!");
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
******************************************************************
**Command to run program that creates a map file from text file
******************************************************************
$ hadoop jar formatProject/formatConverterTextToMap/jars/formatConverterTextToMap.jar FormatConverterTextToMap formatProject/data/departments_sorted/part-m-00000 formatProject/data/departments_map
13/09/12 22:05:21 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
13/09/12 22:05:21 INFO compress.CodecPool: Got brand-new compressor [.deflate]
13/09/12 22:05:21 INFO compress.CodecPool: Got brand-new compressor [.deflate]
Map file created successfully!!
************************************************
**Results
************************************************
$ hadoop fs -ls formatProject/data/departments_map | awk '{print $8}'
formatProject/data/departments_map/data
formatProject/data/departments_map/index
$ hadoop fs -text formatProject/data/departments_map/data
13/09/12 22:44:34 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
13/09/12 22:44:34 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service
$ hadoop fs -text formatProject/data/departments_map/index
13/09/12 22:44:56 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
13/09/12 22:44:56 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
13/09/12 22:44:56 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
13/09/12 22:44:56 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
13/09/12 22:44:56 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
d001 121
d002 152
d003 181
d004 218
d005 250
d006 283
d007 323
d008 350
d009 380
/****************************************
* MapFileLookup.java
* **************************************/
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
public class MapFileLookup {

  /*
   * This program looks up a map file for a certain key and prints the associated value.
   *
   * Parameters:
   * param 1: Path to map file
   * param 2: Key for which we want to get the value from the map file
   *
   * Sample call: hadoop jar MapFileLookup.jar MapFileLookup <map-file-directory> <key>
   */
  @SuppressWarnings("deprecation")
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = null;

    Text txtKey = new Text(args[1]);
    Text txtValue = new Text();

    MapFile.Reader reader = null;

    try {
      fs = FileSystem.get(conf);

      try {
        reader = new MapFile.Reader(fs, args[0], conf);
        reader.get(txtKey, txtValue);
      } catch (IOException e) {
        e.printStackTrace();
      }

    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (reader != null)
        reader.close();
    }

    System.out.println("The key is " + txtKey.toString()
        + " and the value is " + txtValue.toString());
  }
}
**************************************************************************
**Commands to run program to look up a key in a map file from text file
**************************************************************************
$ hadoop jar formatProject/formatConverterTextToMap/jars/MapFileLookup.jar MapFileLookup formatProject/data/departments_map "d009"
13/09/12 22:53:08 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
13/09/12 22:53:08 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
13/09/12 22:53:08 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
13/09/12 22:53:08 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
13/09/12 22:53:08 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
13/09/12 22:53:08 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
The key is d009 and the value is Customer Service

4.0. Looking up a key in a map file

Covered already in the gist under section 3.
The plan is to use the map file in a map-side join in a subsequent blog.

5.0. Any thoughts/comments

Any constructive criticism and/or additions/insights are much appreciated.

Cheers!!

Wednesday, September 11, 2013

Apache Oozie - Part 12: Oozie Shell Action + Passing output from one Oozie action to another

I had read about the Oozie capability to pass output from one action to another and had forgotten about it; sure enough, it came up at an interview.  Here's some sample code...


1.0. What's covered in the blog?

1. Documentation on the Oozie shell action
2. A sample Oozie workflow that includes a shell script action that echoes a count of the number of lines in a file glob, and an email action that captures the output of the shell action and emails it.

Version:
Oozie 3.3.0; Pig 0.10.0

Related blogs:
Blog 1: Oozie workflow - hdfs and email actions
Blog 2: Oozie workflow - hdfs, email and hive actions
Blog 3: Oozie workflow - sqoop action (Hive-mysql; sqoop export)
Blog 4: Oozie workflow - java map-reduce (new API) action
Blog 5: Oozie workflow - streaming map-reduce (python) action 
Blog 6: Oozie workflow - java main action
Blog 7: Oozie workflow - Pig action
Blog 8: Oozie sub-workflow
Blog 9a: Oozie coordinator job - time-triggered sub-workflow, fork-join control and decision control
Blog 9b: Oozie coordinator jobs - file triggered 
Blog 9c: Oozie coordinator jobs - dataset availability triggered
Blog 10: Oozie bundle jobs
Blog 11a: Oozie Java API for interfacing with oozie workflows
Blog 12: Oozie shell action + passing output from one action to another


2.0. Documentation on the Oozie Shell Action


Apache documentation is available at - http://oozie.apache.org/docs/3.3.0/DG_ShellActionExtension.html


3.0. Sample program


This gist includes the components of an Oozie workflow - scripts/code, sample data
and commands; Oozie actions covered: shell action, email action
Action 1: The shell action executes a shell script that does a line count for files in a
glob provided, and writes the line count to standard output
Action 2: The email action emails the output of action 1
Pictorial overview of job:
--------------------------
<<To be added>>
Includes:
---------
Data and script download: 01-DataAndScriptDownload
Data load commands: 02-HdfsLoadCommands
Shell Script: 03-ShellScript
Oozie job properties file: 04-OozieJobProperties
Oozie workflow file: 05-OozieWorkflowXML
Oozie SMTP Configuration: 06-OozieSMTPConfig
Oozie commands 07-OozieJobExecutionCommands
Output email 08-OutputOfProgram
Oozie web console - screenshots 09-OozieWebConsoleScreenshots
01. Data and script download
-----------------------------
Github:
https://github.com/airawat/OozieSamples
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
oozieProject
data
airawat-syslog
<<Node-Name>>
<<Year>>
<<Month>>
messages
workflowShellAction
workflow.xml
job.properties
lineCount.sh
02-Hdfs load commands
----------------------
$ hadoop fs -mkdir oozieProject
$ hadoop fs -put oozieProject/* oozieProject/
#*************************************************
#lineCount.sh
#*************************************************
#!/bin/bash -e
echo "NumberOfLines=`hadoop fs -cat $1 | wc -l`"
#*************************************************
# job.properties
#*************************************************
nameNode=hdfs://cdh-nn01.chuntikhadoop.com:8020
jobTracker=cdh-jt01:8021
queueName=default
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
oozieProjectRoot=${nameNode}/user/${user.name}/oozieProject
appPath=${oozieProjectRoot}/workflowShellAction
oozie.wf.application.path=${appPath}
inputDir=${oozieProjectRoot}/data/*/*/*/*/*
lineCountShScriptPath=${appPath}/lineCount.sh
lineCountShellScript=lineCount.sh
emailToAddress=akhanolk@cdh-dev01
<!--******************************************-->
<!--workflow.xml -->
<!--******************************************-->
<workflow-app name="WorkFlowForShellActionWithCaptureOutput" xmlns="uri:oozie:workflow:0.1">
    <start to="shellAction"/>
    <action name="shellAction">
        <shell xmlns="uri:oozie:shell-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <exec>${lineCountShellScript}</exec>
            <argument>${inputDir}</argument>
            <file>${lineCountShScriptPath}#${lineCountShellScript}</file>
            <capture-output/>
        </shell>
        <ok to="sendEmail"/>
        <error to="killAction"/>
    </action>
    <action name="sendEmail">
        <email xmlns="uri:oozie:email-action:0.1">
            <to>${emailToAddress}</to>
            <subject>Output of workflow ${wf:id()}</subject>
            <body>Results from line count: ${wf:actionData('shellAction')['NumberOfLines']}</body>
        </email>
        <ok to="end"/>
        <error to="end"/>
    </action>
    <kill name="killAction">
        <message>"Killed job due to error"</message>
    </kill>
    <end name="end"/>
</workflow-app>
Oozie SMTP configuration
------------------------
Add the following to the oozie-site.xml, and restart Oozie.
Replace the values with ones specific to your environment.
<!-- SMTP params-->
<property>
    <name>oozie.email.smtp.host</name>
    <value>cdh-dev01</value>
</property>
<property>
    <name>oozie.email.smtp.port</name>
    <value>25</value>
</property>
<property>
    <name>oozie.email.from.address</name>
    <value>oozie@cdh-dev01</value>
</property>
<property>
    <name>oozie.email.smtp.auth</name>
    <value>false</value>
</property>
<property>
    <name>oozie.email.smtp.username</name>
    <value></value>
</property>
<property>
    <name>oozie.email.smtp.password</name>
    <value></value>
</property>
06. Oozie commands
-------------------
Note: Replace the Oozie server and port with those specific to your cluster.
1) Submit job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowShellAction/job.properties -submit
job: 0000012-130712212133144-oozie-oozi-W
2) Run job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -start 0000014-130712212133144-oozie-oozi-W
3) Check the status:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info 0000014-130712212133144-oozie-oozi-W
4) Suspend workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -suspend 0000014-130712212133144-oozie-oozi-W
5) Resume workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -resume 0000014-130712212133144-oozie-oozi-W
6) Re-run workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowShellAction/job.properties -rerun 0000014-130712212133144-oozie-oozi-W
7) Should you need to kill the job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill 0000014-130712212133144-oozie-oozi-W
8) View server logs:
$ oozie job -oozie http://cdh-dev01:11000/oozie -logs 0000014-130712212133144-oozie-oozi-W
Logs are available at:
/var/log/oozie on the Oozie server.
########################
#Program output
########################
From akhanolk@cdh-dev01.localdomain Thu Sep 12 00:51:00 2013
Return-Path: <akhanolk@cdh-dev01.localdomain>
X-Original-To: akhanolk@cdh-dev01
Delivered-To: akhanolk@cdh-dev01.localdomain
From: akhanolk@cdh-dev01.localdomain
To: akhanolk@cdh-dev01.localdomain
Subject: Output of workflow 0000009-130911235633916-oozie-oozi-W
Content-Type: text/plain; charset=us-ascii
Date: Thu, 12 Sep 2013 00:51:00 -0500 (CDT)
Status: R
Results from line count: 5207

4.0. Oozie web console screenshots