Friday, September 20, 2013

Handling small files using CombineFileInputFormat in Java MapReduce

This post covers CombineFileInputFormat; it has links to the Apache documentation, my notes on the topic, and a sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

*************************
Gist
*************************
One more gist, this one on controlling the number of mappers in a MapReduce job.
Background on Inputsplits
--------------------------
An inputsplit is a chunk of the input data allocated to a map task for processing. FileInputFormat
generates inputsplits (and divides them into records) - one inputsplit per file, unless a file
spans more than one HDFS block, at which point it factors in the configured values of minimum split
size, maximum split size and block size in determining the split size.
Here's the formula, from Hadoop: The Definitive Guide -
splitSize = max(minimumSplitSize, min(maximumSplitSize, HDFSBlockSize))
So, with the default values, the split size equals the HDFS block size for files spanning more than
one HDFS block.
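To make the formula concrete, here is a minimal sketch in plain Java; the defaults assumed here are a minimum split size of 1 byte and a maximum split size of Long.MAX_VALUE:
long minimumSplitSize = 1L;             // assumed default
long maximumSplitSize = Long.MAX_VALUE; // assumed default
long hdfsBlockSize = 64L * 1024 * 1024; // e.g., a 64 MB block
// splitSize = max(minimumSplitSize, min(maximumSplitSize, HDFSBlockSize))
long splitSize = Math.max(minimumSplitSize,
    Math.min(maximumSplitSize, hdfsBlockSize));
// splitSize == 67108864, i.e., the HDFS block size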
Problem with mapreduce processing of small files
-------------------------------------------------
We all know that Hadoop works best with large files; but the reality is that we still have to deal
with small files. When a mapreduce job processes many small files, by default each file is
processed by its own map task (so, 1000 small files = 1000 map tasks). Having many tasks that
each finish in a matter of seconds is inefficient.
Increasing the minimum split size to reduce the number of map tasks in such a situation is
not the right solution, as the larger splits come at the potential cost of data locality.
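For illustration only - assuming the old mapred property name and a JobConf named conf - that discouraged approach would look like this:
// Not recommended: forces splits of at least 256 MB, reducing map tasks
// at the potential cost of data locality
conf.set("mapred.min.split.size", "268435456"); // 256 MB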
Solution
---------
CombineFileInputFormat packs many files into each split, giving each map task more data to process.
It factors in node and rack locality when forming splits, so performance is not compromised.
Sample program
---------------
The sample program demonstrates that with CombineFileInputFormat we can process multiple small files
(each smaller than an HDFS block) in a single map task.
Old API
--------
The new (mapreduce) API in the version of Hadoop I am running does not include CombineFileInputFormat,
so this program uses the old (mapred) API. Will write another gist with the program using the new API, shortly.
Key aspects of the program
----------------------------
1. CombineFileInputFormat is an abstract class; we have to create a subclass that extends it and
implement the getRecordReader method. This implementation is in ExtendedCombineFileInputFormat.java
(courtesy: http://stackoverflow.com/questions/14270317/implementation-for-combinefileinputformat-hadoop-0-20-205)
2. In the driver, set the value of mapred.max.split.size
3. In the driver, set the input format to the subclass of CombineFileInputFormat; both driver settings are sketched below
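A minimal sketch of the two driver-side settings from points 2 and 3 (the complete driver appears further down):
JobConf conf = new JobConf(DriverCombineFileInputFormat.class);
// Cap the size of each combined split at 128 MB
conf.set("mapred.max.split.size", "134217728");
// Pack many small files into each split
conf.setInputFormat(ExtendedCombineFileInputFormat.class);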
*******************************
*Data and code download
*******************************
Data and code:
--------------
GitHub:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
Directory structure
-------------------
formatProject
   data
      employees_partFiles
         employees_part1
         employees_part2
         employees_part3
         employees_part4
         employees_part5
   formatCombineFileInputFormat
      src
         MapperCombineFileInputFormat.java
         DriverCombineFileInputFormat.java
         ExtendedCombineFileInputFormat.java
      jar
         formatCombineFileInputFormatOAPI.jar
*******************************
Data Structure
*******************************
[EmpNo DOB FName LName Gender HireDate DeptNo]
10001 1953-09-02 Georgi Facello M 1986-06-26 d005
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
.......
.......
*******************************
Expected Results
*******************************
Key goal of the demonstration: process 5 small files in one map task.
Emit a subset of the input dataset.
[EmpNo FName LName]
10001 Georgi Facello
10002 Bezalel Simmel
10003 Parto Bamford
/********************************************
*File: MapperCombineFileInputFormat.java
*Usage: Mapper
********************************************/
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MapperCombineFileInputFormat extends MapReduceBase implements
    Mapper<LongWritable, Text, Text, Text> {

  Text txtKey = new Text("");
  Text txtValue = new Text("");

  @Override
  public void map(LongWritable key, Text value,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    if (value.toString().length() > 0) {
      // Input record: EmpNo, DOB, FName, LName, Gender, HireDate, DeptNo (tab-separated)
      String[] arrEmpAttributes = value.toString().split("\\t");
      // Emit EmpNo as the key; FName and LName as the value
      txtKey.set(arrEmpAttributes[0]);
      txtValue.set(arrEmpAttributes[2] + "\t" + arrEmpAttributes[3]);
      output.collect(txtKey, txtValue);
    }
  }
}
/********************************************
*File: DriverCombineFileInputFormat.java
*Usage: Driver
********************************************/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DriverCombineFileInputFormat {
  public static void main(String[] args) throws Exception {
    // The Class argument also sets the job jar
    JobConf conf = new JobConf(DriverCombineFileInputFormat.class);
    conf.setJobName("DriverCombineFileInputFormat");
    // Cap the size of each combined split at 128 MB
    conf.set("mapred.max.split.size", "134217728");
    String[] jobArgs = new GenericOptionsParser(conf, args)
        .getRemainingArgs();
    conf.setMapperClass(MapperCombineFileInputFormat.class);
    // Pack many small files into each split
    conf.setInputFormat(ExtendedCombineFileInputFormat.class);
    ExtendedCombineFileInputFormat.addInputPath(conf, new Path(jobArgs[0]));
    // Map-only job
    conf.setNumReduceTasks(0);
    conf.setOutputFormat(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(conf, new Path(jobArgs[1]));
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    // runJob submits the job and blocks until it completes
    RunningJob job = JobClient.runJob(conf);
    System.exit(job.isSuccessful() ? 0 : 2);
  }
}
/********************************************
*File: ExtendedCombineFileInputFormat.java
*Usage: Subclass implementation of the abstract
*class CombineFileInputFormat
********************************************/
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

@SuppressWarnings("deprecation")
public class ExtendedCombineFileInputFormat extends
    CombineFileInputFormat<LongWritable, Text> {

  @SuppressWarnings({ "unchecked", "rawtypes" })
  @Override
  public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
      JobConf conf, Reporter reporter) throws IOException {
    // CombineFileRecordReader iterates over the files in the CombineFileSplit,
    // creating a myCombineFileRecordReader for each one
    return new CombineFileRecordReader(conf, (CombineFileSplit) split,
        reporter, (Class) myCombineFileRecordReader.class);
  }

  public static class myCombineFileRecordReader implements
      RecordReader<LongWritable, Text> {

    private final LineRecordReader linerecord;

    // The index identifies which file in the CombineFileSplit this reader handles
    public myCombineFileRecordReader(CombineFileSplit split,
        Configuration conf, Reporter reporter, Integer index)
        throws IOException {
      FileSplit filesplit = new FileSplit(split.getPath(index),
          split.getOffset(index), split.getLength(index),
          split.getLocations());
      linerecord = new LineRecordReader(conf, filesplit);
    }

    @Override
    public void close() throws IOException {
      linerecord.close();
    }

    // The remaining methods simply delegate to the wrapped LineRecordReader
    @Override
    public LongWritable createKey() {
      return linerecord.createKey();
    }

    @Override
    public Text createValue() {
      return linerecord.createValue();
    }

    @Override
    public long getPos() throws IOException {
      return linerecord.getPos();
    }

    @Override
    public float getProgress() throws IOException {
      return linerecord.getProgress();
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
      return linerecord.next(key, value);
    }
  }
}
*****************************
*HDFS commands to load data
*****************************
hadoop fs -mkdir formatProject
hadoop fs -put formatProject/data formatProject/
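To verify that the five part files made it to HDFS (optional check, paths as created above):
hadoop fs -ls formatProject/data/employees_partFiles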
*****************************
*Run program
*****************************
hadoop jar ~/Blog/formatProject/formatCombineFileInputFormat/jar/formatCombineFileInputFormatOAPI.jar DriverCombineFileInputFormat /user/akhanolk/formatProject/data/employees_partFiles /user/akhanolk/formatProject/output/output-CombineFileInputFormat
*****************************
*Results
*****************************
....
13/09/22 17:16:31 INFO mapred.JobClient: Launched map tasks=1
13/09/22 17:16:31 INFO mapred.JobClient: Data-local map tasks=1
13/09/22 17:16:31 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=17885
...
$ hadoop fs -ls -R formatProject/output/output-CombineFileInputFormat/part* | awk '{print $8}'
formatProject/output/output-CombineFileInputFormat/part-00000
$ hadoop fs -cat formatProject/output/output-CombineFileInputFormat/part-00000
10001 Georgi Facello
10002 Bezalel Simmel
10003 Parto Bamford
10004 Chirstian Koblick
10005 Kyoichi Maliniak
.....
**************************
References
**************************
Apache documentation:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/lib/CombineFileInputFormat.html
Concepts:
Hadoop: The Definitive Guide
Code:
http://stackoverflow.com/questions/14270317/implementation-for-combinefileinputformat-hadoop-0-20-205
Data:
The data in this solution is from the MySQL sample employee database - http://dev.mysql.com/doc/employee/en/index.html
