Tuesday, December 31, 2013

Log parsing in Hadoop - Part 5: Cascading

1.0. What's in this post?

This post is part of a series focused on log parsing in Hadoop with Java MapReduce, Pig, Hive, Python and more. This installment covers a simple log parser written in Cascading, and includes a sample program, sample data, and the commands to run it.

Documentation on Cascading:
http://www.cascading.org/documentation/

Other related blogs:
Log parsing in Hadoop - Part 1: Java
Log parsing in Hadoop - Part 2: Hive
Log parsing in Hadoop - Part 3: Pig
Log parsing in Hadoop - Part 4: Python
Log parsing in Hadoop - Part 5: Cascading
Log parsing in Hadoop - Part 6: Morphlines


2.0. Sample program


2.0.1. What the program does
a) Reads syslog-generated logs stored in HDFS
b) Parses them with a regular expression
c) Writes successfully parsed records to files in HDFS
d) Writes records that don't match the pattern to a trap location in HDFS
e) Writes a report to HDFS with the count of log events per distinct process

2.0.2. Sample log data
Sample log data
---------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
Data structure
--------------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
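The regex used by the parser in section 2.0.4 maps each log line into the six fields above. Below is a minimal standalone check of that pattern against the first sample line, using plain java.util.regex; the RegexCheck class is a hypothetical helper for illustration only, not part of the Cascading job.

package logparser;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: verifies the syslog regex from section 2.0.4 against one sample line
public class RegexCheck {
  public static void main(String[] args) {
    String sysLogRegex = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)";
    String sample = "May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal";
    Matcher m = Pattern.compile(sysLogRegex).matcher(sample);
    if (m.matches()) {
      // Groups 1..6 map to month, day, time, node, process, message
      System.out.println("Month   = " + m.group(1));
      System.out.println("Day     = " + m.group(2));
      System.out.println("Time    = " + m.group(3));
      System.out.println("Node    = " + m.group(4));
      System.out.println("Process = " + m.group(5));
      System.out.println("Log msg = " + m.group(6));
    } else {
      System.out.println("No match - in the Cascading job such a record would be routed to the trap tap");
    }
  }
}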


2.0.3. Directory structure of log files
Directory structure of logs
---------------------------
cascadingSamples
  data
    syslogs
      <<Node-Name>>
        <<Year>>
          <<Month>>
            messages
Specifically...
LogParser/data/syslogs/cdh-dev01/2013/04/messages
LogParser/data/syslogs/cdh-dev01/2013/05/messages
LogParser/data/syslogs/cdh-dn01/2013/05/messages
LogParser/data/syslogs/cdh-dn02/2013/04/messages
LogParser/data/syslogs/cdh-dn02/2013/05/messages
LogParser/data/syslogs/cdh-dn03/2013/04/messages
LogParser/data/syslogs/cdh-dn03/2013/05/messages
LogParser/data/syslogs/cdh-jt01/2013/04/messages
LogParser/data/syslogs/cdh-jt01/2013/05/messages
LogParser/data/syslogs/cdh-nn01/2013/05/messages
LogParser/data/syslogs/cdh-vms/2013/05/messages


2.0.4. Log parser in Cascading
package logparser;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFunction;
import cascading.operation.regex.RegexParser;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.MultiSourceTap;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.GlobHfs;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class LogParser {

  public static void main(String[] args) {

    // {{
    // INSTANTIATE/INITIALIZE

    // Set the current job jar
    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, LogParser.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // Arguments
    String inputPath = args[ 0 ];
    String outputPath = args[ 1 ];
    String errorPath = args[ 2 ];
    String reportPath = args[ 3 ];

    // Scheme for sinks
    TextLine sinkTextLineScheme = new TextLine();

    // Define what the input file looks like; "offset" is bytes from the beginning of the file
    TextLine sourceTextLineScheme = new TextLine( new Fields( "offset", "line" ) );

    // The inputPath is a file glob, so GlobHfs is used below
    GlobHfs sourceFilesGlob = new GlobHfs( sourceTextLineScheme, inputPath );

    // Create a SOURCE tap to read the resources matched by the HDFS glob
    Tap sourceSyslogTap = new MultiSourceTap( sourceFilesGlob );

    // Create a SINK tap to write parsed logs to HDFS
    sinkTextLineScheme.setNumSinkParts( 2 );
    Tap sinkParsedLogTap = new Hfs( sinkTextLineScheme, outputPath, SinkMode.REPLACE );

    // Create a SINK tap to write reports to HDFS
    sinkTextLineScheme.setNumSinkParts( 1 );
    Tap sinkReportTap = new Hfs( sinkTextLineScheme, reportPath, SinkMode.REPLACE );

    // Create a TRAP tap to write records that failed parsing
    sinkTextLineScheme.setNumSinkParts( 1 );
    Tap sinkTrapTap = new Hfs( sinkTextLineScheme, errorPath, SinkMode.REPLACE );
    // }}

    // {{
    // EXTRACT/PARSE

    // Declare the field names
    Fields sysLogFields = new Fields( "month", "day", "time", "node", "process", "message" );

    // Define the regex pattern to parse the log file with
    String sysLogRegex = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)";

    // Declare the regex groups to keep; each group is assigned the corresponding
    // field name from 'sysLogFields', above
    int[] keepParsedGroups = { 1, 2, 3, 4, 5, 6 };

    // Create the parser
    RegexParser parser = new RegexParser( sysLogFields, sysLogRegex, keepParsedGroups );

    // Import & parse pipe
    // Create the import pipe element, named 'import', with the input argument named "line"
    // Replace the incoming tuple with the parser results:
    // "line" -> parser -> month, day, time, node, process, message
    Pipe importAndParsePipe = new Each( "import", new Fields( "line" ), parser, Fields.RESULTS );
    // }}

    // {{
    // TRANSFORM

    // Transform the process field - remove the process ID if found, for better reporting on logs
    // Also, convert to lowercase
    // E.g. change "ntpd[1302]" to "ntpd"
    String expression = "process.substring(0, (process.indexOf('[') == -1 ? process.length()-1 : process.indexOf('[') )).toLowerCase()";
    Fields fieldProcess = new Fields( "process" );
    ExpressionFunction expFunc = new ExpressionFunction( fieldProcess, expression, String.class );

    // Pipe for transformed data
    Pipe scrubbedDataPipe = new Each( importAndParsePipe, fieldProcess, expFunc, Fields.REPLACE );
    // }}

    // {{
    // REPORT/ANALYZE

    // Capture counts by process, as a report
    // --------------------------------------
    // process count()
    // E.g. sshd 4
    Pipe reportPipe = new Pipe( "reportByProcess", scrubbedDataPipe );
    Fields keyFields = new Fields( "process" );
    Fields groupByFields = new Fields( "process" );
    Fields countField = new Fields( "countOfEvents" );

    reportPipe = new GroupBy( reportPipe, groupByFields );
    reportPipe = new Every( reportPipe, keyFields, new Count( countField ), Fields.ALL );
    reportPipe = new GroupBy( reportPipe, keyFields, countField, false ); // secondary sort on the count field; pass true for descending order
    // End of reports
    // }}

    // {{
    // EXECUTE

    // Connect the taps, pipes, etc., into a flow & execute
    FlowDef flowDef = FlowDef.flowDef()
        .setName( "Log parser" )
        .addSource( importAndParsePipe, sourceSyslogTap )
        .addTailSink( scrubbedDataPipe, sinkParsedLogTap )
        .addTailSink( reportPipe, sinkReportTap )
        .addTrap( importAndParsePipe, sinkTrapTap );

    Flow flow = flowConnector.connect( flowDef );
    flow.complete();
    // }}
  }
}
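
The process-scrubbing expression in the TRANSFORM block is plain Java, so its behaviour is easy to check outside Cascading. Below is a minimal standalone sketch of the same logic; the ScrubCheck class and its scrub method are hypothetical helpers for illustration, not part of the job.

package logparser;

// Hypothetical helper: mirrors the ExpressionFunction expression used in LogParser
public class ScrubCheck {

  // Same logic as:
  // process.substring(0, (process.indexOf('[') == -1 ? process.length()-1 : process.indexOf('['))).toLowerCase()
  static String scrub(String process) {
    int bracket = process.indexOf('[');
    int end = (bracket == -1) ? process.length() - 1 : bracket; // trims the trailing ':' or the '[pid]:' suffix
    return process.substring(0, end).toLowerCase();
  }

  public static void main(String[] args) {
    System.out.println(scrub("ntpd_initres[1705]:")); // ntpd_initres
    System.out.println(scrub("init:"));               // init
    System.out.println(scrub("NetworkManager:"));     // networkmanager
  }
}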


2.0.5. build.gradle file
Gradle documentation is available at http://www.gradle.org
Here is the build.gradle:
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'eclipse'

archivesBaseName = 'logparser-cascading'

repositories {
  mavenLocal()
  mavenCentral()
  mavenRepo name: 'conjars', url: 'http://conjars.org/repo/'
}

ext.cascadingVersion = '2.5.1'

dependencies {
  compile( group: 'cascading', name: 'cascading-core', version: cascadingVersion )
  compile( group: 'cascading', name: 'cascading-hadoop', version: cascadingVersion )
}

jar {
  description = "Assembles a Hadoop ready jar file"
  doFirst {
    into( 'lib' ) {
      from configurations.compile
    }
  }
  manifest {
    attributes( "Main-Class": "logparser/LogParser" )
  }
}

2.0.6. Data and code download 

Code and data download
=======================
Github:
https://github.com/airawat/LogParser/tree/master/cascadingSamples
Directories relevant to this post:
===================================
$ tree -if --noreport LogParser
LogParser
LogParser/cascadingSamples
LogParser/cascadingSamples/jars
LogParser/cascadingSamples/jars/logparser-cascading.jar
LogParser/cascadingSamples/src
LogParser/cascadingSamples/src/main
LogParser/cascadingSamples/src/main/java
LogParser/cascadingSamples/src/main/java/logparser
LogParser/cascadingSamples/src/main/java/logparser/LogParser.java
LogParser/data
LogParser/data/syslogs
LogParser/data/syslogs/cdh-dev01
LogParser/data/syslogs/cdh-dev01/2013
LogParser/data/syslogs/cdh-dev01/2013/04
LogParser/data/syslogs/cdh-dev01/2013/04/messages
LogParser/data/syslogs/cdh-dev01/2013/05
LogParser/data/syslogs/cdh-dev01/2013/05/messages
LogParser/data/syslogs/cdh-dn01
LogParser/data/syslogs/cdh-dn01/2013
LogParser/data/syslogs/cdh-dn01/2013/05
LogParser/data/syslogs/cdh-dn01/2013/05/messages
LogParser/data/syslogs/cdh-dn02
LogParser/data/syslogs/cdh-dn02/2013
LogParser/data/syslogs/cdh-dn02/2013/04
LogParser/data/syslogs/cdh-dn02/2013/04/messages
LogParser/data/syslogs/cdh-dn02/2013/05
LogParser/data/syslogs/cdh-dn02/2013/05/messages
LogParser/data/syslogs/cdh-dn03
LogParser/data/syslogs/cdh-dn03/2013
LogParser/data/syslogs/cdh-dn03/2013/04
LogParser/data/syslogs/cdh-dn03/2013/04/messages
LogParser/data/syslogs/cdh-dn03/2013/05
LogParser/data/syslogs/cdh-dn03/2013/05/messages
LogParser/data/syslogs/cdh-jt01
LogParser/data/syslogs/cdh-jt01/2013
LogParser/data/syslogs/cdh-jt01/2013/04
LogParser/data/syslogs/cdh-jt01/2013/04/messages
LogParser/data/syslogs/cdh-jt01/2013/05
LogParser/data/syslogs/cdh-jt01/2013/05/messages
LogParser/data/syslogs/cdh-nn01
LogParser/data/syslogs/cdh-nn01/2013
LogParser/data/syslogs/cdh-nn01/2013/05
LogParser/data/syslogs/cdh-nn01/2013/05/messages
LogParser/data/syslogs/cdh-vms
LogParser/data/syslogs/cdh-vms/2013
LogParser/data/syslogs/cdh-vms/2013/05
LogParser/data/syslogs/cdh-vms/2013/05/messages



2.0.7. Commands (load data, execute program)
Gradle
=================
$ gradle clean jar
This builds a Hadoop-ready jar with the dependencies bundled under lib/.
Load data to HDFS
==================
$ hadoop fs -mkdir cascadingSamples
$ cd ~
$ hadoop fs -put cascadingSamples/data cascadingSamples
$ hadoop fs -put cascadingSamples/jars cascadingSamples
Run program
============
The four arguments, in order, are: the input glob, the output path for parsed logs, the trap path for records that fail parsing, and the report path.
$ hadoop jar cascadingSamples/jars/logparser-cascading.jar "cascadingSamples/data/syslogs/*/*/*/" "cascadingSamples/Output-LogParser" "cascadingSamples/Output-LogParser/traps" "cascadingSamples/Output-LogParser/reports"


2.0.8. Results 
Output files
=============
$ hadoop fs -ls -R cascadingSamples |grep 'part*' | awk '{print $8}'
cascadingSamples/Output-LogParser/part-00000
cascadingSamples/Output-LogParser/part-00001
cascadingSamples/Output-LogParser/part-00002
cascadingSamples/Output-LogParser/part-00003
cascadingSamples/Output-LogParser/part-00004
cascadingSamples/Output-LogParser/part-00005
cascadingSamples/Output-LogParser/part-00006
cascadingSamples/Output-LogParser/part-00007
cascadingSamples/Output-LogParser/part-00008
cascadingSamples/Output-LogParser/part-00009
cascadingSamples/Output-LogParser/part-00010
cascadingSamples/Output-LogParser/reports/part-00000
cascadingSamples/Output-LogParser/traps/part-m-00001-00006
cascadingSamples/Output-LogParser/traps/part-m-00002-00006
Parsed log
===========
$ hadoop fs -cat cascadingSamples/Output-LogParser/part-00003 | less
May 3 11:51:50 cdh-dn02 init tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:52:26 cdh-dn02 kernel nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:52:51 cdh-dn02 kernel hrtimer: interrupt took 6222750 ns
May 3 11:52:53 cdh-dn02 ntpd_initres host name not found: 0.rhel.pool.ntp.org
Report
===========
$ hadoop fs -cat cascadingSamples/Output-LogParser/reports/part-00000 | less
console-kit-daemon 7
gnome-session 11
init 166
kernel 810
login 2
networkmanager 7
nm-dispatcher.action 4
ntpd_initres 4133
polkit-agent-helper-1 8
pulseaudio 18
spice-vdagent 15
sshd 6
sudo 8
udevd 6
Records that failed parsing
============================
$ hadoop fs -cat cascadingSamples/Output-LogParser/traps/part*
May 7 00:40:53 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
May 7 00:42:13 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
May 7 00:43:38 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
May 7 00:45:01 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
May 7 00:47:18 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
May 7 00:47:41 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
Record count of input [syslogs]
===============================
$ hadoop fs -cat cascadingSamples/data/syslogs/*/*/*/messages | wc -l
5207
Record count of output [parsed logs + records that failed parsing]
===================================================================
$ echo $((`hadoop fs -cat cascadingSamples/Output-LogParser/part* | wc -l`+`hadoop fs -cat cascadingSamples/Output-LogParser/traps/part* | wc -l`))
5207





