The standard word count application is straightforward to write with simple MapReduce. The following code snippets implement the required Mapper and Reducer interfaces, as well as the optional Combiner interface.
Word count Mapper implementation:
    // Requires: import java.util.StringTokenizer;
    public static class WordCountMapper implements Mapper<Object, String, String, Integer> {
        // Shared count of 1, emitted once per word occurrence
        private static final Integer one = Integer.valueOf(1);

        @Override
        public void map(Object key, String value, Context<String, Integer> context) {
            // Split the input value on whitespace and emit (word, 1) for each token
            StringTokenizer tokenizer = new StringTokenizer(value);
            while (tokenizer.hasMoreTokens()) {
                String word = tokenizer.nextToken();
                context.emit(word, one);
            }
        }

        @Override
        public Class<String> getEmittedKeyClass() {
            return String.class;
        }

        @Override
        public Class<Integer> getEmittedValueClass() {
            return Integer.class;
        }

        @Override
        public void setup(Context<String, Integer> context) {
            // No setup required
        }

        @Override
        public void cleanup(Context<String, Integer> context) {
            // No cleanup required
        }
    }
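The mapper's tokenizing loop can be tried on its own to see which keys it would emit. The sketch below is a standalone illustration in plain Java: the class `TokenizerSketch` and its `tokens` helper are hypothetical names, not part of the MapReduce API. It relies only on the default `StringTokenizer` delimiters (whitespace).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical helper class for illustration; not part of the MapReduce API.
class TokenizerSketch {
    // Splits a line the same way the mapper's loop does (default delimiters: whitespace).
    static List<String> tokens(String line) {
        List<String> out = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            out.add(tokenizer.nextToken());
        }
        return out;
    }

    public static void main(String[] args) {
        // Repeated whitespace produces no empty tokens.
        System.out.println(tokens("The cat, the  hat"));
    }
}
```

With this input, "The" and "the" come out as distinct tokens and "cat," keeps its comma, so the word count as written is case-sensitive and counts punctuation as part of a word. Normalizing tokens before emitting them would change that, if desired.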
Word count Combiner implementation:
    public static class WordCountCombiner implements Combiner<String, Integer> {
        @Override
        public Integer combine(String key, Iterable<Integer> values) {
            // Collapse the values emitted for this key into a single partial sum
            int sum = 0;
            for (Integer val : values) {
                sum += val.intValue();
            }
            return Integer.valueOf(sum);
        }
    }
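A combiner is only safe when merging partial results gives the same answer as reducing all values at once. Addition has this property, which is why the combiner can use the same summing logic as the reducer. The sketch below checks this in plain Java. It assumes, as in typical MapReduce frameworks, that the combiner is applied to subsets of the values emitted for a key before the reducer sees them; the class `CombinerSketch` and the two "host" lists are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone check; does not use the MapReduce API.
class CombinerSketch {
    // Same summing logic as the combiner and reducer.
    static int sum(Iterable<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumed values emitted for one word on two different hosts
        List<Integer> hostA = Arrays.asList(1, 1, 1);
        List<Integer> hostB = Arrays.asList(1, 1);

        // Combine per host first, then reduce the partial sums
        int combined = sum(Arrays.asList(sum(hostA), sum(hostB)));

        // Reduce all raw values directly, with no combiner
        int direct = sum(Arrays.asList(1, 1, 1, 1, 1));

        System.out.println(combined == direct); // prints "true"
    }
}
```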
Word count Reducer implementation:
    public static class WordCountReducer implements Reducer<String, Integer, String, Integer> {
        @Override
        public void reduce(String key, Iterable<Integer> values, Context<String, Integer> context) {
            // Add up all counts for this word and emit the total
            int sum = 0;
            for (Integer val : values) {
                sum += val.intValue();
            }
            context.emit(key, Integer.valueOf(sum));
        }

        @Override
        public void setup(Context<String, Integer> context) {
            // No setup required
        }

        @Override
        public void cleanup(Context<String, Integer> context) {
            // No cleanup required
        }
    }
After implementing the Mapper and Reducer interfaces, you can run a MapReduce job using the MapReduce class:
    // Create a NamedMap input map
    NamedMap<Integer, String> inputMap =
            NamedMapFactory.getMap("WordCount_InputMap_" + System.currentTimeMillis());

    // Create a NamedMap output map
    NamedMap<String, Integer> outputMap =
            NamedMapFactory.getMap("WordCount_OutputMap_" + System.currentTimeMillis());

    MapReduce wordcount =
            new MapReduce(inputMap, outputMap, WordCountMapper.class, WordCountReducer.class)
                    .withCombiner(WordCountCombiner.class);
    wordcount.run(TimeSpan.INFINITE_TIMEOUT);
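To make the expected result concrete, the following sketch simulates the job in plain Java, with ordinary `java.util.Map` instances standing in for the input and output maps. It does not use `NamedMap`, `MapReduce`, or any other library class; the class `WordCountSimulation`, its `countWords` method, and the sample lines are all hypothetical. It only illustrates what the mapper and reducer above compute together.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical single-process stand-in for the job; not the library's execution model.
class WordCountSimulation {
    static Map<String, Integer> countWords(Map<Integer, String> input) {
        // TreeMap keeps the output sorted by word for readable printing
        Map<String, Integer> output = new TreeMap<>();
        for (String line : input.values()) {
            // "Map" step: tokenize each input value on whitespace
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                // "Reduce" step folded in: add 1 to the running total for this word
                output.merge(tokenizer.nextToken(), 1, Integer::sum);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Assumed sample input: integer keys mapped to lines of text
        Map<Integer, String> input = new TreeMap<>();
        input.put(0, "to be or not to be");
        input.put(1, "to do");

        System.out.println(countWords(input)); // prints {be=2, do=1, not=1, or=1, to=3}
    }
}
```

In the real job, the input map would need to hold the lines of text before `run` is called, and the per-word totals would be read from the output map afterward.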