*: only add default value for final aggregation to fix the aggregate push down (partition) union case #35443

Merged
merged 6 commits into from
Jun 28, 2022
Changes from 5 commits
40 changes: 40 additions & 0 deletions executor/aggregate_test.go
@@ -1589,3 +1589,43 @@ func TestRandomPanicAggConsume(t *testing.T) {
 		require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")
 	}
 }
+
+func TestIssue35295(t *testing.T) {
+	store, clean := testkit.CreateMockStore(t)
+	defer clean()
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t100")
+	// This bug only happens on partition prune mode = 'static'
+	tk.MustExec("set @@tidb_partition_prune_mode = 'static'")
+	tk.MustExec(`CREATE TABLE t100 (
+		ID bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+		col1 int(10) NOT NULL DEFAULT '0' COMMENT 'test',
+		money bigint(20) NOT NULL COMMENT 'test',
+		logtime datetime NOT NULL COMMENT 'record time',
+		PRIMARY KEY (ID,logtime)
+	) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=1 COMMENT='test'
+	PARTITION BY RANGE COLUMNS(logtime) (
+		PARTITION p20220608 VALUES LESS THAN ("20220609"),
+		PARTITION p20220609 VALUES LESS THAN ("20220610"),
+		PARTITION p20220610 VALUES LESS THAN ("20220611"),
+		PARTITION p20220611 VALUES LESS THAN ("20220612"),
+		PARTITION p20220612 VALUES LESS THAN ("20220613"),
+		PARTITION p20220613 VALUES LESS THAN ("20220614"),
+		PARTITION p20220614 VALUES LESS THAN ("20220615"),
+		PARTITION p20220615 VALUES LESS THAN ("20220616"),
+		PARTITION p20220616 VALUES LESS THAN ("20220617"),
+		PARTITION p20220617 VALUES LESS THAN ("20220618"),
+		PARTITION p20220618 VALUES LESS THAN ("20220619"),
+		PARTITION p20220619 VALUES LESS THAN ("20220620"),
+		PARTITION p20220620 VALUES LESS THAN ("20220621"),
+		PARTITION p20220621 VALUES LESS THAN ("20220622"),
+		PARTITION p20220622 VALUES LESS THAN ("20220623"),
+		PARTITION p20220623 VALUES LESS THAN ("20220624"),
+		PARTITION p20220624 VALUES LESS THAN ("20220625")
+	);`)
+	tk.MustExec("insert into t100(col1,money,logtime) values (100,10,'2022-06-09 00:00:00');")
+	tk.MustExec("insert into t100(col1,money,logtime) values (100,10,'2022-06-10 00:00:00');")
+	tk.MustQuery("SELECT /*+STREAM_AGG()*/ col1,sum(money) FROM t100 WHERE logtime>='2022-06-09 00:00:00' AND col1=100 ;").Check(testkit.Rows("100 20"))
+	tk.MustQuery("SELECT /*+HASH_AGG()*/ col1,sum(money) FROM t100 WHERE logtime>='2022-06-09 00:00:00' AND col1=100 ;").Check(testkit.Rows("100 20"))
+}
10 changes: 8 additions & 2 deletions executor/builder.go
@@ -1437,7 +1437,9 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor
 	if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) {
 		e.defaultVal = nil
 	} else {
-		e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+		if v.IsFinalAgg() {
+			e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+		}
 	}
 	for _, aggDesc := range v.AggFuncs {
 		if aggDesc.HasDistinct || len(aggDesc.OrderByItems) > 0 {
@@ -1493,10 +1495,14 @@ func (b *executorBuilder) buildStreamAgg(v *plannercore.PhysicalStreamAgg) Execu
 		groupChecker: newVecGroupChecker(b.ctx, v.GroupByItems),
 		aggFuncs:     make([]aggfuncs.AggFunc, 0, len(v.AggFuncs)),
 	}
+
 	if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) {
 		e.defaultVal = nil
 	} else {
-		e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+		// Only do this for final agg, see issue #35295, #30923
+		if v.IsFinalAgg() {
+			e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+		}
 	}
 	for i, aggDesc := range v.AggFuncs {
 		aggFunc := aggfuncs.Build(b.ctx, aggDesc, i)
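
A way to see why defaultVal must be restricted to final aggregations: a scalar count() should produce a default row (count 0) when its input is empty, but only once, at the final stage. The sketch below is plain Go, not TiDB's executor; partialCount and finalSum are hypothetical names standing in for the partial and final agg stages.

package main

import "fmt"

// partialCount plays the role of a partial-mode count over one
// partition's rows. For an empty partition it must emit nothing:
// emitting a default 0-row per partition would feed spurious rows
// to the final stage.
func partialCount(rows []int) []int {
	if len(rows) == 0 {
		return nil // no default value in partial mode
	}
	return []int{len(rows)}
}

// finalSum plays the role of the final-mode aggregation over the
// partial counts. Only here is a default value correct when there
// is no input at all.
func finalSum(partials []int) int {
	total := 0 // 0 is the default value of count() over empty input
	for _, c := range partials {
		total += c
	}
	return total
}

func main() {
	partitions := [][]int{{1, 2}, {}, {3}}
	var merged []int
	for _, p := range partitions {
		merged = append(merged, partialCount(p)...)
	}
	fmt.Println(finalSum(merged)) // 3: empty partitions add nothing
}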
2 changes: 0 additions & 2 deletions expression/aggregation/descriptor.go
@@ -123,8 +123,6 @@ func (a *AggFuncDesc) Split(ordinal []int) (partialAggDesc, finalAggDesc *AggFun
 		partialAggDesc.Mode = Partial1Mode
 	} else if a.Mode == FinalMode {
 		partialAggDesc.Mode = Partial2Mode
-	} else {
-		panic("Error happened during AggFuncDesc.Split, the AggFunctionMode is not CompleteMode or FinalMode.")
 	}
 	finalAggDesc = &AggFuncDesc{
 		Mode: FinalMode, // We only support FinalMode now in final phase.

Contributor: Why remove this check?

Contributor Author: Under the previous assumption, there would be no partial-mode agg before the coprocessor push down. But this PR changes the logical-plan aggregation to partial mode, so the push down here may now meet a partial-mode agg.
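
For reference, a self-contained sketch of the mapping this hunk leaves in place and of why the panic branch is gone (local Mode type and hypothetical helper name partialModeFor; not TiDB's actual aggregation package): with the union push down above, Split can now legitimately receive partial modes, which simply pass through.

package main

import "fmt"

type Mode int

const (
	CompleteMode Mode = iota
	FinalMode
	Partial1Mode
	Partial2Mode
)

// partialModeFor mirrors the surviving branches of AggFuncDesc.Split:
// Complete splits into Partial1, Final into Partial2, and a mode that
// is already partial is left unchanged (this case used to panic).
func partialModeFor(orig Mode) Mode {
	switch orig {
	case CompleteMode:
		return Partial1Mode
	case FinalMode:
		return Partial2Mode
	default:
		return orig
	}
}

func main() {
	fmt.Println(partialModeFor(CompleteMode) == Partial1Mode) // true
	fmt.Println(partialModeFor(Partial1Mode) == Partial1Mode) // true
}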
2 changes: 1 addition & 1 deletion planner/core/physical_plans.go
@@ -1077,7 +1077,7 @@ type basePhysicalAgg struct {
 	MppPartitionCols []*property.MPPPartitionColumn
 }
 
-func (p *basePhysicalAgg) isFinalAgg() bool {
+func (p *basePhysicalAgg) IsFinalAgg() bool {
 	if len(p.AggFuncs) > 0 {
 		if p.AggFuncs[0].Mode == aggregation.FinalMode || p.AggFuncs[0].Mode == aggregation.CompleteMode {
 			return true
10 changes: 10 additions & 0 deletions planner/core/rule_aggregation_push_down.go
@@ -438,6 +438,16 @@ func (a *aggregationPushDownSolver) tryAggPushDownForUnion(union *LogicalUnionAl
 	if pushedAgg == nil {
 		return nil
 	}
+
+	// Update the agg mode for the pushed down aggregation.
+	for _, aggFunc := range pushedAgg.AggFuncs {
+		if aggFunc.Mode == aggregation.CompleteMode {
+			aggFunc.Mode = aggregation.Partial1Mode
+		} else if aggFunc.Mode == aggregation.FinalMode {
+			aggFunc.Mode = aggregation.Partial2Mode
+		}
+	}
+
 	newChildren := make([]LogicalPlan, 0, len(union.Children()))
 	for _, child := range union.Children() {
 		newChild, err := a.pushAggCrossUnion(pushedAgg, union.Schema(), child)

Member: Could you explain why this update happens? I don't get the logic behind changing CompleteMode -> Partial1Mode...

Contributor Author: The behaviour of partial mode and final mode is different. Take count(id) for example:

a XX                  a 1                         a 1
b YY                  b 1                         b 2
c ZZ                  c 1                         c 2
     (partial mode)==>         (final mode) ==>
b SS                  b 1
c KK                  c 1

In partial mode, the input is the original data, and each row is turned into "xx 1", where xx is a, b, or c.
In final mode, the input is partial results, which are accumulated into "xx n".

If you feed this data as input:

a 1
b 2
c 2

then in partial mode the output is

a 1
b 1
c 1

while in final mode the output is

a 1
b 2
c 2

Back to the topic: we push the agg down through partition unions. If the pushed-down agg is not made partial, then with

create table t (c1 char, c2 int) partition by hash(c1) partitions 3;
insert into t values ('a', 1),('b', 2),('c', 3),('b', 4),('c', 5);
select count(*) from t group by c1;

we get a wrong result:

a 1                   a 1                  a 1
b 2                   b 6                  b 6
c 3  (final agg) =>   c 8  (final agg) =>  c 8
b 4
c 5

The correct result should be:

a 1                     a 1                  a 1
b 2                     b 2                  b 2
c 3  (partial agg) =>   c 2  (final agg) =>  c 2
b 4
c 5
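
The reviewer's count(*) example reduces to a few lines of plain Go (hypothetical partialCount/finalMerge helpers; not TiDB code): the agg pushed below the union must count raw rows (partial mode), and only the agg above the union may add the partial counts together (final mode).

package main

import "fmt"

// partialCount is the agg pushed below the union: every raw row in a
// partition contributes 1 to its group's count.
func partialCount(keys []string) map[string]int {
	counts := map[string]int{}
	for _, k := range keys {
		counts[k]++
	}
	return counts
}

// finalMerge is the agg above the union: it accumulates partial counts.
// Running it directly on raw rows would treat row payloads as counts,
// which is exactly the wrong result shown above.
func finalMerge(partials []map[string]int) map[string]int {
	merged := map[string]int{}
	for _, p := range partials {
		for k, n := range p {
			merged[k] += n
		}
	}
	return merged
}

func main() {
	// c1 values hashed into three partitions, as in the example table.
	p1 := []string{"a"}
	p2 := []string{"b", "b"}
	p3 := []string{"c", "c"}
	fmt.Println(finalMerge([]map[string]int{
		partialCount(p1), partialCount(p2), partialCount(p3),
	})) // map[a:1 b:2 c:2]
}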
4 changes: 2 additions & 2 deletions planner/core/rule_eliminate_projection.go
@@ -49,14 +49,14 @@ func canProjectionBeEliminatedStrict(p *PhysicalProjection) bool {
 	// passing down the aggregation mode to TiFlash.
 	if physicalAgg, ok := p.Children()[0].(*PhysicalHashAgg); ok {
 		if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
-			if physicalAgg.isFinalAgg() {
+			if physicalAgg.IsFinalAgg() {
 				return false
 			}
 		}
 	}
 	if physicalAgg, ok := p.Children()[0].(*PhysicalStreamAgg); ok {
 		if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
-			if physicalAgg.isFinalAgg() {
+			if physicalAgg.IsFinalAgg() {
 				return false
 			}
 		}
22 changes: 19 additions & 3 deletions planner/core/task.go
@@ -1337,7 +1337,15 @@ func BuildFinalModeAggregation(

 		finalAggFunc.OrderByItems = byItems
 		finalAggFunc.HasDistinct = aggFunc.HasDistinct
-		finalAggFunc.Mode = aggregation.CompleteMode
+		// In the logical optimization phase, Agg->PartitionUnion->TableReader may become
+		// Agg1->PartitionUnion->Agg2->TableReader, where Agg2 is a partial aggregation.
+		// So in the push down here, we need an extra check:
+		// if the original agg mode is already partial, the finalAggFunc's mode becomes Partial2.
+		if aggFunc.Mode == aggregation.CompleteMode {
+			finalAggFunc.Mode = aggregation.CompleteMode
+		} else if aggFunc.Mode == aggregation.Partial1Mode || aggFunc.Mode == aggregation.Partial2Mode {
+			finalAggFunc.Mode = aggregation.Partial2Mode
+		}
 	} else {
 		if aggFunc.Name == ast.AggFuncGroupConcat && len(aggFunc.OrderByItems) > 0 {
 			// group_concat can only run in one phase if it has order by items but without distinct property

@@ -1417,7 +1425,15 @@ func BuildFinalModeAggregation(
 			}
 		}
 
-		finalAggFunc.Mode = aggregation.FinalMode
+		// In the logical optimization phase, Agg->PartitionUnion->TableReader may become
+		// Agg1->PartitionUnion->Agg2->TableReader, where Agg2 is a partial aggregation.
+		// So in the push down here, we need an extra check:
+		// if the original agg mode is already partial, the finalAggFunc's mode becomes Partial2.
+		if aggFunc.Mode == aggregation.CompleteMode {
+			finalAggFunc.Mode = aggregation.FinalMode
+		} else if aggFunc.Mode == aggregation.Partial1Mode || aggFunc.Mode == aggregation.Partial2Mode {
+			finalAggFunc.Mode = aggregation.Partial2Mode
+		}
 	}
 
 	finalAggFunc.Args = args

Contributor: I don't understand what this code block means.

Contributor Author: Under the old assumption, the agg mode before the coprocessor push down was either Complete or Final, so after the push down the parent agg always became Final mode. Under the new assumption, the agg mode before the push down can also be partial, so the parent agg mode is set accordingly: what was Final before is still set to Final, and partial is set to Partial2.

Member: I think you mean that in partition union mode, Agg->PartitionUnion->TableReader may become Agg->PartitionUnion->Agg->TableReader before we do the push down, so we need to add the new if-condition check. You can add some comments here.

Contributor Author (tiancaiamao, Jun 23, 2022): Take Agg1->PartitionUnion->Agg2->TableReader. In the coprocessor push down, Agg2->TableReader becomes Agg3(root)->TableReader->Agg4(cop)->TableScan(cop), and the final plan becomes Agg1->PartitionUnion->Agg3(root)->TableReader->Agg4(cop)->TableScan(cop). In the past, Agg2 was always Complete or Final, but in this PR Agg2 can be Partial, so BuildFinalModeAggregation needs to consider the difference.

Contributor Author: I added comments in the code so reviewers know what happens.
@@ -1483,7 +1499,7 @@ func (p *basePhysicalAgg) convertAvgForMPP() *PhysicalProjection {
 	}
 	// no avgs
 	// for final agg, always add project due to in-compatibility between TiDB and TiFlash
-	if len(p.schema.Columns) == len(newSchema.Columns) && !p.isFinalAgg() {
+	if len(p.schema.Columns) == len(newSchema.Columns) && !p.IsFinalAgg() {
 		return nil
 	}
 	// add remaining columns to exprs
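
To summarize the thread above, here is a self-contained sketch of the mode the root-side half takes after the split (local Mode type and hypothetical rootSideMode helper; not the real BuildFinalModeAggregation, which also rewrites args and schemas, and which does not expect FinalMode inputs in the branches shown):

package main

import "fmt"

type Mode int

const (
	CompleteMode Mode = iota
	FinalMode
	Partial1Mode
	Partial2Mode
)

// rootSideMode mirrors the new if-conditions in both hunks: an agg that
// was already partial (Agg2 under a partition union) keeps a partial
// root-side half (Partial2) so the agg above the union can still merge
// it; otherwise the old behaviour applies. The decomposable flag stands
// for the two branches: true for aggregates split into two phases,
// false for ones (distinct, ordered group_concat) that must run in one.
func rootSideMode(orig Mode, decomposable bool) Mode {
	switch orig {
	case Partial1Mode, Partial2Mode:
		return Partial2Mode
	default:
		if decomposable {
			return FinalMode // two-phase aggregates finish at root
		}
		return CompleteMode // one-phase aggregates run complete at root
	}
}

func main() {
	fmt.Println(rootSideMode(CompleteMode, true))  // 1 (FinalMode)
	fmt.Println(rootSideMode(Partial1Mode, true))  // 3 (Partial2Mode)
	fmt.Println(rootSideMode(CompleteMode, false)) // 0 (CompleteMode)
}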