Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_control: fetch cpu quota metrics from store instead of prometheus (#49176) #49255

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 148 additions & 21 deletions executor/calibrate_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,30 @@
package executor

import (
"bufio"
"context"
"encoding/base64"
"fmt"
"io"
"math"
"net/http"
"runtime"
"sort"
"strconv"
"strings"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/duration"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -121,6 +130,13 @@ const (
// duration Indicates the supported calibration duration
maxDuration = time.Hour * 24
minDuration = time.Minute

// serverTypeTiDB is tidb's instance type name
serverTypeTiDB = "tidb"
// serverTypeTiKV is tikv's instance type name
serverTypeTiKV = "tikv"
// serverTypeTiFlash is tiflash's instance type name
serverTypeTiFlash = "tiflash"
)

type calibrateResourceExec struct {
Expand Down Expand Up @@ -187,33 +203,39 @@ func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) erro
return nil
}
e.done = true

exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
if !variable.EnableResourceControl.Load() {
return infoschema.ErrResourceGroupSupportDisabled
}
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)
if len(e.optionList) > 0 {
return e.dynamicCalibrate(ctx, req, exec)
return e.dynamicCalibrate(ctx, req)
}
return e.staticCalibrate(ctx, req, exec)
return e.staticCalibrate(req)
}

var (
errLowUsage = errors.Errorf("The workload in selected time window is too low, with which TiDB is unable to reach a capacity estimation; please select another time window with higher workload, or calibrate resource by hardware instead")
errNoCPUQuotaMetrics = errors.Normalize("There is no CPU quota metrics, %v")
)

func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk.Chunk) error {
exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
startTs, endTs, err := e.parseCalibrateDuration(ctx)
if err != nil {
return err
}
serverInfos, err := infoschema.GetClusterServerInfo(e.ctx)
if err != nil {
return err
}
startTime := startTs.In(e.ctx.GetSessionVars().Location()).Format("2006-01-02 15:04:05")
endTime := endTs.In(e.ctx.GetSessionVars().Location()).Format("2006-01-02 15:04:05")

totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
totalKVCPUQuota, err := getTiKVTotalCPUQuota(serverInfos)
if err != nil {
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
totalTiDBCPU, err := getTiDBTotalCPUQuota(serverInfos)
if err != nil {
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
Expand Down Expand Up @@ -277,20 +299,21 @@ func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk
return nil
}

func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
if !variable.EnableResourceControl.Load() {
return infoschema.ErrResourceGroupSupportDisabled
}
func (e *calibrateResourceExec) staticCalibrate(req *chunk.Chunk) error {
// first fetch the ru settings config.
if resourceGroupCtl == nil {
return errors.New("resource group controller is not initialized")
}
clusterInfo, err := infoschema.GetClusterServerInfo(e.ctx)
if err != nil {
return err
}

totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
totalKVCPUQuota, err := getTiKVTotalCPUQuota(clusterInfo)
if err != nil {
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
totalTiDBCPUQuota, err := getTiDBTotalCPUQuota(clusterInfo)
if err != nil {
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
Expand All @@ -304,8 +327,8 @@ func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk.
return errors.Errorf("unknown workload '%T'", e.workloadType)
}

if totalTiDBCPU/baseCost.tidbToKVCPURatio < totalKVCPUQuota {
totalKVCPUQuota = totalTiDBCPU / baseCost.tidbToKVCPURatio
if totalTiDBCPUQuota/baseCost.tidbToKVCPURatio < totalKVCPUQuota {
totalKVCPUQuota = totalTiDBCPUQuota / baseCost.tidbToKVCPURatio
}
ruCfg := resourceGroupCtl.GetConfig()
ruPerKVCPU := float64(ruCfg.ReadBaseCost)*float64(baseCost.readReqCount) +
Expand All @@ -318,14 +341,27 @@ func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk.
return nil
}

func getTiKVTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota GROUP BY time ORDER BY time desc limit 1"
return getNumberFromMetrics(ctx, exec, query, "tikv_cpu_quota")
func getTiDBTotalCPUQuota(clusterInfo []infoschema.ServerInfo) (float64, error) {
cpuQuota := float64(runtime.GOMAXPROCS(0))
failpoint.Inject("mockGOMAXPROCS", func(val failpoint.Value) {
if val != nil {
cpuQuota = float64(val.(int))
}
})
instanceNum := count(clusterInfo, serverTypeTiDB)
return cpuQuota * float64(instanceNum), nil
}

func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tidb_server_maxprocs GROUP BY time ORDER BY time desc limit 1"
return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs")
func getTiKVTotalCPUQuota(clusterInfo []infoschema.ServerInfo) (float64, error) {
instanceNum := count(clusterInfo, serverTypeTiKV)
if instanceNum == 0 {
return 0.0, errors.New("no server with type 'tikv' is found")
}
cpuQuota, err := fetchServerCPUQuota(clusterInfo, serverTypeTiKV, "tikv_server_cpu_cores_quota")
if err != nil {
return 0.0, err
}
return cpuQuota * float64(instanceNum), nil
}

type timePointValue struct {
Expand Down Expand Up @@ -403,3 +439,94 @@ func getValuesFromMetrics(ctx context.Context, sctx sessionctx.Context, exec sql
}
return &timeSeriesValues{idx: 0, vals: ret}, nil
}

func count(clusterInfo []infoschema.ServerInfo, ty string) int {
num := 0
for _, e := range clusterInfo {
if e.ServerType == ty {
num++
}
}
return num
}

func fetchServerCPUQuota(serverInfos []infoschema.ServerInfo, serverType string, metricName string) (float64, error) {
var cpuQuota float64
err := fetchStoreMetrics(serverInfos, serverType, func(addr string, resp *http.Response) error {
if resp.StatusCode != http.StatusOK {
return errors.Errorf("request %s failed: %s", addr, resp.Status)
}
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, metricName) {
continue
}
// the metrics format is like following:
// tikv_server_cpu_cores_quota 8
quota, err := strconv.ParseFloat(line[len(metricName)+1:], 64)
if err == nil {
cpuQuota = quota
}
return errors.Trace(err)
}
return errors.Errorf("metrics '%s' not found from server '%s'", metricName, addr)
})
return cpuQuota, err
}

func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, onResp func(string, *http.Response) error) error {
var firstErr error
for _, srv := range serversInfo {
if srv.ServerType != serverType {
continue
}
if len(srv.StatusAddr) == 0 {
continue
}
url := fmt.Sprintf("%s://%s/metrics", util.InternalHTTPSchema(), srv.StatusAddr)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return err
}
var resp *http.Response
failpoint.Inject("mockMetricsResponse", func(val failpoint.Value) {
if val != nil {
data, _ := base64.StdEncoding.DecodeString(val.(string))
resp = &http.Response{
StatusCode: http.StatusOK,
Body: noopCloserWrapper{
Reader: strings.NewReader(string(data)),
},
}
}
})
if resp == nil {
var err1 error
// ignore false positive go line, can't use defer here because it's in a loop.
//nolint:bodyclose
resp, err1 = util.InternalHTTPClient().Do(req)
if err1 != nil {
if firstErr == nil {
firstErr = err1
}
continue
}
}
err = onResp(srv.Address, resp)
resp.Body.Close()
return err
}
if firstErr == nil {
firstErr = errors.Errorf("no server with type '%s' is found", serverType)
}
return firstErr
}

type noopCloserWrapper struct {
io.Reader
}

func (noopCloserWrapper) Close() error {
return nil
}
59 changes: 36 additions & 23 deletions executor/calibrate_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package executor_test

import (
"context"
"encoding/base64"
"encoding/json"
"strings"
"testing"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -76,12 +78,27 @@ func TestCalibrateResource(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, rs)
err = rs.Next(context.Background(), rs.NewChunk(nil))
require.ErrorContains(t, err, "query metric error: pd unavailable")
require.ErrorContains(t, err, "no server with type 'tikv' is found")

// error sql
_, err = tk.Exec("CALIBRATE RESOURCE WORKLOAD tpcc START_TIME '2020-02-12 10:35:00'")
require.Error(t, err)

// Mock for cluster info
// information_schema.cluster_config
instances := []string{
"pd,127.0.0.1:32379,127.0.0.1:32380,mock-version,mock-githash,0",
"tidb,127.0.0.1:34000,30080,mock-version,mock-githash,1001",
"tikv,127.0.0.1:30160,30180,mock-version,mock-githash,0",
"tikv,127.0.0.1:30161,30181,mock-version,mock-githash,0",
"tikv,127.0.0.1:30162,30182,mock-version,mock-githash,0",
}
fpExpr := `return("` + strings.Join(instances, ";") + `")`
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/infoschema/mockClusterInfo", fpExpr))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/infoschema/mockClusterInfo"))
}()

// Mock for metric table data.
fpName := "github.com/pingcap/tidb/executor/mockMetricsTableData"
require.NoError(t, failpoint.Enable(fpName, "return"))
Expand All @@ -95,40 +112,36 @@ func TestCalibrateResource(t *testing.T) {
return time
}

metricsData := `# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds.
# TYPE process_cpu_seconds_total counter
process_cpu_seconds_total 49943
# HELP tikv_server_cpu_cores_quota Total CPU cores quota for TiKV server
# TYPE tikv_server_cpu_cores_quota gauge
tikv_server_cpu_cores_quota 8
`
// failpoint doesn't support string contains whitespaces and newline
encodedData := base64.StdEncoding.EncodeToString([]byte(metricsData))
fpExpr = `return("` + encodedData + `")`
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockMetricsResponse", fpExpr))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockGOMAXPROCS", "return(40)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockGOMAXPROCS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockMetricsResponse"))
}()
mockData := make(map[string][][]types.Datum)
ctx := context.WithValue(context.Background(), "__mockMetricsTableData", mockData)
ctx = failpoint.WithHook(ctx, func(_ context.Context, fpname string) bool {
return fpName == fpname
})
rs, err = tk.Exec("CALIBRATE RESOURCE")
require.NoError(t, err)
require.NotNil(t, rs)
err = rs.Next(ctx, rs.NewChunk(nil))
// because when mock metrics is empty, error is always `pd unavailable`, don't check detail.
require.ErrorContains(t, err, "There is no CPU quota metrics, query metric error: pd unavailable")

mockData["tikv_cpu_quota"] = [][]types.Datum{
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-0", 8.0),
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-1", 8.0),
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-2", 8.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-0", 8.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-1", 8.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-2", 8.0),
}
mockData["tidb_server_maxprocs"] = [][]types.Datum{
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tidb-0", 40.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tidb-0", 40.0),
}

tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("69768"))
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD TPCC").Check(testkit.Rows("69768"))
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD OLTP_READ_WRITE").Check(testkit.Rows("55823"))
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD OLTP_READ_ONLY").Check(testkit.Rows("34926"))
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD OLTP_WRITE_ONLY").Check(testkit.Rows("109776"))

// change total tidb cpu to less than tikv_cpu_quota
mockData["tidb_server_maxprocs"] = [][]types.Datum{
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tidb-0", 8.0),
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockGOMAXPROCS", "return(8)"))
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("38760"))

// construct data for dynamic calibrate
Expand Down