1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-18 17:20:22 +02:00
seaweedfs/test/kafka
2025-09-18 00:18:10 -07:00
..
cmd/setup fix: correct Kafka Gateway health check to use TCP dial instead of HTTP 2025-09-16 00:22:46 -07:00
e2e fmt 2025-09-17 21:13:23 -07:00
integration more full stack tests 2025-09-17 21:27:50 -07:00
internal/testutil debug 2025-09-18 00:18:10 -07:00
scripts check grpc port 2025-09-18 00:03:43 -07:00
unit Phase 0 Part 1: Implement basic tests and offset fixes 2025-09-15 20:26:24 -07:00
docker-compose.yml improve builds 2025-09-17 09:33:02 -07:00
Dockerfile.kafka-gateway Update Dockerfile.kafka-gateway 2025-09-13 20:10:08 -07:00
Dockerfile.seaweedfs improve builds 2025-09-17 09:33:02 -07:00
Dockerfile.test-setup improve builds 2025-09-17 09:33:02 -07:00
go.mod Update go.mod 2025-09-12 21:43:20 -07:00
go.sum Update go.sum 2025-09-12 22:11:04 -07:00
Makefile improve builds 2025-09-17 09:33:02 -07:00
README.md check grpc port 2025-09-18 00:03:43 -07:00
REFACTORING_SUMMARY.md refactor test/kafka 2025-09-15 09:43:40 -07:00

Kafka Gateway Tests with SMQ Integration

This directory contains tests for the SeaweedFS Kafka Gateway with full SeaweedMQ (SMQ) integration.

Test Types

🧪 Unit Tests (./unit/)

  • Basic gateway functionality
  • Protocol compatibility
  • No SeaweedFS backend required
  • Uses mock handlers

🔗 Integration Tests (./integration/)

  • Mock Mode (default): Uses in-memory handlers for protocol testing
  • SMQ Mode (with SEAWEEDFS_MASTERS): Uses real SeaweedFS backend for full integration

🚀 E2E Tests (./e2e/)

  • End-to-end workflows
  • Automatically detects SMQ availability
  • Falls back to mock mode if SMQ unavailable

Running Tests Locally

Quick Protocol Testing (Mock Mode)

# Run all integration tests with mock backend
cd test/kafka
go test ./integration/...

# Run specific test
go test -v ./integration/ -run TestClientCompatibility

Full Integration Testing (SMQ Mode)

Requires running SeaweedFS instance:

  1. Start SeaweedFS with MQ support:
# Terminal 1: Start SeaweedFS server
weed server -ip="127.0.0.1" -ip.bind="0.0.0.0" -dir=/tmp/seaweedfs-data -master.port=9333 -volume.port=8081 -filer.port=8888 -filer=true

# Terminal 2: Start MQ broker  
weed mq.broker -master="127.0.0.1:9333" -ip="127.0.0.1" -port=17777
  1. Run tests with SMQ backend:
cd test/kafka
SEAWEEDFS_MASTERS=127.0.0.1:9333 go test ./integration/...

# Run specific SMQ integration tests
SEAWEEDFS_MASTERS=127.0.0.1:9333 go test -v ./integration/ -run TestSMQIntegration

Test Broker Startup

If you're having broker startup issues:

# Debug broker startup locally
./scripts/test-broker-startup.sh

CI/CD Integration

GitHub Actions Jobs

  1. Unit Tests - Fast protocol tests with mock backend
  2. Integration Tests - Mock mode by default
  3. E2E Tests (with SMQ) - Full SeaweedFS + MQ broker stack
  4. Client Compatibility (with SMQ) - Tests different Kafka clients against real backend
  5. Consumer Group Tests (with SMQ) - Tests consumer group persistence
  6. SMQ Integration Tests - Dedicated SMQ-specific functionality tests

What Gets Tested with SMQ

When SEAWEEDFS_MASTERS is available, tests exercise:

Real Message Persistence - Messages stored in SeaweedFS volumes
Offset Persistence - Consumer group offsets stored in SeaweedFS filer
Topic Persistence - Topic metadata persisted in SeaweedFS filer
Consumer Group Coordination - Distributed coordinator assignment
Cross-Client Compatibility - Sarama, kafka-go with real backend
Broker Discovery - Gateway discovers MQ brokers via masters

Test Infrastructure

testutil.NewGatewayTestServerWithSMQ(t, mode)

Smart gateway creation that automatically:

  • Detects SMQ availability via SEAWEEDFS_MASTERS
  • Uses production handler when available
  • Falls back to mock when unavailable
  • Provides timeout protection against hanging

Modes:

  • SMQRequired - Skip test if SMQ unavailable
  • SMQAvailable - Use SMQ if available, otherwise mock
  • SMQUnavailable - Always use mock

Timeout Protection

Gateway creation includes timeout protection to prevent CI hanging:

  • 20 second timeout for SMQRequired mode
  • 15 second timeout for SMQAvailable mode
  • Clear error messages when broker discovery fails

Debugging Failed Tests

CI Logs to Check

  1. "SeaweedFS master is up" - Master started successfully
  2. "SeaweedFS filer is up" - Filer ready
  3. "SeaweedFS MQ broker is up" - Broker started successfully
  4. Broker/Server logs - Shown on broker startup failure

Local Debugging

  1. Run ./scripts/test-broker-startup.sh to test broker startup
  2. Check logs at /tmp/weed-*.log
  3. Test individual components:
    # Test master
    curl http://127.0.0.1:9333/cluster/status
    
    # Test filer  
    curl http://127.0.0.1:8888/status
    
    # Test broker
    nc -z 127.0.0.1 17777
    

Common Issues

  • Broker fails to start: Check filer is ready before starting broker
  • Gateway timeout: Broker discovery fails, check broker is accessible
  • Test hangs: Timeout protection not working, reduce timeout values

Architecture

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Kafka Client  │───▶│  Kafka Gateway  │───▶│ SeaweedMQ Broker│
│   (Sarama,      │    │   (Protocol     │    │   (Message      │
│    kafka-go)    │    │    Handler)     │    │   Persistence)  │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                │                      │
                                ▼                      ▼
                       ┌─────────────────┐    ┌─────────────────┐
                       │ SeaweedFS Filer │    │ SeaweedFS Master│
                       │ (Offset Storage)│    │ (Coordination)  │
                       └─────────────────┘    └─────────────────┘
                                │                      │
                                ▼                      ▼  
                       ┌─────────────────────────────────────────┐
                       │        SeaweedFS Volumes                │
                       │      (Message Storage)                  │
                       └─────────────────────────────────────────┘

This architecture ensures full integration testing of the entire Kafka → SeaweedFS message path.