Thursday, September 19, 2013

MultipleOutputs in Java MapReduce

This post covers MultipleOutputs: links to the Apache documentation, my notes on the topic, and a sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

********************************
Gist
********************************
Motivation
-----------
A typical MapReduce job creates output files named with the prefix "part-", followed by "m" or "r" depending on whether the output comes from a map or a reduce task, and then the part number (for example, part-r-00000). There are scenarios where we may want to create separate files based on criteria such as data keys and/or values. Enter the "MultipleOutputs" functionality.
More about MultipleOutputs
---------------------------
Here's the write-up from Hadoop the definitive guide-
"MultipleOutputs allows you to write data to files whose names are derived from the output keys and
values, or in fact from an arbitrary string. This allows each reducer (or mapper in a map-only job)
to create more than a single file. Filenames are of the form name-m-nnnnn for map outputs and
name-r-nnnnn for reduce outputs, where name is an arbitrary name that is set by the program,
and nnnnn is an integer designating the part number, starting from zero. The part number
ensures that outputs written from different partitions (mappers or reducers) do not collide in the
case of the same name."
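As a standalone illustration of the naming scheme quoted above (the actual formatting happens inside Hadoop's output machinery; this sketch merely reproduces the name-m-nnnnn / name-r-nnnnn pattern):

```java
public class OutputNameDemo {
    // Reproduces the naming pattern described above: an arbitrary base name,
    // "m" or "r" for map vs. reduce output, and a zero-padded part number.
    public static String partFileName(String name, boolean isReduce, int part) {
        return String.format("%s-%s-%05d", name, isReduce ? "r" : "m", part);
    }

    public static void main(String[] args) {
        // e.g. reducer partition 2 writing output named after department d001
        System.out.println(partFileName("d001", true, 2)); // d001-r-00002
    }
}
```

The part number in the name is what keeps outputs with the same base name from colliding across reducers.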
About LazyOutputFormat
-----------------------
A typical MapReduce program can produce output files that are empty, depending on your implementation.
If you want to suppress the creation of empty files, you need to leverage LazyOutputFormat.
Two lines in your driver will do the trick-
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
&
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
Sample program
---------------
This gist includes a sample program that demonstrates the MultipleOutputs functionality.
The input is a file with employee data, a key attribute being the department number.
The expected output is one file per department, each containing the employees who belong to it.
The program also suppresses creation of empty files.
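Purely to illustrate the grouping the job performs, here is a Hadoop-free sketch in plain Java. The field indices match the mapper further below (DeptID is column 7 of the TSV); the sample lines are made-up records in the same format as the input file:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DeptGroupDemo {
    // Groups TSV employee records by department (last column), mimicking
    // what the MapReduce job writes into per-department output files.
    public static Map<String, List<String>> groupByDept(List<String> lines) {
        Map<String, List<String>> byDept = new TreeMap<>();
        for (String line : lines) {
            if (line.isEmpty()) continue;
            String[] f = line.split("\t");
            // Emit LName, FName, EmpID keyed by DeptID, like the mapper does
            String record = f[3] + "\t" + f[2] + "\t" + f[0];
            byDept.computeIfAbsent(f[6], k -> new ArrayList<>()).add(record);
        }
        return byDept;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "10003\t1959-12-03\tParto\tBamford\tM\t1986-08-28\td004",
            "10005\t1955-01-21\tKyoichi\tMaliniak\tM\t1989-09-12\td003");
        System.out.println(groupByDept(lines).keySet()); // [d003, d004]
    }
}
```

In the real job, each map of course spans the cluster and each department's records land in a separate HDFS file rather than an in-memory map.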
*******************************
*Data and code download
*******************************
Data and code:
--------------
GitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
formatProject
    data
        employees_tsv
            employees_tsv
    formatMultipleOutputs
        src
            MapperFormatMultiOutput.java
            ReducerFormatMultiOutput.java
            DriverFormatMultiOutput.java
        jar
            formatMultiOutput.jar
*******************************
*Sample Data
*******************************
EmpID DOB FName LName Gender Hire date DeptID
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
.....
*******************************
*Expected results
*******************************
One file for each department.
Within each file, the following employee attributes are required-
DeptNo LName FName EmpNo
***************************************
**Mapper - MapperFormatMultiOutput.java
***************************************
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperFormatMultiOutput extends
        Mapper<LongWritable, Text, Text, Text> {

    private Text txtKey = new Text("");
    private Text txtValue = new Text("");

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (value.toString().length() > 0) {
            String[] arrEmpAttributes = value.toString().split("\\t");
            // Key: DeptID (column 7); value: LName, FName, EmpID
            txtKey.set(arrEmpAttributes[6]);
            txtValue.set(arrEmpAttributes[3] + "\t"
                    + arrEmpAttributes[2] + "\t"
                    + arrEmpAttributes[0]);
            context.write(txtKey, txtValue);
        }
    }
}
*******************************************
**Reducer - ReducerFormatMultiOutput.java
*******************************************
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class ReducerFormatMultiOutput extends Reducer<Text, Text, Text, Text> {

    private MultipleOutputs<Text, Text> mos;

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // The third argument is the base output path; using the department
            // key yields one file per department (e.g. d001-r-00002)
            mos.write(key, value, key.toString());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        // Close MultipleOutputs, or the output files may be incomplete
        mos.close();
    }
}
*******************************************
**Driver - DriverFormatMultiOutput.java
*******************************************
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DriverFormatMultiOutput extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out
                    .printf("Two parameters are required for DriverFormatMultiOutput- <input dir> <output dir>\n");
            return -1;
        }

        Job job = new Job(getConf());
        job.setJobName("MultipleOutputs example");
        job.setJarByClass(DriverFormatMultiOutput.class);

        // Suppress creation of empty default part files
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MapperFormatMultiOutput.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(ReducerFormatMultiOutput.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(4);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(),
                new DriverFormatMultiOutput(), args);
        System.exit(exitCode);
    }
}
*******************************************
**Commands to load data
*******************************************
$ hadoop fs -mkdir formatProject
$ hadoop fs -put formatProject/data formatProject/
*******************************************
**Command to run program
*******************************************
hadoop jar ~/Blog/formatProject/formatMultiOutputFormat/jar/formatMultiOutput.jar DriverFormatMultiOutput /user/akhanolk/sortProject/data/employees/employees_tsv /user/akhanolk/formatProject/data/output-formatMultiOutput
********************************
**Results
********************************
$ hadoop fs -ls -R formatProject/data/output-formatMultiOutput/d00* | awk '{print $8, $5}'
formatProject/data/output-formatMultiOutput/d001-r-00002 401857
formatProject/data/output-formatMultiOutput/d002-r-00003 336632
formatProject/data/output-formatMultiOutput/d003-r-00000 348770
formatProject/data/output-formatMultiOutput/d004-r-00001 1442822
formatProject/data/output-formatMultiOutput/d005-r-00002 1662566
formatProject/data/output-formatMultiOutput/d006-r-00003 394272
formatProject/data/output-formatMultiOutput/d007-r-00000 1020167
formatProject/data/output-formatMultiOutput/d009-r-00002 475747
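Note how the part number in each file name reflects which of the four reducers the department key hashed to. A rough standalone sketch of Hadoop's default hash partitioning logic (illustrative only: this uses String.hashCode, a made-up stand-in, whereas Hadoop's HashPartitioner hashes the Text key's bytes, so the actual assignments differ):

```java
public class PartitionDemo {
    // Approximates the default partitioner's formula:
    // (hash & Integer.MAX_VALUE) % numReduceTasks
    public static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // With 4 reducers, each department key maps to a partition 0-3
        for (String dept : new String[] {"d001", "d002", "d003", "d004"}) {
            System.out.println(dept + " -> reducer " + partitionFor(dept, 4));
        }
    }
}
```

This is why two departments can share a part number (they hashed to the same reducer) while the MultipleOutputs base name keeps their files separate.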
$ hadoop fs -cat formatProject/data/output-formatMultiOutput/d001-r-00002 | less
d001 Yetto Lucian 39682
d001 Cooke Padma 49634
d001 Marrevee Giap 49632
..
**********************
References:
**********************
- Hadoop the definitive guide, 3rd edition
- Apache documentation on MultipleOutputs
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html
- Apache documentation on LazyOutputFormat
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.html
**********************
Credits:
**********************
The data in this solution is from the MySQL sample employee database - http://dev.mysql.com/doc/employee/en/index.html
