mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
- Add GroupInstanceID field to GroupMember struct - Add StaticMembers mapping to ConsumerGroup for instance ID tracking - Implement static member management methods: * FindStaticMember, RegisterStaticMember, UnregisterStaticMember * IsStaticMember for checking membership type - Update JoinGroup handler to support static membership: * Check for existing static members by instance ID * Register new static members automatically * Generate appropriate member IDs for static vs dynamic members - Update LeaveGroup handler for static member validation: * Verify GroupInstanceID matches for static members * Return FENCED_INSTANCE_ID error for mismatched instance IDs * Unregister static members on successful leave - Update DescribeGroups to return GroupInstanceID in member info - Add comprehensive tests for static membership functionality: * Basic registration and lookup * Member reconnection scenarios * Edge cases and error conditions * Concurrent access patterns Static membership enables sticky partition assignments and reduces rebalancing overhead for long-running consumers.
441 lines
13 KiB
Go
441 lines
13 KiB
Go
package protocol
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"testing"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
|
|
)
|
|
|
|
// Helper function to build JoinGroup request body for testing
|
|
func buildJoinGroupRequestBody(req *JoinGroupRequest) []byte {
|
|
// This is a simplified version for testing
|
|
// In practice, you'd use the actual protocol serialization
|
|
body := make([]byte, 0, 256)
|
|
|
|
// Group ID (string)
|
|
groupIDLen := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(groupIDLen, uint16(len(req.GroupID)))
|
|
body = append(body, groupIDLen...)
|
|
body = append(body, []byte(req.GroupID)...)
|
|
|
|
// Session timeout (4 bytes)
|
|
sessionTimeout := make([]byte, 4)
|
|
binary.BigEndian.PutUint32(sessionTimeout, uint32(req.SessionTimeout))
|
|
body = append(body, sessionTimeout...)
|
|
|
|
// Rebalance timeout (4 bytes)
|
|
rebalanceTimeout := make([]byte, 4)
|
|
binary.BigEndian.PutUint32(rebalanceTimeout, uint32(req.RebalanceTimeout))
|
|
body = append(body, rebalanceTimeout...)
|
|
|
|
// Member ID (string)
|
|
memberIDLen := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(memberIDLen, uint16(len(req.MemberID)))
|
|
body = append(body, memberIDLen...)
|
|
body = append(body, []byte(req.MemberID)...)
|
|
|
|
// Group instance ID (string, v5+)
|
|
instanceIDLen := make([]byte, 2)
|
|
if req.GroupInstanceID == "" {
|
|
binary.BigEndian.PutUint16(instanceIDLen, 0xFFFF) // null
|
|
} else {
|
|
binary.BigEndian.PutUint16(instanceIDLen, uint16(len(req.GroupInstanceID)))
|
|
}
|
|
body = append(body, instanceIDLen...)
|
|
if req.GroupInstanceID != "" {
|
|
body = append(body, []byte(req.GroupInstanceID)...)
|
|
}
|
|
|
|
// Protocol type (string)
|
|
protocolTypeLen := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(protocolTypeLen, uint16(len(req.ProtocolType)))
|
|
body = append(body, protocolTypeLen...)
|
|
body = append(body, []byte(req.ProtocolType)...)
|
|
|
|
// Group protocols array (simplified)
|
|
protocolsCount := make([]byte, 4)
|
|
binary.BigEndian.PutUint32(protocolsCount, uint32(len(req.GroupProtocols)))
|
|
body = append(body, protocolsCount...)
|
|
|
|
for _, protocol := range req.GroupProtocols {
|
|
// Protocol name
|
|
nameLen := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(nameLen, uint16(len(protocol.Name)))
|
|
body = append(body, nameLen...)
|
|
body = append(body, []byte(protocol.Name)...)
|
|
|
|
// Protocol metadata
|
|
metadataLen := make([]byte, 4)
|
|
binary.BigEndian.PutUint32(metadataLen, uint32(len(protocol.Metadata)))
|
|
body = append(body, metadataLen...)
|
|
body = append(body, protocol.Metadata...)
|
|
}
|
|
|
|
return body
|
|
}
|
|
|
|
// Helper function to build LeaveGroup request body for testing
|
|
func buildLeaveGroupRequestBody(req *LeaveGroupRequest) []byte {
|
|
body := make([]byte, 0, 128)
|
|
|
|
// Group ID (string)
|
|
groupIDLen := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(groupIDLen, uint16(len(req.GroupID)))
|
|
body = append(body, groupIDLen...)
|
|
body = append(body, []byte(req.GroupID)...)
|
|
|
|
// Member ID (string)
|
|
memberIDLen := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(memberIDLen, uint16(len(req.MemberID)))
|
|
body = append(body, memberIDLen...)
|
|
body = append(body, []byte(req.MemberID)...)
|
|
|
|
// Group instance ID (string, v3+)
|
|
instanceIDLen := make([]byte, 2)
|
|
if req.GroupInstanceID == "" {
|
|
binary.BigEndian.PutUint16(instanceIDLen, 0xFFFF) // null
|
|
} else {
|
|
binary.BigEndian.PutUint16(instanceIDLen, uint16(len(req.GroupInstanceID)))
|
|
}
|
|
body = append(body, instanceIDLen...)
|
|
if req.GroupInstanceID != "" {
|
|
body = append(body, []byte(req.GroupInstanceID)...)
|
|
}
|
|
|
|
return body
|
|
}
|
|
|
|
func TestHandler_JoinGroup_StaticMembership(t *testing.T) {
|
|
handler := NewTestHandler()
|
|
defer handler.Close()
|
|
|
|
// Test static member joining
|
|
t.Run("StaticMemberJoin", func(t *testing.T) {
|
|
request := &JoinGroupRequest{
|
|
GroupID: "test-group",
|
|
SessionTimeout: 30000,
|
|
RebalanceTimeout: 300000,
|
|
MemberID: "", // New member
|
|
GroupInstanceID: "static-instance-1",
|
|
ProtocolType: "consumer",
|
|
GroupProtocols: []GroupProtocol{
|
|
{Name: "range", Metadata: []byte{}},
|
|
},
|
|
}
|
|
|
|
// Build request body (simplified for testing)
|
|
requestBody := buildJoinGroupRequestBody(request)
|
|
|
|
response, err := handler.handleJoinGroup(12345, 5, requestBody)
|
|
if err != nil {
|
|
t.Fatalf("JoinGroup failed: %v", err)
|
|
}
|
|
|
|
// Parse response to verify member was added
|
|
if len(response) < 10 {
|
|
t.Fatalf("Response too short: %d bytes", len(response))
|
|
}
|
|
|
|
// Check error code (should be 0)
|
|
errorCode := binary.BigEndian.Uint16(response[4:6])
|
|
if errorCode != 0 {
|
|
t.Errorf("Expected error code 0, got %d", errorCode)
|
|
}
|
|
|
|
// Verify member was added to group
|
|
group := handler.groupCoordinator.GetGroup("test-group")
|
|
if group == nil {
|
|
t.Fatal("Group not found")
|
|
}
|
|
|
|
if len(group.Members) != 1 {
|
|
t.Errorf("Expected 1 member, got %d", len(group.Members))
|
|
}
|
|
|
|
// Find the member and verify static membership
|
|
var member *consumer.GroupMember
|
|
for _, m := range group.Members {
|
|
member = m
|
|
break
|
|
}
|
|
|
|
if member == nil {
|
|
t.Fatal("Member not found")
|
|
}
|
|
|
|
if member.GroupInstanceID == nil || *member.GroupInstanceID != "static-instance-1" {
|
|
t.Errorf("Expected GroupInstanceID 'static-instance-1', got %v", member.GroupInstanceID)
|
|
}
|
|
|
|
if !handler.groupCoordinator.IsStaticMember(member) {
|
|
t.Error("Expected member to be static")
|
|
}
|
|
})
|
|
|
|
// Test static member reconnection
|
|
t.Run("StaticMemberReconnection", func(t *testing.T) {
|
|
// First join
|
|
request1 := &JoinGroupRequest{
|
|
GroupID: "test-group-2",
|
|
SessionTimeout: 30000,
|
|
RebalanceTimeout: 300000,
|
|
MemberID: "",
|
|
GroupInstanceID: "static-instance-2",
|
|
ProtocolType: "consumer",
|
|
GroupProtocols: []GroupProtocol{
|
|
{Name: "range", Metadata: []byte{}},
|
|
},
|
|
}
|
|
|
|
requestBody1 := buildJoinGroupRequestBody(request1)
|
|
_, err := handler.handleJoinGroup(12346, 5, requestBody1)
|
|
if err != nil {
|
|
t.Fatalf("First JoinGroup failed: %v", err)
|
|
}
|
|
|
|
group := handler.groupCoordinator.GetGroup("test-group-2")
|
|
if len(group.Members) != 1 {
|
|
t.Errorf("Expected 1 member after first join, got %d", len(group.Members))
|
|
}
|
|
|
|
// Get the first member ID
|
|
var firstMemberID string
|
|
for memberID := range group.Members {
|
|
firstMemberID = memberID
|
|
break
|
|
}
|
|
|
|
// Second join with same instance ID (simulating reconnection)
|
|
request2 := &JoinGroupRequest{
|
|
GroupID: "test-group-2",
|
|
SessionTimeout: 30000,
|
|
RebalanceTimeout: 300000,
|
|
MemberID: "",
|
|
GroupInstanceID: "static-instance-2", // Same instance ID
|
|
ProtocolType: "consumer",
|
|
GroupProtocols: []GroupProtocol{
|
|
{Name: "range", Metadata: []byte{}},
|
|
},
|
|
}
|
|
|
|
requestBody2 := buildJoinGroupRequestBody(request2)
|
|
_, err = handler.handleJoinGroup(12347, 5, requestBody2)
|
|
if err != nil {
|
|
t.Fatalf("Second JoinGroup failed: %v", err)
|
|
}
|
|
|
|
// Should still have the same member (static member reused)
|
|
if len(group.Members) != 1 {
|
|
t.Errorf("Expected 1 member after reconnection, got %d", len(group.Members))
|
|
}
|
|
|
|
// Verify the member ID is the same (static member reused)
|
|
var secondMemberID string
|
|
for memberID := range group.Members {
|
|
secondMemberID = memberID
|
|
break
|
|
}
|
|
|
|
if firstMemberID != secondMemberID {
|
|
t.Errorf("Expected same member ID for static member reconnection, got %s vs %s", firstMemberID, secondMemberID)
|
|
}
|
|
})
|
|
|
|
// Test dynamic member (no instance ID)
|
|
t.Run("DynamicMember", func(t *testing.T) {
|
|
request := &JoinGroupRequest{
|
|
GroupID: "test-group-3",
|
|
SessionTimeout: 30000,
|
|
RebalanceTimeout: 300000,
|
|
MemberID: "",
|
|
GroupInstanceID: "", // No instance ID (dynamic member)
|
|
ProtocolType: "consumer",
|
|
GroupProtocols: []GroupProtocol{
|
|
{Name: "range", Metadata: []byte{}},
|
|
},
|
|
}
|
|
|
|
requestBody := buildJoinGroupRequestBody(request)
|
|
_, err := handler.handleJoinGroup(12348, 5, requestBody)
|
|
if err != nil {
|
|
t.Fatalf("JoinGroup failed: %v", err)
|
|
}
|
|
|
|
group := handler.groupCoordinator.GetGroup("test-group-3")
|
|
if len(group.Members) != 1 {
|
|
t.Errorf("Expected 1 member, got %d", len(group.Members))
|
|
}
|
|
|
|
// Find the member and verify it's dynamic
|
|
var member *consumer.GroupMember
|
|
for _, m := range group.Members {
|
|
member = m
|
|
break
|
|
}
|
|
|
|
if member == nil {
|
|
t.Fatal("Member not found")
|
|
}
|
|
|
|
if handler.groupCoordinator.IsStaticMember(member) {
|
|
t.Error("Expected member to be dynamic")
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestHandler_LeaveGroup_StaticMembership(t *testing.T) {
|
|
handler := NewTestHandler()
|
|
defer handler.Close()
|
|
|
|
// Setup: Add a static member
|
|
group := handler.groupCoordinator.GetOrCreateGroup("test-group")
|
|
instanceID := "static-instance-1"
|
|
member := &consumer.GroupMember{
|
|
ID: "member-1",
|
|
ClientID: "client-1",
|
|
ClientHost: "localhost",
|
|
GroupInstanceID: &instanceID,
|
|
SessionTimeout: 30000,
|
|
State: consumer.MemberStatePending,
|
|
}
|
|
group.Members[member.ID] = member
|
|
handler.groupCoordinator.RegisterStaticMember(group, member)
|
|
|
|
// Test leaving static member
|
|
t.Run("StaticMemberLeave", func(t *testing.T) {
|
|
request := &LeaveGroupRequest{
|
|
GroupID: "test-group",
|
|
MemberID: "member-1",
|
|
GroupInstanceID: "static-instance-1",
|
|
}
|
|
|
|
requestBody := buildLeaveGroupRequestBody(request)
|
|
response, err := handler.handleLeaveGroup(12349, 3, requestBody)
|
|
if err != nil {
|
|
t.Fatalf("LeaveGroup failed: %v", err)
|
|
}
|
|
|
|
// Check error code (should be 0)
|
|
if len(response) < 6 {
|
|
t.Fatalf("Response too short: %d bytes", len(response))
|
|
}
|
|
errorCode := binary.BigEndian.Uint16(response[4:6])
|
|
if errorCode != 0 {
|
|
t.Errorf("Expected error code 0, got %d", errorCode)
|
|
}
|
|
|
|
// Verify member was removed
|
|
if len(group.Members) != 0 {
|
|
t.Errorf("Expected 0 members after leave, got %d", len(group.Members))
|
|
}
|
|
|
|
// Verify static member was unregistered
|
|
foundMember := handler.groupCoordinator.FindStaticMember(group, instanceID)
|
|
if foundMember != nil {
|
|
t.Error("Expected static member to be unregistered")
|
|
}
|
|
})
|
|
|
|
// Test leaving with wrong instance ID
|
|
t.Run("StaticMemberLeaveWrongInstanceID", func(t *testing.T) {
|
|
// Setup: Add another static member
|
|
instanceID2 := "static-instance-2"
|
|
member2 := &consumer.GroupMember{
|
|
ID: "member-2",
|
|
ClientID: "client-2",
|
|
ClientHost: "localhost",
|
|
GroupInstanceID: &instanceID2,
|
|
SessionTimeout: 30000,
|
|
State: consumer.MemberStatePending,
|
|
}
|
|
group.Members[member2.ID] = member2
|
|
handler.groupCoordinator.RegisterStaticMember(group, member2)
|
|
|
|
request := &LeaveGroupRequest{
|
|
GroupID: "test-group",
|
|
MemberID: "member-2",
|
|
GroupInstanceID: "wrong-instance-id", // Wrong instance ID
|
|
}
|
|
|
|
requestBody := buildLeaveGroupRequestBody(request)
|
|
response, err := handler.handleLeaveGroup(12350, 3, requestBody)
|
|
if err != nil {
|
|
t.Fatalf("LeaveGroup failed: %v", err)
|
|
}
|
|
|
|
// Check error code (should be FENCED_INSTANCE_ID)
|
|
if len(response) < 6 {
|
|
t.Fatalf("Response too short: %d bytes", len(response))
|
|
}
|
|
errorCode := binary.BigEndian.Uint16(response[4:6])
|
|
if errorCode != uint16(ErrorCodeFencedInstanceID) {
|
|
t.Errorf("Expected error code %d (FENCED_INSTANCE_ID), got %d", ErrorCodeFencedInstanceID, errorCode)
|
|
}
|
|
|
|
// Verify member was NOT removed
|
|
if len(group.Members) != 1 {
|
|
t.Errorf("Expected 1 member after failed leave, got %d", len(group.Members))
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestHandler_DescribeGroups_StaticMembership(t *testing.T) {
|
|
handler := NewTestHandler()
|
|
defer handler.Close()
|
|
|
|
// Setup: Add static and dynamic members
|
|
group := handler.groupCoordinator.GetOrCreateGroup("test-group")
|
|
|
|
// Static member
|
|
staticInstanceID := "static-instance-1"
|
|
staticMember := &consumer.GroupMember{
|
|
ID: "static-member",
|
|
ClientID: "static-client",
|
|
ClientHost: "localhost",
|
|
GroupInstanceID: &staticInstanceID,
|
|
SessionTimeout: 30000,
|
|
State: consumer.MemberStateStable,
|
|
}
|
|
group.Members[staticMember.ID] = staticMember
|
|
handler.groupCoordinator.RegisterStaticMember(group, staticMember)
|
|
|
|
// Dynamic member
|
|
dynamicMember := &consumer.GroupMember{
|
|
ID: "dynamic-member",
|
|
ClientID: "dynamic-client",
|
|
ClientHost: "localhost",
|
|
GroupInstanceID: nil,
|
|
SessionTimeout: 30000,
|
|
State: consumer.MemberStateStable,
|
|
}
|
|
group.Members[dynamicMember.ID] = dynamicMember
|
|
|
|
group.State = consumer.GroupStateStable
|
|
|
|
// Test DescribeGroups includes GroupInstanceID
|
|
// Build a simple DescribeGroups request body
|
|
requestBody := make([]byte, 0, 64)
|
|
// Groups array count (4 bytes)
|
|
groupsCount := make([]byte, 4)
|
|
binary.BigEndian.PutUint32(groupsCount, 1)
|
|
requestBody = append(requestBody, groupsCount...)
|
|
// Group ID (string)
|
|
groupIDLen := make([]byte, 2)
|
|
binary.BigEndian.PutUint16(groupIDLen, uint16(len("test-group")))
|
|
requestBody = append(requestBody, groupIDLen...)
|
|
requestBody = append(requestBody, []byte("test-group")...)
|
|
|
|
response, err := handler.handleDescribeGroups(12351, 5, requestBody)
|
|
if err != nil {
|
|
t.Fatalf("DescribeGroups failed: %v", err)
|
|
}
|
|
|
|
if len(response) < 10 {
|
|
t.Fatalf("Response too short: %d bytes", len(response))
|
|
}
|
|
|
|
// Basic validation - response should contain group information
|
|
// In a real test, we'd parse the full response to verify GroupInstanceID is included
|
|
t.Logf("DescribeGroups response length: %d bytes", len(response))
|
|
}
|