In addition to the standard LINQ operators found in Queryable, the ScaleOut LINQ provider also recognizes the operators found in QueryableExtensions which support the use of LINQ queries with StateServer's parallel Invoke facility.
The NamedCache's Invoke and Query methods accept an IFilter parameter to restrict the list of objects the methods operate on or return. The AsFilter<T> method allows you to convert an IQueryable<T> result generated by QueryKeys<T> or QueryObjects<T> to an IFilter object suitable for use with these methods.
For example, consider the following classes where a (stock) Portfolio holds a list of Positions in various stocks. Note that the Portfolio class implements ITaggable and that when Portfolio.Add adds a Position to the Portfolio, it also adds the Ticker corresponding to the stock as a Tag for the Portfolio.
    [Serializable]
    public struct Position
    {
        public string Ticker { get; set; }
        public string Company { get; set; }
        public decimal Quantity { get; set; }
        public DateTime PurchaseDate { get; set; }
        public decimal Cost { get; set; }
        public decimal CurrentPrice { get; set; }
    }

    [Serializable]
    public class Portfolio : ITaggable
    {
        public Portfolio(string owner)
        {
            Owner = owner;
            Positions = new List<Position>();
        }

        [SossIndex]
        public String Owner { get; private set; }

        public IList<Position> Positions { get; private set; }

        public void Add(Position position)
        {
            Positions.Add(position);
            // Tag the portfolio with the ticker so it can be found by tag queries.
            this.AddTags(position.Ticker);
        }

        SparseBitmap ITaggable.TagHolder { get; set; }
        NamedCache ITaggable.NamedCache
        {
            // CacheName is not defined in this excerpt.
            get { return CacheFactory.GetCache(CacheName); }
        }
    }
Suppose we'd like to calculate the value of all holdings of a particular list of stocks. Since Tags may be queried on StateServer, we can construct a LINQ query that returns only those Portfolio instances that contain the Tickers of the stocks we're interested in as shown below.
    HashSet<string> tickersToProcess = GetTickersToProcess();

    var q = from pf in cache.QueryObjects<Portfolio>()
            where pf.HasAnyTag(tickersToProcess)
            select pf;

Using the AsFilter<T> method, we can use the query we created above as the IFilter argument as shown here:
    decimal valueOfSelectedStocks =
        cache.Invoke<Portfolio, HashSet<string>, decimal>(
            q.AsFilter(),
            tickersToProcess,
            (pfolio, selectedTickers) =>
                (from pos in pfolio.Positions
                 where selectedTickers.Contains(pos.Ticker)
                 select pos.Quantity * pos.CurrentPrice).Sum(),
            (v1, v2) => v1 + v2,
            NamedCache.InfiniteInvokeTimeout);

    Console.WriteLine("Value of stocks in all portfolios: {0}",
                      valueOfSelectedStocks);
We pass the tickersToProcess HashSet<T> as the param parameter of Invoke<T, P, R>.
In this case, the evalMethod parameter is expressed as a lambda expression (lines 5-8) that finds the Positions within the Portfolio whose Ticker is in the tickersToProcess HashSet, multiplies each Position's CurrentPrice by its Quantity, and sums the results across all selected Positions.
The mergeMethod (line 9) merges the results of evalMethod invocations by summing the results.
By using the LINQ query to filter the objects sent to the evalMethod, we've ensured that only objects of the right type (Portfolio objects) that hold some position in the selected stocks are evaluated in the evalMethod.
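To make the roles of the evalMethod and mergeMethod concrete, the following minimal sketch reproduces the same filter, evaluate, and merge flow sequentially with plain LINQ to Objects. It does not use StateServer at all: LocalPosition, LocalPortfolio, and EvalMergeSketch are hypothetical stand-ins invented for this illustration, and the Any(...) filter only approximates what HasAnyTag does on the server. It assumes C# 9 or later for record types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical local stand-ins for the Position and Portfolio types above;
// no StateServer types are involved.
public record LocalPosition(string Ticker, decimal Quantity, decimal CurrentPrice);
public record LocalPortfolio(string Owner, List<LocalPosition> Positions);

public static class EvalMergeSketch
{
    // Plays the role of evalMethod: value of the selected tickers in one portfolio.
    public static decimal Eval(LocalPortfolio pfolio, HashSet<string> selectedTickers) =>
        pfolio.Positions
              .Where(pos => selectedTickers.Contains(pos.Ticker))
              .Sum(pos => pos.Quantity * pos.CurrentPrice);

    // Plays the role of mergeMethod: combines two partial results into one.
    public static decimal Merge(decimal v1, decimal v2) => v1 + v2;

    // Sequential, in-memory analogue of filter -> eval -> merge.
    public static decimal ValueOfSelected(
        IEnumerable<LocalPortfolio> portfolios, HashSet<string> selectedTickers) =>
        portfolios
            // Approximates the tag filter: keep portfolios holding any selected ticker.
            .Where(pf => pf.Positions.Any(pos => selectedTickers.Contains(pos.Ticker)))
            // Evaluate each remaining portfolio independently.
            .Select(pf => Eval(pf, selectedTickers))
            // Fold the per-portfolio results pairwise, starting from zero.
            .Aggregate(0m, (v1, v2) => Merge(v1, v2));

    public static void Main()
    {
        var portfolios = new List<LocalPortfolio>
        {
            new LocalPortfolio("alice", new List<LocalPosition>
            {
                new LocalPosition("GOOG", 10m, 100m),
                new LocalPosition("MSFT", 5m, 50m),
            }),
            new LocalPortfolio("bob", new List<LocalPosition>
            {
                new LocalPosition("AAPL", 2m, 200m),
            }),
            new LocalPortfolio("carol", new List<LocalPosition>
            {
                new LocalPosition("GOOG", 1m, 100m),
            }),
        };
        var selected = new HashSet<string> { "GOOG", "MSFT" };

        Console.WriteLine("Value of selected stocks: {0}",
                          ValueOfSelected(portfolios, selected));
    }
}
```

Because the merge step here is a simple sum, the order in which partial results are combined does not affect the total; a merge function with that property is what one would generally want when results are produced in parallel and combined in no particular order.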
The StateServer LINQ provider offers an alternate way of expressing parallel method invocation that more closely follows the LINQ programming pattern. The Invoke family of methods allows you to specify the evalMethod for an Invoke operation. If you use an evalMethod that has a non-void return value, then you must also use the Merge<R> method to specify the mergeMethod.
Note |
---|
When you use Merge<R>, it must immediately follow or consume the result from the Invoke method. No other LINQ operators may be used between the Invoke and the Merge. |
The example above can be rewritten using the LINQ pattern as follows:
    HashSet<string> tickersToProcess = GetTickersToProcess();

    decimal valueOfSelectedStocks =
        (from pf in cache.QueryObjects<Portfolio>()
         where pf.HasAnyTag(tickersToProcess)
         select pf)

        .Invoke(NamedCache.InfiniteInvokeTimeout,
                tickersToProcess,
                (pfolio, selectedTickers) =>
                    (from pos in pfolio.Positions
                     where selectedTickers.Contains(pos.Ticker)
                     select pos.Quantity * pos.CurrentPrice).Sum())

        .Merge((v1, v2) => v1 + v2);

    Console.WriteLine("Value of stocks in all portfolios: {0}",
                      valueOfSelectedStocks);
Because it builds on readers' understanding of the LINQ pattern and makes the data flow between the query, eval method and merge method more explicit, you may find the LINQ pattern for Invoke easier to read and write.
When you use the QueryableExtensions version of the Invoke method, the "target" type parameter T must be the same as the type parameter of the IQueryable<T> used as the data source for Invoke.
When Invoke's data source is QueryObjects<T>, the "target" type parameter for Invoke is the same as the generic type parameter specified in QueryObjects<T>. However, QueryKeys<T> returns CachedObjectId<T>, so the "target" type parameter for Invoke must be CachedObjectId<T>.
Normally, the parallel method invocation infrastructure expects the "target" type for the evalMethod to refer to an object in the store, and the infrastructure arranges to fetch the object from the store and deserialize it before calling the evalMethod. In this version of StateServer, if the target type is CachedObjectId<T>, the parallel method invocation infrastructure simply passes the CachedObjectId<T> to the evalMethod. The evalMethod can then choose whether to fetch the object from the store, depending on what it needs to do.
For example, if you wanted to use parallel method invocation to simply count the number of portfolios containing the stock "GOOG", you might execute the following:
    int countOfStocks = (from key in cache.QueryKeys<Portfolio>()
                         where key.Value.HasAnyTag("GOOG")
                         select key)

                        .Invoke(NamedCache.InfiniteInvokeTimeout,
                                0, // no parameter, specify zero
                                (key, param) => 1)

                        .Merge((v1, v2) => v1 + v2);

    Console.WriteLine("Count of portfolios containing the stock GOOG: {0}",
                      countOfStocks);
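The same count can also be obtained without parallel method invocation by applying the standard Count() operator to the query:

    int countOfStocks = (from key in cache.QueryKeys<Portfolio>()
                         where key.Value.HasAnyTag("GOOG")
                         select key).Count();

    Console.WriteLine("Count of portfolios containing the stock GOOG: {0}",
                      countOfStocks);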
Note |
---|
In some future release the Count() LINQ operator is likely to be supported directly in the server. |