This post covers secondary sort in Java MapReduce. It has links to the Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!
Hi, this post seems to be very good.
I also found a simple approach for this...
Mapper
-------------------
public class SecondarySortBasicMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
if (value.toString().length() > 0) {
String arrEmpAttributes[] = value.toString().split("::");
String val = arrEmpAttributes[3] + "::" + arrEmpAttributes[2] + "::" + arrEmpAttributes[0];
context.write(new Text(arrEmpAttributes[6]),new Text(val));
}
}
}
Reducer
---------------
public class SecondarySortBasicReducer extends Reducer<Text, Text, Text, Text> {
List<String> list = new ArrayList<String>();
@Override
public void reduce(Text key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
list.clear();
for (Text value : values) {
String valArr[] = value.toString().split("::");
list.add(valArr[0]+"::"+valArr[1]+"::"+valArr[2]);
}
Collections.sort(list);
for(int i=list.size()-1;i>=0;i--)
{
String s = list.get(i);
context.write(key, new Text(s));
}
}
}
MainDriver
----------------------
public class SecondarySortBasicDriver extends Configured implements Tool {
public int run(String[] args) throws Exception {
Job job = new Job(getConf());
job.setJobName("Simplified Secondary sort example");
job.setJarByClass(SecondarySortBasicDriver.class);
FileInputFormat.setInputPaths(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.setMapperClass(SecondarySortBasicMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(SecondarySortBasicReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new SecondarySortBasicDriver(), args);
System.exit(exitCode);
}
}
This would give me the same output in a much simpler way. (Maybe for simple/small data this is ideal.)
That's a better alternative for this problem. But if you want to sort some columns ascending and others descending, you can't do it with a plain Collections.sort(); you need to depend on secondary sort. And if a huge number of records reaches a single reducer, buffering them all in a list and calling Collections.sort() becomes a major memory and performance problem. We want the sorting to be done by Hadoop itself during the shuffle/sort phase, hence the need to depend on secondary sort.
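For reference, here is a minimal sketch of what "done by Hadoop" means in practice: a composite key whose compareTo() defines the full sort order, a partitioner that looks only at the natural key, and a grouping comparator that groups reducer input on the natural key. The class and field names below (DeptEmpKey, deptNo, empDetail) are illustrative stand-ins, not the original post's actual CompositeKeyWritable API, and the classes are shown together only for brevity.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

// Composite key: the natural key (deptNo) decides the reducer group,
// the secondary field (empDetail) decides the order within that group.
class DeptEmpKey implements WritableComparable<DeptEmpKey> {
    Text deptNo = new Text();
    Text empDetail = new Text();

    public void write(DataOutput out) throws IOException { deptNo.write(out); empDetail.write(out); }
    public void readFields(DataInput in) throws IOException { deptNo.readFields(in); empDetail.readFields(in); }

    // Full sort order applied by the framework during the shuffle:
    // natural key ascending, secondary field descending (the minus sign flips it).
    public int compareTo(DeptEmpKey o) {
        int cmp = deptNo.compareTo(o.deptNo);
        return cmp != 0 ? cmp : -empDetail.compareTo(o.empDetail);
    }
}

// Partition on the natural key only, so every record for a key reaches the same reducer.
class NaturalKeyPartitioner extends Partitioner<DeptEmpKey, NullWritable> {
    public int getPartition(DeptEmpKey key, NullWritable value, int numPartitions) {
        return (key.deptNo.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

// Group reducer input on the natural key alone, so one reduce() call sees all
// values for that key, already sorted by the composite key's compareTo().
class NaturalKeyGroupingComparator extends WritableComparator {
    protected NaturalKeyGroupingComparator() { super(DeptEmpKey.class, true); }
    public int compare(WritableComparable a, WritableComparable b) {
        return ((DeptEmpKey) a).deptNo.compareTo(((DeptEmpKey) b).deptNo);
    }
}

With this wiring the driver only registers the classes (job.setMapOutputKeyClass(DeptEmpKey.class), job.setPartitionerClass(NaturalKeyPartitioner.class), job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class)); the reducer never buffers or sorts anything itself.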
Good. Very helpful. I didn't find the mapper and reducer on any other site. The whole code is given here, which helped me understand it better.
When I ran this code I always got an ArrayIndexOutOfBoundsException at index [6]. I worked around it by replacing

if (value.toString().length() > 0) {
    String arrEmpAttributes[] = value.toString().split("\\t");
    context.write(
            new CompositeKeyWritable(
                    arrEmpAttributes[6].toString(),
                    (arrEmpAttributes[3].toString() + "\t"
                            + arrEmpAttributes[2].toString() + "\t"
                            + arrEmpAttributes[0].toString())),
            NullWritable.get());
}

with

String valueStr = value.toString();
if (!StringUtils.isEmpty(valueStr)) {
    String arrEmpAttributes[] = valueStr.split("\\t"); // Also declare these two outside of the loop
    if (!ArrayUtils.isEmpty(arrEmpAttributes) && arrEmpAttributes.length == 6) {
        context.write(
                new CompositeKeyWritable(arrEmpAttributes[6].toString(),
                        (arrEmpAttributes[3].toString() + "\t"
                                + arrEmpAttributes[2].toString() + "\t"
                                + arrEmpAttributes[0].toString())),
                NullWritable.get());
    }
}

It no longer crashes, but the output file is always empty. Can anyone help me fix this?
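A likely cause of the empty output, assuming the records really have seven or more tab-separated fields (which is what the original exception at index [6] suggests): the added guard arrEmpAttributes.length == 6 can never be true for a record that also has an index 6, so every record is silently skipped. A bounds check along these lines should keep the safety without dropping valid rows (a sketch, not tested against the post's data set):

// arrEmpAttributes[6] is the seventh element, so require at least seven fields;
// length == 6 rejects exactly the records that could otherwise be written.
if (!ArrayUtils.isEmpty(arrEmpAttributes) && arrEmpAttributes.length >= 7) {
    context.write(
            new CompositeKeyWritable(arrEmpAttributes[6],
                    arrEmpAttributes[3] + "\t" + arrEmpAttributes[2] + "\t" + arrEmpAttributes[0]),
            NullWritable.get());
}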
Has anyone faced the following error while executing the SecondarySortBasicDriver as the main class through Eclipse?
"Could not find or load main class sortProject.SecondarySortBasicDriver". Please help.
Thanks,
Joel
Thanks, ma'am, nice blog. I appreciate your effort.
Hi, your post on Hadoop sorting was the best post, I understood the concepts very well. Thanks for posting.
Thank you, it is a very nice blog for beginners.
Good post! Thank you so much for sharing, it was good to read and useful for keeping my knowledge up to date. Keep blogging.
Very nice blog, keep sharing more blogs.