1.0. About reduce side joins
Joins of datasets done in the reduce phase are called reduce-side joins. Reduce-side joins are easier to implement than map-side joins, which are more stringent in that they require the data to be sorted and partitioned the same way. They are, however, less efficient, because both datasets have to go through the shuffle and sort phase.
What's involved:
1. The map output key of the datasets being joined has to be the join key - so that matching records reach the same reducer.
2. Each dataset has to be tagged with its identity in the mapper - to help differentiate between the datasets in the reducer, so they can be processed accordingly.
3. In each reducer, the data values from both datasets, for the keys assigned to that reducer, are available to be processed as required.
4. A secondary sort needs to be done to ensure the ordering of the values sent to the reducer.
5. If the input files are of different formats, we need separate mappers, and we need to use the MultipleInputs class in the driver to add each input path and associate the specific mapper with it (a sketch follows this list).
[MultipleInputs.addInputPath( job, (input path n), (inputformat class), (mapper class n));]
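For illustration only - the sample program later in this gist uses a single mapper, since both inputs share the same CSV format - here is a minimal sketch of the MultipleInputs wiring. The nested mapper classes are hypothetical and exist only to show each input being tagged with its own source index; CompositeKeyWritableRSJ is the composite key defined further below.

package khanolkar.mapreduce.join.samples.reducesidejoin;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class MultipleInputsSketch {

	// Hypothetical mapper for the employee file; tags each record with sourceIndex 1
	public static class EmployeeMapper extends
			Mapper<LongWritable, Text, CompositeKeyWritableRSJ, Text> {
		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] attrs = value.toString().split(",");
			context.write(new CompositeKeyWritableRSJ(attrs[0], 1), value);
		}
	}

	// Hypothetical mapper for the salary file; tags each record with sourceIndex 2
	public static class SalaryMapper extends
			Mapper<LongWritable, Text, CompositeKeyWritableRSJ, Text> {
		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] attrs = value.toString().split(",");
			context.write(new CompositeKeyWritableRSJ(attrs[0], 2), value);
		}
	}

	// In the driver: associate each input path (and its input format) with its mapper;
	// job.setMapperClass(...) is not needed when MultipleInputs supplies the mappers
	public static void addInputs(Job job, Path employeePath, Path salaryPath) {
		MultipleInputs.addInputPath(job, employeePath, TextInputFormat.class,
				EmployeeMapper.class);
		MultipleInputs.addInputPath(job, salaryPath, TextInputFormat.class,
				SalaryMapper.class);
	}
}

If the two files really were in different formats, the second addInputPath call would simply name a different InputFormat, and a mapper whose input key/value types match it.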
Note: The join between the datasets (employee, current salary - cardinality of 1..1) in the sample program below has also been demonstrated in my blog on map-side joins of large datasets. I have used the same datasets here, as the purpose of this blog is to demonstrate the concept. Whenever possible, reduce-side joins should be avoided.
[Update - 10/15/2013]
I have added a pig equivalent in the final section.
2.0. Sample datasets used in this gist
The datasets used are employees and salaries. For salary data, there are two files - one file with current salary (1..1), and one with historical salary data (1..many). Then there is the department data, a small reference dataset, that we will add to the distributed cache and look up in the reducer.
3.0. Implementing a reduce-side join
The sample code is common for a 1..1 as well as a 1..many join for the sample datasets. The mapper is common to both datasets, as the format is the same.
3.0.1. Components/steps/tasks:
1. Map output key
The key will be the empNo as it is the join key for the datasets employee and salary
[Implementation: in the mapper]
2. Tagging the data with the dataset identity
Add an attribute called srcIndex to tag the identity of the data (1=employee, 2=salary, 3=salary history)
[Implementation: in the mapper]
3. Discarding unwanted attributes
[Implementation: in the mapper]
4. Composite key
Make the map output key a composite of empNo and srcIndex
[Implementation: create custom writable]
5. Partitioner
Partition the data on natural key of empNo
[Implementation: create custom partitioner class]
6. Sorting
Sort the data on empNo first, and then source index
[Implementation: create custom sorting comparator class]
7. Grouping
Group the data based on natural key
[Implementation: create custom grouping comparator class]
8. Joining
Iterate through the values for a key, complete the join of the employee and salary data, and perform a lookup of the department to include the department name in the output
[Implementation: in the reducer]
3.0.2a. Data pipeline for cardinality of 1..1 between employee and salary data:
3.0.2b. Data pipeline for cardinality of 1..many between employee and salary data:
3.0.3. The Composite key
The composite key is a combination of the join key empNo and the source index (1=employee file, 2=current salary file, 3=historical salary file)
//********************************************************************************
//Class: CompositeKeyWritableRSJ
//Purpose: Custom Writable that serves as composite key
// with attributes joinKey and sourceIndex
//Author: Anagha Khanolkar
//*********************************************************************************
package khanolkar.mapreduce.join.samples.reducesidejoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

public class CompositeKeyWritableRSJ implements Writable,
		WritableComparable<CompositeKeyWritableRSJ> {

	// Data members
	private String joinKey;// EmployeeID
	private int sourceIndex;// 1=Employee data; 2=Salary (current) data; 3=Salary historical data

	public CompositeKeyWritableRSJ() {
	}

	public CompositeKeyWritableRSJ(String joinKey, int sourceIndex) {
		this.joinKey = joinKey;
		this.sourceIndex = sourceIndex;
	}

	@Override
	public String toString() {
		return (new StringBuilder().append(joinKey).append("\t")
				.append(sourceIndex)).toString();
	}

	public void readFields(DataInput dataInput) throws IOException {
		joinKey = WritableUtils.readString(dataInput);
		sourceIndex = WritableUtils.readVInt(dataInput);
	}

	public void write(DataOutput dataOutput) throws IOException {
		WritableUtils.writeString(dataOutput, joinKey);
		WritableUtils.writeVInt(dataOutput, sourceIndex);
	}

	public int compareTo(CompositeKeyWritableRSJ objKeyPair) {
		int result = joinKey.compareTo(objKeyPair.joinKey);
		if (0 == result) {
			result = Double.compare(sourceIndex, objKeyPair.sourceIndex);
		}
		return result;
	}

	public String getjoinKey() {
		return joinKey;
	}

	public void setjoinKey(String joinKey) {
		this.joinKey = joinKey;
	}

	public int getsourceIndex() {
		return sourceIndex;
	}

	public void setsourceIndex(int sourceIndex) {
		this.sourceIndex = sourceIndex;
	}
}
3.0.4. The mapper
In the setup method of the mapper-
1. Get the filename from the input split and cross-reference it against the configuration (set in the driver) to derive the source index. [Driver code: add configuration entries [key=filename of employee, value=1], [key=filename of current salary dataset, value=2], [key=filename of historical salary dataset, value=3]]
2. Build a list of attributes we want to emit as map output for each data entity.
The setup method is called only once, at the beginning of a map task, so it is the logical place to identify the source index.
In the map method of the mapper:
3. Build the map output based on attributes required, as specified in the list from #2
Note: For salary data, we include the "effective till" date even though it is not required in the final output, because this code is common to both the 1..1 and the 1..many join to the salary data. If the salary data is historical, we want only the current salary, that is, the record with effective-till date = 9999-01-01.
//********************************************************************************
//Class: MapperRSJ
//Purpose: Mapper
//Author: Anagha Khanolkar
//*********************************************************************************
package khanolkar.mapreduce.join.samples.reducesidejoin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MapperRSJ extends
		Mapper<LongWritable, Text, CompositeKeyWritableRSJ, Text> {

	CompositeKeyWritableRSJ ckwKey = new CompositeKeyWritableRSJ();
	Text txtValue = new Text("");
	int intSrcIndex = 0;
	StringBuilder strMapValueBuilder = new StringBuilder("");
	List<Integer> lstRequiredAttribList = new ArrayList<Integer>();

	@Override
	protected void setup(Context context) throws IOException,
			InterruptedException {
		// {{
		// Get the source index (1=employee, 2=current salary, 3=historical salary);
		// Added as configuration in driver
		FileSplit fsFileSplit = (FileSplit) context.getInputSplit();
		intSrcIndex = Integer.parseInt(context.getConfiguration().get(
				fsFileSplit.getPath().getName()));
		// }}

		// {{
		// Initialize the list of fields to emit as output based on
		// intSrcIndex (1=employee, 2=current salary, 3=historical salary)
		if (intSrcIndex == 1) // employee
		{
			lstRequiredAttribList.add(2); // FName
			lstRequiredAttribList.add(3); // LName
			lstRequiredAttribList.add(4); // Gender
			lstRequiredAttribList.add(6); // DeptNo
		} else // salary
		{
			lstRequiredAttribList.add(1); // Salary
			lstRequiredAttribList.add(3); // Effective-to-date (Value of
											// 9999-01-01 indicates current
											// salary)
		}
		// }}
	}

	private String buildMapValue(String arrEntityAttributesList[]) {
		// This method returns a csv list of values to emit based on data entity
		strMapValueBuilder.setLength(0);// Initialize
		// Build list of attributes to output based on source - employee/salary
		for (int i = 1; i < arrEntityAttributesList.length; i++) {
			// If the field is in the list of required output,
			// append it to the stringbuilder
			if (lstRequiredAttribList.contains(i)) {
				strMapValueBuilder.append(arrEntityAttributesList[i]).append(
						",");
			}
		}
		if (strMapValueBuilder.length() > 0) {
			// Drop last comma
			strMapValueBuilder.setLength(strMapValueBuilder.length() - 1);
		}
		return strMapValueBuilder.toString();
	}

	@Override
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		if (value.toString().length() > 0) {
			String arrEntityAttributes[] = value.toString().split(",");

			ckwKey.setjoinKey(arrEntityAttributes[0].toString());
			ckwKey.setsourceIndex(intSrcIndex);
			txtValue.set(buildMapValue(arrEntityAttributes));

			context.write(ckwKey, txtValue);
		}
	}
}
3.0.5. The partitioner
Even though the map output key is composite, we want to partition by the natural join key of empNo, therefore a custom partitioner is in order.
//********************************************************************************
//Class: PartitionerRSJ
//Purpose: Custom partitioner
//Author: Anagha Khanolkar
//*********************************************************************************
package khanolkar.mapreduce.join.samples.reducesidejoin;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionerRSJ extends Partitioner<CompositeKeyWritableRSJ, Text> {

	@Override
	public int getPartition(CompositeKeyWritableRSJ key, Text value,
			int numReduceTasks) {
		// Partitions on joinKey (EmployeeID);
		// mask the sign bit so a negative hashCode cannot produce an invalid partition
		return (key.getjoinKey().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}
3.0.6. The sort comparator
To ensure that the input to the reducer is sorted on empNo, then on sourceIndex, we need a sort comparator. This will guarantee that the employee data is the first set in the values list for a key, then the salary data.
package khanolkar.mapreduce.join.samples.reducesidejoin;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

//********************************************************************************
//Class: SortingComparatorRSJ
//Purpose: Sorting comparator
//Author: Anagha Khanolkar
//*********************************************************************************
public class SortingComparatorRSJ extends WritableComparator {

	protected SortingComparatorRSJ() {
		super(CompositeKeyWritableRSJ.class, true);
	}

	@Override
	public int compare(WritableComparable w1, WritableComparable w2) {
		// Sort on all attributes of composite key
		CompositeKeyWritableRSJ key1 = (CompositeKeyWritableRSJ) w1;
		CompositeKeyWritableRSJ key2 = (CompositeKeyWritableRSJ) w2;

		int cmpResult = key1.getjoinKey().compareTo(key2.getjoinKey());
		if (cmpResult == 0)// same joinKey
		{
			return Double.compare(key1.getsourceIndex(), key2.getsourceIndex());
		}
		return cmpResult;
	}
}
3.0.7. The grouping comparator
This class is needed to indicate the group-by attribute - the natural join key of empNo.
package khanolkar.mapreduce.join.samples.reducesidejoin;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

//********************************************************************************
//Class: GroupingComparatorRSJ
//Purpose: For use as grouping comparator
//Author: Anagha Khanolkar
//*********************************************************************************
public class GroupingComparatorRSJ extends WritableComparator {

	protected GroupingComparatorRSJ() {
		super(CompositeKeyWritableRSJ.class, true);
	}

	@Override
	public int compare(WritableComparable w1, WritableComparable w2) {
		// Group on the joinKey (EmployeeID) alone
		CompositeKeyWritableRSJ key1 = (CompositeKeyWritableRSJ) w1;
		CompositeKeyWritableRSJ key2 = (CompositeKeyWritableRSJ) w2;
		return key1.getjoinKey().compareTo(key2.getjoinKey());
	}
}
3.0.8. The reducer
In the setup method of the reducer (called only once for the task)-
We check whether the side data - a MapFile with the department data - is in the distributed cache and, if found, initialize the MapFile reader.
In the reduce method -
While iterating through the value list -
1. If the data is employee data (sourceIndex=1), we look up the department name in the MapFile using the deptNo, which is the last attribute in the employee data, and append the department name to the employee data.
2. If the data is historical salary data, we emit the salary only where the last attribute is '9999-01-01'.
Key point -
We have set the sort comparator to sort on empNo and then sourceIndex.
The sourceIndex of the employee data is lower than that of the salary data, as set in the driver.
Therefore, we are assured that the employee data always comes first, followed by the salary data.
So for each distinct empNo, we iterate through the values, append them, and emit the result as output.
package khanolkar.mapreduce.join.samples.reducesidejoin;

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//********************************************************************************
//Class: ReducerRSJ
//Purpose: Reducer
//Author: Anagha Khanolkar
//*********************************************************************************
public class ReducerRSJ extends
		Reducer<CompositeKeyWritableRSJ, Text, NullWritable, Text> {

	StringBuilder reduceValueBuilder = new StringBuilder("");
	NullWritable nullWritableKey = NullWritable.get();
	Text reduceOutputValue = new Text("");
	String strSeparator = ",";
	private MapFile.Reader deptMapReader = null;
	Text txtMapFileLookupKey = new Text("");
	Text txtMapFileLookupValue = new Text("");

	@Override
	protected void setup(Context context) throws IOException,
			InterruptedException {
		// {{
		// Get side data from the distributed cache
		Path[] cacheFilesLocal = DistributedCache.getLocalCacheArchives(context
				.getConfiguration());
		for (Path eachPath : cacheFilesLocal) {
			if (eachPath.getName().toString().trim()
					.equals("departments_map.tar.gz")) {
				URI uriUncompressedFile = new File(eachPath.toString()
						+ "/departments_map").toURI();
				initializeDepartmentsMap(uriUncompressedFile, context);
			}
		}
		// }}
	}

	@SuppressWarnings("deprecation")
	private void initializeDepartmentsMap(URI uriUncompressedFile, Context context)
			throws IOException {
		// {{
		// Initialize the reader of the map file (side data)
		FileSystem dfs = FileSystem.get(context.getConfiguration());
		try {
			deptMapReader = new MapFile.Reader(dfs,
					uriUncompressedFile.toString(), context.getConfiguration());
		} catch (Exception e) {
			e.printStackTrace();
		}
		// }}
	}

	private StringBuilder buildOutputValue(CompositeKeyWritableRSJ key,
			StringBuilder reduceValueBuilder, Text value) {

		if (key.getsourceIndex() == 1) {
			// Employee data
			// {{
			// Get the department name from the MapFile in distributedCache
			// Insert the joinKey (empNo) to beginning of the stringBuilder
			reduceValueBuilder.append(key.getjoinKey()).append(strSeparator);

			String arrEmpAttributes[] = value.toString().split(",");
			txtMapFileLookupKey.set(arrEmpAttributes[3].toString());
			try {
				deptMapReader.get(txtMapFileLookupKey, txtMapFileLookupValue);
			} catch (Exception e) {
				txtMapFileLookupValue.set("");
			} finally {
				// If the lookup did not populate a value, flag it as NOT-FOUND
				txtMapFileLookupValue
						.set(txtMapFileLookupValue.toString().isEmpty() ? "NOT-FOUND"
								: txtMapFileLookupValue.toString());
			}
			// }}

			// {{
			// Append the department name to the map values to form a complete
			// CSV of employee attributes
			reduceValueBuilder.append(value.toString()).append(strSeparator)
					.append(txtMapFileLookupValue.toString())
					.append(strSeparator);
			// }}
		} else if (key.getsourceIndex() == 2) {
			// Current salary data (1..1 on join key)
			// Just append the salary, drop the effective-to-date
			String arrSalAttributes[] = value.toString().split(",");
			reduceValueBuilder.append(arrSalAttributes[0].toString()).append(
					strSeparator);
		} else // key.getsourceIndex() == 3; Historical salary data
		{
			// {{
			// Get the salary data but extract only the current salary
			// (to_date='9999-01-01')
			String arrSalAttributes[] = value.toString().split(",");
			if (arrSalAttributes[1].toString().equals("9999-01-01")) {
				// Salary data; Just append
				reduceValueBuilder.append(arrSalAttributes[0].toString())
						.append(strSeparator);
			}
			// }}
		}

		// {{
		// Reset
		txtMapFileLookupKey.set("");
		txtMapFileLookupValue.set("");
		// }}

		return reduceValueBuilder;
	}

	@Override
	public void reduce(CompositeKeyWritableRSJ key, Iterable<Text> values,
			Context context) throws IOException, InterruptedException {

		// Iterate through values; the first set is the csv of employee data,
		// the second set is salary data; the data is already ordered
		// by virtue of the secondary sort; append each value
		for (Text value : values) {
			buildOutputValue(key, reduceValueBuilder, value);
		}

		// Drop last comma, set value, and emit output
		if (reduceValueBuilder.length() > 1) {
			reduceValueBuilder.setLength(reduceValueBuilder.length() - 1);
			// Emit output
			reduceOutputValue.set(reduceValueBuilder.toString());
			context.write(nullWritableKey, reduceOutputValue);
		} else {
			System.out.println("Key=" + key.getjoinKey() + "src="
					+ key.getsourceIndex());
		}

		// Reset variables
		reduceValueBuilder.setLength(0);
		reduceOutputValue.set("");
	}

	@Override
	protected void cleanup(Context context) throws IOException,
			InterruptedException {
		// Guard against the reader never having been initialized
		if (deptMapReader != null) {
			deptMapReader.close();
		}
	}
}
3.0.9. The driver
Besides the usual driver code, we are -
1. Adding the side data (department lookup data, in MapFile format, in HDFS) to the distributed cache (a sketch of how such a MapFile might be built follows this list)
2. Adding key-value pairs to the configuration, each key-value pair being (filename, source index).
This is used by the mapper to tag data with its sourceIndex.
3. And lastly, associating all the classes we created with the job.
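The driver below assumes that the department lookup data already exists in HDFS as a MapFile, archived as departments_map.tar.gz. For completeness, here is a minimal standalone sketch - not part of the original job - of how such a MapFile might be built from a departments CSV; the class name, argument handling, and the (deptNo,deptName) input layout are assumptions.

package khanolkar.mapreduce.join.samples.reducesidejoin;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class DepartmentsMapFileBuilder {

	public static void main(String[] args) throws IOException {
		// args[0] = departments CSV in HDFS (deptNo,deptName per line, assumed sorted by deptNo)
		// args[1] = output MapFile directory, e.g. .../departments_map
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);

		Text key = new Text();
		Text value = new Text();

		BufferedReader reader = new BufferedReader(new InputStreamReader(
				fs.open(new Path(args[0]))));
		// MapFile requires keys to be appended in sorted order
		MapFile.Writer writer = new MapFile.Writer(conf, fs, args[1],
				Text.class, Text.class);
		try {
			String line;
			while ((line = reader.readLine()) != null) {
				String[] attrs = line.split(",");
				key.set(attrs[0]);   // deptNo - the lookup key used in the reducer
				value.set(attrs[1]); // deptName
				writer.append(key, value);
			}
		} finally {
			writer.close();
			reader.close();
		}
	}
}

The resulting MapFile directory would then be archived (for example as departments_map.tar.gz) and added to the distributed cache, as the driver does below.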
package khanolkar.mapreduce.join.samples.reducesidejoin;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//********************************************************************************
//Class: DriverRSJ
//Purpose: Driver for Reduce Side Join of two datasets
// with a 1..1 or 1..many cardinality on join key
//Author: Anagha Khanolkar
//*********************************************************************************
public class DriverRSJ extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {
		// {{
		// Exit job if required arguments have not been provided
		if (args.length != 3) {
			System.out
					.printf("Three parameters are required for DriverRSJ- <input dir1> <input dir2> <output dir>\n");
			return -1;
		}
		// }}

		// {{
		// Job instantiation
		Job job = new Job(getConf());
		Configuration conf = job.getConfiguration();
		job.setJarByClass(DriverRSJ.class);
		job.setJobName("ReduceSideJoin");
		// }}

		// {{
		// Add side data to distributed cache
		DistributedCache
				.addCacheArchive(
						new URI(
								"/user/akhanolk/joinProject/data/departments_map.tar.gz"),
						conf);
		// }}

		// {{
		// Set sourceIndex for input files;
		// sourceIndex is an attribute of the compositeKey,
		// to drive order, and reference source
		// Can be done dynamically; Hard-coded file names for simplicity
		conf.setInt("part-e", 1);// Set Employee file to 1
		conf.setInt("part-sc", 2);// Set Current salary file to 2
		conf.setInt("part-sh", 3);// Set Historical salary file to 3
		// }}

		// {{
		// Build csv list of input files
		StringBuilder inputPaths = new StringBuilder();
		inputPaths.append(args[0].toString()).append(",")
				.append(args[1].toString());
		// }}

		// {{
		// Configure remaining aspects of the job
		FileInputFormat.setInputPaths(job, inputPaths.toString());
		FileOutputFormat.setOutputPath(job, new Path(args[2]));

		job.setMapperClass(MapperRSJ.class);
		job.setMapOutputKeyClass(CompositeKeyWritableRSJ.class);
		job.setMapOutputValueClass(Text.class);

		job.setPartitionerClass(PartitionerRSJ.class);
		job.setSortComparatorClass(SortingComparatorRSJ.class);
		job.setGroupingComparatorClass(GroupingComparatorRSJ.class);

		job.setNumReduceTasks(4);
		job.setReducerClass(ReducerRSJ.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);
		// }}

		boolean success = job.waitForCompletion(true);
		return success ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new Configuration(), new DriverRSJ(),
				args);
		System.exit(exitCode);
	}
}
4.0. The pig equivalent
Pig script-version 1:
/*************************************
Joining datasets in Pig
Employee..Salary = 1..many
Displaying most recent salary
Without using any join optimizations
**************************************/
rawEmpDS = load '/user/akhanolk/joinProject/data/employees_active/part-e' using PigStorage(',') as (empNo:chararray,dOB:chararray,lName:chararray,fName:chararray,gender:chararray,hireDate:chararray,deptNo:chararray);
empDS = foreach rawEmpDS generate empNo,fName,lName,gender,deptNo;
rawSalDS = load '/user/akhanolk/joinProject/data/salaries_history/part-sh' using PigStorage(',') as (empNo:chararray,salary:long,fromDate:chararray,toDate:chararray);
filteredSalDS = filter rawSalDS by toDate == '9999-01-01';
salDS = foreach filteredSalDS generate empNo, salary;
joinedDS = join empDS by empNo, salDS by empNo;
finalDS = foreach joinedDS generate empDS::empNo,empDS::fName,empDS::lName,empDS::gender,empDS::deptNo,salDS::salary;
store finalDS into '/user/akhanolk/joinProject/output/pig-RSJ';
Pig script-version 2 - eliminating the reduce-side join:
In this script, we first filter on the most recent salary, and then use Pig's merge join (map-side) optimization, which can be leveraged when the inputs to the join are sorted.
rawEmpDS = load '/user/akhanolk/joinProject/data/employees_active/part-e' using PigStorage(',') as (empNo:chararray,dOB:chararray,lName:chararray,fName:chararray,gender:chararray,hireDate:chararray,deptNo:chararray);
empDS = foreach rawEmpDS generate empNo,fName,lName,gender,deptNo;
sortedEmpDS = ORDER empDS by empNo;
rawSalDS = load '/user/akhanolk/joinProject/data/salaries_history/part-sh' using PigStorage(',') as (empNo:chararray,salary:long,fromDate:chararray,toDate:chararray);
filteredSalDS = filter rawSalDS by toDate == '9999-01-01';
salDS = foreach filteredSalDS generate empNo, salary;
sortedSalDS = ORDER salDS by empNo;
joinedDS = join sortedEmpDS by empNo, sortedSalDS by empNo using 'merge';
finalDS = foreach joinedDS generate sortedEmpDS::empNo,sortedEmpDS::fName,sortedEmpDS::lName,sortedEmpDS::gender,sortedEmpDS::deptNo,sortedSalDS::salary;
store finalDS into '/user/akhanolk/joinProject/output/pig-RSJ';
Output:
**********************
Output of pig script
**********************
$ hadoop fs -cat joinProject/output/pig-RSJ/part* | less

10001 Facello Georgi M d005 88958
10002 Simmel Bezalel F d007 72527
10003 Bamford Parto M d004 43311
10004 Koblick Chirstian M d004 74057
.........