Skip to content

Commit

Permalink
statistics: Update global count and modify_count only if partition is…
Browse files Browse the repository at this point in the history
… not locked (#47319)

ref #46351
  • Loading branch information
Rustin170506 authored Sep 28, 2023
1 parent 2992c1c commit 05b9786
Show file tree
Hide file tree
Showing 14 changed files with 428 additions and 171 deletions.
1 change: 1 addition & 0 deletions executor/lockstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//infoschema",
"//parser/ast",
"//parser/model",
"//statistics/handle/lockstats",
"//table/tables",
"//util/chunk",
"@com_github_pingcap_errors//:errors",
Expand Down
28 changes: 16 additions & 12 deletions executor/lockstats/lock_stats_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/statistics/handle/lockstats"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/chunk"
)
Expand Down Expand Up @@ -67,12 +68,12 @@ func (e *LockExec) Next(_ context.Context, _ *chunk.Chunk) error {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.New(msg))
}
} else {
tidAndNames, pidAndNames, err := populateTableAndPartitionIDs(e.Tables, is)
tableWithPartitions, err := populateTableAndPartitionIDs(e.Tables, is)
if err != nil {
return err
}

msg, err := h.LockTables(tidAndNames, pidAndNames)
msg, err := h.LockTables(tableWithPartitions)
if err != nil {
return err
}
Expand Down Expand Up @@ -130,35 +131,38 @@ func populatePartitionIDAndNames(
return tbl.Meta().ID, pidNames, nil
}

// populateTableAndPartitionIDs returns table IDs and partition IDs for the given table names.
// populateTableAndPartitionIDs returns the lockstats.TableInfo for the given table names.
func populateTableAndPartitionIDs(
tables []*ast.TableName,
is infoschema.InfoSchema,
) (map[int64]string, map[int64]string, error) {
) (map[int64]*lockstats.TableInfo, error) {
if len(tables) == 0 {
return nil, nil, errors.New("table list should not be empty")
return nil, errors.New("table list should not be empty")
}

tidAndNames := make(map[int64]string, len(tables))
pidAndNames := make(map[int64]string, len(tables))
tableWithPartitions := make(map[int64]*lockstats.TableInfo, len(tables))

for _, table := range tables {
tbl, err := is.TableByName(table.Schema, table.Name)
if err != nil {
return nil, nil, err
return nil, err
}
tid := tbl.Meta().ID
tableWithPartitions[tid] = &lockstats.TableInfo{
FullName: fmt.Sprintf("%s.%s", table.Schema.L, table.Name.L),
}
tidAndNames[tbl.Meta().ID] = fmt.Sprintf("%s.%s", table.Schema.L, table.Name.L)

pi := tbl.Meta().GetPartitionInfo()
if pi == nil {
continue
}
tableWithPartitions[tid].PartitionInfo = make(map[int64]string, len(pi.Definitions))

for _, p := range pi.Definitions {
pidAndNames[p.ID] = genFullPartitionName(table, p.Name.L)
tableWithPartitions[tid].PartitionInfo[p.ID] = genFullPartitionName(table, p.Name.L)
}
}

return tidAndNames, pidAndNames, nil
return tableWithPartitions, nil
}

func genFullPartitionName(table *ast.TableName, partitionName string) string {
Expand Down
17 changes: 6 additions & 11 deletions executor/lockstats/lock_stats_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,14 @@ func TestPopulateTableAndPartitionIDs(t *testing.T) {
},
}

gotTIDAndNames, gotPIDAndNames, err := populateTableAndPartitionIDs(tables, fakeInfo)
tableWithPartitions, err := populateTableAndPartitionIDs(tables, fakeInfo)
require.NoError(t, err)
require.Equal(t, map[int64]string{
1: "test.t1",
4: "test.t2",
}, gotTIDAndNames)
require.Equal(t, map[int64]string{
2: "test.t1 partition (p1)",
3: "test.t1 partition (p2)",
}, gotPIDAndNames)

require.Equal(t, 2, len(tableWithPartitions))
require.Equal(t, "test.t1", tableWithPartitions[1].FullName)
require.Equal(t, "test.t1 partition (p1)", tableWithPartitions[1].PartitionInfo[2])
require.Equal(t, "test.t2", tableWithPartitions[4].FullName)
// Empty table list.
_, _, err = populateTableAndPartitionIDs(nil, fakeInfo)
_, err = populateTableAndPartitionIDs(nil, fakeInfo)
require.Error(t, err)
}

Expand Down
4 changes: 2 additions & 2 deletions executor/lockstats/unlock_stats_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ func (e *UnlockExec) Next(context.Context, *chunk.Chunk) error {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.New(msg))
}
} else {
tidAndNames, pidAndNames, err := populateTableAndPartitionIDs(e.Tables, is)
tableWithPartitions, err := populateTableAndPartitionIDs(e.Tables, is)
if err != nil {
return err
}
msg, err := h.RemoveLockedTables(tidAndNames, pidAndNames)
msg, err := h.RemoveLockedTables(tableWithPartitions)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion statistics/handle/handletest/lockstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 18,
shard_count = 21,
deps = [
"//config",
"//domain",
"//kv",
"//parser/model",
"//statistics/handle",
"//testkit",
"//testkit/testsetup",
"@com_github_stretchr_testify//require",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
package lockstats

import (
"fmt"
"strconv"
"testing"
"time"

"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -228,7 +230,7 @@ func TestUnlockOnePartitionOfLockedTableWouldFail(t *testing.T) {
require.Equal(t, 3, num)
}

func TestUnlockTheWholeTableWouldUnlockLockedPartitionsAndGenerateWarning(t *testing.T) {
func TestUnlockTheUnlockedTableWouldGenerateWarning(t *testing.T) {
_, dom, tk, tbl := setupTestEnvironmentWithPartitionedTableT(t)

handle := dom.StatsHandle()
Expand All @@ -255,10 +257,10 @@ func TestUnlockTheWholeTableWouldUnlockLockedPartitionsAndGenerateWarning(t *tes
"Warning 1105 skip unlocking unlocked table: test.t",
))

// Should unlock the locked partition.
// No partition is unlocked.
rows = tk.MustQuery(selectTableLockSQL).Rows()
num, _ = strconv.Atoi(rows[0][0].(string))
require.Equal(t, 0, num)
require.Equal(t, 1, num)
}

func TestSkipLockALotOfPartitions(t *testing.T) {
Expand Down Expand Up @@ -423,15 +425,15 @@ func TestExchangePartitionShouldChangeNothing(t *testing.T) {
func TestNewPartitionShouldBeLockedIfWholeTableLocked(t *testing.T) {
_, dom, tk, tbl := setupTestEnvironmentWithPartitionedTableT(t)

handle := dom.StatsHandle()
h := dom.StatsHandle()
// Get partition stats.
p0Id := tbl.GetPartitionInfo().Definitions[0].ID
partition0Stats := handle.GetPartitionStats(tbl, p0Id)
partition0Stats := h.GetPartitionStats(tbl, p0Id)
for _, col := range partition0Stats.Columns {
require.True(t, col.IsStatsInitialized())
}
p1Id := tbl.GetPartitionInfo().Definitions[1].ID
partition1Stats := handle.GetPartitionStats(tbl, p1Id)
partition1Stats := h.GetPartitionStats(tbl, p1Id)
for _, col := range partition1Stats.Columns {
require.True(t, col.IsStatsInitialized())
}
Expand All @@ -443,14 +445,66 @@ func TestNewPartitionShouldBeLockedIfWholeTableLocked(t *testing.T) {

// Add a new partition.
tk.MustExec("alter table t add partition (partition p2 values less than (30))")
tk.MustExec("insert into t(a, b) values(21,'a')")
tk.MustExec("insert into t(a, b) values(22,'b')")
// Dump stats delta to KV.
require.Nil(t, h.DumpStatsDeltaToKV(handle.DumpAll))
// Check the mysql.stats_table_locked is updated correctly.
// And the new partition is locked.
rows = tk.MustQuery("select count, modify_count, table_id from mysql.stats_table_locked order by table_id").Rows()
require.Len(t, rows, 4)
require.Equal(t, "0", rows[0][0])
require.Equal(t, "0", rows[0][1])
require.Equal(t, "0", rows[1][0])
require.Equal(t, "0", rows[1][1])
require.Equal(t, "0", rows[2][0])
require.Equal(t, "0", rows[2][1])
require.Equal(t, "2", rows[3][0])
require.Equal(t, "2", rows[3][1])

// Check the new partition is locked.
tk.MustExec("analyze table t partition p2")

// Check the new partition is locked.
tk.MustQuery("show warnings").Check(testkit.Rows(
"Warning 1105 skip analyze locked table: test.t partition (p2)",
))

// Unlock the whole table.
tk.MustExec("unlock stats t")
// Check the meta is updated correctly.
rows = tk.MustQuery(fmt.Sprint("select count, modify_count from mysql.stats_meta where table_id = ", tbl.ID)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "2", rows[0][0])
require.Equal(t, "2", rows[0][1])
}

func TestUnlockSomePartitionsWouldUpdateGlobalCountCorrectly(t *testing.T) {
_, dom, tk, tbl := setupTestEnvironmentWithPartitionedTableT(t)

h := dom.StatsHandle()
tk.MustExec("lock stats t partition p0, p1")
tk.MustExec("insert into t(a, b) values(1,'a')")
tk.MustExec("insert into t(a, b) values(2,'b')")
tk.MustExec("analyze table test.t partition p0, p1")
tblStats := h.GetTableStats(tbl)
require.Equal(t, int64(0), tblStats.RealtimeCount)

// Dump stats delta to KV.
require.Nil(t, h.DumpStatsDeltaToKV(handle.DumpAll))
// Check the mysql.stats_table_locked is updated correctly.
rows := tk.MustQuery("select count, modify_count, table_id from mysql.stats_table_locked order by table_id").Rows()
require.Len(t, rows, 2)
require.Equal(t, "2", rows[0][0])
require.Equal(t, "2", rows[0][1])
require.Equal(t, "0", rows[1][0])
require.Equal(t, "0", rows[1][1])

// Unlock partition p0 and p1.
tk.MustExec("unlock stats t partition p0, p1")
// Check the global count is updated correctly.
rows = tk.MustQuery(fmt.Sprint("select count, modify_count, table_id from mysql.stats_meta where table_id = ", tbl.ID)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "2", rows[0][0])
require.Equal(t, "2", rows[0][1])
}

func setupTestEnvironmentWithPartitionedTableT(t *testing.T) (kv.Storage, *domain.Domain, *testkit.TestKit, *model.TableInfo) {
Expand Down
78 changes: 78 additions & 0 deletions statistics/handle/handletest/lockstats/lock_table_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package lockstats

import (
"fmt"
"strconv"
"testing"
"time"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -266,6 +268,82 @@ func TestTruncateTableShouldCleanUpLockInfo(t *testing.T) {
require.Equal(t, 0, num)
}

func TestUnlockPartitionedTableWouldUpdateGlobalCountCorrectly(t *testing.T) {
_, dom, tk, tbl := setupTestEnvironmentWithPartitionedTableT(t)

h := dom.StatsHandle()
tk.MustExec("lock stats t")
tk.MustExec("insert into t(a, b) values(1,'a')")
tk.MustExec("insert into t(a, b) values(2,'b')")
tk.MustExec("analyze table test.t")
tblStats := h.GetTableStats(tbl)
require.Equal(t, int64(0), tblStats.RealtimeCount)

// Dump stats delta to KV.
require.Nil(t, h.DumpStatsDeltaToKV(handle.DumpAll))
// Check the mysql.stats_table_locked is updated correctly.
rows := tk.MustQuery("select count, modify_count, table_id from mysql.stats_table_locked order by table_id").Rows()
require.Len(t, rows, 3)
require.Equal(t, "0", rows[0][0])
require.Equal(t, "0", rows[0][1])
require.Equal(t, "2", rows[1][0])
require.Equal(t, "2", rows[1][1])
require.Equal(t, "0", rows[2][0])
require.Equal(t, "0", rows[2][1])
// Unlock partition p0 and p1 failed.
tk.MustExec("unlock stats t partition p0, p1")
tk.MustQuery("show warnings").Check(testkit.Rows(
"Warning 1105 skip unlocking partitions of locked table: test.t",
))

// Unlock the table.
tk.MustExec("unlock stats t")
// Check the global count is updated correctly.
rows = tk.MustQuery(fmt.Sprint("select count, modify_count from mysql.stats_meta where table_id = ", tbl.ID)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "2", rows[0][0])
require.Equal(t, "2", rows[0][1])
}

func TestDeltaInLockInfoCanBeNegative(t *testing.T) {
_, dom, tk, tbl := setupTestEnvironmentWithPartitionedTableT(t)

h := dom.StatsHandle()
tk.MustExec("insert into t(a, b) values(1,'a')")
tk.MustExec("insert into t(a, b) values(2,'b')")
// Dump stats delta to KV.
require.Nil(t, h.DumpStatsDeltaToKV(handle.DumpAll))
rows := tk.MustQuery(fmt.Sprint("select count, modify_count from mysql.stats_meta where table_id = ", tbl.ID)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "2", rows[0][0])
require.Equal(t, "2", rows[0][1])

tk.MustExec("lock stats t")
// Delete some rows.
tk.MustExec("delete from t where a = 1")
tk.MustExec("delete from t where a = 2")

// Dump stats delta to KV.
require.Nil(t, h.DumpStatsDeltaToKV(handle.DumpAll))
// Check the mysql.stats_table_locked is updated correctly.
rows = tk.MustQuery("select count, modify_count, table_id from mysql.stats_table_locked order by table_id").Rows()
require.Len(t, rows, 3)
require.Equal(t, "0", rows[0][0])
require.Equal(t, "0", rows[0][1])
require.Equal(t, "-2", rows[1][0])
require.Equal(t, "2", rows[1][1])
require.Equal(t, "0", rows[2][0])
require.Equal(t, "0", rows[2][1])

// Unlock the table.
tk.MustExec("unlock stats t")
// Check the global count is updated correctly.
rows = tk.MustQuery(fmt.Sprint("select count, modify_count from mysql.stats_meta where table_id = ", tbl.ID)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "0", rows[0][0])
require.Equal(t, "4", rows[0][1])
}

func setupTestEnvironmentWithTableT(t *testing.T) (kv.Storage, *domain.Domain, *testkit.TestKit, *model.TableInfo) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
14 changes: 6 additions & 8 deletions statistics/handle/lock_stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import (
)

// LockTables add locked tables id to store.
// - tidAndNames: table ids and names of which will be locked.
// - pidAndNames: partition ids and names of which will be locked.
// - tables: tables that will be locked.
// Return the message of skipped tables and error.
func (h *Handle) LockTables(tidAndNames map[int64]string, pidAndNames map[int64]string) (skipped string, err error) {
func (h *Handle) LockTables(tables map[int64]*lockstats.TableInfo) (skipped string, err error) {
err = h.callWithSCtx(func(sctx sessionctx.Context) error {
skipped, err = lockstats.AddLockedTables(sctx.(sqlexec.RestrictedSQLExecutor), tidAndNames, pidAndNames)
skipped, err = lockstats.AddLockedTables(sctx.(sqlexec.RestrictedSQLExecutor), tables)
return err
})
return
Expand All @@ -52,12 +51,11 @@ func (h *Handle) LockPartitions(
}

// RemoveLockedTables remove tables from table locked records.
// - tidAndNames: table ids and names of which will be unlocked.
// - pidAndNames: partition ids and names of which will be unlocked.
// - tables: tables of which will be unlocked.
// Return the message of skipped tables and error.
func (h *Handle) RemoveLockedTables(tidAndNames map[int64]string, pidAndNames map[int64]string) (skipped string, err error) {
func (h *Handle) RemoveLockedTables(tables map[int64]*lockstats.TableInfo) (skipped string, err error) {
err = h.callWithSCtx(func(sctx sessionctx.Context) error {
skipped, err = lockstats.RemoveLockedTables(sctx.(sqlexec.RestrictedSQLExecutor), tidAndNames, pidAndNames)
skipped, err = lockstats.RemoveLockedTables(sctx.(sqlexec.RestrictedSQLExecutor), tables)
return err
})
return
Expand Down
Loading

0 comments on commit 05b9786

Please sign in to comment.