Executive Summary

gRPC is a first-class ingestion path in OpenWit. Producers send telemetry over OTLP or a custom protobuf to the Gateway. The Gateway authenticates, validates, normalizes, and batches records, then forwards a bulk proto to the Ingest node over gRPC. The Ingest node performs buffering and deduplication, writes to short-term and long-term WAL, converts to Arrow, and streams to Storage via Arrow Flight. Indexing and metadata registration proceed downstream.

1. Where gRPC fits

  • Entry: gRPC accepts OpenTelemetry signals or your custom proto (a minimal producer sketch follows this list). The Gateway batches by size or timeout and sends a bulk proto to the Ingest node. This path is designed for high-throughput streaming into the same ingest pipeline used by Kafka and HTTP.
  • Representation: Gateway → Ingest uses gRPC (bulk proto), Ingest → Storage uses Arrow RecordBatch, Storage → Object Store is Parquet.
  • Control vs data: In OpenWit, gRPC is used both for control/coordination and for ingestion RPCs, while Arrow Flight carries the heavy columnar data to Storage.
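
As a concrete example of the entry path, the sketch below exports spans to the Gateway's OTLP/gRPC endpoint with the OpenTelemetry Python SDK. The endpoint address is an assumption based on the 0.0.0.0:50051 sample binding, and insecure=True stands in for a TLS-enabled channel in production.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Export spans over gRPC to the Gateway's OTLP receiver (address is an assumption).
provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="gateway:50051", insecure=True))
)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("example-producer")
with tracer.start_as_current_span("checkout"):
    pass  # application work; the span is batched and exported on flush

provider.shutdown()  # flush pending spans before exit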

2. End-to-end flow

A. Gateway → Ingest (gRPC bulk)

The Gateway performs authentication, schema validation, and field normalization so the Ingest node always receives consistent payloads. It then flushes a batch by configured size or timeout and forwards a bulk proto over gRPC to the Ingest node.
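
The flush rule is simple enough to sketch: a batch is forwarded as soon as it reaches the configured size, or once the oldest buffered record has waited past the timeout. The thresholds and the flush callable below are illustrative, not OpenWit's implementation; the Gateway applies the same rule with its own configured values.

import time

class Batcher:
    """Minimal size-or-timeout batcher (illustrative sketch)."""

    def __init__(self, flush, max_records=1000, max_wait_s=1.0):
        self.flush = flush              # callable that forwards the bulk proto
        self.max_records = max_records  # flush when this many records are buffered
        self.max_wait_s = max_wait_s    # or when the oldest record is this old
        self.buffer = []
        self.first_at = None

    def add(self, record):
        if not self.buffer:
            self.first_at = time.monotonic()
        self.buffer.append(record)
        if len(self.buffer) >= self.max_records:
            self._flush()

    def tick(self):
        # Called periodically; flushes a partial batch once the timeout expires.
        if self.buffer and time.monotonic() - self.first_at >= self.max_wait_s:
            self._flush()

    def _flush(self):
        self.flush(self.buffer)
        self.buffer, self.first_at = [], None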

B. Ingest node (buffering, dedup, durability)

The Ingest node assembles batches, enforces buffer ceilings, applies a windowed dedup, and writes to short-term and long-term WAL. The short-term WAL write must succeed before the batch is acknowledged back to the caller, which preserves at-least-once semantics. After WAL success, the batch is converted to Arrow and prepared for transfer.
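
A minimal sketch of that ack rule, using stand-in wal and ack/nack objects rather than OpenWit's actual types: the short-term WAL write is the durability gate, and only after it succeeds is the batch acknowledged, so a crash never loses an acknowledged batch.

def handle_bulk(batch, short_wal, long_wal, ack, nack):
    # Durability gate: the short-term WAL write must succeed before any ack.
    try:
        short_wal.append(batch)
        short_wal.fsync()
    except OSError:
        nack(batch)                  # no ack -> the sender retries / applies backpressure
        return
    long_wal.append(batch)           # retained for replay and indexing
    ack(batch)                       # safe: the batch can now be replayed from WAL
    # Downstream: convert the batch to Arrow and hand it to the Flight sender.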

C. Ingest → Storage (Arrow Flight)

The Arrow RecordBatch is sent to the Storage node using Arrow Flight. Storage assembles active Parquet, finalizes a stable Parquet, uploads via OpenDAL, and records file metadata such as URI and time ranges. Indexing then builds the configured index types and uploads them as well.
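
For illustration, the sketch below streams a RecordBatch with PyArrow's Flight client. The storage address, path descriptor, and schema are assumptions; OpenWit's internal Flight service and batch layout will differ.

import pyarrow as pa
import pyarrow.flight as flight

# Build a small RecordBatch standing in for the converted ingest batch.
batch = pa.RecordBatch.from_pydict({
    "timestamp": [1700000000000, 1700000000001],
    "message": ["hello", "world"],
})

# Connect to the Storage node's Flight endpoint (address is an assumption).
client = flight.FlightClient("grpc://storage-node:8815")
descriptor = flight.FlightDescriptor.for_path("logs", "2024-01-01")

# do_put opens a stream: write the schema once, then one or more batches.
writer, _ = client.do_put(descriptor, batch.schema)
writer.write_batch(batch)
writer.close()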

3. Configuration overview (gRPC receiver)

The grpc: block controls binding, concurrency, size limits, keepalive, OTLP signal toggles, compression, and TLS. The list below explains each group in simple terms.

  • Network binding (bind, port, bind_addr): Where the gRPC server listens. The sample uses 0.0.0.0:50051.
  • Runtime & concurrency (runtime_size, worker_threads, max_concurrent_requests, max_concurrent_streams, connection_pool_size): Threading and request/stream concurrency for the gRPC runtime.
  • Message size limits (max_message_size, max_receive_message_size_mb, max_send_message_size_mb): Upper bounds for message sizes. Docs highlight ~200 MB, with the caution that very large messages spike memory and increase DoS risk.
  • Keepalive & lifetimes (keepalive_time_ms, keepalive_timeout_ms, max_connection_idle_seconds, max_connection_age_seconds, max_connection_age_grace_seconds): Connection health and lifecycle tuning.
  • OTLP signal toggles (otlp_traces_enabled, otlp_metrics_enabled, otlp_logs_enabled): Which OTLP signals the receiver accepts.
  • Compression (compression_enabled, accepted_compression_algorithms): Network compression enablement and algorithms.
  • TLS (tls.enabled, cert_path, key_path): Transport security; docs flag enabled: false as a risk in production and advise using secrets for keys.
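
For readers more familiar with stock gRPC servers, the sketch below shows how analogous knobs map onto standard gRPC channel arguments, using the Python grpc package. OpenWit's receiver is not implemented this way; the mapping is only meant to ground what each group controls.

from concurrent import futures
import grpc

server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=4),                     # ~ worker_threads
    options=[
        ("grpc.max_receive_message_length", 200 * 1024 * 1024),    # ~ max_receive_message_size_mb
        ("grpc.max_send_message_length", 100 * 1024 * 1024),       # ~ max_send_message_size_mb
        ("grpc.keepalive_time_ms", 60_000),                        # ~ keepalive_time_ms
        ("grpc.keepalive_timeout_ms", 20_000),                     # ~ keepalive_timeout_ms
        ("grpc.max_connection_idle_ms", 300_000),                  # ~ max_connection_idle_seconds
        ("grpc.max_connection_age_grace_ms", 10_000),              # ~ max_connection_age_grace_seconds
        ("grpc.max_concurrent_streams", 1000),                     # ~ max_concurrent_streams
    ],
)
server.add_insecure_port("0.0.0.0:50051")  # with TLS, use add_secure_port and server credentials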

Guardrail: Even though the gRPC receiver can accept large messages, keep batch size × average event size well below the configured gRPC max to avoid oversize errors and memory spikes. The docs call out this sizing rule explicitly.
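
A quick back-of-the-envelope version of that rule, with a 2 KiB average event size assumed purely for illustration; the headroom also absorbs protobuf encoding overhead and occasional oversized events.

max_message_bytes = 200 * 1024 * 1024          # configured gRPC receive limit
avg_event_bytes = 2 * 1024                     # measured (here: assumed) average event size

hard_ceiling = max_message_bytes // avg_event_bytes   # 102_400 events would hit the limit
safe_batch_size = hard_ceiling // 4                   # stay well below it for headroom
print(hard_ceiling, safe_batch_size)                  # 102400 25600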

4. Example

grpc:
  # Network binding
  bind: "0.0.0.0"
  port: 50051
  bind_addr: "0.0.0.0:50051"

  # Performance settings
  runtime_size: 8
  max_concurrent_requests: 10000
  request_timeout_ms: 30000
  connection_pool_size: 100

  # Message size limits
  max_message_size: 209715200           # 200 MiB
  max_receive_message_size_mb: 200
  max_send_message_size_mb: 100

  # Connection management
  keepalive_time_ms: 60000
  keepalive_timeout_ms: 20000
  keepalive_time_seconds: 60
  keepalive_timeout_seconds: 20
  max_connection_idle_seconds: 300
  max_connection_age_seconds: 0
  max_connection_age_grace_seconds: 10

  # Protocol support
  otlp_traces_enabled: true
  otlp_metrics_enabled: true
  otlp_logs_enabled: true

  # Concurrency
  worker_threads: 4
  max_concurrent_streams: 1000

  # Compression
  compression_enabled: true
  accepted_compression_algorithms: ["gzip", "zstd"]

  # TLS
  tls:
    enabled: false
    cert_path: "${TLS_CERT_PATH}"
    key_path: "${TLS_KEY_PATH}"

5. Batching, buffers, and WAL (shared ingestion behavior)

gRPC ingestion feeds the same internals used by the Kafka and HTTP paths:

  • Gateway batching by size or timeout before forwarding the bulk proto.
  • Ingest buffering with ceilings on messages and bytes, and flush by interval or count.
  • Deduplication with a configurable window to avoid re-processing during WAL replay (see the sketch after this list).
  • Dual WAL where short-term WAL must succeed before acknowledging, and long-term WAL is kept for replay and indexing.
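
A minimal sketch of the windowed dedup mentioned above, assuming each record carries a stable dedup key (for example, a hash of its identifying fields); the window length and eviction strategy are illustrative.

import time

class DedupWindow:
    """Remembers keys for a bounded window so WAL replay does not re-process records."""

    def __init__(self, window_s=300):
        self.window_s = window_s
        self.seen = {}                     # dedup key -> last-seen monotonic timestamp

    def is_duplicate(self, key):
        now = time.monotonic()
        # Evict keys that have aged out of the window.
        self.seen = {k: t for k, t in self.seen.items() if now - t < self.window_s}
        if key in self.seen:
            return True
        self.seen[key] = now
        return False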

6. Size limits, keepalive, and connection guidance

The configuration doc groups the most important guardrails for the gRPC server:

  • Size limits: max_message_size, max_receive_message_size_mb, max_send_message_size_mb. Very large messages increase memory usage and DoS risk; enforce per-client limits (a producer-side channel sketch follows this list).
  • Keepalive: keepalive_time_ms, keepalive_timeout_ms, plus max_connection_idle_seconds and max_connection_age_* to control lifetimes.
  • Concurrency: worker_threads, runtime_size, max_concurrent_requests, max_concurrent_streams to match expected stream counts.
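
On the producer side, the channel should mirror those limits: keep the client's send limit under the server's receive limit, and keep client keepalive no more aggressive than the server allows. A minimal sketch with the Python grpc package; the endpoint and values are assumptions.

import grpc

channel = grpc.insecure_channel(
    "gateway:50051",
    options=[
        ("grpc.max_send_message_length", 100 * 1024 * 1024),  # stays below the server's receive limit
        ("grpc.keepalive_time_ms", 60_000),                   # matches the server's keepalive_time_ms
        ("grpc.keepalive_timeout_ms", 20_000),
    ],
)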

7. Observability for gRPC ingestion

Emit metrics and spans at each step so you catch problems early:

  • Counters/Gauges: event.ingest.received, event.ingest.validated, buffer.size, batch.created, batch.size_bytes, wal.write.latency, ack.latency, arrow.convert.latency, arrow.send.latency (an instrumentation sketch follows this list).
  • Spans: ingest.receive → batch.build → wal.write → arrow.convert → arrow.send → storage.append → upload → index.build.
  • Node endpoints: each node exposes /metrics, and tracing context propagates through gRPC metadata.
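
A minimal instrumentation sketch using the Prometheus Python client. The dotted metric names above are mapped to underscores here because Prometheus names cannot contain dots; the actual OpenWit metric names, labels, and scrape port may differ.

from prometheus_client import Counter, Histogram, start_http_server

EVENTS_RECEIVED = Counter("event_ingest_received_total", "Events received over gRPC")
WAL_WRITE_LATENCY = Histogram("wal_write_latency_seconds", "Short-term WAL write latency")

def write_short_term_wal(events):
    pass  # placeholder for the real WAL write

def handle_batch(events):
    EVENTS_RECEIVED.inc(len(events))
    with WAL_WRITE_LATENCY.time():        # records the write duration
        write_short_term_wal(events)

if __name__ == "__main__":
    start_http_server(9090)               # exposes /metrics for scraping (port is an assumption)
    handle_batch([{"msg": "example"}])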

8. Failure modes and recovery

  • WAL write fails: no ack is sent; the Gateway sees backpressure and its buffer grows. Recovery: retry with backoff, pause ingestion if the failure persists, and keep the WAL on durable disk.
  • Crash after ack, before Arrow send: the batch remains in the short-term WAL; on restart, recover() replays it to Storage and dedup prevents double indexing. Recovery: verify replay and that dedup keys (or their deterministic computation) are correct.
  • Arrow Flight send failures or timeouts: the batch stays in WAL and is marked in-flight; the router retries. Recovery: exponential backoff and endpoint health marking (see the sketch after this list).
  • Storage upload succeeds but the metadata write fails: the file exists in the object store but is not visible to Search. Recovery: the Janitor scans and registers missing files based on mapping/conventions.
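
A minimal sketch of the retry-with-backoff behavior for Flight sends, assuming a flight_send callable that raises on failure; the attempt count and delays are illustrative, and the actual router also marks endpoint health.

import random
import time

def send_with_backoff(flight_send, batch, max_attempts=5, base_delay_s=0.5, max_delay_s=30.0):
    """Retry a Flight send with exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return flight_send(batch)
        except Exception:                  # in practice, catch the specific Flight error type
            if attempt == max_attempts:
                raise                      # give up; the batch stays in WAL for later replay
            delay = min(max_delay_s, base_delay_s * 2 ** (attempt - 1))
            time.sleep(delay + random.uniform(0, delay / 2))   # jitter avoids thundering herds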