1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-06-29 16:22:46 +02:00
seaweedfs/test/mq/integration/framework.go
2025-06-24 09:35:08 -07:00

379 lines
11 KiB
Go

package integration
import (
"context"
"fmt"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/agent"
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// TestEnvironment holds the configuration for the test environment
type TestEnvironment struct {
Masters []string
Brokers []string
Filers []string
TestTimeout time.Duration
CleanupFuncs []func()
mutex sync.Mutex
}
// IntegrationTestSuite provides the base test framework
type IntegrationTestSuite struct {
env *TestEnvironment
agents map[string]*agent.MessageQueueAgent
publishers map[string]*pub_client.TopicPublisher
subscribers map[string]*sub_client.TopicSubscriber
subCancels map[string]context.CancelFunc
cleanupOnce sync.Once
t *testing.T
}
// NewIntegrationTestSuite creates a new test suite instance
func NewIntegrationTestSuite(t *testing.T) *IntegrationTestSuite {
env := &TestEnvironment{
Masters: getEnvList("SEAWEED_MASTERS", []string{"localhost:19333"}),
Brokers: getEnvList("SEAWEED_BROKERS", []string{"localhost:17777"}),
Filers: getEnvList("SEAWEED_FILERS", []string{"localhost:18888"}),
TestTimeout: getEnvDuration("GO_TEST_TIMEOUT", 30*time.Minute),
}
return &IntegrationTestSuite{
env: env,
agents: make(map[string]*agent.MessageQueueAgent),
publishers: make(map[string]*pub_client.TopicPublisher),
subscribers: make(map[string]*sub_client.TopicSubscriber),
subCancels: make(map[string]context.CancelFunc),
t: t,
}
}
// Setup initializes the test environment
func (its *IntegrationTestSuite) Setup() error {
// Wait for cluster to be ready
if err := its.waitForClusterReady(); err != nil {
return fmt.Errorf("cluster not ready: %v", err)
}
// Register cleanup
its.t.Cleanup(its.Cleanup)
return nil
}
// Cleanup performs cleanup operations
func (its *IntegrationTestSuite) Cleanup() {
its.cleanupOnce.Do(func() {
// Close all subscribers first (they use context cancellation)
for name := range its.subscribers {
if cancel, ok := its.subCancels[name]; ok && cancel != nil {
cancel()
its.t.Logf("Cancelled subscriber context: %s", name)
}
its.t.Logf("Cleaned up subscriber: %s", name)
}
// Wait a moment for gRPC connections to close gracefully
time.Sleep(1 * time.Second)
// Close all publishers
for name, publisher := range its.publishers {
if publisher != nil {
// Add timeout to prevent deadlock during shutdown
done := make(chan bool, 1)
go func(p *pub_client.TopicPublisher, n string) {
p.Shutdown()
done <- true
}(publisher, name)
select {
case <-done:
its.t.Logf("Cleaned up publisher: %s", name)
case <-time.After(5 * time.Second):
its.t.Logf("Publisher shutdown timed out: %s", name)
}
}
}
// Execute additional cleanup functions
its.env.mutex.Lock()
for _, cleanup := range its.env.CleanupFuncs {
cleanup()
}
its.env.mutex.Unlock()
})
}
// CreatePublisher creates a new topic publisher
func (its *IntegrationTestSuite) CreatePublisher(config *PublisherTestConfig) (*pub_client.TopicPublisher, error) {
publisherConfig := &pub_client.PublisherConfiguration{
Topic: topic.NewTopic(config.Namespace, config.TopicName),
PartitionCount: config.PartitionCount,
Brokers: its.env.Brokers,
PublisherName: config.PublisherName,
RecordType: config.RecordType,
}
publisher, err := pub_client.NewTopicPublisher(publisherConfig)
if err != nil {
return nil, fmt.Errorf("failed to create publisher: %v", err)
}
its.publishers[config.PublisherName] = publisher
return publisher, nil
}
// CreateSubscriber creates a new topic subscriber
func (its *IntegrationTestSuite) CreateSubscriber(config *SubscriberTestConfig) (*sub_client.TopicSubscriber, error) {
subscriberConfig := &sub_client.SubscriberConfiguration{
ConsumerGroup: config.ConsumerGroup,
ConsumerGroupInstanceId: config.ConsumerInstanceId,
GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
MaxPartitionCount: config.MaxPartitionCount,
SlidingWindowSize: config.SlidingWindowSize,
}
contentConfig := &sub_client.ContentConfiguration{
Topic: topic.NewTopic(config.Namespace, config.TopicName),
Filter: config.Filter,
PartitionOffsets: config.PartitionOffsets,
OffsetType: config.OffsetType,
OffsetTsNs: config.OffsetTsNs,
}
offsetChan := make(chan sub_client.KeyedOffset, 1024)
ctx, cancel := context.WithCancel(context.Background())
subscriber := sub_client.NewTopicSubscriber(
ctx,
its.env.Brokers,
subscriberConfig,
contentConfig,
offsetChan,
)
its.subscribers[config.ConsumerInstanceId] = subscriber
its.subCancels[config.ConsumerInstanceId] = cancel
return subscriber, nil
}
// CreateAgent creates a new message queue agent
func (its *IntegrationTestSuite) CreateAgent(name string) (*agent.MessageQueueAgent, error) {
var brokerAddresses []pb.ServerAddress
for _, broker := range its.env.Brokers {
brokerAddresses = append(brokerAddresses, pb.ServerAddress(broker))
}
agentOptions := &agent.MessageQueueAgentOptions{
SeedBrokers: brokerAddresses,
}
mqAgent := agent.NewMessageQueueAgent(
agentOptions,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
its.agents[name] = mqAgent
return mqAgent, nil
}
// PublisherTestConfig holds configuration for creating test publishers
type PublisherTestConfig struct {
Namespace string
TopicName string
PartitionCount int32
PublisherName string
RecordType *schema_pb.RecordType
}
// SubscriberTestConfig holds configuration for creating test subscribers
type SubscriberTestConfig struct {
Namespace string
TopicName string
ConsumerGroup string
ConsumerInstanceId string
MaxPartitionCount int32
SlidingWindowSize int32
Filter string
PartitionOffsets []*schema_pb.PartitionOffset
OffsetType schema_pb.OffsetType
OffsetTsNs int64
}
// TestMessage represents a test message with metadata
type TestMessage struct {
ID string
Content []byte
Timestamp time.Time
Key []byte
}
// MessageCollector collects received messages for verification
type MessageCollector struct {
messages []TestMessage
mutex sync.RWMutex
waitCh chan struct{}
expected int
closed bool // protect against closing waitCh multiple times
}
// NewMessageCollector creates a new message collector
func NewMessageCollector(expectedCount int) *MessageCollector {
return &MessageCollector{
messages: make([]TestMessage, 0),
waitCh: make(chan struct{}),
expected: expectedCount,
}
}
// AddMessage adds a received message to the collector
func (mc *MessageCollector) AddMessage(msg TestMessage) {
mc.mutex.Lock()
defer mc.mutex.Unlock()
mc.messages = append(mc.messages, msg)
if len(mc.messages) >= mc.expected && !mc.closed {
close(mc.waitCh)
mc.closed = true
}
}
// WaitForMessages waits for the expected number of messages or timeout
func (mc *MessageCollector) WaitForMessages(timeout time.Duration) []TestMessage {
select {
case <-mc.waitCh:
case <-time.After(timeout):
}
mc.mutex.RLock()
defer mc.mutex.RUnlock()
result := make([]TestMessage, len(mc.messages))
copy(result, mc.messages)
return result
}
// GetMessages returns all collected messages
func (mc *MessageCollector) GetMessages() []TestMessage {
mc.mutex.RLock()
defer mc.mutex.RUnlock()
result := make([]TestMessage, len(mc.messages))
copy(result, mc.messages)
return result
}
// CreateTestSchema creates a simple test schema
func CreateTestSchema() *schema_pb.RecordType {
return schema.RecordTypeBegin().
WithField("id", schema.TypeString).
WithField("timestamp", schema.TypeInt64).
WithField("content", schema.TypeString).
WithField("sequence", schema.TypeInt32).
RecordTypeEnd()
}
// CreateComplexTestSchema creates a complex test schema with nested structures
func CreateComplexTestSchema() *schema_pb.RecordType {
addressType := schema.RecordTypeBegin().
WithField("street", schema.TypeString).
WithField("city", schema.TypeString).
WithField("zipcode", schema.TypeString).
RecordTypeEnd()
return schema.RecordTypeBegin().
WithField("user_id", schema.TypeString).
WithField("name", schema.TypeString).
WithField("age", schema.TypeInt32).
WithField("emails", schema.ListOf(schema.TypeString)).
WithRecordField("address", addressType).
WithField("created_at", schema.TypeInt64).
RecordTypeEnd()
}
// Helper functions
func getEnvList(key string, defaultValue []string) []string {
value := os.Getenv(key)
if value == "" {
return defaultValue
}
return strings.Split(value, ",")
}
func getEnvDuration(key string, defaultValue time.Duration) time.Duration {
value := os.Getenv(key)
if value == "" {
return defaultValue
}
duration, err := time.ParseDuration(value)
if err != nil {
return defaultValue
}
return duration
}
func (its *IntegrationTestSuite) waitForClusterReady() error {
maxRetries := 30
retryInterval := 2 * time.Second
for i := 0; i < maxRetries; i++ {
if its.isClusterReady() {
return nil
}
its.t.Logf("Waiting for cluster to be ready... attempt %d/%d", i+1, maxRetries)
time.Sleep(retryInterval)
}
return fmt.Errorf("cluster not ready after %d attempts", maxRetries)
}
func (its *IntegrationTestSuite) isClusterReady() bool {
// Check if at least one broker is accessible
for _, broker := range its.env.Brokers {
if its.isBrokerReady(broker) {
return true
}
}
return false
}
func (its *IntegrationTestSuite) isBrokerReady(broker string) bool {
// Simple connection test
conn, err := grpc.NewClient(broker, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return false
}
defer conn.Close()
// TODO: Add actual health check call here
return true
}
// AssertMessagesReceived verifies that expected messages were received
func (its *IntegrationTestSuite) AssertMessagesReceived(t *testing.T, collector *MessageCollector, expectedCount int, timeout time.Duration) {
messages := collector.WaitForMessages(timeout)
require.Len(t, messages, expectedCount, "Expected %d messages, got %d", expectedCount, len(messages))
}
// AssertMessageOrdering verifies that messages are received in the expected order
func (its *IntegrationTestSuite) AssertMessageOrdering(t *testing.T, messages []TestMessage) {
for i := 1; i < len(messages); i++ {
require.True(t, messages[i].Timestamp.After(messages[i-1].Timestamp) || messages[i].Timestamp.Equal(messages[i-1].Timestamp),
"Messages not in chronological order: message %d timestamp %v should be >= message %d timestamp %v",
i, messages[i].Timestamp, i-1, messages[i-1].Timestamp)
}
}