Database Operations and Lifecycle Management


Introduction

Beyond the cache layer, the Database class orchestrates the complete lifecycle of NodeStore operations:

  • Initialization on startup

  • Runtime fetch/store operations

  • Asynchronous background operations

  • Graceful shutdown

  • Database rotation and archival

This chapter covers these operational aspects, which are critical for production XRPL nodes.

Core Database Interface

The Database class provides higher-level operations above Backend:

Key Responsibilities:

class Database {
public:
    // Synchronous operations
    std::shared_ptr<NodeObject> fetchNodeObject(
        uint256 const& hash,
        std::uint32_t ledgerSeq = 0);

    void store(std::shared_ptr<NodeObject> const& obj);

    void storeBatch(std::vector<std::shared_ptr<NodeObject>> const& batch);

    // Asynchronous operations
    void asyncFetch(
        uint256 const& hash,
        std::function<void(std::shared_ptr<NodeObject>)> callback);

    // Management
    void open(DatabaseConfig const& config);
    void close();

    // Metrics and diagnostics
    Json::Value getCountsJson() const;
};

DatabaseNodeImp: Single Backend Implementation

The standard implementation for most XRPL validators:

Architecture:

        Application
          |
    DatabaseNodeImp (Coordination)
      /   |   \
     /    |    \
Cache   Backend  Threads
 (Hot)  (Disk)  (Async)

Storage Flow:

void DatabaseNodeImp::store(std::shared_ptr<NodeObject> const& obj) {
    // Step 1: Update cache immediately (likely reaccess soon)
    {
        std::lock_guard<std::mutex> lock(mCacheLock);
        mCache.insert(obj->getHash(), obj);
    }

    // Step 2: Encode to persistent format
    Blob encoded = encodeObject(obj);

    // Step 3: Persist to backend
    Status status = mBackend->store(obj->getHash(), encoded);

    if (status != Status::ok) {
        // Log the error but don't crash. The object stays readable from
        // the cache, but it is NOT durable until a backend write succeeds.
        logError("Backend store failed", status);
        return;  // don't count a failed write in the metrics
    }

    // Step 4: Update metrics
    mMetrics.bytesWritten += encoded.size();
    mMetrics.objectsWritten++;
}

Fetch Flow:

std::shared_ptr<NodeObject> DatabaseNodeImp::fetchNodeObject(
    uint256 const& hash,
    uint32_t ledgerSeq)
{
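    // Step 1: Check cache
    {
        std::lock_guard<std::mutex> lock(mCacheLock);
        auto cached = mCache.get(hash);
        if (cached) {
            mMetrics.cacheHits++;
            return cached;
        }
        // Note: a cached dummy (known-missing) entry should also return
        // here; how get() reports one depends on the cache implementation.
    }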

    // Step 2: Query backend (potentially slow)
    Blob encoded;
    Status status = mBackend->fetch(hash, encoded);

    std::shared_ptr<NodeObject> result;
    if (status == Status::ok) {
        result = decodeObject(hash, encoded);
    } else if (status == Status::notFound) {
        // Not found - cache dummy to prevent retry
        result = nullptr;
    } else {
        // Backend error
        logWarning("Backend fetch error", status);
        return nullptr;
    }

    // Step 3: Update cache
    {
        std::lock_guard<std::mutex> lock(mCacheLock);
        if (result) {
            mCache.insert(hash, result);
        } else {
            mCache.insertDummy(hash);
        }
    }

    // Step 4: Update metrics (field names match NodeStoreMetrics)
    mMetrics.cacheMisses++;
    mMetrics.bytesFetched += encoded.size();

    return result;
}
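
The dummy entry inserted on a miss is a form of negative caching. The following is a minimal sketch of that idea, not the rippled cache API: `NegativeCache` and `lookup` are hypothetical names, keys are plain strings rather than `uint256`, and payloads are strings rather than `NodeObject`. A stored null pointer marks a key that is known to be absent, so the caller can tell "never looked up" apart from "looked up and missing".

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for the NodeStore cache; not the rippled API.
// A stored nullptr marks a key known to be absent from the backend.
class NegativeCache {
public:
    using Value = std::shared_ptr<std::string const>;

    // Cache a real object. Misses must go through insertDummy() instead.
    void insert(std::string const& key, Value value) {
        assert(value != nullptr);
        std::lock_guard<std::mutex> lock(mutex_);
        entries_[key] = std::move(value);
    }

    // Record that the backend has no object for this key.
    void insertDummy(std::string const& key) {
        std::lock_guard<std::mutex> lock(mutex_);
        entries_.try_emplace(key, nullptr);  // never overwrites a real object
    }

    // Outer optional: is there any cache entry at all?
    // Inner pointer: nullptr means "known missing", otherwise the object.
    std::optional<Value> lookup(std::string const& key) const {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = entries_.find(key);
        if (it == entries_.end())
            return std::nullopt;
        return it->second;
    }

private:
    mutable std::mutex mutex_;
    std::unordered_map<std::string, Value> entries_;
};
```

With this shape, a fetch path can return immediately when `lookup` yields an entry, whether it holds an object or a dummy, and only query the backend when it yields `std::nullopt`.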

Batch Operations

Batch operations improve efficiency:

Batch Store:

void DatabaseNodeImp::storeBatch(
    std::vector<std::shared_ptr<NodeObject>> const& batch)
{
    // Step 1: Update cache for all objects
    {
        std::lock_guard<std::mutex> lock(mCacheLock);
        for (auto const& obj : batch) {
            mCache.insert(obj->getHash(), obj);
        }
    }

    // Step 2: Encode all objects
    std::vector<std::pair<uint256, Blob>> encoded;
    encoded.reserve(batch.size());
    for (auto const& obj : batch) {
        encoded.emplace_back(obj->getHash(), encodeObject(obj));
    }

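    // Step 3: Store in backend as one batch (atomic if the backend supports it)
    Status status = mBackend->storeBatch(encoded);
    if (status != Status::ok) {
        logError("Backend batch store failed", status);
        return;  // don't count a failed batch in the metrics
    }

    // Step 4: Update metrics
    for (auto const& [hash, blob] : encoded) {
        mMetrics.bytesWritten += blob.size();
    }
    mMetrics.objectsWritten += batch.size();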
}

Benefits:

Without batch:
  Write 1000 objects → 1000 backend transactions
  Up to 1000 separate disk I/O operations

With batch:
  Write 1000 objects → 1 backend transaction
  Far fewer disk I/O operations (ideally one sequential write)

Throughput improvement: roughly 10-50x, depending on backend

Batch Size Limits:

static constexpr std::size_t BATCH_WRITE_PREALLOCATE_SIZE = 256;
static constexpr std::size_t BATCH_WRITE_LIMIT_SIZE = 65536;

// Prevents:
// 1. Memory exhaustion (unbounded batches)
// 2. Transaction timeout (backend transaction too large)
// 3. Excessive latency (batching too much)
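
One way to respect such a limit is to split an oversized batch into bounded chunks before handing it to the backend. The sketch below assumes exactly that strategy; `splitIntoBatches` and `kBatchLimit` are hypothetical names chosen for illustration, and the source does not say how the limit is enforced in practice.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Illustrative limit; mirrors BATCH_WRITE_LIMIT_SIZE above.
constexpr std::size_t kBatchLimit = 65536;

// Split `items` into consecutive chunks of at most `limit` elements,
// preserving the original order.
template <typename T>
std::vector<std::vector<T>> splitIntoBatches(
    std::vector<T> const& items,
    std::size_t limit = kBatchLimit)
{
    assert(limit > 0);
    std::vector<std::vector<T>> batches;
    batches.reserve((items.size() + limit - 1) / limit);
    for (std::size_t begin = 0; begin < items.size(); begin += limit) {
        std::size_t const end = std::min(begin + limit, items.size());
        batches.emplace_back(items.begin() + begin, items.begin() + end);
    }
    return batches;
}
```

Each resulting chunk can then be passed to a batch store call on its own, which keeps memory use and per-transaction size bounded at the cost of losing atomicity across chunks.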

Asynchronous Operations

Background threads handle expensive operations without blocking:

Asynchronous Fetch:

void DatabaseNodeImp::asyncFetch(
    uint256 const& hash,
    std::function<void(std::shared_ptr<NodeObject>)> callback)
{
    // Step 1: Queue request
    mAsyncQueue.enqueue({hash, callback});

    // Step 2: Background thread processes
    // Wakes up, dequeues batch, fetches, invokes callbacks
    // Meanwhile, caller continues without blocking
}

// Worker thread body (one instance runs on each thread in the pool)
void DatabaseNodeImp::asyncWorkerThread() {
    while (running) {
        // Wait for work or timeout
        auto batch = mAsyncQueue.dequeueBatch(timeout);

        if (batch.empty()) {
            continue;
        }

        // Fetch all in batch (more efficient)
        std::vector<uint256> hashes;
        for (auto const& [hash, callback] : batch) {
            hashes.push_back(hash);
        }

        auto results = mBackend->fetchBatch(hashes);

        // Invoke callbacks
        for (auto const& [hash, callback] : batch) {
            auto result = results[hash];
            callback(result);
        }
    }
}

Use Cases:

1. Synchronization:
   Requesting many nodes from network
   Can queue hundreds of async fetches
   Process results as they arrive

2. API queries:
   Historical account queries
   Don't block validator thread
   Return results via callback

3. Background tasks:
   Cache warming
   Prefetching likely-needed nodes
   Doesn't impact real-time performance
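
The queue-and-worker pattern described above can be sketched in a self-contained form. This is an illustration under simplifying assumptions, not the rippled implementation: `AsyncFetcher` is a hypothetical class with a single worker thread, string keys, and an in-memory map standing in for the backend. It processes one request at a time rather than in batches.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <map>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical, simplified async fetcher; not the rippled API.
class AsyncFetcher {
public:
    // The callback receives nullptr when the key is not in the backend.
    using Callback = std::function<void(std::string const*)>;

    explicit AsyncFetcher(std::map<std::string, std::string> backend)
        : backend_(std::move(backend)), worker_([this] { run(); }) {}

    // Lets the worker drain queued requests, then joins it.
    ~AsyncFetcher() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        wake_.notify_one();
        worker_.join();
    }

    // Returns immediately; the callback later runs on the worker thread.
    void asyncFetch(std::string key, Callback callback) {
        assert(callback);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.emplace(std::move(key), std::move(callback));
        }
        wake_.notify_one();
    }

private:
    void run() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            wake_.wait(lock, [this] { return stopping_ || !queue_.empty(); });
            if (queue_.empty())
                return;  // stopping, and nothing left to process
            auto [key, callback] = std::move(queue_.front());
            queue_.pop();
            lock.unlock();  // never hold the lock during I/O or callbacks
            auto it = backend_.find(key);
            callback(it == backend_.end() ? nullptr : &it->second);
            lock.lock();
        }
    }

    std::map<std::string, std::string> const backend_;
    std::mutex mutex_;
    std::condition_variable wake_;
    std::queue<std::pair<std::string, Callback>> queue_;
    bool stopping_ = false;
    std::thread worker_;  // declared last so it starts after the other members exist
};
```

Because callbacks execute on the worker thread, anything they touch must be safe to access from that thread. Destroying the fetcher waits for queued requests to finish, which is the same ordering the shutdown sequence relies on.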

Initialization and Shutdown

Startup Sequence:

void DatabaseNodeImp::open(DatabaseConfig const& config) {
    // Step 1: Parse configuration
    std::string backend_type = config.get<std::string>("type");
    std::string database_path = config.get<std::string>("path");

    // Step 2: Create backend instance
    mBackend = createBackend(backend_type, database_path);

    // Step 3: Open backend (connects to database)
    Status status = mBackend->open();
    if (status != Status::ok) {
        throw std::runtime_error("Failed to open database");
    }

    // Step 4: Allocate cache
    size_t cache_size_mb = config.get<size_t>("cache_size");
    mCache.setMaxSize(cache_size_mb * 1024 * 1024);

    // Step 5: Start background threads
    int num_threads = config.get<int>("async_threads", 4);
    for (int i = 0; i < num_threads; ++i) {
        mThreadPool.emplace_back([this] { asyncWorkerThread(); });
    }

    // Step 6: Optional: import from another database
    if (config.has("import_db")) {
        importFromDatabase(config.get<std::string>("import_db"));
    }

    // Step 7: Ready for operations
    mReady = true;
}

Shutdown Sequence:

void DatabaseNodeImp::close() {
    // Step 1: Stop accepting new operations
    mReady = false;

    // Step 2: Wait for in-flight async operations to complete
    mAsyncQueue.stop();
    for (auto& thread : mThreadPool) {
        thread.join();
    }

    // Step 3: Flush any pending writes
    // (Most backends buffer writes)
    mBackend->flush();

    // Step 4: Clear cache (will be regenerated on restart)
    mCache.clear();

    // Step 5: Close backend database
    Status status = mBackend->close();
    if (status != Status::ok) {
        logWarning("Backend close not clean", status);
    }
}

DatabaseRotatingImp: Advanced Rotation

For production systems needing online deletion:

Problem Solved:

Without deletion, the database grows without bound:

Each ledger adds new nodes
Over time: thousands of gigabytes
Eventually: disk full

Options:
  1. Stop validator (unacceptable)
  2. Manual pruning (requires downtime)
  3. Rotation (online deletion)

Rotation Architecture:

         Application
              |
     DatabaseRotatingImp
        /     |     \
       /      |      \
Writable   Archive   Cache
Backend    Backend
 (New)      (Old)

Rotation Process:

void DatabaseRotatingImp::rotate() {
    // Step 1: Capture the current backends
    // (rotate() must be serialized against concurrent store/fetch calls)
    auto old_writable = mWritableBackend;
    auto old_archive = mArchiveBackend;

    // Step 2: Create new writable backend
    auto new_writable = createNewBackend();
    new_writable->open();

    // Step 3: Copy critical data if needed, while the old archive
    // still exists (e.g., ledger headers required for validation)
    copyCriticalData(old_archive, new_writable);

    // Step 4: Switch over: current writable → archive
    mWritableBackend = new_writable;
    mArchiveBackend = old_writable;

    // Step 5: Delete old archive (in background)
    deleteBackendAsync(old_archive);

    // Step 6: Continue operation with no downtime
}
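
The pointer swap at the heart of rotation can be shown with in-memory stand-ins. In this sketch `RotatingStore` and `MemBackend` are hypothetical names, a single mutex serializes rotation against reads and writes, and the retired archive is returned to the caller instead of being deleted in the background. It omits the critical-data copy step.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical in-memory backend; a real one would wrap an on-disk store.
using MemBackend = std::map<std::string, std::string>;

class RotatingStore {
public:
    RotatingStore()
        : writable_(std::make_shared<MemBackend>()),
          archive_(std::make_shared<MemBackend>()) {}

    void store(std::string const& key, std::string value) {
        std::lock_guard<std::mutex> lock(mutex_);
        (*writable_)[key] = std::move(value);
    }

    // Writable becomes the archive and a fresh backend becomes writable.
    // Returns the retired archive so the caller can dispose of it off-thread.
    std::shared_ptr<MemBackend> rotate() {
        auto fresh = std::make_shared<MemBackend>();  // allocate outside the lock
        std::lock_guard<std::mutex> lock(mutex_);
        assert(writable_ && archive_);
        auto retired = std::exchange(archive_, writable_);
        writable_ = std::move(fresh);
        return retired;
    }

    bool contains(std::string const& key) const {
        std::lock_guard<std::mutex> lock(mutex_);
        return writable_->count(key) > 0 || archive_->count(key) > 0;
    }

private:
    mutable std::mutex mutex_;
    std::shared_ptr<MemBackend> writable_;
    std::shared_ptr<MemBackend> archive_;
};
```

Data written before one rotation remains visible through the archive, and disappears only after a second rotation retires that archive.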

Dual Fetch Logic:

std::shared_ptr<NodeObject> DatabaseRotatingImp::fetchNodeObject(
    uint256 const& hash,
    uint32_t ledgerSeq,
    bool duplicate)
{
    // Check cache first
    auto cached = mCache.get(hash);
    if (cached) {
        return cached;
    }

    // Try writable (current ledgers)
    // (simplified: Status handling and decoding are shown in DatabaseNodeImp)
    auto obj = mWritableBackend->fetch(hash);
    if (obj) {
        mCache.insert(hash, obj);
        return obj;
    }

    // Try archive (older ledgers)
    obj = mArchiveBackend->fetch(hash);
    if (obj) {
        mCache.insert(hash, obj);

        // Optionally duplicate to writable for longevity
        if (duplicate) {
            mWritableBackend->store(hash, obj);
        }

        return obj;
    }

    return nullptr;
}
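
The effect of the `duplicate` flag is easiest to see in isolation. The sketch below is a simplified illustration, assuming in-memory maps as backends and leaving out the cache; `fetchDual` and `MemBackend` are hypothetical names. An archive hit is copied into the writable backend so that the object survives the next rotation.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>

// Hypothetical in-memory backend; a real one would wrap an on-disk store.
using MemBackend = std::map<std::string, std::string>;

// Look in the writable backend first, then fall back to the archive.
// With `duplicate` set, an archive hit is copied into the writable backend.
std::optional<std::string> fetchDual(
    MemBackend& writable,
    MemBackend const& archive,
    std::string const& key,
    bool duplicate)
{
    assert(&writable != &archive);
    if (auto it = writable.find(key); it != writable.end())
        return it->second;
    if (auto it = archive.find(key); it != archive.end()) {
        if (duplicate)
            writable.emplace(key, it->second);
        return it->second;
    }
    return std::nullopt;  // in neither backend
}
```

Duplicating on read trades extra writes for retention: nodes that are still being referenced migrate forward, while untouched nodes are dropped with the archive.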

Benefits:

With rotation (example keeping roughly 100k ledgers per backend):
  Ledgers 900000-999999:   stored in Archive
  Ledgers 1000000-1100000: stored in Writable

  At the next rotation:
    Archive (900000-999999) deleted → Disk space reclaimed
    Writable becomes the new Archive
    A fresh Writable backend receives new ledgers

  No downtime, no manual pruning, bounded growth

Metrics and Monitoring

NodeStore exposes comprehensive metrics for monitoring:

struct NodeStoreMetrics {
    // Storage metrics
    uint64_t objectsWritten;
    uint64_t bytesWritten;
    std::chrono::microseconds writeLatency;

    // Retrieval metrics
    uint64_t objectsFetched;
    uint64_t cacheHits;
    uint64_t cacheMisses;
    uint64_t bytesFetched;
    std::chrono::microseconds fetchLatency;

    // Cache metrics
    size_t cacheObjects;
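    double cacheHitRate() const {
        auto const total = cacheHits + cacheMisses;
        // Avoid dividing by zero before any fetch has been recorded
        return total == 0 ? 0.0 : static_cast<double>(cacheHits) / total;
    }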

    // Threading metrics
    size_t asyncQueueDepth;
    int activeAsyncThreads;
};

Monitoring Typical Values:

Hit rate: 92-96% (well-configured systems)
Write latency: 0.1-1 ms per object
Fetch latency: 0.01-0.1 ms per object (mostly cache hits)
Cache size: 128MB - 2GB
Async queue depth: 0-100 pending requests

Alert Thresholds:

If hit rate < 80%:        Cache too small or thrashing
If write latency > 10ms:   Backend I/O struggling
If queue depth > 10000:    Not keeping up with load
If fetch latency > 100ms:  Serious performance issue
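
These thresholds translate directly into a small check that a monitoring loop could run. The sketch below is illustrative only: `MetricsSnapshot` and `evaluateAlerts` are hypothetical names, and the threshold values are copied from the list above rather than taken from any rippled configuration.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical snapshot of the values discussed above.
struct MetricsSnapshot {
    double hitRate;                          // fraction in [0.0, 1.0]
    std::chrono::microseconds writeLatency;
    std::chrono::microseconds fetchLatency;
    std::size_t asyncQueueDepth;
};

// Returns one message per threshold that the snapshot violates.
std::vector<std::string> evaluateAlerts(MetricsSnapshot const& m) {
    using namespace std::chrono_literals;
    assert(m.hitRate >= 0.0 && m.hitRate <= 1.0);
    std::vector<std::string> alerts;
    if (m.hitRate < 0.80)
        alerts.push_back("cache too small or thrashing");
    if (m.writeLatency > 10ms)
        alerts.push_back("backend I/O struggling");
    if (m.asyncQueueDepth > 10000)
        alerts.push_back("not keeping up with load");
    if (m.fetchLatency > 100ms)
        alerts.push_back("serious performance issue");
    return alerts;
}
```

An empty result means every metric is within its threshold; each additional entry names the likely cause listed above.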

Summary

Key Operational Components:

  1. Fetch/Store: Core operations with caching

  2. Batch Operations: Efficient bulk operations

  3. Async Operations: Background threads for non-blocking I/O

  4. Lifecycle: Startup, shutdown, error handling

  5. Rotation: Online deletion and archival

  6. Metrics: Monitoring and diagnostics

Design Properties:

  • Reliability: Graceful error handling, writes flushed on shutdown to avoid data loss

  • Performance: Batch operations, async threading

  • Scalability: Online rotation enables indefinite operation with bounded disk usage

  • Observability: Comprehensive metrics for diagnostics

  • Flexibility: Different database backends, configurations

The Database layer transforms raw backend capabilities into a reliable, performant storage system that can handle blockchain-scale data volumes while maintaining the responsiveness required for real-time validation.
