Skip to content

Commit

Permalink
Merge branch 'master' into test-cop-priority
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Aug 25, 2022
2 parents 59f4211 + 87a6106 commit f25818c
Show file tree
Hide file tree
Showing 17 changed files with 356 additions and 74 deletions.
68 changes: 50 additions & 18 deletions br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ type DBReplace struct {

type SchemasReplace struct {
DbMap map[OldID]*DBReplace
RewriteTS uint64
TableFilter filter.Filter
globalTableIdMap map[OldID]NewID
RewriteTS uint64 // used to rewrite commit ts in meta kv.
TableFilter filter.Filter // used to filter schema/table
genGenGlobalID func(ctx context.Context) (int64, error)
genGenGlobalIDs func(ctx context.Context, n int) ([]int64, error)
insertDeleteRangeForTable func(jobID int64, tableIDs []int64)
Expand Down Expand Up @@ -92,8 +93,19 @@ func NewSchemasReplace(
insertDeleteRangeForTable func(jobID int64, tableIDs []int64),
insertDeleteRangeForIndex func(jobID int64, elementID *int64, tableID int64, indexIDs []int64),
) *SchemasReplace {
globalTableIdMap := make(map[OldID]NewID)
for _, dr := range dbMap {
for tblID, tr := range dr.TableMap {
globalTableIdMap[tblID] = tr.NewTableID
for oldpID, newpID := range tr.PartitionMap {
globalTableIdMap[oldpID] = newpID
}
}
}

return &SchemasReplace{
DbMap: dbMap,
globalTableIdMap: globalTableIdMap,
RewriteTS: restoreTS,
TableFilter: tableFilter,
genGenGlobalID: genID,
Expand Down Expand Up @@ -199,6 +211,11 @@ func (sr *SchemasReplace) rewriteKeyForTable(
parseField func([]byte) (tableID int64, err error),
encodeField func(tableID int64) []byte,
) ([]byte, bool, error) {
var (
err error
newID int64
exist bool
)
rawMetaKey, err := ParseTxnMetaKeyFrom(key)
if err != nil {
return nil, false, errors.Trace(err)
Expand All @@ -216,7 +233,7 @@ func (sr *SchemasReplace) rewriteKeyForTable(

dbReplace, exist := sr.DbMap[dbID]
if !exist {
newID, err := sr.genGenGlobalID(context.Background())
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
}
Expand All @@ -226,9 +243,13 @@ func (sr *SchemasReplace) rewriteKeyForTable(

tableReplace, exist := dbReplace.TableMap[tableID]
if !exist {
newID, err := sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
newID, exist = sr.globalTableIdMap[tableID]
if !exist {
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
}
sr.globalTableIdMap[tableID] = newID
}
tableReplace = NewTableReplace(nil, newID)
dbReplace.TableMap[tableID] = tableReplace
Expand All @@ -243,15 +264,20 @@ func (sr *SchemasReplace) rewriteKeyForTable(
}

func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, bool, error) {
var tableInfo model.TableInfo
var (
tableInfo model.TableInfo
err error
newID int64
exist bool
)
if err := json.Unmarshal(value, &tableInfo); err != nil {
return nil, false, errors.Trace(err)
}

// update table ID
dbReplace, exist := sr.DbMap[dbID]
if !exist {
newID, err := sr.genGenGlobalID(context.Background())
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
}
Expand All @@ -261,10 +287,15 @@ func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, bo

tableReplace, exist := dbReplace.TableMap[tableInfo.ID]
if !exist {
newID, err := sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
newID, exist = sr.globalTableIdMap[tableInfo.ID]
if !exist {
newID, err = sr.genGenGlobalID(context.TODO())
if err != nil {
return nil, false, errors.Trace(err)
}
sr.globalTableIdMap[tableInfo.ID] = newID
}

tableReplace = NewTableReplace(&tableInfo, newID)
dbReplace.TableMap[tableInfo.ID] = tableReplace
} else {
Expand All @@ -287,12 +318,15 @@ func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, bo
partitions := newTableInfo.GetPartitionInfo()
if partitions != nil {
for i, tbl := range partitions.Definitions {
newID, exist := tableReplace.PartitionMap[tbl.ID]
newID, exist = tableReplace.PartitionMap[tbl.ID]
if !exist {
var err error
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
newID, exist = sr.globalTableIdMap[tbl.ID]
if !exist {
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
}
sr.globalTableIdMap[tbl.ID] = newID
}
tableReplace.PartitionMap[tbl.ID] = newID
}
Expand Down Expand Up @@ -470,8 +504,6 @@ func (sr *SchemasReplace) tryToGCJob(job *model.Job) error {
return err
}
}
case model.ActionExchangeTablePartition:
return errors.Errorf("restore of ddl `exchange-table-partition` is not supported")
}
}
return nil
Expand Down
110 changes: 101 additions & 9 deletions br/pkg/stream/rewrite_meta_rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,6 @@ func mockGenGenGlobalID(ctx context.Context) (int64, error) {
return increaseID, nil
}

func ProduceValue(tableName string, dbID int64) ([]byte, error) {
tableInfo := model.TableInfo{
ID: dbID,
Name: model.NewCIStr(tableName),
}

return json.Marshal(tableInfo)
}

func MockEmptySchemasReplace(midr *mockInsertDeleteRange) *SchemasReplace {
dbMap := make(map[OldID]*DBReplace)
if midr == nil {
Expand Down Expand Up @@ -211,6 +202,107 @@ func TestRewriteValueForPartitionTable(t *testing.T) {
require.Equal(t, tableInfo.Partition.Definitions[1].ID, newID2)
}

func TestRewriteValueForExchangePartition(t *testing.T) {
var (
dbID1 int64 = 100
tableID1 int64 = 101
pt1ID int64 = 102
pt2ID int64 = 103
tableName1 = "t1"
pt1Name = "pt1"
pt2Name = "pt2"

dbID2 int64 = 105
tableID2 int64 = 106
tableName2 = "t2"
tableInfo model.TableInfo
)

// construct the partition table t1
pt1 := model.PartitionDefinition{
ID: pt1ID,
Name: model.NewCIStr(pt1Name),
}
pt2 := model.PartitionDefinition{
ID: pt2ID,
Name: model.NewCIStr(pt2Name),
}

pi := model.PartitionInfo{
Enable: true,
Definitions: make([]model.PartitionDefinition, 0),
}
pi.Definitions = append(pi.Definitions, pt1, pt2)
t1 := model.TableInfo{
ID: tableID1,
Name: model.NewCIStr(tableName1),
Partition: &pi,
}
db1 := model.DBInfo{
ID: dbID1,
}

// construct the no partition table t2
t2 := model.TableInfo{
ID: tableID2,
Name: model.NewCIStr(tableName2),
}
db2 := model.DBInfo{
ID: dbID2,
}

// construct the SchemaReplace
dbMap := make(map[OldID]*DBReplace)
dbMap[dbID1] = NewDBReplace(&db1, dbID1+100)
dbMap[dbID1].TableMap[tableID1] = NewTableReplace(&t1, tableID1+100)
dbMap[dbID1].TableMap[tableID1].PartitionMap[pt1ID] = pt1ID + 100
dbMap[dbID1].TableMap[tableID1].PartitionMap[pt2ID] = pt2ID + 100

dbMap[dbID2] = NewDBReplace(&db2, dbID2+100)
dbMap[dbID2].TableMap[tableID2] = NewTableReplace(&t2, tableID2+100)

sr := NewSchemasReplace(
dbMap,
0,
filter.All(),
mockGenGenGlobalID,
nil,
nil,
nil,
)
require.Equal(t, len(sr.globalTableIdMap), 4)

//exchange parition, t1 parition0 with the t2
t1Copy := t1.Clone()
t1Copy.Partition = t1.Partition.Clone()
t2Copy := t2.Clone()

t1Copy.Partition.Definitions[0].ID = tableID2
t2Copy.ID = pt1ID

// rewrite partition table
value, err := json.Marshal(&t1Copy)
require.Nil(t, err)
value, needRewrite, err := sr.rewriteTableInfo(value, dbID1)
require.Nil(t, err)
require.True(t, needRewrite)
err = json.Unmarshal(value, &tableInfo)
require.Nil(t, err)
require.Equal(t, tableInfo.ID, tableID1+100)
require.Equal(t, tableInfo.Partition.Definitions[0].ID, tableID2+100)
require.Equal(t, tableInfo.Partition.Definitions[1].ID, pt2ID+100)

// rewrite no partition table
value, err = json.Marshal(&t2Copy)
require.Nil(t, err)
value, needRewrite, err = sr.rewriteTableInfo(value, dbID2)
require.Nil(t, err)
require.True(t, needRewrite)
err = json.Unmarshal(value, &tableInfo)
require.Nil(t, err)
require.Equal(t, tableInfo.ID, pt1ID+100)
}

// db:70->80 -
// | - t0:71->81 -
// | | - p0:72->82
Expand Down
8 changes: 5 additions & 3 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ import (
)

func TestInfo(t *testing.T) {
t.Skip("TestInfo will hang currently, it should be fixed later")

if runtime.GOOS == "windows" {
t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
}

integration.BeforeTest(t)
integration.BeforeTestExternal(t)

if !unixSocketAvailable() {
t.Skip("ETCD use ip:port as unix socket address, skip when it is unavailable.")
Expand Down Expand Up @@ -110,9 +112,9 @@ func TestInfo(t *testing.T) {
require.Equalf(t, info.ID, infos[ddlID].ID, "server one info %v, info %v", infos[ddlID], info)

// Test the scene where syncer.Done() gets the information.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/util/ErrorMockSessionDone", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/syncer/ErrorMockSessionDone", `return(true)`))
<-dom.ddl.SchemaSyncer().Done()
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/util/ErrorMockSessionDone"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/syncer/ErrorMockSessionDone"))
time.Sleep(15 * time.Millisecond)
syncerStarted := false
for i := 0; i < 1000; i++ {
Expand Down
2 changes: 1 addition & 1 deletion domain/globalconfigsync/globalconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestStoreGlobalConfig(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
}
integration.BeforeTest(t)
integration.BeforeTestExternal(t)

store, err := mockstore.NewMockStore()
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestTopology(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
}
integration.BeforeTest(t)
integration.BeforeTestExternal(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
1 change: 1 addition & 0 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error {

// Close implements the Executor Close interface.
func (e *InsertExec) Close() error {
defer e.memTracker.ReplaceBytesUsed(0)
e.ctx.GetSessionVars().CurrInsertValues = chunk.Row{}
e.ctx.GetSessionVars().CurrInsertBatchExtraCols = e.ctx.GetSessionVars().CurrInsertBatchExtraCols[0:0:0]
e.setMessage()
Expand Down
10 changes: 5 additions & 5 deletions executor/memtest/mem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,30 @@ package memtest
import (
"testing"

"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)

func TestGlobalMemoryTrackerOnCleanUp(t *testing.T) {
originConsume := executor.GlobalMemoryUsageTracker.BytesConsumed()
func TestInsertUpdateTrackerOnCleanUp(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int)")

originConsume := tk.Session().GetSessionVars().StmtCtx.MemTracker.BytesConsumed()
// assert insert
tk.MustExec("insert t (id) values (1)")
tk.MustExec("insert t (id) values (2)")
tk.MustExec("insert t (id) values (3)")
afterConsume := executor.GlobalMemoryUsageTracker.BytesConsumed()
afterConsume := tk.Session().GetSessionVars().StmtCtx.MemTracker.BytesConsumed()
require.Equal(t, afterConsume, originConsume)

originConsume = tk.Session().GetSessionVars().StmtCtx.MemTracker.BytesConsumed()
// assert update
tk.MustExec("update t set id = 4 where id = 1")
tk.MustExec("update t set id = 5 where id = 2")
tk.MustExec("update t set id = 6 where id = 3")
afterConsume = executor.GlobalMemoryUsageTracker.BytesConsumed()
afterConsume = tk.Session().GetSessionVars().StmtCtx.MemTracker.BytesConsumed()
require.Equal(t, afterConsume, originConsume)
}
2 changes: 1 addition & 1 deletion executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,14 +427,14 @@ func (e *UpdateExec) composeGeneratedColumns(rowIdx int, newRowData []types.Datu

// Close implements the Executor Close interface.
func (e *UpdateExec) Close() error {
defer e.memTracker.ReplaceBytesUsed(0)
e.setMessage()
if e.runtimeStats != nil && e.stats != nil {
txn, err := e.ctx.Txn(false)
if err == nil && txn.Valid() && txn.GetSnapshot() != nil {
txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, nil)
}
}
defer e.memTracker.ReplaceBytesUsed(0)
return e.children[0].Close()
}

Expand Down
4 changes: 2 additions & 2 deletions owner/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestSingle(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
}
integration.BeforeTest(t)
integration.BeforeTestExternal(t)

store, err := mockstore.NewMockStore()
require.NoError(t, err)
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestCluster(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
}
integration.BeforeTest(t)
integration.BeforeTestExternal(t)

originalTTL := owner.ManagerSessionTTL
owner.ManagerSessionTTL = 3
Expand Down
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ var (
sessionExecuteParseDurationInternal = metrics.SessionExecuteParseDuration.WithLabelValues(metrics.LblInternal)
sessionExecuteParseDurationGeneral = metrics.SessionExecuteParseDuration.WithLabelValues(metrics.LblGeneral)

telemetryCTEUsageRecurCTE = metrics.TelemetrySQLCTECnt.WithLabelValues("recursive_cte")
telemetryCTEUsageRecurCTE = metrics.TelemetrySQLCTECnt.WithLabelValues("recurCTE")
telemetryCTEUsageNonRecurCTE = metrics.TelemetrySQLCTECnt.WithLabelValues("nonRecurCTE")
telemetryCTEUsageNotCTE = metrics.TelemetrySQLCTECnt.WithLabelValues("notCTE")
telemetryMultiSchemaChangeUsage = metrics.TelemetryMultiSchemaChangeCnt
Expand Down
Loading

0 comments on commit f25818c

Please sign in to comment.