From 0814bd669a876b1b23f3269199b22ce3ad5e03da Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 30 May 2023 17:38:41 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #44272 Signed-off-by: ti-chi-bot --- executor/calibrate_resource.go | 407 ++++++++++++++++++++++++++++++ executor/ddl.go | 2 +- executor/stale_txn_test.go | 25 ++ expression/helper.go | 12 + planner/core/planbuilder.go | 2 +- sessionctx/context.go | 5 +- sessionctx/stmtctx/stmtctx.go | 39 +++ sessiontxn/staleread/BUILD.bazel | 1 + sessiontxn/staleread/processor.go | 2 +- sessiontxn/staleread/util.go | 12 +- 10 files changed, 502 insertions(+), 5 deletions(-) create mode 100644 executor/calibrate_resource.go diff --git a/executor/calibrate_resource.go b/executor/calibrate_resource.go new file mode 100644 index 0000000000000..0ca6f5cba29f9 --- /dev/null +++ b/executor/calibrate_resource.go @@ -0,0 +1,407 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + "fmt" + "math" + "sort" + "time" + + "github.com/docker/go-units" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/duration" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/sessiontxn/staleread" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/mathutil" + "github.com/pingcap/tidb/util/sqlexec" + "github.com/tikv/client-go/v2/oracle" + rmclient "github.com/tikv/pd/client/resource_group/controller" +) + +var ( + // workloadBaseRUCostMap contains the base resource cost rate per 1 kv cpu within 1 second, + // the data is calculated from benchmark result, these data might not be very accurate, + // but is enough here because the maximum RU capacity is depended on both the cluster and + // the workload. + workloadBaseRUCostMap = map[ast.CalibrateResourceType]*baseResourceCost{ + ast.TPCC: { + tidbToKVCPURatio: 0.6, + kvCPU: 0.15, + readBytes: units.MiB / 2, + writeBytes: units.MiB, + readReqCount: 300, + writeReqCount: 1750, + }, + ast.OLTPREADWRITE: { + tidbToKVCPURatio: 1.25, + kvCPU: 0.35, + readBytes: units.MiB * 4.25, + writeBytes: units.MiB / 3, + readReqCount: 1600, + writeReqCount: 1400, + }, + ast.OLTPREADONLY: { + tidbToKVCPURatio: 2, + kvCPU: 0.52, + readBytes: units.MiB * 28, + writeBytes: 0, + readReqCount: 4500, + writeReqCount: 0, + }, + ast.OLTPWRITEONLY: { + tidbToKVCPURatio: 1, + kvCPU: 0, + readBytes: 0, + writeBytes: units.MiB, + readReqCount: 0, + writeReqCount: 3550, + }, + } + + // resourceGroupCtl is the ResourceGroupController in pd client + resourceGroupCtl *rmclient.ResourceGroupsController +) + +// SetResourceGroupController set a inited ResourceGroupsController for calibrate usage. +func SetResourceGroupController(rc *rmclient.ResourceGroupsController) { + resourceGroupCtl = rc +} + +// GetResourceGroupController returns the ResourceGroupsController. +func GetResourceGroupController() *rmclient.ResourceGroupsController { + return resourceGroupCtl +} + +// the resource cost rate of a specified workload per 1 tikv cpu. +type baseResourceCost struct { + // represents the average ratio of TiDB CPU time to TiKV CPU time, this is used to calculate whether tikv cpu + // or tidb cpu is the performance bottle neck. + tidbToKVCPURatio float64 + // the kv CPU time for calculate RU, it's smaller than the actual cpu usage. The unit is seconds. + kvCPU float64 + // the read bytes rate per 1 tikv cpu. + readBytes uint64 + // the write bytes rate per 1 tikv cpu. + writeBytes uint64 + // the average tikv read request count per 1 tikv cpu. + readReqCount uint64 + // the average tikv write request count per 1 tikv cpu. + writeReqCount uint64 +} + +const ( + // valuableUsageThreshold is the threshold used to determine whether the CPU is high enough. + // The sampling point is available when the CPU utilization of tikv or tidb is higher than the valuableUsageThreshold. + valuableUsageThreshold = 0.2 + // lowUsageThreshold is the threshold used to determine whether the CPU is too low. + // When the CPU utilization of tikv or tidb is lower than lowUsageThreshold, but neither is higher than valuableUsageThreshold, the sampling point is unavailable + lowUsageThreshold = 0.1 + // calibration is performed only when the available time point exceeds the percentOfPass + percentOfPass = 0.9 + // For quotas computed at each point in time, the maximum and minimum portions are discarded, and discardRate is the percentage discarded + discardRate = 0.1 + + // duration Indicates the supported calibration duration + maxDuration = time.Hour * 24 + minDuration = time.Minute * 10 +) + +type calibrateResourceExec struct { + baseExecutor + optionList []*ast.DynamicCalibrateResourceOption + workloadType ast.CalibrateResourceType + done bool +} + +func (e *calibrateResourceExec) parseCalibrateDuration(ctx context.Context) (startTime time.Time, endTime time.Time, err error) { + var dur time.Duration + var ts uint64 + for _, op := range e.optionList { + switch op.Tp { + case ast.CalibrateStartTime: + ts, err = staleread.CalculateAsOfTsExpr(ctx, e.ctx, op.Ts) + if err != nil { + return + } + startTime = oracle.GetTimeFromTS(ts) + case ast.CalibrateEndTime: + ts, err = staleread.CalculateAsOfTsExpr(ctx, e.ctx, op.Ts) + if err != nil { + return + } + endTime = oracle.GetTimeFromTS(ts) + case ast.CalibrateDuration: + dur, err = duration.ParseDuration(op.StrValue) + if err != nil { + return + } + } + } + if startTime.IsZero() { + err = errors.Errorf("start time should not be 0") + return + } + // If endTime is set, duration will be ignored. + if endTime.IsZero() { + if dur != time.Duration(0) { + endTime = startTime.Add(dur) + } else { + endTime = time.Now() + } + } + // check the duration + dur = endTime.Sub(startTime) + if dur > maxDuration { + err = errors.Errorf("the duration of calibration is too long, which could lead to inaccurate output. Please make the duration between %s and %s", minDuration.String(), maxDuration.String()) + return + } + if dur < minDuration { + err = errors.Errorf("the duration of calibration is too short, which could lead to inaccurate output. Please make the duration between %s and %s", minDuration.String(), maxDuration.String()) + } + + return +} + +func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) error { + req.Reset() + if e.done { + return nil + } + e.done = true + + exec := e.ctx.(sqlexec.RestrictedSQLExecutor) + ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers) + if len(e.optionList) > 0 { + return e.dynamicCalibrate(ctx, req, exec) + } + return e.staticCalibrate(ctx, req, exec) +} + +func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error { + startTs, endTs, err := e.parseCalibrateDuration(ctx) + if err != nil { + return err + } + startTime := startTs.In(e.ctx.GetSessionVars().Location()).Format(time.DateTime) + endTime := endTs.In(e.ctx.GetSessionVars().Location()).Format(time.DateTime) + + totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec) + if err != nil { + return err + } + totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec) + if err != nil { + return err + } + rus, err := getRUPerSec(ctx, e.ctx, exec, startTime, endTime) + if err != nil { + return err + } + tikvCPUs, err := getComponentCPUUsagePerSec(ctx, e.ctx, exec, "tikv", startTime, endTime) + if err != nil { + return err + } + tidbCPUs, err := getComponentCPUUsagePerSec(ctx, e.ctx, exec, "tidb", startTime, endTime) + if err != nil { + return err + } + quotas := make([]float64, 0) + lowCount := 0 + for { + if rus.isEnd() || tikvCPUs.isEnd() || tidbCPUs.isEnd() { + break + } + // make time point match + maxTime := rus.getTime() + if tikvCPUs.getTime().After(maxTime) { + maxTime = tikvCPUs.getTime() + } + if tidbCPUs.getTime().After(maxTime) { + maxTime = tidbCPUs.getTime() + } + if !rus.advance(maxTime) || !tikvCPUs.advance(maxTime) || !tidbCPUs.advance(maxTime) { + continue + } + tikvQuota, tidbQuota := tikvCPUs.getValue()/totalKVCPUQuota, tidbCPUs.getValue()/totalTiDBCPU + // If one of the two cpu usage is greater than the `valuableUsageThreshold`, we can accept it. + // And if both are greater than the `lowUsageThreshold`, we can also accept it. + if tikvQuota > valuableUsageThreshold || tidbQuota > valuableUsageThreshold { + quotas = append(quotas, rus.getValue()/mathutil.Max(tikvQuota, tidbQuota)) + } else if tikvQuota < lowUsageThreshold || tidbQuota < lowUsageThreshold { + lowCount++ + } else { + quotas = append(quotas, rus.getValue()/mathutil.Max(tikvQuota, tidbQuota)) + } + rus.next() + tidbCPUs.next() + tikvCPUs.next() + } + if len(quotas) < 5 { + return errors.Errorf("There are too few metrics points available in selected time window") + } + if float64(len(quotas))/float64(len(quotas)+lowCount) > percentOfPass { + sort.Slice(quotas, func(i, j int) bool { + return quotas[i] > quotas[j] + }) + lowerBound := int(math.Round(float64(len(quotas)) * discardRate)) + upperBound := len(quotas) - lowerBound + sum := 0. + for i := lowerBound; i < upperBound; i++ { + sum += quotas[i] + } + quota := sum / float64(upperBound-lowerBound) + req.AppendUint64(0, uint64(quota)) + } else { + return errors.Errorf("The workload in selected time window is too low, with which TiDB is unable to reach a capacity estimation; please select another time window with higher workload, or calibrate resource by hardware instead") + } + return nil +} + +func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error { + if !variable.EnableResourceControl.Load() { + return infoschema.ErrResourceGroupSupportDisabled + } + // first fetch the ru settings config. + if resourceGroupCtl == nil { + return errors.New("resource group controller is not initialized") + } + + totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec) + if err != nil { + return err + } + totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec) + if err != nil { + return err + } + + // The default workload to calculate the RU capacity. + if e.workloadType == ast.WorkloadNone { + e.workloadType = ast.TPCC + } + baseCost, ok := workloadBaseRUCostMap[e.workloadType] + if !ok { + return errors.Errorf("unknown workload '%T'", e.workloadType) + } + + if totalTiDBCPU/baseCost.tidbToKVCPURatio < totalKVCPUQuota { + totalKVCPUQuota = totalTiDBCPU / baseCost.tidbToKVCPURatio + } + ruCfg := resourceGroupCtl.GetConfig() + ruPerKVCPU := float64(ruCfg.ReadBaseCost)*float64(baseCost.readReqCount) + + float64(ruCfg.CPUMsCost)*baseCost.kvCPU*1000 + // convert to ms + float64(ruCfg.ReadBytesCost)*float64(baseCost.readBytes) + + float64(ruCfg.WriteBaseCost)*float64(baseCost.writeReqCount) + + float64(ruCfg.WriteBytesCost)*float64(baseCost.writeBytes) + quota := totalKVCPUQuota * ruPerKVCPU + req.AppendUint64(0, uint64(quota)) + return nil +} + +func getTiKVTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) { + query := "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota GROUP BY time ORDER BY time desc limit 1" + return getNumberFromMetrics(ctx, exec, query, "tikv_cpu_quota") +} + +func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) { + query := "SELECT SUM(value) FROM METRICS_SCHEMA.tidb_server_maxprocs GROUP BY time ORDER BY time desc limit 1" + return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs") +} + +type timePointValue struct { + tp time.Time + val float64 +} + +type timeSeriesValues struct { + idx int + vals []*timePointValue +} + +func (t *timeSeriesValues) isEnd() bool { + return t.idx >= len(t.vals) +} + +func (t *timeSeriesValues) next() { + t.idx++ +} + +func (t *timeSeriesValues) getTime() time.Time { + return t.vals[t.idx].tp +} + +func (t *timeSeriesValues) getValue() float64 { + return t.vals[t.idx].val +} + +func (t *timeSeriesValues) advance(target time.Time) bool { + for ; t.idx < len(t.vals); t.idx++ { + // `target` is maximal time in other timeSeriesValues, + // so we should find the time which offset is less than 10s. + if t.vals[t.idx].tp.Add(time.Second * 10).After(target) { + return t.vals[t.idx].tp.Add(-time.Second * 10).Before(target) + } + } + return false +} + +func getRUPerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) (*timeSeriesValues, error) { + query := fmt.Sprintf("SELECT time, value FROM METRICS_SCHEMA.resource_manager_resource_unit where time >= '%s' and time <= '%s' ORDER BY time asc", startTime, endTime) + return getValuesFromMetrics(ctx, sctx, exec, query, "resource_manager_resource_unit") +} + +func getComponentCPUUsagePerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, component, startTime, endTime string) (*timeSeriesValues, error) { + query := fmt.Sprintf("SELECT time, sum(value) FROM METRICS_SCHEMA.process_cpu_usage where time >= '%s' and time <= '%s' and job like '%%%s' GROUP BY time ORDER BY time asc", startTime, endTime, component) + return getValuesFromMetrics(ctx, sctx, exec, query, "process_cpu_usage") +} + +func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (float64, error) { + rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, query) + if err != nil { + return 0.0, errors.Trace(err) + } + if len(rows) == 0 { + return 0.0, errors.Errorf("metrics '%s' is empty", metrics) + } + + return rows[0].GetFloat64(0), nil +} + +func getValuesFromMetrics(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (*timeSeriesValues, error) { + rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, query) + if err != nil { + return nil, errors.Trace(err) + } + if len(rows) == 0 { + return nil, errors.Errorf("metrics '%s' is empty", metrics) + } + ret := make([]*timePointValue, 0, len(rows)) + for _, row := range rows { + if tp, err := row.GetTime(0).AdjustedGoTime(sctx.GetSessionVars().Location()); err == nil { + ret = append(ret, &timePointValue{ + tp: tp, + val: row.GetFloat64(1), + }) + } + } + return &timeSeriesValues{idx: 0, vals: ret}, nil +} diff --git a/executor/ddl.go b/executor/ddl.go index 85918bb7d4f54..9e5b7f30ae33e 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -531,7 +531,7 @@ func (e *DDLExec) getRecoverTableByTableName(tableName *ast.TableName) (*model.J } func (e *DDLExec) executeFlashBackCluster(s *ast.FlashBackToTimestampStmt) error { - flashbackTS, err := staleread.CalculateAsOfTsExpr(e.ctx, s.FlashbackTS) + flashbackTS, err := staleread.CalculateAsOfTsExpr(context.Background(), e.ctx, s.FlashbackTS) if err != nil { return err } diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index dc1ab4ff962a8..e621c33ccc675 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -1396,3 +1396,28 @@ func TestStalePrepare(t *testing.T) { tk.MustQuery("execute stmt").Check(expected) } } + +func TestStaleTSO(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + defer tk.MustExec("drop table if exists t") + tk.MustExec("create table t (id int)") + + tk.MustExec("insert into t values(1)") + + asOfExprs := []string{ + "now(3) - interval 1 second", + "current_time() - interval 1 second", + "curtime() - interval 1 second", + } + + nextTSO := oracle.GoTimeToTS(time.Now().Add(2 * time.Second)) + require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/sessiontxn/staleread/mockStaleReadTSO", fmt.Sprintf("return(%d)", nextTSO))) + defer failpoint.Disable("github.com/pingcap/tidb/sessiontxn/staleread/mockStaleReadTSO") + for _, expr := range asOfExprs { + // Make sure the now() expr is evaluated from the stale ts provider. + tk.MustQuery("select * from t as of timestamp " + expr + " order by id asc").Check(testkit.Rows("1")) + } +} diff --git a/expression/helper.go b/expression/helper.go index 2fa47e87c28de..1839416f7c088 100644 --- a/expression/helper.go +++ b/expression/helper.go @@ -27,6 +27,9 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" driver "github.com/pingcap/tidb/types/parser_driver" + "github.com/pingcap/tidb/util/logutil" + "github.com/tikv/client-go/v2/oracle" + "go.uber.org/zap" ) func boolToInt64(v bool) int64 { @@ -158,6 +161,15 @@ func getStmtTimestamp(ctx sessionctx.Context) (time.Time, error) { failpoint.Return(v, nil) }) + if ctx != nil { + staleTSO, err := ctx.GetSessionVars().StmtCtx.GetStaleTSO() + if staleTSO != 0 && err == nil { + return oracle.GetTimeFromTS(staleTSO), nil + } else if err != nil { + logutil.BgLogger().Error("get stale tso failed", zap.Error(err)) + } + } + now := time.Now() if ctx == nil { diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 8d780ab210066..74f812f0fe520 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -3342,7 +3342,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, case *ast.BeginStmt: readTS := b.ctx.GetSessionVars().TxnReadTS.PeakTxnReadTS() if raw.AsOf != nil { - startTS, err := staleread.CalculateAsOfTsExpr(b.ctx, raw.AsOf.TsExpr) + startTS, err := staleread.CalculateAsOfTsExpr(ctx, b.ctx, raw.AsOf.TsExpr) if err != nil { return nil, err } diff --git a/sessionctx/context.go b/sessionctx/context.go index f39d3a82a8f38..35eb7ba68ca1d 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -244,7 +244,10 @@ const allowedTimeFromNow = 100 * time.Millisecond // ValidateStaleReadTS validates that readTS does not exceed the current time not strictly. func ValidateStaleReadTS(ctx context.Context, sctx Context, readTS uint64) error { - currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0) + currentTS, err := sctx.GetSessionVars().StmtCtx.GetStaleTSO() + if currentTS == 0 || err != nil { + currentTS, err = sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0) + } // If we fail to calculate currentTS from local time, fallback to get a timestamp from PD if err != nil { metrics.ValidateReadTSFromPDCount.Inc() diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 1b2fabd4fcee2..df2b375a8c723 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -378,6 +378,19 @@ type StatementContext struct { TableStats map[int64]interface{} // useChunkAlloc indicates whether statement use chunk alloc useChunkAlloc bool +<<<<<<< HEAD +======= + // Check if TiFlash read engine is removed due to strict sql mode. + TiFlashEngineRemovedDueToStrictSQLMode bool + // CanonicalHashCode try to get the canonical hash code from expression. + CanonicalHashCode bool + // StaleTSOProvider is used to provide stale timestamp oracle for read-only transactions. + StaleTSOProvider struct { + sync.Mutex + value *uint64 + eval func() (uint64, error) + } +>>>>>>> e3776f3abb8 (sessiontxn/staleread: more accurate stale ts (#44272)) } // StmtHints are SessionVars related sql hints. @@ -1127,6 +1140,32 @@ func (sc *StatementContext) DetachMemDiskTracker() { } } +// SetStaleTSOProvider sets the stale TSO provider. +func (sc *StatementContext) SetStaleTSOProvider(eval func() (uint64, error)) { + sc.StaleTSOProvider.Lock() + defer sc.StaleTSOProvider.Unlock() + sc.StaleTSOProvider.value = nil + sc.StaleTSOProvider.eval = eval +} + +// GetStaleTSO returns the TSO for stale-read usage which calculate from PD's last response. +func (sc *StatementContext) GetStaleTSO() (uint64, error) { + sc.StaleTSOProvider.Lock() + defer sc.StaleTSOProvider.Unlock() + if sc.StaleTSOProvider.value != nil { + return *sc.StaleTSOProvider.value, nil + } + if sc.StaleTSOProvider.eval == nil { + return 0, nil + } + tso, err := sc.StaleTSOProvider.eval() + if err != nil { + return 0, err + } + sc.StaleTSOProvider.value = &tso + return tso, nil +} + // CopTasksDetails collects some useful information of cop-tasks during execution. type CopTasksDetails struct { NumCopTasks int diff --git a/sessiontxn/staleread/BUILD.bazel b/sessiontxn/staleread/BUILD.bazel index 9c1e11823e32a..69d3a7ef7e3e0 100644 --- a/sessiontxn/staleread/BUILD.bazel +++ b/sessiontxn/staleread/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "//types", "//util/dbterror", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_tikv_client_go_v2//oracle", ], ) diff --git a/sessiontxn/staleread/processor.go b/sessiontxn/staleread/processor.go index 17df59c2873e3..af91ffd1b175e 100644 --- a/sessiontxn/staleread/processor.go +++ b/sessiontxn/staleread/processor.go @@ -280,7 +280,7 @@ func parseAndValidateAsOf(ctx context.Context, sctx sessionctx.Context, asOf *as return 0, nil } - ts, err := CalculateAsOfTsExpr(sctx, asOf.TsExpr) + ts, err := CalculateAsOfTsExpr(ctx, sctx, asOf.TsExpr) if err != nil { return 0, err } diff --git a/sessiontxn/staleread/util.go b/sessiontxn/staleread/util.go index 3fa84f72cae0b..814861ffddcce 100644 --- a/sessiontxn/staleread/util.go +++ b/sessiontxn/staleread/util.go @@ -18,6 +18,7 @@ import ( "context" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/mysql" @@ -29,7 +30,16 @@ import ( ) // CalculateAsOfTsExpr calculates the TsExpr of AsOfClause to get a StartTS. -func CalculateAsOfTsExpr(sctx sessionctx.Context, tsExpr ast.ExprNode) (uint64, error) { +func CalculateAsOfTsExpr(ctx context.Context, sctx sessionctx.Context, tsExpr ast.ExprNode) (uint64, error) { + sctx.GetSessionVars().StmtCtx.SetStaleTSOProvider(func() (uint64, error) { + failpoint.Inject("mockStaleReadTSO", func(val failpoint.Value) (uint64, error) { + return uint64(val.(int)), nil + }) + // this function accepts a context, but we don't need it when there is a valid cached ts. + // in most cases, the stale read ts can be calculated from `cached ts + time since cache - staleness`, + // this can be more accurate than `time.Now() - staleness`, because TiDB's local time can drift. + return sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0) + }) tsVal, err := expression.EvalAstExpr(sctx, tsExpr) if err != nil { return 0, err From 8fac5d3d5e70f39dcd974a5f5dfe3c8236a281a0 Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 30 May 2023 17:47:31 +0800 Subject: [PATCH 2/2] resolve conflict Signed-off-by: you06 --- executor/calibrate_resource.go | 407 --------------------------------- sessionctx/stmtctx/stmtctx.go | 7 - 2 files changed, 414 deletions(-) delete mode 100644 executor/calibrate_resource.go diff --git a/executor/calibrate_resource.go b/executor/calibrate_resource.go deleted file mode 100644 index 0ca6f5cba29f9..0000000000000 --- a/executor/calibrate_resource.go +++ /dev/null @@ -1,407 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package executor - -import ( - "context" - "fmt" - "math" - "sort" - "time" - - "github.com/docker/go-units" - "github.com/pingcap/errors" - "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/ast" - "github.com/pingcap/tidb/parser/duration" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/sessiontxn/staleread" - "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/mathutil" - "github.com/pingcap/tidb/util/sqlexec" - "github.com/tikv/client-go/v2/oracle" - rmclient "github.com/tikv/pd/client/resource_group/controller" -) - -var ( - // workloadBaseRUCostMap contains the base resource cost rate per 1 kv cpu within 1 second, - // the data is calculated from benchmark result, these data might not be very accurate, - // but is enough here because the maximum RU capacity is depended on both the cluster and - // the workload. - workloadBaseRUCostMap = map[ast.CalibrateResourceType]*baseResourceCost{ - ast.TPCC: { - tidbToKVCPURatio: 0.6, - kvCPU: 0.15, - readBytes: units.MiB / 2, - writeBytes: units.MiB, - readReqCount: 300, - writeReqCount: 1750, - }, - ast.OLTPREADWRITE: { - tidbToKVCPURatio: 1.25, - kvCPU: 0.35, - readBytes: units.MiB * 4.25, - writeBytes: units.MiB / 3, - readReqCount: 1600, - writeReqCount: 1400, - }, - ast.OLTPREADONLY: { - tidbToKVCPURatio: 2, - kvCPU: 0.52, - readBytes: units.MiB * 28, - writeBytes: 0, - readReqCount: 4500, - writeReqCount: 0, - }, - ast.OLTPWRITEONLY: { - tidbToKVCPURatio: 1, - kvCPU: 0, - readBytes: 0, - writeBytes: units.MiB, - readReqCount: 0, - writeReqCount: 3550, - }, - } - - // resourceGroupCtl is the ResourceGroupController in pd client - resourceGroupCtl *rmclient.ResourceGroupsController -) - -// SetResourceGroupController set a inited ResourceGroupsController for calibrate usage. -func SetResourceGroupController(rc *rmclient.ResourceGroupsController) { - resourceGroupCtl = rc -} - -// GetResourceGroupController returns the ResourceGroupsController. -func GetResourceGroupController() *rmclient.ResourceGroupsController { - return resourceGroupCtl -} - -// the resource cost rate of a specified workload per 1 tikv cpu. -type baseResourceCost struct { - // represents the average ratio of TiDB CPU time to TiKV CPU time, this is used to calculate whether tikv cpu - // or tidb cpu is the performance bottle neck. - tidbToKVCPURatio float64 - // the kv CPU time for calculate RU, it's smaller than the actual cpu usage. The unit is seconds. - kvCPU float64 - // the read bytes rate per 1 tikv cpu. - readBytes uint64 - // the write bytes rate per 1 tikv cpu. - writeBytes uint64 - // the average tikv read request count per 1 tikv cpu. - readReqCount uint64 - // the average tikv write request count per 1 tikv cpu. - writeReqCount uint64 -} - -const ( - // valuableUsageThreshold is the threshold used to determine whether the CPU is high enough. - // The sampling point is available when the CPU utilization of tikv or tidb is higher than the valuableUsageThreshold. - valuableUsageThreshold = 0.2 - // lowUsageThreshold is the threshold used to determine whether the CPU is too low. - // When the CPU utilization of tikv or tidb is lower than lowUsageThreshold, but neither is higher than valuableUsageThreshold, the sampling point is unavailable - lowUsageThreshold = 0.1 - // calibration is performed only when the available time point exceeds the percentOfPass - percentOfPass = 0.9 - // For quotas computed at each point in time, the maximum and minimum portions are discarded, and discardRate is the percentage discarded - discardRate = 0.1 - - // duration Indicates the supported calibration duration - maxDuration = time.Hour * 24 - minDuration = time.Minute * 10 -) - -type calibrateResourceExec struct { - baseExecutor - optionList []*ast.DynamicCalibrateResourceOption - workloadType ast.CalibrateResourceType - done bool -} - -func (e *calibrateResourceExec) parseCalibrateDuration(ctx context.Context) (startTime time.Time, endTime time.Time, err error) { - var dur time.Duration - var ts uint64 - for _, op := range e.optionList { - switch op.Tp { - case ast.CalibrateStartTime: - ts, err = staleread.CalculateAsOfTsExpr(ctx, e.ctx, op.Ts) - if err != nil { - return - } - startTime = oracle.GetTimeFromTS(ts) - case ast.CalibrateEndTime: - ts, err = staleread.CalculateAsOfTsExpr(ctx, e.ctx, op.Ts) - if err != nil { - return - } - endTime = oracle.GetTimeFromTS(ts) - case ast.CalibrateDuration: - dur, err = duration.ParseDuration(op.StrValue) - if err != nil { - return - } - } - } - if startTime.IsZero() { - err = errors.Errorf("start time should not be 0") - return - } - // If endTime is set, duration will be ignored. - if endTime.IsZero() { - if dur != time.Duration(0) { - endTime = startTime.Add(dur) - } else { - endTime = time.Now() - } - } - // check the duration - dur = endTime.Sub(startTime) - if dur > maxDuration { - err = errors.Errorf("the duration of calibration is too long, which could lead to inaccurate output. Please make the duration between %s and %s", minDuration.String(), maxDuration.String()) - return - } - if dur < minDuration { - err = errors.Errorf("the duration of calibration is too short, which could lead to inaccurate output. Please make the duration between %s and %s", minDuration.String(), maxDuration.String()) - } - - return -} - -func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) error { - req.Reset() - if e.done { - return nil - } - e.done = true - - exec := e.ctx.(sqlexec.RestrictedSQLExecutor) - ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers) - if len(e.optionList) > 0 { - return e.dynamicCalibrate(ctx, req, exec) - } - return e.staticCalibrate(ctx, req, exec) -} - -func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error { - startTs, endTs, err := e.parseCalibrateDuration(ctx) - if err != nil { - return err - } - startTime := startTs.In(e.ctx.GetSessionVars().Location()).Format(time.DateTime) - endTime := endTs.In(e.ctx.GetSessionVars().Location()).Format(time.DateTime) - - totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec) - if err != nil { - return err - } - totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec) - if err != nil { - return err - } - rus, err := getRUPerSec(ctx, e.ctx, exec, startTime, endTime) - if err != nil { - return err - } - tikvCPUs, err := getComponentCPUUsagePerSec(ctx, e.ctx, exec, "tikv", startTime, endTime) - if err != nil { - return err - } - tidbCPUs, err := getComponentCPUUsagePerSec(ctx, e.ctx, exec, "tidb", startTime, endTime) - if err != nil { - return err - } - quotas := make([]float64, 0) - lowCount := 0 - for { - if rus.isEnd() || tikvCPUs.isEnd() || tidbCPUs.isEnd() { - break - } - // make time point match - maxTime := rus.getTime() - if tikvCPUs.getTime().After(maxTime) { - maxTime = tikvCPUs.getTime() - } - if tidbCPUs.getTime().After(maxTime) { - maxTime = tidbCPUs.getTime() - } - if !rus.advance(maxTime) || !tikvCPUs.advance(maxTime) || !tidbCPUs.advance(maxTime) { - continue - } - tikvQuota, tidbQuota := tikvCPUs.getValue()/totalKVCPUQuota, tidbCPUs.getValue()/totalTiDBCPU - // If one of the two cpu usage is greater than the `valuableUsageThreshold`, we can accept it. - // And if both are greater than the `lowUsageThreshold`, we can also accept it. - if tikvQuota > valuableUsageThreshold || tidbQuota > valuableUsageThreshold { - quotas = append(quotas, rus.getValue()/mathutil.Max(tikvQuota, tidbQuota)) - } else if tikvQuota < lowUsageThreshold || tidbQuota < lowUsageThreshold { - lowCount++ - } else { - quotas = append(quotas, rus.getValue()/mathutil.Max(tikvQuota, tidbQuota)) - } - rus.next() - tidbCPUs.next() - tikvCPUs.next() - } - if len(quotas) < 5 { - return errors.Errorf("There are too few metrics points available in selected time window") - } - if float64(len(quotas))/float64(len(quotas)+lowCount) > percentOfPass { - sort.Slice(quotas, func(i, j int) bool { - return quotas[i] > quotas[j] - }) - lowerBound := int(math.Round(float64(len(quotas)) * discardRate)) - upperBound := len(quotas) - lowerBound - sum := 0. - for i := lowerBound; i < upperBound; i++ { - sum += quotas[i] - } - quota := sum / float64(upperBound-lowerBound) - req.AppendUint64(0, uint64(quota)) - } else { - return errors.Errorf("The workload in selected time window is too low, with which TiDB is unable to reach a capacity estimation; please select another time window with higher workload, or calibrate resource by hardware instead") - } - return nil -} - -func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error { - if !variable.EnableResourceControl.Load() { - return infoschema.ErrResourceGroupSupportDisabled - } - // first fetch the ru settings config. - if resourceGroupCtl == nil { - return errors.New("resource group controller is not initialized") - } - - totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec) - if err != nil { - return err - } - totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec) - if err != nil { - return err - } - - // The default workload to calculate the RU capacity. - if e.workloadType == ast.WorkloadNone { - e.workloadType = ast.TPCC - } - baseCost, ok := workloadBaseRUCostMap[e.workloadType] - if !ok { - return errors.Errorf("unknown workload '%T'", e.workloadType) - } - - if totalTiDBCPU/baseCost.tidbToKVCPURatio < totalKVCPUQuota { - totalKVCPUQuota = totalTiDBCPU / baseCost.tidbToKVCPURatio - } - ruCfg := resourceGroupCtl.GetConfig() - ruPerKVCPU := float64(ruCfg.ReadBaseCost)*float64(baseCost.readReqCount) + - float64(ruCfg.CPUMsCost)*baseCost.kvCPU*1000 + // convert to ms - float64(ruCfg.ReadBytesCost)*float64(baseCost.readBytes) + - float64(ruCfg.WriteBaseCost)*float64(baseCost.writeReqCount) + - float64(ruCfg.WriteBytesCost)*float64(baseCost.writeBytes) - quota := totalKVCPUQuota * ruPerKVCPU - req.AppendUint64(0, uint64(quota)) - return nil -} - -func getTiKVTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) { - query := "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota GROUP BY time ORDER BY time desc limit 1" - return getNumberFromMetrics(ctx, exec, query, "tikv_cpu_quota") -} - -func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) { - query := "SELECT SUM(value) FROM METRICS_SCHEMA.tidb_server_maxprocs GROUP BY time ORDER BY time desc limit 1" - return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs") -} - -type timePointValue struct { - tp time.Time - val float64 -} - -type timeSeriesValues struct { - idx int - vals []*timePointValue -} - -func (t *timeSeriesValues) isEnd() bool { - return t.idx >= len(t.vals) -} - -func (t *timeSeriesValues) next() { - t.idx++ -} - -func (t *timeSeriesValues) getTime() time.Time { - return t.vals[t.idx].tp -} - -func (t *timeSeriesValues) getValue() float64 { - return t.vals[t.idx].val -} - -func (t *timeSeriesValues) advance(target time.Time) bool { - for ; t.idx < len(t.vals); t.idx++ { - // `target` is maximal time in other timeSeriesValues, - // so we should find the time which offset is less than 10s. - if t.vals[t.idx].tp.Add(time.Second * 10).After(target) { - return t.vals[t.idx].tp.Add(-time.Second * 10).Before(target) - } - } - return false -} - -func getRUPerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) (*timeSeriesValues, error) { - query := fmt.Sprintf("SELECT time, value FROM METRICS_SCHEMA.resource_manager_resource_unit where time >= '%s' and time <= '%s' ORDER BY time asc", startTime, endTime) - return getValuesFromMetrics(ctx, sctx, exec, query, "resource_manager_resource_unit") -} - -func getComponentCPUUsagePerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, component, startTime, endTime string) (*timeSeriesValues, error) { - query := fmt.Sprintf("SELECT time, sum(value) FROM METRICS_SCHEMA.process_cpu_usage where time >= '%s' and time <= '%s' and job like '%%%s' GROUP BY time ORDER BY time asc", startTime, endTime, component) - return getValuesFromMetrics(ctx, sctx, exec, query, "process_cpu_usage") -} - -func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (float64, error) { - rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, query) - if err != nil { - return 0.0, errors.Trace(err) - } - if len(rows) == 0 { - return 0.0, errors.Errorf("metrics '%s' is empty", metrics) - } - - return rows[0].GetFloat64(0), nil -} - -func getValuesFromMetrics(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (*timeSeriesValues, error) { - rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, query) - if err != nil { - return nil, errors.Trace(err) - } - if len(rows) == 0 { - return nil, errors.Errorf("metrics '%s' is empty", metrics) - } - ret := make([]*timePointValue, 0, len(rows)) - for _, row := range rows { - if tp, err := row.GetTime(0).AdjustedGoTime(sctx.GetSessionVars().Location()); err == nil { - ret = append(ret, &timePointValue{ - tp: tp, - val: row.GetFloat64(1), - }) - } - } - return &timeSeriesValues{idx: 0, vals: ret}, nil -} diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index df2b375a8c723..1df1c35354580 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -378,19 +378,12 @@ type StatementContext struct { TableStats map[int64]interface{} // useChunkAlloc indicates whether statement use chunk alloc useChunkAlloc bool -<<<<<<< HEAD -======= - // Check if TiFlash read engine is removed due to strict sql mode. - TiFlashEngineRemovedDueToStrictSQLMode bool - // CanonicalHashCode try to get the canonical hash code from expression. - CanonicalHashCode bool // StaleTSOProvider is used to provide stale timestamp oracle for read-only transactions. StaleTSOProvider struct { sync.Mutex value *uint64 eval func() (uint64, error) } ->>>>>>> e3776f3abb8 (sessiontxn/staleread: more accurate stale ts (#44272)) } // StmtHints are SessionVars related sql hints.