1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/test/kafka/internal/testutil/messages.go
chrislu 0dee802d05 refactor test/kafka
Update docker_test.go

Update gateway.go

Fix kafka-tests.yml workflow for refactored test structure

- Update test commands to use new directory structure (unit/, integration/, e2e/)
- Replace old test names with new refactored test names
- Add dedicated E2E test job
- Update protocol tests to run from correct directory
- Remove obsolete test patterns that no longer exist

Fixes GitHub Actions failure: 'no Go files in test/kafka'
2025-09-15 09:43:40 -07:00

116 lines
3.3 KiB
Go

package testutil
import (
"fmt"
"time"
"github.com/segmentio/kafka-go"
)
// MessageGenerator provides utilities for generating test messages.
//
// It carries a running counter that its generation methods advance and embed
// in the keys and values they synthesize. The struct holds no lock, so
// NOTE(review): sharing one generator across goroutines presumably needs
// external synchronization — confirm against callers.
type MessageGenerator struct {
// counter is the sequence number formatted into generated keys and values.
counter int
}
// NewMessageGenerator returns a fresh MessageGenerator whose sequence counter
// starts at zero.
func NewMessageGenerator() *MessageGenerator {
	var gen MessageGenerator // zero value already has counter == 0
	return &gen
}
// GenerateKafkaGoMessages builds count kafka-go messages for use in tests.
//
// Each message advances the generator's counter by one. Its key has the form
// "test-key-<n>" and its value "test-message-<n>-generated-at-<unix seconds>",
// where <n> is the counter value after the increment.
func (m *MessageGenerator) GenerateKafkaGoMessages(count int) []kafka.Message {
	batch := make([]kafka.Message, count)
	for idx := range batch {
		m.counter++
		seq := m.counter

		keyText := fmt.Sprintf("test-key-%d", seq)
		valueText := fmt.Sprintf("test-message-%d-generated-at-%d", seq, time.Now().Unix())

		batch[idx].Key = []byte(keyText)
		batch[idx].Value = []byte(valueText)
	}
	return batch
}
// GenerateStringMessages builds count string payloads (e.g. for Sarama
// producers).
//
// Each payload advances the generator's counter by one and has the form
// "test-message-<n>-generated-at-<unix seconds>", where <n> is the counter
// value after the increment.
func (m *MessageGenerator) GenerateStringMessages(count int) []string {
	payloads := make([]string, count)
	for idx := range payloads {
		m.counter++
		now := time.Now().Unix()
		payloads[idx] = fmt.Sprintf("test-message-%d-generated-at-%d", m.counter, now)
	}
	return payloads
}
// GenerateKafkaGoMessage builds a single kafka-go message from key and value.
//
// An empty key is replaced with "test-key-<n>" and an empty value with
// "test-message-<n>-generated-at-<unix seconds>", where <n> is the generator's
// counter. The counter is advanced exactly once whenever at least one of the
// two fields has to be generated, so a generated value gets a fresh sequence
// number even when the caller supplies an explicit key. When both key and
// value are supplied, the counter is left untouched.
func (m *MessageGenerator) GenerateKafkaGoMessage(key, value string) kafka.Message {
	// Advance once per call that synthesizes anything; a generated key and
	// value in the same message share the same sequence number.
	if key == "" || value == "" {
		m.counter++
	}
	if key == "" {
		key = fmt.Sprintf("test-key-%d", m.counter)
	}
	if value == "" {
		value = fmt.Sprintf("test-message-%d-generated-at-%d", m.counter, time.Now().Unix())
	}
	return kafka.Message{
		Key:   []byte(key),
		Value: []byte(value),
	}
}
// GenerateUniqueTopicName returns a topic name of the form
// "<prefix>-<unix nanoseconds>" for use in tests. An empty prefix falls back
// to "test-topic".
func GenerateUniqueTopicName(prefix string) string {
	base := prefix
	if base == "" {
		base = "test-topic"
	}
	stamp := time.Now().UnixNano()
	return fmt.Sprintf("%s-%d", base, stamp)
}
// GenerateUniqueGroupID returns a consumer group ID of the form
// "<prefix>-<unix nanoseconds>" for use in tests. An empty prefix falls back
// to "test-group".
func GenerateUniqueGroupID(prefix string) string {
	base := prefix
	if base == "" {
		base = "test-group"
	}
	stamp := time.Now().UnixNano()
	return fmt.Sprintf("%s-%d", base, stamp)
}
// ValidateMessageContent checks that the consumed messages in actual match
// expected, element by element and in order.
//
// It returns nil when both slices have the same length and identical
// contents. Otherwise it returns an error describing either the length
// mismatch or the first index at which the messages differ.
func ValidateMessageContent(expected, actual []string) error {
	if len(expected) != len(actual) {
		return fmt.Errorf("message count mismatch: expected %d, got %d", len(expected), len(actual))
	}
	// The lengths are equal past this point, so every index of expected is
	// also a valid index of actual; no further bounds check is needed.
	for i, want := range expected {
		if got := actual[i]; got != want {
			return fmt.Errorf("message mismatch at index %d: expected %q, got %q", i, want, got)
		}
	}
	return nil
}
// ValidateKafkaGoMessageContent checks that the kafka-go messages in actual
// match expected, element by element and in order.
//
// Only Key and Value are compared, by byte content, so a nil and an empty
// key or value are treated as equal. It returns nil on a full match;
// otherwise it returns an error describing the length mismatch or the first
// index at which a key or value differs (the key is checked before the value).
func ValidateKafkaGoMessageContent(expected, actual []kafka.Message) error {
	if len(expected) != len(actual) {
		return fmt.Errorf("message count mismatch: expected %d, got %d", len(expected), len(actual))
	}
	// The lengths are equal past this point, so every index of expected is
	// also a valid index of actual; no further bounds check is needed.
	for i, want := range expected {
		got := actual[i]
		if string(got.Key) != string(want.Key) {
			return fmt.Errorf("key mismatch at index %d: expected %q, got %q", i, string(want.Key), string(got.Key))
		}
		if string(got.Value) != string(want.Value) {
			return fmt.Errorf("value mismatch at index %d: expected %q, got %q", i, string(want.Value), string(got.Value))
		}
	}
	return nil
}