Tuesday, May 27, 2014

Text analytics: binning large data sets using MapReduce (MultipleOutputs)

Divide and conquer is the best way to tackle a big problem, and the same applies to a large dataset. Suppose we have a large dataset and we want to run analytics over some keywords scattered throughout it. In that case it is better to partition the data into bins based on those keywords and then run the analytics over each bin individually.
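Before moving to MapReduce, the binning idea itself can be sketched with plain in-memory Java: route each line to the bin of the first keyword it contains, or to a catch-all bin. This is only an illustration of the partitioning logic (the class and method names here are made up, not part of any Hadoop API); the actual distributed version follows below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BinningSketch {

    // Partition lines into bins keyed by the first matching keyword;
    // lines that match no keyword go to a "no_match" bin.
    static Map<String, List<String>> bin(List<String> lines, List<String> keywords) {
        Map<String, List<String>> bins = new HashMap<String, List<String>>();
        for (String line : lines) {
            String bucket = "no_match";
            for (String kw : keywords) {
                if (line.contains(kw)) {
                    bucket = kw;
                    break;
                }
            }
            if (!bins.containsKey(bucket)) {
                bins.put(bucket, new ArrayList<String>());
            }
            bins.get(bucket).add(line);
        }
        return bins;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a keyword1 b", "c keyword2 d", "plain line");
        Map<String, List<String>> bins = bin(lines, Arrays.asList("keyword1", "keyword2"));
        // one line matched keyword1, one line fell through to no_match
        System.out.println(bins.get("keyword1").size() + " " + bins.get("no_match").size());
    }
}
```

MapReduce does exactly this, except the lines are spread across many mapper tasks and each bin becomes a separate set of output files.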
To support binning, the Hadoop MapReduce API ships with the MultipleOutputs class. Its documentation summarizes it as follows:

The MultipleOutputs class simplifies writing output data to multiple outputs.

Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own OutputFormat, with its own key class and with its own value class.

Case two: to write data to different files provided by user

MultipleOutputs supports counters, which are disabled by default. The counters group is the MultipleOutputs class name, and the names of the counters are the same as the output names. They count the number of records written to each output name.
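As a sketch of case one, a driver could register two named outputs with different formats and key/value classes, and turn the counters on. This is a hypothetical configuration fragment (the output names "text" and "seq" are made up for illustration), not part of the binning job built below:

```java
// Hypothetical driver-side fragment: two named outputs, each with its
// own OutputFormat and its own key/value classes.
MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
        Text.class, NullWritable.class);
MultipleOutputs.addNamedOutput(job, "seq", SequenceFileOutputFormat.class,
        Text.class, IntWritable.class);
// Counters are disabled by default; enable them per job.
MultipleOutputs.setCountersEnabled(job, true);
```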

Suppose the provided dataset contains the keywords keyword1, keyword2 and keyword3, and we want to create three bins, one per keyword.
Let's configure a mapper that takes each line as input and looks for a match against the provided keywords. If it finds a match, the mapper emits the line as a value through the MultipleOutputs object created when the mapper's setup() method is called.

package com.rajkrrsingh.binning;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class BinMapper extends Mapper<Object, Text, Text, NullWritable> {

    private MultipleOutputs<Text, NullWritable> mos = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Create the MultipleOutputs wrapper once per map task
        mos = new MultipleOutputs<Text, NullWritable>(context);
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Route each line to a bin, chosen by the keyword it contains;
        // the last argument is the base output path of the bin's files
        if (line.contains("keyword1")) {
            mos.write("bins", value, NullWritable.get(), "keyword1");
        } else if (line.contains("keyword2")) {
            mos.write("bins", value, NullWritable.get(), "keyword2");
        } else if (line.contains("keyword3")) {
            mos.write("bins", value, NullWritable.get(), "keyword3");
        } else {
            mos.write("bins", value, NullWritable.get(), "no_match");
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Close MultipleOutputs, otherwise output files may be left incomplete
        mos.close();
    }
}

Now configure the driver class to set up the binning job:

package com.rajkrrsingh.binning;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class BinningJobDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (commandLineArgs.length != 2) {
            System.err.println("Usage : BinningJobDriver <input> <output>");
            System.exit(2);
        }

        Job job = new Job(conf, "Binning Job");
        job.setJarByClass(BinningJobDriver.class);
        job.setMapperClass(BinMapper.class);
        // Map-only job: no reducers, the mapper writes straight to the named output
        job.setNumReduceTasks(0);

        TextInputFormat.setInputPaths(job, new Path(commandLineArgs[0]));
        TextOutputFormat.setOutputPath(job, new Path(commandLineArgs[1]));

        // Register the "bins" named output used by the mapper and enable its counters
        MultipleOutputs.addNamedOutput(job, "bins", TextOutputFormat.class, Text.class, NullWritable.class);
        MultipleOutputs.setCountersEnabled(job, true);

        int rc = job.waitForCompletion(true) ? 0 : 2;
        System.exit(rc);
    }
}
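After the job completes, each bin shows up as its own set of files in the output directory, named from the base output path passed to mos.write() plus the task suffix. Assuming a single map task, the layout would look roughly like this (exact file names depend on the number of map tasks):

```
output/
    _SUCCESS
    keyword1-m-00000
    keyword2-m-00000
    keyword3-m-00000
    no_match-m-00000
    part-m-00000        <- default job output, empty in this map-only job
```

The empty part-m-* files appear because the default output format still runs even though nothing is written to it.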