mirror of
https://github.com/chrislusf/seaweedfs
synced 2025-09-19 01:30:23 +02:00
* fix nil when explaining * add plain details when running full scan * skip files by timestamp * skip file by timestamp * refactor * handle filter by time * skip broker memory only if it has unflushed messages * refactoring * refactor * address comments * address comments * filter by parquet stats * simplify * refactor * prune old code * optimize * Update aggregations.go * ensure non-time predicates are properly detected * add stmt to populatePlanFileDetails This helper function is a great way to centralize logic for populating file details. However, it's missing an optimization that is present in executeSelectStatementWithBrokerStats: pruning Parquet files based on column statistics from the WHERE clause. Aggregation queries that fall back to the slow path could benefit from this optimization. Consider modifying the function signature to accept the *SelectStatement and adding the column statistics pruning logic here, similar to how it's done in executeSelectStatementWithBrokerStats. * refactoring to work with *schema_pb.Value directly after the initial conversion
933 lines
31 KiB
Go
package engine
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
|
|
)
|
|
|
|
// AggregationSpec describes a single aggregate expression from a SELECT list,
// for example COUNT(*) or MAX(price).
type AggregationSpec struct {
	Function string // Aggregate function name: COUNT, SUM, AVG, MIN or MAX
	Column   string // Target column name; "*" is used for COUNT(*)
	Alias    string // Name used as the result column header
	Distinct bool   // Set when the DISTINCT keyword was given (not read by the code in this file)
}
|
|
|
|
// AggregationResult carries the computed value of one AggregationSpec.
// Only the fields relevant to the function are filled in: Count for COUNT,
// Sum for SUM, Min/Max (raw Go values) for MIN/MAX. For AVG the full-scan
// path stores the average itself in Sum and the number of averaged values in Count.
type AggregationResult struct {
	Count int64
	Sum   float64
	Min   interface{}
	Max   interface{}
}
|
|
|
|
// AggregationStrategy represents the strategy for executing aggregations
|
|
type AggregationStrategy struct {
|
|
CanUseFastPath bool
|
|
Reason string
|
|
UnsupportedSpecs []AggregationSpec
|
|
}
|
|
|
|
// TopicDataSources represents the data sources available for a topic
|
|
type TopicDataSources struct {
|
|
ParquetFiles map[string][]*ParquetFileStats // partitionPath -> parquet file stats
|
|
ParquetRowCount int64
|
|
LiveLogRowCount int64
|
|
LiveLogFilesCount int // Total count of live log files across all partitions
|
|
PartitionsCount int
|
|
BrokerUnflushedCount int64
|
|
}
|
|
|
|
// FastPathOptimizer handles fast path aggregation optimization decisions
|
|
type FastPathOptimizer struct {
|
|
engine *SQLEngine
|
|
}
|
|
|
|
// NewFastPathOptimizer creates a new fast path optimizer
|
|
func NewFastPathOptimizer(engine *SQLEngine) *FastPathOptimizer {
|
|
return &FastPathOptimizer{engine: engine}
|
|
}
|
|
|
|
// DetermineStrategy analyzes aggregations and determines if fast path can be used
|
|
func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec) AggregationStrategy {
|
|
strategy := AggregationStrategy{
|
|
CanUseFastPath: true,
|
|
Reason: "all_aggregations_supported",
|
|
UnsupportedSpecs: []AggregationSpec{},
|
|
}
|
|
|
|
for _, spec := range aggregations {
|
|
if !opt.engine.canUseParquetStatsForAggregation(spec) {
|
|
strategy.CanUseFastPath = false
|
|
strategy.Reason = "unsupported_aggregation_functions"
|
|
strategy.UnsupportedSpecs = append(strategy.UnsupportedSpecs, spec)
|
|
}
|
|
}
|
|
|
|
return strategy
|
|
}
|
|
|
|
// CollectDataSources gathers information about available data sources for a topic
|
|
func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) {
|
|
return opt.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, 0, 0)
|
|
}
|
|
|
|
// CollectDataSourcesWithTimeFilter gathers information about available data sources for a topic
|
|
// with optional time filtering to skip irrelevant parquet files
|
|
func (opt *FastPathOptimizer) CollectDataSourcesWithTimeFilter(ctx context.Context, hybridScanner *HybridMessageScanner, startTimeNs, stopTimeNs int64) (*TopicDataSources, error) {
|
|
dataSources := &TopicDataSources{
|
|
ParquetFiles: make(map[string][]*ParquetFileStats),
|
|
ParquetRowCount: 0,
|
|
LiveLogRowCount: 0,
|
|
LiveLogFilesCount: 0,
|
|
PartitionsCount: 0,
|
|
}
|
|
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Collecting data sources for: %s/%s\n", hybridScanner.topic.Namespace, hybridScanner.topic.Name)
|
|
}
|
|
|
|
// Discover partitions for the topic
|
|
partitionPaths, err := opt.engine.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
|
|
if err != nil {
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("ERROR: Partition discovery failed: %v\n", err)
|
|
}
|
|
return dataSources, DataSourceError{
|
|
Source: "partition_discovery",
|
|
Cause: err,
|
|
}
|
|
}
|
|
|
|
// DEBUG: Log discovered partitions
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Discovered %d partitions: %v\n", len(partitionPaths), partitionPaths)
|
|
}
|
|
|
|
// Collect stats from each partition
|
|
// Note: discoverTopicPartitions always returns absolute paths starting with "/topics/"
|
|
for _, partitionPath := range partitionPaths {
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("\nProcessing partition: %s\n", partitionPath)
|
|
}
|
|
|
|
// Read parquet file statistics
|
|
parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath)
|
|
if err != nil {
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf(" ERROR: Failed to read parquet statistics: %v\n", err)
|
|
}
|
|
} else if len(parquetStats) == 0 {
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf(" No parquet files found in partition\n")
|
|
}
|
|
} else {
|
|
// Prune by time range using parquet column statistics
|
|
filtered := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)
|
|
dataSources.ParquetFiles[partitionPath] = filtered
|
|
partitionParquetRows := int64(0)
|
|
for _, stat := range filtered {
|
|
partitionParquetRows += stat.RowCount
|
|
dataSources.ParquetRowCount += stat.RowCount
|
|
}
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf(" Found %d parquet files with %d total rows\n", len(filtered), partitionParquetRows)
|
|
}
|
|
}
|
|
|
|
// Count live log files (excluding those converted to parquet)
|
|
parquetSources := opt.engine.extractParquetSourceFiles(dataSources.ParquetFiles[partitionPath])
|
|
liveLogCount, liveLogErr := opt.engine.countLiveLogRowsExcludingParquetSources(ctx, partitionPath, parquetSources)
|
|
if liveLogErr != nil {
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf(" ERROR: Failed to count live log rows: %v\n", liveLogErr)
|
|
}
|
|
} else {
|
|
dataSources.LiveLogRowCount += liveLogCount
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf(" Found %d live log rows (excluding %d parquet sources)\n", liveLogCount, len(parquetSources))
|
|
}
|
|
}
|
|
|
|
// Count live log files for partition with proper range values
|
|
// Extract partition name from absolute path (e.g., "0000-2520" from "/topics/.../v2025.../0000-2520")
|
|
partitionName := partitionPath[strings.LastIndex(partitionPath, "/")+1:]
|
|
partitionParts := strings.Split(partitionName, "-")
|
|
if len(partitionParts) == 2 {
|
|
rangeStart, err1 := strconv.Atoi(partitionParts[0])
|
|
rangeStop, err2 := strconv.Atoi(partitionParts[1])
|
|
if err1 == nil && err2 == nil {
|
|
partition := topic.Partition{
|
|
RangeStart: int32(rangeStart),
|
|
RangeStop: int32(rangeStop),
|
|
}
|
|
liveLogFileCount, err := hybridScanner.countLiveLogFiles(partition)
|
|
if err == nil {
|
|
dataSources.LiveLogFilesCount += liveLogFileCount
|
|
}
|
|
|
|
// Count broker unflushed messages for this partition
|
|
if hybridScanner.brokerClient != nil {
|
|
entries, err := hybridScanner.brokerClient.GetUnflushedMessages(ctx, hybridScanner.topic.Namespace, hybridScanner.topic.Name, partition, 0)
|
|
if err == nil {
|
|
dataSources.BrokerUnflushedCount += int64(len(entries))
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf(" Found %d unflushed broker messages\n", len(entries))
|
|
}
|
|
} else if isDebugMode(ctx) {
|
|
fmt.Printf(" ERROR: Failed to get unflushed broker messages: %v\n", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
dataSources.PartitionsCount = len(partitionPaths)
|
|
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Data sources collected: %d partitions, %d parquet rows, %d live log rows, %d broker buffer rows\n",
|
|
dataSources.PartitionsCount, dataSources.ParquetRowCount, dataSources.LiveLogRowCount, dataSources.BrokerUnflushedCount)
|
|
}
|
|
|
|
return dataSources, nil
|
|
}
|
|
|
|
// AggregationComputer handles the computation of aggregations using fast path
|
|
type AggregationComputer struct {
|
|
engine *SQLEngine
|
|
}
|
|
|
|
// NewAggregationComputer creates a new aggregation computer
|
|
func NewAggregationComputer(engine *SQLEngine) *AggregationComputer {
|
|
return &AggregationComputer{engine: engine}
|
|
}
|
|
|
|
// ComputeFastPathAggregations computes aggregations using parquet statistics and live log data
|
|
func (comp *AggregationComputer) ComputeFastPathAggregations(
|
|
ctx context.Context,
|
|
aggregations []AggregationSpec,
|
|
dataSources *TopicDataSources,
|
|
partitions []string,
|
|
) ([]AggregationResult, error) {
|
|
|
|
aggResults := make([]AggregationResult, len(aggregations))
|
|
|
|
for i, spec := range aggregations {
|
|
switch spec.Function {
|
|
case FuncCOUNT:
|
|
if spec.Column == "*" {
|
|
aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount + dataSources.BrokerUnflushedCount
|
|
} else {
|
|
// For specific columns, we might need to account for NULLs in the future
|
|
aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount + dataSources.BrokerUnflushedCount
|
|
}
|
|
|
|
case FuncMIN:
|
|
globalMin, err := comp.computeGlobalMin(spec, dataSources, partitions)
|
|
if err != nil {
|
|
return nil, AggregationError{
|
|
Operation: spec.Function,
|
|
Column: spec.Column,
|
|
Cause: err,
|
|
}
|
|
}
|
|
aggResults[i].Min = globalMin
|
|
|
|
case FuncMAX:
|
|
globalMax, err := comp.computeGlobalMax(spec, dataSources, partitions)
|
|
if err != nil {
|
|
return nil, AggregationError{
|
|
Operation: spec.Function,
|
|
Column: spec.Column,
|
|
Cause: err,
|
|
}
|
|
}
|
|
aggResults[i].Max = globalMax
|
|
|
|
default:
|
|
return nil, OptimizationError{
|
|
Strategy: "fast_path_aggregation",
|
|
Reason: fmt.Sprintf("unsupported aggregation function: %s", spec.Function),
|
|
}
|
|
}
|
|
}
|
|
|
|
return aggResults, nil
|
|
}
|
|
|
|
// computeGlobalMin computes the global minimum value across all data sources
|
|
func (comp *AggregationComputer) computeGlobalMin(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) {
|
|
var globalMin interface{}
|
|
var globalMinValue *schema_pb.Value
|
|
hasParquetStats := false
|
|
|
|
// Step 1: Get minimum from parquet statistics
|
|
for _, fileStats := range dataSources.ParquetFiles {
|
|
for _, fileStat := range fileStats {
|
|
// Try case-insensitive column lookup
|
|
var colStats *ParquetColumnStats
|
|
var found bool
|
|
|
|
// First try exact match
|
|
if stats, exists := fileStat.ColumnStats[spec.Column]; exists {
|
|
colStats = stats
|
|
found = true
|
|
} else {
|
|
// Try case-insensitive lookup
|
|
for colName, stats := range fileStat.ColumnStats {
|
|
if strings.EqualFold(colName, spec.Column) {
|
|
colStats = stats
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if found && colStats != nil && colStats.MinValue != nil {
|
|
if globalMinValue == nil || comp.engine.compareValues(colStats.MinValue, globalMinValue) < 0 {
|
|
globalMinValue = colStats.MinValue
|
|
extractedValue := comp.engine.extractRawValue(colStats.MinValue)
|
|
if extractedValue != nil {
|
|
globalMin = extractedValue
|
|
hasParquetStats = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Step 2: Get minimum from live log data (only if no live logs or if we need to compare)
|
|
if dataSources.LiveLogRowCount > 0 {
|
|
for _, partition := range partitions {
|
|
partitionParquetSources := make(map[string]bool)
|
|
if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists {
|
|
partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats)
|
|
}
|
|
|
|
liveLogMin, _, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
|
|
if err != nil {
|
|
continue // Skip partitions with errors
|
|
}
|
|
|
|
if liveLogMin != nil {
|
|
if globalMin == nil {
|
|
globalMin = liveLogMin
|
|
} else {
|
|
liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMin)
|
|
if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMinValue) < 0 {
|
|
globalMin = liveLogMin
|
|
globalMinValue = liveLogSchemaValue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Step 3: Handle system columns if no regular data found
|
|
if globalMin == nil && !hasParquetStats {
|
|
globalMin = comp.engine.getSystemColumnGlobalMin(spec.Column, dataSources.ParquetFiles)
|
|
}
|
|
|
|
return globalMin, nil
|
|
}
|
|
|
|
// computeGlobalMax computes the global maximum value across all data sources
|
|
func (comp *AggregationComputer) computeGlobalMax(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) {
|
|
var globalMax interface{}
|
|
var globalMaxValue *schema_pb.Value
|
|
hasParquetStats := false
|
|
|
|
// Step 1: Get maximum from parquet statistics
|
|
for _, fileStats := range dataSources.ParquetFiles {
|
|
for _, fileStat := range fileStats {
|
|
// Try case-insensitive column lookup
|
|
var colStats *ParquetColumnStats
|
|
var found bool
|
|
|
|
// First try exact match
|
|
if stats, exists := fileStat.ColumnStats[spec.Column]; exists {
|
|
colStats = stats
|
|
found = true
|
|
} else {
|
|
// Try case-insensitive lookup
|
|
for colName, stats := range fileStat.ColumnStats {
|
|
if strings.EqualFold(colName, spec.Column) {
|
|
colStats = stats
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if found && colStats != nil && colStats.MaxValue != nil {
|
|
if globalMaxValue == nil || comp.engine.compareValues(colStats.MaxValue, globalMaxValue) > 0 {
|
|
globalMaxValue = colStats.MaxValue
|
|
extractedValue := comp.engine.extractRawValue(colStats.MaxValue)
|
|
if extractedValue != nil {
|
|
globalMax = extractedValue
|
|
hasParquetStats = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Step 2: Get maximum from live log data (only if live logs exist)
|
|
if dataSources.LiveLogRowCount > 0 {
|
|
for _, partition := range partitions {
|
|
partitionParquetSources := make(map[string]bool)
|
|
if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists {
|
|
partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats)
|
|
}
|
|
|
|
_, liveLogMax, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
|
|
if err != nil {
|
|
continue // Skip partitions with errors
|
|
}
|
|
|
|
if liveLogMax != nil {
|
|
if globalMax == nil {
|
|
globalMax = liveLogMax
|
|
} else {
|
|
liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMax)
|
|
if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMaxValue) > 0 {
|
|
globalMax = liveLogMax
|
|
globalMaxValue = liveLogSchemaValue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Step 3: Handle system columns if no regular data found
|
|
if globalMax == nil && !hasParquetStats {
|
|
globalMax = comp.engine.getSystemColumnGlobalMax(spec.Column, dataSources.ParquetFiles)
|
|
}
|
|
|
|
return globalMax, nil
|
|
}
|
|
|
|
// executeAggregationQuery handles SELECT queries with aggregation functions
|
|
func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *SelectStatement) (*QueryResult, error) {
|
|
return e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, nil)
|
|
}
|
|
|
|
// executeAggregationQueryWithPlan handles SELECT queries with aggregation functions and populates execution plan
|
|
func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
|
|
// Parse LIMIT and OFFSET for aggregation results (do this first)
|
|
// Use -1 to distinguish "no LIMIT" from "LIMIT 0"
|
|
limit := -1
|
|
offset := 0
|
|
if stmt.Limit != nil && stmt.Limit.Rowcount != nil {
|
|
if limitExpr, ok := stmt.Limit.Rowcount.(*SQLVal); ok && limitExpr.Type == IntVal {
|
|
if limit64, err := strconv.ParseInt(string(limitExpr.Val), 10, 64); err == nil {
|
|
if limit64 > int64(math.MaxInt) || limit64 < 0 {
|
|
return nil, fmt.Errorf("LIMIT value %d is out of range", limit64)
|
|
}
|
|
// Safe conversion after bounds check
|
|
limit = int(limit64)
|
|
}
|
|
}
|
|
}
|
|
if stmt.Limit != nil && stmt.Limit.Offset != nil {
|
|
if offsetExpr, ok := stmt.Limit.Offset.(*SQLVal); ok && offsetExpr.Type == IntVal {
|
|
if offset64, err := strconv.ParseInt(string(offsetExpr.Val), 10, 64); err == nil {
|
|
if offset64 > int64(math.MaxInt) || offset64 < 0 {
|
|
return nil, fmt.Errorf("OFFSET value %d is out of range", offset64)
|
|
}
|
|
// Safe conversion after bounds check
|
|
offset = int(offset64)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Parse WHERE clause for filtering
|
|
var predicate func(*schema_pb.RecordValue) bool
|
|
var err error
|
|
if stmt.Where != nil {
|
|
predicate, err = e.buildPredicate(stmt.Where.Expr)
|
|
if err != nil {
|
|
return &QueryResult{Error: err}, err
|
|
}
|
|
}
|
|
|
|
// Extract time filters and validate that WHERE clause contains only time-based predicates
|
|
startTimeNs, stopTimeNs := int64(0), int64(0)
|
|
onlyTimePredicates := true
|
|
if stmt.Where != nil {
|
|
startTimeNs, stopTimeNs, onlyTimePredicates = e.extractTimeFiltersWithValidation(stmt.Where.Expr)
|
|
}
|
|
|
|
// FAST PATH WITH TIME-BASED OPTIMIZATION:
|
|
// Allow fast path only for queries without WHERE clause or with time-only WHERE clauses
|
|
// This prevents incorrect results when non-time predicates are present
|
|
canAttemptFastPath := stmt.Where == nil || onlyTimePredicates
|
|
|
|
if canAttemptFastPath {
|
|
if isDebugMode(ctx) {
|
|
if stmt.Where == nil {
|
|
fmt.Printf("\nFast path optimization attempt (no WHERE clause)...\n")
|
|
} else {
|
|
fmt.Printf("\nFast path optimization attempt (time-only WHERE clause)...\n")
|
|
}
|
|
}
|
|
fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan, startTimeNs, stopTimeNs, stmt)
|
|
if canOptimize {
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Fast path optimization succeeded!\n")
|
|
}
|
|
return fastResult, nil
|
|
} else {
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Fast path optimization failed, falling back to slow path\n")
|
|
}
|
|
}
|
|
} else {
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Fast path not applicable due to complex WHERE clause\n")
|
|
}
|
|
}
|
|
|
|
// SLOW PATH: Fall back to full table scan
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Using full table scan for aggregation (parquet optimization not applicable)\n")
|
|
}
|
|
|
|
// Extract columns needed for aggregations
|
|
columnsNeeded := make(map[string]bool)
|
|
for _, spec := range aggregations {
|
|
if spec.Column != "*" {
|
|
columnsNeeded[spec.Column] = true
|
|
}
|
|
}
|
|
|
|
// Convert to slice
|
|
var scanColumns []string
|
|
if len(columnsNeeded) > 0 {
|
|
scanColumns = make([]string, 0, len(columnsNeeded))
|
|
for col := range columnsNeeded {
|
|
scanColumns = append(scanColumns, col)
|
|
}
|
|
}
|
|
// If no specific columns needed (COUNT(*) only), don't specify columns (scan all)
|
|
|
|
// Build scan options for full table scan (aggregations need all data during scanning)
|
|
hybridScanOptions := HybridScanOptions{
|
|
StartTimeNs: startTimeNs,
|
|
StopTimeNs: stopTimeNs,
|
|
Limit: -1, // Use -1 to mean "no limit" - need all data for aggregation
|
|
Offset: 0, // No offset during scanning - OFFSET applies to final results
|
|
Predicate: predicate,
|
|
Columns: scanColumns, // Include columns needed for aggregation functions
|
|
}
|
|
|
|
// DEBUG: Log scan options for aggregation
|
|
debugHybridScanOptions(ctx, hybridScanOptions, "AGGREGATION")
|
|
|
|
// Execute the hybrid scan to get all matching records
|
|
var results []HybridScanResult
|
|
if plan != nil {
|
|
// EXPLAIN mode - capture broker buffer stats
|
|
var stats *HybridScanStats
|
|
results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions)
|
|
if err != nil {
|
|
return &QueryResult{Error: err}, err
|
|
}
|
|
|
|
// Populate plan with broker buffer information
|
|
if stats != nil {
|
|
plan.BrokerBufferQueried = stats.BrokerBufferQueried
|
|
plan.BrokerBufferMessages = stats.BrokerBufferMessages
|
|
plan.BufferStartIndex = stats.BufferStartIndex
|
|
|
|
// Add broker_buffer to data sources if buffer was queried
|
|
if stats.BrokerBufferQueried {
|
|
// Check if broker_buffer is already in data sources
|
|
hasBrokerBuffer := false
|
|
for _, source := range plan.DataSources {
|
|
if source == "broker_buffer" {
|
|
hasBrokerBuffer = true
|
|
break
|
|
}
|
|
}
|
|
if !hasBrokerBuffer {
|
|
plan.DataSources = append(plan.DataSources, "broker_buffer")
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
// Normal mode - just get results
|
|
results, err = hybridScanner.Scan(ctx, hybridScanOptions)
|
|
if err != nil {
|
|
return &QueryResult{Error: err}, err
|
|
}
|
|
}
|
|
|
|
// DEBUG: Log scan results
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("AGGREGATION SCAN RESULTS: %d rows returned\n", len(results))
|
|
}
|
|
|
|
// Compute aggregations
|
|
aggResults := e.computeAggregations(results, aggregations)
|
|
|
|
// Build result set
|
|
columns := make([]string, len(aggregations))
|
|
row := make([]sqltypes.Value, len(aggregations))
|
|
|
|
for i, spec := range aggregations {
|
|
columns[i] = spec.Alias
|
|
row[i] = e.formatAggregationResult(spec, aggResults[i])
|
|
}
|
|
|
|
// Apply OFFSET and LIMIT to aggregation results
|
|
// Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows
|
|
rows := [][]sqltypes.Value{row}
|
|
if offset > 0 || limit >= 0 {
|
|
// Handle LIMIT 0 first
|
|
if limit == 0 {
|
|
rows = [][]sqltypes.Value{}
|
|
} else {
|
|
// Apply OFFSET first
|
|
if offset > 0 {
|
|
if offset >= len(rows) {
|
|
rows = [][]sqltypes.Value{}
|
|
} else {
|
|
rows = rows[offset:]
|
|
}
|
|
}
|
|
|
|
// Apply LIMIT after OFFSET (only if limit > 0)
|
|
if limit > 0 && len(rows) > limit {
|
|
rows = rows[:limit]
|
|
}
|
|
}
|
|
}
|
|
|
|
result := &QueryResult{
|
|
Columns: columns,
|
|
Rows: rows,
|
|
}
|
|
|
|
// Build execution tree for aggregation queries if plan is provided
|
|
if plan != nil {
|
|
// Populate detailed plan information for full scan (similar to fast path)
|
|
e.populateFullScanPlanDetails(ctx, plan, hybridScanner, stmt)
|
|
plan.RootNode = e.buildExecutionTree(plan, stmt)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// populateFullScanPlanDetails populates detailed plan information for full scan queries
|
|
// This provides consistency with fast path execution plan details
|
|
func (e *SQLEngine) populateFullScanPlanDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, stmt *SelectStatement) {
|
|
// plan.Details is initialized at the start of the SELECT execution
|
|
|
|
// Extract table information
|
|
var database, tableName string
|
|
if len(stmt.From) == 1 {
|
|
if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
|
|
if tableExpr, ok := table.Expr.(TableName); ok {
|
|
tableName = tableExpr.Name.String()
|
|
if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
|
|
database = tableExpr.Qualifier.String()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Use current database if not specified
|
|
if database == "" {
|
|
database = e.catalog.currentDatabase
|
|
if database == "" {
|
|
database = "default"
|
|
}
|
|
}
|
|
|
|
// Discover partitions and populate file details
|
|
if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil {
|
|
// Add partition paths to execution plan details
|
|
plan.Details["partition_paths"] = partitions
|
|
|
|
// Populate detailed file information using shared helper
|
|
e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt)
|
|
} else {
|
|
// Record discovery error to plan for better diagnostics
|
|
plan.Details["error_partition_discovery"] = discoverErr.Error()
|
|
}
|
|
}
|
|
|
|
// tryFastParquetAggregation attempts to compute aggregations using hybrid approach:
|
|
// - Use parquet metadata for parquet files
|
|
// - Count live log files for live data
|
|
// - Combine both for accurate results per partition
|
|
// Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used
|
|
func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) {
|
|
return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil, 0, 0, nil)
|
|
}
|
|
|
|
// tryFastParquetAggregationWithPlan is the same as tryFastParquetAggregation but also populates execution plan if provided
|
|
// startTimeNs, stopTimeNs: optional time range filters for parquet file optimization (0 means no filtering)
|
|
// stmt: SELECT statement for column statistics pruning optimization (can be nil)
|
|
func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan, startTimeNs, stopTimeNs int64, stmt *SelectStatement) (*QueryResult, bool) {
|
|
// Use the new modular components
|
|
optimizer := NewFastPathOptimizer(e)
|
|
computer := NewAggregationComputer(e)
|
|
|
|
// Step 1: Determine strategy
|
|
strategy := optimizer.DetermineStrategy(aggregations)
|
|
if !strategy.CanUseFastPath {
|
|
return nil, false
|
|
}
|
|
|
|
// Step 2: Collect data sources with time filtering for parquet file optimization
|
|
dataSources, err := optimizer.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, startTimeNs, stopTimeNs)
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
|
|
// Build partition list for aggregation computer
|
|
// Note: discoverTopicPartitions always returns absolute paths
|
|
partitions, err := e.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
|
|
// Debug: Show the hybrid optimization results (only in explain mode)
|
|
if isDebugMode(ctx) && (dataSources.ParquetRowCount > 0 || dataSources.LiveLogRowCount > 0 || dataSources.BrokerUnflushedCount > 0) {
|
|
partitionsWithLiveLogs := 0
|
|
if dataSources.LiveLogRowCount > 0 || dataSources.BrokerUnflushedCount > 0 {
|
|
partitionsWithLiveLogs = 1 // Simplified for now
|
|
}
|
|
fmt.Printf("Hybrid fast aggregation with deduplication: %d parquet rows + %d deduplicated live log rows + %d broker buffer rows from %d partitions\n",
|
|
dataSources.ParquetRowCount, dataSources.LiveLogRowCount, dataSources.BrokerUnflushedCount, partitionsWithLiveLogs)
|
|
}
|
|
|
|
// Step 3: Compute aggregations using fast path
|
|
aggResults, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
|
|
// Step 3.5: Validate fast path results (safety check)
|
|
// For simple COUNT(*) queries, ensure we got a reasonable result
|
|
if len(aggregations) == 1 && aggregations[0].Function == FuncCOUNT && aggregations[0].Column == "*" {
|
|
totalRows := dataSources.ParquetRowCount + dataSources.LiveLogRowCount + dataSources.BrokerUnflushedCount
|
|
countResult := aggResults[0].Count
|
|
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Validating fast path: COUNT=%d, Sources=%d\n", countResult, totalRows)
|
|
}
|
|
|
|
if totalRows == 0 && countResult > 0 {
|
|
// Fast path found data but data sources show 0 - this suggests a bug
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Fast path validation failed: COUNT=%d but sources=0\n", countResult)
|
|
}
|
|
return nil, false
|
|
}
|
|
if totalRows > 0 && countResult == 0 {
|
|
// Data sources show data but COUNT is 0 - this also suggests a bug
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Fast path validation failed: sources=%d but COUNT=0\n", totalRows)
|
|
}
|
|
return nil, false
|
|
}
|
|
if countResult != totalRows {
|
|
// Counts don't match - this suggests inconsistent logic
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Fast path validation failed: COUNT=%d != sources=%d\n", countResult, totalRows)
|
|
}
|
|
return nil, false
|
|
}
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Fast path validation passed: COUNT=%d\n", countResult)
|
|
}
|
|
}
|
|
|
|
// Step 4: Populate execution plan if provided (for EXPLAIN queries)
|
|
if plan != nil {
|
|
strategy := optimizer.DetermineStrategy(aggregations)
|
|
builder := &ExecutionPlanBuilder{}
|
|
|
|
// Create a minimal SELECT statement for the plan builder (avoid nil pointer)
|
|
stmt := &SelectStatement{}
|
|
|
|
// Build aggregation plan with fast path strategy
|
|
aggPlan := builder.BuildAggregationPlan(stmt, aggregations, strategy, dataSources)
|
|
|
|
// Copy relevant fields to the main plan
|
|
plan.ExecutionStrategy = aggPlan.ExecutionStrategy
|
|
plan.DataSources = aggPlan.DataSources
|
|
plan.OptimizationsUsed = aggPlan.OptimizationsUsed
|
|
plan.PartitionsScanned = aggPlan.PartitionsScanned
|
|
plan.ParquetFilesScanned = aggPlan.ParquetFilesScanned
|
|
plan.LiveLogFilesScanned = aggPlan.LiveLogFilesScanned
|
|
plan.TotalRowsProcessed = aggPlan.TotalRowsProcessed
|
|
plan.Aggregations = aggPlan.Aggregations
|
|
|
|
// Indicate broker buffer participation for EXPLAIN tree rendering
|
|
if dataSources.BrokerUnflushedCount > 0 {
|
|
plan.BrokerBufferQueried = true
|
|
plan.BrokerBufferMessages = int(dataSources.BrokerUnflushedCount)
|
|
}
|
|
|
|
// Merge details while preserving existing ones
|
|
for key, value := range aggPlan.Details {
|
|
plan.Details[key] = value
|
|
}
|
|
|
|
// Add file path information from the data collection
|
|
plan.Details["partition_paths"] = partitions
|
|
|
|
// Populate detailed file information using shared helper, including time filters for pruning
|
|
plan.Details[PlanDetailStartTimeNs] = startTimeNs
|
|
plan.Details[PlanDetailStopTimeNs] = stopTimeNs
|
|
e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt)
|
|
|
|
// Update counts to match discovered live log files
|
|
if liveLogFiles, ok := plan.Details["live_log_files"].([]string); ok {
|
|
dataSources.LiveLogFilesCount = len(liveLogFiles)
|
|
plan.LiveLogFilesScanned = len(liveLogFiles)
|
|
}
|
|
|
|
// Ensure PartitionsScanned is set so Statistics section appears
|
|
if plan.PartitionsScanned == 0 && len(partitions) > 0 {
|
|
plan.PartitionsScanned = len(partitions)
|
|
}
|
|
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Populated execution plan with fast path strategy\n")
|
|
}
|
|
}
|
|
|
|
// Step 5: Build final query result
|
|
columns := make([]string, len(aggregations))
|
|
row := make([]sqltypes.Value, len(aggregations))
|
|
|
|
for i, spec := range aggregations {
|
|
columns[i] = spec.Alias
|
|
row[i] = e.formatAggregationResult(spec, aggResults[i])
|
|
}
|
|
|
|
result := &QueryResult{
|
|
Columns: columns,
|
|
Rows: [][]sqltypes.Value{row},
|
|
}
|
|
|
|
return result, true
|
|
}
|
|
|
|
// computeAggregations computes aggregation results from a full table scan
|
|
func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations []AggregationSpec) []AggregationResult {
|
|
aggResults := make([]AggregationResult, len(aggregations))
|
|
|
|
for i, spec := range aggregations {
|
|
switch spec.Function {
|
|
case FuncCOUNT:
|
|
if spec.Column == "*" {
|
|
aggResults[i].Count = int64(len(results))
|
|
} else {
|
|
count := int64(0)
|
|
for _, result := range results {
|
|
if value := e.findColumnValue(result, spec.Column); value != nil && !e.isNullValue(value) {
|
|
count++
|
|
}
|
|
}
|
|
aggResults[i].Count = count
|
|
}
|
|
|
|
case FuncSUM:
|
|
sum := float64(0)
|
|
for _, result := range results {
|
|
if value := e.findColumnValue(result, spec.Column); value != nil {
|
|
if numValue := e.convertToNumber(value); numValue != nil {
|
|
sum += *numValue
|
|
}
|
|
}
|
|
}
|
|
aggResults[i].Sum = sum
|
|
|
|
case FuncAVG:
|
|
sum := float64(0)
|
|
count := int64(0)
|
|
for _, result := range results {
|
|
if value := e.findColumnValue(result, spec.Column); value != nil {
|
|
if numValue := e.convertToNumber(value); numValue != nil {
|
|
sum += *numValue
|
|
count++
|
|
}
|
|
}
|
|
}
|
|
if count > 0 {
|
|
aggResults[i].Sum = sum / float64(count) // Store average in Sum field
|
|
aggResults[i].Count = count
|
|
}
|
|
|
|
case FuncMIN:
|
|
var min interface{}
|
|
var minValue *schema_pb.Value
|
|
for _, result := range results {
|
|
if value := e.findColumnValue(result, spec.Column); value != nil {
|
|
if minValue == nil || e.compareValues(value, minValue) < 0 {
|
|
minValue = value
|
|
min = e.extractRawValue(value)
|
|
}
|
|
}
|
|
}
|
|
aggResults[i].Min = min
|
|
|
|
case FuncMAX:
|
|
var max interface{}
|
|
var maxValue *schema_pb.Value
|
|
for _, result := range results {
|
|
if value := e.findColumnValue(result, spec.Column); value != nil {
|
|
if maxValue == nil || e.compareValues(value, maxValue) > 0 {
|
|
maxValue = value
|
|
max = e.extractRawValue(value)
|
|
}
|
|
}
|
|
}
|
|
aggResults[i].Max = max
|
|
}
|
|
}
|
|
|
|
return aggResults
|
|
}
|
|
|
|
// canUseParquetStatsForAggregation determines if an aggregation can be optimized with parquet stats
|
|
func (e *SQLEngine) canUseParquetStatsForAggregation(spec AggregationSpec) bool {
|
|
switch spec.Function {
|
|
case FuncCOUNT:
|
|
return spec.Column == "*" || e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column)
|
|
case FuncMIN, FuncMAX:
|
|
return e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column)
|
|
case FuncSUM, FuncAVG:
|
|
// These require scanning actual values, not just min/max
|
|
return false
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// debugHybridScanOptions logs the exact scan options being used
|
|
func debugHybridScanOptions(ctx context.Context, options HybridScanOptions, queryType string) {
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("\n=== HYBRID SCAN OPTIONS DEBUG (%s) ===\n", queryType)
|
|
fmt.Printf("StartTimeNs: %d\n", options.StartTimeNs)
|
|
fmt.Printf("StopTimeNs: %d\n", options.StopTimeNs)
|
|
fmt.Printf("Limit: %d\n", options.Limit)
|
|
fmt.Printf("Offset: %d\n", options.Offset)
|
|
fmt.Printf("Predicate: %v\n", options.Predicate != nil)
|
|
fmt.Printf("Columns: %v\n", options.Columns)
|
|
fmt.Printf("==========================================\n")
|
|
}
|
|
}
|