1.0. About reduce-side joins
Joins of datasets done in the reduce phase are called reduce-side joins. Reduce-side joins are easier to implement than map-side joins, which require the data to be sorted and partitioned the same way, but they are less efficient because the datasets have to go through the sort and shuffle phase.
What's involved:
1. The map output key for each dataset being joined has to be the join key, so that records with the same key reach the same reducer
2. Each record has to be tagged with the identity of its dataset in the mapper, to help differentiate between the datasets in the reducer so they can be processed accordingly
3. In each reducer, the data values from both datasets, for the keys assigned to that reducer, are available to be processed as required
4. A secondary sort needs to be done to ensure the ordering of the values sent to the reducer
5. If the input files are of different formats, we need a separate mapper for each format, and the MultipleInputs class in the driver to add each input and associate it with its mapper:
[MultipleInputs.addInputPath( job, (input path n), (inputformat class), (mapper class n));]
Note: The join between the datasets (employee and current salary - cardinality of 1..1) in the sample program below has also been demonstrated in my blog on map-side joins of large datasets. I have used the same datasets here, as the purpose of this blog is to demonstrate the concept. Whenever possible, reduce-side joins should be avoided.
[Update - 10/15/2013]
I have added a Pig equivalent in the final section.
2.0. Sample datasets used in this gist
The datasets used are employees and salaries. For salary data, there are two files - one with current salary (1..1), and one with historical salary data (1..many). There is also department data, a small reference dataset, that we will add to the distributed cache and look up in the reducer.
3.0. Implementing a reduce-side join
The sample code is common for a 1..1 as well as a 1..many join for the sample datasets. The mapper is common for both datasets, as the format is the same.
3.0.1. Components/steps/tasks:
1. Map output key
The key will be the empNo as it is the join key for the datasets employee and salary
[Implementation: in the mapper]
2. Tagging the data with the dataset identity
Add an attribute called sourceIndex to tag the identity of the data (1=employee, 2=salary, 3=salary history)
[Implementation: in the mapper]
3. Discarding unwanted attributes
[Implementation: in the mapper]
4. Composite key
Make the map output key a composite of empNo and sourceIndex
[Implementation: create custom writable]
5. Partitioner
Partition the data on natural key of empNo
[Implementation: create custom partitioner class]
6. Sorting
Sort the data on empNo first, and then on source index
[Implementation: create custom sorting comparator class]
7. Grouping
Group the data based on the natural key
[Implementation: create custom grouping comparator class]
8. Joining
Iterate through the values for a key, complete the join of the employee and salary data, and look up the department to include the department name in the output
[Implementation: in the reducer]
3.0.2a. Data pipeline for cardinality of 1..1 between employee and salary data:
3.0.2b. Data pipeline for cardinality of 1..many between employee and salary data:
3.0.3. The Composite key
The composite key is a combination of the join key empNo and the source index (1=employee file, 2=salary file, ...)
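Below is a minimal sketch of such a composite key as a custom WritableComparable; the class and member names (CompositeKeyWritable, joinKey, sourceIndex) are illustrative and may differ from the actual gist code.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class CompositeKeyWritable implements WritableComparable<CompositeKeyWritable> {

    private String joinKey;   // empNo - the natural join key
    private int sourceIndex;  // 1=employee, 2=current salary, 3=salary history

    public CompositeKeyWritable() {
    }

    public CompositeKeyWritable(String joinKey, int sourceIndex) {
        this.joinKey = joinKey;
        this.sourceIndex = sourceIndex;
    }

    public String getJoinKey() {
        return joinKey;
    }

    public int getSourceIndex() {
        return sourceIndex;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(joinKey);
        out.writeInt(sourceIndex);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        joinKey = in.readUTF();
        sourceIndex = in.readInt();
    }

    @Override
    public int compareTo(CompositeKeyWritable other) {
        // Default ordering: empNo first, then source index
        int result = joinKey.compareTo(other.joinKey);
        return (result != 0) ? result : Integer.compare(sourceIndex, other.sourceIndex);
    }
}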
3.0.4. The mapper
In the setup method of the mapper-
1. Get the file name from the input split, and cross-reference it against the configuration (set in the driver) to derive the source index. [Driver code: add configuration entries [key=filename of employee dataset, value=1], [key=filename of current salary dataset, value=2], [key=filename of historical salary dataset, value=3]]
2. Build a list of attributes we want to emit as map output for each data entity
The setup method is called only once, at the beginning of a map task, so it is the logical place to identify the source index.
In the map method of the mapper:
3. Build the map output based on attributes required, as specified in the list from #2
Note: For salary data, we include the "effective till" date even though it is not required in the final output, because this is common code for both the 1..1 and the 1..many join to salary data. If the salary data is historical, we want the current salary only, that is, records with "effective till" date = 9999-01-01.
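A minimal sketch of the common mapper follows, using the CompositeKeyWritable sketched above. It assumes tab-delimited input with empNo as the first field, and that the driver stores a [file name, source index] entry in the configuration for each input file; the class name, field positions, and configuration keys are illustrative, not necessarily those of the original gist.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class JoinMapper extends Mapper<LongWritable, Text, CompositeKeyWritable, Text> {

    private int sourceIndex;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Derive the source index by looking up the input file name in the
        // configuration (set in the driver: employee=1, current salary=2, salary history=3)
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        sourceIndex = context.getConfiguration().getInt(fileName, 0);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\\t");
        String empNo = fields[0];

        // Emit the remaining attributes as the map output value; for salary data
        // this keeps the "effective till" date so the reducer can filter on it
        StringBuilder outValue = new StringBuilder();
        for (int i = 1; i < fields.length; i++) {
            outValue.append(fields[i]);
            if (i < fields.length - 1) {
                outValue.append("\t");
            }
        }
        context.write(new CompositeKeyWritable(empNo, sourceIndex), new Text(outValue.toString()));
    }
}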
3.0.5. The partitioner
Even though the map output key is composite, we want to partition by the natural join key of empNo, therefore a custom partitioner is in order.
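A minimal sketch of such a partitioner, hashing only the natural key so that all records for an employee land on the same reducer:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class JoinKeyPartitioner extends Partitioner<CompositeKeyWritable, Text> {

    @Override
    public int getPartition(CompositeKeyWritable key, Text value, int numReduceTasks) {
        // Hash only the join key (empNo); ignore the source index.
        // Masking with Integer.MAX_VALUE keeps the partition number non-negative.
        return (key.getJoinKey().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}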
3.0.6. The sort comparator
To ensure that the input to the reducer is sorted on empNo, then on sourceIndex, we need a sort comparator. This will guarantee that the employee data is the first set in the values list for a key, followed by the salary data.
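A minimal sketch of the sort comparator, reusing the CompositeKeyWritable sketched earlier:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CompositeKeySortComparator extends WritableComparator {

    protected CompositeKeySortComparator() {
        super(CompositeKeyWritable.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Order by empNo first, then by source index, so employee records
        // precede salary records within each employee's group
        CompositeKeyWritable key1 = (CompositeKeyWritable) a;
        CompositeKeyWritable key2 = (CompositeKeyWritable) b;
        int result = key1.getJoinKey().compareTo(key2.getJoinKey());
        return (result != 0) ? result
                : Integer.compare(key1.getSourceIndex(), key2.getSourceIndex());
    }
}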
3.0.7. The grouping comparator
This class is needed to indicate the group-by attribute - the natural join key, empNo.
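A minimal sketch of the grouping comparator, comparing on the natural key alone so that one reduce() call sees all records for an empNo regardless of source index:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CompositeKeyGroupingComparator extends WritableComparator {

    protected CompositeKeyGroupingComparator() {
        super(CompositeKeyWritable.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Group on empNo only; ignore the source index
        CompositeKeyWritable key1 = (CompositeKeyWritable) a;
        CompositeKeyWritable key2 = (CompositeKeyWritable) b;
        return key1.getJoinKey().compareTo(key2.getJoinKey());
    }
}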
3.0.8. The reducer
In the setup method of the reducer (called only once for the task)-
We check whether the side data, a map file with department data, is in the distributed cache and, if found, initialize the map file reader.
In the reduce method -
While iterating through the value list -
1. If the data is employee data (sourceIndex=1), we look up the department name in the map file using the deptNo, which is the last attribute in the employee data, and append the department name to the employee data.
2. If the data is historical salary data, we emit the salary only where the last attribute is '9999-01-01'.
Key point -
We have set the sort comparator to sort on empNo and sourceIndex.
The sourceIndex of employee data is lower than that of salary data - as set in the driver.
Therefore, we are assured that the employee data always comes first, followed by the salary data.
So for each distinct empNo, we iterate through the values, append them, and emit the joined record as output.
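A minimal sketch of the reducer follows. It assumes the department MapFile in the distributed cache is keyed by deptNo as Text, that deptNo is the last attribute of the employee record, and that salary records end with the "effective till" date; the class name, field positions, and cache handling are illustrative.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer<CompositeKeyWritable, Text, NullWritable, Text> {

    private MapFile.Reader deptReader;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Open the department lookup MapFile placed in the distributed cache by the driver
        Configuration conf = context.getConfiguration();
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
        if (cacheFiles != null && cacheFiles.length > 0) {
            deptReader = new MapFile.Reader(FileSystem.getLocal(conf),
                    cacheFiles[0].toString(), conf);
        }
    }

    @Override
    protected void reduce(CompositeKeyWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder output = new StringBuilder(key.getJoinKey());
        for (Text value : values) {
            String[] fields = value.toString().split("\\t");
            if (key.getSourceIndex() == 1) {
                // Employee record: look up the department name using deptNo (last attribute)
                String record = value.toString();
                Text deptName = new Text();
                if (deptReader != null
                        && deptReader.get(new Text(fields[fields.length - 1]), deptName) != null) {
                    record = record + "\t" + deptName.toString();
                }
                output.append("\t").append(record);
            } else if (fields[fields.length - 1].equals("9999-01-01")) {
                // Salary record: keep only the current salary, drop the "effective till" date
                output.append("\t").append(fields[0]);
            }
        }
        context.write(NullWritable.get(), new Text(output.toString()));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (deptReader != null) {
            deptReader.close();
        }
    }
}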
3.0.9. The driver
Besides the usual driver code, we are-
1. Adding side data (department lookup data in map file format - in HDFS) to the distributed cache
2. Adding key-value pairs to the configuration, each pair being [filename, source index].
This is used by the mapper to tag data with the sourceIndex.
3. And lastly, associating all the classes we created with the job.
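A minimal sketch of the driver wiring the classes above together (Hadoop 2 style API). The cache URI, the file names used as configuration keys, and the input/output paths are placeholders, not the actual values from the gist.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReduceSideJoinDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // 1. Side data: department lookup MapFile in HDFS, added to the distributed cache
        DistributedCache.addCacheFile(new URI("/user/hduser/joinProject/departments_map"), conf);

        // 2. File name to source index mapping, read by the mapper in setup()
        conf.setInt("employees_active.txt", 1);
        conf.setInt("salaries_active.txt", 2);
        conf.setInt("salaries_history.txt", 3);

        Job job = Job.getInstance(conf, "Reduce-side join");
        job.setJarByClass(ReduceSideJoinDriver.class);

        // 3. Associate the custom classes with the job
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setPartitionerClass(JoinKeyPartitioner.class);
        job.setSortComparatorClass(CompositeKeySortComparator.class);
        job.setGroupingComparatorClass(CompositeKeyGroupingComparator.class);

        job.setMapOutputKeyClass(CompositeKeyWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}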
4.0. The Pig equivalent
Pig script-version 1:
Pig script-version 2 - eliminating the reduce-side join:
In this script, we filter on the most recent salary and then use Pig's merge join optimization (map-side), which can be leveraged when the inputs to the join are sorted.
Output: