Architecture Overview
The storage pipeline receives Arrow RecordBatch streams from Ingest over Arrow Flight, appends them into Active Parquet files, rotates those files into Stable Parquet when size or time thresholds are met, uploads stable files to an object store through OpenDAL, and registers an authoritative record in Postgres.
Once metadata is written, the storage node signals the Indexer to build artifacts that the Search layer uses for pruning and fast scans. Each state transition is observable, with defined retry paths and a reconciliation process if anything goes wrong.
1. Position in the end-to-end flow
Data arrives through Kafka, gRPC, or HTTP and passes through the Ingestion Gateway for auth, schema validation, normalization, and batching before reaching the Ingest node, which performs deduplication, short-term WAL writes, long-term WAL aggregation, and Arrow RecordBatch creation. Storage receives Arrow over Flight, builds Active and Stable Parquet files, uploads them to object storage, and registers metadata. The Indexer then builds bitmap, bloom or loom, zonemap, and Tantivy artifacts. Search combines these artifacts with Parquet statistics to prune and execute SQL or full-text queries.
Tiered persistence layout
| Type | Purpose | Default | Backed by |
|---|---|---|---|
| Short-term WAL | Immediate durability | ./data/wal/short | Local disk |
| Long-term WAL | Aggregated daily durability | ./data/wal/long | Local disk |
| Active Parquet | Real-time Arrow → Parquet buffer | ./data/storage/active | Local |
| Stable Parquet | Finalized Parquet files | ./data/storage/stable | Local |
| Object Store | Durable cloud storage | s3://bucket/path | S3 or Azure or GCS |
| Metadata Store | Source of truth for metadata | Postgres | Postgres |
| Cache tiers | Electro (RAM), Pyro (disk), Cryo (cloud) | Configured | RAM, SSD, or object store |
2. Core components
| Layer | Component | Responsibility |
|---|---|---|
| Ingress boundary | StorageProcessor | Receives Arrow batches, routes per client, appends to the active file, checks rotation, hands stable files to the uploader, writes metadata after upload, and triggers indexing. |
| File management | ParquetFileManager | Tracks per-client Active and Stable files, performs appends on a blocking I/O thread, enforces size or time thresholds, and finalizes to stable. |
| Parquet I/O | ParquetWriter trait and LocalParquetWriter | create_active, append_batch, finalize; Parquet writes run in spawn_blocking (trait shapes sketched after this table). |
| Uploader | ObjectUploader trait and OpenDALUploader | Uploads stable Parquet to S3 or Azure or GCS with bounded concurrency and retry. Metadata writes are idempotent. |
| Metastore | Postgres | Truth store for file and index records with URI, sizes, schema hash, and time ranges that Search uses to prune reads. |
| Index integration | Indexer | Builds and uploads artifacts, then updates index metadata so queries can prune by index. |
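To make the boundaries in the table concrete, the sketch below shows one plausible shape for the ParquetWriter and ObjectUploader traits. The signatures, the StableFileInfo fields, and the use of the async-trait and anyhow crates are assumptions for illustration, not the actual code.

```rust
use std::path::{Path, PathBuf};
use arrow::record_batch::RecordBatch;

// Assumed field set, mirroring step 5 of the end-to-end sequence below.
pub struct StableFileInfo {
    pub path: PathBuf,
    pub size_bytes: u64,
    pub row_count: u64,
    pub start_ts: i64,
    pub end_ts: i64,
}

// Hypothetical signatures for the ParquetWriter trait named above; the real trait may
// differ. Appends stay synchronous because the manager runs them inside spawn_blocking.
pub trait ParquetWriter: Send + Sync {
    fn create_active(&self, client: &str) -> anyhow::Result<PathBuf>;
    fn append_batch(&self, active: &Path, batch: &RecordBatch) -> anyhow::Result<()>;
    fn finalize(&self, active: &Path) -> anyhow::Result<StableFileInfo>;
}

// Hypothetical uploader boundary; OpenDALUploader would implement this against S3, Azure, or GCS.
#[async_trait::async_trait]
pub trait ObjectUploader: Send + Sync {
    async fn upload(&self, local: &Path, uri: &str) -> anyhow::Result<()>;
}
```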
3. End-to-End sequence
The sequence below merges the complete lifecycle with the architectural view, so a reader can follow a batch from the first Arrow frame to the indexed Parquet in the cloud.
Preconditions
The storage container reads the unified config, identifies its role, starts the Arrow Flight service, prepares local directories for active and stable files, connects to object storage through OpenDAL, and connects to Postgres. Health and node metadata are exposed to the control plane.
1. Receive Arrow over Flight
Ingest opens a Flight DoPut stream and sends Arrow RecordBatch frames. The Storage Flight handler wraps each frame as ArrowFlightBatch { client, batch_id, record_batch, start_ts, end_ts } and sends it into a Tokio MPSC channel consumed by StorageProcessor.
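A minimal sketch of this handoff, assuming Tokio's mpsc channel; the ArrowFlightBatch field types, channel capacity, and function names are illustrative assumptions.

```rust
use arrow::record_batch::RecordBatch;
use tokio::sync::mpsc;

// Assumed field types for the wrapper named above.
pub struct ArrowFlightBatch {
    pub client: String,
    pub batch_id: String,
    pub record_batch: RecordBatch,
    pub start_ts: i64,
    pub end_ts: i64,
}

// The Flight DoPut handler holds the Sender; StorageProcessor drains the Receiver.
pub fn storage_channel(
    capacity: usize,
) -> (mpsc::Sender<ArrowFlightBatch>, mpsc::Receiver<ArrowFlightBatch>) {
    mpsc::channel(capacity)
}

pub async fn run_storage_processor(mut rx: mpsc::Receiver<ArrowFlightBatch>) {
    while let Some(batch) = rx.recv().await {
        // Steps 2-4: route per client, append to the active file, check rotation.
        let _ = batch;
    }
}
```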
2. Route per client and ensure an active file
StorageProcessor extracts the client name from metadata or topic and asks ParquetFileManager for that client’s active handle. If none exists, a new active file is created under ./data/storage/active, with its creation time and counters tracked in memory.
3. Append Arrow to Parquet row groups
The manager appends the RecordBatch as one or more Parquet row groups. Parquet I/O is blocking and runs on a dedicated blocking thread. After the write, the in-memory byte and row counters are updated. Compression uses ZSTD level 3.
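A minimal sketch of the blocking append, assuming the arrow and parquet crates. The real manager keeps one ArrowWriter open per client across appends; opening and closing a writer here is only to show the spawn_blocking call and the ZSTD level 3 properties.

```rust
use std::fs::OpenOptions;
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;

// Illustrative one-shot append: the Parquet write runs on the blocking thread pool.
async fn append_blocking(path: std::path::PathBuf, batch: RecordBatch) -> anyhow::Result<()> {
    tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
        let file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&path)?;
        let props = WriterProperties::builder()
            .set_compression(Compression::ZSTD(ZstdLevel::try_new(3)?))
            .build();
        let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
        writer.write(&batch)?; // one or more row groups
        writer.close()?;       // in the real path, close happens at finalize
        Ok(())
    })
    .await?
}
```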
4. Check rotation on size or time
After every append, the manager checks whether the estimated compressed size has reached the size threshold or the elapsed time since creation has reached the time threshold. If either threshold is met, it proceeds to finalize.
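A sketch of that decision under assumed counter names; the real ActiveFileInfo shape may differ.

```rust
use std::time::{Duration, Instant};

// Hypothetical in-memory counters for one active file.
struct ActiveFileInfo {
    created_at: Instant,
    estimated_bytes: u64,
    rows: u64,
}

// Rotation decision as described above: size OR elapsed-time threshold.
fn should_rotate(info: &ActiveFileInfo, max_bytes: u64, max_age: Duration) -> bool {
    info.estimated_bytes >= max_bytes || info.created_at.elapsed() >= max_age
}
```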
5. Finalize to a stable file
The writer flushes remaining row groups, writes the Parquet footer, and renames the file from active to a deterministic stable name:
{client}_{timestamp}_{ulid}_{status}.parquet  // status = active or stable
A StableFileInfo is produced with path, size, row count, and time range.
6. Upload stable Parquet to object storage
The uploader pushes the stable file through OpenDAL with limited concurrency. Throttle errors and server errors use exponential backoff with jitter. Auth errors fail fast, and the file is kept locally. On success, the latency and bytes are recorded.
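A sketch of the retry policy described above. The error classification, the try_upload closure, and the backoff constants are placeholders standing in for the OpenDAL-backed uploader, not its actual API.

```rust
use rand::Rng;
use std::time::Duration;

// Error classes mirroring the policy above; the real uploader's error type differs.
pub enum UploadError {
    Throttled,
    Server,
    Auth,
}

// try_upload stands in for the OpenDAL-backed upload call.
pub async fn upload_with_retry<F, Fut>(mut try_upload: F, max_attempts: u32) -> Result<(), UploadError>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<(), UploadError>>,
{
    let mut delay = Duration::from_millis(200);
    let mut attempt = 0;
    loop {
        attempt += 1;
        match try_upload().await {
            Ok(()) => return Ok(()),
            // Auth errors fail fast; the local stable file is kept for reconciliation.
            Err(UploadError::Auth) => return Err(UploadError::Auth),
            Err(e) if attempt >= max_attempts => return Err(e),
            // Throttle and server errors back off exponentially with jitter.
            Err(_) => {
                let jitter_ms = rand::thread_rng().gen_range(0..=delay.as_millis() as u64 / 2);
                tokio::time::sleep(delay + Duration::from_millis(jitter_ms)).await;
                delay *= 2;
            }
        }
    }
}
```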
7. Write authoritative metadata in Postgres
A row is created or upserted with fields:
| Field | Meaning |
|---|---|
| file_id (ULID) | Unique stable file ID |
| client_name | Logical tenant or source |
| uri | Object path in the cloud store |
| start_ts, end_ts | Time range covered by the file |
| row_count | Total rows written |
| uncompressed_size, compressed_size | Size counters |
| schema_hash | Schema version and compatibility |
| indexes (list) | Artifacts linked to the file |
| uploaded_at | Upload completion time |
This record is the discovery source for the Indexer and Search.
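A hedged sketch of the idempotent upsert, assuming sqlx as the Postgres client; the table name storage_files and the column subset shown are illustrative, not the actual schema.

```rust
use sqlx::PgPool;

// Upsert keyed by file_id so retried uploads do not create duplicate rows.
async fn register_stable_file(
    pool: &PgPool,
    file_id: &str,
    client_name: &str,
    uri: &str,
    start_ts: i64,
    end_ts: i64,
    row_count: i64,
) -> anyhow::Result<()> {
    sqlx::query(
        r#"
        INSERT INTO storage_files (file_id, client_name, uri, start_ts, end_ts, row_count, uploaded_at)
        VALUES ($1, $2, $3, $4, $5, $6, now())
        ON CONFLICT (file_id) DO UPDATE
        SET uri = EXCLUDED.uri, uploaded_at = EXCLUDED.uploaded_at
        "#,
    )
    .bind(file_id)
    .bind(client_name)
    .bind(uri)
    .bind(start_ts)
    .bind(end_ts)
    .bind(row_count)
    .execute(pool)
    .await?;
    Ok(())
}
```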
8. Trigger indexing
Storage notifies Indexer with the file URI, schema, and time range. Indexer builds the configured artifacts, uploads them to the object store, and writes index metadata.
Artifact types and formats:
- Zone maps: min and max per row group; JSON or compact binary compressed as zone.<file_ulid>.zst. Used to drop row groups outside filter ranges (see the pruning sketch after this list).
- Bloom filters: probabilistic membership per column with target false positive rate; serialized binary bloom::<file_ulid>::colname.bloom.zst. Used to skip files or row groups.
- Bitmap (Roaring): exact membership for categorical fields, stored as roaring bitmaps with dictionary metadata.
- Tantivy: full-text index exported as a tar.zst for portability.
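A small sketch of how Search might apply a zone map at query time. The ZoneEntry shape is an assumption, and only a single numeric column is shown; the on-disk zone.<file_ulid>.zst format is unchanged from the description above.

```rust
// Illustrative zone-map entry for one row group of one column.
struct ZoneEntry {
    row_group: usize,
    min: i64,
    max: i64,
}

// Keep only row groups whose [min, max] range overlaps the filter range.
fn prune_row_groups(zones: &[ZoneEntry], filter_min: i64, filter_max: i64) -> Vec<usize> {
    zones
        .iter()
        .filter(|z| z.max >= filter_min && z.min <= filter_max)
        .map(|z| z.row_group)
        .collect()
}
```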
9. Local retention and eviction
Once metadata and index registration are confirmed, local stable files are either deleted immediately or kept for a configured retention window, depending on the retention settings.
10. Background compaction
Periodic compaction merges many small stable files into larger Parquet objects. Parameters include the minimum number of files to compact, the maximum files per task, parallelism, and memory limits. The merged output is written to a temporary file, validated, and atomically renamed into place. Hard links can be used if enabled.
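A sketch of the validate-then-commit step, assuming the parquet crate for the footer check; the merge of the input files is omitted.

```rust
use std::{fs::File, path::Path};
use parquet::file::reader::{FileReader, SerializedFileReader};

// Validate the merged output, then atomically rename it into place so readers
// never observe a partially written object.
fn commit_compacted(tmp: &Path, dest: &Path) -> anyhow::Result<u64> {
    // Opening with SerializedFileReader fails if the Parquet footer is missing or corrupt.
    let reader = SerializedFileReader::new(File::open(tmp)?)?;
    let rows = reader.metadata().file_metadata().num_rows() as u64;
    std::fs::rename(tmp, dest)?; // atomic within the same filesystem
    Ok(rows)
}
```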
4. Append path in detail
This summarizes the concrete steps the code takes to turn an Arrow batch into a Parquet object.
- Receive: StorageArrowFlightReceiver converts gRPC frames to ArrowFlightBatch and sends to StorageProcessor through an MPSC channel.
- Route: StorageProcessor extracts client_name and selects or creates the client’s ActiveFileInfo.
- Append: ParquetFileManager.write_batch(client, record_batch) runs the Parquet append inside tokio::task::spawn_blocking. Counters for bytes and rows are updated after the write.
- Rotate: after append, the manager checks size and time thresholds. If true, it finalizes, renames to the stable name, and enqueues the file for upload.
5. File states and naming
| State | Description |
|---|---|
| Active | Open Parquet file receiving row groups in ./data/storage/active. |
| Stable | Finalized Parquet file ready for upload. |
| Uploaded | Object persisted in cloud storage. |
| MetaRegistered | Postgres row exists with URI, sizes, schema hash, and time range. |
| Indexed | Index artifacts uploaded and registered. |
| Evicted | Local copy deleted according to retention policy. |
Deterministic naming:
{client}_{timestamp}_{ulid}_{status}.parquet
This keeps scanning and reconciliation simple.
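An illustrative name builder for the pattern above, assuming the ulid crate; the timestamp argument and its unit are placeholders.

```rust
// Builds the deterministic file name described above.
// status is expected to be "active" or "stable".
fn stable_file_name(client: &str, timestamp_ms: i64, status: &str) -> String {
    let id = ulid::Ulid::new();
    format!("{client}_{timestamp_ms}_{id}_{status}.parquet")
}
```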
6. Communication boundaries
- Arrow Flight carries high-throughput columnar data from Ingest to Storage.
- gRPC carries control commands and the post-upload trigger to Indexer.
- Gossip (Chitchat) shares low-frequency cluster metadata and health.
7. Configuration that shapes storage behavior
| Area | Keys or rules from docs | Effect |
|---|---|---|
| Parquet layout | storage.parquet.row_group_size | Balances write and read memory with pruning granularity. |
| Rotation | storage.file_rotation.file_size_mb, storage.file_rotation.file_duration_minutes | Governs Active to Stable transitions in size or time. |
| Object store upload | Concurrency and retry with backoff and jitter | Avoids throttling and keeps writes idempotent. |
| Local retention | delete_after_upload and a retention window | Frees disk while cloud and Postgres remain authoritative. |
| File size target | 128–256 MB stable objects recommended in docs | Efficient object count and read performance. |
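A hypothetical deserialization target for the keys in the table above; the nesting mirrors the dotted key names, but the exact config schema is an assumption.

```rust
use serde::Deserialize;

// Illustrative mapping of storage.parquet.* and storage.file_rotation.* keys.
#[derive(Debug, Deserialize)]
struct StorageConfig {
    parquet: ParquetConfig,
    file_rotation: RotationConfig,
}

#[derive(Debug, Deserialize)]
struct ParquetConfig {
    row_group_size: usize,
}

#[derive(Debug, Deserialize)]
struct RotationConfig {
    file_size_mb: u64,
    file_duration_minutes: u64,
}
```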
8. Observability signals
Metrics include upload latency and bytes, files uploaded, rotation counts, append latency, and error counters for Parquet writes and compaction. The storage.upload_success event records latency and bytes.
Traces capture the path storage.receive → append → rotate → upload → meta_write → index_trigger.
Logs include file_id, client_name, uri, rows, bytes, and per-step durations for straightforward reconciliation.
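A sketch of one instrumented step, assuming the tracing crate; the span name follows the trace path above and the structured fields follow the log fields listed above.

```rust
// The span's open/close times give the per-step duration; fields ride along as
// structured attributes for reconciliation.
#[tracing::instrument(
    name = "upload",
    skip_all,
    fields(file_id = %file_id, client_name = %client_name, uri = %uri)
)]
async fn upload_step(file_id: &str, client_name: &str, uri: &str, bytes: &[u8]) {
    // ... perform the upload; rotate, meta_write, and index_trigger follow the same pattern
    let _ = bytes;
}
```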
9. Data representation across the pipeline
| Stage | Format or Unit |
|---|---|
| Entry → Gateway | JSON or Protobuf |
| Gateway → Ingest | gRPC bulk proto |
| Ingest → Storage | Arrow RecordBatch |
| Storage → Object | Parquet |
| Indexes → Object Store | Index file formats: bitmap, tantivy, bloom or loom, zonemap |
| Search → Proxy | Arrow RecordBatch results |