Hadoop reducer string manipulation doesn't work

Tag: hadoop Author: wanfyi1997 Date: 2010-09-15

Hi, text manipulation in the reduce phase does not seem to be working correctly. I suspect the problem is in my code rather than in Hadoop itself, but you never know. If you can spot any gotchas, let me know. I wasted a day trying to figure out what’s wrong with this code.

My sample input file, called simple.psv:

12345   [email protected]|m|1975
12346   [email protected]|m|1981

My mapper and reducer code:

package simplemapreduce;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;

/**
 * Minimal MapReduce job: the mapper splits each line into a user id and a
 * data field, and the reducer wraps the data field in <start>...<end>.
 */
public class Main {

    public static class SimpleMap extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text inputs,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {

            String inputString = inputs.toString();
            //System.out.println("CRM Map record:"+inputString);
            // Split on whitespace: the first token is the user id, the second is the data.
            StringTokenizer tokenizer = new StringTokenizer(inputString);
            Text userid = new Text();
            if (tokenizer.hasMoreTokens()) {
                userid.set(tokenizer.nextToken());
                Text data = new Text();
                if (tokenizer.hasMoreTokens()) {
                    data.set(tokenizer.nextToken());
                } else {
                    data.set("");
                }
                output.collect(userid, data);
            }
        }
    }

    /**
     * A reducer class that emits each input value wrapped in <start>...<end>.
     */
    public static class SimpleReduce extends MapReduceBase implements
            Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {

            while (values.hasNext()) {
                Text txt = values.next();
                String inputString = "<start>" + txt.toString() + "<end>";
                Text out = new Text();
                out.set(inputString);
                output.collect(key, out);
            }
        }
    }

    public static void main(String[] args) throws IOException {

        if (args.length != 2) {
            System.err.println("Usage: SimpleMapReduce <input path> <output path>");
            System.exit(1);
        }
        JobConf conf = new JobConf(Main.class);
        conf.setJobName("Simple Map reducer");

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(SimpleMap.class);
        conf.setCombinerClass(SimpleReduce.class);
        conf.setReducerClass(SimpleReduce.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        JobClient.runJob(conf);
    }
}

My sample launch script, called simple.sh:


hadoop jar SimpleMapReduce.jar \
  /full/path/to/input/simple.tsv  /user/joebloggs/output

expected output

12345   <start>[email protected]|m|1975<end>
12346   <start>[email protected]|m|1981<end>

actual output

12345   <start><start>[email protected]|m|1975<end><end>
12346   <start><start>[email protected]|m|1981<end><end>

I tested this on Amazon S3 as well as on Linux. If you can spot the problem and let me know what it is, that will really save some hair on my head!

Other Answer1

The basic flow of data through the system is:

Input -> Map -> Reduce -> output.

As a performance optimization, the combiner was added so that a machine (one of the many in the Hadoop cluster) can do a partial aggregation of the data before it is transmitted to the machine where the actual reducer runs.

In the word count example it is fine to start with these values :

1 1 1 1 1 1 1 1 1 1

combine them into

3 4 2 1

and then reduce them into the final result:

10

So the combiner is essentially a performance optimization. If you do not specify a combiner, the map output reaches the reducer unchanged (as if an "identity reducer" had been used as the combiner). You can therefore only use the SAME class as both the combiner and the reducer if the data remains valid after passing through that class more than once. In your case that is not true, so your data is now invalid.
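The word count case can be checked without a cluster. The following is only a plain-Java sketch, not Hadoop code: `CombinerSumDemo` and `sum` are hypothetical names, and the split of the ten values into groups of 3, 4, 2 and 1 is simply the one used in the example above.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration (no Hadoop needed): summing partial sums gives the
// same answer as summing everything at once, so a sum reducer is combiner-safe.
class CombinerSumDemo {

    // Stand-in for a word-count reducer: adds up all counts for one key.
    static int sum(List<Integer> counts) {
        int total = 0;
        for (int c : counts) {
            total += c;
        }
        return total;
    }

    public static void main(String[] args) {
        // Ten occurrences of one word, as emitted by the mappers.
        List<Integer> mapOutput = Arrays.asList(1, 1, 1, 1, 1, 1, 1, 1, 1, 1);

        // Assumed split of those ten values across four combiner runs.
        List<Integer> combined = Arrays.asList(
                sum(mapOutput.subList(0, 3)),    // 3
                sum(mapOutput.subList(3, 7)),    // 4
                sum(mapOutput.subList(7, 9)),    // 2
                sum(mapOutput.subList(9, 10)));  // 1

        System.out.println("without combiner: " + sum(mapOutput)); // 10
        System.out.println("with combiner:    " + sum(combined));  // 10
    }
}
```

Both paths give the same total because addition does not care how the values are grouped. That grouping-independence is the property a reducer needs before it can double as a combiner.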

You do:

conf.setCombinerClass(SimpleReduce.class);
conf.setReducerClass(SimpleReduce.class);

So this makes the output of your mapper go through your reducer twice. The first pass adds "<start>" and "<end>"; the second pass adds "<start>" and "<end>" again.
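The double pass can be reproduced in plain Java, again as a sketch rather than Hadoop code. `DoubleWrapDemo` and `wrap` are hypothetical names, and `a|m|1975` is a made-up value standing in for one data field from the input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation (no Hadoop needed) of running the question's
// wrapping logic once versus twice.
class DoubleWrapDemo {

    // Mirrors what SimpleReduce does to each value: wrap it in <start>...<end>.
    static List<String> wrap(List<String> values) {
        List<String> out = new ArrayList<>();
        for (String value : values) {
            out.add("<start>" + value + "<end>");
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical map output value for one key.
        List<String> mapOutput = Arrays.asList("a|m|1975");

        // Reducer only: the value is wrapped once.
        System.out.println(wrap(mapOutput).get(0));
        // prints <start>a|m|1975<end>

        // Combiner (same class) followed by reducer: the value is wrapped twice.
        System.out.println(wrap(wrap(mapOutput)).get(0));
        // prints <start><start>a|m|1975<end><end>
    }
}
```

The second line printed has the same shape as the actual output in the question. Note also that Hadoop treats the combiner as optional and may run it zero, one, or several times, so a transformation like this one cannot rely on a fixed number of passes.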

Simple solution:

// conf.setCombinerClass(SimpleReduce.class);


Other Answer2

I had a problem where the reducer would not get all the data sent by the mapper. The reducer would only get up to the specific portion that output.collect emits. E.g., for the input data:

12345 [email protected]|m|1975
12346 [email protected]|m|1981

if I say


Then it will not get the next two fields - sex and year of birth.

// conf.setCombinerClass(SimpleReduce.class);

Commenting out this line solved the problem.