name: "Kafka Gateway Tests"

on:
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]

# One concurrency group per PR head branch; a newer run cancels the older one.
# NOTE: github.head_ref is only populated for pull_request events. On push
# events it is empty, so every push run shares the group "/kafka-tests".
concurrency:
  group: ${{ github.head_ref }}/kafka-tests
  cancel-in-progress: true

# Force different runners for better isolation
env:
  # Quoted so the value is an explicit string, like the other flag variables
  # in this workflow.
  FORCE_RUNNER_SEPARATION: "true"

permissions:
  contents: read

jobs:
  # Unit tests: runs ./unit/... of the test/kafka module inside a
  # CPU/memory-capped golang:1.24-alpine container.
  kafka-unit-tests:
    name: Kafka Unit Tests
    runs-on: ubuntu-latest
    timeout-minutes: 5
    strategy:
      fail-fast: false
      matrix:
        container-id: [unit-tests-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 1.0 --memory 1g --hostname kafka-unit-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 1
      CGO_ENABLED: 0
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Set up Go 1.x
        uses: actions/setup-go@v5
        with:
          go-version: ^1.24
        id: go

      - name: Check out code
        uses: actions/checkout@v4

      - name: Setup Container Environment
        run: |
          apk add --no-cache git
          ulimit -n 1024 || echo "Warning: Could not set file descriptor limit"

      - name: Get dependencies
        run: |
          cd test/kafka
          go mod download

      - name: Run Kafka Gateway Unit Tests
        run: |
          cd test/kafka
          # Set process limits for container isolation
          ulimit -n 512 || echo "Warning: Could not set file descriptor limit"
          ulimit -u 100 || echo "Warning: Could not set process limit"
          go test -v -timeout 10s ./unit/...

  # Integration tests: runs ./integration/... of the test/kafka module.
  # This job does not start a SeaweedFS server or MQ broker.
  kafka-integration-tests:
    name: Kafka Integration Tests (Critical)
    runs-on: ubuntu-latest
    timeout-minutes: 5
    strategy:
      fail-fast: false
      matrix:
        container-id: [integration-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 2.0 --memory 2g --ulimit nofile=1024:1024 --hostname kafka-integration-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 2
      CGO_ENABLED: 0
      KAFKA_TEST_ISOLATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Set up Go 1.x
        uses: actions/setup-go@v5
        with:
          go-version: ^1.24
        id: go

      - name: Check out code
        uses: actions/checkout@v4

      - name: Setup Integration Container Environment
        run: |
          apk add --no-cache git procps
          ulimit -n 2048 || echo "Warning: Could not set file descriptor limit"

      - name: Get dependencies
        run: |
          cd test/kafka
          go mod download

      - name: Run Integration Tests
        run: |
          cd test/kafka
          # Higher limits for integration tests
          ulimit -n 1024 || echo "Warning: Could not set file descriptor limit"
          ulimit -u 200 || echo "Warning: Could not set process limit"
          go test -v -timeout 15s ./integration/...
        env:
          GOMAXPROCS: 2

  # End-to-end tests: builds the weed binary, starts a SeaweedFS server
  # (master, volume, filer) and an MQ broker in the background, then runs
  # ./e2e/... against them.
  kafka-e2e-tests:
    name: Kafka End-to-End Tests (with SMQ)
    runs-on: ubuntu-latest
    timeout-minutes: 20
    strategy:
      fail-fast: false
      matrix:
        container-id: [e2e-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 2.0 --memory 2g --hostname kafka-e2e-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 2
      CGO_ENABLED: 0
      KAFKA_E2E_ISOLATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Check out code
        uses: actions/checkout@v4

      - name: Set up Go 1.x
        uses: actions/setup-go@v5
        with:
          go-version: ^1.24
          cache: true
          cache-dependency-path: |
            **/go.sum
        id: go

      - name: Setup E2E Container Environment
        run: |
          apk add --no-cache git procps curl netcat-openbsd
          ulimit -n 2048 || echo "Warning: Could not set file descriptor limit"

      - name: Warm Go module cache
        run: |
          # Warm cache for root module
          go mod download || true
          # Warm cache for kafka test module
          cd test/kafka
          go mod download || true

      - name: Get dependencies
        run: |
          cd test/kafka
          # Use go mod download with timeout to prevent hanging
          timeout 90s go mod download || echo "Warning: Dependency download timed out, continuing with cached modules"

      # The same bootstrap sequence is repeated in every SMQ-backed job below;
      # only the data-directory prefix differs.
      - name: Build and start SeaweedFS MQ
        run: |
          set -e
          cd $GITHUB_WORKSPACE
          # Build weed binary
          go build -o /usr/local/bin/weed ./weed
          # Start SeaweedFS components with MQ brokers
          export WEED_DATA_DIR=/tmp/seaweedfs-e2e-$RANDOM
          mkdir -p "$WEED_DATA_DIR"

          # Start SeaweedFS server (master, volume, filer) with consistent IP advertising
          nohup weed -v 1 server \
            -ip="127.0.0.1" \
            -ip.bind="0.0.0.0" \
            -dir="$WEED_DATA_DIR" \
            -master.raftHashicorp \
            -master.port=9333 \
            -volume.port=8081 \
            -filer.port=8888 \
            -filer=true \
            -metricsPort=9325 \
            > /tmp/weed-server.log 2>&1 &

          # Wait for master to be ready
          for i in $(seq 1 30); do
            if curl -s http://127.0.0.1:9333/cluster/status >/dev/null; then
              echo "SeaweedFS master HTTP is up"; break
            fi
            echo "Waiting for SeaweedFS master HTTP... ($i/30)"; sleep 1
          done

          # Wait for master gRPC to be ready (this is what broker discovery uses)
          echo "Waiting for master gRPC port..."
          for i in $(seq 1 30); do
            if nc -z 127.0.0.1 19333; then
              echo "✓ SeaweedFS master gRPC is up (port 19333)"
              break
            fi
            echo " Waiting for master gRPC... ($i/30)"; sleep 1
          done

          # Give server time to initialize all components including gRPC services
          echo "Waiting for SeaweedFS components to initialize..."
          sleep 15

          # Additional wait specifically for gRPC services to be ready for streaming
          echo "Allowing extra time for master gRPC streaming services to initialize..."
          sleep 10

          # Start MQ broker with maximum verbosity for debugging
          echo "Starting MQ broker..."
          nohup weed -v 3 mq.broker \
            -master="127.0.0.1:9333" \
            -ip="127.0.0.1" \
            -port=17777 \
            > /tmp/weed-mq-broker.log 2>&1 &

          # Wait for broker to be ready with better error reporting
          sleep 15
          broker_ready=false
          for i in $(seq 1 20); do
            if nc -z 127.0.0.1 17777; then
              echo "SeaweedFS MQ broker is up"
              broker_ready=true
              break
            fi
            echo "Waiting for MQ broker... ($i/20)"; sleep 1
          done

          # Give broker additional time to register with master
          if [ "$broker_ready" = true ]; then
            echo "Allowing broker to register with master..."
            sleep 30

            # Check if broker is properly registered by querying cluster nodes
            echo "Cluster status after broker registration:"
            curl -s "http://127.0.0.1:9333/cluster/status" || echo "Could not check cluster status"

            echo "Checking cluster topology (includes registered components):"
            curl -s "http://127.0.0.1:9333/dir/status" | head -20 || echo "Could not check dir status"

            echo "Verifying broker discovery via master client debug:"
            echo "If broker registration is successful, it should appear in dir status"

            echo "Testing gRPC connectivity with weed binary:"
            echo "This simulates what the gateway does during broker discovery..."
            timeout 10s weed shell -master=127.0.0.1:9333 -filer=127.0.0.1:8888 > /tmp/shell-test.log 2>&1 || echo "weed shell test completed or timed out - checking logs..."
            echo "Shell test results:"
            cat /tmp/shell-test.log 2>/dev/null | head -10 || echo "No shell test logs"
          fi

          # Check if broker failed to start and show logs
          if [ "$broker_ready" = false ]; then
            echo "ERROR: MQ broker failed to start. Broker logs:"
            cat /tmp/weed-mq-broker.log || echo "No broker logs found"
            echo "Server logs:"
            tail -20 /tmp/weed-server.log || echo "No server logs found"
            exit 1
          fi

      - name: Run End-to-End Tests
        run: |
          cd test/kafka
          # Higher limits for E2E tests
          ulimit -n 1024 || echo "Warning: Could not set file descriptor limit"
          ulimit -u 200 || echo "Warning: Could not set process limit"

          # Allow additional time for all background processes to settle
          echo "Allowing additional settlement time for SeaweedFS ecosystem..."
          sleep 15

          # Run tests and capture result
          if ! go test -v -timeout 180s ./e2e/...; then
            echo "========================================="
            echo "Tests failed! Showing debug information:"
            echo "========================================="
            echo "Server logs (last 50 lines):"
            tail -50 /tmp/weed-server.log || echo "No server logs"
            echo "========================================="
            echo "Broker logs (last 50 lines):"
            tail -50 /tmp/weed-mq-broker.log || echo "No broker logs"
            echo "========================================="
            exit 1
          fi
        env:
          GOMAXPROCS: 2
          SEAWEEDFS_MASTERS: 127.0.0.1:9333

  # Consumer group tests: same SeaweedFS + MQ broker bootstrap as the E2E job,
  # then runs only the TestConsumerGroups* tests from ./integration/...
  kafka-consumer-group-tests:
    name: Kafka Consumer Group Tests (Highly Isolated)
    runs-on: ubuntu-latest
    timeout-minutes: 20
    strategy:
      fail-fast: false
      matrix:
        container-id: [consumer-group-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 1.0 --memory 2g --ulimit nofile=512:512 --hostname kafka-consumer-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 1
      CGO_ENABLED: 0
      KAFKA_CONSUMER_ISOLATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Check out code
        uses: actions/checkout@v4

      - name: Set up Go 1.x
        uses: actions/setup-go@v5
        with:
          go-version: ^1.24
          cache: true
          cache-dependency-path: |
            **/go.sum
        id: go

      - name: Setup Consumer Group Container Environment
        run: |
          apk add --no-cache git procps curl netcat-openbsd
          ulimit -n 256 || echo "Warning: Could not set file descriptor limit"

      - name: Warm Go module cache
        run: |
          # Warm cache for root module
          go mod download || true
          # Warm cache for kafka test module
          cd test/kafka
          go mod download || true

      - name: Get dependencies
        run: |
          cd test/kafka
          # Use go mod download with timeout to prevent hanging
          timeout 90s go mod download || echo "Warning: Dependency download timed out, continuing with cached modules"

      - name: Build and start SeaweedFS MQ
        run: |
          set -e
          cd $GITHUB_WORKSPACE
          # Build weed binary
          go build -o /usr/local/bin/weed ./weed
          # Start SeaweedFS components with MQ brokers
          export WEED_DATA_DIR=/tmp/seaweedfs-mq-$RANDOM
          mkdir -p "$WEED_DATA_DIR"

          # Start SeaweedFS server (master, volume, filer) with consistent IP advertising
          nohup weed -v 1 server \
            -ip="127.0.0.1" \
            -ip.bind="0.0.0.0" \
            -dir="$WEED_DATA_DIR" \
            -master.raftHashicorp \
            -master.port=9333 \
            -volume.port=8081 \
            -filer.port=8888 \
            -filer=true \
            -metricsPort=9325 \
            > /tmp/weed-server.log 2>&1 &

          # Wait for master to be ready
          for i in $(seq 1 30); do
            if curl -s http://127.0.0.1:9333/cluster/status >/dev/null; then
              echo "SeaweedFS master HTTP is up"; break
            fi
            echo "Waiting for SeaweedFS master HTTP... ($i/30)"; sleep 1
          done

          # Wait for master gRPC to be ready (this is what broker discovery uses)
          echo "Waiting for master gRPC port..."
          for i in $(seq 1 30); do
            if nc -z 127.0.0.1 19333; then
              echo "✓ SeaweedFS master gRPC is up (port 19333)"
              break
            fi
            echo " Waiting for master gRPC... ($i/30)"; sleep 1
          done

          # Give server time to initialize all components including gRPC services
          echo "Waiting for SeaweedFS components to initialize..."
          sleep 15

          # Additional wait specifically for gRPC services to be ready for streaming
          echo "Allowing extra time for master gRPC streaming services to initialize..."
          sleep 10

          # Start MQ broker with maximum verbosity for debugging
          echo "Starting MQ broker..."
          nohup weed -v 3 mq.broker \
            -master="127.0.0.1:9333" \
            -ip="127.0.0.1" \
            -port=17777 \
            > /tmp/weed-mq-broker.log 2>&1 &

          # Wait for broker to be ready with better error reporting
          sleep 15
          broker_ready=false
          for i in $(seq 1 20); do
            if nc -z 127.0.0.1 17777; then
              echo "SeaweedFS MQ broker is up"
              broker_ready=true
              break
            fi
            echo "Waiting for MQ broker... ($i/20)"; sleep 1
          done

          # Give broker additional time to register with master
          if [ "$broker_ready" = true ]; then
            echo "Allowing broker to register with master..."
            sleep 30

            # Check if broker is properly registered by querying cluster nodes
            echo "Cluster status after broker registration:"
            curl -s "http://127.0.0.1:9333/cluster/status" || echo "Could not check cluster status"

            echo "Checking cluster topology (includes registered components):"
            curl -s "http://127.0.0.1:9333/dir/status" | head -20 || echo "Could not check dir status"

            echo "Verifying broker discovery via master client debug:"
            echo "If broker registration is successful, it should appear in dir status"

            echo "Testing gRPC connectivity with weed binary:"
            echo "This simulates what the gateway does during broker discovery..."
            timeout 10s weed shell -master=127.0.0.1:9333 -filer=127.0.0.1:8888 > /tmp/shell-test.log 2>&1 || echo "weed shell test completed or timed out - checking logs..."
            echo "Shell test results:"
            cat /tmp/shell-test.log 2>/dev/null | head -10 || echo "No shell test logs"
          fi

          # Check if broker failed to start and show logs
          if [ "$broker_ready" = false ]; then
            echo "ERROR: MQ broker failed to start. Broker logs:"
            cat /tmp/weed-mq-broker.log || echo "No broker logs found"
            echo "Server logs:"
            tail -20 /tmp/weed-server.log || echo "No server logs found"
            exit 1
          fi

      - name: Run Consumer Group Tests
        run: |
          cd test/kafka
          # Test consumer group functionality with explicit timeout
          ulimit -n 512 || echo "Warning: Could not set file descriptor limit"
          ulimit -u 100 || echo "Warning: Could not set process limit"
          # Fail the step when the tests fail or time out. A trailing
          # `|| echo ...` would force exit status 0 and keep the job green
          # regardless of the result. If this job is meant to be non-blocking,
          # express that with `continue-on-error: true` so it stays visible.
          if ! timeout 240s go test -v -run "^TestConsumerGroups" -timeout 180s ./integration/...; then
            echo "Test execution timed out or failed"
            echo "Server logs (last 50 lines):"
            tail -50 /tmp/weed-server.log || echo "No server logs"
            echo "Broker logs (last 50 lines):"
            tail -50 /tmp/weed-mq-broker.log || echo "No broker logs"
            exit 1
          fi
        env:
          GOMAXPROCS: 1
          SEAWEEDFS_MASTERS: 127.0.0.1:9333

  # Client compatibility tests: same SeaweedFS + MQ broker bootstrap, then
  # runs only the TestClientCompatibility* tests from ./integration/...
  kafka-client-compatibility:
    name: Kafka Client Compatibility (with SMQ)
    runs-on: ubuntu-latest
    timeout-minutes: 10
    strategy:
      fail-fast: false
      matrix:
        container-id: [client-compat-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 1.0 --memory 1.5g --shm-size 256m --hostname kafka-client-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 1
      CGO_ENABLED: 0
      KAFKA_CLIENT_ISOLATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Check out code
        uses: actions/checkout@v4

      - name: Set up Go 1.x
        uses: actions/setup-go@v5
        with:
          go-version: ^1.24
          cache: true
          cache-dependency-path: |
            **/go.sum
        id: go

      - name: Setup Client Container Environment
        run: |
          apk add --no-cache git procps curl netcat-openbsd
          ulimit -n 1024 || echo "Warning: Could not set file descriptor limit"

      - name: Warm Go module cache
        run: |
          # Warm cache for root module
          go mod download || true
          # Warm cache for kafka test module
          cd test/kafka
          go mod download || true

      - name: Get dependencies
        run: |
          cd test/kafka
          timeout 90s go mod download || echo "Warning: Dependency download timed out, continuing with cached modules"

      - name: Build and start SeaweedFS MQ
        run: |
          set -e
          cd $GITHUB_WORKSPACE
          # Build weed binary
          go build -o /usr/local/bin/weed ./weed
          # Start SeaweedFS components with MQ brokers
          export WEED_DATA_DIR=/tmp/seaweedfs-client-$RANDOM
          mkdir -p "$WEED_DATA_DIR"

          # Start SeaweedFS server (master, volume, filer) with consistent IP advertising
          nohup weed -v 1 server \
            -ip="127.0.0.1" \
            -ip.bind="0.0.0.0" \
            -dir="$WEED_DATA_DIR" \
            -master.raftHashicorp \
            -master.port=9333 \
            -volume.port=8081 \
            -filer.port=8888 \
            -filer=true \
            -metricsPort=9325 \
            > /tmp/weed-server.log 2>&1 &

          # Wait for master to be ready
          for i in $(seq 1 30); do
            if curl -s http://127.0.0.1:9333/cluster/status >/dev/null; then
              echo "SeaweedFS master HTTP is up"; break
            fi
            echo "Waiting for SeaweedFS master HTTP... ($i/30)"; sleep 1
          done

          # Wait for master gRPC to be ready (this is what broker discovery uses)
          echo "Waiting for master gRPC port..."
          for i in $(seq 1 30); do
            if nc -z 127.0.0.1 19333; then
              echo "✓ SeaweedFS master gRPC is up (port 19333)"
              break
            fi
            echo " Waiting for master gRPC... ($i/30)"; sleep 1
          done

          # Give server time to initialize all components including gRPC services
          echo "Waiting for SeaweedFS components to initialize..."
          sleep 15

          # Additional wait specifically for gRPC services to be ready for streaming
          echo "Allowing extra time for master gRPC streaming services to initialize..."
          sleep 10

          # Start MQ broker with maximum verbosity for debugging
          echo "Starting MQ broker..."
          nohup weed -v 3 mq.broker \
            -master="127.0.0.1:9333" \
            -ip="127.0.0.1" \
            -port=17777 \
            > /tmp/weed-mq-broker.log 2>&1 &

          # Wait for broker to be ready with better error reporting
          sleep 15
          broker_ready=false
          for i in $(seq 1 20); do
            if nc -z 127.0.0.1 17777; then
              echo "SeaweedFS MQ broker is up"
              broker_ready=true
              break
            fi
            echo "Waiting for MQ broker... ($i/20)"; sleep 1
          done

          # Give broker additional time to register with master
          if [ "$broker_ready" = true ]; then
            echo "Allowing broker to register with master..."
            sleep 30

            # Check if broker is properly registered by querying cluster nodes
            echo "Cluster status after broker registration:"
            curl -s "http://127.0.0.1:9333/cluster/status" || echo "Could not check cluster status"

            echo "Checking cluster topology (includes registered components):"
            curl -s "http://127.0.0.1:9333/dir/status" | head -20 || echo "Could not check dir status"

            echo "Verifying broker discovery via master client debug:"
            echo "If broker registration is successful, it should appear in dir status"

            echo "Testing gRPC connectivity with weed binary:"
            echo "This simulates what the gateway does during broker discovery..."
            timeout 10s weed shell -master=127.0.0.1:9333 -filer=127.0.0.1:8888 > /tmp/shell-test.log 2>&1 || echo "weed shell test completed or timed out - checking logs..."
            echo "Shell test results:"
            cat /tmp/shell-test.log 2>/dev/null | head -10 || echo "No shell test logs"
          fi

          # Check if broker failed to start and show logs
          if [ "$broker_ready" = false ]; then
            echo "ERROR: MQ broker failed to start. Broker logs:"
            cat /tmp/weed-mq-broker.log || echo "No broker logs found"
            echo "Server logs:"
            tail -20 /tmp/weed-server.log || echo "No server logs found"
            exit 1
          fi

      - name: Run Client Compatibility Tests
        run: |
          cd test/kafka
          go test -v -run "^TestClientCompatibility" -timeout 180s ./integration/...
        env:
          GOMAXPROCS: 1
          SEAWEEDFS_MASTERS: 127.0.0.1:9333

  # SMQ integration tests: same SeaweedFS + MQ broker bootstrap, then runs
  # only the TestSMQIntegration* tests from ./integration/...
  kafka-smq-integration-tests:
    name: Kafka SMQ Integration Tests (Full Stack)
    runs-on: ubuntu-latest
    timeout-minutes: 20
    strategy:
      fail-fast: false
      matrix:
        container-id: [smq-integration-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 1.0 --memory 2g --hostname kafka-smq-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 1
      CGO_ENABLED: 0
      KAFKA_SMQ_INTEGRATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Check out code
        uses: actions/checkout@v4

      - name: Set up Go 1.x
        uses: actions/setup-go@v5
        with:
          go-version: ^1.24
          cache: true
          cache-dependency-path: |
            **/go.sum
        id: go

      - name: Setup SMQ Integration Container Environment
        run: |
          apk add --no-cache git procps curl netcat-openbsd
          ulimit -n 1024 || echo "Warning: Could not set file descriptor limit"

      - name: Warm Go module cache
        run: |
          # Warm cache for root module
          go mod download || true
          # Warm cache for kafka test module
          cd test/kafka
          go mod download || true

      - name: Get dependencies
        run: |
          cd test/kafka
          timeout 90s go mod download || echo "Warning: Dependency download timed out, continuing with cached modules"

      - name: Build and start SeaweedFS MQ
        run: |
          set -e
          cd $GITHUB_WORKSPACE
          # Build weed binary
          go build -o /usr/local/bin/weed ./weed
          # Start SeaweedFS components with MQ brokers
          export WEED_DATA_DIR=/tmp/seaweedfs-smq-$RANDOM
          mkdir -p "$WEED_DATA_DIR"

          # Start SeaweedFS server (master, volume, filer) with consistent IP advertising
          nohup weed -v 1 server \
            -ip="127.0.0.1" \
            -ip.bind="0.0.0.0" \
            -dir="$WEED_DATA_DIR" \
            -master.raftHashicorp \
            -master.port=9333 \
            -volume.port=8081 \
            -filer.port=8888 \
            -filer=true \
            -metricsPort=9325 \
            > /tmp/weed-server.log 2>&1 &

          # Wait for master to be ready
          for i in $(seq 1 30); do
            if curl -s http://127.0.0.1:9333/cluster/status >/dev/null; then
              echo "SeaweedFS master HTTP is up"; break
            fi
            echo "Waiting for SeaweedFS master HTTP... ($i/30)"; sleep 1
          done

          # Wait for master gRPC to be ready (this is what broker discovery uses)
          echo "Waiting for master gRPC port..."
          for i in $(seq 1 30); do
            if nc -z 127.0.0.1 19333; then
              echo "✓ SeaweedFS master gRPC is up (port 19333)"
              break
            fi
            echo " Waiting for master gRPC... ($i/30)"; sleep 1
          done

          # Give server time to initialize all components including gRPC services
          echo "Waiting for SeaweedFS components to initialize..."
          sleep 15

          # Additional wait specifically for gRPC services to be ready for streaming
          echo "Allowing extra time for master gRPC streaming services to initialize..."
          sleep 10

          # Start MQ broker with maximum verbosity for debugging
          echo "Starting MQ broker..."
          nohup weed -v 3 mq.broker \
            -master="127.0.0.1:9333" \
            -ip="127.0.0.1" \
            -port=17777 \
            > /tmp/weed-mq-broker.log 2>&1 &

          # Wait for broker to be ready with better error reporting
          sleep 15
          broker_ready=false
          for i in $(seq 1 20); do
            if nc -z 127.0.0.1 17777; then
              echo "SeaweedFS MQ broker is up"
              broker_ready=true
              break
            fi
            echo "Waiting for MQ broker... ($i/20)"; sleep 1
          done

          # Give broker additional time to register with master
          if [ "$broker_ready" = true ]; then
            echo "Allowing broker to register with master..."
            sleep 30

            # Check if broker is properly registered by querying cluster nodes
            echo "Cluster status after broker registration:"
            curl -s "http://127.0.0.1:9333/cluster/status" || echo "Could not check cluster status"

            echo "Checking cluster topology (includes registered components):"
            curl -s "http://127.0.0.1:9333/dir/status" | head -20 || echo "Could not check dir status"

            echo "Verifying broker discovery via master client debug:"
            echo "If broker registration is successful, it should appear in dir status"

            echo "Testing gRPC connectivity with weed binary:"
            echo "This simulates what the gateway does during broker discovery..."
            timeout 10s weed shell -master=127.0.0.1:9333 -filer=127.0.0.1:8888 > /tmp/shell-test.log 2>&1 || echo "weed shell test completed or timed out - checking logs..."
            echo "Shell test results:"
            cat /tmp/shell-test.log 2>/dev/null | head -10 || echo "No shell test logs"
          fi

          # Check if broker failed to start and show logs
          if [ "$broker_ready" = false ]; then
            echo "ERROR: MQ broker failed to start. Broker logs:"
            cat /tmp/weed-mq-broker.log || echo "No broker logs found"
            echo "Server logs:"
            tail -20 /tmp/weed-server.log || echo "No server logs found"
            exit 1
          fi

      - name: Run SMQ Integration Tests
        run: |
          cd test/kafka
          ulimit -n 512 || echo "Warning: Could not set file descriptor limit"
          ulimit -u 100 || echo "Warning: Could not set process limit"
          # Run the dedicated SMQ integration tests
          go test -v -run "^TestSMQIntegration" -timeout 180s ./integration/...
        env:
          GOMAXPROCS: 1
          SEAWEEDFS_MASTERS: 127.0.0.1:9333

  # Protocol tests: runs the Go tests under weed/mq/kafka with /tmp mounted as
  # an exec-enabled tmpfs, and TMPDIR and GOCACHE pointed at it.
  kafka-protocol-tests:
    name: Kafka Protocol Tests (Isolated)
    runs-on: ubuntu-latest
    timeout-minutes: 5
    strategy:
      fail-fast: false
      matrix:
        container-id: [protocol-1]
    container:
      image: golang:1.24-alpine
      options: --cpus 1.0 --memory 1g --tmpfs /tmp:exec --hostname kafka-protocol-${{ matrix.container-id }}
    env:
      GOMAXPROCS: 1
      CGO_ENABLED: 0
      KAFKA_PROTOCOL_ISOLATION: "true"
      CONTAINER_ID: ${{ matrix.container-id }}
    steps:
      - name: Set up Go 1.x
        uses: actions/setup-go@v5
        with:
          go-version: ^1.24
        id: go

      - name: Check out code
        uses: actions/checkout@v4

      - name: Setup Protocol Container Environment
        run: |
          apk add --no-cache git procps
          # Ensure proper permissions for test execution
          chmod -R 755 /tmp || true
          export TMPDIR=/tmp
          export GOCACHE=/tmp/go-cache
          mkdir -p $GOCACHE
          chmod 755 $GOCACHE

      - name: Get dependencies
        run: |
          cd test/kafka
          go mod download

      - name: Run Protocol Tests
        run: |
          cd test/kafka
          export TMPDIR=/tmp
          export GOCACHE=/tmp/go-cache
          # Run protocol tests from the weed/mq/kafka directory since they test the protocol implementation
          cd ../../weed/mq/kafka
          go test -v -run "^Test.*" -timeout 10s ./...
        env:
          GOMAXPROCS: 1
          TMPDIR: /tmp
          GOCACHE: /tmp/go-cache