Skip to content

Commit

Permalink
ddl: improve FLASHBACK DATABASE for many table case (#54439) (#54602)
Browse files Browse the repository at this point in the history
close #54415
  • Loading branch information
ti-chi-bot authored Jul 15, 2024
1 parent d6d27e7 commit 37bf4e9
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 67 deletions.
6 changes: 2 additions & 4 deletions pkg/ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,15 +704,13 @@ func TestSnapshotVersion(t *testing.T) {
tbl, err := currSnapIs.TableByName(model.NewCIStr("test2"), model.NewCIStr("t"))
require.NoError(t, err)

m, err := dom.GetSnapshotMeta(snapTS)
require.NoError(t, err)
m := dom.GetSnapshotMeta(snapTS)

tblInfo1, err := m.GetTable(dbInfo.ID, tbl.Meta().ID)
require.True(t, meta.ErrDBNotExists.Equal(err))
require.Nil(t, tblInfo1)

m, err = dom.GetSnapshotMeta(currSnapTS)
require.NoError(t, err)
m = dom.GetSnapshotMeta(currSnapTS)

tblInfo2, err := m.GetTable(dbInfo.ID, tbl.Meta().ID)
require.NoError(t, err)
Expand Down
10 changes: 7 additions & 3 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1299,9 +1299,13 @@ type RecoverInfo struct {
type RecoverSchemaInfo struct {
*model.DBInfo
RecoverTabsInfo []*RecoverInfo
DropJobID int64
SnapshotTS uint64
OldSchemaName model.CIStr
// LoadTablesOnExecute is the new logic to avoid a large RecoverTabsInfo can't be
// persisted. If it's true, DDL owner will recover RecoverTabsInfo instead of the
// job submit node.
LoadTablesOnExecute bool
DropJobID int64
SnapshotTS uint64
OldSchemaName model.CIStr
}

// delayForAsyncCommit sleeps `SafeWindow + AllowedClockDrift` before a DDL job finishes.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
err = finishRecoverSchema(w, job)
case model.ActionCreateTables:
if job.IsCancelled() {
// it may be too large that it can not be added to the history queue, too
// it may be too large that it can not be added to the history queue, so
// delete its arguments
job.Args = nil
}
Expand Down
51 changes: 40 additions & 11 deletions pkg/ddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/pkg/ddl/label"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/model"
)
Expand Down Expand Up @@ -279,14 +280,6 @@ func (w *worker) onRecoverSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver i
} else {
job.Args[checkFlagIndexInJobArgs] = recoverCheckFlagDisableGC
}
// Clear all placement when recover
for _, recoverTabInfo := range recoverSchemaInfo.RecoverTabsInfo {
err = clearTablePlacementAndBundles(recoverTabInfo.TableInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
}
schemaInfo.State = model.StateWriteOnly
job.SchemaState = model.StateWriteOnly
case model.StateWriteOnly:
Expand All @@ -299,6 +292,36 @@ func (w *worker) onRecoverSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver i
return ver, errors.Errorf("disable gc failed, try again later. err: %v", err)
}
}

recoverTbls := recoverSchemaInfo.RecoverTabsInfo
if recoverSchemaInfo.LoadTablesOnExecute {
sid := recoverSchemaInfo.DBInfo.ID
snap := w.store.GetSnapshot(kv.NewVersion(recoverSchemaInfo.SnapshotTS))
snapMeta := meta.NewSnapshotMeta(snap)
tables, err2 := snapMeta.ListTables(sid)
if err2 != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err2)
}
recoverTbls = make([]*RecoverInfo, 0, len(tables))
for _, tblInfo := range tables {
autoIDs, err3 := snapMeta.GetAutoIDAccessors(sid, tblInfo.ID).Get()
if err3 != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err3)
}
recoverTbls = append(recoverTbls, &RecoverInfo{
SchemaID: sid,
TableInfo: tblInfo,
DropJobID: recoverSchemaInfo.DropJobID,
SnapshotTS: recoverSchemaInfo.SnapshotTS,
AutoIDs: autoIDs,
OldSchemaName: recoverSchemaInfo.OldSchemaName.L,
OldTableName: tblInfo.Name.L,
})
}
}

dbInfo := schemaInfo.Clone()
dbInfo.State = model.StatePublic
err = t.CreateDatabase(dbInfo)
Expand All @@ -311,7 +334,8 @@ func (w *worker) onRecoverSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver i
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
for _, recoverInfo := range recoverSchemaInfo.RecoverTabsInfo {

for _, recoverInfo := range recoverTbls {
if recoverInfo.TableInfo.TTLInfo != nil {
// force disable TTL job schedule for recovered table
recoverInfo.TableInfo.TTLInfo.Enable = false
Expand All @@ -322,13 +346,18 @@ func (w *worker) onRecoverSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver i
}
}
schemaInfo.State = model.StatePublic
for _, recoverInfo := range recoverSchemaInfo.RecoverTabsInfo {
diffInfos := make([]schemaIDAndTableInfo, 0, len(recoverTbls))
for _, recoverInfo := range recoverTbls {
recoverInfo.TableInfo.State = model.StatePublic
recoverInfo.TableInfo.UpdateTS = t.StartTS
diffInfos = append(diffInfos, schemaIDAndTableInfo{
schemaID: schemaInfo.ID,
tblInfo: recoverInfo.TableInfo,
})
}
// use to update InfoSchema
job.SchemaID = schemaInfo.ID
ver, err = updateSchemaVersion(d, t, job)
ver, err = updateSchemaVersion(d, t, job, diffInfos...)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down
19 changes: 10 additions & 9 deletions pkg/ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,13 +471,6 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
job.Args[checkFlagIndexInJobArgs] = recoverCheckFlagDisableGC
}

// Clear all placement when recover
err = clearTablePlacementAndBundles(tblInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}

job.SchemaState = model.StateWriteOnly
tblInfo.State = model.StateWriteOnly
case model.StateWriteOnly:
Expand Down Expand Up @@ -535,6 +528,11 @@ func (w *worker) recoverTable(t *meta.Meta, job *model.Job, recoverInfo *Recover
if err != nil {
return ver, errors.Trace(err)
}
err = clearTablePlacementAndBundles(w.ctx, recoverInfo.TableInfo)
if err != nil {
return ver, errors.Trace(err)
}

tableInfo := recoverInfo.TableInfo.Clone()
tableInfo.State = model.StatePublic
tableInfo.UpdateTS = t.StartTS
Expand All @@ -561,7 +559,10 @@ func (w *worker) recoverTable(t *meta.Meta, job *model.Job, recoverInfo *Recover
return ver, nil
}

func clearTablePlacementAndBundles(tblInfo *model.TableInfo) error {
func clearTablePlacementAndBundles(ctx context.Context, tblInfo *model.TableInfo) error {
failpoint.Inject("mockClearTablePlacementAndBundlesErr", func() {
failpoint.Return(errors.New("mock error for clearTablePlacementAndBundles"))
})
var bundles []*placement.Bundle
if tblInfo.PlacementPolicyRef != nil {
tblInfo.PlacementPolicyRef = nil
Expand All @@ -582,7 +583,7 @@ func clearTablePlacementAndBundles(tblInfo *model.TableInfo) error {
return nil
}

return infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
return infosync.PutRuleBundlesWithDefaultRetry(ctx, bundles)
}

// mockRecoverTableCommitErrOnce uses to make sure
Expand Down
4 changes: 2 additions & 2 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,9 @@ func (do *Domain) GetSnapshotInfoSchema(snapshotTS uint64) (infoschema.InfoSchem
}

// GetSnapshotMeta gets a new snapshot meta at startTS.
func (do *Domain) GetSnapshotMeta(startTS uint64) (*meta.Meta, error) {
func (do *Domain) GetSnapshotMeta(startTS uint64) *meta.Meta {
snapshot := do.store.GetSnapshot(kv.NewVersion(startTS))
return meta.NewSnapshotMeta(snapshot), nil
return meta.NewSnapshotMeta(snapshot)
}

// ExpiredTimeStamp4PC gets expiredTimeStamp4PC from domain.
Expand Down
46 changes: 10 additions & 36 deletions pkg/executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,7 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
return infoschema.ErrTableExists.GenWithStack("Table '%-.192s' already been recover to '%-.192s', can't be recover repeatedly", s.Table.Name.O, tbl.Meta().Name.O)
}

m, err := domain.GetDomain(e.Ctx()).GetSnapshotMeta(job.StartTS)
if err != nil {
return err
}
m := domain.GetDomain(e.Ctx()).GetSnapshotMeta(job.StartTS)
autoIDs, err := m.GetAutoIDAccessors(job.SchemaID, job.TableID).Get()
if err != nil {
return err
Expand Down Expand Up @@ -475,10 +472,7 @@ func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, dom *domain.Do
// it will use the `start_ts` of DDL job as snapshot to get the dropped/truncated table information.
func GetDropOrTruncateTableInfoFromJobs(jobs []*model.Job, gcSafePoint uint64, dom *domain.Domain, fn func(*model.Job, *model.TableInfo) (bool, error)) (bool, error) {
getTable := func(startTS uint64, schemaID int64, tableID int64) (*model.TableInfo, error) {
snapMeta, err := dom.GetSnapshotMeta(startTS)
if err != nil {
return nil, err
}
snapMeta := dom.GetSnapshotMeta(startTS)
tbl, err := snapMeta.GetTable(schemaID, tableID)
return tbl, err
}
Expand Down Expand Up @@ -569,10 +563,7 @@ func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error {
return infoschema.ErrTableExists.GenWithStack("Table '%-.192s' already been flashback to '%-.192s', can't be flashback repeatedly", s.Table.Name.O, tbl.Meta().Name.O)
}

m, err := domain.GetDomain(e.Ctx()).GetSnapshotMeta(job.StartTS)
if err != nil {
return err
}
m := domain.GetDomain(e.Ctx()).GetSnapshotMeta(job.StartTS)
autoIDs, err := m.GetAutoIDAccessors(job.SchemaID, job.TableID).Get()
if err != nil {
return err
Expand Down Expand Up @@ -639,10 +630,7 @@ func (e *DDLExec) getRecoverDBByName(schemaName model.CIStr) (recoverSchemaInfo
if job.Type != model.ActionDropSchema {
continue
}
snapMeta, err := dom.GetSnapshotMeta(job.StartTS)
if err != nil {
return false, err
}
snapMeta := dom.GetSnapshotMeta(job.StartTS)
schemaInfo, err := snapMeta.GetDatabase(job.SchemaID)
if err != nil {
return false, err
Expand All @@ -656,27 +644,13 @@ func (e *DDLExec) getRecoverDBByName(schemaName model.CIStr) (recoverSchemaInfo
if schemaInfo.Name.L != schemaName.L {
continue
}
tables, err := snapMeta.ListTables(job.SchemaID)
if err != nil {
return false, err
}
recoverTabsInfo := make([]*ddl.RecoverInfo, 0)
for _, tblInfo := range tables {
autoIDs, err := snapMeta.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Get()
if err != nil {
return false, err
}
recoverTabsInfo = append(recoverTabsInfo, &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
DropJobID: job.ID,
SnapshotTS: job.StartTS,
AutoIDs: autoIDs,
OldSchemaName: schemaName.L,
OldTableName: tblInfo.Name.L,
})
recoverSchemaInfo = &ddl.RecoverSchemaInfo{
DBInfo: schemaInfo,
LoadTablesOnExecute: true,
DropJobID: job.ID,
SnapshotTS: job.StartTS,
OldSchemaName: schemaName,
}
recoverSchemaInfo = &ddl.RecoverSchemaInfo{DBInfo: schemaInfo, RecoverTabsInfo: recoverTabsInfo, DropJobID: job.ID, SnapshotTS: job.StartTS, OldSchemaName: schemaName}
return true, nil
}
return false, nil
Expand Down
11 changes: 10 additions & 1 deletion pkg/executor/recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ func TestFlashbackSchema(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_ddl_error_count_limit = 2")
tk.MustExec("create database if not exists test_flashback")
tk.MustExec("use test_flashback")
tk.MustExec("drop table if exists t_flashback")
Expand All @@ -574,9 +575,17 @@ func TestFlashbackSchema(t *testing.T) {
tk.MustExec("insert into t_flashback values (1),(2),(3)")
tk.MustExec("drop database test_flashback")

// test PD connection issue causes failure after tidb_ddl_error_count_limit
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockClearTablePlacementAndBundlesErr", `return()`))
// TODO(lance6716): fix it later
//tk.MustGetErrMsg("flashback database test_flashback", "[ddl:-1]DDL job rollback, error msg: mock error for clearTablePlacementAndBundles")
tk.MustExecToErr("flashback database test_flashback")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockClearTablePlacementAndBundlesErr", `1*return()`))
tk.MustExec("flashback database test_flashback")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockClearTablePlacementAndBundlesErr"))

// Test flashback database with db_not_exists name.
tk.MustGetErrMsg("flashback database db_not_exists", "Can't find dropped database: db_not_exists in DDL history jobs")
tk.MustExec("flashback database test_flashback")
tk.MustGetErrMsg("flashback database test_flashback to test_flashback2", infoschema.ErrDatabaseExists.GenWithStack("Schema 'test_flashback' already been recover to 'test_flashback', can't be recover repeatedly").Error())

// Test flashback database failed by there is already a new database with the same name.
Expand Down

0 comments on commit 37bf4e9

Please sign in to comment.