1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/protocol/group_introspection_test.go
chrislu 74980a6205 Phase 1 Part 2: Group introspection APIs
- Implement DescribeGroups (API 15) and ListGroups (API 16) handlers
- Add comprehensive request parsing for both APIs with version support
- Add response building with proper Kafka protocol format
- Support states filtering in ListGroups v4+
- Add API entries to ApiVersions response and validation
- Add structured logging for group introspection requests
- Add comprehensive test coverage for all parsing and response building

Group introspection APIs complete with full protocol compliance.
2025-09-15 20:36:04 -07:00

334 lines
7.5 KiB
Go

package protocol
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
)
// TestHandler_handleDescribeGroups drives handleDescribeGroups through three
// situations: no group coordinator, a freshly created coordinator to which
// this test adds no groups, and a request body that is too short.
//
// Each subtest constructs its own Handler so that it can be selected on its
// own (for example with -run) without depending on state set by a sibling.
func TestHandler_handleDescribeGroups(t *testing.T) {
	// newRequest returns a fresh request body naming one group, "test-group".
	newRequest := func() []byte {
		return []byte{
			0, 0, 0, 1, // 1 group
			0, 10, 't', 'e', 's', 't', '-', 'g', 'r', 'o', 'u', 'p', // "test-group"
		}
	}

	// Test with no group coordinator
	t.Run("NoCoordinator", func(t *testing.T) {
		handler := &Handler{}

		response, err := handler.handleDescribeGroups(123, 0, newRequest())
		if err != nil {
			t.Fatalf("Expected no error, got: %v", err)
		}
		if len(response) == 0 {
			t.Fatal("Expected non-empty response")
		}
		// Should contain error code for GROUP_COORDINATOR_NOT_AVAILABLE;
		// only the presence of a response is asserted here.
		t.Logf("Response length: %d bytes", len(response))
	})

	// Test with group coordinator but no groups
	t.Run("WithCoordinatorNoGroups", func(t *testing.T) {
		handler := &Handler{}
		handler.groupCoordinator = consumer.NewGroupCoordinator()

		response, err := handler.handleDescribeGroups(123, 0, newRequest())
		if err != nil {
			t.Fatalf("Expected no error, got: %v", err)
		}
		if len(response) == 0 {
			t.Fatal("Expected non-empty response")
		}
		// Should contain error code for UNKNOWN_GROUP_ID; only the presence
		// of a response is asserted here.
		t.Logf("Response length: %d bytes", len(response))
	})

	// Test parsing edge cases
	t.Run("InvalidRequest", func(t *testing.T) {
		// Attach a coordinator so the malformed request body is the only
		// unusual input to the call.
		handler := &Handler{}
		handler.groupCoordinator = consumer.NewGroupCoordinator()

		request := []byte{0, 0} // Too short
		_, err := handler.handleDescribeGroups(123, 0, request)
		if err == nil {
			t.Fatal("Expected error for invalid request")
		}
	})
}
// TestHandler_handleListGroups calls handleListGroups without a coordinator,
// with a freshly created coordinator, and with a v4 request that carries a
// states filter. Each case only requires a nil error and a non-empty response.
func TestHandler_handleListGroups(t *testing.T) {
	h := &Handler{}

	// Test with no group coordinator
	t.Run("NoCoordinator", func(t *testing.T) {
		// Empty request
		resp, err := h.handleListGroups(123, 0, []byte{})
		switch {
		case err != nil:
			t.Fatalf("Expected no error, got: %v", err)
		case len(resp) == 0:
			t.Fatal("Expected non-empty response")
		}
		t.Logf("Response length: %d bytes", len(resp))
	})

	// Test with group coordinator
	t.Run("WithCoordinator", func(t *testing.T) {
		h.groupCoordinator = consumer.NewGroupCoordinator()

		// Empty request
		resp, err := h.handleListGroups(123, 0, []byte{})
		switch {
		case err != nil:
			t.Fatalf("Expected no error, got: %v", err)
		case len(resp) == 0:
			t.Fatal("Expected non-empty response")
		}
		t.Logf("Response length: %d bytes", len(resp))
	})

	// Test with states filter (v4+)
	t.Run("WithStatesFilter", func(t *testing.T) {
		h.groupCoordinator = consumer.NewGroupCoordinator()

		// Request with states filter: ["Stable"]
		stableOnly := []byte{
			0, 0, 0, 1, // 1 state filter
			0, 6, 'S', 't', 'a', 'b', 'l', 'e', // "Stable"
		}

		resp, err := h.handleListGroups(123, 4, stableOnly)
		switch {
		case err != nil:
			t.Fatalf("Expected no error, got: %v", err)
		case len(resp) == 0:
			t.Fatal("Expected non-empty response")
		}
		t.Logf("Response length: %d bytes", len(resp))
	})
}
// TestHandler_parseDescribeGroupsRequest is a table-driven test of
// parseDescribeGroupsRequest. Well-formed bodies must yield the expected
// number of group IDs; empty and truncated bodies must yield an error.
func TestHandler_parseDescribeGroupsRequest(t *testing.T) {
	h := &Handler{}

	cases := []struct {
		name      string
		payload   []byte
		version   uint16
		wantErr   bool
		wantCount int
	}{
		{
			name: "single group",
			payload: []byte{
				0, 0, 0, 1, // 1 group
				0, 10, 't', 'e', 's', 't', '-', 'g', 'r', 'o', 'u', 'p', // "test-group"
			},
			version:   0,
			wantErr:   false,
			wantCount: 1,
		},
		{
			name: "multiple groups",
			payload: []byte{
				0, 0, 0, 2, // 2 groups
				0, 6, 'g', 'r', 'o', 'u', 'p', '1', // "group1"
				0, 6, 'g', 'r', 'o', 'u', 'p', '2', // "group2"
			},
			version:   0,
			wantErr:   false,
			wantCount: 2,
		},
		{
			name:    "empty request",
			payload: []byte{},
			version: 0,
			wantErr: true,
		},
		{
			name: "truncated request",
			payload: []byte{
				0, 0, 0, 1, // 1 group
				0, 10, 't', 'e', 's', 't', // Incomplete group name
			},
			version: 0,
			wantErr: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			parsed, err := h.parseDescribeGroupsRequest(tc.payload, tc.version)

			// The parsed value is only inspected once the call is known to
			// have succeeded and success was expected.
			switch {
			case tc.wantErr && err == nil:
				t.Error("Expected error but got none")
			case tc.wantErr:
				// An error was expected and one was returned.
			case err != nil:
				t.Errorf("Unexpected error: %v", err)
			case len(parsed.GroupIDs) != tc.wantCount:
				t.Errorf("Expected %d groups, got %d", tc.wantCount, len(parsed.GroupIDs))
			}
		})
	}
}
// TestHandler_parseListGroupsRequest is a table-driven test of
// parseListGroupsRequest. It checks how many entries end up in StatesFilter
// for an empty v0 body and for v4 bodies with two and with zero states.
func TestHandler_parseListGroupsRequest(t *testing.T) {
	h := &Handler{}

	cases := []struct {
		name       string
		payload    []byte
		version    uint16
		wantErr    bool
		wantStates int
	}{
		{
			name:       "empty request v0",
			payload:    []byte{},
			version:    0,
			wantErr:    false,
			wantStates: 0,
		},
		{
			name: "with states filter v4",
			payload: []byte{
				0, 0, 0, 2, // 2 states
				0, 6, 'S', 't', 'a', 'b', 'l', 'e', // "Stable"
				0, 5, 'E', 'm', 'p', 't', 'y', // "Empty"
			},
			version:    4,
			wantErr:    false,
			wantStates: 2,
		},
		{
			name: "no states filter v4",
			payload: []byte{
				0, 0, 0, 0, // 0 states
			},
			version:    4,
			wantErr:    false,
			wantStates: 0,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			parsed, err := h.parseListGroupsRequest(tc.payload, tc.version)

			// The parsed value is only inspected once the call is known to
			// have succeeded and success was expected.
			switch {
			case tc.wantErr && err == nil:
				t.Error("Expected error but got none")
			case tc.wantErr:
				// An error was expected and one was returned.
			case err != nil:
				t.Errorf("Unexpected error: %v", err)
			case len(parsed.StatesFilter) != tc.wantStates:
				t.Errorf("Expected %d states, got %d", tc.wantStates, len(parsed.StatesFilter))
			}
		})
	}
}
// TestHandler_buildDescribeGroupsResponse encodes a one-group, one-member
// DescribeGroupsResponse at API versions 0 and 3. The v0 output must be
// non-empty and the v3 output must be strictly longer than the v0 output.
func TestHandler_buildDescribeGroupsResponse(t *testing.T) {
	h := &Handler{}

	// Assemble the fixture from the innermost element outwards.
	member := DescribeGroupsMember{
		MemberID:         "member-1",
		GroupInstanceID:  nil,
		ClientID:         "client-1",
		ClientHost:       "localhost",
		MemberMetadata:   []byte("metadata"),
		MemberAssignment: []byte("assignment"),
	}
	group := DescribeGroupsGroup{
		ErrorCode:     0,
		GroupID:       "test-group",
		State:         "Stable",
		ProtocolType:  "consumer",
		Protocol:      "range",
		Members:       []DescribeGroupsMember{member},
		AuthorizedOps: []int32{},
	}
	fixture := DescribeGroupsResponse{
		ThrottleTimeMs: 0,
		Groups:         []DescribeGroupsGroup{group},
	}

	encodedV0 := h.buildDescribeGroupsResponse(fixture, 123, 0)
	if len(encodedV0) == 0 {
		t.Fatal("Expected non-empty response")
	}
	t.Logf("Response length: %d bytes", len(encodedV0))

	// Test with different API versions
	encodedV3 := h.buildDescribeGroupsResponse(fixture, 123, 3)
	if !(len(encodedV3) > len(encodedV0)) {
		t.Error("Expected v3 response to be larger (includes authorized ops)")
	}
}
// TestHandler_buildListGroupsResponse encodes a two-group ListGroupsResponse
// at API versions 0 and 4. The v0 output must be non-empty and the v4 output
// must be strictly longer than the v0 output.
func TestHandler_buildListGroupsResponse(t *testing.T) {
	h := &Handler{}

	groups := []ListGroupsGroup{
		{GroupID: "group-1", ProtocolType: "consumer", GroupState: "Stable"},
		{GroupID: "group-2", ProtocolType: "consumer", GroupState: "Empty"},
	}
	fixture := ListGroupsResponse{
		ThrottleTimeMs: 0,
		ErrorCode:      0,
		Groups:         groups,
	}

	encodedV0 := h.buildListGroupsResponse(fixture, 123, 0)
	if len(encodedV0) == 0 {
		t.Fatal("Expected non-empty response")
	}
	t.Logf("Response length: %d bytes", len(encodedV0))

	// Test with different API versions
	encodedV4 := h.buildListGroupsResponse(fixture, 123, 4)
	if !(len(encodedV4) > len(encodedV0)) {
		t.Error("Expected v4 response to be larger (includes group state)")
	}
}