Namespace: Soss.Client
Assembly: soss_namedcache (in soss_namedcache.dll) Version: 6.2.0.0
public void PostEvent( CachedObjectId id, string eventInfo, byte[] payload, string targetInvocationGridName, Nullable<TimeSpan> invokeTimeout = null )
Parameters
- id
- Type: Soss.Client.CachedObjectId
Object ID identifying the object associated with the event. This ID will be supplied to event subscribers. - eventInfo
- Type: System.String
Arbitrary string identifying the event. - payload
- Type: System.Byte[]
Optional payload (typically a serialized object). - targetInvocationGridName
- Type: System.String
Invocation grid name. The invocation grid must be previously loaded, for example by using the Load method. - invokeTimeout (Optional)
- Type: System.Nullable<System.TimeSpan>
The amount of time allowed for this operation. After the timeout expires, the operation will be canceled. You may specify InfiniteInvokeTimeout if you want the operation to continue no matter how long it takes.
The value specified is currently rounded up to the smallest whole number of seconds that is equal to or greater than the specified TimeSpan value. This granularity may change in future implementations.
When objects held in the StateServer store are large and subject to more frequent updates than reads, it can be advantageous to use an event-based update model rather than the standard CRUD (Create/Read/Update/Delete) access model. Sending events directly to the StateServer hosts where objects reside can be more efficient than pulling an entire object to a client, modifying it, and then sending the entire object back to the StateServer host via an update.
The NamedCache.PostEvent() method raises an event on the StateServer host that contains the object associated with the provided key. Server-side code subscribes to an IObservable<T> sequence of posted events. This event-handling logic can either be hosted in a ScaleOut Invocation Grid or in a long-running server-side application such as a Windows service.
Note |
---|
The IObservable<T> returned by NamedCache.GetEventSource() is compatible with Reactive Extensions (Rx). |
/// <summary>
/// A single stock quote that can be converted to and from a byte array,
/// which lets it travel as the payload of a posted event.
/// </summary>
/// <remarks>
/// NOTE(review): BinaryFormatter is unsafe when the bytes being deserialized
/// can come from an untrusted source, and it is obsolete in current .NET
/// releases. It is kept here so the wire format is unchanged; consider moving
/// to a safer serializer if payloads can originate outside a trusted boundary.
/// </remarks>
[Serializable]
public class StockQuote
{
    /// <summary>Ticker symbol of the quoted security.</summary>
    public string Ticker { get; set; }

    /// <summary>Time at which the quote was taken.</summary>
    public DateTimeOffset Timestamp { get; set; }

    /// <summary>Quoted price.</summary>
    public decimal Price { get; set; }

    /// <summary>Quoted volume.</summary>
    public long Volume { get; set; }

    /// <summary>
    /// Serializes this quote with BinaryFormatter.
    /// </summary>
    /// <returns>The serialized form of this instance.</returns>
    public byte[] ToBytes()
    {
        var serializer = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();

        // Dispose the stream deterministically instead of leaving it to the GC.
        using (var stream = new System.IO.MemoryStream())
        {
            serializer.Serialize(stream, this);
            return stream.ToArray();
        }
    }

    /// <summary>
    /// Rebuilds a quote from bytes previously produced by <see cref="ToBytes"/>.
    /// </summary>
    /// <param name="bytes">Serialized quote; must not be null.</param>
    /// <returns>The deserialized quote.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="bytes"/> is null (raised by the MemoryStream constructor).
    /// </exception>
    public static StockQuote FromBytes(byte[] bytes)
    {
        var serializer = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();

        using (var stream = new System.IO.MemoryStream(bytes))
        {
            return (StockQuote)serializer.Deserialize(stream);
        }
    }
}
// Client that posts an event to the distributed ScaleOut data grid.
static void Main(string[] args)
{
    // Simulate an incoming stock quote. The timestamp literal is written in
    // month/day/year order, so it is parsed with the invariant culture;
    // parsing with the machine's current culture throws FormatException on
    // systems whose date format is day-first.
    var quote = new StockQuote()
    {
        Ticker = "MSFT",
        Timestamp = DateTimeOffset.Parse(
            "12/21/2016 17:59 -5:00",
            System.Globalization.CultureInfo.InvariantCulture),
        Price = 63.54m,
        Volume = 15253897
    };

    // Prepare to post the quote as an event to the "price histories" named cache.
    var cache = CacheFactory.GetCache("price histories");

    // Ticker symbols are the keys to history objects in this named cache.
    // Use the ticker as the key of the event so that it is raised on the
    // ScaleOut host where the MSFT history resides.
    var cacheKey = cache.CreateKey(quote.Ticker);
    cache.PostEvent(cacheKey, "Quote Event", quote.ToBytes());
}
// Server-side host for the handler of events posted from clients.
// The ScaleOut compute engine load-balances events, so one instance of this
// application has to run on every ScaleOut host in the cluster.
// Alternatively, the same logic could be hosted in a ScaleOut Invocation
// Grid worker process.
static void Main(string[] args)
{
    var priceHistories = CacheFactory.GetCache("price histories");

    // Seed the named cache with an empty MSFT price history.
    priceHistories.Add("MSFT", new LinkedList<StockQuote>());

    // Attach the observer that handles events posted from clients.
    priceHistories.GetEventSource().Subscribe(new StockQuoteObserver());

    // Block here indefinitely while clients post events.
    Console.ReadLine();
}
// Observer for a NamedCache's event stream. Saves incoming quotes into a
// price history object that is stored in the ScaleOut distributed data grid.
class StockQuoteObserver : IObserver<Soss.Client.Event>
{
    // Maximum number of quotes kept in a single price history.
    static readonly int MaxHistoryLength = 100;

    // Handles one posted event: appends the quote carried in the payload to
    // the price history identified by the event's object ID, trims the history
    // to MaxHistoryLength, and writes it back to the named cache.
    public void OnNext(Event ev)
    {
        if (ev.EventInfo != "Quote Event")
        {
            Console.WriteLine("Unexpected event.");
            return;
        }

        // Extract the quote from the event's payload. This happens before the
        // history is locked, so a bad payload cannot leave a lock behind.
        var quote = StockQuote.FromBytes(ev.Payload);

        // Retrieve (and lock) the price history in the named cache:
        var cache = CacheFactory.GetCache("price histories");
        object stored = cache.Retrieve(ev.ObjectId, acquireLock: true);
        if (stored == null)
        {
            Console.WriteLine("History object not found.");
            return;
        }

        var history = stored as LinkedList<StockQuote>;
        if (history == null)
        {
            // An object exists under this ID but is not a price history. The
            // retrieve above locked it, so release that lock before bailing
            // out; otherwise it would stay locked with no update to free it.
            // NOTE(review): confirm ReleaseLock(CachedObjectId) against the
            // NamedCache API version in use.
            cache.ReleaseLock(ev.ObjectId);
            Console.WriteLine("History object has an unexpected type.");
            return;
        }

        history.AddLast(quote);

        // Loop rather than test once so that a history which already exceeds
        // the cap is brought back down to it.
        while (history.Count > MaxHistoryLength)
        {
            history.RemoveFirst();
        }

        cache.Update(ev.ObjectId, history, unlockAfterUpdate: true);
        Console.WriteLine("Price history updated.");
    }

    public void OnCompleted()
    {
        // The IObservable sequence returned from NamedCache.GetEventSource
        // will never complete.
        throw new NotSupportedException("Unexpected completion.");
    }

    // Reports an error from the event stream by writing its message to the console.
    public void OnError(Exception error)
    {
        Console.WriteLine(error.Message);
    }
}