1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/protocol/create_topics_test.go
chrislu 5d0c45c9dc Phase 2: Implement CreateTopics protocol compliance for v0/v1
CreateTopics Protocol Compliance completed:

## Implementation
- Implement handleCreateTopicsV0V1() with proper v0/v1 request parsing
- Support regular array/string format (not compact) for v0/v1
- Parse topic name, partitions, replication factor, assignments, configs
- Handle timeout_ms and validate_only fields correctly
- Maintain existing v2+ compact format support
- Wire to SeaweedMQ handler for actual topic creation

## Key Features
- Full v0-v5 CreateTopics API version support
- Proper error handling (TOPIC_ALREADY_EXISTS, INVALID_PARTITIONS, etc.)
- Partition count validation and enforcement
- Compatible with existing SeaweedMQ topic management

## Tests
- Comprehensive unit tests for v0/v1/v2+ parsing
- Error condition testing (duplicate topics, invalid partitions)
- Multi-topic creation support
- Integration tests across all API versions
- Performance benchmarks for CreateTopics operations

## Verification
- All protocol tests pass (v0-v5 CreateTopics)
- E2E Sarama tests continue to work
- Real topics created with specified partition counts
- Proper error responses for edge cases

Ready for Phase 3: ApiVersions matrix accuracy
2025-09-13 13:18:54 -07:00

469 lines
13 KiB
Go

package protocol
import (
"encoding/binary"
"fmt"
"testing"
)
func TestCreateTopicsV0_BasicParsing(t *testing.T) {
handler := NewTestHandler()
defer handler.Close()
// Build a CreateTopics v0 request
request := make([]byte, 0, 256)
// Topics array count (1 topic)
request = append(request, 0x00, 0x00, 0x00, 0x01)
// Topic 1: "test-topic"
topicName := "test-topic"
request = append(request, 0x00, byte(len(topicName))) // Topic name length
request = append(request, []byte(topicName)...) // Topic name
// num_partitions = 3
request = append(request, 0x00, 0x00, 0x00, 0x03)
// replication_factor = 1
request = append(request, 0x00, 0x01)
// assignments array (empty)
request = append(request, 0x00, 0x00, 0x00, 0x00)
// configs array (empty)
request = append(request, 0x00, 0x00, 0x00, 0x00)
// timeout_ms = 5000
request = append(request, 0x00, 0x00, 0x13, 0x88)
// Call handler
response, err := handler.handleCreateTopicsV0V1(12345, request)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(response) < 10 {
t.Fatalf("Response too short: %d bytes", len(response))
}
// Check correlation ID
correlationID := binary.BigEndian.Uint32(response[0:4])
if correlationID != 12345 {
t.Errorf("Expected correlation ID 12345, got %d", correlationID)
}
// Check topics array count
topicsCount := binary.BigEndian.Uint32(response[4:8])
if topicsCount != 1 {
t.Errorf("Expected 1 topic in response, got %d", topicsCount)
}
// Verify topic was actually created
if !handler.seaweedMQHandler.TopicExists("test-topic") {
t.Error("Topic 'test-topic' was not created")
}
}
func TestCreateTopicsV0_TopicAlreadyExists(t *testing.T) {
handler := NewTestHandler()
defer handler.Close()
// Pre-create the topic
err := handler.seaweedMQHandler.CreateTopic("existing-topic", 2)
if err != nil {
t.Fatalf("Failed to pre-create topic: %v", err)
}
// Build request for the same topic
request := make([]byte, 0, 256)
// Topics array count (1 topic)
request = append(request, 0x00, 0x00, 0x00, 0x01)
// Topic 1: "existing-topic"
topicName := "existing-topic"
request = append(request, 0x00, byte(len(topicName))) // Topic name length
request = append(request, []byte(topicName)...) // Topic name
// num_partitions = 1
request = append(request, 0x00, 0x00, 0x00, 0x01)
// replication_factor = 1
request = append(request, 0x00, 0x01)
// assignments array (empty)
request = append(request, 0x00, 0x00, 0x00, 0x00)
// configs array (empty)
request = append(request, 0x00, 0x00, 0x00, 0x00)
// timeout_ms = 5000
request = append(request, 0x00, 0x00, 0x13, 0x88)
// Call handler
response, err := handler.handleCreateTopicsV0V1(12346, request)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Parse response to check error code
if len(response) < 12 {
t.Fatalf("Response too short for error code check: %d bytes", len(response))
}
// Skip correlation ID (4 bytes) + topics count (4 bytes) + topic name length (2 bytes) + topic name
offset := 4 + 4 + 2 + len(topicName)
if len(response) >= offset+2 {
errorCode := binary.BigEndian.Uint16(response[offset : offset+2])
if errorCode != 36 { // TOPIC_ALREADY_EXISTS
t.Errorf("Expected error code 36 (TOPIC_ALREADY_EXISTS), got %d", errorCode)
}
} else {
t.Error("Response too short to contain error code")
}
}
func TestCreateTopicsV0_InvalidPartitions(t *testing.T) {
handler := NewTestHandler()
defer handler.Close()
// Build request with invalid partition count (0)
request := make([]byte, 0, 256)
// Topics array count (1 topic)
request = append(request, 0x00, 0x00, 0x00, 0x01)
// Topic 1: "invalid-topic"
topicName := "invalid-topic"
request = append(request, 0x00, byte(len(topicName))) // Topic name length
request = append(request, []byte(topicName)...) // Topic name
// num_partitions = 0 (invalid)
request = append(request, 0x00, 0x00, 0x00, 0x00)
// replication_factor = 1
request = append(request, 0x00, 0x01)
// assignments array (empty)
request = append(request, 0x00, 0x00, 0x00, 0x00)
// configs array (empty)
request = append(request, 0x00, 0x00, 0x00, 0x00)
// timeout_ms = 5000
request = append(request, 0x00, 0x00, 0x13, 0x88)
// Call handler
response, err := handler.handleCreateTopicsV0V1(12347, request)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Parse response to check error code
if len(response) < 12 {
t.Fatalf("Response too short for error code check: %d bytes", len(response))
}
// Skip correlation ID (4 bytes) + topics count (4 bytes) + topic name length (2 bytes) + topic name
offset := 4 + 4 + 2 + len(topicName)
if len(response) >= offset+2 {
errorCode := binary.BigEndian.Uint16(response[offset : offset+2])
if errorCode != 37 { // INVALID_PARTITIONS
t.Errorf("Expected error code 37 (INVALID_PARTITIONS), got %d", errorCode)
}
} else {
t.Error("Response too short to contain error code")
}
// Verify topic was not created
if handler.seaweedMQHandler.TopicExists("invalid-topic") {
t.Error("Topic with invalid partitions should not have been created")
}
}
func TestCreateTopicsV2Plus_CompactFormat(t *testing.T) {
handler := NewTestHandler()
defer handler.Close()
// Build a CreateTopics v2 request (compact format)
request := make([]byte, 0, 256)
// Topics array count (compact: count + 1, so 1 topic = 2)
request = append(request, 0x02)
// Topic 1: "compact-topic"
topicName := "compact-topic"
request = append(request, byte(len(topicName)+1)) // Compact string length
request = append(request, []byte(topicName)...) // Topic name
// num_partitions = 2
request = append(request, 0x00, 0x00, 0x00, 0x02)
// replication_factor = 1
request = append(request, 0x00, 0x01)
// configs array (compact: empty = 0)
request = append(request, 0x00)
// tagged fields (empty)
request = append(request, 0x00)
// timeout_ms = 10000
request = append(request, 0x00, 0x00, 0x27, 0x10)
// validate_only = false
request = append(request, 0x00)
// tagged fields at end
request = append(request, 0x00)
// Call handler
response, err := handler.handleCreateTopicsV2Plus(12348, 2, request)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(response) < 10 {
t.Fatalf("Response too short: %d bytes", len(response))
}
// Check correlation ID
correlationID := binary.BigEndian.Uint32(response[0:4])
if correlationID != 12348 {
t.Errorf("Expected correlation ID 12348, got %d", correlationID)
}
// Verify topic was created
if !handler.seaweedMQHandler.TopicExists("compact-topic") {
t.Error("Topic 'compact-topic' was not created")
}
}
func TestCreateTopicsV2Plus_MultipleTopics(t *testing.T) {
handler := NewTestHandler()
defer handler.Close()
// Build a CreateTopics v2 request with 2 topics
request := make([]byte, 0, 512)
// Topics array count (compact: 2 topics = 3)
request = append(request, 0x03)
// Topic 1: "topic-one"
topicName1 := "topic-one"
request = append(request, byte(len(topicName1)+1)) // Compact string length
request = append(request, []byte(topicName1)...) // Topic name
// num_partitions = 1
request = append(request, 0x00, 0x00, 0x00, 0x01)
// replication_factor = 1
request = append(request, 0x00, 0x01)
// configs array (compact: empty = 0)
request = append(request, 0x00)
// tagged fields (empty)
request = append(request, 0x00)
// Topic 2: "topic-two"
topicName2 := "topic-two"
request = append(request, byte(len(topicName2)+1)) // Compact string length
request = append(request, []byte(topicName2)...) // Topic name
// num_partitions = 3
request = append(request, 0x00, 0x00, 0x00, 0x03)
// replication_factor = 1
request = append(request, 0x00, 0x01)
// configs array (compact: empty = 0)
request = append(request, 0x00)
// tagged fields (empty)
request = append(request, 0x00)
// timeout_ms = 5000
request = append(request, 0x00, 0x00, 0x13, 0x88)
// validate_only = false
request = append(request, 0x00)
// tagged fields at end
request = append(request, 0x00)
// Call handler
response, err := handler.handleCreateTopicsV2Plus(12349, 2, request)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(response) < 4 {
t.Fatalf("Response too short: %d bytes", len(response))
}
// Verify both topics were created
if !handler.seaweedMQHandler.TopicExists("topic-one") {
t.Error("Topic 'topic-one' was not created")
}
if !handler.seaweedMQHandler.TopicExists("topic-two") {
t.Error("Topic 'topic-two' was not created")
}
}
// Integration test with actual Kafka-like workflow
func TestCreateTopics_Integration(t *testing.T) {
handler := NewTestHandler()
defer handler.Close()
// Test version routing
testCases := []struct {
name string
version uint16
topicName string
partitions int32
}{
{"Version0", 0, "integration-v0-topic", 2},
{"Version1", 1, "integration-v1-topic", 3},
{"Version2", 2, "integration-v2-topic", 1},
{"Version3", 3, "integration-v3-topic", 4},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var request []byte
if tc.version <= 1 {
// Build v0/v1 format request
request = make([]byte, 0, 256)
// Topics array count (1 topic)
request = append(request, 0x00, 0x00, 0x00, 0x01)
// Topic name
request = append(request, 0x00, byte(len(tc.topicName)))
request = append(request, []byte(tc.topicName)...)
// num_partitions
partitionBytes := make([]byte, 4)
binary.BigEndian.PutUint32(partitionBytes, uint32(tc.partitions))
request = append(request, partitionBytes...)
// replication_factor = 1
request = append(request, 0x00, 0x01)
// assignments array (empty)
request = append(request, 0x00, 0x00, 0x00, 0x00)
// configs array (empty)
request = append(request, 0x00, 0x00, 0x00, 0x00)
// timeout_ms = 5000
request = append(request, 0x00, 0x00, 0x13, 0x88)
} else {
// Build v2+ format request (compact)
request = make([]byte, 0, 256)
// Topics array count (compact: 1 topic = 2)
request = append(request, 0x02)
// Topic name (compact string)
request = append(request, byte(len(tc.topicName)+1))
request = append(request, []byte(tc.topicName)...)
// num_partitions
partitionBytes := make([]byte, 4)
binary.BigEndian.PutUint32(partitionBytes, uint32(tc.partitions))
request = append(request, partitionBytes...)
// replication_factor = 1
request = append(request, 0x00, 0x01)
// configs array (compact: empty = 0)
request = append(request, 0x00)
// tagged fields (empty)
request = append(request, 0x00)
// timeout_ms = 5000
request = append(request, 0x00, 0x00, 0x13, 0x88)
// validate_only = false
request = append(request, 0x00)
// tagged fields at end
request = append(request, 0x00)
}
// Call the main handler (which routes to version-specific handlers)
response, err := handler.handleCreateTopics(uint32(1000+tc.version), tc.version, request)
if err != nil {
t.Fatalf("CreateTopics v%d failed: %v", tc.version, err)
}
if len(response) == 0 {
t.Fatalf("CreateTopics v%d returned empty response", tc.version)
}
// Verify topic was created with correct partition count
if !handler.seaweedMQHandler.TopicExists(tc.topicName) {
t.Errorf("Topic '%s' was not created in v%d", tc.topicName, tc.version)
}
// Check partition count (create ledgers on-demand to verify partition setup)
for partitionID := int32(0); partitionID < tc.partitions; partitionID++ {
ledger := handler.seaweedMQHandler.GetOrCreateLedger(tc.topicName, partitionID)
if ledger == nil {
t.Errorf("Failed to get/create ledger for topic '%s' partition %d", tc.topicName, partitionID)
}
}
})
}
}
// Benchmark CreateTopics performance
func BenchmarkCreateTopicsV0(b *testing.B) {
handler := NewTestHandler()
defer handler.Close()
// Pre-build request
request := make([]byte, 0, 256)
request = append(request, 0x00, 0x00, 0x00, 0x01) // 1 topic
topicName := "benchmark-topic"
request = append(request, 0x00, byte(len(topicName)))
request = append(request, []byte(topicName)...)
request = append(request, 0x00, 0x00, 0x00, 0x01) // 1 partition
request = append(request, 0x00, 0x01) // replication factor 1
request = append(request, 0x00, 0x00, 0x00, 0x00) // empty assignments
request = append(request, 0x00, 0x00, 0x00, 0x00) // empty configs
request = append(request, 0x00, 0x00, 0x13, 0x88) // timeout 5000ms
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Create unique topic names to avoid "already exists" errors
uniqueRequest := make([]byte, len(request))
copy(uniqueRequest, request)
// Modify topic name to make it unique
topicSuffix := []byte(fmt.Sprintf("-%d", i))
uniqueRequest = append(uniqueRequest[:10+len(topicName)], topicSuffix...)
uniqueRequest = append(uniqueRequest, request[10+len(topicName):]...)
// Update topic name length
uniqueRequest[8] = byte(len(topicName) + len(topicSuffix))
_, err := handler.handleCreateTopicsV0V1(uint32(i), uniqueRequest)
if err != nil {
b.Fatalf("CreateTopics failed on iteration %d: %v", i, err)
}
}
}