Wednesday, September 18, 2013

Secondary sort in Java MapReduce

This post covers secondary sort in Java MapReduce. It has links to Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!

Secondary sort in MapReduce
With the MapReduce framework, the keys are sorted but the values associated with each key
are not. In order for the values to be sorted, we need to write code to perform what is
referred to as a secondary sort. The sample code in this gist demonstrates such a sort.
The input to the program is a bunch of employee attributes.
The output required is department number (deptNo) in ascending order, and the employee last name,
first name and employee ID in descending order.
The recipe to get the effect of sorting by value is:
1) Make the key a composite of the natural key (deptNo) and the natural value (lName, fName and empNo).
2) The sort comparator should order by the composite key, that is, the natural key and natural
value.
3) The partitioner and grouping comparator for the composite key should consider only the natural
key for partitioning and grouping.
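The three steps can be tried out without a cluster. The following is a minimal plain-Java sketch, not the Hadoop job itself: the class `SecondarySortRecipeSketch` and its nested `Key` are hypothetical names used only for illustration, and the two comparators stand in for the sort comparator and grouping comparator described above. The input rows are taken from the sample data in this post.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java simulation of the recipe; no Hadoop classes are involved.
class SecondarySortRecipeSketch {

    // Step 1: composite key = natural key (deptNo) + natural value (lName, fName, empNo).
    static final class Key {
        final String deptNo;
        final String nameEmpId;

        Key(String deptNo, String nameEmpId) {
            this.deptNo = deptNo;
            this.nameEmpId = nameEmpId;
        }

        @Override
        public String toString() {
            return deptNo + "\t" + nameEmpId;
        }
    }

    // Step 2: the sort comparator uses the whole composite key:
    // deptNo ascending, then the natural value descending.
    static final Comparator<Key> SORT = (a, b) -> {
        int c = a.deptNo.compareTo(b.deptNo);
        return c != 0 ? c : b.nameEmpId.compareTo(a.nameEmpId);
    };

    // Step 3: the grouping comparator looks at the natural key only.
    static final Comparator<Key> GROUP = (a, b) -> a.deptNo.compareTo(b.deptNo);

    // Returns a sorted copy of the keys, as the shuffle would deliver them.
    static List<Key> sorted(List<Key> keys) {
        List<Key> out = new ArrayList<>(keys);
        out.sort(SORT);
        return out;
    }

    // Counts the groups (one reduce() call each) in an already sorted list.
    static int countGroups(List<Key> sortedKeys) {
        int groups = 0;
        Key prev = null;
        for (Key k : sortedKeys) {
            if (prev == null || GROUP.compare(prev, k) != 0) {
                groups++;
            }
            prev = k;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Key> keys = new ArrayList<>();
        keys.add(new Key("d004", "Bamford\tParto\t10003"));
        keys.add(new Key("d003", "Maliniak\tKyoichi\t10005"));
        keys.add(new Key("d004", "Koblick\tChirstian\t10004"));

        List<Key> s = sorted(keys);
        for (Key k : s) {
            System.out.println(k);
        }
        System.out.println("groups: " + countGroups(s));
    }
}
```

Within d004, Koblick sorts ahead of Bamford because the natural value is compared in descending order, while both rows still fall into the same group.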
*******************************
*Data and code download
*******************************
Data and code:
--------------
gitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
sortProject
data
employees_tsv
employees_tsv
SecondarySortBasic
src
CompositeKeyWritable.java
SecondarySortBasicMapper.java
SecondarySortBasicPartitioner.java
SecondarySortBasicCompKeySortComparator.java
SecondarySortBasicGroupingComparator.java
SecondarySortBasicReducer.java
SecondarySortBasicDriver.java
jar
SecondarySortBasic.jar
*******************************
*Sample Data
*******************************
EmpID DOB FName LName Gender Hire date DeptID
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
....
*******************************
*Expected results
*******************************
Sort order: [DeptID asc, {LName,FName,EmpID} desc]
DeptID LName FName EmpID
d001 Zykh Sudhanshu 205927
d001 Zykh Nidapan 452738
..
d001 Yoshimura Alenka 463297
d001 Yeung Yuguang 483161
..
d001 Acton Basim 105207
d001 Aamodt Sreekrishna 493601
..
d002 Aamodt Yakkov 43290
..
d003 Acton Idoia 211583
..
d004 dAstous Candido 59201
d004 dAstous Berhard 427930
..
d005 Zizka Aamer 409151
d005 Zirintsis Xiaoqiang 52246
....
/***************************************************************
*CustomWritable for the composite key: CompositeKeyWritable
****************************************************************/
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
/**
*
* @author akhanolkar
*
* Purpose: A custom writable with two attributes: deptNo and
* lNameEmpIDPair
*/
public class CompositeKeyWritable implements Writable,
WritableComparable<CompositeKeyWritable> {
private String deptNo;
private String lNameEmpIDPair;
public CompositeKeyWritable() {
}
public CompositeKeyWritable(String deptNo, String lNameEmpIDPair) {
this.deptNo = deptNo;
this.lNameEmpIDPair = lNameEmpIDPair;
}
@Override
public String toString() {
return (new StringBuilder().append(deptNo).append("\t")
.append(lNameEmpIDPair)).toString();
}
public void readFields(DataInput dataInput) throws IOException {
deptNo = WritableUtils.readString(dataInput);
lNameEmpIDPair = WritableUtils.readString(dataInput);
}
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeString(dataOutput, deptNo);
WritableUtils.writeString(dataOutput, lNameEmpIDPair);
}
public int compareTo(CompositeKeyWritable objKeyPair) {
// TODO:
/*
* Note: This code will work as it stands; but when CompositeKeyWritable
* is used as key in a map-reduce program, it is de-serialized into an
* object for the compareTo() method to be invoked.
*
* To do: To optimize for speed, implement a raw comparator, which
* supports comparison of the serialized representations
*/
int result = deptNo.compareTo(objKeyPair.deptNo);
if (0 == result) {
result = lNameEmpIDPair.compareTo(objKeyPair.lNameEmpIDPair);
}
return result;
}
public String getDeptNo() {
return deptNo;
}
public void setDeptNo(String deptNo) {
this.deptNo = deptNo;
}
public String getLNameEmpIDPair() {
return lNameEmpIDPair;
}
public void setLNameEmpIDPair(String lNameEmpIDPair) {
this.lNameEmpIDPair = lNameEmpIDPair;
}
}
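The TODO in compareTo() mentions comparing serialized representations directly. The idea can be sketched in plain Java as below. This is an illustration under stated assumptions, not a Hadoop RawComparator: the class `RawCompareSketch`, its methods, and the byte layout (a 4-byte big-endian length followed by UTF-8 bytes for each field) are all hypothetical and are not claimed to match what WritableUtils.writeString actually writes. It also assumes ASCII data, for which unsigned byte order agrees with String.compareTo().

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Compares two serialized composite keys without rebuilding String objects.
class RawCompareSketch {

    // Assumed layout per field: 4-byte big-endian length, then UTF-8 bytes.
    static byte[] serialize(String deptNo, String nameEmpId) {
        byte[] d = deptNo.getBytes(StandardCharsets.UTF_8);
        byte[] n = nameEmpId.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(8 + d.length + n.length)
                .putInt(d.length).put(d)
                .putInt(n.length).put(n)
                .array();
    }

    // Reads a big-endian int starting at the given offset.
    static int readInt(byte[] b, int off) {
        return ((b[off] & 0xff) << 24) | ((b[off + 1] & 0xff) << 16)
                | ((b[off + 2] & 0xff) << 8) | (b[off + 3] & 0xff);
    }

    // Unsigned lexicographic comparison of two byte ranges.
    static int compareBytes(byte[] a, int aOff, int aLen,
                            byte[] b, int bOff, int bLen) {
        int n = Math.min(aLen, bLen);
        for (int i = 0; i < n; i++) {
            int x = a[aOff + i] & 0xff;
            int y = b[bOff + i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return aLen - bLen;
    }

    // Same ordering as compareTo(): deptNo first, then the name/ID string.
    static int compareRaw(byte[] a, byte[] b) {
        int aDeptLen = readInt(a, 0);
        int bDeptLen = readInt(b, 0);
        int c = compareBytes(a, 4, aDeptLen, b, 4, bDeptLen);
        if (c != 0) {
            return c;
        }
        int aOff = 4 + aDeptLen;
        int bOff = 4 + bDeptLen;
        return compareBytes(a, aOff + 4, readInt(a, aOff),
                            b, bOff + 4, readInt(b, bOff));
    }

    public static void main(String[] args) {
        byte[] k1 = serialize("d004", "Bamford\tParto\t10003");
        byte[] k2 = serialize("d004", "Koblick\tChirstian\t10004");
        System.out.println(Integer.signum(compareRaw(k1, k2)));
    }
}
```

In a real job the equivalent logic would live in a comparator that works on the bytes the key actually writes, so the layout assumption above would need to be replaced with the real one.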
/***************************************************************
*Mapper: SecondarySortBasicMapper
***************************************************************/
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SecondarySortBasicMapper extends
Mapper<LongWritable, Text, CompositeKeyWritable, NullWritable> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
if (value.toString().length() > 0) {
String arrEmpAttributes[] = value.toString().split("\\t");
context.write(
new CompositeKeyWritable(
arrEmpAttributes[6].toString(),
(arrEmpAttributes[3].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t" + arrEmpAttributes[0]
.toString())), NullWritable.get());
}
}
}
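The mapper above reads fields 0, 2, 3 and 6 of each tab-split line, so a line with fewer than seven fields would fail with an ArrayIndexOutOfBoundsException. A minimal plain-Java sketch of a guard follows. The class `EmployeeLineSketch` and the helper `toCompositeKey` are hypothetical names, and returning null for a short line is just one possible policy (a job might instead count such lines and skip them).

```java
// Plain-Java sketch of defensive parsing for one input line; no Hadoop classes.
class EmployeeLineSketch {

    // Returns "deptNo\tlName\tfName\tempNo", or null when the line
    // does not have the seven expected tab-separated fields.
    static String toCompositeKey(String line) {
        // A limit of -1 keeps trailing empty fields, so the length
        // check sees every column that is present in the line.
        String[] f = line.split("\t", -1);
        if (f.length < 7) {
            return null;
        }
        return f[6] + "\t" + f[3] + "\t" + f[2] + "\t" + f[0];
    }

    public static void main(String[] args) {
        String good = "10003\t1959-12-03\tParto\tBamford\tM\t1986-08-28\td004";
        String bad = "10003\t1959-12-03";
        System.out.println(toCompositeKey(good));
        System.out.println(toCompositeKey(bad));
    }
}
```

The check is `f.length < 7` because the highest index used is 6; testing for a length of exactly 6 would reject every well-formed line.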
/***************************************************************
*Partitioner: SecondarySortBasicPartitioner
***************************************************************/
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class SecondarySortBasicPartitioner extends
Partitioner<CompositeKeyWritable, NullWritable> {
@Override
public int getPartition(CompositeKeyWritable key, NullWritable value,
int numReduceTasks) {
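// Mask the sign bit: hashCode() can be negative, and a negative
// result is not a valid partition number
return (key.getDeptNo().hashCode() & Integer.MAX_VALUE) % numReduceTasks;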
}
}
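A partitioner has to return a number in the range 0 to numReduceTasks - 1, and String.hashCode() can be negative. Hadoop's own HashPartitioner guards against this by clearing the sign bit before taking the modulus. The plain-Java sketch below shows the difference; `PartitionIndexSketch` and `partitionFor` are hypothetical names for illustration, not Hadoop APIs.

```java
// Plain-Java sketch of computing a partition index from a hash code.
class PartitionIndexSketch {

    // Clears the sign bit so the result is always in [0, numReduceTasks).
    static int partitionFor(int hash, int numReduceTasks) {
        return (hash & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 8;

        // The unmasked form can go negative: in Java, -7 % 8 evaluates to -7.
        System.out.println("unmasked: " + (-7 % reducers));
        System.out.println("masked:   " + partitionFor(-7, reducers));

        // Only the natural key feeds the hash, so every record of a
        // department is sent to the same reducer.
        System.out.println("d004 -> " + partitionFor("d004".hashCode(), reducers));
    }
}
```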
/***************************************************************
*SortComparator: SecondarySortBasicCompKeySortComparator
*****************************************************************/
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class SecondarySortBasicCompKeySortComparator extends WritableComparator {
protected SecondarySortBasicCompKeySortComparator() {
super(CompositeKeyWritable.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
CompositeKeyWritable key1 = (CompositeKeyWritable) w1;
CompositeKeyWritable key2 = (CompositeKeyWritable) w2;
int cmpResult = key1.getDeptNo().compareTo(key2.getDeptNo());
if (cmpResult == 0)// same deptNo
{
return -key1.getLNameEmpIDPair()
.compareTo(key2.getLNameEmpIDPair());
//If the minus is taken out, the values will be in
//ascending order
}
return cmpResult;
}
}
/***************************************************************
*GroupingComparator: SecondarySortBasicGroupingComparator
***************************************************************/
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class SecondarySortBasicGroupingComparator extends WritableComparator {
protected SecondarySortBasicGroupingComparator() {
super(CompositeKeyWritable.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
CompositeKeyWritable key1 = (CompositeKeyWritable) w1;
CompositeKeyWritable key2 = (CompositeKeyWritable) w2;
return key1.getDeptNo().compareTo(key2.getDeptNo());
}
}
/***************************************
*Reducer: SecondarySortBasicReducer
***************************************/
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class SecondarySortBasicReducer
extends
Reducer<CompositeKeyWritable, NullWritable, CompositeKeyWritable, NullWritable> {
@Override
public void reduce(CompositeKeyWritable key, Iterable<NullWritable> values,
Context context) throws IOException, InterruptedException {
for (NullWritable value : values) {
context.write(key, NullWritable.get());
}
}
}
/***************************************
*Driver: SecondarySortBasicDriver
***************************************/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SecondarySortBasicDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required for SecondarySortBasicDriver- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
job.setJobName("Secondary sort example");
job.setJarByClass(SecondarySortBasicDriver.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(SecondarySortBasicMapper.class);
job.setMapOutputKeyClass(CompositeKeyWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setPartitionerClass(SecondarySortBasicPartitioner.class);
job.setSortComparatorClass(SecondarySortBasicCompKeySortComparator.class);
job.setGroupingComparatorClass(SecondarySortBasicGroupingComparator.class);
job.setReducerClass(SecondarySortBasicReducer.class);
job.setOutputKeyClass(CompositeKeyWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(8);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new SecondarySortBasicDriver(), args);
System.exit(exitCode);
}
}
*******************************
*Command to run the program
*******************************
hadoop jar ~/Blog/sortProject/secondarySortBasic/jar/secondarySortBasic.jar SecondarySortBasicDriver /user/akhanolk/sortProject/data/employees/employees_tsv /user/akhanolk/sortProject/data/output-secondarySortBasic
*******************************
*Results
*******************************
--Source record count
hadoop fs -cat sortProject/data/employees/employees_tsv | wc -l
2246830
--Results record count
hadoop fs -cat sortProject/data/output-secondarySortBasic/part* | wc -l
2246830
--Files generated
hadoop fs -ls -R sortProject/data/output-secondarySortBasic/part* | awk '{print $8}'
sortProject/data/output-secondarySortBasic/part-r-00000
sortProject/data/output-secondarySortBasic/part-r-00001
sortProject/data/output-secondarySortBasic/part-r-00002
sortProject/data/output-secondarySortBasic/part-r-00003
sortProject/data/output-secondarySortBasic/part-r-00004
sortProject/data/output-secondarySortBasic/part-r-00005
sortProject/data/output-secondarySortBasic/part-r-00006
sortProject/data/output-secondarySortBasic/part-r-00007
--Output
hadoop fs -cat sortProject/data/output-secondarySortBasic/part*
d001 Zykh Sudhanshu 205927
d001 Zykh Nidapan 452738
..
d001 Yoshimura Alenka 463297
d001 Yeung Yuguang 483161
..
d001 Acton Basim 105207
d001 Aamodt Sreekrishna 493601
..
d002 Aamodt Yakkov 43290
..
d003 Acton Idoia 211583
..
d004 dAstous Candido 59201
d004 dAstous Berhard 427930
..
d005 Zizka Aamer 409151
d005 Zirintsis Xiaoqiang 52246
....
**********************
Reference:
**********************
Hadoop: The Definitive Guide, 3rd edition
**********************
Credits:
**********************
Data from mysql - http://dev.mysql.com/doc/employee/en.index.html

13 comments:

  1. Hi, this post seems to be very good.
    I also found a simple approach for this...


    Mapper
    -------------------
    public class SecondarySortBasicMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {

    if (value.toString().length() > 0) {

    String arrEmpAttributes[] = value.toString().split("::");
    String val = arrEmpAttributes[3].toString() + "::" + arrEmpAttributes[2].toString() + "::" + arrEmpAttributes[0];

    context.write(new Text(arrEmpAttributes[6]),new Text(val));

    }

    }
    }


    Reducer
    ---------------
    public class SecondarySortBasicReducer extends Reducer<Text, Text, Text, Text> {

    List<String> list = new ArrayList<String>();

    @Override
    public void reduce(Text key, Iterable<Text> values,
    Context context) throws IOException, InterruptedException {
    list.clear();
    for (Text value : values) {
    String valArr[] = value.toString().split("::");
    list.add(valArr[0]+"::"+valArr[1]+"::"+valArr[2]);
    }
    Collections.sort(list);

    for(int i=list.size()-1;i>=0;i--)
    {
    String s = (String)list.get(i);
    context.write(key, new Text(s));
    }
    }
    }



    MainDriver
    ----------------------
    public class SecondarySortBasicDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {


    Job job = new Job(getConf());
    job.setJobName("Simplified Secondary sort example");

    job.setJarByClass(SecondarySortBasicDriver.class);
    FileInputFormat.setInputPaths(job, new Path(args[1]));
    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    job.setMapperClass(SecondarySortBasicMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setReducerClass(SecondarySortBasicReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(1);

    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(),
    new SecondarySortBasicDriver(), args);
    System.exit(exitCode);
    }
    }

    This would give me the same output in a much simpler way. (Maybe this is ideal for simple/small data.)

  2. This comment has been removed by the author.

  3. That's a good alternative for this problem. But if you want to sort a few columns ascending and a few columns descending, you can't use Collections.sort(); you need to depend on secondary sort. And if numerous records enter the reducer, Collections.sort() will be a major performance problem. We want something that is done in memory (the default) by Hadoop. Hence the need to depend on secondary sort.

  4. Good. Very helpful. I didn't find the mapper and reducer on any other site. The whole code is given here, which helped me understand better.

  5. When I ran this code I always got an array index out of bounds error at [6]. I fixed it by replacing
    if (value.toString().length() > 0) {
    String arrEmpAttributes[] = value.toString().split("\\t");

    context.write(
    new CompositeKeyWritable(
    arrEmpAttributes[6].toString(),
    (arrEmpAttributes[3].toString() + "\t"
    + arrEmpAttributes[2].toString() + "\t" + arrEmpAttributes[0]
    .toString())), NullWritable.get());
    }
    By
    String valueStr = value.toString();
    if (!StringUtils.isEmpty(valueStr)) {
    String arrEmpAttributes[] = valueStr.split("\\t"); //Also declare these two outside of the loop
    if(!ArrayUtils.isEmpty(arrEmpAttributes) && arrEmpAttributes.length==6) {
    context.write(
    new CompositeKeyWritable(arrEmpAttributes[6].toString(),
    (arrEmpAttributes[3].toString() + "\t"
    + arrEmpAttributes[2].toString() + "\t" + arrEmpAttributes[0].toString())),
    NullWritable.get());
    }
    It worked, but the output file is always empty. Can anyone help me fix this?

  6. Has anyone faced the following error while executing the SecondarySortBasicDriver as main class through Eclipse?

    "Could not find or load main class sortProject.SecondarySortBasicDriver". Please help.

    Thanks,
    Joel

  7. Thanks, ma'am. Nice blog. Appreciate your effort.
