diff --git a/ddl/placement_policy_test.go b/ddl/placement_policy_test.go index c8626121515d9..22c0a23c09d9f 100644 --- a/ddl/placement_policy_test.go +++ b/ddl/placement_policy_test.go @@ -2055,3 +2055,93 @@ func (s *testDBSuite6) TestPDFail(c *C) { " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p1` */)")) checkAllBundlesNotChange(c, existBundles) } + +func (s *testDBSuite6) TestRecoverTableWithPlacementPolicy(c *C) { + clearAllBundles(c) + failpoint.Enable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed", `return`) + defer func(originGC bool) { + failpoint.Disable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed") + if originGC { + ddl.EmulatorGCEnable() + } else { + ddl.EmulatorGCDisable() + } + }(ddl.IsEmulatorGCEnable()) + ddl.EmulatorGCDisable() + + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop placement policy if exists p1") + tk.MustExec("drop placement policy if exists p2") + tk.MustExec("drop placement policy if exists p3") + tk.MustExec("drop table if exists tp1, tp2") + + safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '') + ON DUPLICATE KEY + UPDATE variable_value = '%[1]s'` + tk.MustExec(fmt.Sprintf(safePointSQL, "20060102-15:04:05 -0700 MST")) + + tk.MustExec("create placement policy p1 primary_region='r1' regions='r1,r2'") + defer tk.MustExec("drop placement policy if exists p1") + + tk.MustExec("create placement policy p2 primary_region='r2' regions='r2,r3'") + defer tk.MustExec("drop placement policy if exists p2") + + tk.MustExec("create placement policy p3 primary_region='r3' regions='r3,r4'") + defer tk.MustExec("drop placement policy if exists p3") + + // test recover + tk.MustExec(`CREATE TABLE tp1 (id INT) placement policy p1 PARTITION BY RANGE (id) ( + PARTITION p0 VALUES LESS THAN (100) placement policy p2, + PARTITION p1 VALUES LESS THAN (1000), + PARTITION p2 VALUES LESS THAN (10000) placement policy p3 + );`) + defer tk.MustExec("drop table if exists tp1") + + tk.MustExec("drop table tp1") + tk.MustExec("recover table tp1") + tk.MustQuery("show create table tp1").Check(testkit.Rows("tp1 CREATE TABLE `tp1` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" + + "PARTITION BY RANGE (`id`)\n" + + "(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2,r3\" */,\n" + + " PARTITION `p1` VALUES LESS THAN (1000),\n" + + " PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3,r4\" */)")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp1") + + // test flashback + tk.MustExec(`CREATE TABLE tp2 (id INT) placement policy p1 PARTITION BY RANGE (id) ( + PARTITION p0 VALUES LESS THAN (100) placement policy p2, + PARTITION p1 VALUES LESS THAN (1000), + PARTITION p2 VALUES LESS THAN (10000) placement policy p3 + );`) + defer tk.MustExec("drop table if exists tp2") + + tk.MustExec("drop table tp1") + tk.MustExec("drop table tp2") + tk.MustExec("flashback table tp2") + tk.MustQuery("show create table tp2").Check(testkit.Rows("tp2 CREATE TABLE `tp2` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" + + "PARTITION BY RANGE (`id`)\n" + + "(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2,r3\" */,\n" + + " PARTITION `p1` VALUES LESS THAN (1000),\n" + + " PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3,r4\" */)")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp2") + + // test recover after police drop + tk.MustExec("drop table tp2") + tk.MustExec("drop placement policy p1") + tk.MustExec("drop placement policy p2") + tk.MustExec("drop placement policy p3") + + tk.MustExec("flashback table tp2 to tp3") + tk.MustQuery("show create table tp3").Check(testkit.Rows("tp3 CREATE TABLE `tp3` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" + + "PARTITION BY RANGE (`id`)\n" + + "(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2,r3\" */,\n" + + " PARTITION `p1` VALUES LESS THAN (1000),\n" + + " PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3,r4\" */)")) + checkExistTableBundlesInPD(c, s.dom, "test", "tp3") +} diff --git a/ddl/table.go b/ddl/table.go index 504f85f83faed..cd2b818450c57 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -319,6 +319,19 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in job.Args[checkFlagIndexInJobArgs] = recoverTableCheckFlagDisableGC } + bundles, err := placement.NewFullTableBundles(t, tblInfo) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + // Send the placement bundle to PD. + err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Wrapf(err, "failed to notify PD the placement rules") + } + job.SchemaState = model.StateWriteOnly tblInfo.State = model.StateWriteOnly ver, err = updateVersionAndTableInfo(t, job, tblInfo, false) diff --git a/executor/ddl.go b/executor/ddl.go index 4c2be5828b8a8..255580c04d93b 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -621,6 +621,10 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error { return err } + if tblInfo, err = recoverTablePlacement(m, tblInfo); err != nil { + return err + } + recoverInfo := &ddl.RecoverInfo{ SchemaID: job.SchemaID, TableInfo: tblInfo, @@ -635,6 +639,40 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error { return err } +// recoverTablePlacement is used when recover/flashback table. +// It will replace the placement policy of table with the direct options because the original policy may be deleted +func recoverTablePlacement(snapshotMeta *meta.Meta, tblInfo *model.TableInfo) (*model.TableInfo, error) { + if ref := tblInfo.PlacementPolicyRef; ref != nil { + policy, err := snapshotMeta.GetPolicy(ref.ID) + if err != nil { + return nil, errors.Trace(err) + } + + tblInfo.PlacementPolicyRef = nil + tblInfo.DirectPlacementOpts = policy.PlacementSettings + } + + if tblInfo.Partition != nil { + for idx := range tblInfo.Partition.Definitions { + def := &tblInfo.Partition.Definitions[idx] + ref := def.PlacementPolicyRef + if ref == nil { + continue + } + + policy, err := snapshotMeta.GetPolicy(ref.ID) + if err != nil { + return nil, errors.Trace(err) + } + + def.PlacementPolicyRef = nil + def.DirectPlacementOpts = policy.PlacementSettings + } + } + + return tblInfo, nil +} + func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) { job, err := t.GetHistoryDDLJob(s.JobID) if err != nil { @@ -787,6 +825,11 @@ func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error { if err != nil { return err } + + if tblInfo, err = recoverTablePlacement(m, tblInfo); err != nil { + return err + } + recoverInfo := &ddl.RecoverInfo{ SchemaID: job.SchemaID, TableInfo: tblInfo,