Monday, November 11, 2013

MapReduce: Writing output to multiple files using MultipleOutputs

Multiple Outputs
FileOutputFormat and its subclasses generate a set of files in the output directory. There is one file per reducer, and files are named by the partition number: part-00000, part-00001, etc. There is sometimes a need to have more control over the naming of the files or to produce multiple files per reducer. MapReduce comes with two libraries to help you do this: MultipleOutputFormat and MultipleOutputs.

MultipleOutputFormat
MultipleOutputFormat allows you to write data to multiple files whose names are derived from the output keys and values. MultipleOutputFormat is an abstract class with two concrete subclasses, MultipleTextOutputFormat and MultipleSequenceFileOutputFormat, which are the multiple file equivalents of TextOutputFormat and SequenceFileOutputFormat. MultipleOutputFormat provides a few protected methods that subclasses can override to control the output filename. In Example 7-5, we create a subclass of MultipleTextOutputFormat to override the generateFileNameForKeyValue() method to return the station ID, which we extracted from the record value.
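
As a concrete illustration of that approach, here is a minimal sketch of such a subclass against the old org.apache.hadoop.mapred API. The station-ID extraction is a simplifying assumption on my part (the book's Example 7-5 uses a weather-record parser):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

// Sketch: derive the output file name from the station ID in the record value.
public class StationMultipleTextOutputFormat
        extends MultipleTextOutputFormat<LongWritable, Text> {

    @Override
    protected String generateFileNameForKeyValue(LongWritable key, Text value, String name) {
        // Assumption: the station ID is the first whitespace-delimited field.
        return value.toString().split("\\s+")[0];
    }
}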

-- from Hadoop: The Definitive Guide

In this example I will demonstrate how to write output data to multiple files. Note that while the excerpt above describes the old-API MultipleOutputFormat, the code below uses its new-API counterpart, MultipleOutputs. You can find the complete code for this example in the accompanying Git repository.

We have sample customer data with the attributes customer no, customer name, region, and company. We will write customers from the same region to the same output file, along with their other attributes.
customer no,customer name,region,company
.................................................................
9899821411,"Burke, Honorato U.",Alaska,Eu Incorporated
9899821422,"Bell, Emily R.",Arizona,Ut Eros Non Company
9899821379,"Hewitt, Chelsea Y.",PA,Egestas Aliquam Fringilla LLP
9899821387,"Baldwin, Merrill H.",VT,Rhoncus Proin Corp.
9899821392,"Bradshaw, Uma H.",OH,Nam Nulla Associates
9899821453,"Pollard, Boris G.",Hawaii,Consequat Corp.
9899821379,"Avila, Velma D.",OR,Sodales LLC

Create your Mapper Class as follows:
package com.rajkrrsingh.mr.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MultipleOutputMapper extends Mapper<LongWritable, Text, Text, Text> {

    private Text txtKey = new Text();
    private Text txtValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (value.toString().length() > 0) {
            // Split on commas that are not inside double quotes, so quoted
            // names like "Burke, Honorato U." stay in one field:
            // [0]=customer no, [1]=name, [2]=region, [3]=company
            String[] custArray = value.toString().split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)");
            // Key on the region so that all customers from the same region
            // reach the same reducer and end up in the same output file.
            txtKey.set(custArray[2]);
            txtValue.set(custArray[0] + "\t" + custArray[1] + "\t" + custArray[3]);
            context.write(txtKey, txtValue);
        }
    }

}
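
For the first sample record above, the mapper emits the key Alaska with a value containing 9899821411, "Burke, Honorato U." and Eu Incorporated joined by tabs.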

Set up your reducer class as follows:
package com.rajkrrsingh.mr.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MultiOutputReducer extends Reducer<Text, Text, Text, Text> {

    private MultipleOutputs<Text, Text> multipleOutputs;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // Use the region key as the base output path, so each region's
            // records land in their own file.
            multipleOutputs.write(key, value, key.toString());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Close MultipleOutputs to flush all the per-region files.
        multipleOutputs.close();
    }

}
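
The third argument to write() is the base output path, to which MultipleOutputs appends the usual part suffix: records for the Alaska key, for example, end up in a file named like Alaska-r-00002 inside the job's output directory.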

Set up your driver class as follows:
package com.rajkrrsingh.mr.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class App extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new App(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Two params are required to execute App <input-path> <output-path>");
            return -1;
        }

        Job job = new Job(getConf());
        job.setJobName("MultipleOutputs example");
        job.setJarByClass(App.class);
        // LazyOutputFormat delays creating the default part-r-nnnnn files until
        // the first record is written to them, so reducers that emit everything
        // through MultipleOutputs leave no empty part files behind.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MultipleOutputMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setReducerClass(MultiOutputReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(5);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}
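
With five reduce tasks, the default hash partitioner spreads the region keys across partitions, so two regions may share a partition number; each region still gets its own file, because the base output path is the region itself.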

Create the job jar using the Maven assembly plugin and run it on your Hadoop cluster:
$bin/hadoop jar /home/rks/MultipleOutputExample/target/MultipleOutputExample.jar \
com.rajkrrsingh.mr.hadoop.App /user/rks/input /user/rks/output
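
If the job succeeds, the output directory holds one file per region instead of the plain part files:

$bin/hadoop fs -ls /user/rks/output

Expect names along the lines of Alaska-r-00000, PA-r-00001, and OH-r-00004 (the exact partition suffixes depend on how the regions hash across the five reducers), plus the _SUCCESS marker.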