Parallel Method Invocation

The ScaleOut StateServer service provides a parallel method invocation (PMI) facility that lets applications quickly analyze data stored in the in-memory data grid. PMI follows the "map/reduce" model for parallel data analysis: the data to be analyzed can be selected with a parallel query, and it is then analyzed on every host in the farm by user-specified methods that evaluate the selected objects and merge the results.

To perform PMI operations, users must implement two items:

  1. A long-running "worker" application that is deployed to every host in the ScaleOut StateServer farm: this application handles invocation events for a named cache by defining the eval (map) and merge (reduce) functions that are responsible for performing the analysis.
  2. Client application logic that initiates PMI operations and receives the result.
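
The division of labor between eval and merge can be sketched in plain C++ without any ScaleOut headers. The types and functions below (Position, Portfolio, UpdateResult, eval_portfolio, merge_results, invoke_all) are hypothetical names invented for this illustration, and the driver runs sequentially in one process, whereas the real PMI engine distributes the same two steps across the hosts in the farm.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-ins for objects held in the grid; not ScaleOut types.
struct Position  { std::string ticker; double price; int shares; };
struct Portfolio { int id; std::vector<Position> positions; };

// The result type that eval produces and merge combines.
struct UpdateResult { std::vector<int> affected_ids; };

// "Eval" (map): apply a new price to one portfolio and report whether it changed.
UpdateResult eval_portfolio(Portfolio& p, const std::string& ticker, double new_price)
{
    UpdateResult r;
    bool affected = false;
    for (Position& pos : p.positions) {
        if (pos.ticker == ticker) {
            pos.price = new_price;
            affected = true;
        }
    }
    if (affected)
        r.affected_ids.push_back(p.id);
    return r;
}

// "Merge" (reduce): combine two partial results into one.
UpdateResult merge_results(UpdateResult a, const UpdateResult& b)
{
    a.affected_ids.insert(a.affected_ids.end(),
                          b.affected_ids.begin(), b.affected_ids.end());
    return a;
}

// Sequential driver standing in for the PMI engine: evaluate every object,
// then fold each partial result into a running total.
UpdateResult invoke_all(std::vector<Portfolio>& grid,
                        const std::string& ticker, double new_price)
{
    assert(!ticker.empty());
    UpdateResult total;
    for (Portfolio& p : grid)
        total = merge_results(total, eval_portfolio(p, ticker, new_price));
    return total;
}
```

Because merge only ever sees two results at a time, the same pair of functions works whether the partial results come from one host or from many.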

Users of ScaleOut’s original, low-level C API have been able to perform PMI operations using the soss_pmicli API; the interface and extensive documentation for this API are available in the soss_pmicli.h header that ships with the product. The soss_pmicli API allows you to build both the worker and client applications outlined above. The new C++ Native Client API allows you to build client applications that initiate PMI operations. Worker processes written against the original soss_pmicli C API can serve client applications written against the new C++ Native Client API.

The C++ Native Client API provides the NamedCache::pmicli_invoke method, which returns a smart pointer to a PMI client invoke result object (boost::shared_ptr<sosscli::PmicliInvokeResult>). This result object contains a vector of bytes representing the serialized final result object of a PMI operation. The invocation can optionally specify a sosscli::Filter object as a parameter to invoke against a subset of objects in the named cache (see Querying the Named Cache for examples of filters).

The example below illustrates how prices in portfolios of stock objects can be updated en masse using a PMI operation. First, we implement a worker process that uses the soss_pmicli API: it handles invocation events for a given named cache and defines the eval and merge methods.

#include <stdexcept>
#include <iostream>

#include <boost/make_shared.hpp>

#include "soss_pmicli.h"

#include "soss_client/named_protobuf_cache.h"

#include "StockPortfolio.pb.h"

// Unique identifiers for the eval and merge methods defined below:
#define PMICLI_SAMPLE_EVAL_CALLBACK_ID    42
#define PMICLI_SAMPLE_MERGE_CALLBACK_ID   43

using namespace NativeClientSample;

/// The eval callback (SOSSPMI_EVAL_FUNCP) passed to the pmicli invoke function to handle
/// evaluation events. Accepts a stock price parameter to be applied to a portfolio object that
/// has been stored in the ScaleOut StateServer service.
///
/// This callback accepts two objects: an object from the datagrid to be evaluated and a parameter
/// object for the invoke operation that is provided to every eval call. Both the data object
/// (a Portfolio) and the parameter object (a Stock) are serialized Google Protocol Buffer objects. The result of
/// the evaluation is a result object that is serialized and set to the ppser_results_out output
/// parameter.
static int  eval_cb(
                    void const *    pser_port,                  // pointer to serialized Portfolio object to be evaluated
                    unsigned int    ser_port_len,               // length of serialized Portfolio data object
                    void const *    pser_params,                // pointer to serialized Stock object
                    unsigned int    ser_params_len,             // parameters object length
                    void **         ppser_results_out,          // out: pointer to serialized PriceUpdateResult object
                    unsigned int *  pser_results_len_out)       // out: serialized PriceUpdateResult length
{
    // Deserialize the portfolio that we're evaluating. A parse failure is
    // reported to the invoker as a user-defined error code:
    auto obj = boost::make_shared<Portfolio>();
    if (!obj->ParseFromArray(pser_port, ser_port_len))
        return -3;

    // Deserialize the PMI parameter object. Note that the parameter object
    // will be null if the user didn't provide one to the invoke operation.
    Stock params_obj;
    if (pser_params != NULL && ser_params_len > 0)
        params_obj.ParseFromArray(pser_params, ser_params_len);
    else
        return -1;

    bool portfolio_affected = false;
    double portfolio_value = 0.0;

    // update positions in this portfolio that hold this stock:
    for (int i = 0; i < obj->position_size(); ++i)
    {
        Portfolio::Position *position = obj->mutable_position(i);
        Stock *stock = position->mutable_stock();
        if (stock->ticker() == params_obj.ticker())
        {
            portfolio_affected = true;

            stock->set_price(params_obj.price());
            stock->set_quote_time(params_obj.quote_time());
        }

        portfolio_value += (stock->price() * position->shares());
    }

    // Create and fill out a results object.
    PriceUpdateResult results;

    if (portfolio_affected)
    {
        results.add_affected_portfolios(obj->id());

        // the portfolio's value was affected, so we update it in the SOSS service:
        obj->set_total_value(portfolio_value);
        sosscli::NamedProtobufCache<Portfolio> nc("pmicli portfolios");
        nc.put(obj->id(), obj);
    }

    // Write results to the output parameter, which contains the serialized results.
    // We must allocate a buffer that will receive the serialized results.
    int result_size = results.ByteSize();
    void * pobj_out = Sosspmi_malloc(result_size);

    // Any non-zero return code is interpreted by the soss_pmicli library as a
    // user-defined error and will be returned to the invoker as an exception:
    if (pobj_out == NULL) // malloc failed
        return -2;

    results.SerializeToArray(pobj_out, result_size);

    *ppser_results_out      = pobj_out;
    *pser_results_len_out   = result_size;

    return 0;
}


/// The merge callback (SOSSPMI_MERGE_FUNCP) passed to the SOSS PMI API invoke function to handle
/// merge events.
///
/// This callback accepts two result objects and combines them to a single result that is returned to the
/// PMI engine. This method will be called repeatedly on results in order to combine them all into a single
/// results object, which is returned to the caller that initiated the PMI operation.
static int merge_cb(
                    void *          pser_results_1,                     // first serialized PriceUpdateResult object
                    unsigned int    ser_results_len_1,                  // first serialized PriceUpdateResult object length
                    void const *    pser_results_2,                     // second serialized PriceUpdateResult object
                    unsigned int    ser_results_len_2,                  // second serialized PriceUpdateResult object length
                    void **         ppser_results_out,                  // out: pointer to merged PriceUpdateResult object
                    unsigned int *  pser_results_len_out)               // out: length of serialized PriceUpdateResult object
{
    PriceUpdateResult results1;
    PriceUpdateResult results2;

    // Deserialize results:
    results1.ParseFromArray(pser_results_1, ser_results_len_1);
    results2.ParseFromArray(pser_results_2, ser_results_len_2);

    // Merge results2 into results1.

    // For each result in results2, add it to the result1 collection:
    for (int i = 0; i < results2.affected_portfolios_size(); ++i)
    {
        results1.add_affected_portfolios(results2.affected_portfolios(i));
    }

    // Now that the merge is complete, serialize the combined PriceUpdateResults object and assign
    // it to the output parameter. Note that we must allocate a buffer that will
    // receive the serialized results. The PMI library will automatically free memory for
    // intermediate merge results that are allocated here.
    int resultsBufferSize = results1.ByteSize();
    void * pobj_out = Sosspmi_malloc(resultsBufferSize);

    // Any non-zero return code is interpreted by the soss_pmicli library as a
    // user-defined error and will be returned to the invoker as an exception:
    if (pobj_out == NULL)
        return -2;

    results1.SerializeToArray(pobj_out, resultsBufferSize);

    *ppser_results_out      = pobj_out;
    *pser_results_len_out   = resultsBufferSize;

    return 0;
}


int main(int argc, char* argv[])
{
    sosscli::NamedProtobufCache<Portfolio> nc("pmicli portfolios");

    // Initialize PMI worker library. Start by registering the eval and merge callbacks
    // that will process PMI requests. The IDs we assign to these callback functions
    // must be provided to the initiating invocation request that is made in the
    // C++ client app:
    int ret = Sosspmi_worker_register_eval_func(eval_cb, PMICLI_SAMPLE_EVAL_CALLBACK_ID, nc.app_id());
    if (ret != SOSSLIB_RET_SUCCESS)
        throw std::runtime_error("Eval function registration failed.");

    ret = Sosspmi_worker_register_merge_func(merge_cb, PMICLI_SAMPLE_MERGE_CALLBACK_ID, nc.app_id());
    if (ret != SOSSLIB_RET_SUCCESS)
        throw std::runtime_error("Merge function registration failed.");

    // Tell the PMI library that we're ready to start handling invocation requests:
    ret = Sosspmi_worker_init(nc.app_id());
    if (ret != SOSSLIB_RET_SUCCESS)
        throw std::runtime_error("Sosspmi_worker_init failed.");

    return 0;
}

Note that the invoking client application must also know the IDs that were supplied when the eval and merge callbacks were registered.
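
The buffer-ownership and return-code conventions used by the callbacks above can be illustrated with a small stand-alone sketch. Everything in it is an assumption made for illustration: the callbacks are simplified (each "object" is a single int and there is no parameter object), std::malloc stands in for Sosspmi_malloc, and run_invocation is a hypothetical single-threaded driver, not the PMI engine. The authoritative contract is the one documented in soss_pmicli.h.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <vector>

// Return codes for this sketch; any non-zero value signals a user-defined error.
enum { CB_OK = 0, CB_BAD_INPUT = -1, CB_NO_MEMORY = -2 };

// Eval-style callback: reads one serialized object (a single int here) and
// hands back its result in a freshly allocated buffer owned by the caller.
int sum_eval_cb(const void* obj, unsigned int obj_len,
                void** result_out, unsigned int* result_len_out)
{
    if (obj == nullptr || obj_len != sizeof(int))
        return CB_BAD_INPUT;
    void* buf = std::malloc(sizeof(int));   // stand-in for Sosspmi_malloc
    if (buf == nullptr)
        return CB_NO_MEMORY;
    std::memcpy(buf, obj, sizeof(int));
    *result_out = buf;
    *result_len_out = sizeof(int);
    return CB_OK;
}

// Merge-style callback: sums two int results into a new buffer.
int sum_merge_cb(const void* r1, unsigned int len1,
                 const void* r2, unsigned int len2,
                 void** result_out, unsigned int* result_len_out)
{
    if (len1 != sizeof(int) || len2 != sizeof(int))
        return CB_BAD_INPUT;
    int a = 0, b = 0;
    std::memcpy(&a, r1, sizeof(int));
    std::memcpy(&b, r2, sizeof(int));
    const int sum = a + b;
    void* buf = std::malloc(sizeof(int));
    if (buf == nullptr)
        return CB_NO_MEMORY;
    std::memcpy(buf, &sum, sizeof(int));
    *result_out = buf;
    *result_len_out = sizeof(int);
    return CB_OK;
}

// Hypothetical driver: evaluates each object, folds the results pairwise,
// frees every intermediate buffer, and stops at the first non-zero code.
int run_invocation(const std::vector<int>& objects, int* total_out)
{
    assert(total_out != nullptr);
    void* acc = nullptr;
    unsigned int acc_len = 0;
    for (std::size_t i = 0; i < objects.size(); ++i) {
        void* res = nullptr;
        unsigned int res_len = 0;
        int rc = sum_eval_cb(&objects[i], sizeof(int), &res, &res_len);
        if (rc != CB_OK) { std::free(acc); return rc; }
        if (acc == nullptr) { acc = res; acc_len = res_len; continue; }
        void* merged = nullptr;
        unsigned int merged_len = 0;
        rc = sum_merge_cb(acc, acc_len, res, res_len, &merged, &merged_len);
        std::free(acc);
        std::free(res);
        if (rc != CB_OK) return rc;
        acc = merged;
        acc_len = merged_len;
    }
    *total_out = 0;
    if (acc != nullptr) {
        std::memcpy(total_out, acc, sizeof(int));
        std::free(acc);
    }
    return CB_OK;
}
```

The point of the sketch is that a callback allocates its output buffer but never frees it: ownership passes to whoever invoked the callback, which is why the worker code above allocates with the library's allocator and simply returns.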

A client application can invoke the worker’s eval and merge methods in parallel across some or all portfolios stored in the distributed ScaleOut StateServer farm. The sample below illustrates how a PMI operation can be initiated by an instance of the NamedProtobufCache:

#include <stdint.h>
#include <time.h>
#include <vector>
#include <iostream>

#include <boost/make_shared.hpp>

#include "soss_pmicli.h"

#include "soss_client/named_protobuf_cache.h"
#include "soss_client/value_comparand.h"
#include "soss_client/reference_comparand.h"

#include "StockPortfolio.pb.h"

// IDs of the eval and merge callbacks registered by the worker process; these values must match the worker's.
#define PMICLI_SAMPLE_EVAL_CALLBACK_ID    42
#define PMICLI_SAMPLE_MERGE_CALLBACK_ID   43

using namespace NativeClientSample;

static const unsigned int DEFAULT_NUM_STRATEGIES = 100;
static const unsigned int DEFAULT_NUM_POSITIONS_PER_STRATEGY = 10;

int main(int argc, char* argv[])
{
    sosscli::NamedProtobufCache<Portfolio> strategy_npc("pmicli portfolios");

    // populate the cache with sample data for invocation
    for(unsigned int i = 0; i < DEFAULT_NUM_STRATEGIES; i++)
    {
        // Create a StockPortfolio strategy to insert into the StateServer service:
        auto strategy = boost::make_shared<Portfolio>();
        strategy->set_id(i);
        double total_value = 0;
        for(unsigned int j = 0; j < DEFAULT_NUM_POSITIONS_PER_STRATEGY; j++)
        {
            // Create Portfolio positions for the strategy:
            Portfolio_Position * position = strategy->add_position();

            Stock * stock = position->mutable_stock();
            stock->set_ticker("GOOG");
            stock->set_price(100.0f);
            time_t now;
            time(&now);
            stock->set_quote_time(now);

            position->set_shares(j);

            total_value += position->shares() * stock->price();
        }
        strategy->set_total_value(total_value);

        // Now that the strategy is populated, put it into the cache
        strategy_npc.put(i, strategy);
    }

    // define the parameters object to only affect portfolios which contain the GOOG stock
    Stock stock;
    stock.set_ticker("GOOG");
    stock.set_price(500.0f);
    time_t now;
    time(&now);
    stock.set_quote_time(now);

    // serialize the parameter object
    std::vector<uint8_t> param(stock.ByteSize());
    stock.SerializeToArray(param.data(), stock.ByteSize());

    // start the invocation, only selecting half of the objects (0-49, total of 50).
    auto invoke_res = strategy_npc.pmicli_invoke(sosscli::ReferenceComparand("id") < (DEFAULT_NUM_STRATEGIES / 2),
                                                 PMICLI_SAMPLE_EVAL_CALLBACK_ID,
                                                 PMICLI_SAMPLE_MERGE_CALLBACK_ID,
                                                 param);
    if (invoke_res->return_code() != SOSSLIB_RET_SUCCESS)
    {
        // invocation was not successful
        std::cout << "*** Invocation failed with error " << invoke_res->return_code() << std::endl;

        // check for and print user exceptions
        if (invoke_res->exceptions_ptr()->size() > 0)
        {
            using namespace sosscli::exceptions;
            std::cout << "User exception codes:" << std::endl;
            for (std::vector<PmiCliUserExceptionCode>::iterator it = invoke_res->exceptions_ptr()->begin(); it != invoke_res->exceptions_ptr()->end(); ++it) {
                if (it->exception_source() == PmiCliUserExceptionCode::ExceptionSource::EVAL)
                {
                    std::wcout << "Evaluation of object " << strategy_npc.get_key_string(it->key()) << " failed with error code " << it->return_code() << std::endl;
                }
                else if (it->exception_source() == PmiCliUserExceptionCode::ExceptionSource::MERGE)
                {
                    std::cout << "Merge failed with error code " << it->return_code() << std::endl;
                }
            }
        }
        return 1;
    }

    // The invocation is complete; deserialize the results object
    PriceUpdateResult result;
    result.ParseFromArray(invoke_res->object_ptr()->data(), (int)invoke_res->object_ptr()->size());

    // Print invocation results
    std::cout << "Invocation results:" << std::endl;
    std::cout << "# successful evaluations: " << invoke_res->num_successful() << std::endl;
    std::cout << "# failed evaluations: " << invoke_res->num_failed() << std::endl;
    std::cout << "# affected portfolios: " << result.affected_portfolios_size() << std::endl;

    return 0;
}

Notes on the client sample above:

  1. reference_comparand.h and value_comparand.h must be included to allow construction of query filter expressions.
  2. The invoking client application must pass the same IDs that the worker supplied when it registered its eval and merge callbacks.
  3. The ReferenceComparand class is used to create a filter for an invocation: pass field names from your protobuf classes into the ReferenceComparand’s constructor when building up a filter expression for your query. Filters can be composed using operators that have been overloaded in the ReferenceComparand and Filter classes.

The NamedCache::pmicli_invoke method illustrated in the example above accepts a sosscli::Filter object as a parameter. Filter objects are not intended to be created directly; instead, the overloaded comparison operators act as factories that create Filter objects from comparison expressions. Those filters may then be combined with other filter clauses using the overloaded logical operators (&&, ||) to create a single Filter object that is sent to the StateServer service. These operator overloads allow multiple search predicates to be expressed in straightforward C++ syntax.
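
The operator-as-factory idea can be sketched with a few stand-alone classes. This is not the sosscli implementation: SketchFilter, FieldRef, and Record are hypothetical names invented here, records are modeled as a map from field name to integer, and the filter is evaluated locally rather than being sent to the service.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// A record is modeled as field name -> integer value (an assumption of this sketch).
typedef std::map<std::string, long> Record;

// Hypothetical filter type: wraps a predicate over a record. It is only
// produced by the operators below, never spelled out by calling code.
class SketchFilter {
public:
    explicit SketchFilter(std::function<bool(const Record&)> pred)
        : pred_(std::move(pred)) {}
    bool matches(const Record& r) const { return pred_(r); }
private:
    std::function<bool(const Record&)> pred_;
};

// Hypothetical field reference, similar in spirit to a ReferenceComparand.
class FieldRef {
public:
    explicit FieldRef(const std::string& name) : name_(name) { assert(!name.empty()); }
    const std::string& name() const { return name_; }
private:
    std::string name_;
};

// Comparison operators act as factories: each returns a filter, not a bool.
// A record that lacks the named field never matches.
inline SketchFilter operator<(const FieldRef& f, long v)
{
    const std::string name = f.name();
    return SketchFilter([name, v](const Record& r) {
        Record::const_iterator it = r.find(name);
        return it != r.end() && it->second < v;
    });
}

inline SketchFilter operator>=(const FieldRef& f, long v)
{
    const std::string name = f.name();
    return SketchFilter([name, v](const Record& r) {
        Record::const_iterator it = r.find(name);
        return it != r.end() && it->second >= v;
    });
}

// Logical operators combine two filters into one composite filter.
inline SketchFilter operator&&(const SketchFilter& a, const SketchFilter& b)
{
    return SketchFilter([a, b](const Record& r) { return a.matches(r) && b.matches(r); });
}

inline SketchFilter operator||(const SketchFilter& a, const SketchFilter& b)
{
    return SketchFilter([a, b](const Record& r) { return a.matches(r) || b.matches(r); });
}
```

With these definitions, an expression such as FieldRef("id") < 50 && FieldRef("shares") >= 5 builds a single composite filter object. One consequence of overloading && and || in C++ is that both operands are always evaluated when the expression is built, since overloaded logical operators do not short-circuit.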