1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-06-29 08:12:47 +02:00
seaweedfs/weed/mq/logstore/read_parquet_to_log.go
dependabot[bot] 5f1d2a9745
chore(deps): bump github.com/parquet-go/parquet-go from 0.24.0 to 0.25.1 (#6851)
* chore(deps): bump github.com/parquet-go/parquet-go from 0.24.0 to 0.25.1

Bumps [github.com/parquet-go/parquet-go](https://github.com/parquet-go/parquet-go) from 0.24.0 to 0.25.1.
- [Release notes](https://github.com/parquet-go/parquet-go/releases)
- [Changelog](https://github.com/parquet-go/parquet-go/blob/main/CHANGELOG.md)
- [Commits](https://github.com/parquet-go/parquet-go/compare/v0.24.0...v0.25.1)

---
updated-dependencies:
- dependency-name: github.com/parquet-go/parquet-go
  dependency-version: 0.25.1
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* adjust to updated API

Fixed Reader Construction: Updated to use parquet.OpenFile() instead of passing io.Reader directly to NewReader()
Fixed EOF Handling: Changed the order of operations to process rows before checking for EOF
Added Zero Row Count Check: Added explicit check for rowCount == 0 as an additional termination condition

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
Co-authored-by: chrislu <chris.lu@gmail.com>
2025-06-23 10:25:51 -07:00

165 lines
5 KiB
Go

package logstore
import (
"context"
"encoding/binary"
"fmt"
"io"
"math"
"strings"
"github.com/parquet-go/parquet-go"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"google.golang.org/protobuf/proto"
)
var (
// chunkCache is the package-level in-memory chunk cache that is handed to
// every reader cache created while reading parquet files in this package.
chunkCache = chunk_cache.NewChunkCacheInMemory(256) // 256 entries, 8MB max per entry
)
// GenParquetReadFunc builds a log_buffer.LogReadFromDiskFuncType that replays
// the log entries stored in the ".parquet" files found under the partition
// directory of topic t and partition p.
//
// The topic configuration is read from the filer once, up front, and its record
// type is extended with the SW_COLUMN_NAME_TS (int64) and SW_COLUMN_NAME_KEY
// (bytes) columns before being converted to parquet levels.
//
// GenParquetReadFunc returns nil when the topic configuration cannot be read or
// when the record type cannot be converted to parquet levels; callers must
// check for nil before invoking the result.
func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
	partitionDir := topic.PartitionDir(t, p)
	lookupFileIdFn := filer.LookupFn(filerClient)

	// Read the topic configuration from the filer.
	var topicConf *mq_pb.ConfigureTopicResponse
	if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) (readErr error) {
		topicConf, readErr = t.ReadConfFile(client)
		return readErr
	}); err != nil {
		return nil
	}

	recordType := topicConf.GetRecordType()
	recordType = schema.NewRecordTypeBuilder(recordType).
		WithField(SW_COLUMN_NAME_TS, schema.TypeInt64).
		WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
		RecordTypeEnd()

	parquetLevels, err := schema.ToParquetLevels(recordType)
	if err != nil {
		return nil
	}

	// eachFileFn reads one parquet file and calls eachLogEntryFn for every row
	// whose timestamp is after startTsNs and, when stopTsNs is non-zero, before
	// stopTsNs. It returns the timestamp of the last row it looked at.
	eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, startTsNs, stopTsNs int64) (processedTsNs int64, err error) {
		// Build a ReaderAt over the chunks of the parquet file.
		fileSize := int64(filer.FileSize(entry))
		visibleIntervals, err := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, fileSize)
		if err != nil {
			// Reading on with unresolved chunks would yield corrupt data.
			return 0, fmt.Errorf("resolve chunks of %s: %w", entry.Name, err)
		}
		chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, fileSize)
		readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
		readerAt := filer.NewChunkReaderAtFromClient(readerCache, chunkViews, fileSize)

		// Open the file explicitly: parquet.NewReader panics when it is given a
		// raw io.ReaderAt that it cannot open, while OpenFile reports an error.
		parquetFile, err := parquet.OpenFile(readerAt, fileSize)
		if err != nil {
			return 0, fmt.Errorf("open parquet file %s: %w", entry.Name, err)
		}
		parquetReader := parquet.NewReader(parquetFile)
		// The Close error is ignored on purpose: the reader is read-only and
		// the outcome of the read has already been decided at this point.
		defer parquetReader.Close()

		rows := make([]parquet.Row, 128)
		for {
			rowCount, readErr := parquetReader.ReadRows(rows)

			// Process the rows first: ReadRows may return rows together with io.EOF.
			for _, row := range rows[:rowCount] {
				// Convert the parquet row to a schema_pb.RecordValue.
				recordValue, convErr := schema.ToRecordValue(recordType, parquetLevels, row)
				if convErr != nil {
					return processedTsNs, fmt.Errorf("ToRecordValue failed: %w", convErr)
				}

				processedTsNs = recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value()
				if processedTsNs <= startTsNs {
					// Not past the requested start position yet.
					continue
				}
				if stopTsNs != 0 && processedTsNs >= stopTsNs {
					// Reached the requested stop timestamp.
					return processedTsNs, nil
				}

				data, marshalErr := proto.Marshal(recordValue)
				if marshalErr != nil {
					return processedTsNs, fmt.Errorf("marshal record value: %w", marshalErr)
				}

				logEntry := &filer_pb.LogEntry{
					Key:  recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue(),
					TsNs: processedTsNs,
					Data: data,
				}

				// fmt.Printf(" parquet entry %s ts %v\n", string(logEntry.Key), time.Unix(0, logEntry.TsNs).UTC())

				if _, procErr := eachLogEntryFn(logEntry); procErr != nil {
					return processedTsNs, fmt.Errorf("process log entry %v: %w", logEntry, procErr)
				}
			}

			// Check the end conditions only after the rows have been processed.
			if readErr != nil {
				if readErr == io.EOF {
					return processedTsNs, nil
				}
				return processedTsNs, readErr
			}
			if rowCount == 0 {
				// Nothing read and no error: treat it as end of file.
				return processedTsNs, nil
			}
		}
	}

	return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
		// The formatted start time is passed to SeaweedList as the (inclusive)
		// file name to start listing from.
		startFileName := startPosition.UTC().Format(topic.TIME_FORMAT)
		startTsNs := startPosition.Time.UnixNano()
		var processedTsNs int64

		err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
			return filer_pb.SeaweedList(context.Background(), client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
				// Only regular ".parquet" files carrying extended attributes qualify.
				if entry.IsDirectory {
					return nil
				}
				if !strings.HasSuffix(entry.Name, ".parquet") {
					return nil
				}
				if len(entry.Extended) == 0 {
					return nil
				}

				// The "min" and "max" attributes hold the timestamp range of the
				// file as 8-byte big-endian integers; skip files without them.
				minTsBytes := entry.Extended["min"]
				if len(minTsBytes) != 8 {
					return nil
				}
				minTsNs := int64(binary.BigEndian.Uint64(minTsBytes))

				maxTsBytes := entry.Extended["max"]
				if len(maxTsBytes) != 8 {
					return nil
				}
				maxTsNs := int64(binary.BigEndian.Uint64(maxTsBytes))

				if stopTsNs != 0 && stopTsNs <= minTsNs {
					// The whole file lies at or after the stop timestamp.
					isDone = true
					return nil
				}
				if maxTsNs < startTsNs {
					// The whole file lies before the start position.
					return nil
				}

				// Use a local error so the callback does not write into the
				// named result of the enclosing function.
				var fileErr error
				processedTsNs, fileErr = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs)
				return fileErr
			}, startFileName, true, math.MaxInt32)
		})

		// NOTE(review): -2 is passed through unchanged as the second argument of
		// NewMessagePosition; its meaning is not visible from this file — confirm.
		lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2)
		return
	}
}