1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/protocol/test_handler.go
2025-09-17 00:58:14 -07:00

164 lines
4.6 KiB
Go

package protocol
import (
"strconv"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
// testSMQRecord is the in-memory record type used by the unit-test mock.
// It satisfies offset.SMQRecord through the accessor methods below.
type testSMQRecord struct {
	offset, timestamp int64
	key, value        []byte
}

// GetOffset returns the offset stored in the record.
func (r *testSMQRecord) GetOffset() int64 {
	return r.offset
}

// GetTimestamp returns the timestamp stored in the record.
func (r *testSMQRecord) GetTimestamp() int64 {
	return r.timestamp
}

// GetKey returns the record key; the stored slice is returned as-is, not copied.
func (r *testSMQRecord) GetKey() []byte {
	return r.key
}

// GetValue returns the record value; the stored slice is returned as-is, not copied.
func (r *testSMQRecord) GetValue() []byte {
	return r.value
}
// testSeaweedMQHandlerForUnitTests is a minimal mock implementation for unit testing.
// All state is kept in memory. The ledgers and records maps are keyed by the
// string produced by topicPartitionKeyForTest(topic, partition).
type testSeaweedMQHandlerForUnitTests struct {
// topics holds the created topic names; CreateTopic sets an entry to true and
// DeleteTopic removes it.
topics map[string]bool
// ledgers holds one offset ledger per topic-partition, created on demand by
// GetOrCreateLedger.
ledgers map[string]*offset.Ledger
records map[string][]offset.SMQRecord // appended by ProduceRecord, read by GetStoredRecords
// mu is taken by the methods below around every access to the maps above.
mu sync.RWMutex
}
// TopicExists reports whether topic is currently registered in the mock.
// It returns false both for unknown topics and for entries stored as false.
func (t *testSeaweedMQHandlerForUnitTests) TopicExists(topic string) bool {
	t.mu.RLock()
	registered, found := t.topics[topic]
	t.mu.RUnlock()

	return found && registered
}
// ListTopics returns the names of every key in the topics map.
// The order follows Go map iteration and is therefore unspecified; the
// returned slice is always non-nil, even when no topics exist.
func (t *testSeaweedMQHandlerForUnitTests) ListTopics() []string {
	t.mu.RLock()
	defer t.mu.RUnlock()

	names := make([]string, len(t.topics))
	next := 0
	for name := range t.topics {
		names[next] = name
		next++
	}
	return names
}
// CreateTopic registers topic in the mock and always returns nil.
// The partitions argument is accepted for interface compatibility but is not
// recorded. Creating a topic that already exists is a no-op.
func (t *testSeaweedMQHandlerForUnitTests) CreateTopic(topic string, partitions int32) error {
	t.mu.Lock()
	defer t.mu.Unlock()

	// Writing to a nil map panics; initialise lazily so a zero-value handler
	// works, matching how ProduceRecord treats the records map.
	if t.topics == nil {
		t.topics = make(map[string]bool)
	}
	t.topics[topic] = true
	return nil
}
// DeleteTopic removes topic from the topics map and always returns nil.
// Deleting an unknown topic is a no-op. Only the topics map is touched here;
// the ledgers and records maps are not modified by this method.
func (t *testSeaweedMQHandlerForUnitTests) DeleteTopic(topic string) error {
	t.mu.Lock()
	delete(t.topics, topic)
	t.mu.Unlock()

	return nil
}
// GetOrCreateLedger returns the ledger for the given topic and partition,
// creating and storing a new one with offset.NewLedger on first use.
// The lookup and the insertion happen under the same write lock, so
// concurrent callers for the same topic-partition receive the same ledger.
func (t *testSeaweedMQHandlerForUnitTests) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
	t.mu.Lock()
	defer t.mu.Unlock()

	key := topicPartitionKeyForTest(topic, partition)
	if ledger, exists := t.ledgers[key]; exists {
		return ledger
	}

	// Writing to a nil map panics; initialise lazily so a zero-value handler
	// works, matching how ProduceRecord treats the records map.
	if t.ledgers == nil {
		t.ledgers = make(map[string]*offset.Ledger)
	}
	ledger := offset.NewLedger()
	t.ledgers[key] = ledger
	return ledger
}
// GetLedger returns the ledger previously created for the given topic and
// partition, or nil when none has been created yet. It never creates one.
func (t *testSeaweedMQHandlerForUnitTests) GetLedger(topic string, partition int32) *offset.Ledger {
	lookupKey := topicPartitionKeyForTest(topic, partition)

	t.mu.RLock()
	defer t.mu.RUnlock()

	ledger, found := t.ledgers[lookupKey]
	if !found {
		return nil
	}
	return ledger
}
// ProduceRecord assigns the next offset from the partition's ledger, appends
// an entry to that ledger, and keeps the record in memory so GetStoredRecords
// can return it later.
//
// It returns the assigned offset, or -1 and the ledger's error when the
// ledger append fails (in which case nothing is stored). The key and value
// slices are stored as passed in, without copying.
func (t *testSeaweedMQHandlerForUnitTests) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
	ledger := t.GetOrCreateLedger(topicName, partitionID)

	// Reserve the offset before taking the timestamp.
	assigned := ledger.AssignOffsets(1)

	// Timestamp is wall-clock nanoseconds; size is approximated as key+value bytes.
	now := time.Now().UnixNano()
	approxSize := int32(len(key) + len(value))
	err := ledger.AppendRecord(assigned, now, approxSize)
	if err != nil {
		return -1, err
	}

	stored := &testSMQRecord{
		offset:    assigned,
		timestamp: now,
		key:       key,
		value:     value,
	}
	mapKey := topicPartitionKeyForTest(topicName, partitionID)

	t.mu.Lock()
	defer t.mu.Unlock()

	if t.records == nil {
		t.records = make(map[string][]offset.SMQRecord)
	}
	t.records[mapKey] = append(t.records[mapKey], stored)
	return assigned, nil
}
// GetStoredRecords returns, in production order, up to maxRecords records of
// the given topic-partition whose offset is at least fromOffset.
//
// A non-positive maxRecords yields no records. Previously the limit was only
// checked after the first append, so a limit of zero (or a negative one)
// still returned one record. An empty slice is returned when nothing has been
// produced to the topic-partition. The error result is always nil.
func (t *testSeaweedMQHandlerForUnitTests) GetStoredRecords(topic string, partition int32, fromOffset int64, maxRecords int) ([]offset.SMQRecord, error) {
	// Honour the limit up front so the loop below never over-delivers.
	if maxRecords <= 0 {
		return []offset.SMQRecord{}, nil
	}

	t.mu.RLock()
	defer t.mu.RUnlock()

	recordKey := topicPartitionKeyForTest(topic, partition)
	allRecords, exists := t.records[recordKey]
	if !exists {
		return []offset.SMQRecord{}, nil
	}

	// Keep records at or after fromOffset until the limit is reached.
	var result []offset.SMQRecord
	for _, record := range allRecords {
		if record.GetOffset() < fromOffset {
			continue
		}
		result = append(result, record)
		if len(result) >= maxRecords {
			break
		}
	}
	return result, nil
}
// GetFilerClient always returns nil: the unit-test mock holds no filer client.
// Callers must therefore be prepared for a nil result when using this mock.
func (t *testSeaweedMQHandlerForUnitTests) GetFilerClient() filer_pb.SeaweedFilerClient {
return nil
}
// GetBrokerAddresses returns a fixed single-element list containing the
// hard-coded address "localhost:17777". A fresh slice is returned on each call.
func (t *testSeaweedMQHandlerForUnitTests) GetBrokerAddresses() []string {
	addrs := make([]string, 1)
	addrs[0] = "localhost:17777"
	return addrs
}
// Close is a no-op for the unit-test mock and always returns nil.
func (t *testSeaweedMQHandlerForUnitTests) Close() error {
return nil
}
// topicPartitionKeyForTest builds the map key for a topic-partition pair in
// the form "<topic>-<partition>", with the partition rendered in base 10.
func topicPartitionKeyForTest(topic string, partition int32) string {
	partitionText := strconv.FormatInt(int64(partition), 10)
	return topic + "-" + partitionText
}
// NewHandlerForUnitTests builds a Handler backed by the in-memory
// testSeaweedMQHandlerForUnitTests mock, so no SeaweedMQ masters are needed.
// Besides the mock, only the group coordinator and the topic metadata cache
// are populated; every other Handler field keeps its zero value.
//
// Use this ONLY in unit tests that do not need real SeaweedMQ functionality.
func NewHandlerForUnitTests() *Handler {
	h := new(Handler)

	// All three mock maps start empty but non-nil.
	h.seaweedMQHandler = &testSeaweedMQHandlerForUnitTests{
		topics:  make(map[string]bool),
		ledgers: make(map[string]*offset.Ledger),
		records: make(map[string][]offset.SMQRecord),
	}
	h.groupCoordinator = consumer.NewGroupCoordinator()
	h.topicMetadataCache = make(map[string]*CachedTopicMetadata)

	return h
}