Wednesday, May 28, 2014

Hadoop MapReduce : Working with Counters to get an overview of the data

The Hadoop system records a set of metric counters for each job that it runs. For example, the number of input records mapped, the number of bytes it reads from or writes to HDFS, etc. To profile your applications, you may wish to record other values as well. For example, if the records sent into your mappers fall into two categories (call them "A" and "B"), you may wish to count the total number of A-records seen vs. the total number of B-records.

Let's run a small analysis on the following dataset to figure out how many records have an age group value of less than 15 and how many have a value greater than 60.

"","Country","Region","Population","AgeGrp","FertilityRate","LifeExpectancy","ChildMortality","CellularSubscribers","LiteracyRate","GNI","PrimarySchoolEnrollmentMale","PrimarySchoolEnrollmentFemale"
"2","Albania","Europe",3162,21,1.75,74,16.7,96.39,NA,8820,NA,NA
"4","Andorra","Europe",78,15,NA,82,3.2,75.49,NA,NA,78.4,79.4
"8","Armenia","Europe",2969,62,1.74,71,16.4,103.57,99.6,6100,NA,NA
"10","Austria","Europe",8464,71,1.44,81,4,154.78,NA,42050,NA,NA
"11","Azerbaijan","Europe",9309,12,1.96,71,35.2,108.75,NA,8960,85.3,84.1
"16","Belarus","Europe",9405,9,1.47,71,5.2,111.88,NA,14460,NA,NA
"17","Belgium","Europe",11060,18,1.85,80,4.2,116.61,NA,39190,98.9,99.2
"22","Bosnia and Herzegovina","Europe",3834,71,1.26,76,6.7,84.52,97.9,9190,86.5,88.4
"26","Bulgaria","Europe",7278,9,1.51,74,12.1,140.68,NA,14160,99.3,99.7

Let's configure our Mapper class with the counter groups "Under_15" and "Over_60" and extract the AgeGrp field: if the value is greater than 60, increment the Over_60 counter by 1; if it is less than 15, increment the Under_15 counter by 1.

package com.rajkrrsingh.counter;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CounterExMapper extends Mapper<Object, Text, NullWritable, NullWritable> {

    public static final String UNDER_FIFTEEN_GROUP = "Under_15";
    public static final String OVER_SIXTY_GROUP = "Over_60";

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] tokens = value.toString().split(",");

        // The fifth column (index 4) is the AgeGrp field; skip lines where it
        // is not a plain number (for example the CSV header line).
        if (tokens.length < 5 || !tokens[4].matches("\\d+")) {
            return;
        }

        int ageGrp = Integer.parseInt(tokens[4]);
        if (ageGrp > 60) {
            context.getCounter(OVER_SIXTY_GROUP, "Over_60").increment(1);
        } else if (ageGrp < 15) {
            context.getCounter(UNDER_FIFTEEN_GROUP, "Under_15").increment(1);
        } else {
            context.getCounter(UNDER_FIFTEEN_GROUP, "NA").increment(1);
        }
    }
}
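
As a side note, custom counters can also be declared as a Java enum instead of a (group, name) string pair, which avoids typos in the names; a small hedged sketch, not part of the original mapper:

// Hedged sketch: enum-based counters. Hadoop uses the enum class name as the
// counter group and each constant name as the counter name.
public enum AgeCounter { UNDER_15, OVER_60, OTHER }

// Inside map(), the increment would then look like:
// context.getCounter(AgeCounter.OVER_60).increment(1);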

Here is our job driver, which prints the counts of records with age group greater than 60 and less than 15.

package com.rajkrrsingh.counter;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class CounterDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (commandLineArgs.length != 2) {
            System.err.println("Usage : CounterDriver <input> <output>");
            System.exit(2);
        }

        Job job = new Job(conf, "Count the records with age < 15 and > 60");
        job.setJarByClass(CounterDriver.class);
        job.setMapperClass(CounterExMapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(commandLineArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(commandLineArgs[1]));

        int rc = job.waitForCompletion(true) ? 0 : 1;

        // On success, print the custom counters collected by the mapper.
        if (rc == 0) {
            for (Counter counter : job.getCounters().getGroup(CounterExMapper.OVER_SIXTY_GROUP)) {
                System.out.println(counter.getDisplayName() + "\t" + counter.getValue());
            }

            for (Counter counter : job.getCounters().getGroup(CounterExMapper.UNDER_FIFTEEN_GROUP)) {
                System.out.println(counter.getDisplayName() + "\t" + counter.getValue());
            }
        }

        // The job produces no useful file output, so clean up the output directory.
        FileSystem.get(conf).delete(new Path(commandLineArgs[1]), true);

        System.exit(rc);
    }
}
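
The built-in metrics mentioned at the top of this post can be read from the same Counters object. A minimal sketch, assuming Hadoop 2.x where the built-in task counters live in org.apache.hadoop.mapreduce.TaskCounter; it would go right after waitForCompletion() in the driver above:

// Hedged sketch: read one of Hadoop's built-in counters (assumes Hadoop 2.x).
long mapInputRecords = job.getCounters()
        .findCounter(org.apache.hadoop.mapreduce.TaskCounter.MAP_INPUT_RECORDS)
        .getValue();
System.out.println("Records read by the mappers: " + mapInputRecords);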

Tuesday, May 27, 2014

Text analytics : binning large data sets using MapReduce (MultipleOutputs)

Divide and conquer is the best way to overcome a big problem, and the same applies to a large dataset. Suppose we have a large dataset and we want to run analytics over some keywords scattered across it; it is better to partition the data into bins based on those keywords and then run the analytics over each individual bin.
To support binning of a dataset, the Hadoop MapReduce API ships with MultipleOutputs. Here is the general documentation to get familiar with it:
The MultipleOutputs class simplifies writing output data to multiple outputs

Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own OutputFormat, with its own key class and with its own value class.

Case two: to write data to different files provided by user

MultipleOutputs supports counters, by default they are disabled. The counters group is the MultipleOutputs class name. The names of the counters are the same as the output name. These count the number records written to each output name.

Let's suppose we have the keywords keyword1, keyword2 and keyword3 in the provided dataset and we want to create three bins based on them.
Let's configure our mapper, which takes each line as input and looks for a match with the provided keywords; if it finds a match in the line, the mapper emits the line as a value using the MultipleOutputs object created when the mapper's setup() method is called.

package com.rajkrrsingh.binning;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class BinMapper extends Mapper<Object, Text, Text, NullWritable> {

    private MultipleOutputs<Text, NullWritable> mos = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs<Text, NullWritable>(context);
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.contains("keyword1")) {
            mos.write("bins", value, NullWritable.get(), "keyword1");
        } else if (line.contains("keyword2")) {
            mos.write("bins", value, NullWritable.get(), "keyword2");
        } else if (line.contains("keyword3")) {
            mos.write("bins", value, NullWritable.get(), "keyword3");
        } else {
            mos.write("bins", value, NullWritable.get(), "no_match");
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        mos.close();
    }
}

Now configure your driver class to set up the binning job:
package com.rajkrrsingh.binning;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class BinningJobDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (commandLineArgs.length != 2) {
            System.err.println("Usage : BinningJobDriver <input> <output>");
            System.exit(2);
        }

        Job job = new Job(conf, "Binning Job");
        job.setJarByClass(BinningJobDriver.class);
        job.setMapperClass(BinMapper.class);
        job.setNumReduceTasks(0);

        TextInputFormat.setInputPaths(job, new Path(commandLineArgs[0]));
        TextOutputFormat.setOutputPath(job, new Path(commandLineArgs[1]));

        MultipleOutputs.addNamedOutput(job, "bins", TextOutputFormat.class, Text.class, NullWritable.class);
        MultipleOutputs.setCountersEnabled(job, true);
        int rc = job.waitForCompletion(true) ? 0 : 2;
        System.exit(rc);
    }
}
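
Because counters are enabled for the named output, you can also print how many records landed in each bin once the job finishes. A hedged sketch, relying on the documented behaviour quoted above that the counter group is the MultipleOutputs class name; it would sit right after waitForCompletion() in the driver:

// Hedged sketch: per-bin record counts collected by MultipleOutputs.
for (org.apache.hadoop.mapreduce.Counter counter :
        job.getCounters().getGroup(MultipleOutputs.class.getName())) {
    System.out.println(counter.getDisplayName() + "\t" + counter.getValue());
}

Each bin typically ends up in its own set of files under the output directory, named after the base output path passed to mos.write() (for example keyword1-m-00000), while the job's default part files stay empty in this map-only setup.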



MapReduce : Get distinct records from a big data set

In this tutorial we will do some analytics on data using MapReduce: we will find the distinct records in a dataset.
Let's walk through our sample data, which looks as follows. It is sample data from a telecom company containing a subscriber id along with a tower id; we will run a MapReduce job on the dataset to find the distinct records:
subId=00001111911128052627,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212218.4621702216543667E17
subId=00001111911128052639,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212219.6726312167218586E17
subId=00001111911128052615,towerid=11232w34532543456345623453456984756894758,bytes=122112212212212216.9431647633139046E17
subId=00001111911128052615,towerid=11232w34532543456345623453456984756894757,bytes=122112212212212214.7836041833447418E17
subId=00001111911128052639,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212219.0366596827240525E17
subId=00001111911128052619,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212218.0686280014540467E17
subId=00001111911128052658,towerid=11232w34532543456345623453456984756894759,bytes=122112212212212216.9860890496178944E17
subId=00001111911128052652,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212218.303981333116041E17

Here is my mapper code, which takes a line of data as input, extracts the towerid, and emits it along with a NullWritable value.

package com.rajkrrsingh.distinct;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistinctMapper extends Mapper<Object, Text, Text, NullWritable> {

    private Text outTowerId = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        // Each line looks like: subId=...,towerid=...,bytes=...
        String[] tokens = value.toString().split(",");
        String[] towerIdEx = tokens[1].split("=");
        String towerId = towerIdEx[1];
        if (towerId != null) {
            outTowerId.set(towerId);
            context.write(outTowerId, NullWritable.get());
        }
    }
}

The reducer is a plain identity-style reducer that writes each key along with a NullWritable value. Because all identical tower IDs are grouped into a single reduce call, each tower ID appears exactly once in the output.

package com.rajkrrsingh.distinct;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

Now configure your driver class as follows:

package com.rajkrrsingh.distinct;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DistinctDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (commandLineArgs.length != 2) {
            System.err.println("Usage : DistinctDriver <input> <output>");
            System.exit(2);
        }

        Job job = new Job(conf, "Distinct Towers");
        job.setJarByClass(DistinctDriver.class);
        job.setMapperClass(DistinctMapper.class);
        job.setReducerClass(DistinctReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(commandLineArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(commandLineArgs[1]));
        int rc = job.waitForCompletion(true) ? 0 : 1;
        System.exit(rc);
    }
}
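
Because the map output key is already the final key and the value is a NullWritable, the same reducer class can double as a combiner and collapse duplicate tower IDs on the map side before the shuffle. A hedged one-line addition to the driver (not in the original post), placed with the other job.set* calls:

// Optional: reuse the reducer as a combiner to shrink the shuffle.
job.setCombinerClass(DistinctReducer.class);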

Distributed Grep : Implementation of distributed grep on the big data set using MapReduce

Grep is a command-line utility for searching plain-text data sets for lines matching a regular expression. Grep was originally developed for the Unix operating system, but is available today for all Unix-like systems. Its name comes from the ed command g/re/p (globally search a regular expression and print), which has the same effect: doing a global search with the regular expression and printing all matching lines.

In this tutorial I will demonstrate how to implement a distributed grep utility that works on files stored in HDFS using MapReduce. It is a very simple program that takes a regular expression as input, scans a bunch of input files, and writes every matching line to an output file.

Let's create our mapper as follows:

package com.rajkrrsingh.disributedgrep;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistributedGrepMapper extends Mapper<Object, Text, NullWritable, Text> {

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String txt = value.toString();
        String regex = context.getConfiguration().get("regex");

        // Note: String.matches() requires the regex to match the whole line,
        // so pass a pattern like .*foo.* to emulate grep's substring behaviour.
        if (txt.matches(regex)) {
            context.write(NullWritable.get(), value);
        }
    }
}
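
Compiling the regular expression for every input line is wasteful, and String.matches() only accepts whole-line matches. A hedged variant of the mapper (my own refinement, not from the original post) precompiles the pattern once in setup() and uses Matcher.find() for grep-like substring matching; it is a drop-in replacement for DistributedGrepMapper if you point setMapperClass at it.

package com.rajkrrsingh.disributedgrep;

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch only: same job configuration, but the regex is compiled once per task.
public class PrecompiledGrepMapper extends Mapper<Object, Text, NullWritable, Text> {

    private Pattern pattern;

    @Override
    protected void setup(Context context) {
        // "regex" is the same configuration key set by the driver below.
        pattern = Pattern.compile(context.getConfiguration().get("regex"));
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // find() matches a substring anywhere in the line, like grep does.
        if (pattern.matcher(value.toString()).find()) {
            context.write(NullWritable.get(), value);
        }
    }
}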

Now we need to implement the driver class, which will run the MapReduce job on the cluster and produce the output.
package com.rajkrrsingh.disributedgrep;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DistributedGrepDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (commandLineArgs.length != 3) {
            System.err.println("Usage : DistributedGrepDriver <regex> <input> <output>");
            System.exit(2);
        }

        conf.set("regex", commandLineArgs[0]);
        Job job = new Job(conf, "Distributed Grep Using MapReduce");
        job.setJarByClass(DistributedGrepDriver.class);
        job.setMapperClass(DistributedGrepMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(commandLineArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(commandLineArgs[2]));
        int rc = job.waitForCompletion(true) ? 0 : 1;
        System.exit(rc);
    }
}

Sunday, November 24, 2013

HBase compatibility with Hadoop 2.x

HBase releases downloaded from the mirror site are compiled against a certain Hadoop version. For example, hbase-0.94.7 and hbase-0.92.2 are compatible with Hadoop 1.0.x; they won't work with Hadoop 2.x.

If you have a Hadoop 2.x version installed on your system, you need to download the HBase source and compile it against Hadoop 2.x.

Let's make it work with Hadoop 2.x.

Download and install the latest version of Maven

Install subversion
$sudo yum install subversion

Checkout HBase code
$svn co http://svn.apache.org/repos/asf/hbase/tags/0.94.7 hbase-0.94.7

Now go to the HBase directory and build the HBase tarball; use the -Dhadoop.profile option to compile against Hadoop 2.x
$MAVEN_OPTS="-Xmx2g"
$mvn clean install javadoc:aggregate site assembly:assembly -DskipTests -Prelease -Dhadoop.profile=2.0

Verify that a *.tar.gz file was produced in the target directory

Tuesday, November 19, 2013

Apache Oozie Workflow : Configuring and Running a MapReduce Job

In this post I will demonstrate how to configure an Oozie workflow. Let's develop a simple MapReduce program in Java; if you find any difficulty in doing it, download the code from my git location: Download

Please follow my earlier post to install and run the Oozie server, then create a job directory, say SimpleOozieMR, with the following directory structure:

---SimpleOozieMR
----workflow
-----workflow.xml
-----lib

In the lib folder, copy your Hadoop job JAR and related JARs.
Let's configure our workflow.xml and keep it in the workflow directory as shown:
<workflow-app name="WorkFlowPatentCitation" xmlns="uri:oozie:workflow:0.1">
    <start to="JavaMR-Job"/>
    <action name="JavaMR-Job">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${citationOut}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.queue.name</name>
                    <value>default</value>
                </property>
            </configuration>
            <main-class>com.rjkrsinghhadoop.App</main-class>
            <arg>${citationIn}</arg>
            <arg>${citationOut}</arg>
        </java>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
    </kill>
    <end name="end"/>
</workflow-app>
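
The <main-class> above is the entry point of the MapReduce program. The actual code lives in the linked repository; as a purely hypothetical sketch it might look roughly like this, mainly to show how the two <arg> values arrive as args[0] and args[1] of main():

package com.rjkrsinghhadoop;

// Hypothetical sketch of the workflow's main class; the real implementation is
// in the linked repository. The mapper/reducer classes are placeholders.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class App {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Patent Citation");
        job.setJarByClass(App.class);
        // Substitute the mapper/reducer classes from the repository here, e.g.:
        // job.setMapperClass(PatentCitationMapper.class);
        // job.setReducerClass(PatentCitationReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // ${citationIn}
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // ${citationOut}
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}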

Now configure your properties file PatentCitation.properties as follows
nameNode=hdfs://master:8020
jobTracker=master:8021
queueName=default
citationIn=citationIn-hdfs
citationOut=citationOut-hdfs
oozie.wf.application.path=${nameNode}/user/rks/oozieworkdir/SimpleOozieMR/workflow

Let's create a shell script that will run your first Oozie job:
#!/bin/sh
#
export OOZIE_URL="http://localhost:11000/oozie"
#copy your input data to the hdfs
hadoop fs -copyFromLocal /home/rks/CitationInput.txt citationIn-hdfs
#copy SimpleOozieMR to hdfs
hadoop fs -put /home/rks/SimpleOozieMR SimpleOozieMR
#running the oozie job
cd /usr/lib/oozie/bin/
oozie job -config /home/rks/SimpleOozieMR/PatentCitation.properties -run

Apache Oozie : Getting Started

Apache Oozie Introduction:

--- Started by Yahoo, currently managed as an Apache open source project.
--- Oozie is a workflow scheduler system to manage Apache Hadoop jobs:
-- MapReduce
-- Pig, Hive
-- Streaming
-- Standard applications
--- Oozie is a scalable, reliable and extensible system.

--- The user specifies the action flow as a Directed Acyclic Graph (DAG).
--- DAG: a collection of vertices and directed edges configured so that one may not traverse the same vertex twice.
--- Each node signifies either a job or a script; execution and branching can be parameterized by time, decision, data availability, file size etc.
--- The client specifies the process flow in a workflow XML file.
--- Oozie is an extra level of abstraction between the user and Hadoop.
--- Oozie has its own server application, which talks to its own database (Apache Derby (default), MySQL, Oracle etc.).
--- The user must load the required components into HDFS prior to execution: input data, workflow XML, JARs, resource files.





Interaction with Oozie through the command line:
$oozie job --oozie http://localhost:11000/oozie -config /user/rks/spjob/job.properties -run 


Web Interface





Installation
- Download Oozie from the Apache Oozie official site
- Download ExtJS
- Configure core-site.xml
- Restart the NameNode
- Copy the Hadoop JARs into a directory
- Extract ExtJS into Oozie's webapp
- Run oozie-setup.sh
- Relocate the newly generated WAR file
- Configure oozie-site.xml
- Initialize the database
- Start the Oozie server

It's done. In the next post we will run a MapReduce job configured using Oozie. Stay tuned.