Skip to content

Commit

Permalink
planner: add usage of paging copr in optimizer (pingcap#30536)
Browse files Browse the repository at this point in the history
  • Loading branch information
you06 authored and tangenta committed Dec 24, 2021
1 parent 391a8c0 commit bff723b
Show file tree
Hide file tree
Showing 18 changed files with 275 additions and 33 deletions.
1 change: 1 addition & 0 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
memTracker: kvReq.MemTracker,
encodeType: encodetype,
storeType: kvReq.StoreType,
paging: kvReq.Paging,
}, nil
}

Expand Down
9 changes: 7 additions & 2 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ type selectResult struct {
durationReported bool
memTracker *memory.Tracker

stats *selectResultRuntimeStats
stats *selectResultRuntimeStats
paging bool
}

func (r *selectResult) fetchResp(ctx context.Context) error {
Expand Down Expand Up @@ -206,7 +207,11 @@ func (r *selectResult) fetchResp(ctx context.Context) error {
// final round of fetch
// TODO: Add a label to distinguish between success or failure.
// https://github.com/pingcap/tidb/issues/11397
metrics.DistSQLQueryHistogram.WithLabelValues(r.label, r.sqlType).Observe(r.fetchDuration.Seconds())
if r.paging {
metrics.DistSQLQueryHistogram.WithLabelValues(r.label, r.sqlType, "paging").Observe(r.fetchDuration.Seconds())
} else {
metrics.DistSQLQueryHistogram.WithLabelValues(r.label, r.sqlType, "common").Observe(r.fetchDuration.Seconds())
}
r.durationReported = true
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion distsql/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *streamResult) readDataFromResponse(ctx context.Context, resp kv.Respons
if !r.durationReported {
// TODO: Add a label to distinguish between success or failure.
// https://github.com/pingcap/tidb/issues/11397
metrics.DistSQLQueryHistogram.WithLabelValues(r.label, r.sqlType).Observe(r.fetchDuration.Seconds())
metrics.DistSQLQueryHistogram.WithLabelValues(r.label, r.sqlType, "streaming").Observe(r.fetchDuration.Seconds())
r.durationReported = true
}
return true, nil
Expand Down
6 changes: 6 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3409,6 +3409,11 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
if err != nil {
return nil, err
}
indexPaging := false
if v.Paging {
indexPaging = true
indexStreaming = false
}
tableReq, tableStreaming, tbl, err := buildTableReq(b, v.Schema().Len(), v.TablePlans)
if err != nil {
return nil, err
Expand All @@ -3430,6 +3435,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
columns: ts.Columns,
indexStreaming: indexStreaming,
tableStreaming: tableStreaming,
indexPaging: indexPaging,
dataReaderBuilder: &dataReaderBuilder{executorBuilder: b},
corColInIdxSide: b.corColInDistPlan(v.IndexPlans),
corColInTblSide: b.corColInDistPlan(v.TablePlans),
Expand Down
2 changes: 2 additions & 0 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ type IndexLookUpExecutor struct {

indexStreaming bool
tableStreaming bool
indexPaging bool

corColInIdxSide bool
corColInTblSide bool
Expand Down Expand Up @@ -560,6 +561,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetStreaming(e.indexStreaming).
SetPaging(e.indexPaging).
SetReadReplicaScope(e.readReplicaScope).
SetIsStaleness(e.isStaleness).
SetFromSessionVars(e.ctx.GetSessionVars()).
Expand Down
2 changes: 1 addition & 1 deletion metrics/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
Name: "handle_query_duration_seconds",
Help: "Bucketed histogram of processing time (s) of handled queries.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days
}, []string{LblType, LblSQLType})
}, []string{LblType, LblSQLType, LblCoprType})

DistSQLScanKeysPartialHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Expand Down
4 changes: 2 additions & 2 deletions metrics/grafana/tidb.json
Original file line number Diff line number Diff line change
Expand Up @@ -6168,10 +6168,10 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(tidb_distsql_handle_query_duration_seconds_count{tidb_cluster=\"$tidb_cluster\"}[1m]))",
"expr": "sum(rate(tidb_distsql_handle_query_duration_seconds_count{tidb_cluster=\"$tidb_cluster\"}[1m])) by (copr_type)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "",
"legendFormat": "{{copr_type}}",
"metric": "tidb_distsql_query_total",
"refId": "A",
"step": 4
Expand Down
1 change: 1 addition & 0 deletions metrics/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ const (
LblDb = "db"
LblResult = "result"
LblSQLType = "sql_type"
LblCoprType = "copr_type"
LblGeneral = "general"
LblInternal = "internal"
LbTxnMode = "txn_mode"
Expand Down
11 changes: 8 additions & 3 deletions planner/core/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,17 +456,22 @@ func (p *PhysicalIndexReader) accessObject(sctx sessionctx.Context) string {

// ExplainInfo implements Plan interface.
func (p *PhysicalIndexLookUpReader) ExplainInfo() string {
var str strings.Builder
// The children can be inferred by the relation symbol.
if p.PushedLimit != nil {
var str strings.Builder
str.WriteString("limit embedded(offset:")
str.WriteString(strconv.FormatUint(p.PushedLimit.Offset, 10))
str.WriteString(", count:")
str.WriteString(strconv.FormatUint(p.PushedLimit.Count, 10))
str.WriteString(")")
return str.String()
}
return ""
if p.Paging {
if p.PushedLimit != nil {
str.WriteString(", ")
}
str.WriteString("paging:true")
}
return str.String()
}

func (p *PhysicalIndexLookUpReader) accessObject(sctx sessionctx.Context) string {
Expand Down
1 change: 1 addition & 0 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,7 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, candid
indexPlan: is,
tblColHists: ds.TblColHists,
tblCols: ds.TblCols,
expectCnt: uint64(prop.ExpectedCnt),
}
cop.partitionInfo = PartitionInfo{
PruningConds: ds.allConds,
Expand Down
1 change: 1 addition & 0 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ type PhysicalIndexLookUpReader struct {
TablePlans []PhysicalPlan
indexPlan PhysicalPlan
tablePlan PhysicalPlan
Paging bool

ExtraHandleCol *expression.Column
// PushedLimit is used to avoid unnecessary table scan tasks of IndexLookUpReader.
Expand Down
64 changes: 64 additions & 0 deletions planner/core/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"fmt"
"strings"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/config"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx/variable"
kit "github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/israce"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/testkit"
Expand Down Expand Up @@ -639,3 +641,65 @@ func (s *testPlanNormalize) TestIssue25729(c *C) {
tk.MustExec("insert into t1 values(\"a\", \"adwa\");")
tk.MustQuery("select * from t1 where concat(a, b) like \"aadwa\" and a = \"a\";").Check(testkit.Rows("a adwa"))
}

func TestCopPaging(t *testing.T) {
store, clean := kit.CreateMockStore(t)
defer clean()

tk := kit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("set session tidb_enable_paging = 1")
tk.MustExec("create table t(id int, c1 int, c2 int, primary key (id), key i(c1))")
defer tk.MustExec("drop table t")
for i := 0; i < 1024; i++ {
tk.MustExec("insert into t values(?, ?, ?)", i, i, i)
}
tk.MustExec("analyze table t")

// limit 960 should go paging
for i := 0; i < 10; i++ {
tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 960").Check(kit.Rows(
"Limit 4.00 root offset:0, count:960",
"└─IndexLookUp 4.00 root paging:true",
" ├─Selection(Build) 1024.00 cop[tikv] le(test.t.id, 1024)",
" │ └─IndexRangeScan 1024.00 cop[tikv] table:t, index:i(c1) range:[0,1024], keep order:true",
" └─Selection(Probe) 4.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)",
" └─TableRowIDScan 1024.00 cop[tikv] table:t keep order:false"))
}

// selection between limit and indexlookup, limit 960 should also go paging
for i := 0; i < 10; i++ {
tk.MustQuery("explain format='brief' select * from t force index(i) where mod(id, 2) > 0 and id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 960").Check(kit.Rows(
"Limit 3.20 root offset:0, count:960",
"└─Selection 2.56 root gt(mod(test.t.id, 2), 0)",
" └─IndexLookUp 3.20 root paging:true",
" ├─Selection(Build) 819.20 cop[tikv] le(test.t.id, 1024)",
" │ └─IndexRangeScan 1024.00 cop[tikv] table:t, index:i(c1) range:[0,1024], keep order:true",
" └─Selection(Probe) 3.20 cop[tikv] in(test.t.c2, 2, 4, 6, 8)",
" └─TableRowIDScan 819.20 cop[tikv] table:t keep order:false"))
}

// limit 961 exceeds the threshold, it should not go paging
for i := 0; i < 10; i++ {
tk.MustQuery("explain format='brief' select * from t force index(i) where id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 961").Check(kit.Rows(
"Limit 4.00 root offset:0, count:961",
"└─IndexLookUp 4.00 root ",
" ├─Selection(Build) 1024.00 cop[tikv] le(test.t.id, 1024)",
" │ └─IndexRangeScan 1024.00 cop[tikv] table:t, index:i(c1) range:[0,1024], keep order:true",
" └─Selection(Probe) 4.00 cop[tikv] in(test.t.c2, 2, 4, 6, 8)",
" └─TableRowIDScan 1024.00 cop[tikv] table:t keep order:false"))
}

// selection between limit and indexlookup, limit 961 should not go paging too
for i := 0; i < 10; i++ {
tk.MustQuery("explain format='brief' select * from t force index(i) where mod(id, 2) > 0 and id <= 1024 and c1 >= 0 and c1 <= 1024 and c2 in (2, 4, 6, 8) order by c1 limit 961").Check(kit.Rows(
"Limit 3.20 root offset:0, count:961",
"└─Selection 2.56 root gt(mod(test.t.id, 2), 0)",
" └─IndexLookUp 3.20 root ",
" ├─Selection(Build) 819.20 cop[tikv] le(test.t.id, 1024)",
" │ └─IndexRangeScan 1024.00 cop[tikv] table:t, index:i(c1) range:[0,1024], keep order:true",
" └─Selection(Probe) 3.20 cop[tikv] in(test.t.c2, 2, 4, 6, 8)",
" └─TableRowIDScan 819.20 cop[tikv] table:t keep order:false"))
}
}
52 changes: 51 additions & 1 deletion planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/paging"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
Expand Down Expand Up @@ -89,6 +90,10 @@ type copTask struct {

// For table partition.
partitionInfo PartitionInfo

// expectCnt is the expected row count of upper task, 0 for unlimited.
// It's used for deciding whether using paging distsql.
expectCnt uint64
}

func (t *copTask) invalid() bool {
Expand Down Expand Up @@ -914,7 +919,17 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask {
// (indexRows / batchSize) * batchSize * CPUFactor
// Since we don't know the number of copTasks built, ignore these network cost now.
indexRows := t.indexPlan.statsInfo().RowCount
newTask.cst += indexRows * sessVars.CPUFactor
idxCst := indexRows * sessVars.CPUFactor
// if the expectCnt is below the paging threshold, using paging API, recalculate idxCst.
// paging API reduces the count of index and table rows, however introduces more seek cost.
if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 && t.expectCnt <= paging.Threshold {
p.Paging = true
pagingCst := calcPagingCost(ctx, t)
// prevent enlarging the cost because we take paging as a better plan,
// if the cost is enlarged, it'll be easier to go another plan.
idxCst = math.Min(idxCst, pagingCst)
}
newTask.cst += idxCst
// Add cost of worker goroutines in index lookup.
numTblWorkers := float64(sessVars.IndexLookupConcurrency())
newTask.cst += (numTblWorkers + 1) * sessVars.ConcurrencyFactor
Expand Down Expand Up @@ -951,6 +966,41 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask {
return newTask
}

func extractRows(p PhysicalPlan) float64 {
f := float64(0)
for _, c := range p.Children() {
if len(c.Children()) != 0 {
f += extractRows(c)
} else {
f += c.statsInfo().RowCount
}
}
return f
}

// calcPagingCost calculates the cost for paging processing which may increase the seekCnt and reduce scanned rows.
func calcPagingCost(ctx sessionctx.Context, t *copTask) float64 {
sessVars := ctx.GetSessionVars()
indexRows := t.indexPlan.statsInfo().RowCount
expectCnt := t.expectCnt
sourceRows := extractRows(t.indexPlan)
// with paging, the scanned rows is always less than or equal to source rows.
if uint64(sourceRows) < expectCnt {
expectCnt = uint64(sourceRows)
}
seekCnt := paging.CalculateSeekCnt(expectCnt)
indexSelectivity := float64(1)
if sourceRows > indexRows {
indexSelectivity = indexRows / sourceRows
}
pagingCst := seekCnt*sessVars.GetSeekFactor(nil) + float64(expectCnt)*sessVars.CPUFactor
pagingCst *= indexSelectivity

// we want the diff between idxCst and pagingCst here,
// however, the idxCst does not contain seekFactor, so a seekFactor needs to be removed
return pagingCst - sessVars.GetSeekFactor(nil)
}

func (t *rootTask) convertToRootTask(_ sessionctx.Context) *rootTask {
return t.copy().(*rootTask)
}
Expand Down
25 changes: 3 additions & 22 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/paging"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/metrics"
Expand All @@ -61,18 +62,6 @@ const (
copNextMaxBackoff = 20000
)

// A paging request may be separated into multi requests if there are more data than a page.
// The paging size grows from min to max, it's not well tuned yet.
// e.g. a paging request scans over range (r1, r200), it requires 64 rows in the first batch,
// if it's not drained, then the paging size grows, the new range is calculated like (r100, r200), then send a request again.
// Compare with the common unary request, paging request allows early access of data, it offers a streaming-like way processing data.
// TODO: may make the paging parameters configurable.
const (
minPagingSize uint64 = 64
maxPagingSize = minPagingSize * 128
pagingSizeGrow uint64 = 2
)

// CopClient is coprocessor client.
type CopClient struct {
kv.RequestTypeSupportedChecker
Expand Down Expand Up @@ -212,7 +201,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
// the size will grow every round.
pagingSize := uint64(0)
if req.Paging {
pagingSize = minPagingSize
pagingSize = paging.MinPagingSize
}
tasks = append(tasks, &copTask{
region: loc.Location.Region,
Expand Down Expand Up @@ -928,7 +917,7 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti
if task.ranges.Len() == 0 {
return nil, nil
}
task.pagingSize = growPagingSize(task.pagingSize)
task.pagingSize = paging.GrowPagingSize(task.pagingSize)
return []*copTask{task}, nil
}

Expand Down Expand Up @@ -1332,11 +1321,3 @@ func isolationLevelToPB(level kv.IsoLevel) kvrpcpb.IsolationLevel {
return kvrpcpb.IsolationLevel_SI
}
}

func growPagingSize(size uint64) uint64 {
size *= pagingSizeGrow
if size > maxPagingSize {
return maxPagingSize
}
return size
}
3 changes: 2 additions & 1 deletion store/copr/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/util/paging"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -318,7 +319,7 @@ func TestBuildPagingTasks(t *testing.T) {
require.Len(t, tasks, 1)
taskEqual(t, tasks[0], regionIDs[0], "a", "c")
require.True(t, tasks[0].paging)
require.Equal(t, tasks[0].pagingSize, minPagingSize)
require.Equal(t, tasks[0].pagingSize, paging.MinPagingSize)
}

func toCopRange(r kv.KeyRange) *coprocessor.KeyRange {
Expand Down
Loading

0 comments on commit bff723b

Please sign in to comment.