Returning Aggregated Results

A PMI invocation handler class can return an aggregated result value to the client that initiated the operation. Derive from Reduce or ReduceWithParam to return a result.

Procedure

  1. Create a new class to process the PMI operation, supplying the appropriate type parameters to the base invocation handler class.

    • In addition to the TKey and TValue that a ForEach handler uses, a reduce handler requires a TResult to indicate the result type.

    • This example analyzes a cache that tracks the most recent login date for an application's users. The cache's key type is a string (representing a User ID) and its value type is a DateTime (representing a login time). The handler uses an integer as its result type (representing a count of inactive users).

    using System;
    using System.IO;
    using Scaleout.Client;
    using Scaleout.Client.MethodInvocation;
    
    class ReturnInactiveUsers : Reduce<string, DateTime, int>
    {
        public override int AccumulatorFactory()
        {
            // ...
        }
    
        public override int MergeFinal(int result1, int result2)
        {
            // ...
        }
    
        public override int Evaluate(string key,
                                     int accumulator,
                                     OperationContext<string, DateTime> context)
        {
            // ...
        }
    }
    
  2. Implement the accumulator factory method to initialize result values at the beginning of a PMI operation.

    • The PMI engine uses multiple threads to evaluate objects in a cache. Each thread maintains its own thread-local result value to minimize locking overhead, so the AccumulatorFactory is called multiple times over the course of a PMI operation.

    public override int AccumulatorFactory()
    {
        return 0;
    }
    
  3. Implement your custom evaluation code. The accumulated result for the operation is passed into the Evaluate method via the accumulator parameter, and the method should return the accumulator value, modified if necessary, which is passed to the next Evaluate call made by the PMI pipeline.

    public override int Evaluate(string key,
                                 int accumulator,
                                 OperationContext<string, DateTime> context)
    {
        // Use the cache in the PMI context to retrieve the object being evaluated:
        var readResponse = context.Cache.Read(key);
        if (readResponse.Result == ServerResult.NotFound)
        {
            Console.WriteLine($"{key} removed by another client during PMI operation.");
            return accumulator;
        }
        else if (readResponse.Result != ServerResult.Retrieved)
        {
            throw new Exception($"Unexpected result {readResponse.Result} reading {key}.");
        }
    
        // Perform analysis, incrementing the accumulated result if the inactivity criterion is met.
        DateTime lastLogin = readResponse.Value;
        TimeSpan inactiveTime = DateTime.UtcNow - lastLogin;
        if (inactiveTime > TimeSpan.FromDays(7))
        {
            return ++accumulator;
        }
        else
        {
            return accumulator;
        }
    }
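
The inactivity test inside Evaluate can be pulled into a small pure function so it can be exercised without a running cache. The following is a minimal sketch, not part of the ScaleOut API: the InactivityCheck class and its members are hypothetical names. It assumes the stored login times are UTC, because DateTime subtraction compares raw ticks and ignores the Kind property.

```csharp
using System;

// Hypothetical helper; not part of the ScaleOut client library.
static class InactivityCheck
{
    // Same threshold as the Evaluate example: more than 7 days without a login.
    public static readonly TimeSpan Threshold = TimeSpan.FromDays(7);

    // True when the user has been inactive for strictly longer than the threshold.
    // Both arguments are assumed to be UTC values.
    public static bool IsInactive(DateTime lastLoginUtc, DateTime nowUtc) =>
        nowUtc - lastLoginUtc > Threshold;

    // Mirrors the accumulator contract: return the incoming value,
    // incremented only when the criterion is met.
    public static int Accumulate(int accumulator, DateTime lastLoginUtc, DateTime nowUtc) =>
        IsInactive(lastLoginUtc, nowUtc) ? accumulator + 1 : accumulator;
}
```

With this helper, the body of Evaluate could end with a call such as InactivityCheck.Accumulate(accumulator, readResponse.Value, DateTime.UtcNow). A login exactly 7 days old is not counted, since the comparison is strict.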
    
  4. Implement the MergeFinal method to combine multiple result objects.

    • The PMI engine combines partial results from multiple threads on multiple servers into a single value that is ultimately returned to the Invoke call made from a client application. The MergeFinal method must merge two partial results into a single combined result.

    public override int MergeFinal(int result1, int result2)
    {
        return result1 + result2;
    }
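
To see how the factory, evaluate, and merge steps fit together, the same pattern can be imitated on a single machine with the standard Parallel.ForEach overload that takes thread-local state. This is only an analogy under stated assumptions: it does not use the PMI engine, and the ReduceSketch class and its methods are hypothetical stand-ins for the handler's overrides.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical single-process imitation of the reduce pattern; not ScaleOut code.
static class ReduceSketch
{
    // Stand-in for AccumulatorFactory: the starting value of each partial result.
    static int AccumulatorFactory() => 0;

    // Stand-in for Evaluate: returns the accumulator, incremented for inactive users.
    static int Evaluate(DateTime lastLogin, int accumulator, DateTime now) =>
        now - lastLogin > TimeSpan.FromDays(7) ? accumulator + 1 : accumulator;

    // Stand-in for MergeFinal: combines two partial results.
    static int MergeFinal(int result1, int result2) => result1 + result2;

    public static int CountInactive(IEnumerable<DateTime> logins, DateTime now)
    {
        int total = AccumulatorFactory();
        object mergeLock = new object();

        Parallel.ForEach<DateTime, int>(
            logins,
            // Called once per worker task, so it may run several times.
            () => AccumulatorFactory(),
            // Each worker threads its own accumulator through successive calls.
            (lastLogin, loopState, accumulator) => Evaluate(lastLogin, accumulator, now),
            // Each worker's partial result is merged into the total exactly once.
            partial =>
            {
                lock (mergeLock) { total = MergeFinal(total, partial); }
            });

        return total;
    }
}
```

Because the number of workers and the order in which their partial results arrive are not fixed, a merge function generally has to give the same answer regardless of how the partial results are grouped or ordered. Integer addition satisfies that, which is why the sketch returns the same count on every run.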
    
  5. Optional, but recommended: Implement custom serialization of the TResult type by overriding the DeserializeResult and SerializeResult methods.

    • If these methods are not overridden, the base class uses Microsoft's BinaryFormatter for serialization. Although BinaryFormatter is easy to use, most invocation handlers should avoid it because it performs poorly and is not cross-platform.

    public override int DeserializeResult(Stream stream)
    {
        BinaryReader br = new BinaryReader(stream);
        return br.ReadInt32();
    }
    
    public override void SerializeResult(int result, Stream stream)
    {
        BinaryWriter bw = new BinaryWriter(stream);
        bw.Write(result);
    }
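
A quick way to check a custom serializer pair is to round-trip a value through a MemoryStream. The sketch below uses only standard .NET types; the ResultSerializationSketch class and its RoundTrip method are hypothetical names, and the static methods merely mirror the shape of the overrides above. Passing leaveOpen: true keeps the reader and writer from closing the stream when they are disposed. Whether the handler base class expects the stream to stay open is an assumption here, not something this page states.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical test harness; not part of the ScaleOut client library.
static class ResultSerializationSketch
{
    // Writes the result as a 4-byte little-endian integer.
    public static void SerializeResult(int result, Stream stream)
    {
        using (var bw = new BinaryWriter(stream, Encoding.UTF8, leaveOpen: true))
        {
            bw.Write(result);
        }
    }

    // Reads back the 4-byte integer written by SerializeResult.
    public static int DeserializeResult(Stream stream)
    {
        using (var br = new BinaryReader(stream, Encoding.UTF8, leaveOpen: true))
        {
            return br.ReadInt32();
        }
    }

    // Serializes a value, rewinds the stream, and deserializes it again.
    public static int RoundTrip(int value)
    {
        using (var ms = new MemoryStream())
        {
            SerializeResult(value, ms);
            ms.Position = 0;
            return DeserializeResult(ms);
        }
    }
}
```

If RoundTrip returns its input for representative values, including zero and negative numbers, the two methods agree on the wire format.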
    
  6. At application startup, register the class as an invocation handler.

    class Program
    {
        static void Main(string[] args)
        {
            // Connect to the cache that stores login times.
            var conn = GridConnection.Connect("bootstrapGateways=localhost:721");
            var cacheBuilder = new CacheBuilder<string, DateTime>("LoginTimes", conn);
            var cache = cacheBuilder.Build();

            // Register the handler with the cache and give it a name.
            ServiceEvents.SetInvokeHandler(cache,
                                           "Get inactive count",
                                           new ReturnInactiveUsers());

            // Run indefinitely to handle invocation requests.
            Console.ReadLine();
        }
    }

Note

If you are running your handler in an Invocation Grid, this code should be put in your project's Startup.Configure() method.

  7. Build your handler application and deploy it to all the servers in your farm that are running the ScaleOut service.

Result

Once your handler app is running on all your ScaleOut hosts, you are ready to process PMI requests.