Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: add usage of paging copr in optimizer #30536

Merged
merged 40 commits into from
Dec 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
aef1c85
add optimizer support
you06 Dec 6, 2021
9223044
fix compare cost
you06 Dec 6, 2021
7d75aca
set paging
you06 Dec 6, 2021
bcccba1
handle expect cnt more than max paging size
you06 Dec 7, 2021
00c4ef8
add metrics
you06 Dec 8, 2021
7f9509b
clean code
you06 Dec 8, 2021
28fe7e3
fix streaming metric label
you06 Dec 8, 2021
2f31fe4
use a threshold to decied whether go paging
you06 Dec 8, 2021
b5ab23b
decide go paging only by limit count
you06 Dec 8, 2021
72a7cfb
correct expectCnt
you06 Dec 9, 2021
aba1a8c
paging do not enlarge idx cost
you06 Dec 9, 2021
c6cc613
Merge branch 'master' into paging-copr-optimizer
you06 Dec 9, 2021
1917d29
Merge branch 'master' into paging-copr-optimizer
you06 Dec 9, 2021
9e9d34d
fix metric schame test
you06 Dec 9, 2021
cee8aaa
adjust the paging threshold
you06 Dec 10, 2021
653f165
fix test
you06 Dec 10, 2021
c6c0dcc
Merge branch 'master' into paging-copr-optimizer
you06 Dec 13, 2021
963c84c
Merge branch 'master' into paging-copr-optimizer
you06 Dec 13, 2021
0b45c0b
address comment
you06 Dec 15, 2021
66408c4
Merge branch 'master' into paging-copr-optimizer
you06 Dec 15, 2021
4d096f1
Update planner/core/task.go
you06 Dec 16, 2021
1f0b0e4
fix legend format
you06 Dec 16, 2021
5d23006
address comment
you06 Dec 16, 2021
381aa2a
extract paging constants to individual package
you06 Dec 16, 2021
367aead
add test
you06 Dec 16, 2021
90a628b
add licsnee
you06 Dec 16, 2021
c992665
add comment
you06 Dec 16, 2021
e283868
sort imports
you06 Dec 17, 2021
ab251f0
sort imports
you06 Dec 17, 2021
fb4b003
fix lint
you06 Dec 17, 2021
74c02f9
Merge branch 'master' into paging-copr-optimizer
you06 Dec 17, 2021
4a8e572
add bridge to satisfy test
you06 Dec 20, 2021
1f2167e
address comment
you06 Dec 20, 2021
7c9498b
Merge branch 'master' into paging-copr-optimizer
you06 Dec 21, 2021
b485afd
Merge branch 'master' into paging-copr-optimizer
you06 Dec 21, 2021
8c6794c
remote parallel test
you06 Dec 21, 2021
a4d48ef
use prop to pass expectCnt, add tests
you06 Dec 23, 2021
1173169
Merge branch 'master' into paging-copr-optimizer
ti-chi-bot Dec 24, 2021
fd44eb2
Merge branch 'master' into paging-copr-optimizer
ti-chi-bot Dec 24, 2021
905738b
remove parallel in test T^T
you06 Dec 24, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
memTracker: kvReq.MemTracker,
encodeType: encodetype,
storeType: kvReq.StoreType,
paging: kvReq.Paging,
}, nil
}

Expand Down
9 changes: 7 additions & 2 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ type selectResult struct {
durationReported bool
memTracker *memory.Tracker

stats *selectResultRuntimeStats
stats *selectResultRuntimeStats
paging bool
}

func (r *selectResult) fetchResp(ctx context.Context) error {
Expand Down Expand Up @@ -206,7 +207,11 @@ func (r *selectResult) fetchResp(ctx context.Context) error {
// final round of fetch
// TODO: Add a label to distinguish between success or failure.
// https://github.com/pingcap/tidb/issues/11397
metrics.DistSQLQueryHistogram.WithLabelValues(r.label, r.sqlType).Observe(r.fetchDuration.Seconds())
if r.paging {
metrics.DistSQLQueryHistogram.WithLabelValues(r.label, r.sqlType, "paging").Observe(r.fetchDuration.Seconds())
} else {
metrics.DistSQLQueryHistogram.WithLabelValues(r.label, r.sqlType, "common").Observe(r.fetchDuration.Seconds())
}
r.durationReported = true
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion distsql/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *streamResult) readDataFromResponse(ctx context.Context, resp kv.Respons
if !r.durationReported {
// TODO: Add a label to distinguish between success or failure.
// https://github.com/pingcap/tidb/issues/11397
metrics.DistSQLQueryHistogram.WithLabelValues(r.label, r.sqlType).Observe(r.fetchDuration.Seconds())
metrics.DistSQLQueryHistogram.WithLabelValues(r.label, r.sqlType, "streaming").Observe(r.fetchDuration.Seconds())
r.durationReported = true
}
return true, nil
Expand Down
6 changes: 6 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3409,6 +3409,11 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
if err != nil {
return nil, err
}
indexPaging := false
if v.Paging {
indexPaging = true
indexStreaming = false
}
tableReq, tableStreaming, tbl, err := buildTableReq(b, v.Schema().Len(), v.TablePlans)
if err != nil {
return nil, err
Expand All @@ -3430,6 +3435,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
columns: ts.Columns,
indexStreaming: indexStreaming,
tableStreaming: tableStreaming,
indexPaging: indexPaging,
dataReaderBuilder: &dataReaderBuilder{executorBuilder: b},
corColInIdxSide: b.corColInDistPlan(v.IndexPlans),
corColInTblSide: b.corColInDistPlan(v.TablePlans),
Expand Down
2 changes: 2 additions & 0 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ type IndexLookUpExecutor struct {

indexStreaming bool
tableStreaming bool
indexPaging bool

corColInIdxSide bool
corColInTblSide bool
Expand Down Expand Up @@ -560,6 +561,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetStreaming(e.indexStreaming).
SetPaging(e.indexPaging).
SetReadReplicaScope(e.readReplicaScope).
SetIsStaleness(e.isStaleness).
SetFromSessionVars(e.ctx.GetSessionVars()).
Expand Down
2 changes: 1 addition & 1 deletion metrics/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
Name: "handle_query_duration_seconds",
Help: "Bucketed histogram of processing time (s) of handled queries.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days
}, []string{LblType, LblSQLType})
}, []string{LblType, LblSQLType, LblCoprType})

DistSQLScanKeysPartialHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Expand Down
4 changes: 2 additions & 2 deletions metrics/grafana/tidb.json
Original file line number Diff line number Diff line change
Expand Up @@ -6168,10 +6168,10 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(tidb_distsql_handle_query_duration_seconds_count{tidb_cluster=\"$tidb_cluster\"}[1m]))",
"expr": "sum(rate(tidb_distsql_handle_query_duration_seconds_count{tidb_cluster=\"$tidb_cluster\"}[1m])) by (copr_type)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "",
"legendFormat": "{{copr_type}}",
"metric": "tidb_distsql_query_total",
"refId": "A",
"step": 4
Expand Down
1 change: 1 addition & 0 deletions metrics/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ const (
LblDb = "db"
LblResult = "result"
LblSQLType = "sql_type"
LblCoprType = "copr_type"
LblGeneral = "general"
LblInternal = "internal"
LbTxnMode = "txn_mode"
Expand Down
11 changes: 8 additions & 3 deletions planner/core/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,17 +456,22 @@ func (p *PhysicalIndexReader) accessObject(sctx sessionctx.Context) string {

// ExplainInfo implements Plan interface.
func (p *PhysicalIndexLookUpReader) ExplainInfo() string {
var str strings.Builder
// The children can be inferred by the relation symbol.
if p.PushedLimit != nil {
var str strings.Builder
str.WriteString("limit embedded(offset:")
str.WriteString(strconv.FormatUint(p.PushedLimit.Offset, 10))
str.WriteString(", count:")
str.WriteString(strconv.FormatUint(p.PushedLimit.Count, 10))
str.WriteString(")")
return str.String()
}
return ""
if p.Paging {
if p.PushedLimit != nil {
str.WriteString(", ")
}
str.WriteString("paging:true")
}
return str.String()
}

func (p *PhysicalIndexLookUpReader) accessObject(sctx sessionctx.Context) string {
Expand Down
1 change: 1 addition & 0 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,7 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, candid
indexPlan: is,
tblColHists: ds.TblColHists,
tblCols: ds.TblCols,
expectCnt: uint64(prop.ExpectedCnt),
}
cop.partitionInfo = PartitionInfo{
PruningConds: ds.allConds,
Expand Down
1 change: 1 addition & 0 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ type PhysicalIndexLookUpReader struct {
TablePlans []PhysicalPlan
indexPlan PhysicalPlan
tablePlan PhysicalPlan
Paging bool

ExtraHandleCol *expression.Column
// PushedLimit is used to avoid unnecessary table scan tasks of IndexLookUpReader.
Expand Down
64 changes: 64 additions & 0 deletions planner/core/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"fmt"
"strings"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/config"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx/variable"
kit "github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/israce"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/testkit"
Expand Down Expand Up @@ -639,3 +641,65 @@ func (s *testPlanNormalize) TestIssue25729(c *C) {
tk.MustExec("insert into t1 values(\"a\", \"adwa\");")
tk.MustQuery("select * from t1 where concat(a, b) like \"aadwa\" and a = \"a\";").Check(testkit.Rows("a adwa"))
}

func TestCopPaging(t *testing.T) {
store, clean := kit.CreateMockStore(t)
defer clean()

tk := kit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("set session tidb_enable_paging = 1")
tk.MustExec("create table t(id int, c1 int, c2 int, primary key (id), key i(c1))")
defer tk.MustExec("drop table t")
for i := 0; i < 1024; i++ {
tk.MustExec("insert into t values(?, ?, ?)", i, i, i)
}
tk.MustExec("analyze table t")

// limit 960 should go paging
for i := 0; i < 10; i++ {
tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 960").Check(kit.Rows(
"Limit 4.00 root offset:0, count:960",
"└─IndexLookUp 4.00 root paging:true",
" ├─Selection(Build) 1024.00 cop[tikv] le(test.t.id, 1024)",
" │ └─IndexRangeScan 1024.00 cop[tikv] table:t, index:i(c1) range:[0,1024], keep order:true",
" └─Selection(Probe) 4.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)",
" └─TableRowIDScan 1024.00 cop[tikv] table:t keep order:false"))
}

// selection between limit and indexlookup, limit 960 should also go paging
for i := 0; i < 10; i++ {
tk.MustQuery("explain format='brief' select * from t force index(i) where mod(id, 2) > 0 and id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 960").Check(kit.Rows(
"Limit 3.20 root offset:0, count:960",
"└─Selection 2.56 root gt(mod(test.t.id, 2), 0)",
" └─IndexLookUp 3.20 root paging:true",
" ├─Selection(Build) 819.20 cop[tikv] le(test.t.id, 1024)",
" │ └─IndexRangeScan 1024.00 cop[tikv] table:t, index:i(c1) range:[0,1024], keep order:true",
" └─Selection(Probe) 3.20 cop[tikv] in(test.t.c2, 2, 4, 6, 8)",
" └─TableRowIDScan 819.20 cop[tikv] table:t keep order:false"))
}

// limit 961 exceeds the threshold, it should not go paging
for i := 0; i < 10; i++ {
tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 961").Check(kit.Rows(
"Limit 4.00 root offset:0, count:961",
"└─IndexLookUp 4.00 root ",
" ├─Selection(Build) 1024.00 cop[tikv] le(test.t.id, 1024)",
" │ └─IndexRangeScan 1024.00 cop[tikv] table:t, index:i(c1) range:[0,1024], keep order:true",
" └─Selection(Probe) 4.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)",
" └─TableRowIDScan 1024.00 cop[tikv] table:t keep order:false"))
}

// selection between limit and indexlookup, limit 961 should not go paging too
for i := 0; i < 10; i++ {
tk.MustQuery("explain format='brief' select * from t force index(i) where mod(id, 2) > 0 and id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 961").Check(kit.Rows(
"Limit 3.20 root offset:0, count:961",
"└─Selection 2.56 root gt(mod(test.t.id, 2), 0)",
" └─IndexLookUp 3.20 root ",
" ├─Selection(Build) 819.20 cop[tikv] le(test.t.id, 1024)",
" │ └─IndexRangeScan 1024.00 cop[tikv] table:t, index:i(c1) range:[0,1024], keep order:true",
" └─Selection(Probe) 3.20 cop[tikv] in(test.t.c2, 2, 4, 6, 8)",
" └─TableRowIDScan 819.20 cop[tikv] table:t keep order:false"))
}
}
52 changes: 51 additions & 1 deletion planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/paging"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
Expand Down Expand Up @@ -89,6 +90,10 @@ type copTask struct {

// For table partition.
partitionInfo PartitionInfo

// expectCnt is the expected row count of upper task, 0 for unlimited.
// It's used for deciding whether using paging distsql.
expectCnt uint64
youjiali1995 marked this conversation as resolved.
Show resolved Hide resolved
}

func (t *copTask) invalid() bool {
Expand Down Expand Up @@ -914,7 +919,17 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask {
// (indexRows / batchSize) * batchSize * CPUFactor
// Since we don't know the number of copTasks built, ignore these network cost now.
indexRows := t.indexPlan.statsInfo().RowCount
newTask.cst += indexRows * sessVars.CPUFactor
idxCst := indexRows * sessVars.CPUFactor
// if the expectCnt is below the paging threshold, using paging API, recalculate idxCst.
// paging API reduces the count of index and table rows, however introduces more seek cost.
if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 && t.expectCnt <= paging.Threshold {
p.Paging = true
pagingCst := calcPagingCost(ctx, t)
// prevent enlarging the cost because we take paging as a better plan,
// if the cost is enlarged, it'll be easier to go another plan.
idxCst = math.Min(idxCst, pagingCst)
}
newTask.cst += idxCst
// Add cost of worker goroutines in index lookup.
numTblWorkers := float64(sessVars.IndexLookupConcurrency())
newTask.cst += (numTblWorkers + 1) * sessVars.ConcurrencyFactor
Expand Down Expand Up @@ -951,6 +966,41 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask {
return newTask
}

func extractRows(p PhysicalPlan) float64 {
f := float64(0)
for _, c := range p.Children() {
if len(c.Children()) != 0 {
f += extractRows(c)
} else {
f += c.statsInfo().RowCount
}
}
return f
}

// calcPagingCost calculates the cost for paging processing which may increase the seekCnt and reduce scanned rows.
func calcPagingCost(ctx sessionctx.Context, t *copTask) float64 {
you06 marked this conversation as resolved.
Show resolved Hide resolved
sessVars := ctx.GetSessionVars()
indexRows := t.indexPlan.statsInfo().RowCount
expectCnt := t.expectCnt
sourceRows := extractRows(t.indexPlan)
// with paging, the scanned rows is always less than or equal to source rows.
if uint64(sourceRows) < expectCnt {
expectCnt = uint64(sourceRows)
}
seekCnt := paging.CalculateSeekCnt(expectCnt)
indexSelectivity := float64(1)
if sourceRows > indexRows {
indexSelectivity = indexRows / sourceRows
}
pagingCst := seekCnt*sessVars.GetSeekFactor(nil) + float64(expectCnt)*sessVars.CPUFactor
pagingCst *= indexSelectivity

// we want the diff between idxCst and pagingCst here,
// however, the idxCst does not contain seekFactor, so a seekFactor needs to be removed
return pagingCst - sessVars.GetSeekFactor(nil)
}

func (t *rootTask) convertToRootTask(_ sessionctx.Context) *rootTask {
return t.copy().(*rootTask)
}
Expand Down
25 changes: 3 additions & 22 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/paging"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/metrics"
Expand All @@ -61,18 +62,6 @@ const (
copNextMaxBackoff = 20000
)

// A paging request may be separated into multi requests if there are more data than a page.
// The paging size grows from min to max, it's not well tuned yet.
// e.g. a paging request scans over range (r1, r200), it requires 64 rows in the first batch,
// if it's not drained, then the paging size grows, the new range is calculated like (r100, r200), then send a request again.
// Compare with the common unary request, paging request allows early access of data, it offers a streaming-like way processing data.
// TODO: may make the paging parameters configurable.
const (
minPagingSize uint64 = 64
maxPagingSize = minPagingSize * 128
pagingSizeGrow uint64 = 2
)

// CopClient is coprocessor client.
type CopClient struct {
kv.RequestTypeSupportedChecker
Expand Down Expand Up @@ -212,7 +201,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
// the size will grow every round.
pagingSize := uint64(0)
if req.Paging {
pagingSize = minPagingSize
pagingSize = paging.MinPagingSize
}
tasks = append(tasks, &copTask{
region: loc.Location.Region,
Expand Down Expand Up @@ -928,7 +917,7 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti
if task.ranges.Len() == 0 {
return nil, nil
}
task.pagingSize = growPagingSize(task.pagingSize)
task.pagingSize = paging.GrowPagingSize(task.pagingSize)
return []*copTask{task}, nil
}

Expand Down Expand Up @@ -1332,11 +1321,3 @@ func isolationLevelToPB(level kv.IsoLevel) kvrpcpb.IsolationLevel {
return kvrpcpb.IsolationLevel_SI
}
}

func growPagingSize(size uint64) uint64 {
size *= pagingSizeGrow
if size > maxPagingSize {
return maxPagingSize
}
return size
}
3 changes: 2 additions & 1 deletion store/copr/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/util/paging"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -318,7 +319,7 @@ func TestBuildPagingTasks(t *testing.T) {
require.Len(t, tasks, 1)
taskEqual(t, tasks[0], regionIDs[0], "a", "c")
require.True(t, tasks[0].paging)
require.Equal(t, tasks[0].pagingSize, minPagingSize)
require.Equal(t, tasks[0].pagingSize, paging.MinPagingSize)
}

func toCopRange(r kv.KeyRange) *coprocessor.KeyRange {
Expand Down
Loading