mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
310 lines
8.8 KiB
Go
310 lines
8.8 KiB
Go
package protocol
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
)
|
|
|
|
// MessageRecord is the in-memory form of a single produced message (TEST ONLY).
// It holds the raw key and value bytes together with the timestamp that was
// recorded when the message was stored.
type MessageRecord struct {
	// Key and Value are the message key and payload bytes.
	Key, Value []byte
	// Timestamp is the time recorded for the message; ProduceRecord on
	// basicSeaweedMQHandler fills it with time.Now().UnixNano().
	Timestamp int64
}
|
|
|
|
// basicSeaweedMQHandler is a minimal in-memory implementation for testing (TEST ONLY)
|
|
type basicSeaweedMQHandler struct {
|
|
topics map[string]bool
|
|
ledgers map[string]*offset.Ledger
|
|
// messages stores actual message content indexed by topic-partition-offset
|
|
messages map[string]map[int32]map[int64]*MessageRecord // topic -> partition -> offset -> message
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// testSeaweedMQHandler is a minimal mock implementation for testing (TEST ONLY)
|
|
type testSeaweedMQHandler struct {
|
|
topics map[string]bool
|
|
ledgers map[string]*offset.Ledger
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// NewTestHandler creates a handler for testing purposes without requiring SeaweedMQ masters
|
|
// This should ONLY be used in tests - uses basicSeaweedMQHandler for message storage simulation
|
|
func NewTestHandler() *Handler {
|
|
return &Handler{
|
|
groupCoordinator: consumer.NewGroupCoordinator(),
|
|
seaweedMQHandler: &basicSeaweedMQHandler{
|
|
topics: make(map[string]bool),
|
|
ledgers: make(map[string]*offset.Ledger),
|
|
messages: make(map[string]map[int32]map[int64]*MessageRecord),
|
|
},
|
|
}
|
|
}
|
|
|
|
// NewSimpleTestHandler creates a minimal test handler without message storage
|
|
// This should ONLY be used for basic protocol tests that don't need message content
|
|
func NewSimpleTestHandler() *Handler {
|
|
return &Handler{
|
|
groupCoordinator: consumer.NewGroupCoordinator(),
|
|
seaweedMQHandler: &testSeaweedMQHandler{
|
|
topics: make(map[string]bool),
|
|
ledgers: make(map[string]*offset.Ledger),
|
|
},
|
|
}
|
|
}
|
|
|
|
// ===== basicSeaweedMQHandler implementation (TEST ONLY) =====
|
|
|
|
func (b *basicSeaweedMQHandler) TopicExists(topic string) bool {
|
|
return b.topics[topic]
|
|
}
|
|
|
|
func (b *basicSeaweedMQHandler) ListTopics() []string {
|
|
topics := make([]string, 0, len(b.topics))
|
|
for topic := range b.topics {
|
|
topics = append(topics, topic)
|
|
}
|
|
return topics
|
|
}
|
|
|
|
func (b *basicSeaweedMQHandler) CreateTopic(topic string, partitions int32) error {
|
|
b.topics[topic] = true
|
|
return nil
|
|
}
|
|
|
|
func (b *basicSeaweedMQHandler) DeleteTopic(topic string) error {
|
|
delete(b.topics, topic)
|
|
return nil
|
|
}
|
|
|
|
func (b *basicSeaweedMQHandler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
key := fmt.Sprintf("%s-%d", topic, partition)
|
|
if ledger, exists := b.ledgers[key]; exists {
|
|
return ledger
|
|
}
|
|
|
|
// Create new ledger
|
|
ledger := offset.NewLedger()
|
|
b.ledgers[key] = ledger
|
|
|
|
// Also create the topic if it doesn't exist
|
|
b.topics[topic] = true
|
|
|
|
return ledger
|
|
}
|
|
|
|
func (b *basicSeaweedMQHandler) GetLedger(topic string, partition int32) *offset.Ledger {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
|
|
key := fmt.Sprintf("%s-%d", topic, partition)
|
|
if ledger, exists := b.ledgers[key]; exists {
|
|
return ledger
|
|
}
|
|
|
|
// Return nil if ledger doesn't exist (topic doesn't exist)
|
|
return nil
|
|
}
|
|
|
|
func (b *basicSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
|
|
// Get or create the ledger first (this will acquire and release the lock)
|
|
ledger := b.GetOrCreateLedger(topicName, partitionID)
|
|
|
|
// Now acquire the lock for the rest of the operation
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
// Assign an offset and append the record
|
|
offset := ledger.AssignOffsets(1)
|
|
timestamp := time.Now().UnixNano()
|
|
size := int32(len(value))
|
|
|
|
if err := ledger.AppendRecord(offset, timestamp, size); err != nil {
|
|
return 0, fmt.Errorf("failed to append record: %w", err)
|
|
}
|
|
|
|
// Store the actual message content
|
|
if b.messages[topicName] == nil {
|
|
b.messages[topicName] = make(map[int32]map[int64]*MessageRecord)
|
|
}
|
|
if b.messages[topicName][partitionID] == nil {
|
|
b.messages[topicName][partitionID] = make(map[int64]*MessageRecord)
|
|
}
|
|
|
|
// Make copies of key and value to avoid referencing the original slices
|
|
keyCopy := make([]byte, len(key))
|
|
copy(keyCopy, key)
|
|
valueCopy := make([]byte, len(value))
|
|
copy(valueCopy, value)
|
|
|
|
b.messages[topicName][partitionID][offset] = &MessageRecord{
|
|
Key: keyCopy,
|
|
Value: valueCopy,
|
|
Timestamp: timestamp,
|
|
}
|
|
|
|
return offset, nil
|
|
}
|
|
|
|
// GetStoredMessages retrieves stored messages for a topic-partition from a given offset (TEST ONLY)
|
|
func (b *basicSeaweedMQHandler) GetStoredMessages(topicName string, partitionID int32, fromOffset int64, maxMessages int) []*MessageRecord {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
|
|
if b.messages[topicName] == nil || b.messages[topicName][partitionID] == nil {
|
|
return nil
|
|
}
|
|
|
|
partitionMessages := b.messages[topicName][partitionID]
|
|
var result []*MessageRecord
|
|
|
|
// Collect messages starting from fromOffset
|
|
for offset := fromOffset; offset < fromOffset+int64(maxMessages); offset++ {
|
|
if msg, exists := partitionMessages[offset]; exists {
|
|
result = append(result, msg)
|
|
} else {
|
|
// No more consecutive messages
|
|
break
|
|
}
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// BasicSMQRecord implements SMQRecord interface for basicSeaweedMQHandler (TEST ONLY)
|
|
type BasicSMQRecord struct {
|
|
*MessageRecord
|
|
offset int64
|
|
}
|
|
|
|
func (r *BasicSMQRecord) GetKey() []byte { return r.Key }
|
|
func (r *BasicSMQRecord) GetValue() []byte { return r.Value }
|
|
func (r *BasicSMQRecord) GetTimestamp() int64 { return r.Timestamp }
|
|
func (r *BasicSMQRecord) GetOffset() int64 { return r.offset }
|
|
|
|
// GetStoredRecords retrieves stored message records for basicSeaweedMQHandler (TEST ONLY)
|
|
func (b *basicSeaweedMQHandler) GetStoredRecords(topic string, partition int32, fromOffset int64, maxRecords int) ([]offset.SMQRecord, error) {
|
|
messages := b.GetStoredMessages(topic, partition, fromOffset, maxRecords)
|
|
if len(messages) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
records := make([]offset.SMQRecord, len(messages))
|
|
for i, msg := range messages {
|
|
records[i] = &BasicSMQRecord{
|
|
MessageRecord: msg,
|
|
offset: fromOffset + int64(i),
|
|
}
|
|
}
|
|
return records, nil
|
|
}
|
|
|
|
func (b *basicSeaweedMQHandler) GetFilerClient() filer_pb.SeaweedFilerClient {
|
|
return nil // Test handler doesn't have filer access
|
|
}
|
|
|
|
// GetBrokerAddresses returns the discovered SMQ broker addresses for Metadata responses
|
|
func (b *basicSeaweedMQHandler) GetBrokerAddresses() []string {
|
|
return []string{"localhost:17777"} // Test broker address
|
|
}
|
|
|
|
func (b *basicSeaweedMQHandler) Close() error {
|
|
return nil
|
|
}
|
|
|
|
// ===== testSeaweedMQHandler implementation (TEST ONLY) =====
|
|
|
|
func (t *testSeaweedMQHandler) TopicExists(topic string) bool {
|
|
return t.topics[topic]
|
|
}
|
|
|
|
func (t *testSeaweedMQHandler) ListTopics() []string {
|
|
var topics []string
|
|
for topic := range t.topics {
|
|
topics = append(topics, topic)
|
|
}
|
|
return topics
|
|
}
|
|
|
|
func (t *testSeaweedMQHandler) CreateTopic(topic string, partitions int32) error {
|
|
t.topics[topic] = true
|
|
return nil
|
|
}
|
|
|
|
func (t *testSeaweedMQHandler) DeleteTopic(topic string) error {
|
|
delete(t.topics, topic)
|
|
return nil
|
|
}
|
|
|
|
func (t *testSeaweedMQHandler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
// Mark topic as existing when creating ledger
|
|
t.topics[topic] = true
|
|
|
|
key := fmt.Sprintf("%s-%d", topic, partition)
|
|
if ledger, exists := t.ledgers[key]; exists {
|
|
return ledger
|
|
}
|
|
|
|
ledger := offset.NewLedger()
|
|
t.ledgers[key] = ledger
|
|
return ledger
|
|
}
|
|
|
|
func (t *testSeaweedMQHandler) GetLedger(topic string, partition int32) *offset.Ledger {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
|
|
key := fmt.Sprintf("%s-%d", topic, partition)
|
|
if ledger, exists := t.ledgers[key]; exists {
|
|
return ledger
|
|
}
|
|
|
|
// Return nil if ledger doesn't exist (topic doesn't exist)
|
|
return nil
|
|
}
|
|
|
|
func (t *testSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
|
|
// For testing, actually store the record in the ledger
|
|
ledger := t.GetOrCreateLedger(topicName, partitionID)
|
|
|
|
// Assign an offset and append the record
|
|
offset := ledger.AssignOffsets(1)
|
|
timestamp := time.Now().UnixNano()
|
|
size := int32(len(value))
|
|
|
|
if err := ledger.AppendRecord(offset, timestamp, size); err != nil {
|
|
return 0, fmt.Errorf("failed to append record: %w", err)
|
|
}
|
|
|
|
return offset, nil
|
|
}
|
|
|
|
// GetStoredRecords for testSeaweedMQHandler - returns empty (no storage simulation)
|
|
func (t *testSeaweedMQHandler) GetStoredRecords(topic string, partition int32, fromOffset int64, maxRecords int) ([]offset.SMQRecord, error) {
|
|
// Test handler doesn't simulate message storage, return empty
|
|
return nil, nil
|
|
}
|
|
|
|
func (t *testSeaweedMQHandler) GetFilerClient() filer_pb.SeaweedFilerClient {
|
|
return nil // Test handler doesn't have filer access
|
|
}
|
|
|
|
// GetBrokerAddresses returns the discovered SMQ broker addresses for Metadata responses
|
|
func (t *testSeaweedMQHandler) GetBrokerAddresses() []string {
|
|
return []string{"localhost:17777"} // Test broker address
|
|
}
|
|
|
|
func (t *testSeaweedMQHandler) Close() error {
|
|
return nil
|
|
}
|