Monday, September 22, 2014

Hadoop Configuration Parameters revisited

fs.default.name specifies the default filesystem URI

fs.checkpoint.dir used by the secondary namenode to store filesystem metadata during checkpoint operations

fs.trash.interval specifies the number of minutes a deleted file remains in .Trash before final deletion

topology.script.file.name absolute path of the script used to make the cluster rack-aware

hadoop.log.dir The directory in which log data should be written. This should be the same path as specified in HADOOP_LOG_DIR in the hadoop-env.sh file.

io.file.buffer.size (core-site.xml) general-purpose buffer size used to improve read/write and network I/O

dfs.block.size specifies the default block size for files stored on HDFS

dfs.name.dir specifies a comma-separated list of directories in which the namenode stores its metadata

dfs.data.dir list of directories where datanodes store HDFS block data

dfs.datanode.du.reserved disk space reserved for non-HDFS use

dfs.namenode.handler.count number of worker threads that process RPC requests from clients as well as other cluster daemons

dfs.datanode.failed.volumes.tolerated specifies the number of disks that are permitted to die before the entire datanode is failed

dfs.hosts file listing the hostnames of datanodes that are allowed to communicate with the namenode

dfs.hosts.exclude file listing datanodes to be decommissioned or blocked from communicating with the namenode

dfs.permissions.supergroup specifies the group whose members have privileges equivalent to the superuser

dfs.balance.bandwidthPerSec used by datanodes to limit the bandwidth consumed by block balancing

mapred.job.tracker specifies the jobtracker hostname and port

mapred.local.dir directories on the machine's local disk where MapReduce jobs store their intermediate output

mapred.child.java.opts specifies JVM heap properties such as initial heap size, max heap size, etc.

mapred.child.ulimit a limit on how much virtual memory a task process may consume before it is terminated

mapred.tasktracker.map.tasks.maximum maximum number of map tasks a worker node can run in parallel

mapred.tasktracker.reduce.tasks.maximum maximum number of reduce tasks a worker node can run in parallel

io.sort.mb specifies the size of the circular buffer that holds the intermediate key-value pairs emitted by the mapper

io.sort.factor specifies the number of files/streams to merge at once

mapred.compress.map.output true/false depending on whether you want to compress the mapper's emitted data

mapred.map.output.compression.codec specifies the codec to use to compress the intermediate data

mapred.output.compression.type RECORD- or BLOCK-level compression

mapred.job.tracker.handler.count size of the pool of worker threads the jobtracker maintains to handle RPC requests

mapred.jobtracker.taskScheduler specifies the Java class name of the scheduler plugin that should be used by the jobtracker

mapred.reduce.parallel.copies controls the number of copies each reduce task initiates in parallel during the shuffle phase

mapred.reduce.tasks controls the number of reduce tasks

tasktracker.http.threads number of threads available to handle HTTP requests concurrently

mapred.reduce.slowstart.completed.maps indicates when to begin allocating reducers, as a percentage of completed map tasks

mapred.acls.enabled access control lists must be globally enabled prior to use
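As a quick reference, here is how a handful of these parameters might look in core-site.xml and hdfs-site.xml. The hostnames and values below are illustrative examples, not recommendations:

```xml
<!-- core-site.xml (illustrative values) -->
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://namenode-host:8020</value>
  </property>
  <property>
    <name>fs.trash.interval</name>
    <!-- keep deleted files in .Trash for one day (1440 minutes) -->
    <value>1440</value>
  </property>
</configuration>

<!-- hdfs-site.xml (illustrative values) -->
<configuration>
  <property>
    <name>dfs.block.size</name>
    <!-- 128 MB -->
    <value>134217728</value>
  </property>
  <property>
    <name>dfs.datanode.failed.volumes.tolerated</name>
    <value>1</value>
  </property>
</configuration>
```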






Wednesday, May 28, 2014

Hadoop Mapreduce : Working with Counters to get the outlook of data

The Hadoop system records a set of metric counters for each job that it runs. For example, the number of input records mapped, the number of bytes it reads from or writes to HDFS, etc. To profile your applications, you may wish to record other values as well. For example, if the records sent into your mappers fall into two categories (call them "A" and "B"), you may wish to count the total number of A-records seen vs. the total number of B-records.

Let's run a small analysis on the following dataset to figure out how many records have an age less than 15 and how many have an age greater than 60.

"","Country","Region","Population","AgeGrp","FertilityRate","LifeExpectancy","ChildMortality","CellularSubscribers","LiteracyRate","GNI","PrimarySchoolEnrollmentMale","PrimarySchoolEnrollmentFemale"
"2","Albania","Europe",3162,21,1.75,74,16.7,96.39,NA,8820,NA,NA
"4","Andorra","Europe",78,15,NA,82,3.2,75.49,NA,NA,78.4,79.4
"8","Armenia","Europe",2969,62,1.74,71,16.4,103.57,99.6,6100,NA,NA
"10","Austria","Europe",8464,71,1.44,81,4,154.78,NA,42050,NA,NA
"11","Azerbaijan","Europe",9309,12,1.96,71,35.2,108.75,NA,8960,85.3,84.1
"16","Belarus","Europe",9405,9,1.47,71,5.2,111.88,NA,14460,NA,NA
"17","Belgium","Europe",11060,18,1.85,80,4.2,116.61,NA,39190,98.9,99.2
"22","Bosnia and Herzegovina","Europe",3834,71,1.26,76,6.7,84.52,97.9,9190,86.5,88.4
"26","Bulgaria","Europe",7278,9,1.51,74,12.1,140.68,NA,14160,99.3,99.7

Let's configure our Mapper class with the counter groups "Under_15" and "Over_60" and extract the AgeGrp field: if a record's value is greater than 60, increment the Over_60 counter by 1; if it is less than 15, increment the Under_15 counter by 1.

package com.rajkrrsingh.counter;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CounterExMapper extends Mapper<Object, Text, NullWritable, NullWritable> {

    public static final String UNDER_FIFTEEN_GROUP = "Under_15";
    public static final String OVER_SIXTY_GROUP = "Over_60";

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] tokens = value.toString().split(",");

        // tokens[4] is the AgeGrp column; guard against the header row
        // and malformed records so parseInt cannot throw
        if (tokens.length > 4 && tokens[4].matches("\\d+")) {
            int ageGrp = Integer.parseInt(tokens[4]);
            if (ageGrp > 60) {
                context.getCounter(OVER_SIXTY_GROUP, "Over_60").increment(1);
            } else if (ageGrp < 15) {
                context.getCounter(UNDER_FIFTEEN_GROUP, "Under_15").increment(1);
            } else {
                context.getCounter(UNDER_FIFTEEN_GROUP, "NA").increment(1);
            }
        }
    }
}

Here is our job driver to get an overview of the records with age greater than 60 and less than 15.

package com.rajkrrsingh.counter;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.Counter;

public class CounterDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (commandLineArgs.length != 2) {
            System.err.println("Usage : CounterDriver <input> <output>");
            System.exit(2);
        }

        Job job = new Job(conf, "Count the record with age <15 and >60");
        job.setJarByClass(CounterDriver.class);
        job.setMapperClass(CounterExMapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(commandLineArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(commandLineArgs[1]));

        int rc = job.waitForCompletion(true) ? 0 : 1;

        if (rc == 0) {
            for (Counter counter : job.getCounters().getGroup(CounterExMapper.OVER_SIXTY_GROUP)) {
                System.out.println(counter.getDisplayName() + "\t" + counter.getValue());
            }

            for (Counter counter : job.getCounters().getGroup(CounterExMapper.UNDER_FIFTEEN_GROUP)) {
                System.out.println(counter.getDisplayName() + "\t" + counter.getValue());
            }
        }

        // the job's real results are the counters, so the (empty)
        // output directory can be removed afterwards
        FileSystem.get(conf).delete(new Path(commandLineArgs[1]), true);

        System.exit(rc);
    }
}

Tuesday, May 27, 2014

Text analytics : binning the large data sets using MapReduce ( MultipleOutputs)

Divide and conquer is the best way to tackle a big problem, and the same applies to a large dataset. Suppose we have a large dataset and we want to run analytics over some keywords scattered through it; it is better to partition the data into bins based on the keywords and then run the analytics over each bin separately.
To support binning, the Hadoop MapReduce API ships with MultipleOutputs; here is the general documentation to get familiar with it:
The MultipleOutputs class simplifies writing output data to multiple outputs
The MultipleOutputs class simplifies writing output data to multiple outputs

Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own OutputFormat, with its own key class and with its own value class.

Case two: writing data to different files whose names are provided by the user

MultipleOutputs supports counters, by default they are disabled. The counters group is the MultipleOutputs class name. The names of the counters are the same as the output name. These count the number records written to each output name.

Suppose the provided dataset contains the keywords keyword1, keyword2, and keyword3, and we want to create three bins, one per keyword.
Let's configure our mapper, which takes each line as input and looks for a match with the provided keywords. If it finds a match in the line, the mapper emits the line as a value through the MultipleOutputs object created when the mapper's setup method is called.

package com.rajkrrsingh.binning;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class BinMapper extends Mapper<Object, Text, Text, NullWritable> {

    private MultipleOutputs<Text, NullWritable> mos = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs<Text, NullWritable>(context);
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.contains("keyword1")) {
            mos.write("bins", value, NullWritable.get(), "keyword1");
        } else if (line.contains("keyword2")) {
            mos.write("bins", value, NullWritable.get(), "keyword2");
        } else if (line.contains("keyword3")) {
            mos.write("bins", value, NullWritable.get(), "keyword3");
        } else {
            mos.write("bins", value, NullWritable.get(), "no_match");
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        mos.close();
    }
}
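Stripped of the Hadoop API, the bin-selection logic in the mapper above is a first-match-wins lookup. A small standalone sketch (the class and method names are mine, for illustration only):

```java
public class BinSelector {

    // returns the bin name for a line, mirroring the mapper's
    // if/else chain: the first keyword that matches wins
    static String binFor(String line) {
        if (line.contains("keyword1")) return "keyword1";
        if (line.contains("keyword2")) return "keyword2";
        if (line.contains("keyword3")) return "keyword3";
        return "no_match";
    }

    public static void main(String[] args) {
        System.out.println(binFor("a line with keyword2 in it")); // keyword2
        System.out.println(binFor("nothing interesting here"));   // no_match
    }
}
```

Note that a line containing several keywords lands only in the bin of the first keyword tested, which is also how the mapper behaves.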

Now configure your Driver class to set up your binning job:
package com.rajkrrsingh.binning;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class BinningJobDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (commandLineArgs.length != 2) {
            System.err.println("Usage : BinningJobDriver <input> <output>");
            System.exit(2);
        }

        Job job = new Job(conf, "Binning Job");
        job.setJarByClass(BinningJobDriver.class);
        job.setMapperClass(BinMapper.class);
        job.setNumReduceTasks(0);

        TextInputFormat.setInputPaths(job, new Path(commandLineArgs[0]));
        TextOutputFormat.setOutputPath(job, new Path(commandLineArgs[1]));

        MultipleOutputs.addNamedOutput(job, "bins", TextOutputFormat.class, Text.class, NullWritable.class);
        MultipleOutputs.setCountersEnabled(job, true);
        int rc = job.waitForCompletion(true) ? 0 : 2;
        System.exit(rc);
    }
}



MapReduce : Get Distinct Records from the Big data set.

In this tutorial we will do some analytics on data using MapReduce: we will find the distinct records in a dataset.
Let's walk through our sample data, which looks as follows. It is sample data from a telecom company containing a subscriber id along with a tower id; we will run a MapReduce job on the dataset to find the distinct records:
subId=00001111911128052627,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212218.4621702216543667E17
subId=00001111911128052639,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212219.6726312167218586E17
subId=00001111911128052615,towerid=11232w34532543456345623453456984756894758,bytes=122112212212212216.9431647633139046E17
subId=00001111911128052615,towerid=11232w34532543456345623453456984756894757,bytes=122112212212212214.7836041833447418E17
subId=00001111911128052639,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212219.0366596827240525E17
subId=00001111911128052619,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212218.0686280014540467E17
subId=00001111911128052658,towerid=11232w34532543456345623453456984756894759,bytes=122112212212212216.9860890496178944E17
subId=00001111911128052652,towerid=11232w34532543456345623453456984756894756,bytes=122112212212212218.303981333116041E17
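The field extraction the mapper performs can be tried out as plain Java against the record layout above. The class and method names here are mine, for illustration:

```java
public class TowerIdParser {

    // pulls the towerid value out of a record of the form
    // subId=...,towerid=...,bytes=...
    static String towerId(String line) {
        String[] tokens = line.split(",");
        String[] towerIdEx = tokens[1].split("=");
        return towerIdEx[1];
    }

    public static void main(String[] args) {
        String rec = "subId=00001111911128052627,"
                + "towerid=11232w34532543456345623453456984756894756,"
                + "bytes=122112212212212218.4621702216543667E17";
        System.out.println(towerId(rec)); // 11232w34532543456345623453456984756894756
    }
}
```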

Here is my mapper code, which takes a line of data as input and extracts and emits the towerid along with a NullWritable.

package com.rajkrrsingh.distinct;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistinctMapper extends Mapper<Object, Text, Text, NullWritable> {

    private Text outTowerId = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] tokens = value.toString().split(",");
        String[] towerIdEx = tokens[1].split("=");
        String towerId = towerIdEx[1];
        // only emit when a towerid was actually extracted; otherwise the
        // previous record's value would be written again
        if (towerId != null) {
            outTowerId.set(towerId);
            context.write(outTowerId, NullWritable.get());
        }
    }
}

The reducer is a plain identity reducer that writes the key along with NullWritable values.

package com.rajkrrsingh.distinct;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

Now configure your driver class as follows:

package com.rajkrrsingh.distinct;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DistinctDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (commandLineArgs.length != 2) {
            System.err.println("Usage : DistinctDriver <input> <output>");
            System.exit(2);
        }

        Job job = new Job(conf, "Distinct Towers");
        job.setJarByClass(DistinctDriver.class);
        job.setMapperClass(DistinctMapper.class);
        job.setReducerClass(DistinctReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(commandLineArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(commandLineArgs[1]));
        int rc = job.waitForCompletion(true) ? 0 : 1;
        System.exit(rc);
    }
}

Distributed Grep : Implementation of distributed grep on the big data set using MapReduce

Grep is a command-line utility for searching plain-text data sets for lines matching a regular expression. Grep was originally developed for the Unix operating system, but is available today for all Unix-like systems. Its name comes from the ed command g/re/p (globally search a regular expression and print), which has the same effect: doing a global search with the regular expression and printing all matching lines

In this tutorial I will demonstrate how to implement a distributed grep utility that works on files stored in HDFS using MapReduce. It is a very simple program that receives a regular expression as input, scans a bunch of input files, and writes every matching line to an output file.

lets create our Mapper as follows

package com.rajkrrsingh.disributedgrep;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistributedGrepMapper extends Mapper<Object, Text, NullWritable, Text> {

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String txt = value.toString();
        String regex = context.getConfiguration().get("regex");

        if (txt.matches(regex)) {
            context.write(NullWritable.get(), value);
        }
    }
}
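One subtlety worth noting: String.matches() requires the regex to match the entire line, unlike command-line grep, which matches substrings. So with the mapper above, a pattern like disk will not match a line merely containing that word; you need .*disk.* (or Matcher.find() for grep-like substring semantics). A quick demonstration:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexSemantics {
    public static void main(String[] args) {
        String line = "error: disk failure on node42";

        // String.matches() anchors the pattern to the whole string
        System.out.println(line.matches("disk"));     // false
        System.out.println(line.matches(".*disk.*")); // true

        // Matcher.find() gives grep-like substring matching
        Matcher m = Pattern.compile("disk").matcher(line);
        System.out.println(m.find());                 // true
    }
}
```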

Now we need to implement the driver class, which will run the MapReduce job on the cluster and produce the output:
package com.rajkrrsingh.disributedgrep;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DistributedGrepDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] commandLineArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (commandLineArgs.length != 3) {
            System.err.println("Usage : DistributedGrepDriver <regex> <input> <output>");
            System.exit(2);
        }

        conf.set("regex", commandLineArgs[0]);
        Job job = new Job(conf, "Distributed Grep Using MapReduce");
        job.setJarByClass(DistributedGrepDriver.class);
        job.setMapperClass(DistributedGrepMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(commandLineArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(commandLineArgs[2]));
        int rc = job.waitForCompletion(true) ? 0 : 1;
        System.exit(rc);
    }
}

Sunday, November 24, 2013

HBase compatibility with Hadoop 2.x

HBase releases downloaded from the mirror site are compiled against a certain Hadoop version. For example, hbase-0.94.7 and hbase-0.92.2 are compatible with Hadoop 1.0.x; they won't work with Hadoop 2.x.

If you have Hadoop 2.x installed on your system, you need to download the HBase source and compile it against Hadoop 2.x.

Let's make it work with Hadoop 2.x:

Download and install maven latest version

Install subversion
$sudo yum install subversion

Checkout HBase code
$svn co http://svn.apache.org/repos/asf/hbase/tags/0.94.7 hbase-0.94.7

Now go to the HBase directory and build an HBase tarball; use the -Dhadoop.profile option to compile against Hadoop 2.x
$MAVEN_OPTS="-Xmx2g"
$mvn clean install javadoc:aggregate site assembly:assembly -DskipTests -Prelease -Dhadoop.profile=2.0

Verify that a *.tar.gz file was produced in the target directory.

Tuesday, November 19, 2013

Apache Oozie Workflow : Configure and Running a MapReduce job

In this post I will demonstrate how to configure an Oozie workflow. Let's develop a simple MapReduce program in Java; if you find any difficulties doing so, download the code from my git location. Download

Please follow my earlier post to install and run the Oozie server, then create a job directory, say SimpleOozieMR, with the following structure:

---SimpleOozieMR
----workflow
-----workflow.xml
-----lib

In the lib folder, copy your Hadoop job jar and related jars.
Let's configure our workflow.xml and keep it in the workflow directory as shown.
<workflow-app name="WorkFlowPatentCitation" xmlns="uri:oozie:workflow:0.1">
    <start to="JavaMR-Job"/>
    <action name="JavaMR-Job">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.queue.name</name>
                    <value>default</value>
                </property>
            </configuration>
            <main-class>com.rjkrsinghhadoop.App</main-class>
            <arg>${citationIn}</arg>
            <arg>${citationOut}</arg>
        </java>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
    </kill>
    <end name="end"/>
</workflow-app>

Now configure your properties file PatentCitation.properties as follows
nameNode=hdfs://master:8020
jobTracker=master:8021
queueName=default
citationIn=citationIn-hdfs
citationOut=citationOut-hdfs
oozie.wf.application.path=${nameNode}/user/rks/oozieworkdir/SimpleOozieMR/workflow

Let's create a shell script that will run your first Oozie job:
#!/bin/sh
#
export OOZIE_URL="http://localhost:11000/oozie"
#copy your input data to the hdfs
hadoop fs -copyFromLocal /home/rks/CitationInput.txt citationIn-hdfs
#copy SimpleOozieMR to hdfs
hadoop fs -put /home/rks/SimpleOozieMR SimpleOozieMR
#running the oozie job
cd /usr/lib/oozie/bin/
oozie job -config /home/rks/SimpleOozieMR/PatentCitation.properties -run