Persistence Guide

InputLayer provides durable storage with crash recovery, using a combination of Write-Ahead Logging (WAL) and Parquet batch files.

Architecture Overview

Insert/Delete
    ↓
Update{data, time, diff}
    ↓
WAL (immediate durability)
    ↓
In-memory buffer
    ↓ (when buffer full)
Batch file (Parquet)

Recovery Flow

On startup, InputLayer:

  1. Loads shard metadata from disk
  2. Reads batch files (Parquet)
  3. Replays WAL (uncommitted updates)
  4. Consolidates to get current state
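The four recovery steps can be sketched as a small Python routine. This is an illustrative sketch only, not InputLayer's actual API: `read_batch` and `read_wal` are injected readers standing in for the real Parquet and WAL decoding, and each yields `(data, time, diff)` updates.

```python
import json
import os
from collections import defaultdict

def recover(persist_dir, read_batch, read_wal):
    """Sketch of the startup sequence: metadata -> batches -> WAL -> consolidate."""
    shards = {}
    meta_dir = os.path.join(persist_dir, "shards")
    for fname in sorted(os.listdir(meta_dir)):          # 1. load shard metadata
        with open(os.path.join(meta_dir, fname)) as f:
            meta = json.load(f)
        updates = []
        for batch in meta["batches"]:                   # 2. read batch files
            updates.extend(read_batch(batch["path"]))
        updates.extend(read_wal(meta["name"]))          # 3. replay WAL entries
        state = defaultdict(int)
        for data, _time, diff in updates:               # 4. consolidate
            state[tuple(data)] += diff
        shards[meta["name"]] = {k: v for k, v in state.items() if v != 0}
    return shards
```

Note that an insert recovered from a batch file and a later delete replayed from the WAL cancel out during consolidation, so the recovered state reflects all durable writes.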

Directory Structure

data/
├── persist/
│   ├── shards/           # Shard metadata (JSON)
│   │   ├── default_edge.json
│   │   └── default_node.json
│   ├── batches/          # Data files (Parquet)
│   │   ├── 1.parquet
│   │   └── 2.parquet
│   └── wal/              # Write-ahead log
│       └── current.wal

Durability Modes

InputLayer offers three durability modes, configurable via config.toml:

Immediate Mode (Default)

Every write syncs to disk before returning:

[storage.persist]
durability_mode = "immediate"

Property        Value
Write Latency   Highest
Crash Safety    Full (zero data loss)
Use Case        Financial data, critical records

Batched Mode

Writes buffer in memory with periodic sync:

[storage.persist]
durability_mode = "batched"
buffer_size = 10000

Property        Value
Write Latency   Medium
Crash Safety    Partial (may lose last batch)
Use Case        Most production workloads
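Batched mode's trade-off can be illustrated with a minimal buffer sketch. The names here are hypothetical; `sync` stands in for whatever the engine actually does on flush (WAL fsync or Parquet write):

```python
class BatchedWriter:
    """Sketch of batched durability: buffer writes, sync when buffer_size is hit."""

    def __init__(self, buffer_size, sync):
        self.buffer_size = buffer_size
        self.sync = sync
        self.buffer = []

    def write(self, update):
        self.buffer.append(update)            # returns before data is durable
        if len(self.buffer) >= self.buffer_size:
            self.flush()

    def flush(self):
        if self.buffer:
            self.sync(self.buffer)            # durable only after this point
            self.buffer = []
```

A crash between `write` and the next `flush` loses at most one buffer's worth of updates, which is exactly the "may lose last batch" caveat above.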

Async Mode

Writes return immediately; persistence happens in the background:

[storage.persist]
durability_mode = "async"

Property        Value
Write Latency   Lowest
Crash Safety    Minimal (may lose recent updates)
Use Case        Analytics pipelines, high-throughput ingestion

Write-Ahead Log (WAL)

The WAL provides O(1) append-only persistence with immediate durability.

WAL Entry Format

Each entry is a checksummed JSON line (format: <crc32hex>:<json>). With the checksum prefix omitted for readability, entries look like:

{"shard":"default:edge","update":{"data":[1,2],"time":1,"diff":1}}
{"shard":"default:edge","update":{"data":[1,2],"time":2,"diff":-1}}

WAL Entry Fields

Field         Description
shard         Shard name in "kg:relation" format
update.data   The tuple data
update.time   Logical timestamp (monotonically increasing)
update.diff   Multiplicity change: +1 for insert, -1 for delete

Each line is prefixed with a CRC32 checksum for integrity verification during recovery.
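The <crc32hex>:<json> framing can be sketched in a few lines of Python (a sketch of the stated format, not InputLayer's actual encoder; function names are illustrative):

```python
import json
import zlib

def encode_wal_line(shard, data, time, diff):
    """Serialize an update as a checksummed WAL line: <crc32hex>:<json>."""
    payload = json.dumps(
        {"shard": shard, "update": {"data": data, "time": time, "diff": diff}},
        separators=(",", ":"),
    )
    return f"{zlib.crc32(payload.encode('utf-8')):08x}:{payload}"

def decode_wal_line(line):
    """Verify the checksum and return the parsed entry, or None if corrupt."""
    prefix, _, payload = line.partition(":")
    if zlib.crc32(payload.encode("utf-8")) != int(prefix, 16):
        return None  # corrupt entry: recovery stops replaying at this point
    return json.loads(payload)
```

Because `partition` splits at the first colon only, colons inside the JSON payload are preserved; any single-byte corruption of the payload changes the CRC32 and makes `decode_wal_line` reject the line.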

Automatic Compaction

WAL entries are compacted to Parquet when:

  • Buffer reaches configured size (buffer_size)
  • Manual flush is triggered
  • Server shutdown (clean)

After compaction, the WAL is archived and cleared.


Batch Files (Parquet)

Data is stored in columnar Parquet format for efficient queries.

Parquet Schema

Each batch file contains:

  • Data columns: Your tuple fields
  • time: Update timestamp (UInt64)
  • diff: +1 for insert, -1 for delete (Int64)

Compression

Snappy compression is used by default (fast decompression, good ratio).

Example File

batches/1.parquet
├── col0: Int32 [1, 3, 5]
├── col1: Int32 [2, 4, 6]
├── time: UInt64 [10, 20, 30]
└── diff: Int64 [1, 1, 1]

Shards

Each relation is stored as a separate "shard" with its own:

  • Metadata file (JSON)
  • Batch files (Parquet)
  • WAL entries

Shard Metadata

{
  "name": "default:edge",
  "since": 0,
  "upper": 100,
  "batches": [
    {
      "id": "1",
      "path": "batches/1.parquet",
      "lower": 0,
      "upper": 50,
      "len": 100
    }
  ]
}

Field     Description
name      Shard identifier (kg:relation)
since     Lower bound frontier (history discarded before this)
upper     Upper bound frontier (latest update time + 1)
batches   List of batch file references
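A loader for this metadata can double as a sanity check. The invariants asserted below (frontiers ordered, each batch's time range inside the shard's window) are illustrative assumptions, not documented guarantees:

```python
import json

def load_shard_meta(text):
    """Parse shard metadata JSON and check a couple of illustrative invariants."""
    meta = json.loads(text)
    assert meta["since"] <= meta["upper"], "frontiers out of order"
    for batch in meta["batches"]:
        # each batch's time range should sit inside [since, upper)
        assert meta["since"] <= batch["lower"] <= batch["upper"] <= meta["upper"]
    return meta
```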

Compaction

Compaction consolidates history and reclaims space.

Manual Compaction

.compact

This:

  1. Flushes all pending writes
  2. Merges batch files
  3. Removes historical entries before since frontier
  4. Clears WAL
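The effect of steps 2 and 3 can be sketched as a merge that advances every time below the since frontier up to it and drops updates that cancel. This is an illustrative model of the behavior, not the engine's implementation:

```python
from collections import defaultdict

def compact(batches, since):
    """Merge batch files into one, collapsing history before `since`.

    Times below the since frontier are advanced to it, so inserts and
    deletes that cancelled out in old history disappear entirely.
    """
    merged = defaultdict(int)
    for batch in batches:
        for data, time, diff in batch:
            merged[(tuple(data), max(time, since))] += diff
    return [(list(data), time, diff)
            for (data, time), diff in sorted(merged.items()) if diff != 0]
```

For example, an insert at time 10 and its delete at time 20 both land at a since frontier of 25, sum to zero, and are reclaimed.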

Automatic Compaction

Compaction is triggered automatically when a shard accumulates more than auto_compact_threshold batch files (default: 10). The check runs every auto_compact_interval_secs seconds (default: 300).

[storage.persist]
auto_compact_threshold = 10      # Compact after this many batch files
auto_compact_interval_secs = 300  # Check interval (seconds)

Configuration Reference

[storage]
# Base directory for all data
data_dir = "./data"

[storage.persist]
# Enable DD-native persistence
enabled = true

# Buffer size before flushing to Parquet
buffer_size = 10000

# Durability: immediate, batched, async
durability_mode = "immediate"

# Auto-compact when this many batch files accumulate (0 = disabled)
auto_compact_threshold = 10

# Check interval for auto-compaction in seconds (0 = disabled)
auto_compact_interval_secs = 300

Best Practices

Development

[storage.persist]
durability_mode = "async"
buffer_size = 1000

Fast iteration, acceptable data loss on crashes.

Production

[storage.persist]
durability_mode = "immediate"
buffer_size = 10000

Maximum safety, reasonable performance.

High-Throughput Ingestion

[storage.persist]
durability_mode = "batched"
buffer_size = 100000

[storage.performance]
batch_size = 10000
async_io = true

Balance between throughput and safety.

Memory-Constrained

[storage.persist]
buffer_size = 1000
auto_compact_threshold = 5

[storage.performance]
initial_capacity = 1000
batch_size = 100

Frequent flushes, aggressive compaction.


Monitoring

Check Storage Status

.status

Shows:

  • Data directory location
  • Number of shards
  • WAL size
  • Buffer status

Check Shard Info

.rel

Lists all relations with row counts.


Recovery Scenarios

Normal Startup

  1. Load shard metadata
  2. Read Parquet batch files
  3. Replay WAL entries
  4. Consolidate to current state

Crash Recovery

Same as normal startup. WAL ensures all committed writes are recovered.

Corrupted Parquet

If a batch file is corrupted:

  1. WAL entries for that batch may still be available
  2. Manually remove corrupted .parquet file
  3. Restart to trigger WAL replay

Corrupted WAL

If WAL is corrupted:

  1. Data in Parquet files is safe
  2. Uncommitted writes since last flush are lost
  3. Rename/remove corrupted WAL file
  4. Restart

Differential Updates

InputLayer uses differential dataflow semantics internally:

Update {
    data: Tuple,    // The actual data
    time: u64,      // Logical timestamp
    diff: i64,      // +1 = insert, -1 = delete
}

Consolidation

Multiple updates to the same tuple are consolidated:

Updates       Consolidated
+1, +1        +2 (duplicate insert)
+1, -1        0 (cancelled out)
+1, -1, +1    +1 (net insert)
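The consolidation rule above amounts to summing diffs per tuple and discarding anything that nets to zero, which a few lines of Python can model (an illustrative sketch, not the engine's code):

```python
from collections import defaultdict

def consolidate(updates):
    """Sum diffs per tuple; tuples whose diffs cancel to zero vanish."""
    multiplicity = defaultdict(int)
    for data, diff in updates:
        multiplicity[tuple(data)] += diff
    return {k: v for k, v in multiplicity.items() if v != 0}
```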

This enables:

  • Efficient delta storage
  • Time-travel queries (if history preserved)
  • Incremental computation

Troubleshooting

High Write Latency

Cause: Immediate durability mode with slow disk

Solutions:

  1. Switch to batched durability mode
  2. Use faster storage (SSD)
  3. Increase buffer_size to batch more writes

High Memory Usage

Cause: Large buffer, many shards

Solutions:

  1. Reduce buffer_size
  2. Lower auto_compact_threshold for more frequent compaction
  3. Flush more frequently

Slow Startup

Cause: Large WAL, many batch files

Solutions:

  1. Run .compact before shutdown
  2. Enable compaction window
  3. Flush buffers before shutdown

Missing Data After Crash

Cause: Async durability mode

Solutions:

  1. Switch to immediate or batched mode
  2. Accept trade-off for async mode

Next Steps