From cdde0398fee9036d50ae03f34333a9eb69b0f2fc Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 6 Jul 2022 10:27:02 +0800 Subject: [PATCH] sessionctx,kv,planner: add system variable for fine_grained_shuffle (#35256) close pingcap/tidb#35342 --- DEPS.bzl | 8 +- executor/mpp_gather.go | 4 +- executor/set_test.go | 42 ++ go.mod | 4 +- go.sum | 7 +- planner/core/explain.go | 32 +- planner/core/integration_test.go | 102 +++++ planner/core/optimizer.go | 130 ++++++ planner/core/optimizer_test.go | 187 +++++++++ planner/core/plan.go | 10 +- planner/core/plan_test.go | 30 ++ planner/core/plan_to_pb.go | 32 +- .../core/testdata/integration_suite_in.json | 29 ++ .../core/testdata/integration_suite_out.json | 160 ++++++++ .../testdata/window_push_down_suite_out.json | 86 ++-- sessionctx/variable/session.go | 4 + sessionctx/variable/sysvar.go | 10 + sessionctx/variable/tidb_vars.go | 387 +++++++++--------- .../unistore/cophandler/cop_handler.go | 2 +- 19 files changed, 1009 insertions(+), 257 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index f77373b9d6b00..732ba50c819e8 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -2502,8 +2502,8 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sum = "h1:TZ0teMZoKHnZDlJxNkWrp5Sgv3w+ruNbrqtBYKsfaNw=", - version = "v0.0.0-20220525022339-6aaebf466305", + sum = "h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE=", + version = "v0.0.0-20220705053936-aa9c2d20cd2a", ) go_repository( name = "com_github_pingcap_log", @@ -2523,8 +2523,8 @@ def go_deps(): name = "com_github_pingcap_tipb", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/tipb", - sum = "h1:L4nZwfYSrIsWPAZR8zMwHaNQJy0Rjy3Od6Smj5mlOms=", - version = "v0.0.0-20220602075447-4847c5d68e73", + sum = "h1:oYn6UiUSnVlMBr4rLOweNWtdAon5wCLnLGDSFf/8kMA=", + version = "v0.0.0-20220704030114-0f4f873beca8", ) go_repository( name = "com_github_pkg_browser", diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index a9a6032d1f779..42526774dbdd5 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -77,7 +77,9 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error { if err != nil { return errors.Trace(err) } - logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs), zap.Int64("ID", mppTask.ID), zap.String("address", mppTask.Meta.GetAddress()), zap.String("plan", plannercore.ToString(pf.ExchangeSender))) + logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs), + zap.Int64("ID", mppTask.ID), zap.String("address", mppTask.Meta.GetAddress()), + zap.String("plan", plannercore.ToString(pf.ExchangeSender))) req := &kv.MPPDispatchRequest{ Data: pbData, Meta: mppTask.Meta, diff --git a/executor/set_test.go b/executor/set_test.go index 4eefe6a4e4e57..f91d1f1e80388 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -1834,3 +1834,45 @@ func TestGcMaxWaitTime(t *testing.T) { tk.MustExec("set global tidb_gc_life_time = \"72h\"") tk.MustExec("set global tidb_gc_max_wait_time = 1000") } + +func TestTiFlashFineGrainedShuffle(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + // Default is -1. 
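+	// -1 lets the planner derive the stream count from tidb_max_tiflash_threads (see handleFineGrainedShuffle in planner/core/optimizer.go).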
+ tk.MustQuery("select @@tiflash_fine_grained_shuffle_stream_count;").Check(testkit.Rows("-1")) + + tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = -1") + tk.MustQuery("select @@tiflash_fine_grained_shuffle_stream_count;").Check(testkit.Rows("-1")) + // Min val is -1. + tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = -2") + tk.MustQuery("select @@tiflash_fine_grained_shuffle_stream_count;").Check(testkit.Rows("-1")) + + tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = 0") + tk.MustQuery("select @@tiflash_fine_grained_shuffle_stream_count;").Check(testkit.Rows("0")) + + tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = 1024") + tk.MustQuery("select @@tiflash_fine_grained_shuffle_stream_count;").Check(testkit.Rows("1024")) + // Max val is 1024. + tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = 1025") + tk.MustQuery("select @@tiflash_fine_grained_shuffle_stream_count;").Check(testkit.Rows("1024")) + + // Default is 8192. + tk.MustQuery("select @@tiflash_fine_grained_shuffle_batch_size;").Check(testkit.Rows("8192")) + + // Min is 1. + tk.MustExec("set @@tiflash_fine_grained_shuffle_batch_size = 0") + tk.MustQuery("select @@tiflash_fine_grained_shuffle_batch_size;").Check(testkit.Rows("1")) + tk.MustExec("set @@tiflash_fine_grained_shuffle_batch_size = -1") + tk.MustQuery("select @@tiflash_fine_grained_shuffle_batch_size;").Check(testkit.Rows("1")) + + // Max is uint64_max. + tk.MustExec("set @@tiflash_fine_grained_shuffle_batch_size = 18446744073709551615") + tk.MustQuery("select @@tiflash_fine_grained_shuffle_batch_size;").Check(testkit.Rows("18446744073709551615")) + + // Test set global. + tk.MustExec("set global tiflash_fine_grained_shuffle_stream_count = -1") + tk.MustExec("set global tiflash_fine_grained_shuffle_batch_size = 8192") +} diff --git a/go.mod b/go.mod index ccbba6068c6f2..13aff07d173b8 100644 --- a/go.mod +++ b/go.mod @@ -46,11 +46,11 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 - github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305 + github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a github.com/pingcap/log v1.1.0 github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e - github.com/pingcap/tipb v0.0.0-20220602075447-4847c5d68e73 + github.com/pingcap/tipb v0.0.0-20220704030114-0f4f873beca8 github.com/prometheus/client_golang v1.12.2 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.32.1 diff --git a/go.sum b/go.sum index 7f7cdca94e04f..e6c81e2bb20a7 100644 --- a/go.sum +++ b/go.sum @@ -667,8 +667,9 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305 h1:TZ0teMZoKHnZDlJxNkWrp5Sgv3w+ruNbrqtBYKsfaNw= github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a 
h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE= +github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= @@ -677,8 +678,8 @@ github.com/pingcap/log v1.1.0 h1:ELiPxACz7vdo1qAvvaWJg1NrYFoY6gqAh/+Uo6aXdD8= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 h1:HYbcxtnkN3s5tqrZ/z3eJS4j3Db8wMphEm1q10lY/TM= github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4/go.mod h1:sDCsM39cGiv2vwunZkaFA917vVkqDTGSPbbV7z4Oops= -github.com/pingcap/tipb v0.0.0-20220602075447-4847c5d68e73 h1:L4nZwfYSrIsWPAZR8zMwHaNQJy0Rjy3Od6Smj5mlOms= -github.com/pingcap/tipb v0.0.0-20220602075447-4847c5d68e73/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= +github.com/pingcap/tipb v0.0.0-20220704030114-0f4f873beca8 h1:oYn6UiUSnVlMBr4rLOweNWtdAon5wCLnLGDSFf/8kMA= +github.com/pingcap/tipb v0.0.0-20220704030114-0f4f873beca8/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/planner/core/explain.go b/planner/core/explain.go index 6d8512d5dd94a..6fb3f7d593e2f 100644 --- a/planner/core/explain.go +++ b/planner/core/explain.go @@ -518,7 +518,11 @@ func (p *PhysicalUnionScan) ExplainInfo() string { // ExplainInfo implements Plan interface. func (p *PhysicalSelection) ExplainInfo() string { - return string(expression.SortedExplainExpressionList(p.Conditions)) + exprStr := string(expression.SortedExplainExpressionList(p.Conditions)) + if p.TiFlashFineGrainedShuffleStreamCount > 0 { + exprStr += fmt.Sprintf(", stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount) + } + return exprStr } // ExplainNormalizedInfo implements Plan interface. @@ -528,7 +532,11 @@ func (p *PhysicalSelection) ExplainNormalizedInfo() string { // ExplainInfo implements Plan interface. func (p *PhysicalProjection) ExplainInfo() string { - return expression.ExplainExpressionList(p.Exprs, p.schema) + exprStr := expression.ExplainExpressionList(p.Exprs, p.schema) + if p.TiFlashFineGrainedShuffleStreamCount > 0 { + exprStr += fmt.Sprintf(", stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount) + } + return exprStr } // ExplainNormalizedInfo implements Plan interface. @@ -547,7 +555,11 @@ func (p *PhysicalTableDual) ExplainInfo() string { // ExplainInfo implements Plan interface. func (p *PhysicalSort) ExplainInfo() string { buffer := bytes.NewBufferString("") - return explainByItems(buffer, p.ByItems).String() + buffer = explainByItems(buffer, p.ByItems) + if p.TiFlashFineGrainedShuffleStreamCount > 0 { + buffer.WriteString(fmt.Sprintf(", stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount)) + } + return buffer.String() } // ExplainInfo implements Plan interface. 
@@ -867,6 +879,9 @@ func (p *PhysicalWindow) ExplainInfo() string { p.formatFrameBound(buffer, p.Frame.End) } buffer.WriteString(")") + if p.TiFlashFineGrainedShuffleStreamCount > 0 { + buffer.WriteString(fmt.Sprintf(", stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount)) + } return buffer.String() } @@ -995,9 +1010,20 @@ func (p *PhysicalExchangeSender) ExplainInfo() string { } fmt.Fprintf(buffer, "]") } + if p.TiFlashFineGrainedShuffleStreamCount > 0 { + buffer.WriteString(fmt.Sprintf(", stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount)) + } return buffer.String() } +// ExplainInfo implements Plan interface. +func (p *PhysicalExchangeReceiver) ExplainInfo() (res string) { + if p.TiFlashFineGrainedShuffleStreamCount > 0 { + res = fmt.Sprintf("stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount) + } + return res +} + // ExplainInfo implements Plan interface. func (p *LogicalUnionScan) ExplainInfo() string { buffer := bytes.NewBufferString("") diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 326755419ae26..63e9d118507a3 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -17,6 +17,7 @@ package core_test import ( "bytes" "fmt" + "regexp" "strconv" "strings" "testing" @@ -6547,6 +6548,107 @@ func TestTiFlashPartitionTableScan(t *testing.T) { tk.MustExec("drop table hp_t;") } +func TestTiFlashFineGrainedShuffle(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'") + tk.MustExec("set @@tidb_enforce_mpp = on") + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t1(c1 int, c2 int)") + + tbl1, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "t1", L: "t1"}) + require.NoError(t, err) + // Set the hacked TiFlash replica for explain tests. + tbl1.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true} + var input []string + var output []struct { + SQL string + Plan []string + } + integrationSuiteData := core.GetIntegrationSuiteData() + integrationSuiteData.GetTestCases(t, &input, &output) + for i, tt := range input { + testdata.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + tk.MustQuery(tt).Check(testkit.Rows(output[i].Plan...)) + } +} + +func TestTiFlashFineGrainedShuffleWithMaxTiFlashThreads(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'") + tk.MustExec("set @@tidb_enforce_mpp = on") + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t1(c1 int, c2 int)") + tbl1, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "t1", L: "t1"}) + require.NoError(t, err) + // Set the hacked TiFlash replica for explain tests. 
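+	// The mock store has no real TiFlash nodes; fake one available replica so the planner can produce MPP plans.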
+ tbl1.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true} + + sql := "explain select row_number() over w1 from t1 window w1 as (partition by c1);" + + getStreamCountFromExplain := func(rows [][]interface{}) (res []uint64) { + re := regexp.MustCompile("stream_count: ([0-9]+)") + for _, row := range rows { + buf := bytes.NewBufferString("") + _, _ = fmt.Fprintf(buf, "%s\n", row) + if matched := re.FindStringSubmatch(buf.String()); matched != nil { + require.Equal(t, len(matched), 2) + c, err := strconv.ParseUint(matched[1], 10, 64) + require.NoError(t, err) + res = append(res, c) + } + } + return res + } + + // tiflash_fine_grained_shuffle_stream_count should be same with tidb_max_tiflash_threads. + tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = -1") + tk.MustExec("set @@tidb_max_tiflash_threads = 10") + rows := tk.MustQuery(sql).Rows() + streamCount := getStreamCountFromExplain(rows) + // require.Equal(t, len(streamCount), 1) + require.Equal(t, uint64(10), streamCount[0]) + + // tiflash_fine_grained_shuffle_stream_count should be default value when tidb_max_tiflash_threads is -1. + tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = -1") + tk.MustExec("set @@tidb_max_tiflash_threads = -1") + rows = tk.MustQuery(sql).Rows() + streamCount = getStreamCountFromExplain(rows) + // require.Equal(t, len(streamCount), 1) + require.Equal(t, uint64(variable.DefStreamCountWhenMaxThreadsNotSet), streamCount[0]) + + // tiflash_fine_grained_shuffle_stream_count should be default value when tidb_max_tiflash_threads is 0. + tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = -1") + tk.MustExec("set @@tidb_max_tiflash_threads = 0") + rows = tk.MustQuery(sql).Rows() + streamCount = getStreamCountFromExplain(rows) + // require.Equal(t, len(streamCount), 1) + require.Equal(t, uint64(variable.DefStreamCountWhenMaxThreadsNotSet), streamCount[0]) + + // Disabled when tiflash_fine_grained_shuffle_stream_count is 0. + tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = 0") + tk.MustExec("set @@tidb_max_tiflash_threads = 10") + rows = tk.MustQuery(sql).Rows() + streamCount = getStreamCountFromExplain(rows) + require.Equal(t, len(streamCount), 0) + + // Test when tiflash_fine_grained_shuffle_stream_count is greater than 0. + tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = 16") + tk.MustExec("set @@tidb_max_tiflash_threads = 10") + rows = tk.MustQuery(sql).Rows() + streamCount = getStreamCountFromExplain(rows) + // require.Equal(t, len(streamCount), 1) + require.Equal(t, uint64(16), streamCount[0]) +} + func TestIssue33175(t *testing.T) { store, _, clean := testkit.CreateMockStoreAndDomain(t) defer clean() diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index fde76b3a41eec..20d4fd598e701 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/tracing" + "github.com/pingcap/tipb/go-tipb" "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -373,10 +374,139 @@ func postOptimize(sctx sessionctx.Context, plan PhysicalPlan) PhysicalPlan { mergeContinuousSelections(plan) plan = eliminateUnionScanAndLock(sctx, plan) plan = enableParallelApply(sctx, plan) + handleFineGrainedShuffle(sctx, plan) checkPlanCacheable(sctx, plan) return plan } +// Only for MPP(Window<-[Sort]<-ExchangeReceiver<-ExchangeSender). 
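+// The optional Sort must be a partial sort: executors that redistribute data (global sort, hash agg, hash join) break the pattern.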
+// TiFlashFineGrainedShuffleStreamCount: +// == 0: fine grained shuffle is disabled. +// > 0: use TiFlashFineGrainedShuffleStreamCount as stream count. +// < 0: use TiFlashMaxThreads as stream count when it's greater than 0. Otherwise use DefStreamCountWhenMaxThreadsNotSet. +func handleFineGrainedShuffle(sctx sessionctx.Context, plan PhysicalPlan) { + streamCount := sctx.GetSessionVars().TiFlashFineGrainedShuffleStreamCount + if streamCount == 0 { + return + } + if streamCount < 0 { + if sctx.GetSessionVars().TiFlashMaxThreads > 0 { + streamCount = sctx.GetSessionVars().TiFlashMaxThreads + } else { + streamCount = variable.DefStreamCountWhenMaxThreadsNotSet + } + } + setupFineGrainedShuffle(uint64(streamCount), plan) +} + +func setupFineGrainedShuffle(streamCount uint64, plan PhysicalPlan) { + if tableReader, ok := plan.(*PhysicalTableReader); ok { + if _, isExchangeSender := tableReader.tablePlan.(*PhysicalExchangeSender); isExchangeSender { + helper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: make([]*basePhysicalPlan, 1)} + setupFineGrainedShuffleInternal(tableReader.tablePlan, &helper, streamCount) + } + } else { + for _, child := range plan.Children() { + setupFineGrainedShuffle(streamCount, child) + } + } +} + +type shuffleTarget uint8 + +const ( + unknown shuffleTarget = iota + window + joinBuild +) + +type fineGrainedShuffleHelper struct { + shuffleTarget shuffleTarget + plans []*basePhysicalPlan +} + +func (h *fineGrainedShuffleHelper) clear() { + h.shuffleTarget = unknown + h.plans = h.plans[:0] +} + +func (h *fineGrainedShuffleHelper) updateTarget(t shuffleTarget, p *basePhysicalPlan) { + h.shuffleTarget = t + h.plans = append(h.plans, p) +} + +func setupFineGrainedShuffleInternal(plan PhysicalPlan, helper *fineGrainedShuffleHelper, streamCount uint64) { + switch x := plan.(type) { + case *PhysicalWindow: + // Do not clear the plans because window executor will keep the data partition. + // For non hash partition window function, there will be a passthrough ExchangeSender to collect data, + // which will break data partition. + helper.updateTarget(window, &x.basePhysicalPlan) + setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + case *PhysicalSort: + if x.IsPartialSort { + // Partial sort will keep the data partition. + helper.plans = append(helper.plans, &x.basePhysicalPlan) + } else { + // Global sort will break the data partition. + helper.clear() + } + setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + case *PhysicalSelection: + helper.plans = append(helper.plans, &x.basePhysicalPlan) + setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + case *PhysicalProjection: + helper.plans = append(helper.plans, &x.basePhysicalPlan) + setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + case *PhysicalExchangeReceiver: + helper.plans = append(helper.plans, &x.basePhysicalPlan) + setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + case *PhysicalHashAgg: + // HashAgg is not implemented for now. + helper.clear() + setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + case *PhysicalHashJoin: + child0 := x.children[0] + child1 := x.children[1] + if x.InnerChildIdx == 0 { + // Child0 is build side. + child0Helper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*basePhysicalPlan{}} + setupFineGrainedShuffleInternal(child0, &child0Helper, streamCount) + + // HashJoin is not implemented for now. 
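+			// Walk the probe side with the cleared helper: plans above the join cannot use fine grained shuffle until HashJoin supports it.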
+ helper.clear() + setupFineGrainedShuffleInternal(child1, helper, streamCount) + } else { + // Child1 is build side. + child1Helper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*basePhysicalPlan{}} + setupFineGrainedShuffleInternal(child1, &child1Helper, streamCount) + + // HashJoin is not implemented for now. + helper.clear() + setupFineGrainedShuffleInternal(child0, helper, streamCount) + } + case *PhysicalExchangeSender: + if x.ExchangeType == tipb.ExchangeType_Hash { + if helper.shuffleTarget == window { + // Set up stream count for all plans based on shuffle target type. + // Currently, only enable fine grained shuffle if the shuffle target is window. + x.TiFlashFineGrainedShuffleStreamCount = streamCount + for _, p := range helper.plans { + p.TiFlashFineGrainedShuffleStreamCount = streamCount + } + } + } + // exchange sender will break the data partition. + helper.clear() + setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + default: + for _, child := range x.Children() { + childHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*basePhysicalPlan{}} + setupFineGrainedShuffleInternal(child, &childHelper, streamCount) + } + } +} + // checkPlanCacheable used to check whether a plan can be cached. Plans that // meet the following characteristics cannot be cached: // 1. Use the TiFlash engine. diff --git a/planner/core/optimizer_test.go b/planner/core/optimizer_test.go index cc742c747b406..dd8a41bbab1f3 100644 --- a/planner/core/optimizer_test.go +++ b/planner/core/optimizer_test.go @@ -15,10 +15,13 @@ package core import ( + "reflect" "testing" "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/planner/property" "github.com/pingcap/tidb/types" + "github.com/pingcap/tipb/go-tipb" "github.com/stretchr/testify/require" ) @@ -102,3 +105,187 @@ func TestMPPJoinKeyTypeConvert(t *testing.T) { testJoinKeyTypeConvert(t, unsignedBigIntType, bigIntType, decimalType, true, true) testJoinKeyTypeConvert(t, bigIntType, unsignedBigIntType, decimalType, true, true) } + +// Test for core.handleFineGrainedShuffle() +func TestHandleFineGrainedShuffle(t *testing.T) { + sortItem := property.SortItem{ + Col: nil, + Desc: true, + } + var plans []*basePhysicalPlan + tableReader := &PhysicalTableReader{} + partWindow := &PhysicalWindow{ + // Meaningless sort item, just for test. 
+ PartitionBy: []property.SortItem{sortItem}, + } + partialSort := &PhysicalSort{ + IsPartialSort: true, + } + sort := &PhysicalSort{} + recv := &PhysicalExchangeReceiver{} + passSender := &PhysicalExchangeSender{ + ExchangeType: tipb.ExchangeType_PassThrough, + } + hashSender := &PhysicalExchangeSender{ + ExchangeType: tipb.ExchangeType_Hash, + } + tableScan := &PhysicalTableScan{} + plans = append(plans, &partWindow.basePhysicalPlan) + plans = append(plans, &partialSort.basePhysicalPlan) + plans = append(plans, &sort.basePhysicalPlan) + plans = append(plans, &recv.basePhysicalPlan) + plans = append(plans, &hashSender.basePhysicalPlan) + clear := func(plans []*basePhysicalPlan) { + for _, p := range plans { + p.children = nil + p.TiFlashFineGrainedShuffleStreamCount = 0 + } + } + var check func(p PhysicalPlan, expStreamCount int64, expChildCount int, curChildCount int) + check = func(p PhysicalPlan, expStreamCount int64, expChildCount int, curChildCount int) { + if len(p.Children()) == 0 { + require.Equal(t, expChildCount, curChildCount) + _, isTableScan := p.(*PhysicalTableScan) + require.True(t, isTableScan) + return + } + val := reflect.ValueOf(p) + actStreamCount := reflect.Indirect(val).FieldByName("TiFlashFineGrainedShuffleStreamCount").Interface().(uint64) + require.Equal(t, uint64(expStreamCount), actStreamCount) + for _, child := range p.Children() { + check(child, expStreamCount, expChildCount, curChildCount+1) + } + } + + const expStreamCount int64 = 8 + sctx := MockContext() + sctx.GetSessionVars().TiFlashFineGrainedShuffleStreamCount = expStreamCount + + start := func(p PhysicalPlan, expStreamCount int64, expChildCount int, curChildCount int) { + handleFineGrainedShuffle(sctx, tableReader) + check(p, expStreamCount, expChildCount, curChildCount) + clear(plans) + } + + // Window <- Sort <- ExchangeReceiver <- ExchangeSender + tableReader.tablePlan = passSender + passSender.children = []PhysicalPlan{partWindow} + partWindow.children = []PhysicalPlan{partialSort} + partialSort.children = []PhysicalPlan{recv} + recv.children = []PhysicalPlan{hashSender} + hashSender.children = []PhysicalPlan{tableScan} + start(partWindow, expStreamCount, 4, 0) + + // Window <- ExchangeReceiver <- ExchangeSender + tableReader.tablePlan = passSender + passSender.children = []PhysicalPlan{partWindow} + partWindow.children = []PhysicalPlan{recv} + recv.children = []PhysicalPlan{hashSender} + hashSender.children = []PhysicalPlan{tableScan} + start(partWindow, expStreamCount, 3, 0) + + // Window <- Sort(x) <- ExchangeReceiver <- ExchangeSender + // Fine-grained shuffle is disabled because sort is not partial. + tableReader.tablePlan = passSender + passSender.children = []PhysicalPlan{partWindow} + partWindow.children = []PhysicalPlan{sort} + sort.children = []PhysicalPlan{recv} + recv.children = []PhysicalPlan{hashSender} + hashSender.children = []PhysicalPlan{tableScan} + start(partWindow, 0, 4, 0) + + // Window <- Sort <- Window <- Sort <- ExchangeReceiver <- ExchangeSender + partWindow1 := &PhysicalWindow{ + // Meaningless sort item, just for test. 
+ PartitionBy: []property.SortItem{sortItem}, + } + partialSort1 := &PhysicalSort{ + IsPartialSort: true, + } + tableReader.tablePlan = passSender + passSender.children = []PhysicalPlan{partWindow} + partWindow.children = []PhysicalPlan{partialSort} + partialSort.children = []PhysicalPlan{partWindow1} + partWindow1.children = []PhysicalPlan{partialSort1} + partialSort1.children = []PhysicalPlan{recv} + recv.children = []PhysicalPlan{hashSender} + hashSender.children = []PhysicalPlan{tableScan} + start(partWindow, expStreamCount, 6, 0) + + // Window <- Sort <- Window(x) <- Sort <- ExchangeReceiver <- ExchangeSender(x) + // Fine-grained shuffle is disabled because Window is not hash partition. + nonPartWindow := &PhysicalWindow{} + partialSort1 = &PhysicalSort{ + IsPartialSort: true, + } + tableReader.tablePlan = passSender + passSender.children = []PhysicalPlan{partWindow} + partWindow.children = []PhysicalPlan{partialSort} + partialSort.children = []PhysicalPlan{nonPartWindow} + nonPartWindow.children = []PhysicalPlan{partialSort1} + partialSort1.children = []PhysicalPlan{recv} + recv.children = []PhysicalPlan{passSender} + passSender.children = []PhysicalPlan{tableScan} + start(partWindow, 0, 6, 0) + + // HashAgg <- Window <- ExchangeReceiver <- ExchangeSender + hashAgg := &PhysicalHashAgg{} + tableReader.tablePlan = passSender + passSender.children = []PhysicalPlan{hashAgg} + hashAgg.children = []PhysicalPlan{partWindow} + partWindow.children = []PhysicalPlan{recv} + recv.children = []PhysicalPlan{hashSender} + hashSender.children = []PhysicalPlan{tableScan} + require.Equal(t, uint64(0), hashAgg.TiFlashFineGrainedShuffleStreamCount) + start(partWindow, expStreamCount, 3, 0) + + // Window <- HashAgg(x) <- ExchangeReceiver <- ExchangeSender + tableReader.tablePlan = passSender + passSender.children = []PhysicalPlan{partWindow} + hashAgg = &PhysicalHashAgg{} + partWindow.children = []PhysicalPlan{hashAgg} + hashAgg.children = []PhysicalPlan{recv} + recv.children = []PhysicalPlan{hashSender} + hashSender.children = []PhysicalPlan{tableScan} + start(partWindow, 0, 4, 0) + + // Window <- Join(x) <- ExchangeReceiver <- ExchangeSender + // <- ExchangeReceiver <- ExchangeSender + tableReader.tablePlan = passSender + passSender.children = []PhysicalPlan{partWindow} + hashJoin := &PhysicalHashJoin{} + recv1 := &PhysicalExchangeReceiver{} + tableScan1 := &PhysicalTableScan{} + partWindow.children = []PhysicalPlan{hashJoin} + hashSender1 := &PhysicalExchangeSender{ + ExchangeType: tipb.ExchangeType_Hash, + } + hashJoin.children = []PhysicalPlan{recv, recv1} + recv.children = []PhysicalPlan{hashSender} + recv1.children = []PhysicalPlan{hashSender1} + hashSender.children = []PhysicalPlan{tableScan} + hashSender1.children = []PhysicalPlan{tableScan1} + start(partWindow, 0, 4, 0) + + // Join <- ExchangeReceiver <- ExchangeSender <- Window <- ExchangeReceiver(2) <- ExchangeSender(2) + // <- ExchangeReceiver(1) <- ExchangeSender(1) + tableReader.tablePlan = passSender + passSender.children = []PhysicalPlan{partWindow} + hashJoin = &PhysicalHashJoin{} + recv1 = &PhysicalExchangeReceiver{} + hashJoin.children = []PhysicalPlan{recv, recv1} + recv.children = []PhysicalPlan{hashSender} + hashSender.children = []PhysicalPlan{partWindow} + recv2 := &PhysicalExchangeReceiver{} + hashSender2 := &PhysicalExchangeSender{ + ExchangeType: tipb.ExchangeType_Hash, + } + tableScan2 := &PhysicalTableScan{} + partWindow.children = []PhysicalPlan{recv2} + recv2.children = []PhysicalPlan{hashSender2} + hashSender2.children = 
[]PhysicalPlan{tableScan2} + recv1.children = []PhysicalPlan{hashSender1} + tableScan1 = &PhysicalTableScan{} + hashSender1.children = []PhysicalPlan{tableScan1} + start(partWindow, expStreamCount, 3, 0) +} diff --git a/planner/core/plan.go b/planner/core/plan.go index aad8d06b68a7e..1dedfd05cf7e2 100644 --- a/planner/core/plan.go +++ b/planner/core/plan.go @@ -427,6 +427,11 @@ type basePhysicalPlan struct { // used by the new cost interface planCostInit bool planCost float64 + + // Only for MPP. If TiFlashFineGrainedShuffleStreamCount > 0: + // 1. For ExchangeSender, means its output will be partitioned by hash key. + // 2. For ExchangeReceiver/Window/Sort, means its input is already partitioned. + TiFlashFineGrainedShuffleStreamCount uint64 } // Cost implements PhysicalPlan interface. @@ -441,8 +446,9 @@ func (p *basePhysicalPlan) SetCost(cost float64) { func (p *basePhysicalPlan) cloneWithSelf(newSelf PhysicalPlan) (*basePhysicalPlan, error) { base := &basePhysicalPlan{ - basePlan: p.basePlan, - self: newSelf, + basePlan: p.basePlan, + self: newSelf, + TiFlashFineGrainedShuffleStreamCount: p.TiFlashFineGrainedShuffleStreamCount, } for _, child := range p.children { cloned, err := child.Clone() diff --git a/planner/core/plan_test.go b/planner/core/plan_test.go index e9ec780bbfd13..003ca690a206d 100644 --- a/planner/core/plan_test.go +++ b/planner/core/plan_test.go @@ -893,6 +893,36 @@ func TestIssue34863(t *testing.T) { tk.MustQuery("select count(o.c_id) from c right join o on c.c_id=o.c_id;").Check(testkit.Rows("5")) } +func TestCloneFineGrainedShuffleStreamCount(t *testing.T) { + window := &core.PhysicalWindow{} + newPlan, err := window.Clone() + require.NoError(t, err) + newWindow, ok := newPlan.(*core.PhysicalWindow) + require.Equal(t, ok, true) + require.Equal(t, window.TiFlashFineGrainedShuffleStreamCount, newWindow.TiFlashFineGrainedShuffleStreamCount) + + window.TiFlashFineGrainedShuffleStreamCount = 8 + newPlan, err = window.Clone() + require.NoError(t, err) + newWindow, ok = newPlan.(*core.PhysicalWindow) + require.Equal(t, ok, true) + require.Equal(t, window.TiFlashFineGrainedShuffleStreamCount, newWindow.TiFlashFineGrainedShuffleStreamCount) + + sort := &core.PhysicalSort{} + newPlan, err = sort.Clone() + require.NoError(t, err) + newSort, ok := newPlan.(*core.PhysicalSort) + require.Equal(t, ok, true) + require.Equal(t, sort.TiFlashFineGrainedShuffleStreamCount, newSort.TiFlashFineGrainedShuffleStreamCount) + + sort.TiFlashFineGrainedShuffleStreamCount = 8 + newPlan, err = sort.Clone() + require.NoError(t, err) + newSort, ok = newPlan.(*core.PhysicalSort) + require.Equal(t, ok, true) + require.Equal(t, sort.TiFlashFineGrainedShuffleStreamCount, newSort.TiFlashFineGrainedShuffleStreamCount) +} + // https://github.com/pingcap/tidb/issues/35527. 
func TestTableDualAsSubQuery(t *testing.T) { store, clean := testkit.CreateMockStore(t) diff --git a/planner/core/plan_to_pb.go b/planner/core/plan_to_pb.go index 7f93dd440b3fe..dbea51006c1dd 100644 --- a/planner/core/plan_to_pb.go +++ b/planner/core/plan_to_pb.go @@ -295,9 +295,11 @@ func (e *PhysicalExchangeSender) ToPB(ctx sessionctx.Context, storeType kv.Store } executorID := e.ExplainID().String() return &tipb.Executor{ - Tp: tipb.ExecType_TypeExchangeSender, - ExchangeSender: ecExec, - ExecutorId: &executorID, + Tp: tipb.ExecType_TypeExchangeSender, + ExchangeSender: ecExec, + ExecutorId: &executorID, + FineGrainedShuffleStreamCount: e.TiFlashFineGrainedShuffleStreamCount, + FineGrainedShuffleBatchSize: ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize, }, nil } @@ -327,9 +329,11 @@ func (e *PhysicalExchangeReceiver) ToPB(ctx sessionctx.Context, storeType kv.Sto } executorID := e.ExplainID().String() return &tipb.Executor{ - Tp: tipb.ExecType_TypeExchangeReceiver, - ExchangeReceiver: ecExec, - ExecutorId: &executorID, + Tp: tipb.ExecType_TypeExchangeReceiver, + ExchangeReceiver: ecExec, + ExecutorId: &executorID, + FineGrainedShuffleStreamCount: e.TiFlashFineGrainedShuffleStreamCount, + FineGrainedShuffleBatchSize: ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize, }, nil } @@ -540,7 +544,13 @@ func (p *PhysicalWindow) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (* return nil, errors.Trace(err) } executorID := p.ExplainID().String() - return &tipb.Executor{Tp: tipb.ExecType_TypeWindow, Window: windowExec, ExecutorId: &executorID}, nil + return &tipb.Executor{ + Tp: tipb.ExecType_TypeWindow, + Window: windowExec, + ExecutorId: &executorID, + FineGrainedShuffleStreamCount: p.TiFlashFineGrainedShuffleStreamCount, + FineGrainedShuffleBatchSize: ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize, + }, nil } // ToPB implements PhysicalPlan ToPB interface. @@ -565,7 +575,13 @@ func (p *PhysicalSort) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*ti return nil, errors.Trace(err) } executorID := p.ExplainID().String() - return &tipb.Executor{Tp: tipb.ExecType_TypeSort, Sort: sortExec, ExecutorId: &executorID}, nil + return &tipb.Executor{ + Tp: tipb.ExecType_TypeSort, + Sort: sortExec, + ExecutorId: &executorID, + FineGrainedShuffleStreamCount: p.TiFlashFineGrainedShuffleStreamCount, + FineGrainedShuffleBatchSize: ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize, + }, nil } // SetPBColumnsDefaultValue sets the default values of tipb.ColumnInfos. diff --git a/planner/core/testdata/integration_suite_in.json b/planner/core/testdata/integration_suite_in.json index e66c8ebc2fac3..dc2b7b07239e1 100644 --- a/planner/core/testdata/integration_suite_in.json +++ b/planner/core/testdata/integration_suite_in.json @@ -938,5 +938,34 @@ "explain format = 'brief' select count(*) from rp_t where a = 1 or a = 20", "explain format = 'brief' select count(*) from hp_t where a = 1 or a = 20" ] + }, + { + "name": "TestTiFlashFineGrainedShuffle", + "cases": [ + // 1. Can use fine grained shuffle. + "explain format = 'brief' select row_number() over w1 from t1 window w1 as (partition by c1 order by c1);", + // Test two window function. + "explain format = 'brief' select row_number() over w1, rank() over w2 from t1 window w1 as (partition by c1 order by c1), w2 as (partition by c2);", + // Limit + Order. 
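+      // TopN/Limit above the window does not disable it: stream_count is still set on the window fragment below.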
+ "explain format = 'brief' select row_number() over w1, rank() over w2 from t1 window w1 as (partition by c1 order by c1), w2 as (partition by c2) order by 1, 2 limit 10;", + // // No partition by key in w2, so disabled. But w1 is still enabled. BUG: https://github.com/pingcap/tidb/pull/35256#discussion_r913324160 + // "explain format = 'brief' select row_number() over w1, row_number() over w2 from t1 window w1 as (partition by c1 order by c1), w2 as (order by c1);", + // GroupBy key and window function partition key are not same. + "explain format = 'brief' select row_number() over w1, count(c2) from t1 group by c1 having c1 > 10 window w1 as (partition by c2 order by c2);", + "explain format = 'brief' select row_number() over w1, count(c1) from t1 group by c2 having c2 > 10 window w1 as (partition by c1 order by c2);", + // Join, same as GroupBy. + "explain format = 'brief' select row_number() over w1 from t1 a join t1 b on a.c1 = b.c2 window w1 as (partition by a.c1);", + // Selection. + "explain format = 'brief' select row_number() over w1 from t1 where c1 < 100 window w1 as (partition by c1 order by c1);", + + // 2. Cannot use fine grained shuffle. + // No window function, so disabled. + "explain format = 'brief' select * from t1;", + // No partition key in window function, so disabled. + "explain format = 'brief' select row_number() over w1 from t1 window w1 as (order by c1);", + // GroupBy key is same with window function partition key, so they are in one fragment. + // But fine grained shuffle doesn't support group by for now. + "explain format = 'brief' select row_number() over w1, count(c2) from t1 group by c1 having c1 > 10 window w1 as (partition by c1 order by c2);" + ] } ] diff --git a/planner/core/testdata/integration_suite_out.json b/planner/core/testdata/integration_suite_out.json index d305f810334b1..aeff80fd103ea 100644 --- a/planner/core/testdata/integration_suite_out.json +++ b/planner/core/testdata/integration_suite_out.json @@ -7031,5 +7031,165 @@ ] } ] + }, + { + "Name": "TestTiFlashFineGrainedShuffle", + "Cases": [ + { + "SQL": "explain format = 'brief' select row_number() over w1 from t1 window w1 as (partition by c1 order by c1);", + "Plan": [ + "TableReader 10000.00 root data:ExchangeSender", + "└─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection 10000.00 mpp[tiflash] Column#5, stream_count: 8", + " └─Window 10000.00 mpp[tiflash] row_number()->Column#5 over(partition by test.t1.c1 order by test.t1.c1 rows between current row and current row), stream_count: 8", + " └─Sort 10000.00 mpp[tiflash] test.t1.c1, test.t1.c1, stream_count: 8", + " └─ExchangeReceiver 10000.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t1.c1, collate: binary], stream_count: 8", + " └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format = 'brief' select row_number() over w1, rank() over w2 from t1 window w1 as (partition by c1 order by c1), w2 as (partition by c2);", + "Plan": [ + "TableReader 10000.00 root data:ExchangeSender", + "└─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection 10000.00 mpp[tiflash] Column#7, Column#6, stream_count: 8", + " └─Window 10000.00 mpp[tiflash] row_number()->Column#7 over(partition by test.t1.c1 order by test.t1.c1 rows between current row and current row), stream_count: 8", + " └─Sort 10000.00 mpp[tiflash] test.t1.c1, test.t1.c1, stream_count: 8", + " 
└─ExchangeReceiver 10000.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t1.c1, collate: binary], stream_count: 8", + " └─Window 10000.00 mpp[tiflash] rank()->Column#6 over(partition by test.t1.c2), stream_count: 8", + " └─Sort 10000.00 mpp[tiflash] test.t1.c2, stream_count: 8", + " └─ExchangeReceiver 10000.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t1.c2, collate: binary], stream_count: 8", + " └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format = 'brief' select row_number() over w1, rank() over w2 from t1 window w1 as (partition by c1 order by c1), w2 as (partition by c2) order by 1, 2 limit 10;", + "Plan": [ + "Projection 10.00 root Column#7, Column#6", + "└─TopN 10.00 root Column#7, Column#6, offset:0, count:10", + " └─TableReader 10.00 root data:ExchangeSender", + " └─ExchangeSender 10.00 mpp[tiflash] ExchangeType: PassThrough", + " └─TopN 10.00 mpp[tiflash] Column#7, Column#6, offset:0, count:10", + " └─Window 10000.00 mpp[tiflash] row_number()->Column#7 over(partition by test.t1.c1 order by test.t1.c1 rows between current row and current row), stream_count: 8", + " └─Sort 10000.00 mpp[tiflash] test.t1.c1, test.t1.c1, stream_count: 8", + " └─ExchangeReceiver 10000.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t1.c1, collate: binary], stream_count: 8", + " └─Window 10000.00 mpp[tiflash] rank()->Column#6 over(partition by test.t1.c2), stream_count: 8", + " └─Sort 10000.00 mpp[tiflash] test.t1.c2, stream_count: 8", + " └─ExchangeReceiver 10000.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t1.c2, collate: binary], stream_count: 8", + " └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format = 'brief' select row_number() over w1, count(c2) from t1 group by c1 having c1 > 10 window w1 as (partition by c2 order by c2);", + "Plan": [ + "TableReader 2666.67 root data:ExchangeSender", + "└─ExchangeSender 2666.67 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection 2666.67 mpp[tiflash] Column#6, Column#4, stream_count: 8", + " └─Window 2666.67 mpp[tiflash] row_number()->Column#6 over(partition by test.t1.c2 order by test.t1.c2 rows between current row and current row), stream_count: 8", + " └─Sort 2666.67 mpp[tiflash] test.t1.c2, test.t1.c2, stream_count: 8", + " └─ExchangeReceiver 2666.67 mpp[tiflash] stream_count: 8", + " └─ExchangeSender 2666.67 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t1.c2, collate: binary], stream_count: 8", + " └─Projection 2666.67 mpp[tiflash] Column#4, test.t1.c2", + " └─HashAgg 2666.67 mpp[tiflash] group by:test.t1.c1, funcs:count(test.t1.c2)->Column#4, funcs:firstrow(test.t1.c2)->test.t1.c2", + " └─ExchangeReceiver 3333.33 mpp[tiflash] ", + " └─ExchangeSender 3333.33 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t1.c1, collate: binary]", + " └─Selection 3333.33 mpp[tiflash] gt(test.t1.c1, 10)", + " └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format = 'brief' select row_number() over w1, count(c1) from t1 group by c2 having c2 > 10 window w1 as (partition by c1 order by c2);", + "Plan": [ + "TableReader 
2666.67 root data:ExchangeSender", + "└─ExchangeSender 2666.67 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection 2666.67 mpp[tiflash] Column#6, Column#4, stream_count: 8", + " └─Window 2666.67 mpp[tiflash] row_number()->Column#6 over(partition by test.t1.c1 order by test.t1.c2 rows between current row and current row), stream_count: 8", + " └─Sort 2666.67 mpp[tiflash] test.t1.c1, test.t1.c2, stream_count: 8", + " └─ExchangeReceiver 2666.67 mpp[tiflash] stream_count: 8", + " └─ExchangeSender 2666.67 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t1.c1, collate: binary], stream_count: 8", + " └─Projection 2666.67 mpp[tiflash] Column#4, test.t1.c1, test.t1.c2", + " └─HashAgg 2666.67 mpp[tiflash] group by:test.t1.c2, funcs:count(test.t1.c1)->Column#4, funcs:firstrow(test.t1.c1)->test.t1.c1, funcs:firstrow(test.t1.c2)->test.t1.c2", + " └─ExchangeReceiver 3333.33 mpp[tiflash] ", + " └─ExchangeSender 3333.33 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t1.c2, collate: binary]", + " └─Selection 3333.33 mpp[tiflash] gt(test.t1.c2, 10)", + " └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format = 'brief' select row_number() over w1 from t1 a join t1 b on a.c1 = b.c2 window w1 as (partition by a.c1);", + "Plan": [ + "TableReader 12487.50 root data:ExchangeSender", + "└─ExchangeSender 12487.50 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection 12487.50 mpp[tiflash] Column#8, stream_count: 8", + " └─Window 12487.50 mpp[tiflash] row_number()->Column#8 over(partition by test.t1.c1 rows between current row and current row), stream_count: 8", + " └─Sort 12487.50 mpp[tiflash] test.t1.c1, stream_count: 8", + " └─ExchangeReceiver 12487.50 mpp[tiflash] stream_count: 8", + " └─ExchangeSender 12487.50 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t1.c1, collate: binary], stream_count: 8", + " └─HashJoin 12487.50 mpp[tiflash] inner join, equal:[eq(test.t1.c1, test.t1.c2)]", + " ├─ExchangeReceiver(Build) 9990.00 mpp[tiflash] ", + " │ └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: Broadcast", + " │ └─Selection 9990.00 mpp[tiflash] not(isnull(test.t1.c1))", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:a keep order:false, stats:pseudo", + " └─Selection(Probe) 9990.00 mpp[tiflash] not(isnull(test.t1.c2))", + " └─TableFullScan 10000.00 mpp[tiflash] table:b keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format = 'brief' select row_number() over w1 from t1 where c1 < 100 window w1 as (partition by c1 order by c1);", + "Plan": [ + "TableReader 3323.33 root data:ExchangeSender", + "└─ExchangeSender 3323.33 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection 3323.33 mpp[tiflash] Column#5, stream_count: 8", + " └─Window 3323.33 mpp[tiflash] row_number()->Column#5 over(partition by test.t1.c1 order by test.t1.c1 rows between current row and current row), stream_count: 8", + " └─Sort 3323.33 mpp[tiflash] test.t1.c1, test.t1.c1, stream_count: 8", + " └─ExchangeReceiver 3323.33 mpp[tiflash] stream_count: 8", + " └─ExchangeSender 3323.33 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t1.c1, collate: binary], stream_count: 8", + " └─Selection 3323.33 mpp[tiflash] lt(test.t1.c1, 100)", + " └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format = 'brief' select * from t1;", + "Plan": [ + "TableReader 10000.00 root data:ExchangeSender", + "└─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: 
PassThrough", + " └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format = 'brief' select row_number() over w1 from t1 window w1 as (order by c1);", + "Plan": [ + "TableReader 10000.00 root data:ExchangeSender", + "└─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection 10000.00 mpp[tiflash] Column#5", + " └─Window 10000.00 mpp[tiflash] row_number()->Column#5 over(order by test.t1.c1 rows between current row and current row)", + " └─Sort 10000.00 mpp[tiflash] test.t1.c1", + " └─ExchangeReceiver 10000.00 mpp[tiflash] ", + " └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: PassThrough", + " └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "explain format = 'brief' select row_number() over w1, count(c2) from t1 group by c1 having c1 > 10 window w1 as (partition by c1 order by c2);", + "Plan": [ + "TableReader 2666.67 root data:ExchangeSender", + "└─ExchangeSender 2666.67 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection 2666.67 mpp[tiflash] Column#6, Column#4", + " └─Window 2666.67 mpp[tiflash] row_number()->Column#6 over(partition by test.t1.c1 order by test.t1.c2 rows between current row and current row)", + " └─Sort 2666.67 mpp[tiflash] test.t1.c1, test.t1.c2", + " └─Projection 2666.67 mpp[tiflash] Column#4, test.t1.c1, test.t1.c2", + " └─HashAgg 2666.67 mpp[tiflash] group by:test.t1.c1, funcs:count(test.t1.c2)->Column#4, funcs:firstrow(test.t1.c1)->test.t1.c1, funcs:firstrow(test.t1.c2)->test.t1.c2", + " └─ExchangeReceiver 3333.33 mpp[tiflash] ", + " └─ExchangeSender 3333.33 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t1.c1, collate: binary]", + " └─Selection 3333.33 mpp[tiflash] gt(test.t1.c1, 10)", + " └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo" + ] + } + ] } ] diff --git a/planner/core/testdata/window_push_down_suite_out.json b/planner/core/testdata/window_push_down_suite_out.json index 2b7b7b893cda4..085d1326f3daa 100644 --- a/planner/core/testdata/window_push_down_suite_out.json +++ b/planner/core/testdata/window_push_down_suite_out.json @@ -37,10 +37,10 @@ "Plan": [ "TableReader_24 10000.00 root data:ExchangeSender_23", "└─ExchangeSender_23 10000.00 mpp[tiflash] ExchangeType: PassThrough", - " └─Window_22 10000.00 mpp[tiflash] row_number()->Column#6 over(partition by test.employee.deptid rows between current row and current row)", - " └─Sort_13 10000.00 mpp[tiflash] test.employee.deptid", - " └─ExchangeReceiver_12 10000.00 mpp[tiflash] ", - " └─ExchangeSender_11 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary]", + " └─Window_22 10000.00 mpp[tiflash] row_number()->Column#6 over(partition by test.employee.deptid rows between current row and current row), stream_count: 8", + " └─Sort_13 10000.00 mpp[tiflash] test.employee.deptid, stream_count: 8", + " └─ExchangeReceiver_12 10000.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender_11 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary], stream_count: 8", " └─TableFullScan_10 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" ], "Warn": null @@ -50,11 +50,11 @@ "Plan": [ "TableReader_30 10000.00 root data:ExchangeSender_29", "└─ExchangeSender_29 10000.00 mpp[tiflash] ExchangeType: PassThrough", - " └─Projection_7 10000.00 mpp[tiflash] test.employee.empid, test.employee.deptid, test.employee.salary, 
Column#7", - " └─Window_28 10000.00 mpp[tiflash] row_number()->Column#7 over(partition by Column#6 rows between current row and current row)", - " └─Sort_14 10000.00 mpp[tiflash] Column#6", - " └─ExchangeReceiver_13 10000.00 mpp[tiflash] ", - " └─ExchangeSender_12 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: Column#6, collate: binary]", + " └─Projection_7 10000.00 mpp[tiflash] test.employee.empid, test.employee.deptid, test.employee.salary, Column#7, stream_count: 8", + " └─Window_28 10000.00 mpp[tiflash] row_number()->Column#7 over(partition by Column#6 rows between current row and current row), stream_count: 8", + " └─Sort_14 10000.00 mpp[tiflash] Column#6, stream_count: 8", + " └─ExchangeReceiver_13 10000.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender_12 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: Column#6, collate: binary], stream_count: 8", " └─Projection_10 10000.00 mpp[tiflash] test.employee.empid, test.employee.deptid, test.employee.salary, plus(test.employee.deptid, 1)->Column#6", " └─TableFullScan_11 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" ], @@ -65,10 +65,10 @@ "Plan": [ "TableReader_24 10000.00 root data:ExchangeSender_23", "└─ExchangeSender_23 10000.00 mpp[tiflash] ExchangeType: PassThrough", - " └─Window_22 10000.00 mpp[tiflash] row_number()->Column#6 over(partition by test.employee.deptid order by test.employee.salary desc rows between current row and current row)", - " └─Sort_13 10000.00 mpp[tiflash] test.employee.deptid, test.employee.salary:desc", - " └─ExchangeReceiver_12 10000.00 mpp[tiflash] ", - " └─ExchangeSender_11 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary]", + " └─Window_22 10000.00 mpp[tiflash] row_number()->Column#6 over(partition by test.employee.deptid order by test.employee.salary desc rows between current row and current row), stream_count: 8", + " └─Sort_13 10000.00 mpp[tiflash] test.employee.deptid, test.employee.salary:desc, stream_count: 8", + " └─ExchangeReceiver_12 10000.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender_11 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary], stream_count: 8", " └─TableFullScan_10 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" ], "Warn": null @@ -78,10 +78,10 @@ "Plan": [ "TableReader_24 10000.00 root data:ExchangeSender_23", "└─ExchangeSender_23 10000.00 mpp[tiflash] ExchangeType: PassThrough", - " └─Window_22 10000.00 mpp[tiflash] rank()->Column#7, dense_rank()->Column#8 over(partition by test.employee.deptid)", - " └─Sort_13 10000.00 mpp[tiflash] test.employee.deptid", - " └─ExchangeReceiver_12 10000.00 mpp[tiflash] ", - " └─ExchangeSender_11 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary]", + " └─Window_22 10000.00 mpp[tiflash] rank()->Column#7, dense_rank()->Column#8 over(partition by test.employee.deptid), stream_count: 8", + " └─Sort_13 10000.00 mpp[tiflash] test.employee.deptid, stream_count: 8", + " └─ExchangeReceiver_12 10000.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender_11 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary], stream_count: 8", " └─TableFullScan_10 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" ], "Warn": null @@ -91,12 +91,12 @@ "Plan": [ "TableReader_36 10000.00 root data:ExchangeSender_35", "└─ExchangeSender_35 
10000.00 mpp[tiflash] ExchangeType: PassThrough", - " └─Projection_9 10000.00 mpp[tiflash] test.employee.empid, test.employee.deptid, test.employee.salary, Column#8, Column#7", - " └─Window_34 10000.00 mpp[tiflash] row_number()->Column#8 over(partition by test.employee.deptid rows between current row and current row)", - " └─Window_12 10000.00 mpp[tiflash] rank()->Column#7 over(partition by test.employee.deptid)", - " └─Sort_17 10000.00 mpp[tiflash] test.employee.deptid", - " └─ExchangeReceiver_16 10000.00 mpp[tiflash] ", - " └─ExchangeSender_15 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary]", + " └─Projection_9 10000.00 mpp[tiflash] test.employee.empid, test.employee.deptid, test.employee.salary, Column#8, Column#7, stream_count: 8", + " └─Window_34 10000.00 mpp[tiflash] row_number()->Column#8 over(partition by test.employee.deptid rows between current row and current row), stream_count: 8", + " └─Window_12 10000.00 mpp[tiflash] rank()->Column#7 over(partition by test.employee.deptid), stream_count: 8", + " └─Sort_17 10000.00 mpp[tiflash] test.employee.deptid, stream_count: 8", + " └─ExchangeReceiver_16 10000.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender_15 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary], stream_count: 8", " └─TableFullScan_14 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" ], "Warn": null @@ -119,10 +119,10 @@ "Plan": [ "TableReader_36 10000.00 root data:ExchangeSender_35", "└─ExchangeSender_35 10000.00 mpp[tiflash] ExchangeType: PassThrough", - " └─Window_34 10000.00 mpp[tiflash] rank()->Column#8 over(partition by test.employee.deptid)", - " └─Sort_20 10000.00 mpp[tiflash] test.employee.deptid", - " └─ExchangeReceiver_19 10000.00 mpp[tiflash] ", - " └─ExchangeSender_18 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary]", + " └─Window_34 10000.00 mpp[tiflash] rank()->Column#8 over(partition by test.employee.deptid), stream_count: 8", + " └─Sort_20 10000.00 mpp[tiflash] test.employee.deptid, stream_count: 8", + " └─ExchangeReceiver_19 10000.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender_18 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary], stream_count: 8", " └─Window_14 10000.00 mpp[tiflash] row_number()->Column#6 over(rows between current row and current row)", " └─ExchangeReceiver_17 10000.00 mpp[tiflash] ", " └─ExchangeSender_16 10000.00 mpp[tiflash] ExchangeType: PassThrough", @@ -285,10 +285,10 @@ "Plan": [ "TableReader_24 10000.00 root data:ExchangeSender_23", "└─ExchangeSender_23 10000.00 mpp[tiflash] ExchangeType: PassThrough", - " └─Window_22 10000.00 mpp[tiflash] row_number()->Column#6 over(partition by test.employee.empid order by test.employee.salary rows between current row and current row)", - " └─Sort_13 10000.00 mpp[tiflash] test.employee.empid, test.employee.salary", - " └─ExchangeReceiver_12 10000.00 mpp[tiflash] ", - " └─ExchangeSender_11 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.empid, collate: binary]", + " └─Window_22 10000.00 mpp[tiflash] row_number()->Column#6 over(partition by test.employee.empid order by test.employee.salary rows between current row and current row), stream_count: 8", + " └─Sort_13 10000.00 mpp[tiflash] test.employee.empid, test.employee.salary, stream_count: 8", + " └─ExchangeReceiver_12 10000.00 mpp[tiflash] 
stream_count: 8", + " └─ExchangeSender_11 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.empid, collate: binary], stream_count: 8", " └─TableFullScan_10 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" ], "Warn": [ @@ -353,10 +353,10 @@ "Plan": [ "TableReader_45 1.00 root data:ExchangeSender_44", "└─ExchangeSender_44 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─Window_43 1.00 mpp[tiflash] row_number()->Column#7 over(partition by Column#5 rows between current row and current row)", - " └─Sort_20 1.00 mpp[tiflash] Column#5", - " └─ExchangeReceiver_19 1.00 mpp[tiflash] ", - " └─ExchangeSender_18 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: Column#5, collate: binary]", + " └─Window_43 1.00 mpp[tiflash] row_number()->Column#7 over(partition by Column#5 rows between current row and current row), stream_count: 8", + " └─Sort_20 1.00 mpp[tiflash] Column#5, stream_count: 8", + " └─ExchangeReceiver_19 1.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender_18 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: Column#5, collate: binary], stream_count: 8", " └─Projection_14 1.00 mpp[tiflash] Column#5", " └─HashAgg_15 1.00 mpp[tiflash] funcs:count(distinct test.employee.empid)->Column#5", " └─ExchangeReceiver_17 1.00 mpp[tiflash] ", @@ -405,10 +405,10 @@ " └─ExchangeReceiver_43 1.00 mpp[tiflash] ", " └─ExchangeSender_42 1.00 mpp[tiflash] ExchangeType: PassThrough", " └─HashAgg_39 1.00 mpp[tiflash] group by:test.employee.empid, ", - " └─Window_27 10000.00 mpp[tiflash] row_number()->Column#6 over(partition by test.employee.deptid rows between current row and current row)", - " └─Sort_18 10000.00 mpp[tiflash] test.employee.deptid", - " └─ExchangeReceiver_17 10000.00 mpp[tiflash] ", - " └─ExchangeSender_16 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary]", + " └─Window_27 10000.00 mpp[tiflash] row_number()->Column#6 over(partition by test.employee.deptid rows between current row and current row), stream_count: 8", + " └─Sort_18 10000.00 mpp[tiflash] test.employee.deptid, stream_count: 8", + " └─ExchangeReceiver_17 10000.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender_16 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary], stream_count: 8", " └─TableFullScan_15 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" ], "Warn": null @@ -436,10 +436,10 @@ " └─HashAgg_46 10000.00 mpp[tiflash] group by:Column#6, funcs:count(test.employee.empid)->Column#7", " └─ExchangeReceiver_32 10000.00 mpp[tiflash] ", " └─ExchangeSender_31 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: Column#6, collate: binary]", - " └─Window_30 10000.00 mpp[tiflash] row_number()->Column#6 over(partition by test.employee.deptid rows between current row and current row)", - " └─Sort_21 10000.00 mpp[tiflash] test.employee.deptid", - " └─ExchangeReceiver_20 10000.00 mpp[tiflash] ", - " └─ExchangeSender_19 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.deptid, collate: binary]", + " └─Window_30 10000.00 mpp[tiflash] row_number()->Column#6 over(partition by test.employee.deptid rows between current row and current row), stream_count: 8", + " └─Sort_21 10000.00 mpp[tiflash] test.employee.deptid, stream_count: 8", + " └─ExchangeReceiver_20 10000.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender_19 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash 
Cols: [name: test.employee.deptid, collate: binary], stream_count: 8", " └─TableFullScan_18 10000.00 mpp[tiflash] table:employee keep order:false, stats:pseudo" ], "Warn": null diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 4ed0b20abedea..6c1e2f894a2e3 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1159,6 +1159,10 @@ type SessionVars struct { // MaxAllowedPacket indicates the maximum size of a packet for the MySQL protocol. MaxAllowedPacket uint64 + // TiFlash related optimization, only for MPP. + TiFlashFineGrainedShuffleStreamCount int64 + TiFlashFineGrainedShuffleBatchSize uint64 + // RequestSourceType is the type of inner request. RequestSourceType string } diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 2e7865ac57040..c4e0086374f6e 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1662,6 +1662,16 @@ var defaultSysVars = []*SysVar{ return nil }, }, + {Scope: ScopeGlobal | ScopeSession, Name: TiFlashFineGrainedShuffleStreamCount, Value: strconv.Itoa(DefTiFlashFineGrainedShuffleStreamCount), Type: TypeInt, MinValue: -1, MaxValue: 1024, + SetSession: func(s *SessionVars, val string) error { + s.TiFlashFineGrainedShuffleStreamCount = TidbOptInt64(val, DefTiFlashFineGrainedShuffleStreamCount) + return nil + }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiFlashFineGrainedShuffleBatchSize, Value: strconv.Itoa(DefTiFlashFineGrainedShuffleBatchSize), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxUint64, + SetSession: func(s *SessionVars, val string) error { + s.TiFlashFineGrainedShuffleBatchSize = uint64(TidbOptInt64(val, DefTiFlashFineGrainedShuffleBatchSize)) + return nil + }}, {Scope: ScopeGlobal, Name: TiDBSimplifiedMetrics, Value: BoolToOnOff(DefTiDBSimplifiedMetrics), Type: TypeBool, SetGlobal: func(vars *SessionVars, s string) error { metrics.ToggleSimplifiedMode(TiDBOptOn(s)) diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 529c31b6e560d..2e55dfdb2353d 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -678,6 +678,10 @@ const ( // When set to true, a non-transactional DML finishes all batches even if errors are met in some batches. TiDBNonTransactionalIgnoreError = "tidb_nontransactional_ignore_error" + // Fine grained shuffle is disabled when TiFlashFineGrainedShuffleStreamCount is zero. + TiFlashFineGrainedShuffleStreamCount = "tiflash_fine_grained_shuffle_stream_count" + TiFlashFineGrainedShuffleBatchSize = "tiflash_fine_grained_shuffle_batch_size" + // TiDBSimplifiedMetrics controls whether to unregister some unused metrics. TiDBSimplifiedMetrics = "tidb_simplified_metrics" ) @@ -752,196 +756,199 @@ const ( // Default TiDB system variable values. 
const ( - DefHostname = "localhost" - DefIndexLookupConcurrency = ConcurrencyUnset - DefIndexLookupJoinConcurrency = ConcurrencyUnset - DefIndexSerialScanConcurrency = 1 - DefIndexJoinBatchSize = 25000 - DefIndexLookupSize = 20000 - DefDistSQLScanConcurrency = 15 - DefBuildStatsConcurrency = 4 - DefAutoAnalyzeRatio = 0.5 - DefAutoAnalyzeStartTime = "00:00 +0000" - DefAutoAnalyzeEndTime = "23:59 +0000" - DefAutoIncrementIncrement = 1 - DefAutoIncrementOffset = 1 - DefChecksumTableConcurrency = 4 - DefSkipUTF8Check = false - DefSkipASCIICheck = false - DefOptAggPushDown = false - DefOptCartesianBCJ = 1 - DefOptMPPOuterJoinFixedBuildSide = false - DefOptWriteRowID = false - DefOptEnableCorrelationAdjustment = true - DefOptLimitPushDownThreshold = 100 - DefOptCorrelationThreshold = 0.9 - DefOptCorrelationExpFactor = 1 - DefOptCPUFactor = 3.0 - DefOptCopCPUFactor = 3.0 - DefOptTiFlashConcurrencyFactor = 24.0 - DefOptNetworkFactor = 1.0 - DefOptScanFactor = 1.5 - DefOptDescScanFactor = 3.0 - DefOptSeekFactor = 20.0 - DefOptMemoryFactor = 0.001 - DefOptDiskFactor = 1.5 - DefOptConcurrencyFactor = 3.0 - DefOptCPUFactorV2 = 30.0 - DefOptCopCPUFactorV2 = 30.0 - DefOptTiFlashCPUFactorV2 = 2.0 - DefOptNetworkFactorV2 = 4.0 - DefOptScanFactorV2 = 100.0 - DefOptDescScanFactorV2 = 150.0 - DefOptTiFlashScanFactorV2 = 15.0 - DefOptSeekFactorV2 = 9500000.0 - DefOptMemoryFactorV2 = 0.001 - DefOptDiskFactorV2 = 1.5 - DefOptConcurrencyFactorV2 = 3.0 - DefOptInSubqToJoinAndAgg = true - DefOptPreferRangeScan = false - DefBatchInsert = false - DefBatchDelete = false - DefBatchCommit = false - DefCurretTS = 0 - DefInitChunkSize = 32 - DefMaxChunkSize = 1024 - DefDMLBatchSize = 0 - DefMaxPreparedStmtCount = -1 - DefWaitTimeout = 28800 - DefTiDBMemQuotaApplyCache = 32 << 20 // 32MB. - DefTiDBMemQuotaBindingCache = 64 << 20 // 64MB. 
- DefTiDBGeneralLog = false - DefTiDBPProfSQLCPU = 0 - DefTiDBRetryLimit = 10 - DefTiDBDisableTxnAutoRetry = true - DefTiDBConstraintCheckInPlace = false - DefTiDBHashJoinConcurrency = ConcurrencyUnset - DefTiDBProjectionConcurrency = ConcurrencyUnset - DefBroadcastJoinThresholdSize = 100 * 1024 * 1024 - DefBroadcastJoinThresholdCount = 10 * 1024 - DefTiDBOptimizerSelectivityLevel = 0 - DefTiDBOptimizerEnableNewOFGB = false - DefTiDBEnableOuterJoinReorder = true - DefTiDBAllowBatchCop = 1 - DefTiDBAllowMPPExecution = true - DefTiDBHashExchangeWithNewCollation = true - DefTiDBEnforceMPPExecution = false - DefTiFlashMaxThreads = -1 - DefTiDBMPPStoreFailTTL = "60s" - DefTiDBTxnMode = "" - DefTiDBRowFormatV1 = 1 - DefTiDBRowFormatV2 = 2 - DefTiDBDDLReorgWorkerCount = 4 - DefTiDBDDLReorgBatchSize = 256 - DefTiDBDDLErrorCountLimit = 512 - DefTiDBMaxDeltaSchemaCount = 1024 - DefTiDBChangeMultiSchema = false - DefTiDBPointGetCache = false - DefTiDBPlacementMode = PlacementModeStrict - DefTiDBEnableAutoIncrementInGenerated = false - DefTiDBHashAggPartialConcurrency = ConcurrencyUnset - DefTiDBHashAggFinalConcurrency = ConcurrencyUnset - DefTiDBWindowConcurrency = ConcurrencyUnset - DefTiDBMergeJoinConcurrency = 1 // disable optimization by default - DefTiDBStreamAggConcurrency = 1 - DefTiDBForcePriority = mysql.NoPriority - DefEnableWindowFunction = true - DefEnablePipelinedWindowFunction = true - DefEnableStrictDoubleTypeCheck = true - DefEnableVectorizedExpression = true - DefTiDBOptJoinReorderThreshold = 0 - DefTiDBDDLSlowOprThreshold = 300 - DefTiDBUseFastAnalyze = false - DefTiDBSkipIsolationLevelCheck = false - DefTiDBExpensiveQueryTimeThreshold = 60 // 60s - DefTiDBScatterRegion = false - DefTiDBWaitSplitRegionFinish = true - DefWaitSplitRegionTimeout = 300 // 300s - DefTiDBEnableNoopFuncs = Off - DefTiDBEnableNoopVariables = true - DefTiDBAllowRemoveAutoInc = false - DefTiDBUsePlanBaselines = true - DefTiDBEvolvePlanBaselines = false - DefTiDBEvolvePlanTaskMaxTime = 600 // 600s - DefTiDBEvolvePlanTaskStartTime = "00:00 +0000" - DefTiDBEvolvePlanTaskEndTime = "23:59 +0000" - DefInnodbLockWaitTimeout = 50 // 50s - DefTiDBStoreLimit = 0 - DefTiDBMetricSchemaStep = 60 // 60s - DefTiDBMetricSchemaRangeDuration = 60 // 60s - DefTiDBFoundInPlanCache = false - DefTiDBFoundInBinding = false - DefTiDBEnableCollectExecutionInfo = true - DefTiDBAllowAutoRandExplicitInsert = false - DefTiDBEnableClusteredIndex = ClusteredIndexDefModeIntOnly - DefTiDBRedactLog = false - DefTiDBRestrictedReadOnly = false - DefTiDBSuperReadOnly = false - DefTiDBShardAllocateStep = math.MaxInt64 - DefTiDBEnableTelemetry = true - DefTiDBEnableParallelApply = false - DefTiDBEnableAmendPessimisticTxn = false - DefTiDBPartitionPruneMode = "static" - DefTiDBEnableRateLimitAction = true - DefTiDBEnableAsyncCommit = false - DefTiDBEnable1PC = false - DefTiDBGuaranteeLinearizability = true - DefTiDBAnalyzeVersion = 2 - DefTiDBEnableIndexMergeJoin = false - DefTiDBTrackAggregateMemoryUsage = true - DefTiDBEnableExchangePartition = false - DefCTEMaxRecursionDepth = 1000 - DefTiDBTmpTableMaxSize = 64 << 20 // 64MB. 
- DefTiDBEnableLocalTxn = false - DefTiDBTSOClientBatchMaxWaitTime = 0.0 // 0ms - DefTiDBEnableTSOFollowerProxy = false - DefTiDBEnableOrderedResultMode = false - DefTiDBEnablePseudoForOutdatedStats = true - DefTiDBRegardNULLAsPoint = true - DefEnablePlacementCheck = true - DefTimestamp = "0" - DefTiDBEnableStmtSummary = true - DefTiDBStmtSummaryInternalQuery = false - DefTiDBStmtSummaryRefreshInterval = 1800 - DefTiDBStmtSummaryHistorySize = 24 - DefTiDBStmtSummaryMaxStmtCount = 3000 - DefTiDBStmtSummaryMaxSQLLength = 4096 - DefTiDBCapturePlanBaseline = Off - DefTiDBEnableIndexMerge = true - DefEnableLegacyInstanceScope = true - DefTiDBTableCacheLease = 3 // 3s - DefTiDBPersistAnalyzeOptions = true - DefTiDBEnableColumnTracking = false - DefTiDBStatsLoadSyncWait = 0 - DefTiDBStatsLoadPseudoTimeout = false - DefSysdateIsNow = false - DefTiDBEnableMutationChecker = false - DefTiDBTxnAssertionLevel = AssertionOffStr - DefTiDBIgnorePreparedCacheCloseStmt = false - DefTiDBBatchPendingTiFlashCount = 4000 - DefRCReadCheckTS = false - DefTiDBRemoveOrderbyInSubquery = false - DefTiDBReadStaleness = 0 - DefTiDBGCMaxWaitTime = 24 * 60 * 60 - DefMaxAllowedPacket uint64 = 67108864 - DefTiDBEnableBatchDML = false - DefTiDBMemQuotaQuery = 1073741824 // 1GB - DefTiDBStatsCacheMemQuota = 0 - MaxTiDBStatsCacheMemQuota = 1024 * 1024 * 1024 * 1024 // 1TB - DefTiDBQueryLogMaxLen = 4096 - DefRequireSecureTransport = false - DefTiDBCommitterConcurrency = 128 - DefTiDBBatchDMLIgnoreError = false - DefTiDBMemQuotaAnalyze = -1 - DefTiDBEnableAutoAnalyze = true - DefTiDBMemOOMAction = "CANCEL" - DefTiDBMaxAutoAnalyzeTime = 12 * 60 * 60 - DefTiDBEnablePrepPlanCache = true - DefTiDBPrepPlanCacheSize = 100 - DefTiDBPrepPlanCacheMemoryGuardRatio = 0.1 - DefTiDBEnableConcurrentDDL = true - DefTiDBSimplifiedMetrics = false - DefTiDBEnablePaging = true + DefHostname = "localhost" + DefIndexLookupConcurrency = ConcurrencyUnset + DefIndexLookupJoinConcurrency = ConcurrencyUnset + DefIndexSerialScanConcurrency = 1 + DefIndexJoinBatchSize = 25000 + DefIndexLookupSize = 20000 + DefDistSQLScanConcurrency = 15 + DefBuildStatsConcurrency = 4 + DefAutoAnalyzeRatio = 0.5 + DefAutoAnalyzeStartTime = "00:00 +0000" + DefAutoAnalyzeEndTime = "23:59 +0000" + DefAutoIncrementIncrement = 1 + DefAutoIncrementOffset = 1 + DefChecksumTableConcurrency = 4 + DefSkipUTF8Check = false + DefSkipASCIICheck = false + DefOptAggPushDown = false + DefOptCartesianBCJ = 1 + DefOptMPPOuterJoinFixedBuildSide = false + DefOptWriteRowID = false + DefOptEnableCorrelationAdjustment = true + DefOptLimitPushDownThreshold = 100 + DefOptCorrelationThreshold = 0.9 + DefOptCorrelationExpFactor = 1 + DefOptCPUFactor = 3.0 + DefOptCopCPUFactor = 3.0 + DefOptTiFlashConcurrencyFactor = 24.0 + DefOptNetworkFactor = 1.0 + DefOptScanFactor = 1.5 + DefOptDescScanFactor = 3.0 + DefOptSeekFactor = 20.0 + DefOptMemoryFactor = 0.001 + DefOptDiskFactor = 1.5 + DefOptConcurrencyFactor = 3.0 + DefOptCPUFactorV2 = 30.0 + DefOptCopCPUFactorV2 = 30.0 + DefOptTiFlashCPUFactorV2 = 2.0 + DefOptNetworkFactorV2 = 4.0 + DefOptScanFactorV2 = 100.0 + DefOptDescScanFactorV2 = 150.0 + DefOptTiFlashScanFactorV2 = 15.0 + DefOptSeekFactorV2 = 9500000.0 + DefOptMemoryFactorV2 = 0.001 + DefOptDiskFactorV2 = 1.5 + DefOptConcurrencyFactorV2 = 3.0 + DefOptInSubqToJoinAndAgg = true + DefOptPreferRangeScan = false + DefBatchInsert = false + DefBatchDelete = false + DefBatchCommit = false + DefCurretTS = 0 + DefInitChunkSize = 32 + DefMaxChunkSize = 1024 + DefDMLBatchSize = 0 + DefMaxPreparedStmtCount = 
-1 + DefWaitTimeout = 28800 + DefTiDBMemQuotaApplyCache = 32 << 20 // 32MB. + DefTiDBMemQuotaBindingCache = 64 << 20 // 64MB. + DefTiDBGeneralLog = false + DefTiDBPProfSQLCPU = 0 + DefTiDBRetryLimit = 10 + DefTiDBDisableTxnAutoRetry = true + DefTiDBConstraintCheckInPlace = false + DefTiDBHashJoinConcurrency = ConcurrencyUnset + DefTiDBProjectionConcurrency = ConcurrencyUnset + DefBroadcastJoinThresholdSize = 100 * 1024 * 1024 + DefBroadcastJoinThresholdCount = 10 * 1024 + DefTiDBOptimizerSelectivityLevel = 0 + DefTiDBOptimizerEnableNewOFGB = false + DefTiDBEnableOuterJoinReorder = true + DefTiDBAllowBatchCop = 1 + DefTiDBAllowMPPExecution = true + DefTiDBHashExchangeWithNewCollation = true + DefTiDBEnforceMPPExecution = false + DefTiFlashMaxThreads = -1 + DefTiDBMPPStoreFailTTL = "60s" + DefTiDBTxnMode = "" + DefTiDBRowFormatV1 = 1 + DefTiDBRowFormatV2 = 2 + DefTiDBDDLReorgWorkerCount = 4 + DefTiDBDDLReorgBatchSize = 256 + DefTiDBDDLErrorCountLimit = 512 + DefTiDBMaxDeltaSchemaCount = 1024 + DefTiDBChangeMultiSchema = false + DefTiDBPointGetCache = false + DefTiDBPlacementMode = PlacementModeStrict + DefTiDBEnableAutoIncrementInGenerated = false + DefTiDBHashAggPartialConcurrency = ConcurrencyUnset + DefTiDBHashAggFinalConcurrency = ConcurrencyUnset + DefTiDBWindowConcurrency = ConcurrencyUnset + DefTiDBMergeJoinConcurrency = 1 // disable optimization by default + DefTiDBStreamAggConcurrency = 1 + DefTiDBForcePriority = mysql.NoPriority + DefEnableWindowFunction = true + DefEnablePipelinedWindowFunction = true + DefEnableStrictDoubleTypeCheck = true + DefEnableVectorizedExpression = true + DefTiDBOptJoinReorderThreshold = 0 + DefTiDBDDLSlowOprThreshold = 300 + DefTiDBUseFastAnalyze = false + DefTiDBSkipIsolationLevelCheck = false + DefTiDBExpensiveQueryTimeThreshold = 60 // 60s + DefTiDBScatterRegion = false + DefTiDBWaitSplitRegionFinish = true + DefWaitSplitRegionTimeout = 300 // 300s + DefTiDBEnableNoopFuncs = Off + DefTiDBEnableNoopVariables = true + DefTiDBAllowRemoveAutoInc = false + DefTiDBUsePlanBaselines = true + DefTiDBEvolvePlanBaselines = false + DefTiDBEvolvePlanTaskMaxTime = 600 // 600s + DefTiDBEvolvePlanTaskStartTime = "00:00 +0000" + DefTiDBEvolvePlanTaskEndTime = "23:59 +0000" + DefInnodbLockWaitTimeout = 50 // 50s + DefTiDBStoreLimit = 0 + DefTiDBMetricSchemaStep = 60 // 60s + DefTiDBMetricSchemaRangeDuration = 60 // 60s + DefTiDBFoundInPlanCache = false + DefTiDBFoundInBinding = false + DefTiDBEnableCollectExecutionInfo = true + DefTiDBAllowAutoRandExplicitInsert = false + DefTiDBEnableClusteredIndex = ClusteredIndexDefModeIntOnly + DefTiDBRedactLog = false + DefTiDBRestrictedReadOnly = false + DefTiDBSuperReadOnly = false + DefTiDBShardAllocateStep = math.MaxInt64 + DefTiDBEnableTelemetry = true + DefTiDBEnableParallelApply = false + DefTiDBEnableAmendPessimisticTxn = false + DefTiDBPartitionPruneMode = "static" + DefTiDBEnableRateLimitAction = true + DefTiDBEnableAsyncCommit = false + DefTiDBEnable1PC = false + DefTiDBGuaranteeLinearizability = true + DefTiDBAnalyzeVersion = 2 + DefTiDBEnableIndexMergeJoin = false + DefTiDBTrackAggregateMemoryUsage = true + DefTiDBEnableExchangePartition = false + DefCTEMaxRecursionDepth = 1000 + DefTiDBTmpTableMaxSize = 64 << 20 // 64MB. 
+ DefTiDBEnableLocalTxn = false + DefTiDBTSOClientBatchMaxWaitTime = 0.0 // 0ms + DefTiDBEnableTSOFollowerProxy = false + DefTiDBEnableOrderedResultMode = false + DefTiDBEnablePseudoForOutdatedStats = true + DefTiDBRegardNULLAsPoint = true + DefEnablePlacementCheck = true + DefTimestamp = "0" + DefTiDBEnableStmtSummary = true + DefTiDBStmtSummaryInternalQuery = false + DefTiDBStmtSummaryRefreshInterval = 1800 + DefTiDBStmtSummaryHistorySize = 24 + DefTiDBStmtSummaryMaxStmtCount = 3000 + DefTiDBStmtSummaryMaxSQLLength = 4096 + DefTiDBCapturePlanBaseline = Off + DefTiDBEnableIndexMerge = true + DefEnableLegacyInstanceScope = true + DefTiDBTableCacheLease = 3 // 3s + DefTiDBPersistAnalyzeOptions = true + DefTiDBEnableColumnTracking = false + DefTiDBStatsLoadSyncWait = 0 + DefTiDBStatsLoadPseudoTimeout = false + DefSysdateIsNow = false + DefTiDBEnableMutationChecker = false + DefTiDBTxnAssertionLevel = AssertionOffStr + DefTiDBIgnorePreparedCacheCloseStmt = false + DefTiDBBatchPendingTiFlashCount = 4000 + DefRCReadCheckTS = false + DefTiDBRemoveOrderbyInSubquery = false + DefTiDBReadStaleness = 0 + DefTiDBGCMaxWaitTime = 24 * 60 * 60 + DefMaxAllowedPacket uint64 = 67108864 + DefTiDBEnableBatchDML = false + DefTiDBMemQuotaQuery = 1073741824 // 1GB + DefTiDBStatsCacheMemQuota = 0 + MaxTiDBStatsCacheMemQuota = 1024 * 1024 * 1024 * 1024 // 1TB + DefTiDBQueryLogMaxLen = 4096 + DefRequireSecureTransport = false + DefTiDBCommitterConcurrency = 128 + DefTiDBBatchDMLIgnoreError = false + DefTiDBMemQuotaAnalyze = -1 + DefTiDBEnableAutoAnalyze = true + DefTiDBMemOOMAction = "CANCEL" + DefTiDBMaxAutoAnalyzeTime = 12 * 60 * 60 + DefTiDBEnablePrepPlanCache = true + DefTiDBPrepPlanCacheSize = 100 + DefTiDBPrepPlanCacheMemoryGuardRatio = 0.1 + DefTiDBEnableConcurrentDDL = true + DefTiDBSimplifiedMetrics = false + DefTiDBEnablePaging = true + DefTiFlashFineGrainedShuffleStreamCount = -1 + DefStreamCountWhenMaxThreadsNotSet = 8 + DefTiFlashFineGrainedShuffleBatchSize = 8192 ) // Process global variables. diff --git a/store/mockstore/unistore/cophandler/cop_handler.go b/store/mockstore/unistore/cophandler/cop_handler.go index 3351f01f71888..75fa686ff8fca 100644 --- a/store/mockstore/unistore/cophandler/cop_handler.go +++ b/store/mockstore/unistore/cophandler/cop_handler.go @@ -501,7 +501,7 @@ func genRespWithMPPExec(chunks []tipb.Chunk, lastRange *coprocessor.KeyRange, co } } resp.ExecDetails = &kvrpcpb.ExecDetails{ - TimeDetail: &kvrpcpb.TimeDetail{ProcessWallTimeMs: int64(dur / time.Millisecond)}, + TimeDetail: &kvrpcpb.TimeDetail{ProcessWallTimeMs: uint64(dur / time.Millisecond)}, } resp.ExecDetailsV2 = &kvrpcpb.ExecDetailsV2{ TimeDetail: resp.ExecDetails.TimeDetail,
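
A note on the unistore hunk directly above: the cast from int64 to uint64 evidently
tracks the updated kvproto TimeDetail.ProcessWallTimeMs field type pulled in by this
patch's dependency bump. The plain cast is safe here; a minimal standalone
illustration (hypothetical example, standard library only):

    package main

    import (
    	"fmt"
    	"time"
    )

    func main() {
    	dur := 1500 * time.Millisecond
    	// time.Duration is an int64 count of nanoseconds; a measured elapsed
    	// duration is non-negative, so converting the millisecond quotient to
    	// the new uint64 proto field cannot wrap.
    	fmt.Println(uint64(dur / time.Millisecond)) // 1500
    }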
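Usage notes for the two session variables registered above:
tiflash_fine_grained_shuffle_stream_count is a TypeInt with range [-1, 1024] and
default -1, and a value of zero disables fine grained shuffle;
tiflash_fine_grained_shuffle_batch_size is a TypeUnsigned with range
[1, 18446744073709551615] and default 8192. Both carry
ScopeGlobal | ScopeSession. Given those MinValue/MaxValue declarations, the sysvar
framework should clamp out-of-range assignments to the nearest bound, so the
expected behavior is roughly:

    SET @@tiflash_fine_grained_shuffle_stream_count = -2;   -- clamped up to -1 (auto)
    SET @@tiflash_fine_grained_shuffle_stream_count = 2000; -- clamped down to 1024
    SET @@tiflash_fine_grained_shuffle_batch_size   = 0;    -- clamped up to the minimum, 1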
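The expected plans earlier in this patch annotate every operator between a
HashPartition ExchangeSender and its window function with "stream_count: 8" even
though the stream-count variable defaults to -1. The defaults block above adds
DefStreamCountWhenMaxThreadsNotSet = 8 while DefTiFlashMaxThreads stays -1, which
suggests the planner falls back to 8 when both knobs are left on auto. Below is a
minimal sketch of that derivation under that assumption; effectiveStreamCount and
the local constant are hypothetical names, not the planner's actual helper:

    package main

    import "fmt"

    const defStreamCountWhenMaxThreadsNotSet = 8 // mirrors DefStreamCountWhenMaxThreadsNotSet

    // effectiveStreamCount is a hypothetical reconstruction: it returns the
    // stream count a fine grained shuffle would use, given the session
    // variable and the TiFlash max-threads setting (-1 meaning auto for both).
    func effectiveStreamCount(streamCount, tiflashMaxThreads int64) uint64 {
    	switch {
    	case streamCount == 0:
    		return 0 // zero disables fine grained shuffle
    	case streamCount > 0:
    		return uint64(streamCount) // pinned by the user; at most 1024 per the sysvar range
    	case tiflashMaxThreads > 0:
    		return uint64(tiflashMaxThreads) // auto: follow the TiFlash thread count
    	default:
    		return defStreamCountWhenMaxThreadsNotSet // both auto: the 8 seen in the plans
    	}
    }

    func main() {
    	fmt.Println(effectiveStreamCount(-1, -1)) // 8
    	fmt.Println(effectiveStreamCount(0, -1))  // 0
    	fmt.Println(effectiveStreamCount(16, -1)) // 16
    }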