Persistence Guide
InputLayer provides durable storage with crash recovery, using a combination of Write-Ahead Logging (WAL) and Parquet batch files.
Architecture Overview
Insert/Delete
↓
Update{data, time, diff}
↓
WAL (immediate durability)
↓
In-memory buffer
↓ (when buffer full)
Batch file (Parquet)
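The write path above can be sketched in Python. The class and field names are hypothetical; InputLayer's actual internals are not specified here:

```python
import json
import os
import tempfile
import zlib

class WriteAheadBuffer:
    """Illustrative write path: WAL append first (durability), then the
    in-memory buffer, then a batch flush once the buffer fills."""

    def __init__(self, wal_path, buffer_size=2):
        self.wal = open(wal_path, "a")
        self.buffer = []
        self.buffer_size = buffer_size
        self.flushed_batches = []

    def write(self, data, time, diff):
        # 1. Append to the WAL and fsync for immediate durability
        payload = json.dumps({"update": {"data": data, "time": time, "diff": diff}})
        self.wal.write(f"{zlib.crc32(payload.encode()):08x}:{payload}\n")
        self.wal.flush()
        os.fsync(self.wal.fileno())
        # 2. Stage the update in the in-memory buffer
        self.buffer.append((tuple(data), time, diff))
        # 3. Flush a full buffer as one batch (Parquet in the real system)
        if len(self.buffer) >= self.buffer_size:
            self.flushed_batches.append(self.buffer)
            self.buffer = []

# Tiny demo: two writes fill the buffer and trigger one batch flush
wal = tempfile.NamedTemporaryFile(suffix=".wal", delete=False)
layer = WriteAheadBuffer(wal.name, buffer_size=2)
layer.write([1, 2], 1, 1)
layer.write([1, 2], 2, -1)
```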
Recovery Flow
On startup, InputLayer:
- Loads shard metadata from disk
- Reads batch files (Parquet)
- Replays WAL (uncommitted updates)
- Consolidates to get current state
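A minimal sketch of that recovery order, using in-memory stand-ins for the on-disk batch files and WAL:

```python
# Recovery sketch: batch-file updates are applied first, WAL updates are
# replayed on top, and the combined stream is consolidated into current
# state. Updates are (data, time, diff) triples as in the rest of the guide.

def recover(batch_updates, wal_updates):
    state = {}
    for data, _, diff in list(batch_updates) + list(wal_updates):
        state[data] = state.get(data, 0) + diff
    # Consolidation drops tuples whose diffs cancel to zero
    return {k: v for k, v in state.items() if v != 0}

state = recover(
    batch_updates=[(("x",), 10, 1), (("y",), 20, 1)],  # from Parquet batches
    wal_updates=[(("y",), 30, -1), (("z",), 40, 1)],   # replayed from the WAL
)
```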
Directory Structure
data/
├── persist/
│ ├── shards/ # Shard metadata (JSON)
│ │ ├── default_edge.json
│ │ └── default_node.json
│ ├── batches/ # Data files (Parquet)
│ │ ├── 1.parquet
│ │ └── 2.parquet
│ └── wal/ # Write-ahead log
│ └── current.wal
Durability Modes
InputLayer offers three durability modes, configurable via `config.toml`:
Immediate Mode (Default)
Every write syncs to disk before returning:
durability_mode = "immediate"
| Property | Value |
|---|---|
| Write Latency | Highest |
| Crash Safety | Full (zero data loss) |
| Use Case | Financial data, critical records |
Batched Mode
Writes buffer in memory with periodic sync:
durability_mode = "batched"
buffer_size = 10000
| Property | Value |
|---|---|
| Write Latency | Medium |
| Crash Safety | Partial (may lose last batch) |
| Use Case | Most production workloads |
Async Mode
Writes return immediately; background persistence:
durability_mode = "async"
| Property | Value |
|---|---|
| Write Latency | Lowest |
| Crash Safety | Minimal (may lose recent updates) |
| Use Case | Analytics pipelines, high-throughput ingestion |
Write-Ahead Log (WAL)
The WAL provides O(1) append-only persistence with immediate durability.
WAL Entry Format
Each entry is a checksummed JSON line in the format `<crc32hex>:<json>` (checksum prefixes omitted from the examples below for readability):
{"shard":"default:edge","update":{"data":[1,2],"time":1,"diff":1}}
{"shard":"default:edge","update":{"data":[1,2],"time":2,"diff":-1}}
WAL Entry Fields
| Field | Description |
|---|---|
| `shard` | Shard name in `kg:relation` format |
| `update.data` | The tuple data |
| `update.time` | Logical timestamp (monotonically increasing) |
| `update.diff` | Multiplicity change: +1 for insert, -1 for delete |
Each line is prefixed with a CRC32 checksum for integrity verification during recovery.
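The entry format can be sketched as follows. The eight-digit zero-padded hex width is an assumption; the document only specifies `<crc32hex>:<json>`:

```python
import json
import zlib

def encode_wal_entry(shard, data, time, diff):
    """Serialize one WAL line as <crc32hex>:<json>."""
    payload = json.dumps(
        {"shard": shard, "update": {"data": data, "time": time, "diff": diff}},
        separators=(",", ":"),
    )
    return f"{zlib.crc32(payload.encode()):08x}:{payload}"

def decode_wal_entry(line):
    """Verify the CRC32 prefix and return the parsed entry, or None if corrupt."""
    prefix, _, payload = line.partition(":")
    if f"{zlib.crc32(payload.encode()):08x}" != prefix:
        return None
    return json.loads(payload)

entry = encode_wal_entry("default:edge", [1, 2], 1, 1)
```

A tampered payload no longer matches its checksum prefix, so recovery can skip (or halt at) the corrupt entry.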
Automatic Compaction
WAL entries are compacted to Parquet when:
- Buffer reaches configured size (`buffer_size`)
- Manual flush is triggered
- Server shutdown (clean)
After compaction, the WAL is archived and cleared.
Batch Files (Parquet)
Data is stored in columnar Parquet format for efficient queries.
Parquet Schema
Each batch file contains:
- Data columns: Your tuple fields
- time: Update timestamp (UInt64)
- diff: +1 for insert, -1 for delete (Int64)
Compression
Snappy compression is used by default (fast decompression, good ratio).
Example File
batches/1.parquet
├── col0: Int32 [1, 3, 5]
├── col1: Int32 [2, 4, 6]
├── time: UInt64 [10, 20, 30]
└── diff: Int64 [1, 1, 1]
Shards
Each relation is stored as a separate "shard" with its own:
- Metadata file (JSON)
- Batch files (Parquet)
- WAL entries
Shard Metadata
{
"name": "default:edge",
"since": 0,
"upper": 100,
"batches": [
{
"id": "1",
"path": "batches/1.parquet",
"lower": 0,
"upper": 50,
"len": 100
}
]
}
| Field | Description |
|---|---|
| `name` | Shard identifier (`kg:relation`) |
| `since` | Lower bound frontier (history before this point is discarded) |
| `upper` | Upper bound frontier (latest update time + 1) |
| `batches` | List of batch file references |
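A small sketch of loading and sanity-checking this metadata; the frontier invariant shown is an assumption, not a documented guarantee:

```python
import json

# The example metadata from above, inlined for the sketch
META = """{
  "name": "default:edge",
  "since": 0,
  "upper": 100,
  "batches": [{"id": "1", "path": "batches/1.parquet",
               "lower": 0, "upper": 50, "len": 100}]
}"""

def load_shard(text):
    meta = json.loads(text)
    # Assumed invariant: every batch's time range falls inside the shard's
    # [since, upper) frontier window
    for b in meta["batches"]:
        if not (meta["since"] <= b["lower"] <= b["upper"] <= meta["upper"]):
            raise ValueError(f"batch {b['id']} outside shard frontier")
    return meta

shard = load_shard(META)
```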
Compaction
Compaction consolidates history and reclaims space.
Manual Compaction
Running `.compact` triggers compaction manually. This:
- Flushes all pending writes
- Merges batch files
- Removes historical entries before the `since` frontier
- Clears the WAL
Automatic Compaction
Compaction is triggered automatically when a shard accumulates more than `auto_compact_threshold` batch files (default: 10). The check runs every `auto_compact_interval_secs` seconds (default: 300).
auto_compact_threshold = 10 # Compact after this many batch files
auto_compact_interval_secs = 300 # Check interval (seconds)
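The trigger condition can be sketched as follows (function name hypothetical):

```python
# Auto-compaction check: shards with more than `threshold` batch files are
# due for compaction. A threshold of 0 disables the check, matching the
# config comment above.

def shards_to_compact(shards, threshold=10):
    """Return names of shards whose batch count exceeds the threshold."""
    if threshold == 0:
        return []
    return [name for name, batches in shards.items() if len(batches) > threshold]

due = shards_to_compact(
    {"default:edge": list(range(12)), "default:node": [1, 2]},
    threshold=10,
)
```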
Configuration Reference
# Base directory for all data
data_dir = "./data"
# Enable DD-native persistence
enabled = true
# Buffer size before flushing to Parquet
buffer_size = 10000
# Durability: immediate, batched, async
durability_mode = "immediate"
# Auto-compact when this many batch files accumulate (0 = disabled)
auto_compact_threshold = 10
# Check interval for auto-compaction in seconds (0 = disabled)
auto_compact_interval_secs = 300
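A minimal validation sketch for these settings. The checks mirror the comments above; InputLayer's actual config loader may check more (or differently):

```python
# Assumed valid values, taken from the reference comments above
VALID_MODES = {"immediate", "batched", "async"}

def validate_config(cfg):
    mode = cfg.get("durability_mode", "immediate")
    if mode not in VALID_MODES:
        raise ValueError(f"unknown durability_mode: {mode!r}")
    if cfg.get("buffer_size", 10000) <= 0:
        raise ValueError("buffer_size must be positive")
    if cfg.get("auto_compact_threshold", 10) < 0:  # 0 = disabled
        raise ValueError("auto_compact_threshold must be >= 0")
    if cfg.get("auto_compact_interval_secs", 300) < 0:  # 0 = disabled
        raise ValueError("auto_compact_interval_secs must be >= 0")
    return cfg

cfg = validate_config({"durability_mode": "batched", "buffer_size": 10000})
```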
Best Practices
Development
durability_mode = "async"
buffer_size = 1000
Fast iteration, acceptable data loss on crashes.
Production
durability_mode = "immediate"
buffer_size = 10000
Maximum safety, reasonable performance.
High-Throughput Ingestion
durability_mode = "batched"
buffer_size = 100000
batch_size = 10000
async_io = true
Balance between throughput and safety.
Memory-Constrained
buffer_size = 1000
auto_compact_threshold = 5
initial_capacity = 1000
batch_size = 100
Frequent flushes, aggressive compaction.
Monitoring
Check Storage Status
.status
Shows:
- Data directory location
- Number of shards
- WAL size
- Buffer status
Check Shard Info
Lists all relations with row counts.
Recovery Scenarios
Normal Startup
- Load shard metadata
- Read Parquet batch files
- Replay WAL entries
- Consolidate to current state
Crash Recovery
Same as normal startup. WAL ensures all committed writes are recovered.
Corrupted Parquet
If a batch file is corrupted:
- WAL entries for that batch may still be available
- Manually remove the corrupted `.parquet` file
- Restart to trigger WAL replay
Corrupted WAL
If WAL is corrupted:
- Data in Parquet files is safe
- Uncommitted writes since last flush are lost
- Rename/remove corrupted WAL file
- Restart
Differential Updates
InputLayer uses differential dataflow semantics internally:
Update {
data: Tuple, // The actual data
time: u64, // Logical timestamp
diff: i64, // +1 = insert, -1 = delete
}
Consolidation
Multiple updates to the same tuple are consolidated:
| Updates | Consolidated |
|---|---|
| +1, +1 | +2 (duplicate insert) |
| +1, -1 | 0 (cancelled out) |
| +1, -1, +1 | +1 (net insert) |
This enables:
- Efficient delta storage
- Time-travel queries (if history preserved)
- Incremental computation
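The consolidation rules in the table can be sketched as:

```python
from collections import defaultdict

def consolidate(updates):
    """Collapse a stream of (data, diff) updates into net multiplicities,
    dropping tuples whose diffs cancel to zero."""
    acc = defaultdict(int)
    for data, diff in updates:
        acc[data] += diff
    return {k: v for k, v in acc.items() if v != 0}

state = consolidate([
    (("a",), 1), (("a",), 1),               # +1, +1 -> +2 (duplicate insert)
    (("b",), 1), (("b",), -1),              # +1, -1 -> cancelled out
    (("c",), 1), (("c",), -1), (("c",), 1), # +1, -1, +1 -> net +1
])
```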
Troubleshooting
High Write Latency
Cause: Immediate durability mode with slow disk
Solutions:
- Switch to `batched` durability mode
- Use faster storage (SSD)
- Increase `buffer_size` to batch more writes
High Memory Usage
Cause: Large buffer, many shards
Solutions:
- Reduce `buffer_size`
- Lower `auto_compact_threshold` for more frequent compaction
- Flush more frequently
Slow Startup
Cause: Large WAL, many batch files
Solutions:
- Run `.compact` before shutdown
- Enable compaction window
- Flush buffers before shutdown
Missing Data After Crash
Cause: Async durability mode
Solutions:
- Switch to `immediate` or `batched` mode
- Accept the trade-off of async mode
Next Steps
- Configuration Guide - Full configuration reference
- REST API Guide - Programmatic access
- Temporal Functions - Time-based queries on persisted data