diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go
index 56b8a4e96c8f3..e6d97e2a19a17 100644
--- a/br/pkg/restore/client.go
+++ b/br/pkg/restore/client.go
@@ -2789,7 +2789,7 @@ func (rc *Client) InitSchemasReplaceForDDL(
 		dbReplace.TableMap[t.Info.ID] = &stream.TableReplace{
 			Name:         newTableInfo.Name.O,
 			TableID:      newTableInfo.ID,
-			PartitionMap: getTableIDMap(newTableInfo, t.Info),
+			PartitionMap: getPartitionIDMap(newTableInfo, t.Info),
 			IndexMap:     getIndexIDMap(newTableInfo, t.Info),
 		}
 	}
diff --git a/br/pkg/restore/util.go b/br/pkg/restore/util.go
index c1366aa67c472..e00d0d1298fe0 100644
--- a/br/pkg/restore/util.go
+++ b/br/pkg/restore/util.go
@@ -43,8 +43,8 @@ type AppliedFile interface {
 	GetEndKey() []byte
 }
 
-// getTableIDMap creates a map maping old tableID to new tableID.
-func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
+// getPartitionIDMap creates a map mapping old physical ID to new physical ID.
+func getPartitionIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
 	tableIDMap := make(map[int64]int64)
 
 	if oldTable.Partition != nil && newTable.Partition != nil {
@@ -60,6 +60,12 @@ func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
 		}
 	}
 
+	return tableIDMap
+}
+
+// getTableIDMap creates a map mapping old tableID to new tableID.
+func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
+	tableIDMap := getPartitionIDMap(newTable, oldTable)
 	tableIDMap[oldTable.ID] = newTable.ID
 	return tableIDMap
 }
diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go
index 759549ef07155..79761c3d8ddfa 100644
--- a/br/pkg/stream/rewrite_meta_rawkv.go
+++ b/br/pkg/stream/rewrite_meta_rawkv.go
@@ -23,11 +23,13 @@ import (
 	backuppb "github.com/pingcap/kvproto/pkg/brpb"
 	"github.com/pingcap/log"
 	berrors "github.com/pingcap/tidb/br/pkg/errors"
+	"github.com/pingcap/tidb/br/pkg/logutil"
 	"github.com/pingcap/tidb/br/pkg/restore/ingestrec"
 	"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta"
 	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/tablecodec"
 	filter "github.com/pingcap/tidb/util/table-filter"
 	"go.uber.org/zap"
 )
@@ -685,14 +687,17 @@ func (sr *SchemasReplace) restoreFromHistory(job *model.Job, isSubJob bool) erro
 	if !job.IsCancelled() {
 		switch job.Type {
 		case model.ActionAddIndex, model.ActionAddPrimaryKey:
-			if job.State == model.JobStateRollbackDone {
-				return sr.deleteRange(job)
+			// AddJob filters jobs by their state itself, so no explicit state check is needed here.
+			if err := sr.ingestRecorder.AddJob(job, isSubJob); err != nil {
+				return err
 			}
-			err := sr.ingestRecorder.AddJob(job, isSubJob)
-			return errors.Trace(err)
-		case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
-			model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn,
-			model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes:
+			return sr.deleteRange(job)
+		case model.ActionDropSchema, model.ActionDropTable,
+			model.ActionTruncateTable, model.ActionDropIndex,
+			model.ActionDropPrimaryKey,
+			model.ActionDropTablePartition, model.ActionTruncateTablePartition,
+			model.ActionDropColumn, model.ActionModifyColumn,
+			model.ActionReorganizePartition:
 			return sr.deleteRange(job)
 		case model.ActionMultiSchemaChange:
 			for _, sub := range job.MultiSchemaInfo.SubJobs {
@@ -708,10 +713,11 @@ func (sr *SchemasReplace) restoreFromHistory(job *model.Job, isSubJob bool) erro
 }
 
 func (sr
*SchemasReplace) deleteRange(job *model.Job) error { + lctx := logutil.ContextWithField(context.Background(), logutil.RedactAny("category", "ddl: rewrite delete range")) dbReplace, exist := sr.DbMap[job.SchemaID] if !exist { // skip this mddljob, the same below - log.Debug("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID)) + logutil.CL(lctx).Warn("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID)) return nil } @@ -747,14 +753,14 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { newTableIDs := make([]int64, 0, len(tableIDs)) for tableID, tableReplace := range dbReplace.TableMap { if _, exist := argsSet[tableID]; !exist { - log.Debug("DropSchema: record a table, but it doesn't exist in job args", + logutil.CL(lctx).Warn("DropSchema: record a table, but it doesn't exist in job args", zap.Int64("oldTableID", tableID)) continue } newTableIDs = append(newTableIDs, tableReplace.TableID) for partitionID, newPartitionID := range tableReplace.PartitionMap { if _, exist := argsSet[partitionID]; !exist { - log.Debug("DropSchema: record a partition, but it doesn't exist in job args", + logutil.CL(lctx).Warn("DropSchema: record a partition, but it doesn't exist in job args", zap.Int64("oldPartitionID", partitionID)) continue } @@ -763,7 +769,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { } if len(newTableIDs) != len(tableIDs) { - log.Debug( + logutil.CL(lctx).Warn( "DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace") // only drop newTableIDs' ranges } @@ -777,7 +783,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { case model.ActionDropTable, model.ActionTruncateTable: tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID", + logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -790,28 +796,33 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { return errors.Trace(err) } if len(physicalTableIDs) > 0 { - // delete partition id instead of table id - for i := 0; i < len(physicalTableIDs); i++ { - newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]] + newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs)) + // delete partition id + for _, oldPid := range physicalTableIDs { + newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID", - zap.Int64("oldPartitionID", physicalTableIDs[i])) + logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID", + zap.Int64("oldPartitionID", oldPid)) continue } - physicalTableIDs[i] = newPid + newPhysicalTableIDs = append(newPhysicalTableIDs, newPid) } - if len(physicalTableIDs) > 0 { - sr.insertDeleteRangeForTable(newJobID, physicalTableIDs) + + // logical table may contain global index regions, so delete the logical table range. 
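+			// For example, if the old partition IDs [73, 74] are remapped to [173, 174]
+			// and the new logical table ID is 172, delete ranges are enqueued for
+			// [173, 174, 172], so any global index data stored under the logical
+			// table ID is reclaimed as well (the IDs here are illustrative only).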
+ newPhysicalTableIDs = append(newPhysicalTableIDs, tableReplace.TableID) + if len(newPhysicalTableIDs) > 0 { + sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs) } + return nil } sr.insertDeleteRangeForTable(newJobID, []int64{tableReplace.TableID}) return nil - case model.ActionDropTablePartition, model.ActionTruncateTablePartition: + case model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionReorganizePartition: tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug( + logutil.CL(lctx).Warn( "DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil @@ -821,26 +832,26 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { return errors.Trace(err) } - for i := 0; i < len(physicalTableIDs); i++ { - newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]] + newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs)) + for _, oldPid := range physicalTableIDs { + newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug( + logutil.CL(lctx).Warn( "DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID", - zap.Int64("oldPartitionID", physicalTableIDs[i])) + zap.Int64("oldPartitionID", oldPid)) continue } - physicalTableIDs[i] = newPid + newPhysicalTableIDs = append(newPhysicalTableIDs, newPid) } - if len(physicalTableIDs) > 0 { - sr.insertDeleteRangeForTable(newJobID, physicalTableIDs) + if len(newPhysicalTableIDs) > 0 { + sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs) } return nil // ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled. case model.ActionAddIndex, model.ActionAddPrimaryKey: - // iff job.State = model.JobStateRollbackDone tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID", + logutil.CL(lctx).Warn("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -851,14 +862,20 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { return errors.Trace(err) } + tempIdxID := tablecodec.TempIndexPrefix | indexID var elementID int64 = 1 - indexIDs := []int64{indexID} + var indexIDs []int64 + if job.State == model.JobStateRollbackDone { + indexIDs = []int64{indexID, tempIdxID} + } else { + indexIDs = []int64{tempIdxID} + } if len(partitionIDs) > 0 { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug( + logutil.CL(lctx).Warn( "AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue @@ -873,7 +890,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { case model.ActionDropIndex, model.ActionDropPrimaryKey: tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -892,7 +909,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - 
log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } // len(indexIDs) = 1 @@ -902,37 +919,6 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { sr.insertDeleteRangeForIndex(newJobID, &elementID, tableReplace.TableID, indexIDs) } return nil - case model.ActionDropIndexes: // // Deprecated, we use ActionMultiSchemaChange instead. - var indexIDs []int64 - var partitionIDs []int64 - if err := job.DecodeArgs(&[]model.CIStr{}, &[]bool{}, &indexIDs, &partitionIDs); err != nil { - return errors.Trace(err) - } - // Remove data in TiKV. - if len(indexIDs) == 0 { - return nil - } - - tableReplace, exist := dbReplace.TableMap[job.TableID] - if !exist { - log.Debug("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) - return nil - } - - var elementID int64 = 1 - if len(partitionIDs) > 0 { - for _, oldPid := range partitionIDs { - newPid, exist := tableReplace.PartitionMap[oldPid] - if !exist { - log.Debug("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) - continue - } - sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) - } - } else { - sr.insertDeleteRangeForIndex(newJobID, &elementID, tableReplace.TableID, indexIDs) - } - return nil case model.ActionDropColumn: var colName model.CIStr var ifExists bool @@ -944,7 +930,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { if len(indexIDs) > 0 { tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -953,7 +939,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) @@ -963,35 +949,6 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { } } return nil - case model.ActionDropColumns: // Deprecated, we use ActionMultiSchemaChange instead. 
- var colNames []model.CIStr - var ifExists []bool - var indexIDs []int64 - var partitionIDs []int64 - if err := job.DecodeArgs(&colNames, &ifExists, &indexIDs, &partitionIDs); err != nil { - return errors.Trace(err) - } - if len(indexIDs) > 0 { - tableReplace, exist := dbReplace.TableMap[job.TableID] - if !exist { - log.Debug("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) - return nil - } - - var elementID int64 = 1 - if len(partitionIDs) > 0 { - for _, oldPid := range partitionIDs { - newPid, exist := tableReplace.PartitionMap[oldPid] - if !exist { - log.Debug("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) - continue - } - sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) - } - } else { - sr.insertDeleteRangeForIndex(newJobID, &elementID, tableReplace.TableID, indexIDs) - } - } case model.ActionModifyColumn: var indexIDs []int64 var partitionIDs []int64 @@ -1003,7 +960,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { } tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -1012,7 +969,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) diff --git a/br/pkg/stream/rewrite_meta_rawkv_test.go b/br/pkg/stream/rewrite_meta_rawkv_test.go index 752796c2ac574..0e48f6711cff3 100644 --- a/br/pkg/stream/rewrite_meta_rawkv_test.go +++ b/br/pkg/stream/rewrite_meta_rawkv_test.go @@ -11,6 +11,7 @@ import ( "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" filter "github.com/pingcap/tidb/util/table-filter" "github.com/stretchr/testify/require" @@ -638,23 +639,22 @@ var ( ) var ( - dropSchemaJob = &model.Job{Type: model.ActionDropSchema, SchemaID: mDDLJobDBOldID, RawArgs: json.RawMessage(`[[71,72,73,74,75]]`)} - dropTable0Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",[72,73,74],[""]]`)} - dropTable1Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",[],[""]]`)} - dropTable0Partition1Job = &model.Job{Type: model.ActionDropTablePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} - rollBackTable0IndexJob = &model.Job{Type: model.ActionAddIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)} - rollBackTable1IndexJob = &model.Job{Type: model.ActionAddIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)} - dropTable0IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, 
TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,2,[72,73,74]]`)} - dropTable1IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,2,[]]`)} - dropTable0IndexesJob = &model.Job{Type: model.ActionDropIndexes, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[72,73,74]]`)} - dropTable1IndexesJob = &model.Job{Type: model.ActionDropIndexes, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[]]`)} - dropTable0ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,[2,3],[72,73,74]]`)} - dropTable1ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,[2,3],[]]`)} - dropTable0ColumnsJob = &model.Job{Type: model.ActionDropColumns, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[72,73,74]]`)} - dropTable1ColumnsJob = &model.Job{Type: model.ActionDropColumns, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[]]`)} - modifyTable0ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[2,3],[72,73,74]]`)} - modifyTable1ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[2,3],[]]`)} - multiSchemaChangeJob0 = &model.Job{ + dropSchemaJob = &model.Job{Type: model.ActionDropSchema, SchemaID: mDDLJobDBOldID, RawArgs: json.RawMessage(`[[71,72,73,74,75]]`)} + dropTable0Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",[72,73,74],[""]]`)} + dropTable1Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",[],[""]]`)} + dropTable0Partition1Job = &model.Job{Type: model.ActionDropTablePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} + reorganizeTable0Partition1Job = &model.Job{Type: model.ActionReorganizePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} + rollBackTable0IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)} + rollBackTable1IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)} + addTable0IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)} + addTable1IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)} + dropTable0IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,2,[72,73,74]]`)} + dropTable1IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: 
json.RawMessage(`["",false,2,[]]`)} + dropTable0ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,[2,3],[72,73,74]]`)} + dropTable1ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,[2,3],[]]`)} + modifyTable0ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[2,3],[72,73,74]]`)} + modifyTable1ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[2,3],[]]`)} + multiSchemaChangeJob0 = &model.Job{ Type: model.ActionMultiSchemaChange, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, @@ -767,9 +767,12 @@ func TestDeleteRangeForMDDLJob(t *testing.T) { err = schemaReplace.deleteRange(dropTable0Job) require.NoError(t, err) targs = <-midr.tableCh - require.Equal(t, len(targs.tableIDs), len(mDDLJobALLNewPartitionIDSet)) + require.Equal(t, len(targs.tableIDs), len(mDDLJobALLNewPartitionIDSet)+1) for _, tableID := range targs.tableIDs { _, exist := mDDLJobALLNewPartitionIDSet[tableID] + if !exist { + exist = tableID == mDDLJobTable0NewID + } require.True(t, exist) } @@ -787,68 +790,71 @@ func TestDeleteRangeForMDDLJob(t *testing.T) { require.Equal(t, len(targs.tableIDs), 1) require.Equal(t, targs.tableIDs[0], mDDLJobPartition1NewID) + // reorganize table partition1 + err = schemaReplace.deleteRange(reorganizeTable0Partition1Job) + require.NoError(t, err) + targs = <-midr.tableCh + require.Equal(t, len(targs.tableIDs), 1) + require.Equal(t, targs.tableIDs[0], mDDLJobPartition1NewID) + // roll back add index for table0 - err = schemaReplace.deleteRange(rollBackTable0IndexJob) + err = schemaReplace.restoreFromHistory(rollBackTable0IndexJob, false) require.NoError(t, err) for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { iargs = <-midr.indexCh _, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID] require.True(t, exist) - require.Equal(t, len(iargs.indexIDs), 1) + require.Equal(t, len(iargs.indexIDs), 2) require.Equal(t, iargs.indexIDs[0], int64(2)) + require.Equal(t, iargs.indexIDs[1], int64(tablecodec.TempIndexPrefix|2)) } // roll back add index for table1 - err = schemaReplace.deleteRange(rollBackTable1IndexJob) + err = schemaReplace.restoreFromHistory(rollBackTable1IndexJob, false) require.NoError(t, err) iargs = <-midr.indexCh require.Equal(t, iargs.tableID, mDDLJobTable1NewID) - require.Equal(t, len(iargs.indexIDs), 1) + require.Equal(t, len(iargs.indexIDs), 2) require.Equal(t, iargs.indexIDs[0], int64(2)) + require.Equal(t, iargs.indexIDs[1], int64(tablecodec.TempIndexPrefix|2)) - // drop index for table0 - err = schemaReplace.deleteRange(dropTable0IndexJob) + // add index for table 0 + err = schemaReplace.restoreFromHistory(addTable0IndexJob, false) require.NoError(t, err) for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { iargs = <-midr.indexCh _, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID] require.True(t, exist) require.Equal(t, len(iargs.indexIDs), 1) - require.Equal(t, iargs.indexIDs[0], int64(2)) + require.Equal(t, iargs.indexIDs[0], int64(tablecodec.TempIndexPrefix|2)) } - // drop index for table1 - err = schemaReplace.deleteRange(dropTable1IndexJob) + // add index for table 1 + err = schemaReplace.restoreFromHistory(addTable1IndexJob, false) require.NoError(t, err) iargs = <-midr.indexCh require.Equal(t, 
iargs.tableID, mDDLJobTable1NewID) require.Equal(t, len(iargs.indexIDs), 1) - require.Equal(t, iargs.indexIDs[0], int64(2)) + require.Equal(t, iargs.indexIDs[0], int64(tablecodec.TempIndexPrefix|2)) - // drop indexes for table0 - err = schemaReplace.deleteRange(dropTable0IndexesJob) + // drop index for table0 + err = schemaReplace.deleteRange(dropTable0IndexJob) require.NoError(t, err) for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { iargs = <-midr.indexCh _, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID] require.True(t, exist) - require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] - require.True(t, exist) - } + require.Equal(t, len(iargs.indexIDs), 1) + require.Equal(t, iargs.indexIDs[0], int64(2)) } - // drop indexes for table1 - err = schemaReplace.deleteRange(dropTable1IndexesJob) + // drop index for table1 + err = schemaReplace.deleteRange(dropTable1IndexJob) require.NoError(t, err) iargs = <-midr.indexCh require.Equal(t, iargs.tableID, mDDLJobTable1NewID) - require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] - require.True(t, exist) - } + require.Equal(t, len(iargs.indexIDs), 1) + require.Equal(t, iargs.indexIDs[0], int64(2)) // drop column for table0 err = schemaReplace.deleteRange(dropTable0ColumnJob) @@ -875,32 +881,7 @@ func TestDeleteRangeForMDDLJob(t *testing.T) { require.True(t, exist) } - // drop columns for table0 - err = schemaReplace.deleteRange(dropTable0ColumnsJob) - require.NoError(t, err) - for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { - iargs = <-midr.indexCh - _, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID] - require.True(t, exist) - require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] - require.True(t, exist) - } - } - - // drop columns for table1 - err = schemaReplace.deleteRange(dropTable1ColumnsJob) - require.NoError(t, err) - iargs = <-midr.indexCh - require.Equal(t, iargs.tableID, mDDLJobTable1NewID) - require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] - require.True(t, exist) - } - - // drop columns for table0 + // modify column for table0 err = schemaReplace.deleteRange(modifyTable0ColumnJob) require.NoError(t, err) for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { @@ -914,7 +895,7 @@ func TestDeleteRangeForMDDLJob(t *testing.T) { } } - // drop columns for table1 + // modify column for table1 err = schemaReplace.deleteRange(modifyTable1ColumnJob) require.NoError(t, err) iargs = <-midr.indexCh diff --git a/br/pkg/stream/util_test.go b/br/pkg/stream/util_test.go index 2562c9ce15840..6dda62a04ad60 100644 --- a/br/pkg/stream/util_test.go +++ b/br/pkg/stream/util_test.go @@ -23,6 +23,10 @@ func TestDateFormat(t *testing.T) { 434605479096221697, "2022-07-15 20:32:12.734 +0800", }, + { + 434605478903808000, + "2022-07-15 20:32:12 +0800", + }, } timeZone, _ := time.LoadLocation("Asia/Shanghai") diff --git a/br/tests/br_pitr/incremental_data/delete_range.sql b/br/tests/br_pitr/incremental_data/delete_range.sql new file mode 100644 index 0000000000000..2b68e11f17054 --- /dev/null +++ b/br/tests/br_pitr/incremental_data/delete_range.sql @@ -0,0 +1,30 @@ +-- 1. Drop Schema +drop database db_to_be_dropped; +-- 2. 
Drop/Truncate Table
+drop table table_to_be_dropped_or_truncated.t0_dropped;
+drop table table_to_be_dropped_or_truncated.t1_dropped;
+truncate table table_to_be_dropped_or_truncated.t0_truncated;
+truncate table table_to_be_dropped_or_truncated.t1_truncated;
+-- 3. Drop/Truncate/Reorganize Table Partition
+alter table partition_to_be_dropped_or_truncated.t1_dropped drop partition p0;
+alter table partition_to_be_dropped_or_truncated.t1_truncated truncate partition p0;
+alter table partition_to_be_dropped_or_truncated.t1_truncated reorganize partition p2 INTO (PARTITION p2 VALUES LESS THAN (20), PARTITION p3 VALUES LESS THAN MAXVALUE);
+-- 4. Drop Table Index/PrimaryKey or Add Table Index/PrimaryKey
+alter table index_or_primarykey_to_be_dropped.t0 drop index k1;
+alter table index_or_primarykey_to_be_dropped.t1 drop index k1;
+alter table index_or_primarykey_to_be_dropped.t0 drop primary key;
+alter table index_or_primarykey_to_be_dropped.t1 drop primary key;
+create index k1 on index_or_primarykey_to_be_dropped.t0 (name);
+create index k1 on index_or_primarykey_to_be_dropped.t1 (name);
+alter table index_or_primarykey_to_be_dropped.t0 add primary key (id);
+alter table index_or_primarykey_to_be_dropped.t1 add primary key (id);
+-- 5. Drop Table Indexes
+alter table indexes_to_be_dropped.t0 drop index k1, drop index k2;
+alter table indexes_to_be_dropped.t1 drop index k1, drop index k2;
+-- 6. Drop Table Column/Columns
+alter table column_s_to_be_dropped.t0_column drop column name;
+alter table column_s_to_be_dropped.t1_column drop column name;
+alter table column_s_to_be_dropped.t0_columns drop column name, drop column c;
+alter table column_s_to_be_dropped.t1_columns drop column name, drop column c;
+-- 7. Modify Table Column
+alter table column_to_be_modified.t0 modify column name varchar(25);
diff --git a/br/tests/br_pitr/prepare_data/delete_range.sql b/br/tests/br_pitr/prepare_data/delete_range.sql
new file mode 100644
index 0000000000000..9d571ddadbf0a
--- /dev/null
+++ b/br/tests/br_pitr/prepare_data/delete_range.sql
@@ -0,0 +1,124 @@
+-- 1. Drop Schema
+create database db_to_be_dropped;
+create table db_to_be_dropped.t0(id int primary key, c int, name char(20));
+create table db_to_be_dropped.t1(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE );
+
+create index k1 on db_to_be_dropped.t0 (name);
+create index k2 on db_to_be_dropped.t0 (c);
+create index k1 on db_to_be_dropped.t1 (name);
+create index k2 on db_to_be_dropped.t1 (c);
+create index k3 on db_to_be_dropped.t1 (id, c);
+
+insert into db_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123");
+insert into db_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123");
+-- 2. 
Drop/Truncate Table +create database table_to_be_dropped_or_truncated; +create table table_to_be_dropped_or_truncated.t0_dropped(id int primary key, c int, name char(20)); +create table table_to_be_dropped_or_truncated.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table table_to_be_dropped_or_truncated.t0_truncated(id int primary key, c int, name char(20)); +create table table_to_be_dropped_or_truncated.t1_truncated(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on table_to_be_dropped_or_truncated.t0_dropped (name); +create index k2 on table_to_be_dropped_or_truncated.t0_dropped (c); +create index k1 on table_to_be_dropped_or_truncated.t1_dropped (name); +create index k2 on table_to_be_dropped_or_truncated.t1_dropped (c); +create index k3 on table_to_be_dropped_or_truncated.t1_dropped (id, c); + +create index k1 on table_to_be_dropped_or_truncated.t0_truncated (name); +create index k2 on table_to_be_dropped_or_truncated.t0_truncated (c); +create index k1 on table_to_be_dropped_or_truncated.t1_truncated (name); +create index k2 on table_to_be_dropped_or_truncated.t1_truncated (c); +create index k3 on table_to_be_dropped_or_truncated.t1_truncated (id, c); + +insert into table_to_be_dropped_or_truncated.t0_dropped values (1, 2, "123"), (2, 3, "123"); +insert into table_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123"), (2, 3, "123"); + +insert into table_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123"); +insert into table_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123"); + +-- 3. 
Drop/Truncate/Reorganize Table Partition +create database partition_to_be_dropped_or_truncated; +create table partition_to_be_dropped_or_truncated.t0_dropped(id int primary key, c int, name char(20)); +create table partition_to_be_dropped_or_truncated.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table partition_to_be_dropped_or_truncated.t0_truncated(id int primary key, c int, name char(20)); +create table partition_to_be_dropped_or_truncated.t1_truncated(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on partition_to_be_dropped_or_truncated.t0_dropped (name); +create index k2 on partition_to_be_dropped_or_truncated.t0_dropped (c); +create index k1 on partition_to_be_dropped_or_truncated.t1_dropped (name); +create index k2 on partition_to_be_dropped_or_truncated.t1_dropped (c); +create index k3 on partition_to_be_dropped_or_truncated.t1_dropped (id, c); + +create index k1 on partition_to_be_dropped_or_truncated.t0_truncated (name); +create index k2 on partition_to_be_dropped_or_truncated.t0_truncated (c); +create index k1 on partition_to_be_dropped_or_truncated.t1_truncated (name); +create index k2 on partition_to_be_dropped_or_truncated.t1_truncated (c); +create index k3 on partition_to_be_dropped_or_truncated.t1_truncated (id, c); + +insert into partition_to_be_dropped_or_truncated.t0_dropped values (1, 2, "123"), (2, 3, "123"); +insert into partition_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123"), (2, 3, "123"); + +insert into partition_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123"); +insert into partition_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123"); +-- 4. Drop Table Index/PrimaryKey +create database index_or_primarykey_to_be_dropped; +create table index_or_primarykey_to_be_dropped.t0(id int primary key nonclustered, c int, name char(20)); +create table index_or_primarykey_to_be_dropped.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on index_or_primarykey_to_be_dropped.t0 (name); +create index k2 on index_or_primarykey_to_be_dropped.t0 (c); +create index k1 on index_or_primarykey_to_be_dropped.t1 (name); +create index k2 on index_or_primarykey_to_be_dropped.t1 (c); +create index k3 on index_or_primarykey_to_be_dropped.t1 (id, c); + +insert into index_or_primarykey_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); +insert into index_or_primarykey_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); +-- 5. 
Drop Table INDEXES +create database indexes_to_be_dropped; +create table indexes_to_be_dropped.t0(id int primary key nonclustered, c int, name char(20)); +create table indexes_to_be_dropped.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on indexes_to_be_dropped.t0 (name); +create index k2 on indexes_to_be_dropped.t0 (c); +create index k1 on indexes_to_be_dropped.t1 (name); +create index k2 on indexes_to_be_dropped.t1 (c); +create index k3 on indexes_to_be_dropped.t1 (id, c); + +insert into indexes_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); +insert into indexes_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); +-- 6. Drop Table Column/Columns +create database column_s_to_be_dropped; +create table column_s_to_be_dropped.t0_column(id int primary key nonclustered, c int, name char(20)); +create table column_s_to_be_dropped.t1_column(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table column_s_to_be_dropped.t0_columns(id int primary key nonclustered, c int, name char(20)); +create table column_s_to_be_dropped.t1_columns(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on column_s_to_be_dropped.t0_column (name); +create index k2 on column_s_to_be_dropped.t0_column (c); +create index k1 on column_s_to_be_dropped.t1_column (name); +create index k2 on column_s_to_be_dropped.t1_column (c); +create index k3 on column_s_to_be_dropped.t1_column (id, c); + +create index k1 on column_s_to_be_dropped.t0_columns (name); +create index k2 on column_s_to_be_dropped.t0_columns (c); +create index k1 on column_s_to_be_dropped.t1_columns (name); +create index k2 on column_s_to_be_dropped.t1_columns (c); +-- create index k3 on column_s_to_be_dropped.t1_columns (id, c); + +insert into column_s_to_be_dropped.t0_column values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped.t1_column values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped.t0_columns values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped.t1_columns values (1, 2, "123"), (2, 3, "123"); +-- 7. Modify Table Column +create database column_to_be_modified; +create table column_to_be_modified.t0(id int primary key nonclustered, c int, name char(20)); +create table column_to_be_modified.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on column_to_be_modified.t0 (name); +create index k2 on column_to_be_modified.t0 (c); +create index k1 on column_to_be_modified.t1 (name); +create index k2 on column_to_be_modified.t1 (c); +create index k3 on column_to_be_modified.t1 (id, c); + +insert into column_to_be_modified.t0 values (1, 2, "123"), (2, 3, "123"); +insert into column_to_be_modified.t1 values (1, 2, "123"), (2, 3, "123"); diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh new file mode 100644 index 0000000000000..afe400820eb7e --- /dev/null +++ b/br/tests/br_pitr/run.sh @@ -0,0 +1,109 @@ +#!/bin/bash +# +# Copyright 2023 PingCAP, Inc. 
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eu
+. run_services
+CUR=$(cd `dirname $0`; pwd)
+
+# const value
+PREFIX="pitr_backup" # NOTICE: don't start with 'br' because `restart services` would remove file/directory br*.
+res_file="$TEST_DIR/sql_res.$TEST_NAME.txt"
+
+# start a new cluster
+echo "restart services"
+restart_services
+
+# prepare the data
+echo "prepare the data"
+run_sql_file $CUR/prepare_data/delete_range.sql
+# ...
+
+# check something after preparing the data
+prepare_delete_range_count=$(run_sql "select count(*) DELETE_RANGE_CNT from (select * from mysql.gc_delete_range union all select * from mysql.gc_delete_range_done) del_range;" | tail -n 1 | awk '{print $2}')
+echo "prepare_delete_range_count: $prepare_delete_range_count"
+
+# start the log backup task
+echo "start log task"
+run_br --pd $PD_ADDR log start --task-name integration_test -s "local://$TEST_DIR/$PREFIX/log"
+
+# run snapshot backup
+echo "run snapshot backup"
+run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full"
+
+# load the incremental data
+echo "load the incremental data"
+run_sql_file $CUR/incremental_data/delete_range.sql
+# ...
+
+# check something after loading the incremental data
+incremental_delete_range_count=$(run_sql "select count(*) DELETE_RANGE_CNT from (select * from mysql.gc_delete_range union all select * from mysql.gc_delete_range_done) del_range;" | tail -n 1 | awk '{print $2}')
+echo "incremental_delete_range_count: $incremental_delete_range_count"
+
+# wait for the checkpoint to advance
+echo "wait for the checkpoint to advance"
+sleep 10
+# a TSO keeps the physical time in milliseconds above 18 logical bits, so shifting
+# the current time left by 18 bits yields a TSO comparable with the checkpoint ts.
+current_ts=$(echo $(($(date +%s%3N) << 18)))
+echo "current ts: $current_ts"
+i=0
+while true; do
+    # extract the checkpoint ts of the log backup task. If there is some error, the checkpoint ts should be empty
+    log_backup_status=$(unset BR_LOG_TO_TERM && run_br --pd $PD_ADDR log status --task-name integration_test --json 2>/dev/null)
+    echo "log backup status: $log_backup_status"
+    checkpoint_ts=$(echo "$log_backup_status" | head -n 1 | jq 'if .[0].last_errors | length == 0 then .[0].checkpoint else empty end')
+    echo "checkpoint ts: $checkpoint_ts"
+
+    # check whether the checkpoint ts is a number
+    if [ $checkpoint_ts -gt 0 ] 2>/dev/null; then
+        # check whether the checkpoint has advanced
+        if [ $checkpoint_ts -gt $current_ts ]; then
+            echo "the checkpoint has advanced"
+            break
+        fi
+        # the checkpoint hasn't advanced
+        echo "the checkpoint hasn't advanced"
+        i=$((i+1))
+        if [ "$i" -gt 50 ]; then
+            echo 'the checkpoint lag is too large'
+            exit 1
+        fi
+        sleep 10
+    else
+        # unknown status, something may be wrong
+        echo "TEST: [$TEST_NAME] failed to wait for the checkpoint to advance!"
+        exit 1
+    fi
+done
+
+# dump some info from upstream cluster
+# ...
+
+# start a new cluster
+echo "restart services"
+restart_services
+
+# PITR restore
+echo "run pitr"
+run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" > $res_file 2>&1
+
+# check something in downstream cluster
+echo "check br log"
+check_contains "restore log success summary"
+check_not_contains "rewrite delete range"
+echo "" > $res_file
+echo "check sql result"
+run_sql "select count(*) DELETE_RANGE_CNT from (select * from mysql.gc_delete_range union all select * from mysql.gc_delete_range_done) del_range group by ts order by DELETE_RANGE_CNT desc limit 1;"
+expect_delete_range=$(($incremental_delete_range_count-$prepare_delete_range_count))
+check_contains "DELETE_RANGE_CNT: $expect_delete_range"
diff --git a/br/tests/run_group.sh b/br/tests/run_group.sh
index ca66c8d5013ce..e454fec47cfad 100755
--- a/br/tests/run_group.sh
+++ b/br/tests/run_group.sh
@@ -24,7 +24,7 @@ groups=(
 	["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable"
 	["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full"
 	["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history"
-	["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index'
+	["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr'
 	["G04"]='br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index'
 	["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'
 	["G06"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn'