Architecture Overview

The storage pipeline receives Arrow RecordBatch streams from Ingest over Arrow Flight, appends them into Active Parquet files, rotates those files into Stable Parquet when size or time thresholds are met, uploads stable files to an object store through OpenDAL, and registers an authoritative record in Postgres.

Once metadata is written, the storage node signals the Indexer to build artifacts that the Search layer uses for pruning and fast scans. Each state transition is observable, with defined retry paths and a reconciliation process if anything goes wrong.

1. Position in the end-to-end flow

Data arrives through Kafka, gRPC, or HTTP and passes through the Ingestion Gateway for auth, schema validation, normalization, and batching. It then reaches the Ingest node, which performs deduplication, short-term WAL writes, long-term WAL aggregation, and Arrow RecordBatch creation. Storage receives Arrow over Flight, builds Active and Stable Parquet files, uploads them to object storage, and registers metadata. The Indexer then builds bitmap, bloom or loom, zonemap, and Tantivy artifacts. Search combines these artifacts with Parquet statistics to prune and execute SQL or full-text queries.

Tiered persistence layout

| Type | Purpose | Default | Backed by |
|---|---|---|---|
| Short-term WAL | Immediate durability | ./data/wal/short | Local disk |
| Long-term WAL | Aggregated daily durability | ./data/wal/long | Local disk |
| Active Parquet | Real-time Arrow → Parquet buffer | ./data/storage/active | Local |
| Stable Parquet | Finalized Parquet files | ./data/storage/stable | Local |
| Object Store | Durable cloud storage | s3://bucket/path | S3 or Azure or GCS |
| Metadata Store | Source of truth for metadata | Postgres | Postgres |
| Cache tiers | Electro (RAM), Pyro (disk), Cryo (cloud) | Configured | RAM, SSD, or object store |

2. Core components

| Layer | Component | Responsibility |
|---|---|---|
| Ingress boundary | StorageProcessor | Receives Arrow batches, routes per client, appends to the active file, checks rotation, hands stable files to the uploader, writes metadata after upload, and triggers indexing. |
| File management | ParquetFileManager | Tracks per-client Active and Stable files, performs appends on a blocking I/O thread, enforces size or time thresholds, and finalizes to stable. |
| Parquet I/O | ParquetWriter trait and LocalParquetWriter | create_active, append_batch, finalize; Parquet writes run in spawn_blocking. |
| Uploader | ObjectUploader trait and OpenDALUploader | Uploads stable Parquet to S3 or Azure or GCS with bounded concurrency and retry. Metadata writes are idempotent. |
| Metastore | Postgres | Truth store for file and index records with URI, sizes, schema hash, and time ranges that Search uses to prune reads. |
| Index integration | Indexer | Builds and uploads artifacts, then updates index metadata so queries can prune by index. |

3. End-to-End sequence

The sequence below merges the complete lifecycle with the architectural view, so a reader can follow a batch from the first Arrow frame to the indexed Parquet in the cloud.

Preconditions

The storage container reads the unified config, identifies its role, starts the Arrow Flight service, prepares local directories for active and stable files, connects to object storage through OpenDAL, and connects to Postgres. Health and node metadata are exposed to the control plane.

1. Receive Arrow over Flight

Ingest opens a Flight DoPut stream and sends Arrow RecordBatch frames. The Storage Flight handler wraps each frame as ArrowFlightBatch { client, batch_id, record_batch, start_ts, end_ts } and sends it into a Tokio MPSC channel consumed by StorageProcessor.
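A minimal sketch of that hand-off, assuming the arrow and tokio crates; the field types, channel bound, and function name are illustrative rather than taken from the codebase:

```rust
use arrow::record_batch::RecordBatch;
use tokio::sync::mpsc;

// Envelope for one Flight frame; only the field names come from the
// description above, the concrete types here are assumptions.
pub struct ArrowFlightBatch {
    pub client: String,
    pub batch_id: String,
    pub record_batch: RecordBatch,
    pub start_ts: i64,
    pub end_ts: i64,
}

// The DoPut handler forwards each wrapped frame to the StorageProcessor task.
// A bounded channel gives natural back-pressure when the processor falls behind.
async fn forward(tx: &mpsc::Sender<ArrowFlightBatch>, batch: ArrowFlightBatch) {
    if tx.send(batch).await.is_err() {
        // The receiver was dropped, i.e. the StorageProcessor has shut down.
        eprintln!("storage processor channel closed; dropping batch");
    }
}
```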

2. Route per client and ensure an active file

StorageProcessor extracts the client name from metadata or topic and asks ParquetFileManager for that client’s active handle. If none exists, a new active file is created under ./data/storage, with creation time and counters tracked in memory.
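A sketch of the get-or-create step. The ActiveFileInfo fields and the active_for method are hypothetical names; the actual bookkeeping in ParquetFileManager may differ:

```rust
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Instant;

// Per-client bookkeeping for the open active file; names are illustrative.
struct ActiveFileInfo {
    path: PathBuf,
    created_at: Instant,
    bytes_written: u64,
    rows_written: u64,
}

struct ParquetFileManager {
    base_dir: PathBuf,
    active: HashMap<String, ActiveFileInfo>, // keyed by client name
}

impl ParquetFileManager {
    // Return the client's active handle, creating it on first use.
    fn active_for(&mut self, client: &str) -> &mut ActiveFileInfo {
        // Placeholder name; the real files follow {client}_{timestamp}_{ulid}_active.parquet.
        let path = self.base_dir.join(format!("{client}_active.parquet"));
        self.active.entry(client.to_string()).or_insert_with(|| ActiveFileInfo {
            path,
            created_at: Instant::now(),
            bytes_written: 0,
            rows_written: 0,
        })
    }
}
```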

3. Append Arrow to Parquet row groups

The manager appends the RecordBatch as one or more Parquet row groups. Parquet I/O is blocking, so it runs on a dedicated blocking thread. After the write, the in-memory counters for bytes and rows are updated. Compression uses ZSTD level 3, as specified in the configuration docs.
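A sketch of that append path using the arrow and parquet crates; the trait methods named in section 2 (create_active, append_batch) are only approximated here, and the row-group size is an illustrative value:

```rust
use std::fs::File;
use std::path::Path;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;

// Open a new active file with ZSTD level 3 compression.
fn create_active(path: &Path, schema: SchemaRef) -> anyhow::Result<ArrowWriter<File>> {
    let props = WriterProperties::builder()
        .set_compression(Compression::ZSTD(ZstdLevel::try_new(3)?))
        .set_max_row_group_size(1_000_000) // storage.parquet.row_group_size (illustrative value)
        .build();
    Ok(ArrowWriter::try_new(File::create(path)?, schema, Some(props))?)
}

// Append one RecordBatch as Parquet row group(s) on a blocking thread, then hand
// the writer back so later appends and the final footer write reuse the handle.
async fn append_batch(
    mut writer: ArrowWriter<File>,
    batch: RecordBatch,
) -> anyhow::Result<(ArrowWriter<File>, usize)> {
    let rows = batch.num_rows();
    let writer = tokio::task::spawn_blocking(move || {
        writer.write(&batch)?; // buffered rows become row groups
        writer.flush()?;       // force a row-group boundary for this sketch
        Ok::<_, anyhow::Error>(writer)
    })
    .await??;
    Ok((writer, rows))
}
```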

4. Check rotation on size or time

After every append, the manager checks whether the estimated compressed size has reached the size threshold or the elapsed time since creation has reached the time threshold. If either threshold is met, it proceeds to finalize.
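The decision itself reduces to two comparisons, sketched here with hypothetical parameter names fed from the rotation settings in section 7:

```rust
use std::time::{Duration, Instant};

// Rotation check run after each append; the limits come from
// storage.file_rotation.file_size_mb and file_duration_minutes.
fn should_rotate(
    estimated_compressed_bytes: u64,
    created_at: Instant,
    max_bytes: u64,
    max_age: Duration,
) -> bool {
    estimated_compressed_bytes >= max_bytes || created_at.elapsed() >= max_age
}
```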

5. Finalize to a stable file

The writer flushes remaining row groups, writes the Parquet footer, and renames the file from active to a deterministic stable name:

{client}_{timestamp}_{ulid}_{status}.parquet   // status = active or stable

A StableFileInfo is produced with path, size, row count, and time range.
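A sketch of the naming step, assuming the ulid and chrono crates and an illustrative timestamp format:

```rust
use std::path::{Path, PathBuf};

// Deterministic stable name: {client}_{timestamp}_{ulid}_{status}.parquet
// The exact timestamp format is an assumption for this sketch.
fn stable_path(dir: &Path, client: &str) -> PathBuf {
    let ts = chrono::Utc::now().format("%Y%m%dT%H%M%S");
    let id = ulid::Ulid::new();
    dir.join(format!("{client}_{ts}_{id}_stable.parquet"))
}
```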

6. Upload stable Parquet to object storage

The uploader pushes the stable file through OpenDAL with limited concurrency. Throttle and server errors are retried with exponential backoff and jitter. Auth errors fail fast, and the file is kept locally. On success, upload latency and bytes are recorded.
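A sketch of that retry loop around an OpenDAL write. The error classification is simplified (OpenDAL's is_temporary stands in for the throttle and server-error split), and the attempt count and delays are illustrative; OpenDAL's RetryLayer could also cover the retryable cases transparently:

```rust
use std::time::Duration;

use opendal::Operator;
use rand::Rng;

// Upload one stable file with capped exponential backoff plus jitter.
async fn upload_with_retry(op: &Operator, key: &str, body: Vec<u8>) -> opendal::Result<()> {
    let mut delay = Duration::from_millis(200);
    for attempt in 0..5 {
        match op.write(key, body.clone()).await {
            Ok(_) => return Ok(()),
            Err(e) if e.is_temporary() && attempt < 4 => {
                // Retryable (throttle/server) error: back off with jitter and try again.
                let jitter = Duration::from_millis(rand::thread_rng().gen_range(0..100u64));
                tokio::time::sleep(delay + jitter).await;
                delay *= 2;
            }
            Err(e) => return Err(e), // auth and other permanent errors fail fast
        }
    }
    unreachable!("the loop above always returns")
}
```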

7. Write authoritative metadata in Postgres

A row is created or upserted with fields:

| Field | Meaning |
|---|---|
| file_id (ULID) | Unique stable file ID |
| client_name | Logical tenant or source |
| uri | Object path in the cloud store |
| start_ts, end_ts | Time range covered by the file |
| row_count | Total rows written |
| uncompressed_size, compressed_size | Size counters |
| schema_hash | Version and compatibility |
| indexes (list) | Artifacts linked to the file |
| uploaded_at | Upload completion time |

This record is the discovery source for the Indexer and Search.
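A hedged sketch of the upsert using sqlx; the table name, column set, and conflict target are assumptions based on the field list above, not the actual schema (the indexes list is omitted since the Indexer registers artifacts later):

```rust
use sqlx::PgPool;

// Minimal shape of the metadata the storage node has at upload time.
struct StableFileMeta {
    file_id: String, // ULID as text
    client_name: String,
    uri: String,
    start_ts: i64,
    end_ts: i64,
    row_count: i64,
    uncompressed_size: i64,
    compressed_size: i64,
    schema_hash: String,
}

// Idempotent registration: re-running the upsert for the same file_id
// leaves a single authoritative row.
async fn register_file(pool: &PgPool, f: &StableFileMeta) -> sqlx::Result<()> {
    sqlx::query(
        r#"
        INSERT INTO storage_files
            (file_id, client_name, uri, start_ts, end_ts, row_count,
             uncompressed_size, compressed_size, schema_hash, uploaded_at)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, now())
        ON CONFLICT (file_id) DO UPDATE
            SET uri = EXCLUDED.uri, uploaded_at = EXCLUDED.uploaded_at
        "#,
    )
    .bind(&f.file_id)
    .bind(&f.client_name)
    .bind(&f.uri)
    .bind(f.start_ts)
    .bind(f.end_ts)
    .bind(f.row_count)
    .bind(f.uncompressed_size)
    .bind(f.compressed_size)
    .bind(&f.schema_hash)
    .execute(pool)
    .await?;
    Ok(())
}
```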

8. Trigger indexing

Storage notifies Indexer with the file URI, schema, and time range. Indexer builds the configured artifacts, uploads them to the object store, and writes index metadata.

Artifact types and formats:

  • Zone maps: min and max per row group; JSON or compact binary compressed as zone.<file_ulid>.zst. Used to drop row groups outside filter ranges.
  • Bloom filters: probabilistic membership per column with target false positive rate; serialized binary bloom::<file_ulid>::colname.bloom.zst. Used to skip files or row groups.
  • Bitmap (Roaring): exact membership for categorical fields, stored as roaring bitmaps with dictionary metadata.
  • Tantivy: full-text index exported as a tar.zst for portability.
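As an illustration of the simplest artifact, a zone map for a single i64 column (for example a timestamp) can be computed roughly as follows; the JSON shape and column type are assumptions, and each batch stands in for one row group:

```rust
use arrow::array::Int64Array;
use arrow::compute;
use arrow::record_batch::RecordBatch;
use serde::Serialize;

// One zone-map entry per row group; the real artifact is compressed to
// zone.<file_ulid>.zst before upload.
#[derive(Serialize)]
struct Zone {
    row_group: usize,
    min: Option<i64>,
    max: Option<i64>,
}

fn zone_map(batches: &[RecordBatch], column: &str) -> Vec<Zone> {
    batches
        .iter()
        .enumerate()
        .map(|(i, b)| {
            // Look up the column and treat it as an i64 array for this sketch.
            let col = b
                .column_by_name(column)
                .and_then(|a| a.as_any().downcast_ref::<Int64Array>());
            Zone {
                row_group: i,
                min: col.and_then(|a| compute::min(a)),
                max: col.and_then(|a| compute::max(a)),
            }
        })
        .collect()
}
```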

9. Local retention and eviction

Once metadata and index registration are confirmed, local stable files can be deleted immediately or kept for a configured retention window, depending on the retention settings.

10. Background compaction

Periodic compaction merges many small stable files into larger Parquet objects. Parameters include the minimum number of files to compact, the maximum files per task, parallelism, and memory limits. Output is written to a temporary file, validated, and atomically renamed into place. Hard links can be used if enabled.
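A sketch of the knobs and the atomic publish pattern; struct fields and defaults are illustrative, not the actual configuration keys:

```rust
use std::fs;
use std::path::Path;

// Compaction knobs described above; names are illustrative.
struct CompactionConfig {
    min_files_to_compact: usize, // skip runs smaller than this
    max_files_per_task: usize,   // bound the work done by one task
    parallelism: usize,          // concurrent compaction tasks
    memory_limit_bytes: u64,     // cap buffered row groups
    use_hard_links: bool,        // link instead of copy when enabled
}

// Atomic publish: write and validate a temporary file, then rename into place.
fn publish(tmp: &Path, final_path: &Path) -> std::io::Result<()> {
    // ...validation of the temporary Parquet file would run here...
    fs::rename(tmp, final_path) // atomic when both paths share a filesystem
}
```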

4. Append path in detail

This summarizes the concrete steps the code takes to turn an Arrow batch into a Parquet object.

  • Receive: StorageArrowFlightReceiver converts gRPC frames to ArrowFlightBatch and sends to StorageProcessor through an MPSC channel.
  • Route: StorageProcessor extracts client_name and selects or creates the client’s ActiveFileInfo.
  • Append: ParquetFileManager.write_batch(client, record_batch) runs the Parquet append inside tokio::task::spawn_blocking. Counters for bytes and rows are updated after the write.
  • Rotate: after the append, the manager checks the size and time thresholds. If either is exceeded, it finalizes, renames to the stable name, and enqueues the file for upload.

5. File states and naming

| State | Description |
|---|---|
| Active | Open Parquet file receiving row groups in ./data/storage. |
| Stable | Finalized Parquet file ready for upload. |
| Uploaded | Object persisted in cloud storage. |
| MetaRegistered | Postgres row exists with URI, sizes, schema hash, and time range. |
| Indexed | Index artifacts uploaded and registered. |
| Evicted | Local copy deleted according to retention policy. |

Deterministic naming:

{client}_{timestamp}_{ulid}_{status}.parquet

This keeps scanning and reconciliation simple.

6. Communication boundaries

  • Arrow Flight carries high-throughput columnar data from Ingest to Storage.
  • gRPC carries control commands and the post-upload trigger to Indexer.
  • Gossip (Chitchat) shares low-frequency cluster metadata and health.

7. Configuration that shapes storage behavior

| Area | Keys or rules from docs | Effect |
|---|---|---|
| Parquet layout | storage.parquet.row_group_size | Balances write and read memory with pruning granularity. |
| Rotation | storage.file_rotation.file_size_mb, storage.file_rotation.file_duration_minutes | Governs Active to Stable transitions in size or time. |
| Object store upload | Concurrency and retry with backoff and jitter | Avoids throttling and keeps writes idempotent. |
| Local retention | delete_after_upload and a retention window | Frees disk while cloud and Postgres remain authoritative. |
| File size target | 128–256 MB stable objects recommended in docs | Efficient object count and read performance. |
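For orientation, one possible serde shape for these keys; the grouping and types are assumptions, only the key names come from the table above:

```rust
use serde::Deserialize;

// Hypothetical deserialization target for the storage-related config keys.
#[derive(Deserialize)]
struct StorageConfig {
    parquet: ParquetSection,
    file_rotation: RotationSection,
    delete_after_upload: bool,
}

#[derive(Deserialize)]
struct ParquetSection {
    row_group_size: usize,
}

#[derive(Deserialize)]
struct RotationSection {
    file_size_mb: u64,
    file_duration_minutes: u64,
}
```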

8. Observability signals

Metrics include upload latency and bytes, files uploaded, rotation counts, append latency, and error counters for Parquet writes and compaction. The storage.upload_success event records latency and bytes.

Traces capture the path storage.receive → append → rotate → upload → meta_write → index_trigger.

Logs include file_id, client_name, uri, rows, bytes, and per-step durations for straightforward reconciliation.

9. Data representation across the pipeline

| Stage | Format or Unit |
|---|---|
| Entry → Gateway | JSON or Protobuf |
| Gateway → Ingest | gRPC bulk proto |
| Ingest → Storage | Arrow RecordBatch |
| Storage → Object | Parquet |
| Indexes → Object Store | Index file formats: bitmap, tantivy, bloom or loom, zonemap |
| Search → Proxy | Arrow RecordBatch results |