Single result optimization

It is often useful for a job to produce a single object as its result, for example, when combining the output of all mappers into one output value. To accomplish this, three conditions must hold: the map output key space must consist of a single key; the reducer's input and output types must match (i.e., the reducer can also be used as a combiner); and the reducer/combiner must emit no more than one key-value pair per call. If these conditions are met, the output of the job is a single object: the result of combining all the values for the single map output key.

ScaleOut hServer identifies and optimizes this usage model, allowing this type of job to run efficiently and without the need for an output format. When the optimized job is run with the runAndGetResult(…) method of HServerJob, the result object is returned directly to the application. The job should specify a combiner for this optimization to be applied.
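The conditions described above can be modeled in plain Java to show why they yield exactly one result object. The following is a minimal sketch that uses no Hadoop or hServer classes: the class name SingleResultSketch and the methods combine and runJob are hypothetical names chosen for illustration, and the code does not reflect how hServer implements the optimization internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model of the single result pattern (illustrative only).
public class SingleResultSketch {

    // Stand-in for a reducer that doubles as a combiner: it consumes values of
    // type V for the single key and yields at most one value of the same type.
    public static <V> V combine(List<V> values, BinaryOperator<V> op) {
        V acc = null;
        for (V v : values) {
            acc = (acc == null) ? v : op.apply(acc, v);
        }
        return acc; // null models "no pair emitted" for an empty input
    }

    // Combines each split's map output locally, then combines the partial
    // results into one object, as a combiner followed by a final merge would.
    public static <V> V runJob(List<List<V>> mapOutputsPerSplit, BinaryOperator<V> op) {
        List<V> partials = new ArrayList<>();
        for (List<V> split : mapOutputsPerSplit) {
            V partial = combine(split, op);
            if (partial != null) {
                partials.add(partial);
            }
        }
        return combine(partials, op);
    }

    public static void main(String[] args) {
        // Three splits; the second one produced no map output.
        List<List<Integer>> splits = Arrays.asList(
                Arrays.asList(1, 1, 1),
                Collections.<Integer>emptyList(),
                Arrays.asList(1, 1));
        System.out.println(runJob(splits, Integer::sum)); // prints 5
    }
}
```

Because the combining function takes and returns the same type, it can be applied to raw map output and to partial results alike, which is what the matching reducer input and output types guarantee in the real job.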

To illustrate the single result optimization, the WordCount example can be modified to count the occurrences of a specific word:

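import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

//Package name assumed from the hServer Java API; adjust if your distribution differs
import com.scaleoutsoftware.soss.hserver.HServerJob;

public class SingleWordCount {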

    private final static String wordPropertyName = "com.scaleoutsoftware.soss.hserver.examples.lookupWord";

    //The mapper is changed to emit values only for matching words
    public static class TokenizerMapper
            extends Mapper<Object, Text, NullWritable, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private String lookupWord;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            String[] strings = context.getConfiguration().getStrings(wordPropertyName);
            //getStrings returns null when the property is not set
            if (strings != null && strings.length == 1) {
                lookupWord = strings[0];
            } else {
                throw new IOException("Word property must be set to a single word.");
            }
        }

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                if (itr.nextToken().equals(lookupWord))  {
                    //Emit only for the matching words
                    context.write(NullWritable.get(), one);
                }
            }
        }
    }

    //The reducer is unchanged except for the key type
    public static class IntSumReducer
            extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(NullWritable key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: singlewordcount <in> <word>");
            System.exit(2);
        }
        conf.setStrings(wordPropertyName, otherArgs[1]);
        HServerJob job = new HServerJob(conf, "Single word count");
        job.setJarByClass(SingleWordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        //The map output types should be specified
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setCombinerClass(IntSumReducer.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

        System.out.println("Word \""+otherArgs[1]+"\" was used "+job.runAndGetResult()+" times.");
    }
}
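To check the value returned by runAndGetResult() against a small input, the counting logic of the mapper and reducer can be reproduced locally. The following is a minimal sketch without Hadoop or hServer; the class LocalSingleWordCount and its methods are hypothetical helpers introduced here for illustration and are not part of the example above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;

// Local, single-process equivalent of the SingleWordCount logic (illustrative only).
public class LocalSingleWordCount {

    // Mirrors TokenizerMapper.map: one count per whitespace-delimited token
    // that equals the lookup word exactly.
    public static int countInLine(String line, String lookupWord) {
        int count = 0;
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            if (itr.nextToken().equals(lookupWord)) {
                count++;
            }
        }
        return count;
    }

    // Mirrors IntSumReducer.reduce: sums the per-line counts into one value.
    public static int countInLines(List<String> lines, String lookupWord) {
        int sum = 0;
        for (String line : lines) {
            sum += countInLine(line, lookupWord);
        }
        return sum;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "the quick brown fox",
                "jumps over the lazy dog",
                "The end.");
        System.out.println(countInLines(lines, "the")); // prints 2
    }
}
```

As in the mapper, matching is case-sensitive and tokens are split on whitespace only, so "The" and a token with attached punctuation such as "end." do not match "the" or "end".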