From 9848fca390b7bdc49a6f11715333f9606cdf8e18 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 27 Apr 2021 23:52:35 +0800 Subject: [PATCH 01/13] Implement read_ts_in built-in function Signed-off-by: JmPotato --- expression/builtin.go | 4 +- expression/builtin_time.go | 72 ++++++++++++++++++++++++++ expression/builtin_time_test.go | 64 +++++++++++++++++++++++ expression/builtin_time_vec.go | 72 ++++++++++++++++++++++++++ expression/builtin_time_vec_test.go | 6 +++ expression/integration_test.go | 70 +++++++++++++++++++++++++ go.mod | 2 + go.sum | 4 +- kv/interface_mock_test.go | 4 ++ kv/kv.go | 2 + store/helper/helper.go | 1 + store/mockstore/mockstorage/storage.go | 5 ++ store/tikv/kv.go | 39 ++++++++------ util/mock/store.go | 5 ++ 14 files changed, 330 insertions(+), 20 deletions(-) diff --git a/expression/builtin.go b/expression/builtin.go index 9c530f92949e1..f9d636c1fe662 100644 --- a/expression/builtin.go +++ b/expression/builtin.go @@ -687,6 +687,9 @@ var funcs = map[string]functionClass{ ast.Year: &yearFunctionClass{baseFunctionClass{ast.Year, 1, 1}}, ast.YearWeek: &yearWeekFunctionClass{baseFunctionClass{ast.YearWeek, 1, 2}}, ast.LastDay: &lastDayFunctionClass{baseFunctionClass{ast.LastDay, 1, 1}}, + // TSO functions + ast.ReadTSIn: &readTSInFunctionClass{baseFunctionClass{ast.ReadTSIn, 2, 2}}, + ast.TiDBParseTso: &tidbParseTsoFunctionClass{baseFunctionClass{ast.TiDBParseTso, 1, 1}}, // string functions ast.ASCII: &asciiFunctionClass{baseFunctionClass{ast.ASCII, 1, 1}}, @@ -881,7 +884,6 @@ var funcs = map[string]functionClass{ // This function is used to show tidb-server version info. ast.TiDBVersion: &tidbVersionFunctionClass{baseFunctionClass{ast.TiDBVersion, 0, 0}}, ast.TiDBIsDDLOwner: &tidbIsDDLOwnerFunctionClass{baseFunctionClass{ast.TiDBIsDDLOwner, 0, 0}}, - ast.TiDBParseTso: &tidbParseTsoFunctionClass{baseFunctionClass{ast.TiDBParseTso, 1, 1}}, ast.TiDBDecodePlan: &tidbDecodePlanFunctionClass{baseFunctionClass{ast.TiDBDecodePlan, 1, 1}}, // TiDB Sequence function. diff --git a/expression/builtin_time.go b/expression/builtin_time.go index 1d52cf6adc2c3..4907f3d45eccf 100644 --- a/expression/builtin_time.go +++ b/expression/builtin_time.go @@ -27,6 +27,7 @@ import ( "github.com/cznic/mathutil" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" @@ -7113,3 +7114,74 @@ func handleInvalidZeroTime(ctx sessionctx.Context, t types.Time) (bool, error) { } return true, handleInvalidTimeError(ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, t.String())) } + +// readTSInFunctionClass reads a time window [a, b] and compares it with the latest resolvedTS +// to determine which TS to use in a read only transaction. +type readTSInFunctionClass struct { + baseFunctionClass +} + +func (c *readTSInFunctionClass) getFunction(ctx sessionctx.Context, args []Expression) (builtinFunc, error) { + if err := c.verifyArgs(args); err != nil { + return nil, err + } + bf, err := newBaseBuiltinFuncWithTp(ctx, c.funcName, args, types.ETInt, types.ETDatetime, types.ETDatetime) + if err != nil { + return nil, err + } + sig := &builtinReadTSInSig{bf} + return sig, nil +} + +type builtinReadTSInSig struct { + baseBuiltinFunc +} + +func (b *builtinReadTSInSig) Clone() builtinFunc { + newSig := &builtinTidbParseTsoSig{} + newSig.cloneFrom(&b.baseBuiltinFunc) + return newSig +} + +func (b *builtinReadTSInSig) evalInt(row chunk.Row) (int64, bool, error) { + leftTime, isNull, err := b.args[0].EvalTime(b.ctx, row) + if isNull || err != nil { + return 0, true, handleInvalidTimeError(b.ctx, err) + } + rightTime, isNull, err := b.args[1].EvalTime(b.ctx, row) + if isNull || err != nil { + return 0, true, handleInvalidTimeError(b.ctx, err) + } + if invalidLeftTime, invalidRightTime := leftTime.InvalidZero(), rightTime.InvalidZero(); invalidLeftTime || invalidRightTime { + if invalidLeftTime { + err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, leftTime.String())) + } + if invalidRightTime { + err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, rightTime.String())) + } + return 0, true, err + } + minTime, err := leftTime.GoTime(b.ctx.GetSessionVars().TimeZone) + if err != nil { + return 0, true, err + } + maxTime, err := rightTime.GoTime(b.ctx.GetSessionVars().TimeZone) + if err != nil { + return 0, true, err + } + minTS, maxTS := oracle.ComposeTS(minTime.UnixNano()/int64(time.Millisecond), 0), oracle.ComposeTS(maxTime.UnixNano()/int64(time.Millisecond), 0) + var minResolveTS uint64 + if store := b.ctx.GetStore(); store != nil { + minResolveTS = store.GetMinResolveTS(b.ctx.GetSessionVars().CheckAndGetTxnScope()) + } + failpoint.Inject("injectResolveTS", func(val failpoint.Value) { + injectTS := val.(int) + minResolveTS = uint64(injectTS) + }) + if minResolveTS < minTS { + return int64(minTS), false, nil + } else if min <= minResolveTS && minResolveTS <= maxTS { + return int64(minResolveTS), false, nil + } + return int64(maxTS), false, nil +} diff --git a/expression/builtin_time_test.go b/expression/builtin_time_test.go index 97152bf81a6cb..1775110837542 100644 --- a/expression/builtin_time_test.go +++ b/expression/builtin_time_test.go @@ -14,12 +14,14 @@ package expression import ( + "fmt" "math" "strings" "time" . "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/charset" "github.com/pingcap/parser/mysql" @@ -27,6 +29,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/mock" @@ -2862,6 +2865,67 @@ func (s *testEvaluatorSuite) TestTidbParseTso(c *C) { } } +func (s *testEvaluatorSuite) TestReadInTS(c *C) { + const timeParserLayout = "2006-01-02 15:04:05.000" + t1, err := time.Parse(timeParserLayout, "2015-09-21 09:53:04.877") + c.Assert(err, IsNil) + t1Str := t1.Format(timeParserLayout) + ts1 := int64(oracle.ComposeTS(t1.UnixNano()/int64(time.Millisecond), 0)) + t2 := time.Now().UTC() + t2Str := t2.Format(timeParserLayout) + ts2 := int64(oracle.ComposeTS(t2.UnixNano()/int64(time.Millisecond), 0)) + s.ctx.GetSessionVars().TimeZone = time.UTC + tests := []struct { + leftTime interface{} + rightTime interface{} + injectResolveTS uint64 + expect int64 + }{ + { + leftTime: t1Str, + rightTime: t2Str, + injectResolveTS: func() uint64 { + phy := t2.Add(-1*time.Second).UnixNano() / int64(time.Millisecond) + return oracle.ComposeTS(phy, 0) + }(), + expect: func() int64 { + phy := t2.Add(-1*time.Second).UnixNano() / int64(time.Millisecond) + return int64(oracle.ComposeTS(phy, 0)) + }(), + }, + { + leftTime: t1Str, + rightTime: t2Str, + injectResolveTS: func() uint64 { + phy := t1.Add(-1*time.Second).UnixNano() / int64(time.Millisecond) + return oracle.ComposeTS(phy, 0) + }(), + expect: ts1, + }, + { + leftTime: t1Str, + rightTime: t2Str, + injectResolveTS: func() uint64 { + phy := t2.Add(time.Second).UnixNano() / int64(time.Millisecond) + return oracle.ComposeTS(phy, 0) + }(), + expect: ts2, + }, + } + + fc := funcs[ast.ReadTSIn] + for _, test := range tests { + c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectResolveTS", + fmt.Sprintf("return(%v)", test.injectResolveTS)), IsNil) + f, err := fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(test.leftTime), types.NewDatum(test.rightTime)})) + c.Assert(err, IsNil) + d, err := evalBuiltinFunc(f, chunk.Row{}) + c.Assert(err, IsNil) + c.Assert(d.GetInt64(), Equals, test.expect) + failpoint.Disable("github.com/pingcap/tidb/expression/injectResolveTS") + } +} + func (s *testEvaluatorSuite) TestGetIntervalFromDecimal(c *C) { du := baseDateArithmitical{} diff --git a/expression/builtin_time_vec.go b/expression/builtin_time_vec.go index 94c1cd8b6f0c4..aa56d3492251e 100644 --- a/expression/builtin_time_vec.go +++ b/expression/builtin_time_vec.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/sessionctx/variable" @@ -854,6 +855,77 @@ func (b *builtinTidbParseTsoSig) vecEvalTime(input *chunk.Chunk, result *chunk.C return nil } +func (b *builtinReadTSInSig) vectorized() bool { + return true +} + +func (b *builtinReadTSInSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error { + n := input.NumRows() + buf0, err := b.bufAllocator.get(types.ETDatetime, n) + if err != nil { + return err + } + defer b.bufAllocator.put(buf0) + if err = b.args[0].VecEvalTime(b.ctx, input, buf0); err != nil { + return err + } + buf1, err := b.bufAllocator.get(types.ETDatetime, n) + if err != nil { + return err + } + defer b.bufAllocator.put(buf1) + if err = b.args[1].VecEvalTime(b.ctx, input, buf1); err != nil { + return err + } + result.ResizeInt64(n, false) + result.MergeNulls(buf0, buf1) + args0 := buf0.Times() + args1 := buf1.Times() + i64s := result.Int64s() + for i := 0; i < n; i++ { + if result.IsNull(i) { + continue + } + if invalidArg0, invalidArg1 := args0[i].InvalidZero(), args1[i].InvalidZero(); invalidArg0 || invalidArg1 { + if invalidArg0 { + err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, args0[i].String())) + } + if invalidArg1 { + err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, args1[i].String())) + } + if err != nil { + return err + } + result.SetNull(i, true) + continue + } + minTime, err := args0[i].GoTime(b.ctx.GetSessionVars().TimeZone) + if err != nil { + return err + } + maxTime, err := args1[i].GoTime(b.ctx.GetSessionVars().TimeZone) + if err != nil { + return err + } + minTS, maxTS := oracle.ComposeTS(minTime.UnixNano()/int64(time.Millisecond), 0), oracle.ComposeTS(maxTime.UnixNano()/int64(time.Millisecond), 0) + var minResolveTS uint64 + if store := b.ctx.GetStore(); store != nil { + minResolveTS = store.GetMinResolveTS(b.ctx.GetSessionVars().CheckAndGetTxnScope()) + } + failpoint.Inject("injectResolveTS", func(val failpoint.Value) { + injectTS := val.(int) + minResolveTS = uint64(injectTS) + }) + if minResolveTS < minTS { + i64s[i] = int64(minTS) + } else if min <= minResolveTS && minResolveTS <= maxTS { + i64s[i] = int64(minResolveTS) + } + i64s[i] = int64(maxTS) + } + return nil +} + func (b *builtinFromDaysSig) vectorized() bool { return true } diff --git a/expression/builtin_time_vec_test.go b/expression/builtin_time_vec_test.go index 593cce162d7ff..00a02982e404e 100644 --- a/expression/builtin_time_vec_test.go +++ b/expression/builtin_time_vec_test.go @@ -519,6 +519,12 @@ var vecBuiltinTimeCases = map[string][]vecExprBenchCase{ geners: []dataGenerator{newRangeInt64Gener(0, math.MaxInt64)}, }, }, + ast.ReadTSIn: { + { + retEvalType: types.ETInt, + childrenTypes: []types.EvalType{types.ETDatetime, types.ETDatetime}, + }, + }, ast.LastDay: { {retEvalType: types.ETDatetime, childrenTypes: []types.EvalType{types.ETDatetime}}, }, diff --git a/expression/integration_test.go b/expression/integration_test.go index c5b3bdd6d25aa..31bfe5e1ba502 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -40,6 +40,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" @@ -2262,6 +2263,75 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { result = tk.MustQuery(`select tidb_parse_tso(-1)`) result.Check(testkit.Rows("")) + // for read_ts_in + tk.MustExec("SET time_zone = '+00:00';") + t := time.Now().UTC() + ts := oracle.ComposeTS(t.UnixNano()/int64(time.Millisecond), 0) + readTSInTests := []struct { + sql string + injectResolveTS uint64 + isNull bool + expect int64 + }{ + { + sql: `select read_ts_in(DATE_SUB(NOW(), INTERVAL 600 SECOND), DATE_ADD(NOW(), INTERVAL 600 SECOND))`, + injectResolveTS: ts, + isNull: false, + expect: int64(ts), + }, + { + sql: `select read_ts_in("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, + injectResolveTS: func() uint64 { + phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 13:30:04.877") + c.Assert(err, IsNil) + return oracle.ComposeTS(phy.UnixNano()/int64(time.Millisecond), 0) + }(), + isNull: false, + expect: func() int64 { + phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 13:00:00.000") + c.Assert(err, IsNil) + return int64(oracle.ComposeTS(phy.UnixNano()/int64(time.Millisecond), 0)) + }(), + }, + { + sql: `select read_ts_in("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, + injectResolveTS: func() uint64 { + phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 11:30:04.877") + c.Assert(err, IsNil) + return oracle.ComposeTS(phy.UnixNano()/int64(time.Millisecond), 0) + }(), + isNull: false, + expect: func() int64 { + phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 12:00:00.000") + c.Assert(err, IsNil) + return int64(oracle.ComposeTS(phy.UnixNano()/int64(time.Millisecond), 0)) + }(), + }, + { + sql: `select read_ts_in(1, 2)`, + injectResolveTS: 0, + isNull: true, + expect: 0, + }, + { + sql: `select read_ts_in("invalid_time_1", "invalid_time_2")`, + injectResolveTS: 0, + isNull: true, + expect: 0, + }, + } + for _, test := range readTSInTests { + c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectResolveTS", + fmt.Sprintf("return(%v)", test.injectResolveTS)), IsNil) + result = tk.MustQuery(test.sql) + if test.isNull { + result.Check(testkit.Rows("")) + } else { + result.Check(testkit.Rows(fmt.Sprintf("%d", test.expect))) + } + failpoint.Disable("github.com/pingcap/tidb/expression/injectResolveTS") + } + // fix issue 10308 result = tk.MustQuery("select time(\"- -\");") result.Check(testkit.Rows("00:00:00")) diff --git a/go.mod b/go.mod index 3fc648111c65d..6c8df0ecf39b7 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,7 @@ module github.com/pingcap/tidb +replace github.com/pingcap/parser v0.0.0-20210421190254-588138d35e55 => github.com/JmPotato/parser v0.0.0-20210427035105-f41eb27a4a0c + require ( github.com/BurntSushi/toml v0.3.1 github.com/DATA-DOG/go-sqlmock v1.5.0 // indirect diff --git a/go.sum b/go.sum index a4450303c0621..82f3126ca3e8a 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,8 @@ github.com/HdrHistogram/hdrhistogram-go v0.9.0 h1:dpujRju0R4M/QZzcnR1LH1qm+TVG3U github.com/HdrHistogram/hdrhistogram-go v0.9.0/go.mod h1:nxrse8/Tzg2tg3DZcZjm6qEclQKK70g0KxO61gFFZD4= github.com/Jeffail/gabs/v2 v2.5.1 h1:ANfZYjpMlfTTKebycu4X1AgkVWumFVDYQl7JwOr4mDk= github.com/Jeffail/gabs/v2 v2.5.1/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI= +github.com/JmPotato/parser v0.0.0-20210427035105-f41eb27a4a0c h1:5+IaTm0Y02tgky2m5MVwzuNW4PKcxT+BeJq6EAHH6tQ= +github.com/JmPotato/parser v0.0.0-20210427035105-f41eb27a4a0c/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= @@ -443,8 +445,6 @@ github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIf github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= -github.com/pingcap/parser v0.0.0-20210421190254-588138d35e55 h1:J/NfwCFFPCv7h44ft+2pS3KiMyvOkHprPM5NhDJEoHc= -github.com/pingcap/parser v0.0.0-20210421190254-588138d35e55/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 h1:A9KL9R+lWSVPH8IqUuH1QSTRJ5FGoY1bT2IcfPKsWD8= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8= diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index 2388c4f48b9f3..99c0991988e2c 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -217,6 +217,10 @@ func (s *mockStorage) GetMemCache() MemManager { return nil } +func (s *mockStorage) GetMinResolveTS(txnScope string) uint64 { + return 0 +} + // newMockStorage creates a new mockStorage. func newMockStorage() Storage { return &mockStorage{} diff --git a/kv/kv.go b/kv/kv.go index 711dd4e3ee860..0a81c8e4c69ac 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -411,6 +411,8 @@ type Storage interface { ShowStatus(ctx context.Context, key string) (interface{}, error) // GetMemCache return memory manager of the storage. GetMemCache() MemManager + // GetMinResolveTS return the minimal resolved TS of the storage with given txnScope. + GetMinResolveTS(txnScope string) uint64 } // EtcdBackend is used for judging a storage is a real TiKV. diff --git a/store/helper/helper.go b/store/helper/helper.go index e96ad4ae21851..056620bfb94df 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -71,6 +71,7 @@ type Storage interface { SetTiKVClient(client tikv.Client) GetTiKVClient() tikv.Client Closed() <-chan struct{} + GetMinResolveTS(txnScope string) uint64 } // Helper is a middleware to get some information from tikv/pd. It can be used for TiDB's http api or mem table. diff --git a/store/mockstore/mockstorage/storage.go b/store/mockstore/mockstorage/storage.go index 6a47d3b523421..ee2c556b4dfb6 100644 --- a/store/mockstore/mockstorage/storage.go +++ b/store/mockstore/mockstorage/storage.go @@ -106,6 +106,11 @@ func (s *mockStorage) CurrentVersion(txnScope string) (kv.Version, error) { return kv.NewVersion(ver), err } +// GetMinResolveTS return the minimal resolved TS of the storage with given txnScope. +func (s *mockStorage) GetMinResolveTS(txnScope string) uint64 { + return 0 +} + func newTiKVTxn(txn *tikv.KVTxn, err error) (kv.Transaction, error) { if err != nil { return nil, err diff --git a/store/tikv/kv.go b/store/tikv/kv.go index e848c54fb8a16..3e13e14dbb54b 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -214,23 +214,7 @@ func (s *KVStore) BeginWithExactStaleness(txnScope string, prevSec uint64) (*KVT // BeginWithMinStartTS begins transaction with the least startTS func (s *KVStore) BeginWithMinStartTS(txnScope string, minStartTS uint64) (*KVTxn, error) { - stores := make([]*Store, 0) - allStores := s.regionCache.getStoresByType(tikvrpc.TiKV) - if txnScope != oracle.GlobalTxnScope { - for _, store := range allStores { - if store.IsLabelsMatch([]*metapb.StoreLabel{ - { - Key: DCLabelKey, - Value: txnScope, - }, - }) { - stores = append(stores, store) - } - } - } else { - stores = allStores - } - resolveTS := s.getMinResolveTSByStores(stores) + resolveTS := s.GetMinResolveTS(txnScope) startTS := minStartTS // If the resolveTS is larger than the minStartTS, we will use resolveTS as StartTS, otherwise we will use // minStartTS directly. @@ -400,6 +384,27 @@ func (s *KVStore) GetTiKVClient() (client Client) { return s.client } +// GetMinResolveTS return the minimal resolved TS of the storage with given txnScope. +func (s *KVStore) GetMinResolveTS(txnScope string) uint64 { + stores := make([]*Store, 0) + allStores := s.regionCache.getStoresByType(tikvrpc.TiKV) + if txnScope != oracle.GlobalTxnScope { + for _, store := range allStores { + if store.IsLabelsMatch([]*metapb.StoreLabel{ + { + Key: DCLabelKey, + Value: txnScope, + }, + }) { + stores = append(stores, store) + } + } + } else { + stores = allStores + } + return s.getMinResolveTSByStores(stores) +} + func (s *KVStore) getMinResolveTSByStores(stores []*Store) uint64 { failpoint.Inject("injectResolveTS", func(val failpoint.Value) { injectTS := val.(int) diff --git a/util/mock/store.go b/util/mock/store.go index 804f3d6a3f2d3..470200c47d915 100644 --- a/util/mock/store.go +++ b/util/mock/store.go @@ -72,3 +72,8 @@ func (s *Store) GetMemCache() kv.MemManager { // ShowStatus implements kv.Storage interface. func (s *Store) ShowStatus(ctx context.Context, key string) (interface{}, error) { return nil, nil } + +// GetMinResolveTS implements kv.Storage interface. +func (s *Store) GetMinResolveTS(txnScope string) uint64 { + return 0 +} From fee3489555e500a615b88827157ba4cd97c2226e Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 28 Apr 2021 01:11:47 +0800 Subject: [PATCH 02/13] Fix the test Signed-off-by: JmPotato --- expression/builtin_time.go | 7 +++++-- expression/builtin_time_test.go | 23 +++++++++++++++++++++-- expression/builtin_time_vec.go | 12 +++++++++--- expression/builtin_time_vec_test.go | 11 +++++++---- expression/integration_test.go | 6 ++++++ 5 files changed, 48 insertions(+), 11 deletions(-) diff --git a/expression/builtin_time.go b/expression/builtin_time.go index 4907f3d45eccf..63ad7cca3ab77 100644 --- a/expression/builtin_time.go +++ b/expression/builtin_time.go @@ -7161,14 +7161,17 @@ func (b *builtinReadTSInSig) evalInt(row chunk.Row) (int64, bool, error) { } return 0, true, err } - minTime, err := leftTime.GoTime(b.ctx.GetSessionVars().TimeZone) + minTime, err := leftTime.GoTime(getTimeZone(b.ctx)) if err != nil { return 0, true, err } - maxTime, err := rightTime.GoTime(b.ctx.GetSessionVars().TimeZone) + maxTime, err := rightTime.GoTime(getTimeZone(b.ctx)) if err != nil { return 0, true, err } + if minTime.After(maxTime) { + return 0, true, handleInvalidTimeError(b.ctx, types.ErrWrongValue.FastGenByArgs("left time must be less then the right time")) + } minTS, maxTS := oracle.ComposeTS(minTime.UnixNano()/int64(time.Millisecond), 0), oracle.ComposeTS(maxTime.UnixNano()/int64(time.Millisecond), 0) var minResolveTS uint64 if store := b.ctx.GetStore(); store != nil { diff --git a/expression/builtin_time_test.go b/expression/builtin_time_test.go index 1775110837542..458555dc723c3 100644 --- a/expression/builtin_time_test.go +++ b/expression/builtin_time_test.go @@ -2865,7 +2865,7 @@ func (s *testEvaluatorSuite) TestTidbParseTso(c *C) { } } -func (s *testEvaluatorSuite) TestReadInTS(c *C) { +func (s *testEvaluatorSuite) TestReadTSIn(c *C) { const timeParserLayout = "2006-01-02 15:04:05.000" t1, err := time.Parse(timeParserLayout, "2015-09-21 09:53:04.877") c.Assert(err, IsNil) @@ -2879,8 +2879,10 @@ func (s *testEvaluatorSuite) TestReadInTS(c *C) { leftTime interface{} rightTime interface{} injectResolveTS uint64 + isNull bool expect int64 }{ + // ResolveTS is in the range. { leftTime: t1Str, rightTime: t2Str, @@ -2888,11 +2890,13 @@ func (s *testEvaluatorSuite) TestReadInTS(c *C) { phy := t2.Add(-1*time.Second).UnixNano() / int64(time.Millisecond) return oracle.ComposeTS(phy, 0) }(), + isNull: false, expect: func() int64 { phy := t2.Add(-1*time.Second).UnixNano() / int64(time.Millisecond) return int64(oracle.ComposeTS(phy, 0)) }(), }, + // ResolveTS is less than the left time. { leftTime: t1Str, rightTime: t2Str, @@ -2900,8 +2904,10 @@ func (s *testEvaluatorSuite) TestReadInTS(c *C) { phy := t1.Add(-1*time.Second).UnixNano() / int64(time.Millisecond) return oracle.ComposeTS(phy, 0) }(), + isNull: false, expect: ts1, }, + // ResolveTS is bigger than the right time. { leftTime: t1Str, rightTime: t2Str, @@ -2909,8 +2915,17 @@ func (s *testEvaluatorSuite) TestReadInTS(c *C) { phy := t2.Add(time.Second).UnixNano() / int64(time.Millisecond) return oracle.ComposeTS(phy, 0) }(), + isNull: false, expect: ts2, }, + // Wrong time order. + { + leftTime: t2Str, + rightTime: t1Str, + injectResolveTS: 0, + isNull: true, + expect: 0, + }, } fc := funcs[ast.ReadTSIn] @@ -2921,7 +2936,11 @@ func (s *testEvaluatorSuite) TestReadInTS(c *C) { c.Assert(err, IsNil) d, err := evalBuiltinFunc(f, chunk.Row{}) c.Assert(err, IsNil) - c.Assert(d.GetInt64(), Equals, test.expect) + if test.isNull { + c.Assert(d.IsNull(), IsTrue) + } else { + c.Assert(d.GetInt64(), Equals, test.expect) + } failpoint.Disable("github.com/pingcap/tidb/expression/injectResolveTS") } } diff --git a/expression/builtin_time_vec.go b/expression/builtin_time_vec.go index aa56d3492251e..08e7e9e211d9a 100644 --- a/expression/builtin_time_vec.go +++ b/expression/builtin_time_vec.go @@ -893,20 +893,26 @@ func (b *builtinReadTSInSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column if invalidArg1 { err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, args1[i].String())) } + result.SetNull(i, true) if err != nil { return err } - result.SetNull(i, true) continue } - minTime, err := args0[i].GoTime(b.ctx.GetSessionVars().TimeZone) + minTime, err := args0[i].GoTime(getTimeZone(b.ctx)) if err != nil { + result.SetNull(i, true) return err } - maxTime, err := args1[i].GoTime(b.ctx.GetSessionVars().TimeZone) + maxTime, err := args1[i].GoTime(getTimeZone(b.ctx)) if err != nil { + result.SetNull(i, true) return err } + if minTime.After(maxTime) { + result.SetNull(i, true) + return handleInvalidTimeError(b.ctx, types.ErrWrongValue.FastGenByArgs("left time must be less then the right time")) + } minTS, maxTS := oracle.ComposeTS(minTime.UnixNano()/int64(time.Millisecond), 0), oracle.ComposeTS(maxTime.UnixNano()/int64(time.Millisecond), 0) var minResolveTS uint64 if store := b.ctx.GetStore(); store != nil { diff --git a/expression/builtin_time_vec_test.go b/expression/builtin_time_vec_test.go index 00a02982e404e..b4929df706a8e 100644 --- a/expression/builtin_time_vec_test.go +++ b/expression/builtin_time_vec_test.go @@ -520,10 +520,13 @@ var vecBuiltinTimeCases = map[string][]vecExprBenchCase{ }, }, ast.ReadTSIn: { - { - retEvalType: types.ETInt, - childrenTypes: []types.EvalType{types.ETDatetime, types.ETDatetime}, - }, + // Because there is a chance that a time error will cause the test to fail, + // we cannot use the vectorized test framework to test builtinReadTSInSig. + // We test the builtinReadTSInSig in TestReadTSIn function. + // { + // retEvalType: types.ETInt, + // childrenTypes: []types.EvalType{types.ETDatetime, types.ETDatetime}, + // }, }, ast.LastDay: { {retEvalType: types.ETDatetime, childrenTypes: []types.EvalType{types.ETDatetime}}, diff --git a/expression/integration_test.go b/expression/integration_test.go index 31bfe5e1ba502..456b5825bd7ab 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -2307,6 +2307,12 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { return int64(oracle.ComposeTS(phy.UnixNano()/int64(time.Millisecond), 0)) }(), }, + { + sql: `select read_ts_in("2021-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, + injectResolveTS: 0, + isNull: true, + expect: 0, + }, { sql: `select read_ts_in(1, 2)`, injectResolveTS: 0, From 1d71ccf3d147fbbe3efa9f339b3fff3e7d0428d5 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 28 Apr 2021 10:55:56 +0800 Subject: [PATCH 03/13] Fix TestShowBuiltin Signed-off-by: JmPotato --- executor/show_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/executor/show_test.go b/executor/show_test.go index 7bebc1843db76..5722430ea661c 100644 --- a/executor/show_test.go +++ b/executor/show_test.go @@ -1102,9 +1102,10 @@ func (s *testSuite5) TestShowBuiltin(c *C) { res := tk.MustQuery("show builtins;") c.Assert(res, NotNil) rows := res.Rows() - c.Assert(268, Equals, len(rows)) + const builtinFuncNum = 269 + c.Assert(builtinFuncNum, Equals, len(rows)) c.Assert("abs", Equals, rows[0][0].(string)) - c.Assert("yearweek", Equals, rows[267][0].(string)) + c.Assert("yearweek", Equals, rows[builtinFuncNum-1][0].(string)) } func (s *testSuite5) TestShowClusterConfig(c *C) { From d4cf88ef237206e6773956f70c20fbf67e307971 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 28 Apr 2021 14:47:14 +0800 Subject: [PATCH 04/13] Fix some bugs and refine the test Signed-off-by: JmPotato --- ddl/column_type_change_test.go | 2 +- expression/builtin_time.go | 31 ++++++++++++++++++++++++++--- expression/builtin_time_test.go | 12 +++++------ expression/builtin_time_vec.go | 17 +++++++++------- expression/builtin_time_vec_test.go | 11 ++++------ expression/integration_test.go | 18 ++++++++++++----- 6 files changed, 62 insertions(+), 29 deletions(-) diff --git a/ddl/column_type_change_test.go b/ddl/column_type_change_test.go index 402704401eff6..68ee059f47305 100644 --- a/ddl/column_type_change_test.go +++ b/ddl/column_type_change_test.go @@ -233,7 +233,7 @@ func (s *testColumnTypeChangeSuite) TestRollbackColumnTypeChangeBetweenInteger(c SQL := "alter table t modify column c2 int not null" _, err := tk.Exec(SQL) c.Assert(err, NotNil) - c.Assert(err.Error(), Equals, "[ddl:1]MockRollingBackInCallBack-none") + c.Assert(err.Error(), Equals, "[ddl:1]MockRollingBackInCallBack-queueing") assertRollBackedColUnchanged(c, tk) // Mock roll back at model.StateDeleteOnly. diff --git a/expression/builtin_time.go b/expression/builtin_time.go index 63ad7cca3ab77..5f5991d41d784 100644 --- a/expression/builtin_time.go +++ b/expression/builtin_time.go @@ -7169,10 +7169,14 @@ func (b *builtinReadTSInSig) evalInt(row chunk.Row) (int64, bool, error) { if err != nil { return 0, true, err } + // Make sure the time is not too big or small to prevent it from overflow later. + if !(checkTimeRange(minTime) && checkTimeRange(maxTime)) { + return 0, true, nil + } if minTime.After(maxTime) { - return 0, true, handleInvalidTimeError(b.ctx, types.ErrWrongValue.FastGenByArgs("left time must be less then the right time")) + return 0, true, nil } - minTS, maxTS := oracle.ComposeTS(minTime.UnixNano()/int64(time.Millisecond), 0), oracle.ComposeTS(maxTime.UnixNano()/int64(time.Millisecond), 0) + minTS, maxTS := oracle.ComposeTS(minTime.Unix()*1000, 0), oracle.ComposeTS(maxTime.Unix()*1000, 0) var minResolveTS uint64 if store := b.ctx.GetStore(); store != nil { minResolveTS = store.GetMinResolveTS(b.ctx.GetSessionVars().CheckAndGetTxnScope()) @@ -7181,10 +7185,31 @@ func (b *builtinReadTSInSig) evalInt(row chunk.Row) (int64, bool, error) { injectTS := val.(int) minResolveTS = uint64(injectTS) }) + // For a resolved TS t and a time range [t1, t2]: + // 1. If t < t1, we will use t1 as the result, + // and with it, a read request may fail because it's an unreached resolved TS. + // 2. If t1 <= t <= t2, we will use t as the result, and with it, + // a read request won't fail. + // 2. If t2 < t, we will use t2 as the result, + // and with it, a read request won't fail because it's bigger than the latest resolved TS. if minResolveTS < minTS { return int64(minTS), false, nil - } else if min <= minResolveTS && minResolveTS <= maxTS { + } else if minTS <= minResolveTS && minResolveTS <= maxTS { return int64(minResolveTS), false, nil } return int64(maxTS), false, nil } + +func checkTimeRange(t time.Time) bool { + unixT := t.Unix() + unixTMillisecond := unixT * 1000 + // Less than the unix timestamp zero or overflow after * 1000. + if unixT < 0 || unixTMillisecond < 0 { + return false + } + // Overflow after being composed to TS + if oracle.ComposeTS(unixTMillisecond, 0) < uint64(unixTMillisecond) { + return false + } + return true +} diff --git a/expression/builtin_time_test.go b/expression/builtin_time_test.go index 458555dc723c3..4f1b4cdee2367 100644 --- a/expression/builtin_time_test.go +++ b/expression/builtin_time_test.go @@ -2870,10 +2870,10 @@ func (s *testEvaluatorSuite) TestReadTSIn(c *C) { t1, err := time.Parse(timeParserLayout, "2015-09-21 09:53:04.877") c.Assert(err, IsNil) t1Str := t1.Format(timeParserLayout) - ts1 := int64(oracle.ComposeTS(t1.UnixNano()/int64(time.Millisecond), 0)) + ts1 := int64(oracle.ComposeTS(t1.Unix()*1000, 0)) t2 := time.Now().UTC() t2Str := t2.Format(timeParserLayout) - ts2 := int64(oracle.ComposeTS(t2.UnixNano()/int64(time.Millisecond), 0)) + ts2 := int64(oracle.ComposeTS(t2.Unix()*1000, 0)) s.ctx.GetSessionVars().TimeZone = time.UTC tests := []struct { leftTime interface{} @@ -2887,12 +2887,12 @@ func (s *testEvaluatorSuite) TestReadTSIn(c *C) { leftTime: t1Str, rightTime: t2Str, injectResolveTS: func() uint64 { - phy := t2.Add(-1*time.Second).UnixNano() / int64(time.Millisecond) + phy := t2.Add(-1*time.Second).Unix() * 1000 return oracle.ComposeTS(phy, 0) }(), isNull: false, expect: func() int64 { - phy := t2.Add(-1*time.Second).UnixNano() / int64(time.Millisecond) + phy := t2.Add(-1*time.Second).Unix() * 1000 return int64(oracle.ComposeTS(phy, 0)) }(), }, @@ -2901,7 +2901,7 @@ func (s *testEvaluatorSuite) TestReadTSIn(c *C) { leftTime: t1Str, rightTime: t2Str, injectResolveTS: func() uint64 { - phy := t1.Add(-1*time.Second).UnixNano() / int64(time.Millisecond) + phy := t1.Add(-1*time.Second).Unix() * 1000 return oracle.ComposeTS(phy, 0) }(), isNull: false, @@ -2912,7 +2912,7 @@ func (s *testEvaluatorSuite) TestReadTSIn(c *C) { leftTime: t1Str, rightTime: t2Str, injectResolveTS: func() uint64 { - phy := t2.Add(time.Second).UnixNano() / int64(time.Millisecond) + phy := t2.Add(time.Second).Unix() * 1000 return oracle.ComposeTS(phy, 0) }(), isNull: false, diff --git a/expression/builtin_time_vec.go b/expression/builtin_time_vec.go index 08e7e9e211d9a..61e23b3501bb8 100644 --- a/expression/builtin_time_vec.go +++ b/expression/builtin_time_vec.go @@ -893,27 +893,29 @@ func (b *builtinReadTSInSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column if invalidArg1 { err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, args1[i].String())) } - result.SetNull(i, true) if err != nil { return err } + result.SetNull(i, true) continue } minTime, err := args0[i].GoTime(getTimeZone(b.ctx)) if err != nil { - result.SetNull(i, true) return err } maxTime, err := args1[i].GoTime(getTimeZone(b.ctx)) if err != nil { - result.SetNull(i, true) return err } + if !(checkTimeRange(minTime) && checkTimeRange(maxTime)) { + result.SetNull(i, true) + continue + } if minTime.After(maxTime) { result.SetNull(i, true) - return handleInvalidTimeError(b.ctx, types.ErrWrongValue.FastGenByArgs("left time must be less then the right time")) + continue } - minTS, maxTS := oracle.ComposeTS(minTime.UnixNano()/int64(time.Millisecond), 0), oracle.ComposeTS(maxTime.UnixNano()/int64(time.Millisecond), 0) + minTS, maxTS := oracle.ComposeTS(minTime.Unix()*1000, 0), oracle.ComposeTS(maxTime.Unix()*1000, 0) var minResolveTS uint64 if store := b.ctx.GetStore(); store != nil { minResolveTS = store.GetMinResolveTS(b.ctx.GetSessionVars().CheckAndGetTxnScope()) @@ -924,10 +926,11 @@ func (b *builtinReadTSInSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column }) if minResolveTS < minTS { i64s[i] = int64(minTS) - } else if min <= minResolveTS && minResolveTS <= maxTS { + } else if minTS <= minResolveTS && minResolveTS <= maxTS { i64s[i] = int64(minResolveTS) + } else { + i64s[i] = int64(maxTS) } - i64s[i] = int64(maxTS) } return nil } diff --git a/expression/builtin_time_vec_test.go b/expression/builtin_time_vec_test.go index b4929df706a8e..00a02982e404e 100644 --- a/expression/builtin_time_vec_test.go +++ b/expression/builtin_time_vec_test.go @@ -520,13 +520,10 @@ var vecBuiltinTimeCases = map[string][]vecExprBenchCase{ }, }, ast.ReadTSIn: { - // Because there is a chance that a time error will cause the test to fail, - // we cannot use the vectorized test framework to test builtinReadTSInSig. - // We test the builtinReadTSInSig in TestReadTSIn function. - // { - // retEvalType: types.ETInt, - // childrenTypes: []types.EvalType{types.ETDatetime, types.ETDatetime}, - // }, + { + retEvalType: types.ETInt, + childrenTypes: []types.EvalType{types.ETDatetime, types.ETDatetime}, + }, }, ast.LastDay: { {retEvalType: types.ETDatetime, childrenTypes: []types.EvalType{types.ETDatetime}}, diff --git a/expression/integration_test.go b/expression/integration_test.go index 456b5825bd7ab..dc101d52fd2ea 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -2266,7 +2266,7 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { // for read_ts_in tk.MustExec("SET time_zone = '+00:00';") t := time.Now().UTC() - ts := oracle.ComposeTS(t.UnixNano()/int64(time.Millisecond), 0) + ts := oracle.ComposeTS(t.Unix()*1000, 0) readTSInTests := []struct { sql string injectResolveTS uint64 @@ -2284,13 +2284,13 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { injectResolveTS: func() uint64 { phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 13:30:04.877") c.Assert(err, IsNil) - return oracle.ComposeTS(phy.UnixNano()/int64(time.Millisecond), 0) + return oracle.ComposeTS(phy.Unix()*1000, 0) }(), isNull: false, expect: func() int64 { phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 13:00:00.000") c.Assert(err, IsNil) - return int64(oracle.ComposeTS(phy.UnixNano()/int64(time.Millisecond), 0)) + return int64(oracle.ComposeTS(phy.Unix()*1000, 0)) }(), }, { @@ -2298,13 +2298,13 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { injectResolveTS: func() uint64 { phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 11:30:04.877") c.Assert(err, IsNil) - return oracle.ComposeTS(phy.UnixNano()/int64(time.Millisecond), 0) + return oracle.ComposeTS(phy.Unix()*1000, 0) }(), isNull: false, expect: func() int64 { phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 12:00:00.000") c.Assert(err, IsNil) - return int64(oracle.ComposeTS(phy.UnixNano()/int64(time.Millisecond), 0)) + return int64(oracle.ComposeTS(phy.Unix()*1000, 0)) }(), }, { @@ -2313,6 +2313,14 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { isNull: true, expect: 0, }, + // Time is too small. + { + sql: `select read_ts_in("0020-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, + injectResolveTS: 0, + isNull: true, + expect: 0, + }, + // Wrong value. { sql: `select read_ts_in(1, 2)`, injectResolveTS: 0, From c9309ba90347839d78fb15a4e662f4b1f8a7d905 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 28 Apr 2021 15:52:52 +0800 Subject: [PATCH 05/13] Rename to tidb_bound_staleness Signed-off-by: JmPotato --- expression/builtin.go | 4 ++-- expression/builtin_time.go | 14 +++++++------- expression/builtin_time_test.go | 4 ++-- expression/builtin_time_vec.go | 4 ++-- expression/builtin_time_vec_test.go | 2 +- expression/integration_test.go | 20 ++++++++++---------- go.mod | 2 +- go.sum | 4 ++-- 8 files changed, 27 insertions(+), 27 deletions(-) diff --git a/expression/builtin.go b/expression/builtin.go index f9d636c1fe662..7f7a23c495893 100644 --- a/expression/builtin.go +++ b/expression/builtin.go @@ -688,8 +688,8 @@ var funcs = map[string]functionClass{ ast.YearWeek: &yearWeekFunctionClass{baseFunctionClass{ast.YearWeek, 1, 2}}, ast.LastDay: &lastDayFunctionClass{baseFunctionClass{ast.LastDay, 1, 1}}, // TSO functions - ast.ReadTSIn: &readTSInFunctionClass{baseFunctionClass{ast.ReadTSIn, 2, 2}}, - ast.TiDBParseTso: &tidbParseTsoFunctionClass{baseFunctionClass{ast.TiDBParseTso, 1, 1}}, + ast.TiDBBoundStaleness: &tidbBoundStalenessFunctionClass{baseFunctionClass{ast.TiDBBoundStaleness, 2, 2}}, + ast.TiDBParseTso: &tidbParseTsoFunctionClass{baseFunctionClass{ast.TiDBParseTso, 1, 1}}, // string functions ast.ASCII: &asciiFunctionClass{baseFunctionClass{ast.ASCII, 1, 1}}, diff --git a/expression/builtin_time.go b/expression/builtin_time.go index 5f5991d41d784..205df92d75827 100644 --- a/expression/builtin_time.go +++ b/expression/builtin_time.go @@ -7115,13 +7115,13 @@ func handleInvalidZeroTime(ctx sessionctx.Context, t types.Time) (bool, error) { return true, handleInvalidTimeError(ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, t.String())) } -// readTSInFunctionClass reads a time window [a, b] and compares it with the latest resolvedTS +// tidbBoundStalenessFunctionClass reads a time window [a, b] and compares it with the latest resolvedTS // to determine which TS to use in a read only transaction. -type readTSInFunctionClass struct { +type tidbBoundStalenessFunctionClass struct { baseFunctionClass } -func (c *readTSInFunctionClass) getFunction(ctx sessionctx.Context, args []Expression) (builtinFunc, error) { +func (c *tidbBoundStalenessFunctionClass) getFunction(ctx sessionctx.Context, args []Expression) (builtinFunc, error) { if err := c.verifyArgs(args); err != nil { return nil, err } @@ -7129,21 +7129,21 @@ func (c *readTSInFunctionClass) getFunction(ctx sessionctx.Context, args []Expre if err != nil { return nil, err } - sig := &builtinReadTSInSig{bf} + sig := &builtinTiDBBoundStalenessSig{bf} return sig, nil } -type builtinReadTSInSig struct { +type builtinTiDBBoundStalenessSig struct { baseBuiltinFunc } -func (b *builtinReadTSInSig) Clone() builtinFunc { +func (b *builtinTiDBBoundStalenessSig) Clone() builtinFunc { newSig := &builtinTidbParseTsoSig{} newSig.cloneFrom(&b.baseBuiltinFunc) return newSig } -func (b *builtinReadTSInSig) evalInt(row chunk.Row) (int64, bool, error) { +func (b *builtinTiDBBoundStalenessSig) evalInt(row chunk.Row) (int64, bool, error) { leftTime, isNull, err := b.args[0].EvalTime(b.ctx, row) if isNull || err != nil { return 0, true, handleInvalidTimeError(b.ctx, err) diff --git a/expression/builtin_time_test.go b/expression/builtin_time_test.go index 4f1b4cdee2367..3bbbc24b980e8 100644 --- a/expression/builtin_time_test.go +++ b/expression/builtin_time_test.go @@ -2865,7 +2865,7 @@ func (s *testEvaluatorSuite) TestTidbParseTso(c *C) { } } -func (s *testEvaluatorSuite) TestReadTSIn(c *C) { +func (s *testEvaluatorSuite) TestTiDBBoundStaleness(c *C) { const timeParserLayout = "2006-01-02 15:04:05.000" t1, err := time.Parse(timeParserLayout, "2015-09-21 09:53:04.877") c.Assert(err, IsNil) @@ -2928,7 +2928,7 @@ func (s *testEvaluatorSuite) TestReadTSIn(c *C) { }, } - fc := funcs[ast.ReadTSIn] + fc := funcs[ast.TiDBBoundStaleness] for _, test := range tests { c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectResolveTS", fmt.Sprintf("return(%v)", test.injectResolveTS)), IsNil) diff --git a/expression/builtin_time_vec.go b/expression/builtin_time_vec.go index 61e23b3501bb8..fd42baa1d4edd 100644 --- a/expression/builtin_time_vec.go +++ b/expression/builtin_time_vec.go @@ -855,11 +855,11 @@ func (b *builtinTidbParseTsoSig) vecEvalTime(input *chunk.Chunk, result *chunk.C return nil } -func (b *builtinReadTSInSig) vectorized() bool { +func (b *builtinTiDBBoundStalenessSig) vectorized() bool { return true } -func (b *builtinReadTSInSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error { +func (b *builtinTiDBBoundStalenessSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error { n := input.NumRows() buf0, err := b.bufAllocator.get(types.ETDatetime, n) if err != nil { diff --git a/expression/builtin_time_vec_test.go b/expression/builtin_time_vec_test.go index 00a02982e404e..5b4b74f50b17b 100644 --- a/expression/builtin_time_vec_test.go +++ b/expression/builtin_time_vec_test.go @@ -519,7 +519,7 @@ var vecBuiltinTimeCases = map[string][]vecExprBenchCase{ geners: []dataGenerator{newRangeInt64Gener(0, math.MaxInt64)}, }, }, - ast.ReadTSIn: { + ast.TiDBBoundStaleness: { { retEvalType: types.ETInt, childrenTypes: []types.EvalType{types.ETDatetime, types.ETDatetime}, diff --git a/expression/integration_test.go b/expression/integration_test.go index dc101d52fd2ea..fea537727c3ff 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -2263,24 +2263,24 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { result = tk.MustQuery(`select tidb_parse_tso(-1)`) result.Check(testkit.Rows("")) - // for read_ts_in + // for tidb_bound_staleness tk.MustExec("SET time_zone = '+00:00';") t := time.Now().UTC() ts := oracle.ComposeTS(t.Unix()*1000, 0) - readTSInTests := []struct { + tidbBoundStalenessTests := []struct { sql string injectResolveTS uint64 isNull bool expect int64 }{ { - sql: `select read_ts_in(DATE_SUB(NOW(), INTERVAL 600 SECOND), DATE_ADD(NOW(), INTERVAL 600 SECOND))`, + sql: `select tidb_bound_staleness(DATE_SUB(NOW(), INTERVAL 600 SECOND), DATE_ADD(NOW(), INTERVAL 600 SECOND))`, injectResolveTS: ts, isNull: false, expect: int64(ts), }, { - sql: `select read_ts_in("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, + sql: `select tidb_bound_staleness("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, injectResolveTS: func() uint64 { phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 13:30:04.877") c.Assert(err, IsNil) @@ -2294,7 +2294,7 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { }(), }, { - sql: `select read_ts_in("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, + sql: `select tidb_bound_staleness("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, injectResolveTS: func() uint64 { phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 11:30:04.877") c.Assert(err, IsNil) @@ -2308,33 +2308,33 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { }(), }, { - sql: `select read_ts_in("2021-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, + sql: `select tidb_bound_staleness("2021-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, injectResolveTS: 0, isNull: true, expect: 0, }, // Time is too small. { - sql: `select read_ts_in("0020-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, + sql: `select tidb_bound_staleness("0020-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, injectResolveTS: 0, isNull: true, expect: 0, }, // Wrong value. { - sql: `select read_ts_in(1, 2)`, + sql: `select tidb_bound_staleness(1, 2)`, injectResolveTS: 0, isNull: true, expect: 0, }, { - sql: `select read_ts_in("invalid_time_1", "invalid_time_2")`, + sql: `select tidb_bound_staleness("invalid_time_1", "invalid_time_2")`, injectResolveTS: 0, isNull: true, expect: 0, }, } - for _, test := range readTSInTests { + for _, test := range tidbBoundStalenessTests { c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectResolveTS", fmt.Sprintf("return(%v)", test.injectResolveTS)), IsNil) result = tk.MustQuery(test.sql) diff --git a/go.mod b/go.mod index 6c8df0ecf39b7..8fa1e66ceb0e4 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/pingcap/tidb -replace github.com/pingcap/parser v0.0.0-20210421190254-588138d35e55 => github.com/JmPotato/parser v0.0.0-20210427035105-f41eb27a4a0c +replace github.com/pingcap/parser v0.0.0-20210421190254-588138d35e55 => github.com/JmPotato/parser v0.0.0-20210428071746-2dcbd7ce4694 require ( github.com/BurntSushi/toml v0.3.1 diff --git a/go.sum b/go.sum index 82f3126ca3e8a..f7d0b653922db 100644 --- a/go.sum +++ b/go.sum @@ -34,8 +34,8 @@ github.com/HdrHistogram/hdrhistogram-go v0.9.0 h1:dpujRju0R4M/QZzcnR1LH1qm+TVG3U github.com/HdrHistogram/hdrhistogram-go v0.9.0/go.mod h1:nxrse8/Tzg2tg3DZcZjm6qEclQKK70g0KxO61gFFZD4= github.com/Jeffail/gabs/v2 v2.5.1 h1:ANfZYjpMlfTTKebycu4X1AgkVWumFVDYQl7JwOr4mDk= github.com/Jeffail/gabs/v2 v2.5.1/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI= -github.com/JmPotato/parser v0.0.0-20210427035105-f41eb27a4a0c h1:5+IaTm0Y02tgky2m5MVwzuNW4PKcxT+BeJq6EAHH6tQ= -github.com/JmPotato/parser v0.0.0-20210427035105-f41eb27a4a0c/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= +github.com/JmPotato/parser v0.0.0-20210428071746-2dcbd7ce4694 h1:pBJc7xeGu1d7kTKl46Kfu5ZTOTpxsbi8Q7dwx7VmZWM= +github.com/JmPotato/parser v0.0.0-20210428071746-2dcbd7ce4694/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= From 7d8c27f49119ebbb0ae4a22053c6ae0daf91129c Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 28 Apr 2021 17:13:00 +0800 Subject: [PATCH 06/13] Make tidb_bound_staleness deterministic Signed-off-by: JmPotato --- expression/builtin_time.go | 3 +++ expression/builtin_time_test.go | 26 ++++++++++++++++-- expression/builtin_time_vec.go | 20 ++++++++------ expression/builtin_time_vec_test.go | 1 + expression/helper.go | 3 ++- expression/integration_test.go | 2 +- sessionctx/stmtctx/stmtctx.go | 41 ++++++++++++++++++++--------- 7 files changed, 72 insertions(+), 24 deletions(-) diff --git a/expression/builtin_time.go b/expression/builtin_time.go index 205df92d75827..b7264f6cd21aa 100644 --- a/expression/builtin_time.go +++ b/expression/builtin_time.go @@ -7185,6 +7185,9 @@ func (b *builtinTiDBBoundStalenessSig) evalInt(row chunk.Row) (int64, bool, erro injectTS := val.(int) minResolveTS = uint64(injectTS) }) + // Try to get from the stmt cache to make sure this function is deterministic. + stmtCtx := b.ctx.GetSessionVars().StmtCtx + minResolveTS = stmtCtx.GetOrStoreStmtCache(stmtctx.StmtResolveTsCacheKey, minResolveTS).(uint64) // For a resolved TS t and a time range [t1, t2]: // 1. If t < t1, we will use t1 as the result, // and with it, a read request may fail because it's an unreached resolved TS. diff --git a/expression/builtin_time_test.go b/expression/builtin_time_test.go index 3bbbc24b980e8..ea479e0ad8820 100644 --- a/expression/builtin_time_test.go +++ b/expression/builtin_time_test.go @@ -815,7 +815,7 @@ func (s *testEvaluatorSuite) TestTime(c *C) { } func resetStmtContext(ctx sessionctx.Context) { - ctx.GetSessionVars().StmtCtx.ResetNowTs() + ctx.GetSessionVars().StmtCtx.ResetStmtCache() } func (s *testEvaluatorSuite) TestNowAndUTCTimestamp(c *C) { @@ -2941,8 +2941,30 @@ func (s *testEvaluatorSuite) TestTiDBBoundStaleness(c *C) { } else { c.Assert(d.GetInt64(), Equals, test.expect) } - failpoint.Disable("github.com/pingcap/tidb/expression/injectResolveTS") + resetStmtContext(s.ctx) } + + // Test whether it's deterministic. + resolveTS1 := oracle.ComposeTS(t2.Add(-1*time.Second).Unix()*1000, 0) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectResolveTS", + fmt.Sprintf("return(%v)", resolveTS1)), IsNil) + f, err := fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)})) + c.Assert(err, IsNil) + d, err := evalBuiltinFunc(f, chunk.Row{}) + c.Assert(err, IsNil) + c.Assert(d.GetInt64(), Equals, int64(resolveTS1)) + // ResolveTS updated. + resolveTS2 := oracle.ComposeTS(t2.Add(1*time.Second).Unix()*1000, 0) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectResolveTS", + fmt.Sprintf("return(%v)", resolveTS2)), IsNil) + f, err = fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)})) + c.Assert(err, IsNil) + d, err = evalBuiltinFunc(f, chunk.Row{}) + c.Assert(err, IsNil) + // Still resolveTS1 + c.Assert(d.GetInt64(), Equals, int64(resolveTS1)) + resetStmtContext(s.ctx) + failpoint.Disable("github.com/pingcap/tidb/expression/injectResolveTS") } func (s *testEvaluatorSuite) TestGetIntervalFromDecimal(c *C) { diff --git a/expression/builtin_time_vec.go b/expression/builtin_time_vec.go index fd42baa1d4edd..27b5ab114aaab 100644 --- a/expression/builtin_time_vec.go +++ b/expression/builtin_time_vec.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/types" @@ -882,6 +883,17 @@ func (b *builtinTiDBBoundStalenessSig) vecEvalInt(input *chunk.Chunk, result *ch args0 := buf0.Times() args1 := buf1.Times() i64s := result.Int64s() + var minResolveTS uint64 + if store := b.ctx.GetStore(); store != nil { + minResolveTS = store.GetMinResolveTS(b.ctx.GetSessionVars().CheckAndGetTxnScope()) + } + failpoint.Inject("injectResolveTS", func(val failpoint.Value) { + injectTS := val.(int) + minResolveTS = uint64(injectTS) + }) + // Try to get from the stmt cache to make sure this function is deterministic. + stmtCtx := b.ctx.GetSessionVars().StmtCtx + minResolveTS = stmtCtx.GetOrStoreStmtCache(stmtctx.StmtResolveTsCacheKey, minResolveTS).(uint64) for i := 0; i < n; i++ { if result.IsNull(i) { continue @@ -916,14 +928,6 @@ func (b *builtinTiDBBoundStalenessSig) vecEvalInt(input *chunk.Chunk, result *ch continue } minTS, maxTS := oracle.ComposeTS(minTime.Unix()*1000, 0), oracle.ComposeTS(maxTime.Unix()*1000, 0) - var minResolveTS uint64 - if store := b.ctx.GetStore(); store != nil { - minResolveTS = store.GetMinResolveTS(b.ctx.GetSessionVars().CheckAndGetTxnScope()) - } - failpoint.Inject("injectResolveTS", func(val failpoint.Value) { - injectTS := val.(int) - minResolveTS = uint64(injectTS) - }) if minResolveTS < minTS { i64s[i] = int64(minTS) } else if minTS <= minResolveTS && minResolveTS <= maxTS { diff --git a/expression/builtin_time_vec_test.go b/expression/builtin_time_vec_test.go index 5b4b74f50b17b..190d376cfffe6 100644 --- a/expression/builtin_time_vec_test.go +++ b/expression/builtin_time_vec_test.go @@ -519,6 +519,7 @@ var vecBuiltinTimeCases = map[string][]vecExprBenchCase{ geners: []dataGenerator{newRangeInt64Gener(0, math.MaxInt64)}, }, }, + // Todo: how to inject the resolveTS for better testing. ast.TiDBBoundStaleness: { { retEvalType: types.ETInt, diff --git a/expression/helper.go b/expression/helper.go index 1e89c86e705f2..911edd4ef7758 100644 --- a/expression/helper.go +++ b/expression/helper.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" driver "github.com/pingcap/tidb/types/parser_driver" @@ -155,5 +156,5 @@ func getStmtTimestamp(ctx sessionctx.Context) (time.Time, error) { return time.Unix(timestamp, 0), nil } stmtCtx := ctx.GetSessionVars().StmtCtx - return stmtCtx.GetNowTsCached(), nil + return stmtCtx.GetOrStoreStmtCache(stmtctx.StmtNowTsCacheKey, time.Now()).(time.Time), nil } diff --git a/expression/integration_test.go b/expression/integration_test.go index fea537727c3ff..b2484db6279cf 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -2343,8 +2343,8 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { } else { result.Check(testkit.Rows(fmt.Sprintf("%d", test.expect))) } - failpoint.Disable("github.com/pingcap/tidb/expression/injectResolveTS") } + failpoint.Disable("github.com/pingcap/tidb/expression/injectResolveTS") // fix issue 10308 result = tk.MustQuery("select time(\"- -\");") diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 8df0001427173..9d57636e7d18f 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -140,8 +140,6 @@ type StatementContext struct { RuntimeStatsColl *execdetails.RuntimeStatsColl TableIDs []int64 IndexNames []string - nowTs time.Time // use this variable for now/current_timestamp calculation/cache for one stmt - stmtTimeCached bool StmtType string OriginalSQL string digestMemo struct { @@ -164,6 +162,9 @@ type StatementContext struct { TblInfo2UnionScan map[*model.TableInfo]bool TaskID uint64 // unique ID for an execution of a statement TaskMapBakTS uint64 // counter for + + // stmtCache is used to store some statement-related values. + stmtCache map[StmtCacheKey]interface{} } // StmtHints are SessionVars related sql hints. @@ -195,19 +196,35 @@ func (sh *StmtHints) TaskMapNeedBackUp() bool { return sh.ForceNthPlan != -1 } -// GetNowTsCached getter for nowTs, if not set get now time and cache it -func (sc *StatementContext) GetNowTsCached() time.Time { - if !sc.stmtTimeCached { - now := time.Now() - sc.nowTs = now - sc.stmtTimeCached = true +// StmtCacheKey represents the key type in the StmtCache. +type StmtCacheKey int + +const ( + // StmtNowTsCacheKey is a variable for now/current_timestamp calculation/cache for one stmt. + StmtNowTsCacheKey StmtCacheKey = iota + // StmtNowTsCacheKey is a variable for resolveTs calculation/cache for one stmt. + StmtResolveTsCacheKey +) + +// GetFromStmtCache gets the cached value of the given key if it exists, otherwise will store the value. +func (sc *StatementContext) GetOrStoreStmtCache(key StmtCacheKey, value interface{}) interface{} { + if sc.stmtCache == nil { + sc.stmtCache = make(map[StmtCacheKey]interface{}) + } + if _, ok := sc.stmtCache[key]; !ok { + sc.stmtCache[key] = value } - return sc.nowTs + return sc.stmtCache[key] +} + +// ResetInStmtCache resets the cache of given key. +func (sc *StatementContext) ResetInStmtCache(key StmtCacheKey) { + delete(sc.stmtCache, key) } -// ResetNowTs resetter for nowTs, clear cached time flag -func (sc *StatementContext) ResetNowTs() { - sc.stmtTimeCached = false +// ResetStmtCache resets all cached values. +func (sc *StatementContext) ResetStmtCache() { + sc.stmtCache = make(map[StmtCacheKey]interface{}) } // SQLDigest gets normalized and digest for provided sql. From 2a7928db5eaf173414f1607db26ed33ed6e73c39 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 10 May 2021 11:41:37 +0800 Subject: [PATCH 07/13] Improve code reusability Signed-off-by: JmPotato --- expression/builtin_time.go | 38 +++++++++++++++++++--------------- expression/builtin_time_vec.go | 13 +----------- 2 files changed, 22 insertions(+), 29 deletions(-) diff --git a/expression/builtin_time.go b/expression/builtin_time.go index b7264f6cd21aa..cb51263cc8279 100644 --- a/expression/builtin_time.go +++ b/expression/builtin_time.go @@ -7181,26 +7181,10 @@ func (b *builtinTiDBBoundStalenessSig) evalInt(row chunk.Row) (int64, bool, erro if store := b.ctx.GetStore(); store != nil { minResolveTS = store.GetMinResolveTS(b.ctx.GetSessionVars().CheckAndGetTxnScope()) } - failpoint.Inject("injectResolveTS", func(val failpoint.Value) { - injectTS := val.(int) - minResolveTS = uint64(injectTS) - }) // Try to get from the stmt cache to make sure this function is deterministic. stmtCtx := b.ctx.GetSessionVars().StmtCtx minResolveTS = stmtCtx.GetOrStoreStmtCache(stmtctx.StmtResolveTsCacheKey, minResolveTS).(uint64) - // For a resolved TS t and a time range [t1, t2]: - // 1. If t < t1, we will use t1 as the result, - // and with it, a read request may fail because it's an unreached resolved TS. - // 2. If t1 <= t <= t2, we will use t as the result, and with it, - // a read request won't fail. - // 2. If t2 < t, we will use t2 as the result, - // and with it, a read request won't fail because it's bigger than the latest resolved TS. - if minResolveTS < minTS { - return int64(minTS), false, nil - } else if minTS <= minResolveTS && minResolveTS <= maxTS { - return int64(minResolveTS), false, nil - } - return int64(maxTS), false, nil + return calAppropriateTS(minTS, maxTS, minResolveTS), false, nil } func checkTimeRange(t time.Time) bool { @@ -7216,3 +7200,23 @@ func checkTimeRange(t time.Time) bool { } return true } + +// For a resolved TS t and a time range [t1, t2]: +// 1. If t < t1, we will use t1 as the result, +// and with it, a read request may fail because it's an unreached resolved TS. +// 2. If t1 <= t <= t2, we will use t as the result, and with it, +// a read request won't fail. +// 2. If t2 < t, we will use t2 as the result, +// and with it, a read request won't fail because it's bigger than the latest resolved TS. +func calAppropriateTS(minTS, maxTS, minResolveTS uint64) int64 { + failpoint.Inject("injectResolveTS", func(val failpoint.Value) { + injectTS := val.(int) + minResolveTS = uint64(injectTS) + }) + if minResolveTS < minTS { + return int64(minTS) + } else if minTS <= minResolveTS && minResolveTS <= maxTS { + return int64(minResolveTS) + } + return int64(maxTS) +} diff --git a/expression/builtin_time_vec.go b/expression/builtin_time_vec.go index 27b5ab114aaab..aa6a0fa213717 100644 --- a/expression/builtin_time_vec.go +++ b/expression/builtin_time_vec.go @@ -21,7 +21,6 @@ import ( "time" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/sessionctx/stmtctx" @@ -887,10 +886,6 @@ func (b *builtinTiDBBoundStalenessSig) vecEvalInt(input *chunk.Chunk, result *ch if store := b.ctx.GetStore(); store != nil { minResolveTS = store.GetMinResolveTS(b.ctx.GetSessionVars().CheckAndGetTxnScope()) } - failpoint.Inject("injectResolveTS", func(val failpoint.Value) { - injectTS := val.(int) - minResolveTS = uint64(injectTS) - }) // Try to get from the stmt cache to make sure this function is deterministic. stmtCtx := b.ctx.GetSessionVars().StmtCtx minResolveTS = stmtCtx.GetOrStoreStmtCache(stmtctx.StmtResolveTsCacheKey, minResolveTS).(uint64) @@ -928,13 +923,7 @@ func (b *builtinTiDBBoundStalenessSig) vecEvalInt(input *chunk.Chunk, result *ch continue } minTS, maxTS := oracle.ComposeTS(minTime.Unix()*1000, 0), oracle.ComposeTS(maxTime.Unix()*1000, 0) - if minResolveTS < minTS { - i64s[i] = int64(minTS) - } else if minTS <= minResolveTS && minResolveTS <= maxTS { - i64s[i] = int64(minResolveTS) - } else { - i64s[i] = int64(maxTS) - } + i64s[i] = calAppropriateTS(minTS, maxTS, minResolveTS) } return nil } From 78ddfe4efafe6f1cf85705b1bbe3ffbb92d70b8f Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 10 May 2021 12:17:45 +0800 Subject: [PATCH 08/13] Fix the test Signed-off-by: JmPotato --- expression/builtin_time.go | 27 ++++++++++++++++----------- expression/builtin_time_vec.go | 11 ++--------- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/expression/builtin_time.go b/expression/builtin_time.go index cb51263cc8279..ba6cdff2de815 100644 --- a/expression/builtin_time.go +++ b/expression/builtin_time.go @@ -7177,13 +7177,7 @@ func (b *builtinTiDBBoundStalenessSig) evalInt(row chunk.Row) (int64, bool, erro return 0, true, nil } minTS, maxTS := oracle.ComposeTS(minTime.Unix()*1000, 0), oracle.ComposeTS(maxTime.Unix()*1000, 0) - var minResolveTS uint64 - if store := b.ctx.GetStore(); store != nil { - minResolveTS = store.GetMinResolveTS(b.ctx.GetSessionVars().CheckAndGetTxnScope()) - } - // Try to get from the stmt cache to make sure this function is deterministic. - stmtCtx := b.ctx.GetSessionVars().StmtCtx - minResolveTS = stmtCtx.GetOrStoreStmtCache(stmtctx.StmtResolveTsCacheKey, minResolveTS).(uint64) + minResolveTS := getMinResolveTS(b.ctx) return calAppropriateTS(minTS, maxTS, minResolveTS), false, nil } @@ -7201,6 +7195,21 @@ func checkTimeRange(t time.Time) bool { return true } +func getMinResolveTS(sessionCtx sessionctx.Context) (minResolveTS uint64) { + if store := sessionCtx.GetStore(); store != nil { + minResolveTS = store.GetMinResolveTS(sessionCtx.GetSessionVars().CheckAndGetTxnScope()) + } + // Inject mocked ResolveTS for test. + failpoint.Inject("injectResolveTS", func(val failpoint.Value) { + injectTS := val.(int) + minResolveTS = uint64(injectTS) + }) + // Try to get from the stmt cache to make sure this function is deterministic. + stmtCtx := sessionCtx.GetSessionVars().StmtCtx + minResolveTS = stmtCtx.GetOrStoreStmtCache(stmtctx.StmtResolveTsCacheKey, minResolveTS).(uint64) + return +} + // For a resolved TS t and a time range [t1, t2]: // 1. If t < t1, we will use t1 as the result, // and with it, a read request may fail because it's an unreached resolved TS. @@ -7209,10 +7218,6 @@ func checkTimeRange(t time.Time) bool { // 2. If t2 < t, we will use t2 as the result, // and with it, a read request won't fail because it's bigger than the latest resolved TS. func calAppropriateTS(minTS, maxTS, minResolveTS uint64) int64 { - failpoint.Inject("injectResolveTS", func(val failpoint.Value) { - injectTS := val.(int) - minResolveTS = uint64(injectTS) - }) if minResolveTS < minTS { return int64(minTS) } else if minTS <= minResolveTS && minResolveTS <= maxTS { diff --git a/expression/builtin_time_vec.go b/expression/builtin_time_vec.go index aa6a0fa213717..104c379357a3f 100644 --- a/expression/builtin_time_vec.go +++ b/expression/builtin_time_vec.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" - "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/types" @@ -879,16 +878,10 @@ func (b *builtinTiDBBoundStalenessSig) vecEvalInt(input *chunk.Chunk, result *ch } result.ResizeInt64(n, false) result.MergeNulls(buf0, buf1) + i64s := result.Int64s() args0 := buf0.Times() args1 := buf1.Times() - i64s := result.Int64s() - var minResolveTS uint64 - if store := b.ctx.GetStore(); store != nil { - minResolveTS = store.GetMinResolveTS(b.ctx.GetSessionVars().CheckAndGetTxnScope()) - } - // Try to get from the stmt cache to make sure this function is deterministic. - stmtCtx := b.ctx.GetSessionVars().StmtCtx - minResolveTS = stmtCtx.GetOrStoreStmtCache(stmtctx.StmtResolveTsCacheKey, minResolveTS).(uint64) + minResolveTS := getMinResolveTS(b.ctx) for i := 0; i < n; i++ { if result.IsNull(i) { continue From 5c43d1b32c189d90ae4d04dd29a194b8e523a8d1 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 12 May 2021 11:24:13 +0800 Subject: [PATCH 09/13] Rename to tidb_staleness_bound Signed-off-by: JmPotato --- expression/builtin.go | 2 +- expression/builtin_time.go | 14 +++++++------- expression/builtin_time_test.go | 4 ++-- expression/builtin_time_vec.go | 4 ++-- expression/builtin_time_vec_test.go | 2 +- expression/integration_test.go | 20 ++++++++++---------- go.mod | 4 ++-- go.sum | 8 ++++---- 8 files changed, 29 insertions(+), 29 deletions(-) diff --git a/expression/builtin.go b/expression/builtin.go index 7f7a23c495893..2685445bc8fb6 100644 --- a/expression/builtin.go +++ b/expression/builtin.go @@ -688,7 +688,7 @@ var funcs = map[string]functionClass{ ast.YearWeek: &yearWeekFunctionClass{baseFunctionClass{ast.YearWeek, 1, 2}}, ast.LastDay: &lastDayFunctionClass{baseFunctionClass{ast.LastDay, 1, 1}}, // TSO functions - ast.TiDBBoundStaleness: &tidbBoundStalenessFunctionClass{baseFunctionClass{ast.TiDBBoundStaleness, 2, 2}}, + ast.TiDBStalenessBound: &tidbStalenessBoundFunctionClass{baseFunctionClass{ast.TiDBStalenessBound, 2, 2}}, ast.TiDBParseTso: &tidbParseTsoFunctionClass{baseFunctionClass{ast.TiDBParseTso, 1, 1}}, // string functions diff --git a/expression/builtin_time.go b/expression/builtin_time.go index 8fc15f6f9a602..f001b0222245a 100644 --- a/expression/builtin_time.go +++ b/expression/builtin_time.go @@ -7115,13 +7115,13 @@ func handleInvalidZeroTime(ctx sessionctx.Context, t types.Time) (bool, error) { return true, handleInvalidTimeError(ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, t.String())) } -// tidbBoundStalenessFunctionClass reads a time window [a, b] and compares it with the latest resolvedTS +// tidbStalenessBoundFunctionClass reads a time window [a, b] and compares it with the latest resolvedTS // to determine which TS to use in a read only transaction. -type tidbBoundStalenessFunctionClass struct { +type tidbStalenessBoundFunctionClass struct { baseFunctionClass } -func (c *tidbBoundStalenessFunctionClass) getFunction(ctx sessionctx.Context, args []Expression) (builtinFunc, error) { +func (c *tidbStalenessBoundFunctionClass) getFunction(ctx sessionctx.Context, args []Expression) (builtinFunc, error) { if err := c.verifyArgs(args); err != nil { return nil, err } @@ -7129,21 +7129,21 @@ func (c *tidbBoundStalenessFunctionClass) getFunction(ctx sessionctx.Context, ar if err != nil { return nil, err } - sig := &builtinTiDBBoundStalenessSig{bf} + sig := &builtinTiDBStalenessBoundSig{bf} return sig, nil } -type builtinTiDBBoundStalenessSig struct { +type builtinTiDBStalenessBoundSig struct { baseBuiltinFunc } -func (b *builtinTiDBBoundStalenessSig) Clone() builtinFunc { +func (b *builtinTiDBStalenessBoundSig) Clone() builtinFunc { newSig := &builtinTidbParseTsoSig{} newSig.cloneFrom(&b.baseBuiltinFunc) return newSig } -func (b *builtinTiDBBoundStalenessSig) evalInt(row chunk.Row) (int64, bool, error) { +func (b *builtinTiDBStalenessBoundSig) evalInt(row chunk.Row) (int64, bool, error) { leftTime, isNull, err := b.args[0].EvalTime(b.ctx, row) if isNull || err != nil { return 0, true, handleInvalidTimeError(b.ctx, err) diff --git a/expression/builtin_time_test.go b/expression/builtin_time_test.go index c75507419b38a..0e91db3c390ea 100644 --- a/expression/builtin_time_test.go +++ b/expression/builtin_time_test.go @@ -2857,7 +2857,7 @@ func (s *testEvaluatorSuite) TestTidbParseTso(c *C) { } } -func (s *testEvaluatorSuite) TestTiDBBoundStaleness(c *C) { +func (s *testEvaluatorSuite) TestTiDBStalenessBound(c *C) { const timeParserLayout = "2006-01-02 15:04:05.000" t1, err := time.Parse(timeParserLayout, "2015-09-21 09:53:04.877") c.Assert(err, IsNil) @@ -2920,7 +2920,7 @@ func (s *testEvaluatorSuite) TestTiDBBoundStaleness(c *C) { }, } - fc := funcs[ast.TiDBBoundStaleness] + fc := funcs[ast.TiDBStalenessBound] for _, test := range tests { c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", fmt.Sprintf("return(%v)", test.injectSafeTS)), IsNil) diff --git a/expression/builtin_time_vec.go b/expression/builtin_time_vec.go index 4f8c277ce8bf4..00c27f69885de 100644 --- a/expression/builtin_time_vec.go +++ b/expression/builtin_time_vec.go @@ -854,11 +854,11 @@ func (b *builtinTidbParseTsoSig) vecEvalTime(input *chunk.Chunk, result *chunk.C return nil } -func (b *builtinTiDBBoundStalenessSig) vectorized() bool { +func (b *builtinTiDBStalenessBoundSig) vectorized() bool { return true } -func (b *builtinTiDBBoundStalenessSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error { +func (b *builtinTiDBStalenessBoundSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error { n := input.NumRows() buf0, err := b.bufAllocator.get(types.ETDatetime, n) if err != nil { diff --git a/expression/builtin_time_vec_test.go b/expression/builtin_time_vec_test.go index c8c0a3b4ce69f..22ffbd03c67fd 100644 --- a/expression/builtin_time_vec_test.go +++ b/expression/builtin_time_vec_test.go @@ -520,7 +520,7 @@ var vecBuiltinTimeCases = map[string][]vecExprBenchCase{ }, }, // Todo: how to inject the safeTS for better testing. - ast.TiDBBoundStaleness: { + ast.TiDBStalenessBound: { { retEvalType: types.ETInt, childrenTypes: []types.EvalType{types.ETDatetime, types.ETDatetime}, diff --git a/expression/integration_test.go b/expression/integration_test.go index 3583b48cf34e7..9b14069f71d43 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -2264,24 +2264,24 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { result = tk.MustQuery(`select tidb_parse_tso(-1)`) result.Check(testkit.Rows("")) - // for tidb_bound_staleness + // for tidb_staleness_bound tk.MustExec("SET time_zone = '+00:00';") t := time.Now().UTC() ts := oracle.ComposeTS(t.Unix()*1000, 0) - tidbBoundStalenessTests := []struct { + tidbStalenessBoundTests := []struct { sql string injectSafeTS uint64 isNull bool expect int64 }{ { - sql: `select tidb_bound_staleness(DATE_SUB(NOW(), INTERVAL 600 SECOND), DATE_ADD(NOW(), INTERVAL 600 SECOND))`, + sql: `select tidb_staleness_bound(DATE_SUB(NOW(), INTERVAL 600 SECOND), DATE_ADD(NOW(), INTERVAL 600 SECOND))`, injectSafeTS: ts, isNull: false, expect: int64(ts), }, { - sql: `select tidb_bound_staleness("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, + sql: `select tidb_staleness_bound("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, injectSafeTS: func() uint64 { phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 13:30:04.877") c.Assert(err, IsNil) @@ -2295,7 +2295,7 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { }(), }, { - sql: `select tidb_bound_staleness("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, + sql: `select tidb_staleness_bound("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, injectSafeTS: func() uint64 { phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 11:30:04.877") c.Assert(err, IsNil) @@ -2309,33 +2309,33 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { }(), }, { - sql: `select tidb_bound_staleness("2021-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, + sql: `select tidb_staleness_bound("2021-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, injectSafeTS: 0, isNull: true, expect: 0, }, // Time is too small. { - sql: `select tidb_bound_staleness("0020-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, + sql: `select tidb_staleness_bound("0020-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, injectSafeTS: 0, isNull: true, expect: 0, }, // Wrong value. { - sql: `select tidb_bound_staleness(1, 2)`, + sql: `select tidb_staleness_bound(1, 2)`, injectSafeTS: 0, isNull: true, expect: 0, }, { - sql: `select tidb_bound_staleness("invalid_time_1", "invalid_time_2")`, + sql: `select tidb_staleness_bound("invalid_time_1", "invalid_time_2")`, injectSafeTS: 0, isNull: true, expect: 0, }, } - for _, test := range tidbBoundStalenessTests { + for _, test := range tidbStalenessBoundTests { c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", fmt.Sprintf("return(%v)", test.injectSafeTS)), IsNil) result = tk.MustQuery(test.sql) diff --git a/go.mod b/go.mod index a3f3c4e465636..fac494312dba2 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/pingcap/tidb -replace github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde => github.com/JmPotato/parser v0.0.0-20210510024050-df181d49c58e +replace github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde => github.com/JmPotato/parser v0.0.0-20210512031211-369521b3c879 require ( github.com/BurntSushi/toml v0.3.1 @@ -84,7 +84,7 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect - honnef.co/go/tools v0.1.4 // indirect + honnef.co/go/tools v0.1.3 // indirect modernc.org/mathutil v1.2.2 // indirect sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 diff --git a/go.sum b/go.sum index c8f96e64d3b06..f2e5f1c3f5cd5 100644 --- a/go.sum +++ b/go.sum @@ -34,8 +34,8 @@ github.com/HdrHistogram/hdrhistogram-go v0.9.0 h1:dpujRju0R4M/QZzcnR1LH1qm+TVG3U github.com/HdrHistogram/hdrhistogram-go v0.9.0/go.mod h1:nxrse8/Tzg2tg3DZcZjm6qEclQKK70g0KxO61gFFZD4= github.com/Jeffail/gabs/v2 v2.5.1 h1:ANfZYjpMlfTTKebycu4X1AgkVWumFVDYQl7JwOr4mDk= github.com/Jeffail/gabs/v2 v2.5.1/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI= -github.com/JmPotato/parser v0.0.0-20210510024050-df181d49c58e h1:hdGbDsK1y2B+EclwSvtDmToBF5NaQbLTIk9bVrV5O3c= -github.com/JmPotato/parser v0.0.0-20210510024050-df181d49c58e/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= +github.com/JmPotato/parser v0.0.0-20210512031211-369521b3c879 h1:KRGXkeXbTzNOyMnntYkoegyiILGJthZnJTvIWJtEem0= +github.com/JmPotato/parser v0.0.0-20210512031211-369521b3c879/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= @@ -932,8 +932,8 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= -honnef.co/go/tools v0.1.4 h1:SadWOkti5uVN1FAMgxn165+Mw00fuQKyk4Gyn/inxNQ= -honnef.co/go/tools v0.1.4/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= +honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o= +honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= modernc.org/mathutil v1.2.2 h1:+yFk8hBprV+4c0U9GjFtL+dV3N8hOJ8JCituQcMShFY= modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= From 4ed0d684f7287916d96841f7fa16c60c2ade500c Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 13 May 2021 10:54:46 +0800 Subject: [PATCH 10/13] Rename to tidb_bounded_staleness Signed-off-by: JmPotato --- expression/builtin.go | 4 ++-- expression/builtin_time.go | 14 +++++++------- expression/builtin_time_test.go | 4 ++-- expression/builtin_time_vec.go | 4 ++-- expression/builtin_time_vec_test.go | 2 +- expression/integration_test.go | 20 ++++++++++---------- go.mod | 2 +- go.sum | 4 ++-- 8 files changed, 27 insertions(+), 27 deletions(-) diff --git a/expression/builtin.go b/expression/builtin.go index 2685445bc8fb6..a33650eef7b1f 100644 --- a/expression/builtin.go +++ b/expression/builtin.go @@ -688,8 +688,8 @@ var funcs = map[string]functionClass{ ast.YearWeek: &yearWeekFunctionClass{baseFunctionClass{ast.YearWeek, 1, 2}}, ast.LastDay: &lastDayFunctionClass{baseFunctionClass{ast.LastDay, 1, 1}}, // TSO functions - ast.TiDBStalenessBound: &tidbStalenessBoundFunctionClass{baseFunctionClass{ast.TiDBStalenessBound, 2, 2}}, - ast.TiDBParseTso: &tidbParseTsoFunctionClass{baseFunctionClass{ast.TiDBParseTso, 1, 1}}, + ast.TiDBBoundedStaleness: &tidbBoundedStalenessFunctionClass{baseFunctionClass{ast.TiDBBoundedStaleness, 2, 2}}, + ast.TiDBParseTso: &tidbParseTsoFunctionClass{baseFunctionClass{ast.TiDBParseTso, 1, 1}}, // string functions ast.ASCII: &asciiFunctionClass{baseFunctionClass{ast.ASCII, 1, 1}}, diff --git a/expression/builtin_time.go b/expression/builtin_time.go index f001b0222245a..2c9cbea7b7c12 100644 --- a/expression/builtin_time.go +++ b/expression/builtin_time.go @@ -7115,13 +7115,13 @@ func handleInvalidZeroTime(ctx sessionctx.Context, t types.Time) (bool, error) { return true, handleInvalidTimeError(ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, t.String())) } -// tidbStalenessBoundFunctionClass reads a time window [a, b] and compares it with the latest resolvedTS +// tidbBoundedStalenessFunctionClass reads a time window [a, b] and compares it with the latest resolvedTS // to determine which TS to use in a read only transaction. -type tidbStalenessBoundFunctionClass struct { +type tidbBoundedStalenessFunctionClass struct { baseFunctionClass } -func (c *tidbStalenessBoundFunctionClass) getFunction(ctx sessionctx.Context, args []Expression) (builtinFunc, error) { +func (c *tidbBoundedStalenessFunctionClass) getFunction(ctx sessionctx.Context, args []Expression) (builtinFunc, error) { if err := c.verifyArgs(args); err != nil { return nil, err } @@ -7129,21 +7129,21 @@ func (c *tidbStalenessBoundFunctionClass) getFunction(ctx sessionctx.Context, ar if err != nil { return nil, err } - sig := &builtinTiDBStalenessBoundSig{bf} + sig := &builtinTiDBBoundedStalenessSig{bf} return sig, nil } -type builtinTiDBStalenessBoundSig struct { +type builtinTiDBBoundedStalenessSig struct { baseBuiltinFunc } -func (b *builtinTiDBStalenessBoundSig) Clone() builtinFunc { +func (b *builtinTiDBBoundedStalenessSig) Clone() builtinFunc { newSig := &builtinTidbParseTsoSig{} newSig.cloneFrom(&b.baseBuiltinFunc) return newSig } -func (b *builtinTiDBStalenessBoundSig) evalInt(row chunk.Row) (int64, bool, error) { +func (b *builtinTiDBBoundedStalenessSig) evalInt(row chunk.Row) (int64, bool, error) { leftTime, isNull, err := b.args[0].EvalTime(b.ctx, row) if isNull || err != nil { return 0, true, handleInvalidTimeError(b.ctx, err) diff --git a/expression/builtin_time_test.go b/expression/builtin_time_test.go index 0e91db3c390ea..c3cddf50243fb 100644 --- a/expression/builtin_time_test.go +++ b/expression/builtin_time_test.go @@ -2857,7 +2857,7 @@ func (s *testEvaluatorSuite) TestTidbParseTso(c *C) { } } -func (s *testEvaluatorSuite) TestTiDBStalenessBound(c *C) { +func (s *testEvaluatorSuite) TestTiDBBoundedStaleness(c *C) { const timeParserLayout = "2006-01-02 15:04:05.000" t1, err := time.Parse(timeParserLayout, "2015-09-21 09:53:04.877") c.Assert(err, IsNil) @@ -2920,7 +2920,7 @@ func (s *testEvaluatorSuite) TestTiDBStalenessBound(c *C) { }, } - fc := funcs[ast.TiDBStalenessBound] + fc := funcs[ast.TiDBBoundedStaleness] for _, test := range tests { c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", fmt.Sprintf("return(%v)", test.injectSafeTS)), IsNil) diff --git a/expression/builtin_time_vec.go b/expression/builtin_time_vec.go index 00c27f69885de..ee476e656e9aa 100644 --- a/expression/builtin_time_vec.go +++ b/expression/builtin_time_vec.go @@ -854,11 +854,11 @@ func (b *builtinTidbParseTsoSig) vecEvalTime(input *chunk.Chunk, result *chunk.C return nil } -func (b *builtinTiDBStalenessBoundSig) vectorized() bool { +func (b *builtinTiDBBoundedStalenessSig) vectorized() bool { return true } -func (b *builtinTiDBStalenessBoundSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error { +func (b *builtinTiDBBoundedStalenessSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error { n := input.NumRows() buf0, err := b.bufAllocator.get(types.ETDatetime, n) if err != nil { diff --git a/expression/builtin_time_vec_test.go b/expression/builtin_time_vec_test.go index 22ffbd03c67fd..edf1eeffbb042 100644 --- a/expression/builtin_time_vec_test.go +++ b/expression/builtin_time_vec_test.go @@ -520,7 +520,7 @@ var vecBuiltinTimeCases = map[string][]vecExprBenchCase{ }, }, // Todo: how to inject the safeTS for better testing. - ast.TiDBStalenessBound: { + ast.TiDBBoundedStaleness: { { retEvalType: types.ETInt, childrenTypes: []types.EvalType{types.ETDatetime, types.ETDatetime}, diff --git a/expression/integration_test.go b/expression/integration_test.go index 2bc5262a45364..5122b52c66d16 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -2264,24 +2264,24 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { result = tk.MustQuery(`select tidb_parse_tso(-1)`) result.Check(testkit.Rows("")) - // for tidb_staleness_bound + // for tidb_bounded_staleness tk.MustExec("SET time_zone = '+00:00';") t := time.Now().UTC() ts := oracle.ComposeTS(t.Unix()*1000, 0) - tidbStalenessBoundTests := []struct { + tidbBoundedStalenessTests := []struct { sql string injectSafeTS uint64 isNull bool expect int64 }{ { - sql: `select tidb_staleness_bound(DATE_SUB(NOW(), INTERVAL 600 SECOND), DATE_ADD(NOW(), INTERVAL 600 SECOND))`, + sql: `select tidb_bounded_staleness(DATE_SUB(NOW(), INTERVAL 600 SECOND), DATE_ADD(NOW(), INTERVAL 600 SECOND))`, injectSafeTS: ts, isNull: false, expect: int64(ts), }, { - sql: `select tidb_staleness_bound("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, + sql: `select tidb_bounded_staleness("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, injectSafeTS: func() uint64 { phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 13:30:04.877") c.Assert(err, IsNil) @@ -2295,7 +2295,7 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { }(), }, { - sql: `select tidb_staleness_bound("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, + sql: `select tidb_bounded_staleness("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, injectSafeTS: func() uint64 { phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 11:30:04.877") c.Assert(err, IsNil) @@ -2309,33 +2309,33 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { }(), }, { - sql: `select tidb_staleness_bound("2021-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, + sql: `select tidb_bounded_staleness("2021-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, injectSafeTS: 0, isNull: true, expect: 0, }, // Time is too small. { - sql: `select tidb_staleness_bound("0020-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, + sql: `select tidb_bounded_staleness("0020-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, injectSafeTS: 0, isNull: true, expect: 0, }, // Wrong value. { - sql: `select tidb_staleness_bound(1, 2)`, + sql: `select tidb_bounded_staleness(1, 2)`, injectSafeTS: 0, isNull: true, expect: 0, }, { - sql: `select tidb_staleness_bound("invalid_time_1", "invalid_time_2")`, + sql: `select tidb_bounded_staleness("invalid_time_1", "invalid_time_2")`, injectSafeTS: 0, isNull: true, expect: 0, }, } - for _, test := range tidbStalenessBoundTests { + for _, test := range tidbBoundedStalenessTests { c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", fmt.Sprintf("return(%v)", test.injectSafeTS)), IsNil) result = tk.MustQuery(test.sql) diff --git a/go.mod b/go.mod index 6a8a533cc92f8..5af679911f850 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/pingcap/tidb -replace github.com/pingcap/parser v0.0.0-20210508071014-cd9cd78e230c => github.com/JmPotato/parser v0.0.0-20210512031211-369521b3c879 +replace github.com/pingcap/parser v0.0.0-20210508071014-cd9cd78e230c => github.com/JmPotato/parser v0.0.0-20210513024544-661396307b24 require ( github.com/BurntSushi/toml v0.3.1 diff --git a/go.sum b/go.sum index f2e5f1c3f5cd5..cf0c2853ca5bb 100644 --- a/go.sum +++ b/go.sum @@ -34,8 +34,8 @@ github.com/HdrHistogram/hdrhistogram-go v0.9.0 h1:dpujRju0R4M/QZzcnR1LH1qm+TVG3U github.com/HdrHistogram/hdrhistogram-go v0.9.0/go.mod h1:nxrse8/Tzg2tg3DZcZjm6qEclQKK70g0KxO61gFFZD4= github.com/Jeffail/gabs/v2 v2.5.1 h1:ANfZYjpMlfTTKebycu4X1AgkVWumFVDYQl7JwOr4mDk= github.com/Jeffail/gabs/v2 v2.5.1/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI= -github.com/JmPotato/parser v0.0.0-20210512031211-369521b3c879 h1:KRGXkeXbTzNOyMnntYkoegyiILGJthZnJTvIWJtEem0= -github.com/JmPotato/parser v0.0.0-20210512031211-369521b3c879/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= +github.com/JmPotato/parser v0.0.0-20210513024544-661396307b24 h1:agYY6SGz0wmQXMl1PibyjjhXIHZasA2aGOW4EZCNnUw= +github.com/JmPotato/parser v0.0.0-20210513024544-661396307b24/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= From 921a4409cf376ebc5d16c1214f83d816f0b58a9f Mon Sep 17 00:00:00 2001 From: JmPotato Date: Fri, 14 May 2021 13:30:45 +0800 Subject: [PATCH 11/13] Address the comment Signed-off-by: JmPotato --- expression/builtin_time.go | 2 +- expression/integration_test.go | 10 ++++++++++ kv/kv.go | 2 +- store/mockstore/mockstorage/storage.go | 2 +- 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/expression/builtin_time.go b/expression/builtin_time.go index 2c9cbea7b7c12..9e3de68709f02 100644 --- a/expression/builtin_time.go +++ b/expression/builtin_time.go @@ -7115,7 +7115,7 @@ func handleInvalidZeroTime(ctx sessionctx.Context, t types.Time) (bool, error) { return true, handleInvalidTimeError(ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, t.String())) } -// tidbBoundedStalenessFunctionClass reads a time window [a, b] and compares it with the latest resolvedTS +// tidbBoundedStalenessFunctionClass reads a time window [a, b] and compares it with the latest SafeTS // to determine which TS to use in a read only transaction. type tidbBoundedStalenessFunctionClass struct { baseFunctionClass diff --git a/expression/integration_test.go b/expression/integration_test.go index 6ee875bbdf3da..027ce888f3fbf 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -2346,6 +2346,16 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { } } failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS") + // test whether tidb_bounded_staleness is deterministic + result = tk.MustQuery(`select tidb_bounded_staleness(NOW(), DATE_ADD(NOW(), INTERVAL 600 SECOND)), tidb_bounded_staleness(NOW(), DATE_ADD(NOW(), INTERVAL 600 SECOND))`) + c.Assert(result.Rows()[0], HasLen, 2) + c.Assert(result.Rows()[0][0], Equals, result.Rows()[0][1]) + preResult := result.Rows()[0][0] + time.Sleep(time.Second) + result = tk.MustQuery(`select tidb_bounded_staleness(NOW(), DATE_ADD(NOW(), INTERVAL 600 SECOND)), tidb_bounded_staleness(NOW(), DATE_ADD(NOW(), INTERVAL 600 SECOND))`) + c.Assert(result.Rows()[0], HasLen, 2) + c.Assert(result.Rows()[0][0], Equals, result.Rows()[0][1]) + c.Assert(result.Rows()[0][0], Not(Equals), preResult) // fix issue 10308 result = tk.MustQuery("select time(\"- -\");") diff --git a/kv/kv.go b/kv/kv.go index 7239bbfd11a46..ddb94bef2d306 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -419,7 +419,7 @@ type Storage interface { ShowStatus(ctx context.Context, key string) (interface{}, error) // GetMemCache return memory manager of the storage. GetMemCache() MemManager - // GetMinSafeTS return the minimal resolved TS of the storage with given txnScope. + // GetMinSafeTS return the minimal SafeTS of the storage with given txnScope. GetMinSafeTS(txnScope string) uint64 } diff --git a/store/mockstore/mockstorage/storage.go b/store/mockstore/mockstorage/storage.go index f8f26b0e3dd95..6221ef855707d 100644 --- a/store/mockstore/mockstorage/storage.go +++ b/store/mockstore/mockstorage/storage.go @@ -99,7 +99,7 @@ func (s *mockStorage) CurrentVersion(txnScope string) (kv.Version, error) { return kv.NewVersion(ver), err } -// GetMinSafeTS return the minimal resolved TS of the storage with given txnScope. +// GetMinSafeTS return the minimal SafeTS of the storage with given txnScope. func (s *mockStorage) GetMinSafeTS(txnScope string) uint64 { return 0 } From d49a650e14e54ff550fcad14184489a0f8eb88ae Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 17 May 2021 18:24:22 +0800 Subject: [PATCH 12/13] Use Datetime as the type of function result Signed-off-by: JmPotato --- expression/builtin_time.go | 60 ++++++++------------- expression/builtin_time_test.go | 83 ++++++++++++++--------------- expression/builtin_time_vec.go | 23 ++++---- expression/builtin_time_vec_test.go | 2 +- expression/integration_test.go | 48 +++++------------ 5 files changed, 86 insertions(+), 130 deletions(-) diff --git a/expression/builtin_time.go b/expression/builtin_time.go index 9e3de68709f02..13b3d1eef3def 100644 --- a/expression/builtin_time.go +++ b/expression/builtin_time.go @@ -7125,7 +7125,7 @@ func (c *tidbBoundedStalenessFunctionClass) getFunction(ctx sessionctx.Context, if err := c.verifyArgs(args); err != nil { return nil, err } - bf, err := newBaseBuiltinFuncWithTp(ctx, c.funcName, args, types.ETInt, types.ETDatetime, types.ETDatetime) + bf, err := newBaseBuiltinFuncWithTp(ctx, c.funcName, args, types.ETDatetime, types.ETDatetime, types.ETDatetime) if err != nil { return nil, err } @@ -7143,14 +7143,14 @@ func (b *builtinTiDBBoundedStalenessSig) Clone() builtinFunc { return newSig } -func (b *builtinTiDBBoundedStalenessSig) evalInt(row chunk.Row) (int64, bool, error) { +func (b *builtinTiDBBoundedStalenessSig) evalTime(row chunk.Row) (types.Time, bool, error) { leftTime, isNull, err := b.args[0].EvalTime(b.ctx, row) if isNull || err != nil { - return 0, true, handleInvalidTimeError(b.ctx, err) + return types.ZeroTime, true, handleInvalidTimeError(b.ctx, err) } rightTime, isNull, err := b.args[1].EvalTime(b.ctx, row) if isNull || err != nil { - return 0, true, handleInvalidTimeError(b.ctx, err) + return types.ZeroTime, true, handleInvalidTimeError(b.ctx, err) } if invalidLeftTime, invalidRightTime := leftTime.InvalidZero(), rightTime.InvalidZero(); invalidLeftTime || invalidRightTime { if invalidLeftTime { @@ -7159,42 +7159,26 @@ func (b *builtinTiDBBoundedStalenessSig) evalInt(row chunk.Row) (int64, bool, er if invalidRightTime { err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, rightTime.String())) } - return 0, true, err + return types.ZeroTime, true, err } - minTime, err := leftTime.GoTime(getTimeZone(b.ctx)) + timeZone := getTimeZone(b.ctx) + minTime, err := leftTime.GoTime(timeZone) if err != nil { - return 0, true, err + return types.ZeroTime, true, err } - maxTime, err := rightTime.GoTime(getTimeZone(b.ctx)) + maxTime, err := rightTime.GoTime(timeZone) if err != nil { - return 0, true, err - } - // Make sure the time is not too big or small to prevent it from overflow later. - if !(checkTimeRange(minTime) && checkTimeRange(maxTime)) { - return 0, true, nil + return types.ZeroTime, true, err } if minTime.After(maxTime) { - return 0, true, nil - } - minTS, maxTS := oracle.ComposeTS(minTime.Unix()*1000, 0), oracle.ComposeTS(maxTime.Unix()*1000, 0) - return calAppropriateTS(minTS, maxTS, getMinSafeTS(b.ctx)), false, nil -} - -func checkTimeRange(t time.Time) bool { - unixT := t.Unix() - unixTMillisecond := unixT * 1000 - // Less than the unix timestamp zero or overflow after * 1000. - if unixT < 0 || unixTMillisecond < 0 { - return false - } - // Overflow after being composed to TS - if oracle.ComposeTS(unixTMillisecond, 0) < uint64(unixTMillisecond) { - return false + return types.ZeroTime, true, nil } - return true + // Because the minimum unit of a TSO is millisecond, so we only need fsp to be 3. + return types.NewTime(types.FromGoTime(calAppropriateTime(minTime, maxTime, getMinSafeTime(b.ctx, timeZone))), mysql.TypeDatetime, 3), false, nil } -func getMinSafeTS(sessionCtx sessionctx.Context) (minSafeTS uint64) { +func getMinSafeTime(sessionCtx sessionctx.Context, timeZone *time.Location) time.Time { + var minSafeTS uint64 if store := sessionCtx.GetStore(); store != nil { minSafeTS = store.GetMinSafeTS(sessionCtx.GetSessionVars().CheckAndGetTxnScope()) } @@ -7206,7 +7190,7 @@ func getMinSafeTS(sessionCtx sessionctx.Context) (minSafeTS uint64) { // Try to get from the stmt cache to make sure this function is deterministic. stmtCtx := sessionCtx.GetSessionVars().StmtCtx minSafeTS = stmtCtx.GetOrStoreStmtCache(stmtctx.StmtSafeTSCacheKey, minSafeTS).(uint64) - return + return oracle.GetTimeFromTS(minSafeTS).In(timeZone) } // For a SafeTS t and a time range [t1, t2]: @@ -7216,11 +7200,11 @@ func getMinSafeTS(sessionCtx sessionctx.Context) (minSafeTS uint64) { // a read request won't fail. // 2. If t2 < t, we will use t2 as the result, // and with it, a read request won't fail because it's bigger than the latest SafeTS. -func calAppropriateTS(minTS, maxTS, minSafeTS uint64) int64 { - if minSafeTS < minTS { - return int64(minTS) - } else if minTS <= minSafeTS && minSafeTS <= maxTS { - return int64(minSafeTS) +func calAppropriateTime(minTime, maxTime, minSafeTime time.Time) time.Time { + if minSafeTime.Before(minTime) { + return minTime + } else if minSafeTime.After(maxTime) { + return maxTime } - return int64(maxTS) + return minSafeTime } diff --git a/expression/builtin_time_test.go b/expression/builtin_time_test.go index c3cddf50243fb..161912b07e973 100644 --- a/expression/builtin_time_test.go +++ b/expression/builtin_time_test.go @@ -2858,57 +2858,45 @@ func (s *testEvaluatorSuite) TestTidbParseTso(c *C) { } func (s *testEvaluatorSuite) TestTiDBBoundedStaleness(c *C) { - const timeParserLayout = "2006-01-02 15:04:05.000" - t1, err := time.Parse(timeParserLayout, "2015-09-21 09:53:04.877") - c.Assert(err, IsNil) - t1Str := t1.Format(timeParserLayout) - ts1 := int64(oracle.ComposeTS(t1.Unix()*1000, 0)) - t2 := time.Now().UTC() - t2Str := t2.Format(timeParserLayout) - ts2 := int64(oracle.ComposeTS(t2.Unix()*1000, 0)) - s.ctx.GetSessionVars().TimeZone = time.UTC + t1, err := time.Parse(types.TimeFormat, "2015-09-21 09:53:04") + c.Assert(err, IsNil) + // time.Parse uses UTC time zone by default, we need to change it to Local manually. + t1 = t1.Local() + t1Str := t1.Format(types.TimeFormat) + t2 := time.Now() + t2Str := t2.Format(types.TimeFormat) + timeZone := time.Local + s.ctx.GetSessionVars().TimeZone = timeZone tests := []struct { leftTime interface{} rightTime interface{} injectSafeTS uint64 isNull bool - expect int64 + expect time.Time }{ // SafeTS is in the range. { - leftTime: t1Str, - rightTime: t2Str, - injectSafeTS: func() uint64 { - phy := t2.Add(-1*time.Second).Unix() * 1000 - return oracle.ComposeTS(phy, 0) - }(), - isNull: false, - expect: func() int64 { - phy := t2.Add(-1*time.Second).Unix() * 1000 - return int64(oracle.ComposeTS(phy, 0)) - }(), + leftTime: t1Str, + rightTime: t2Str, + injectSafeTS: oracle.GoTimeToTS(t2.Add(-1 * time.Second)), + isNull: false, + expect: t2.Add(-1 * time.Second), }, // SafeTS is less than the left time. { - leftTime: t1Str, - rightTime: t2Str, - injectSafeTS: func() uint64 { - phy := t1.Add(-1*time.Second).Unix() * 1000 - return oracle.ComposeTS(phy, 0) - }(), - isNull: false, - expect: ts1, + leftTime: t1Str, + rightTime: t2Str, + injectSafeTS: oracle.GoTimeToTS(t1.Add(-1 * time.Second)), + isNull: false, + expect: t1, }, // SafeTS is bigger than the right time. { - leftTime: t1Str, - rightTime: t2Str, - injectSafeTS: func() uint64 { - phy := t2.Add(time.Second).Unix() * 1000 - return oracle.ComposeTS(phy, 0) - }(), - isNull: false, - expect: ts2, + leftTime: t1Str, + rightTime: t2Str, + injectSafeTS: oracle.GoTimeToTS(t2.Add(time.Second)), + isNull: false, + expect: t2, }, // Wrong time order. { @@ -2916,7 +2904,7 @@ func (s *testEvaluatorSuite) TestTiDBBoundedStaleness(c *C) { rightTime: t1Str, injectSafeTS: 0, isNull: true, - expect: 0, + expect: time.Time{}, }, } @@ -2931,30 +2919,37 @@ func (s *testEvaluatorSuite) TestTiDBBoundedStaleness(c *C) { if test.isNull { c.Assert(d.IsNull(), IsTrue) } else { - c.Assert(d.GetInt64(), Equals, test.expect) + goTime, err := d.GetMysqlTime().GoTime(timeZone) + c.Assert(err, IsNil) + c.Assert(goTime.Format(types.TimeFormat), Equals, test.expect.Format(types.TimeFormat)) } resetStmtContext(s.ctx) } // Test whether it's deterministic. - safeTS1 := oracle.ComposeTS(t2.Add(-1*time.Second).Unix()*1000, 0) + safeTime1 := t2.Add(-1 * time.Second) + safeTS1 := oracle.ComposeTS(safeTime1.Unix()*1000, 0) c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", fmt.Sprintf("return(%v)", safeTS1)), IsNil) f, err := fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)})) c.Assert(err, IsNil) d, err := evalBuiltinFunc(f, chunk.Row{}) c.Assert(err, IsNil) - c.Assert(d.GetInt64(), Equals, int64(safeTS1)) + goTime, err := d.GetMysqlTime().GoTime(timeZone) + c.Assert(err, IsNil) + resultTime := goTime.Format(types.TimeFormat) + c.Assert(resultTime, Equals, safeTime1.Format(types.TimeFormat)) // SafeTS updated. - safeTS2 := oracle.ComposeTS(t2.Add(1*time.Second).Unix()*1000, 0) + safeTime2 := t2.Add(1 * time.Second) + safeTS2 := oracle.ComposeTS(safeTime2.Unix()*1000, 0) c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", fmt.Sprintf("return(%v)", safeTS2)), IsNil) f, err = fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)})) c.Assert(err, IsNil) d, err = evalBuiltinFunc(f, chunk.Row{}) c.Assert(err, IsNil) - // Still safeTS1 - c.Assert(d.GetInt64(), Equals, int64(safeTS1)) + // Still safeTime1 + c.Assert(resultTime, Equals, safeTime1.Format(types.TimeFormat)) resetStmtContext(s.ctx) failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS") } diff --git a/expression/builtin_time_vec.go b/expression/builtin_time_vec.go index ee476e656e9aa..6f74a8f587e50 100644 --- a/expression/builtin_time_vec.go +++ b/expression/builtin_time_vec.go @@ -858,7 +858,7 @@ func (b *builtinTiDBBoundedStalenessSig) vectorized() bool { return true } -func (b *builtinTiDBBoundedStalenessSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error { +func (b *builtinTiDBBoundedStalenessSig) vecEvalTime(input *chunk.Chunk, result *chunk.Column) error { n := input.NumRows() buf0, err := b.bufAllocator.get(types.ETDatetime, n) if err != nil { @@ -876,12 +876,13 @@ func (b *builtinTiDBBoundedStalenessSig) vecEvalInt(input *chunk.Chunk, result * if err = b.args[1].VecEvalTime(b.ctx, input, buf1); err != nil { return err } - result.ResizeInt64(n, false) - result.MergeNulls(buf0, buf1) - i64s := result.Int64s() args0 := buf0.Times() args1 := buf1.Times() - minSafeTS := getMinSafeTS(b.ctx) + timeZone := getTimeZone(b.ctx) + minSafeTime := getMinSafeTime(b.ctx, timeZone) + result.ResizeTime(n, false) + result.MergeNulls(buf0, buf1) + times := result.Times() for i := 0; i < n; i++ { if result.IsNull(i) { continue @@ -899,24 +900,20 @@ func (b *builtinTiDBBoundedStalenessSig) vecEvalInt(input *chunk.Chunk, result * result.SetNull(i, true) continue } - minTime, err := args0[i].GoTime(getTimeZone(b.ctx)) + minTime, err := args0[i].GoTime(timeZone) if err != nil { return err } - maxTime, err := args1[i].GoTime(getTimeZone(b.ctx)) + maxTime, err := args1[i].GoTime(timeZone) if err != nil { return err } - if !(checkTimeRange(minTime) && checkTimeRange(maxTime)) { - result.SetNull(i, true) - continue - } if minTime.After(maxTime) { result.SetNull(i, true) continue } - minTS, maxTS := oracle.ComposeTS(minTime.Unix()*1000, 0), oracle.ComposeTS(maxTime.Unix()*1000, 0) - i64s[i] = calAppropriateTS(minTS, maxTS, minSafeTS) + // Because the minimum unit of a TSO is millisecond, so we only need fsp to be 3. + times[i] = types.NewTime(types.FromGoTime(calAppropriateTime(minTime, maxTime, minSafeTime)), mysql.TypeDatetime, 3) } return nil } diff --git a/expression/builtin_time_vec_test.go b/expression/builtin_time_vec_test.go index edf1eeffbb042..a757b867b783c 100644 --- a/expression/builtin_time_vec_test.go +++ b/expression/builtin_time_vec_test.go @@ -522,7 +522,7 @@ var vecBuiltinTimeCases = map[string][]vecExprBenchCase{ // Todo: how to inject the safeTS for better testing. ast.TiDBBoundedStaleness: { { - retEvalType: types.ETInt, + retEvalType: types.ETDatetime, childrenTypes: []types.EvalType{types.ETDatetime, types.ETDatetime}, }, }, diff --git a/expression/integration_test.go b/expression/integration_test.go index 027ce888f3fbf..3e0f7b8ab8916 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -2267,83 +2267,63 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { // for tidb_bounded_staleness tk.MustExec("SET time_zone = '+00:00';") t := time.Now().UTC() - ts := oracle.ComposeTS(t.Unix()*1000, 0) + ts := oracle.GoTimeToTS(t) tidbBoundedStalenessTests := []struct { sql string injectSafeTS uint64 - isNull bool - expect int64 + expect string }{ { sql: `select tidb_bounded_staleness(DATE_SUB(NOW(), INTERVAL 600 SECOND), DATE_ADD(NOW(), INTERVAL 600 SECOND))`, injectSafeTS: ts, - isNull: false, - expect: int64(ts), + expect: t.Format(types.TimeFSPFormat[:len(types.TimeFSPFormat)-3]), }, { sql: `select tidb_bounded_staleness("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, injectSafeTS: func() uint64 { - phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 13:30:04.877") + t, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 13:30:04.877") c.Assert(err, IsNil) - return oracle.ComposeTS(phy.Unix()*1000, 0) - }(), - isNull: false, - expect: func() int64 { - phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 13:00:00.000") - c.Assert(err, IsNil) - return int64(oracle.ComposeTS(phy.Unix()*1000, 0)) + return oracle.GoTimeToTS(t) }(), + expect: "2021-04-27 13:00:00.000", }, { sql: `select tidb_bounded_staleness("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, injectSafeTS: func() uint64 { - phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 11:30:04.877") - c.Assert(err, IsNil) - return oracle.ComposeTS(phy.Unix()*1000, 0) - }(), - isNull: false, - expect: func() int64 { - phy, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 12:00:00.000") + t, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 11:30:04.877") c.Assert(err, IsNil) - return int64(oracle.ComposeTS(phy.Unix()*1000, 0)) + return oracle.GoTimeToTS(t) }(), + expect: "2021-04-27 12:00:00.000", }, { sql: `select tidb_bounded_staleness("2021-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, injectSafeTS: 0, - isNull: true, - expect: 0, + expect: "", }, // Time is too small. { sql: `select tidb_bounded_staleness("0020-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, injectSafeTS: 0, - isNull: true, - expect: 0, + expect: "1970-01-01 00:00:00.000", }, // Wrong value. { sql: `select tidb_bounded_staleness(1, 2)`, injectSafeTS: 0, - isNull: true, - expect: 0, + expect: "", }, { sql: `select tidb_bounded_staleness("invalid_time_1", "invalid_time_2")`, injectSafeTS: 0, - isNull: true, - expect: 0, + expect: "", }, } for _, test := range tidbBoundedStalenessTests { c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", fmt.Sprintf("return(%v)", test.injectSafeTS)), IsNil) result = tk.MustQuery(test.sql) - if test.isNull { - result.Check(testkit.Rows("")) - } else { - result.Check(testkit.Rows(fmt.Sprintf("%d", test.expect))) - } + result.Check(testkit.Rows(test.expect)) } failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS") // test whether tidb_bounded_staleness is deterministic From 4772c3d45ed3ecec2858cdbd86c89473d3db2223 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 18 May 2021 13:35:40 +0800 Subject: [PATCH 13/13] Update the parser Signed-off-by: JmPotato --- go.mod | 4 +--- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 657a2b84df7c6..d11c2bb18a127 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,5 @@ module github.com/pingcap/tidb -replace github.com/pingcap/parser v0.0.0-20210513020953-ae2c4497c07b => github.com/JmPotato/parser v0.0.0-20210513034240-2f4474019616 - require ( github.com/BurntSushi/toml v0.3.1 github.com/DATA-DOG/go-sqlmock v1.5.0 // indirect @@ -49,7 +47,7 @@ require ( github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 - github.com/pingcap/parser v0.0.0-20210513020953-ae2c4497c07b + github.com/pingcap/parser v0.0.0-20210518053259-92fa6fe07eb6 github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1 diff --git a/go.sum b/go.sum index 96c94bbceed94..7961099368309 100644 --- a/go.sum +++ b/go.sum @@ -34,8 +34,6 @@ github.com/HdrHistogram/hdrhistogram-go v0.9.0 h1:dpujRju0R4M/QZzcnR1LH1qm+TVG3U github.com/HdrHistogram/hdrhistogram-go v0.9.0/go.mod h1:nxrse8/Tzg2tg3DZcZjm6qEclQKK70g0KxO61gFFZD4= github.com/Jeffail/gabs/v2 v2.5.1 h1:ANfZYjpMlfTTKebycu4X1AgkVWumFVDYQl7JwOr4mDk= github.com/Jeffail/gabs/v2 v2.5.1/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI= -github.com/JmPotato/parser v0.0.0-20210513034240-2f4474019616 h1:An+iixn3iywj+0/Jo41WsCD6FukpAwn+AcslBIYD7kA= -github.com/JmPotato/parser v0.0.0-20210513034240-2f4474019616/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= @@ -445,6 +443,8 @@ github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIf github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= +github.com/pingcap/parser v0.0.0-20210518053259-92fa6fe07eb6 h1:wsH3psMH5ksDowsN9VUE9ZqSrX6oF4AYQQfOunkvSfU= +github.com/pingcap/parser v0.0.0-20210518053259-92fa6fe07eb6/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 h1:A9KL9R+lWSVPH8IqUuH1QSTRJ5FGoY1bT2IcfPKsWD8= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8=