Architecture Overview

Kafka is one of the three ingestion entry points into OpenWit. Producers publish events to the topics you configure. The Gateway authenticates, validates, normalizes, and batches these events. Batches move to the Ingest node, where buffering, deduplication, and durability (dual WAL) are enforced. Only after the short-term WAL write succeeds does OpenWit acknowledge and commit Kafka offsets. The batch is then converted to Arrow and sent over Arrow Flight toward Storage.

1. End-to-end flow

A. Gateway (entry)

The Gateway authenticates the caller, enforces quotas, validates the schema, and normalizes field names and types. If validation fails, it returns a 4xx response and can optionally write the rejected event to a dead-letter destination. When overloaded, it applies backpressure (HTTP 429, or pausing Kafka consumption). It buffers per tenant or source and flushes a batch when either batch_size or batch_timeout_ms is reached. Key knobs live under ingestion.sources.* and ingestion.batching.*.
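The size-or-timeout flush rule can be illustrated with a minimal sketch. The Batcher class and its method names are hypothetical and are not OpenWit's implementation; only batch_size and batch_timeout_ms come from the configuration described here.

```python
import time
from typing import Callable, List, Optional


class Batcher:
    """Hypothetical batcher: flush on batch_size events or batch_timeout_ms elapsed."""

    def __init__(self, batch_size: int, batch_timeout_ms: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.batch_size = batch_size
        self.batch_timeout_s = batch_timeout_ms / 1000.0
        self._clock = clock  # injectable so the timeout can be tested without sleeping
        self._events: List[bytes] = []
        self._first_event_at: Optional[float] = None

    def add(self, event: bytes) -> Optional[List[bytes]]:
        """Buffer one event; return a batch if the size trigger fires."""
        if not self._events:
            # The timeout is measured from the first event of the current batch.
            self._first_event_at = self._clock()
        self._events.append(event)
        if len(self._events) >= self.batch_size:
            return self._flush()
        return None

    def poll(self) -> Optional[List[bytes]]:
        """Call periodically; return a batch if the timeout trigger fires."""
        if self._events and self._clock() - self._first_event_at >= self.batch_timeout_s:
            return self._flush()
        return None

    def _flush(self) -> List[bytes]:
        batch, self._events = self._events, []
        self._first_event_at = None
        return batch
```

A caller would invoke add() for each incoming event and poll() on a timer, forwarding any returned batch to the Ingest node.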

Why this order matters: Validation and normalization guard the system’s schema consistency across sources. Mismatched types are coerced when allowed, otherwise rejected, so downstream components always see consistent shapes.

B. Ingest node (buffering & dedup)

The Ingest node assembles MessageBatch objects with IDs, messages, sizes, and offsets. It checks a windowed dedup cache, accounts memory against processing.buffer.max_size_messages and processing.buffer.max_size_bytes, and signals backpressure if limits are hit. Flush behavior is driven by processing.buffer.flush_interval_seconds and processing.flush_size_messages. The dedup window is set via processing.dedup_window_seconds. Watch buffer depth, memory usage, and buffer_full signals for early congestion.
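As a rough illustration of a windowed dedup cache, the sketch below remembers message IDs for a fixed number of seconds. The DedupWindow class is hypothetical; the source does not describe how OpenWit keys or evicts dedup entries, so treat the eviction policy here as an assumption. Only the window length corresponds to processing.dedup_window_seconds.

```python
import time
from collections import OrderedDict
from typing import Callable


class DedupWindow:
    """Hypothetical dedup cache: an ID counts as a duplicate within window_seconds."""

    def __init__(self, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.window_seconds = window_seconds
        self._clock = clock
        # Insertion order equals first-seen order, so the oldest entry is always first.
        self._seen: "OrderedDict[str, float]" = OrderedDict()

    def is_duplicate(self, message_id: str) -> bool:
        now = self._clock()
        # Evict entries that have aged out of the window, oldest first.
        while self._seen:
            _, seen_at = next(iter(self._seen.items()))
            if now - seen_at < self.window_seconds:
                break
            self._seen.popitem(last=False)
        if message_id in self._seen:
            return True
        self._seen[message_id] = now
        return False
```

A longer window catches more replays at the cost of memory, which is why the cache size should be counted against the buffer limits above.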

C. WAL (durability & ack)

Batches are written to the short-term WAL and published to the long-term WAL. The short-term WAL write must succeed before OpenWit acknowledges the batch and commits Kafka offsets. This ordering is what provides the at-least-once guarantee: if a node fails before the commit, the uncommitted messages are read from Kafka again rather than lost. Configure the directory, maximum size, segment size, and sync_on_write behavior; retention and cleanup are managed under wal_cleanup.*. Monitor WAL write and fsync latency, backlog, segment counts, and disk usage.

Trade-off: sync_on_write=true increases durability at the cost of IOPS and latency. sync_on_write=false improves throughput but leaves a small window in which a crash can lose writes that were not yet synced to disk. Choose based on your durability target.
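The write-then-commit ordering and the sync_on_write trade-off can be sketched as follows. This is an assumption-laden illustration, not OpenWit's WAL format: the function name, the length-prefixed record layout, and the commit_offset callback are all hypothetical.

```python
import os
import struct
from typing import Callable


def append_then_commit(wal_path: str, payload: bytes, offset: int,
                       commit_offset: Callable[[int], None],
                       sync_on_write: bool = True) -> None:
    """Append one length-prefixed record to a WAL file, then commit the offset.

    commit_offset stands in for the Kafka offset commit (hypothetical callback).
    """
    with open(wal_path, "ab") as wal:
        wal.write(struct.pack(">I", len(payload)))  # 4-byte big-endian length prefix
        wal.write(payload)
        wal.flush()                     # hand Python's buffer to the OS
        if sync_on_write:
            os.fsync(wal.fileno())      # ask the OS to persist the data to disk
    # Reached only if the write (and the optional fsync) raised no exception,
    # so an offset is never committed for a batch that is not in the WAL.
    commit_offset(offset)
```

With sync_on_write=False the record may still sit in the OS page cache when the offset is committed, which is the crash window mentioned above.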

D. Arrow conversion and Arrow Flight handoff

After WAL success, the batch is converted to an Arrow RecordBatch (columnar) and sent to a Storage endpoint selected by the control/router. Arrow Flight is used for the transfer; keep batch_size × avg_message_size comfortably below your gRPC max message size and leave headroom for Arrow metadata. Track Arrow send latency, serialization overhead, and endpoint errors.

E. Storage (context from ingestion’s downstream)

Storage appends Arrow batches to an active Parquet file, rotates it to a stable Parquet file based on size or time, uploads the stable file to object storage via OpenDAL, and records file metadata (URI, time range, schema hash, sizes) in Postgres. Indexing is then triggered.

2. Kafka configuration (what you tune and why)

The Kafka block defines brokers, consumer groups, subscribed topic patterns, the consumer profile, high-performance options, fetch sizes, memory optimizations, topic-to-index mapping, consumer liveness, and batching.

| Area | Keys | Purpose / Notes |
|---|---|---|
| Source toggle | ingestion.sources.kafka.enabled | Enable Kafka as an entry point; disable unused sources to reduce surface. |
| Core | brokers, group_id, topics | Broker list, consumer group, topic patterns (wildcards supported). |
| Mode | consumer_type | "standard" or "high_performance". |
| High-perf | num_consumers, processing_threads, num_grpc_clients, channel_buffer_size | Parallelism and inter-thread capacity. |
| Fetch sizing | fetch_min_bytes, fetch_max_bytes, max_partition_fetch_bytes | Larger fetches raise throughput and memory. Start conservative. |
| Memory opts | enable_zero_copy, preallocate_buffers, buffer_pool_size | Faster at higher memory footprint. |
| Topic→index | topic_index_config, auto_generate.* | Extract index token from topic name or auto-generate if absent. |
| Liveness | consumer.session_timeout_ms, heartbeat_interval_ms, max_poll_interval_ms, max_poll_records | Consumer health and polling depth. |
| Batching | batch_size, batch_timeout_ms | Flush triggers. Your config sets batch_size: 1000 to avoid gRPC size limits. |
| Telemetry hint | telemetry_type | "traces", "metrics", or "logs" to guide downstream handling. |

Guardrail: Keep batch_size × avg_msg_size < grpc_max_message_size and ensure used memory stays below max_buffer_memory. This is the central sizing rule the pipeline section calls out.
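The sizing rule is simple arithmetic, sketched below. The function names are hypothetical, and the 25% headroom for Arrow metadata is an assumed figure rather than a documented OpenWit value; the 4 MiB limit in the usage note is gRPC's default maximum receive message size.

```python
def max_safe_batch_size(avg_message_bytes: int, grpc_max_message_bytes: int,
                        headroom_percent: int = 25) -> int:
    """Largest batch_size that keeps batch_size * avg_message_bytes under the
    gRPC limit after reserving headroom_percent (assumed) for Arrow metadata."""
    if avg_message_bytes <= 0:
        raise ValueError("avg_message_bytes must be positive")
    if not 0 <= headroom_percent < 100:
        raise ValueError("headroom_percent must be in [0, 100)")
    usable_bytes = grpc_max_message_bytes * (100 - headroom_percent) // 100
    return usable_bytes // avg_message_bytes


def batch_fits(batch_size: int, avg_message_bytes: int,
               grpc_max_message_bytes: int, headroom_percent: int = 25) -> bool:
    """True if the configured batch_size respects the guardrail."""
    return batch_size <= max_safe_batch_size(
        avg_message_bytes, grpc_max_message_bytes, headroom_percent)
```

For example, with a 4 MiB limit and 2 KiB average messages, max_safe_batch_size(2048, 4 * 1024 * 1024) allows up to 1536 messages, so batch_size: 1000 fits; at 4 KiB average messages the ceiling drops to 768 and the same batch_size would not.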

3. Topic strategy and telemetry type

You can subscribe to topic patterns (wildcards). topic_index_config lets you derive an index token from a specific position in the topic name. If a topic lacks that token, you can auto-generate it using a prefix and strategy. telemetry_type indicates whether payloads represent traces, metrics, or logs.

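Example from the config: the pattern v6.qtw.traces.*.* with default_index_position: 3, together with a per-pattern override (index_position: 3), maps the fourth dot-separated token to the index, which implies that positions are zero-based. Auto-generation is disabled.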
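The token extraction can be sketched in a few lines. The function index_from_topic is hypothetical, and using fnmatch for wildcard matching is an assumption: the source does not specify OpenWit's pattern syntax, and fnmatch's * also matches across dots. The sample topic name is invented for illustration.

```python
import fnmatch
from typing import List, Optional, Tuple


def index_from_topic(topic: str, default_position: int,
                     patterns: List[Tuple[str, int]]) -> Optional[str]:
    """Return the dot-separated token at the configured zero-based position.

    patterns is a list of (match, index_position) overrides; the first
    matching pattern wins, otherwise default_position is used.
    Returns None when the topic has no token at that position.
    """
    position = default_position
    for pattern, pattern_position in patterns:
        if fnmatch.fnmatchcase(topic, pattern):
            position = pattern_position
            break
    tokens = topic.split(".")
    if 0 <= position < len(tokens):
        return tokens[position]
    return None  # the case where auto-generation would apply, if enabled
```

With the example configuration, index_from_topic("v6.qtw.traces.acme.prod", 3, [("v6.qtw.*.*.*", 3)]) returns "acme", the fourth token.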

4. Operational signals to watch

The pipeline doc lists the signals that reveal health and bottlenecks:

  • Gateway/Buffer: buffer depth, used memory, buffer_full events, backpressure triggers; average batch size in bytes and events.
  • Network/Transfer: Arrow Flight send latency and error rates; gRPC message-size rejections.
  • Storage: upload latency/failures, object-store throttling; local disk free percentage.
  • Indexing: indexing lag and upload errors.
  • Control-plane: gossip health and flapping decisions.
  • Security/Quotas: authentication failures, tenant rate limits, unexpected high-volume tenants.

5. Failure modes & recovery

| Failure | Effect | Recovery |
|---|---|---|
| WAL write fails | No ack; gateway backpressures until fixed | Swap WAL dir or restore; if sync_on_write=false, a small unpersisted window may be lost |
| Arrow Flight send fails | Batches remain in WAL; ack may already have been given | Router retries with backoff; control plane may route elsewhere |
| Storage upload throttled/fails | WAL entries remain until upload succeeds or retention expires | Retry; slower draining; local retention until recovery |
| Postgres metadata write fails | Uploaded files not visible to search | Janitor reconciliation re-registers metadata by scanning object storage |
| Node crash / restart | WAL replay restores batches; dedup prevents double-processing | Replay on startup |
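Several recovery paths above rely on retrying with backoff. The sketch below shows one common form, exponential backoff with full jitter; it is a generic illustration, not the router's actual retry policy, and the function name, attempt count, and delay values are all assumptions.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(send: Callable[[], T], max_attempts: int = 5,
                       base_delay_s: float = 0.1, max_delay_s: float = 5.0,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call send() until it succeeds or max_attempts is exhausted.

    The delay ceiling doubles after each failure (capped at max_delay_s) and
    the actual wait is drawn uniformly from [0, ceiling] ("full jitter").
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return send()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error to the caller
            ceiling = min(max_delay_s, base_delay_s * (2 ** attempt))
            sleep(random.uniform(0, ceiling))
    raise AssertionError("unreachable")
```

Because the batch stays in the WAL while the send is retried, exhausting the attempts does not lose data; it only delays delivery until the endpoint recovers or another one is selected.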

6. Sizing guidance

  • Batch sizing: start with batch_size = 1000 and batch_timeout_ms = 200–5000 for near-real-time; larger batches (10k–50k) and 2–5s timeouts for bulk, on hosts with ample memory.
  • WAL: sync_on_write=true if data loss is unacceptable; segment sizes 64–512 MB.
  • Memory: set max_buffer_memory within container limits and monitor used memory; reject early when near ceilings.
  • Kafka fetch: the example config uses typical high-throughput values of 1 MiB for fetch_min_bytes and 50 MiB for fetch_max_bytes.
  • Arrow/Parquet: prefer larger row groups to reduce small-file overheads.

7. Example

kafka:
  brokers: "ns1005571.ip-147-135-65.us:9092"
  group_id: "ingestion-group"
  topics:
    - "v6.qtw.traces.*.*"
  consumer_type: "standard"

  high_performance:
    num_consumers: 8
    processing_threads: 16
    num_grpc_clients: 4
    channel_buffer_size: 100000

  fetch_min_bytes: 1048576
  fetch_max_bytes: 52428800
  max_partition_fetch_bytes: 10485760

  enable_zero_copy: true
  preallocate_buffers: true
  buffer_pool_size: 1000

  topic_index_config:
    default_index_position: 3
    patterns:
      - match: "v6.qtw.*.*.*"
        index_position: 3

  auto_generate:
    enabled: false
    prefix: "auto_"
    strategy: "hash"

  consumer:
    session_timeout_ms: 180000
    heartbeat_interval_ms: 3000
    max_poll_interval_ms: 900000
    max_poll_records: 50000

  batching:
    batch_size: 1000        # reduced to avoid gRPC size limits
    batch_timeout_ms: 5000

  telemetry_type: "traces"

Important guardrail in this excerpt: your batch_size is explicitly reduced “to avoid gRPC size limits” (gRPC's default maximum receive message size is 4 MB), which ties back to the earlier sizing rule.

8. Quick mapping (stage → configs → what to watch)

| Stage | Primary configs | What to watch |
|---|---|---|
| Gateway | ingestion.batching.*, ingestion.sources.*, proxy.* | Buffer depth, avg batch size (events/bytes), 429/pause events |
| Ingest buffer | processing.buffer.*, processing.flush_size_messages, processing.dedup_* | Queue depth, used memory, buffer_full counts |
| WAL | processing.wal.*, wal_cleanup.* | WAL fsync/write latency, backlog, segments, disk % |
| Arrow Flight | ingestion.grpc.max_message_size, pipeline.batch_size, service_ports.arrow_flight | Arrow send latency, size errors, endpoint failures |