Table of Contents

Overview

The arguments provided by the client-side PostEventsAsync caller are delivered to your server-side event handling code in a EventPayload object. This section covers how to host your server-side code and efficiently process these events.

Handling Event Streams

Hosting Server-Side Code

Before posting an event, server-side event handling code must be launched on the system(s) running the ScaleOut service.

The ScaleOut service load balances the delivery of events to all hosts in your cluster, so the event handling process must be started and running on all ScaleOut hosts before events are posted. Typically, these worker processes are launched and managed using ScaleOut's Invocation Grid feature. They can also be manually deployed as a long-running Windows Service or Linux daemon.

Registering a Posted Event Handler

Server-side event handling code uses the ServiceEvents.SetPostedEventHandler method to register a callback handler for posted events at process startup.

This callback is invoked repeatedly as events arrive, and typically consists of the following steps:

  1. If more than one type of event is being streamed, route the event to different handling logic as needed, typically using the EventPayload's EventInfo property to discriminate between different types of events.

  2. Deserialize an event object in the EventPayload's Bytes property, if one exists.

  3. Retrieve the object associated with the event from the local ScaleOut service.

  4. Perform any analysis and modify the state of the associated object.

  5. Persist the modified state back to ScaleOut service.

In the sample below, we continue with the streaming stock quote example from the prior topic. It illustrates registration of a basic server-side handler that deserializes an event and adds it to the price history of the associated stock object (a linked list of stock quotes).

static async Task Main(string[] args)
{
    var grid = await GridConnection.ConnectAsync("bootstrapGateways=localhost:721");
    var cb = new CacheBuilder<string, LinkedList<StockQuote>>("price histories", grid);
    cb.SetKeyEncoder(new ShortStringKeyEncoder());
    // Use MessagePack serialization
    cb.SetSerialization(
            (history, stream) => MessagePackSerializer.Serialize(stream, history),
            (stream) => MessagePackSerializer.Deserialize<LinkedList<StockQuote>>(stream)
    );
    var cache = cb.Build();

    // Inform the ScaleOut service that this client will handle events posted
    // to the price cache by providing a lambda callback:
    ServiceEvents.SetPostedEventHandler(cache, async (key, eventPayload) =>
    {
        // Extract stock quote from event payload.
        var quote = StockQuote.FromBytes(eventPayload.Bytes);

        // Retrieve (or add, if it doesn't exit) the price history
        // from the local ScaleOut service.
        var response = await cache.ReadOrAddAsync(
                              quote.Ticker,
                              valueFactory: () => new LinkedList<StockQuote>(new[] { quote })
                             );

        switch (response.Result)
        {
            case ServerResult.Retrieved:
                LinkedList<StockQuote> history = response.Value;

                // Add the incoming quote to the history:
                history.AddLast(quote);

                // Trim the history to 100 of the most recent quotes:
                if (history.Count > 100) history.RemoveFirst();

                // Persist updated history to ScaleOut service:
                await cache.UpdateAsync(quote.Ticker, history);
                break;
            case ServerResult.Added:
                // This was the first quote for the symbol. The valueFactory 
                // argument used in the ReadOrAddAsync call above already added a
                // new history object in the service with the quote, so there's 
                // nothing to do.
                break;
            default:
                Console.WriteLine($"Unexpected response: {response.Result}");
                return;
        }

        // Perform any actions/analysis/alerting here, as needed.
    });

    // Wait indefinitely for events to arrive.
    Console.ReadLine();
}

The application above performs the following actions:

  1. ServiceEvents.SetPostedEventHandler is called to register a lambda as a callback.

  2. The incoming event payload (assumed to be a StockQuote object) is deserialized.

  3. Cache.ReadOrAddAsync is called to either retrieve or create the associated stock history in the ScaleOut service.

  4. If needed, the history object is modified to include the incoming stock price quote, and the change is persisted back to the ScaleOut service using Cache.UpdateAsync.

Note

Note that the example above does not use exclusive locking when reading and updating the price history in the ScaleOut service. This approach is safe from races because events for a given key are queued up as they arrive, and the callback is invoked on each event sequentially, in order of arrival. In other words, the handler will be invoked concurrently for different keys, but execution is sequential on a key-by-key basis. So if objects stored in ScaleOut are modified only by your event handling callback and no other application, it is not necessary to using exclusive locking in the service when updating objects.

Tip

In streaming applications, processing events frequently involves transforming the events with a time windowing function and then applying an aggregate function to each window. (For example, moving averages are often used in the technical analysis of equities.) ScaleOut Software provides an open source time windowing library that is well suited for the stateful event processing model supported by ScaleOut StreamServer.