Architecture Overview
Kafka is one of the three ingestion entry points into OpenWit. Producers publish events to the topics you configure. The Gateway authenticates, validates, normalizes, and batches these events. Batches move to the Ingest node, where buffering, deduplication, and durability (dual WAL) are enforced. Only after the short-term WAL write succeeds does OpenWit acknowledge and commit Kafka offsets. The batch is then converted to Arrow and sent over Arrow Flight toward Storage.
1. End-to-end flow
A. Gateway (entry)
The Gateway authenticates the caller, enforces quotas, validates schema, and normalizes field names and types. If validation fails, it returns a 4xx and can optionally write the event to a dead-letter queue. When overloaded, it applies backpressure (HTTP 429 or pausing Kafka consumption). It buffers per tenant or source, and flushes a batch when either batch_size or batch_timeout_ms is reached. Key knobs live under ingestion.sources.* and ingestion.batching.*.
Why this order matters: Validation and normalization guard the system’s schema consistency across sources. Mismatched types are coerced when allowed, otherwise rejected, so downstream components always see consistent shapes.
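A minimal sketch of these knobs, assuming the key paths quoted above (values are illustrative, not defaults):

```yaml
ingestion:
  sources:
    kafka:
      enabled: true          # disable unused sources to reduce surface
  batching:
    batch_size: 1000         # flush once this many events are buffered...
    batch_timeout_ms: 5000   # ...or once the oldest buffered event is this old
```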
B. Ingest node (buffering & dedup)
The Ingest node assembles MessageBatch objects with IDs, messages, sizes, and offsets. It checks a windowed dedup cache, accounts memory against processing.buffer.max_size_messages and processing.buffer.max_size_bytes, and signals backpressure if limits are hit. Flush behavior is driven by processing.buffer.flush_interval_seconds and processing.flush_size_messages. The dedup window is set via processing.dedup_window_seconds. Watch buffer depth, memory usage, and buffer_full signals for early congestion.
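A sketch of the buffer and dedup knobs named above; the key paths come from this section, while the values are illustrative:

```yaml
processing:
  buffer:
    max_size_messages: 100000    # signal backpressure above this count
    max_size_bytes: 536870912    # 512 MiB memory ceiling (illustrative)
    flush_interval_seconds: 5    # time-based flush trigger
  flush_size_messages: 10000     # size-based flush trigger (illustrative)
  dedup_window_seconds: 300      # horizon of the windowed dedup cache
```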
C. WAL (durability & ack)
Batches are written to short-term WAL and published to long-term WAL. Short-term WAL must succeed before acknowledging/committing Kafka offsets, which provides the at-least-once delivery guarantee. Configure the directory, maximum size, segment size, and sync_on_write behavior; retention and cleanup are managed under wal_cleanup.*. Monitor WAL write and fsync latency, backlog, segment counts, and disk usage.
Trade-off: sync_on_write=true increases durability at the cost of IOPS and latency; false improves throughput but leaves a small window of unpersisted data if the process crashes. Choose based on your durability target.
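A hedged sketch of a WAL block: sync_on_write and the wal_cleanup.* grouping come from this section, while the exact sub-key names (dir, segment_size_bytes, retention_hours) are assumptions for illustration:

```yaml
processing:
  wal:
    dir: /var/lib/openwit/wal        # hypothetical path
    segment_size_bytes: 134217728    # 128 MB, within the 64-512 MB guidance
    sync_on_write: true              # fsync before ack: durable, costs IOPS/latency
wal_cleanup:
  retention_hours: 24                # illustrative retention window
```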
D. Arrow conversion and Arrow Flight handoff
After WAL success, the batch is converted to an Arrow RecordBatch (columnar) and sent to a Storage endpoint selected by the control/router. Arrow Flight is used for the transfer; keep batch_size × avg_message_size comfortably below your gRPC max message size and leave headroom for Arrow metadata. Track Arrow send latency, serialization overhead, and endpoint errors.
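For illustration, the gRPC ceiling this rule refers to might be configured as follows; the key path matches the mapping table in section 8, and the value shown is the common 4 MiB gRPC default:

```yaml
ingestion:
  grpc:
    max_message_size: 4194304   # 4 MiB; raise only with matching batch sizing
```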
E. Storage (context from ingestion’s downstream)
Storage appends Arrow batches to an active Parquet file, rotates it to a stable Parquet file based on size or time, uploads the file to object storage via OpenDAL, and records its metadata (URI, time range, schema hash, sizes) in Postgres. Indexing is then triggered.
2. Kafka configuration (what you tune and why)
The Kafka block defines brokers, consumer groups, subscribed topic patterns, the consumer profile, high-performance options, fetch sizes, memory optimizations, topic-to-index mapping, consumer liveness, and batching.
| Area | Keys | Purpose / Notes |
|---|---|---|
| Source toggle | ingestion.sources.kafka.enabled | Enable Kafka as an entry point; disable unused sources to reduce surface. |
| Core | brokers, group_id, topics | Broker list, consumer group, topic patterns (wildcards supported). |
| Mode | consumer_type | "standard" or "high_performance". |
| High-perf | num_consumers, processing_threads, num_grpc_clients, channel_buffer_size | Parallelism and inter-thread capacity. |
| Fetch sizing | fetch_min_bytes, fetch_max_bytes, max_partition_fetch_bytes | Larger fetches raise throughput and memory. Start conservative. |
| Memory opts | enable_zero_copy, preallocate_buffers, buffer_pool_size | Faster at higher memory footprint. |
| Topic→index | topic_index_config, auto_generate.* | Extract index token from topic name or auto-generate if absent. |
| Liveness | consumer.session_timeout_ms, heartbeat_interval_ms, max_poll_interval_ms, max_poll_records | Consumer health and polling depth. |
| Batching | batch_size, batch_timeout_ms | Flush triggers. Your config sets batch_size: 1000 to avoid gRPC size limits. |
| Telemetry hint | telemetry_type | "traces", "metrics", or "logs" to guide downstream handling. |
Guardrail: Keep batch_size × avg_msg_size < grpc_max_message_size and ensure used memory stays below max_buffer_memory. This is the central sizing rule the pipeline section calls out.
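A quick worked check of the rule, with all numbers illustrative:

```yaml
# Worked check of the sizing rule:
#   batch_size   = 1000 messages
#   avg_msg_size = 2 KiB
#   payload      ~ 1000 x 2 KiB = ~2 MiB  -> fits under a 4 MiB gRPC limit
#   at 8 KiB avg ~ 1000 x 8 KiB = ~8 MiB  -> exceeds 4 MiB; shrink batch_size
#     or raise grpc_max_message_size (leave headroom for Arrow metadata)
```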
3. Topic strategy and telemetry type
You can subscribe to topic patterns (wildcards). topic_index_config lets you derive an index token from a specific position in the topic name. If a topic lacks that token, you can auto-generate it using a prefix and strategy. telemetry_type indicates whether payloads represent traces, metrics, or logs.
Example from config: the pattern v6.qtw.traces.*.* with default_index_position: 3 and a matching per-pattern override selects the fourth dot-separated token (position 3, zero-based) as the index. Auto-generation is disabled.
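To make the position arithmetic concrete, here is how a hypothetical topic name splits (positions are zero-based; the topic name is invented for illustration):

```yaml
# topic:    v6 . qtw . traces . payments . prod
# position:  0     1      2         3        4
topic_index_config:
  default_index_position: 3   # selects "payments" as the index token
```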
4. Operational signals to watch
The pipeline doc lists the signals that reveal health and bottlenecks:
- Gateway/Buffer: buffer depth, used memory, buffer_full events, backpressure triggers; average batch size in bytes and events.
- Network/Transfer: Arrow Flight send latency and error rates; gRPC message-size rejections.
- Storage: upload latency/failures, object-store throttling; local disk free percentage.
- Indexing: indexing lag and upload errors.
- Control-plane: gossip health and flapping decisions.
- Security/Quotas: authentication failures, tenant rate limits, unexpected high-volume tenants.
5. Failure modes & recovery
| Failure | Effect | Recovery |
|---|---|---|
| WAL write fails | No ack; gateway backpressures until fixed | Swap the WAL directory or restore the disk; if sync_on_write=false, a small unpersisted window may be lost |
| Arrow Flight send fails | Batches remain in WAL; ack may already have been given | Router retries with backoff; control plane may route elsewhere |
| Storage upload throttled/fails | WAL entries remain until upload succeeds or retention expires | Retry; slower draining; local retention until recovery |
| Postgres metadata write fails | Uploaded files not visible to search | Janitor reconciliation re-registers metadata by scanning object storage |
| Node crash / restart | WAL replay restores batches; dedup prevents double-processing | Replay on startup |
6. Sizing guidance
- Batch sizing: start with batch_size = 1000 and batch_timeout_ms = 200–5000 for near-real-time; use larger batches (10k–50k) and 2–5 s timeouts for bulk, on hosts with ample memory (a sketch follows this list).
- WAL: sync_on_write=true if data loss is unacceptable; segment sizes 64–512 MB.
- Memory: set max_buffer_memory within container limits and monitor used memory; reject early when near ceilings.
- Kafka fetch: typical high-throughput values are fetch_min_bytes = 1 MiB and fetch_max_bytes = 50 MiB.
- Arrow/Parquet: prefer larger row groups to reduce small-file overheads.
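A sketch of a bulk-ingest profile built from these bullets; the values are illustrative and the WAL sub-key name is an assumption:

```yaml
batching:
  batch_size: 20000        # within the 10k-50k bulk range
  batch_timeout_ms: 3000   # within the 2-5 s bulk window
processing:
  wal:
    sync_on_write: true              # data loss unacceptable
    segment_size_bytes: 268435456    # 256 MB, inside the 64-512 MB guidance
```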
7. Example
```yaml
kafka:
  brokers: "ns1005571.ip-147-135-65.us:9092"
  group_id: "ingestion-group"
  topics:
    - "v6.qtw.traces.*.*"
  consumer_type: "standard"
  high_performance:
    num_consumers: 8
    processing_threads: 16
    num_grpc_clients: 4
    channel_buffer_size: 100000
  fetch_min_bytes: 1048576
  fetch_max_bytes: 52428800
  max_partition_fetch_bytes: 10485760
  enable_zero_copy: true
  preallocate_buffers: true
  buffer_pool_size: 1000
  topic_index_config:
    default_index_position: 3
    patterns:
      - match: "v6.qtw.*.*.*"
        index_position: 3
    auto_generate:
      enabled: false
      prefix: "auto_"
      strategy: "hash"
  consumer:
    session_timeout_ms: 180000
    heartbeat_interval_ms: 3000
    max_poll_interval_ms: 900000
    max_poll_records: 50000
  batching:
    batch_size: 1000   # reduced to avoid gRPC size limits
    batch_timeout_ms: 5000
  telemetry_type: "traces"
```

Important guardrail in this excerpt: batch_size is explicitly reduced "to avoid gRPC size limits (4 MB)," which ties back to the earlier sizing rule.
8. Quick mapping (stage → configs → what to watch)
| Stage | Primary configs | What to watch |
|---|---|---|
| Gateway | ingestion.batching.*, ingestion.sources.*, proxy.* | Buffer depth, avg batch size (events/bytes), 429/pause events |
| Ingest buffer | processing.buffer.*, processing.flush_size_messages, processing.dedup_* | Queue depth, used memory, buffer_full counts |
| WAL | processing.wal.*, wal_cleanup.* | WAL fsync/write latency, backlog, segments, disk % |
| Arrow Flight | ingestion.grpc.max_message_size, pipeline.batch_size, service_ports.arrow_flight | Arrow send latency, size errors, endpoint failures |