Hadoop · How-To · Beginner · 4 min read

How to Write MapReduce Program in Python for Hadoop

To write a MapReduce program in Python for Hadoop, create two Python scripts: one for the mapper and one for the reducer. Use Hadoop Streaming to run these scripts by passing them as arguments to the hadoop jar command, which allows Hadoop to execute Python code for MapReduce tasks.
📝

Syntax

A MapReduce program in Python for Hadoop consists of two scripts: a mapper and a reducer. The mapper reads input line by line from standard input and emits key-value pairs. Hadoop sorts these pairs by key, and the reducer reads them, aggregates the values for each key, and emits the final result.

Use the Hadoop Streaming command to run the Python scripts:

  • hadoop jar /path/to/hadoop-streaming.jar - runs the streaming utility
  • -files mapper.py,reducer.py - ships the scripts to every cluster node
  • -input <input_path> - specifies input data location
  • -output <output_path> - specifies output data location
  • -mapper <mapper_script.py> - specifies the mapper script
  • -reducer <reducer_script.py> - specifies the reducer script
bash
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -files mapper.py,reducer.py \
  -input /input/path \
  -output /output/path \
  -mapper mapper.py \
  -reducer reducer.py
💻

Example

This example shows a simple word count MapReduce program in Python. The mapper outputs each word with count 1. The reducer sums counts for each word.

python
#!/usr/bin/env python3
# mapper.py - emit each word with count 1
import sys

for line in sys.stdin:
    for word in line.strip().split():
        print(f"{word}\t1")

#!/usr/bin/env python3
# reducer.py - sum counts per word (input arrives sorted by key)
import sys

current_word = None
current_count = 0
for line in sys.stdin:
    word, count = line.strip().split('\t')
    count = int(count)
    if current_word == word:
        current_count += count
    else:
        if current_word is not None:
            print(f"{current_word}\t{current_count}")
        current_word = word
        current_count = count
# emit the last key; the None check also guards against empty input
if current_word is not None:
    print(f"{current_word}\t{current_count}")
Output
apple	3
banana	2
orange	1
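The reducer above only works because Hadoop sorts mapper output by key before the reduce phase, so identical keys arrive consecutively. A standalone sketch of why that matters (plain Python, no Hadoop involved; reduce_pairs is an illustrative helper, not part of any Hadoop API):

```python
# The reducer logic assumes identical keys arrive consecutively,
# as Hadoop's shuffle-and-sort phase guarantees. On unsorted
# input it emits the same word more than once.
def reduce_pairs(pairs):
    current_word, current_count = None, 0
    out = []
    for word, count in pairs:
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                out.append((current_word, current_count))
            current_word, current_count = word, count
    if current_word is not None:
        out.append((current_word, current_count))
    return out

pairs = [("apple", 1), ("banana", 1), ("apple", 1)]
print(reduce_pairs(pairs))          # [('apple', 1), ('banana', 1), ('apple', 1)] - wrong
print(reduce_pairs(sorted(pairs)))  # [('apple', 2), ('banana', 1)] - correct
```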
⚠️

Common Pitfalls

  • Buffered output that is never flushed (for example, a script killed before exit) can cause Hadoop to miss mapper or reducer output.
  • Splitting input lines on the wrong delimiter (spaces instead of the tab between key and value) corrupts keys or raises errors.
  • Forgetting to emit the last key in the reducer silently drops the final result.
  • Missing executable permissions (chmod +x) or a missing #!/usr/bin/env python3 shebang prevents Hadoop from running the scripts.

Always test mapper and reducer scripts locally with sample input before running on Hadoop.
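A local test simply chains the stages the way Hadoop does: mapper output, sorted, fed to the reducer, i.e. cat sample.txt | python3 mapper.py | sort | python3 reducer.py. The same pipeline can be sketched in-process (map_words and reduce_counts are illustrative names, not a Hadoop API):

```python
# Minimal local simulation of the streaming pipeline:
# mapper -> sort -> reducer, without Hadoop.

def map_words(lines):
    """Emit tab-separated pairs, like mapper.py prints word\t1."""
    for line in lines:
        for word in line.strip().split():
            yield f"{word}\t1"

def reduce_counts(pairs):
    """Sum counts per word from key-sorted tab-separated pairs."""
    current_word, current_count = None, 0
    for pair in pairs:
        word, count = pair.split("\t")
        count = int(count)
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                yield f"{current_word}\t{current_count}"
            current_word, current_count = word, count
    if current_word is not None:
        yield f"{current_word}\t{current_count}"

sample = ["apple banana apple", "orange banana apple"]
# sorted() stands in for Hadoop's shuffle-and-sort phase
result = list(reduce_counts(sorted(map_words(sample))))
print(result)  # ['apple\t3', 'banana\t2', 'orange\t1']
```

If this in-process run produces the expected counts, the same logic will behave identically under Hadoop Streaming, since both stages only read stdin and write stdout.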

python
# Wrong reducer missing last key output
import sys
current_word = None
current_count = 0
for line in sys.stdin:
    word, count = line.strip().split('\t')
    count = int(count)
    if current_word == word:
        current_count += count
    else:
        if current_word is not None:
            print(f"{current_word}\t{current_count}")
        current_word = word
        current_count = count
# Missing final print here

# Correct reducer includes final print
import sys
current_word = None
current_count = 0
for line in sys.stdin:
    word, count = line.strip().split('\t')
    count = int(count)
    if current_word == word:
        current_count += count
    else:
        if current_word is not None:
            print(f"{current_word}\t{current_count}")
        current_word = word
        current_count = count
if current_word is not None:
    print(f"{current_word}\t{current_count}")
📊

Quick Reference

Tips for writing Python MapReduce programs in Hadoop:

  • Use sys.stdin to read input line by line.
  • Output key-value pairs separated by a tab (\t).
  • Ensure scripts have executable permission: chmod +x mapper.py reducer.py.
  • Test scripts locally with sample input before running on Hadoop.
  • Use the Hadoop Streaming jar to run Python scripts as mapper and reducer.
✅

Key Takeaways

Write separate Python scripts for mapper and reducer using sys.stdin and print for output.
Run Python MapReduce jobs on Hadoop using the Hadoop Streaming jar with -mapper and -reducer options.
Always emit the last key in the reducer to avoid missing results.
Set executable permissions on Python scripts before running on Hadoop.
Test your mapper and reducer scripts locally with sample data before Hadoop execution.