mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
- Fix deadlock in FindStaticMember by adding FindStaticMemberLocked version - Fix deadlock in RegisterStaticMember by adding RegisterStaticMemberLocked version - Fix deadlock in UnregisterStaticMember by adding UnregisterStaticMemberLocked version - Fix GroupInstanceID parsing in parseLeaveGroupRequest method - All static membership tests now pass without deadlocks: - JoinGroup static membership (join, reconnection, dynamic members) - LeaveGroup static membership (leave, wrong instance ID validation) - DescribeGroups static membership The deadlocks occurred because protocol handlers were calling GroupCoordinator methods that tried to acquire locks on groups that were already locked by the calling handler. The fix introduces *Locked versions of these methods that assume the group is already locked by the caller.
428 lines
13 KiB
Go
428 lines
13 KiB
Go
package protocol
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
|
|
)
|
|
|
|
// Heartbeat API (key 12) - Consumer group heartbeat
|
|
// Consumers send periodic heartbeats to stay in the group and receive rebalancing signals
|
|
|
|
// HeartbeatRequest is the decoded body of a Heartbeat request (API key 12)
// sent by a Kafka client.
type HeartbeatRequest struct {
	// GroupID names the consumer group the heartbeat is addressed to.
	GroupID string
	// GenerationID is the group generation carried by the request.
	GenerationID int32
	// MemberID identifies the sending member within the group.
	MemberID string
	// GroupInstanceID is the optional static membership ID.
	GroupInstanceID string
}
|
|
|
|
// HeartbeatResponse holds the values encoded into a Heartbeat response
// returned to a Kafka client.
type HeartbeatResponse struct {
	// CorrelationID echoes the correlation ID of the request being answered.
	CorrelationID uint32
	// ErrorCode is the Kafka error code reported to the client.
	ErrorCode int16
}
|
|
|
|
// LeaveGroup API (key 13) - Consumer graceful departure
|
|
// Consumers call this when shutting down to trigger immediate rebalancing
|
|
|
|
// LeaveGroupRequest represents a LeaveGroup request from a Kafka client
|
|
type LeaveGroupRequest struct {
|
|
GroupID string
|
|
MemberID string
|
|
GroupInstanceID string // Optional static membership ID
|
|
Members []LeaveGroupMember // For newer versions, can leave multiple members
|
|
}
|
|
|
|
// LeaveGroupMember describes one member leaving the group as part of a batch
// departure.
type LeaveGroupMember struct {
	// MemberID identifies the departing member.
	MemberID string
	// GroupInstanceID is the member's static membership ID.
	GroupInstanceID string
	// Reason is an optional explanation for leaving.
	Reason string
}
|
|
|
|
// LeaveGroupResponse represents a LeaveGroup response to a Kafka client
|
|
type LeaveGroupResponse struct {
|
|
CorrelationID uint32
|
|
ErrorCode int16
|
|
Members []LeaveGroupMemberResponse // Per-member responses for newer versions
|
|
}
|
|
|
|
// LeaveGroupMemberResponse is the per-member entry of a LeaveGroup response.
type LeaveGroupMemberResponse struct {
	// MemberID identifies the member this entry refers to.
	MemberID string
	// GroupInstanceID is the static membership ID reported for the member.
	GroupInstanceID string
	// ErrorCode is the Kafka error code for this member's departure.
	ErrorCode int16
}
|
|
|
|
// Error codes specific to consumer coordination are defined in errors.go
|
|
|
|
func (h *Handler) handleHeartbeat(correlationID uint32, requestBody []byte) ([]byte, error) {
|
|
// Parse Heartbeat request
|
|
request, err := h.parseHeartbeatRequest(requestBody)
|
|
if err != nil {
|
|
return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
|
|
}
|
|
|
|
// Validate request
|
|
if request.GroupID == "" || request.MemberID == "" {
|
|
return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
|
|
}
|
|
|
|
// Get consumer group
|
|
group := h.groupCoordinator.GetGroup(request.GroupID)
|
|
if group == nil {
|
|
return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
|
|
}
|
|
|
|
group.Mu.Lock()
|
|
defer group.Mu.Unlock()
|
|
|
|
// Update group's last activity
|
|
group.LastActivity = time.Now()
|
|
|
|
// Validate member exists
|
|
member, exists := group.Members[request.MemberID]
|
|
if !exists {
|
|
return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil
|
|
}
|
|
|
|
// Validate generation
|
|
if request.GenerationID != group.Generation {
|
|
return h.buildHeartbeatErrorResponse(correlationID, ErrorCodeIllegalGeneration), nil
|
|
}
|
|
|
|
// Update member's last heartbeat
|
|
member.LastHeartbeat = time.Now()
|
|
|
|
// Check if rebalancing is in progress
|
|
var errorCode int16 = ErrorCodeNone
|
|
switch group.State {
|
|
case consumer.GroupStatePreparingRebalance, consumer.GroupStateCompletingRebalance:
|
|
// Signal the consumer that rebalancing is happening
|
|
errorCode = ErrorCodeRebalanceInProgress
|
|
case consumer.GroupStateDead:
|
|
errorCode = ErrorCodeInvalidGroupID
|
|
case consumer.GroupStateEmpty:
|
|
// This shouldn't happen if member exists, but handle gracefully
|
|
errorCode = ErrorCodeUnknownMemberID
|
|
case consumer.GroupStateStable:
|
|
// Normal case - heartbeat accepted
|
|
errorCode = ErrorCodeNone
|
|
}
|
|
|
|
// Build successful response
|
|
response := HeartbeatResponse{
|
|
CorrelationID: correlationID,
|
|
ErrorCode: errorCode,
|
|
}
|
|
|
|
return h.buildHeartbeatResponse(response), nil
|
|
}
|
|
|
|
func (h *Handler) handleLeaveGroup(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
|
|
// Parse LeaveGroup request
|
|
request, err := h.parseLeaveGroupRequest(requestBody)
|
|
if err != nil {
|
|
return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil
|
|
}
|
|
|
|
// Validate request
|
|
if request.GroupID == "" || request.MemberID == "" {
|
|
return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil
|
|
}
|
|
|
|
// Get consumer group
|
|
group := h.groupCoordinator.GetGroup(request.GroupID)
|
|
if group == nil {
|
|
return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil
|
|
}
|
|
|
|
group.Mu.Lock()
|
|
defer group.Mu.Unlock()
|
|
|
|
// Update group's last activity
|
|
group.LastActivity = time.Now()
|
|
|
|
// Validate member exists
|
|
member, exists := group.Members[request.MemberID]
|
|
if !exists {
|
|
return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID, apiVersion), nil
|
|
}
|
|
|
|
// For static members, only remove if GroupInstanceID matches or is not provided
|
|
if h.groupCoordinator.IsStaticMember(member) {
|
|
if request.GroupInstanceID != "" && *member.GroupInstanceID != request.GroupInstanceID {
|
|
return h.buildLeaveGroupErrorResponse(correlationID, ErrorCodeFencedInstanceID, apiVersion), nil
|
|
}
|
|
// Unregister static member
|
|
h.groupCoordinator.UnregisterStaticMemberLocked(group, *member.GroupInstanceID)
|
|
}
|
|
|
|
// Remove the member from the group
|
|
delete(group.Members, request.MemberID)
|
|
|
|
// Update group state based on remaining members
|
|
if len(group.Members) == 0 {
|
|
// Group becomes empty
|
|
group.State = consumer.GroupStateEmpty
|
|
group.Generation++
|
|
group.Leader = ""
|
|
} else {
|
|
// Trigger rebalancing for remaining members
|
|
group.State = consumer.GroupStatePreparingRebalance
|
|
group.Generation++
|
|
|
|
// If the leaving member was the leader, select a new leader
|
|
if group.Leader == request.MemberID {
|
|
// Select first remaining member as new leader
|
|
for memberID := range group.Members {
|
|
group.Leader = memberID
|
|
break
|
|
}
|
|
}
|
|
|
|
// Mark remaining members as pending to trigger rebalancing
|
|
for _, member := range group.Members {
|
|
member.State = consumer.MemberStatePending
|
|
}
|
|
}
|
|
|
|
// Update group's subscribed topics (may have changed with member leaving)
|
|
h.updateGroupSubscriptionFromMembers(group)
|
|
|
|
// Build successful response
|
|
response := LeaveGroupResponse{
|
|
CorrelationID: correlationID,
|
|
ErrorCode: ErrorCodeNone,
|
|
Members: []LeaveGroupMemberResponse{
|
|
{
|
|
MemberID: request.MemberID,
|
|
GroupInstanceID: request.GroupInstanceID,
|
|
ErrorCode: ErrorCodeNone,
|
|
},
|
|
},
|
|
}
|
|
|
|
return h.buildLeaveGroupResponse(response, apiVersion), nil
|
|
}
|
|
|
|
func (h *Handler) parseHeartbeatRequest(data []byte) (*HeartbeatRequest, error) {
|
|
if len(data) < 8 {
|
|
return nil, fmt.Errorf("request too short")
|
|
}
|
|
|
|
offset := 0
|
|
|
|
// GroupID (string)
|
|
groupIDLength := int(binary.BigEndian.Uint16(data[offset:]))
|
|
offset += 2
|
|
if offset+groupIDLength > len(data) {
|
|
return nil, fmt.Errorf("invalid group ID length")
|
|
}
|
|
groupID := string(data[offset : offset+groupIDLength])
|
|
offset += groupIDLength
|
|
|
|
// Generation ID (4 bytes)
|
|
if offset+4 > len(data) {
|
|
return nil, fmt.Errorf("missing generation ID")
|
|
}
|
|
generationID := int32(binary.BigEndian.Uint32(data[offset:]))
|
|
offset += 4
|
|
|
|
// MemberID (string)
|
|
if offset+2 > len(data) {
|
|
return nil, fmt.Errorf("missing member ID length")
|
|
}
|
|
memberIDLength := int(binary.BigEndian.Uint16(data[offset:]))
|
|
offset += 2
|
|
if offset+memberIDLength > len(data) {
|
|
return nil, fmt.Errorf("invalid member ID length")
|
|
}
|
|
memberID := string(data[offset : offset+memberIDLength])
|
|
offset += memberIDLength
|
|
|
|
return &HeartbeatRequest{
|
|
GroupID: groupID,
|
|
GenerationID: generationID,
|
|
MemberID: memberID,
|
|
GroupInstanceID: "", // Simplified - would parse from remaining data
|
|
}, nil
|
|
}
|
|
|
|
func (h *Handler) parseLeaveGroupRequest(data []byte) (*LeaveGroupRequest, error) {
|
|
if len(data) < 4 {
|
|
return nil, fmt.Errorf("request too short")
|
|
}
|
|
|
|
offset := 0
|
|
|
|
// GroupID (string)
|
|
groupIDLength := int(binary.BigEndian.Uint16(data[offset:]))
|
|
offset += 2
|
|
if offset+groupIDLength > len(data) {
|
|
return nil, fmt.Errorf("invalid group ID length")
|
|
}
|
|
groupID := string(data[offset : offset+groupIDLength])
|
|
offset += groupIDLength
|
|
|
|
// MemberID (string)
|
|
if offset+2 > len(data) {
|
|
return nil, fmt.Errorf("missing member ID length")
|
|
}
|
|
memberIDLength := int(binary.BigEndian.Uint16(data[offset:]))
|
|
offset += 2
|
|
if offset+memberIDLength > len(data) {
|
|
return nil, fmt.Errorf("invalid member ID length")
|
|
}
|
|
memberID := string(data[offset : offset+memberIDLength])
|
|
offset += memberIDLength
|
|
|
|
// GroupInstanceID (string, v3+) - optional field
|
|
var groupInstanceID string
|
|
if offset+2 <= len(data) {
|
|
instanceIDLength := int(binary.BigEndian.Uint16(data[offset:]))
|
|
offset += 2
|
|
if instanceIDLength != 0xFFFF && offset+instanceIDLength <= len(data) {
|
|
groupInstanceID = string(data[offset : offset+instanceIDLength])
|
|
}
|
|
}
|
|
|
|
return &LeaveGroupRequest{
|
|
GroupID: groupID,
|
|
MemberID: memberID,
|
|
GroupInstanceID: groupInstanceID,
|
|
Members: []LeaveGroupMember{}, // Would parse members array for batch operations
|
|
}, nil
|
|
}
|
|
|
|
func (h *Handler) buildHeartbeatResponse(response HeartbeatResponse) []byte {
|
|
result := make([]byte, 0, 12)
|
|
|
|
// Correlation ID (4 bytes)
|
|
correlationIDBytes := make([]byte, 4)
|
|
binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID)
|
|
result = append(result, correlationIDBytes...)
|
|
|
|
// Error code (2 bytes)
|
|
errorCodeBytes := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode))
|
|
result = append(result, errorCodeBytes...)
|
|
|
|
// Throttle time (4 bytes, 0 = no throttling)
|
|
result = append(result, 0, 0, 0, 0)
|
|
|
|
return result
|
|
}
|
|
|
|
func (h *Handler) buildLeaveGroupResponse(response LeaveGroupResponse, apiVersion uint16) []byte {
|
|
// LeaveGroup v0 only includes correlation_id and error_code (no throttle_time_ms, no members)
|
|
if apiVersion == 0 {
|
|
return h.buildLeaveGroupV0Response(response)
|
|
}
|
|
|
|
// For v1+ use the full response format
|
|
return h.buildLeaveGroupFullResponse(response)
|
|
}
|
|
|
|
func (h *Handler) buildLeaveGroupV0Response(response LeaveGroupResponse) []byte {
|
|
result := make([]byte, 0, 6)
|
|
|
|
// Correlation ID (4 bytes)
|
|
correlationIDBytes := make([]byte, 4)
|
|
binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID)
|
|
result = append(result, correlationIDBytes...)
|
|
|
|
// Error code (2 bytes) - that's it for v0!
|
|
errorCodeBytes := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode))
|
|
result = append(result, errorCodeBytes...)
|
|
|
|
return result
|
|
}
|
|
|
|
func (h *Handler) buildLeaveGroupFullResponse(response LeaveGroupResponse) []byte {
|
|
estimatedSize := 16
|
|
for _, member := range response.Members {
|
|
estimatedSize += len(member.MemberID) + len(member.GroupInstanceID) + 8
|
|
}
|
|
|
|
result := make([]byte, 0, estimatedSize)
|
|
|
|
// Correlation ID (4 bytes)
|
|
correlationIDBytes := make([]byte, 4)
|
|
binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID)
|
|
result = append(result, correlationIDBytes...)
|
|
|
|
// Error code (2 bytes)
|
|
errorCodeBytes := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode))
|
|
result = append(result, errorCodeBytes...)
|
|
|
|
// Members array length (4 bytes)
|
|
membersLengthBytes := make([]byte, 4)
|
|
binary.BigEndian.PutUint32(membersLengthBytes, uint32(len(response.Members)))
|
|
result = append(result, membersLengthBytes...)
|
|
|
|
// Members
|
|
for _, member := range response.Members {
|
|
// Member ID length (2 bytes)
|
|
memberIDLength := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(memberIDLength, uint16(len(member.MemberID)))
|
|
result = append(result, memberIDLength...)
|
|
|
|
// Member ID
|
|
result = append(result, []byte(member.MemberID)...)
|
|
|
|
// Group instance ID length (2 bytes)
|
|
instanceIDLength := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(instanceIDLength, uint16(len(member.GroupInstanceID)))
|
|
result = append(result, instanceIDLength...)
|
|
|
|
// Group instance ID
|
|
if len(member.GroupInstanceID) > 0 {
|
|
result = append(result, []byte(member.GroupInstanceID)...)
|
|
}
|
|
|
|
// Error code (2 bytes)
|
|
memberErrorBytes := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(memberErrorBytes, uint16(member.ErrorCode))
|
|
result = append(result, memberErrorBytes...)
|
|
}
|
|
|
|
// Throttle time (4 bytes, 0 = no throttling)
|
|
result = append(result, 0, 0, 0, 0)
|
|
|
|
return result
|
|
}
|
|
|
|
func (h *Handler) buildHeartbeatErrorResponse(correlationID uint32, errorCode int16) []byte {
|
|
response := HeartbeatResponse{
|
|
CorrelationID: correlationID,
|
|
ErrorCode: errorCode,
|
|
}
|
|
|
|
return h.buildHeartbeatResponse(response)
|
|
}
|
|
|
|
func (h *Handler) buildLeaveGroupErrorResponse(correlationID uint32, errorCode int16, apiVersion uint16) []byte {
|
|
response := LeaveGroupResponse{
|
|
CorrelationID: correlationID,
|
|
ErrorCode: errorCode,
|
|
Members: []LeaveGroupMemberResponse{},
|
|
}
|
|
|
|
return h.buildLeaveGroupResponse(response, apiVersion)
|
|
}
|
|
|
|
func (h *Handler) updateGroupSubscriptionFromMembers(group *consumer.ConsumerGroup) {
|
|
// Update group's subscribed topics from remaining members
|
|
group.SubscribedTopics = make(map[string]bool)
|
|
for _, member := range group.Members {
|
|
for _, topic := range member.Subscription {
|
|
group.SubscribedTopics[topic] = true
|
|
}
|
|
}
|
|
}
|