package schema import ( "fmt" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/types/dynamicpb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) // ProtobufDecoder handles Protobuf schema decoding and conversion to SeaweedMQ format type ProtobufDecoder struct { descriptor protoreflect.MessageDescriptor msgType protoreflect.MessageType } // NewProtobufDecoder creates a new Protobuf decoder from a schema descriptor func NewProtobufDecoder(schemaBytes []byte) (*ProtobufDecoder, error) { // Parse the binary descriptor using the descriptor parser parser := NewProtobufDescriptorParser() // For now, we need to extract the message name from the schema bytes // In a real implementation, this would be provided by the Schema Registry // For this phase, we'll try to find the first message in the descriptor schema, err := parser.ParseBinaryDescriptor(schemaBytes, "") if err != nil { return nil, fmt.Errorf("failed to parse binary descriptor: %w", err) } // Create the decoder using the parsed descriptor if schema.MessageDescriptor == nil { return nil, fmt.Errorf("no message descriptor found in schema") } return NewProtobufDecoderFromDescriptor(schema.MessageDescriptor), nil } // NewProtobufDecoderFromDescriptor creates a Protobuf decoder from a message descriptor // This is used for testing and when we have pre-built descriptors func NewProtobufDecoderFromDescriptor(msgDesc protoreflect.MessageDescriptor) *ProtobufDecoder { msgType := dynamicpb.NewMessageType(msgDesc) return &ProtobufDecoder{ descriptor: msgDesc, msgType: msgType, } } // NewProtobufDecoderFromString creates a Protobuf decoder from a schema string // This is a simplified version for testing - in production, schemas would be binary descriptors func NewProtobufDecoderFromString(schemaStr string) (*ProtobufDecoder, error) { // For Phase 5, we'll implement a basic string-to-descriptor parser // In a full implementation, this would use protoc to compile .proto files // or parse the Confluent Schema Registry's Protobuf descriptor format return nil, fmt.Errorf("string-based Protobuf schemas not yet implemented - use binary descriptors") } // Decode decodes Protobuf binary data to a Go map representation func (pd *ProtobufDecoder) Decode(data []byte) (map[string]interface{}, error) { // Create a new message instance msg := pd.msgType.New() // Unmarshal the binary data if err := proto.Unmarshal(data, msg.Interface()); err != nil { return nil, fmt.Errorf("failed to unmarshal Protobuf data: %w", err) } // Convert to map representation return pd.messageToMap(msg), nil } // DecodeToRecordValue decodes Protobuf data directly to SeaweedMQ RecordValue func (pd *ProtobufDecoder) DecodeToRecordValue(data []byte) (*schema_pb.RecordValue, error) { msgMap, err := pd.Decode(data) if err != nil { return nil, err } return MapToRecordValue(msgMap), nil } // InferRecordType infers a SeaweedMQ RecordType from the Protobuf descriptor func (pd *ProtobufDecoder) InferRecordType() (*schema_pb.RecordType, error) { return pd.descriptorToRecordType(pd.descriptor), nil } // messageToMap converts a Protobuf message to a Go map func (pd *ProtobufDecoder) messageToMap(msg protoreflect.Message) map[string]interface{} { result := make(map[string]interface{}) msg.Range(func(fd protoreflect.FieldDescriptor, v protoreflect.Value) bool { fieldName := string(fd.Name()) result[fieldName] = pd.valueToInterface(fd, v) return true }) return result } // valueToInterface converts a Protobuf value to a Go interface{} func (pd *ProtobufDecoder) valueToInterface(fd protoreflect.FieldDescriptor, v protoreflect.Value) interface{} { if fd.IsList() { // Handle repeated fields list := v.List() result := make([]interface{}, list.Len()) for i := 0; i < list.Len(); i++ { result[i] = pd.scalarValueToInterface(fd, list.Get(i)) } return result } if fd.IsMap() { // Handle map fields mapVal := v.Map() result := make(map[string]interface{}) mapVal.Range(func(k protoreflect.MapKey, v protoreflect.Value) bool { keyStr := fmt.Sprintf("%v", k.Interface()) result[keyStr] = pd.scalarValueToInterface(fd.MapValue(), v) return true }) return result } return pd.scalarValueToInterface(fd, v) } // scalarValueToInterface converts a scalar Protobuf value to Go interface{} func (pd *ProtobufDecoder) scalarValueToInterface(fd protoreflect.FieldDescriptor, v protoreflect.Value) interface{} { switch fd.Kind() { case protoreflect.BoolKind: return v.Bool() case protoreflect.Int32Kind, protoreflect.Sint32Kind, protoreflect.Sfixed32Kind: return int32(v.Int()) case protoreflect.Int64Kind, protoreflect.Sint64Kind, protoreflect.Sfixed64Kind: return v.Int() case protoreflect.Uint32Kind, protoreflect.Fixed32Kind: return uint32(v.Uint()) case protoreflect.Uint64Kind, protoreflect.Fixed64Kind: return v.Uint() case protoreflect.FloatKind: return float32(v.Float()) case protoreflect.DoubleKind: return v.Float() case protoreflect.StringKind: return v.String() case protoreflect.BytesKind: return v.Bytes() case protoreflect.EnumKind: return int32(v.Enum()) case protoreflect.MessageKind: // Handle nested messages nestedMsg := v.Message() return pd.messageToMap(nestedMsg) default: // Fallback to string representation return fmt.Sprintf("%v", v.Interface()) } } // descriptorToRecordType converts a Protobuf descriptor to SeaweedMQ RecordType func (pd *ProtobufDecoder) descriptorToRecordType(desc protoreflect.MessageDescriptor) *schema_pb.RecordType { fields := make([]*schema_pb.Field, 0, desc.Fields().Len()) for i := 0; i < desc.Fields().Len(); i++ { fd := desc.Fields().Get(i) field := &schema_pb.Field{ Name: string(fd.Name()), FieldIndex: int32(fd.Number() - 1), // Protobuf field numbers start at 1 Type: pd.fieldDescriptorToType(fd), IsRequired: fd.Cardinality() == protoreflect.Required, IsRepeated: fd.IsList(), } fields = append(fields, field) } return &schema_pb.RecordType{ Fields: fields, } } // fieldDescriptorToType converts a Protobuf field descriptor to SeaweedMQ Type func (pd *ProtobufDecoder) fieldDescriptorToType(fd protoreflect.FieldDescriptor) *schema_pb.Type { if fd.IsList() { // Handle repeated fields elementType := pd.scalarKindToType(fd.Kind(), fd.Message()) return &schema_pb.Type{ Kind: &schema_pb.Type_ListType{ ListType: &schema_pb.ListType{ ElementType: elementType, }, }, } } if fd.IsMap() { // Handle map fields - for simplicity, treat as record with key/value fields keyType := pd.scalarKindToType(fd.MapKey().Kind(), nil) valueType := pd.scalarKindToType(fd.MapValue().Kind(), fd.MapValue().Message()) mapRecordType := &schema_pb.RecordType{ Fields: []*schema_pb.Field{ { Name: "key", FieldIndex: 0, Type: keyType, IsRequired: true, }, { Name: "value", FieldIndex: 1, Type: valueType, IsRequired: false, }, }, } return &schema_pb.Type{ Kind: &schema_pb.Type_RecordType{ RecordType: mapRecordType, }, } } return pd.scalarKindToType(fd.Kind(), fd.Message()) } // scalarKindToType converts a Protobuf kind to SeaweedMQ scalar type func (pd *ProtobufDecoder) scalarKindToType(kind protoreflect.Kind, msgDesc protoreflect.MessageDescriptor) *schema_pb.Type { switch kind { case protoreflect.BoolKind: return &schema_pb.Type{ Kind: &schema_pb.Type_ScalarType{ ScalarType: schema_pb.ScalarType_BOOL, }, } case protoreflect.Int32Kind, protoreflect.Sint32Kind, protoreflect.Sfixed32Kind: return &schema_pb.Type{ Kind: &schema_pb.Type_ScalarType{ ScalarType: schema_pb.ScalarType_INT32, }, } case protoreflect.Int64Kind, protoreflect.Sint64Kind, protoreflect.Sfixed64Kind: return &schema_pb.Type{ Kind: &schema_pb.Type_ScalarType{ ScalarType: schema_pb.ScalarType_INT64, }, } case protoreflect.Uint32Kind, protoreflect.Fixed32Kind: return &schema_pb.Type{ Kind: &schema_pb.Type_ScalarType{ ScalarType: schema_pb.ScalarType_INT32, // Map uint32 to int32 for simplicity }, } case protoreflect.Uint64Kind, protoreflect.Fixed64Kind: return &schema_pb.Type{ Kind: &schema_pb.Type_ScalarType{ ScalarType: schema_pb.ScalarType_INT64, // Map uint64 to int64 for simplicity }, } case protoreflect.FloatKind: return &schema_pb.Type{ Kind: &schema_pb.Type_ScalarType{ ScalarType: schema_pb.ScalarType_FLOAT, }, } case protoreflect.DoubleKind: return &schema_pb.Type{ Kind: &schema_pb.Type_ScalarType{ ScalarType: schema_pb.ScalarType_DOUBLE, }, } case protoreflect.StringKind: return &schema_pb.Type{ Kind: &schema_pb.Type_ScalarType{ ScalarType: schema_pb.ScalarType_STRING, }, } case protoreflect.BytesKind: return &schema_pb.Type{ Kind: &schema_pb.Type_ScalarType{ ScalarType: schema_pb.ScalarType_BYTES, }, } case protoreflect.EnumKind: return &schema_pb.Type{ Kind: &schema_pb.Type_ScalarType{ ScalarType: schema_pb.ScalarType_INT32, // Enums as int32 }, } case protoreflect.MessageKind: if msgDesc != nil { // Handle nested messages nestedRecordType := pd.descriptorToRecordType(msgDesc) return &schema_pb.Type{ Kind: &schema_pb.Type_RecordType{ RecordType: nestedRecordType, }, } } fallthrough default: // Default to string for unknown types return &schema_pb.Type{ Kind: &schema_pb.Type_ScalarType{ ScalarType: schema_pb.ScalarType_STRING, }, } } }