Skip to content

Commit

Permalink
sessionctx,kv,planner: add system variable for fine_grained_shuffle (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored Jul 6, 2022
1 parent bb5f5bc commit cdde039
Show file tree
Hide file tree
Showing 19 changed files with 1,009 additions and 257 deletions.
8 changes: 4 additions & 4 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2502,8 +2502,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:TZ0teMZoKHnZDlJxNkWrp5Sgv3w+ruNbrqtBYKsfaNw=",
version = "v0.0.0-20220525022339-6aaebf466305",
sum = "h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE=",
version = "v0.0.0-20220705053936-aa9c2d20cd2a",
)
go_repository(
name = "com_github_pingcap_log",
Expand All @@ -2523,8 +2523,8 @@ def go_deps():
name = "com_github_pingcap_tipb",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/tipb",
sum = "h1:L4nZwfYSrIsWPAZR8zMwHaNQJy0Rjy3Od6Smj5mlOms=",
version = "v0.0.0-20220602075447-4847c5d68e73",
sum = "h1:oYn6UiUSnVlMBr4rLOweNWtdAon5wCLnLGDSFf/8kMA=",
version = "v0.0.0-20220704030114-0f4f873beca8",
)
go_repository(
name = "com_github_pkg_browser",
Expand Down
4 changes: 3 additions & 1 deletion executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
if err != nil {
return errors.Trace(err)
}
logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs), zap.Int64("ID", mppTask.ID), zap.String("address", mppTask.Meta.GetAddress()), zap.String("plan", plannercore.ToString(pf.ExchangeSender)))
logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs),
zap.Int64("ID", mppTask.ID), zap.String("address", mppTask.Meta.GetAddress()),
zap.String("plan", plannercore.ToString(pf.ExchangeSender)))
req := &kv.MPPDispatchRequest{
Data: pbData,
Meta: mppTask.Meta,
Expand Down
42 changes: 42 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1834,3 +1834,45 @@ func TestGcMaxWaitTime(t *testing.T) {
tk.MustExec("set global tidb_gc_life_time = \"72h\"")
tk.MustExec("set global tidb_gc_max_wait_time = 1000")
}

func TestTiFlashFineGrainedShuffle(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

// Default is -1.
tk.MustQuery("select @@tiflash_fine_grained_shuffle_stream_count;").Check(testkit.Rows("-1"))

tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = -1")
tk.MustQuery("select @@tiflash_fine_grained_shuffle_stream_count;").Check(testkit.Rows("-1"))
// Min val is -1.
tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = -2")
tk.MustQuery("select @@tiflash_fine_grained_shuffle_stream_count;").Check(testkit.Rows("-1"))

tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = 0")
tk.MustQuery("select @@tiflash_fine_grained_shuffle_stream_count;").Check(testkit.Rows("0"))

tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = 1024")
tk.MustQuery("select @@tiflash_fine_grained_shuffle_stream_count;").Check(testkit.Rows("1024"))
// Max val is 1024.
tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = 1025")
tk.MustQuery("select @@tiflash_fine_grained_shuffle_stream_count;").Check(testkit.Rows("1024"))

// Default is 8192.
tk.MustQuery("select @@tiflash_fine_grained_shuffle_batch_size;").Check(testkit.Rows("8192"))

// Min is 1.
tk.MustExec("set @@tiflash_fine_grained_shuffle_batch_size = 0")
tk.MustQuery("select @@tiflash_fine_grained_shuffle_batch_size;").Check(testkit.Rows("1"))
tk.MustExec("set @@tiflash_fine_grained_shuffle_batch_size = -1")
tk.MustQuery("select @@tiflash_fine_grained_shuffle_batch_size;").Check(testkit.Rows("1"))

// Max is uint64_max.
tk.MustExec("set @@tiflash_fine_grained_shuffle_batch_size = 18446744073709551615")
tk.MustQuery("select @@tiflash_fine_grained_shuffle_batch_size;").Check(testkit.Rows("18446744073709551615"))

// Test set global.
tk.MustExec("set global tiflash_fine_grained_shuffle_stream_count = -1")
tk.MustExec("set global tiflash_fine_grained_shuffle_batch_size = 8192")
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ require (
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a
github.com/pingcap/log v1.1.0
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
github.com/pingcap/tipb v0.0.0-20220602075447-4847c5d68e73
github.com/pingcap/tipb v0.0.0-20220704030114-0f4f873beca8
github.com/prometheus/client_golang v1.12.2
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.32.1
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -667,8 +667,9 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305 h1:TZ0teMZoKHnZDlJxNkWrp5Sgv3w+ruNbrqtBYKsfaNw=
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand All @@ -677,8 +678,8 @@ github.com/pingcap/log v1.1.0 h1:ELiPxACz7vdo1qAvvaWJg1NrYFoY6gqAh/+Uo6aXdD8=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 h1:HYbcxtnkN3s5tqrZ/z3eJS4j3Db8wMphEm1q10lY/TM=
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4/go.mod h1:sDCsM39cGiv2vwunZkaFA917vVkqDTGSPbbV7z4Oops=
github.com/pingcap/tipb v0.0.0-20220602075447-4847c5d68e73 h1:L4nZwfYSrIsWPAZR8zMwHaNQJy0Rjy3Od6Smj5mlOms=
github.com/pingcap/tipb v0.0.0-20220602075447-4847c5d68e73/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pingcap/tipb v0.0.0-20220704030114-0f4f873beca8 h1:oYn6UiUSnVlMBr4rLOweNWtdAon5wCLnLGDSFf/8kMA=
github.com/pingcap/tipb v0.0.0-20220704030114-0f4f873beca8/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
32 changes: 29 additions & 3 deletions planner/core/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,11 @@ func (p *PhysicalUnionScan) ExplainInfo() string {

// ExplainInfo implements Plan interface.
func (p *PhysicalSelection) ExplainInfo() string {
return string(expression.SortedExplainExpressionList(p.Conditions))
exprStr := string(expression.SortedExplainExpressionList(p.Conditions))
if p.TiFlashFineGrainedShuffleStreamCount > 0 {
exprStr += fmt.Sprintf(", stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount)
}
return exprStr
}

// ExplainNormalizedInfo implements Plan interface.
Expand All @@ -528,7 +532,11 @@ func (p *PhysicalSelection) ExplainNormalizedInfo() string {

// ExplainInfo implements Plan interface.
func (p *PhysicalProjection) ExplainInfo() string {
return expression.ExplainExpressionList(p.Exprs, p.schema)
exprStr := expression.ExplainExpressionList(p.Exprs, p.schema)
if p.TiFlashFineGrainedShuffleStreamCount > 0 {
exprStr += fmt.Sprintf(", stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount)
}
return exprStr
}

// ExplainNormalizedInfo implements Plan interface.
Expand All @@ -547,7 +555,11 @@ func (p *PhysicalTableDual) ExplainInfo() string {
// ExplainInfo implements Plan interface.
func (p *PhysicalSort) ExplainInfo() string {
buffer := bytes.NewBufferString("")
return explainByItems(buffer, p.ByItems).String()
buffer = explainByItems(buffer, p.ByItems)
if p.TiFlashFineGrainedShuffleStreamCount > 0 {
buffer.WriteString(fmt.Sprintf(", stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount))
}
return buffer.String()
}

// ExplainInfo implements Plan interface.
Expand Down Expand Up @@ -867,6 +879,9 @@ func (p *PhysicalWindow) ExplainInfo() string {
p.formatFrameBound(buffer, p.Frame.End)
}
buffer.WriteString(")")
if p.TiFlashFineGrainedShuffleStreamCount > 0 {
buffer.WriteString(fmt.Sprintf(", stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount))
}
return buffer.String()
}

Expand Down Expand Up @@ -995,9 +1010,20 @@ func (p *PhysicalExchangeSender) ExplainInfo() string {
}
fmt.Fprintf(buffer, "]")
}
if p.TiFlashFineGrainedShuffleStreamCount > 0 {
buffer.WriteString(fmt.Sprintf(", stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount))
}
return buffer.String()
}

// ExplainInfo implements Plan interface.
func (p *PhysicalExchangeReceiver) ExplainInfo() (res string) {
if p.TiFlashFineGrainedShuffleStreamCount > 0 {
res = fmt.Sprintf("stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount)
}
return res
}

// ExplainInfo implements Plan interface.
func (p *LogicalUnionScan) ExplainInfo() string {
buffer := bytes.NewBufferString("")
Expand Down
102 changes: 102 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package core_test
import (
"bytes"
"fmt"
"regexp"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -6547,6 +6548,107 @@ func TestTiFlashPartitionTableScan(t *testing.T) {
tk.MustExec("drop table hp_t;")
}

func TestTiFlashFineGrainedShuffle(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'")
tk.MustExec("set @@tidb_enforce_mpp = on")
tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1(c1 int, c2 int)")

tbl1, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "t1", L: "t1"})
require.NoError(t, err)
// Set the hacked TiFlash replica for explain tests.
tbl1.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}
var input []string
var output []struct {
SQL string
Plan []string
}
integrationSuiteData := core.GetIntegrationSuiteData()
integrationSuiteData.GetTestCases(t, &input, &output)
for i, tt := range input {
testdata.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
})
tk.MustQuery(tt).Check(testkit.Rows(output[i].Plan...))
}
}

func TestTiFlashFineGrainedShuffleWithMaxTiFlashThreads(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'")
tk.MustExec("set @@tidb_enforce_mpp = on")
tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1(c1 int, c2 int)")
tbl1, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "t1", L: "t1"})
require.NoError(t, err)
// Set the hacked TiFlash replica for explain tests.
tbl1.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}

sql := "explain select row_number() over w1 from t1 window w1 as (partition by c1);"

getStreamCountFromExplain := func(rows [][]interface{}) (res []uint64) {
re := regexp.MustCompile("stream_count: ([0-9]+)")
for _, row := range rows {
buf := bytes.NewBufferString("")
_, _ = fmt.Fprintf(buf, "%s\n", row)
if matched := re.FindStringSubmatch(buf.String()); matched != nil {
require.Equal(t, len(matched), 2)
c, err := strconv.ParseUint(matched[1], 10, 64)
require.NoError(t, err)
res = append(res, c)
}
}
return res
}

// tiflash_fine_grained_shuffle_stream_count should be same with tidb_max_tiflash_threads.
tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = -1")
tk.MustExec("set @@tidb_max_tiflash_threads = 10")
rows := tk.MustQuery(sql).Rows()
streamCount := getStreamCountFromExplain(rows)
// require.Equal(t, len(streamCount), 1)
require.Equal(t, uint64(10), streamCount[0])

// tiflash_fine_grained_shuffle_stream_count should be default value when tidb_max_tiflash_threads is -1.
tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = -1")
tk.MustExec("set @@tidb_max_tiflash_threads = -1")
rows = tk.MustQuery(sql).Rows()
streamCount = getStreamCountFromExplain(rows)
// require.Equal(t, len(streamCount), 1)
require.Equal(t, uint64(variable.DefStreamCountWhenMaxThreadsNotSet), streamCount[0])

// tiflash_fine_grained_shuffle_stream_count should be default value when tidb_max_tiflash_threads is 0.
tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = -1")
tk.MustExec("set @@tidb_max_tiflash_threads = 0")
rows = tk.MustQuery(sql).Rows()
streamCount = getStreamCountFromExplain(rows)
// require.Equal(t, len(streamCount), 1)
require.Equal(t, uint64(variable.DefStreamCountWhenMaxThreadsNotSet), streamCount[0])

// Disabled when tiflash_fine_grained_shuffle_stream_count is 0.
tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = 0")
tk.MustExec("set @@tidb_max_tiflash_threads = 10")
rows = tk.MustQuery(sql).Rows()
streamCount = getStreamCountFromExplain(rows)
require.Equal(t, len(streamCount), 0)

// Test when tiflash_fine_grained_shuffle_stream_count is greater than 0.
tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = 16")
tk.MustExec("set @@tidb_max_tiflash_threads = 10")
rows = tk.MustQuery(sql).Rows()
streamCount = getStreamCountFromExplain(rows)
// require.Equal(t, len(streamCount), 1)
require.Equal(t, uint64(16), streamCount[0])
}

func TestIssue33175(t *testing.T) {
store, _, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
Expand Down
Loading

0 comments on commit cdde039

Please sign in to comment.