Monday, September 16, 2013

Map-side join sample in Java using reference data (text file) from distributed cache - Part 1

This post covers, map-side join in Java map-reduce, has links to Apache documentation, my notes on the topic and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

1.0. What's in this blog?

A sample map-reduce program in Java that joins two datasets, on the map-side - an employee dataset and a department dataset, with the department number as join key.  The department dataset is a very small dataset, is reference data, is in HDFS, and is added to the distributed cache.  The mapper program retrieves the department data available through distributed cache and and loads the same into a HashMap in the setUp() method of the mapper, and the HashMap is referenced in the map method to get the department name, and emit the employee dataset with department name included.

Section 2 demonstrates a solution where a file in HDFS is added to the distributed cache in the driver code, and accessed in the mapper setup method through the distributedcache.getCacheFiles method.

Section 3 demonstrates a solution where a local file is added to the distributed cache at the command line, and accessed in the mapper setup method.

Apache documentation on DistributedCache:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html

Related blogs:
1. Map-side join sample using reference data (text file) from distributed cache - Part 1
2. Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2

Data used in this blog:
http://dev.mysql.com/doc/employee/en.index.html

Pig and Hive for joins:
Pig and Hive have join capabilities built-in, and are optimized for the same.  Programs with joins written in java are more performant, but time-consuming to code, test and support - and in some companies considered an anti-pattern for joins.

2.0. Sample program

In this program, the side data, exists in HDFS, and is added to the distributedcache in the driver code, and referenced in the mapper using DistributedCache.getfiles method.


This gist demonstrates how to do a map-side join, loading one small dataset from DistributedCache into a HashMap
in memory, and joining with a larger dataset.
Includes:
---------
1. Input data and script download
2. Dataset structure review
3. Expected results
4. Mapper code
5. Driver code
6. Data load commands
7. Command to run Java program
8. Results of the program
01. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
gitHub:
<<To be added>>
Directory structure
-------------------
joinProject
data
employees_tsv
employees_tsv
departments_sorted
departments_txt
MapSideJoin-DistCacheTxtFile
src
MapperMapSideJoinDCacheTextFile.java
DriverMapSideJoinDCacheTxtFile
jar
MapSideJoinDCacheTextFile.jar
********************************************
*Data structure
********************************************
a) Small dataset (departments_txt)
[DeptNo DeptName] - Tab separated
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service
b) Large dataset (employees_tsv)
[Emp_no DOB FName LName HireDate DeptNo] - Tab separated
10001 1953-09-02 Georgi Facello M 1986-06-26 d005
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
10006 1953-04-20 Anneke Preusig F 1989-06-02 d005
10009 1952-04-19 Sumant Peac F 1985-02-18 d006
...
********************************************
*Expected Results
********************************************
Everything in employees_tsv file followed by a tab and the department name(from the department file)
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
......
/********************************************
*Mapper
*MapperMapSideJoinDCacheTextFile
********************************************/
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MapperMapSideJoinDCacheTextFile extends
Mapper<LongWritable, Text, Text, Text> {
private static HashMap<String, String> DepartmentMap = new HashMap<String, String>();
private BufferedReader brReader;
private String strDeptName = "";
private Text txtMapOutputKey = new Text("");
private Text txtMapOutputValue = new Text("");
enum MYCOUNTER {
RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
Path[] cacheFilesLocal = DistributedCache.getLocalCacheFiles(context
.getConfiguration());
for (Path eachPath : cacheFilesLocal) {
if (eachPath.getName().toString().trim().equals("departments_txt")) {
context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
loadDepartmentsHashMap(eachPath, context);
}
}
}
private void loadDepartmentsHashMap(Path filePath, Context context)
throws IOException {
String strLineRead = "";
try {
brReader = new BufferedReader(new FileReader(filePath.toString()));
// Read each line, split and load to HashMap
while ((strLineRead = brReader.readLine()) != null) {
String deptFieldArray[] = strLineRead.split("\\t");
DepartmentMap.put(deptFieldArray[0].trim(),
deptFieldArray[1].trim());
}
} catch (FileNotFoundException e) {
e.printStackTrace();
context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
} catch (IOException e) {
context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
e.printStackTrace();
}finally {
if (brReader != null) {
brReader.close();
}
}
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);
if (value.toString().length() > 0) {
String arrEmpAttributes[] = value.toString().split("\\t");
try {
strDeptName = DepartmentMap.get(arrEmpAttributes[6].toString());
} finally {
strDeptName = ((strDeptName.equals(null) || strDeptName
.equals("")) ? "NOT-FOUND" : strDeptName);
}
txtMapOutputKey.set(arrEmpAttributes[0].toString());
txtMapOutputValue.set(arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t"
+ arrEmpAttributes[3].toString() + "\t"
+ arrEmpAttributes[4].toString() + "\t"
+ arrEmpAttributes[5].toString() + "\t"
+ arrEmpAttributes[6].toString() + "\t" + strDeptName);
}
context.write(txtMapOutputKey, txtMapOutputValue);
strDeptName = "";
}
}
view raw 04-Mapper hosted with ❤ by GitHub
/********************************************
*Driver
*DriverMapSideJoinDCacheTxtFile
********************************************/
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DriverMapSideJoinDCacheTxtFile extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
Configuration conf = job.getConfiguration();
job.setJobName("Map-side join with text lookup file in DCache");
DistributedCache
.addCacheFile(
new URI(
"/user/akhanolk/joinProject/data/departments_sorted/departments_txt"),
conf);
job.setJarByClass(DriverMapSideJoinDCacheTxtFile.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MapperMapSideJoinDCacheTextFile.class);
job.setNumReduceTasks(0);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new DriverMapSideJoinDCacheTxtFile(), args);
System.exit(exitCode);
}
}
view raw 05-Driver hosted with ❤ by GitHub
********************************************
*HDFS load commands
********************************************
hadoop fs -mkdir joinProject
hadoop fs -mkdir joinProject/data
hadoop fs -put joinProject/data/* joinProject/data/
view raw 06-HdfsCommands hosted with ❤ by GitHub
********************************************
*Job run commands
********************************************
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheTxtFile/jar/MapSideJoinDCacheTextFile.jar DriverMapSideJoinDCacheTxtFile /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideTextFileLookUpDistCache
********************************************
*Program Output
********************************************
hadoop fs -cat joinProject/data/output-MapSideTextFileLookUpDistCache/part* | less
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
10004 1954-05-01 1954-05-01 Chirstian Koblick M 1986-12-01 d004 Production
10005 1955-01-21 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003 Human Resources
10006 1953-04-20 1953-04-20 Anneke Preusig F 1989-06-02 d005 Development
10009 1952-04-19 1952-04-19 Sumant Peac F 1985-02-18 d006 Quality Management
10010 1963-06-01 1963-06-01 Duangkaew Piveteau F 1989-08-24 d006 Quality Management
10012 1960-10-04 1960-10-04 Patricio Bridgland M 1992-12-18 d005 Development
10013 1963-06-07 1963-06-07 Eberhardt Terkki M 1985-10-20 d003 Human Resources
.....
view raw 08-Results hosted with ❤ by GitHub

3.0. Variation 

As a variation to the code in section 2.0, this section demonstrates how to add side data that is not in HDFS to distributed cache, through command line, leveraging GenericOptionsParser

This gist is part of a series of gists related to Map-side joins in Java map-reduce.
In the gist - https://gist.github.com/airawat/6597557, we added the reference data available
in HDFS to the distributed cache from the driver code.
This gist demonstrates adding a local file via command line to distributed cache.
Refer gist at https://gist.github.com/airawat/6597557 for-
1. Data samples and structure
2. Expected results
3. Commands to load data to HDFS
The listing below includes:
4. Data and code download location
5. Mapper code
6. Driver code
7. Command to run the program
8. Results
04. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
gitHub:
<<To be added>>
Directory structure
-------------------
joinProject
data
employees_tsv
employees_tsv
departments_sorted
departments_txt
MapSideJoin-DistCacheTxtFileGOP
src
MapperMapSideJoinDCacheTextFileGOP.java
DriverMapSideJoinDCacheTxtFileGOP.java
jar
MapSideJoin-DistCacheTxtFileGOP.jar
/********************************************
*Mapper
*MapperMapSideJoinDCacheTextFileGOP
********************************************/
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MapperMapSideJoinDCacheTextFileGOP extends
Mapper<LongWritable, Text, Text, Text> {
private static HashMap<String, String> DepartmentMap = new HashMap<String, String>();
private BufferedReader brReader;
private String strDeptName = "";
private Text txtMapOutputKey = new Text("");
private Text txtMapOutputValue = new Text("");
enum MYCOUNTER {
RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
File lookupFile = new File("departments_txt");
String strLineRead = "";
try {
brReader = new BufferedReader(new FileReader(lookupFile));
// Read each line, split and load to HashMap
while ((strLineRead = brReader.readLine()) != null) {
String deptFieldArray[] = strLineRead.split("\\t");
DepartmentMap.put(deptFieldArray[0].trim(),
deptFieldArray[1].trim());
}
} catch (FileNotFoundException e) {
e.printStackTrace();
context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
} catch (IOException e) {
context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
e.printStackTrace();
}
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);
if (value.toString().length() > 0) {
String arrEmpAttributes[] = value.toString().split("\\t");
try {
strDeptName = DepartmentMap.get(arrEmpAttributes[6].toString());
} finally {
strDeptName = ((strDeptName.equals(null) || strDeptName
.equals("")) ? "NOT-FOUND" : strDeptName);
}
txtMapOutputKey.set(arrEmpAttributes[0].toString());
txtMapOutputValue.set(arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t"
+ arrEmpAttributes[3].toString() + "\t"
+ arrEmpAttributes[4].toString() + "\t"
+ arrEmpAttributes[5].toString() + "\t"
+ arrEmpAttributes[6].toString() + "\t" + strDeptName);
}
context.write(txtMapOutputKey, txtMapOutputValue);
strDeptName = "";
}
}
view raw 05-Mapper code hosted with ❤ by GitHub
/********************************************
*Driver
*DriverMapSideJoinDCacheTxtFileGOP
********************************************/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DriverMapSideJoinDCacheTxtFileGOP extends Configured implements
Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out
.printf("Two parameters are required for DriverMapSideJoinDCacheTxtFileGOP- <input dir> <output dir>\n");
return -1;
}
Job job = new Job(getConf());
job.setJobName("Map-side join with text lookup file in DCache-GenericOptionsParser");
job.setJarByClass(DriverMapSideJoinDCacheTxtFileGOP.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MapperMapSideJoinDCacheTextFileGOP.class);
job.setNumReduceTasks(0);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new DriverMapSideJoinDCacheTxtFileGOP(), args);
System.exit(exitCode);
}
}
view raw 06-DriverCode hosted with ❤ by GitHub
********************************************
*Job run commands
********************************************
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheTxtFileGOP/jar/MapSideJoin-DistCacheTxtFileGOP.jar DriverMapSideJoinDCacheTxtFileGOP -files /home/akhanolk/Blog/joinProject/data/departments_sorted/departments_txt /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideTxtFileLookUpDistCacheGOP
view raw 07-RunProgram hosted with ❤ by GitHub
********************************************
*Program Output
********************************************
See - https://gist.github.com/airawat/6597557
view raw 08-Results hosted with ❤ by GitHub


   

3 comments:

  1. Great blog, helped me a lot in implementing distributedCache. Is there a way in which I can write from my reducer directly to a directory in s3 using distributedCache ?

    ReplyDelete
  2. thakyou it vry nice blog for beginners
    https://www.emexotechnologies.com/courses/big-data-analytics-training/big-data-hadoop-training/

    ReplyDelete
  3. Good Post! Thank you so much for sharing this pretty post, it was so good to read and useful to improve my knowledge as updated one, keep blogging.

    https://www.emexotechnologies.com/online-courses/big-data-hadoop-training-in-electronic-city/

    ReplyDelete