Passing Custom Parameters

ScaleOut ComputeServer’s Simple MapReduce framework can also pass a custom parameter object to the application’s Mappers and Reducers. The following sample demonstrates how to pass a parameter object to the word count Mappers. In this sample, the parameter object serves as a lookup table that lets the Mappers skip the articles "a", "an", and "the" during the map phase of the MapReduce invocation.

// Create a NamedMap input map
NamedMap<Integer, String> inputMap = NamedMapFactory.getMap("WordCount_InputMap_" + System.currentTimeMillis());
// Create a NamedMap output map
NamedMap<String, Integer> outputMap = NamedMapFactory.getMap("WordCount_OutputMap_" + System.currentTimeMillis());

HashMap<String, Boolean> ignoreTheseWords = new HashMap<String, Boolean>(3);
ignoreTheseWords.put("a", false);
ignoreTheseWords.put("an", false);
ignoreTheseWords.put("the", false);

MapReduce wordcount = new MapReduce(inputMap, outputMap, WordCountMapper.class, WordCountReducer.class)
                                        .withCombiner(WordCountCombiner.class)
                                        .withParameterObject(ignoreTheseWords);

wordcount.run(TimeSpan.INFINITE_TIMEOUT);

The parameter object is available from the Context instance that the framework passes to the Mapper. The sample retrieves it in the Mapper’s setup() method and caches it in a static field, so that it is fetched only once:

/**
 * Word-count mapper that emits {@code (word, 1)} for every whitespace-delimited token
 * of the input value, except for tokens that appear as keys in the invocation's
 * parameter object.
 *
 * <p>Tokens are compared exactly as they appear in the input: matching is
 * case-sensitive and punctuation is not stripped.
 */
public static class WordCountMapper implements Mapper<Object, String, String, Integer> {
        /** Count emitted for each word occurrence; one shared instance is reused. */
        private static final Integer ONE = Integer.valueOf(1);

        /**
         * Lookup table of words to skip; only its keys are consulted.
         * Declared volatile because it is published through the double-checked
         * locking in {@link #setup(Context)}.
         *
         * NOTE(review): because the table is cached in a static field, a later
         * invocation in the same JVM with a different parameter object would
         * presumably keep using the first table - confirm against the framework's
         * worker lifecycle.
         */
        private static volatile HashMap<String, Boolean> ignoreTheseWords = null;

        /**
         * Splits {@code value} into tokens and emits each token that is not in the
         * ignore table together with a count of one.
         *
         * @param key     input key; not used by this mapper
         * @param value   text to tokenize
         * @param context sink for the emitted (word, count) pairs
         */
        @Override
        public void map(Object key, String value, Context<String, Integer> context) {
                // Read the volatile field once so the whole call sees a single table.
                HashMap<String, Boolean> ignored = ignoreTheseWords;
                StringTokenizer tokenizer = new StringTokenizer(value);
                while (tokenizer.hasMoreTokens()) {
                        String word = tokenizer.nextToken();
                        // A missing table (no parameter object supplied) means nothing is
                        // filtered, rather than failing with a NullPointerException.
                        if (ignored == null || !ignored.containsKey(word)) {
                                context.emit(word, ONE);
                        }
                }
        }

        /** @return the class of the keys this mapper emits */
        @Override
        public Class<String> getEmittedKeyClass() {
                return String.class;
        }

        /** @return the class of the values this mapper emits */
        @Override
        public Class<Integer> getEmittedValueClass() {
                return Integer.class;
        }

        /**
         * Fetches the parameter object from the context and caches it in the static
         * ignore table the first time it is needed; later calls that find the table
         * already set do nothing.
         *
         * @param context context from which the parameter object is obtained
         */
        @Override
        public void setup(Context<String, Integer> context) {
                if (ignoreTheseWords == null) {
                        synchronized (WordCountMapper.class) {
                                if (ignoreTheseWords == null) {
                                        // The caller of this sample passes a HashMap<String, Boolean>;
                                        // the generic type cannot be verified at runtime, hence the
                                        // narrowly scoped suppression.
                                        @SuppressWarnings("unchecked")
                                        HashMap<String, Boolean> parameter =
                                                        (HashMap<String, Boolean>) context.getParameterObject();
                                        ignoreTheseWords = parameter;
                                }
                        }
                }
        }

        /** No per-mapper resources are held, so there is nothing to release. */
        @Override
        public void cleanup(Context<String, Integer> context) {
        }
}