seaweedfs/weed/mq/kafka/integration/agent_client.go
chrislu 9c1f0e9f3f fix: centralize and standardize Kafka partition to SeaweedMQ ring mapping
- Created a centralized PartitionMapper utility with a consistent range size of 32
- Fixed inconsistent partition mapping between agent_client.go (range size 78) and seaweedmq_handler.go (range size 32)
- Updated smq_mapping.go to use the centralized utility instead of dynamic calculation
- Standardized all partition mapping on kafka.CreateSMQPartition() and related functions
- Added comprehensive tests for partition mapping consistency and round-trip conversion
- Achieves 99.05% ring utilization, supporting 78 Kafka partitions (78 × 32 = 2496 of 2520 ring slots)

This fixes the high-priority issue where inconsistent partition mapping could cause
incorrect message routing between different components of the Kafka Gateway.
2025-09-16 00:45:10 -07:00
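
For context, a minimal sketch of how the centralized mapping is used on the subscribe path (mirroring createSubscribeSession below). The return type of kafka.CreateSMQPartition is inferred from its call site in this file, so treat the exact shape as an assumption:

	// Map Kafka partition 3 onto its 32-slot slice of the SeaweedMQ ring.
	// kafka.CreateSMQPartition(partition int32, tsNs int64) is assumed to
	// return a *schema_pb.Partition, matching its use in createSubscribeSession.
	smqPartition := kafka.CreateSMQPartition(3, time.Now().UnixNano())
	partitionOffset := &schema_pb.PartitionOffset{
		Partition: smqPartition,
		StartTsNs: 0, // subscribe from the beginning of the range
	}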

package integration

import (
	"context"
	"fmt"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/seaweedfs/seaweedfs/weed/mq/kafka"
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// AgentClient wraps the SeaweedMQ Agent gRPC client for Kafka gateway integration
type AgentClient struct {
	agentAddress string
	conn         *grpc.ClientConn
	client       mq_agent_pb.SeaweedMessagingAgentClient

	// Publisher sessions: topic-partition -> session info
	publishersLock sync.RWMutex
	publishers     map[string]*PublisherSession

	// Subscriber sessions for offset tracking
	subscribersLock sync.RWMutex
	subscribers     map[string]*SubscriberSession

	ctx    context.Context
	cancel context.CancelFunc
}

// PublisherSession tracks a publishing session to SeaweedMQ
type PublisherSession struct {
	SessionID    int64
	Topic        string
	Partition    int32
	Stream       mq_agent_pb.SeaweedMessagingAgent_PublishRecordClient
	RecordType   *schema_pb.RecordType
	LastSequence int64
}

// SubscriberSession tracks a subscription for offset management
type SubscriberSession struct {
	Topic        string
	Partition    int32
	Stream       mq_agent_pb.SeaweedMessagingAgent_SubscribeRecordClient
	OffsetLedger *offset.Ledger // Still used for Kafka offset translation
}

// SeaweedRecord represents a record received from SeaweedMQ
type SeaweedRecord struct {
	Key       []byte
	Value     []byte
	Timestamp int64
	Sequence  int64
}

// NewAgentClient creates a new SeaweedMQ Agent client
func NewAgentClient(agentAddress string) (*AgentClient, error) {
	ctx, cancel := context.WithCancel(context.Background())

	conn, err := grpc.DialContext(ctx, agentAddress,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// Don't block - fail fast for invalid addresses
	)
	if err != nil {
		cancel()
		return nil, fmt.Errorf("failed to connect to agent %s: %v", agentAddress, err)
	}

	client := mq_agent_pb.NewSeaweedMessagingAgentClient(conn)

	return &AgentClient{
		agentAddress: agentAddress,
		conn:         conn,
		client:       client,
		publishers:   make(map[string]*PublisherSession),
		subscribers:  make(map[string]*SubscriberSession),
		ctx:          ctx,
		cancel:       cancel,
	}, nil
}

// Close shuts down the agent client and all sessions
func (ac *AgentClient) Close() error {
	// Close all publisher sessions first, while ac.ctx is still live;
	// closePublishSessionLocked issues an RPC on that context, so it
	// must run before ac.cancel().
	ac.publishersLock.Lock()
	for key, session := range ac.publishers {
		ac.closePublishSessionLocked(session.SessionID)
		delete(ac.publishers, key)
	}
	ac.publishersLock.Unlock()

	// Close all subscriber sessions
	ac.subscribersLock.Lock()
	for key, session := range ac.subscribers {
		if session.Stream != nil {
			session.Stream.CloseSend()
		}
		delete(ac.subscribers, key)
	}
	ac.subscribersLock.Unlock()

	ac.cancel()
	return ac.conn.Close()
}

// GetOrCreatePublisher gets or creates a publisher session for a topic-partition
func (ac *AgentClient) GetOrCreatePublisher(topic string, partition int32) (*PublisherSession, error) {
	key := fmt.Sprintf("%s-%d", topic, partition)

	// Try to get an existing publisher
	ac.publishersLock.RLock()
	if session, exists := ac.publishers[key]; exists {
		ac.publishersLock.RUnlock()
		return session, nil
	}
	ac.publishersLock.RUnlock()

	// Create a new publisher session
	ac.publishersLock.Lock()
	defer ac.publishersLock.Unlock()

	// Double-check after acquiring the write lock
	if session, exists := ac.publishers[key]; exists {
		return session, nil
	}

	// Create the session
	session, err := ac.createPublishSession(topic, partition)
	if err != nil {
		return nil, err
	}

	ac.publishers[key] = session
	return session, nil
}

// createPublishSession creates a new publishing session with the SeaweedMQ Agent
func (ac *AgentClient) createPublishSession(topic string, partition int32) (*PublisherSession, error) {
	// Create a basic record type for Kafka messages
	recordType := &schema_pb.RecordType{
		Fields: []*schema_pb.Field{
			{
				Name:       "key",
				FieldIndex: 0,
				Type: &schema_pb.Type{
					Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES},
				},
				IsRequired: false,
				IsRepeated: false,
			},
			{
				Name:       "value",
				FieldIndex: 1,
				Type: &schema_pb.Type{
					Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES},
				},
				IsRequired: true,
				IsRepeated: false,
			},
			{
				Name:       "timestamp",
				FieldIndex: 2,
				Type: &schema_pb.Type{
					Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_TIMESTAMP},
				},
				IsRequired: false,
				IsRepeated: false,
			},
		},
	}

	// Start the publish session
	startReq := &mq_agent_pb.StartPublishSessionRequest{
		Topic: &schema_pb.Topic{
			Namespace: "kafka", // Use the "kafka" namespace for Kafka messages
			Name:      topic,
		},
		PartitionCount: 1, // For Phase 2, use a single partition
		RecordType:     recordType,
		PublisherName:  "kafka-gateway",
	}

	startResp, err := ac.client.StartPublishSession(ac.ctx, startReq)
	if err != nil {
		return nil, fmt.Errorf("failed to start publish session: %v", err)
	}
	if startResp.Error != "" {
		return nil, fmt.Errorf("publish session error: %s", startResp.Error)
	}

	// Create the streaming connection
	stream, err := ac.client.PublishRecord(ac.ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to create publish stream: %v", err)
	}

	session := &PublisherSession{
		SessionID:  startResp.SessionId,
		Topic:      topic,
		Partition:  partition,
		Stream:     stream,
		RecordType: recordType,
	}
	return session, nil
}

// PublishRecord publishes a single record to SeaweedMQ
func (ac *AgentClient) PublishRecord(topic string, partition int32, key []byte, value []byte, timestamp int64) (int64, error) {
	session, err := ac.GetOrCreatePublisher(topic, partition)
	if err != nil {
		return 0, err
	}

	// Convert to the SeaweedMQ record format
	record := &schema_pb.RecordValue{
		Fields: map[string]*schema_pb.Value{
			"key": {
				Kind: &schema_pb.Value_BytesValue{BytesValue: key},
			},
			"value": {
				Kind: &schema_pb.Value_BytesValue{BytesValue: value},
			},
			"timestamp": {
				Kind: &schema_pb.Value_TimestampValue{
					TimestampValue: &schema_pb.TimestampValue{
						TimestampMicros: timestamp / 1000, // Convert nanoseconds to microseconds
						IsUtc:           true,
					},
				},
			},
		},
	}

	// Send the publish request
	req := &mq_agent_pb.PublishRecordRequest{
		SessionId: session.SessionID,
		Key:       key,
		Value:     record,
	}
	if err := session.Stream.Send(req); err != nil {
		return 0, fmt.Errorf("failed to send record: %v", err)
	}

	// Read the acknowledgment (this is a streaming API, so we should read the response)
	resp, err := session.Stream.Recv()
	if err != nil {
		return 0, fmt.Errorf("failed to receive ack: %v", err)
	}
	if resp.Error != "" {
		return 0, fmt.Errorf("publish error: %s", resp.Error)
	}

	session.LastSequence = resp.AckSequence
	return resp.AckSequence, nil
}

// GetOrCreateSubscriber gets or creates a subscriber for offset tracking
func (ac *AgentClient) GetOrCreateSubscriber(topic string, partition int32, startOffset int64) (*SubscriberSession, error) {
	key := fmt.Sprintf("%s-%d", topic, partition)

	ac.subscribersLock.RLock()
	if session, exists := ac.subscribers[key]; exists {
		ac.subscribersLock.RUnlock()
		return session, nil
	}
	ac.subscribersLock.RUnlock()

	// Create a new subscriber session
	ac.subscribersLock.Lock()
	defer ac.subscribersLock.Unlock()

	if session, exists := ac.subscribers[key]; exists {
		return session, nil
	}

	session, err := ac.createSubscribeSession(topic, partition, startOffset)
	if err != nil {
		return nil, err
	}

	ac.subscribers[key] = session
	return session, nil
}

// createSubscribeSession creates a subscriber session for reading messages
func (ac *AgentClient) createSubscribeSession(topic string, partition int32, startOffset int64) (*SubscriberSession, error) {
	stream, err := ac.client.SubscribeRecord(ac.ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
	}

	// Send the initial subscribe request
	initReq := &mq_agent_pb.SubscribeRecordRequest{
		Init: &mq_agent_pb.SubscribeRecordRequest_InitSubscribeRecordRequest{
			ConsumerGroup:           "kafka-gateway",
			ConsumerGroupInstanceId: fmt.Sprintf("kafka-gateway-%s-%d", topic, partition),
			Topic: &schema_pb.Topic{
				Namespace: "kafka",
				Name:      topic,
			},
			PartitionOffsets: []*schema_pb.PartitionOffset{
				{
					// Map the Kafka partition to its SMQ ring range using the centralized utility
					Partition: kafka.CreateSMQPartition(partition, time.Now().UnixNano()),
					StartTsNs: startOffset, // Use the offset as a timestamp for now
				},
			},
			OffsetType:              schema_pb.OffsetType_EXACT_TS_NS,
			MaxSubscribedPartitions: 1,
			SlidingWindowSize:       10,
		},
	}
	if err := stream.Send(initReq); err != nil {
		return nil, fmt.Errorf("failed to send subscribe init: %v", err)
	}

	session := &SubscriberSession{
		Topic:        topic,
		Partition:    partition,
		Stream:       stream,
		OffsetLedger: offset.NewLedger(), // Keep Kafka offset tracking
	}
	return session, nil
}

// ClosePublisher closes a specific publisher session
func (ac *AgentClient) ClosePublisher(topic string, partition int32) error {
	key := fmt.Sprintf("%s-%d", topic, partition)

	ac.publishersLock.Lock()
	defer ac.publishersLock.Unlock()

	session, exists := ac.publishers[key]
	if !exists {
		return nil // Already closed or never existed
	}

	err := ac.closePublishSessionLocked(session.SessionID)
	delete(ac.publishers, key)
	return err
}

// closePublishSessionLocked closes a publish session (must be called with the lock held)
func (ac *AgentClient) closePublishSessionLocked(sessionID int64) error {
	closeReq := &mq_agent_pb.ClosePublishSessionRequest{
		SessionId: sessionID,
	}
	_, err := ac.client.ClosePublishSession(ac.ctx, closeReq)
	return err
}

// ReadRecords reads up to maxRecords records from the subscriber session.
// Note: Stream.Recv blocks until a record or an error arrives, so this call
// can block when fewer than maxRecords records are available. (An earlier
// version wrapped Recv in a select with a timeout context, but Recv does not
// observe that context, so the timeout branch was dead code.)
func (ac *AgentClient) ReadRecords(session *SubscriberSession, maxRecords int) ([]*SeaweedRecord, error) {
	if session == nil {
		return nil, fmt.Errorf("subscriber session cannot be nil")
	}

	var records []*SeaweedRecord

	for len(records) < maxRecords {
		resp, err := session.Stream.Recv()
		if err != nil {
			// If we have some records, return them; otherwise return the error
			if len(records) > 0 {
				return records, nil
			}
			return nil, fmt.Errorf("failed to receive record: %v", err)
		}

		if resp.Value == nil && resp.Key == nil {
			continue
		}

		// Convert the SeaweedMQ record to our format
		record := &SeaweedRecord{
			Sequence:  resp.Offset, // Use the offset as the sequence
			Timestamp: resp.TsNs,   // Timestamp in nanoseconds
			Key:       resp.Key,    // Raw key
		}

		// Extract the value from the structured record
		if resp.Value != nil && resp.Value.Fields != nil {
			if valueValue, exists := resp.Value.Fields["value"]; exists && valueValue.GetBytesValue() != nil {
				record.Value = valueValue.GetBytesValue()
			}
			// Also check for a key in the structured fields if the raw key is empty
			if len(record.Key) == 0 {
				if keyValue, exists := resp.Value.Fields["key"]; exists && keyValue.GetBytesValue() != nil {
					record.Key = keyValue.GetBytesValue()
				}
			}
			// Override the timestamp if available in the structured fields
			if timestampValue, exists := resp.Value.Fields["timestamp"]; exists && timestampValue.GetTimestampValue() != nil {
				record.Timestamp = timestampValue.GetTimestampValue().TimestampMicros * 1000 // Convert microseconds to nanoseconds
			}
		}

		records = append(records, record)
	}

	return records, nil
}

// HealthCheck verifies the agent connection is working
func (ac *AgentClient) HealthCheck() error {
	// Create a timeout context for the health check
	ctx, cancel := context.WithTimeout(ac.ctx, 2*time.Second)
	defer cancel()

	// Try to start and immediately close a dummy session
	req := &mq_agent_pb.StartPublishSessionRequest{
		Topic: &schema_pb.Topic{
			Namespace: "kafka",
			Name:      "_health_check",
		},
		PartitionCount: 1,
		RecordType: &schema_pb.RecordType{
			Fields: []*schema_pb.Field{
				{
					Name:       "test",
					FieldIndex: 0,
					Type: &schema_pb.Type{
						Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING},
					},
				},
			},
		},
		PublisherName: "health-check",
	}

	resp, err := ac.client.StartPublishSession(ctx, req)
	if err != nil {
		return fmt.Errorf("health check failed: %v", err)
	}
	if resp.Error != "" {
		return fmt.Errorf("health check error: %s", resp.Error)
	}

	// Close the health check session
	closeReq := &mq_agent_pb.ClosePublishSessionRequest{
		SessionId: resp.SessionId,
	}
	_, _ = ac.client.ClosePublishSession(ctx, closeReq)

	return nil
}
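
For reference, a minimal usage sketch of this client. The agent address and topic name are illustrative (not from this file), and error handling is trimmed for brevity:

	package main

	import (
		"log"
		"time"

		"github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration"
	)

	func main() {
		// Hypothetical agent address; point this at a running SeaweedMQ agent.
		client, err := integration.NewAgentClient("localhost:16777")
		if err != nil {
			log.Fatal(err)
		}
		defer client.Close()

		// Verify the connection before publishing.
		if err := client.HealthCheck(); err != nil {
			log.Fatal(err)
		}

		// Publish one record to partition 0 of an illustrative topic.
		seq, err := client.PublishRecord("example-topic", 0,
			[]byte("key"), []byte("value"), time.Now().UnixNano())
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("published, ack sequence = %d", seq)
	}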