1.0. What's in this blog?
A sample map-reduce program in Java that joins two datasets on the map side: an employee dataset and a department dataset, with the department number as the join key. The department dataset is a very small reference dataset that resides in HDFS and is added to the distributed cache. The mapper retrieves the department data from the distributed cache and loads it into a HashMap in the mapper's setup() method; the map() method then references the HashMap to look up the department name and emits each employee record with the department name appended.
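Stripped of the Hadoop plumbing, the join itself is just a HashMap lookup per record. The sketch below is a minimal, self-contained illustration of that idea (the class name, sample data, and `join` helper are illustrative, not part of the blog's code):

```java
import java.util.HashMap;
import java.util.Map;

public class MapSideJoinSketch {

    // Join one tab-separated employee record against the department lookup,
    // defaulting to NOT-FOUND when the department number is unknown
    static String join(String employeeRecord, Map<String, String> departmentMap) {
        String[] fields = employeeRecord.split("\\t");
        String deptName = departmentMap.get(fields[fields.length - 1]);
        if (deptName == null || deptName.isEmpty()) {
            deptName = "NOT-FOUND";
        }
        return employeeRecord + "\t" + deptName;
    }

    public static void main(String[] args) {
        // Reference data: in the real job this is loaded from the
        // distributed cache in the mapper's setup() method
        Map<String, String> departmentMap = new HashMap<>();
        departmentMap.put("d005", "Development");
        departmentMap.put("d007", "Sales");

        System.out.println(join("10001\t1953-09-02\tGeorgi\tFacello\tM\t1986-06-26\td005", departmentMap));
        System.out.println(join("10099\t1960-01-01\tJane\tDoe\tF\t1990-01-01\td999", departmentMap));
    }
}
```

Because the whole lookup table lives in memory on every mapper, this pattern only works when the reference dataset is small; the employee dataset streams through as normal map input.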
Section 2 demonstrates a solution where a file in HDFS is added to the distributed cache in the driver code and accessed in the mapper's setup() method through the DistributedCache.getLocalCacheFiles() method.
Section 3 demonstrates a solution where a local file is added to the distributed cache at the command line and accessed in the mapper's setup() method.
Apache documentation on DistributedCache:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html
Related blogs:
1. Map-side join sample using reference data (text file) from distributed cache - Part 1
2. Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2
Data used in this blog:
http://dev.mysql.com/doc/employee/en/index.html
Pig and Hive for joins:
Pig and Hive have built-in join capabilities and are optimized for them. Joins hand-written in Java can be more performant, but they are time-consuming to code, test, and support - and in some companies are considered an anti-pattern for joins.
2.0. Sample program
In this program, the side data exists in HDFS and is added to the distributed cache in the driver code, then referenced in the mapper through the DistributedCache.getLocalCacheFiles() method.
This gist demonstrates how to do a map-side join, loading one small dataset from DistributedCache into a HashMap
in memory, and joining with a larger dataset.

Includes:
---------
1. Input data and script download
2. Dataset structure review
3. Expected results
4. Mapper code
5. Driver code
6. Data load commands
7. Command to run Java program
8. Results of the program
01. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues

gitHub:
<<To be added>>

Directory structure
-------------------
joinProject
    data
        employees_tsv
            employees_tsv
        departments_sorted
            departments_txt
    MapSideJoin-DistCacheTxtFile
        src
            MapperMapSideJoinDCacheTextFile.java
            DriverMapSideJoinDCacheTxtFile.java
        jar
            MapSideJoinDCacheTextFile.jar
********************************************
*Data structure
********************************************

a) Small dataset (departments_txt)
[DeptNo DeptName] - Tab separated

d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service

b) Large dataset (employees_tsv)
[Emp_no DOB FName LName Gender HireDate DeptNo] - Tab separated

10001 1953-09-02 Georgi Facello M 1986-06-26 d005
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
10006 1953-04-20 Anneke Preusig F 1989-06-02 d005
10009 1952-04-19 Sumant Peac F 1985-02-18 d006
...
********************************************
*Expected Results
********************************************
Everything in the employees_tsv file, followed by a tab and the department name (from the department file)

10001 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
......
/********************************************
 *Mapper
 *MapperMapSideJoinDCacheTextFile
 ********************************************/
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperMapSideJoinDCacheTextFile extends
        Mapper<LongWritable, Text, Text, Text> {

    private static HashMap<String, String> DepartmentMap = new HashMap<String, String>();
    private BufferedReader brReader;
    private String strDeptName = "";
    private Text txtMapOutputKey = new Text("");
    private Text txtMapOutputValue = new Text("");

    enum MYCOUNTER {
        RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR
    }

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {

        // Locate the lookup file among the local copies of the cached files
        Path[] cacheFilesLocal = DistributedCache.getLocalCacheFiles(context
                .getConfiguration());

        for (Path eachPath : cacheFilesLocal) {
            if (eachPath.getName().toString().trim().equals("departments_txt")) {
                context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
                loadDepartmentsHashMap(eachPath, context);
            }
        }
    }

    private void loadDepartmentsHashMap(Path filePath, Context context)
            throws IOException {

        String strLineRead = "";

        try {
            brReader = new BufferedReader(new FileReader(filePath.toString()));

            // Read each line, split and load to HashMap
            while ((strLineRead = brReader.readLine()) != null) {
                String deptFieldArray[] = strLineRead.split("\\t");
                DepartmentMap.put(deptFieldArray[0].trim(),
                        deptFieldArray[1].trim());
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
        } catch (IOException e) {
            context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
            e.printStackTrace();
        } finally {
            if (brReader != null) {
                brReader.close();
            }
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);

        if (value.toString().length() > 0) {
            String arrEmpAttributes[] = value.toString().split("\\t");

            // Look up the department name; a plain null check avoids the
            // NullPointerException that strDeptName.equals(null) would throw
            // on a missing key
            strDeptName = DepartmentMap.get(arrEmpAttributes[6]);
            if (strDeptName == null || strDeptName.isEmpty()) {
                strDeptName = "NOT-FOUND";
            }

            // Emit emp_no as key; the remaining fields plus the department
            // name as value (each field exactly once)
            txtMapOutputKey.set(arrEmpAttributes[0]);
            txtMapOutputValue.set(arrEmpAttributes[1] + "\t"
                    + arrEmpAttributes[2] + "\t"
                    + arrEmpAttributes[3] + "\t"
                    + arrEmpAttributes[4] + "\t"
                    + arrEmpAttributes[5] + "\t"
                    + arrEmpAttributes[6] + "\t" + strDeptName);

            context.write(txtMapOutputKey, txtMapOutputValue);
        }
    }
}
/********************************************
 *Driver
 *DriverMapSideJoinDCacheTxtFile
 ********************************************/
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DriverMapSideJoinDCacheTxtFile extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {

        if (args.length != 2) {
            System.out
                    .printf("Two parameters are required- <input dir> <output dir>\n");
            return -1;
        }

        Job job = new Job(getConf());
        Configuration conf = job.getConfiguration();
        job.setJobName("Map-side join with text lookup file in DCache");

        // Add the HDFS file to the distributed cache
        DistributedCache
                .addCacheFile(
                        new URI(
                                "/user/akhanolk/joinProject/data/departments_sorted/departments_txt"),
                        conf);

        job.setJarByClass(DriverMapSideJoinDCacheTxtFile.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MapperMapSideJoinDCacheTextFile.class);
        job.setNumReduceTasks(0);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(),
                new DriverMapSideJoinDCacheTxtFile(), args);
        System.exit(exitCode);
    }
}
********************************************
*HDFS load commands
********************************************
hadoop fs -mkdir joinProject
hadoop fs -mkdir joinProject/data
hadoop fs -put joinProject/data/* joinProject/data/
********************************************
*Job run commands
********************************************
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheTxtFile/jar/MapSideJoinDCacheTextFile.jar DriverMapSideJoinDCacheTxtFile /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideTextFileLookUpDistCache
********************************************
*Program Output
********************************************
hadoop fs -cat joinProject/data/output-MapSideTextFileLookUpDistCache/part* | less

10001 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004 Production
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003 Human Resources
10006 1953-04-20 Anneke Preusig F 1989-06-02 d005 Development
10009 1952-04-19 Sumant Peac F 1985-02-18 d006 Quality Management
10010 1963-06-01 Duangkaew Piveteau F 1989-08-24 d006 Quality Management
10012 1960-10-04 Patricio Bridgland M 1992-12-18 d005 Development
10013 1963-06-07 Eberhardt Terkki M 1985-10-20 d003 Human Resources
.....
3.0. Variation
As a variation on the code in section 2.0, this section demonstrates how to add side data that is not in HDFS to the distributed cache via the command line, leveraging GenericOptionsParser.
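When a file is shipped with the -files option, the framework makes a copy of it (or a symlink to it) available in each task's working directory under the file's base name, so the mapper can open it like any ordinary local file. The following is a minimal plain-Java sketch of that access pattern; the file is created in the sketch itself only to keep it self-contained (in the real job, -files puts it there):

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class LocalCacheFileSketch {

    // What the mapper's setup() does with a -files file: open it by its
    // base name in the current working directory and load it into a map
    static Map<String, String> loadLookup(String fileName) throws IOException {
        Map<String, String> lookup = new HashMap<>();
        try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
            String line;
            while ((line = br.readLine()) != null) {
                String[] parts = line.split("\\t");
                lookup.put(parts[0].trim(), parts[1].trim());
            }
        }
        return lookup;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the copy/symlink that -files would place in the
        // task's working directory
        try (FileWriter w = new FileWriter("departments_txt")) {
            w.write("d001\tMarketing\nd005\tDevelopment\n");
        }
        System.out.println(loadLookup("departments_txt").get("d005"));
    }
}
```

Note that -files (and the other generic options) are only parsed when the driver is run through ToolRunner, which is why the driver below implements Tool.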
This gist is part of a series of gists related to map-side joins in Java map-reduce.
In the gist at https://gist.github.com/airawat/6597557, we added reference data available
in HDFS to the distributed cache from the driver code.
This gist demonstrates adding a local file to the distributed cache via the command line.

Refer to the gist at https://gist.github.com/airawat/6597557 for-
1. Data samples and structure
2. Expected results
3. Commands to load data to HDFS

The listing below includes:
4. Data and code download location
5. Mapper code
6. Driver code
7. Command to run the program
8. Results
04. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues

gitHub:
<<To be added>>

Directory structure
-------------------
joinProject
    data
        employees_tsv
            employees_tsv
        departments_sorted
            departments_txt
    MapSideJoin-DistCacheTxtFileGOP
        src
            MapperMapSideJoinDCacheTextFileGOP.java
            DriverMapSideJoinDCacheTxtFileGOP.java
        jar
            MapSideJoin-DistCacheTxtFileGOP.jar
/********************************************
 *Mapper
 *MapperMapSideJoinDCacheTextFileGOP
 ********************************************/
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperMapSideJoinDCacheTextFileGOP extends
        Mapper<LongWritable, Text, Text, Text> {

    private static HashMap<String, String> DepartmentMap = new HashMap<String, String>();
    private BufferedReader brReader;
    private String strDeptName = "";
    private Text txtMapOutputKey = new Text("");
    private Text txtMapOutputValue = new Text("");

    enum MYCOUNTER {
        RECORD_COUNT, FILE_EXISTS, FILE_NOT_FOUND, SOME_OTHER_ERROR
    }

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {

        // Files shipped with -files are available in the task's working
        // directory under their base name
        File lookupFile = new File("departments_txt");
        String strLineRead = "";

        try {
            brReader = new BufferedReader(new FileReader(lookupFile));

            // Read each line, split and load to HashMap
            while ((strLineRead = brReader.readLine()) != null) {
                String deptFieldArray[] = strLineRead.split("\\t");
                DepartmentMap.put(deptFieldArray[0].trim(),
                        deptFieldArray[1].trim());
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
        } catch (IOException e) {
            context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
            e.printStackTrace();
        } finally {
            if (brReader != null) {
                brReader.close();
            }
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);

        if (value.toString().length() > 0) {
            String arrEmpAttributes[] = value.toString().split("\\t");

            // Look up the department name; a plain null check avoids the
            // NullPointerException that strDeptName.equals(null) would throw
            // on a missing key
            strDeptName = DepartmentMap.get(arrEmpAttributes[6]);
            if (strDeptName == null || strDeptName.isEmpty()) {
                strDeptName = "NOT-FOUND";
            }

            // Emit emp_no as key; the remaining fields plus the department
            // name as value (each field exactly once)
            txtMapOutputKey.set(arrEmpAttributes[0]);
            txtMapOutputValue.set(arrEmpAttributes[1] + "\t"
                    + arrEmpAttributes[2] + "\t"
                    + arrEmpAttributes[3] + "\t"
                    + arrEmpAttributes[4] + "\t"
                    + arrEmpAttributes[5] + "\t"
                    + arrEmpAttributes[6] + "\t" + strDeptName);

            context.write(txtMapOutputKey, txtMapOutputValue);
        }
    }
}
/********************************************
 *Driver
 *DriverMapSideJoinDCacheTxtFileGOP
 ********************************************/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DriverMapSideJoinDCacheTxtFileGOP extends Configured implements
        Tool {

    @Override
    public int run(String[] args) throws Exception {

        if (args.length != 2) {
            System.out
                    .printf("Two parameters are required for DriverMapSideJoinDCacheTxtFileGOP- <input dir> <output dir>\n");
            return -1;
        }

        Job job = new Job(getConf());
        job.setJobName("Map-side join with text lookup file in DCache-GenericOptionsParser");
        job.setJarByClass(DriverMapSideJoinDCacheTxtFileGOP.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MapperMapSideJoinDCacheTextFileGOP.class);
        job.setNumReduceTasks(0);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(),
                new DriverMapSideJoinDCacheTxtFileGOP(), args);
        System.exit(exitCode);
    }
}
********************************************
*Job run commands
********************************************
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheTxtFileGOP/jar/MapSideJoin-DistCacheTxtFileGOP.jar DriverMapSideJoinDCacheTxtFileGOP -files /home/akhanolk/Blog/joinProject/data/departments_sorted/departments_txt /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideTxtFileLookUpDistCacheGOP
********************************************
*Program Output
********************************************
See - https://gist.github.com/airawat/6597557