package integration

import (
	"fmt"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

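// The tests below rely on helpers defined elsewhere in this package, such as
// NewIntegrationTestSuite, CreateTestSchema, CreateComplexTestSchema,
// NewMessageCollector, and TestMessage. As a rough sketch inferred from the
// calls in this file (not the actual helper definitions), the collector is
// assumed to behave roughly like:
//
//	NewMessageCollector(expected int) *MessageCollector                 // expects `expected` messages
//	(*MessageCollector).AddMessage(m TestMessage)                       // records one received message
//	(*MessageCollector).GetMessages() []TestMessage                     // snapshot of messages received so far
//	(*MessageCollector).WaitForMessages(d time.Duration) []TestMessage  // blocks until `expected` messages or timeout

// TestBasicPublishSubscribe publishes a fixed number of records to a fresh
// single-partition topic and verifies that a single subscriber receives all
// of them with non-empty keys and payloads.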
func TestBasicPublishSubscribe(t *testing.T) {
	suite := NewIntegrationTestSuite(t)
	require.NoError(t, suite.Setup())

	namespace := "test"
	topicName := fmt.Sprintf("basic-pubsub-%d", time.Now().UnixNano()) // Unique topic name per run
	testSchema := CreateTestSchema()
	messageCount := 10

	// Create publisher
	pubConfig := &PublisherTestConfig{
		Namespace:      namespace,
		TopicName:      topicName,
		PartitionCount: 1,
		PublisherName:  "basic-publisher",
		RecordType:     testSchema,
	}

	publisher, err := suite.CreatePublisher(pubConfig)
	require.NoError(t, err)

	// Create subscriber
	subConfig := &SubscriberTestConfig{
		Namespace:          namespace,
		TopicName:          topicName,
		ConsumerGroup:      "test-group",
		ConsumerInstanceId: "consumer-1",
		MaxPartitionCount:  1,
		SlidingWindowSize:  10,
		OffsetType:         schema_pb.OffsetType_RESET_TO_EARLIEST,
	}

	subscriber, err := suite.CreateSubscriber(subConfig)
	require.NoError(t, err, "Failed to create subscriber")

	// Set up message collector
	collector := NewMessageCollector(messageCount)
	subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
		t.Logf("[Subscriber] Received message with key: %s, ts: %d", string(m.Data.Key), m.Data.TsNs)
		collector.AddMessage(TestMessage{
			ID:        fmt.Sprintf("msg-%d", len(collector.GetMessages())),
			Content:   m.Data.Value,
			Timestamp: time.Unix(0, m.Data.TsNs),
			Key:       m.Data.Key,
		})
	})

	// Start subscriber
	go func() {
		err := subscriber.Subscribe()
		if err != nil {
			t.Logf("Subscriber error: %v", err)
		}
	}()

	// Wait for subscriber to be ready
	t.Logf("[Test] Waiting for subscriber to be ready...")
	time.Sleep(2 * time.Second)

	// Publish test messages
	for i := 0; i < messageCount; i++ {
		record := schema.RecordBegin().
			SetString("id", fmt.Sprintf("msg-%d", i)).
			SetInt64("timestamp", time.Now().UnixNano()).
			SetString("content", fmt.Sprintf("Test message %d", i)).
			SetInt32("sequence", int32(i)).
			RecordEnd()

		key := []byte(fmt.Sprintf("key-%d", i))
		t.Logf("[Publisher] Publishing message %d with key: %s", i, string(key))
		err := publisher.PublishRecord(key, record)
		require.NoError(t, err, "Failed to publish message %d", i)
	}

	t.Logf("[Test] Waiting for messages to be received...")
	messages := collector.WaitForMessages(30 * time.Second)
	t.Logf("[Test] WaitForMessages returned. Received %d messages.", len(messages))

	// Verify all messages were received
	assert.Len(t, messages, messageCount, "Expected %d messages, got %d", messageCount, len(messages))

	// Verify message content
	for i, msg := range messages {
		assert.NotEmpty(t, msg.Content, "Message %d should have content", i)
		assert.NotEmpty(t, msg.Key, "Message %d should have key", i)
	}

	t.Logf("[Test] TestBasicPublishSubscribe completed.")
}

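// TestMultipleConsumers publishes to a three-partition topic and runs three
// subscribers in the same consumer group, then verifies that the group as a
// whole receives every published message, regardless of how the messages are
// distributed between the individual consumers.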
func TestMultipleConsumers(t *testing.T) {
	suite := NewIntegrationTestSuite(t)
	require.NoError(t, suite.Setup())

	namespace := "test"
	topicName := "multi-consumer"
	testSchema := CreateTestSchema()
	messageCount := 20
	consumerCount := 3

	// Create publisher
	pubConfig := &PublisherTestConfig{
		Namespace:      namespace,
		TopicName:      topicName,
		PartitionCount: 3, // Multiple partitions for load distribution
		PublisherName:  "multi-publisher",
		RecordType:     testSchema,
	}

	publisher, err := suite.CreatePublisher(pubConfig)
	require.NoError(t, err)

	// Create multiple consumers
	collectors := make([]*MessageCollector, consumerCount)
	for i := 0; i < consumerCount; i++ {
		collectors[i] = NewMessageCollector(messageCount / consumerCount) // Expect roughly equal distribution

		subConfig := &SubscriberTestConfig{
			Namespace:          namespace,
			TopicName:          topicName,
			ConsumerGroup:      "multi-consumer-group", // Same group for load balancing
			ConsumerInstanceId: fmt.Sprintf("consumer-%d", i),
			MaxPartitionCount:  1,
			SlidingWindowSize:  10,
			OffsetType:         schema_pb.OffsetType_RESET_TO_EARLIEST,
		}

		subscriber, err := suite.CreateSubscriber(subConfig)
		require.NoError(t, err)

		// Set up message collection for this consumer
		collectorIndex := i
		subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
			collectors[collectorIndex].AddMessage(TestMessage{
				ID:        fmt.Sprintf("consumer-%d-msg-%d", collectorIndex, len(collectors[collectorIndex].GetMessages())),
				Content:   m.Data.Value,
				Timestamp: time.Unix(0, m.Data.TsNs),
				Key:       m.Data.Key,
			})
		})

		// Start subscriber
		go func() {
			subscriber.Subscribe()
		}()
	}

	// Wait for subscribers to be ready
	time.Sleep(3 * time.Second)

	// Publish messages with different keys to distribute across partitions
	for i := 0; i < messageCount; i++ {
		record := schema.RecordBegin().
			SetString("id", fmt.Sprintf("multi-msg-%d", i)).
			SetInt64("timestamp", time.Now().UnixNano()).
			SetString("content", fmt.Sprintf("Multi consumer test message %d", i)).
			SetInt32("sequence", int32(i)).
			RecordEnd()

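		// The three distinct keys are assumed to hash onto the three partitions;
		// the exact spread depends on the broker's key hashing, so the test only
		// asserts the total message count below, not per-consumer counts.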
		key := []byte(fmt.Sprintf("partition-key-%d", i%3)) // Distribute across 3 partitions
		err := publisher.PublishRecord(key, record)
		require.NoError(t, err)
	}

	// Wait for all messages to be consumed
	time.Sleep(10 * time.Second)

	// Verify message distribution
	totalReceived := 0
	for i, collector := range collectors {
		messages := collector.GetMessages()
		t.Logf("Consumer %d received %d messages", i, len(messages))
		totalReceived += len(messages)
	}

	// All messages should be consumed across all consumers
	assert.Equal(t, messageCount, totalReceived, "Total messages received should equal messages sent")
}

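// TestMessageOrdering publishes to a single-partition topic with a fixed key
// and increasing timestamps, then checks via suite.AssertMessageOrdering that
// the messages are received in publish order.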
func TestMessageOrdering(t *testing.T) {
	suite := NewIntegrationTestSuite(t)
	require.NoError(t, suite.Setup())

	namespace := "test"
	topicName := "ordering-test"
	testSchema := CreateTestSchema()
	messageCount := 15

	// Create publisher
	pubConfig := &PublisherTestConfig{
		Namespace:      namespace,
		TopicName:      topicName,
		PartitionCount: 1, // Single partition to guarantee ordering
		PublisherName:  "ordering-publisher",
		RecordType:     testSchema,
	}

	publisher, err := suite.CreatePublisher(pubConfig)
	require.NoError(t, err)

	// Create subscriber
	subConfig := &SubscriberTestConfig{
		Namespace:          namespace,
		TopicName:          topicName,
		ConsumerGroup:      "ordering-group",
		ConsumerInstanceId: "ordering-consumer",
		MaxPartitionCount:  1,
		SlidingWindowSize:  5,
		OffsetType:         schema_pb.OffsetType_RESET_TO_EARLIEST,
	}

	subscriber, err := suite.CreateSubscriber(subConfig)
	require.NoError(t, err)

	// Set up message collector
	collector := NewMessageCollector(messageCount)
	subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
		collector.AddMessage(TestMessage{
			ID:        "ordered-msg",
			Content:   m.Data.Value,
			Timestamp: time.Unix(0, m.Data.TsNs),
			Key:       m.Data.Key,
		})
	})

	// Start subscriber
	go func() {
		subscriber.Subscribe()
	}()

	// Wait for consumer to be ready
	time.Sleep(2 * time.Second)

	// Publish messages with the same key so they all land on the same partition
	publishTimes := make([]time.Time, messageCount)
	for i := 0; i < messageCount; i++ {
		publishTimes[i] = time.Now()

		record := schema.RecordBegin().
			SetString("id", fmt.Sprintf("ordered-%d", i)).
			SetInt64("timestamp", publishTimes[i].UnixNano()).
			SetString("content", fmt.Sprintf("Ordered message %d", i)).
			SetInt32("sequence", int32(i)).
			RecordEnd()

		key := []byte("same-partition-key") // Same key ensures same partition
		err := publisher.PublishRecord(key, record)
		require.NoError(t, err)

		// Small delay to ensure different timestamps
		time.Sleep(10 * time.Millisecond)
	}

	// Wait for all messages
	messages := collector.WaitForMessages(30 * time.Second)
	require.Len(t, messages, messageCount)

	// Verify ordering within the partition
	suite.AssertMessageOrdering(t, messages)
}

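// TestSchemaValidation publishes a record that matches the simple test schema
// and a nested record that matches the complex test schema, expecting both
// publishes to succeed.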
func TestSchemaValidation(t *testing.T) {
	suite := NewIntegrationTestSuite(t)
	require.NoError(t, suite.Setup())

	namespace := "test"
	topicName := "schema-validation"

	// Test with simple schema
	simpleSchema := CreateTestSchema()

	pubConfig := &PublisherTestConfig{
		Namespace:      namespace,
		TopicName:      topicName,
		PartitionCount: 1,
		PublisherName:  "schema-publisher",
		RecordType:     simpleSchema,
	}

	publisher, err := suite.CreatePublisher(pubConfig)
	require.NoError(t, err)

	// Test valid record
	validRecord := schema.RecordBegin().
		SetString("id", "valid-msg").
		SetInt64("timestamp", time.Now().UnixNano()).
		SetString("content", "Valid message").
		SetInt32("sequence", 1).
		RecordEnd()

	err = publisher.PublishRecord([]byte("test-key"), validRecord)
	assert.NoError(t, err, "Valid record should be published successfully")

	// Test with complex nested schema
	complexSchema := CreateComplexTestSchema()

	complexPubConfig := &PublisherTestConfig{
		Namespace:      namespace,
		TopicName:      topicName + "-complex",
		PartitionCount: 1,
		PublisherName:  "complex-publisher",
		RecordType:     complexSchema,
	}

	complexPublisher, err := suite.CreatePublisher(complexPubConfig)
	require.NoError(t, err)

	// Test complex nested record
	complexRecord := schema.RecordBegin().
		SetString("user_id", "user123").
		SetString("name", "John Doe").
		SetInt32("age", 30).
		SetStringList("emails", "john@example.com", "john.doe@company.com").
		SetRecord("address",
			schema.RecordBegin().
				SetString("street", "123 Main St").
				SetString("city", "New York").
				SetString("zipcode", "10001").
				RecordEnd()).
		SetInt64("created_at", time.Now().UnixNano()).
		RecordEnd()

	err = complexPublisher.PublishRecord([]byte("complex-key"), complexRecord)
	assert.NoError(t, err, "Complex nested record should be published successfully")
}