Tuesday, May 27, 2014

Distributed Grep: Implementing distributed grep on a big data set using MapReduce

Grep is a command-line utility for searching plain-text data sets for lines matching a regular expression. Grep was originally developed for the Unix operating system, but is available today for all Unix-like systems. Its name comes from the ed command g/re/p (globally search a regular expression and print), which has the same effect: doing a global search with the regular expression and printing all matching lines.

In this tutorial I will demonstrate how to implement a distributed grep utility that works on files stored in HDFS using MapReduce. It is a very simple program that receives a regular expression as input, scans a set of input files, and writes every line that matches the expression to the output directory.
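
Before moving to the cluster, it can help to see the same filtering on a single machine. The following is only a minimal sketch and not part of the job itself: LocalGrep and its grep method are hypothetical names introduced for illustration, and the sketch assumes grep-style semantics where a line is kept if the expression matches anywhere in it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper (not part of the tutorial's job): a single-process
// reference for the per-line filtering that the MapReduce job performs.
final class LocalGrep {

    // Returns the lines that contain at least one match of the regex,
    // preserving the input order.
    static List<String> grep(String regex, List<String> lines) {
        Pattern pattern = Pattern.compile(regex);
        List<String> matched = new ArrayList<>();
        for (String line : lines) {
            if (pattern.matcher(line).find()) {
                matched.add(line);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "INFO  job started",
                "ERROR disk full",
                "INFO  job finished");
        // Prints the second and third lines.
        for (String line : grep("ERROR|finished", lines)) {
            System.out.println(line);
        }
    }
}
```

Running a regex against a small local sample like this is a cheap way to check it before submitting a job over a large input.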

Let's create our Mapper as follows:

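package com.rajkrrsingh.disributedgrep;

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistributedGrepMapper extends Mapper<Object, Text, NullWritable, Text> {

    // Compiled once per task instead of once per input line.
    private Pattern pattern;

    @Override
    protected void setup(Context context) {
        // The driver stores the expression under the "regex" configuration key.
        pattern = Pattern.compile(context.getConfiguration().get("regex"));
    }

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // find() reports a match anywhere in the line, as grep does;
        // matches() would require the whole line to match.
        if (pattern.matcher(value.toString()).find()) {
            context.write(NullWritable.get(), value);
        }
    }
}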
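
Which java.util.regex call the mapper uses decides what "matching line" means: matches() accepts a line only if the entire line matches the expression, while find() accepts it if the expression matches anywhere in the line, which is how grep behaves. A small sketch to make the difference concrete (MatchSemantics and its method names are hypothetical, introduced only for this illustration):

```java
import java.util.regex.Pattern;

// Hypothetical demo class: contrasts whole-line matching with
// grep-style "match anywhere in the line" matching.
final class MatchSemantics {

    // True only if the entire line matches the regex
    // (the same result String.matches(regex) gives).
    static boolean wholeLine(String regex, String line) {
        return Pattern.compile(regex).matcher(line).matches();
    }

    // True if any part of the line matches the regex.
    static boolean anywhere(String regex, String line) {
        return Pattern.compile(regex).matcher(line).find();
    }

    public static void main(String[] args) {
        String line = "disk error on node 7";
        System.out.println(wholeLine("error", line));     // false
        System.out.println(anywhere("error", line));      // true
        System.out.println(wholeLine(".*error.*", line)); // true
    }
}
```

With whole-line matching, a user has to wrap the expression in .* on both sides to get the usual grep result.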

Now we need to implement the driver class, which configures the MapReduce job, runs it on the cluster, and produces the output.

package com.rajkrrsingh.disributedgrep;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DistributedGrepDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Strip the generic Hadoop options and keep only the job-specific arguments.
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (commandLineArgs.length != 3) {
            System.err.println("Usage: DistributedGrepDriver <regex> <input> <output>");
            System.exit(2);
        }

        // Pass the regex to the mappers through the job configuration.
        // This must happen before the Job is created, because the Job copies the configuration.
        conf.set("regex", commandLineArgs[0]);
        Job job = Job.getInstance(conf, "Distributed Grep Using MapReduce");
        job.setJarByClass(DistributedGrepDriver.class);
        job.setMapperClass(DistributedGrepMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        // Map-only job: the mappers write the matching lines directly to the output.
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(commandLineArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(commandLineArgs[2]));
        int rc = job.waitForCompletion(true) ? 0 : 1;
        System.exit(rc);
    }
}
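
Because the regex only reaches the mappers through the job configuration, a malformed expression would surface as failed map tasks on the cluster. One possible refinement, which the driver above does not do, is to check the expression on the client before submitting the job. A minimal sketch, assuming a hypothetical RegexCheck helper that the driver could call right after parsing its arguments:

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical helper (not part of the tutorial's code): lets a driver
// reject a malformed regex before any cluster resources are used.
final class RegexCheck {

    // Returns true if the string compiles as a java.util.regex pattern.
    static boolean isValid(String regex) {
        if (regex == null) {
            return false;
        }
        try {
            Pattern.compile(regex);
            return true;
        } catch (PatternSyntaxException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("ERROR|WARN")); // true
        System.out.println(isValid("(unclosed"));  // false
    }
}
```

In the driver, a failed check could print the usage message and exit with a non-zero code, the same way the argument-count check does.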