seaweedfs/weed/mq/KAFKA_DEV_PLAN.md

Kafka Client Compatibility for SeaweedFS Message Queue — Development Plan

Goals

  • Kafka client support: Allow standard Kafka clients (Java, sarama, kafka-go) to Produce/Fetch to SeaweedMQ.
  • Semantics: At-least-once delivery, in-order per partition, consumer groups with committed offsets.
  • Performance: Horizontal scalability via stateless gateways; efficient batching and IO.
  • Security (initial): TLS listener; SASL/PLAIN later.

Non-goals (initial)

  • Idempotent producers, transactions (EOS), log compaction semantics.
  • Kafka's broker replication factor (durability comes from SeaweedFS).

Architecture Overview

Kafka Gateway

  • New stateless process that speaks the Kafka wire protocol and translates to SeaweedMQ.
  • Listens on Kafka TCP port (e.g., 9092); communicates with SeaweedMQ brokers over gRPC.
  • Persists lightweight control state (topic metadata, offset ledgers, group commits) in the filer.
  • Multiple gateways can be deployed; any gateway can serve any client.

Topic and Partition Mapping

  • A Kafka topic's partition count N is fixed at create-time for client compatibility.
  • Map Kafka partitions to SMQ's ring-based partitions by dividing the ring (size 4096) into N stable ranges.
  • Message routing: hash(key) -> Kafka partition -> ring slot -> SMQ partition covering that slot.
  • SMQ's internal segment split/merge remains transparent; ordering is preserved per Kafka partition.
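The mapping above can be sketched in Go. The `ringRange` and `kafkaPartition` helpers below are hypothetical names, and FNV-1a stands in for whatever partitioner the clients use; only the ring size (4096) and the contiguous-range scheme come from the plan.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

const ringSize = 4096 // SMQ ring size, per the plan

// ringRange returns the stable [start, stop) slot range on the SMQ ring
// covered by Kafka partition p out of n total partitions.
func ringRange(p, n int32) (start, stop int32) {
	width := int32(ringSize) / n
	start = p * width
	stop = start + width
	if p == n-1 {
		stop = ringSize // last partition absorbs any remainder
	}
	return
}

// kafkaPartition hashes a record key to a Kafka partition.
// FNV-1a is a stand-in; real clients choose their own partitioner.
func kafkaPartition(key []byte, n int32) int32 {
	h := fnv.New32a()
	h.Write(key)
	return int32(h.Sum32() % uint32(n))
}

func main() {
	n := int32(3)
	p := kafkaPartition([]byte("user-42"), n)
	start, stop := ringRange(p, n)
	fmt.Printf("key user-42 -> partition %d -> ring slots [%d, %d)\n", p, start, stop)
}
```

Because the ranges are derived only from the fixed partition count, every gateway computes the same mapping without coordination.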

Offset Model (updated)

  • Use SMQ native per-partition sequential offsets. Kafka offsets map 1:1 to SMQ offsets.
  • Earliest/latest and timestamp-based lookups come from SMQ APIs; minimize translation.
  • Consumer group commits store SMQ offsets directly.

Consumer Groups and Assignment

  • Gateway implements Kafka group coordinator: Join/Sync/Heartbeat/Leave.
  • Assignment strategy starts with Range assignor; Sticky assignor later.
  • Gateway uses SeaweedMQ subscriber APIs per assigned Kafka partition; stores group and commit state in filer.

Protocol Coverage (initial)

  • ApiVersions, Metadata, CreateTopics/DeleteTopics.
  • Produce (v2+) with record-batch v2 parsing, compression, and CRC validation; Fetch (v2+) with wait/maxBytes semantics.
  • ListOffsets (earliest/latest; timestamp in a later phase).
  • FindCoordinator/JoinGroup/SyncGroup/Heartbeat/LeaveGroup.
  • OffsetCommit/OffsetFetch.
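One way to organize this coverage is a static version table that the ApiVersions handler advertises. The API key numbers below are the standard Kafka protocol assignments; the version ranges are placeholder assumptions to be tuned against real clients (the exact ranges are an open question later in this plan).

```go
package main

import "fmt"

// apiVersionRange advertises the [Min, Max] protocol versions the gateway
// supports for one API key.
type apiVersionRange struct{ Min, Max int16 }

// supportedAPIs covers the initial protocol surface from the plan.
// Version ranges are illustrative assumptions, not tested values.
var supportedAPIs = map[int16]apiVersionRange{
	0:  {2, 3}, // Produce (record-batch v2 requires v2+)
	1:  {2, 4}, // Fetch
	2:  {0, 1}, // ListOffsets (timestamp lookup in a later phase)
	3:  {0, 4}, // Metadata
	8:  {0, 2}, // OffsetCommit
	9:  {0, 2}, // OffsetFetch
	10: {0, 1}, // FindCoordinator
	11: {0, 2}, // JoinGroup
	12: {0, 1}, // Heartbeat
	13: {0, 1}, // LeaveGroup
	14: {0, 1}, // SyncGroup
	18: {0, 3}, // ApiVersions
	19: {0, 2}, // CreateTopics
	20: {0, 2}, // DeleteTopics
}

func main() {
	for key, r := range supportedAPIs {
		fmt.Printf("api %d: v%d..v%d\n", key, r.Min, r.Max)
	}
}
```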

Security

  • TLS for the Kafka listener (configurable cert/key/CA).
  • SASL/PLAIN in a later phase, backed by SeaweedFS auth.

Observability

  • Prometheus metrics: per-topic/partition produce/fetch rates, latencies, rebalance counts, offset lag.
  • Structured logs; optional tracing around broker RPC and ledger IO.

Compatibility Limits (initial)

  • No idempotent producers, transactions, or compaction policies.
  • Compression codecs (GZIP/Snappy/LZ4/ZSTD) are available via the record-batch parser.

Milestones

  • M1: Gateway skeleton; ApiVersions/Metadata/Create/Delete; single-partition Produce/Fetch; plaintext; SMQ native offsets.
  • M2: Multi-partition mapping, ListOffsets (earliest/latest), OffsetCommit/Fetch, group coordinator (Range), TLS.
  • M3: Record-batch compression codecs + CRC; timestamp ListOffsets; Sticky assignor; SASL/PLAIN; metrics.
  • M4: SCRAM, admin HTTP, ledger compaction tooling, performance tuning.
  • M5 (optional): Idempotent producers groundwork, EOS design exploration.

Phase 1 (M1) — Detailed Plan

Scope

  • Kafka Gateway process scaffolding and configuration.
  • Protocol: ApiVersions, Metadata, CreateTopics, DeleteTopics.
  • Produce (single topic-partition path) and Fetch using v2 record-batch parser with compression and CRC.
  • Basic filer-backed topic registry; offsets via SMQ native offsets (no separate ledger files).
  • Plaintext only; no consumer groups yet (direct Fetch by offset).

Deliverables

  • New command: weed mq.kafka.gateway (or weed mq.kafka) to start the Kafka Gateway.
  • Protocol handlers for ApiVersions/Metadata/CreateTopics/DeleteTopics/Produce/Fetch/ListOffsets (earliest/latest only).
  • Filer layout for Kafka compatibility metadata and ledgers under:
    • mq/kafka/<namespace>/<topic>/meta.json
    • mq/kafka/<namespace>/<topic>/partitions/<pid>/ledger.log
    • mq/kafka/<namespace>/<topic>/partitions/<pid>/ledger.index (sparse; phase 2 fills)
  • E2E tests using sarama and kafka-go for basic produce/fetch.

Work Breakdown

  1. Component Scaffolding
  • Add command: weed/command/mq_kafka_gateway.go with flags:
    • -listen=0.0.0.0:9092, -filer=, -master=, -namespace=default.
    • (M1) TLS off; placeholder flags added but disabled.
  • Service skeleton in weed/mq/kafka/gateway/* with lifecycle, readiness, and basic logging.
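A minimal sketch of the flag parsing for the new command, assuming the flag names and defaults listed above (`-listen`, `-filer`, `-master`, `-namespace`); `gatewayOptions` and `parseOptions` are hypothetical names, not existing SeaweedFS code.

```go
package main

import (
	"flag"
	"fmt"
)

// gatewayOptions holds the M1 flags from the plan.
type gatewayOptions struct {
	listen    string
	filer     string
	master    string
	namespace string
}

// parseOptions parses command-line args into gateway options.
func parseOptions(args []string) (*gatewayOptions, error) {
	fs := flag.NewFlagSet("mq.kafka.gateway", flag.ContinueOnError)
	o := &gatewayOptions{}
	fs.StringVar(&o.listen, "listen", "0.0.0.0:9092", "Kafka listener address")
	fs.StringVar(&o.filer, "filer", "", "filer address for control state")
	fs.StringVar(&o.master, "master", "", "master server address")
	fs.StringVar(&o.namespace, "namespace", "default", "SMQ namespace backing Kafka topics")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return o, nil
}

func main() {
	o, err := parseOptions([]string{"-listen", ":9092"})
	if err != nil {
		panic(err)
	}
	fmt.Println(o.listen, o.namespace)
}
```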
  2. Protocol Layer
  • Use segmentio/kafka-go/protocol for parsing/encoding.
  • Implement request router and handlers for:
    • ApiVersions: advertise minimal supported versions.
    • Metadata: topics/partitions and leader endpoints (this gateway instance).
    • CreateTopics/DeleteTopics: validate, persist topic metadata in filer, create SMQ topic.
    • ListOffsets: earliest/latest using SMQ bounds.
    • Produce: parse v2 record batches (compressed/uncompressed), extract records, publish to SMQ; return baseOffset.
    • Fetch: read from SMQ starting at requested offset; construct proper v2 record batches honoring maxBytes/maxWait.
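The request router starts by decoding the fixed header that prefixes every classic Kafka request: api_key and api_version (int16), correlation_id (int32), then a nullable client_id string (int16 length, -1 for null). A sketch of that decode, assuming the length-prefix has already been stripped; flexible (tagged) headers used by newer API versions are out of scope here.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// requestHeader is the fixed prefix of a classic Kafka request frame.
type requestHeader struct {
	APIKey        int16
	APIVersion    int16
	CorrelationID int32
	ClientID      string
}

// parseRequestHeader decodes the header from a length-stripped request frame.
func parseRequestHeader(frame []byte) (*requestHeader, error) {
	r := bytes.NewReader(frame)
	h := &requestHeader{}
	for _, f := range []interface{}{&h.APIKey, &h.APIVersion, &h.CorrelationID} {
		if err := binary.Read(r, binary.BigEndian, f); err != nil {
			return nil, fmt.Errorf("short header: %w", err)
		}
	}
	var n int16 // client_id length; -1 means null
	if err := binary.Read(r, binary.BigEndian, &n); err != nil {
		return nil, err
	}
	if n >= 0 {
		id := make([]byte, n)
		if _, err := io.ReadFull(r, id); err != nil {
			return nil, err
		}
		h.ClientID = string(id)
	}
	return h, nil
}

func main() {
	// ApiVersions v3 request, correlation id 7, client id "go".
	frame := []byte{0, 18, 0, 3, 0, 0, 0, 7, 0, 2, 'g', 'o'}
	h, err := parseRequestHeader(frame)
	if err != nil {
		panic(err)
	}
	fmt.Printf("api=%d v=%d corr=%d client=%q\n", h.APIKey, h.APIVersion, h.CorrelationID, h.ClientID)
}
```

In practice segmentio/kafka-go/protocol handles this decoding; the sketch only shows what the router dispatches on.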
  3. Topic Registry and Mapping
  • Define meta.json schema:
    • { name, namespace, partitions, createdAtNs, configVersion }.
  • Map Kafka partition id to SMQ ring range: divide the ring (4096) into one contiguous range per partition.
  • Enforce fixed partition count post-create.
  4. Offset Handling (M1)
  • Use SMQ native offsets; remove separate ledger and translation in the gateway.
  • Earliest/Latest come from SMQ; timestamp lookups added in later phase.
  5. Produce Path
  • For each topic-partition in request:
    • Validate topic existence and partition id.
    • Parse record-batch v2; extract records (varints/headers), handle compression and CRC.
    • Publish to SMQ via broker (batch if available); SMQ assigns offsets; return baseOffset per partition.
  6. Fetch Path (no groups)
  • For each topic-partition in request:
    • If offset is -1 (latest) or -2 (earliest), use SMQ bounds.
    • Read from SMQ starting at the requested offset; construct proper v2 record batches.
    • Page results up to maxBytes or minBytes/maxWait semantics.
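The sentinel handling in the first step can be sketched as a small resolver; the earliest/latest bounds are assumed to come from SMQ APIs and are plain parameters here.

```go
package main

import "fmt"

// resolveFetchOffset maps Kafka's sentinel offsets (-1 latest, -2 earliest)
// to concrete SMQ offsets within the partition's current bounds.
func resolveFetchOffset(requested, earliest, latest int64) (int64, error) {
	switch requested {
	case -1: // latest
		return latest, nil
	case -2: // earliest
		return earliest, nil
	default:
		if requested < earliest || requested > latest {
			return 0, fmt.Errorf("offset %d out of range [%d, %d]", requested, earliest, latest)
		}
		return requested, nil
	}
}

func main() {
	off, _ := resolveFetchOffset(-2, 100, 250)
	fmt.Println("earliest resolves to", off)
}
```

An out-of-range request would map to Kafka's OFFSET_OUT_OF_RANGE error code in the Fetch response.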
  7. Metadata and SMQ Integration
  • Create/delete topic maps to SMQ topic lifecycle using existing MQ APIs.
  • No auto-scaling of partitions in M1 (Kafka partition count fixed).
  8. Testing
  • Unit tests for record-batch parser (compression, CRC), earliest/latest via SMQ.
  • E2E:
    • sarama producer -> gateway -> SMQ; fetch and validate ordering/offsets.
    • kafka-go fetch from earliest/latest.
    • Metadata and create/delete topic via Kafka Admin client (happy path).

Acceptance Criteria

  • Can create a topic with N partitions via Kafka Admin client and see it in meta.json.
  • Produce uncompressed records to a specific partition; responses carry correct baseOffset.
  • Fetch by offset from earliest and latest returns correct records in order.
  • Restart gateway: offsets and earliest/latest preserved; produce/fetch continue correctly.
  • Basic concurrency: multiple producers to different partitions; correctness maintained.

Open Questions / Follow-ups

  • Exact ApiVersions and version ranges to advertise for maximal client compatibility.
  • Whether to expose namespace as Kafka cluster or encode in topic names (ns.topic).
  • Offset state compaction not applicable in gateway; defer SMQ-side retention considerations to later phases.