Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: Update global count and modify_count only if partition is not locked #47319

Merged
merged 8 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions executor/lockstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//infoschema",
"//parser/ast",
"//parser/model",
"//statistics/handle/lockstats",
"//table/tables",
"//util/chunk",
"@com_github_pingcap_errors//:errors",
Expand Down
28 changes: 16 additions & 12 deletions executor/lockstats/lock_stats_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/statistics/handle/lockstats"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/chunk"
)
Expand Down Expand Up @@ -67,12 +68,12 @@ func (e *LockExec) Next(_ context.Context, _ *chunk.Chunk) error {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.New(msg))
}
} else {
tidAndNames, pidAndNames, err := populateTableAndPartitionIDs(e.Tables, is)
tableWithPartitions, err := populateTableAndPartitionIDs(e.Tables, is)
if err != nil {
return err
}

msg, err := h.LockTables(tidAndNames, pidAndNames)
msg, err := h.LockTables(tableWithPartitions)
if err != nil {
return err
}
Expand Down Expand Up @@ -130,35 +131,38 @@ func populatePartitionIDAndNames(
return tbl.Meta().ID, pidNames, nil
}

// populateTableAndPartitionIDs returns table IDs and partition IDs for the given table names.
// populateTableAndPartitionIDs returns the lockstats.TableInfo for the given table names.
func populateTableAndPartitionIDs(
tables []*ast.TableName,
is infoschema.InfoSchema,
) (map[int64]string, map[int64]string, error) {
) (map[int64]*lockstats.TableInfo, error) {
if len(tables) == 0 {
return nil, nil, errors.New("table list should not be empty")
return nil, errors.New("table list should not be empty")
}

tidAndNames := make(map[int64]string, len(tables))
pidAndNames := make(map[int64]string, len(tables))
tableWithPartitions := make(map[int64]*lockstats.TableInfo, len(tables))

for _, table := range tables {
tbl, err := is.TableByName(table.Schema, table.Name)
if err != nil {
return nil, nil, err
return nil, err
}
tid := tbl.Meta().ID
tableWithPartitions[tid] = &lockstats.TableInfo{
FullName: fmt.Sprintf("%s.%s", table.Schema.L, table.Name.L),
}
tidAndNames[tbl.Meta().ID] = fmt.Sprintf("%s.%s", table.Schema.L, table.Name.L)

pi := tbl.Meta().GetPartitionInfo()
if pi == nil {
continue
}
tableWithPartitions[tid].PartitionInfo = make(map[int64]string, len(pi.Definitions))

for _, p := range pi.Definitions {
pidAndNames[p.ID] = genFullPartitionName(table, p.Name.L)
tableWithPartitions[tid].PartitionInfo[p.ID] = genFullPartitionName(table, p.Name.L)
}
}

return tidAndNames, pidAndNames, nil
return tableWithPartitions, nil
}

func genFullPartitionName(table *ast.TableName, partitionName string) string {
Expand Down
17 changes: 6 additions & 11 deletions executor/lockstats/lock_stats_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,14 @@ func TestPopulateTableAndPartitionIDs(t *testing.T) {
},
}

gotTIDAndNames, gotPIDAndNames, err := populateTableAndPartitionIDs(tables, fakeInfo)
tableWithPartitions, err := populateTableAndPartitionIDs(tables, fakeInfo)
require.NoError(t, err)
require.Equal(t, map[int64]string{
1: "test.t1",
4: "test.t2",
}, gotTIDAndNames)
require.Equal(t, map[int64]string{
2: "test.t1 partition (p1)",
3: "test.t1 partition (p2)",
}, gotPIDAndNames)

require.Equal(t, 2, len(tableWithPartitions))
require.Equal(t, "test.t1", tableWithPartitions[1].FullName)
require.Equal(t, "test.t1 partition (p1)", tableWithPartitions[1].PartitionInfo[2])
require.Equal(t, "test.t2", tableWithPartitions[4].FullName)
// Empty table list.
_, _, err = populateTableAndPartitionIDs(nil, fakeInfo)
_, err = populateTableAndPartitionIDs(nil, fakeInfo)
require.Error(t, err)
}

Expand Down
4 changes: 2 additions & 2 deletions executor/lockstats/unlock_stats_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ func (e *UnlockExec) Next(context.Context, *chunk.Chunk) error {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.New(msg))
}
} else {
tidAndNames, pidAndNames, err := populateTableAndPartitionIDs(e.Tables, is)
tableWithPartitions, err := populateTableAndPartitionIDs(e.Tables, is)
if err != nil {
return err
}
msg, err := h.RemoveLockedTables(tidAndNames, pidAndNames)
msg, err := h.RemoveLockedTables(tableWithPartitions)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion statistics/handle/handletest/lockstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 18,
shard_count = 21,
deps = [
"//config",
"//domain",
"//kv",
"//parser/model",
"//statistics/handle",
"//testkit",
"//testkit/testsetup",
"@com_github_stretchr_testify//require",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
package lockstats

import (
"fmt"
"strconv"
"testing"
"time"

"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -228,7 +230,7 @@ func TestUnlockOnePartitionOfLockedTableWouldFail(t *testing.T) {
require.Equal(t, 3, num)
}

func TestUnlockTheWholeTableWouldUnlockLockedPartitionsAndGenerateWarning(t *testing.T) {
func TestUnlockTheUnlockedTableWouldGenerateWarning(t *testing.T) {
_, dom, tk, tbl := setupTestEnvironmentWithPartitionedTableT(t)

handle := dom.StatsHandle()
Expand All @@ -255,10 +257,10 @@ func TestUnlockTheWholeTableWouldUnlockLockedPartitionsAndGenerateWarning(t *tes
"Warning 1105 skip unlocking unlocked table: test.t",
))

// Should unlock the locked partition.
// No partition is unlocked.
rows = tk.MustQuery(selectTableLockSQL).Rows()
num, _ = strconv.Atoi(rows[0][0].(string))
require.Equal(t, 0, num)
require.Equal(t, 1, num)
}

func TestSkipLockALotOfPartitions(t *testing.T) {
Expand Down Expand Up @@ -423,15 +425,15 @@ func TestExchangePartitionShouldChangeNothing(t *testing.T) {
func TestNewPartitionShouldBeLockedIfWholeTableLocked(t *testing.T) {
_, dom, tk, tbl := setupTestEnvironmentWithPartitionedTableT(t)

handle := dom.StatsHandle()
h := dom.StatsHandle()
// Get partition stats.
p0Id := tbl.GetPartitionInfo().Definitions[0].ID
partition0Stats := handle.GetPartitionStats(tbl, p0Id)
partition0Stats := h.GetPartitionStats(tbl, p0Id)
for _, col := range partition0Stats.Columns {
require.True(t, col.IsStatsInitialized())
}
p1Id := tbl.GetPartitionInfo().Definitions[1].ID
partition1Stats := handle.GetPartitionStats(tbl, p1Id)
partition1Stats := h.GetPartitionStats(tbl, p1Id)
for _, col := range partition1Stats.Columns {
require.True(t, col.IsStatsInitialized())
}
Expand All @@ -443,14 +445,66 @@ func TestNewPartitionShouldBeLockedIfWholeTableLocked(t *testing.T) {

// Add a new partition.
tk.MustExec("alter table t add partition (partition p2 values less than (30))")
tk.MustExec("insert into t(a, b) values(21,'a')")
tk.MustExec("insert into t(a, b) values(22,'b')")
// Dump stats delta to KV.
require.Nil(t, h.DumpStatsDeltaToKV(handle.DumpAll))
// Check the mysql.stats_table_locked is updated correctly.
// And the new partition is locked.
rows = tk.MustQuery("select count, modify_count, table_id from mysql.stats_table_locked order by table_id").Rows()
require.Len(t, rows, 4)
require.Equal(t, "0", rows[0][0])
require.Equal(t, "0", rows[0][1])
require.Equal(t, "0", rows[1][0])
require.Equal(t, "0", rows[1][1])
require.Equal(t, "0", rows[2][0])
require.Equal(t, "0", rows[2][1])
require.Equal(t, "2", rows[3][0])
require.Equal(t, "2", rows[3][1])

// Check the new partition is locked.
tk.MustExec("analyze table t partition p2")

// Check the new partition is locked.
tk.MustQuery("show warnings").Check(testkit.Rows(
"Warning 1105 skip analyze locked table: test.t partition (p2)",
))

// Unlock the whole table.
tk.MustExec("unlock stats t")
// Check the meta is updated correctly.
rows = tk.MustQuery(fmt.Sprint("select count, modify_count from mysql.stats_meta where table_id = ", tbl.ID)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "2", rows[0][0])
require.Equal(t, "2", rows[0][1])
}

func TestUnlockSomePartitionsWouldUpdateGlobalCountCorrectly(t *testing.T) {
_, dom, tk, tbl := setupTestEnvironmentWithPartitionedTableT(t)

h := dom.StatsHandle()
tk.MustExec("lock stats t partition p0, p1")
tk.MustExec("insert into t(a, b) values(1,'a')")
tk.MustExec("insert into t(a, b) values(2,'b')")
tk.MustExec("analyze table test.t partition p0, p1")
tblStats := h.GetTableStats(tbl)
require.Equal(t, int64(0), tblStats.RealtimeCount)

// Dump stats delta to KV.
require.Nil(t, h.DumpStatsDeltaToKV(handle.DumpAll))
// Check the mysql.stats_table_locked is updated correctly.
rows := tk.MustQuery("select count, modify_count, table_id from mysql.stats_table_locked order by table_id").Rows()
require.Len(t, rows, 2)
require.Equal(t, "2", rows[0][0])
require.Equal(t, "2", rows[0][1])
require.Equal(t, "0", rows[1][0])
require.Equal(t, "0", rows[1][1])

// Unlock partition p0 and p1.
tk.MustExec("unlock stats t partition p0, p1")
// Check the global count is updated correctly.
rows = tk.MustQuery(fmt.Sprint("select count, modify_count, table_id from mysql.stats_meta where table_id = ", tbl.ID)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "2", rows[0][0])
require.Equal(t, "2", rows[0][1])
}

func setupTestEnvironmentWithPartitionedTableT(t *testing.T) (kv.Storage, *domain.Domain, *testkit.TestKit, *model.TableInfo) {
Expand Down
78 changes: 78 additions & 0 deletions statistics/handle/handletest/lockstats/lock_table_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package lockstats

import (
"fmt"
"strconv"
"testing"
"time"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -266,6 +268,82 @@ func TestTruncateTableShouldCleanUpLockInfo(t *testing.T) {
require.Equal(t, 0, num)
}

func TestUnlockPartitionedTableWouldUpdateGlobalCountCorrectly(t *testing.T) {
_, dom, tk, tbl := setupTestEnvironmentWithPartitionedTableT(t)

h := dom.StatsHandle()
tk.MustExec("lock stats t")
tk.MustExec("insert into t(a, b) values(1,'a')")
tk.MustExec("insert into t(a, b) values(2,'b')")
tk.MustExec("analyze table test.t")
tblStats := h.GetTableStats(tbl)
require.Equal(t, int64(0), tblStats.RealtimeCount)

// Dump stats delta to KV.
require.Nil(t, h.DumpStatsDeltaToKV(handle.DumpAll))
// Check the mysql.stats_table_locked is updated correctly.
rows := tk.MustQuery("select count, modify_count, table_id from mysql.stats_table_locked order by table_id").Rows()
require.Len(t, rows, 3)
require.Equal(t, "0", rows[0][0])
require.Equal(t, "0", rows[0][1])
require.Equal(t, "2", rows[1][0])
require.Equal(t, "2", rows[1][1])
require.Equal(t, "0", rows[2][0])
require.Equal(t, "0", rows[2][1])
// Unlock partition p0 and p1 failed.
tk.MustExec("unlock stats t partition p0, p1")
tk.MustQuery("show warnings").Check(testkit.Rows(
"Warning 1105 skip unlocking partitions of locked table: test.t",
))

// Unlock the table.
tk.MustExec("unlock stats t")
// Check the global count is updated correctly.
rows = tk.MustQuery(fmt.Sprint("select count, modify_count from mysql.stats_meta where table_id = ", tbl.ID)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "2", rows[0][0])
require.Equal(t, "2", rows[0][1])
}

func TestDeltaInLockInfoCanBeNegative(t *testing.T) {
_, dom, tk, tbl := setupTestEnvironmentWithPartitionedTableT(t)

h := dom.StatsHandle()
tk.MustExec("insert into t(a, b) values(1,'a')")
tk.MustExec("insert into t(a, b) values(2,'b')")
// Dump stats delta to KV.
require.Nil(t, h.DumpStatsDeltaToKV(handle.DumpAll))
rows := tk.MustQuery(fmt.Sprint("select count, modify_count from mysql.stats_meta where table_id = ", tbl.ID)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "2", rows[0][0])
require.Equal(t, "2", rows[0][1])

tk.MustExec("lock stats t")
// Delete some rows.
tk.MustExec("delete from t where a = 1")
tk.MustExec("delete from t where a = 2")

// Dump stats delta to KV.
require.Nil(t, h.DumpStatsDeltaToKV(handle.DumpAll))
// Check the mysql.stats_table_locked is updated correctly.
rows = tk.MustQuery("select count, modify_count, table_id from mysql.stats_table_locked order by table_id").Rows()
require.Len(t, rows, 3)
require.Equal(t, "0", rows[0][0])
require.Equal(t, "0", rows[0][1])
require.Equal(t, "-2", rows[1][0])
require.Equal(t, "2", rows[1][1])
require.Equal(t, "0", rows[2][0])
require.Equal(t, "0", rows[2][1])

// Unlock the table.
tk.MustExec("unlock stats t")
// Check the global count is updated correctly.
rows = tk.MustQuery(fmt.Sprint("select count, modify_count from mysql.stats_meta where table_id = ", tbl.ID)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "0", rows[0][0])
require.Equal(t, "4", rows[0][1])
}

func setupTestEnvironmentWithTableT(t *testing.T) (kv.Storage, *domain.Domain, *testkit.TestKit, *model.TableInfo) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
14 changes: 6 additions & 8 deletions statistics/handle/lock_stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import (
)

// LockTables add locked tables id to store.
// - tidAndNames: table ids and names of which will be locked.
// - pidAndNames: partition ids and names of which will be locked.
// - tables: tables that will be locked.
// Return the message of skipped tables and error.
func (h *Handle) LockTables(tidAndNames map[int64]string, pidAndNames map[int64]string) (skipped string, err error) {
func (h *Handle) LockTables(tables map[int64]*lockstats.TableInfo) (skipped string, err error) {
err = h.callWithSCtx(func(sctx sessionctx.Context) error {
skipped, err = lockstats.AddLockedTables(sctx.(sqlexec.RestrictedSQLExecutor), tidAndNames, pidAndNames)
skipped, err = lockstats.AddLockedTables(sctx.(sqlexec.RestrictedSQLExecutor), tables)
return err
})
return
Expand All @@ -52,12 +51,11 @@ func (h *Handle) LockPartitions(
}

// RemoveLockedTables remove tables from table locked records.
// - tidAndNames: table ids and names of which will be unlocked.
// - pidAndNames: partition ids and names of which will be unlocked.
// - tables: tables of which will be unlocked.
// Return the message of skipped tables and error.
func (h *Handle) RemoveLockedTables(tidAndNames map[int64]string, pidAndNames map[int64]string) (skipped string, err error) {
func (h *Handle) RemoveLockedTables(tables map[int64]*lockstats.TableInfo) (skipped string, err error) {
err = h.callWithSCtx(func(sctx sessionctx.Context) error {
skipped, err = lockstats.RemoveLockedTables(sctx.(sqlexec.RestrictedSQLExecutor), tidAndNames, pidAndNames)
skipped, err = lockstats.RemoveLockedTables(sctx.(sqlexec.RestrictedSQLExecutor), tables)
return err
})
return
Expand Down
Loading