Tuesday, September 24, 2013

Reduce-side joins in Java map-reduce

1.0. About reduce side joins

Joins of datasets done in the reduce phase are called reduce-side joins.  Reduce-side joins are easier to implement, as they are less stringent than map-side joins, which require the data to be sorted and partitioned the same way.  They are less efficient than map-side joins because the datasets have to go through the sort and shuffle phase.

What's involved:
1.  The map output key of each dataset being joined has to be the join key - so that matching records reach the same reducer.
2.  Each dataset has to be tagged with its identity in the mapper - to help differentiate between the datasets in the reducer, so they can be processed accordingly.
3.  In each reducer, the values from both datasets, for the keys assigned to that reducer, are available to be processed as required.
4.  A secondary sort needs to be done to ensure the ordering of the values sent to the reducer.
5.  If the input files are of different formats, we need separate mappers, and we need to use the MultipleInputs class in the driver to add each input path and associate it with its input format and mapper (a hypothetical snippet follows this list).
[MultipleInputs.addInputPath(job, <input path n>, <input format class>, <mapper class n>);]
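For illustration, a hedged example of what that wiring could look like in the driver; the paths, input format, and mapper class names below are hypothetical.

// Hypothetical driver snippet; MultipleInputs is in org.apache.hadoop.mapreduce.lib.input
MultipleInputs.addInputPath(job, new Path("data/employees"),
        TextInputFormat.class, EmployeeMapper.class);
MultipleInputs.addInputPath(job, new Path("data/salaries"),
        TextInputFormat.class, SalaryMapper.class);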

Note:  The join between the datasets (employee and current salary - cardinality of 1..1) in the sample program below has been demonstrated in my blog on map-side joins of large datasets as well.  I have used the same datasets here, as the purpose of this blog is to demonstrate the concept.  Whenever possible, reduce-side joins should be avoided.

[Update - 10/15/2013]
I have added a pig equivalent in the final section.

2.0. Sample datasets used in this gist

The datasets used are employees and salaries.  For salary data, there are two files - one with the current salary (1..1), and one with historical salary data (1..many).  Then there is the department data, a small reference dataset, which we will add to the distributed cache and look up in the reducer.


3.0. Implementing a reduce-side join

The sample code is common to both the 1..1 and the 1..many join of the sample datasets.
The mapper is common to both datasets, as the format is the same.

3.0.1. Components/steps/tasks:

1.  Map output key
The key will be the empNo, as it is the join key for the employee and salary datasets
[Implementation: in the mapper]

2.  Tagging the data with the dataset identity
Add an attribute called srcIndex to tag the identity of the data (1=employee, 2=salary, 3=salary history)
[Implementation: in the mapper]

3.  Discarding unwanted attributes
[Implementation: in the mapper]

4. Composite key
Make the map output key a composite of empNo and srcIndex
[Implementation: create custom writable]

5.  Partitioner
Partition the data on natural key of empNo
[Implementation: create custom partitioner class]

6.  Sorting
Sort the data on empNo first, and then source index
[Implementation: create custom sorting comparator class]

7.  Grouping
Group the data based on natural key
[Implementation: create custom grouping comparator class]

8. Joining
Iterate through the values for a key to complete the join of the employee and salary data, and perform a lookup of the department to include the department name in the output
[Implementation: in the reducer]

3.0.2a. Data pipeline for cardinality of 1..1 between employee and salary data:

3.0.2b. Data pipeline for cardinality of 1..many between employee and salary data:

3.0.3. The Composite key

The composite key is a combination of the join key empNo and the source index (1=employee file, 2=current salary file, 3=historical salary file)
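
A minimal sketch of such a composite key is below, assuming empNo is carried as a String and the source index as an int; the class and field names are mine, not necessarily those of the sample program.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

// Composite key: natural join key (empNo) plus the source index of the dataset.
public class CompositeKeyWritable implements WritableComparable<CompositeKeyWritable> {

    private String joinKey;   // empNo
    private int sourceIndex;  // 1=employee, 2=current salary, 3=salary history

    public CompositeKeyWritable() {
    }

    public CompositeKeyWritable(String joinKey, int sourceIndex) {
        this.joinKey = joinKey;
        this.sourceIndex = sourceIndex;
    }

    public String getJoinKey() {
        return joinKey;
    }

    public int getSourceIndex() {
        return sourceIndex;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, joinKey);
        WritableUtils.writeVInt(out, sourceIndex);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        joinKey = WritableUtils.readString(in);
        sourceIndex = WritableUtils.readVInt(in);
    }

    @Override
    public int compareTo(CompositeKeyWritable other) {
        // Order by empNo first, then by source index.
        int result = joinKey.compareTo(other.joinKey);
        return (result != 0) ? result : Integer.compare(sourceIndex, other.sourceIndex);
    }
}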


3.0.4. The mapper

In the setup method of the mapper:
1. Get the filename from the input split and cross-reference it against the configuration (set in the driver) to derive the source index.  [Driver code: add configuration entries (key=employee filename, value=1), (key=current salary filename, value=2), (key=historical salary filename, value=3)]
2. Build a list of attributes we want to emit as map output for each data entity

The setup method is called only once, at the beginning of a map task, so it is the logical place to identify the source index.

In the map method of the mapper:
3. Build the map output based on the attributes required, as specified in the list from #2

Note:  For salary data, we include the "effective till" date even though it is not required in the final output, because this code is common to the 1..1 and the 1..many joins to salary data.  If the salary data is historical, we want the current salary only, that is, records with an effective-till date of 9999-01-01.
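
A minimal sketch of the common mapper along these lines, assuming tab-delimited input with empNo as the first field and a driver that stores filename-to-sourceIndex entries in the configuration; the class name and the simplified projection are illustrative.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class JoinMapper extends Mapper<LongWritable, Text, CompositeKeyWritable, Text> {

    private int sourceIndex;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Cross-reference the current file name against the filename -> sourceIndex
        // entries set in the driver, to derive the source index for this split.
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        sourceIndex = context.getConfiguration().getInt(fileName, 0);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // empNo is assumed to be the first tab-delimited field; the sample program
        // projects only the required attributes, whereas this sketch passes the
        // rest of the record through unchanged for brevity.
        String[] record = value.toString().split("\t", 2);
        context.write(new CompositeKeyWritable(record[0], sourceIndex), new Text(record[1]));
    }
}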


3.0.5. The partitioner

Even though the map output key is composite, we want to partition on the natural join key empNo; therefore, a custom partitioner is in order.
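
A minimal sketch of such a partitioner, assuming the composite key class sketched earlier.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class JoinKeyPartitioner extends Partitioner<CompositeKeyWritable, Text> {

    @Override
    public int getPartition(CompositeKeyWritable key, Text value, int numReduceTasks) {
        // Partition on the natural key (empNo) only, so all records for an
        // employee land on the same reducer regardless of source index.
        return (key.getJoinKey().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}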


3.0.6. The sort comparator

To ensure that the input to the reducer is sorted on empNo first and then on sourceIndex, we need a sort comparator.  This guarantees that the employee data is the first set in the values list for a key, followed by the salary data.
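
A minimal sketch of the sort comparator under the same assumptions.

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CompositeKeySortComparator extends WritableComparator {

    protected CompositeKeySortComparator() {
        super(CompositeKeyWritable.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CompositeKeyWritable k1 = (CompositeKeyWritable) a;
        CompositeKeyWritable k2 = (CompositeKeyWritable) b;
        // Order by empNo first, then by source index, so employee records
        // precede salary records within each key group.
        int result = k1.getJoinKey().compareTo(k2.getJoinKey());
        return (result != 0) ? result : Integer.compare(k1.getSourceIndex(), k2.getSourceIndex());
    }
}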


3.0.7. The grouping comparator

This class is needed to indicate the group-by attribute - the natural join key empNo.
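
A minimal sketch of the grouping comparator under the same assumptions.

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class JoinKeyGroupingComparator extends WritableComparator {

    protected JoinKeyGroupingComparator() {
        super(CompositeKeyWritable.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Group reducer input on the natural join key alone, ignoring the source index,
        // so one reduce() call sees the employee record and all its salary records.
        CompositeKeyWritable k1 = (CompositeKeyWritable) a;
        CompositeKeyWritable k2 = (CompositeKeyWritable) b;
        return k1.getJoinKey().compareTo(k2.getJoinKey());
    }
}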


3.0.8. The reducer

In the setup method of the reducer (called only once for the task):
We check whether the side data, a map file with department data, is in the distributed cache and, if found, initialize the map file reader.

In the reduce method:
While iterating through the value list -
1. If the data is employee data (sourceIndex=1), we look up the department name in the map file using the deptNo, which is the last attribute of the employee record, and append the department name to the employee data.
2. If the data is historical salary data, we emit only the salary records where the last attribute is '9999-01-01'.

Key point-
We have set the sort comparator to sort on empNo and sourceIndex.
The sourceIndex of employee data is less than that of salary data - as set in the driver.
Therefore, we are assured that the employee data always comes first, followed by the salary data.
So for each distinct empNo, we iterate through the values, append them, and emit the joined record as output.
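
A minimal sketch of the reducer along these lines; the MapFile directory name (departments_map), the assumption that deptNo is the last tab-delimited attribute of the employee record, and the use of the older DistributedCache and MapFile.Reader APIs are illustrative choices, not necessarily those of the sample program.

import java.io.IOException;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer<CompositeKeyWritable, Text, NullWritable, Text> {

    private MapFile.Reader deptMapReader;
    private Text deptName = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Check the distributed cache for the department MapFile and open a reader on it.
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if (cacheFiles != null) {
            for (Path cacheFile : cacheFiles) {
                if (cacheFile.getName().equals("departments_map")) {
                    deptMapReader = new MapFile.Reader(
                            FileSystem.getLocal(context.getConfiguration()),
                            cacheFile.toString(), context.getConfiguration());
                }
            }
        }
    }

    @Override
    protected void reduce(CompositeKeyWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Because of the sort and grouping comparators, the employee record (sourceIndex=1)
        // arrives before the salary records for the same empNo.
        StringBuilder joined = new StringBuilder(key.getJoinKey());
        for (Text value : values) {
            String record = value.toString();
            if (key.getSourceIndex() == 1) {
                // Employee data: append it, then look up and append the department name
                // using deptNo, assumed here to be the last tab-delimited attribute.
                String deptNo = record.substring(record.lastIndexOf('\t') + 1);
                deptMapReader.get(new Text(deptNo), deptName);
                joined.append('\t').append(record).append('\t').append(deptName.toString());
            } else if (record.endsWith("9999-01-01")) {
                // Salary data: keep only the current salary (effective-till 9999-01-01),
                // dropping the effective-till attribute from the output.
                joined.append('\t').append(record.substring(0, record.lastIndexOf('\t')));
            }
        }
        context.write(NullWritable.get(), new Text(joined.toString()));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (deptMapReader != null) {
            deptMapReader.close();
        }
    }
}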



3.0.9. The driver

Besides the usual driver code, we are:
1. Adding side data (department lookup data, in map file format, in HDFS) to the distributed cache
2. Adding key-value pairs to the configuration, each pair being (filename, source index).
This is used by the mapper to tag data with the sourceIndex.
3. And lastly, associating all the classes we created with the job.
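
A minimal sketch of such a driver, using the Hadoop 2 style Job API; the input file names used as configuration keys, and the HDFS paths, are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReduceSideJoinDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // 2. Filename -> source index entries, read back in the mapper's setup().
        conf.setInt("employees_active_record", 1);
        conf.setInt("salaries_active_record", 2);
        conf.setInt("salaries_history_record", 3);

        Job job = Job.getInstance(conf, "Reduce-side join");
        job.setJarByClass(ReduceSideJoinDriver.class);

        // 1. Side data: department lookup MapFile in HDFS, added to the distributed cache.
        DistributedCache.addCacheFile(
                new Path("formatProject/data/departments_map").toUri(), job.getConfiguration());

        // 3. Associate the classes created above with the job.
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setPartitionerClass(JoinKeyPartitioner.class);
        job.setSortComparatorClass(CompositeKeySortComparator.class);
        job.setGroupingComparatorClass(JoinKeyGroupingComparator.class);

        job.setMapOutputKeyClass(CompositeKeyWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}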



4.0. The pig equivalent



Pig script-version 1:



Pig script-version 2 - eliminating the reduce-side join:
In this script, we filter on the most recent salary, and then use Pig's merge join (map-side) optimization, which can be leveraged when the inputs to the join are sorted.


Output:

Tuesday, September 17, 2013

Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2

This post covers map-side joins in Java map-reduce, with links to Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!


What's in this blog?

A sample map-reduce program in Java that joins two datasets on the map side - an employee dataset and a department dataset, with the department number as the join key.  The department dataset is a very small dataset in MapFile format, resides in HDFS, and is added to the distributed cache.  The MapFile is referenced in the map method of the mapper to look up the department name, and the employee records are emitted with the department name included.
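
A minimal sketch of what such a mapper could look like; the cache file handling, the assumption that deptNo is the last tab-delimited attribute of the employee record, and the class name are illustrative, not the exact sample program.

import java.io.IOException;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapSideJoinMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    private MapFile.Reader deptMapReader;
    private Text deptName = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Open the department MapFile shipped to this node via the distributed cache.
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if (cacheFiles != null && cacheFiles.length > 0) {
            deptMapReader = new MapFile.Reader(
                    FileSystem.getLocal(context.getConfiguration()),
                    cacheFiles[0].toString(), context.getConfiguration());
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Look up the department name for the deptNo (assumed to be the last
        // tab-delimited attribute) and emit the employee record with it appended.
        String record = value.toString();
        String deptNo = record.substring(record.lastIndexOf('\t') + 1);
        deptMapReader.get(new Text(deptNo), deptName);
        context.write(NullWritable.get(), new Text(record + "\t" + deptName.toString()));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (deptMapReader != null) {
            deptMapReader.close();
        }
    }
}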

Apache documentation on DistributedCache:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html

Related blogs:
1. Map-side join sample in Java using reference data (text file) from distributed cache - Part 1
2. Map-side join sample in Java using reference data (MapFile) from distributed cache - Part 2

Data used in this blog:
http://dev.mysql.com/doc/employee/en.index.html

Pig and Hive for joins:
Pig and Hive have join capabilities built in, and are optimized for them.  Programs with joins written in Java are more performant, but time-consuming to code, test and support - and in some companies, hand-coded joins are considered an anti-pattern.

Sample program

Thursday, September 12, 2013

Map File - construct, usage, code samples

This post covers the MapFile format, with links to Apache documentation, my notes on the topic, and my sample program demonstrating the functionality. Feel free to share any insights or constructive criticism. Cheers!!

1.0. What's in this blog?

1.  Introduction to map file
2.  Sample code to convert a text file to a map file
3.  Sample code to read a map file

2.0. What's a Map File?

2.0.1. Definition:
From Hadoop: The Definitive Guide:
A MapFile is a sorted SequenceFile with an index to permit lookups by key. MapFile can be thought of as a persistent form of java.util.Map (although it doesn’t implement this interface), which is able to grow beyond the size of a Map that is kept in memory.
Apache documentation:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/MapFile.html 

2.0.2. Datatypes: 
The keys must be instances of WritableComparable, and the values, Writable.

2.0.3. Creating map files: 
Create an instance of MapFile.Writer and call append(), to add key-values, in order.
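
A minimal sketch of converting a tab-delimited, already sorted text file to a map file; the paths are illustrative, and the older FileSystem-based MapFile.Writer constructor is used.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class TextToMapFileConverter {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        Path textFile = new Path("formatProject/data/departments_sorted.txt");
        Path mapFileDir = new Path("formatProject/data/departments_map");

        MapFile.Writer writer = new MapFile.Writer(conf, fs, mapFileDir.toString(),
                Text.class, Text.class);
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(textFile)));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                // Each line is "deptNo<tab>deptName"; append() requires keys in sorted order.
                String[] fields = line.split("\t", 2);
                writer.append(new Text(fields[0]), new Text(fields[1]));
            }
        } finally {
            reader.close();
            writer.close();
        }
    }
}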

2.0.4. Looking up data in map files: 
Create an instance of MapFile.Reader, and call get(key,value).
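
A minimal sketch of such a lookup against the departments map file; the path and the key are illustrative, based on the sample data shown in section 2.0.5 below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileLookup {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        MapFile.Reader reader = new MapFile.Reader(fs,
                "formatProject/data/departments_map", conf);
        try {
            Text value = new Text();
            // get() fills in the value for the given key and returns null if the key is absent.
            reader.get(new Text("d005"), value);
            System.out.println(value);  // prints: Development
        } finally {
            reader.close();
        }
    }
}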

2.0.5. Construct
The map file is actually a directory.  Within it, there is an "index" file and a "data" file.
The data file is a sequence file and has the keys and associated values.
The index file is smaller, and has key-value pairs where the key is an actual key from the data and the value is that key's byte offset in the data file.  The index holds only a fraction of the keys, as determined by MapFile.Writer.getIndexInterval().

2.0.5.1. Directory structure:
$ hadoop fs -ls formatProject/data/departments_map | awk '{print $8}'
formatProject/data/departments_map/data
formatProject/data/departments_map/index

2.0.5.2. Content of the file 'data':
$ hadoop fs -text formatProject/data/departments_map/data
d001  Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service

2.0.5.3. Content of the file 'index':
$ hadoop fs -text formatProject/data/departments_map/index
d001 121
d002 152
d003 181
d004 218
d005 250
d006 283
d007 323
d008 350
d009 380

2.0.6. Behind the scenes of a look up
The index file is read into memory, the index key less than or equal to the key being looked up is found by binary search, and the reader then seeks to that position in the data file and reads forward until it reaches the key being looked up, extracting and returning the associated value.  A null is returned if the key is not found.

If the index is too large to load into memory, the io.map.index.skip property can be set so that only a fraction of the index keys are loaded.

2.0.7. Usage
Fast lookups - in joins, among others.
Can also be used as a container for small files, with the filename as the key.

3.0. Creating a map file


4.0. Looking up a key in a map file

Covered already in the gist under section 3.
The plan is to use the map file in a map-side join in a subsequent blog.

5.0. Any thoughts/comments

Any constructive criticism and/or additions/insights are much appreciated.

Cheers!!