mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
719 lines
20 KiB
Go
719 lines
20 KiB
Go
package schema
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"reflect"
|
|
"time"
|
|
|
|
"github.com/linkedin/goavro/v2"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
|
)
|
|
|
|
// AvroDecoder handles Avro schema decoding and conversion to SeaweedMQ format
|
|
type AvroDecoder struct {
|
|
codec *goavro.Codec
|
|
}
|
|
|
|
// NewAvroDecoder creates a new Avro decoder from a schema string
|
|
func NewAvroDecoder(schemaStr string) (*AvroDecoder, error) {
|
|
codec, err := goavro.NewCodec(schemaStr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create Avro codec: %w", err)
|
|
}
|
|
|
|
return &AvroDecoder{
|
|
codec: codec,
|
|
}, nil
|
|
}
|
|
|
|
// Decode decodes Avro binary data to a Go map
|
|
func (ad *AvroDecoder) Decode(data []byte) (map[string]interface{}, error) {
|
|
native, _, err := ad.codec.NativeFromBinary(data)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to decode Avro data: %w", err)
|
|
}
|
|
|
|
// Convert to map[string]interface{} for easier processing
|
|
result, ok := native.(map[string]interface{})
|
|
if !ok {
|
|
return nil, fmt.Errorf("expected Avro record, got %T", native)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// DecodeToRecordValue decodes Avro data directly to SeaweedMQ RecordValue
|
|
func (ad *AvroDecoder) DecodeToRecordValue(data []byte) (*schema_pb.RecordValue, error) {
|
|
nativeMap, err := ad.Decode(data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return MapToRecordValue(nativeMap), nil
|
|
}
|
|
|
|
// InferRecordType infers a SeaweedMQ RecordType from an Avro schema
|
|
func (ad *AvroDecoder) InferRecordType() (*schema_pb.RecordType, error) {
|
|
schema := ad.codec.Schema()
|
|
return avroSchemaToRecordType(schema)
|
|
}
|
|
|
|
// MapToRecordValue converts a Go map to SeaweedMQ RecordValue
|
|
func MapToRecordValue(m map[string]interface{}) *schema_pb.RecordValue {
|
|
fields := make(map[string]*schema_pb.Value)
|
|
|
|
for key, value := range m {
|
|
fields[key] = goValueToSchemaValue(value)
|
|
}
|
|
|
|
return &schema_pb.RecordValue{
|
|
Fields: fields,
|
|
}
|
|
}
|
|
|
|
// goValueToSchemaValue converts a Go value to a SeaweedMQ Value
|
|
func goValueToSchemaValue(value interface{}) *schema_pb.Value {
|
|
if value == nil {
|
|
// For null values, use an empty string as default
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_StringValue{StringValue: ""},
|
|
}
|
|
}
|
|
|
|
switch v := value.(type) {
|
|
case bool:
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BoolValue{BoolValue: v},
|
|
}
|
|
case int32:
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int32Value{Int32Value: v},
|
|
}
|
|
case int64:
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int64Value{Int64Value: v},
|
|
}
|
|
case int:
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int64Value{Int64Value: int64(v)},
|
|
}
|
|
case float32:
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_FloatValue{FloatValue: v},
|
|
}
|
|
case float64:
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_DoubleValue{DoubleValue: v},
|
|
}
|
|
case string:
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_StringValue{StringValue: v},
|
|
}
|
|
case []byte:
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BytesValue{BytesValue: v},
|
|
}
|
|
case time.Time:
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_TimestampValue{
|
|
TimestampValue: &schema_pb.TimestampValue{
|
|
TimestampMicros: v.UnixMicro(),
|
|
IsUtc: true,
|
|
},
|
|
},
|
|
}
|
|
case []interface{}:
|
|
// Handle arrays
|
|
listValues := make([]*schema_pb.Value, len(v))
|
|
for i, item := range v {
|
|
listValues[i] = goValueToSchemaValue(item)
|
|
}
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_ListValue{
|
|
ListValue: &schema_pb.ListValue{
|
|
Values: listValues,
|
|
},
|
|
},
|
|
}
|
|
case map[string]interface{}:
|
|
// Check if this is an Avro union type (single key-value pair with type name as key)
|
|
// Union types have keys that are typically Avro type names like "int", "string", etc.
|
|
// Regular nested records would have meaningful field names like "inner", "name", etc.
|
|
if len(v) == 1 {
|
|
for unionType, unionValue := range v {
|
|
// Handle common Avro union type patterns (only if key looks like a type name)
|
|
switch unionType {
|
|
case "int":
|
|
if intVal, ok := unionValue.(int32); ok {
|
|
// Store union as a record with the union type as field name
|
|
// This preserves the union information for re-encoding
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_RecordValue{
|
|
RecordValue: &schema_pb.RecordValue{
|
|
Fields: map[string]*schema_pb.Value{
|
|
"int": {
|
|
Kind: &schema_pb.Value_Int32Value{Int32Value: intVal},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
case "long":
|
|
if longVal, ok := unionValue.(int64); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_RecordValue{
|
|
RecordValue: &schema_pb.RecordValue{
|
|
Fields: map[string]*schema_pb.Value{
|
|
"long": {
|
|
Kind: &schema_pb.Value_Int64Value{Int64Value: longVal},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
case "float":
|
|
if floatVal, ok := unionValue.(float32); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_RecordValue{
|
|
RecordValue: &schema_pb.RecordValue{
|
|
Fields: map[string]*schema_pb.Value{
|
|
"float": {
|
|
Kind: &schema_pb.Value_FloatValue{FloatValue: floatVal},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
case "double":
|
|
if doubleVal, ok := unionValue.(float64); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_RecordValue{
|
|
RecordValue: &schema_pb.RecordValue{
|
|
Fields: map[string]*schema_pb.Value{
|
|
"double": {
|
|
Kind: &schema_pb.Value_DoubleValue{DoubleValue: doubleVal},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
case "string":
|
|
if strVal, ok := unionValue.(string); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_RecordValue{
|
|
RecordValue: &schema_pb.RecordValue{
|
|
Fields: map[string]*schema_pb.Value{
|
|
"string": {
|
|
Kind: &schema_pb.Value_StringValue{StringValue: strVal},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
case "boolean":
|
|
if boolVal, ok := unionValue.(bool); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_RecordValue{
|
|
RecordValue: &schema_pb.RecordValue{
|
|
Fields: map[string]*schema_pb.Value{
|
|
"boolean": {
|
|
Kind: &schema_pb.Value_BoolValue{BoolValue: boolVal},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
}
|
|
// If it's not a recognized union type, fall through to treat as nested record
|
|
}
|
|
}
|
|
|
|
// Handle nested records (both single-field and multi-field maps)
|
|
fields := make(map[string]*schema_pb.Value)
|
|
for key, val := range v {
|
|
fields[key] = goValueToSchemaValue(val)
|
|
}
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_RecordValue{
|
|
RecordValue: &schema_pb.RecordValue{
|
|
Fields: fields,
|
|
},
|
|
},
|
|
}
|
|
default:
|
|
// Handle other types by converting to string
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_StringValue{
|
|
StringValue: fmt.Sprintf("%v", v),
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
// avroSchemaToRecordType converts an Avro schema to SeaweedMQ RecordType
|
|
func avroSchemaToRecordType(schemaStr string) (*schema_pb.RecordType, error) {
|
|
// Validate the Avro schema by creating a codec (this ensures it's valid)
|
|
_, err := goavro.NewCodec(schemaStr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to parse Avro schema: %w", err)
|
|
}
|
|
|
|
// Parse the schema JSON to extract field definitions
|
|
var avroSchema map[string]interface{}
|
|
if err := json.Unmarshal([]byte(schemaStr), &avroSchema); err != nil {
|
|
return nil, fmt.Errorf("failed to parse Avro schema JSON: %w", err)
|
|
}
|
|
|
|
// Extract fields from the Avro schema
|
|
fields, err := extractAvroFields(avroSchema)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to extract Avro fields: %w", err)
|
|
}
|
|
|
|
return &schema_pb.RecordType{
|
|
Fields: fields,
|
|
}, nil
|
|
}
|
|
|
|
// extractAvroFields extracts field definitions from parsed Avro schema JSON
|
|
func extractAvroFields(avroSchema map[string]interface{}) ([]*schema_pb.Field, error) {
|
|
// Check if this is a record type
|
|
schemaType, ok := avroSchema["type"].(string)
|
|
if !ok || schemaType != "record" {
|
|
return nil, fmt.Errorf("expected record type, got %v", schemaType)
|
|
}
|
|
|
|
// Extract fields array
|
|
fieldsInterface, ok := avroSchema["fields"]
|
|
if !ok {
|
|
return nil, fmt.Errorf("no fields found in Avro record schema")
|
|
}
|
|
|
|
fieldsArray, ok := fieldsInterface.([]interface{})
|
|
if !ok {
|
|
return nil, fmt.Errorf("fields must be an array")
|
|
}
|
|
|
|
// Convert each Avro field to SeaweedMQ field
|
|
fields := make([]*schema_pb.Field, 0, len(fieldsArray))
|
|
for i, fieldInterface := range fieldsArray {
|
|
fieldMap, ok := fieldInterface.(map[string]interface{})
|
|
if !ok {
|
|
return nil, fmt.Errorf("field %d is not a valid object", i)
|
|
}
|
|
|
|
field, err := convertAvroFieldToSeaweedMQ(fieldMap, int32(i))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to convert field %d: %w", i, err)
|
|
}
|
|
|
|
fields = append(fields, field)
|
|
}
|
|
|
|
return fields, nil
|
|
}
|
|
|
|
// convertAvroFieldToSeaweedMQ converts a single Avro field to SeaweedMQ Field
|
|
func convertAvroFieldToSeaweedMQ(avroField map[string]interface{}, fieldIndex int32) (*schema_pb.Field, error) {
|
|
// Extract field name
|
|
name, ok := avroField["name"].(string)
|
|
if !ok {
|
|
return nil, fmt.Errorf("field name is required")
|
|
}
|
|
|
|
// Extract field type and check if it's an array
|
|
fieldType, isRepeated, err := convertAvroTypeToSeaweedMQWithRepeated(avroField["type"])
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to convert field type for %s: %w", name, err)
|
|
}
|
|
|
|
// Check if field has a default value (indicates it's optional)
|
|
_, hasDefault := avroField["default"]
|
|
isRequired := !hasDefault
|
|
|
|
return &schema_pb.Field{
|
|
Name: name,
|
|
FieldIndex: fieldIndex,
|
|
Type: fieldType,
|
|
IsRequired: isRequired,
|
|
IsRepeated: isRepeated,
|
|
}, nil
|
|
}
|
|
|
|
// convertAvroTypeToSeaweedMQ converts Avro type to SeaweedMQ Type
|
|
func convertAvroTypeToSeaweedMQ(avroType interface{}) (*schema_pb.Type, error) {
|
|
fieldType, _, err := convertAvroTypeToSeaweedMQWithRepeated(avroType)
|
|
return fieldType, err
|
|
}
|
|
|
|
// convertAvroTypeToSeaweedMQWithRepeated converts Avro type to SeaweedMQ Type and returns if it's repeated
|
|
func convertAvroTypeToSeaweedMQWithRepeated(avroType interface{}) (*schema_pb.Type, bool, error) {
|
|
switch t := avroType.(type) {
|
|
case string:
|
|
// Simple type
|
|
fieldType, err := convertAvroSimpleType(t)
|
|
return fieldType, false, err
|
|
|
|
case map[string]interface{}:
|
|
// Complex type (record, enum, array, map, fixed)
|
|
return convertAvroComplexTypeWithRepeated(t)
|
|
|
|
case []interface{}:
|
|
// Union type
|
|
fieldType, err := convertAvroUnionType(t)
|
|
return fieldType, false, err
|
|
|
|
default:
|
|
return nil, false, fmt.Errorf("unsupported Avro type: %T", avroType)
|
|
}
|
|
}
|
|
|
|
// convertAvroSimpleType converts simple Avro types to SeaweedMQ types
|
|
func convertAvroSimpleType(avroType string) (*schema_pb.Type, error) {
|
|
switch avroType {
|
|
case "null":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_BYTES, // Use bytes for null
|
|
},
|
|
}, nil
|
|
case "boolean":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_BOOL,
|
|
},
|
|
}, nil
|
|
case "int":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_INT32,
|
|
},
|
|
}, nil
|
|
case "long":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_INT64,
|
|
},
|
|
}, nil
|
|
case "float":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_FLOAT,
|
|
},
|
|
}, nil
|
|
case "double":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_DOUBLE,
|
|
},
|
|
}, nil
|
|
case "bytes":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_BYTES,
|
|
},
|
|
}, nil
|
|
case "string":
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_STRING,
|
|
},
|
|
}, nil
|
|
default:
|
|
return nil, fmt.Errorf("unsupported simple Avro type: %s", avroType)
|
|
}
|
|
}
|
|
|
|
// convertAvroComplexType converts complex Avro types to SeaweedMQ types
|
|
func convertAvroComplexType(avroType map[string]interface{}) (*schema_pb.Type, error) {
|
|
fieldType, _, err := convertAvroComplexTypeWithRepeated(avroType)
|
|
return fieldType, err
|
|
}
|
|
|
|
// convertAvroComplexTypeWithRepeated converts complex Avro types to SeaweedMQ types and returns if it's repeated
|
|
func convertAvroComplexTypeWithRepeated(avroType map[string]interface{}) (*schema_pb.Type, bool, error) {
|
|
typeStr, ok := avroType["type"].(string)
|
|
if !ok {
|
|
return nil, false, fmt.Errorf("complex type must have a type field")
|
|
}
|
|
|
|
// Handle logical types - they are based on underlying primitive types
|
|
if _, hasLogicalType := avroType["logicalType"]; hasLogicalType {
|
|
// For logical types, use the underlying primitive type
|
|
return convertAvroSimpleTypeWithLogical(typeStr, avroType)
|
|
}
|
|
|
|
switch typeStr {
|
|
case "record":
|
|
// Nested record type
|
|
fields, err := extractAvroFields(avroType)
|
|
if err != nil {
|
|
return nil, false, fmt.Errorf("failed to extract nested record fields: %w", err)
|
|
}
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_RecordType{
|
|
RecordType: &schema_pb.RecordType{
|
|
Fields: fields,
|
|
},
|
|
},
|
|
}, false, nil
|
|
|
|
case "enum":
|
|
// Enum type - treat as string for now
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_STRING,
|
|
},
|
|
}, false, nil
|
|
|
|
case "array":
|
|
// Array type
|
|
itemsType, err := convertAvroTypeToSeaweedMQ(avroType["items"])
|
|
if err != nil {
|
|
return nil, false, fmt.Errorf("failed to convert array items type: %w", err)
|
|
}
|
|
// For arrays, we return the item type and set IsRepeated=true
|
|
return itemsType, true, nil
|
|
|
|
case "map":
|
|
// Map type - treat as record with dynamic fields
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_RecordType{
|
|
RecordType: &schema_pb.RecordType{
|
|
Fields: []*schema_pb.Field{}, // Dynamic fields
|
|
},
|
|
},
|
|
}, false, nil
|
|
|
|
case "fixed":
|
|
// Fixed-length bytes
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_BYTES,
|
|
},
|
|
}, false, nil
|
|
|
|
default:
|
|
return nil, false, fmt.Errorf("unsupported complex Avro type: %s", typeStr)
|
|
}
|
|
}
|
|
|
|
// convertAvroSimpleTypeWithLogical handles logical types based on their underlying primitive types
|
|
func convertAvroSimpleTypeWithLogical(primitiveType string, avroType map[string]interface{}) (*schema_pb.Type, bool, error) {
|
|
logicalType, _ := avroType["logicalType"].(string)
|
|
|
|
// Map logical types to appropriate SeaweedMQ types
|
|
switch logicalType {
|
|
case "decimal":
|
|
// Decimal logical type - use bytes for precision
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_BYTES,
|
|
},
|
|
}, false, nil
|
|
case "uuid":
|
|
// UUID logical type - use string
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_STRING,
|
|
},
|
|
}, false, nil
|
|
case "date":
|
|
// Date logical type (int) - use int32
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_INT32,
|
|
},
|
|
}, false, nil
|
|
case "time-millis":
|
|
// Time in milliseconds (int) - use int32
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_INT32,
|
|
},
|
|
}, false, nil
|
|
case "time-micros":
|
|
// Time in microseconds (long) - use int64
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_INT64,
|
|
},
|
|
}, false, nil
|
|
case "timestamp-millis":
|
|
// Timestamp in milliseconds (long) - use int64
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_INT64,
|
|
},
|
|
}, false, nil
|
|
case "timestamp-micros":
|
|
// Timestamp in microseconds (long) - use int64
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_INT64,
|
|
},
|
|
}, false, nil
|
|
default:
|
|
// For unknown logical types, fall back to the underlying primitive type
|
|
fieldType, err := convertAvroSimpleType(primitiveType)
|
|
return fieldType, false, err
|
|
}
|
|
}
|
|
|
|
// convertAvroUnionType converts Avro union types to SeaweedMQ types
|
|
func convertAvroUnionType(unionTypes []interface{}) (*schema_pb.Type, error) {
|
|
// For unions, we'll use the first non-null type
|
|
// This is a simplification - in a full implementation, we might want to create a union type
|
|
for _, unionType := range unionTypes {
|
|
if typeStr, ok := unionType.(string); ok && typeStr == "null" {
|
|
continue // Skip null types
|
|
}
|
|
|
|
// Use the first non-null type
|
|
return convertAvroTypeToSeaweedMQ(unionType)
|
|
}
|
|
|
|
// If all types are null, return bytes type
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_BYTES,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// InferRecordTypeFromMap infers a RecordType from a decoded map
|
|
// This is useful when we don't have the original Avro schema
|
|
func InferRecordTypeFromMap(m map[string]interface{}) *schema_pb.RecordType {
|
|
fields := make([]*schema_pb.Field, 0, len(m))
|
|
fieldIndex := int32(0)
|
|
|
|
for key, value := range m {
|
|
fieldType := inferTypeFromValue(value)
|
|
|
|
field := &schema_pb.Field{
|
|
Name: key,
|
|
FieldIndex: fieldIndex,
|
|
Type: fieldType,
|
|
IsRequired: value != nil, // Non-nil values are considered required
|
|
IsRepeated: false,
|
|
}
|
|
|
|
// Check if it's an array
|
|
if reflect.TypeOf(value).Kind() == reflect.Slice {
|
|
field.IsRepeated = true
|
|
}
|
|
|
|
fields = append(fields, field)
|
|
fieldIndex++
|
|
}
|
|
|
|
return &schema_pb.RecordType{
|
|
Fields: fields,
|
|
}
|
|
}
|
|
|
|
// inferTypeFromValue infers a SeaweedMQ Type from a Go value
|
|
func inferTypeFromValue(value interface{}) *schema_pb.Type {
|
|
if value == nil {
|
|
// Default to string for null values
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_STRING,
|
|
},
|
|
}
|
|
}
|
|
|
|
switch v := value.(type) {
|
|
case bool:
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_BOOL,
|
|
},
|
|
}
|
|
case int32:
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_INT32,
|
|
},
|
|
}
|
|
case int64, int:
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_INT64,
|
|
},
|
|
}
|
|
case float32:
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_FLOAT,
|
|
},
|
|
}
|
|
case float64:
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_DOUBLE,
|
|
},
|
|
}
|
|
case string:
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_STRING,
|
|
},
|
|
}
|
|
case []byte:
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_BYTES,
|
|
},
|
|
}
|
|
case time.Time:
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_TIMESTAMP,
|
|
},
|
|
}
|
|
case []interface{}:
|
|
// Handle arrays - infer element type from first element
|
|
var elementType *schema_pb.Type
|
|
if len(v) > 0 {
|
|
elementType = inferTypeFromValue(v[0])
|
|
} else {
|
|
// Default to string for empty arrays
|
|
elementType = &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_STRING,
|
|
},
|
|
}
|
|
}
|
|
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ListType{
|
|
ListType: &schema_pb.ListType{
|
|
ElementType: elementType,
|
|
},
|
|
},
|
|
}
|
|
case map[string]interface{}:
|
|
// Handle nested records
|
|
nestedRecordType := InferRecordTypeFromMap(v)
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_RecordType{
|
|
RecordType: nestedRecordType,
|
|
},
|
|
}
|
|
default:
|
|
// Default to string for unknown types
|
|
return &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{
|
|
ScalarType: schema_pb.ScalarType_STRING,
|
|
},
|
|
}
|
|
}
|
|
}
|