How to Write a MapReduce Program in Java for Hadoop
To write a MapReduce program in Java for Hadoop, create a Mapper class to process input data and a Reducer class to aggregate results. Then configure and run the job with the Job class in your main method.
Syntax
A MapReduce program in Hadoop Java has three main parts:
- Mapper class: Processes input key-value pairs and outputs intermediate key-value pairs.
- Reducer class: Receives each intermediate key with its list of values, then outputs final key-value pairs.
- Driver (main) class: Configures the job, sets input/output paths, and starts the job.
java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyDriver {

    // Mapper: generic parameters are input key/value types, then output key/value types
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // map logic here
        }
    }

    // Reducer: input key/value types must match the mapper's output types
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // reduce logic here
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "job name");
        job.setJarByClass(MyDriver.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
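To see what these three parts actually do to the data, it can help to simulate the map, shuffle, and reduce phases without a cluster. The sketch below is plain Java, not Hadoop API code; the class and method names (MiniMapReduce, wordCount) are hypothetical and exist only for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MiniMapReduce {
    // Simulates map -> shuffle -> reduce for word counting, entirely in memory.
    public static Map<String, Integer> wordCount(List<String> lines) {
        // Map phase: each input line emits (word, 1) pairs.
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                if (!word.isEmpty()) {
                    mapped.add(Map.entry(word, 1));
                }
            }
        }

        // Shuffle phase: group the values by key
        // (the Hadoop framework does this between map and reduce).
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> e : mapped) {
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }

        // Reduce phase: sum the list of values for each key.
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) {
                sum += v;
            }
            counts.put(e.getKey(), sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = wordCount(List.of("hello world", "hello hadoop"));
        System.out.println(counts); // {hadoop=1, hello=2, world=1}
    }
}
```

In real Hadoop the map and reduce phases run in separate JVMs across the cluster and the shuffle moves data over the network, but the data flow is the same.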
Example
This example counts the occurrences of each word in a text file using MapReduce in Hadoop Java.
java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split each line on whitespace and emit (word, 1) for every token.
            String[] tokens = value.toString().split("\\s+");
            for (String token : tokens) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, one);
                }
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the counts for each word.
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Output
word1 3
word2 5
word3 2
...
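To get output like this on a running cluster, the example is typically compiled against the Hadoop classpath, packaged into a jar, and submitted with the hadoop command. The file and directory names below (wc.jar, /user/input, /user/output) are placeholders for illustration:

```shell
# Compile against the Hadoop classpath and package into a jar
javac -classpath "$(hadoop classpath)" WordCount.java
jar cf wc.jar WordCount*.class

# Submit the job; the output directory must not already exist
hadoop jar wc.jar WordCount /user/input /user/output

# Inspect the reducer's output file
hadoop fs -cat /user/output/part-r-00000
```

With more than one reducer there will be several part-r-* files, one per reducer.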
Common Pitfalls
- Not setting the correct output key/value classes in the job configuration causes runtime errors.
- Forgetting to set the jar class with job.setJarByClass() leads to class not found errors.
- Not handling empty or null input lines in the mapper can cause exceptions.
- Using wrong input/output paths or not deleting output directory before running causes job failure.
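On the last pitfall: Hadoop refuses to start a job whose output directory already exists, so driver programs often delete it first (on HDFS, via the Hadoop FileSystem API's exists() and delete(path, true)). The sketch below shows the same idea on the local filesystem with plain java.nio.file, so it runs without a cluster; the class name CleanOutputDir is hypothetical:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class CleanOutputDir {
    // Recursively deletes a directory if it exists (children before parents).
    public static void deleteIfExists(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(dir)) {
            walk.sorted(Comparator.reverseOrder()) // deepest paths first
                .forEach(p -> {
                    try {
                        Files.delete(p);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
        }
    }

    public static void main(String[] args) throws IOException {
        // Simulate a stale output directory left over from a previous run.
        Path out = Files.createTempDirectory("job-output");
        Files.writeString(out.resolve("part-r-00000"), "word1\t3\n");
        deleteIfExists(out);
        System.out.println(Files.exists(out)); // false
    }
}
```

In a real driver this cleanup would run before job.waitForCompletion(), guarded so it only ever touches the job's own output path.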
java
/* Wrong: missing setJarByClass causes a class not found error */
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
// Missing: job.setJarByClass(MyDriver.class);

/* Right: always set the jar class */
job.setJarByClass(MyDriver.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
Quick Reference
Remember these key steps when writing a Hadoop MapReduce Java program:
- Extend Mapper and override the map() method.
- Extend Reducer and override the reduce() method.
- Configure the job with input/output paths and key/value classes.
- Run the job with job.waitForCompletion(true).
Key Takeaways
- Create Mapper and Reducer classes by extending Hadoop's base classes and overriding the map() and reduce() methods.
- Configure the job with input/output paths and set the jar class to avoid runtime errors.
- Use a combiner class to cut shuffle traffic when the reduce operation is associative and commutative, as in word count.
- Handle input data carefully so that null or empty values do not cause exceptions.
- Delete the output directory before running the job; Hadoop fails the job if it already exists.