Tuesday, May 27, 2014

MapReduce: Getting Distinct Records from a Big Data Set

In this tutorial we will do some data analytics using MapReduce: we will find the distinct records in a data set.
Let's walk through our sample data, which looks as follows. It is sample data from a telecom company, containing a subscriber id along with a tower id. We will run a MapReduce job on the data set to find the distinct tower ids:
subId=00001111911128052627,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212218.4621702216543667E17
subId=00001111911128052639,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212219.6726312167218586E17
subId=00001111911128052615,towerid=11232w34532543456345623453456984756894758,bytes=122112212212212216.9431647633139046E17
subId=00001111911128052615,towerid=11232w34532543456345623453456984756894757,bytes=122112212212212214.7836041833447418E17
subId=00001111911128052639,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212219.0366596827240525E17
subId=00001111911128052619,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212218.0686280014540467E17
subId=00001111911128052658,towerid=11232w34532543456345623453456984756894759,bytes=122112212212212216.9860890496178944E17
subId=00001111911128052652,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212218.303981333116041E17

Here is my mapper code, which takes a line of data as input, extracts the towerid, and emits it as the key along with a NullWritable value.

package com.rajkrrsingh.distinct;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper that extracts the tower id from each input record and emits it as
 * the output key with a {@link NullWritable} value.
 *
 * <p>Each input line is expected to be a comma-separated list of
 * {@code name=value} pairs whose second pair carries the tower id, e.g.
 * {@code subId=...,towerid=...,bytes=...}. Lines that do not have that shape
 * are skipped and counted instead of failing the whole task.
 */
public class DistinctMapper extends Mapper<Object, Text, Text, NullWritable> {

    /** Counter group used to report records that could not be parsed. */
    private static final String COUNTER_GROUP = "DistinctMapper";

    /** Counter name used to report records that could not be parsed. */
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    /** Reused output key, to avoid allocating a new Text per record. */
    private final Text outTowerId = new Text();

    /**
     * Parses one input line and emits its tower id.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of input text
     * @param context task context used to emit output and update counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] tokens = value.toString().split(",");
        if (tokens.length < 2) {
            // Blank or truncated line: there is no second field to read.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        String[] towerIdEx = tokens[1].split("=");
        if (towerIdEx.length < 2) {
            // Second field has no "name=value" shape (e.g. "towerid=" or "towerid").
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        // Only emit after a successful parse, so a bad record can never
        // re-emit the key left over from the previous record.
        outTowerId.set(towerIdEx[1]);
        context.write(outTowerId, NullWritable.get());
    }

}

The reducer is a plain identity-style reducer: for each key it writes the key once, along with a NullWritable value.

package com.rajkrrsingh.distinct;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer that writes every key it receives exactly once, paired with a
 * {@link NullWritable} value. The grouped values are never inspected.
 */
public class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    /**
     * Emits {@code key} once for the whole group of values.
     *
     * @param key     the grouped key
     * @param values  the values grouped under {@code key} (ignored)
     * @param context task context used to emit the output pair
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        final NullWritable emptyValue = NullWritable.get();
        context.write(key, emptyValue);
    }

}

Now configure your driver class as follows:

package com.rajkrrsingh.distinct;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Command-line driver that configures and submits the "Distinct Towers" job.
 *
 * <p>Usage: {@code DistinctDriver <input> <output>}. Generic Hadoop options
 * are consumed by {@link GenericOptionsParser} before the two positional
 * arguments are read.
 */
public class DistinctDriver {

    /**
     * Configures the job, runs it, and exits with the job's status.
     *
     * <p>Exits with status 2 when the positional arguments are not exactly
     * an input path and an output path, 0 when the job succeeds, and 1 when
     * it fails.
     *
     * @param args command-line arguments, including any generic Hadoop options
     * @throws IOException            if the job cannot be configured or submitted
     * @throws InterruptedException   if the wait for job completion is interrupted
     * @throws ClassNotFoundException if a class configured on the job cannot be found
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (commandLineArgs.length != 2) {
            System.err.println("Usage : DistinctDriver <input> <output>");
            System.exit(2);
        }

        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, "Distinct Towers");
        job.setJarByClass(DistinctDriver.class);
        job.setMapperClass(DistinctMapper.class);
        // The reducer emits each key once and has matching input/output types,
        // so it can also run as a combiner to drop duplicates before the shuffle.
        job.setCombinerClass(DistinctReducer.class);
        job.setReducerClass(DistinctReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(commandLineArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(commandLineArgs[1]));

        int rc = job.waitForCompletion(true) ? 0 : 1;
        System.exit(rc);
    }

}