Friday, September 20, 2013

NLineInputFormat in Java MapReduce - use case, code sample

This post covers NLineInputFormat: links to the Apache documentation, my notes on the topic, and a sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

**********************
Gist
**********************
A common interview question for a Hadoop developer position is whether we can control the number of
mappers for a job. We can - there are a few ways of controlling the number of mappers, as needed.
Using NLineInputFormat is one way.
About NLineInputFormat
----------------------
With this input format, you can specify exactly how many lines of input should go to each mapper.
E.g. if your file has 500 lines and you set the number of lines per mapper to 10, you get 50 mappers
(instead of one, assuming the file is smaller than an HDFS block).
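The knob is a single configuration property. Here is a minimal sketch from inside a Tool's run() method (the full driver appears later in this post):

// Sketch: feed 10 lines of input to each map task.
// NLineInputFormat is the new-API class included at the end of this post.
Job job = new Job(getConf());
job.setInputFormatClass(NLineInputFormat.class);
job.getConfiguration().setInt("mapreduce.input.lineinputformat.linespermap", 10);
// Equivalent convenience setter:
NLineInputFormat.setNumLinesPerSplit(job, 10);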
When would you use NLineInputFormat?
------------------------------------
Some examples from Hadoop: The Definitive Guide -
1. Applications that take a small amount of input data and run an extensive (that is, CPU-intensive)
computation for it, then emit their output.
2. Another example: you create a “seed” input file that lists the data sources, one per line. Then
each mapper is allocated a single data source, and it loads the data from that source into HDFS
(a sketch follows this list).
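A minimal sketch of that seed-file pattern (the class name and output values here are illustrative, not part of the sample program): each map task receives one line, i.e. one data source, and ingests it.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper for the "seed file" pattern: the input is a small
// file with one data-source URI per line; NLineInputFormat hands each
// mapper a single line, i.e. a single source to load.
public class LoadSourceMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String sourceUri = value.toString().trim();
        // ... fetch the data behind sourceUri and write it into HDFS here ...
        context.write(new Text(sourceUri), new Text("loaded"));
    }
}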
Sample program
---------------
The sample program below demonstrates the functionality.
The mapper merely emits the input key-value pairs.
The input is a file with ~224,000 records.
The output is a set of files with 10,000 records each (23 files in total, the last holding the remainder).
*******************************
*Data and code download
*******************************
Data and code:
--------------
GitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
formatProject
    data
        employees_tsv
            employees_tsv
    formatNLineInputFormat
        src
            NLineInputFormat.java        //Original Apache source code
            MapperNLineInputFormat.java  //Mapper
            DriverNLineInputFormat.java  //Driver
        jar
            formatNLineInputFormat.jar
*******************************
*Sample Data
*******************************
EmpID DOB FName LName Gender Hire date DeptID
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
.....
/*******************************************************************
* Mapper
* MapperNLineInputFormat.java
*******************************************************************/
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperNLineInputFormat extends
        Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Identity mapper: pass the input key-value pair straight through
        context.write(key, value);
    }
}
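Note that with NLineInputFormat the mapper's input key is the byte offset of the record within the file (the key LineRecordReader produces), so each record this identity mapper emits is prefixed with that offset - those are the long integers you will see in the results further down.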
/*******************************************************************
* Driver
* DriverNLineInputFormat.java
*******************************************************************/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DriverNLineInputFormat extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.printf("Two parameters are required for DriverNLineInputFormat- <input dir> <output dir>\n");
            return -1;
        }

        Job job = new Job(getConf());
        job.setJobName("NLineInputFormat example");
        job.setJarByClass(DriverNLineInputFormat.class);

        // Use the NLineInputFormat bundled with this project (see the end of this post)
        job.setInputFormatClass(NLineInputFormat.class);
        NLineInputFormat.addInputPath(job, new Path(args[0]));

        // Feed 10,000 lines of input to each map task
        job.getConfiguration().setInt("mapreduce.input.lineinputformat.linespermap", 10000);

        // LazyOutputFormat creates output files only when records are actually written
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MapperNLineInputFormat.class);

        // Map-only job; each map task writes its own output file
        job.setNumReduceTasks(0);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new DriverNLineInputFormat(), args);
        System.exit(exitCode);
    }
}
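Because the job is map-only (setNumReduceTasks(0)), each map task writes its own part-m-NNNNN file, so the number of output files equals the number of splits: with 10,000 lines per split and ~224,000 input records, that works out to the 23 files shown in the results below.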
***********************************************
** Commands to load data
***********************************************
$ hadoop fs -mkdir formatProject
$ hadoop fs -put formatProject/data formatProject/
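To confirm the upload, an optional recursive listing:
$ hadoop fs -ls -R formatProject/data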
***********************************************
** Commands to run the program
***********************************************
hadoop jar ~/Blog/formatProject/formatNLineInputFormat/jar/formatNLineInputFormat.jar DriverNLineInputFormat /user/akhanolk/sortProject/data/employees/employees_tsv /user/akhanolk/formatProject/data/output-formatNLineInputFormat
***********************************************
** Results
***********************************************
$ for filename in `hadoop fs -ls -R formatProject/data/output-formatNLineInputFormat/part* | awk '{print $8}'`
do
echo "Filename: " $filename " [Record count:" `hadoop fs -cat $filename | wc -l` "]"
done
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00000 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00001 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00002 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00003 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00004 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00005 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00006 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00007 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00008 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00009 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00010 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00011 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00012 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00013 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00014 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00015 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00016 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00017 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00018 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00019 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00020 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00021 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00022 [Record count: 4683 ]
$ hadoop fs -cat formatProject/data/output-formatNLineInputFormat/part-* | wc -l
224683
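As expected: 22 files of 10,000 records plus the 4,683-record remainder account for all 224,683 input records.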
$ hadoop fs -cat formatProject/data/output-formatNLineInputFormat/part-m-00022
...
11474355 499977 1956-06-05 Martial Weisert F 1996-09-17 d002
11474407 499979 1962-10-29 Prasadram Waleschkowski M 1994-01-04 d005
11474467 499980 1959-06-28 Gino Usery M 1991-02-11 d007
..
/******************************************************
* NLineInputFormat.java
* Had to add this to the project, as the version of
* Hadoop I have does not include the NLineInputFormat
* functionality as part of the new API
*****************************************************/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
//import org.apache.hadoop.classification.InterfaceAudience;
//import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.util.LineReader;
/**
 * NLineInputFormat which splits N lines of input as one split.
 *
 * In many "pleasantly" parallel applications, each process/mapper processes
 * the same input file(s), but the computations are controlled by different
 * parameters (referred to as "parameter sweeps"). One way to achieve this is
 * to specify a set of parameters (one set per line) as input in a control
 * file (which is the input path to the map-reduce application, whereas the
 * input dataset is specified via a config variable in JobConf).
 *
 * The NLineInputFormat can be used in such applications: it splits the input
 * file such that, by default, one line is fed as a value to one map task, and
 * the key is the offset, i.e. (k,v) is (LongWritable, Text). The location
 * hints will span the whole mapred cluster.
 */
// @InterfaceAudience.Public
// @InterfaceStability.Stable
public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {
    public static final String LINES_PER_MAP = "mapreduce.input.lineinputformat.linespermap";

    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        context.setStatus(genericSplit.toString());
        return new LineRecordReader();
    }

    /**
     * Logically splits the set of input files for the job, splits N lines of
     * the input as one split.
     *
     * @see FileInputFormat#getSplits(JobContext)
     */
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        List<InputSplit> splits = new ArrayList<InputSplit>();
        int numLinesPerSplit = getNumLinesPerSplit(job);
        for (FileStatus status : listStatus(job)) {
            splits.addAll(getSplitsForFile(status, job.getConfiguration(),
                    numLinesPerSplit));
        }
        return splits;
    }

    public static List<FileSplit> getSplitsForFile(FileStatus status,
            Configuration conf, int numLinesPerSplit) throws IOException {
        List<FileSplit> splits = new ArrayList<FileSplit>();
        Path fileName = status.getPath();
        if (status.isDir()) {
            throw new IOException("Not a file: " + fileName);
        }
        FileSystem fs = fileName.getFileSystem(conf);
        LineReader lr = null;
        try {
            FSDataInputStream in = fs.open(fileName);
            lr = new LineReader(in, conf);
            Text line = new Text();
            int numLines = 0;
            long begin = 0;
            long length = 0;
            int num = -1;
            while ((num = lr.readLine(line)) > 0) {
                numLines++;
                length += num;
                if (numLines == numLinesPerSplit) {
                    // NLineInputFormat uses LineRecordReader, which always reads
                    // (and consumes) at least one character out of its upper split
                    // boundary. So to make sure that each mapper gets N lines, we
                    // move back the upper split limits of each split by one
                    // character here.
                    if (begin == 0) {
                        splits.add(new FileSplit(fileName, begin, length - 1,
                                new String[] {}));
                    } else {
                        splits.add(new FileSplit(fileName, begin - 1, length,
                                new String[] {}));
                    }
                    begin += length;
                    length = 0;
                    numLines = 0;
                }
            }
            if (numLines != 0) {
                splits.add(new FileSplit(fileName, begin, length, new String[] {}));
            }
        } finally {
            if (lr != null) {
                lr.close();
            }
        }
        return splits;
    }

    /**
     * Set the number of lines per split
     *
     * @param job the job to modify
     * @param numLines the number of lines per split
     */
    public static void setNumLinesPerSplit(Job job, int numLines) {
        job.getConfiguration().setInt(LINES_PER_MAP, numLines);
    }

    /**
     * Get the number of lines per split
     *
     * @param job the job
     * @return the number of lines per split
     */
    public static int getNumLinesPerSplit(JobContext job) {
        return job.getConfiguration().getInt(LINES_PER_MAP, 1);
    }
}
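To sanity-check the split arithmetic, getSplitsForFile() can be called directly; a small sketch (the path and line count are made up for illustration):

// Hypothetical: a 25-line local file with 10 lines per split yields 3 splits.
Configuration conf = new Configuration();
Path input = new Path("file:///tmp/twenty-five-lines.txt");
FileStatus status = input.getFileSystem(conf).getFileStatus(input);
List<FileSplit> splits = NLineInputFormat.getSplitsForFile(status, conf, 10);
System.out.println(splits.size()); // 3: two 10-line splits plus a 5-line remainder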
