1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/mq/kafka/schema/protobuf_decoder.go
chrislu 828ed7532d feat: implement Protobuf index varints handling
- Add proper varint encoding/decoding functions in envelope.go
- Implement CreateConfluentEnvelope with varint encoding for Protobuf indexes
- Add ParseConfluentProtobufEnvelopeWithIndexCount for reliable parsing when index count is known
- Add ParseConfluentProtobufEnvelope with conservative approach (assumes no indexes by default)
- Remove duplicate functions from protobuf_decoder.go to avoid conflicts
- Create comprehensive test suite in envelope_varint_test.go covering:
  - Basic varint encode/decode functionality
  - Confluent envelope creation and parsing with various index scenarios
  - Round-trip testing for Protobuf envelopes
  - Edge cases and validation
- Document limitations of heuristic-based parsing and provide explicit index count alternative

Tests: All varint and envelope tests pass, proper handling of Protobuf message indexes
2025-09-15 22:56:32 -07:00

315 lines
9.4 KiB
Go

package schema
import (
"fmt"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/dynamicpb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// ProtobufDecoder handles Protobuf schema decoding and conversion to SeaweedMQ format.
// It pairs a message descriptor with the message type built from that
// descriptor (see NewProtobufDecoderFromDescriptor).
type ProtobufDecoder struct {
// descriptor is the message descriptor the decoder was built from;
// InferRecordType derives the SeaweedMQ RecordType from it.
descriptor protoreflect.MessageDescriptor
// msgType is used by Decode to allocate a fresh message for each payload.
msgType protoreflect.MessageType
}
// NewProtobufDecoder builds a decoder from a binary schema descriptor.
//
// The bytes are passed to the package's descriptor parser with an empty
// message name, because the name is not yet supplied by the Schema Registry
// integration. NOTE(review): the parser is expected to select the first
// message in the descriptor when the name is empty - confirm against
// ParseBinaryDescriptor.
//
// An error is returned when parsing fails or when the parsed schema carries no
// message descriptor.
func NewProtobufDecoder(schemaBytes []byte) (*ProtobufDecoder, error) {
	descParser := NewProtobufDescriptorParser()

	parsed, parseErr := descParser.ParseBinaryDescriptor(schemaBytes, "")
	if parseErr != nil {
		return nil, fmt.Errorf("failed to parse binary descriptor: %w", parseErr)
	}

	// A schema without a resolved message cannot drive decoding.
	msgDesc := parsed.MessageDescriptor
	if msgDesc == nil {
		return nil, fmt.Errorf("no message descriptor found in schema")
	}

	return NewProtobufDecoderFromDescriptor(msgDesc), nil
}
// NewProtobufDecoderFromDescriptor wraps an already-built message descriptor
// in a decoder. It is the entry point for tests and for callers that hold a
// pre-built descriptor; the dynamic message type is derived from msgDesc here.
func NewProtobufDecoderFromDescriptor(msgDesc protoreflect.MessageDescriptor) *ProtobufDecoder {
	decoder := new(ProtobufDecoder)
	decoder.descriptor = msgDesc
	decoder.msgType = dynamicpb.NewMessageType(msgDesc)
	return decoder
}
// NewProtobufDecoderFromString is a placeholder for building a decoder from a
// textual schema. It is not implemented: every call returns a nil decoder and
// an error directing the caller to binary descriptors. A real implementation
// would have to compile the .proto text or parse the Schema Registry's
// Protobuf descriptor format.
func NewProtobufDecoderFromString(schemaStr string) (*ProtobufDecoder, error) {
	errUnsupported := fmt.Errorf("string-based Protobuf schemas not yet implemented - use binary descriptors")
	return nil, errUnsupported
}
// Decode unmarshals Protobuf wire data into a fresh message of the decoder's
// type and returns it as a Go map keyed by field name. The map holds the
// fields visited by the message's Range method. An error is returned when the
// data cannot be unmarshalled.
func (pd *ProtobufDecoder) Decode(data []byte) (map[string]interface{}, error) {
	decoded := pd.msgType.New()

	err := proto.Unmarshal(data, decoded.Interface())
	if err != nil {
		return nil, fmt.Errorf("failed to unmarshal Protobuf data: %w", err)
	}

	return pd.messageToMap(decoded), nil
}
// DecodeToRecordValue decodes Protobuf wire data and converts the resulting
// map into a SeaweedMQ RecordValue via MapToRecordValue. A decoding error is
// returned unchanged.
func (pd *ProtobufDecoder) DecodeToRecordValue(data []byte) (*schema_pb.RecordValue, error) {
	fields, decodeErr := pd.Decode(data)
	if decodeErr != nil {
		return nil, decodeErr
	}

	record := MapToRecordValue(fields)
	return record, nil
}
// InferRecordType derives a SeaweedMQ RecordType from the descriptor the
// decoder was built with. The returned error is always nil in the current
// implementation.
func (pd *ProtobufDecoder) InferRecordType() (*schema_pb.RecordType, error) {
	recordType := pd.descriptorToRecordType(pd.descriptor)
	return recordType, nil
}
// messageToMap flattens a Protobuf message into a Go map keyed by field name.
// Each field visited by msg.Range is converted with valueToInterface.
func (pd *ProtobufDecoder) messageToMap(msg protoreflect.Message) map[string]interface{} {
	fields := map[string]interface{}{}

	collect := func(fd protoreflect.FieldDescriptor, val protoreflect.Value) bool {
		fields[string(fd.Name())] = pd.valueToInterface(fd, val)
		return true // keep iterating over the remaining fields
	}
	msg.Range(collect)

	return fields
}
// valueToInterface converts a field value to a plain Go value:
//   - list fields become []interface{} with each element converted individually;
//   - map fields become map[string]interface{}, with every key rendered via
//     fmt's %v verb and every value converted using the map's value descriptor;
//   - anything else is converted as a single value.
func (pd *ProtobufDecoder) valueToInterface(fd protoreflect.FieldDescriptor, v protoreflect.Value) interface{} {
	switch {
	case fd.IsList():
		items := v.List()
		converted := make([]interface{}, items.Len())
		for idx := range converted {
			converted[idx] = pd.scalarValueToInterface(fd, items.Get(idx))
		}
		return converted

	case fd.IsMap():
		entries := make(map[string]interface{})
		valueDesc := fd.MapValue()
		v.Map().Range(func(key protoreflect.MapKey, val protoreflect.Value) bool {
			entries[fmt.Sprintf("%v", key.Interface())] = pd.scalarValueToInterface(valueDesc, val)
			return true
		})
		return entries

	default:
		return pd.scalarValueToInterface(fd, v)
	}
}
// scalarValueToInterface converts one non-repeated Protobuf value to the Go
// type matching the field's kind: 32-bit kinds are narrowed to int32, uint32
// or float32, enums are returned as their int32 number, and nested messages
// become maps via messageToMap. Kinds not listed in the switch take the
// fallback branch and are rendered as a string with fmt's %v verb.
func (pd *ProtobufDecoder) scalarValueToInterface(fd protoreflect.FieldDescriptor, v protoreflect.Value) interface{} {
	switch fd.Kind() {
	// Composite and variable-length kinds.
	case protoreflect.MessageKind:
		return pd.messageToMap(v.Message())
	case protoreflect.StringKind:
		return v.String()
	case protoreflect.BytesKind:
		return v.Bytes()

	// Boolean and enum kinds.
	case protoreflect.BoolKind:
		return v.Bool()
	case protoreflect.EnumKind:
		return int32(v.Enum())

	// Signed integers.
	case protoreflect.Int32Kind, protoreflect.Sint32Kind, protoreflect.Sfixed32Kind:
		return int32(v.Int())
	case protoreflect.Int64Kind, protoreflect.Sint64Kind, protoreflect.Sfixed64Kind:
		return v.Int()

	// Unsigned integers.
	case protoreflect.Uint32Kind, protoreflect.Fixed32Kind:
		return uint32(v.Uint())
	case protoreflect.Uint64Kind, protoreflect.Fixed64Kind:
		return v.Uint()

	// Floating point.
	case protoreflect.FloatKind:
		return float32(v.Float())
	case protoreflect.DoubleKind:
		return v.Float()

	default:
		// Anything else is surfaced as its string rendering.
		return fmt.Sprintf("%v", v.Interface())
	}
}
// descriptorToRecordType converts a Protobuf message descriptor to a SeaweedMQ
// RecordType containing one Field per declared Protobuf field.
//
// Nested message fields are expanded recursively. Protobuf permits
// self-referential schemas (for example a tree node that contains itself), so
// the expansion tracks which messages are currently being expanded and emits
// an empty record in place of a message that is already on the expansion path,
// instead of recursing without bound.
func (pd *ProtobufDecoder) descriptorToRecordType(desc protoreflect.MessageDescriptor) *schema_pb.RecordType {
	return pd.recordTypeGuarded(desc, make(map[protoreflect.FullName]bool))
}

// fieldDescriptorToType converts a Protobuf field descriptor to a SeaweedMQ
// Type. List fields become a ListType of the element type; map fields are
// described as a record with a required "key" field and an optional "value"
// field; every other field is converted by kind.
func (pd *ProtobufDecoder) fieldDescriptorToType(fd protoreflect.FieldDescriptor) *schema_pb.Type {
	return pd.fieldTypeGuarded(fd, make(map[protoreflect.FullName]bool))
}

// scalarKindToType converts a Protobuf kind to a SeaweedMQ Type. msgDesc is
// consulted only for MessageKind, where it describes the nested message; a
// message kind without a descriptor, and any kind not handled explicitly, is
// reported as STRING.
func (pd *ProtobufDecoder) scalarKindToType(kind protoreflect.Kind, msgDesc protoreflect.MessageDescriptor) *schema_pb.Type {
	return pd.kindTypeGuarded(kind, msgDesc, make(map[protoreflect.FullName]bool))
}

// recordTypeGuarded builds the RecordType for desc while recording desc in
// expanding, the set of messages on the current expansion path.
func (pd *ProtobufDecoder) recordTypeGuarded(desc protoreflect.MessageDescriptor, expanding map[protoreflect.FullName]bool) *schema_pb.RecordType {
	name := desc.FullName()
	expanding[name] = true
	// Remove the entry on the way out so that sibling fields referring to the
	// same message are still expanded; only true cycles are cut.
	defer delete(expanding, name)

	fieldDescs := desc.Fields()
	fields := make([]*schema_pb.Field, 0, fieldDescs.Len())
	for i := 0; i < fieldDescs.Len(); i++ {
		fd := fieldDescs.Get(i)
		fields = append(fields, &schema_pb.Field{
			Name:       string(fd.Name()),
			FieldIndex: int32(fd.Number() - 1), // Protobuf field numbers start at 1
			Type:       pd.fieldTypeGuarded(fd, expanding),
			IsRequired: fd.Cardinality() == protoreflect.Required,
			IsRepeated: fd.IsList(),
		})
	}

	return &schema_pb.RecordType{Fields: fields}
}

// fieldTypeGuarded is fieldDescriptorToType with the expansion path threaded
// through so that nested messages can be checked for cycles.
func (pd *ProtobufDecoder) fieldTypeGuarded(fd protoreflect.FieldDescriptor, expanding map[protoreflect.FullName]bool) *schema_pb.Type {
	if fd.IsList() {
		// Repeated field: wrap the element type in a list.
		return &schema_pb.Type{
			Kind: &schema_pb.Type_ListType{
				ListType: &schema_pb.ListType{
					ElementType: pd.kindTypeGuarded(fd.Kind(), fd.Message(), expanding),
				},
			},
		}
	}

	if fd.IsMap() {
		// Map field: for simplicity, described as a record with key/value fields.
		valueDesc := fd.MapValue()
		keyType := pd.kindTypeGuarded(fd.MapKey().Kind(), nil, expanding)
		valueType := pd.kindTypeGuarded(valueDesc.Kind(), valueDesc.Message(), expanding)
		return &schema_pb.Type{
			Kind: &schema_pb.Type_RecordType{
				RecordType: &schema_pb.RecordType{
					Fields: []*schema_pb.Field{
						{
							Name:       "key",
							FieldIndex: 0,
							Type:       keyType,
							IsRequired: true,
						},
						{
							Name:       "value",
							FieldIndex: 1,
							Type:       valueType,
							IsRequired: false,
						},
					},
				},
			},
		}
	}

	return pd.kindTypeGuarded(fd.Kind(), fd.Message(), expanding)
}

// kindTypeGuarded is scalarKindToType with the expansion path threaded
// through. A nested message that is already being expanded is reported as an
// empty record rather than expanded again.
func (pd *ProtobufDecoder) kindTypeGuarded(kind protoreflect.Kind, msgDesc protoreflect.MessageDescriptor, expanding map[protoreflect.FullName]bool) *schema_pb.Type {
	scalar := func(st schema_pb.ScalarType) *schema_pb.Type {
		return &schema_pb.Type{
			Kind: &schema_pb.Type_ScalarType{ScalarType: st},
		}
	}

	switch kind {
	case protoreflect.BoolKind:
		return scalar(schema_pb.ScalarType_BOOL)
	case protoreflect.Int32Kind, protoreflect.Sint32Kind, protoreflect.Sfixed32Kind:
		return scalar(schema_pb.ScalarType_INT32)
	case protoreflect.Int64Kind, protoreflect.Sint64Kind, protoreflect.Sfixed64Kind:
		return scalar(schema_pb.ScalarType_INT64)
	case protoreflect.Uint32Kind, protoreflect.Fixed32Kind:
		return scalar(schema_pb.ScalarType_INT32) // Map uint32 to int32 for simplicity
	case protoreflect.Uint64Kind, protoreflect.Fixed64Kind:
		return scalar(schema_pb.ScalarType_INT64) // Map uint64 to int64 for simplicity
	case protoreflect.FloatKind:
		return scalar(schema_pb.ScalarType_FLOAT)
	case protoreflect.DoubleKind:
		return scalar(schema_pb.ScalarType_DOUBLE)
	case protoreflect.StringKind:
		return scalar(schema_pb.ScalarType_STRING)
	case protoreflect.BytesKind:
		return scalar(schema_pb.ScalarType_BYTES)
	case protoreflect.EnumKind:
		return scalar(schema_pb.ScalarType_INT32) // Enums as int32
	case protoreflect.MessageKind:
		if msgDesc == nil {
			// No descriptor to expand: same fallback as unknown kinds.
			return scalar(schema_pb.ScalarType_STRING)
		}
		if expanding[msgDesc.FullName()] {
			// Recursive reference: stop here with an empty record so that
			// the type stays a record but the expansion terminates.
			return &schema_pb.Type{
				Kind: &schema_pb.Type_RecordType{
					RecordType: &schema_pb.RecordType{},
				},
			}
		}
		return &schema_pb.Type{
			Kind: &schema_pb.Type_RecordType{
				RecordType: pd.recordTypeGuarded(msgDesc, expanding),
			},
		}
	default:
		// Default to string for unknown types
		return scalar(schema_pb.ScalarType_STRING)
	}
}