Skip to content

Commit

Permalink
statistics: record last gc ts to avoid huge read on stats table (#46138)
Browse files Browse the repository at this point in the history
close #31778, close #45966
  • Loading branch information
winoros authored Sep 11, 2023
1 parent 789d6d0 commit 51f1a82
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 9 deletions.
1 change: 0 additions & 1 deletion build/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ nogo(
}) +
select({
"//build:without_rbe": [
"//build/linter/filepermission",
],
"//conditions:default": [],
}),
Expand Down
3 changes: 2 additions & 1 deletion ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2062,6 +2062,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
if tblInfo.TiFlashReplica != nil {
removeTiFlashAvailablePartitionIDs(tblInfo, physicalTableIDs)
}
droppedDefs := tblInfo.Partition.DroppingDefinitions
tblInfo.Partition.DroppingDefinitions = nil
// used by ApplyDiff in updateSchemaVersion
job.CtxVars = []interface{}{physicalTableIDs}
Expand All @@ -2071,7 +2072,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
}
job.SchemaState = model.StateNone
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
asyncNotifyEvent(d, &util.Event{Tp: model.ActionDropTablePartition, TableInfo: tblInfo, PartInfo: &model.PartitionInfo{Definitions: tblInfo.Partition.Definitions}})
asyncNotifyEvent(d, &util.Event{Tp: model.ActionDropTablePartition, TableInfo: tblInfo, PartInfo: &model.PartitionInfo{Definitions: droppedDefs}})
// A background job will be created to delete old partition data.
job.Args = []interface{}{physicalTableIDs}
default:
Expand Down
5 changes: 5 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,11 @@ func onDropTableOrView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ er
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
startKey := tablecodec.EncodeTablePrefix(job.TableID)
job.Args = append(job.Args, startKey, oldIDs, ruleIDs)
if tblInfo.IsSequence() {
asyncNotifyEvent(d, &util.Event{Tp: model.ActionDropSequence, TableInfo: tblInfo})
} else if !tblInfo.IsView() {
asyncNotifyEvent(d, &util.Event{Tp: model.ActionDropTable, TableInfo: tblInfo})
}
default:
return ver, errors.Trace(dbterror.ErrInvalidDDLState.GenWithStackByArgs("table", tblInfo.State))
}
Expand Down
42 changes: 42 additions & 0 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
return err
}
}
case model.ActionDropTable:
ids := h.getInitStateTableIDs(t.TableInfo)
for _, id := range ids {
if err := h.resetTableStats2KVForDrop(id); err != nil {
return err
}
}
case model.ActionAddColumn, model.ActionModifyColumn:
ids := h.getInitStateTableIDs(t.TableInfo)
for _, id := range ids {
Expand All @@ -63,6 +70,11 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
return err
}
}
for _, def := range t.PartInfo.Definitions {
if err := h.resetTableStats2KVForDrop(def.ID); err != nil {
return err
}
}
case model.ActionReorganizePartition:
for _, def := range t.PartInfo.Definitions {
// TODO: Should we trigger analyze instead of adding 0s?
Expand Down Expand Up @@ -320,6 +332,36 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (e
return nil
}

// resetTableStats2KVForDrop bumps the version of the mysql.stats_meta row that belongs to a
// dropped table or partition to the start TS of an internal transaction. It only touches the
// version column; it neither deletes the row nor changes any count.
//
// physicalID is the physical table (or partition) ID whose stats_meta row is updated.
// The returned error comes from beginning, running or finishing the internal transaction.
func (h *Handle) resetTableStats2KVForDrop(physicalID int64) (err error) {
	// statsVer stays 0 until the update below succeeds, so nothing is recorded on failure.
	statsVer := uint64(0)
	// Registered first, hence executed last: after the transaction has been finished and
	// h.mu has been released.
	defer func() {
		if err == nil && statsVer != 0 {
			h.recordHistoricalStatsMeta(physicalID, statsVer, StatsMetaHistorySourceSchemaChange)
		}
	}()
	h.mu.Lock()
	defer h.mu.Unlock()
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
	exec := h.mu.ctx.(sqlexec.SQLExecutor)
	_, err = exec.ExecuteInternal(ctx, "begin")
	if err != nil {
		return errors.Trace(err)
	}
	// finishTransaction receives the current err and its result replaces it.
	defer func() {
		err = finishTransaction(ctx, exec, err)
	}()
	txn, err := h.mu.ctx.Txn(true)
	if err != nil {
		return errors.Trace(err)
	}
	startTS := txn.StartTS()
	if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version=%? where table_id =%?", startTS, physicalID); err != nil {
		return errors.Trace(err)
	}
	// Previously statsVer was never assigned, which made the deferred
	// recordHistoricalStatsMeta call above unreachable.
	statsVer = startTS
	return nil
}

// insertColStats2KV insert a record to stats_histograms with distinct_count 1 and insert a bucket to stats_buckets with default value.
// This operation also updates version.
func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInfo) (err error) {
Expand Down
10 changes: 7 additions & 3 deletions statistics/handle/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,16 @@ func TestDDLHistogram(t *testing.T) {
func TestDDLPartition(t *testing.T) {
store, do := testkit.CreateMockStoreAndDomain(t)
testKit := testkit.NewTestKit(t, store)
for _, pruneMode := range []string{"static", "dynamic"} {
for i, pruneMode := range []string{"static", "dynamic"} {
testKit.MustExec("set @@tidb_partition_prune_mode=`" + pruneMode + "`")
testKit.MustExec("set global tidb_partition_prune_mode=`" + pruneMode + "`")
testKit.MustExec("use test")
testKit.MustExec("drop table if exists t")
h := do.StatsHandle()
if i == 1 {
err := h.HandleDDLEvent(<-h.DDLEventCh())
require.NoError(t, err)
}
createTable := `CREATE TABLE t (a int, b int, primary key(a), index idx(b))
PARTITION BY RANGE ( a ) (
PARTITION p0 VALUES LESS THAN (6),
Expand All @@ -207,14 +212,13 @@ PARTITION BY RANGE ( a ) (
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tableInfo := tbl.Meta()
h := do.StatsHandle()
err = h.HandleDDLEvent(<-h.DDLEventCh())
require.NoError(t, err)
require.Nil(t, h.Update(is))
pi := tableInfo.GetPartitionInfo()
for _, def := range pi.Definitions {
statsTbl := h.GetPartitionStats(tableInfo, def.ID)
require.False(t, statsTbl.Pseudo)
require.False(t, statsTbl.Pseudo, "for %v", pruneMode)
}

testKit.MustExec("insert into t values (1,2),(6,2),(11,2),(16,2)")
Expand Down
44 changes: 42 additions & 2 deletions statistics/handle/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package handle
import (
"context"
"encoding/json"
"strconv"
"time"

"github.com/pingcap/errors"
Expand All @@ -34,9 +35,11 @@ import (
"go.uber.org/zap"
)

const gcLastTSVarName = "tidb_stats_gc_last_ts"

// GCStats garbage collects useless stats info. For dropped tables, we first update their version so that
// other TiDB instances know that the table has been deleted.
func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error {
func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err error) {
ctx := context.Background()
// To make sure that all the deleted tables' schema and stats info have been acknowledged to all tidb,
// we only garbage collect version before 10 lease.
Expand All @@ -47,7 +50,17 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error
return nil
}
gcVer := now - offset
rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_meta where version < %?", gcVer)
lastGC, err := h.GetLastGCTimestamp(ctx)
if err != nil {
return err
}
defer func() {
if err != nil {
return
}
err = h.writeGCTimestampToKV(ctx, gcVer)
}()
rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_meta where version >= %? and version < %?", lastGC, gcVer)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -70,6 +83,33 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error
return h.removeDeletedExtendedStats(gcVer)
}

// GetLastGCTimestamp reads the value stored under gcLastTSVarName in mysql.tidb and parses it
// as a base-10 uint64. It returns 0 with a nil error when no such row exists.
func (h *Handle) GetLastGCTimestamp(ctx context.Context) (uint64, error) {
	const query = "SELECT HIGH_PRIORITY variable_value FROM mysql.tidb WHERE variable_name=%?"
	result, _, queryErr := h.execRestrictedSQL(ctx, query, gcLastTSVarName)
	switch {
	case queryErr != nil:
		return 0, errors.Trace(queryErr)
	case len(result) == 0:
		// Nothing has been recorded yet.
		return 0, nil
	}
	ts, parseErr := strconv.ParseUint(result[0].GetString(0), 10, 64)
	if parseErr != nil {
		return 0, errors.Trace(parseErr)
	}
	return ts, nil
}

// writeGCTimestampToKV stores newTS under gcLastTSVarName in mysql.tidb, inserting the row
// or overwriting its value when the key already exists.
func (h *Handle) writeGCTimestampToKV(ctx context.Context, newTS uint64) error {
	const upsertSQL = "insert into mysql.tidb (variable_name, variable_value) values (%?, %?) on duplicate key update variable_value = %?"
	// newTS is passed twice: once for the insert and once for the on-duplicate update.
	_, _, execErr := h.execRestrictedSQL(ctx, upsertSQL, gcLastTSVarName, newTS, newTS)
	return execErr
}

func (h *Handle) gcTableStats(is infoschema.InfoSchema, physicalID int64) error {
ctx := context.Background()
rows, _, err := h.execRestrictedSQL(ctx, "select is_index, hist_id from mysql.stats_histograms where table_id = %?", physicalID)
Expand Down
4 changes: 3 additions & 1 deletion statistics/handle/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func TestGCExtendedStats(t *testing.T) {
testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)")
testKit.MustExec("alter table t add stats_extended s1 correlation(a,b)")
testKit.MustExec("alter table t add stats_extended s2 correlation(b,c)")
h := dom.StatsHandle()
require.Nil(t, h.HandleDDLEvent(<-h.DDLEventCh()))
testKit.MustExec("analyze table t")

testKit.MustQuery("select name, type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows(
Expand All @@ -114,7 +116,6 @@ func TestGCExtendedStats(t *testing.T) {
"s1 2 [1,2] 1.000000 1",
"s2 2 [2,3] 1.000000 1",
))
h := dom.StatsHandle()
ddlLease := time.Duration(0)
require.Nil(t, h.GCStats(dom.InfoSchema(), ddlLease))
testKit.MustQuery("select name, type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows(
Expand All @@ -130,6 +131,7 @@ func TestGCExtendedStats(t *testing.T) {
testKit.MustQuery("select name, type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows(
"s2 2 [2,3] 1.000000 1",
))
require.Nil(t, h.HandleDDLEvent(<-h.DDLEventCh()))
require.Nil(t, h.GCStats(dom.InfoSchema(), ddlLease))
testKit.MustQuery("select name, type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows(
"s2 2 [2,3] 1.000000 2",
Expand Down
7 changes: 6 additions & 1 deletion statistics/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,11 +754,14 @@ func TestGlobalIndexStatistics(t *testing.T) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

for _, version := range []string{"1", "2"} {
for i, version := range []string{"1", "2"} {
tk.MustExec("set @@session.tidb_analyze_version = " + version)

// analyze table t
tk.MustExec("drop table if exists t")
if i != 0 {
require.Nil(t, h.HandleDDLEvent(<-h.DDLEventCh()))
}
tk.MustExec("CREATE TABLE t ( a int, b int, c int default 0, key(a) )" +
"PARTITION BY RANGE (a) (" +
"PARTITION p0 VALUES LESS THAN (10)," +
Expand All @@ -779,6 +782,7 @@ func TestGlobalIndexStatistics(t *testing.T) {

// analyze table t index idx
tk.MustExec("drop table if exists t")
require.Nil(t, h.HandleDDLEvent(<-h.DDLEventCh()))
tk.MustExec("CREATE TABLE t ( a int, b int, c int default 0, primary key(b, a) clustered )" +
"PARTITION BY RANGE (a) (" +
"PARTITION p0 VALUES LESS THAN (10)," +
Expand All @@ -796,6 +800,7 @@ func TestGlobalIndexStatistics(t *testing.T) {

// analyze table t index
tk.MustExec("drop table if exists t")
require.Nil(t, h.HandleDDLEvent(<-h.DDLEventCh()))
tk.MustExec("CREATE TABLE t ( a int, b int, c int default 0, primary key(b, a) clustered )" +
"PARTITION BY RANGE (a) (" +
"PARTITION p0 VALUES LESS THAN (10)," +
Expand Down

0 comments on commit 51f1a82

Please sign in to comment.