1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2025-09-19 01:30:23 +02:00
seaweedfs/weed/query/engine/aggregations.go
Chris Lu 58e0c1b330
Fix sql bugs (#7219)
* fix nil when explaining

* add plain details when running full scan

* skip files by timestamp

* skip file by timestamp

* refactor

* handle filter by time

* skip broker memory only if it has unflushed messages

* refactoring

* refactor

* address comments

* address comments

* filter by parquet stats

* simplify

* refactor

* prune old code

* optimize

* Update aggregations.go

* ensure non-time predicates are properly detected

* add stmt to populatePlanFileDetails

This helper function is a great way to centralize logic for populating file details. However, it's missing an optimization that is present in executeSelectStatementWithBrokerStats: pruning Parquet files based on column statistics from the WHERE clause.

Aggregation queries that fall back to the slow path could benefit from this optimization. Consider modifying the function signature to accept the *SelectStatement and adding the column statistics pruning logic here, similar to how it's done in executeSelectStatementWithBrokerStats.

* refactoring to work with *schema_pb.Value directly after the initial conversion
2025-09-10 11:04:42 -07:00

933 lines
31 KiB
Go

package engine
import (
"context"
"fmt"
"math"
"strconv"
"strings"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
)
// AggregationSpec describes one aggregate expression requested by a query.
type AggregationSpec struct {
	// Function is the aggregate to evaluate: COUNT, SUM, AVG, MIN, or MAX.
	Function string

	// Column is the column the aggregate applies to, or "*" for COUNT(*).
	Column string

	// Alias is the optional name given to the result column.
	Alias string

	// Distinct records whether the DISTINCT keyword was used.
	Distinct bool
}
// AggregationResult carries the computed value of a single aggregation.
// Which fields are meaningful depends on the aggregate function that
// produced the result.
type AggregationResult struct {
	// Count is the row count for COUNT, and the number of contributing
	// values for AVG.
	Count int64

	// Sum is the total for SUM; for AVG it stores the average itself.
	Sum float64

	// Min is the smallest value found for MIN, or nil when none was found.
	Min interface{}

	// Max is the largest value found for MAX, or nil when none was found.
	Max interface{}
}
// AggregationStrategy is the outcome of deciding how a set of aggregations
// should be executed.
type AggregationStrategy struct {
	// CanUseFastPath is true when every aggregation can be answered without
	// a full scan.
	CanUseFastPath bool

	// Reason is a short machine-readable explanation of the decision.
	Reason string

	// UnsupportedSpecs lists the aggregations that ruled out the fast path.
	UnsupportedSpecs []AggregationSpec
}
// TopicDataSources summarizes where a topic's rows currently live: parquet
// files, live log files, and the broker's unflushed buffer.
type TopicDataSources struct {
	// ParquetFiles maps a partition path to the statistics of its parquet files.
	ParquetFiles map[string][]*ParquetFileStats

	// ParquetRowCount is the total number of rows across the parquet files.
	ParquetRowCount int64

	// LiveLogRowCount is the total number of rows counted in live logs.
	LiveLogRowCount int64

	// LiveLogFilesCount is the total count of live log files across all partitions.
	LiveLogFilesCount int

	// PartitionsCount is the number of partitions discovered for the topic.
	PartitionsCount int

	// BrokerUnflushedCount is the number of unflushed messages reported by the broker.
	BrokerUnflushedCount int64
}
// FastPathOptimizer decides whether aggregations can take the fast path and
// gathers the data-source information that path needs.
type FastPathOptimizer struct {
	// engine is the SQL engine whose helpers the optimizer delegates to.
	engine *SQLEngine
}
// NewFastPathOptimizer returns a FastPathOptimizer bound to the given engine.
func NewFastPathOptimizer(engine *SQLEngine) *FastPathOptimizer {
	optimizer := new(FastPathOptimizer)
	optimizer.engine = engine
	return optimizer
}
// DetermineStrategy checks every requested aggregation against the engine's
// parquet-statistics support and reports whether the fast path is usable.
//
// The returned strategy always carries a non-nil UnsupportedSpecs slice; it is
// empty when the fast path can be used.
func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec) AggregationStrategy {
	unsupported := []AggregationSpec{}
	for i := range aggregations {
		if opt.engine.canUseParquetStatsForAggregation(aggregations[i]) {
			continue
		}
		unsupported = append(unsupported, aggregations[i])
	}

	if len(unsupported) > 0 {
		return AggregationStrategy{
			CanUseFastPath:   false,
			Reason:           "unsupported_aggregation_functions",
			UnsupportedSpecs: unsupported,
		}
	}
	return AggregationStrategy{
		CanUseFastPath:   true,
		Reason:           "all_aggregations_supported",
		UnsupportedSpecs: unsupported,
	}
}
// CollectDataSources gathers data-source information for the scanner's topic
// without any time-range restriction.
func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) {
	// A zero bound means "no filtering" on that side of the time range.
	const unboundedNs int64 = 0
	return opt.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, unboundedNs, unboundedNs)
}
// CollectDataSourcesWithTimeFilter walks every partition of the scanner's topic
// and tallies parquet rows, live log rows, live log files, and unflushed broker
// messages. startTimeNs and stopTimeNs are handed to pruneParquetFilesByTime so
// that irrelevant parquet files can be skipped.
//
// Partition discovery failure is the only error returned; in that case the
// (empty) data sources are returned alongside a DataSourceError. Failures while
// reading an individual partition are reported in debug mode and otherwise
// skipped.
func (opt *FastPathOptimizer) CollectDataSourcesWithTimeFilter(ctx context.Context, hybridScanner *HybridMessageScanner, startTimeNs, stopTimeNs int64) (*TopicDataSources, error) {
	// debugf prints only when the context is in debug mode.
	debugf := func(format string, args ...interface{}) {
		if isDebugMode(ctx) {
			fmt.Printf(format, args...)
		}
	}

	namespace, topicName := hybridScanner.topic.Namespace, hybridScanner.topic.Name
	dataSources := &TopicDataSources{
		ParquetFiles: make(map[string][]*ParquetFileStats),
	}

	debugf("Collecting data sources for: %s/%s\n", namespace, topicName)

	// Discover partitions for the topic.
	partitionPaths, err := opt.engine.discoverTopicPartitions(namespace, topicName)
	if err != nil {
		debugf("ERROR: Partition discovery failed: %v\n", err)
		return dataSources, DataSourceError{
			Source: "partition_discovery",
			Cause:  err,
		}
	}
	debugf("Discovered %d partitions: %v\n", len(partitionPaths), partitionPaths)

	// Note: discoverTopicPartitions always returns absolute paths starting with "/topics/".
	for _, partitionPath := range partitionPaths {
		debugf("\nProcessing partition: %s\n", partitionPath)

		// Parquet statistics, pruned by the requested time range.
		parquetStats, statsErr := hybridScanner.ReadParquetStatistics(partitionPath)
		switch {
		case statsErr != nil:
			debugf(" ERROR: Failed to read parquet statistics: %v\n", statsErr)
		case len(parquetStats) == 0:
			debugf(" No parquet files found in partition\n")
		default:
			kept := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)
			dataSources.ParquetFiles[partitionPath] = kept
			var rowsInPartition int64
			for _, fileStat := range kept {
				rowsInPartition += fileStat.RowCount
			}
			dataSources.ParquetRowCount += rowsInPartition
			debugf(" Found %d parquet files with %d total rows\n", len(kept), rowsInPartition)
		}

		// Live log rows, excluding logs already converted to parquet.
		parquetSources := opt.engine.extractParquetSourceFiles(dataSources.ParquetFiles[partitionPath])
		liveLogCount, liveLogErr := opt.engine.countLiveLogRowsExcludingParquetSources(ctx, partitionPath, parquetSources)
		if liveLogErr == nil {
			dataSources.LiveLogRowCount += liveLogCount
			debugf(" Found %d live log rows (excluding %d parquet sources)\n", liveLogCount, len(parquetSources))
		} else {
			debugf(" ERROR: Failed to count live log rows: %v\n", liveLogErr)
		}

		// The last path segment encodes the partition range, e.g. "0000-2520"
		// from "/topics/.../v2025.../0000-2520". Partitions whose name does not
		// parse as "<start>-<stop>" contribute no file or broker counts.
		partitionName := partitionPath[strings.LastIndex(partitionPath, "/")+1:]
		bounds := strings.Split(partitionName, "-")
		if len(bounds) != 2 {
			continue
		}
		rangeStart, startErr := strconv.Atoi(bounds[0])
		rangeStop, stopErr := strconv.Atoi(bounds[1])
		if startErr != nil || stopErr != nil {
			continue
		}
		partition := topic.Partition{
			RangeStart: int32(rangeStart),
			RangeStop:  int32(rangeStop),
		}

		// A failed file count is ignored; the partition simply adds nothing.
		if fileCount, countErr := hybridScanner.countLiveLogFiles(partition); countErr == nil {
			dataSources.LiveLogFilesCount += fileCount
		}

		// Unflushed broker messages for this partition, when a broker client exists.
		if hybridScanner.brokerClient == nil {
			continue
		}
		entries, brokerErr := hybridScanner.brokerClient.GetUnflushedMessages(ctx, namespace, topicName, partition, 0)
		if brokerErr != nil {
			debugf(" ERROR: Failed to get unflushed broker messages: %v\n", brokerErr)
			continue
		}
		dataSources.BrokerUnflushedCount += int64(len(entries))
		debugf(" Found %d unflushed broker messages\n", len(entries))
	}

	dataSources.PartitionsCount = len(partitionPaths)
	debugf("Data sources collected: %d partitions, %d parquet rows, %d live log rows, %d broker buffer rows\n",
		dataSources.PartitionsCount, dataSources.ParquetRowCount, dataSources.LiveLogRowCount, dataSources.BrokerUnflushedCount)

	return dataSources, nil
}
// AggregationComputer evaluates aggregations on the fast path, using collected
// data-source statistics instead of a full scan.
type AggregationComputer struct {
	// engine is the SQL engine whose value helpers the computer delegates to.
	engine *SQLEngine
}
// NewAggregationComputer returns an AggregationComputer bound to the given engine.
func NewAggregationComputer(engine *SQLEngine) *AggregationComputer {
	computer := new(AggregationComputer)
	computer.engine = engine
	return computer
}
// ComputeFastPathAggregations evaluates each aggregation from the collected
// data sources and returns one result per spec, in the same order.
//
// COUNT is the sum of the parquet, live log, and broker-buffer row counts.
// MIN and MAX are delegated to computeGlobalMin and computeGlobalMax. Any other
// function yields an OptimizationError; a MIN/MAX failure is wrapped in an
// AggregationError. On error the returned slice is nil. ctx is currently
// unused by this method.
func (comp *AggregationComputer) ComputeFastPathAggregations(
	ctx context.Context,
	aggregations []AggregationSpec,
	dataSources *TopicDataSources,
	partitions []string,
) ([]AggregationResult, error) {
	out := make([]AggregationResult, len(aggregations))

	for idx := range aggregations {
		spec := aggregations[idx]
		slot := &out[idx]

		if spec.Function == FuncCOUNT {
			// COUNT(*) and COUNT(column) are computed identically here; for
			// specific columns, NULLs might need to be accounted for in the future.
			slot.Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount + dataSources.BrokerUnflushedCount
		} else if spec.Function == FuncMIN {
			lowest, err := comp.computeGlobalMin(spec, dataSources, partitions)
			if err != nil {
				return nil, AggregationError{
					Operation: spec.Function,
					Column:    spec.Column,
					Cause:     err,
				}
			}
			slot.Min = lowest
		} else if spec.Function == FuncMAX {
			highest, err := comp.computeGlobalMax(spec, dataSources, partitions)
			if err != nil {
				return nil, AggregationError{
					Operation: spec.Function,
					Column:    spec.Column,
					Cause:     err,
				}
			}
			slot.Max = highest
		} else {
			return nil, OptimizationError{
				Strategy: "fast_path_aggregation",
				Reason:   fmt.Sprintf("unsupported aggregation function: %s", spec.Function),
			}
		}
	}

	return out, nil
}
// computeGlobalMin returns the smallest value of spec.Column across parquet
// column statistics and live log data, or nil when no value is found.
//
// globalMin holds the raw value that is returned; globalMinValue holds its
// schema_pb form, which is what compareValues operates on. The returned error
// is always nil today; the signature leaves room for future failures.
func (comp *AggregationComputer) computeGlobalMin(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) {
	var globalMin interface{}
	var globalMinValue *schema_pb.Value
	hasParquetStats := false

	// Step 1: Get minimum from parquet statistics.
	for _, fileStats := range dataSources.ParquetFiles {
		for _, fileStat := range fileStats {
			// Prefer an exact column-name match; only fall back to a
			// case-insensitive search when the exact key is absent.
			colStats, exists := fileStat.ColumnStats[spec.Column]
			if !exists {
				for colName, stats := range fileStat.ColumnStats {
					if strings.EqualFold(colName, spec.Column) {
						colStats = stats
						break
					}
				}
			}
			if colStats == nil || colStats.MinValue == nil {
				continue
			}
			if globalMinValue == nil || comp.engine.compareValues(colStats.MinValue, globalMinValue) < 0 {
				globalMinValue = colStats.MinValue
				if extractedValue := comp.engine.extractRawValue(colStats.MinValue); extractedValue != nil {
					globalMin = extractedValue
					hasParquetStats = true
				}
			}
		}
	}

	// Step 2: Fold in live log data, but only when live log rows exist.
	if dataSources.LiveLogRowCount > 0 {
		for _, partition := range partitions {
			partitionParquetSources := make(map[string]bool)
			if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists {
				partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats)
			}

			liveLogMin, _, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
			if err != nil {
				continue // Skip partitions with errors
			}
			if liveLogMin == nil {
				continue
			}

			liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMin)
			if globalMin == nil {
				// First candidate: record the comparable form as well, so that
				// later partitions are compared against this value rather than
				// against a nil or stale globalMinValue.
				globalMin = liveLogMin
				globalMinValue = liveLogSchemaValue
			} else if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMinValue) < 0 {
				globalMin = liveLogMin
				globalMinValue = liveLogSchemaValue
			}
		}
	}

	// Step 3: Handle system columns if no regular data found.
	if globalMin == nil && !hasParquetStats {
		globalMin = comp.engine.getSystemColumnGlobalMin(spec.Column, dataSources.ParquetFiles)
	}

	return globalMin, nil
}
// computeGlobalMax returns the largest value of spec.Column across parquet
// column statistics and live log data, or nil when no value is found.
//
// globalMax holds the raw value that is returned; globalMaxValue holds its
// schema_pb form, which is what compareValues operates on. The returned error
// is always nil today; the signature leaves room for future failures.
func (comp *AggregationComputer) computeGlobalMax(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) {
	var globalMax interface{}
	var globalMaxValue *schema_pb.Value
	hasParquetStats := false

	// Step 1: Get maximum from parquet statistics.
	for _, fileStats := range dataSources.ParquetFiles {
		for _, fileStat := range fileStats {
			// Prefer an exact column-name match; only fall back to a
			// case-insensitive search when the exact key is absent.
			colStats, exists := fileStat.ColumnStats[spec.Column]
			if !exists {
				for colName, stats := range fileStat.ColumnStats {
					if strings.EqualFold(colName, spec.Column) {
						colStats = stats
						break
					}
				}
			}
			if colStats == nil || colStats.MaxValue == nil {
				continue
			}
			if globalMaxValue == nil || comp.engine.compareValues(colStats.MaxValue, globalMaxValue) > 0 {
				globalMaxValue = colStats.MaxValue
				if extractedValue := comp.engine.extractRawValue(colStats.MaxValue); extractedValue != nil {
					globalMax = extractedValue
					hasParquetStats = true
				}
			}
		}
	}

	// Step 2: Fold in live log data, but only when live log rows exist.
	if dataSources.LiveLogRowCount > 0 {
		for _, partition := range partitions {
			partitionParquetSources := make(map[string]bool)
			if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists {
				partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats)
			}

			_, liveLogMax, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
			if err != nil {
				continue // Skip partitions with errors
			}
			if liveLogMax == nil {
				continue
			}

			liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMax)
			if globalMax == nil {
				// First candidate: record the comparable form as well, so that
				// later partitions are compared against this value rather than
				// against a nil or stale globalMaxValue.
				globalMax = liveLogMax
				globalMaxValue = liveLogSchemaValue
			} else if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMaxValue) > 0 {
				globalMax = liveLogMax
				globalMaxValue = liveLogSchemaValue
			}
		}
	}

	// Step 3: Handle system columns if no regular data found.
	if globalMax == nil && !hasParquetStats {
		globalMax = comp.engine.getSystemColumnGlobalMax(spec.Column, dataSources.ParquetFiles)
	}

	return globalMax, nil
}
// executeAggregationQuery runs a SELECT that contains aggregation functions
// without recording an execution plan.
func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *SelectStatement) (*QueryResult, error) {
	var noPlan *QueryExecutionPlan
	return e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, noPlan)
}
// executeAggregationQueryWithPlan runs a SELECT that contains aggregation
// functions and, when plan is non-nil (EXPLAIN mode), fills in the plan.
//
// The fast path (parquet statistics plus live log counts) is attempted when the
// statement has no WHERE clause or only time-based predicates; otherwise, or
// when the fast path declines, a full hybrid scan is performed. LIMIT and
// OFFSET apply to the final single-row aggregation result on both paths.
//
// A LIMIT or OFFSET outside [0, math.MaxInt] is returned as an error with a nil
// result. Predicate and scan failures are returned both as the error and inside
// QueryResult.Error.
func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
	// debugf prints only when the context is in debug mode.
	debugf := func(format string, args ...interface{}) {
		if isDebugMode(ctx) {
			fmt.Printf(format, args...)
		}
	}

	// parseBound reads a non-negative integer literal from a LIMIT/OFFSET
	// expression. Anything that is not an integer literal (including a nil
	// expression or an unparseable literal) leaves the fallback in place.
	parseBound := func(node interface{}, clause string, fallback int) (int, error) {
		literal, ok := node.(*SQLVal)
		if !ok || literal.Type != IntVal {
			return fallback, nil
		}
		parsed, parseErr := strconv.ParseInt(string(literal.Val), 10, 64)
		if parseErr != nil {
			return fallback, nil
		}
		if parsed > int64(math.MaxInt) || parsed < 0 {
			return 0, fmt.Errorf("%s value %d is out of range", clause, parsed)
		}
		// Safe conversion after bounds check.
		return int(parsed), nil
	}

	// Parse LIMIT and OFFSET first. limit == -1 distinguishes "no LIMIT" from "LIMIT 0".
	limit, offset := -1, 0
	if stmt.Limit != nil {
		var boundErr error
		if limit, boundErr = parseBound(stmt.Limit.Rowcount, "LIMIT", limit); boundErr != nil {
			return nil, boundErr
		}
		if offset, boundErr = parseBound(stmt.Limit.Offset, "OFFSET", offset); boundErr != nil {
			return nil, boundErr
		}
	}

	// applyLimitOffset trims the aggregation rows.
	// Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows.
	applyLimitOffset := func(rows [][]sqltypes.Value) [][]sqltypes.Value {
		if limit == 0 {
			return [][]sqltypes.Value{}
		}
		// OFFSET is applied before LIMIT.
		if offset > 0 {
			if offset >= len(rows) {
				return [][]sqltypes.Value{}
			}
			rows = rows[offset:]
		}
		if limit > 0 && len(rows) > limit {
			rows = rows[:limit]
		}
		return rows
	}

	// Parse the WHERE clause: build the row predicate, then extract time
	// filters and learn whether the clause contains only time-based predicates.
	var predicate func(*schema_pb.RecordValue) bool
	var err error
	startTimeNs, stopTimeNs := int64(0), int64(0)
	onlyTimePredicates := true
	if stmt.Where != nil {
		predicate, err = e.buildPredicate(stmt.Where.Expr)
		if err != nil {
			return &QueryResult{Error: err}, err
		}
		startTimeNs, stopTimeNs, onlyTimePredicates = e.extractTimeFiltersWithValidation(stmt.Where.Expr)
	}

	// FAST PATH WITH TIME-BASED OPTIMIZATION:
	// Allowed only without a WHERE clause or with a time-only WHERE clause;
	// this prevents incorrect results when non-time predicates are present.
	canAttemptFastPath := stmt.Where == nil || onlyTimePredicates
	if canAttemptFastPath {
		if stmt.Where == nil {
			debugf("\nFast path optimization attempt (no WHERE clause)...\n")
		} else {
			debugf("\nFast path optimization attempt (time-only WHERE clause)...\n")
		}

		fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan, startTimeNs, stopTimeNs, stmt)
		if canOptimize {
			debugf("Fast path optimization succeeded!\n")
			// The fast path knows nothing about LIMIT/OFFSET, so apply them
			// here; otherwise the result would depend on which path ran.
			if fastResult != nil {
				fastResult.Rows = applyLimitOffset(fastResult.Rows)
			}
			return fastResult, nil
		}
		debugf("Fast path optimization failed, falling back to slow path\n")
	} else {
		debugf("Fast path not applicable due to complex WHERE clause\n")
	}

	// SLOW PATH: fall back to a full table scan.
	debugf("Using full table scan for aggregation (parquet optimization not applicable)\n")

	// Collect the distinct columns the aggregations read. When only COUNT(*)
	// is requested the slice stays nil, which means "scan all columns".
	var scanColumns []string
	columnsNeeded := make(map[string]bool)
	for _, spec := range aggregations {
		if spec.Column == "*" || columnsNeeded[spec.Column] {
			continue
		}
		columnsNeeded[spec.Column] = true
		scanColumns = append(scanColumns, spec.Column)
	}

	// Aggregations need all matching rows, so the scan itself is unbounded;
	// LIMIT/OFFSET are applied to the final result only.
	hybridScanOptions := HybridScanOptions{
		StartTimeNs: startTimeNs,
		StopTimeNs:  stopTimeNs,
		Limit:       -1, // -1 means "no limit"
		Offset:      0,
		Predicate:   predicate,
		Columns:     scanColumns,
	}

	debugHybridScanOptions(ctx, hybridScanOptions, "AGGREGATION")

	// Execute the hybrid scan to get all matching records.
	var results []HybridScanResult
	if plan != nil {
		// EXPLAIN mode - capture broker buffer stats.
		var stats *HybridScanStats
		results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions)
		if err != nil {
			return &QueryResult{Error: err}, err
		}

		if stats != nil {
			plan.BrokerBufferQueried = stats.BrokerBufferQueried
			plan.BrokerBufferMessages = stats.BrokerBufferMessages
			plan.BufferStartIndex = stats.BufferStartIndex

			// List broker_buffer as a data source once, if the buffer was queried.
			if stats.BrokerBufferQueried {
				hasBrokerBuffer := false
				for _, source := range plan.DataSources {
					if source == "broker_buffer" {
						hasBrokerBuffer = true
						break
					}
				}
				if !hasBrokerBuffer {
					plan.DataSources = append(plan.DataSources, "broker_buffer")
				}
			}
		}
	} else {
		// Normal mode - just get results.
		results, err = hybridScanner.Scan(ctx, hybridScanOptions)
		if err != nil {
			return &QueryResult{Error: err}, err
		}
	}

	debugf("AGGREGATION SCAN RESULTS: %d rows returned\n", len(results))

	// Compute aggregations and build the single result row.
	aggResults := e.computeAggregations(results, aggregations)
	columns := make([]string, len(aggregations))
	row := make([]sqltypes.Value, len(aggregations))
	for i, spec := range aggregations {
		columns[i] = spec.Alias
		row[i] = e.formatAggregationResult(spec, aggResults[i])
	}

	result := &QueryResult{
		Columns: columns,
		Rows:    applyLimitOffset([][]sqltypes.Value{row}),
	}

	// Build the execution tree for aggregation queries if a plan is provided.
	if plan != nil {
		// Populate detailed plan information for the full scan (similar to fast path).
		e.populateFullScanPlanDetails(ctx, plan, hybridScanner, stmt)
		plan.RootNode = e.buildExecutionTree(plan, stmt)
	}

	return result, nil
}
// populateFullScanPlanDetails records partition and file details in the plan
// for queries executed by full scan, mirroring what the fast path records.
//
// The table is taken from the statement's single FROM entry; the database
// falls back to the catalog's current database and then to "default". A
// partition discovery failure is stored in the plan details rather than
// returned. plan.Details is expected to be initialized by the caller.
func (e *SQLEngine) populateFullScanPlanDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, stmt *SelectStatement) {
	// Extract table information.
	database, tableName := "", ""
	if len(stmt.From) == 1 {
		if aliased, isAliased := stmt.From[0].(*AliasedTableExpr); isAliased {
			if named, isNamed := aliased.Expr.(TableName); isNamed {
				tableName = named.Name.String()
				if named.Qualifier != nil && named.Qualifier.String() != "" {
					database = named.Qualifier.String()
				}
			}
		}
	}

	// Use the current database if none was specified.
	switch {
	case database != "":
		// Explicit qualifier wins.
	case e.catalog.currentDatabase != "":
		database = e.catalog.currentDatabase
	default:
		database = "default"
	}

	// Discover partitions and populate file details.
	partitions, discoverErr := e.discoverTopicPartitions(database, tableName)
	if discoverErr != nil {
		// Record the discovery error in the plan for better diagnostics.
		plan.Details["error_partition_discovery"] = discoverErr.Error()
		return
	}

	plan.Details["partition_paths"] = partitions
	e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt)
}
// tryFastParquetAggregation attempts the hybrid fast path for aggregations:
//   - parquet metadata answers for parquet files,
//   - live log counts answer for live data,
//   - both are combined for the final result.
//
// It returns (result, canOptimize); canOptimize is true when the hybrid fast
// path produced the result. No execution plan, time range, or statement is
// supplied to the underlying implementation.
func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) {
	var (
		noPlan *QueryExecutionPlan
		noStmt *SelectStatement
	)
	// A zero bound means "no filtering" on that side of the time range.
	const unboundedNs int64 = 0
	return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, noPlan, unboundedNs, unboundedNs, noStmt)
}
// tryFastParquetAggregationWithPlan attempts to answer the aggregations from
// parquet statistics, live log counts, and the broker buffer, and populates the
// execution plan when one is provided (EXPLAIN mode).
//
// startTimeNs and stopTimeNs are optional time range filters used to prune
// parquet files (0 means no filtering). stmt is the original SELECT statement;
// it is forwarded to populatePlanFileDetails and may be nil.
//
// It returns (result, true) when the fast path produced the result, and
// (nil, false) when the caller must fall back to a full scan: an unsupported
// aggregation, a data-source or partition discovery error, a computation
// error, or a failed COUNT(*) sanity check.
func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan, startTimeNs, stopTimeNs int64, stmt *SelectStatement) (*QueryResult, bool) {
	optimizer := NewFastPathOptimizer(e)
	computer := NewAggregationComputer(e)

	// Step 1: Determine strategy.
	strategy := optimizer.DetermineStrategy(aggregations)
	if !strategy.CanUseFastPath {
		return nil, false
	}

	// Step 2: Collect data sources with time filtering for parquet file optimization.
	dataSources, err := optimizer.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, startTimeNs, stopTimeNs)
	if err != nil {
		return nil, false
	}

	// Build the partition list for the aggregation computer.
	// Note: discoverTopicPartitions always returns absolute paths.
	partitions, err := e.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
	if err != nil {
		return nil, false
	}

	// Debug: show the hybrid optimization results.
	if isDebugMode(ctx) && (dataSources.ParquetRowCount > 0 || dataSources.LiveLogRowCount > 0 || dataSources.BrokerUnflushedCount > 0) {
		partitionsWithLiveLogs := 0
		if dataSources.LiveLogRowCount > 0 || dataSources.BrokerUnflushedCount > 0 {
			partitionsWithLiveLogs = 1 // Simplified for now
		}
		fmt.Printf("Hybrid fast aggregation with deduplication: %d parquet rows + %d deduplicated live log rows + %d broker buffer rows from %d partitions\n",
			dataSources.ParquetRowCount, dataSources.LiveLogRowCount, dataSources.BrokerUnflushedCount, partitionsWithLiveLogs)
	}

	// Step 3: Compute aggregations using the fast path.
	aggResults, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
	if err != nil {
		return nil, false
	}

	// Step 3.5: Validate fast path results (safety check).
	// For a lone COUNT(*), the count must agree with the data-source totals.
	if len(aggregations) == 1 && aggregations[0].Function == FuncCOUNT && aggregations[0].Column == "*" {
		totalRows := dataSources.ParquetRowCount + dataSources.LiveLogRowCount + dataSources.BrokerUnflushedCount
		countResult := aggResults[0].Count

		if isDebugMode(ctx) {
			fmt.Printf("Validating fast path: COUNT=%d, Sources=%d\n", countResult, totalRows)
		}

		if totalRows == 0 && countResult > 0 {
			// Fast path found data but data sources show 0 - this suggests a bug.
			if isDebugMode(ctx) {
				fmt.Printf("Fast path validation failed: COUNT=%d but sources=0\n", countResult)
			}
			return nil, false
		}
		if totalRows > 0 && countResult == 0 {
			// Data sources show data but COUNT is 0 - this also suggests a bug.
			if isDebugMode(ctx) {
				fmt.Printf("Fast path validation failed: sources=%d but COUNT=0\n", totalRows)
			}
			return nil, false
		}
		if countResult != totalRows {
			// Counts don't match - this suggests inconsistent logic.
			if isDebugMode(ctx) {
				fmt.Printf("Fast path validation failed: COUNT=%d != sources=%d\n", countResult, totalRows)
			}
			return nil, false
		}

		if isDebugMode(ctx) {
			fmt.Printf("Fast path validation passed: COUNT=%d\n", countResult)
		}
	}

	// Step 4: Populate the execution plan if provided (for EXPLAIN queries).
	if plan != nil {
		builder := &ExecutionPlanBuilder{}

		// The plan builder receives a minimal placeholder statement (avoids a
		// nil pointer). It is deliberately given its own name so that it does
		// not shadow the caller's stmt, which is needed below.
		planStmt := &SelectStatement{}

		// Build the aggregation plan with the fast path strategy from Step 1.
		aggPlan := builder.BuildAggregationPlan(planStmt, aggregations, strategy, dataSources)

		// Copy relevant fields to the main plan.
		plan.ExecutionStrategy = aggPlan.ExecutionStrategy
		plan.DataSources = aggPlan.DataSources
		plan.OptimizationsUsed = aggPlan.OptimizationsUsed
		plan.PartitionsScanned = aggPlan.PartitionsScanned
		plan.ParquetFilesScanned = aggPlan.ParquetFilesScanned
		plan.LiveLogFilesScanned = aggPlan.LiveLogFilesScanned
		plan.TotalRowsProcessed = aggPlan.TotalRowsProcessed
		plan.Aggregations = aggPlan.Aggregations

		// Indicate broker buffer participation for EXPLAIN tree rendering.
		if dataSources.BrokerUnflushedCount > 0 {
			plan.BrokerBufferQueried = true
			plan.BrokerBufferMessages = int(dataSources.BrokerUnflushedCount)
		}

		// Merge details while preserving existing ones.
		for key, value := range aggPlan.Details {
			plan.Details[key] = value
		}

		// Add file path information from the data collection.
		plan.Details["partition_paths"] = partitions

		// Populate detailed file information, including time filters for pruning.
		plan.Details[PlanDetailStartTimeNs] = startTimeNs
		plan.Details[PlanDetailStopTimeNs] = stopTimeNs

		// Forward the caller's statement so its WHERE clause is visible to the
		// helper; fall back to the placeholder when no statement was supplied.
		detailStmt := stmt
		if detailStmt == nil {
			detailStmt = planStmt
		}
		e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, detailStmt)

		// Update counts to match discovered live log files.
		if liveLogFiles, ok := plan.Details["live_log_files"].([]string); ok {
			dataSources.LiveLogFilesCount = len(liveLogFiles)
			plan.LiveLogFilesScanned = len(liveLogFiles)
		}

		// Ensure PartitionsScanned is set so the Statistics section appears.
		if plan.PartitionsScanned == 0 && len(partitions) > 0 {
			plan.PartitionsScanned = len(partitions)
		}

		if isDebugMode(ctx) {
			fmt.Printf("Populated execution plan with fast path strategy\n")
		}
	}

	// Step 5: Build the final query result.
	columns := make([]string, len(aggregations))
	row := make([]sqltypes.Value, len(aggregations))
	for i, spec := range aggregations {
		columns[i] = spec.Alias
		row[i] = e.formatAggregationResult(spec, aggResults[i])
	}

	result := &QueryResult{
		Columns: columns,
		Rows:    [][]sqltypes.Value{row},
	}

	return result, true
}
// computeAggregations evaluates every aggregation over the rows returned by a
// full table scan and returns one result per spec, in the same order.
//
//   - COUNT(*) counts rows; COUNT(col) counts rows whose column is non-NULL.
//   - SUM adds the values that convert to a number.
//   - AVG stores the average in Sum and the contributing value count in Count;
//     both stay zero when no value converts to a number.
//   - MIN and MAX ignore missing and NULL values; the result is nil when no
//     non-NULL value exists.
//
// Functions other than these leave the zero AggregationResult in place.
func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations []AggregationSpec) []AggregationResult {
	aggResults := make([]AggregationResult, len(aggregations))

	// nonNullValue returns the column's value for a row, or nil when the column
	// is missing or NULL. COUNT(col), MIN, and MAX all skip NULLs through it.
	nonNullValue := func(row HybridScanResult, column string) *schema_pb.Value {
		value := e.findColumnValue(row, column)
		if value == nil || e.isNullValue(value) {
			return nil
		}
		return value
	}

	for i, spec := range aggregations {
		switch spec.Function {
		case FuncCOUNT:
			if spec.Column == "*" {
				aggResults[i].Count = int64(len(results))
				break
			}
			count := int64(0)
			for _, result := range results {
				if nonNullValue(result, spec.Column) != nil {
					count++
				}
			}
			aggResults[i].Count = count

		case FuncSUM:
			sum := float64(0)
			for _, result := range results {
				if value := e.findColumnValue(result, spec.Column); value != nil {
					if numValue := e.convertToNumber(value); numValue != nil {
						sum += *numValue
					}
				}
			}
			aggResults[i].Sum = sum

		case FuncAVG:
			sum := float64(0)
			count := int64(0)
			for _, result := range results {
				if value := e.findColumnValue(result, spec.Column); value != nil {
					if numValue := e.convertToNumber(value); numValue != nil {
						sum += *numValue
						count++
					}
				}
			}
			if count > 0 {
				aggResults[i].Sum = sum / float64(count) // Store average in Sum field
				aggResults[i].Count = count
			}

		case FuncMIN:
			var lowest interface{}
			var lowestValue *schema_pb.Value
			for _, result := range results {
				value := nonNullValue(result, spec.Column)
				if value == nil {
					continue // NULLs never take part in MIN
				}
				if lowestValue == nil || e.compareValues(value, lowestValue) < 0 {
					lowestValue = value
					lowest = e.extractRawValue(value)
				}
			}
			aggResults[i].Min = lowest

		case FuncMAX:
			var highest interface{}
			var highestValue *schema_pb.Value
			for _, result := range results {
				value := nonNullValue(result, spec.Column)
				if value == nil {
					continue // NULLs never take part in MAX
				}
				if highestValue == nil || e.compareValues(value, highestValue) > 0 {
					highestValue = value
					highest = e.extractRawValue(value)
				}
			}
			aggResults[i].Max = highest
		}
	}

	return aggResults
}
// canUseParquetStatsForAggregation reports whether a single aggregation can be
// answered from parquet statistics.
//
// COUNT(*) always qualifies. COUNT, MIN, and MAX on a named column qualify when
// the column is a system or regular column. SUM and AVG never qualify because
// they need the actual values, not just min/max; unknown functions do not
// qualify either.
func (e *SQLEngine) canUseParquetStatsForAggregation(spec AggregationSpec) bool {
	fn := spec.Function

	if fn == FuncCOUNT && spec.Column == "*" {
		return true
	}
	if fn == FuncCOUNT || fn == FuncMIN || fn == FuncMAX {
		return e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column)
	}
	return false
}
// debugHybridScanOptions prints the scan options about to be used, labelled
// with queryType. It prints nothing unless the context is in debug mode.
func debugHybridScanOptions(ctx context.Context, options HybridScanOptions, queryType string) {
	if !isDebugMode(ctx) {
		return
	}

	// Only the presence of a predicate is reported, not the function itself.
	hasPredicate := options.Predicate != nil

	fmt.Printf("\n=== HYBRID SCAN OPTIONS DEBUG (%s) ===\n", queryType)
	fmt.Printf("StartTimeNs: %d\n", options.StartTimeNs)
	fmt.Printf("StopTimeNs: %d\n", options.StopTimeNs)
	fmt.Printf("Limit: %d\n", options.Limit)
	fmt.Printf("Offset: %d\n", options.Offset)
	fmt.Printf("Predicate: %v\n", hasPredicate)
	fmt.Printf("Columns: %v\n", options.Columns)
	fmt.Printf("==========================================\n")
}