Provides a simple and fast way to perform a MapReduce operation that does not require Hadoop infrastructure.
Namespace: Soss.Client.Concurrent
Assembly: soss_namedcache (in soss_namedcache.dll) Version: 6.2.0.0
Syntax
public static bool RunMapReduce<MK, MV, OK, OV>( NamedMap<TKey, TValue> inputMap, NamedMap<OK, OV> outputMap, Mapper<TKey, TValue, MK, MV> mapper, Combiner<MK, MV> combiner, Reducer<MK, MV, OK, OV> reducer, TimeSpan timeout )
Parameters
- inputMap
- Type: Soss.Client.Concurrent.NamedMap<TKey, TValue>
The input named map instance.
- outputMap
- Type: Soss.Client.Concurrent.NamedMap<OK, OV>
The output named map instance.
- mapper
- Type: Soss.Client.Concurrent.MapReduce.Mapper<TKey, TValue, MK, MV>
Mapper class that inherits from the Mapper<IK, IV, MK, MV> abstract class and implements the Map abstract method.
- combiner
- Type: Soss.Client.Concurrent.MapReduce.Combiner<MK, MV>
Combiner class that inherits from the Combiner<MK, MV> abstract class and implements the Combine abstract method.
- reducer
- Type: Soss.Client.Concurrent.MapReduce.Reducer<MK, MV, OK, OV>
Reducer class that inherits from the Reducer<MK, MV, OK, OV> abstract class and implements the Reduce abstract method.
- timeout
- Type: System.TimeSpan
Timeout for the MapReduce operation.
Type Parameters
- MK
- Intermediate key type, the mapper's key output type.
- MV
- Intermediate value type, the mapper's value output type.
- OK
- Output key type of the MapReduce operation.
- OV
- Output value type of the MapReduce operation.
Return Value
Type: System.Boolean
true if the operation succeeds; otherwise, false.
Remarks
By default, the NamedMap uses Microsoft's BinaryFormatter serializer, but for the best performance you may want to consider using serializers with more compact output (for example, serializers that implement the Protocol Buffers or MessagePack serialization format).
Examples
Performing a MapReduce operation using the NamedMap class
using System;
using System.Text;
using System.Reflection;
using System.Diagnostics;
using System.Collections.Generic;
using Soss.Client;
using Soss.Client.Concurrent;
using Soss.Client.Concurrent.MapReduce;

/// <summary>
/// This sample illustrates how to run a MapReduce operation using the NamedMap API,
/// which offers two signatures of the RunMapReduce method: the first one is instance based,
/// allowing the MapReduce task to run over locally stored key/value pairs; the second one
/// is a static method that reads its input from one named map and writes the reduced
/// values into a separate named map.
///
/// This sample illustrates use of the static RunMapReduce method signature. It implements a
/// classic word count algorithm: first it generates rows of words, then it counts how many
/// times each unique word was generated. Finally it uses a simple query to check the
/// validity of the output.
/// </summary>
class MapReduceUsage_Static
{
    // One generator shared by every GenerateTextLine call. On .NET Framework, Random
    // instances created in quick succession with the parameterless constructor get the
    // same time-based seed, so creating one per line would make every line identical.
    private static readonly Random s_random = new Random();

    static void Main(string[] args)
    {
        const int numberOfTextLines = 100;
        const int numberOfWordsPerLine = 10;

        NamedMap<int, string> inputMap = null;
        NamedMap<string, int> outputMap = null;
        InvocationGrid grid = null;

        try
        {
            // Register this assembly (which defines the mapper, combiner, reducer and
            // query condition classes below) as a dependency of the invocation grid.
            InvocationGridBuilder igBuilder = new InvocationGridBuilder("GridForMap");
            igBuilder.AddDependency(Assembly.GetExecutingAssembly());
            igBuilder.LingerTime = TimeSpan.FromSeconds(600);
            grid = igBuilder.Load();

            inputMap = new NamedMap<int, string>("Test_WordCount_Input");
            inputMap.ParallelOperationTransport = ParallelOperationTransport.UseSockets;
            inputMap.AutoCorrectStreamPosition = AutoCorrectStreamPosition.Always;
            inputMap.InvocationGrid = grid;
            inputMap.Clear();

            Console.WriteLine("Populating the input map, adding {0} words", numberOfTextLines * numberOfWordsPerLine);
            BulkLoader<int, string> loader = inputMap.CreateBulkLoader();
            for (int row = 0; row < numberOfTextLines; row++)
            {
                loader.Put(row, GenerateTextLine(numberOfWordsPerLine));
            }
            loader.Close();
            Console.WriteLine("Finished creating the input named map. Running the MapReduce operation...\n");

            outputMap = new NamedMap<string, int>("Test_WordCount_Ouput");
            outputMap.InvocationGrid = grid;
            // Start from an empty output map so entries left behind by an earlier,
            // interrupted run are not mixed into this run's results.
            outputMap.Clear();

            // The TestWordCountMapper, TestWordCountCombiner and TestWordCountReducer classes
            // implement the abstract methods of the corresponding Mapper, Combiner and Reducer classes.
            bool succeeded = NamedMap<int, string>.RunMapReduce<string, int, string, int>(
                inputMap,
                outputMap,
                new TestWordCountMapper(),
                new TestWordCountCombiner(),
                new TestWordCountReducer(),
                SossTimeout.InfiniteTimeSpan);

            // RunMapReduce reports failure through its return value; only inspect the
            // output map when the operation succeeded.
            if (succeeded)
            {
                PrintResults(outputMap);
            }
            else
            {
                Console.WriteLine("The MapReduce operation failed.");
            }
        }
        finally
        {
            // Runs even if an exception escapes, so the maps are emptied and the grid is unloaded.
            if (inputMap != null) inputMap.Clear();
            if (outputMap != null) outputMap.Clear();
            if (grid != null) grid.Unload();
        }

        Console.WriteLine("TestWordCount is completed. Press Enter to finish the program.");
        Console.ReadLine();
    } // Main

    /// <summary>
    /// Queries the output map, prints every word with its count, and asserts that the
    /// number of query results equals the number of entries reported by ExecuteCount.
    /// </summary>
    /// <param name="outputMap">Named map holding the reduced word counts.</param>
    private static void PrintResults(NamedMap<string, int> outputMap)
    {
        IEnumerable<string> keys = outputMap.ExecuteParallelQuery(new SimpleMRQueryCondition());
        int totalNumber = outputMap.ExecuteCount();

        int queryResultCount = 0;
        foreach (string key in keys)
        {
            int numberOfWords;
            // Print only counts that were actually read, so a failed lookup never shows a stale value.
            if (outputMap.TryGetValue(key, out numberOfWords))
            {
                Console.WriteLine(string.Format("Word: {0}, Count: {1}", key, numberOfWords));
            }
            else
            {
                Console.WriteLine(string.Format("Word: {0}, Count: (not found)", key));
            }
            queryResultCount++;
        }

        // Validation: the query matches every entry, so both numbers must agree.
        Debug.Assert(queryResultCount == totalNumber);
        Console.WriteLine("______________________________________");
        Console.WriteLine(string.Format("Total number of unique words: {0}", totalNumber));
    }

    /// <summary>
    /// Generates a line of <paramref name="numOfWordsPerRow"/> random four-letter
    /// lowercase words separated by single spaces.
    /// </summary>
    /// <param name="numOfWordsPerRow">Number of words to generate; zero or less yields an empty string.</param>
    /// <returns>The generated line of text.</returns>
    public static string GenerateTextLine(int numOfWordsPerRow)
    {
        const string alphabet = "abcdefghijklmnopqrstuvwxyz";
        const int wordLength = 4;

        StringBuilder builder = new StringBuilder();
        for (int word = 0; word < numOfWordsPerRow; word++)
        {
            // Separator goes before every word except the first, so there is no trailing space.
            if (word > 0)
            {
                builder.Append(' ');
            }
            for (int letter = 0; letter < wordLength; letter++)
            {
                builder.Append(alphabet[s_random.Next(alphabet.Length)]);
            }
        }
        return builder.ToString();
    }
}

/// <summary>
/// Mapper that splits each line of text into space-separated words and emits
/// a (word, 1) pair for every word found.
/// </summary>
[Serializable]
public class TestWordCountMapper : Mapper<int, string, string, int>
{
    private static readonly char[] s_separators = new char[] { ' ' };

    public TestWordCountMapper() { }

    /// <param name="key">Row number of the line in the input map (unused).</param>
    /// <param name="value">Line of text to split into words.</param>
    /// <param name="context">Context receiving the (word, 1) pairs.</param>
    public override void Map(int key, string value, IContext<string, int> context)
    {
        if (string.IsNullOrEmpty(value))
        {
            return;
        }

        // Splitting the string directly (instead of round-tripping through ASCII bytes)
        // keeps non-ASCII characters intact; RemoveEmptyEntries avoids emitting empty
        // "words" for leading or repeated spaces.
        foreach (string word in value.Split(s_separators, StringSplitOptions.RemoveEmptyEntries))
        {
            context.Emit(word, 1);
        }
    }
} // class TestWordCountMapper

/// <summary>
/// Combiner that pre-aggregates the partial counts emitted for a word by summing them.
/// </summary>
[Serializable]
public class TestWordCountCombiner : Combiner<string, int>
{
    public TestWordCountCombiner() { }

    /// <param name="key">The word whose counts are combined.</param>
    /// <param name="values">Partial counts for the word.</param>
    /// <returns>The sum of <paramref name="values"/>.</returns>
    public override int Combine(string key, IEnumerable<int> values)
    {
        int sum = 0;
        foreach (int val in values)
        {
            sum += val;
        }
        return sum;
    }
} // class TestWordCountCombiner

/// <summary>
/// Reducer that sums all counts for a word and emits the (word, total) pair.
/// </summary>
[Serializable]
public class TestWordCountReducer : Reducer<string, int, string, int>
{
    public TestWordCountReducer() { }

    /// <param name="key">The word being reduced.</param>
    /// <param name="values">Counts for the word.</param>
    /// <param name="context">Context receiving the (word, total) pair.</param>
    public override void Reduce(string key, IEnumerable<int> values, IContext<string, int> context)
    {
        int sum = 0;
        foreach (int val in values)
        {
            sum += val;
        }
        context.Emit(key, sum);
    }
} // class TestWordCountReducer

/// <summary>
/// Class representing a query condition to be used in a
/// NamedMap.ExecuteParallelQuery() call.
/// </summary>
[Serializable]
public class SimpleMRQueryCondition : QueryCondition<string, int>
{
    /// <summary>
    /// CheckCondition override that accepts every entry.
    /// </summary>
    /// <param name="word">Key to the string object in the NamedMap representing a word.</param>
    /// <param name="numberOfWords">The number of times this word was found in the input map.</param>
    /// <returns>Always true, so every entry is selected.</returns>
    public override bool CheckCondition(string word, int numberOfWords)
    {
        return true;
    }
}
See Also