**********************
Gist
**********************

A common interview question for a Hadoop developer position is whether we can control the number of
mappers for a job. We can - there are a few ways of controlling the number of mappers, and using
NLineInputFormat is one of them.
About NLineInputFormat
----------------------
NLineInputFormat lets you specify exactly how many input lines should go to each mapper.
E.g. if your file has 500 lines and you set the number of lines per mapper to 10, you get 50 mappers
(instead of one - assuming the file is smaller than an HDFS block).
When would you use NLineInputFormat?
------------------------------------
Some examples from Hadoop: The Definitive Guide -
1. Applications that take a small amount of input data and run an extensive (that is, CPU-intensive)
computation over it, then emit their output.
2. Another example: you create a "seed" input file that lists the data sources, one per line. Each
mapper is then allocated a single data source, and it loads the data from that source into HDFS.
Sample program
--------------
The sample program below demonstrates the functionality.
The mapper merely emits its input key-value pairs.
The input is a file with ~224,000 records.
The output is a set of files containing 10,000 records each (23 files in total).
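As a quick sanity check on those numbers, the expected number of output files is just the ceiling of total records divided by lines per mapper. A standalone sketch (plain Java, not Hadoop code, with the record count hard-coded from this example):

```java
public class SplitCountCheck {

    // Ceiling division: full splits plus one for any remainder.
    static long mappersFor(long totalRecords, int linesPerMap) {
        return (totalRecords + linesPerMap - 1) / linesPerMap;
    }

    public static void main(String[] args) {
        long totalRecords = 224683; // records in the sample input file
        int linesPerMap = 10000;    // mapreduce.input.lineinputformat.linespermap

        long mappers = mappersFor(totalRecords, linesPerMap);
        long lastSplit = totalRecords % linesPerMap;

        // prints: 23 mappers, last split has 4683 records
        System.out.println(mappers + " mappers, last split has " + lastSplit + " records");
    }
}
```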
*******************************
*Data and code download
*******************************

Data and code:
--------------
GitHub:
<<To be added>>

Email me at airawat.blog@gmail.com if you encounter any issues.
Directory structure
-------------------
formatProject
    data
        employees_tsv
            employees_tsv
    formatNLineInputFormat
        src
            NLineInputFormat.java       //Original Apache source code
            MapperNLineInputFormat.java //Mapper
            DriverNLineInputFormat.java //Driver
        jar
            formatNLineInputFormat.jar
*******************************
*Sample Data
*******************************

EmpID   DOB         FName      LName     Gender  Hire date   DeptID
10003   1959-12-03  Parto      Bamford   M       1986-08-28  d004
10004   1954-05-01  Chirstian  Koblick   M       1986-12-01  d004
10005   1955-01-21  Kyoichi    Maliniak  M       1989-09-12  d003
.....
/*******************************************************************
 * Mapper
 * MapperNLineInputFormat.java
 *******************************************************************/

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Identity mapper: emits each input key-value pair unchanged.
 * The key is the byte offset of the line; the value is the line itself.
 */
public class MapperNLineInputFormat extends
        Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}
/*******************************************************************
 * Driver
 * DriverNLineInputFormat.java
 *******************************************************************/

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DriverNLineInputFormat extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.printf(
                    "Two parameters are required for DriverNLineInputFormat- <input dir> <output dir>\n");
            return -1;
        }

        Job job = new Job(getConf());
        job.setJobName("NLineInputFormat example");
        job.setJarByClass(DriverNLineInputFormat.class);

        job.setInputFormatClass(NLineInputFormat.class);
        NLineInputFormat.addInputPath(job, new Path(args[0]));
        // Feed 10,000 lines to each mapper
        job.getConfiguration().setInt(
                "mapreduce.input.lineinputformat.linespermap", 10000);

        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MapperNLineInputFormat.class);
        // Map-only job: mapper output is written straight to HDFS
        job.setNumReduceTasks(0);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(),
                new DriverNLineInputFormat(), args);
        System.exit(exitCode);
    }
}
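Since the bundled NLineInputFormat class (listed further below) exposes a static setNumLinesPerSplit helper that writes the same configuration key, the raw setInt call in the driver could presumably be replaced with the one-liner below. This is a sketch against that bundled class, not a tested change:

```java
// Equivalent to:
// job.getConfiguration().setInt("mapreduce.input.lineinputformat.linespermap", 10000);
NLineInputFormat.setNumLinesPerSplit(job, 10000);
```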
***********************************************
** Commands to load data
***********************************************
$ hadoop fs -mkdir formatProject
$ hadoop fs -put formatProject/data formatProject/
***********************************************
** Commands to run the program
***********************************************
$ hadoop jar ~/Blog/formatProject/formatNLineInputFormat/jar/formatNLineInputFormat.jar \
    DriverNLineInputFormat \
    /user/akhanolk/sortProject/data/employees/employees_tsv \
    /user/akhanolk/formatProject/data/output-formatNLineInputFormat
***********************************************
** Results
***********************************************
$ for filename in `hadoop fs -ls -R formatProject/data/output-formatNLineInputFormat/part* | awk '{print $8}'`
do
echo "Filename: " $filename " [Record count:" `hadoop fs -cat $filename | wc -l` "]"
done

Filename: formatProject/data/output-formatNLineInputFormat/part-m-00000 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00001 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00002 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00003 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00004 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00005 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00006 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00007 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00008 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00009 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00010 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00011 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00012 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00013 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00014 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00015 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00016 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00017 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00018 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00019 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00020 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00021 [Record count: 10000 ]
Filename: formatProject/data/output-formatNLineInputFormat/part-m-00022 [Record count: 4683 ]

$ hadoop fs -cat formatProject/data/output-formatNLineInputFormat/part-* | wc -l
224683

$ hadoop fs -cat formatProject/data/output-formatNLineInputFormat/part-m-00022
...
11474355 499977 1956-06-05 Martial Weisert F 1996-09-17 d002
11474407 499979 1962-10-29 Prasadram Waleschkowski M 1994-01-04 d005
11474467 499980 1959-06-28 Gino Usery M 1991-02-11 d007
..
/******************************************************
 * NLineInputFormat.java
 * Had to add this to the project, as the version of
 * Hadoop I have does not include the NLineInputFormat
 * functionality as part of the new API
 *****************************************************/

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

//import org.apache.hadoop.classification.InterfaceAudience;
//import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.util.LineReader;
/**
 * NLineInputFormat which splits N lines of input as one split.
 *
 * In many "pleasantly" parallel applications, each process/mapper processes
 * the same input file(s), but the computations are controlled by different
 * parameters (referred to as "parameter sweeps"). One way to achieve this is
 * to specify a set of parameters (one set per line) as input in a control file
 * (which is the input path to the map-reduce application, whereas the input
 * dataset is specified via a config variable in JobConf).
 *
 * The NLineInputFormat can be used in such applications: it splits the input
 * file such that by default one line is fed as a value to one map task, and
 * the key is the offset, i.e. (k,v) is (LongWritable, Text). The location
 * hints will span the whole mapred cluster.
 */
// @InterfaceAudience.Public
// @InterfaceStability.Stable
public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {

    public static final String LINES_PER_MAP = "mapreduce.input.lineinputformat.linespermap";

    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        context.setStatus(genericSplit.toString());
        return new LineRecordReader();
    }
    /**
     * Logically splits the set of input files for the job, splits N lines of
     * the input as one split.
     *
     * @see FileInputFormat#getSplits(JobContext)
     */
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        List<InputSplit> splits = new ArrayList<InputSplit>();
        int numLinesPerSplit = getNumLinesPerSplit(job);
        for (FileStatus status : listStatus(job)) {
            splits.addAll(getSplitsForFile(status, job.getConfiguration(),
                    numLinesPerSplit));
        }
        return splits;
    }

    public static List<FileSplit> getSplitsForFile(FileStatus status,
            Configuration conf, int numLinesPerSplit) throws IOException {
        List<FileSplit> splits = new ArrayList<FileSplit>();
        Path fileName = status.getPath();
        if (status.isDir()) {
            throw new IOException("Not a file: " + fileName);
        }
        FileSystem fs = fileName.getFileSystem(conf);
        LineReader lr = null;
        try {
            FSDataInputStream in = fs.open(fileName);
            lr = new LineReader(in, conf);
            Text line = new Text();
            int numLines = 0;
            long begin = 0;
            long length = 0;
            int num = -1;
            while ((num = lr.readLine(line)) > 0) {
                numLines++;
                length += num;
                if (numLines == numLinesPerSplit) {
                    // NLineInputFormat uses LineRecordReader, which always
                    // reads (and consumes) at least one character out of its
                    // upper split boundary. So to make sure that each mapper
                    // gets N lines, we move back the upper split limits of
                    // each split by one character here.
                    if (begin == 0) {
                        splits.add(new FileSplit(fileName, begin, length - 1,
                                new String[] {}));
                    } else {
                        splits.add(new FileSplit(fileName, begin - 1, length,
                                new String[] {}));
                    }
                    begin += length;
                    length = 0;
                    numLines = 0;
                }
            }
            if (numLines != 0) {
                splits.add(new FileSplit(fileName, begin, length,
                        new String[] {}));
            }
        } finally {
            if (lr != null) {
                lr.close();
            }
        }
        return splits;
    }
    /**
     * Set the number of lines per split
     *
     * @param job
     *            the job to modify
     * @param numLines
     *            the number of lines per split
     */
    public static void setNumLinesPerSplit(Job job, int numLines) {
        job.getConfiguration().setInt(LINES_PER_MAP, numLines);
    }

    /**
     * Get the number of lines per split
     *
     * @param job
     *            the job
     * @return the number of lines per split
     */
    public static int getNumLinesPerSplit(JobContext job) {
        return job.getConfiguration().getInt(LINES_PER_MAP, 1);
    }
}
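The boundary arithmetic in getSplitsForFile above can be illustrated without Hadoop at all. The sketch below (plain Java, with hypothetical line lengths) walks a list of per-line byte counts and produces (begin, length) pairs exactly the way the loop does, including the one-character pull-back on every split after the first:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitMathDemo {

    // Mirrors the (begin, length) bookkeeping in
    // NLineInputFormat.getSplitsForFile, without any file I/O.
    static List<long[]> splitsFor(int[] lineBytes, int numLinesPerSplit) {
        List<long[]> splits = new ArrayList<>();
        int numLines = 0;
        long begin = 0, length = 0;
        for (int num : lineBytes) {
            numLines++;
            length += num;
            if (numLines == numLinesPerSplit) {
                // First split keeps begin=0 and drops one byte from its
                // length; later splits start one byte early instead.
                if (begin == 0) {
                    splits.add(new long[] { begin, length - 1 });
                } else {
                    splits.add(new long[] { begin - 1, length });
                }
                begin += length;
                length = 0;
                numLines = 0;
            }
        }
        if (numLines != 0) {
            splits.add(new long[] { begin, length });
        }
        return splits;
    }

    public static void main(String[] args) {
        // Five 10-byte lines (newline included), two lines per split.
        for (long[] sp : splitsFor(new int[] { 10, 10, 10, 10, 10 }, 2)) {
            System.out.println("begin=" + sp[0] + " length=" + sp[1]);
        }
        // prints:
        // begin=0 length=19
        // begin=19 length=20
        // begin=40 length=10
    }
}
```

Note how the second split starts at byte 19, one byte before its lines actually begin: LineRecordReader skips the first (partial) line of any split that does not start at offset 0, so backing up by one character guarantees each mapper still sees exactly N whole lines.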