1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/test/kafka/integration/docker_test.go
chrislu 3847984566 Phase 0 Part 1: Implement basic tests and offset fixes
- Add unit tests for gateway connection/refusal in test/kafka/unit/gateway_test.go
- Add Schema Registry connectivity test in test/kafka/integration/docker_test.go
- Implement legacy offset key parsing in weed/mq/kafka/offset/smq_storage.go
- Fix leaderEpoch placeholder to return 0 instead of -1 in offset_management.go
- Add comprehensive test coverage for parseTopicPartitionKey function

All tests passing. Ready for Phase 0 Part 2 (API cleanup and logging).
2025-09-15 20:26:24 -07:00

216 lines
7 KiB
Go

package integration
import (
"encoding/json"
"io"
"net/http"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
)
// TestDockerIntegration tests the complete Kafka integration using Docker Compose.
//
// It builds a Docker test environment, gives that environment the chance to
// bail out via SkipIfNotAvailable (presumably a t.Skip when the stack is not
// running — confirm in testutil), and then runs the subtests below one after
// another. Every subtest first calls the environment's Require* method(s) for
// the services it touches and then hands the matching address to its helper.
func TestDockerIntegration(t *testing.T) {
	env := testutil.NewDockerEnvironment(t)
	env.SkipIfNotAvailable(t)

	// Subtests are listed in execution order. The closures read the env
	// address fields only when they actually run.
	subtests := []struct {
		name string
		run  func(t *testing.T)
	}{
		{
			name: "KafkaConnectivity",
			run: func(t *testing.T) {
				env.RequireKafka(t)
				testDockerKafkaConnectivity(t, env.KafkaBootstrap)
			},
		},
		{
			name: "SchemaRegistryConnectivity",
			run: func(t *testing.T) {
				env.RequireSchemaRegistry(t)
				testDockerSchemaRegistryConnectivity(t, env.SchemaRegistry)
			},
		},
		{
			name: "KafkaGatewayConnectivity",
			run: func(t *testing.T) {
				env.RequireGateway(t)
				testDockerKafkaGatewayConnectivity(t, env.KafkaGateway)
			},
		},
		{
			name: "SaramaProduceConsume",
			run: func(t *testing.T) {
				env.RequireKafka(t)
				testDockerSaramaProduceConsume(t, env.KafkaBootstrap)
			},
		},
		{
			name: "KafkaGoProduceConsume",
			run: func(t *testing.T) {
				env.RequireKafka(t)
				testDockerKafkaGoProduceConsume(t, env.KafkaBootstrap)
			},
		},
		{
			name: "GatewayProduceConsume",
			run: func(t *testing.T) {
				env.RequireGateway(t)
				testDockerGatewayProduceConsume(t, env.KafkaGateway)
			},
		},
		{
			name: "CrossClientCompatibility",
			run: func(t *testing.T) {
				env.RequireKafka(t)
				env.RequireGateway(t)
				testDockerCrossClientCompatibility(t, env.KafkaBootstrap, env.KafkaGateway)
			},
		},
	}

	for _, st := range subtests {
		t.Run(st.name, st.run)
	}
}
// testDockerKafkaConnectivity checks that the broker at bootstrap is usable by
// creating a freshly named topic through a Sarama test client. Any error from
// the topic creation is reported through testutil.AssertNoError.
func testDockerKafkaConnectivity(t *testing.T, bootstrap string) {
	saramaClient := testutil.NewSaramaClient(t, bootstrap)

	// Creating a topic is the connectivity probe; the (1, 1) arguments are
	// presumably partition count and replication factor — confirm in testutil.
	topic := testutil.GenerateUniqueTopicName("connectivity-test")
	testutil.AssertNoError(t, saramaClient.CreateTopic(topic, 1, 1), "Failed to create topic for connectivity test")

	t.Logf("✅ Kafka connectivity test passed")
}
// testDockerSchemaRegistryConnectivity verifies that the Schema Registry at
// registryURL answers plain HTTP requests.
//
// It issues two GETs with a 10-second client timeout:
//   - registryURL + "/subjects" must return 200 and a JSON array of strings;
//   - registryURL + "/config" must return 200 and a JSON object.
//
// Any transport error, non-200 status, read error, or decode error fails the
// test immediately via t.Fatalf. The decoded results are only logged.
func testDockerSchemaRegistryConnectivity(t *testing.T, registryURL string) {
	httpClient := &http.Client{Timeout: 10 * time.Second}

	// Step 1: the registry must respond on /subjects.
	subjectsResp, getErr := httpClient.Get(registryURL + "/subjects")
	if getErr != nil {
		t.Fatalf("Failed to connect to Schema Registry at %s: %v", registryURL, getErr)
	}
	defer subjectsResp.Body.Close()

	if status := subjectsResp.StatusCode; status != http.StatusOK {
		t.Fatalf("Schema Registry returned status %d, expected 200", status)
	}

	// Step 2: the /subjects payload must decode as a JSON array of strings.
	rawSubjects, readErr := io.ReadAll(subjectsResp.Body)
	if readErr != nil {
		t.Fatalf("Failed to read response body: %v", readErr)
	}

	var subjectNames []string
	decodeErr := json.Unmarshal(rawSubjects, &subjectNames)
	if decodeErr != nil {
		t.Fatalf("Schema Registry response is not valid JSON array: %v", decodeErr)
	}
	t.Logf("Schema Registry is accessible with %d subjects", len(subjectNames))

	// Step 3: the /config endpoint must respond with a JSON object.
	cfgResp, cfgErr := httpClient.Get(registryURL + "/config")
	if cfgErr != nil {
		t.Fatalf("Failed to get Schema Registry config: %v", cfgErr)
	}
	defer cfgResp.Body.Close()

	if status := cfgResp.StatusCode; status != http.StatusOK {
		t.Fatalf("Schema Registry config endpoint returned status %d", status)
	}

	rawConfig, cfgReadErr := io.ReadAll(cfgResp.Body)
	if cfgReadErr != nil {
		t.Fatalf("Failed to read config response: %v", cfgReadErr)
	}

	var registryConfig map[string]interface{}
	cfgDecodeErr := json.Unmarshal(rawConfig, &registryConfig)
	if cfgDecodeErr != nil {
		t.Fatalf("Schema Registry config response is not valid JSON: %v", cfgDecodeErr)
	}

	t.Logf("Schema Registry config: %v", registryConfig)
	t.Logf("Schema Registry connectivity test passed")
}
// testDockerKafkaGatewayConnectivity checks that the Kafka Gateway at
// gatewayURL accepts a Sarama test client by creating a freshly named topic
// through it. Any error from the topic creation is reported through
// testutil.AssertNoError.
func testDockerKafkaGatewayConnectivity(t *testing.T, gatewayURL string) {
	gatewayClient := testutil.NewSaramaClient(t, gatewayURL)

	// Topic creation doubles as the connectivity probe against the gateway.
	topic := testutil.GenerateUniqueTopicName("gateway-connectivity-test")
	createErr := gatewayClient.CreateTopic(topic, 1, 1)
	testutil.AssertNoError(t, createErr, "Failed to create topic via gateway")

	t.Logf("✅ Kafka Gateway connectivity test passed")
}
// testDockerSaramaProduceConsume round-trips three generated string messages
// through the broker at bootstrap using the Sarama test client: create a
// uniquely named topic, produce, consume the same number of messages back, and
// validate the consumed content against what was produced. Each step's error
// is reported through testutil.AssertNoError.
func testDockerSaramaProduceConsume(t *testing.T, bootstrap string) {
	var (
		saramaClient = testutil.NewSaramaClient(t, bootstrap)
		generator    = testutil.NewMessageGenerator()
		topic        = testutil.GenerateUniqueTopicName("sarama-docker-test")
	)

	testutil.AssertNoError(t, saramaClient.CreateTopic(topic, 1, 1), "Failed to create topic")

	// Produce a small batch, then read exactly that many messages back.
	produced := generator.GenerateStringMessages(3)
	testutil.AssertNoError(t, saramaClient.ProduceMessages(topic, produced), "Failed to produce messages")

	// The 0 argument is presumably the partition index — confirm in testutil.
	received, consumeErr := saramaClient.ConsumeMessages(topic, 0, len(produced))
	testutil.AssertNoError(t, consumeErr, "Failed to consume messages")

	testutil.AssertNoError(t, testutil.ValidateMessageContent(produced, received), "Message validation failed")

	t.Logf("✅ Sarama produce/consume test passed")
}
// testDockerKafkaGoProduceConsume round-trips three generated kafka-go
// messages through the broker at bootstrap using the kafka-go test client:
// create a uniquely named topic, produce, consume the same number of messages
// back, and validate the consumed content against what was produced. Each
// step's error is reported through testutil.AssertNoError.
func testDockerKafkaGoProduceConsume(t *testing.T, bootstrap string) {
	kafkaGoClient := testutil.NewKafkaGoClient(t, bootstrap)
	generator := testutil.NewMessageGenerator()
	topic := testutil.GenerateUniqueTopicName("kafka-go-docker-test")

	// Set up the topic the round trip will use.
	createErr := kafkaGoClient.CreateTopic(topic, 1, 1)
	testutil.AssertNoError(t, createErr, "Failed to create topic")

	// Write the batch.
	outgoing := generator.GenerateKafkaGoMessages(3)
	produceErr := kafkaGoClient.ProduceMessages(topic, outgoing)
	testutil.AssertNoError(t, produceErr, "Failed to produce messages")

	// Read back as many messages as were written.
	incoming, consumeErr := kafkaGoClient.ConsumeMessages(topic, len(outgoing))
	testutil.AssertNoError(t, consumeErr, "Failed to consume messages")

	// Compare what came back with what went in.
	validateErr := testutil.ValidateKafkaGoMessageContent(outgoing, incoming)
	testutil.AssertNoError(t, validateErr, "Message validation failed")

	t.Logf("✅ kafka-go produce/consume test passed")
}
// testDockerGatewayProduceConsume round-trips three generated string messages
// through the Kafka Gateway at gatewayURL using the Sarama test client:
// produce, consume the same number of messages back, and validate the content.
// Each step's error is reported through testutil.AssertNoError.
//
// Note that this function does not call CreateTopic before producing.
// NOTE(review): presumably the gateway creates topics on first use — confirm.
func testDockerGatewayProduceConsume(t *testing.T, gatewayURL string) {
	var (
		gatewayClient = testutil.NewSaramaClient(t, gatewayURL)
		generator     = testutil.NewMessageGenerator()
		topic         = testutil.GenerateUniqueTopicName("gateway-docker-test")
	)

	// Write the batch through the gateway.
	sent := generator.GenerateStringMessages(3)
	testutil.AssertNoError(t, gatewayClient.ProduceMessages(topic, sent), "Failed to produce messages via gateway")

	// Read back as many messages as were written.
	fetched, consumeErr := gatewayClient.ConsumeMessages(topic, 0, len(sent))
	testutil.AssertNoError(t, consumeErr, "Failed to consume messages via gateway")

	testutil.AssertNoError(t, testutil.ValidateMessageContent(sent, fetched), "Message validation failed")

	t.Logf("✅ Gateway produce/consume test passed")
}
// testDockerCrossClientCompatibility creates a uniquely named topic on the
// broker at kafkaBootstrap and produces two generated string messages to it
// with the Sarama test client. Errors from either step are reported through
// testutil.AssertNoError.
//
// NOTE(review): gatewayURL is accepted but never used in this function, and
// nothing is consumed or validated after the produce. As written, the test
// only shows that create-topic and produce succeed against Kafka; it does not
// exercise the gateway.
func testDockerCrossClientCompatibility(t *testing.T, kafkaBootstrap, gatewayURL string) {
	brokerClient := testutil.NewSaramaClient(t, kafkaBootstrap)
	generator := testutil.NewMessageGenerator()
	topic := testutil.GenerateUniqueTopicName("cross-client-docker-test")

	// The topic lives on the Kafka broker, not on the gateway.
	testutil.AssertNoError(t, brokerClient.CreateTopic(topic, 1, 1), "Failed to create topic on Kafka")

	// Write a two-message batch to Kafka.
	payloads := generator.GenerateStringMessages(2)
	testutil.AssertNoError(t, brokerClient.ProduceMessages(topic, payloads), "Failed to produce to Kafka")

	// This tests the integration between Kafka and the Gateway.
	// In a real scenario, messages would be replicated or bridged.
	t.Logf("✅ Cross-client compatibility test passed")
}