diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go
index 369a1572d8580..dd264592fb5fb 100644
--- a/executor/aggregate_test.go
+++ b/executor/aggregate_test.go
@@ -1530,3 +1530,100 @@ func (s *testSerialSuite) TestAggInDisk(c *C) {
 	tk.MustQuery("select /*+ HASH_AGG() */ count(c) from t;").Check(testkit.Rows("0"))
 	tk.MustQuery("select /*+ HASH_AGG() */ count(c) from t group by c1;").Check(testkit.Rows())
 }
+
+func TestRandomPanicAggConsume(t *testing.T) {
+	store, clean := testkit.CreateMockStore(t)
+	defer clean()
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("set @@tidb_max_chunk_size=32")
+	tk.MustExec("set @@tidb_init_chunk_size=1")
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("create table t(a int)")
+	for i := 0; i <= 1000; i++ {
+		tk.MustExec(fmt.Sprintf("insert into t values(%v),(%v),(%v)", i, i, i))
+	}
+
+	fpName := "github.com/pingcap/tidb/executor/ConsumeRandomPanic"
+	require.NoError(t, failpoint.Enable(fpName, "5%panic(\"ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]\")"))
+	defer func() {
+		require.NoError(t, failpoint.Disable(fpName))
+	}()
+
+	// Trigger the failpoint panic 10 times for each agg executor.
+	var res sqlexec.RecordSet
+	for i := 1; i <= 10; i++ {
+		var err error
+		for err == nil {
+			// Test the parallel hash agg.
+			res, err = tk.Exec("select /*+ HASH_AGG() */ count(a) from t group by a")
+			if err == nil {
+				_, err = session.GetRows4Test(context.Background(), tk.Session(), res)
+				require.NoError(t, res.Close())
+			}
+		}
+		require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")
+
+		err = nil
+		for err == nil {
+			// Test the non-parallel hash agg.
+			res, err = tk.Exec("select /*+ HASH_AGG() */ count(distinct a) from t")
+			if err == nil {
+				_, err = session.GetRows4Test(context.Background(), tk.Session(), res)
+				require.NoError(t, res.Close())
+			}
+		}
+		require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")
+
+		err = nil
+		for err == nil {
+			// Test the stream agg.
+			res, err = tk.Exec("select /*+ STREAM_AGG() */ count(a) from t")
+			if err == nil {
+				_, err = session.GetRows4Test(context.Background(), tk.Session(), res)
+				require.NoError(t, res.Close())
+			}
+		}
+		require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")
+	}
+}
+
+func TestIssue35295(t *testing.T) {
+	store, clean := testkit.CreateMockStore(t)
+	defer clean()
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t100")
+	// This bug only happens when the partition prune mode is 'static'.
+	tk.MustExec("set @@tidb_partition_prune_mode = 'static'")
+	tk.MustExec(`CREATE TABLE t100 (
+ID bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+col1 int(10) NOT NULL DEFAULT '0' COMMENT 'test',
+money bigint(20) NOT NULL COMMENT 'test',
+logtime datetime NOT NULL COMMENT 'record time',
+PRIMARY KEY (ID,logtime)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=1 COMMENT='test'
+PARTITION BY RANGE COLUMNS(logtime) (
+PARTITION p20220608 VALUES LESS THAN ("20220609"),
+PARTITION p20220609 VALUES LESS THAN ("20220610"),
+PARTITION p20220610 VALUES LESS THAN ("20220611"),
+PARTITION p20220611 VALUES LESS THAN ("20220612"),
+PARTITION p20220612 VALUES LESS THAN ("20220613"),
+PARTITION p20220613 VALUES LESS THAN ("20220614"),
+PARTITION p20220614 VALUES LESS THAN ("20220615"),
+PARTITION p20220615 VALUES LESS THAN ("20220616"),
+PARTITION p20220616 VALUES LESS THAN ("20220617"),
+PARTITION p20220617 VALUES LESS THAN ("20220618"),
+PARTITION p20220618 VALUES LESS THAN ("20220619"),
+PARTITION p20220619 VALUES LESS THAN ("20220620"),
+PARTITION p20220620 VALUES LESS THAN ("20220621"),
+PARTITION p20220621 VALUES LESS THAN ("20220622"),
+PARTITION p20220622 VALUES LESS THAN ("20220623"),
+PARTITION p20220623 VALUES LESS THAN ("20220624"),
+PARTITION p20220624 VALUES LESS THAN ("20220625")
+	);`)
+	tk.MustExec("insert into t100(col1,money,logtime) values (100,10,'2022-06-09 00:00:00');")
+	tk.MustExec("insert into t100(col1,money,logtime) values (100,10,'2022-06-10 00:00:00');")
+	tk.MustQuery("SELECT /*+STREAM_AGG()*/ col1,sum(money) FROM t100 WHERE logtime>='2022-06-09 00:00:00' AND col1=100 ;").Check(testkit.Rows("100 20"))
+	tk.MustQuery("SELECT /*+HASH_AGG()*/ col1,sum(money) FROM t100 WHERE logtime>='2022-06-09 00:00:00' AND col1=100 ;").Check(testkit.Rows("100 20"))
+}
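
The root cause in issue #35295 is that, under static pruning, the aggregation pushed below the partition union is only a partial aggregation, yet it was still given a default value for empty input. The standalone Go sketch below (not TiDB code; partialFirstRow, finalFirstRow, and report are hypothetical names, and *int stands in for a nullable column) illustrates how a default row emitted by the partial stage over an empty partition can shadow the real data when first_row results are merged:

package main

import "fmt"

// partialFirstRow simulates first_row() over one partition.
// emitDefault mimics the buggy behavior of producing a default (NULL)
// row for an empty partition; nil stands in for SQL NULL.
func partialFirstRow(rows []int, emitDefault bool) []*int {
	if len(rows) == 0 {
		if emitDefault {
			return []*int{nil} // bogus default row from the partial stage
		}
		return nil // correct: an empty partition contributes nothing
	}
	return []*int{&rows[0]}
}

// finalFirstRow merges per-partition results: the first row wins. Only
// this final stage may supply the default value for fully empty input.
func finalFirstRow(partials []*int) *int {
	if len(partials) == 0 {
		return nil
	}
	return partials[0]
}

func report(v *int) string {
	if v == nil {
		return "NULL"
	}
	return fmt.Sprint(*v)
}

func main() {
	empty := []int{}          // e.g. partition p20220608
	filled := []int{100, 100} // e.g. partition p20220609

	// Buggy: the empty partition's default NULL row shadows the real data.
	buggy := append(partialFirstRow(empty, true), partialFirstRow(filled, true)...)
	fmt.Println("buggy:", report(finalFirstRow(buggy))) // NULL

	// Fixed: empty partitions emit nothing, so first_row sees 100.
	fixed := append(partialFirstRow(empty, false), partialFirstRow(filled, false)...)
	fmt.Println("fixed:", report(finalFirstRow(fixed))) // 100
}

This is why the builder changes below gate e.defaultVal on IsFinalAgg(): the default row must come from exactly one place, the final aggregation.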
diff --git a/executor/builder.go b/executor/builder.go
index 21a9db36cecf6..3e330c49d214a 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -1316,7 +1316,10 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor
 	if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) {
 		e.defaultVal = nil
 	} else {
-		e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+		// Only set the default value for the final agg, see issues #35295 and #30923.
+		if v.IsFinalAgg() {
+			e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+		}
 	}
 	for _, aggDesc := range v.AggFuncs {
 		if aggDesc.HasDistinct || len(aggDesc.OrderByItems) > 0 {
@@ -1372,10 +1374,14 @@ func (b *executorBuilder) buildStreamAgg(v *plannercore.PhysicalStreamAgg) Execu
 		groupChecker: newVecGroupChecker(b.ctx, v.GroupByItems),
 		aggFuncs:     make([]aggfuncs.AggFunc, 0, len(v.AggFuncs)),
 	}
+
 	if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) {
 		e.defaultVal = nil
 	} else {
-		e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+		// Only set the default value for the final agg, see issues #35295 and #30923.
+		if v.IsFinalAgg() {
+			e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+		}
 	}
 	for i, aggDesc := range v.AggFuncs {
 		aggFunc := aggfuncs.Build(b.ctx, aggDesc, i)
diff --git a/expression/aggregation/descriptor.go b/expression/aggregation/descriptor.go
index 2d1361a5fcb61..fc9716b9d2168 100644
--- a/expression/aggregation/descriptor.go
+++ b/expression/aggregation/descriptor.go
@@ -122,8 +122,6 @@ func (a *AggFuncDesc) Split(ordinal []int) (partialAggDesc, finalAggDesc *AggFun
 		partialAggDesc.Mode = Partial1Mode
 	} else if a.Mode == FinalMode {
 		partialAggDesc.Mode = Partial2Mode
-	} else {
-		panic("Error happened during AggFuncDesc.Split, the AggFunctionMode is not CompleteMode or FinalMode.")
 	}
 	finalAggDesc = &AggFuncDesc{
 		Mode: FinalMode, // We only support FinalMode now in final phase.
diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go
index 01f5ada33df88..5c96d96b656a1 100644
--- a/planner/core/physical_plans.go
+++ b/planner/core/physical_plans.go
@@ -1020,7 +1020,8 @@ type basePhysicalAgg struct {
 	MppPartitionCols []*property.MPPPartitionColumn
 }
 
-func (p *basePhysicalAgg) isFinalAgg() bool {
+// IsFinalAgg reports whether this aggregation runs in final or complete mode.
+func (p *basePhysicalAgg) IsFinalAgg() bool {
 	if len(p.AggFuncs) > 0 {
 		if p.AggFuncs[0].Mode == aggregation.FinalMode || p.AggFuncs[0].Mode == aggregation.CompleteMode {
 			return true
diff --git a/planner/core/rule_aggregation_push_down.go b/planner/core/rule_aggregation_push_down.go
index 98e9dec865dad..eb5d072fa8c1a 100644
--- a/planner/core/rule_aggregation_push_down.go
+++ b/planner/core/rule_aggregation_push_down.go
@@ -399,6 +399,16 @@ func (a *aggregationPushDownSolver) tryAggPushDownForUnion(union *LogicalUnionAl
 	if pushedAgg == nil {
 		return nil
 	}
+
+	// Update the agg mode for the pushed-down aggregation.
+	for _, aggFunc := range pushedAgg.AggFuncs {
+		if aggFunc.Mode == aggregation.CompleteMode {
+			aggFunc.Mode = aggregation.Partial1Mode
+		} else if aggFunc.Mode == aggregation.FinalMode {
+			aggFunc.Mode = aggregation.Partial2Mode
+		}
+	}
+
 	newChildren := make([]LogicalPlan, 0, len(union.Children()))
 	for _, child := range union.Children() {
 		newChild, err := a.pushAggCrossUnion(pushedAgg, union.Schema(), child)
diff --git a/planner/core/rule_eliminate_projection.go b/planner/core/rule_eliminate_projection.go
index 1d8d9c96d3acd..4680d814cd654 100644
--- a/planner/core/rule_eliminate_projection.go
+++ b/planner/core/rule_eliminate_projection.go
@@ -46,14 +46,14 @@ func canProjectionBeEliminatedStrict(p *PhysicalProjection) bool {
 	// passing down the aggregation mode to TiFlash.
 	if physicalAgg, ok := p.Children()[0].(*PhysicalHashAgg); ok {
 		if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
-			if physicalAgg.isFinalAgg() {
+			if physicalAgg.IsFinalAgg() {
 				return false
 			}
 		}
 	}
 	if physicalAgg, ok := p.Children()[0].(*PhysicalStreamAgg); ok {
 		if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
-			if physicalAgg.isFinalAgg() {
+			if physicalAgg.IsFinalAgg() {
 				return false
 			}
diff --git a/planner/core/task.go b/planner/core/task.go
index 56b61ba1cdc3d..ca254a41fe256 100644
--- a/planner/core/task.go
+++ b/planner/core/task.go
@@ -1659,7 +1659,15 @@ func BuildFinalModeAggregation(
 			finalAggFunc.OrderByItems = byItems
 			finalAggFunc.HasDistinct = aggFunc.HasDistinct
-			finalAggFunc.Mode = aggregation.CompleteMode
+			// In the logical optimization phase, Agg->PartitionUnion->TableReader may become
+			// Agg1->PartitionUnion->Agg2->TableReader, where Agg2 is a partial aggregation.
+			// So when splitting the aggregation here, an extra mode check is needed:
+			// if the original agg mode is already partial, finalAggFunc's mode becomes Partial2Mode.
+			if aggFunc.Mode == aggregation.CompleteMode {
+				finalAggFunc.Mode = aggregation.CompleteMode
+			} else if aggFunc.Mode == aggregation.Partial1Mode || aggFunc.Mode == aggregation.Partial2Mode {
+				finalAggFunc.Mode = aggregation.Partial2Mode
+			}
 		} else {
 			if aggFunc.Name == ast.AggFuncGroupConcat && len(aggFunc.OrderByItems) > 0 {
 				// group_concat can only run in one phase if it has order by items but without distinct property
@@ -1730,7 +1738,15 @@ func BuildFinalModeAggregation(
 			}
 		}
-		finalAggFunc.Mode = aggregation.FinalMode
+		// In the logical optimization phase, Agg->PartitionUnion->TableReader may become
+		// Agg1->PartitionUnion->Agg2->TableReader, where Agg2 is a partial aggregation.
+		// So when splitting the aggregation here, an extra mode check is needed:
+		// if the original agg mode is already partial, finalAggFunc's mode becomes Partial2Mode.
+		if aggFunc.Mode == aggregation.CompleteMode {
+			finalAggFunc.Mode = aggregation.FinalMode
+		} else if aggFunc.Mode == aggregation.Partial1Mode || aggFunc.Mode == aggregation.Partial2Mode {
+			finalAggFunc.Mode = aggregation.Partial2Mode
+		}
 	}
 
 	finalAggFunc.Args = args
@@ -1791,7 +1807,7 @@ func (p *basePhysicalAgg) convertAvgForMPP() *PhysicalProjection {
 	}
 	// no avgs
 	// for final agg, always add project due to in-compatibility between TiDB and TiFlash
-	if len(p.schema.Columns) == len(newSchema.Columns) && !p.isFinalAgg() {
+	if len(p.schema.Columns) == len(newSchema.Columns) && !p.IsFinalAgg() {
 		return nil
 	}
 	// add remaining columns to exprs
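
Taken together, the two task.go hunks above reduce to a small mode-transition table for the aggregate function being split. The standalone Go sketch below (not TiDB code; AggFunctionMode and finalModeAfterSplit are hypothetical re-creations of the idea, not the real planner types) condenses that table: a CompleteMode function splits into a FinalMode top half (or stays CompleteMode in the distinct branch), while a function that is already Partial1/Partial2, because tryAggPushDownForUnion already demoted it below the union, keeps a Partial2Mode top half:

package main

import "fmt"

type AggFunctionMode int

const (
	CompleteMode AggFunctionMode = iota
	FinalMode
	Partial1Mode
	Partial2Mode
)

func (m AggFunctionMode) String() string {
	return [...]string{"Complete", "Final", "Partial1", "Partial2"}[m]
}

// finalModeAfterSplit mirrors the if-conditions added in BuildFinalModeAggregation.
// distinctBranch distinguishes the two hunks: the distinct branch keeps
// CompleteMode, the ordinary branch uses FinalMode.
func finalModeAfterSplit(orig AggFunctionMode, distinctBranch bool) AggFunctionMode {
	switch orig {
	case CompleteMode:
		if distinctBranch {
			return CompleteMode
		}
		return FinalMode
	case Partial1Mode, Partial2Mode:
		// Agg1->PartitionUnion->Agg2: Agg2 is already partial, so the
		// "final" half produced by the split is still a partial stage.
		return Partial2Mode
	}
	return orig
}

func main() {
	fmt.Println(finalModeAfterSplit(CompleteMode, false)) // Final
	fmt.Println(finalModeAfterSplit(Partial1Mode, false)) // Partial2
	fmt.Println(finalModeAfterSplit(Partial2Mode, true))  // Partial2
}

This mapping is also why the unconditional panic removed from AggFuncDesc.Split above is no longer valid: after the union push-down, Split can legitimately see Partial1Mode/Partial2Mode inputs, not only CompleteMode and FinalMode.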