1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/test/kafka/e2e/comprehensive_test.go
chrislu 6ca1597c52 fmt
2025-09-17 21:13:23 -07:00

131 lines
4.1 KiB
Go

package e2e
import (
"testing"
"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
)
// TestComprehensiveE2E tests complete end-to-end workflows: it starts a
// gateway test server and runs one produce/consume subtest for every pairing
// of the kafka-go and Sarama clients.
// This test will use SMQ backend if SEAWEEDFS_MASTERS is available, otherwise mock
func TestComprehensiveE2E(t *testing.T) {
	gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQAvailable)
	defer gateway.CleanupAndClose()

	addr := gateway.StartAndWait()

	// Log which backend we're using
	if gateway.IsSMQMode() {
		t.Logf("Running comprehensive E2E tests with SMQ backend")
	} else {
		t.Logf("Running comprehensive E2E tests with mock backend")
	}

	// Every scenario gets its own topic. The cross-client scenarios only
	// assert on message count, so sharing a topic between them would let a
	// later scenario be satisfied by records left behind by an earlier one.
	scenarios := []struct {
		name        string
		topicPrefix string
		run         func(t *testing.T, addr, topic string)
	}{
		{name: "KafkaGo_to_KafkaGo", topicPrefix: "e2e-kafka-go", run: testKafkaGoToKafkaGo},
		{name: "Sarama_to_Sarama", topicPrefix: "e2e-sarama", run: testSaramaToSarama},
		{name: "KafkaGo_to_Sarama", topicPrefix: "e2e-mixed", run: testKafkaGoToSarama},
		{name: "Sarama_to_KafkaGo", topicPrefix: "e2e-mixed-reverse", run: testSaramaToKafkaGo},
	}

	// Create and register all topics up front, before any subtest runs.
	topics := make([]string, 0, len(scenarios))
	for _, sc := range scenarios {
		topics = append(topics, testutil.GenerateUniqueTopicName(sc.topicPrefix))
	}
	gateway.AddTestTopics(topics...)

	for i, sc := range scenarios {
		// Copy per-iteration values so the closure is correct regardless of
		// the Go version's loop-variable semantics.
		run, topic := sc.run, topics[i]
		t.Run(sc.name, func(t *testing.T) {
			run(t, addr, topic)
		})
	}
}
// testKafkaGoToKafkaGo produces generated messages to topic with the kafka-go
// client connected to addr, consumes the same number back with that client,
// and checks that the consumed content matches what was produced.
func testKafkaGoToKafkaGo(t *testing.T, addr, topic string) {
	kafkaGo := testutil.NewKafkaGoClient(t, addr)
	generator := testutil.NewMessageGenerator()

	// Build the two messages that will be sent.
	produced := generator.GenerateKafkaGoMessages(2)

	// Write side: kafka-go.
	produceErr := kafkaGo.ProduceMessages(topic, produced)
	testutil.AssertNoError(t, produceErr, "kafka-go produce failed")

	// Read side: kafka-go, asking for exactly as many messages as were sent.
	received, consumeErr := kafkaGo.ConsumeMessages(topic, len(produced))
	testutil.AssertNoError(t, consumeErr, "kafka-go consume failed")

	// Compare what came back against what went out.
	validateErr := testutil.ValidateKafkaGoMessageContent(produced, received)
	testutil.AssertNoError(t, validateErr, "Message content validation failed")

	t.Logf("kafka-go to kafka-go test PASSED")
}
// testSaramaToSarama produces generated string messages to topic with the
// Sarama client connected to addr, consumes the same number back with that
// client, and checks that the consumed content matches what was produced.
func testSaramaToSarama(t *testing.T, addr, topic string) {
	sarama := testutil.NewSaramaClient(t, addr)
	generator := testutil.NewMessageGenerator()

	// Build the two messages that will be sent.
	produced := generator.GenerateStringMessages(2)

	// Write side: Sarama.
	produceErr := sarama.ProduceMessages(topic, produced)
	testutil.AssertNoError(t, produceErr, "Sarama produce failed")

	// Read side: Sarama, asking for exactly as many messages as were sent.
	// NOTE(review): the 0 argument is presumably the partition — confirm
	// against testutil's Sarama client.
	received, consumeErr := sarama.ConsumeMessages(topic, 0, len(produced))
	testutil.AssertNoError(t, consumeErr, "Sarama consume failed")

	// Compare what came back against what went out.
	validateErr := testutil.ValidateMessageContent(produced, received)
	testutil.AssertNoError(t, validateErr, "Message content validation failed")

	t.Logf("Sarama to Sarama test PASSED")
}
// testKafkaGoToSarama produces generated messages to topic with the kafka-go
// client and consumes them with the Sarama client, both connected to addr.
// Only the number of consumed messages is checked, not their content.
func testKafkaGoToSarama(t *testing.T, addr, topic string) {
	producer := testutil.NewKafkaGoClient(t, addr)
	consumer := testutil.NewSaramaClient(t, addr)
	generator := testutil.NewMessageGenerator()

	// Write side: kafka-go.
	produced := generator.GenerateKafkaGoMessages(2)
	produceErr := producer.ProduceMessages(topic, produced)
	testutil.AssertNoError(t, produceErr, "kafka-go produce failed")

	// Read side: Sarama, asking for exactly as many messages as were sent.
	// NOTE(review): the 0 argument is presumably the partition — confirm
	// against testutil's Sarama client.
	received, consumeErr := consumer.ConsumeMessages(topic, 0, len(produced))
	testutil.AssertNoError(t, consumeErr, "Sarama consume failed")

	// The received count must equal the sent count.
	want, got := len(produced), len(received)
	testutil.AssertEqual(t, want, got, "Message count mismatch")

	t.Logf("kafka-go to Sarama test PASSED")
}
// testSaramaToKafkaGo produces generated string messages to topic with the
// Sarama client and consumes them with the kafka-go client, both connected to
// addr. Only the number of consumed messages is checked, not their content.
func testSaramaToKafkaGo(t *testing.T, addr, topic string) {
	consumer := testutil.NewKafkaGoClient(t, addr)
	producer := testutil.NewSaramaClient(t, addr)
	generator := testutil.NewMessageGenerator()

	// Write side: Sarama.
	produced := generator.GenerateStringMessages(2)
	produceErr := producer.ProduceMessages(topic, produced)
	testutil.AssertNoError(t, produceErr, "Sarama produce failed")

	// Read side: kafka-go, asking for exactly as many messages as were sent.
	received, consumeErr := consumer.ConsumeMessages(topic, len(produced))
	testutil.AssertNoError(t, consumeErr, "kafka-go consume failed")

	// The received count must equal the sent count.
	want, got := len(produced), len(received)
	testutil.AssertEqual(t, want, got, "Message count mismatch")

	t.Logf("Sarama to kafka-go test PASSED")
}