Merge branch 'master' into typectx_clip_zero
lcwangchao committed Oct 19, 2023
2 parents b84d8f0 + 90bd2dd commit 5590475
Showing 93 changed files with 4,149 additions and 2,083 deletions.
2 changes: 1 addition & 1 deletion pkg/ddl/ddl_api.go
@@ -1352,7 +1352,7 @@ func getDefaultValue(ctx sessionctx.Context, col *table.Column, option *ast.Colu
return str, false, err
}
// For other kind of fields (e.g. INT), we supply its integer as string value.
value, err := v.GetBinaryLiteral().ToInt(ctx.GetSessionVars().StmtCtx.TypeCtx)
value, err := v.GetBinaryLiteral().ToInt(ctx.GetSessionVars().StmtCtx.TypeCtx())
if err != nil {
return nil, false, err
}
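The only change in this hunk is the trailing parentheses: the statement context now exposes its type context through an accessor rather than an exported field, so call sites write StmtCtx.TypeCtx() instead of StmtCtx.TypeCtx. A minimal sketch of that accessor pattern follows; the names are illustrative and the real StatementContext definition is not part of this diff.

package stmtctxsketch

// typeContext stands in for the real type-conversion context type.
type typeContext struct {
	flags uint64
}

// StatementContext keeps the type context unexported and exposes it via a getter.
type StatementContext struct {
	typeCtx typeContext
}

// TypeCtx returns the statement's type-conversion context.
func (sc *StatementContext) TypeCtx() typeContext {
	return sc.typeCtx
}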
1 change: 1 addition & 0 deletions pkg/ddl/ddl_worker.go
@@ -117,6 +117,7 @@ type JobContext struct {
tp string

resourceGroupName string
cloudStorageURI string
}

// NewJobContext returns a new ddl job context.
10 changes: 9 additions & 1 deletion pkg/ddl/index.go
@@ -644,6 +644,7 @@ SwitchIndexState:
}
return ver, err
}
loadCloudStorageURI(w, job)
if reorgTp.NeedMergeProcess() {
// Increase telemetryAddIndexIngestUsage
telemetryAddIndexIngestUsage.Inc()
@@ -791,6 +792,12 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool, d *ddlCt
return model.ReorgTypeTxnMerge, nil
}

func loadCloudStorageURI(w *worker, job *model.Job) {
jc := w.jobContext(job.ID, job.ReorgMeta)
jc.cloudStorageURI = variable.CloudStorageURI.Load()
job.ReorgMeta.UseCloudStorage = len(jc.cloudStorageURI) > 0
}

// cleanupSortPath is used to clean up the temp data of the previous jobs.
// Because we don't remove all the files after the support of checkpoint,
// there maybe some stale files in the sort path if TiDB is killed during the backfill process.
@@ -2098,11 +2105,12 @@ func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error {
elemIDs = append(elemIDs, elem.ID)
}

job := reorgInfo.Job
taskMeta := &BackfillGlobalMeta{
Job: *reorgInfo.Job.Clone(),
EleIDs: elemIDs,
EleTypeKey: reorgInfo.currElement.TypeKey,
CloudStorageURI: variable.CloudStorageURI.Load(),
CloudStorageURI: w.jobContext(job.ID, job.ReorgMeta).cloudStorageURI,
}

metaData, err := json.Marshal(taskMeta)
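Together with the cloudStorageURI field added to JobContext above, the new loadCloudStorageURI helper snapshots the global cloud storage setting once while the add-index job is being set up, and executeDistGlobalTask later reads that cached copy instead of re-reading the global variable, so the job's UseCloudStorage flag and the distributed task meta are derived from the same value. A minimal, self-contained sketch of that snapshot-once pattern (names and the URI are illustrative, not TiDB's API):

package main

import (
	"fmt"
	"sync/atomic"
)

// globalCloudStorageURI stands in for the global variable.CloudStorageURI setting.
var globalCloudStorageURI atomic.Value

// jobContext mirrors the cloudStorageURI field added to JobContext above.
type jobContext struct {
	cloudStorageURI string
}

// loadCloudStorageURI snapshots the global value once at job start.
func loadCloudStorageURI(jc *jobContext) {
	jc.cloudStorageURI = globalCloudStorageURI.Load().(string)
}

func main() {
	globalCloudStorageURI.Store("s3://bucket/prefix?endpoint=http://127.0.0.1:9000")
	jc := &jobContext{}
	loadCloudStorageURI(jc)

	// Later readers (e.g. building the distributed task meta) use the snapshot,
	// so changing the global mid-job does not alter this job's behaviour.
	globalCloudStorageURI.Store("")
	fmt.Println(jc.cloudStorageURI == "") // prints false: the job keeps its snapshot
}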
1 change: 1 addition & 0 deletions pkg/ddl/multi_schema_change.go
@@ -198,6 +198,7 @@ func appendToSubJobs(m *model.MultiSchemaInfo, job *model.Job) error {
Revertible: true,
CtxVars: job.CtxVars,
ReorgTp: reorgTp,
UseCloud: false,
})
return nil
}
69 changes: 69 additions & 0 deletions pkg/domain/infosync/info.go
@@ -83,6 +83,10 @@ const (
TopologyTimeToRefresh = 30 * time.Second
// TopologyPrometheus means address of prometheus.
TopologyPrometheus = "/topology/prometheus"
// TopologyTiProxy means address of TiProxy.
TopologyTiProxy = "/topology/tiproxy"
// infoSuffix is the suffix of TiDB/TiProxy topology info.
infoSuffix = "/info"
// TablePrometheusCacheExpiry is the expiry time for prometheus address cache.
TablePrometheusCacheExpiry = 10 * time.Second
// RequestRetryInterval is the sleep time before next retry for http request
@@ -1312,3 +1316,68 @@ func SetPDScheduleConfig(ctx context.Context, config map[string]interface{}) err
}
return is.scheduleManager.SetPDScheduleConfig(ctx, config)
}

// TiProxyServerInfo is the server info for TiProxy.
type TiProxyServerInfo struct {
Version string `json:"version"`
GitHash string `json:"git_hash"`
IP string `json:"ip"`
Port string `json:"port"`
StatusPort string `json:"status_port"`
StartTimestamp int64 `json:"start_timestamp"`
}

// GetTiProxyServerInfo gets all TiProxy servers information from etcd.
func GetTiProxyServerInfo(ctx context.Context) (map[string]*TiProxyServerInfo, error) {
failpoint.Inject("mockGetTiProxyServerInfo", func(val failpoint.Value) {
res := make(map[string]*TiProxyServerInfo)
err := json.Unmarshal([]byte(val.(string)), &res)
failpoint.Return(res, err)
})
is, err := getGlobalInfoSyncer()
if err != nil {
return nil, err
}
return is.getTiProxyServerInfo(ctx)
}

func (is *InfoSyncer) getTiProxyServerInfo(ctx context.Context) (map[string]*TiProxyServerInfo, error) {
// In test.
if is.etcdCli == nil {
return nil, nil
}

var err error
var resp *clientv3.GetResponse
allInfo := make(map[string]*TiProxyServerInfo)
for i := 0; i < keyOpDefaultRetryCnt; i++ {
if ctx.Err() != nil {
return nil, errors.Trace(ctx.Err())
}
childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout)
resp, err = is.etcdCli.Get(childCtx, TopologyTiProxy, clientv3.WithPrefix())
cancel()
if err != nil {
logutil.BgLogger().Info("get key failed", zap.String("key", TopologyTiProxy), zap.Error(err))
time.Sleep(200 * time.Millisecond)
continue
}
for _, kv := range resp.Kvs {
key := string(kv.Key)
if !strings.HasSuffix(key, infoSuffix) {
continue
}
addr := key[len(TopologyTiProxy)+1 : len(key)-len(infoSuffix)]
var info TiProxyServerInfo
err = json.Unmarshal(kv.Value, &info)
if err != nil {
logutil.BgLogger().Info("unmarshal key failed", zap.String("key", key), zap.ByteString("value", kv.Value),
zap.Error(err))
return nil, errors.Trace(err)
}
allInfo[addr] = &info
}
return allInfo, nil
}
return nil, errors.Trace(err)
}
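For reference, a hedged usage sketch of the new API: only GetTiProxyServerInfo's signature and TiProxyServerInfo's fields come from the diff above, while the surrounding helper and its log format are illustrative.

package topologyutil

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb/pkg/domain/infosync"
)

// logTiProxyTopology prints the TiProxy instances registered under
// /topology/tiproxy/<addr>/info in etcd, as discovered by GetTiProxyServerInfo.
func logTiProxyTopology(ctx context.Context) error {
	tiproxies, err := infosync.GetTiProxyServerInfo(ctx)
	if err != nil {
		return err
	}
	for addr, info := range tiproxies {
		fmt.Printf("tiproxy %s: version=%s git_hash=%s status_port=%s started_at=%d\n",
			addr, info.Version, info.GitHash, info.StatusPort, info.StartTimestamp)
	}
	return nil
}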
2 changes: 1 addition & 1 deletion pkg/executor/BUILD.bazel
@@ -165,6 +165,7 @@ go_library(
"//pkg/statistics",
"//pkg/statistics/handle",
"//pkg/statistics/handle/cache",
"//pkg/statistics/handle/globalstats",
"//pkg/statistics/handle/util",
"//pkg/store/driver/backoff",
"//pkg/store/driver/txn",
@@ -323,7 +324,6 @@ go_test(
"inspection_result_test.go",
"inspection_summary_test.go",
"join_pkg_test.go",
"join_test.go",
"joiner_test.go",
"main_test.go",
"memtable_reader_test.go",
3 changes: 1 addition & 2 deletions pkg/executor/analyze_col.go
@@ -280,8 +280,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
fms = append(fms, collectors[i].FMSketch)
}
if needExtStats {
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
extStats, err = statsHandle.BuildExtendedStats(e.TableID.GetStatisticsID(), e.colsInfo, collectors)
extStats, err = statistics.BuildExtendedStats(e.ctx, e.TableID.GetStatisticsID(), e.colsInfo, collectors)
if err != nil {
return nil, nil, nil, nil, nil, err
}
3 changes: 1 addition & 2 deletions pkg/executor/analyze_col_v2.go
@@ -465,8 +465,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(

count = rootRowCollector.Base().Count
if needExtStats {
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
extStats, err = statsHandle.BuildExtendedStats(e.TableID.GetStatisticsID(), e.colsInfo, sampleCollectors)
extStats, err = statistics.BuildExtendedStats(e.ctx, e.TableID.GetStatisticsID(), e.colsInfo, sampleCollectors)
if err != nil {
return 0, nil, nil, nil, nil, err
}
4 changes: 3 additions & 1 deletion pkg/executor/analyze_global_stats.go
@@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/globalstats"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/logutil"
@@ -84,7 +85,7 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap glob
cache = nil
}

globalStats, err := statsHandle.MergePartitionStats2GlobalStatsByTableID(
globalStatsI, err := statsHandle.MergePartitionStats2GlobalStatsByTableID(
e.Ctx(),
globalOpts, e.Ctx().GetInfoSchema().(infoschema.InfoSchema),
globalStatsID.tableID,
@@ -101,6 +102,7 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap glob
}
return err
}
globalStats := globalStatsI.(*globalstats.GlobalStats)
// Dump global-level stats to kv.
for i := 0; i < globalStats.Num; i++ {
hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i]
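The executor-side consequence of this hunk is that MergePartitionStats2GlobalStatsByTableID now returns an interface value, which the caller asserts back to *globalstats.GlobalStats before dumping the per-column histograms, CM sketches and TopN values to kv. A sketch of that call shape, using the comma-ok assertion as a defensive variant of the direct assertion shown in the diff (errors here is github.com/pingcap/errors; globalStatsI is the returned interface value):

// Sketch of a fragment that sits where the diff's direct assertion is.
globalStats, ok := globalStatsI.(*globalstats.GlobalStats)
if !ok {
	return errors.Errorf("unexpected global stats type %T", globalStatsI)
}
for i := 0; i < globalStats.Num; i++ {
	// Each slot holds one column's merged global-level statistics.
	hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i]
	_, _, _ = hg, cms, topN // dumped to kv in the real code
}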
64 changes: 0 additions & 64 deletions pkg/executor/analyze_test.go
@@ -16,9 +16,7 @@ package executor_test

import (
"fmt"
"os"
"strconv"
"strings"
"testing"

"github.com/pingcap/tidb/pkg/domain"
@@ -102,25 +100,6 @@ func TestAnalyzeIndexExtractTopN(t *testing.T) {
}
}

func TestAnalyzePartitionTableForFloat(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
tk.MustExec("use test")
tk.MustExec("CREATE TABLE t1 ( id bigint(20) unsigned NOT NULL AUTO_INCREMENT, num float(9,8) DEFAULT NULL, PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin PARTITION BY HASH (id) PARTITIONS 128;")
// To reproduce the error we meet in https://github.com/pingcap/tidb/issues/35910, we should use the data provided in this issue
b, err := os.ReadFile("testdata/analyze_test_data.sql")
require.NoError(t, err)
sqls := strings.Split(string(b), ";")
for _, sql := range sqls {
if len(sql) < 1 {
continue
}
tk.MustExec(sql)
}
tk.MustExec("analyze table t1")
}

func TestAnalyzePartitionTableByConcurrencyInDynamic(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
@@ -198,46 +177,3 @@ func TestAnalyzePartitionTableByConcurrencyInDynamic(t *testing.T) {
tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'").CheckAt([]int{5, 6}, expected)
}
}

func TestMergeGlobalStatsWithUnAnalyzedPartition(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set tidb_partition_prune_mode=dynamic;")
tk.MustExec("CREATE TABLE `t` ( `id` int(11) DEFAULT NULL, `a` int(11) DEFAULT NULL, `b` int(11) DEFAULT NULL, `c` int(11) DEFAULT NULL ) PARTITION BY RANGE (`id`) (PARTITION `p0` VALUES LESS THAN (3), PARTITION `p1` VALUES LESS THAN (7), PARTITION `p2` VALUES LESS THAN (11));")
tk.MustExec("insert into t values (1,1,1,1),(2,2,2,2),(4,4,4,4),(5,5,5,5),(6,6,6,6),(8,8,8,8),(9,9,9,9);")
tk.MustExec("create index idxa on t (a);")
tk.MustExec("create index idxb on t (b);")
tk.MustExec("create index idxc on t (c);")
tk.MustExec("analyze table t partition p0 index idxa;")
tk.MustExec("analyze table t partition p1 index idxb;")
tk.MustExec("analyze table t partition p2 index idxc;")
tk.MustQuery("show warnings").Check(testkit.Rows(
"Warning 1105 The version 2 would collect all statistics not only the selected indexes",
"Note 1105 Analyze use auto adjusted sample rate 1.000000 for table test.t's partition p2, reason to use this rate is \"use min(1, 110000/10000) as the sample-rate=1\""))
tk.MustExec("analyze table t partition p0;")
tk.MustQuery("show warnings").Check(testkit.Rows(
"Note 1105 Analyze use auto adjusted sample rate 1.000000 for table test.t's partition p0, reason to use this rate is \"use min(1, 110000/2) as the sample-rate=1\""))
}

func TestSetFastAnalyzeSystemVariable(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@session.tidb_enable_fast_analyze=1")
tk.MustQuery("show warnings").Check(testkit.Rows(
"Warning 1105 the fast analyze feature has already been removed in TiDB v7.5.0, so this will have no effect"))
}

func TestIncrementalAnalyze(t *testing.T) {
msg := "the incremental analyze feature has already been removed in TiDB v7.5.0, so this will have no effect"
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, primary key(a), index idx(b))")
tk.MustMatchErrMsg("analyze incremental table t index", msg)
// Create a partition table.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, primary key(a), index idx(b)) partition by range(a) (partition p0 values less than (10), partition p1 values less than (20))")
tk.MustMatchErrMsg("analyze incremental table t partition p0 index idx", msg)
}
5 changes: 3 additions & 2 deletions pkg/executor/analyze_worker.go
@@ -19,9 +19,9 @@ import (
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/logutil"
@@ -61,7 +61,8 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b
worker.errCh <- errors.Trace(exeerrors.ErrQueryInterrupted)
return
}
err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot, util.StatsMetaHistorySourceAnalyze)
statsHandle := domain.GetDomain(worker.sctx).StatsHandle()
err := statsHandle.SaveTableStatsToStorage(results, analyzeSnapshot, util.StatsMetaHistorySourceAnalyze)
if err != nil {
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLog(worker.sctx, results.Job, err)