Tuesday, July 2, 2013

Log parsing in Hadoop - Part 1: Java

This post includes sample scripts, data, and commands to parse a log file in Java using regex.

Other related blogs:
Log parsing in Hadoop - Part 1: Java - using regex
Log parsing in Hadoop - Part 2: Hive - using regex
Log parsing in Hadoop - Part 3: Pig - using regex
Log parsing in Hadoop - Part 4: Python - using regex


This gist includes a mapper, reducer, and driver in Java that parse log files using
regex. The combiner code is the same as the reducer; a one-line driver change to enable it is shown after the driver code below.
Use case: Count the number of occurrences of each process that got logged, inception to date.
Includes:
---------
Sample data and scripts for download: 01-ScriptAndDataDownload
Sample data and structure: 02-SampleDataAndStructure
Mapper: 03-LogEventCountMapper.java
Reducer: 04-LogEventCountReducer.java
Driver: 05-LogEventCount.java
Commands: 06-Commands
Sample output: 07-Output
Code files and data download
-----------------------------
The code files and sample data are available for download at:
https://groups.google.com/forum/?hl=en&fromgroups#!topic/hadooped/0KSGUAREb84
The following is the directory structure, with hierarchy shown by indentation:

LogParserSample
    logs
        airawat-syslog
            2013
                04
                    messages
                05
                    messages
    JavaProgram
        LogAnalysisEventLevel/src/Airawat/Oozie/Samples
            LogEventCount.java
            LogEventCountMapper.java
            LogEventCountReducer.java
    Application
        lib
            LogEventCount.jar
Sample data
------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
Structure
----------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
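
To see how each capture group of the regex used in the mapper (shown further below) maps to these fields, here is a minimal standalone sketch; the class name RegexDemo is just for illustration:

//RegexDemo.java - standalone check of the log-entry pattern
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexDemo {
    public static void main(String[] args) {
        // Same pattern as in LogEventCountMapper below
        String strLogEntryPattern = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)";
        String line = "May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal";

        Matcher m = Pattern.compile(strLogEntryPattern).matcher(line);
        if (m.matches()) {
            System.out.println("Month   = " + m.group(1)); // May
            System.out.println("Day     = " + m.group(2)); // 3
            System.out.println("Time    = " + m.group(3)); // 11:52:54
            System.out.println("Node    = " + m.group(4)); // cdh-dn03
            System.out.println("Process = " + m.group(5)); // init:
            System.out.println("Log msg = " + m.group(6)); // tty (/dev/tty6) main process (1208) killed by TERM signal
        }
    }
}

Note that group 5 is a lazy match up to the first colon, so the process name (including any bracketed PID) becomes the key, and everything after it falls into the log message.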
Commands to load data and run the MapReduce program
-------------------------------------------------
a) Load the data
$ hadoop fs -mkdir LogParserSample
$ hadoop fs -mkdir LogParserSample/logs
$ hadoop fs -put LogParserSample/logs/* LogParserSample/logs/
$ hadoop fs -ls -R LogParserSample/ | awk '{print $8}'
LogParserSample/logs
LogParserSample/logs/airawat-syslog
LogParserSample/logs/airawat-syslog/2013
LogParserSample/logs/airawat-syslog/2013/04
LogParserSample/logs/airawat-syslog/2013/04/messages
LogParserSample/logs/airawat-syslog/2013/05
LogParserSample/logs/airawat-syslog/2013/05/messages
b) Run the MapReduce program
$ hadoop jar LogParserSample/Application/lib/LogEventCount.jar Airawat.Oozie.Samples.LogEventCount LogParserSample/logs/*/*/*/ LogParserSample/myCLIOutput
c) Results:
$ hadoop fs -ls -R LogParserSample | awk '{print $8}'
LogParserSample/logs
LogParserSample/logs/airawat-syslog
LogParserSample/logs/airawat-syslog/2013
LogParserSample/logs/airawat-syslog/2013/04
LogParserSample/logs/airawat-syslog/2013/04/messages
LogParserSample/logs/airawat-syslog/2013/05
LogParserSample/logs/airawat-syslog/2013/05/messages
LogParserSample/myCLIOutput
LogParserSample/myCLIOutput/_SUCCESS
LogParserSample/myCLIOutput/_logs
LogParserSample/myCLIOutput/_logs/history
LogParserSample/myCLIOutput/_logs/history/cdh-jt01_1371140867475_job_201306131127_0177_conf.xml
LogParserSample/myCLIOutput/_logs/history/job_201306131127_0177_1371573190526_airawat_Syslog+Event+Rollup
LogParserSample/myCLIOutput/part-r-00000
d) View output
$ hadoop fs -cat LogParserSample/myCLIOutput/part-r-00000
//Mapper: LogEventCountMapper.java
package Airawat.Oozie.Samples;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * The following is the code for the mapper class:
 */
public class LogEventCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /*
     * The map method runs once for each line of text in the input file.
     * The method receives a key of type LongWritable, a value of type
     * Text, and a Context object.
     */
    String strLogEntryPattern = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)";
    public static final int NUM_FIELDS = 6;
    Text strEvent = new Text("");

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String strLogEntryLine = value.toString();
        Pattern objPtrn = Pattern.compile(strLogEntryPattern);
        Matcher objPatternMatcher = objPtrn.matcher(strLogEntryLine);

        if (!objPatternMatcher.matches() || NUM_FIELDS != objPatternMatcher.groupCount()) {
            System.err.println("Bad log entry (or problem with RE?):");
            System.err.println(strLogEntryLine);
            return;
        }

        /*
        System.out.println("Month_Name: " + objPatternMatcher.group(1));
        System.out.println("Day: " + objPatternMatcher.group(2));
        System.out.println("Time: " + objPatternMatcher.group(3));
        System.out.println("Node: " + objPatternMatcher.group(4));
        System.out.println("Process: " + objPatternMatcher.group(5));
        System.out.println("LogMessage: " + objPatternMatcher.group(6));
        */

        strEvent.set(objPatternMatcher.group(5));

        /*
         * Call the write method on the Context object to emit a key
         * and a value from the map method.
         */
        context.write(strEvent, new IntWritable(1));
    }
}
//Reducer: LogEventCountReducer.java
package Airawat.Oozie.Samples;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * The following is the code for the reducer class:
 */
public class LogEventCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int intEventCount = 0;

        /*
         * For each value in the set of values passed by the mapper:
         */
        for (IntWritable value : values) {
            /*
             * Add the value to the syslog event counter for this key.
             */
            intEventCount += value.get();
        }

        /*
         * Call the write method on the Context object to emit a key
         * and a value from the reduce method.
         */
        context.write(key, new IntWritable(intEventCount));
    }
}
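
For a quick sanity check of the mapper and reducer together without a cluster, an MRUnit test can be used. The following is a minimal sketch, assuming MRUnit 1.x and JUnit are on the classpath; the test class name is mine, and the input lines and expected keys come from the sample data above:

//LogEventCountTest.java - minimal MRUnit sketch (assumes MRUnit 1.x + JUnit)
package Airawat.Oozie.Samples;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.junit.Test;

public class LogEventCountTest {

    @Test
    public void testMapReduce() throws Exception {
        MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> driver =
                MapReduceDriver.newMapReduceDriver(
                        new LogEventCountMapper(), new LogEventCountReducer());

        // Two lines from the sample data above
        driver.withInput(new LongWritable(0), new Text(
                "May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal"));
        driver.withInput(new LongWritable(1), new Text(
                "May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1"));

        // Expected rollup, in key-sorted order
        driver.withOutput(new Text("init:"), new IntWritable(1));
        driver.withOutput(new Text("kernel:"), new IntWritable(1));

        driver.runTest();
    }
}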
//Driver: LogEventCount.java
package Airawat.Oozie.Samples;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * The following is the code for the driver class:
 */
public class LogEventCount {

    public static void main(String[] args) throws Exception {
        /*
         * The expected command-line arguments are the paths containing
         * input and output data. Terminate the job if the number of
         * command-line arguments is not exactly 2.
         */
        if (args.length != 2) {
            System.out.printf("Usage: LogEventCount <input dir> <output dir>\n");
            System.exit(-1);
        }

        /*
         * Instantiate a Job object for your job's configuration.
         */
        Job job = new Job();

        /*
         * Specify the jar file that contains your driver, mapper, and reducer.
         * Hadoop will transfer this jar file to nodes in your cluster running
         * mapper and reducer tasks.
         */
        job.setJarByClass(LogEventCount.class);

        /*
         * Specify an easily decipherable name for the job.
         * This job name will appear in reports and logs.
         */
        job.setJobName("Syslog Event Rollup");

        /*
         * Specify the paths to the input and output data based on the
         * command-line arguments.
         */
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        /*
         * Specify the mapper and reducer classes.
         */
        job.setMapperClass(LogEventCountMapper.class);
        job.setReducerClass(LogEventCountReducer.class);

        /*
         * For the log count application, the mapper's output keys and
         * values have the same data types as the reducer's output keys
         * and values: Text and IntWritable.
         *
         * When they are not the same data types, you must call the
         * setMapOutputKeyClass and setMapOutputValueClass methods.
         */

        /*
         * Specify the job's output key and value classes.
         */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setNumReduceTasks(1);

        /*
         * Start the MapReduce job and wait for it to finish.
         * If it finishes successfully, exit with 0; if not, exit with 1.
         */
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
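
As noted at the top of the post, the combiner code is the same as the reducer. Because the reduce logic is just an associative, commutative sum, enabling it takes only one extra line in the driver (anywhere before waitForCompletion):

//Driver snippet: use the reducer as a combiner to pre-aggregate counts map-side
job.setCombinerClass(LogEventCountReducer.class);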
Output of the MapReduce program
----------------------------
init: 23
kernel: 58
ntpd_initres[1705]: 792
sudo: 2
udevd[361]: 1
