Architecture Overview

This page explains the write path in depth. It covers the entry methods, the gateway, the ingest node internals, backpressure and batching, dual WAL durability, Arrow handoff to storage, the control plane and communication layers, data representation at each hop, failure modes and recovery, what to monitor, and where to tune the YAML.

1. Responsibilities of the ingestion pipeline

The ingestion pipeline accepts logs, traces, and metrics through Kafka, gRPC, and HTTP. It authenticates the caller, validates and normalizes payloads, batches by size or time, deduplicates in a window, persists to short-term and long-term WAL, converts accepted batches into Arrow, and then ships to storage over Arrow Flight. Backpressure protects the cluster when the downstream slows.

2. Entry points and Ingestion Gateway

Supported entry methods

  • Kafka with protobuf messages for sustained high throughput
  • gRPC accepting OTLP or custom proto exporters
  • HTTP receiving JSON payloads for light testing and pipeline checks

What the gateway does

The gateway authenticates and enforces quotas. It validates the schema and rejects malformed events with a 4xx. It normalizes field names and types to keep a consistent schema. It buffers events per tenant or source until batch_size or batch_timeout_ms is reached, then sends a bulk proto to the ingest node over gRPC. If overloaded, it applies backpressure and either returns 429 or pauses Kafka consumption.

Key config for the gateway

ingestion.sources., ingestion.kafka., ingestion.grpc., ingestion.http., ingestion.batching.batch_size, ingestion.batching.batch_timeout_ms, plus proxy concurrency and payload limits.

3. Ingest node internals

Buffering, batching, and memory accounting

The ingest node assembles MessageBatch objects that carry ids, offsets, and payloads. It deduplicates inside a windowed cache. It enforces buffer ceilings using processing.buffer.max_size_messages and processing.buffer.max_size_bytes. When limits trip, it raises BufferFull, which triggers backpressure at the gateway. Flushes are driven by processing.buffer.flush_interval_seconds and processing.flush_size_messages. Watch buffer depth, memory usage, and buffer_full counters for early signs of congestion.

Kafka-specific concurrency

Use high-performance settings to increase parallelism: consumer count, processing threads, gRPC client count, and channel buffer size. Increase fetch_min_bytes, fetch_max_bytes, and max_partition_fetch_bytes for higher throughput, but plan RAM accordingly. Enable zero copy, pre-allocation, and a buffer pool to trade memory for speed.

Deduplication window

Set processing.dedup_enabled and processing.dedup_window_seconds. Memory cost scales with events per second times the window. Choose the window carefully for high ingest rates.

Backpressure

When buffers or downstream reach limits, the system rejects new messages or pauses reads. This prevents unbounded memory growth and keeps the cluster stable.

4. Dual WAL durability and acknowledgment

Short-term WAL

Every batch is written to a short-term WAL segment before acknowledging the client or committing the Kafka offset. This gives immediate durability. Monitor WAL write latency, fsync duration, backlog, segment counts, and disk usage.

Long-term WAL

The same batch is published to a long-term WAL stream for daily aggregation that supports replay and indexing workflows. Long-term retention is configured separately.

Trade-off: sync_on_write

Set sync_on_write = true if data loss is unacceptable and accept higher IOPS and latency. Set it to false for better throughput, but accept a small loss window on crash. Segment sizes between 64 and 512 MB balance recovery speed and file count.

Where do short and long WAL live

Short-term WAL defaults to ./data/wal/short on local disk. Long-term WAL defaults to ./data/wal/long.

5. Arrow conversion and Arrow Flight handoff

After short-term WAL success, the ingest node converts the batch to an Arrow RecordBatch using zero-copy where possible. It sends the columnar payload to storage over Arrow Flight. This keeps the transfer high throughput and avoids re-serialization.

6. Communication layers and control coordination

OpenWit uses three communication paths across nodes:

  • Gossip via Chitchat for low-rate cluster metadata and health
  • gRPC for control commands, coordination, and smaller RPCs
  • Arrow Flight for high-throughput columnar data transfer

The control plane observes node health through gossip, can reroute producers when a node is unresponsive, and decides where to send batches. Metadata for files, time ranges, and versions is stored in Postgres, which is the source of truth.

7. Data representation at each stage

StageFormat or Unit
Entry → GatewayJSON or Protobuf
Gateway → IngestgRPC bulk proto
Ingest → StorageArrow RecordBatch
Storage → ObjectParquet
Indexes → Object StoreIndex file formats: bitmap, tantivy, bloom or loom, zonemap
Search → ProxyArrow RecordBatch results

8. End-to-end flow and state

High-level flow

Data sources → Ingestion Gateway → Ingest Node (dedup, short WAL, long WAL, Arrow) → Storage Node (active Parquet, stable Parquet, upload via OpenDAL) → Indexer (build indexes, upload) → Postgres metadata → Search.

Sequence view

Client emits events → Gateway authenticates, validates, normalizes, batches → Bulk gRPC to ingest → Buffering and dedup → Short-term WAL then long-term WAL → Ack client or commit Kafka offset → Convert to Arrow → Arrow Flight to storage.

9. Failure modes and recovery patterns

  • WAL write fails → no ack to producer, backpressure at gateway. Recover by fixing the disk or changing the WAL directory. If sync_on_write=false, a small unpersisted window can be lost.
  • Arrow Flight send fails → batch stays in WAL. Router retries with backoff. The control plane can mark storage unhealthy and route elsewhere. Dedup prevents double processing on replay.
  • Postgres metadata write fails → uploaded files are not recorded, so search cannot find data. Janitor reconciliation scans object storage and re-registers missing metadata.
  • Node crash or restart → WAL replay restores batches. Dedup and IDs stop duplicates at the next stage.

10. What to monitor in production

Ingestion and WAL

WAL write latency p95, WAL disk usage, backlog, segment count. Buffer depth, used memory, buffer_full events, and backpressure triggers.

Batching and network

Average batch size (bytes and events), Arrow Flight send latency and error rate, and gRPC message size rejections.

Storage and indexing

Upload latency and failures, throttling like S3 429. Indexing lag and CPU, index upload errors.

Query side and control plane

Query p99, bytes scanned, index pruning ratio, cache hit ratio. Gossip health, and flapping decisions. Security counters like auth failures and tenant rate limits.

11. Configuration mapping where to tune

  • Ingress and gateway: ingestion.batching.batch_size, ingestion.batching.batch_timeout_ms, proxy concurrency and payload caps.
  • Buffers: processing.buffer.max_size_messages, processing.buffer.max_size_bytes, processing.buffer.flush_interval_seconds.
  • WAL: processing.wal.sync_on_write, processing.wal.segment_size_bytes, processing.wal.long_term_retention_hours, plus cleanup.
  • Network and transfer: gRPC max_message_size, Arrow Flight client and inter_node limits.
  • Storage thresholds: rotation and parquet split size, local retention.
  • Indexing async pool: indexing.worker_threads, indexing.batch_size.
  • Autoscaling and backpressure: control_plane.autoscaling., control_plane.backpressure_advanced..
  • Memory and CPU: memory.max_memory_gb, performance.threads.*.

Ingestion component specifics

Kafka tuning covers brokers, group ID, topic patterns, consumer type, parallelism, fetch sizes, zero-copy, and buffer pools. gRPC has request and stream concurrency, message size limits, keepalive, compression, and TLS. HTTP is for canaries, not sustained load.

12. Performance presets and safe limits

Batch sizing Start with batch_size = 1000 and batch_timeout_ms = 200–500 for near real-time. For bulk throughput, use 10k–50k batches with a 2–5s timeout only on hosts with plenty of memory.

Keep batch_size × avg_msg_size lower than grpc_max_message_size, and below your available max_buffer_memory.

WAL and memory

Use sync_on_write=true when durability is critical. Use 64–512 MB segment sizes. Keep max_buffer_memory within container limits with headroom for process overhead. Size worker threads near CPU cores.

Safe production defaults

A practical starting point: batch_size=1000, batch_timeout_ms=500, processing.buffer.max_size_bytes=10GiB, processing.wal.sync_on_write=true for strict durability, storage.file_size_mb=256, parquet_split_size_mb=256, indexing.worker_threads=4, and set memory.max_memory_gb to node RAM minus 8 GB headroom.

13. Security and operational notes

Enable TLS or mTLS for gRPC and Arrow Flight in production. Keep object store and Postgres secrets in a secrets manager. Set per-tenant rate limits and quotas to stop a noisy tenant from causing cluster-wide backpressure. Use autoscaling thresholds from the control plane.

Go-live checklist

Control plane quorum, Postgres HA and backups, object store IAM and lifecycle policy, WAL policy chosen and budgeted, monitoring and alerts in place, autoscaling thresholds tested, secrets stored in Vault or Kubernetes secrets, retention and legal checks verified.