Tuesday, September 17, 2013

Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2

This post covers the map-side join in Java map-reduce, with links to the Apache documentation, my notes on the topic, and a sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!


What's in this blog?

A sample map-reduce program in Java that joins two datasets, on the map-side - an employee dataset and a department dataset, with the department number as join key.  The department dataset is a very small dataset in MapFile format, is in HDFS, and is added to the distributed cache.  The MapFile is referenced in the map method of the mapper to look up the department name, and emit the employee dataset with department name included.
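To make the idea concrete before getting into the Hadoop code, here is a minimal plain-Java sketch of the join logic itself. It is only an illustration under assumptions: the class name MapSideJoinSketch, the join helper and the in-memory TreeMap (standing in for the MapFile) are hypothetical and not part of the sample program, and the second record in main is made up to show the fallback for an unknown department.

```java
import java.util.Map;
import java.util.TreeMap;

public class MapSideJoinSketch {

    // Hypothetical stand-in for the departments MapFile: a small sorted lookup table.
    static final Map<String, String> DEPARTMENTS = new TreeMap<>();
    static {
        DEPARTMENTS.put("d004", "Production");
        DEPARTMENTS.put("d005", "Development");
        DEPARTMENTS.put("d007", "Sales");
    }

    // Joins one tab-separated employee record with the department lookup.
    // The join key (department number) is assumed to be the last field.
    static String join(String employeeLine) {
        String[] fields = employeeLine.split("\t");
        String deptNo = fields[fields.length - 1];
        String deptName = DEPARTMENTS.getOrDefault(deptNo, "NOT-FOUND");
        return employeeLine + "\t" + deptName;
    }

    public static void main(String[] args) {
        // Record taken from the employees_tsv sample.
        System.out.println(join("10001\t1953-09-02\tGeorgi\tFacello\tM\t1986-06-26\td005"));
        // Made-up record with a department that is not in the lookup table.
        System.out.println(join("10099\t1960-01-01\tJane\tDoe\tF\t1990-01-01\td999"));
    }
}
```

Because the lookup table is small and available on every node, each record can be joined inside the mapper and no reduce phase is needed.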

Apache documentation on DistributedCache:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html

Related blogs:
1. Map-side join sample in Java using reference data (text file) from distributed cache - Part 1
2. Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2

Data used in this blog:
http://dev.mysql.com/doc/employee/en/index.html

Pig and Hive for joins:
Pig and Hive have built-in join capabilities and are optimized for them. Joins hand-written in Java can be more performant, but they are time-consuming to code, test and support, and some companies consider them an anti-pattern.

Sample program

This gist demonstrates how to do a map-side join, joining a MapFile from the distributed cache
with a larger dataset in HDFS.
Includes:
---------
1. Input data and script download
2. Dataset structure review
3. Expected results
4. Mapper code
5. Driver code
6. Data load commands
7. Command to run Java program
8. Results of the program
01. Data and script download
-----------------------------
Google:
<<To be added>>
Email me at airawat.blog@gmail.com if you encounter any issues
GitHub:
<<To be added>>
Directory structure
-------------------
joinProject
    data
        employees_tsv
            employees_tsv
        departments_map.tar.gz
    MapSideJoin-DistCacheMapFile
        src
            MapperMapSideJoinDCacheMapFile.java
            DriverMapSideJoinDCacheMapFile.java
        jar
            MapSideJoinDCacheMapFile.jar
********************************************
*Data structure
********************************************
a) Small dataset (departments_map)
[DeptNo DeptName] - MapFile
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service
b) Large dataset (employees_tsv)
[Emp_no DOB FName LName Gender HireDate DeptNo] - Tab separated
10001 1953-09-02 Georgi Facello M 1986-06-26 d005
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
10006 1953-04-20 Anneke Preusig F 1989-06-02 d005
10009 1952-04-19 Sumant Peac F 1985-02-18 d006
...
********************************************
*Expected Results
********************************************
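Everything in the employees_tsv file followed by a tab and the department name (from the departments MapFile). Note that the DOB appears twice in each output record, because the mapper emits field 1 twice when it builds the output value.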
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
......
/********************************************
*Mapper
*MapperMapSideJoinDCacheMapFile
********************************************/
import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperMapSideJoinDCacheMapFile extends
        Mapper<LongWritable, Text, Text, Text> {

    private MapFile.Reader deptMapReader = null;
    private Text txtMapOutputKey = new Text("");
    private Text txtMapOutputValue = new Text("");
    private Text txtMapLookupKey = new Text("");
    private Text txtMapLookupValue = new Text("");

    enum MYCOUNTER {
        RECORD_COUNT, FILE_EXISTS, LOAD_MAP_ERROR
    }

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        // Find our archive among the locally unpacked cache archives and
        // open the MapFile directory inside it.
        Path[] cacheFilesLocal = DistributedCache.getLocalCacheArchives(context
                .getConfiguration());
        for (Path eachPath : cacheFilesLocal) {
            if (eachPath.getName().trim().equals("departments_map.tar.gz")) {
                URI uriUncompressedFile = new File(eachPath.toString()
                        + "/departments_map").toURI();
                context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
                loadDepartmentsMap(uriUncompressedFile, context);
            }
        }
    }

    @SuppressWarnings("deprecation")
    private void loadDepartmentsMap(URI uriUncompressedFile, Context context)
            throws IOException {
        FileSystem dfs = FileSystem.get(context.getConfiguration());
        try {
            deptMapReader = new MapFile.Reader(dfs,
                    uriUncompressedFile.toString(), context.getConfiguration());
        } catch (Exception e) {
            // Record the failure in a counter; the reader stays null and
            // every lookup in map() then falls back to NOT-FOUND.
            context.getCounter(MYCOUNTER.LOAD_MAP_ERROR).increment(1);
            e.printStackTrace();
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);
        if (value.toString().length() > 0) {
            String[] arrEmpAttributes = value.toString().split("\\t");
            txtMapLookupKey.set(arrEmpAttributes[6]);

            // MapFile.Reader.get() returns null when the key is absent.
            // (Comparing a Text to a String with equals() is always false,
            // so the lookup result is tested instead.)
            if (deptMapReader == null
                    || deptMapReader.get(txtMapLookupKey, txtMapLookupValue) == null) {
                txtMapLookupValue.set("NOT-FOUND");
            }

            txtMapOutputKey.set(arrEmpAttributes[0]);
            // NOTE: field 1 (DOB) is emitted twice here, which is why the
            // sample output shows the date of birth twice.
            txtMapOutputValue.set(arrEmpAttributes[1] + "\t"
                    + arrEmpAttributes[1] + "\t"
                    + arrEmpAttributes[2] + "\t"
                    + arrEmpAttributes[3] + "\t"
                    + arrEmpAttributes[4] + "\t"
                    + arrEmpAttributes[5] + "\t"
                    + arrEmpAttributes[6] + "\t"
                    + txtMapLookupValue.toString());

            // Write inside the if-block so that a blank input line does not
            // re-emit the previous record.
            context.write(txtMapOutputKey, txtMapOutputValue);
        }
        txtMapLookupValue.set("");
        txtMapLookupKey.set("");
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        // The reader is null if the MapFile could not be loaded in setup().
        if (deptMapReader != null) {
            deptMapReader.close();
        }
    }
}
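For readers wondering why a MapFile lookup in the mapper is cheap: a MapFile keeps its records sorted by key and maintains a sparse index of every Nth key, so a lookup is a binary search over the index followed by a short forward scan. The sketch below illustrates that idea in plain Java; it is not Hadoop's implementation. The class name SparseIndexLookupSketch, the parallel arrays and the index interval of 4 are assumptions chosen for illustration (as far as I know, Hadoop's default interval is 128).

```java
import java.util.Arrays;

public class SparseIndexLookupSketch {

    // Stand-in for the sorted data file: keys in ascending order with their values.
    static final String[] KEYS = {
        "d001", "d002", "d003", "d004", "d005", "d006", "d007", "d008", "d009"
    };
    static final String[] VALUES = {
        "Marketing", "Finance", "Human Resources", "Production", "Development",
        "Quality Management", "Sales", "Research", "Customer Service"
    };

    // Hypothetical index interval, kept small so the example is easy to follow.
    static final int INDEX_INTERVAL = 4;

    // Sparse index: every INDEX_INTERVAL-th key of the data file.
    static final String[] INDEX_KEYS = buildIndex();

    static String[] buildIndex() {
        int n = (KEYS.length + INDEX_INTERVAL - 1) / INDEX_INTERVAL;
        String[] index = new String[n];
        for (int i = 0; i < n; i++) {
            index[i] = KEYS[i * INDEX_INTERVAL];
        }
        return index;
    }

    // Returns the value for the key, or null if the key is absent.
    static String get(String key) {
        int pos = Arrays.binarySearch(INDEX_KEYS, key);
        // When the key is not an index entry, binarySearch returns
        // -(insertionPoint) - 1; step back to the preceding index entry.
        int slot = pos >= 0 ? pos : -pos - 2;
        if (slot < 0) {
            return null; // key sorts before the first record
        }
        int start = slot * INDEX_INTERVAL;
        int end = Math.min(start + INDEX_INTERVAL, KEYS.length);
        // Scan forward within the block that the index entry points to.
        for (int i = start; i < end; i++) {
            int cmp = KEYS[i].compareTo(key);
            if (cmp == 0) {
                return VALUES[i];
            }
            if (cmp > 0) {
                break; // passed the position where the key would be
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(get("d007"));
        System.out.println(get("d010"));
    }
}
```

Only the sparse index has to be held in memory, which is why this format suits reference data that is looked up by key from every mapper.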
/********************************************
*Driver
*DriverMapSideJoinDCacheMapFile
********************************************/
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DriverMapSideJoinDCacheMapFile extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out
                    .printf("Two parameters are required for DriverMapSideJoinDCacheMapFile- <input dir> <output dir>\n");
            return -1;
        }

        Job job = new Job(getConf());
        Configuration conf = job.getConfiguration();
        job.setJobName("Map-side join with mapfile in DCache");
        DistributedCache.addCacheArchive(
                new URI("/user/akhanolk/joinProject/data/departments_map.tar.gz"),
                conf);

        job.setJarByClass(DriverMapSideJoinDCacheMapFile.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MapperMapSideJoinDCacheMapFile.class);
        job.setNumReduceTasks(0);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(),
                new DriverMapSideJoinDCacheMapFile(), args);
        System.exit(exitCode);
    }
}
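The driver registers departments_map.tar.gz as a cache archive, and the mapper's setup() then looks for a local path whose name matches the archive and opens the departments_map directory inside it. The sketch below isolates that path handling in plain Java, assuming (as the mapper does) that the archive is unpacked into a local directory named after the archive file. The class name CacheArchivePathSketch, both helper methods and the local path in main are hypothetical and only for illustration.

```java
import java.io.File;
import java.net.URI;

public class CacheArchivePathSketch {

    // True if the local cache path refers to the departments archive.
    static boolean isDepartmentsArchive(String localPath) {
        return new File(localPath).getName().equals("departments_map.tar.gz");
    }

    // Builds the URI of the MapFile directory inside the unpacked archive.
    static URI mapFileUri(String localArchivePath, String mapFileDirName) {
        return new File(localArchivePath, mapFileDirName).toURI();
    }

    public static void main(String[] args) {
        // Hypothetical local cache location; the real one is chosen by the framework.
        String localPath = "/tmp/hypothetical-cache/departments_map.tar.gz";
        if (isDepartmentsArchive(localPath)) {
            System.out.println(mapFileUri(localPath, "departments_map"));
        }
    }
}
```

Matching on the file name rather than on the full path keeps the mapper independent of wherever the framework decides to place the unpacked archive.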
********************************************
*HDFS load commands
********************************************
hadoop fs -mkdir joinProject
hadoop fs -mkdir joinProject/data
hadoop fs -put joinProject/data/* joinProject/data/
********************************************
*Job run commands
********************************************
hadoop jar ~/Blog/joinProject/MapSideJoin-DistCacheMapFile/jar/MapSideJoinDCacheMapFile.jar DriverMapSideJoinDCacheMapFile /user/akhanolk/joinProject/data/employees_tsv /user/akhanolk/joinProject/data/output-MapSideMapFileLookUpDistCache
********************************************
*Program Output
********************************************
hadoop fs -cat /user/akhanolk/joinProject/data/output-MapSideMapFileLookUpDistCache/part* | less
10001 1953-09-02 1953-09-02 Georgi Facello M 1986-06-26 d005 Development
10002 1964-06-02 1964-06-02 Bezalel Simmel F 1985-11-21 d007 Sales
10003 1959-12-03 1959-12-03 Parto Bamford M 1986-08-28 d004 Production
10004 1954-05-01 1954-05-01 Chirstian Koblick M 1986-12-01 d004 Production
10005 1955-01-21 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003 Human Resources
10006 1953-04-20 1953-04-20 Anneke Preusig F 1989-06-02 d005 Development
10009 1952-04-19 1952-04-19 Sumant Peac F 1985-02-18 d006 Quality Management
..
