Update [10/15/2013]
I have added the Pig equivalent at the very bottom of the gist.
Feel free to share any insights or constructive criticism. Cheers!!
Related blogs:
1. Map-side join sample in Java using reference data (text file) from distributed cache - Part 1
2. Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2
3. Map-side join sample in Java of two large datasets, leveraging CompositeInputFormat
Sample program:
**********************
**Gist
**********************
This gist details how to inner join two large datasets on the map-side, leveraging the join capability
in MapReduce. Such a join makes sense when both input datasets are too large to distribute
through the DistributedCache, and it can be implemented when both input datasets share a join key
and both are sorted in the same order by that key.

There are two critical pieces to engaging the join behavior:
- the input format must be set to CompositeInputFormat.class, and
- the key mapred.join.expr must have a value that is a valid join specification.

Sample program:
Covers an inner join of employee and salary data, with employee ID as the join key, in a map-only program.

Inner join:
The inner join is a traditional database-style inner join. The map method is called with a key/value
pair only if every dataset in the join contains the key. The TupleWritable value contains one value for
every dataset in the join, with the join key excluded.
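To make the inner-join behavior concrete, here is a minimal plain-Java sketch of a sorted merge join. It is not Hadoop's implementation; the class name MergeJoinSketch and its helpers are hypothetical, and it assumes 1..1 cardinality and inputs sorted ascending by a numeric first field, as in this gist.

```java
import java.util.ArrayList;
import java.util.List;

public class MergeJoinSketch {

    /** Inner join of two arrays of "key,rest" lines, each sorted ascending by numeric key. */
    public static List<String> innerJoin(String[] left, String[] right) {
        List<String> joined = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < left.length && j < right.length) {
            long leftKey = keyOf(left[i]);
            long rightKey = keyOf(right[j]);
            if (leftKey < rightKey) {
                i++; // key present only on the left: an inner join drops it
            } else if (leftKey > rightKey) {
                j++; // key present only on the right: dropped as well
            } else {
                // Key present in both inputs: emit key plus one value per dataset
                joined.add(leftKey + "\t[" + restOf(left[i]) + "]\t[" + restOf(right[j]) + "]");
                i++;
                j++;
            }
        }
        return joined;
    }

    private static long keyOf(String line) {
        int pos = line.indexOf(',');
        return Long.parseLong(pos < 0 ? line : line.substring(0, pos));
    }

    private static String restOf(String line) {
        int pos = line.indexOf(',');
        return pos < 0 ? "" : line.substring(pos + 1);
    }

    public static void main(String[] args) {
        String[] employees = { "10001,Georgi,Facello", "10002,Bezalel,Simmel", "10003,Parto,Bamford" };
        String[] salaries = { "10001,88958", "10003,43311" };
        for (String row : innerJoin(employees, salaries)) {
            System.out.println(row);
        }
    }
}
```

Because the two cursors only ever move forward, a key that appears out of order in either input is silently skipped, which is why both inputs have to be sorted the same way.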

Key code in the sample program:
    conf.setInputFormat(CompositeInputFormat.class);
    String strJoinStmt = CompositeInputFormat.compose("inner",
            KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData);
    conf.set("mapred.join.expr", strJoinStmt);
    conf.setOutputFormat(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(conf, dirOutput);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

Old API:
I ended up using the old API as the new API does not include CompositeInputFormat in the version of
Hadoop I am running.
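
For orientation, the join specification stored in mapred.join.expr is just a string. The plain-Java sketch below mimics the shape that, as far as I understand it, CompositeInputFormat.compose produces: an operator wrapping one tbl(...) entry per input. The class JoinExprSketch and the two paths are hypothetical; printing strJoinStmt in the driver is the reliable way to see the real value on your Hadoop version.

```java
public class JoinExprSketch {

    /** One table entry: tbl(<input format class name>,"<path>"). */
    static String table(String inputFormatClassName, String path) {
        return "tbl(" + inputFormatClassName + ",\"" + path + "\")";
    }

    /** Wraps the table entries in the join operator, for example inner(...). */
    static String compose(String op, String inputFormatClassName, String... paths) {
        StringBuilder sb = new StringBuilder(op).append('(');
        for (int i = 0; i < paths.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(table(inputFormatClassName, paths[i]));
        }
        return sb.append(')').toString();
    }

    public static void main(String[] args) {
        // Hypothetical paths, for illustration only
        System.out.println(compose("inner", "KeyValueLongInputFormat",
                "/data/employees_sorted", "/data/salaries_sorted"));
    }
}
```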
*******************************
*Data and code download
*******************************
Data and code:
--------------
GitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues

Directory structure
-------------------
joinProject
    data
        employees_sorted
            part-e
        salaries_sorted
            part-s
    MapSideJoinLargeDatasets
        src
            KeyValueLongInputFormat.java
            KeyValueLongLineRecordReader.java
            MapperMapSideJoinLargeDatasets.java
            DriverMapSideJoinLargeDatasets.java
        jar
            MapSideJoinLgDsOAPI.jar
********************************
Data Structure Review
********************************
Datasets:
The two datasets are employee and salary datasets.
Join key:
The join key is EmpNo/employee number
Location of join key:
The join key is the first field in both datasets
Sorting:
The data is sorted by the join key "EmpNo" in ascending order.
Sorting is crucial for accuracy of joins
File format:
The files are in text format, with comma as a separator
Cardinality:
Is 1..1 on join key; both datasets have the same number of records

Employee data [joinProject/data/employees_sorted/part-e]
--------------------------------------------------------
[EmpNo,DOB,FName,LName,Gender,HireDate,DeptNo]
10001,1953-09-02,Georgi,Facello,M,1986-06-26,d005
10002,1964-06-02,Bezalel,Simmel,F,1985-11-21,d007
10003,1959-12-03,Parto,Bamford,M,1986-08-28,d004
10004,1954-05-01,Chirstian,Koblick,M,1986-12-01,d004
10005,1955-01-21,Kyoichi,Maliniak,M,1989-09-12,d003
10006,1953-04-20,Anneke,Preusig,F,1989-06-02,d005
.....

Salary data [joinProject/data/salaries_sorted/part-s]
------------------------------------------------------
[EmpNo,Salary,FromDate,ToDate]
10001,88958,2002-06-22,9999-01-01
10002,72527,2001-08-02,9999-01-01
10003,43311,2001-12-01,9999-01-01
10004,74057,2001-11-27,9999-01-01
10005,94692,2001-09-09,9999-01-01
..........
************************************
Expected Results - tab separated
************************************
[EmpNo FName LName Salary]
10001 Georgi Facello 88958
10002 Bezalel Simmel 72527
10003 Parto Bamford 43311
10004 Chirstian Koblick 74057
10005 Kyoichi Maliniak 94692
10006 Anneke Preusig 59755
10009 Sumant Peac 94409
10010 Duangkaew Piveteau 80324
........
******************************
Observations
******************************
Setting the input format to KeyValueTextInputFormat resulted in only
part of the data getting joined. I attributed this to the fact that
the EmpNo is numeric and the data is sorted numerically, while keys read as Text
are compared byte by byte, so "100000" orders before "99999".
I found that others had encountered the same issue, and one
individual had created a custom input format, KeyValueLongInputFormat, and an associated record
reader. This gist uses the same code, with minor modifications.
http://stackoverflow.com/questions/13415359/hadoop-compositeinputformat-not-joining-all-data
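
The ordering mismatch behind this observation can be reproduced without Hadoop. In the sketch below, plain String comparison stands in for Text's byte-wise comparison (the two agree for ASCII digits); the class name KeyOrderSketch and the sample keys are made up for illustration.

```java
public class KeyOrderSketch {

    /** True if the keys are in ascending order under character-by-character comparison. */
    public static boolean isSortedAsText(String[] keys) {
        for (int i = 1; i < keys.length; i++) {
            if (keys[i - 1].compareTo(keys[i]) > 0) {
                return false;
            }
        }
        return true;
    }

    /** True if the keys are in ascending order when parsed as longs. */
    public static boolean isSortedAsLong(String[] keys) {
        for (int i = 1; i < keys.length; i++) {
            if (Long.parseLong(keys[i - 1]) > Long.parseLong(keys[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Numerically sorted keys whose width changes from five to six digits
        String[] keys = { "99998", "99999", "100000", "100001" };
        System.out.println("sorted as long: " + isSortedAsLong(keys));
        System.out.println("sorted as text: " + isSortedAsText(keys));
    }
}
```

A file sorted numerically therefore looks unsorted to a join that compares keys as text once the key width changes, which is consistent with only part of the data being joined.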
/**********************************
*KeyValueLongLineRecordReader.java
*Custom record reader
**********************************/
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class KeyValueLongLineRecordReader implements
        RecordReader<LongWritable, Text> {

    private final LineRecordReader lineRecordReader;
    private byte separator = (byte) ',';
    private LongWritable dummyKey; // byte offset returned by the wrapped reader; not used
    private Text innerValue;       // full text line returned by the wrapped reader

    public Class<LongWritable> getKeyClass() {
        return LongWritable.class;
    }

    public LongWritable createKey() {
        return new LongWritable();
    }

    public Text createValue() {
        return new Text();
    }

    public KeyValueLongLineRecordReader(Configuration job, FileSplit split)
            throws IOException {
        lineRecordReader = new LineRecordReader(job, split);
        dummyKey = lineRecordReader.createKey();
        innerValue = lineRecordReader.createValue();
        // The separator is configurable; it defaults to a comma
        String sepStr = job.get("key.value.separator.in.input.line", ",");
        this.separator = (byte) sepStr.charAt(0);
    }

    /** Returns the index of the first separator byte, or -1 if there is none. */
    public static int findSeparator(byte[] utf, int start, int length, byte sep) {
        for (int i = start; i < (start + length); i++) {
            if (utf[i] == sep) {
                return i;
            }
        }
        return -1;
    }

    /** Read key/value pair in a line. */
    public synchronized boolean next(LongWritable key, Text value)
            throws IOException {
        LongWritable tKey = key;
        Text tValue = value;
        byte[] line = null;
        int lineLen = -1;
        if (lineRecordReader.next(dummyKey, innerValue)) {
            line = innerValue.getBytes();
            lineLen = innerValue.getLength();
        } else {
            return false;
        }
        if (line == null)
            return false;
        int pos = findSeparator(line, 0, lineLen, this.separator);
        if (pos == -1) {
            // No separator: the whole line is the key and the value is empty
            tKey.set(Long.parseLong(new String(line, 0, lineLen)));
            tValue.set("");
        } else {
            // Bytes before the separator form the numeric key
            int keyLen = pos;
            byte[] keyBytes = new byte[keyLen];
            System.arraycopy(line, 0, keyBytes, 0, keyLen);
            // Bytes after the separator form the value
            int valLen = lineLen - keyLen - 1;
            byte[] valBytes = new byte[valLen];
            System.arraycopy(line, pos + 1, valBytes, 0, valLen);
            tKey.set(Long.parseLong(new String(keyBytes)));
            tValue.set(valBytes);
        }
        return true;
    }

    public float getProgress() {
        return lineRecordReader.getProgress();
    }

    public synchronized long getPos() throws IOException {
        return lineRecordReader.getPos();
    }

    public synchronized void close() throws IOException {
        lineRecordReader.close();
    }
}
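
The core of the record reader, turning a line into a long key and a text value, can be tried outside Hadoop. This is a simplified sketch that works on String rather than byte arrays; KeyValueSplitSketch and its split method are hypothetical names, not part of the gist's code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class KeyValueSplitSketch {

    /** Splits "key<sep>rest" into a long key and the remaining text. */
    public static Map.Entry<Long, String> split(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos == -1) {
            // No separator: the whole line is the key, the value is empty
            return new SimpleEntry<>(Long.parseLong(line), "");
        }
        return new SimpleEntry<>(Long.parseLong(line.substring(0, pos)),
                line.substring(pos + 1));
    }

    public static void main(String[] args) {
        Map.Entry<Long, String> record = split("10001,88958,2002-06-22,9999-01-01", ',');
        System.out.println(record.getKey() + " -> " + record.getValue());
    }
}
```

Note that a non-numeric first field, such as a header row, makes Long.parseLong throw a NumberFormatException, so the input files are assumed to contain data rows only.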
/**********************************
*KeyValueLongInputFormat.java
*Custom key value format
**********************************/
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class KeyValueLongInputFormat extends
        FileInputFormat<LongWritable, Text> implements JobConfigurable {

    private CompressionCodecFactory compressionCodecs = null;

    @Override
    public void configure(JobConf conf) {
        compressionCodecs = new CompressionCodecFactory(conf);
    }

    protected boolean isSplitable(FileSystem fs, Path file) {
        return compressionCodecs.getCodec(file) == null;
    }

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(
            InputSplit genericSplit, JobConf job, Reporter reporter)
            throws IOException {
        reporter.setStatus(genericSplit.toString());
        return new KeyValueLongLineRecordReader(job, (FileSplit) genericSplit);
    }
}
/**********************************
*MapperMapSideJoinLargeDatasets.java
*Mapper
**********************************/
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.join.TupleWritable;

public class MapperMapSideJoinLargeDatasets extends MapReduceBase implements
        Mapper<LongWritable, TupleWritable, Text, Text> {

    Text txtKey = new Text("");
    Text txtValue = new Text("");

    @Override
    public void map(LongWritable key, TupleWritable value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {

        if (value.toString().length() > 0) {
            txtKey.set(key.toString());
            // Position 0 holds the employee record and position 1 the salary record,
            // each with the join key already stripped by the record reader
            String[] arrEmpAttributes = value.get(0).toString().split(",");
            String[] arrSalAttributes = value.get(1).toString().split(",");
            // Emit FName, LName and Salary
            txtValue.set(arrEmpAttributes[1] + "\t"
                    + arrEmpAttributes[2] + "\t"
                    + arrSalAttributes[0]);
            output.collect(txtKey, txtValue);
        }
    }
}
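
The field positions used in the mapper are easy to get wrong because the join key has already been removed from each value. The following plain-Java sketch replays the projection on one joined record; JoinProjectionSketch and project are hypothetical names, and the tab between key and value mirrors the default separator that TextOutputFormat writes.

```java
public class JoinProjectionSketch {

    /**
     * Builds one output line from the join key and the two key-less records:
     * employeeRest is DOB,FName,LName,Gender,HireDate,DeptNo
     * salaryRest is Salary,FromDate,ToDate
     */
    public static String project(long empNo, String employeeRest, String salaryRest) {
        String[] emp = employeeRest.split(",");
        String[] sal = salaryRest.split(",");
        // emp[1] = FName, emp[2] = LName, sal[0] = Salary
        return empNo + "\t" + emp[1] + "\t" + emp[2] + "\t" + sal[0];
    }

    public static void main(String[] args) {
        System.out.println(project(10001L,
                "1953-09-02,Georgi,Facello,M,1986-06-26,d005",
                "88958,2002-06-22,9999-01-01"));
    }
}
```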
/**********************************
*DriverMapSideJoinLargeDatasets
*Driver
**********************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DriverMapSideJoinLargeDatasets {

    public static void main(String[] args) throws Exception {

        // The JobConf(String) constructor takes a configuration resource path,
        // not a job name, so the job name is set explicitly
        JobConf conf = new JobConf(DriverMapSideJoinLargeDatasets.class);
        conf.setJobName("DriverMapSideJoinLargeDatasets");
        conf.setJarByClass(DriverMapSideJoinLargeDatasets.class);

        String[] jobArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        Path dirEmployeesData = new Path(jobArgs[0]);
        Path dirSalaryData = new Path(jobArgs[1]);
        Path dirOutput = new Path(jobArgs[2]);

        conf.setMapperClass(MapperMapSideJoinLargeDatasets.class);

        // Join both inputs on the map side
        conf.setInputFormat(CompositeInputFormat.class);
        String strJoinStmt = CompositeInputFormat.compose("inner",
                KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData);
        conf.set("mapred.join.expr", strJoinStmt);

        // Map-only job
        conf.setNumReduceTasks(0);

        conf.setOutputFormat(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(conf, dirOutput);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        // runJob() submits the job and blocks until it completes
        RunningJob job = JobClient.runJob(conf);
        System.exit(job.isSuccessful() ? 0 : 2);
    }
}
**************************
HDFS data load commands
**************************
hadoop fs -mkdir joinProject
hadoop fs -put joinProject/* joinProject/
**************************
Command to run program
**************************
hadoop jar ~/Blog/joinProject/MapSideJoinLargeDatasets/jar/MapSideJoinLgDsOAPI.jar DriverMapSideJoinLargeDatasets /user/akhanolk/joinProject/data/employees_sorted/part-e /user/akhanolk/joinProject/data/salaries_sorted/part-s /user/akhanolk/joinProject/output/output-MapSideJoinLargeDatasets
**************************
Results
**************************
...
13/09/22 13:11:17 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/09/22 13:11:17 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/09/22 13:11:17 INFO mapred.JobClient: Map-Reduce Framework
13/09/22 13:11:17 INFO mapred.JobClient: Map input records=224683000
13/09/22 13:11:17 INFO mapred.JobClient: Map output records=224683000
...

$ hadoop fs -cat joinProject/output/output-MapSideJoinLargeDatasets/part* | less
10001 Georgi Facello 88958
10002 Bezalel Simmel 72527
10003 Parto Bamford 43311
10004 Chirstian Koblick 74057
10005 Kyoichi Maliniak 94692
10006 Anneke Preusig 59755
10009 Sumant Peac 94409
10010 Duangkaew Piveteau 80324
.....
**************************
References
**************************
Concepts:
Pro Hadoop
Hadoop: The Definitive Guide
Data-Intensive Text Processing with MapReduce

Code:
http://stackoverflow.com/questions/13415359/hadoop-compositeinputformat-not-joining-all-data

Data:
The data in this solution is from mysql - http://dev.mysql.com/doc/employee/en.index.html
*******************************
Pig - map-side join of datasets with cardinality of 1..1,
using 'replicated', or 'merge' if sorted
*******************************
rawEmpDS = load '/user/akhanolk/joinProject/data/employees_active/part-e' using PigStorage(',') as (empNo:chararray,dOB:chararray,fName:chararray,lName:chararray,gender:chararray,hireDate:chararray,deptNo:chararray);
empDS = foreach rawEmpDS generate empNo,fName,lName,gender,deptNo;
rawSalDS = load '/user/akhanolk/joinProject/data/salaries_active/part-sc' using PigStorage(',') as (empNo:chararray,salary:long,fromDate:chararray,toDate:chararray);
salDS = foreach rawSalDS generate empNo, salary;
joinedDS = join empDS by empNo, salDS by empNo using 'replicated';
finalDS = foreach joinedDS generate empDS::empNo,empDS::fName,empDS::lName,empDS::gender,empDS::deptNo,salDS::salary;
store finalDS into '/user/akhanolk/joinProject/output/pig-RSJ1To1';
thank you, very helpful
Really good blog :) thanks for sharing.
I have one question on the CompositeInputFormat. How does it work internally? It looks like it will read all the paths and do an inner join on the key, and the result will be passed on to the mapper. If this is true, is there any operation other than "inner" that can be used as the op?
Hi,
I would like to know how to deal with the case where you have unsorted data. For example the input data was as follows:
Employee data [joinProject/data/employees_sorted/part-e]
--------------------------------------------------------
[EmpNo,DOB,FName,LName,Gender,HireDate,DeptNo]
10001,1953-09-02,Georgi,Facello,M,1986-06-26,d005
10002,1964-06-02,Bezalel,Simmel,F,1985-11-21,d007
10003,1959-12-03,Parto,Bamford,M,1986-08-28,d004
10004,1954-05-01,Chirstian,Koblick,M,1986-12-01,d004
10005,1955-01-21,Kyoichi,Maliniak,M,1989-09-12,d003
10006,1953-04-20,Anneke,Preusig,F,1989-06-02,d005
.....
Salary data [joinProject/data/salaries_sorted/part-s]
------------------------------------------------------
[EmpNo,Salary,FromDate,ToDate]
10001,88958,2002-06-22,9999-01-01
10002,72527,2001-08-02,9999-01-01
10004,43311,2001-12-01,9999-01-01
10003,74057,2001-11-27,9999-01-01
10005,94692,2001-09-09,9999-01-01
..........
and the desired output was :
************************************
Expected Results - tab separated
************************************
[EmpNo FName LName Salary]
10001 Georgi Facello 88958
10002 Bezalel Simmel 72527
10003 Parto Bamford 43311
10004 Chirstian Koblick 74057
10005 Kyoichi Maliniak 94692
10006 Anneke Preusig 59755
10009 Sumant Peac 94409
10010 Duangkaew Piveteau 80324
Very useful. Thanks
Very useful. Do we need to have the same number of records in both the tables?
I tried this with both inner and outer. Inner join gives me the expected results but outer join does not. Can you try outer join and let me know? My code is here
https://github.com/swapnaraja/hadoopCode/tree/master/mapReduce/joins/compositeinput