Tuesday, September 24, 2013

Reduce-side joins in Java map-reduce

1.0. About reduce side joins

Joins of datasets done in the reduce phase are called reduce-side joins. Reduce-side joins are easier to implement because they are less stringent than map-side joins, which require the data to be sorted and partitioned the same way. They are less efficient than map-side joins because the datasets have to go through the sort and shuffle phase.

What's involved:
1.  The map output key for each dataset being joined has to be the join key, so that matching records reach the same reducer.
2.  Each record has to be tagged with the identity of its dataset in the mapper, so that the reducer can tell the datasets apart and process them accordingly.
3.  In each reducer, the values from both datasets, for the keys assigned to that reducer, are available to be processed as required.
4.  A secondary sort is needed to guarantee the order of the values sent to the reducer.
5.  If the input files are of different formats, we need separate mappers, and we need to use the MultipleInputs class in the driver to add each input and associate it with its mapper:
[MultipleInputs.addInputPath(job, (input path n), (inputformat class), (mapper class n));]

Note:  The join between the datasets (employee, current salary - cardinality of 1..1) in the sample program below has also been demonstrated in my blog on map-side joins of large datasets.  I have used the same datasets here, as the purpose of this blog is to demonstrate the concept.  Whenever possible, reduce-side joins should be avoided.

[Update - 10/15/2013]
I have added a pig equivalent in the final section.

2.0. Sample datasets used in this gist

The datasets used are employees and salaries.  For salary data, there are two files - one file with  current salary (1..1), and one with historical salary data (1..many). Then there is the department data, a small reference dataset, that we will add to distributed cache and look up in the reducer.

3.0. Implementing a reduce-side join

The sample code is common for a 1..1 as well as 1..many join for the sample datasets.
The mapper is common for both datasets, as the format is the same.

3.0.1. Components/steps/tasks:

1.  Map output key
The key will be the empNo as it is the join key for the datasets employee and salary
[Implementation: in the mapper]

2.  Tagging the data with the dataset identity
Add an attribute called srcIndex to tag the identity of the data (1=employee, 2=salary, 3=salary history)
[Implementation: in the mapper]

3.  Discarding unwanted attributes
[Implementation: in the mapper]

4. Composite key
Make the map output key a composite of empNo and srcIndex
[Implementation: create custom writable]

5.  Partitioner
Partition the data on natural key of empNo
[Implementation: create custom partitioner class]

6.  Sorting
Sort the data on empNo first, and then on source index
[Implementation: create custom sorting comparator class]

7.  Grouping
Group the data based on the natural key
[Implementation: create custom grouping comparator class]

8.  Joining
Iterate through the values for a key and complete the join of employee and salary data; look up the department to include the department name in the output
[Implementation: in the reducer]

3.0.2a. Data pipeline for cardinality of 1..1 between employee and salary data:

3.0.2b. Data pipeline for cardinality of 1..many between employee and salary data:

3.0.3. The Composite key

The composite key is a combination of the join key, empNo, and the source index (1=employee file, 2=current salary file, 3=historical salary file).
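To make the idea concrete, here is a minimal plain-Java sketch of such a composite key. It is not the code from the gist: the class name CompositeKeySketch and the roundTrip helper are made up for illustration, and it deliberately avoids Hadoop dependencies. In a real job the class would implement Hadoop's WritableComparable interface; the write and readFields methods below only mirror the shape of that interface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Plain-Java stand-in for the composite key (hypothetical name).
// A real job would implement org.apache.hadoop.io.WritableComparable instead.
class CompositeKeySketch implements Comparable<CompositeKeySketch> {
    private String joinKey = "";   // natural join key (empNo)
    private int sourceIndex;       // 1=employee, 2=salary, 3=salary history

    CompositeKeySketch() { }

    CompositeKeySketch(String joinKey, int sourceIndex) {
        this.joinKey = joinKey;
        this.sourceIndex = sourceIndex;
    }

    String getJoinKey() { return joinKey; }
    int getSourceIndex() { return sourceIndex; }

    // Same shape as Writable.write(DataOutput): serialize both parts.
    void write(DataOutput out) throws IOException {
        out.writeUTF(joinKey);
        out.writeInt(sourceIndex);
    }

    // Same shape as Writable.readFields(DataInput): read back in write order.
    void readFields(DataInput in) throws IOException {
        joinKey = in.readUTF();
        sourceIndex = in.readInt();
    }

    // Natural ordering: join key first, then source index.
    @Override
    public int compareTo(CompositeKeySketch other) {
        int byJoinKey = joinKey.compareTo(other.joinKey);
        return byJoinKey != 0 ? byJoinKey : Integer.compare(sourceIndex, other.sourceIndex);
    }

    // Illustration-only helper: serialize a key and read it back.
    static CompositeKeySketch roundTrip(CompositeKeySketch key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            key.write(new DataOutputStream(bytes));
            CompositeKeySketch copy = new CompositeKeySketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Note that the sketch compares the join key as text; if empNo values can differ in length, a numeric comparison may be the better choice.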

3.0.4. The mapper

In the setup method of the mapper:
1. Get the filename from the input split and cross-reference it against the configuration (set in the driver) to derive the source index.  [Driver code: add configuration entries [key=filename of employee dataset, value=1], [key=filename of current salary dataset, value=2], [key=filename of historical salary dataset, value=3]]
2. Build a list of the attributes we want to emit as map output for each data entity.

The setup method is called only once, at the beginning of a map task, so it is the logical place to identify the source index.

In the map method of the mapper:
3. Build the map output based on attributes required, as specified in the list from #2

Note:  For salary data, we are including the "effective till" date, even though it is not required in the final output, because this is common code for a 1..1 as well as a 1..many join to salary data.  If the salary data is historical, we want the current salary only, that is, the record with "effective till" date = 9999-01-01.
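The mapper steps above can be sketched in plain Java, without Hadoop classes. Everything named here is an assumption made for illustration: the class MapperLogicSketch, the use of a Map in place of the job configuration, the comma-separated record layouts, and the column positions. The real mapper would read the filename from the input split and emit a composite key and a text value through the context.

```java
import java.util.Map;

// Plain-Java sketch of the mapper logic (hypothetical names and layouts).
class MapperLogicSketch {

    // setup(): derive the source index from the split's filename, using the
    // filename -> index pairs the driver is assumed to have configured.
    static int sourceIndexFor(String fileName, Map<String, String> conf) {
        String value = conf.get(fileName);
        if (value == null) {
            // Without this check, Integer.parseInt(null) would fail with a
            // NumberFormatException that hides the real cause.
            throw new IllegalStateException("No source index configured for file: " + fileName);
        }
        return Integer.parseInt(value);
    }

    // setup(): choose the columns to emit for each dataset.
    // Assumed layouts: employee = empNo,birthDate,firstName,lastName,gender,hireDate,deptNo
    //                  salary   = empNo,salary,fromDate,toDate
    static int[] columnsToEmit(int sourceIndex) {
        return sourceIndex == 1 ? new int[] {2, 3, 6} : new int[] {1, 3};
    }

    // map(): build the composite key (shown as "empNo|sourceIndex") and the
    // value made of the selected columns only.
    static String[] mapRecord(String line, int sourceIndex, int[] columns) {
        String[] fields = line.split(",");
        StringBuilder value = new StringBuilder();
        for (int column : columns) {
            if (value.length() > 0) {
                value.append(',');
            }
            value.append(fields[column]);
        }
        return new String[] {fields[0] + "|" + sourceIndex, value.toString()};
    }
}
```

With a configuration that maps a salary file to 2, the made-up line `10001,60117,1986-06-26,9999-01-01` becomes the key `10001|2` and the value `60117,9999-01-01`.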

3.0.5. The partitioner

Even though the map output key is composite, we want to partition by the natural join key of empNo, therefore a custom partitioner is in order.
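As a rough illustration of that idea, the sketch below computes a partition number from the join key alone and ignores the source index. The class and its nested Key type are hypothetical and free of Hadoop dependencies; a real partitioner would extend Hadoop's Partitioner class and override its getPartition method.

```java
// Plain-Java sketch of partitioning on the natural key only (hypothetical names).
class PartitionerSketch {

    // Minimal stand-in for the composite key.
    static final class Key {
        final String joinKey;   // empNo
        final int sourceIndex;  // 1=employee, 2=salary, 3=salary history

        Key(String joinKey, int sourceIndex) {
            this.joinKey = joinKey;
            this.sourceIndex = sourceIndex;
        }
    }

    // Only the join key takes part in the hash, so every record for one empNo
    // lands in the same partition regardless of which dataset it came from.
    static int getPartition(Key key, int numReduceTasks) {
        // Masking with Integer.MAX_VALUE keeps the result non-negative.
        return (key.joinKey.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

If the source index were included in the hash, employee and salary records for the same empNo could end up on different reducers and the join would silently miss matches.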

3.0.6. The sort comparator

To ensure that the input to the reducer is sorted on empNo, then on sourceIndex, we need a sort comparator.  This will guarantee that the employee data is the first set in the values list for a key, then the salary data.
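The ordering can be sketched with a plain java.util.Comparator. The names below are hypothetical, and the join key is compared as text, which is an assumption; in a real job the comparator would typically extend Hadoop's WritableComparator and be registered on the job as the sort comparator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java sketch of the sort order: empNo first, then source index.
class SortComparatorSketch {

    // Minimal stand-in for the composite key.
    static final class Key {
        final String joinKey;
        final int sourceIndex;

        Key(String joinKey, int sourceIndex) {
            this.joinKey = joinKey;
            this.sourceIndex = sourceIndex;
        }

        @Override
        public String toString() {
            return joinKey + "|" + sourceIndex;
        }
    }

    static final Comparator<Key> SORT_ORDER =
            Comparator.comparing((Key k) -> k.joinKey)
                      .thenComparingInt(k -> k.sourceIndex);

    // Returns the keys in sorted order, rendered as "empNo|sourceIndex".
    static List<String> sorted(List<Key> keys) {
        List<Key> copy = new ArrayList<>(keys);
        copy.sort(SORT_ORDER);
        List<String> rendered = new ArrayList<>();
        for (Key key : copy) {
            rendered.add(key.toString());
        }
        return rendered;
    }
}
```

Sorting the keys (10002, 1), (10001, 2), (10001, 1) this way yields 10001|1, 10001|2, 10002|1, so the employee record precedes the salary record for each empNo.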

3.0.7. The grouping comparator

This class is needed to specify the group-by attribute: the natural join key, empNo.
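A small plain-Java sketch of the effect: the comparator looks at the join key only, so consecutive sorted keys with the same empNo are treated as one group, which corresponds to one reduce call. The class, the Key type, and the group helper are hypothetical; the helper only imitates what the framework does with the grouping comparator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java sketch of grouping on the natural key (hypothetical names).
class GroupingComparatorSketch {

    // Minimal stand-in for the composite key.
    static final class Key {
        final String joinKey;
        final int sourceIndex;

        Key(String joinKey, int sourceIndex) {
            this.joinKey = joinKey;
            this.sourceIndex = sourceIndex;
        }
    }

    // Ignores the source index: keys with the same empNo compare as equal.
    static final Comparator<Key> GROUPING = Comparator.comparing((Key k) -> k.joinKey);

    // Splits an already sorted key list into runs; a new run starts whenever
    // the grouping comparator reports a difference from the previous key.
    static List<List<Key>> group(List<Key> sortedKeys) {
        List<List<Key>> groups = new ArrayList<>();
        Key previous = null;
        for (Key key : sortedKeys) {
            if (previous == null || GROUPING.compare(previous, key) != 0) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(key);
            previous = key;
        }
        return groups;
    }
}
```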

3.0.8. The reducer

In the setup method of the reducer (called only once per task):
We check whether the side data, a map file with department data, is in the distributed cache and, if found, initialize the map file reader.

In the reduce method:
While iterating through the value list:
1. If the data is employee data (sourceIndex=1), we look up the department name in the map file using the deptNo, which is the last attribute in the employee data, and append the department name to the employee data.
2. If the data is historical salary data, we emit only the salary whose last attribute is '9999-01-01'.

Key point:
We have set the sort comparator to sort on empNo and sourceIndex.
The sourceIndex of employee data is lower than that of salary data, as set in the driver.
Therefore, we are assured that the employee data always comes first, followed by the salary data.
So for each distinct empNo, we iterate through the values, append them to one another, and emit the result as output.
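The reduce logic described above can be sketched as follows, again in plain Java with no Hadoop classes. The names, the value layouts, the in-memory Map used in place of the map file lookup, and the "UNKNOWN" fallback for a missing department are all assumptions for illustration, not the gist's code.

```java
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the reduce-side join for one empNo (hypothetical names).
class ReducerLogicSketch {
    static final String CURRENT_SALARY_END_DATE = "9999-01-01";

    // A value together with the source index carried by its composite key.
    static final class TaggedValue {
        final int sourceIndex;  // 1=employee, 2=salary, 3=salary history
        final String value;     // comma-separated attributes kept by the mapper

        TaggedValue(int sourceIndex, String value) {
            this.sourceIndex = sourceIndex;
            this.value = value;
        }
    }

    // sortedValues must arrive ordered by source index (employee first),
    // which is what the sort comparator guarantees.
    static String reduce(String empNo, List<TaggedValue> sortedValues,
                         Map<String, String> departments) {
        StringBuilder joined = new StringBuilder(empNo);
        for (TaggedValue tagged : sortedValues) {
            String[] fields = tagged.value.split(",");
            String last = fields[fields.length - 1];
            if (tagged.sourceIndex == 1) {
                // Employee: the last attribute is deptNo; append the department name.
                String deptName = departments.getOrDefault(last, "UNKNOWN");
                joined.append(',').append(tagged.value).append(',').append(deptName);
            } else if (tagged.sourceIndex == 2 || CURRENT_SALARY_END_DATE.equals(last)) {
                // Current salary file, or the current row of the salary history.
                joined.append(',').append(fields[0]);
            }
            // Historical rows with any other end date are skipped.
        }
        return joined.toString();
    }
}
```

For an employee value `Georgi,Facello,d005`, a department map containing d005=Development, and salary history values `60117,1987-06-26` and `88958,9999-01-01` (all made-up sample values), the sketch produces `10001,Georgi,Facello,d005,Development,88958`.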

3.0.9. The driver

Besides the usual driver code, we are:
1. Adding side data (department lookup data in map file format, in HDFS) to the distributed cache.
2. Adding key-value pairs to the configuration, each pair being a filename and its source index. The mapper uses these to tag data with the sourceIndex.
3. And lastly, associating the various classes we created with the job.

4.0. The pig equivalent

Pig script-version 1:

Pig script-version 2 - eliminating the reduce-side join:
In this script, we filter on the most recent salary and then use Pig's merge join optimization (map-side), which can be leveraged when the inputs to the join are sorted.



  1. Hi Anagha!

    I was messing around with this. I was able to get the job to run if I disable the reducer, but I get an appending error if I run it with the reducer. I assume this might be due to not having this file: departments_map.tar.gz

    I just wanted to see how I could get the job to run the emp dataset and the salary history dataset ( 1... many) without having to do the join on departments as well. Is the job built out to require the distrib cache tarball?


    Dan D. Tran (ddantran19@gmail.com)

  3. Very very clear explanation, thx a lot

  4. Indeed a very good explanation. Thanks again Anagha

  6. Here you are using a "CompositeKeyWritableRSJ" object as the output key of the mapper. Since "CompositeKeyWritableRSJ" is a class, wouldn't it be enough to override the "hashCode()" method (to return a value that is helpful for grouping within a reducer) instead of implementing a grouping comparator (GroupingComparatorRSJ)?

    I mean, if we implement a hashCode() method (as below) in the CompositeKeyWritableRSJ class:
    public int hashCode() {
        return joinKey.hashCode(); // emp id string in CompositeKeyWritableRSJ
    }

    wouldn't this be good enough instead of implementing a new "GroupingComparatorRSJ" class? Also, I think if we override the "hashCode()" method then we don't even need to write the partitioner class (i.e. PartitionerRSJ) because, I think, the default partitioner uses the hashCode of the mapper's output key.

  7. Thanks for the explanation.
    Sir currently i'm doing a project on LIBRA-A lightweight strategy for solving data skew(The imbalance in the amount of data assign to each reducer) which occurs mainly in reducer side applications.
    So can i use Reduce side join in my project to reduce data skew.
    Please reply me.
    Thank you..

  9. Can not say enough thanks. U rock . but may be relevant or not u can say
    Co-location is not hard by to implement in Hadoop/any file system those who know the internals and till that is not widespread except ETL pre-processing /Unstructured data will not touch open source/hadoop .
    The sorting/redistribution how ever u do it with 2 -3 large data sets it will not work. It is network bound.period.

  19. thakyou it vry nice blog for beginners

  23. Hi Anagha.
    I am new to Hadoop. I tried running the program on the dataset and am receiving the following error.
    C:\Users\nsita>hadoop jar c:\java\jar\RSJProgram.jar reducesidejoin.DriverRSJ /playground/data/part-e /playground/data/part-sc /playground/data/RSJReduceOutput
    19/01/16 19:49:03 INFO client.RMProxy: Connecting to ResourceManager at /
    19/01/16 19:49:03 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    19/01/16 19:49:04 INFO input.FileInputFormat: Total input files to process : 2
    19/01/16 19:49:04 INFO mapreduce.JobSubmitter: number of splits:2
    19/01/16 19:49:04 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1547637839810_0014
    19/01/16 19:49:04 INFO impl.YarnClientImpl: Submitted application application_1547637839810_0014
    19/01/16 19:49:04 INFO mapreduce.Job: The url to track the job: http://DESKTOP-JCF7H50:8088/proxy/application_1547637839810_0014/
    19/01/16 19:49:04 INFO mapreduce.Job: Running job: job_1547637839810_0014
    19/01/16 19:49:12 INFO mapreduce.Job: Job job_1547637839810_0014 running in uber mode : false
    19/01/16 19:49:12 INFO mapreduce.Job: map 0% reduce 0%
    19/01/16 19:49:18 INFO mapreduce.Job: Task Id : attempt_1547637839810_0014_m_000000_0, Status : FAILED
    Error: java.lang.NumberFormatException: null
    at java.lang.Integer.parseInt(Unknown Source)
    at java.lang.Integer.parseInt(Unknown Source)
    at reducesidejoin.MapperRSJ.setup(MapperRSJ.java:35)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:175)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Unknown Source)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:169)

    I feel the error is emitted from MapperRSJ, but I am not able to rectify the problem. Would you be able to help me out here? This is the line in question:

    intSrcIndex = Integer.parseInt(context.getConfiguration().get(fsFileSplit.getPath().getName()));