
How to Write a MapReduce Program in Java for Hadoop

To write a MapReduce program in Java for Hadoop, create a Mapper class to process input data and a Reducer class to aggregate results. Then configure and run the job using the Job class in your main method.
📝 Syntax

A Hadoop MapReduce program in Java has three main parts:

  • Mapper class: Processes input key-value pairs and outputs intermediate key-value pairs.
  • Reducer class: Receives intermediate keys and list of values, then outputs final key-value pairs.
  • Driver (main) class: Configures the job, sets input/output paths, and starts the job.
java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // map logic here
    }
}

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // reduce logic here
    }
}

public class MyDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "job name");
        job.setJarByClass(MyDriver.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
💻 Example

This example counts the occurrences of each word in a text file using Hadoop MapReduce in Java.

java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = value.toString().split("\\s+");
            for (String token : tokens) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, one);
                }
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Output
word1	3
word2	5
word3	2
...
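Before packaging the job for a cluster, it can help to verify the counting logic as plain Java. The sketch below (illustrative class and method names, no Hadoop dependencies) mimics the map, shuffle, and reduce phases with an in-memory TreeMap:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LocalWordCount {
    // Simulates the map phase: emit a (word, 1) pair for each token in a line.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String token : line.split("\\s+")) {
            if (!token.isEmpty()) {
                out.add(Map.entry(token, 1));
            }
        }
        return out;
    }

    // Simulates shuffle + reduce: group pairs by key and sum the values.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : new String[] {"hello world", "hello hadoop"}) {
            pairs.addAll(map(line));
        }
        // Prints: hadoop 1, hello 2, world 1 (TreeMap keeps keys sorted)
        reduce(pairs).forEach((word, count) -> System.out.println(word + "\t" + count));
    }
}
```

This mirrors the TokenizerMapper and IntSumReducer logic above, with the framework's shuffle replaced by a simple grouping step.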
⚠️ Common Pitfalls

  • Not setting the correct output key/value classes in the job configuration causes runtime type-mismatch errors.
  • Forgetting to set the jar with job.setJarByClass() leads to ClassNotFoundException on the cluster.
  • Not handling empty or null input lines in the mapper can cause exceptions.
  • Using the wrong input/output paths, or not deleting the output directory before running, causes the job to fail.
java
/* Wrong: Missing setJarByClass causes class not found error */
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
// Missing job.setJarByClass(MyDriver.class);

/* Right: Always set jar class */
job.setJarByClass(MyDriver.class);
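For the last pitfall: FileOutputFormat refuses to write into an existing directory, so the driver can remove a stale output path before submitting. A possible fragment for the driver's main method (assumes the driver's existing conf and args, plus an import of org.apache.hadoop.fs.FileSystem):

```java
// In the driver, before job.waitForCompletion():
FileSystem fs = FileSystem.get(conf);
Path output = new Path(args[1]);
if (fs.exists(output)) {
    fs.delete(output, true); // true = delete recursively
}
```

Deleting programmatically is convenient during development; in production you may prefer to fail fast rather than silently overwrite results.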
📊 Quick Reference

Remember these key steps when writing a Hadoop MapReduce Java program:

  • Extend Mapper and override the map() method.
  • Extend Reducer and override the reduce() method.
  • Configure the job with input/output paths and key/value classes.
  • Run the job with job.waitForCompletion(true).
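Once the code is written, a typical compile-and-run sequence from the command line looks like the following (jar name and HDFS paths are illustrative):

```shell
# Compile against the Hadoop classpath and package a jar
javac -classpath "$(hadoop classpath)" -d classes WordCount.java
jar -cvf wordcount.jar -C classes .

# Run the job; the output directory must not already exist
hadoop jar wordcount.jar WordCount /user/me/input /user/me/output

# Inspect the results
hdfs dfs -cat /user/me/output/part-r-00000
```

The reducer's output lands in part-r-NNNNN files, one per reduce task.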
✅ Key Takeaways

  • Create Mapper and Reducer classes by extending the Hadoop base classes and overriding the map and reduce methods.
  • Configure the job with input/output paths and set the jar class to avoid runtime errors.
  • Use a Combiner class to optimize performance when possible.
  • Handle input data carefully so that null or empty values do not cause exceptions.
  • Delete the output directory before running the job to prevent job failure.