seaweedfs/weed/mq/kafka/schema/manager_evolution_test.go
chrislu deb315a8a9 persist kafka offset
Phase E2: Integrate Protobuf descriptor parser with decoder

- Update NewProtobufDecoder to use ProtobufDescriptorParser
- Add findFirstMessageName helper for automatic message detection
- Fix ParseBinaryDescriptor to return schema even on resolution failure
- Add comprehensive tests for protobuf decoder integration
- Improve error handling and caching behavior

This enables proper binary descriptor parsing in the protobuf decoder,
completing the integration between descriptor parsing and decoding.

Phase E3: Complete Protobuf message descriptor resolution

- Implement full protobuf descriptor resolution using protoreflect API
- Add buildFileDescriptor and findMessageInFileDescriptor methods
- Support nested message resolution with findNestedMessageDescriptor
- Add proper mutex protection for thread-safe cache access
- Update all test data to use proper field cardinality labels
- Update test expectations to handle successful descriptor resolution
- Enable full protobuf decoder creation from binary descriptors

Phase E (Protobuf Support) is now complete:

- E1: Binary descriptor parsing
- E2: Decoder integration
- E3: Full message descriptor resolution

Protobuf messages can now be fully parsed and decoded.
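A hedged sketch of the resolution path, assuming the binary payload is a single serialized FileDescriptorProto (the repo's actual input format and helper names may differ); it shows the protodesc/protoreflect calls that buildFileDescriptor- and findFirstMessageName-style helpers rely on:

```go
package schema

import (
	"fmt"

	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/reflect/protoregistry"
	"google.golang.org/protobuf/types/descriptorpb"
)

// resolveMessage (illustrative) turns raw descriptor bytes plus a message
// name into a protoreflect.MessageDescriptor.
func resolveMessage(descBytes []byte, msgName string) (protoreflect.MessageDescriptor, error) {
	fdProto := &descriptorpb.FileDescriptorProto{}
	if err := proto.Unmarshal(descBytes, fdProto); err != nil {
		return nil, fmt.Errorf("unmarshal descriptor: %w", err)
	}
	// Imports resolve against the global registry here; a real
	// implementation may need its own resolver for custom dependencies.
	fd, err := protodesc.NewFile(fdProto, protoregistry.GlobalFiles)
	if err != nil {
		return nil, fmt.Errorf("build file descriptor: %w", err)
	}
	if msgName == "" && fd.Messages().Len() > 0 {
		// Automatic message detection: default to the first top-level message.
		return fd.Messages().Get(0), nil
	}
	if md := fd.Messages().ByName(protoreflect.Name(msgName)); md != nil {
		return md, nil
	}
	// Nested messages would require walking md.Messages() recursively.
	return nil, fmt.Errorf("message %q not found", msgName)
}
```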

Phase F: Implement Kafka record batch compression support

- Add comprehensive compression module supporting gzip/snappy/lz4/zstd
- Implement RecordBatchParser with full compression and CRC validation
- Support compression codec extraction from record batch attributes (see the sketch after this list)
- Add compression/decompression for all major Kafka codecs
- Integrate compression support into Produce and Fetch handlers
- Add extensive unit tests for all compression codecs
- Support round-trip compression/decompression with proper error handling
- Add performance benchmarks for compression operations
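As a reference for the attribute handling above, a minimal sketch of extracting the codec from a v2 record batch header; the field offsets and CRC-32C coverage follow the public Kafka protocol spec, while the type names are illustrative rather than this module's:

```go
package compression

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

type Codec int8

const (
	CodecNone Codec = iota
	CodecGzip
	CodecSnappy
	CodecLZ4
	CodecZstd
)

// parseBatchCodec validates the CRC and reads the attributes field.
// Record batch v2 layout: baseOffset(8) batchLength(4)
// partitionLeaderEpoch(4) magic(1) crc(4) attributes(2) ...
func parseBatchCodec(batch []byte) (Codec, error) {
	if len(batch) < 23 {
		return CodecNone, fmt.Errorf("batch too short: %d bytes", len(batch))
	}
	if magic := batch[16]; magic != 2 {
		return CodecNone, fmt.Errorf("unsupported magic byte %d", magic)
	}
	crc := binary.BigEndian.Uint32(batch[17:21])
	// CRC-32C (Castagnoli) covers everything after the crc field itself.
	if computed := crc32.Checksum(batch[21:], crc32.MakeTable(crc32.Castagnoli)); computed != crc {
		return CodecNone, fmt.Errorf("crc mismatch: got %x want %x", computed, crc)
	}
	attributes := binary.BigEndian.Uint16(batch[21:23])
	return Codec(attributes & 0x07), nil // lower 3 bits select the codec
}
```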

Key features:
- Gzip compression (ratio: 0.02)
- Snappy compression (ratio: 0.06, fastest)
- LZ4 compression (ratio: 0.02)
- Zstd compression (ratio: 0.01, best compression)
- CRC32 validation for record batch integrity
- Proper Kafka record batch format v2 parsing
- Backward compatibility with uncompressed records

Phase F (Compression Handling) is now complete.
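For illustration, one codec round trip (gzip, from the standard library); the other codecs follow the same pattern via their usual Go packages (e.g. github.com/golang/snappy, github.com/pierrec/lz4/v4, github.com/klauspost/compress/zstd), though this commit does not confirm which libraries the module uses.

```go
package compression

import (
	"bytes"
	"compress/gzip"
	"io"
)

// gzipCompress writes data through a gzip writer into a buffer.
func gzipCompress(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil { // Close flushes the gzip footer
		return nil, err
	}
	return buf.Bytes(), nil
}

// gzipDecompress inflates a gzip-compressed payload back to plain bytes.
func gzipDecompress(data []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}
```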

Phase G: Implement advanced schema compatibility checking and migration

- Add comprehensive SchemaEvolutionChecker with full compatibility rules
- Support BACKWARD, FORWARD, FULL, and NONE compatibility levels
- Implement Avro schema compatibility checking with field analysis
- Add JSON Schema compatibility validation
- Support Protobuf compatibility checking (simplified implementation)
- Add type promotion rules (int->long, float->double, string<->bytes)
- Integrate schema evolution into Manager with validation methods
- Add schema evolution suggestions and migration guidance
- Support schema compatibility validation before evolution
- Add comprehensive unit tests for all compatibility scenarios

Key features:
- BACKWARD compatibility: New schema can read old data
- FORWARD compatibility: Old schema can read new data
- FULL compatibility: Both backward and forward compatible
- Type promotion support for safe schema evolution
- Field addition/removal validation with default value checks
- Schema evolution suggestions for incompatible changes
- Integration with schema registry for validation workflows

Phase G (Schema Evolution) is now complete.
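A minimal sketch of the promotion rule set named above, assuming Avro-style primitive type names; the real checker also validates field additions/removals and per-format details.

```go
package schema

// The commit lists int->long, float->double, and string<->bytes; the full
// Avro promotion rules are a superset (e.g. int -> float/double).
var promotions = map[string][]string{
	"int":    {"long"},
	"float":  {"double"},
	"string": {"bytes"},
	"bytes":  {"string"},
}

// isPromotable reports whether a reader expecting type "to" can safely
// read data written as type "from" (from == to is trivially compatible).
func isPromotable(from, to string) bool {
	if from == to {
		return true
	}
	for _, t := range promotions[from] {
		if t == to {
			return true
		}
	}
	return false
}
```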


package schema

import (
	"strings"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestManager_SchemaEvolution tests schema evolution integration in the manager
func TestManager_SchemaEvolution(t *testing.T) {
	// Create a manager without registry (for testing evolution logic only)
	manager := &Manager{
		evolutionChecker: NewSchemaEvolutionChecker(),
	}

	t.Run("Compatible Avro evolution", func(t *testing.T) {
		oldSchema := `{
			"type": "record",
			"name": "User",
			"fields": [
				{"name": "id", "type": "int"},
				{"name": "name", "type": "string"}
			]
		}`

		newSchema := `{
			"type": "record",
			"name": "User",
			"fields": [
				{"name": "id", "type": "int"},
				{"name": "name", "type": "string"},
				{"name": "email", "type": "string", "default": ""}
			]
		}`

		result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
		require.NoError(t, err)
		assert.True(t, result.Compatible)
		assert.Empty(t, result.Issues)
	})

	t.Run("Incompatible Avro evolution", func(t *testing.T) {
		oldSchema := `{
			"type": "record",
			"name": "User",
			"fields": [
				{"name": "id", "type": "int"},
				{"name": "name", "type": "string"},
				{"name": "email", "type": "string"}
			]
		}`

		newSchema := `{
			"type": "record",
			"name": "User",
			"fields": [
				{"name": "id", "type": "int"},
				{"name": "name", "type": "string"}
			]
		}`

		result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
		require.NoError(t, err)
		assert.False(t, result.Compatible)
		assert.NotEmpty(t, result.Issues)
		assert.Contains(t, result.Issues[0], "Field 'email' was removed")
	})

	t.Run("Schema evolution suggestions", func(t *testing.T) {
		oldSchema := `{
			"type": "record",
			"name": "User",
			"fields": [
				{"name": "id", "type": "int"},
				{"name": "name", "type": "string"}
			]
		}`

		newSchema := `{
			"type": "record",
			"name": "User",
			"fields": [
				{"name": "id", "type": "int"},
				{"name": "name", "type": "string"},
				{"name": "email", "type": "string"}
			]
		}`

		suggestions, err := manager.SuggestSchemaEvolution(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
		require.NoError(t, err)
		assert.NotEmpty(t, suggestions)

		// Should suggest adding default values
		found := false
		for _, suggestion := range suggestions {
			if strings.Contains(suggestion, "default") {
				found = true
				break
			}
		}
		assert.True(t, found, "Should suggest adding default values, got: %v", suggestions)
	})

	t.Run("JSON Schema evolution", func(t *testing.T) {
		oldSchema := `{
			"type": "object",
			"properties": {
				"id": {"type": "integer"},
				"name": {"type": "string"}
			},
			"required": ["id", "name"]
		}`

		newSchema := `{
			"type": "object",
			"properties": {
				"id": {"type": "integer"},
				"name": {"type": "string"},
				"email": {"type": "string"}
			},
			"required": ["id", "name"]
		}`

		result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatJSONSchema, CompatibilityBackward)
		require.NoError(t, err)
		assert.True(t, result.Compatible)
	})

	t.Run("Full compatibility check", func(t *testing.T) {
		oldSchema := `{
			"type": "record",
			"name": "User",
			"fields": [
				{"name": "id", "type": "int"},
				{"name": "name", "type": "string"}
			]
		}`

		newSchema := `{
			"type": "record",
			"name": "User",
			"fields": [
				{"name": "id", "type": "int"},
				{"name": "name", "type": "string"},
				{"name": "email", "type": "string", "default": ""}
			]
		}`

		result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityFull)
		require.NoError(t, err)
		assert.True(t, result.Compatible)
	})

	t.Run("Type promotion compatibility", func(t *testing.T) {
		oldSchema := `{
			"type": "record",
			"name": "User",
			"fields": [
				{"name": "id", "type": "int"},
				{"name": "score", "type": "int"}
			]
		}`

		newSchema := `{
			"type": "record",
			"name": "User",
			"fields": [
				{"name": "id", "type": "int"},
				{"name": "score", "type": "long"}
			]
		}`

		result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
		require.NoError(t, err)
		assert.True(t, result.Compatible)
	})
}

// TestManager_CompatibilityLevels tests compatibility level management
func TestManager_CompatibilityLevels(t *testing.T) {
	manager := &Manager{
		evolutionChecker: NewSchemaEvolutionChecker(),
	}

	t.Run("Get default compatibility level", func(t *testing.T) {
		level := manager.GetCompatibilityLevel("test-subject")
		assert.Equal(t, CompatibilityBackward, level)
	})

	t.Run("Set compatibility level", func(t *testing.T) {
		err := manager.SetCompatibilityLevel("test-subject", CompatibilityFull)
		assert.NoError(t, err)
	})
}

// TestManager_CanEvolveSchema tests the CanEvolveSchema method
func TestManager_CanEvolveSchema(t *testing.T) {
	manager := &Manager{
		evolutionChecker: NewSchemaEvolutionChecker(),
	}

	t.Run("Compatible evolution", func(t *testing.T) {
		currentSchema := `{
			"type": "record",
			"name": "User",
			"fields": [
				{"name": "id", "type": "int"},
				{"name": "name", "type": "string"}
			]
		}`

		newSchema := `{
			"type": "record",
			"name": "User",
			"fields": [
				{"name": "id", "type": "int"},
				{"name": "name", "type": "string"},
				{"name": "email", "type": "string", "default": ""}
			]
		}`

		result, err := manager.CanEvolveSchema("test-subject", currentSchema, newSchema, FormatAvro)
		require.NoError(t, err)
		assert.True(t, result.Compatible)
	})

	t.Run("Incompatible evolution", func(t *testing.T) {
		currentSchema := `{
			"type": "record",
			"name": "User",
			"fields": [
				{"name": "id", "type": "int"},
				{"name": "name", "type": "string"},
				{"name": "email", "type": "string"}
			]
		}`

		newSchema := `{
			"type": "record",
			"name": "User",
			"fields": [
				{"name": "id", "type": "int"},
				{"name": "name", "type": "string"}
			]
		}`

		result, err := manager.CanEvolveSchema("test-subject", currentSchema, newSchema, FormatAvro)
		require.NoError(t, err)
		assert.False(t, result.Compatible)
		assert.Contains(t, result.Issues[0], "Field 'email' was removed")
	})
}

// TestManager_SchemaEvolutionWorkflow tests a complete schema evolution workflow
func TestManager_SchemaEvolutionWorkflow(t *testing.T) {
	manager := &Manager{
		evolutionChecker: NewSchemaEvolutionChecker(),
	}

	t.Run("Complete evolution workflow", func(t *testing.T) {
		// Step 1: Define initial schema
		initialSchema := `{
			"type": "record",
			"name": "UserEvent",
			"fields": [
				{"name": "userId", "type": "int"},
				{"name": "action", "type": "string"}
			]
		}`

		// Step 2: Propose schema evolution (compatible)
		evolvedSchema := `{
			"type": "record",
			"name": "UserEvent",
			"fields": [
				{"name": "userId", "type": "int"},
				{"name": "action", "type": "string"},
				{"name": "timestamp", "type": "long", "default": 0}
			]
		}`

		// Check compatibility explicitly
		result, err := manager.CanEvolveSchema("user-events", initialSchema, evolvedSchema, FormatAvro)
		require.NoError(t, err)
		assert.True(t, result.Compatible)

		// Step 3: Try incompatible evolution
		incompatibleSchema := `{
			"type": "record",
			"name": "UserEvent",
			"fields": [
				{"name": "userId", "type": "int"}
			]
		}`

		result, err = manager.CanEvolveSchema("user-events", initialSchema, incompatibleSchema, FormatAvro)
		require.NoError(t, err)
		assert.False(t, result.Compatible)
		assert.Contains(t, result.Issues[0], "Field 'action' was removed")

		// Step 4: Get suggestions for incompatible evolution
		suggestions, err := manager.SuggestSchemaEvolution(initialSchema, incompatibleSchema, FormatAvro, CompatibilityBackward)
		require.NoError(t, err)
		assert.NotEmpty(t, suggestions)
	})
}

// BenchmarkSchemaEvolution benchmarks schema evolution operations
func BenchmarkSchemaEvolution(b *testing.B) {
	manager := &Manager{
		evolutionChecker: NewSchemaEvolutionChecker(),
	}

	oldSchema := `{
		"type": "record",
		"name": "User",
		"fields": [
			{"name": "id", "type": "int"},
			{"name": "name", "type": "string"},
			{"name": "email", "type": "string", "default": ""}
		]
	}`

	newSchema := `{
		"type": "record",
		"name": "User",
		"fields": [
			{"name": "id", "type": "int"},
			{"name": "name", "type": "string"},
			{"name": "email", "type": "string", "default": ""},
			{"name": "age", "type": "int", "default": 0}
		]
	}`

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
		if err != nil {
			b.Fatal(err)
		}
	}
}