The following program is a modified WordCount example which illustrates many of the approaches outlined in previous sections:
- It uses the NamedMap as input and output for a ScaleOut hServer job,
- It uses a custom serializer (`WritableSerializer`) so that Hadoop `Writable` keys and values can be stored in the named maps, and
- It performs a parallel query of the output map to determine which words are used more frequently than a provided threshold.
package com.scaleoutsoftware.soss.hserver.examples; import com.scaleoutsoftware.soss.client.InvocationGrid; import com.scaleoutsoftware.soss.client.InvokeException; import com.scaleoutsoftware.soss.client.map.NamedMap; import com.scaleoutsoftware.soss.client.map.NamedMapFactory; import com.scaleoutsoftware.soss.client.map.QueryCondition; import com.scaleoutsoftware.soss.hserver.GridOutputFormat; import com.scaleoutsoftware.soss.hserver.HServerJob; import com.scaleoutsoftware.soss.hserver.NamedMapInputFormat; import com.scaleoutsoftware.soss.hserver.WritableSerializer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.GenericOptionsParser; import java.io.IOException; import java.util.StringTokenizer; public class NamedMapWordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 3) { 
System.err.println("Usage: wordcount <input map> <output map> <threshold>"); System.exit(2); } final int threshold = new Integer(otherArgs[2]); //Create named maps NamedMap<IntWritable, Text> inputMap = NamedMapFactory.getMap(otherArgs[0], new WritableSerializer<IntWritable>(IntWritable.class), new WritableSerializer<Text>(Text.class)); NamedMap<Text, IntWritable> outputMap = NamedMapFactory.getMap(otherArgs[1], new WritableSerializer<Text>(Text.class), new WritableSerializer<IntWritable>(IntWritable.class)); outputMap.clear(); //clear output map //Create the invocation grid InvocationGrid grid = HServerJob.getInvocationGridBuilder("WordCountIG"). addJar("myjob.jar"). load(); //Create hServer job Job job = new HServerJob(conf, "word count", false, grid); job.setJarByClass(NamedMapWordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(NamedMapInputFormat.class); job.setOutputFormatClass(GridOutputFormat.class); //Set named maps as input and output NamedMapInputFormat.setNamedMap(job, inputMap); GridOutputFormat.setNamedMap(job, outputMap); //Execute job job.waitForCompletion(true); //Assign invocation grid to the map, so parallel operation can be performed outputMap.setInvocationGrid(grid); //Run query to find words that are used more than threshold frequency Iterable<Text> words = outputMap.executeParallelQuery(new UsageFrequencyCondition(threshold)); //Unload the invocation grid grid.unload(); //Output resulting words and their frequencies System.out.println("Following words were used more than " + threshold + " times:"); for(Text word : words) { System.out.println("\""+word.toString()+"\" was used " + outputMap.get(word) + " times."); } } //Implementation of the query condition. 
Condition is true if //the usage frequency exceeds threshold frequency static class UsageFrequencyCondition implements QueryCondition<Text, IntWritable> { private int frequency; UsageFrequencyCondition(int frequency) { this.frequency = frequency; } @Override public boolean check(Text key, IntWritable value) throws InvokeException { return value.get() > frequency; } } }