From ba4a8896be35ffdcc47c208faa86dbfff154c663 Mon Sep 17 00:00:00 2001 From: Jakub Martin Date: Sun, 17 Jul 2022 13:09:41 +0200 Subject: [PATCH] Fix short-circuiting of outer LIMIT. Print records live even if no watermarks are sent, as long as all records have zero event times. If no retractions are possible and we have an `ORDER BY ... LIMIT ...` clause, only store the top N records. --- cmd/root.go | 12 ++- execution/nodes/order_sensitive_transform.go | 9 +- outputs/batch/live_output.go | 97 ++++++++++++-------- physical/nodes.go | 2 +- 4 files changed, 77 insertions(+), 43 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 845b9233..b81589dc 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -371,12 +371,17 @@ octosql "SELECT * FROM plugins.plugins"`, if limitExpression != nil { val, err := (*limitExpression).Evaluate(execCtx) if err != nil { - + return fmt.Errorf("couldn't evaluate limit expression: %w", err) } if val.Int < 0 { return fmt.Errorf("limit must be positive, got %d", val.Int) } limit = &val.Int + + if len(orderByExpressions) == 0 && physicalPlan.Schema.NoRetractions { + // We want short-circuiting. + executionPlan = nodes.NewLimit(executionPlan, *limitExpression) + } } sink = batch.NewOutputPrinter( @@ -384,6 +389,7 @@ octosql "SELECT * FROM plugins.plugins"`, orderByExpressions, logical.DirectionsToMultipliers(outputOptions.OrderByDirections), limit, + physicalPlan.Schema.NoRetractions, outSchema, func(writer io.Writer) batch.Format { return formats.NewTableFormatter(writer) @@ -392,7 +398,7 @@ octosql "SELECT * FROM plugins.plugins"`, ) case "csv", "json": if len(orderByExpressions) > 0 || (limitExpression != nil && !physicalPlan.Schema.NoRetractions) { - executionPlan = nodes.NewOrderSensitiveTransform(executionPlan, orderByExpressions, logical.DirectionsToMultipliers(outputOptions.OrderByDirections), limitExpression) + executionPlan = nodes.NewOrderSensitiveTransform(executionPlan, orderByExpressions, logical.DirectionsToMultipliers(outputOptions.OrderByDirections), limitExpression, physicalPlan.Schema.NoRetractions) } else if limitExpression != nil { executionPlan = nodes.NewLimit(executionPlan, *limitExpression) } @@ -417,7 +423,7 @@ octosql "SELECT * FROM plugins.plugins"`, case "stream_native": if len(orderByExpressions) > 0 || (limitExpression != nil && !physicalPlan.Schema.NoRetractions) { - executionPlan = nodes.NewOrderSensitiveTransform(executionPlan, orderByExpressions, logical.DirectionsToMultipliers(outputOptions.OrderByDirections), limitExpression) + executionPlan = nodes.NewOrderSensitiveTransform(executionPlan, orderByExpressions, logical.DirectionsToMultipliers(outputOptions.OrderByDirections), limitExpression, physicalPlan.Schema.NoRetractions) } else if limitExpression != nil { executionPlan = nodes.NewLimit(executionPlan, *limitExpression) } diff --git a/execution/nodes/order_sensitive_transform.go b/execution/nodes/order_sensitive_transform.go index 95ef970b..55234f63 100644 --- a/execution/nodes/order_sensitive_transform.go +++ b/execution/nodes/order_sensitive_transform.go @@ -15,14 +15,16 @@ type OrderSensitiveTransform struct { orderByKeyExprs []Expression orderByDirectionMultipliers []int limit *Expression + noRetractionsPossible bool } -func NewOrderSensitiveTransform(source Node, orderByKeyExprs []Expression, orderByDirectionMultipliers []int, limit *Expression) *OrderSensitiveTransform { +func NewOrderSensitiveTransform(source Node, orderByKeyExprs []Expression, orderByDirectionMultipliers []int, limit *Expression, noRetractionsPossible bool) *OrderSensitiveTransform { return &OrderSensitiveTransform{ source: source, orderByKeyExprs: orderByKeyExprs, orderByDirectionMultipliers: orderByDirectionMultipliers, limit: limit, + noRetractionsPossible: noRetractionsPossible, } } @@ -110,6 +112,11 @@ func (o *OrderSensitiveTransform) Run(execCtx ExecutionContext, produce ProduceF } else { recordCounts.Delete(itemTyped) } + if limit != nil && o.noRetractionsPossible && recordCounts.Len() > *limit { + // This doesn't mean we'll always keep just the records that are needed, because tree nodes might have count > 1. + // That said, it's a good approximation, and we'll definitely not lose something that we need to have. + recordCounts.DeleteMax() + } return nil }, func(ctx ProduceContext, msg MetadataMessage) error { diff --git a/outputs/batch/live_output.go b/outputs/batch/live_output.go index cd4f1dd7..29433c3e 100644 --- a/outputs/batch/live_output.go +++ b/outputs/batch/live_output.go @@ -21,25 +21,27 @@ type Format interface { } type OutputPrinter struct { - source Node - keyExprs []Expression - directionMultipliers []int - limit *int + source Node + keyExprs []Expression + directionMultipliers []int + limit *int + noRetractionsPossible bool schema physical.Schema format func(io.Writer) Format live bool } -func NewOutputPrinter(source Node, keyExprs []Expression, directionMultipliers []int, limit *int, schema physical.Schema, format func(io.Writer) Format, live bool) *OutputPrinter { +func NewOutputPrinter(source Node, keyExprs []Expression, directionMultipliers []int, limit *int, noRetractionsPossible bool, schema physical.Schema, format func(io.Writer) Format, live bool) *OutputPrinter { return &OutputPrinter{ - source: source, - keyExprs: keyExprs, - directionMultipliers: directionMultipliers, - limit: limit, - schema: schema, - format: format, - live: live, + source: source, + keyExprs: keyExprs, + directionMultipliers: directionMultipliers, + limit: limit, + noRetractionsPossible: noRetractionsPossible, + schema: schema, + format: format, + live: live, } } @@ -78,6 +80,39 @@ func (o *OutputPrinter) Run(execCtx ExecutionContext) error { liveWriter := uilive.New() lastUpdate := time.Now() + onlyZeroEventTimesSeen := true + + printTable := func() { + lastUpdate = time.Now() + var buf bytes.Buffer + + format := o.format(&buf) + format.SetSchema(o.schema) + + i := 0 + recordCounts.Ascend(func(item btree.Item) bool { + itemTyped := item.(*outputItem) + for j := 0; j < itemTyped.Count; j++ { + if o.limit != nil && i == *o.limit { + return false + } + i++ + + format.Write(itemTyped.Values) + } + return true + }) + + format.Close() + + if !watermark.IsZero() { + fmt.Fprintf(&buf, "watermark: %s\n", watermark.Format(time.RFC3339Nano)) + } + + buf.WriteTo(liveWriter) + liveWriter.Flush() + } + if err := o.source.Run( execCtx, func(ctx ProduceContext, record Record) error { @@ -119,6 +154,17 @@ func (o *OutputPrinter) Run(execCtx ExecutionContext) error { } else { recordCounts.Delete(itemTyped) } + if onlyZeroEventTimesSeen && !record.EventTime.IsZero() { + onlyZeroEventTimesSeen = false + } + if o.limit != nil && o.noRetractionsPossible && recordCounts.Len() > *o.limit { + // This doesn't mean we'll always keep just the records that are needed, because tree nodes might have count > 1. + // That said, it's a good approximation, and we'll definitely not lose something that we need to have. + recordCounts.DeleteMax() + } + if o.live && onlyZeroEventTimesSeen && time.Since(lastUpdate) > time.Second/4 && !record.Retraction /*This last bit just makes the output less jittery*/ { + printTable() + } return nil }, func(ctx ProduceContext, msg MetadataMessage) error { @@ -126,32 +172,7 @@ func (o *OutputPrinter) Run(execCtx ExecutionContext) error { // Print table if o.live && time.Since(lastUpdate) > time.Second/4 { - lastUpdate = time.Now() - var buf bytes.Buffer - - format := o.format(&buf) - format.SetSchema(o.schema) - - i := 0 - recordCounts.Ascend(func(item btree.Item) bool { - itemTyped := item.(*outputItem) - for j := 0; j < itemTyped.Count; j++ { - if o.limit != nil && i == *o.limit { - return false - } - i++ - - format.Write(itemTyped.Values) - } - return true - }) - - format.Close() - - fmt.Fprintf(&buf, "watermark: %s\n", watermark.Format(time.RFC3339Nano)) - - buf.WriteTo(liveWriter) - liveWriter.Flush() + printTable() } return nil }, diff --git a/physical/nodes.go b/physical/nodes.go index 5131e924..89e917ad 100644 --- a/physical/nodes.go +++ b/physical/nodes.go @@ -478,7 +478,7 @@ func (node *Node) Materialize(ctx context.Context, env Environment) (execution.N } if len(orderByKeyExprs) > 0 || (limit != nil && !node.OrderSensitiveTransform.Source.Schema.NoRetractions) { - return nodes.NewOrderSensitiveTransform(source, orderByKeyExprs, node.OrderSensitiveTransform.OrderByDirectionMultipliers, limit), nil + return nodes.NewOrderSensitiveTransform(source, orderByKeyExprs, node.OrderSensitiveTransform.OrderByDirectionMultipliers, limit, node.OrderSensitiveTransform.Source.Schema.NoRetractions), nil } if limit != nil {