executor: TiFlash supports stale read (#40048)
close #40047
hehechen authored Dec 27, 2022
1 parent 83d275c commit afbef28
Showing 17 changed files with 149 additions and 106 deletions.
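
In effect, a stale-read statement can now keep a TiFlash access path instead of being rejected with "stale requests require tikv backend" (the check removed from executor/builder.go below). The sketch that follows illustrates the user-visible behavior in the style of executor/tiflashtest/tiflash_test.go; it is not code from this commit, and the replica-availability helpers (external.GetTableByName, UpdateTableReplicaInfo) and session variables are borrowed from neighboring tests as assumptions.

```go
package tiflashtest

import (
	"fmt"
	"testing"
	"time"

	"github.com/pingcap/tidb/domain"
	"github.com/pingcap/tidb/testkit"
	"github.com/pingcap/tidb/testkit/external"
	"github.com/stretchr/testify/require"
)

// TestStaleReadRoutedToTiFlashSketch is a hypothetical test, not the one added
// by this commit: it only shows that a stale read may now be planned on TiFlash.
func TestStaleReadRoutedToTiFlashSketch(t *testing.T) {
	store := testkit.CreateMockStore(t, withMockTiFlash(2))
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("create table t (a int)")
	tk.MustExec("alter table t set tiflash replica 1")
	// Mark the TiFlash replica available, following the pattern used by the
	// other tests in this file (assumed helper signatures).
	tb := external.GetTableByName(t, tk, "test", "t")
	require.NoError(t, domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true))
	tk.MustExec("insert into t values (1), (2), (3)")
	tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")
	time.Sleep(2 * time.Second) // make sure the stale timestamp falls after the DDL above

	rows := tk.MustQuery("explain select count(*) from t as of timestamp now() - interval 1 second").Rows()
	res := fmt.Sprintf("%v", rows)
	// Before this commit the statement failed during planning; now the plan
	// is expected to carry a tiflash task.
	require.Contains(t, res, "tiflash")
}
```
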
16 changes: 12 additions & 4 deletions DEPS.bzl
@@ -2923,8 +2923,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:46ZD6xzQWJ8Jkeal/U7SqkX030Mgs8DAn6QV/9zbqOQ=",
version = "v0.0.0-20221130022225-6c56ac56fe5f",
sum = "h1:v0Z0nC0knwWHn3e9br8EMNfLBB14QDULn142UGjiTMQ=",
version = "v0.0.0-20221213093948-9ccc6beaf0aa",
)
go_repository(
name = "com_github_pingcap_log",
@@ -3523,12 +3523,20 @@ def go_deps():
sum = "h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=",
version = "v0.0.0-20181126055449-889f96f722a2",
)
go_repository(
name = "com_github_tiancaiamao_gp",
build_file_proto_mode = "disable",
importpath = "github.com/tiancaiamao/gp",
sum = "h1:4RNtqw1/tW67qP9fFgfQpTVd7DrfkaAWu4vsC18QmBo=",
version = "v0.0.0-20221221095600-1a473d1f9b4b",
)

go_repository(
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s=",
version = "v2.0.3",
sum = "h1:m6glgBGCIds9QURbk8Mn+8mjLKDcv6nWrNwYh92fydQ=",
version = "v2.0.4-0.20221226080148-018c59dbd837",
)
go_repository(
name = "com_github_tikv_pd_client",
4 changes: 2 additions & 2 deletions distsql/distsql.go
@@ -38,11 +38,11 @@ import (
)

// DispatchMPPTasks dispatches all tasks and returns an iterator.
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int, startTs uint64) (SelectResult, error) {
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int, startTs uint64, mppQueryID kv.MPPQueryID) (SelectResult, error) {
ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx)
_, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
ctx = SetTiFlashMaxThreadsInContext(ctx, sctx)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs, mppQueryID)
if resp == nil {
return nil, errors.New("client returns nil response")
}
4 changes: 4 additions & 0 deletions executor/adapter.go
@@ -1407,6 +1407,10 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
sessVars.DurationParse = 0
// Clean the stale read flag when statement execution finish
sessVars.StmtCtx.IsStaleness = false
// Clean the MPP query info
sessVars.StmtCtx.MPPQueryInfo.QueryID.Store(0)
sessVars.StmtCtx.MPPQueryInfo.QueryTS.Store(0)
sessVars.StmtCtx.MPPQueryInfo.AllocatedMPPTaskID.Store(0)

if sessVars.StmtCtx.ReadFromTableCache {
metrics.ReadFromTableCacheCounter.Inc()
5 changes: 1 addition & 4 deletions executor/builder.go
@@ -3402,17 +3402,14 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe
is: b.is,
originalPlan: v.GetTablePlan(),
startTS: startTs,
mppQueryID: kv.MPPQueryID{QueryTs: getMPPQueryTS(b.ctx), LocalQueryID: getMPPQueryID(b.ctx), ServerID: domain.GetDomain(b.ctx).ServerID()},
}
return gather
}

// buildTableReader builds a table reader executor. It first build a no range table reader,
// and then update it ranges from table scan plan.
func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) Executor {
if v.StoreType != kv.TiKV && b.isStaleness {
b.err = errors.New("stale requests require tikv backend")
return nil
}
failpoint.Inject("checkUseMPP", func(val failpoint.Value) {
if !b.ctx.GetSessionVars().InRestrictedSQL && val.(bool) != useMPPExecution(b.ctx, v) {
if val.(bool) {
2 changes: 2 additions & 0 deletions executor/executor_required_rows_test.go
@@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/parser/ast"
@@ -211,6 +212,7 @@ func defaultCtx() sessionctx.Context {
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, ctx.GetSessionVars().MemQuotaQuery)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(-1, -1)
ctx.GetSessionVars().SnapshotTS = uint64(1)
domain.BindDomain(ctx, domain.NewMockDomain())
return ctx
}

38 changes: 27 additions & 11 deletions executor/mpp_gather.go
@@ -16,6 +16,7 @@ package executor

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -38,13 +39,26 @@ func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader
return ok
}

func getMPPQueryID(ctx sessionctx.Context) uint64 {
mppQueryInfo := &ctx.GetSessionVars().StmtCtx.MPPQueryInfo
mppQueryInfo.QueryID.CompareAndSwap(0, plannercore.AllocMPPQueryID())
return mppQueryInfo.QueryID.Load()
}

func getMPPQueryTS(ctx sessionctx.Context) uint64 {
mppQueryInfo := &ctx.GetSessionVars().StmtCtx.MPPQueryInfo
mppQueryInfo.QueryTS.CompareAndSwap(0, uint64(time.Now().UnixNano()))
return mppQueryInfo.QueryTS.Load()
}

// MPPGather dispatch MPP tasks and read data from root tasks.
type MPPGather struct {
// following fields are construct needed
baseExecutor
is infoschema.InfoSchema
originalPlan plannercore.PhysicalPlan
startTS uint64
mppQueryID kv.MPPQueryID

mppReqs []*kv.MPPDispatchRequest

@@ -78,17 +92,19 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
return errors.Trace(err)
}
logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs),
zap.Int64("ID", mppTask.ID), zap.String("address", mppTask.Meta.GetAddress()),
zap.Int64("ID", mppTask.ID), zap.Uint64("QueryTs", mppTask.MppQueryID.QueryTs), zap.Uint64("LocalQueryId", mppTask.MppQueryID.LocalQueryID),
zap.Uint64("ServerID", mppTask.MppQueryID.ServerID), zap.String("address", mppTask.Meta.GetAddress()),
zap.String("plan", plannercore.ToString(pf.ExchangeSender)))
req := &kv.MPPDispatchRequest{
Data: pbData,
Meta: mppTask.Meta,
ID: mppTask.ID,
IsRoot: pf.IsRoot,
Timeout: 10,
SchemaVar: e.is.SchemaMetaVersion(),
StartTs: e.startTS,
State: kv.MppTaskReady,
Data: pbData,
Meta: mppTask.Meta,
ID: mppTask.ID,
IsRoot: pf.IsRoot,
Timeout: 10,
SchemaVar: e.is.SchemaMetaVersion(),
StartTs: e.startTS,
MppQueryID: mppTask.MppQueryID,
State: kv.MppTaskReady,
}
e.mppReqs = append(e.mppReqs, req)
}
@@ -109,7 +125,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
// TODO: Move the construct tasks logic to planner, so we can see the explain results.
sender := e.originalPlan.(*plannercore.PhysicalExchangeSender)
planIDs := collectPlanIDS(e.originalPlan, nil)
frags, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, sender, e.is)
frags, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, e.mppQueryID, sender, e.is)
if err != nil {
return errors.Trace(err)
}
@@ -124,7 +140,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
failpoint.Return(errors.Errorf("The number of tasks is not right, expect %d tasks but actually there are %d tasks", val.(int), len(e.mppReqs)))
}
})
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS)
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS, e.mppQueryID)
if err != nil {
return errors.Trace(err)
}
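
The two helpers above make the per-statement MPP query timestamp and query ID single-assignment: the first caller wins the CompareAndSwap, and any later MPPGather built for the same statement reads back the value that was already published (the fields are reset to zero in FinishExecuteStmt, see executor/adapter.go above). Below is a self-contained sketch of the same idiom, using the standard library's atomic.Uint64 in place of the actual StmtCtx.MPPQueryInfo fields; all names here are illustrative only.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// queryTS stands in for StmtCtx.MPPQueryInfo.QueryTS: zero means "unassigned".
var queryTS atomic.Uint64

// getQueryTS mirrors getMPPQueryTS: publish a timestamp once, then always
// return the published value.
func getQueryTS() uint64 {
	queryTS.CompareAndSwap(0, uint64(time.Now().UnixNano()))
	return queryTS.Load()
}

func main() {
	var wg sync.WaitGroup
	results := make([]uint64, 4)
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = getQueryTS() // concurrent callers all observe one value
		}(i)
	}
	wg.Wait()
	fmt.Println(results) // four identical timestamps
}
```
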
1 change: 1 addition & 0 deletions executor/tiflashtest/BUILD.bazel
@@ -16,6 +16,7 @@ go_test(
"//executor",
"//meta/autoid",
"//parser/terror",
"//planner/core",
"//store/mockstore",
"//store/mockstore/unistore",
"//testkit",
16 changes: 6 additions & 10 deletions executor/tiflashtest/tiflash_test.go
@@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/parser/terror"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/testkit"
@@ -267,14 +268,9 @@ func TestMppExecution(t *testing.T) {
tk.MustExec("begin")
tk.MustQuery("select count(*) from ( select * from t2 group by a, b) A group by A.b").Check(testkit.Rows("3"))
tk.MustQuery("select count(*) from t1 where t1.a+100 > ( select count(*) from t2 where t1.a=t2.a and t1.b=t2.b) group by t1.b").Check(testkit.Rows("4"))
txn, err := tk.Session().Txn(true)
require.NoError(t, err)
ts := txn.StartTS()
taskID := tk.Session().GetSessionVars().AllocMPPTaskID(ts)
require.Equal(t, int64(6), taskID)
tk.MustExec("commit")
taskID = tk.Session().GetSessionVars().AllocMPPTaskID(ts + 1)
taskID := plannercore.AllocMPPTaskID(tk.Session())
require.Equal(t, int64(1), taskID)
tk.MustExec("commit")

failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(3)`)
// all the data is related to one store, so there are three tasks.
@@ -1043,7 +1039,7 @@ func TestTiFlashPartitionTableBroadcastJoin(t *testing.T) {
}
}

func TestForbidTiflashDuringStaleRead(t *testing.T) {
func TestTiflashSupportStaleRead(t *testing.T) {
store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
@@ -1075,8 +1071,8 @@ func TestForbidTiflashDuringStaleRead(t *testing.T) {
fmt.Fprintf(resBuff, "%s\n", row)
}
res = resBuff.String()
require.NotContains(t, res, "tiflash")
require.Contains(t, res, "tikv")
require.Contains(t, res, "tiflash")
require.NotContains(t, res, "tikv")
}

func TestForbidTiFlashIfExtraPhysTableIDIsNeeded(t *testing.T) {
5 changes: 3 additions & 2 deletions go.mod
@@ -70,7 +70,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f
github.com/pingcap/kvproto v0.0.0-20221213093948-9ccc6beaf0aa
github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
@@ -89,7 +89,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.3
github.com/tikv/client-go/v2 v2.0.4-0.20221226080148-018c59dbd837
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
@@ -219,6 +219,7 @@ require (
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
10 changes: 6 additions & 4 deletions go.sum
@@ -781,8 +781,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f h1:46ZD6xzQWJ8Jkeal/U7SqkX030Mgs8DAn6QV/9zbqOQ=
github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221213093948-9ccc6beaf0aa h1:v0Z0nC0knwWHn3e9br8EMNfLBB14QDULn142UGjiTMQ=
github.com/pingcap/kvproto v0.0.0-20221213093948-9ccc6beaf0aa/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
@@ -932,8 +932,10 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.3 h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s=
github.com/tikv/client-go/v2 v2.0.3/go.mod h1:MDT4J9LzgS7Bj1DnEq6Gk/puy6mp8TgUC92zGEVVLLg=
github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b h1:4RNtqw1/tW67qP9fFgfQpTVd7DrfkaAWu4vsC18QmBo=
github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.4-0.20221226080148-018c59dbd837 h1:m6glgBGCIds9QURbk8Mn+8mjLKDcv6nWrNwYh92fydQ=
github.com/tikv/client-go/v2 v2.0.4-0.20221226080148-018c59dbd837/go.mod h1:ptS8K+VBrEH2gIS3JxaiFSSLfDDyuS2xcdLozOtBWBw=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
34 changes: 23 additions & 11 deletions kv/mpp.go
@@ -27,21 +27,32 @@ type MPPTaskMeta interface {
GetAddress() string
}

// MPPQueryID means the global unique id of a mpp query.
type MPPQueryID struct {
QueryTs uint64 // timestamp of query execution, used for TiFlash minTSO schedule
LocalQueryID uint64 // unique mpp query id in local tidb memory.
ServerID uint64
}

// MPPTask means the minimum execution unit of a mpp computation job.
type MPPTask struct {
Meta MPPTaskMeta // on which store this task will execute
ID int64 // mppTaskID
StartTs uint64
TableID int64 // physical table id
Meta MPPTaskMeta // on which store this task will execute
ID int64 // mppTaskID
StartTs uint64
MppQueryID MPPQueryID
TableID int64 // physical table id

PartitionTableIDs []int64
}

// ToPB generates the pb structure.
func (t *MPPTask) ToPB() *mpp.TaskMeta {
meta := &mpp.TaskMeta{
StartTs: t.StartTs,
TaskId: t.ID,
StartTs: t.StartTs,
QueryTs: t.MppQueryID.QueryTs,
LocalQueryId: t.MppQueryID.LocalQueryID,
ServerId: t.MppQueryID.ServerID,
TaskId: t.ID,
}
if t.ID != -1 {
meta.Address = t.Meta.GetAddress()
@@ -70,10 +81,11 @@ type MPPDispatchRequest struct {
IsRoot bool // root task returns data to tidb directly.
Timeout uint64 // If task is assigned but doesn't receive a connect request during timeout, the task should be destroyed.
// SchemaVer is for any schema-ful storage (like tiflash) to validate schema correctness if necessary.
SchemaVar int64
StartTs uint64
ID int64 // identify a single task
State MppTaskStates
SchemaVar int64
StartTs uint64
MppQueryID MPPQueryID
ID int64 // identify a single task
State MppTaskStates
}

// MPPClient accepts and processes mpp requests.
@@ -83,7 +95,7 @@ type MPPClient interface {
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64) Response
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID MPPQueryID) Response
}

// MPPBuildTasksRequest request the stores allocation for a mpp plan fragment.
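
The new (QueryTs, LocalQueryID, ServerID) triple is what makes an MPP query globally unique: QueryTs drives TiFlash's min-TSO scheduling, LocalQueryID distinguishes queries within one tidb-server, ServerID distinguishes tidb-servers, and ToPB forwards all three in the mpp.TaskMeta. Below is a small sketch of how the parts combine, with the struct redefined locally and a hypothetical newMPPQueryID helper; in the real code the value is assembled in buildMPPGather from getMPPQueryTS, getMPPQueryID and the domain's ServerID.

```go
package main

import (
	"fmt"
	"time"
)

// MPPQueryID mirrors the struct added to kv/mpp.go; it is redefined here only
// so that this sketch compiles on its own.
type MPPQueryID struct {
	QueryTs      uint64 // query start timestamp, used for TiFlash min-TSO scheduling
	LocalQueryID uint64 // per-tidb-server query counter
	ServerID     uint64 // which tidb-server issued the query
}

// newMPPQueryID is a hypothetical helper; the real assembly happens in
// buildMPPGather via getMPPQueryTS, getMPPQueryID and the domain's ServerID.
func newMPPQueryID(localID, serverID uint64) MPPQueryID {
	return MPPQueryID{
		QueryTs:      uint64(time.Now().UnixNano()),
		LocalQueryID: localID,
		ServerID:     serverID,
	}
}

func main() {
	a := newMPPQueryID(1, 17)
	b := newMPPQueryID(2, 17) // next query on the same tidb-server
	// Even if two queries share a timestamp, the full triple still differs,
	// so TiFlash can key task registration and scheduling on it.
	fmt.Println(a == b) // false
}
```
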
[Diffs for the remaining 6 changed files are not shown in this view.]
