Running a Simple MapReduce Job

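The standard word count application is straightforward to write with simple MapReduce. The snippets below implement the required Mapper and Reducer interfaces and the optional Combiner interface. The mapper emits a (word, 1) pair for every whitespace-separated token in each input value. The combiner pre-aggregates those counts before they reach the reducer. The reducer adds up the counts for each word.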

Word count Mapper implementation:

// Requires: import java.util.StringTokenizer;
public static class WordCountMapper implements Mapper<Object, String, String, Integer> {
    // Shared, immutable count emitted for every word occurrence
    private static final Integer ONE = Integer.valueOf(1);

    @Override
    public void map(Object key, String value, Context<String, Integer> context) {
        // Split the input value on whitespace and emit (word, 1) for each token
        StringTokenizer tokenizer = new StringTokenizer(value);
        while (tokenizer.hasMoreTokens()) {
            String word = tokenizer.nextToken();
            context.emit(word, ONE);
        }
    }

    @Override
    public Class<String> getEmittedKeyClass() {
        return String.class;
    }

    @Override
    public Class<Integer> getEmittedValueClass() {
        return Integer.class;
    }

    @Override
    public void setup(Context<String, Integer> context) {
        // No per-task initialization is needed for word count
    }

    @Override
    public void cleanup(Context<String, Integer> context) {
        // No resources to release
    }
}
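The following stand-alone sketch shows what the mapper produces without a running cluster. It reproduces the body of map(), but it collects the (word, 1) pairs in a list instead of passing them to context.emit(). The class name WordCountMapSketch and the mapLine helper are hypothetical and are not part of the MapReduce API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

// Hypothetical stand-alone sketch: mimics what WordCountMapper.map() emits,
// collecting the (word, 1) pairs in a list instead of calling context.emit().
class WordCountMapSketch {
    private static final Integer ONE = Integer.valueOf(1);

    static List<Map.Entry<String, Integer>> mapLine(String line) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        // Default delimiters: space, tab, newline, carriage return, form feed
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            emitted.add(new SimpleEntry<>(tokenizer.nextToken(), ONE));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Prints [to=1, be=1, or=1, not=1, to=1, be.=1]
        System.out.println(mapLine("to be or not to be."));
    }
}
```

StringTokenizer splits only on whitespace, so punctuation stays attached to words, and "be." and "be" become different keys. A production word count would typically normalize case and strip punctuation before emitting. This sketch keeps the original mapper's behavior.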

Word count Combiner implementation:

public static class WordCountCombiner implements Combiner<String, Integer> {
    // Pre-aggregates the counts for one word before they are sent to the reducer
    @Override
    public Integer combine(String key, Iterable<Integer> values) {
        int sum = 0;
        for (Integer val : values) {
            sum += val.intValue();
        }

        return Integer.valueOf(sum);
    }
}
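The combiner can reuse the reducer's summation because integer addition is associative and commutative. Adding up per-task partial sums therefore gives the same total as adding up every individual 1. The hypothetical CombinerSketch class below checks this for the counts of a single word arriving from two map tasks. It does not use the MapReduce library.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: the summation used by WordCountCombiner and
// WordCountReducer, applied in two stages to show why pre-aggregation is safe.
class CombinerSketch {
    // Mirrors WordCountCombiner.combine(): sums the counts for one key
    static int combine(Iterable<Integer> values) {
        int sum = 0;
        for (Integer val : values) {
            sum += val;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Counts for one word emitted by two different map tasks
        List<Integer> fromTaskA = Arrays.asList(1, 1, 1);
        List<Integer> fromTaskB = Arrays.asList(1, 1);

        // Without a combiner, the reducer receives all five 1s
        int direct = combine(Arrays.asList(1, 1, 1, 1, 1));

        // With a combiner, each task sends one partial sum (3 and 2)
        // and the reducer adds the partial sums
        int combined = combine(Arrays.asList(combine(fromTaskA), combine(fromTaskB)));

        System.out.println(direct + " " + combined); // prints "5 5"
    }
}
```

An operation such as averaging lacks this property, because an average of averages is not in general the overall average. It could not serve as its own combiner without changing the emitted value, for example to a (sum, count) pair.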

Word count Reducer implementation:

public static class WordCountReducer implements Reducer<String, Integer, String, Integer> {
    @Override
    public void reduce(String key, Iterable<Integer> values, Context<String, Integer> context) {
        // Add up all counts (raw 1s or combiner partial sums) for this word
        int sum = 0;
        for (Integer val : values) {
            sum += val.intValue();
        }

        context.emit(key, Integer.valueOf(sum));
    }

    @Override
    public void setup(Context<String, Integer> context) {
        // No per-task initialization is needed for word count
    }

    @Override
    public void cleanup(Context<String, Integer> context) {
        // No resources to release
    }
}

After implementing the Mapper and Reducer interfaces (and, optionally, the Combiner), create the input and output named maps and run the job with the MapReduce class:

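// Use one timestamp so both map names share the same run-specific suffix
long suffix = System.currentTimeMillis();

// Create the input NamedMap (Integer keys, String text values)
NamedMap<Integer, String> inputMap = NamedMapFactory.getMap("WordCount_InputMap_" + suffix);
// Load the text to be counted into inputMap before running the job

// Create the output NamedMap that will hold the (word, count) results
NamedMap<String, Integer> outputMap = NamedMapFactory.getMap("WordCount_OutputMap_" + suffix);

// Configure the job with the mapper, the reducer, and the optional combiner
MapReduce wordcount = new MapReduce(inputMap, outputMap, WordCountMapper.class, WordCountReducer.class)
        .withCombiner(WordCountCombiner.class);

// Run the job and wait for it to complete, with no timeout
wordcount.run(TimeSpan.INFINITE_TIMEOUT);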
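To check the mapper and reducer logic on a small input, the job can be approximated in a single JVM. The sketch below is a hypothetical simulation, not the library's execution model. Ordinary java.util maps stand in for the NamedMap input and output maps. The map and shuffle step groups every emitted 1 by word, and the reduce step sums each group as WordCountReducer does. The class name LocalWordCount is an assumption for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical local simulation of the word count job. Plain java.util maps
// stand in for the NamedMap input and output maps; no MapReduce library is used.
class LocalWordCount {
    static Map<String, Integer> run(Map<Integer, String> input) {
        // Map + shuffle: tokenize each value and group the emitted 1s by word
        Map<String, List<Integer>> grouped = new HashMap<>();
        for (String line : input.values()) {
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                grouped.computeIfAbsent(tokenizer.nextToken(), k -> new ArrayList<>()).add(1);
            }
        }

        // Reduce: sum the grouped values for each word
        // (TreeMap only makes the printed order deterministic)
        Map<String, Integer> output = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (Integer val : entry.getValue()) {
                sum += val;
            }
            output.put(entry.getKey(), sum);
        }
        return output;
    }

    public static void main(String[] args) {
        Map<Integer, String> input = new HashMap<>();
        input.put(1, "the quick brown fox");
        input.put(2, "the lazy dog");
        // Prints {brown=1, dog=1, fox=1, lazy=1, quick=1, the=2}
        System.out.println(run(input));
    }
}
```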