1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/schema/avro_decoder_test.go
2025-09-13 08:06:36 -07:00

542 lines
13 KiB
Go

package schema
import (
"reflect"
"testing"
"time"
"github.com/linkedin/goavro/v2"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
func TestNewAvroDecoder(t *testing.T) {
tests := []struct {
name string
schema string
expectErr bool
}{
{
name: "valid record schema",
schema: `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"}
]
}`,
expectErr: false,
},
{
name: "valid enum schema",
schema: `{
"type": "enum",
"name": "Color",
"symbols": ["RED", "GREEN", "BLUE"]
}`,
expectErr: false,
},
{
name: "invalid schema",
schema: `{"invalid": "schema"}`,
expectErr: true,
},
{
name: "empty schema",
schema: "",
expectErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
decoder, err := NewAvroDecoder(tt.schema)
if (err != nil) != tt.expectErr {
t.Errorf("NewAvroDecoder() error = %v, expectErr %v", err, tt.expectErr)
return
}
if !tt.expectErr && decoder == nil {
t.Error("Expected non-nil decoder for valid schema")
}
})
}
}
func TestAvroDecoder_Decode(t *testing.T) {
schema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}`
decoder, err := NewAvroDecoder(schema)
if err != nil {
t.Fatalf("Failed to create decoder: %v", err)
}
// Create test data
codec, _ := goavro.NewCodec(schema)
testRecord := map[string]interface{}{
"id": int32(123),
"name": "John Doe",
"email": map[string]interface{}{
"string": "john@example.com", // Avro union format
},
}
// Encode to binary
binary, err := codec.BinaryFromNative(nil, testRecord)
if err != nil {
t.Fatalf("Failed to encode test data: %v", err)
}
// Test decoding
result, err := decoder.Decode(binary)
if err != nil {
t.Fatalf("Failed to decode: %v", err)
}
// Verify results
if result["id"] != int32(123) {
t.Errorf("Expected id=123, got %v", result["id"])
}
if result["name"] != "John Doe" {
t.Errorf("Expected name='John Doe', got %v", result["name"])
}
// For union types, Avro returns a map with the type name as key
if emailMap, ok := result["email"].(map[string]interface{}); ok {
if emailMap["string"] != "john@example.com" {
t.Errorf("Expected email='john@example.com', got %v", emailMap["string"])
}
} else {
t.Errorf("Expected email to be a union map, got %v", result["email"])
}
}
func TestAvroDecoder_DecodeToRecordValue(t *testing.T) {
schema := `{
"type": "record",
"name": "SimpleRecord",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"}
]
}`
decoder, err := NewAvroDecoder(schema)
if err != nil {
t.Fatalf("Failed to create decoder: %v", err)
}
// Create and encode test data
codec, _ := goavro.NewCodec(schema)
testRecord := map[string]interface{}{
"id": int32(456),
"name": "Jane Smith",
}
binary, err := codec.BinaryFromNative(nil, testRecord)
if err != nil {
t.Fatalf("Failed to encode test data: %v", err)
}
// Test decoding to RecordValue
recordValue, err := decoder.DecodeToRecordValue(binary)
if err != nil {
t.Fatalf("Failed to decode to RecordValue: %v", err)
}
// Verify RecordValue structure
if recordValue.Fields == nil {
t.Fatal("Expected non-nil fields")
}
idValue := recordValue.Fields["id"]
if idValue == nil {
t.Fatal("Expected id field")
}
if idValue.GetInt32Value() != 456 {
t.Errorf("Expected id=456, got %v", idValue.GetInt32Value())
}
nameValue := recordValue.Fields["name"]
if nameValue == nil {
t.Fatal("Expected name field")
}
if nameValue.GetStringValue() != "Jane Smith" {
t.Errorf("Expected name='Jane Smith', got %v", nameValue.GetStringValue())
}
}
func TestMapToRecordValue(t *testing.T) {
testMap := map[string]interface{}{
"bool_field": true,
"int32_field": int32(123),
"int64_field": int64(456),
"float_field": float32(1.23),
"double_field": float64(4.56),
"string_field": "hello",
"bytes_field": []byte("world"),
"null_field": nil,
"array_field": []interface{}{"a", "b", "c"},
"nested_field": map[string]interface{}{
"inner": "value",
},
}
recordValue := MapToRecordValue(testMap)
// Test each field type
if !recordValue.Fields["bool_field"].GetBoolValue() {
t.Error("Expected bool_field=true")
}
if recordValue.Fields["int32_field"].GetInt32Value() != 123 {
t.Error("Expected int32_field=123")
}
if recordValue.Fields["int64_field"].GetInt64Value() != 456 {
t.Error("Expected int64_field=456")
}
if recordValue.Fields["float_field"].GetFloatValue() != 1.23 {
t.Error("Expected float_field=1.23")
}
if recordValue.Fields["double_field"].GetDoubleValue() != 4.56 {
t.Error("Expected double_field=4.56")
}
if recordValue.Fields["string_field"].GetStringValue() != "hello" {
t.Error("Expected string_field='hello'")
}
if string(recordValue.Fields["bytes_field"].GetBytesValue()) != "world" {
t.Error("Expected bytes_field='world'")
}
// Test null value (converted to empty string)
if recordValue.Fields["null_field"].GetStringValue() != "" {
t.Error("Expected null_field to be empty string")
}
// Test array
arrayValue := recordValue.Fields["array_field"].GetListValue()
if arrayValue == nil || len(arrayValue.Values) != 3 {
t.Error("Expected array with 3 elements")
}
// Test nested record
nestedValue := recordValue.Fields["nested_field"].GetRecordValue()
if nestedValue == nil {
t.Fatal("Expected nested record")
}
if nestedValue.Fields["inner"].GetStringValue() != "value" {
t.Error("Expected nested inner='value'")
}
}
func TestGoValueToSchemaValue(t *testing.T) {
tests := []struct {
name string
input interface{}
expected func(*schema_pb.Value) bool
}{
{
name: "nil value",
input: nil,
expected: func(v *schema_pb.Value) bool {
return v.GetStringValue() == ""
},
},
{
name: "bool value",
input: true,
expected: func(v *schema_pb.Value) bool {
return v.GetBoolValue() == true
},
},
{
name: "int32 value",
input: int32(123),
expected: func(v *schema_pb.Value) bool {
return v.GetInt32Value() == 123
},
},
{
name: "int64 value",
input: int64(456),
expected: func(v *schema_pb.Value) bool {
return v.GetInt64Value() == 456
},
},
{
name: "string value",
input: "test",
expected: func(v *schema_pb.Value) bool {
return v.GetStringValue() == "test"
},
},
{
name: "bytes value",
input: []byte("data"),
expected: func(v *schema_pb.Value) bool {
return string(v.GetBytesValue()) == "data"
},
},
{
name: "time value",
input: time.Unix(1234567890, 0),
expected: func(v *schema_pb.Value) bool {
return v.GetTimestampValue() != nil
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := goValueToSchemaValue(tt.input)
if !tt.expected(result) {
t.Errorf("goValueToSchemaValue() failed for %v", tt.input)
}
})
}
}
func TestInferRecordTypeFromMap(t *testing.T) {
testMap := map[string]interface{}{
"id": int64(123),
"name": "test",
"active": true,
"score": float64(95.5),
"tags": []interface{}{"tag1", "tag2"},
"metadata": map[string]interface{}{"key": "value"},
}
recordType := InferRecordTypeFromMap(testMap)
if len(recordType.Fields) != 6 {
t.Errorf("Expected 6 fields, got %d", len(recordType.Fields))
}
// Create a map for easier field lookup
fieldMap := make(map[string]*schema_pb.Field)
for _, field := range recordType.Fields {
fieldMap[field.Name] = field
}
// Test field types
if fieldMap["id"].Type.GetScalarType() != schema_pb.ScalarType_INT64 {
t.Error("Expected id field to be INT64")
}
if fieldMap["name"].Type.GetScalarType() != schema_pb.ScalarType_STRING {
t.Error("Expected name field to be STRING")
}
if fieldMap["active"].Type.GetScalarType() != schema_pb.ScalarType_BOOL {
t.Error("Expected active field to be BOOL")
}
if fieldMap["score"].Type.GetScalarType() != schema_pb.ScalarType_DOUBLE {
t.Error("Expected score field to be DOUBLE")
}
// Test array field
if fieldMap["tags"].Type.GetListType() == nil {
t.Error("Expected tags field to be LIST")
}
// Test nested record field
if fieldMap["metadata"].Type.GetRecordType() == nil {
t.Error("Expected metadata field to be RECORD")
}
}
func TestInferTypeFromValue(t *testing.T) {
tests := []struct {
name string
input interface{}
expected schema_pb.ScalarType
}{
{"nil", nil, schema_pb.ScalarType_STRING}, // Default for nil
{"bool", true, schema_pb.ScalarType_BOOL},
{"int32", int32(123), schema_pb.ScalarType_INT32},
{"int64", int64(456), schema_pb.ScalarType_INT64},
{"int", int(789), schema_pb.ScalarType_INT64},
{"float32", float32(1.23), schema_pb.ScalarType_FLOAT},
{"float64", float64(4.56), schema_pb.ScalarType_DOUBLE},
{"string", "test", schema_pb.ScalarType_STRING},
{"bytes", []byte("data"), schema_pb.ScalarType_BYTES},
{"time", time.Now(), schema_pb.ScalarType_TIMESTAMP},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := inferTypeFromValue(tt.input)
// Handle special cases
if tt.input == nil || reflect.TypeOf(tt.input).Kind() == reflect.Slice ||
reflect.TypeOf(tt.input).Kind() == reflect.Map {
// Skip scalar type check for complex types
return
}
if result.GetScalarType() != tt.expected {
t.Errorf("inferTypeFromValue() = %v, want %v", result.GetScalarType(), tt.expected)
}
})
}
}
// Integration test with real Avro data
func TestAvroDecoder_Integration(t *testing.T) {
// Complex Avro schema with nested records and arrays
schema := `{
"type": "record",
"name": "Order",
"fields": [
{"name": "id", "type": "string"},
{"name": "customer_id", "type": "int"},
{"name": "total", "type": "double"},
{"name": "items", "type": {
"type": "array",
"items": {
"type": "record",
"name": "Item",
"fields": [
{"name": "product_id", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "price", "type": "double"}
]
}
}},
{"name": "metadata", "type": {
"type": "record",
"name": "Metadata",
"fields": [
{"name": "source", "type": "string"},
{"name": "timestamp", "type": "long"}
]
}}
]
}`
decoder, err := NewAvroDecoder(schema)
if err != nil {
t.Fatalf("Failed to create decoder: %v", err)
}
// Create complex test data
codec, _ := goavro.NewCodec(schema)
testOrder := map[string]interface{}{
"id": "order-123",
"customer_id": int32(456),
"total": float64(99.99),
"items": []interface{}{
map[string]interface{}{
"product_id": "prod-1",
"quantity": int32(2),
"price": float64(29.99),
},
map[string]interface{}{
"product_id": "prod-2",
"quantity": int32(1),
"price": float64(39.99),
},
},
"metadata": map[string]interface{}{
"source": "web",
"timestamp": int64(1234567890),
},
}
// Encode to binary
binary, err := codec.BinaryFromNative(nil, testOrder)
if err != nil {
t.Fatalf("Failed to encode test data: %v", err)
}
// Decode to RecordValue
recordValue, err := decoder.DecodeToRecordValue(binary)
if err != nil {
t.Fatalf("Failed to decode to RecordValue: %v", err)
}
// Verify complex structure
if recordValue.Fields["id"].GetStringValue() != "order-123" {
t.Error("Expected order ID to be preserved")
}
if recordValue.Fields["customer_id"].GetInt32Value() != 456 {
t.Error("Expected customer ID to be preserved")
}
// Check array handling
itemsArray := recordValue.Fields["items"].GetListValue()
if itemsArray == nil || len(itemsArray.Values) != 2 {
t.Fatal("Expected items array with 2 elements")
}
// Check nested record handling
metadataRecord := recordValue.Fields["metadata"].GetRecordValue()
if metadataRecord == nil {
t.Fatal("Expected metadata record")
}
if metadataRecord.Fields["source"].GetStringValue() != "web" {
t.Error("Expected metadata source to be preserved")
}
}
// Benchmark tests
func BenchmarkAvroDecoder_Decode(b *testing.B) {
schema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"}
]
}`
decoder, _ := NewAvroDecoder(schema)
codec, _ := goavro.NewCodec(schema)
testRecord := map[string]interface{}{
"id": int32(123),
"name": "John Doe",
}
binary, _ := codec.BinaryFromNative(nil, testRecord)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = decoder.Decode(binary)
}
}
func BenchmarkMapToRecordValue(b *testing.B) {
testMap := map[string]interface{}{
"id": int64(123),
"name": "test",
"active": true,
"score": float64(95.5),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = MapToRecordValue(testMap)
}
}