# Advanced Features

### Building Sophisticated RPC Handlers with Streaming, Subscriptions, and Beyond


***

## Introduction

Beyond basic request-response handlers, rippled supports **advanced features**: real-time data streaming, persistent subscriptions, batch processing, and gRPC integration. Applications that need continuous updates, bulk operations, or low-overhead binary communication rely on them.

In this section, you'll learn how to implement streaming responses, manage subscription mechanisms, process batch requests, and integrate with gRPC.

***

## Streaming Responses

### Overview

Streaming allows handlers to send multiple response chunks to clients over a single connection, useful for large result sets or real-time updates.

### WebSocket Streaming

WebSocket connections support streaming responses natively:

```cpp
Json::Value doStreamingHandler(RPC::JsonContext& context)
{
    // For WebSocket clients, we can send multiple messages
    // before the final response

    // Message 1: Progress update
    Json::Value message1;
    message1["type"] = "progress";
    message1["status"] = "processing";
    message1["processed"] = 1000;
    message1["total"] = 5000;

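    // RPC::JsonContext has no sendMessage(); the call below is a placeholder.
    // In rippled, the WebSocket session is reached through context.infoSub,
    // which is null for plain HTTP requests.
    // context.sendMessage(message1);

    // Continue processing...

    // Message 2: More progress
    Json::Value message2;
    message2["type"] = "progress";
    message2["processed"] = 5000;
    message2["total"] = 5000;
    // context.sendMessage(message2);  // placeholder, as above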

    // Final response
    Json::Value finalResult;
    finalResult[jss::status] = "success";
    finalResult["final_count"] = 5000;

    return finalResult;
}
```
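As a minimal, self-contained sketch of the progress pattern above, the snippet below replaces the WebSocket send path with a hypothetical `MessageSink` callback. `ProgressMessage`, `MessageSink`, and `processWithProgress` are illustrative names, not rippled APIs. The loop reports progress after each chunk of work and returns a final message once everything is processed.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for a message pushed to a WebSocket client.
struct ProgressMessage
{
    std::string type;       // "progress" or "final"
    std::size_t processed;  // items handled so far
    std::size_t total;      // total items to handle
};

// Hypothetical sink; a real server would wrap the connection's send call.
using MessageSink = std::function<void(ProgressMessage const&)>;

// Processes `total` items in steps of `chunk`, sending a progress message
// after every step except the last, and returns the final message.
ProgressMessage
processWithProgress(std::size_t total, std::size_t chunk, MessageSink const& sink)
{
    if (chunk == 0)
        chunk = 1;  // avoid an infinite loop on a zero chunk size

    std::size_t processed = 0;
    while (processed < total)
    {
        std::size_t const step = std::min(chunk, total - processed);
        // ... the real work for `step` items would happen here ...
        processed += step;
        if (processed < total)
            sink({"progress", processed, total});
    }
    return {"final", processed, total};
}
```

Keeping the sink as a parameter means the same loop can be driven by a WebSocket session in production and by a plain vector in tests.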

### Paginated Streaming

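For large datasets, return results one page at a time. The client passes the `marker` from the previous response back in its next request to fetch the following page: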

```cpp
Json::Value doPagedStream(RPC::JsonContext& context)
{
    // Get pagination parameters
    unsigned int pageSize = 100;
    std::string marker = "";

    if (context.params.isMember("limit")) {
        pageSize = context.params["limit"].asUInt();
        if (pageSize == 0) pageSize = 1;       // Avoid empty pages with no marker
        if (pageSize > 1000) pageSize = 1000;  // Cap at 1000
    }

    if (context.params.isMember("marker")) {
        marker = context.params["marker"].asString();
    }

    // Get ledger
    std::shared_ptr<ReadView const> ledger;
    auto ledgerResult = RPC::lookupLedger(ledger, context);
    if (!ledger) return ledgerResult;

    // Query large dataset with pagination
    std::vector<Json::Value> results;
    std::string nextMarker;

    // Fetch the next page of results, resuming after `marker`
    for (unsigned int i = 0; i < pageSize; ++i) {
        // Get next item from ledger and append it to results...
        // If items remain after the last one, set nextMarker
    }

    // Build response
    Json::Value response;
    response[jss::status] = jss::success;

    Json::Value items(Json::arrayValue);
    for (auto const& item : results) {
        items.append(item);
    }

    response["items"] = items;

    // Include marker for next page if more results exist
    if (!nextMarker.empty()) {
        response["marker"] = nextMarker;
    }

    return response;
}
```
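The marker logic is left as a placeholder in the handler above. One way to sketch it, assuming the data lives in an ordered `std::map` and the marker is simply the last key returned, is shown below. `Page` and `fetchPage` are hypothetical names used only for this illustration.

```cpp
#include <cstddef>
#include <map>
#include <optional>
#include <string>
#include <vector>

struct Page
{
    std::vector<std::string> items;         // values on this page
    std::optional<std::string> nextMarker;  // set only if more data remains
};

// Returns up to `limit` values whose keys sort strictly after `marker`.
// An empty marker starts from the beginning of the data.
Page
fetchPage(
    std::map<std::string, std::string> const& data,
    std::string const& marker,
    std::size_t limit)
{
    if (limit == 0)
        limit = 1;  // a zero limit would return nothing and no marker

    Page page;
    auto it = marker.empty() ? data.begin() : data.upper_bound(marker);
    std::string lastKey;
    for (; it != data.end() && page.items.size() < limit; ++it)
    {
        page.items.push_back(it->second);
        lastKey = it->first;
    }

    // Only hand out a marker when at least one more entry exists.
    if (it != data.end())
        page.nextMarker = lastKey;
    return page;
}
```

Because the marker is a key rather than an offset, a page stays correct even if entries before the marker are inserted or removed between requests.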

### HTTP Streaming (Chunked Encoding)

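For HTTP clients, chunked transfer encoding lets a server send a response body in pieces without knowing its total length up front. The handler below only signals streaming intent; the `stream` flag and `content_type` field are illustrative, and the HTTP server layer would have to emit the actual chunks: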

```cpp
// Handler signals streaming intent
Json::Value doHTTPStream(RPC::JsonContext& context)
{
    // Return special response indicating streaming
    Json::Value result;
    result[jss::stream] = true;
    result["content_type"] = "application/json";

    // Actual chunks would be sent via HTTP server
    return result;
}
```
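To make the wire format concrete, here is a small sketch of HTTP/1.1 chunked framing: each chunk is its size in hexadecimal, CRLF, the payload, and CRLF, and the body ends with a zero-size chunk. The function names are hypothetical and the sketch omits headers, trailers, and chunk extensions.

```cpp
#include <sstream>
#include <string>
#include <vector>

// Frames one chunk: payload size in hex, CRLF, payload, CRLF.
std::string
encodeChunk(std::string const& payload)
{
    std::ostringstream out;
    out << std::hex << payload.size() << "\r\n" << payload << "\r\n";
    return out.str();
}

// The terminating chunk: size zero followed by an empty trailer section.
std::string
encodeLastChunk()
{
    return "0\r\n\r\n";
}

// Encodes a sequence of payloads as one complete chunked body.
std::string
encodeChunkedBody(std::vector<std::string> const& payloads)
{
    std::string body;
    for (auto const& payload : payloads)
    {
        // Skip empty payloads: a zero-size chunk would end the body early.
        if (!payload.empty())
            body += encodeChunk(payload);
    }
    body += encodeLastChunk();
    return body;
}
```

A response using this body would also carry a `Transfer-Encoding: chunked` header and no `Content-Length`.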

***

## Subscription Mechanisms

### Ledger Subscription

Clients can subscribe to ledger close events:

**WebSocket Message**:

```json
{
    "command": "subscribe",
    "streams": ["ledger"]
}
```

**Handler Implementation**:

```json
// Subscriptions are managed by the WebSocket handler
// In src/xrpld/rpc/detail/WSHandler.cpp

// When a ledger closes, this is broadcast to all subscribers:
{
    "type": "ledgerClosed",
    "ledger_index": 12345,
    "ledger_hash": "...",
    "ledger_time": 1234567890,
    "fee_base": 12,
    "fee_ref": 12000,
    "reserve_base": 10000000,
    "reserve_inc": 2000000,
    "txn_count": 45,
    "validation_hash": "..."
}
```

### Transaction Subscription

Subscribe to transactions from a specific account:

```json
// Subscribe message
{
    "command": "subscribe",
    "accounts": ["rN7n7otQDd6FczFgLdlqtyMVrn3NnrcVXs"]
}

// When account receives a transaction:
{
    "type": "transaction",
    "transaction": {
        "Account": "rN7n7otQDd6FczFgLdlqtyMVrn3NnrcVXs",
        "Destination": "rLHzPsX6oXkzU9...",
        "Amount": "1000000",
        "Fee": "12",
        "Sequence": 5,
        "TxnSignature": "...",
        "TxID": "..."
    },
    "meta": {
        "TransactionIndex": 0,
        "TransactionResult": "tesSUCCESS"
    },
    "validated": false
}
```

### Validation Subscription

Subscribe to validator messages:

```json
// Subscribe message
{
    "command": "subscribe",
    "streams": ["validations"]
}

// When a validation is received:
{
    "type": "validationReceived",
    "flags": 2147483649,
    "ledger_hash": "...",
    "ledger_index": "12346",
    "signing_pubkey": "...",
    "signature": "..."
}
```

### Implementing a Custom Subscription Handler

```cpp
Json::Value doSubscribe(RPC::JsonContext& context)
{
    // Parse subscription types
    if (!context.params.isMember("streams") &&
        !context.params.isMember("accounts"))
    {
        // RPC::make_error attaches a custom message to an error code
        return RPC::make_error(rpcINVALID_PARAMS,
            "Must specify 'streams' or 'accounts'");
    }

    Json::Value response;
    response[jss::status] = jss::success;

    // For stream subscriptions
    if (context.params.isMember("streams")) {
        Json::Value const& streams = context.params["streams"];

        if (!streams.isArray()) {
            return RPC::make_error(rpcINVALID_PARAMS, "'streams' must be array");
        }

        // Validate each stream type
        for (auto const& stream : streams) {
            // Reject non-string entries before converting them
            if (!stream.isString()) {
                return RPC::make_error(rpcINVALID_PARAMS,
                    "Stream names must be strings");
            }

            std::string streamName = stream.asString();

            if (streamName == "ledger" ||
                streamName == "transactions" ||
                streamName == "validations")
            {
                // Register subscription
                // This is typically handled by the transport layer
            } else {
                return RPC::make_error(rpcINVALID_PARAMS,
                    "Invalid stream: " + streamName);
            }
        }

        response["subscribed_streams"] = streams;
    }

    // For account subscriptions
    if (context.params.isMember("accounts")) {
        Json::Value const& accounts = context.params["accounts"];

        if (!accounts.isArray()) {
            return RPC::make_error(rpcINVALID_PARAMS, "'accounts' must be array");
        }

        Json::Value validAccounts(Json::arrayValue);
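        for (auto const& accountStr : accounts) {
            // Reject non-string entries before decoding the address
            if (!accountStr.isString()) {
                return RPC::make_error(rpcACT_MALFORMED,
                    "Account entries must be strings");
            }

            auto account = parseBase58<AccountID>(accountStr.asString());

            if (!account) {
                return RPC::make_error(rpcACT_MALFORMED,
                    "Invalid account: " + accountStr.asString());
            }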

            validAccounts.append(to_string(*account));
        }

        response["subscribed_accounts"] = validAccounts;
    }

    return response;
}
```
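The handler above validates stream names one at a time, so an invalid name late in the array is only discovered after earlier names have reached the registration step. A validate-then-apply approach avoids partial subscriptions. The sketch below illustrates it with standard containers only; `isKnownStream` and `subscribeAll` are hypothetical helpers, and the accepted names are the three used in the handler above.

```cpp
#include <algorithm>
#include <set>
#include <string>
#include <vector>

// Stream names accepted by this sketch, taken from the handler above.
bool
isKnownStream(std::string const& name)
{
    static std::set<std::string> const known{
        "ledger", "transactions", "validations"};
    return known.count(name) > 0;
}

// Adds every requested stream to `active`, but only if all names are valid.
// Returns false and leaves `active` unchanged when any name is unknown.
bool
subscribeAll(
    std::set<std::string>& active,
    std::vector<std::string> const& requested)
{
    bool const allValid =
        std::all_of(requested.begin(), requested.end(), isKnownStream);
    if (!allValid)
        return false;

    active.insert(requested.begin(), requested.end());
    return true;
}
```

With this shape, a failed request leaves the connection's subscription set exactly as it was, which makes error responses easier for clients to reason about.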

***

## Batch Request Processing

### Batch Request Format

Clients can send multiple requests in a single batch:

```json
[
    {
        "method": "account_info",
        "params": [{"account": "rN7n7otQDd6FczFgLdlqtyMVrn3NnrcVXs"}],
        "id": 1
    },
    {
        "method": "account_info",
        "params": [{"account": "rLHzPsX6oXkzU9..."}],
        "id": 2
    },
    {
        "method": "ledger",
        "params": [{"ledger_index": "validated"}],
        "id": 3
    }
]
```

### Processing Batch Requests

```cpp
Json::Value processBatchRequests(RPC::JsonContext& context)
{
    if (!context.params.isArray()) {
        return RPC::make_error(rpcINVALID_PARAMS, "Batch must be an array");
    }

    Json::Value batchResults(Json::arrayValue);

    for (auto const& request : context.params) {
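        // Each batch item must be an object with a "method" field
        if (!request.isObject() || !request.isMember("method")) {
            Json::Value error = rpcError(rpcINVALID_PARAMS);
            if (request.isObject())
                error["id"] = request.get("id", Json::Value::null);
            batchResults.append(error);
            continue;
        }

        std::string method = request["method"].asString();

        // Copy the shared context, then swap in this request's parameters
        // (assumes RPC::JsonContext is copy-constructible in your version).
        // The batch format wraps the parameter object in a one-element array.
        RPC::JsonContext itemContext = context;
        Json::Value params = request.get("params", Json::Value(Json::objectValue));
        if (params.isArray())
            params = params.size() > 0 ? params[0u] : Json::Value(Json::objectValue);
        itemContext.params = params;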

        // Look up and execute handler.
        // handlerTable is a placeholder for the server's handler registry.
        auto handlerIt = handlerTable.find(method);
        Json::Value itemResult;

        if (handlerIt == handlerTable.end()) {
            itemResult = rpcError(rpcUNKNOWN_COMMAND);
        } else {
            try {
                itemResult = handlerIt->second.handler(itemContext);
            } catch (std::exception const&) {
                // One failing item must not abort the rest of the batch
                itemResult = rpcError(rpcINTERNAL);
            }
        }

        // Add request ID if provided
        if (request.isMember("id")) {
            itemResult["id"] = request["id"];
        }

        batchResults.append(itemResult);
    }

    return batchResults;
}
```
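The dispatch loop above depends on rippled types, so here is a reduced, self-contained sketch of the same control flow: look up each method, isolate failures per item, and echo the request `id` into the result. `BatchRequest`, `BatchResult`, `Handler`, and `runBatch` are hypothetical stand-ins, and the error strings are illustrative.

```cpp
#include <functional>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

struct BatchRequest
{
    int id;
    std::string method;
    std::string param;
};

struct BatchResult
{
    int id;
    bool ok;
    std::string payload;  // handler output, or an error token
};

using Handler = std::function<std::string(std::string const&)>;

// Runs every request in order; one bad item never aborts the batch.
std::vector<BatchResult>
runBatch(
    std::map<std::string, Handler> const& handlers,
    std::vector<BatchRequest> const& requests)
{
    std::vector<BatchResult> results;
    results.reserve(requests.size());

    for (auto const& request : requests)
    {
        auto const it = handlers.find(request.method);
        if (it == handlers.end())
        {
            results.push_back({request.id, false, "unknownCmd"});
            continue;
        }

        try
        {
            results.push_back({request.id, true, it->second(request.param)});
        }
        catch (std::exception const&)
        {
            // Convert the failure into a per-item error result.
            results.push_back({request.id, false, "internal"});
        }
    }
    return results;
}
```

Results come back in request order, and each carries its `id`, so a client can match responses to requests either way.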

### Batch Request Optimization

```cpp
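// When processing large batches, optimize by the steps below.
// getRequestPriority, shouldTerminateBatch, and queryAccountsBatch are
// placeholder helpers for illustration, not rippled APIs.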

// 1. Share ledger access across requests
std::shared_ptr<ReadView const> sharedLedger;
auto ledgerResult = RPC::lookupLedger(sharedLedger, context);

for (auto const& request : context.params) {
    // Reuse the same ledger for all requests
    // Reduces ledger lookups
}

// 2. Implement request prioritization
std::vector<std::pair<int, Json::Value>> prioritized;
for (auto const& request : context.params) {
    int priority = getRequestPriority(request);
    prioritized.emplace_back(priority, request);
}

std::sort(prioritized.begin(), prioritized.end(),
    [](auto const& a, auto const& b) {
        return a.first > b.first;  // Higher priority first
    });

// 3. Implement early exit on critical errors
for (auto const& request : prioritized) {
    if (shouldTerminateBatch(request)) {
        break;
    }
}

// 4. Implement batching for database queries
std::vector<AccountID> accountsToQuery;
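for (auto const& request : context.params) {
    if (request["method"].asString() != "account_info")
        continue;

    // Per the batch format, "params" is an array holding one object
    Json::Value const& params = request["params"];
    if (!params.isArray() || params.size() == 0)
        continue;

    auto acc = parseBase58<AccountID>(params[0u]["account"].asString());
    if (acc) {
        accountsToQuery.push_back(*acc);
    }
}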

// Query all accounts at once (if backend supports)
auto accounts = queryAccountsBatch(sharedLedger, accountsToQuery);
```
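Sorting requests by priority changes the order in which they run, which can surprise clients that expect responses in request order. One way to handle this, sketched below under the assumption that each request has an integer priority, is to sort indices instead of the requests themselves and write each result back to its original slot. `executionOrder` is a hypothetical helper.

```cpp
#include <algorithm>
#include <cstddef>
#include <numeric>
#include <vector>

// Returns request indices in execution order: higher priority first,
// with equal priorities kept in their original relative order.
std::vector<std::size_t>
executionOrder(std::vector<int> const& priorities)
{
    std::vector<std::size_t> order(priorities.size());
    std::iota(order.begin(), order.end(), std::size_t{0});

    std::stable_sort(
        order.begin(), order.end(), [&](std::size_t a, std::size_t b) {
            return priorities[a] > priorities[b];
        });
    return order;
}
```

A batch loop would then iterate over `executionOrder(priorities)` and store each result at `results[index]`, so the response array still lines up with the request array.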

***

## gRPC Integration Basics

### Protocol Buffer Definitions

**File**: `src/xrpld/rpc/v1/xrpl_rpc.proto`

```proto
syntax = "proto3";

package ripple.rpc.v1;

// RPC Service Definition
service XRPLedger {
    rpc GetAccountInfo(GetAccountInfoRequest)
        returns (GetAccountInfoResponse);

    rpc SubmitTransaction(SubmitRequest)
        returns (SubmitResponse);

    rpc GetLedger(GetLedgerRequest)
        returns (GetLedgerResponse);
}

// Request/Response Messages
message GetAccountInfoRequest {
    string account = 1;
    string ledger_index = 2;
}

message GetAccountInfoResponse {
    string account = 1;
    string balance = 2;
    uint32 sequence = 3;
    uint32 ledger_index = 4;
    bool validated = 5;
}

message SubmitRequest {
    string tx_json = 1;
    string secret = 2;
}

message SubmitResponse {
    string status = 1;
    string tx_json = 2;
    string engine_result = 3;
}

message GetLedgerRequest {
    oneof ledger_specifier {
        uint32 ledger_index = 1;
        string ledger_hash = 2;
        string ledger_index_string = 3;
    }
}

message GetLedgerResponse {
    string ledger_hash = 1;
    uint32 ledger_index = 2;
    string closed_time = 3;
    string parent_hash = 4;
}
```
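The `oneof ledger_specifier` above means a request carries exactly one of a numeric index, a hash, or a shortcut string. As a rough sketch of the same idea on the C++ side, the snippet below parses a user-supplied string into a `std::variant`. The type and function names are hypothetical, and the accepted shortcuts (`validated`, `closed`, `current`) and the 64-hex-digit hash length are assumptions based on the usual XRPL conventions.

```cpp
#include <algorithm>
#include <cctype>
#include <cstdint>
#include <optional>
#include <string>
#include <variant>

struct LedgerHash
{
    std::string hex;
};

struct LedgerShortcut
{
    std::string name;
};

// Exactly one alternative is active, mirroring the protobuf oneof.
using LedgerSpecifier =
    std::variant<std::uint32_t, LedgerHash, LedgerShortcut>;

// Classifies `text` as a shortcut, a hash, or a 32-bit ledger index.
// Returns std::nullopt when it matches none of them.
std::optional<LedgerSpecifier>
parseLedgerSpecifier(std::string const& text)
{
    if (text == "validated" || text == "closed" || text == "current")
        return LedgerSpecifier{LedgerShortcut{text}};

    auto const isHex = [](unsigned char c) { return std::isxdigit(c) != 0; };
    auto const isDigit = [](unsigned char c) { return std::isdigit(c) != 0; };

    if (text.size() == 64 && std::all_of(text.begin(), text.end(), isHex))
        return LedgerSpecifier{LedgerHash{text}};

    // At most 10 digits, so std::stoull cannot overflow here.
    if (!text.empty() && text.size() <= 10 &&
        std::all_of(text.begin(), text.end(), isDigit))
    {
        unsigned long long const value = std::stoull(text);
        if (value <= 0xFFFFFFFFull)
            return LedgerSpecifier{static_cast<std::uint32_t>(value)};
    }
    return std::nullopt;
}
```

A caller can then use `std::visit` or `std::holds_alternative` to set the matching field on the request message.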

### gRPC Handler Implementation

```cpp
// In src/xrpld/rpc/detail/GRPCHandler.cpp

class XRPLedgerServiceImpl : public ripple::rpc::v1::XRPLedger::Service {
public:
    grpc::Status GetAccountInfo(
        grpc::ServerContext* context,
        const ripple::rpc::v1::GetAccountInfoRequest* request,
        ripple::rpc::v1::GetAccountInfoResponse* response) override
    {
        try {
            // Convert gRPC request to JSON context
            Json::Value jsonParams;
            jsonParams[jss::account] = request->account();
            if (!request->ledger_index().empty()) {
                jsonParams[jss::ledger_index] = request->ledger_index();
            }

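            // Create context.
            // NOTE: this initializer is illustrative. Match the member list
            // and order to the RPC::Context and RPC::JsonContext declarations
            // in your rippled version before using it.
            RPC::JsonContext rpcContext{
                jsonParams,
                app_,
                consumer_,
                roleFromPeer(context->peer()),
                nullptr,
                netOps_,
                ledgerMaster_,
                1
            };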

            // Call handler
            Json::Value result = doAccountInfo(rpcContext);

            // Convert JSON response to Protobuf
            if (result.isMember(jss::error)) {
                return grpc::Status(
                    grpc::StatusCode::INVALID_ARGUMENT,
                    result[jss::error_message].asString()
                );
            }

            // account_info nests the ledger entry under "account_data",
            // with capitalized field names such as Balance and Sequence
            Json::Value const& data = result[jss::account_data];
            response->set_account(data[jss::Account].asString());
            response->set_balance(data[jss::Balance].asString());
            response->set_sequence(data[jss::Sequence].asUInt());
            response->set_ledger_index(result[jss::ledger_index].asUInt());
            response->set_validated(result[jss::validated].asBool());

            return grpc::Status::OK;
        }
        catch (std::exception const& ex) {
            return grpc::Status(
                grpc::StatusCode::INTERNAL,
                "Internal error"
            );
        }
    }

    grpc::Status SubmitTransaction(
        grpc::ServerContext* context,
        const ripple::rpc::v1::SubmitRequest* request,
        ripple::rpc::v1::SubmitResponse* response) override
    {
        // Similar implementation...
        return grpc::Status::OK;
    }

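    // The constructor takes the three dependencies passed at server setup
    XRPLedgerServiceImpl(
        Application& app,
        NetworkOPs& netOps,
        LedgerMaster& ledgerMaster)
        : app_(app), netOps_(netOps), ledgerMaster_(ledgerMaster)
    {
    }

private:
    Application& app_;
    Resource::Consumer consumer_;  // default-constructed placeholder
    NetworkOPs& netOps_;
    LedgerMaster& ledgerMaster_;

    Role roleFromPeer(std::string const& peer) {
        // Determine role from gRPC peer information.
        // Grant Role::ADMIN only after checking `peer` against trusted
        // addresses; returning ADMIN unconditionally would give every
        // remote caller admin rights.
        return Role::USER;
    }
};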
```
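Deciding a role from the peer string needs some way to recognize local callers. The helper below is a minimal sketch of a loopback check. It assumes the URI-style peer strings that gRPC commonly reports, such as `ipv4:127.0.0.1:54321` or `ipv6:[::1]:54321`, including a percent-encoded bracket form; verify the exact format against your gRPC version before relying on it. `isLoopbackPeer` is a hypothetical name.

```cpp
#include <string>

// Returns true when `peer` looks like a loopback address.
// Unknown formats (including Unix sockets) are treated as non-local.
bool
isLoopbackPeer(std::string const& peer)
{
    auto const startsWith = [&peer](char const* prefix) {
        return peer.rfind(prefix, 0) == 0;
    };

    return startsWith("ipv4:127.") ||      // entire 127.0.0.0/8 block
        startsWith("ipv6:[::1]") ||        // plain bracket form
        startsWith("ipv6:%5B::1%5D");      // percent-encoded brackets
}
```

A `roleFromPeer` implementation could return an elevated role only when this check passes and fall back to a restricted role otherwise. Treating unknown formats as non-local keeps the default on the safe side.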

### gRPC Server Setup

```cpp
// In rippled initialization code

void startGRPCServer(Application& app)
{
    // Create service implementation
    auto service = std::make_unique<XRPLedgerServiceImpl>(
        app,
        app.getOPs(),
        app.getLedgerMaster()
    );

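    // Build server. Bind to loopback: InsecureServerCredentials() provides
    // no TLS or authentication, so do not expose this port publicly.
    grpc::ServerBuilder builder;
    builder.AddListeningPort("127.0.0.1:5051", grpc::InsecureServerCredentials());
    builder.RegisterService(service.get());

    // Create and start server
    auto server = builder.BuildAndStart();

    if (!server) {
        // JLOG expects a severity stream such as error() or info()
        JLOG(app.journal("gRPC").error()) << "Failed to start gRPC server";
        return;
    }

    JLOG(app.journal("gRPC").info()) << "gRPC server listening on 127.0.0.1:5051";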

    // Run server (typically in background thread)
    server->Wait();
}
```

### gRPC Client Usage

```cpp
// Example client code

#include "xrpl_rpc.grpc.pb.h"
#include <grpcpp/grpcpp.h>

#include <iostream>
#include <memory>
#include <string>

class XRPLClient {
    std::unique_ptr<ripple::rpc::v1::XRPLedger::Stub> stub_;

public:
    XRPLClient(std::shared_ptr<grpc::Channel> channel)
        : stub_(ripple::rpc::v1::XRPLedger::NewStub(channel)) {}

    std::string GetAccountInfo(std::string const& account) {
        ripple::rpc::v1::GetAccountInfoRequest request;
        request.set_account(account);
        request.set_ledger_index("validated");

        ripple::rpc::v1::GetAccountInfoResponse reply;

        grpc::ClientContext context;
        grpc::Status status = stub_->GetAccountInfo(&context, request, &reply);

        if (!status.ok()) {
            std::cerr << "RPC failed: " << status.error_message() << std::endl;
            return "";
        }

        return reply.account();
    }
};

// Usage
int main() {
    auto channel = grpc::CreateChannel(
        "localhost:5051",
        grpc::InsecureChannelCredentials()
    );

    XRPLClient client(channel);
    std::cout << client.GetAccountInfo("rN7n7otQDd6FczFgLdlqtyMVrn3NnrcVXs")
              << std::endl;

    return 0;
}
```

***

## WebSocket Subscriptions

### Subscribe Command Implementation

```cpp
Json::Value doSubscribe(RPC::JsonContext& context)
{
    if (!context.params.isMember("streams") &&
        !context.params.isMember("accounts") &&
        !context.params.isMember("account"))
    {
        // RPC::make_error attaches a custom message to an error code
        return RPC::make_error(rpcINVALID_PARAMS,
            "Must specify 'streams', 'accounts', or 'account'");
    }

    Json::Value response;
    response[jss::status] = jss::success;

    // Subscribe to streams
    if (context.params.isMember("streams")) {
        Json::Value streams = context.params["streams"];
        response["streams"] = streams;
    }

    // Subscribe to accounts
    if (context.params.isMember("accounts")) {
        Json::Value accounts = context.params["accounts"];
        response["accounts"] = accounts;
    }

    return response;
}
```

### Unsubscribe Command

```cpp
Json::Value doUnsubscribe(RPC::JsonContext& context)
{
    Json::Value response;
    response[jss::status] = jss::success;

    // Handle unsubscribe logic
    if (context.params.isMember("streams")) {
        response["unsubscribed_streams"] = context.params["streams"];
    }

    return response;
}
```

### Broadcasting Updates

```cpp
// When a ledger closes, broadcast to all subscribers.
// broadcastMessage() is a placeholder for the server's publish call.
void broadcastLedgerClosed(
    std::shared_ptr<Ledger const> const& ledger,
    NetworkOPs& netOps)
{
    Json::Value message;
    message["type"] = "ledgerClosed";
    message[jss::ledger_index] = ledger->info().seq;
    message["ledger_hash"] = to_string(ledger->info().hash);
    // closeTime is a time_point, so convert it to a seconds count first
    message["ledger_time"] = Json::Value::UInt(
        ledger->info().closeTime.time_since_epoch().count());

    // Send to all WebSocket connections subscribed to ledger
    netOps.broadcastMessage(message);
}

// When a transaction is applied.
// broadcastMessageToSubscribers() is a placeholder for the server's
// publish call; netOps is passed in because the function needs it.
void broadcastTransactionApplied(
    Json::Value const& txJson,
    Json::Value const& meta,
    AccountID const& account,
    NetworkOPs& netOps)
{
    Json::Value message;
    message["type"] = "transaction";
    message["transaction"] = txJson;
    message["meta"] = meta;

    // Send to all subscriptions for this account
    netOps.broadcastMessageToSubscribers(account, message);
}
```
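The subscribe, unsubscribe, and broadcast steps above all revolve around one piece of state: a mapping from topic to interested connections. The class below is a minimal, single-threaded sketch of such a registry using only the standard library. `SubscriptionRegistry` and its members are hypothetical names, and a real server would add locking and connection lifetime handling.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>

class SubscriptionRegistry
{
public:
    using SubscriberId = std::size_t;
    using Callback = std::function<void(std::string const& message)>;

    // Registers `callback` for `topic` and returns an id for unsubscribing.
    SubscriberId
    subscribe(std::string const& topic, Callback callback)
    {
        SubscriberId const id = nextId_++;
        topics_[topic].emplace(id, std::move(callback));
        return id;
    }

    // Removes one subscriber; returns true if it was registered.
    bool
    unsubscribe(std::string const& topic, SubscriberId id)
    {
        auto const it = topics_.find(topic);
        if (it == topics_.end())
            return false;

        bool const removed = it->second.erase(id) > 0;
        if (it->second.empty())
            topics_.erase(it);  // drop topics with no subscribers left
        return removed;
    }

    // Delivers `message` to every subscriber of `topic`.
    // Returns the number of deliveries. Callbacks must not modify the
    // registry while publish() is running.
    std::size_t
    publish(std::string const& topic, std::string const& message) const
    {
        auto const it = topics_.find(topic);
        if (it == topics_.end())
            return 0;

        for (auto const& entry : it->second)
            entry.second(message);
        return it->second.size();
    }

private:
    SubscriberId nextId_ = 1;
    std::map<std::string, std::map<SubscriberId, Callback>> topics_;
};
```

Account subscriptions fit the same shape if the topic is the account address instead of a stream name.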

***

## Performance Considerations

### Limiting Streaming Results

```cpp
// Set maximum items per streaming response
const unsigned int MAX_ITEMS_PER_STREAM = 10000;

Json::Value doStreamingQuery(RPC::JsonContext& context)
{
    unsigned int itemCount = 0;
    Json::Value result;
    Json::Value items(Json::arrayValue);

    // Fetch items with limit. `dataSource` and getCurrentMarker() are
    // placeholders for the handler's own data and iteration state.
    for (auto const& item : dataSource) {
        if (itemCount >= MAX_ITEMS_PER_STREAM) {
            // Return marker for continuation
            result["marker"] = getCurrentMarker();
            break;
        }

        items.append(item);
        itemCount++;
    }

    result["count"] = itemCount;
    result["items"] = items;
    return result;
}
```

### Subscription Limits

```cpp
// Limit subscriptions per connection.
// `connection.subscriptionCount` is a placeholder for per-session state.
const unsigned int MAX_SUBSCRIPTIONS_PER_CONNECTION = 100;

if (connection.subscriptionCount >= MAX_SUBSCRIPTIONS_PER_CONNECTION) {
    return RPC::make_error(rpcTOO_BUSY,
        "Too many active subscriptions");
}
```

### Batch Processing Optimization

```cpp
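// Process batch items in parallel for better throughput.
// One thread per request is shown for brevity; bound the worker count
// in production so a large batch cannot exhaust system threads.
std::vector<std::thread> workers;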

for (auto const& request : batchRequests) {
    workers.emplace_back([this, &request]() {
        // Process individual request
    });
}

for (auto& worker : workers) {
    worker.join();
}
```
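Spawning one thread per request does not scale to large batches. A bounded alternative, sketched below with only the standard library, starts a fixed number of workers that pull item indices from a shared atomic counter. `parallelFor` is a hypothetical helper, and the sketch assumes tasks do not throw and do not write to shared state without their own synchronization.

```cpp
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Runs task(i) for every i in [0, count) on at most `maxWorkers` threads.
// Returns after all tasks have finished. Tasks must not throw.
void
parallelFor(
    std::size_t count,
    std::size_t maxWorkers,
    std::function<void(std::size_t)> const& task)
{
    if (count == 0)
        return;

    // Use at least one worker and never more workers than items.
    std::size_t const workers =
        std::max<std::size_t>(1, std::min(maxWorkers, count));

    std::atomic<std::size_t> next{0};
    auto const worker = [&] {
        for (;;)
        {
            // Claim the next unprocessed index.
            std::size_t const index = next.fetch_add(1);
            if (index >= count)
                return;
            task(index);
        }
    };

    std::vector<std::thread> pool;
    pool.reserve(workers);
    for (std::size_t w = 0; w < workers; ++w)
        pool.emplace_back(worker);
    for (auto& thread : pool)
        thread.join();
}
```

Because each index is claimed exactly once, a task can safely write its result to `results[index]` without locking, which also preserves request order in the response.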

***

## Conclusion

Advanced RPC features let you build sophisticated applications on top of rippled. Streaming responses handle large datasets efficiently, subscriptions deliver real-time updates without polling, batch processing improves throughput for bulk operations, and gRPC integration offers high-performance binary communication. Understanding these patterns helps you build handlers that scale gracefully under production load.

***


