mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
- Implement ParseConfluentEnvelope() to detect and extract schema info - Add support for magic byte (0x00) + schema ID extraction - Include envelope validation and metadata extraction - Add comprehensive unit tests with 100% coverage - Prepare foundation for Avro/Protobuf/JSON Schema support This enables detection of schematized Kafka messages for gateway processing.
320 lines
7.2 KiB
Go
320 lines
7.2 KiB
Go
package schema
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"testing"
|
|
)
|
|
|
|
func TestParseConfluentEnvelope(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
input []byte
|
|
expectOK bool
|
|
expectID uint32
|
|
expectFormat Format
|
|
}{
|
|
{
|
|
name: "valid Avro message",
|
|
input: []byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x10, 0x48, 0x65, 0x6c, 0x6c, 0x6f}, // schema ID 1 + "Hello"
|
|
expectOK: true,
|
|
expectID: 1,
|
|
expectFormat: FormatAvro,
|
|
},
|
|
{
|
|
name: "valid message with larger schema ID",
|
|
input: []byte{0x00, 0x00, 0x00, 0x04, 0xd2, 0x02, 0x66, 0x6f, 0x6f}, // schema ID 1234 + "foo"
|
|
expectOK: true,
|
|
expectID: 1234,
|
|
expectFormat: FormatAvro,
|
|
},
|
|
{
|
|
name: "too short message",
|
|
input: []byte{0x00, 0x00, 0x00},
|
|
expectOK: false,
|
|
},
|
|
{
|
|
name: "no magic byte",
|
|
input: []byte{0x01, 0x00, 0x00, 0x00, 0x01, 0x48, 0x65, 0x6c, 0x6c, 0x6f},
|
|
expectOK: false,
|
|
},
|
|
{
|
|
name: "empty message",
|
|
input: []byte{},
|
|
expectOK: false,
|
|
},
|
|
{
|
|
name: "minimal valid message",
|
|
input: []byte{0x00, 0x00, 0x00, 0x00, 0x01}, // schema ID 1, empty payload
|
|
expectOK: true,
|
|
expectID: 1,
|
|
expectFormat: FormatAvro,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
envelope, ok := ParseConfluentEnvelope(tt.input)
|
|
|
|
if ok != tt.expectOK {
|
|
t.Errorf("ParseConfluentEnvelope() ok = %v, want %v", ok, tt.expectOK)
|
|
return
|
|
}
|
|
|
|
if !tt.expectOK {
|
|
return // No need to check further if we expected failure
|
|
}
|
|
|
|
if envelope.SchemaID != tt.expectID {
|
|
t.Errorf("ParseConfluentEnvelope() schemaID = %v, want %v", envelope.SchemaID, tt.expectID)
|
|
}
|
|
|
|
if envelope.Format != tt.expectFormat {
|
|
t.Errorf("ParseConfluentEnvelope() format = %v, want %v", envelope.Format, tt.expectFormat)
|
|
}
|
|
|
|
// Verify payload extraction
|
|
expectedPayloadLen := len(tt.input) - 5 // 5 bytes for magic + schema ID
|
|
if len(envelope.Payload) != expectedPayloadLen {
|
|
t.Errorf("ParseConfluentEnvelope() payload length = %v, want %v", len(envelope.Payload), expectedPayloadLen)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestIsSchematized(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
input []byte
|
|
expect bool
|
|
}{
|
|
{
|
|
name: "schematized message",
|
|
input: []byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x48, 0x65, 0x6c, 0x6c, 0x6f},
|
|
expect: true,
|
|
},
|
|
{
|
|
name: "non-schematized message",
|
|
input: []byte{0x48, 0x65, 0x6c, 0x6c, 0x6f}, // Just "Hello"
|
|
expect: false,
|
|
},
|
|
{
|
|
name: "empty message",
|
|
input: []byte{},
|
|
expect: false,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
result := IsSchematized(tt.input)
|
|
if result != tt.expect {
|
|
t.Errorf("IsSchematized() = %v, want %v", result, tt.expect)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestExtractSchemaID(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
input []byte
|
|
expectID uint32
|
|
expectOK bool
|
|
}{
|
|
{
|
|
name: "valid schema ID",
|
|
input: []byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x48, 0x65, 0x6c, 0x6c, 0x6f},
|
|
expectID: 1,
|
|
expectOK: true,
|
|
},
|
|
{
|
|
name: "large schema ID",
|
|
input: []byte{0x00, 0x00, 0x00, 0x04, 0xd2, 0x02, 0x66, 0x6f, 0x6f},
|
|
expectID: 1234,
|
|
expectOK: true,
|
|
},
|
|
{
|
|
name: "no magic byte",
|
|
input: []byte{0x01, 0x00, 0x00, 0x00, 0x01},
|
|
expectID: 0,
|
|
expectOK: false,
|
|
},
|
|
{
|
|
name: "too short",
|
|
input: []byte{0x00, 0x00},
|
|
expectID: 0,
|
|
expectOK: false,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
id, ok := ExtractSchemaID(tt.input)
|
|
|
|
if ok != tt.expectOK {
|
|
t.Errorf("ExtractSchemaID() ok = %v, want %v", ok, tt.expectOK)
|
|
}
|
|
|
|
if id != tt.expectID {
|
|
t.Errorf("ExtractSchemaID() id = %v, want %v", id, tt.expectID)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestCreateConfluentEnvelope(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
format Format
|
|
schemaID uint32
|
|
indexes []int
|
|
payload []byte
|
|
expected []byte
|
|
}{
|
|
{
|
|
name: "simple Avro message",
|
|
format: FormatAvro,
|
|
schemaID: 1,
|
|
indexes: nil,
|
|
payload: []byte("Hello"),
|
|
expected: []byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x48, 0x65, 0x6c, 0x6c, 0x6f},
|
|
},
|
|
{
|
|
name: "large schema ID",
|
|
format: FormatAvro,
|
|
schemaID: 1234,
|
|
indexes: nil,
|
|
payload: []byte("foo"),
|
|
expected: []byte{0x00, 0x00, 0x00, 0x04, 0xd2, 0x66, 0x6f, 0x6f},
|
|
},
|
|
{
|
|
name: "empty payload",
|
|
format: FormatAvro,
|
|
schemaID: 5,
|
|
indexes: nil,
|
|
payload: []byte{},
|
|
expected: []byte{0x00, 0x00, 0x00, 0x00, 0x05},
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
result := CreateConfluentEnvelope(tt.format, tt.schemaID, tt.indexes, tt.payload)
|
|
|
|
if len(result) != len(tt.expected) {
|
|
t.Errorf("CreateConfluentEnvelope() length = %v, want %v", len(result), len(tt.expected))
|
|
return
|
|
}
|
|
|
|
for i, b := range result {
|
|
if b != tt.expected[i] {
|
|
t.Errorf("CreateConfluentEnvelope() byte[%d] = %v, want %v", i, b, tt.expected[i])
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestEnvelopeValidate(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
envelope *ConfluentEnvelope
|
|
expectErr bool
|
|
}{
|
|
{
|
|
name: "valid Avro envelope",
|
|
envelope: &ConfluentEnvelope{
|
|
Format: FormatAvro,
|
|
SchemaID: 1,
|
|
Payload: []byte("Hello"),
|
|
},
|
|
expectErr: false,
|
|
},
|
|
{
|
|
name: "zero schema ID",
|
|
envelope: &ConfluentEnvelope{
|
|
Format: FormatAvro,
|
|
SchemaID: 0,
|
|
Payload: []byte("Hello"),
|
|
},
|
|
expectErr: true,
|
|
},
|
|
{
|
|
name: "empty payload",
|
|
envelope: &ConfluentEnvelope{
|
|
Format: FormatAvro,
|
|
SchemaID: 1,
|
|
Payload: []byte{},
|
|
},
|
|
expectErr: true,
|
|
},
|
|
{
|
|
name: "unknown format",
|
|
envelope: &ConfluentEnvelope{
|
|
Format: FormatUnknown,
|
|
SchemaID: 1,
|
|
Payload: []byte("Hello"),
|
|
},
|
|
expectErr: true,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
err := tt.envelope.Validate()
|
|
|
|
if (err != nil) != tt.expectErr {
|
|
t.Errorf("Envelope.Validate() error = %v, expectErr %v", err, tt.expectErr)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestEnvelopeMetadata(t *testing.T) {
|
|
envelope := &ConfluentEnvelope{
|
|
Format: FormatAvro,
|
|
SchemaID: 123,
|
|
Indexes: []int{1, 2, 3},
|
|
Payload: []byte("test"),
|
|
}
|
|
|
|
metadata := envelope.Metadata()
|
|
|
|
if metadata["schema_format"] != "AVRO" {
|
|
t.Errorf("Expected schema_format=AVRO, got %s", metadata["schema_format"])
|
|
}
|
|
|
|
if metadata["schema_id"] != "123" {
|
|
t.Errorf("Expected schema_id=123, got %s", metadata["schema_id"])
|
|
}
|
|
|
|
if metadata["protobuf_indexes"] != "1,2,3" {
|
|
t.Errorf("Expected protobuf_indexes=1,2,3, got %s", metadata["protobuf_indexes"])
|
|
}
|
|
}
|
|
|
|
// Benchmark tests for performance
|
|
func BenchmarkParseConfluentEnvelope(b *testing.B) {
|
|
// Create a test message
|
|
testMsg := make([]byte, 1024)
|
|
testMsg[0] = 0x00 // Magic byte
|
|
binary.BigEndian.PutUint32(testMsg[1:5], 123) // Schema ID
|
|
// Fill rest with dummy data
|
|
for i := 5; i < len(testMsg); i++ {
|
|
testMsg[i] = byte(i % 256)
|
|
}
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
_, _ = ParseConfluentEnvelope(testMsg)
|
|
}
|
|
}
|
|
|
|
func BenchmarkIsSchematized(b *testing.B) {
|
|
testMsg := []byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x48, 0x65, 0x6c, 0x6c, 0x6f}
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
_ = IsSchematized(testMsg)
|
|
}
|
|
}
|