Secondary sort in MapReduce

In the MapReduce framework, keys are sorted, but the values associated with each key
are not. To get the values sorted as well, we need to write code to perform what is
referred to as a secondary sort. The sample code in this gist demonstrates such a sort.

The input to the program is a set of employee attributes. The required output is
department number (deptNo) in ascending order, with the employee last name,
first name and employee ID in descending order.

The recipe for getting the effect of sorting by value is:
1) Make the key a composite of the natural key (deptNo) and the natural value (lName, fName and empNo).
2) The sort comparator should order by the composite key, that is, by the natural key and natural value together.
3) The partitioner and grouping comparator for the composite key should consider only the natural key for partitioning and grouping.
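The ordering that steps 1 and 2 produce can be sketched in plain Java, outside Hadoop. This is only an illustration of the comparator logic (the record layout and class name here are made up for the sketch, not part of the job code below):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class CompositeSortSketch {
    public static void main(String[] args) {
        // Each entry is {deptNo, "lName\tfName\tempNo"} -- the composite key.
        List<String[]> keys = new ArrayList<String[]>();
        keys.add(new String[] { "d002", "Aamodt\tYakkov\t43290" });
        keys.add(new String[] { "d001", "Aamodt\tSreekrishna\t493601" });
        keys.add(new String[] { "d001", "Zykh\tSudhanshu\t205927" });

        // Natural key (deptNo) ascending; natural value (name/ID) descending.
        Collections.sort(keys, new Comparator<String[]>() {
            public int compare(String[] a, String[] b) {
                int result = a[0].compareTo(b[0]);
                return result != 0 ? result : -a[1].compareTo(b[1]);
            }
        });

        for (String[] k : keys)
            System.out.println(k[0] + "\t" + k[1]);
        // Prints:
        // d001	Zykh	Sudhanshu	205927
        // d001	Aamodt	Sreekrishna	493601
        // d002	Aamodt	Yakkov	43290
    }
}
```

Within d001, "Zykh…" sorts before "Aamodt…" only because of the negation; that is exactly what the sort comparator below does.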
*******************************
*Data and code download
*******************************

Data and code:
--------------
gitHub:
<<To be added>>

Email me at airawat.blog@gmail.com if you encounter any issues

Directory structure
-------------------
sortProject
    data
        employees_tsv
            employees_tsv
    SecondarySortBasic
        src
            CompositeKeyWritable.java
            SecondarySortBasicMapper.java
            SecondarySortBasicPartitioner.java
            SecondarySortBasicCompKeySortComparator.java
            SecondarySortBasicGroupingComparator.java
            SecondarySortBasicReducer.java
            SecondarySortBasicDriver.java
        jar
            SecondarySortBasic.jar
*******************************
*Sample Data
*******************************

EmpID	DOB	FName	LName	Gender	Hire date	DeptID
10003	1959-12-03	Parto	Bamford	M	1986-08-28	d004
10004	1954-05-01	Chirstian	Koblick	M	1986-12-01	d004
10005	1955-01-21	Kyoichi	Maliniak	M	1989-09-12	d003
....
*******************************
*Expected results
*******************************

Sort order: [DeptID asc, {LName,FName,EmpID} desc]

DeptID	LName	FName	EmpID
d001	Zykh	Sudhanshu	205927
d001	Zykh	Nidapan	452738
..
d001	Yoshimura	Alenka	463297
d001	Yeung	Yuguang	483161
..
d001	Acton	Basim	105207
d001	Aamodt	Sreekrishna	493601
..
d002	Aamodt	Yakkov	43290
..
d003	Acton	Idoia	211583
..
d004	dAstous	Candido	59201
d004	dAstous	Berhard	427930
..
d005	Zizka	Aamer	409151
d005	Zirintsis	Xiaoqiang	52246
....
/***************************************************************
 *CustomWritable for the composite key: CompositeKeyWritable
 ****************************************************************/
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

/**
 * @author akhanolkar
 *
 *         Purpose: A custom writable with two attributes - deptNo and
 *         lNameEmpIDPair
 */
public class CompositeKeyWritable implements
        WritableComparable<CompositeKeyWritable> {

    private String deptNo;
    private String lNameEmpIDPair;

    public CompositeKeyWritable() {
    }

    public CompositeKeyWritable(String deptNo, String lNameEmpIDPair) {
        this.deptNo = deptNo;
        this.lNameEmpIDPair = lNameEmpIDPair;
    }

    @Override
    public String toString() {
        return deptNo + "\t" + lNameEmpIDPair;
    }

    public void readFields(DataInput dataInput) throws IOException {
        deptNo = WritableUtils.readString(dataInput);
        lNameEmpIDPair = WritableUtils.readString(dataInput);
    }

    public void write(DataOutput dataOutput) throws IOException {
        WritableUtils.writeString(dataOutput, deptNo);
        WritableUtils.writeString(dataOutput, lNameEmpIDPair);
    }

    public int compareTo(CompositeKeyWritable objKeyPair) {
        /*
         * Note: This code will work as it stands; but when CompositeKeyWritable
         * is used as a key in a MapReduce program, it is de-serialized into an
         * object for the compareTo() method to be invoked.
         *
         * To do: to optimize for speed, implement a raw comparator, which
         * supports comparison of the serialized representations directly.
         */
        int result = deptNo.compareTo(objKeyPair.deptNo);
        if (0 == result) {
            result = lNameEmpIDPair.compareTo(objKeyPair.lNameEmpIDPair);
        }
        return result;
    }

    public String getDeptNo() {
        return deptNo;
    }

    public void setDeptNo(String deptNo) {
        this.deptNo = deptNo;
    }

    public String getLNameEmpIDPair() {
        return lNameEmpIDPair;
    }

    public void setLNameEmpIDPair(String lNameEmpIDPair) {
        this.lNameEmpIDPair = lNameEmpIDPair;
    }
}
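The write()/readFields() pair round-trips the two strings through a byte stream. The same pattern can be illustrated in plain Java, using DataOutputStream.writeUTF as a stand-in for WritableUtils.writeString (an assumption made for the sketch only; the two use different length encodings on the wire):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableRoundTripSketch {
    public static void main(String[] args) throws IOException {
        // Serialize the two key parts, mirroring what write() does.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeUTF("d004");
        out.writeUTF("Bamford\tParto\t10003");

        // Deserialize them back in the same order, mirroring readFields().
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(in.readUTF()); // d004
        System.out.println(in.readUTF()); // Bamford	Parto	10003
    }
}
```

The key point the real class relies on is that fields are read back in exactly the order they were written.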
/***************************************************************
 *Mapper: SecondarySortBasicMapper
 ***************************************************************/
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SecondarySortBasicMapper extends
        Mapper<LongWritable, Text, CompositeKeyWritable, NullWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (value.toString().length() > 0) {
            String[] arrEmpAttributes = value.toString().split("\\t");
            // Composite key: deptNo as the natural key; lName, fName and
            // empNo (tab-separated) as the natural value. No separate value
            // is needed, so NullWritable is emitted.
            context.write(
                    new CompositeKeyWritable(arrEmpAttributes[6],
                            arrEmpAttributes[3] + "\t" + arrEmpAttributes[2]
                                    + "\t" + arrEmpAttributes[0]),
                    NullWritable.get());
        }
    }
}
/***************************************************************
 *Partitioner: SecondarySortBasicPartitioner
 ***************************************************************/
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class SecondarySortBasicPartitioner extends
        Partitioner<CompositeKeyWritable, NullWritable> {

    @Override
    public int getPartition(CompositeKeyWritable key, NullWritable value,
            int numReduceTasks) {
        // Partition on the natural key (deptNo) only; mask the sign bit so
        // a negative hashCode cannot yield a negative partition number.
        return (key.getDeptNo().hashCode() & Integer.MAX_VALUE)
                % numReduceTasks;
    }
}
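One detail worth noting about hash-based partitioning: String.hashCode() can be negative, and Java's % operator keeps the sign of the dividend, so a raw hashCode() % numReduceTasks can return a negative partition number, which Hadoop rejects. Masking off the sign bit guards against that. A small plain-Java demonstration of the guard (class and method names here are illustrative only):

```java
public class PartitionGuardSketch {
    // Maps any hash, even a negative one, into [0, numReduceTasks).
    static int partition(int hash, int numReduceTasks) {
        return (hash & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // The unguarded form goes negative for a negative hash...
        System.out.println(-123456789 % 8);                // -5
        // ...while the guarded form stays in range.
        System.out.println(partition(-123456789, 8));      // non-negative
        System.out.println(partition("d004".hashCode(), 8)); // in 0..7
    }
}
```

All records for one deptNo still land on the same reducer, because the partition depends on deptNo alone.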
/***************************************************************
 *SortComparator: SecondarySortBasicCompKeySortComparator
 *****************************************************************/
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class SecondarySortBasicCompKeySortComparator extends WritableComparator {

    protected SecondarySortBasicCompKeySortComparator() {
        super(CompositeKeyWritable.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        CompositeKeyWritable key1 = (CompositeKeyWritable) w1;
        CompositeKeyWritable key2 = (CompositeKeyWritable) w2;

        int cmpResult = key1.getDeptNo().compareTo(key2.getDeptNo());
        if (cmpResult == 0) { // same deptNo
            // Negated to sort the natural value in descending order; if the
            // minus is taken out, the values will be in ascending order.
            return -key1.getLNameEmpIDPair()
                    .compareTo(key2.getLNameEmpIDPair());
        }
        return cmpResult;
    }
}
/***************************************************************
 *GroupingComparator: SecondarySortBasicGroupingComparator
 ***************************************************************/
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class SecondarySortBasicGroupingComparator extends WritableComparator {

    protected SecondarySortBasicGroupingComparator() {
        super(CompositeKeyWritable.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        CompositeKeyWritable key1 = (CompositeKeyWritable) w1;
        CompositeKeyWritable key2 = (CompositeKeyWritable) w2;
        // Group on the natural key (deptNo) alone, so all values for a
        // department arrive in a single reduce call.
        return key1.getDeptNo().compareTo(key2.getDeptNo());
    }
}
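The grouping effect can be simulated in plain Java: after a full composite sort, consecutive keys that compare equal on the natural key alone fall into one reduce group. A sketch with made-up data (names and layout are illustrative, not the job's actual types):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupingSketch {
    public static void main(String[] args) {
        // Already sorted by the full composite key (dept asc, value desc).
        String[][] sorted = {
            { "d001", "Zykh\tSudhanshu\t205927" },
            { "d001", "Aamodt\tSreekrishna\t493601" },
            { "d002", "Aamodt\tYakkov\t43290" },
        };

        // Group consecutive records whose natural key (deptNo) matches,
        // mirroring what the grouping comparator does on the reduce side.
        Map<String, List<String>> groups = new LinkedHashMap<String, List<String>>();
        for (String[] rec : sorted)
            groups.computeIfAbsent(rec[0], k -> new ArrayList<String>()).add(rec[1]);

        System.out.println(groups.keySet());         // [d001, d002]
        System.out.println(groups.get("d001").size()); // 2
    }
}
```

Two reduce calls result here: one with both d001 values (still in descending order), one with the single d002 value.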
/***************************************
 *Reducer: SecondarySortBasicReducer
 ***************************************/
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class SecondarySortBasicReducer
        extends
        Reducer<CompositeKeyWritable, NullWritable, CompositeKeyWritable, NullWritable> {

    @Override
    public void reduce(CompositeKeyWritable key, Iterable<NullWritable> values,
            Context context) throws IOException, InterruptedException {
        // Iterating the values advances the key through the group, so each
        // composite key is written out once per input record.
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}
/***************************************
 *Driver: SecondarySortBasicDriver
 ***************************************/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondarySortBasicDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.printf("Two parameters are required for SecondarySortBasicDriver - <input dir> <output dir>\n");
            return -1;
        }

        Job job = new Job(getConf());
        job.setJobName("Secondary sort example");
        job.setJarByClass(SecondarySortBasicDriver.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(SecondarySortBasicMapper.class);
        job.setMapOutputKeyClass(CompositeKeyWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setPartitionerClass(SecondarySortBasicPartitioner.class);
        job.setSortComparatorClass(SecondarySortBasicCompKeySortComparator.class);
        job.setGroupingComparatorClass(SecondarySortBasicGroupingComparator.class);
        job.setReducerClass(SecondarySortBasicReducer.class);
        job.setOutputKeyClass(CompositeKeyWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(8);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(),
                new SecondarySortBasicDriver(), args);
        System.exit(exitCode);
    }
}
*******************************
*Command to run the program
*******************************

hadoop jar ~/Blog/sortProject/secondarySortBasic/jar/secondarySortBasic.jar SecondarySortBasicDriver /user/akhanolk/sortProject/data/employees/employees_tsv /user/akhanolk/sortProject/data/output-secondarySortBasic
*******************************
*Results
*******************************

--Source record count
hadoop fs -cat sortProject/data/employees/employees_tsv | wc -l
2246830

--Results record count
hadoop fs -cat sortProject/data/output-secondarySortBasic/part* | wc -l
2246830

--Files generated
hadoop fs -ls -R sortProject/data/output-secondarySortBasic/part* | awk '{print $8}'
sortProject/data/output-secondarySortBasic/part-r-00000
sortProject/data/output-secondarySortBasic/part-r-00001
sortProject/data/output-secondarySortBasic/part-r-00002
sortProject/data/output-secondarySortBasic/part-r-00003
sortProject/data/output-secondarySortBasic/part-r-00004
sortProject/data/output-secondarySortBasic/part-r-00005
sortProject/data/output-secondarySortBasic/part-r-00006
sortProject/data/output-secondarySortBasic/part-r-00007

--Output
hadoop fs -cat sortProject/data/output-secondarySortBasic/part*
d001	Zykh	Sudhanshu	205927
d001	Zykh	Nidapan	452738
..
d001	Yoshimura	Alenka	463297
d001	Yeung	Yuguang	483161
..
d001	Acton	Basim	105207
d001	Aamodt	Sreekrishna	493601
..
d002	Aamodt	Yakkov	43290
..
d003	Acton	Idoia	211583
..
d004	dAstous	Candido	59201
d004	dAstous	Berhard	427930
..
d005	Zizka	Aamer	409151
d005	Zirintsis	Xiaoqiang	52246
....
**********************
Reference:
**********************
Hadoop: The Definitive Guide, 3rd edition

**********************
Credits:
**********************
Data from mysql - http://dev.mysql.com/doc/employee/en.index.html
Hi, this post seems to be very good.

I also found a simpler approach for this...

Mapper
-------------------
public class SecondarySortBasicMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (value.toString().length() > 0) {
            String[] arrEmpAttributes = value.toString().split("::");
            String val = arrEmpAttributes[3] + "::" + arrEmpAttributes[2]
                    + "::" + arrEmpAttributes[0];
            context.write(new Text(arrEmpAttributes[6]), new Text(val));
        }
    }
}

Reducer
---------------
public class SecondarySortBasicReducer extends Reducer<Text, Text, Text, Text> {
    List<String> list = new ArrayList<String>();

    @Override
    public void reduce(Text key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {
        list.clear();
        for (Text value : values) {
            String[] valArr = value.toString().split("::");
            list.add(valArr[0] + "::" + valArr[1] + "::" + valArr[2]);
        }
        Collections.sort(list);
        for (int i = list.size() - 1; i >= 0; i--) {
            String s = list.get(i);
            context.write(key, new Text(s));
        }
    }
}

MainDriver
----------------------
public class SecondarySortBasicDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setJobName("Simplified Secondary sort example");
        job.setJarByClass(SecondarySortBasicDriver.class);
        FileInputFormat.setInputPaths(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.setMapperClass(SecondarySortBasicMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(SecondarySortBasicReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(1);
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(),
                new SecondarySortBasicDriver(), args);
        System.exit(exitCode);
    }
}

This would give me the same output in a much simpler way. (Maybe for simple/small data this is ideal.)
That's a better alternative for this problem. But if you want to sort a few columns ascending and a few columns descending, you can't use Collections.sort(); you need to depend on the secondary sort. And if numerous records enter the reducer, Collections.sort() will be a major performance problem. We want something that is done in memory (by default) by Hadoop, hence the need to depend on the secondary sort.
Good. Very helpful. I didn't find the mapper and reducer on any other sites. The whole code is given here, which helped me understand better.
ReplyDeleteWhen i compiled this code always array out of bond[6] when i fixed by replacing if (value.toString().length() > 0) {
ReplyDeleteString arrEmpAttributes[] = value.toString().split("\\t");
context.write(
new CompositeKeyWritable(
arrEmpAttributes[6].toString(),
(arrEmpAttributes[3].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t" + arrEmpAttributes[0]
.toString())), NullWritable.get());
}
By
String valueStr = value.toString();
if (!StringUtils.isEmpty(valueStr)) {
String arrEmpAttributes[] = valueStr.split("\\t"); //Also declare these two outside of the loop
if(!ArrayUtils.isEmpty(arrEmpAttributes) && arrEmpAttributes.length==6) {
context.write(
new CompositeKeyWritable(arrEmpAttributes[6].toString(),
(arrEmpAttributes[3].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t" + arrEmpAttributes[0].toString())),
NullWritable.get());
}
It worked but always umpty output file can anyone help me to fix this?
Has anyone faced the following error while executing the SecondarySortBasicDriver as the main class through Eclipse?
"Could not find or load main class sortProject.SecondarySortBasicDriver". Please help.

Thanks,
Joel
Thanks, ma'am. Nice blog. Appreciate your effort.