mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
- Add proper varint encoding/decoding functions in envelope.go - Implement CreateConfluentEnvelope with varint encoding for Protobuf indexes - Add ParseConfluentProtobufEnvelopeWithIndexCount for reliable parsing when index count is known - Add ParseConfluentProtobufEnvelope with conservative approach (assumes no indexes by default) - Remove duplicate functions from protobuf_decoder.go to avoid conflicts - Create comprehensive test suite in envelope_varint_test.go covering: - Basic varint encode/decode functionality - Confluent envelope creation and parsing with various index scenarios - Round-trip testing for Protobuf envelopes - Edge cases and validation - Document limitations of heuristic-based parsing and provide explicit index count alternative Tests: All varint and envelope tests pass, proper handling of Protobuf message indexes
249 lines
6.5 KiB
Go
249 lines
6.5 KiB
Go
package schema
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
)
|
|
|
|
// Format identifies which serialization format a Confluent Schema Registry
// schema uses.
type Format int

const (
	// FormatUnknown is the zero value, used when the format is not known.
	FormatUnknown Format = iota
	// FormatAvro marks Apache Avro schemas.
	FormatAvro
	// FormatProtobuf marks Protocol Buffers schemas.
	FormatProtobuf
	// FormatJSONSchema marks JSON Schema schemas.
	FormatJSONSchema
)

// String returns the upper-case name of the format ("AVRO", "PROTOBUF",
// "JSON_SCHEMA"). It returns "UNKNOWN" for FormatUnknown and for any value
// outside the defined constants.
func (f Format) String() string {
	names := [...]string{
		FormatUnknown:    "UNKNOWN",
		FormatAvro:       "AVRO",
		FormatProtobuf:   "PROTOBUF",
		FormatJSONSchema: "JSON_SCHEMA",
	}
	// Guard against negative or out-of-range values before indexing.
	if f < 0 || int(f) >= len(names) {
		return names[FormatUnknown]
	}
	return names[f]
}
|
|
|
|
// ConfluentEnvelope represents the parsed Confluent Schema Registry envelope:
// a 0x00 magic byte, a big-endian uint32 schema ID, optional Protobuf message
// indexes, and the serialized payload.
//
// The parse functions in this file do not copy their input: Payload is a
// sub-slice of OriginalBytes, and both share the caller's backing array.
//
// NOTE(review): keep the field order stable; unkeyed composite literals
// elsewhere in the package would break if fields were reordered.
type ConfluentEnvelope struct {
	// Format is FormatAvro as a placeholder from ParseConfluentEnvelope
	// (refined after a schema registry lookup), or FormatProtobuf from the
	// Protobuf parse functions.
	Format Format
	// SchemaID is the schema registry ID read from bytes 1-4 (big-endian).
	SchemaID uint32
	Indexes  []int  // For Protobuf nested message resolution; nil when none were parsed
	Payload  []byte // The actual encoded data, following the header (and any indexes)
	// OriginalBytes is the complete original envelope, header included.
	OriginalBytes []byte
}
|
|
|
|
// ParseConfluentEnvelope parses a Confluent Schema Registry framed message
|
|
// Returns the envelope details and whether the message was successfully parsed
|
|
func ParseConfluentEnvelope(data []byte) (*ConfluentEnvelope, bool) {
|
|
if len(data) < 5 {
|
|
return nil, false // Too short to contain magic byte + schema ID
|
|
}
|
|
|
|
// Check for Confluent magic byte (0x00)
|
|
if data[0] != 0x00 {
|
|
return nil, false // Not a Confluent-framed message
|
|
}
|
|
|
|
// Extract schema ID (big-endian uint32)
|
|
schemaID := binary.BigEndian.Uint32(data[1:5])
|
|
|
|
envelope := &ConfluentEnvelope{
|
|
Format: FormatAvro, // Default assumption; will be refined by schema registry lookup
|
|
SchemaID: schemaID,
|
|
Indexes: nil,
|
|
Payload: data[5:], // Default: payload starts after schema ID
|
|
OriginalBytes: data, // Store the complete original envelope
|
|
}
|
|
|
|
// Note: Format detection should be done by the schema registry lookup
|
|
// For now, we'll default to Avro and let the manager determine the actual format
|
|
// based on the schema registry information
|
|
|
|
return envelope, true
|
|
}
|
|
|
|
// ParseConfluentProtobufEnvelope parses a Confluent Protobuf envelope with indexes
|
|
// This is a specialized version for Protobuf that handles message indexes
|
|
//
|
|
// Note: This function uses heuristics to distinguish between index varints and
|
|
// payload data, which may not be 100% reliable in all cases. For production use,
|
|
// consider using ParseConfluentProtobufEnvelopeWithIndexCount if you know the
|
|
// expected number of indexes.
|
|
func ParseConfluentProtobufEnvelope(data []byte) (*ConfluentEnvelope, bool) {
|
|
// For now, assume no indexes to avoid parsing issues
|
|
// This can be enhanced later when we have better schema information
|
|
return ParseConfluentProtobufEnvelopeWithIndexCount(data, 0)
|
|
}
|
|
|
|
// ParseConfluentProtobufEnvelopeWithIndexCount parses a Confluent Protobuf envelope
|
|
// when you know the expected number of indexes
|
|
func ParseConfluentProtobufEnvelopeWithIndexCount(data []byte, expectedIndexCount int) (*ConfluentEnvelope, bool) {
|
|
if len(data) < 5 {
|
|
return nil, false
|
|
}
|
|
|
|
// Check for Confluent magic byte
|
|
if data[0] != 0x00 {
|
|
return nil, false
|
|
}
|
|
|
|
// Extract schema ID (big-endian uint32)
|
|
schemaID := binary.BigEndian.Uint32(data[1:5])
|
|
|
|
envelope := &ConfluentEnvelope{
|
|
Format: FormatProtobuf,
|
|
SchemaID: schemaID,
|
|
Indexes: nil,
|
|
Payload: data[5:], // Default: payload starts after schema ID
|
|
OriginalBytes: data,
|
|
}
|
|
|
|
// Parse the expected number of indexes
|
|
offset := 5
|
|
for i := 0; i < expectedIndexCount && offset < len(data); i++ {
|
|
index, bytesRead := readVarint(data[offset:])
|
|
if bytesRead == 0 {
|
|
// Invalid varint, stop parsing
|
|
break
|
|
}
|
|
envelope.Indexes = append(envelope.Indexes, int(index))
|
|
offset += bytesRead
|
|
}
|
|
|
|
envelope.Payload = data[offset:]
|
|
return envelope, true
|
|
}
|
|
|
|
// IsSchematized reports whether data carries a Confluent Schema Registry
// frame. That means it is at least five bytes long (magic byte + 4-byte
// schema ID) and begins with the 0x00 magic byte. The payload is not
// inspected.
func IsSchematized(data []byte) bool {
	return len(data) >= 5 && data[0] == 0x00
}
|
|
|
|
// ExtractSchemaID returns the big-endian schema ID stored in bytes 1-4 of a
// Confluent-framed message without building a full envelope, so it is cheap
// to call for quick checks. It reports false (with ID 0) when data is shorter
// than five bytes or does not start with the 0x00 magic byte.
func ExtractSchemaID(data []byte) (uint32, bool) {
	if len(data) >= 5 && data[0] == 0x00 {
		id := binary.BigEndian.Uint32(data[1:5])
		return id, true
	}
	return 0, false
}
|
|
|
|
// CreateConfluentEnvelope creates a Confluent-framed message from components
|
|
// This will be useful for reconstructing messages on the Fetch path
|
|
func CreateConfluentEnvelope(format Format, schemaID uint32, indexes []int, payload []byte) []byte {
|
|
// Start with magic byte + schema ID (5 bytes minimum)
|
|
result := make([]byte, 5, 5+len(payload)+len(indexes)*4)
|
|
result[0] = 0x00 // Magic byte
|
|
binary.BigEndian.PutUint32(result[1:5], schemaID)
|
|
|
|
// For Protobuf, add indexes as varints
|
|
if format == FormatProtobuf && len(indexes) > 0 {
|
|
for _, index := range indexes {
|
|
varintBytes := encodeVarint(uint64(index))
|
|
result = append(result, varintBytes...)
|
|
}
|
|
}
|
|
|
|
// Append the actual payload
|
|
result = append(result, payload...)
|
|
|
|
return result
|
|
}
|
|
|
|
// Validate performs basic sanity checks on a parsed envelope.
//
// It returns an error in three cases:
//   - SchemaID is 0.
//   - Payload is empty.
//   - Format is not FormatAvro, FormatProtobuf or FormatJSONSchema. This
//     includes FormatUnknown.
//
// Deeper checks of the payload bytes are left to the format-specific
// decoders.
func (e *ConfluentEnvelope) Validate() error {
	if e.SchemaID == 0 {
		return fmt.Errorf("invalid schema ID: 0")
	}

	if len(e.Payload) == 0 {
		return fmt.Errorf("empty payload")
	}

	switch e.Format {
	case FormatAvro, FormatProtobuf, FormatJSONSchema:
		// Supported formats. Payload contents are validated later by the
		// corresponding decoder (Protobuf and JSON Schema validation are
		// still to be implemented).
	default:
		return fmt.Errorf("unsupported format: %v", e.Format)
	}

	return nil
}
|
|
|
|
// Metadata returns a map of envelope metadata for storage
|
|
func (e *ConfluentEnvelope) Metadata() map[string]string {
|
|
metadata := map[string]string{
|
|
"schema_format": e.Format.String(),
|
|
"schema_id": fmt.Sprintf("%d", e.SchemaID),
|
|
}
|
|
|
|
if len(e.Indexes) > 0 {
|
|
// Store indexes for Protobuf reconstruction
|
|
indexStr := ""
|
|
for i, idx := range e.Indexes {
|
|
if i > 0 {
|
|
indexStr += ","
|
|
}
|
|
indexStr += fmt.Sprintf("%d", idx)
|
|
}
|
|
metadata["protobuf_indexes"] = indexStr
|
|
}
|
|
|
|
return metadata
|
|
}
|
|
|
|
// encodeVarint encodes a uint64 as a varint
|
|
func encodeVarint(value uint64) []byte {
|
|
if value == 0 {
|
|
return []byte{0}
|
|
}
|
|
|
|
var result []byte
|
|
for value > 0 {
|
|
b := byte(value & 0x7F)
|
|
value >>= 7
|
|
|
|
if value > 0 {
|
|
b |= 0x80 // Set continuation bit
|
|
}
|
|
|
|
result = append(result, b)
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// readVarint reads a varint from the byte slice and returns the value and bytes consumed
|
|
func readVarint(data []byte) (uint64, int) {
|
|
var result uint64
|
|
var shift uint
|
|
|
|
for i, b := range data {
|
|
if i >= 10 { // Prevent overflow (max varint is 10 bytes)
|
|
return 0, 0
|
|
}
|
|
|
|
result |= uint64(b&0x7F) << shift
|
|
|
|
if b&0x80 == 0 {
|
|
// Last byte (MSB is 0)
|
|
return result, i + 1
|
|
}
|
|
|
|
shift += 7
|
|
}
|
|
|
|
// Incomplete varint
|
|
return 0, 0
|
|
}
|