Observability: Comprehensive metrics for diagnostics
Flexibility: Different database backends, configurations
The Database layer transforms raw backend capabilities into a reliable, performant storage system that can handle blockchain-scale data volumes while maintaining the responsiveness required for real-time validation.
void
DatabaseNodeImp::asyncFetch(
    uint256 const& hash,
    std::function<void(std::shared_ptr<NodeObject>)> callback)
{
    // Step 1: Queue the request; the caller returns immediately
    mAsyncQueue.enqueue({hash, std::move(callback)});

    // Step 2: A background thread wakes up, dequeues a batch,
    // fetches from the backend, and invokes the callbacks.
    // Meanwhile, the caller continues without blocking.
}
// Thread pool implementation
void
asyncWorkerThread()
{
    while (running)
    {
        // Wait for work or timeout
        auto batch = mAsyncQueue.dequeueBatch(timeout);
        if (batch.empty())
            continue;

        // Fetch the whole batch at once (fewer backend round-trips)
        std::vector<uint256> hashes;
        hashes.reserve(batch.size());
        for (auto const& [hash, callback] : batch)
            hashes.push_back(hash);

        auto results = mBackend->fetchBatch(hashes);

        // Invoke callbacks; a missing entry yields a null pointer
        for (auto const& [hash, callback] : batch)
        {
            auto it = results.find(hash);
            callback(it != results.end() ? it->second : nullptr);
        }
    }
}
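The batch-dequeue pattern the worker relies on can be sketched in a self-contained form. `BatchQueue` here is a hypothetical stand-in for `mAsyncQueue`, built on a mutex and condition variable: producers enqueue items, and the worker waits up to a timeout, then drains everything available in one pass.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

// Minimal sketch of a batch-dequeue queue (illustrative, not the
// real rippled implementation).
template <typename T>
class BatchQueue
{
public:
    void enqueue(T item)
    {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mItems.push(std::move(item));
        }
        mCond.notify_one();
    }

    // Wait up to `timeout` for work, then drain the whole queue.
    template <typename Duration>
    std::vector<T> dequeueBatch(Duration timeout)
    {
        std::unique_lock<std::mutex> lock(mMutex);
        mCond.wait_for(lock, timeout, [this] { return !mItems.empty(); });

        std::vector<T> batch;
        while (!mItems.empty())
        {
            batch.push_back(std::move(mItems.front()));
            mItems.pop();
        }
        return batch;  // empty if the wait timed out with no work
    }

private:
    std::mutex mMutex;
    std::condition_variable mCond;
    std::queue<T> mItems;
};
```

Draining the entire queue per wakeup is what makes `fetchBatch` worthwhile: one lock acquisition and one backend call amortize over many requests.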
1. Synchronization:
   - Requesting many nodes from the network
   - Hundreds of async fetches can be queued at once
   - Results are processed as they arrive
2. API queries:
   - Historical account queries
   - Don't block the validator thread
   - Results are returned via callback
3. Background tasks:
   - Cache warming
   - Prefetching likely-needed nodes
   - Doesn't impact real-time performance
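The first use case, queue many fetches, handle results as they arrive, can be sketched with toy types. `ToyAsyncDb`, `Hash`, and `processQueued` are illustrative stand-ins (the real types are `uint256` and `NodeObject`, and the real processing happens on background threads); what carries over is the callback contract: each queued fetch eventually gets its callback invoked, with `nullptr` for a missing object.

```cpp
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Toy stand-ins for uint256 / NodeObject (illustrative only)
using Hash = std::string;
struct NodeObject { Hash hash; };

class ToyAsyncDb
{
public:
    explicit ToyAsyncDb(std::unordered_map<Hash, NodeObject> store)
        : mStore(std::move(store)) {}

    // Queue a fetch; the callback runs later, possibly with nullptr
    void asyncFetch(
        Hash const& hash,
        std::function<void(std::shared_ptr<NodeObject>)> callback)
    {
        mQueue.emplace_back(hash, std::move(callback));
    }

    // Stand-in for the background worker: drain the queue once
    void processQueued()
    {
        for (auto& [hash, callback] : mQueue)
        {
            auto it = mStore.find(hash);
            callback(it == mStore.end()
                ? nullptr
                : std::make_shared<NodeObject>(it->second));
        }
        mQueue.clear();
    }

private:
    std::unordered_map<Hash, NodeObject> mStore;
    std::vector<std::pair<Hash,
        std::function<void(std::shared_ptr<NodeObject>)>>> mQueue;
};
```

A synchronizer can queue hundreds of such fetches in a loop and count hits and misses in the callbacks, without ever blocking the thread that issued them.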
void
DatabaseNodeImp::open(DatabaseConfig const& config)
{
    // Step 1: Parse configuration
    std::string backend_type = config.get<std::string>("type");
    std::string database_path = config.get<std::string>("path");

    // Step 2: Create backend instance
    mBackend = createBackend(backend_type, database_path);

    // Step 3: Open backend (connects to the database)
    Status status = mBackend->open();
    if (status != Status::ok)
        throw std::runtime_error("Failed to open database");

    // Step 4: Allocate cache (configured in megabytes)
    size_t cache_size_mb = config.get<size_t>("cache_size");
    mCache.setMaxSize(cache_size_mb * 1024 * 1024);

    // Step 5: Start background threads
    int num_threads = config.get<int>("async_threads", 4);
    for (int i = 0; i < num_threads; ++i)
        mThreadPool.emplace_back([this] { asyncWorkerThread(); });

    // Step 6: Optionally import from another database
    if (config.has("import_db"))
        importFromDatabase(config.get<std::string>("import_db"));

    // Step 7: Ready for operations
    mReady = true;
}
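A configuration of the shape `open()` expects might look like the following sketch. The key names (`type`, `path`, `cache_size`, `async_threads`, `import_db`) come from the parsing code above; the stanza name and the values are illustrative, not defaults:

```
[node_db]
type=NuDB
path=/var/lib/db/nudb
cache_size=256
async_threads=4
# Optional one-time migration from another database:
# import_db=/var/lib/db/old
```

Note that `cache_size` is interpreted in megabytes (the code multiplies by 1024 * 1024), and `async_threads` falls back to 4 when absent.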
void
DatabaseNodeImp::close()
{
    // Step 1: Stop accepting new operations
    mReady = false;

    // Step 2: Wait for in-flight async operations to complete
    mAsyncQueue.stop();
    for (auto& thread : mThreadPool)
        thread.join();

    // Step 3: Flush any pending writes
    // (most backends buffer writes)
    mBackend->flush();

    // Step 4: Clear the cache (it is rebuilt on restart)
    mCache.clear();

    // Step 5: Close the backend database
    Status status = mBackend->close();
    if (status != Status::ok)
        logWarning("Backend close not clean", status);
}
- Each ledger adds new nodes
- Over time: thousands of gigabytes
- Eventually: the disk fills up

Options:
1. Stop the validator (unacceptable)
2. Manual pruning (requires downtime)
3. Rotation (online deletion, no downtime)
void
DatabaseRotatingImp::rotate()
{
    // Step 1: Capture the current backends
    auto old_writable = mWritableBackend;
    auto old_archive = mArchiveBackend;

    // Step 2: Create and open a new writable backend
    mWritableBackend = createNewBackend();
    mWritableBackend->open();

    // Step 3: The old writable becomes the new archive
    mArchiveBackend = old_writable;

    // Step 4: Copy critical data before the old archive disappears
    // (e.g., ledger headers required for validation)
    copyCriticalData(old_archive, mWritableBackend);

    // Step 5: Delete the old archive (in the background)
    deleteBackendAsync(old_archive);

    // Step 6: Operation continues with no downtime
}
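The source does not show what triggers `rotate()`; a plausible policy is to rotate once the writable backend has accumulated a full interval of ledgers. `shouldRotate` below is a hypothetical helper sketching that policy, the real trigger depends on the node's configured retention:

```cpp
#include <cstdint>

// Hypothetical rotation policy (illustrative): rotate once the
// writable backend spans a full interval of ledgers, e.g. 100000.
bool shouldRotate(
    std::uint32_t currentSeq,
    std::uint32_t lastRotationSeq,
    std::uint32_t interval)
{
    return currentSeq - lastRotationSeq >= interval;
}
```

A caller would check this after each validated ledger and invoke `rotate()` when it returns true, recording the current sequence as the new rotation point.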
std::shared_ptr<NodeObject>
DatabaseRotatingImp::fetchNodeObject(
    uint256 const& hash,
    uint32_t ledgerSeq,
    bool duplicate)
{
    // Check the cache first
    if (auto cached = mCache.get(hash))
        return cached;

    // Try the writable backend (current ledgers)
    auto obj = mWritableBackend->fetch(hash);
    if (obj)
    {
        mCache.insert(hash, obj);
        return obj;
    }

    // Try the archive backend (older ledgers)
    obj = mArchiveBackend->fetch(hash);
    if (obj)
    {
        mCache.insert(hash, obj);

        // Optionally copy back to the writable backend so the
        // object survives the next rotation
        if (duplicate)
            mWritableBackend->store(hash, obj);

        return obj;
    }

    return nullptr;
}
With rotation (100k-ledger interval):
Ledgers 1000001-1100000: stored in Writable (current interval)
Ledgers 900001-1000000: stored in Archive (previous interval)
Ledgers up to 900000: old archive deleted → disk space reclaimed
Result: only the most recent 100k-200k ledgers remain on disk
No downtime, no backups needed, bounded growth
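The retention arithmetic above can be made concrete. `locateLedger` is a hypothetical helper (not part of the real API) that, given the rotation interval and the first sequence in the writable backend, reports where a ledger lives:

```cpp
#include <cstdint>
#include <string>

// Hypothetical helper (illustrative): with two backends and a fixed
// rotation interval, a ledger is in the writable backend, in the
// archive (the previous interval), or already deleted.
std::string locateLedger(
    std::uint32_t seq,
    std::uint32_t writableStart,
    std::uint32_t interval)
{
    if (seq >= writableStart)
        return "writable";
    if (seq + interval >= writableStart)
        return "archive";
    return "deleted";
}
```

This also shows why on-disk retention is between one and two intervals: right after a rotation the writable backend is nearly empty and the archive holds a full interval; just before the next rotation both are full.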
Typical metrics on well-configured systems:
Hit rate: 92-96%
Write latency: 0.1-1 ms per object
Fetch latency: 0.01-0.1 ms per object (mostly cache hits)
Cache size: 128 MB - 2 GB
Async queue depth: 0-100

Warning signs:
If hit rate < 80%: cache too small, or thrashing
If write latency > 10 ms: backend I/O is struggling
If queue depth > 10000: not keeping up with load
If fetch latency > 100 ms: serious performance issue
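The warning thresholds above translate directly into a monitoring check. The struct and field names below are illustrative (the real metrics plumbing is not shown in this section); the threshold values and units come from the text, with hit rate expressed as a fraction and latencies in milliseconds:

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Sketch of the alert thresholds as a single check (illustrative).
struct NodeStoreMetrics
{
    double hitRate;          // e.g. 0.94 for 94%
    double writeLatencyMs;   // per-object write latency
    double fetchLatencyMs;   // per-object fetch latency
    std::size_t queueDepth;  // async queue length
};

std::vector<std::string> checkMetrics(NodeStoreMetrics const& m)
{
    std::vector<std::string> warnings;
    if (m.hitRate < 0.80)
        warnings.push_back("cache too small or thrashing");
    if (m.writeLatencyMs > 10.0)
        warnings.push_back("backend I/O struggling");
    if (m.queueDepth > 10000)
        warnings.push_back("not keeping up with load");
    if (m.fetchLatencyMs > 100.0)
        warnings.push_back("serious performance issue");
    return warnings;
}
```

A healthy system returns an empty list; each warning names the likely cause so an operator knows whether to grow the cache, inspect backend I/O, or shed load.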