Executive Summary
OpenWit’s storage layer receives columnar Arrow RecordBatches from Ingest over Arrow Flight, appends them to Active Parquet files, finalizes them into Stable Parquet files on rotation, uploads those stable files to an object store through OpenDAL, and registers authoritative file metadata in Postgres so Indexer and Query can prune, find, and scan the right data. The primary goals are durability, high throughput, compact cloud-efficient files, and fast queryability over Parquet and index artifacts.
End-to-end flow at a glance
- Receive: Storage exposes an internal Arrow Flight handler that deserializes incoming frames into ArrowFlightBatch { client, batch_id, record_batch, start_ts, end_ts } and enqueues them to the StorageProcessor. This keeps the client identity and time range attached to every batch.
- Route & append: StorageProcessor selects or creates an ActiveFile per client and appends RecordBatches using blocking Parquet I/O behind a thread boundary. It tracks per-file counters like bytes and rows as batches become Parquet row groups.
- Rotate: After each append, the manager evaluates size and time thresholds. When either threshold is met, the active file is finalized, validated, renamed to a Stable Parquet following the naming convention, and returned for upload (a sketch of this check appears after this overview).
- Upload: An OpenDAL-backed uploader moves the stable file to the configured object store with controlled concurrency and retry/backoff. On success, storage writes a Postgres row with URI, size, time range, schema hash, and row count, then emits success metrics.
- Index trigger & cleanup: Storage notifies Indexer to produce the configured artifacts and update metadata. Depending on local retention, storage can delete or keep local copies once indexes and metadata are in place.
This append → rotate → upload → register → index lifecycle is the canonical path across implementations.
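A minimal Rust sketch of the receive-and-rotate decision above: the ArrowFlightBatch fields mirror this section, while the ActiveFile and RotationConfig shapes, field names, and timestamp units are illustrative assumptions rather than the actual OpenWit types.

```rust
// Sketch only: ArrowFlightBatch field names come from this section; the
// ActiveFile/RotationConfig layouts and epoch-microsecond timestamps are
// assumptions for illustration.
use arrow::record_batch::RecordBatch;
use std::path::PathBuf;
use std::time::{Duration, Instant};

pub struct ArrowFlightBatch {
    pub client: String,
    pub batch_id: String,
    pub record_batch: RecordBatch,
    pub start_ts: i64, // assumed epoch micros
    pub end_ts: i64,
}

/// Per-client Active Parquet file the processor is currently appending to.
pub struct ActiveFile {
    pub path: PathBuf,
    pub bytes_written: u64,
    pub rows_written: u64,
    pub opened_at: Instant,
}

/// Rotation thresholds (conceptually mirroring storage.file_rotation.*).
pub struct RotationConfig {
    pub file_size_mb: u64,
    pub file_duration: Duration,
}

/// Evaluated after every append: rotate when either threshold is met.
pub fn should_rotate(file: &ActiveFile, cfg: &RotationConfig) -> bool {
    let size_exceeded = file.bytes_written >= cfg.file_size_mb * 1024 * 1024;
    let age_exceeded = file.opened_at.elapsed() >= cfg.file_duration;
    size_exceeded || age_exceeded
}
```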
Core building blocks
- StorageProcessor orchestrates the flow from Arrow input to Parquet output and uploads.
- ParquetFileManager tracks per-client Active and Stable files and performs append, rotation, and finalization.
- ObjectUploader (OpenDAL) uploads stable Parquet with retries and metrics.
- Postgres is the metastore for authoritative file records.
- Indexer builds and uploads index artifacts once files arrive.
- DataFusion/Ballista reads those files and indexes for SQL execution.
Data representation and naming
RecordBatches are appended into Parquet row groups during the active phase. Finalized stable files follow a deterministic naming pattern:
{client}_{timestamp}_{ulid}_{status}.parquet, with status as active or stable. This naming helps reconciliation and deterministic scanning.
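A hedged sketch of building that file name in Rust; the timestamp format and the exact status strings are assumptions, only the overall pattern comes from this section.

```rust
// Illustrative implementation of {client}_{timestamp}_{ulid}_{status}.parquet.
use chrono::Utc;
use ulid::Ulid;

#[derive(Clone, Copy)]
pub enum FileStatus {
    Active,
    Stable,
}

impl FileStatus {
    fn as_str(self) -> &'static str {
        match self {
            FileStatus::Active => "active",
            FileStatus::Stable => "stable",
        }
    }
}

/// Build a deterministic Parquet file name for a client and status.
pub fn parquet_file_name(client: &str, status: FileStatus) -> String {
    let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ"); // assumed format
    let ulid = Ulid::new();
    format!("{}_{}_{}_{}.parquet", client, timestamp, ulid, status.as_str())
}
```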
Postgres metadata per stable file includes: file_id (ULID), client_name, uri, start_ts, end_ts, row_count, uncompressed_size, compressed_size, schema_hash, indexes (list), uploaded_at. These columns enable time-range pruning and file discovery by the search tier.
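For illustration, the metadata row and its idempotent registration might look like the sketch below; the table name, conflict target, and exact column types are assumptions based on the columns listed above.

```rust
// Hedged mirror of the per-file metadata columns plus an upsert keyed on
// file_id so retried registrations stay idempotent. The table name
// (stable_files) and conflict policy are assumptions, not the OpenWit schema.
use sqlx::PgPool;

pub struct StableFileRecord {
    pub file_id: String, // ULID
    pub client_name: String,
    pub uri: String,
    pub start_ts: i64,
    pub end_ts: i64,
    pub row_count: i64,
    pub uncompressed_size: i64,
    pub compressed_size: i64,
    pub schema_hash: String,
    pub indexes: Vec<String>,
}

/// Upsert so that replays or retried registrations do not create duplicates.
pub async fn register_stable_file(pool: &PgPool, rec: &StableFileRecord) -> Result<(), sqlx::Error> {
    sqlx::query(
        "INSERT INTO stable_files \
           (file_id, client_name, uri, start_ts, end_ts, row_count, \
            uncompressed_size, compressed_size, schema_hash, indexes, uploaded_at) \
         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, now()) \
         ON CONFLICT (file_id) DO UPDATE SET uri = EXCLUDED.uri, uploaded_at = now()",
    )
    .bind(&rec.file_id)
    .bind(&rec.client_name)
    .bind(&rec.uri)
    .bind(rec.start_ts)
    .bind(rec.end_ts)
    .bind(rec.row_count)
    .bind(rec.uncompressed_size)
    .bind(rec.compressed_size)
    .bind(&rec.schema_hash)
    .bind(&rec.indexes)
    .execute(pool)
    .await?;
    Ok(())
}
```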
Rotation, compaction, and file quality
Rotation is driven by size and time thresholds, yielding cloud-efficient files while still flushing under sparse traffic. The docs recommend targeting ≥128–256 MB per file for object store efficiency. Time-based rotation guarantees progress when volume is low. Background compaction merges many small stable files into larger Parquet objects, with parameters for minimum files to compact, max files per task, parallelism, and memory limits. Compaction must be atomic and validated; the implementation writes to a temporary path, then renames on success, with optional hard links for faster transitions.
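The atomic publication step might look like the following sketch, assuming a local filesystem rename for the temp-to-final transition; the validation shown is a placeholder for the real Parquet checks.

```rust
// Hedged sketch of the write-to-temp-then-rename pattern used to keep
// compaction (and finalization) atomic. Paths and validation are illustrative.
use std::fs;
use std::io;
use std::path::Path;

/// Write compacted output to a temporary path, validate it, then atomically
/// rename it into place so readers never observe a partially written file.
pub fn publish_atomically(temp_path: &Path, final_path: &Path) -> io::Result<()> {
    // Placeholder validation: a real check would read the Parquet footer and
    // verify row counts against the compaction inputs.
    let metadata = fs::metadata(temp_path)?;
    if metadata.len() == 0 {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "empty compaction output"));
    }
    // rename is atomic on POSIX filesystems when source and target share a filesystem.
    fs::rename(temp_path, final_path)
}
```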
Upload, retries, and correctness
OpenDAL provides a unified client for S3, Azure, or GCS. The uploader retries with exponential backoff and jitter on throttling or transient server errors, fails fast on hard authorization errors, and limits concurrent uploads to avoid store-side throttling. Postgres upserts make metadata registration idempotent, so replays do not create duplicates.
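A minimal sketch of the retry policy described above, assuming an illustrative error classification and backoff bounds: throttled and transient failures are retried with exponential backoff plus full jitter, while authorization errors fail fast.

```rust
// Sketch only: the error variants, backoff bounds, and jitter strategy are
// assumptions standing in for the uploader's real policy.
use rand::Rng;
use std::time::Duration;

#[derive(Debug)]
pub enum UploadError {
    Throttled,    // e.g. 429 / SlowDown: retryable
    Transient,    // e.g. 5xx, connection reset: retryable
    Unauthorized, // hard failure: do not retry
}

pub async fn upload_with_retry<F, Fut>(
    mut try_upload: F,
    max_attempts: u32, // assumed >= 1
) -> Result<(), UploadError>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<(), UploadError>>,
{
    let mut backoff = Duration::from_millis(200);
    let mut attempt = 0;
    loop {
        attempt += 1;
        match try_upload().await {
            Ok(()) => return Ok(()),
            Err(UploadError::Unauthorized) => return Err(UploadError::Unauthorized),
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                // Full jitter keeps concurrent retries from synchronizing.
                let jitter = rand::thread_rng().gen_range(0..backoff.as_millis() as u64);
                tokio::time::sleep(backoff + Duration::from_millis(jitter)).await;
                backoff = (backoff * 2).min(Duration::from_secs(30));
            }
        }
    }
}
```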
Lifecycle states and transitions
| State | Description |
|---|---|
| Received | Arrow Flight batch accepted and queued to the processor. |
| Appended | Batch appended to Active Parquet as one or more row groups. |
| Rotated | Active file meets rotation conditions and is finalized to Stable Parquet. |
| Uploaded | Stable file uploaded via OpenDAL to the object store. |
| MetaRegistered | Postgres row written with file URI, schema hash, sizes, and time range. |
| Indexed | Index artifacts created and registered for the file. |
| Evicted | Local file pruned per retention policy after successful indexing and registration. |
Each transition emits metrics and spans. Failures trigger retries and janitor remediation.
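For illustration, the states in the table above could be expressed as a simple Rust enum; this is a sketch, not the actual OpenWit type.

```rust
// Lifecycle states from the table above, expressed as an illustrative enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileLifecycleState {
    Received,       // Arrow Flight batch accepted and queued
    Appended,       // written into the Active Parquet file as row groups
    Rotated,        // finalized to a Stable Parquet file
    Uploaded,       // pushed to the object store via OpenDAL
    MetaRegistered, // Postgres row written with URI, sizes, time range
    Indexed,        // index artifacts created and registered
    Evicted,        // local copy pruned per retention policy
}
```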
How Query benefits from storage metadata and indexes
After uploading, the storage node writes a complete metadata row for every stable file and signals Indexer. Indexer produces bitmap, zonemap, bloom/loom, and Tantivy artifacts, uploads them, and updates the index metadata. At query time, the engine uses directory listing and Parquet statistics with these index files to prune aggressively and minimize I/O before executing the final plan in DataFusion.
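As a hedged illustration of the first pruning step, the sketch below selects candidate file URIs from the Postgres metadata by time-range overlap before any Parquet or index artifact is read; the table and column names follow the metadata description earlier and are assumptions.

```rust
// Illustrative time-range pruning against the file metastore; schema names
// (stable_files, client_name, start_ts, end_ts, uri) are assumptions.
use sqlx::PgPool;

pub async fn candidate_files(
    pool: &PgPool,
    client: &str,
    query_start: i64,
    query_end: i64,
) -> Result<Vec<String>, sqlx::Error> {
    // Keep only files whose [start_ts, end_ts] overlaps the query window.
    let uris: Vec<(String,)> = sqlx::query_as(
        "SELECT uri FROM stable_files \
         WHERE client_name = $1 AND start_ts <= $2 AND end_ts >= $3",
    )
    .bind(client)
    .bind(query_end)
    .bind(query_start)
    .fetch_all(pool)
    .await?;
    Ok(uris.into_iter().map(|(u,)| u).collect())
}
```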
Configuration knobs that shape behavior
- Arrow Flight port and time thresholds control the receive side and in-memory buffering before Parquet: service_ports.storage.arrow_flight, storage.ram_time_threshold_seconds, storage.disk_time_threshold_seconds.
- Rotation thresholds drive active→stable transitions: storage.file_rotation.file_size_mb, storage.file_rotation.file_duration_minutes.
- Parquet writer and row group sizing impact memory and pruning granularity: storage.parquet.row_group_size. Larger groups reduce object count but increase read memory.
- Upload concurrency and retries protect throughput without tripping throttling; idempotent metadata prevents duplicates on retry.
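The knobs above might deserialize into nested config types along these lines; a hedged Rust/serde sketch in which the struct nesting, field types, and defaults are assumptions rather than OpenWit's actual configuration code.

```rust
// Illustrative mapping of the dotted config keys to nested serde structs.
use serde::Deserialize;

#[derive(Deserialize)]
pub struct StorageConfig {
    pub ram_time_threshold_seconds: u64,
    pub disk_time_threshold_seconds: u64,
    pub file_rotation: FileRotationConfig,
    pub parquet: ParquetConfig,
}

#[derive(Deserialize)]
pub struct FileRotationConfig {
    pub file_size_mb: u64,          // target ~128-256 MB for object-store efficiency
    pub file_duration_minutes: u64, // guarantees progress under sparse traffic
}

#[derive(Deserialize)]
pub struct ParquetConfig {
    pub row_group_size: usize, // larger groups: fewer objects, more read memory
}
```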
Durability, tiers, and system context
Data becomes durable long before upload because ingestion requires the short-term WAL write to succeed before acknowledging. Long-term WAL and Parquet uploads give full crash recovery. Once in object storage, the data participates in the hot/warm/cold cache tier model across Electro (RAM), Pyro (disk), and Cryo (cloud).
Operations: what to watch continuously
- WAL write throughput and latency, because fsync spikes affect end-to-end ack time.
- Buffer utilization and batch size distribution, to keep throughput stable.
- Arrow Flight success ratio and retry counts to detect transport issues early.
- Storage upload latency and failure rate, which drive backlog and retention pressure.
- Short-term WAL backlog is a leading indicator that upload or indexing is lagging.
For troubleshooting: check WAL IOPS and sync_on_write when ack latency is high, scale storage or throttle producers when backpressure occurs, validate WAL recovery if replays increase, and run janitor reconciliation when object uploads succeed but Postgres metadata is missing.