Skip to content

Commit

Permalink
kv request: configurable KV Timeout (#45601) (#46274)
Browse files Browse the repository at this point in the history
* kv request: configurable KV Timeout (#45601)

close #45380

* update go.mod

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine code

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* make bazel_prepare

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* update go.mod

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* update go.mod

Signed-off-by: crazycs520 <crazycs520@gmail.com>

---------

Signed-off-by: crazycs520 <crazycs520@gmail.com>
Co-authored-by: Chen Ding <dingc05@gmail.com>
  • Loading branch information
2 people authored and cfzjywxk committed Aug 29, 2023
1 parent d7ce2f2 commit 5dc9f2d
Show file tree
Hide file tree
Showing 33 changed files with 1,098 additions and 758 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3703,8 +3703,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:xL9X0pr5wZn8onnW6i98sf7KjaZmk0OIQnDhW1z+hRQ=",
version = "v2.0.4-0.20230817162610-e5fe1779769d",
sum = "h1:M6K9eYuCjDqfXZT0xlyxRsw6IlzPRBolTNFTxTnJFlk=",
version = "v2.0.4-0.20230823062051-25cbaa30b4be",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
10 changes: 7 additions & 3 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,11 +681,11 @@ func TestRuntimeHintsInEvolveTasks(t *testing.T) {
tk.MustExec("create table t(a int, b int, c int, index idx_a(a), index idx_b(b), index idx_c(c))")

tk.MustExec("create global binding for select * from t where a >= 1 and b >= 1 and c = 0 using select * from t use index(idx_a) where a >= 1 and b >= 1 and c = 0")
tk.MustQuery("select /*+ MAX_EXECUTION_TIME(5000) */ * from t where a >= 4 and b >= 1 and c = 0")
tk.MustQuery("select /*+ MAX_EXECUTION_TIME(5000), TIDB_KV_READ_TIMEOUT(20) */ * from t where a >= 4 and b >= 1 and c = 0")
tk.MustExec("admin flush bindings")
rows := tk.MustQuery("show global bindings").Rows()
require.Len(t, rows, 2)
require.Equal(t, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idx_c`), max_execution_time(5000)*/ * FROM `test`.`t` WHERE `a` >= 4 AND `b` >= 1 AND `c` = 0", rows[0][1])
require.Equal(t, "SELECT /*+ use_index(@`sel_1` `test`.`t` `idx_c`), no_order_index(@`sel_1` `test`.`t` `idx_c`), max_execution_time(5000), tidb_kv_read_timeout(20)*/ * FROM `test`.`t` WHERE `a` >= 4 AND `b` >= 1 AND `c` = 0", rows[0][1])
}

func TestDefaultSessionVars(t *testing.T) {
Expand Down Expand Up @@ -745,13 +745,15 @@ func TestStmtHints(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, index idx(a))")
tk.MustExec("create global binding for select * from t using select /*+ MAX_EXECUTION_TIME(100), MEMORY_QUOTA(2 GB) */ * from t use index(idx)")
tk.MustExec("create global binding for select * from t using select /*+ MAX_EXECUTION_TIME(100), TIDB_KV_READ_TIMEOUT(20), MEMORY_QUOTA(2 GB) */ * from t use index(idx)")
tk.MustQuery("select * from t")
require.Equal(t, int64(2147483648), tk.Session().GetSessionVars().MemTracker.GetBytesLimit())
require.Equal(t, uint64(100), tk.Session().GetSessionVars().StmtCtx.MaxExecutionTime)
require.Equal(t, uint64(20), tk.Session().GetSessionVars().StmtCtx.TidbKvReadTimeout)
tk.MustQuery("select a, b from t")
require.Equal(t, int64(1073741824), tk.Session().GetSessionVars().MemTracker.GetBytesLimit())
require.Equal(t, uint64(0), tk.Session().GetSessionVars().StmtCtx.MaxExecutionTime)
require.Equal(t, uint64(0), tk.Session().GetSessionVars().StmtCtx.TidbKvReadTimeout)
}

func TestPrivileges(t *testing.T) {
Expand Down Expand Up @@ -1284,6 +1286,7 @@ func TestBindSQLDigest(t *testing.T) {
// runtime hints
{"select * from t", "select /*+ memory_quota(1024 MB) */ * from t"},
{"select * from t", "select /*+ max_execution_time(1000) */ * from t"},
{"select * from t", "select /*+ tidb_kv_read_timeout(1000) */ * from t"},
// storage hints
{"select * from t", "select /*+ read_from_storage(tikv[t]) */ * from t"},
// others
Expand Down Expand Up @@ -1345,6 +1348,7 @@ func TestDropBindBySQLDigest(t *testing.T) {
// runtime hints
{"select * from t", "select /*+ memory_quota(1024 MB) */ * from t"},
{"select * from t", "select /*+ max_execution_time(1000) */ * from t"},
{"select * from t", "select /*+ tidb_kv_read_timeout(1000) */ * from t"},
// storage hints
{"select * from t", "select /*+ read_from_storage(tikv[t]) */ * from t"},
// others
Expand Down
1 change: 1 addition & 0 deletions bindinfo/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,7 @@ func TestCaptureHints(t *testing.T) {
// runtime hints
{"select /*+ memory_quota(1024 MB) */ * from t", "memory_quota(1024 mb)"},
{"select /*+ max_execution_time(1000) */ * from t", "max_execution_time(1000)"},
{"select /*+ tidb_kv_read_timeout(1000) */ * from t", "tidb_kv_read_timeout(1000)"},
// storage hints
{"select /*+ read_from_storage(tikv[t]) */ * from t", "read_from_storage(tikv[`t`])"},
// others
Expand Down
1 change: 1 addition & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.RequestSource.RequestSourceInternal = sv.InRestrictedSQL
builder.RequestSource.RequestSourceType = sv.RequestSourceType
builder.StoreBatchSize = sv.StoreBatchSize
builder.Request.TidbKvReadTimeout = sv.GetTidbKvReadTimeout()
return builder
}

Expand Down
27 changes: 27 additions & 0 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,33 @@ func TestRequestBuilder8(t *testing.T) {
require.Equal(t, expect, actual)
}

func TestRequestBuilderTidbKvReadTimeout(t *testing.T) {
sv := variable.NewSessionVars(nil)
sv.TidbKvReadTimeout = 100
actual, err := (&RequestBuilder{}).
SetFromSessionVars(sv).
Build()
require.NoError(t, err)
expect := &kv.Request{
Tp: 0,
StartTs: 0x0,
Data: []uint8(nil),
KeyRanges: kv.NewNonParitionedKeyRanges(nil),
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
MemTracker: (*memory.Tracker)(nil),
SchemaVar: 0,
ReadReplicaScope: kv.GlobalReplicaScope,
TidbKvReadTimeout: 100,
ResourceGroupName: resourcegroup.DefaultResourceGroupName,
}
expect.Paging.MinPagingSize = paging.MinPagingSize
expect.Paging.MaxPagingSize = paging.MaxPagingSize
actual.ResourceGroupTagger = nil
require.Equal(t, expect, actual)
}

func TestTableRangesToKVRangesWithFbs(t *testing.T) {
ranges := []*ranger.Range{
{
Expand Down
3 changes: 2 additions & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1729,7 +1729,8 @@ func getEncodedPlan(stmtCtx *stmtctx.StatementContext, genHint bool) (encodedPla
// some hints like 'memory_quota' cannot be extracted from the PhysicalPlan directly,
// so we have to iterate all hints from the customer and keep some other necessary hints.
switch tableHint.HintName.L {
case "memory_quota", "use_toja", "no_index_merge", "max_execution_time",
case plannercore.HintMemoryQuota, plannercore.HintUseToja, plannercore.HintNoIndexMerge,
plannercore.HintMaxExecutionTime, plannercore.HintTidbKvReadTimeout,
plannercore.HintAggToCop, plannercore.HintIgnoreIndex,
plannercore.HintReadFromStorage, plannercore.HintLimitToCop:
hints = append(hints, tableHint)
Expand Down
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,7 @@ func (b *executorBuilder) getSnapshot() (kv.Snapshot, error) {
replicaReadType := sessVars.GetReplicaRead()
snapshot.SetOption(kv.ReadReplicaScope, b.readReplicaScope)
snapshot.SetOption(kv.TaskID, sessVars.StmtCtx.TaskID)
snapshot.SetOption(kv.TidbKvReadTimeout, sessVars.GetTidbKvReadTimeout())

if replicaReadType.IsClosestRead() && b.readReplicaScope != kv.GlobalTxnScope {
snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
Expand Down
67 changes: 67 additions & 0 deletions executor/executor_failpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,3 +555,70 @@ func TestDeadlocksTable(t *testing.T) {
id2+"/2022-06-11 02:03:04.987654/1/203/<nil>/<nil>/<nil>/<nil>/201",
))
}

func TestTidbKvReadTimeout(t *testing.T) {
if *testkit.WithTiKV != "" {
t.Skip("skip test since it's only work for unistore")
}
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int primary key, b int)")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCDeadlineExceeded", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCDeadlineExceeded"))
}()
// Test for point_get request
rows := tk.MustQuery("explain analyze select /*+ tidb_kv_read_timeout(1) */ * from t where a = 1").Rows()
require.Len(t, rows, 1)
explain := fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*Point_Get.* Get:{num_rpc:2, total_time:.*", explain)

// Test for batch_point_get request
rows = tk.MustQuery("explain analyze select /*+ tidb_kv_read_timeout(1) */ * from t where a in (1,2)").Rows()
require.Len(t, rows, 1)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*Batch_Point_Get.* BatchGet:{num_rpc:2, total_time:.*", explain)

// Test for cop request
rows = tk.MustQuery("explain analyze select /*+ tidb_kv_read_timeout(1) */ * from t where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)

// Test for stale read.
tk.MustExec("set @a=now(6);")
tk.MustExec("set @@tidb_replica_read='closest-replicas';")
rows = tk.MustQuery("explain analyze select /*+ tidb_kv_read_timeout(1) */ * from t as of timestamp(@a) where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)

// Test for tidb_kv_read_timeout session variable.
tk.MustExec("set @@tidb_kv_read_timeout=1;")
// Test for point_get request
rows = tk.MustQuery("explain analyze select * from t where a = 1").Rows()
require.Len(t, rows, 1)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*Point_Get.* Get:{num_rpc:2, total_time:.*", explain)

// Test for batch_point_get request
rows = tk.MustQuery("explain analyze select * from t where a in (1,2)").Rows()
require.Len(t, rows, 1)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*Batch_Point_Get.* BatchGet:{num_rpc:2, total_time:.*", explain)

// Test for cop request
rows = tk.MustQuery("explain analyze select * from t where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)

// Test for stale read.
tk.MustExec("set @a=now(6);")
tk.MustExec("set @@tidb_replica_read='closest-replicas';")
rows = tk.MustQuery("explain analyze select * from t as of timestamp(@a) where b > 1").Rows()
require.Len(t, rows, 3)
explain = fmt.Sprintf("%v", rows[0])
require.Regexp(t, ".*TableReader.* root time:.*, loops:.* cop_task: {num: 1, .* rpc_num: 2.*", explain)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.4-0.20230817162610-e5fe1779769d
github.com/tikv/client-go/v2 v2.0.4-0.20230823062051-25cbaa30b4be
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -935,8 +935,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.4-0.20230817162610-e5fe1779769d h1:xL9X0pr5wZn8onnW6i98sf7KjaZmk0OIQnDhW1z+hRQ=
github.com/tikv/client-go/v2 v2.0.4-0.20230817162610-e5fe1779769d/go.mod h1:mmVCLP2OqWvQJPOIevQPZvGphzh/oq9vv8J5LDfpadQ=
github.com/tikv/client-go/v2 v2.0.4-0.20230823062051-25cbaa30b4be h1:M6K9eYuCjDqfXZT0xlyxRsw6IlzPRBolTNFTxTnJFlk=
github.com/tikv/client-go/v2 v2.0.4-0.20230823062051-25cbaa30b4be/go.mod h1:mmVCLP2OqWvQJPOIevQPZvGphzh/oq9vv8J5LDfpadQ=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,8 @@ type Request struct {
StoreBatchSize int
// LimitSize indicates whether the request is scan and limit
LimitSize uint64
// TidbKvReadTimeout is the timeout of kv read request
TidbKvReadTimeout uint64
}

// CoprRequestAdjuster is used to check and adjust a copr request according to specific rules.
Expand Down
2 changes: 2 additions & 0 deletions kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ const (
// and the next 8 bits are reserved for Lossy DDL reorg Backfill job.
// The remaining 48 bits are reserved for extendability.
TxnSource
// TidbKvReadTimeout sets the timeout value for readonly kv request in milliseconds
TidbKvReadTimeout
)

// ReplicaReadType is the type of replica to read data from
Expand Down
2 changes: 2 additions & 0 deletions parser/ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3721,6 +3721,8 @@ func (n *TableOptimizerHint) Restore(ctx *format.RestoreCtx) error {
switch n.HintName.L {
case "max_execution_time":
ctx.WritePlainf("%d", n.HintData.(uint64))
case "tidb_kv_read_timeout":
ctx.WritePlainf("%d", n.HintData.(uint64))
case "nth_plan":
ctx.WritePlainf("%d", n.HintData.(int64))
case "tidb_hj", "tidb_smj", "tidb_inlj", "hash_join", "hash_join_build", "hash_join_probe", "merge_join", "inl_join", "broadcast_join", "shuffle_join", "inl_hash_join", "inl_merge_join", "leading":
Expand Down
2 changes: 2 additions & 0 deletions parser/ast/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ func TestTableOptimizerHintRestore(t *testing.T) {
{"LEADING(t1@sel1, c1, t2)", "LEADING(`t1`@`sel1`, `c1`, `t2`)"},
{"MAX_EXECUTION_TIME(3000)", "MAX_EXECUTION_TIME(3000)"},
{"MAX_EXECUTION_TIME(@sel1 3000)", "MAX_EXECUTION_TIME(@`sel1` 3000)"},
{"TIDB_KV_READ_TIMEOUT(3000)", "TIDB_KV_READ_TIMEOUT(3000)"},
{"TIDB_KV_READ_TIMEOUT(@sel1 3000)", "TIDB_KV_READ_TIMEOUT(@`sel1` 3000)"},
{"USE_INDEX_MERGE(t1 c1)", "USE_INDEX_MERGE(`t1` `c1`)"},
{"USE_INDEX_MERGE(@sel1 t1 c1)", "USE_INDEX_MERGE(@`sel1` `t1` `c1`)"},
{"USE_INDEX_MERGE(t1@sel1 c1)", "USE_INDEX_MERGE(`t1`@`sel1` `c1`)"},
Expand Down
Loading

0 comments on commit 5dc9f2d

Please sign in to comment.