Sunday, September 22, 2013

Map-side join of large datasets using CompositeInputFormat

This post covers map-side joins of large datasets using CompositeInputFormat. It includes links to the Apache documentation, my notes on the topic, and a sample program demonstrating the functionality. Hive and Pig rock and rule at joining datasets, but it helps to know how to perform joins in Java.

Update [10/15/2013]
I have added the Pig equivalent at the very bottom of the gist.

Feel free to share any insights or constructive criticism. Cheers!!

Related blogs:
1. Map-side join sample in Java using reference data (text file) from distributed cache - Part 1
2. Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2
3. Map-side join sample in Java of two large datasets, leveraging CompositeInputFormat

Sample program:
**********************
**Gist
**********************
This gist details how to inner join two large datasets on the map side, leveraging the join capability in MapReduce. Such a join makes sense when both input datasets are too large to distribute through the DistributedCache. It can be implemented if the two datasets share a join key and both are sorted in the same order by that key.
There are two critical pieces to engaging the join behavior:
- the input format must be set to CompositeInputFormat.class, and
- the key mapred.join.expr must have a value that is a valid join specification.
Sample program:
Covers inner join of employee and salary data with employee ID as join key in a map-only program
Inner join:
The inner join is a traditional database-style inner join. The map method is called for a key only if every dataset in the join contains that key. The TupleWritable value then contains one value per dataset in the join, with the join key excluded.
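To make this concrete, here is a rough sketch, based on the sample data further down in this post, of what the mapper sees for one matching key (illustrative only, not output from the program):
// For EmpNo 10001, which is present in both datasets, the map method is invoked once with:
//   key   = 10001 (LongWritable)
//   value = TupleWritable where
//     value.get(0) -> "1953-09-02,Georgi,Facello,M,1986-06-26,d005"   (employee record, key stripped)
//     value.get(1) -> "88958,2002-06-22,9999-01-01"                   (salary record, key stripped)
// A key present in only one of the datasets never reaches the mapper in an inner join.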
Key code in the sample program:
conf.setInputFormat(CompositeInputFormat.class);
String strJoinStmt = CompositeInputFormat.compose("inner",
    KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData);
conf.set("mapred.join.expr", strJoinStmt);
conf.setOutputFormat(TextOutputFormat.class);
TextOutputFormat.setOutputPath(conf, dirOutput);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
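If you are curious what ends up in mapred.join.expr, a quick way to inspect it is to print the composed expression; it takes roughly the following nested form (sketch only - exact class names and paths will differ):
// Hypothetical inspection snippet - not part of the gist
String strJoinStmt = CompositeInputFormat.compose("inner",
    KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData);
System.out.println(strJoinStmt);
// Prints something along the lines of:
// inner(tbl(KeyValueLongInputFormat,"<employees path>"),tbl(KeyValueLongInputFormat,"<salaries path>"))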
Old API:
I ended up using the old API as the new API does not include CompositeInputFormat in the version of
Hadoop I am running.
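For reference, newer Hadoop releases (2.x) do ship a new-API CompositeInputFormat under org.apache.hadoop.mapreduce.lib.join. Below is a rough, untested sketch of the equivalent driver wiring; note that KeyValueLongInputFormatNewApi is a hypothetical new-API port of the custom input format in this gist, which you would have to write yourself.
// Rough, untested sketch assuming Hadoop 2.x; KeyValueLongInputFormatNewApi is hypothetical
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;

public class DriverNewApiSketch {
  public static void main(String[] args) throws Exception {
    Path dirEmployeesData = new Path(args[0]);
    Path dirSalaryData = new Path(args[1]);

    Job job = Job.getInstance(new Configuration(), "MapSideJoinLargeDatasets");
    job.setJarByClass(DriverNewApiSketch.class);

    // Same two critical pieces as the old-API driver, with new-API names
    job.setInputFormatClass(CompositeInputFormat.class);
    job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
        CompositeInputFormat.compose("inner",
            KeyValueLongInputFormatNewApi.class, dirEmployeesData, dirSalaryData));

    job.setNumReduceTasks(0); // map-only, as in the old-API driver
    // ... mapper, output format, and output path setup omitted
  }
}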
*******************************
*Data and code download
*******************************
Data and code:
--------------
GitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
joinProject
  data
    employees_sorted
      part-e
    salaries_sorted
      part-s
  MapSideJoinLargeDatasets
    src
      KeyValueLongInputFormat.java
      KeyValueLongLineRecordReader.java
      MapperMapSideJoinLargeDatasets.java
      DriverMapSideJoinLargeDatasets.java
    jar
      MapSideJoinLgDsOAPI.jar
********************************
Data Structure Review
********************************
Datasets:
The two datasets are employee and salary data.
Join key:
The join key is EmpNo (employee number).
Location of join key:
The join key is the first field in both datasets.
Sorting:
The data is sorted by the join key "EmpNo" in ascending order.
Sorting is crucial for the accuracy of the join.
File format:
The files are in text format, with a comma as the field separator.
Cardinality:
1..1 on the join key; both datasets have the same number of records.
Employee data [joinProject/data/employees_sorted/part-e]
--------------------------------------------------------
[EmpNo,DOB,FName,LName,Gender,HireDate,DeptNo]
10001,1953-09-02,Georgi,Facello,M,1986-06-26,d005
10002,1964-06-02,Bezalel,Simmel,F,1985-11-21,d007
10003,1959-12-03,Parto,Bamford,M,1986-08-28,d004
10004,1954-05-01,Chirstian,Koblick,M,1986-12-01,d004
10005,1955-01-21,Kyoichi,Maliniak,M,1989-09-12,d003
10006,1953-04-20,Anneke,Preusig,F,1989-06-02,d005
.....
Salary data [joinProject/data/salaries_sorted/part-s]
------------------------------------------------------
[EmpNo,Salary,FromDate,ToDate]
10001,88958,2002-06-22,9999-01-01
10002,72527,2001-08-02,9999-01-01
10003,43311,2001-12-01,9999-01-01
10004,74057,2001-11-27,9999-01-01
10005,94692,2001-09-09,9999-01-01
..........
************************************
Expected Results - tab separated
************************************
[EmpNo FName LName Salary]
10001 Georgi Facello 88958
10002 Bezalel Simmel 72527
10003 Parto Bamford 43311
10004 Chirstian Koblick 74057
10005 Kyoichi Maliniak 94692
10006 Anneke Preusig 59755
10009 Sumant Peac 94409
10010 Duangkaew Piveteau 80324
........
******************************
Observations
******************************
Setting the input format to KeyValueTextInputFormat resulted in only
part of the data getting joined. I attributed this to the fact that
EmpNo is numeric and the sort order was not right with the key treated
as Text. I found that others had encountered the same issue, and one
individual had created a custom input format - KeyValueLongInputFormat - and
an associated record reader. This gist uses the same code, with minor modifications.
http://stackoverflow.com/questions/13415359/hadoop-compositeinputformat-not-joining-all-data
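The underlying point, as I understand it, is that Text keys compare lexicographically (byte by byte), so their ordering can diverge from numeric ordering once key lengths vary. A minimal illustration (not part of the gist):
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class KeyOrderingIllustration {
  public static void main(String[] args) {
    // Lexicographic (Text) ordering: "10001" < "9999" because '1' < '9'
    System.out.println(new Text("10001").compareTo(new Text("9999")));             // negative
    // Numeric (LongWritable) ordering: 10001 > 9999
    System.out.println(new LongWritable(10001).compareTo(new LongWritable(9999))); // positive
  }
}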
/**********************************
*KeyValueLongLineRecordReader.java
*Custom record reader
**********************************/
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class KeyValueLongLineRecordReader implements
    RecordReader<LongWritable, Text> {

  private final LineRecordReader lineRecordReader;
  private byte separator = (byte) ',';
  private LongWritable dummyKey;
  private Text innerValue;

  public Class getKeyClass() {
    return LongWritable.class;
  }

  public LongWritable createKey() {
    return new LongWritable();
  }

  public Text createValue() {
    return new Text();
  }

  public KeyValueLongLineRecordReader(Configuration job, FileSplit split)
      throws IOException {
    lineRecordReader = new LineRecordReader(job, split);
    dummyKey = lineRecordReader.createKey();
    innerValue = lineRecordReader.createValue();
    String sepStr = job.get("key.value.separator.in.input.line", ",");
    this.separator = (byte) sepStr.charAt(0);
  }

  public static int findSeparator(byte[] utf, int start, int length, byte sep) {
    for (int i = start; i < (start + length); i++) {
      if (utf[i] == sep) {
        return i;
      }
    }
    return -1;
  }

  /** Read key/value pair in a line. */
  public synchronized boolean next(LongWritable key, Text value)
      throws IOException {
    LongWritable tKey = key;
    Text tValue = value;
    byte[] line = null;
    int lineLen = -1;
    if (lineRecordReader.next(dummyKey, innerValue)) {
      line = innerValue.getBytes();
      lineLen = innerValue.getLength();
    } else {
      return false;
    }
    if (line == null)
      return false;
    int pos = findSeparator(line, 0, lineLen, this.separator);
    if (pos == -1) {
      tKey.set(Long.valueOf(new String(line, 0, lineLen)));
      tValue.set("");
    } else {
      int keyLen = pos;
      byte[] keyBytes = new byte[keyLen];
      System.arraycopy(line, 0, keyBytes, 0, keyLen);
      int valLen = lineLen - keyLen - 1;
      byte[] valBytes = new byte[valLen];
      System.arraycopy(line, pos + 1, valBytes, 0, valLen);
      tKey.set(Long.valueOf(new String(keyBytes)));
      tValue.set(valBytes);
    }
    return true;
  }

  public float getProgress() {
    return lineRecordReader.getProgress();
  }

  public synchronized long getPos() throws IOException {
    return lineRecordReader.getPos();
  }

  public synchronized void close() throws IOException {
    lineRecordReader.close();
  }
}
/**********************************
*KeyValueLongInputFormat.java
*Custom key value format
**********************************/
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class KeyValueLongInputFormat extends
    FileInputFormat<LongWritable, Text> implements JobConfigurable {

  private CompressionCodecFactory compressionCodecs = null;

  @Override
  public void configure(JobConf conf) {
    compressionCodecs = new CompressionCodecFactory(conf);
  }

  protected boolean isSplitable(FileSystem fs, Path file) {
    return compressionCodecs.getCodec(file) == null;
  }

  @Override
  public RecordReader<LongWritable, Text> getRecordReader(
      InputSplit genericSplit, JobConf job, Reporter reporter)
      throws IOException {
    reporter.setStatus(genericSplit.toString());
    return new KeyValueLongLineRecordReader(job, (FileSplit) genericSplit);
  }
}
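Note that the record reader above defaults to a comma but honors the key.value.separator.in.input.line property, so for data with a different delimiter you could, in principle (untested here), simply set it in the driver:
// Hypothetical example: use tab as the key/value separator instead of the default comma
conf.set("key.value.separator.in.input.line", "\t");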
/**********************************
*MapperMapSideJoinLargeDatasets.java
*Mapper
**********************************/
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.join.TupleWritable;

public class MapperMapSideJoinLargeDatasets extends MapReduceBase implements
    Mapper<LongWritable, TupleWritable, Text, Text> {

  Text txtKey = new Text("");
  Text txtValue = new Text("");

  @Override
  public void map(LongWritable key, TupleWritable value,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    if (value.toString().length() > 0) {
      txtKey.set(key.toString());
      // value.get(0) is the employee record (key stripped),
      // value.get(1) is the salary record (key stripped)
      String[] arrEmpAttributes = value.get(0).toString().split(",");
      String[] arrSalaryAttributes = value.get(1).toString().split(",");
      // Emit: EmpNo -> FName \t LName \t Salary
      txtValue.set(arrEmpAttributes[1] + "\t"
          + arrEmpAttributes[2] + "\t"
          + arrSalaryAttributes[0]);
      output.collect(txtKey, txtValue);
    }
  }
}
/**********************************
*DriverMapSideJoinLargeDatasets
*Driver
**********************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DriverMapSideJoinLargeDatasets {

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    conf.setJobName("DriverMapSideJoinLargeDatasets");
    conf.setJarByClass(DriverMapSideJoinLargeDatasets.class);

    String[] jobArgs = new GenericOptionsParser(conf, args)
        .getRemainingArgs();
    Path dirEmployeesData = new Path(jobArgs[0]);
    Path dirSalaryData = new Path(jobArgs[1]);
    Path dirOutput = new Path(jobArgs[2]);

    conf.setMapperClass(MapperMapSideJoinLargeDatasets.class);

    // The two critical pieces: the composite input format and the join expression
    conf.setInputFormat(CompositeInputFormat.class);
    String strJoinStmt = CompositeInputFormat.compose("inner",
        KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData);
    conf.set("mapred.join.expr", strJoinStmt);

    // Map-only job
    conf.setNumReduceTasks(0);

    conf.setOutputFormat(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(conf, dirOutput);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    RunningJob job = JobClient.runJob(conf);
    while (!job.isComplete()) {
      Thread.sleep(1000);
    }
    System.exit(job.isSuccessful() ? 0 : 2);
  }
}
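As an aside, the op passed to compose() is not limited to "inner"; the old-API join package also understands "outer" (and "override"). A hedged, untested sketch of the outer variant - the mapper would then need to guard against missing positions in the TupleWritable:
// Untested sketch: full outer join variant of the same driver wiring
String strOuterJoinStmt = CompositeInputFormat.compose("outer",
    KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData);
conf.set("mapred.join.expr", strOuterJoinStmt);

// In the mapper, positions can then be absent for keys missing from a dataset:
// if (value.has(0) && value.has(1)) { ... }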
**************************
HDFS data load commands
**************************
hadoop fs -mkdir joinProject
hadoop fs -put joinProject/* joinProject/
**************************
Command to run program
**************************
hadoop jar ~/Blog/joinProject/MapSideJoinLargeDatasets/jar/MapSideJoinLgDsOAPI.jar DriverMapSideJoinLargeDatasets /user/akhanolk/joinProject/data/employees_sorted/part-e /user/akhanolk/joinProject/data/salaries_sorted/part-s /user/akhanolk/joinProject/output/output-MapSideJoinLargeDatasets
**************************
Results
**************************
...
13/09/22 13:11:17 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/09/22 13:11:17 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/09/22 13:11:17 INFO mapred.JobClient: Map-Reduce Framework
13/09/22 13:11:17 INFO mapred.JobClient: Map input records=224683000
13/09/22 13:11:17 INFO mapred.JobClient: Map output records=224683000
...
$ hadoop fs -cat joinProject/output/output-MapSideJoinLargeDatasets/part* | less
10001 Georgi Facello 88958
10002 Bezalel Simmel 72527
10003 Parto Bamford 43311
10004 Chirstian Koblick 74057
10005 Kyoichi Maliniak 94692
10006 Anneke Preusig 59755
10009 Sumant Peac 94409
10010 Duangkaew Piveteau 80324
.....
**************************
References
**************************
Concepts:
Pro Hadoop
Hadoop: The Definitive Guide
Data-Intensive Text Processing with MapReduce
Code:
http://stackoverflow.com/questions/13415359/hadoop-compositeinputformat-not-joining-all-data
Data:
The data in this solution is from MySQL - http://dev.mysql.com/doc/employee/en.index.html
*******************************
Pig - map-side join of datasets
with cardinality of 1..1
Using 'replicated' or, if sorted, 'merge'
*********************************
rawEmpDS = load '/user/akhanolk/joinProject/data/employees_active/part-e' using PigStorage(',') as (empNo:chararray,dOB:chararray,lName:chararray,fName:chararray,gender:chararray,hireDate:chararray,deptNo:chararray);
empDS = foreach rawEmpDS generate empNo,fName,lName,gender,deptNo;
rawSalDS = load '/user/akhanolk/joinProject/data/salaries_active/part-sc' using PigStorage(',') as (empNo:chararray,salary:long,fromDate:chararray,toDate:chararray);
salDS = foreach rawSalDS generate empNo, salary;
joinedDS = join empDS by empNo, salDS by empNo using 'replicated';
finalDS = foreach joinedDS generate empDS::empNo,empDS::fName,empDS::lName,empDS::gender,empDS::deptNo,salDS::salary;
store finalDS into '/user/akhanolk/joinProject/output/pig-RSJ1To1';

Comments:

  1. Really good blog :) thanks for sharing.

    I have one question on the CompositeInputFormat. How does it work internally? It looks like it reads all the paths, does an inner join on the key, and passes the result on to the mapper. If this is true, is there any other operation that can be used as the op besides "inner"?
  2. Hi,

    I would like to know how to deal with the case where you have unsorted data. For example the input data was as follows:

    Employee data [joinProject/data/employees_sorted/part-e]
    --------------------------------------------------------
    [EmpNo,DOB,FName,LName,Gender,HireDate,DeptNo]
    10001,1953-09-02,Georgi,Facello,M,1986-06-26,d005
    10002,1964-06-02,Bezalel,Simmel,F,1985-11-21,d007
    10003,1959-12-03,Parto,Bamford,M,1986-08-28,d004
    10004,1954-05-01,Chirstian,Koblick,M,1986-12-01,d004
    10005,1955-01-21,Kyoichi,Maliniak,M,1989-09-12,d003
    10006,1953-04-20,Anneke,Preusig,F,1989-06-02,d005
    .....
    Salary data [joinProject/data/salaries_sorted/part-s]
    ------------------------------------------------------
    [EmpNo,Salary,FromDate,ToDate]
    10001,88958,2002-06-22,9999-01-01
    10002,72527,2001-08-02,9999-01-01
    10004,43311,2001-12-01,9999-01-01
    10003,74057,2001-11-27,9999-01-01
    10005,94692,2001-09-09,9999-01-01
    ..........

    and the desired output was :

    ************************************
    Expected Results - tab separated
    ************************************
    [EmpNo FName LName Salary]
    10001 Georgi Facello 88958
    10002 Bezalel Simmel 72527
    10003 Parto Bamford 43311
    10004 Chirstian Koblick 74057
    10005 Kyoichi Maliniak 94692
    10006 Anneke Preusig 59755
    10009 Sumant Peac 94409
    10010 Duangkaew Piveteau 80324

  3. Very useful. Do we need to have the same number of records in both tables?
    I tried this with both inner and outer. The inner join gives me the expected results but the outer join does not. Can you try an outer join and let me know? My code is here:
    https://github.com/swapnaraja/hadoopCode/tree/master/mapReduce/joins/compositeinput