diff --git a/pkg/ddl/db_test.go b/pkg/ddl/db_test.go
index 2859eec26d28d..660b24b330836 100644
--- a/pkg/ddl/db_test.go
+++ b/pkg/ddl/db_test.go
@@ -704,15 +704,13 @@ func TestSnapshotVersion(t *testing.T) {
 
 	tbl, err := currSnapIs.TableByName(model.NewCIStr("test2"), model.NewCIStr("t"))
 	require.NoError(t, err)
 
-	m, err := dom.GetSnapshotMeta(snapTS)
-	require.NoError(t, err)
+	m := dom.GetSnapshotMeta(snapTS)
 	tblInfo1, err := m.GetTable(dbInfo.ID, tbl.Meta().ID)
 	require.True(t, meta.ErrDBNotExists.Equal(err))
 	require.Nil(t, tblInfo1)
 
-	m, err = dom.GetSnapshotMeta(currSnapTS)
-	require.NoError(t, err)
+	m = dom.GetSnapshotMeta(currSnapTS)
 	tblInfo2, err := m.GetTable(dbInfo.ID, tbl.Meta().ID)
 	require.NoError(t, err)
 
diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go
index e9e146c466dce..23c0c94b2b441 100644
--- a/pkg/ddl/ddl.go
+++ b/pkg/ddl/ddl.go
@@ -1299,9 +1299,13 @@ type RecoverInfo struct {
 type RecoverSchemaInfo struct {
 	*model.DBInfo
 	RecoverTabsInfo []*RecoverInfo
-	DropJobID       int64
-	SnapshotTS      uint64
-	OldSchemaName   model.CIStr
+	// LoadTablesOnExecute indicates that the DDL owner should build RecoverTabsInfo
+	// when it executes the job, rather than the job submit node, so that a huge
+	// RecoverTabsInfo never has to be persisted in the job arguments.
+	LoadTablesOnExecute bool
+	DropJobID           int64
+	SnapshotTS          uint64
+	OldSchemaName       model.CIStr
 }
 
 // delayForAsyncCommit sleeps `SafeWindow + AllowedClockDrift` before a DDL job finishes.
diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go
index 3935c2e1b649c..8969d9c3ccb5d 100644
--- a/pkg/ddl/ddl_worker.go
+++ b/pkg/ddl/ddl_worker.go
@@ -593,7 +593,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
 		err = finishRecoverSchema(w, job)
 	case model.ActionCreateTables:
 		if job.IsCancelled() {
-			// it may be too large that it can not be added to the history queue, too
+			// it may be too large to be added to the history queue, so
 			// delete its arguments
 			job.Args = nil
 		}
diff --git a/pkg/ddl/schema.go b/pkg/ddl/schema.go
index e0425a5a9c618..0b0c51f915955 100644
--- a/pkg/ddl/schema.go
+++ b/pkg/ddl/schema.go
@@ -22,6 +22,7 @@ import (
 	"github.com/pingcap/tidb/pkg/ddl/label"
 	"github.com/pingcap/tidb/pkg/domain/infosync"
 	"github.com/pingcap/tidb/pkg/infoschema"
+	"github.com/pingcap/tidb/pkg/kv"
 	"github.com/pingcap/tidb/pkg/meta"
 	"github.com/pingcap/tidb/pkg/parser/model"
 )
@@ -279,14 +280,6 @@ func (w *worker) onRecoverSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver i
 	} else {
 		job.Args[checkFlagIndexInJobArgs] = recoverCheckFlagDisableGC
 	}
-	// Clear all placement when recover
-	for _, recoverTabInfo := range recoverSchemaInfo.RecoverTabsInfo {
-		err = clearTablePlacementAndBundles(recoverTabInfo.TableInfo)
-		if err != nil {
-			job.State = model.JobStateCancelled
-			return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
-		}
-	}
 	schemaInfo.State = model.StateWriteOnly
 	job.SchemaState = model.StateWriteOnly
 case model.StateWriteOnly:
@@ -299,6 +292,36 @@ func (w *worker) onRecoverSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver i
 				return ver, errors.Errorf("disable gc failed, try again later. err: %v", err)
err: %v", err) } } + + recoverTbls := recoverSchemaInfo.RecoverTabsInfo + if recoverSchemaInfo.LoadTablesOnExecute { + sid := recoverSchemaInfo.DBInfo.ID + snap := w.store.GetSnapshot(kv.NewVersion(recoverSchemaInfo.SnapshotTS)) + snapMeta := meta.NewSnapshotMeta(snap) + tables, err2 := snapMeta.ListTables(sid) + if err2 != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err2) + } + recoverTbls = make([]*RecoverInfo, 0, len(tables)) + for _, tblInfo := range tables { + autoIDs, err3 := snapMeta.GetAutoIDAccessors(sid, tblInfo.ID).Get() + if err3 != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err3) + } + recoverTbls = append(recoverTbls, &RecoverInfo{ + SchemaID: sid, + TableInfo: tblInfo, + DropJobID: recoverSchemaInfo.DropJobID, + SnapshotTS: recoverSchemaInfo.SnapshotTS, + AutoIDs: autoIDs, + OldSchemaName: recoverSchemaInfo.OldSchemaName.L, + OldTableName: tblInfo.Name.L, + }) + } + } + dbInfo := schemaInfo.Clone() dbInfo.State = model.StatePublic err = t.CreateDatabase(dbInfo) @@ -311,7 +334,8 @@ func (w *worker) onRecoverSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver i job.State = model.JobStateCancelled return ver, errors.Trace(err) } - for _, recoverInfo := range recoverSchemaInfo.RecoverTabsInfo { + + for _, recoverInfo := range recoverTbls { if recoverInfo.TableInfo.TTLInfo != nil { // force disable TTL job schedule for recovered table recoverInfo.TableInfo.TTLInfo.Enable = false @@ -322,13 +346,18 @@ func (w *worker) onRecoverSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver i } } schemaInfo.State = model.StatePublic - for _, recoverInfo := range recoverSchemaInfo.RecoverTabsInfo { + diffInfos := make([]schemaIDAndTableInfo, 0, len(recoverTbls)) + for _, recoverInfo := range recoverTbls { recoverInfo.TableInfo.State = model.StatePublic recoverInfo.TableInfo.UpdateTS = t.StartTS + diffInfos = append(diffInfos, schemaIDAndTableInfo{ + schemaID: schemaInfo.ID, + tblInfo: recoverInfo.TableInfo, + }) } // use to update InfoSchema job.SchemaID = schemaInfo.ID - ver, err = updateSchemaVersion(d, t, job) + ver, err = updateSchemaVersion(d, t, job, diffInfos...) 
 		if err != nil {
 			return ver, errors.Trace(err)
 		}
diff --git a/pkg/ddl/table.go b/pkg/ddl/table.go
index df4d630b4e9f8..17f37e7b7b522 100644
--- a/pkg/ddl/table.go
+++ b/pkg/ddl/table.go
@@ -471,13 +471,6 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
 			job.Args[checkFlagIndexInJobArgs] = recoverCheckFlagDisableGC
 		}
 
-		// Clear all placement when recover
-		err = clearTablePlacementAndBundles(tblInfo)
-		if err != nil {
-			job.State = model.JobStateCancelled
-			return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
-		}
-
 		job.SchemaState = model.StateWriteOnly
 		tblInfo.State = model.StateWriteOnly
 	case model.StateWriteOnly:
@@ -535,6 +528,11 @@ func (w *worker) recoverTable(t *meta.Meta, job *model.Job, recoverInfo *Recover
 	if err != nil {
 		return ver, errors.Trace(err)
 	}
+	err = clearTablePlacementAndBundles(w.ctx, recoverInfo.TableInfo)
+	if err != nil {
+		return ver, errors.Trace(err)
+	}
+
 	tableInfo := recoverInfo.TableInfo.Clone()
 	tableInfo.State = model.StatePublic
 	tableInfo.UpdateTS = t.StartTS
@@ -561,7 +559,10 @@ func (w *worker) recoverTable(t *meta.Meta, job *model.Job, recoverInfo *Recover
 	return ver, nil
 }
 
-func clearTablePlacementAndBundles(tblInfo *model.TableInfo) error {
+func clearTablePlacementAndBundles(ctx context.Context, tblInfo *model.TableInfo) error {
+	failpoint.Inject("mockClearTablePlacementAndBundlesErr", func() {
+		failpoint.Return(errors.New("mock error for clearTablePlacementAndBundles"))
+	})
 	var bundles []*placement.Bundle
 	if tblInfo.PlacementPolicyRef != nil {
 		tblInfo.PlacementPolicyRef = nil
@@ -582,7 +583,7 @@ func clearTablePlacementAndBundles(tblInfo *model.TableInfo) error {
 		return nil
 	}
 
-	return infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
+	return infosync.PutRuleBundlesWithDefaultRetry(ctx, bundles)
 }
 
 // mockRecoverTableCommitErrOnce uses to make sure
diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go
index 11cb020eb340b..6a415a904d03d 100644
--- a/pkg/domain/domain.go
+++ b/pkg/domain/domain.go
@@ -498,9 +498,9 @@ func (do *Domain) GetSnapshotInfoSchema(snapshotTS uint64) (infoschema.InfoSchem
 }
 
 // GetSnapshotMeta gets a new snapshot meta at startTS.
-func (do *Domain) GetSnapshotMeta(startTS uint64) (*meta.Meta, error) {
+func (do *Domain) GetSnapshotMeta(startTS uint64) *meta.Meta {
 	snapshot := do.store.GetSnapshot(kv.NewVersion(startTS))
-	return meta.NewSnapshotMeta(snapshot), nil
+	return meta.NewSnapshotMeta(snapshot)
 }
 
 // ExpiredTimeStamp4PC gets expiredTimeStamp4PC from domain.
diff --git a/pkg/executor/ddl.go b/pkg/executor/ddl.go
index 6766742fbdab5..31372f69ba101 100644
--- a/pkg/executor/ddl.go
+++ b/pkg/executor/ddl.go
@@ -408,10 +408,7 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
 		return infoschema.ErrTableExists.GenWithStack("Table '%-.192s' already been recover to '%-.192s', can't be recover repeatedly", s.Table.Name.O, tbl.Meta().Name.O)
 	}
 
-	m, err := domain.GetDomain(e.Ctx()).GetSnapshotMeta(job.StartTS)
-	if err != nil {
-		return err
-	}
+	m := domain.GetDomain(e.Ctx()).GetSnapshotMeta(job.StartTS)
 	autoIDs, err := m.GetAutoIDAccessors(job.SchemaID, job.TableID).Get()
 	if err != nil {
 		return err
@@ -475,10 +472,7 @@ func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, dom *domain.Do
 // it will use the `start_ts` of DDL job as snapshot to get the dropped/truncated table information.
 func GetDropOrTruncateTableInfoFromJobs(jobs []*model.Job, gcSafePoint uint64, dom *domain.Domain, fn func(*model.Job, *model.TableInfo) (bool, error)) (bool, error) {
 	getTable := func(startTS uint64, schemaID int64, tableID int64) (*model.TableInfo, error) {
-		snapMeta, err := dom.GetSnapshotMeta(startTS)
-		if err != nil {
-			return nil, err
-		}
+		snapMeta := dom.GetSnapshotMeta(startTS)
 		tbl, err := snapMeta.GetTable(schemaID, tableID)
 		return tbl, err
 	}
@@ -569,10 +563,7 @@ func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error {
 		return infoschema.ErrTableExists.GenWithStack("Table '%-.192s' already been flashback to '%-.192s', can't be flashback repeatedly", s.Table.Name.O, tbl.Meta().Name.O)
 	}
 
-	m, err := domain.GetDomain(e.Ctx()).GetSnapshotMeta(job.StartTS)
-	if err != nil {
-		return err
-	}
+	m := domain.GetDomain(e.Ctx()).GetSnapshotMeta(job.StartTS)
 	autoIDs, err := m.GetAutoIDAccessors(job.SchemaID, job.TableID).Get()
 	if err != nil {
 		return err
@@ -639,10 +630,7 @@ func (e *DDLExec) getRecoverDBByName(schemaName model.CIStr) (recoverSchemaInfo
 		if job.Type != model.ActionDropSchema {
 			continue
 		}
-		snapMeta, err := dom.GetSnapshotMeta(job.StartTS)
-		if err != nil {
-			return false, err
-		}
+		snapMeta := dom.GetSnapshotMeta(job.StartTS)
 		schemaInfo, err := snapMeta.GetDatabase(job.SchemaID)
 		if err != nil {
 			return false, err
@@ -656,27 +644,13 @@ func (e *DDLExec) getRecoverDBByName(schemaName model.CIStr) (recoverSchemaInfo
 		if schemaInfo.Name.L != schemaName.L {
 			continue
 		}
-		tables, err := snapMeta.ListTables(job.SchemaID)
-		if err != nil {
-			return false, err
-		}
-		recoverTabsInfo := make([]*ddl.RecoverInfo, 0)
-		for _, tblInfo := range tables {
-			autoIDs, err := snapMeta.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Get()
-			if err != nil {
-				return false, err
-			}
-			recoverTabsInfo = append(recoverTabsInfo, &ddl.RecoverInfo{
-				SchemaID:      job.SchemaID,
-				TableInfo:     tblInfo,
-				DropJobID:     job.ID,
-				SnapshotTS:    job.StartTS,
-				AutoIDs:       autoIDs,
-				OldSchemaName: schemaName.L,
-				OldTableName:  tblInfo.Name.L,
-			})
+		recoverSchemaInfo = &ddl.RecoverSchemaInfo{
+			DBInfo:              schemaInfo,
+			LoadTablesOnExecute: true,
+			DropJobID:           job.ID,
+			SnapshotTS:          job.StartTS,
+			OldSchemaName:       schemaName,
 		}
-		recoverSchemaInfo = &ddl.RecoverSchemaInfo{DBInfo: schemaInfo, RecoverTabsInfo: recoverTabsInfo, DropJobID: job.ID, SnapshotTS: job.StartTS, OldSchemaName: schemaName}
 		return true, nil
 	}
 	return false, nil
diff --git a/pkg/executor/recover_test.go b/pkg/executor/recover_test.go
index c57c71dc61af9..4d715bd9a82a1 100644
--- a/pkg/executor/recover_test.go
+++ b/pkg/executor/recover_test.go
@@ -556,6 +556,7 @@ func TestFlashbackSchema(t *testing.T) {
 	store := testkit.CreateMockStore(t)
 	tk := testkit.NewTestKit(t, store)
 
+	tk.MustExec("set @@global.tidb_ddl_error_count_limit = 2")
 	tk.MustExec("create database if not exists test_flashback")
 	tk.MustExec("use test_flashback")
 	tk.MustExec("drop table if exists t_flashback")
@@ -574,9 +575,17 @@ func TestFlashbackSchema(t *testing.T) {
 	tk.MustExec("insert into t_flashback values (1),(2),(3)")
 	tk.MustExec("drop database test_flashback")
 
+	// Test that a PD connection issue makes the job fail after exceeding tidb_ddl_error_count_limit.
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockClearTablePlacementAndBundlesErr", `return()`))
+	// TODO(lance6716): fix it later
+	//tk.MustGetErrMsg("flashback database test_flashback", "[ddl:-1]DDL job rollback, error msg: mock error for clearTablePlacementAndBundles")
+	tk.MustExecToErr("flashback database test_flashback")
test_flashback") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockClearTablePlacementAndBundlesErr", `1*return()`)) + tk.MustExec("flashback database test_flashback") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockClearTablePlacementAndBundlesErr")) + // Test flashback database with db_not_exists name. tk.MustGetErrMsg("flashback database db_not_exists", "Can't find dropped database: db_not_exists in DDL history jobs") - tk.MustExec("flashback database test_flashback") tk.MustGetErrMsg("flashback database test_flashback to test_flashback2", infoschema.ErrDatabaseExists.GenWithStack("Schema 'test_flashback' already been recover to 'test_flashback', can't be recover repeatedly").Error()) // Test flashback database failed by there is already a new database with the same name.