seaweedfs/weed/mq/logstore/merged_read.go

package logstore

import (
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

// GenMergedReadFunc returns a read-from-disk function for the given topic partition
// that reads the directly written log files first and then the Parquet files.
func GenMergedReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
	fromParquetFn := GenParquetReadFunc(filerClient, t, p)
	readLogDirectFn := GenLogOnDiskReadFunc(filerClient, t, p)
	// Reversed order: live logs first (recent), then Parquet files (historical).
	// This provides better performance for real-time analytics queries.
	return mergeReadFuncs(readLogDirectFn, fromParquetFn)
}

// mergeReadFuncs chains the two readers: it drains the live log files once,
// remembers how far they were read, and then continues from the Parquet files.
// It never reports isDone, so the caller can fall through to in-memory data.
func mergeReadFuncs(readLogDirectFn, fromParquetFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType {
	var exhaustedLiveLogs bool
	var lastProcessedPosition log_buffer.MessagePosition
	return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
		if !exhaustedLiveLogs {
			lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
			if isDone {
				// For very early timestamps (like timestamp=1 for RESET_TO_EARLIEST),
				// we want to continue reading from in-memory data.
				isDone = false
			}
			if err != nil {
				return
			}
			lastProcessedPosition = lastReadPosition
		}
		exhaustedLiveLogs = true

		if startPosition.Before(lastProcessedPosition.Time) {
			startPosition = lastProcessedPosition
		}

		lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn)
		if isDone {
			// For very early timestamps (like timestamp=1 for RESET_TO_EARLIEST),
			// Parquet files won't exist, but we want to continue reading from in-memory data.
			isDone = false
		}
		return
	}
}
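
The sketch below is a minimal, standalone illustration of the same chaining pattern, using simplified stand-in types (Position, ReadFunc) that are not part of the SeaweedFS API: a primary reader is drained once, the merged function resumes the secondary reader from wherever the primary left off, and isDone is always forced to false so the caller keeps falling through to its next data source.

// Standalone sketch of the merge pattern; Position and ReadFunc are simplified
// stand-ins, NOT the SeaweedFS log_buffer types.
package main

import "fmt"

// Position is a simplified stand-in for log_buffer.MessagePosition.
type Position struct{ TsNs int64 }

// ReadFunc is a simplified stand-in for log_buffer.LogReadFromDiskFuncType.
type ReadFunc func(start Position, stopTsNs int64, each func(tsNs int64)) (last Position, isDone bool, err error)

// merge chains two readers: primary first, then secondary, resuming the
// secondary from wherever the primary left off.
func merge(primary, secondary ReadFunc) ReadFunc {
	var primaryExhausted bool
	var lastProcessed Position
	return func(start Position, stopTsNs int64, each func(tsNs int64)) (Position, bool, error) {
		if !primaryExhausted {
			last, _, err := primary(start, stopTsNs, each)
			if err != nil {
				return last, false, err
			}
			lastProcessed = last
		}
		primaryExhausted = true
		if start.TsNs < lastProcessed.TsNs {
			// Do not re-deliver entries the primary reader already produced.
			start = lastProcessed
		}
		last, _, err := secondary(start, stopTsNs, each)
		// isDone is forced to false so the caller continues to its next source.
		return last, false, err
	}
}

func main() {
	// Two toy readers, each emitting a fixed range of timestamps.
	reader := func(from, to int64) ReadFunc {
		return func(start Position, stopTsNs int64, each func(tsNs int64)) (Position, bool, error) {
			last := start
			for ts := from; ts <= to; ts++ {
				if ts < start.TsNs {
					continue
				}
				each(ts)
				last = Position{TsNs: ts}
			}
			return last, true, nil
		}
	}

	merged := merge(reader(1, 3), reader(4, 6))
	_, _, _ = merged(Position{TsNs: 1}, 0, func(tsNs int64) { fmt.Println("entry at", tsNs) })
}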