From a8767d5fab6cb687ab52ec66f122fc4de989bc1e Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Thu, 7 Dec 2023 13:01:11 +0800 Subject: [PATCH 1/2] executor: fill correlated column value in late materialization filter condition Signed-off-by: Lloyd-Pottiger --- pkg/executor/builder.go | 19 ++++++++++++------- pkg/planner/core/explain.go | 6 +++--- pkg/planner/core/physical_plans.go | 4 ++-- pkg/planner/core/plan_to_pb.go | 8 ++++---- .../tiflash_selection_late_materialization.go | 2 +- 5 files changed, 22 insertions(+), 17 deletions(-) diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index a305c56ae2990..cb867a6690557 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -2972,13 +2972,18 @@ func markChildrenUsedCols(outputCols []*expression.Column, childSchemas ...*expr func (*executorBuilder) corColInDistPlan(plans []plannercore.PhysicalPlan) bool { for _, p := range plans { - x, ok := p.(*plannercore.PhysicalSelection) - if !ok { - continue - } - for _, cond := range x.Conditions { - if len(expression.ExtractCorColumns(cond)) > 0 { - return true + switch x := p.(type) { + case *plannercore.PhysicalSelection: + for _, cond := range x.Conditions { + if len(expression.ExtractCorColumns(cond)) > 0 { + return true + } + } + case *plannercore.PhysicalTableScan: + for _, cond := range x.LateMaterializationFilterCondition { + if len(expression.ExtractCorColumns(cond)) > 0 { + return true + } } } } diff --git a/pkg/planner/core/explain.go b/pkg/planner/core/explain.go index 3991d6a0863c8..689d3072ca012 100644 --- a/pkg/planner/core/explain.go +++ b/pkg/planner/core/explain.go @@ -218,11 +218,11 @@ func (p *PhysicalTableScan) OperatorInfo(normalized bool) string { } if p.SCtx().GetSessionVars().EnableLateMaterialization && len(p.filterCondition) > 0 && p.StoreType == kv.TiFlash { buffer.WriteString("pushed down filter:") - if len(p.lateMaterializationFilterCondition) > 0 { + if len(p.LateMaterializationFilterCondition) > 0 { if normalized { - buffer.Write(expression.SortedExplainNormalizedExpressionList(p.lateMaterializationFilterCondition)) + buffer.Write(expression.SortedExplainNormalizedExpressionList(p.LateMaterializationFilterCondition)) } else { - buffer.Write(expression.SortedExplainExpressionList(p.SCtx(), p.lateMaterializationFilterCondition)) + buffer.Write(expression.SortedExplainExpressionList(p.SCtx(), p.LateMaterializationFilterCondition)) } } else { buffer.WriteString("empty") diff --git a/pkg/planner/core/physical_plans.go b/pkg/planner/core/physical_plans.go index 8626ef7e94b3c..faf39b3ba3505 100644 --- a/pkg/planner/core/physical_plans.go +++ b/pkg/planner/core/physical_plans.go @@ -829,10 +829,10 @@ type PhysicalTableScan struct { // AccessCondition is used to calculate range. AccessCondition []expression.Expression filterCondition []expression.Expression - // lateMaterializationFilterCondition is used to record the filter conditions + // LateMaterializationFilterCondition is used to record the filter conditions // that are pushed down to table scan from selection by late materialization. // TODO: remove this field after we support pushing down selection to coprocessor. - lateMaterializationFilterCondition []expression.Expression + LateMaterializationFilterCondition []expression.Expression Table *model.TableInfo Columns []*model.ColumnInfo diff --git a/pkg/planner/core/plan_to_pb.go b/pkg/planner/core/plan_to_pb.go index 6e5a652ed555a..ee23c81c35036 100644 --- a/pkg/planner/core/plan_to_pb.go +++ b/pkg/planner/core/plan_to_pb.go @@ -248,9 +248,9 @@ func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context, storeType kv.StoreType) tsExec.KeepOrder = &keepOrder tsExec.IsFastScan = &(ctx.GetSessionVars().TiFlashFastScan) - if len(p.lateMaterializationFilterCondition) > 0 { + if len(p.LateMaterializationFilterCondition) > 0 { client := ctx.GetClient() - conditions, err := expression.ExpressionsToPBList(ctx, p.lateMaterializationFilterCondition, client) + conditions, err := expression.ExpressionsToPBList(ctx, p.LateMaterializationFilterCondition, client) if err != nil { return nil, err } @@ -287,9 +287,9 @@ func (p *PhysicalTableScan) partitionTableScanToPBForFlash(ctx sessionctx.Contex telemetry.CurrentTiflashTableScanWithFastScanCount.Inc() } - if len(p.lateMaterializationFilterCondition) > 0 { + if len(p.LateMaterializationFilterCondition) > 0 { client := ctx.GetClient() - conditions, err := expression.ExpressionsToPBList(ctx, p.lateMaterializationFilterCondition, client) + conditions, err := expression.ExpressionsToPBList(ctx, p.LateMaterializationFilterCondition, client) if err != nil { return nil, err } diff --git a/pkg/planner/core/tiflash_selection_late_materialization.go b/pkg/planner/core/tiflash_selection_late_materialization.go index df84a57a74106..aab128d4166df 100644 --- a/pkg/planner/core/tiflash_selection_late_materialization.go +++ b/pkg/planner/core/tiflash_selection_late_materialization.go @@ -255,7 +255,7 @@ func predicatePushDownToTableScanImpl(sctx sessionctx.Context, physicalSelection // remove the pushed down conditions from selection removeSpecificExprsFromSelection(physicalSelection, selectedConds) // add the pushed down conditions to table scan - physicalTableScan.lateMaterializationFilterCondition = selectedConds + physicalTableScan.LateMaterializationFilterCondition = selectedConds // Update the row count of table scan after pushing down the conditions. physicalTableScan.StatsInfo().RowCount *= selectedSelectivity } From 38a9869ff8def505220446c84f11c9b1074a6175 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Tue, 12 Dec 2023 17:48:49 +0800 Subject: [PATCH 2/2] add some comments Signed-off-by: Lloyd-Pottiger --- pkg/executor/table_reader.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/executor/table_reader.go b/pkg/executor/table_reader.go index 61d82eae3ec48..4d87861c9511b 100644 --- a/pkg/executor/table_reader.go +++ b/pkg/executor/table_reader.go @@ -112,7 +112,8 @@ type TableReaderExecutor struct { byItems []*util.ByItems paging bool storeType kv.StoreType - // corColInFilter tells whether there's correlated column in filter. + // corColInFilter tells whether there's correlated column in filter (both conditions in PhysicalSelection and LateMaterializationFilterCondition in PhysicalTableScan) + // If true, we will need to revise the dagPB (fill correlated column value in filter) each time call Open(). corColInFilter bool // corColInAccess tells whether there's correlated column in access conditions. corColInAccess bool @@ -156,6 +157,7 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { var err error if e.corColInFilter { + // If there's correlated column in filter, need to rewrite dagPB if e.storeType == kv.TiFlash { execs, err := builder.ConstructTreeBasedDistExec(e.Ctx(), e.tablePlan) if err != nil {