mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
164 lines
4.6 KiB
Go
164 lines
4.6 KiB
Go
package protocol
|
|
|
|
import (
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
)
|
|
|
|
// testSMQRecord implements the SMQRecord interface for testing
|
|
type testSMQRecord struct {
|
|
offset int64
|
|
timestamp int64
|
|
key []byte
|
|
value []byte
|
|
}
|
|
|
|
func (r *testSMQRecord) GetOffset() int64 { return r.offset }
|
|
func (r *testSMQRecord) GetTimestamp() int64 { return r.timestamp }
|
|
func (r *testSMQRecord) GetKey() []byte { return r.key }
|
|
func (r *testSMQRecord) GetValue() []byte { return r.value }
|
|
|
|
// testSeaweedMQHandlerForUnitTests is a minimal mock implementation for unit testing
|
|
type testSeaweedMQHandlerForUnitTests struct {
|
|
topics map[string]bool
|
|
ledgers map[string]*offset.Ledger
|
|
records map[string][]offset.SMQRecord // Store records for GetStoredRecords
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
func (t *testSeaweedMQHandlerForUnitTests) TopicExists(topic string) bool {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
return t.topics[topic]
|
|
}
|
|
|
|
func (t *testSeaweedMQHandlerForUnitTests) ListTopics() []string {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
topics := make([]string, 0, len(t.topics))
|
|
for topic := range t.topics {
|
|
topics = append(topics, topic)
|
|
}
|
|
return topics
|
|
}
|
|
|
|
func (t *testSeaweedMQHandlerForUnitTests) CreateTopic(topic string, partitions int32) error {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
t.topics[topic] = true
|
|
return nil
|
|
}
|
|
|
|
func (t *testSeaweedMQHandlerForUnitTests) DeleteTopic(topic string) error {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
delete(t.topics, topic)
|
|
return nil
|
|
}
|
|
|
|
func (t *testSeaweedMQHandlerForUnitTests) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
key := topicPartitionKeyForTest(topic, partition)
|
|
if ledger, exists := t.ledgers[key]; exists {
|
|
return ledger
|
|
}
|
|
ledger := offset.NewLedger()
|
|
t.ledgers[key] = ledger
|
|
return ledger
|
|
}
|
|
|
|
func (t *testSeaweedMQHandlerForUnitTests) GetLedger(topic string, partition int32) *offset.Ledger {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
key := topicPartitionKeyForTest(topic, partition)
|
|
return t.ledgers[key]
|
|
}
|
|
|
|
func (t *testSeaweedMQHandlerForUnitTests) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
|
|
ledger := t.GetOrCreateLedger(topicName, partitionID)
|
|
// Assign an offset first
|
|
kafkaOffset := ledger.AssignOffsets(1)
|
|
// Append the record with current timestamp and estimated size
|
|
timestamp := time.Now().UnixNano()
|
|
size := int32(len(key) + len(value))
|
|
if err := ledger.AppendRecord(kafkaOffset, timestamp, size); err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
// Store the record for GetStoredRecords
|
|
t.mu.Lock()
|
|
recordKey := topicPartitionKeyForTest(topicName, partitionID)
|
|
if t.records == nil {
|
|
t.records = make(map[string][]offset.SMQRecord)
|
|
}
|
|
t.records[recordKey] = append(t.records[recordKey], &testSMQRecord{
|
|
offset: kafkaOffset,
|
|
timestamp: timestamp,
|
|
key: key,
|
|
value: value,
|
|
})
|
|
t.mu.Unlock()
|
|
|
|
return kafkaOffset, nil
|
|
}
|
|
|
|
func (t *testSeaweedMQHandlerForUnitTests) GetStoredRecords(topic string, partition int32, fromOffset int64, maxRecords int) ([]offset.SMQRecord, error) {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
|
|
recordKey := topicPartitionKeyForTest(topic, partition)
|
|
allRecords, exists := t.records[recordKey]
|
|
if !exists {
|
|
return []offset.SMQRecord{}, nil
|
|
}
|
|
|
|
// Filter records by offset range
|
|
var result []offset.SMQRecord
|
|
for _, record := range allRecords {
|
|
if record.GetOffset() >= fromOffset {
|
|
result = append(result, record)
|
|
if len(result) >= maxRecords {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (t *testSeaweedMQHandlerForUnitTests) GetFilerClient() filer_pb.SeaweedFilerClient {
|
|
return nil
|
|
}
|
|
|
|
func (t *testSeaweedMQHandlerForUnitTests) GetBrokerAddresses() []string {
|
|
return []string{"localhost:17777"}
|
|
}
|
|
|
|
func (t *testSeaweedMQHandlerForUnitTests) Close() error {
|
|
return nil
|
|
}
|
|
|
|
// topicPartitionKeyForTest creates a unique key for topic-partition combination
|
|
func topicPartitionKeyForTest(topic string, partition int32) string {
|
|
return topic + "-" + strconv.Itoa(int(partition))
|
|
}
|
|
|
|
// NewHandlerForUnitTests creates a handler for unit testing without requiring SeaweedMQ masters
|
|
// This should ONLY be used for unit tests that don't need real SeaweedMQ functionality
|
|
func NewHandlerForUnitTests() *Handler {
|
|
return &Handler{
|
|
seaweedMQHandler: &testSeaweedMQHandlerForUnitTests{
|
|
topics: make(map[string]bool),
|
|
ledgers: make(map[string]*offset.Ledger),
|
|
records: make(map[string][]offset.SMQRecord),
|
|
},
|
|
groupCoordinator: consumer.NewGroupCoordinator(),
|
|
topicMetadataCache: make(map[string]*CachedTopicMetadata),
|
|
}
|
|
}
|