From 7248e0544799d96551d63a581f781179a5a854b9 Mon Sep 17 00:00:00 2001 From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Date: Mon, 23 Oct 2023 10:44:00 +0800 Subject: [PATCH 1/6] This is an automated cherry-pick of #47740 Signed-off-by: ti-chi-bot --- br/pkg/stream/rewrite_meta_rawkv.go | 70 +++++----- br/pkg/stream/util_test.go | 4 + .../br_pitr/incremental_data/delete_range.sql | 25 ++++ .../br_pitr/prepare_data/delete_range.sql | 124 ++++++++++++++++++ br/tests/br_pitr/run.sh | 100 ++++++++++++++ br/tests/run_group.sh | 4 + 6 files changed, 294 insertions(+), 33 deletions(-) create mode 100644 br/tests/br_pitr/incremental_data/delete_range.sql create mode 100644 br/tests/br_pitr/prepare_data/delete_range.sql create mode 100644 br/tests/br_pitr/run.sh diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go index 759549ef07155..a4f41215e6d49 100644 --- a/br/pkg/stream/rewrite_meta_rawkv.go +++ b/br/pkg/stream/rewrite_meta_rawkv.go @@ -23,6 +23,7 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/restore/ingestrec" "github.com/pingcap/tidb/br/pkg/restore/tiflashrec" "github.com/pingcap/tidb/kv" @@ -708,10 +709,11 @@ func (sr *SchemasReplace) restoreFromHistory(job *model.Job, isSubJob bool) erro } func (sr *SchemasReplace) deleteRange(job *model.Job) error { + lctx := logutil.ContextWithField(context.Background(), logutil.RedactAny("category", "ddl: rewrite delete range")) dbReplace, exist := sr.DbMap[job.SchemaID] if !exist { // skip this mddljob, the same below - log.Debug("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID)) + logutil.CL(lctx).Warn("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID)) return nil } @@ -747,14 +749,14 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { newTableIDs := make([]int64, 0, len(tableIDs)) for tableID, tableReplace := range dbReplace.TableMap { if _, exist := argsSet[tableID]; !exist { - log.Debug("DropSchema: record a table, but it doesn't exist in job args", + logutil.CL(lctx).Warn("DropSchema: record a table, but it doesn't exist in job args", zap.Int64("oldTableID", tableID)) continue } newTableIDs = append(newTableIDs, tableReplace.TableID) for partitionID, newPartitionID := range tableReplace.PartitionMap { if _, exist := argsSet[partitionID]; !exist { - log.Debug("DropSchema: record a partition, but it doesn't exist in job args", + logutil.CL(lctx).Warn("DropSchema: record a partition, but it doesn't exist in job args", zap.Int64("oldPartitionID", partitionID)) continue } @@ -763,7 +765,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { } if len(newTableIDs) != len(tableIDs) { - log.Debug( + logutil.CL(lctx).Warn( "DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace") // only drop newTableIDs' ranges } @@ -777,7 +779,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { case model.ActionDropTable, model.ActionTruncateTable: tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID", + logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -790,18 +792,19 @@ func (sr *SchemasReplace) 
deleteRange(job *model.Job) error { return errors.Trace(err) } if len(physicalTableIDs) > 0 { - // delete partition id instead of table id - for i := 0; i < len(physicalTableIDs); i++ { - newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]] + newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs)) + // delete partition id + for _, oldPid := range physicalTableIDs { + newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID", - zap.Int64("oldPartitionID", physicalTableIDs[i])) + logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID", + zap.Int64("oldPartitionID", oldPid)) continue } - physicalTableIDs[i] = newPid + newPhysicalTableIDs = append(newPhysicalTableIDs, newPid) } - if len(physicalTableIDs) > 0 { - sr.insertDeleteRangeForTable(newJobID, physicalTableIDs) + if len(newPhysicalTableIDs) > 0 { + sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs) } return nil } @@ -811,7 +814,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { case model.ActionDropTablePartition, model.ActionTruncateTablePartition: tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug( + logutil.CL(lctx).Warn( "DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil @@ -821,18 +824,19 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { return errors.Trace(err) } - for i := 0; i < len(physicalTableIDs); i++ { - newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]] + newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs)) + for _, oldPid := range physicalTableIDs { + newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug( + logutil.CL(lctx).Warn( "DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID", - zap.Int64("oldPartitionID", physicalTableIDs[i])) + zap.Int64("oldPartitionID", oldPid)) continue } - physicalTableIDs[i] = newPid + newPhysicalTableIDs = append(newPhysicalTableIDs, newPid) } - if len(physicalTableIDs) > 0 { - sr.insertDeleteRangeForTable(newJobID, physicalTableIDs) + if len(newPhysicalTableIDs) > 0 { + sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs) } return nil // ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled. 
@@ -840,7 +844,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { // iff job.State = model.JobStateRollbackDone tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID", + logutil.CL(lctx).Warn("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -858,7 +862,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug( + logutil.CL(lctx).Warn( "AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue @@ -873,7 +877,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { case model.ActionDropIndex, model.ActionDropPrimaryKey: tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -892,7 +896,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } // len(indexIDs) = 1 @@ -915,7 +919,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -924,7 +928,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) @@ -944,7 +948,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { if len(indexIDs) > 0 { tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -953,7 +957,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, 
missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) @@ -974,7 +978,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { if len(indexIDs) > 0 { tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -983,7 +987,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) @@ -1003,7 +1007,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { } tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -1012,7 +1016,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) diff --git a/br/pkg/stream/util_test.go b/br/pkg/stream/util_test.go index 2562c9ce15840..6dda62a04ad60 100644 --- a/br/pkg/stream/util_test.go +++ b/br/pkg/stream/util_test.go @@ -23,6 +23,10 @@ func TestDateFormat(t *testing.T) { 434605479096221697, "2022-07-15 20:32:12.734 +0800", }, + { + 434605478903808000, + "2022-07-15 20:32:12 +0800", + }, } timeZone, _ := time.LoadLocation("Asia/Shanghai") diff --git a/br/tests/br_pitr/incremental_data/delete_range.sql b/br/tests/br_pitr/incremental_data/delete_range.sql new file mode 100644 index 0000000000000..f5afde943649e --- /dev/null +++ b/br/tests/br_pitr/incremental_data/delete_range.sql @@ -0,0 +1,25 @@ +-- 1. Drop Schema +drop database db_to_be_dropped; +-- 2. Drop/Truncate Table +drop table table_to_be_dropped_or_truncated.t0_dropped; +drop table table_to_be_dropped_or_truncated.t1_dropped; +truncate table table_to_be_dropped_or_truncated.t0_truncated; +truncate table table_to_be_dropped_or_truncated.t1_truncated; +-- 3. Drop/Truncate Table Partition +alter table partition_to_be_dropped_or_truncated.t1_dropped drop partition p0; +alter table partition_to_be_dropped_or_truncated.t1_truncated truncate partition p0; +-- 4. Drop Table Index/PrimaryKey +alter table index_or_primarykey_to_be_dropped.t0 drop index k1; +alter table index_or_primarykey_to_be_dropped.t1 drop index k1; +alter table index_or_primarykey_to_be_dropped.t0 drop primary key; +alter table index_or_primarykey_to_be_dropped.t1 drop primary key; +-- 5. 
Drop Table Indexes +alter table indexes_to_be_dropped.t0 drop index k1, drop index k2; +alter table indexes_to_be_dropped.t1 drop index k1, drop index k2; +-- 6. Drop Table Column/Columns +alter table column_s_to_be_dropped.t0_column drop column name; +alter table column_s_to_be_dropped.t1_column drop column name; +alter table column_s_to_be_dropped.t0_columns drop column name, drop column c; +alter table column_s_to_be_dropped.t1_columns drop column name, drop column c; +-- 7. Modify Table Column +alter table column_to_be_modified.t0 modify column name varchar(25); diff --git a/br/tests/br_pitr/prepare_data/delete_range.sql b/br/tests/br_pitr/prepare_data/delete_range.sql new file mode 100644 index 0000000000000..e2a20be9e45fa --- /dev/null +++ b/br/tests/br_pitr/prepare_data/delete_range.sql @@ -0,0 +1,124 @@ +-- 1. Drop Schema +create database db_to_be_dropped; +create table db_to_be_dropped.t0(id int primary key, c int, name char(20)); +create table db_to_be_dropped.t1(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on db_to_be_dropped.t0 (name); +create index k2 on db_to_be_dropped.t0(c); +create index k1 on db_to_be_dropped.t1(name); +create index k2 on db_to_be_dropped.t1(c); +create index k3 on db_to_be_dropped.t1 (id, c); + +insert into db_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); +insert into db_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); +-- 2. Drop/Truncate Table +create database table_to_be_dropped_or_truncated; +create table table_to_be_dropped_or_truncated.t0_dropped(id int primary key, c int, name char(20)); +create table table_to_be_dropped_or_truncated.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table table_to_be_dropped_or_truncated.t0_truncated(id int primary key, c int, name char(20)); +create table table_to_be_dropped_or_truncated.t1_truncated(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on table_to_be_dropped_or_truncated.t0_dropped (name); +create index k2 on table_to_be_dropped_or_truncated.t0_dropped (c); +create index k1 on table_to_be_dropped_or_truncated.t1_dropped (name); +create index k2 on table_to_be_dropped_or_truncated.t1_dropped (c); +create index k3 on table_to_be_dropped_or_truncated.t1_dropped (id, c); + +create index k1 on table_to_be_dropped_or_truncated.t0_truncated (name); +create index k2 on table_to_be_dropped_or_truncated.t0_truncated (c); +create index k1 on table_to_be_dropped_or_truncated.t1_truncated (name); +create index k2 on table_to_be_dropped_or_truncated.t1_truncated (c); +create index k3 on table_to_be_dropped_or_truncated.t1_truncated (id, c); + +insert into table_to_be_dropped_or_truncated.t0_dropped values (1, 2, "123"), (2, 3, "123"); +insert into table_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123"), (2, 3, "123"); + +insert into table_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123"); +insert into table_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123"); + +-- 3. 
Drop/Truncate Table Partition +create database partition_to_be_dropped_or_truncated; +create table partition_to_be_dropped_or_truncated.t0_dropped(id int primary key, c int, name char(20)); +create table partition_to_be_dropped_or_truncated.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table partition_to_be_dropped_or_truncated.t0_truncated(id int primary key, c int, name char(20)); +create table partition_to_be_dropped_or_truncated.t1_truncated(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on partition_to_be_dropped_or_truncated.t0_dropped (name); +create index k2 on partition_to_be_dropped_or_truncated.t0_dropped (c); +create index k1 on partition_to_be_dropped_or_truncated.t1_dropped (name); +create index k2 on partition_to_be_dropped_or_truncated.t1_dropped (c); +create index k3 on partition_to_be_dropped_or_truncated.t1_dropped (id, c); + +create index k1 on partition_to_be_dropped_or_truncated.t0_truncated (name); +create index k2 on partition_to_be_dropped_or_truncated.t0_truncated (c); +create index k1 on partition_to_be_dropped_or_truncated.t1_truncated (name); +create index k2 on partition_to_be_dropped_or_truncated.t1_truncated (c); +create index k3 on partition_to_be_dropped_or_truncated.t1_truncated (id, c); + +insert into partition_to_be_dropped_or_truncated.t0_dropped values (1, 2, "123"), (2, 3, "123"); +insert into partition_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123"), (2, 3, "123"); + +insert into partition_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123"); +insert into partition_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123"); +-- 4. Drop Table Index/PrimaryKey +create database index_or_primarykey_to_be_dropped; +create table index_or_primarykey_to_be_dropped.t0(id int primary key nonclustered, c int, name char(20)); +create table index_or_primarykey_to_be_dropped.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on index_or_primarykey_to_be_dropped.t0 (name); +create index k2 on index_or_primarykey_to_be_dropped.t0 (c); +create index k1 on index_or_primarykey_to_be_dropped.t1 (name); +create index k2 on index_or_primarykey_to_be_dropped.t1 (c); +create index k3 on index_or_primarykey_to_be_dropped.t1 (id, c); + +insert into index_or_primarykey_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); +insert into index_or_primarykey_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); +-- 5. 
Drop Table INDEXES +create database indexes_to_be_dropped; +create table indexes_to_be_dropped.t0(id int primary key nonclustered, c int, name char(20)); +create table indexes_to_be_dropped.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on indexes_to_be_dropped.t0 (name); +create index k2 on indexes_to_be_dropped.t0 (c); +create index k1 on indexes_to_be_dropped.t1 (name); +create index k2 on indexes_to_be_dropped.t1 (c); +create index k3 on indexes_to_be_dropped.t1 (id, c); + +insert into indexes_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); +insert into indexes_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); +-- 6. Drop Table Column/Columns +create database column_s_to_be_dropped; +create table column_s_to_be_dropped.t0_column(id int primary key nonclustered, c int, name char(20)); +create table column_s_to_be_dropped.t1_column(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table column_s_to_be_dropped.t0_columns(id int primary key nonclustered, c int, name char(20)); +create table column_s_to_be_dropped.t1_columns(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on column_s_to_be_dropped.t0_column (name); +create index k2 on column_s_to_be_dropped.t0_column (c); +create index k1 on column_s_to_be_dropped.t1_column (name); +create index k2 on column_s_to_be_dropped.t1_column (c); +create index k3 on column_s_to_be_dropped.t1_column (id, c); + +create index k1 on column_s_to_be_dropped.t0_columns (name); +create index k2 on column_s_to_be_dropped.t0_columns (c); +create index k1 on column_s_to_be_dropped.t1_columns (name); +create index k2 on column_s_to_be_dropped.t1_columns (c); +-- create index k3 on column_s_to_be_dropped.t1_columns (id, c); + +insert into column_s_to_be_dropped.t0_column values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped.t1_column values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped.t0_columns values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped.t1_columns values (1, 2, "123"), (2, 3, "123"); +-- 7. Modify Table Column +create database column_to_be_modified; +create table column_to_be_modified.t0(id int primary key nonclustered, c int, name char(20)); +create table column_to_be_modified.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on column_to_be_modified.t0 (name); +create index k2 on column_to_be_modified.t0 (c); +create index k1 on column_to_be_modified.t1 (name); +create index k2 on column_to_be_modified.t1 (c); +create index k3 on column_to_be_modified.t1 (id, c); + +insert into column_to_be_modified.t0 values (1, 2, "123"), (2, 3, "123"); +insert into column_to_be_modified.t1 values (1, 2, "123"), (2, 3, "123"); diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh new file mode 100644 index 0000000000000..25a7fda5588f2 --- /dev/null +++ b/br/tests/br_pitr/run.sh @@ -0,0 +1,100 @@ +#!/bin/bash +# +# Copyright 2023 PingCAP, Inc. 
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eu
+. run_services
+CUR=$(cd `dirname $0`; pwd)
+
+# const value
+PREFIX="pitr_backup" # NOTICE: don't start with 'br' because `restart services` would remove file/directory br*.
+res_file="$TEST_DIR/sql_res.$TEST_NAME.txt"
+
+# start a new cluster
+echo "restart services"
+restart_services
+
+# prepare the data
+echo "prepare the data"
+run_sql_file $CUR/prepare_data/delete_range.sql
+# ...
+
+# start the log backup task
+echo "start log task"
+run_br --pd $PD_ADDR log start --task-name integration_test -s "local://$TEST_DIR/$PREFIX/log"
+
+# run snapshot backup
+echo "run snapshot backup"
+run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full"
+
+# load the incremental data
+echo "load the incremental data"
+run_sql_file $CUR/incremental_data/delete_range.sql
+# ...
+
+# wait checkpoint advance
+echo "wait checkpoint advance"
+sleep 10
+current_ts=$(echo $(($(date +%s%3N) << 18)))
+echo "current ts: $current_ts"
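+# note: a TiDB TSO packs a physical timestamp and a logical counter,
+#   tso = physical_milliseconds << 18 | logical,
+# so the current epoch milliseconds shifted left by 18 bits form a TSO with a
+# zero logical part, which the task checkpoint below can be compared against.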
+i=0
+while true; do
+    # extract the checkpoint ts of the log backup task. If there is some error, the checkpoint ts should be empty
+    log_backup_status=$(unset BR_LOG_TO_TERM && run_br --pd $PD_ADDR log status --task-name integration_test --json 2>/dev/null)
+    echo "log backup status: $log_backup_status"
+    checkpoint_ts=$(echo "$log_backup_status" | head -n 1 | jq 'if .[0].last_errors | length == 0 then .[0].checkpoint else empty end')
+    echo "checkpoint ts: $checkpoint_ts"
+
+    # check whether the checkpoint ts is a number
+    if [ $checkpoint_ts -gt 0 ] 2>/dev/null; then
+        # check whether the checkpoint has advanced
+        if [ $checkpoint_ts -gt $current_ts ]; then
+            echo "the checkpoint has advanced"
+            break
+        fi
+        # the checkpoint hasn't advanced
+        echo "the checkpoint hasn't advanced"
+        i=$((i+1))
+        if [ "$i" -gt 50 ]; then
+            echo 'the checkpoint lag is too large'
+            exit 1
+        fi
+        sleep 10
+    else
+        # unknown status, maybe somewhere is wrong
+        echo "TEST: [$TEST_NAME] failed to wait checkpoint advance!"
+        exit 1
+    fi
+done
+
+# dump some info from upstream cluster
+# ...
+
+# start a new cluster
+echo "restart services"
+restart_services
+
+# PITR restore
+echo "run pitr"
+run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" > $res_file 2>&1
+
+# check something in downstream cluster
+echo "check br log"
+check_contains "restore log success summary"
+# check_not_contains "rewrite delete range"
+echo "" > $res_file
+echo "check sql result"
+run_sql "select count(*) DELETE_RANGE_CNT from mysql.gc_delete_range group by ts order by DELETE_RANGE_CNT desc limit 1;"
+check_contains "DELETE_RANGE_CNT: 46"
diff --git a/br/tests/run_group.sh b/br/tests/run_group.sh
index ca66c8d5013ce..717125b6d3552 100755
--- a/br/tests/run_group.sh
+++ b/br/tests/run_group.sh
@@ -24,7 +24,11 @@ groups=(
 	["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable"
 	["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full"
 	["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history"
+<<<<<<< HEAD
 	["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index'
+=======
+	["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr'
+>>>>>>> c21a5cfcb33 (br: add integration test for pitr (#47740))
 	["G04"]='br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index'
 	["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'
 	["G06"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn'

From 062984c81896e35e550d715633d85bbd29bfb87c Mon Sep 17 00:00:00 2001
From: Leavrth
Date: Mon, 23 Oct 2023 21:29:14 +0800
Subject: [PATCH 2/6] fix integration test br_pitr

Signed-off-by: Leavrth
---
 br/pkg/restore/client.go                      |  2 +-
 br/pkg/restore/util.go                        | 10 ++-
 br/pkg/stream/rewrite_meta_rawkv.go           | 90 +++++--------------
 .../br_pitr/incremental_data/delete_range.sql |  9 +-
 .../br_pitr/prepare_data/delete_range.sql     |  2 +-
 br/tests/br_pitr/run.sh                       | 15 +++-
 6 files changed, 49 insertions(+), 79 deletions(-)

diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go
index 56b8a4e96c8f3..e6d97e2a19a17 100644
--- a/br/pkg/restore/client.go
+++ b/br/pkg/restore/client.go
@@ -2789,7 +2789,7 @@ func (rc *Client) InitSchemasReplaceForDDL(
 		dbReplace.TableMap[t.Info.ID] = &stream.TableReplace{
 			Name:         newTableInfo.Name.O,
 			TableID:      newTableInfo.ID,
-			PartitionMap: getTableIDMap(newTableInfo, t.Info),
+			PartitionMap: getPartitionIDMap(newTableInfo, t.Info),
 			IndexMap:     getIndexIDMap(newTableInfo, t.Info),
 		}
 	}
diff --git a/br/pkg/restore/util.go b/br/pkg/restore/util.go
index c1366aa67c472..e00d0d1298fe0 100644
--- a/br/pkg/restore/util.go
+++ b/br/pkg/restore/util.go
@@ -43,8 +43,8 @@ type AppliedFile interface {
 	GetEndKey() []byte
 }

-// getTableIDMap creates a map maping old tableID to new tableID.
-func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
+// getPartitionIDMap creates a map mapping old physical ID to new physical ID.
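+// (delete-range rewriting only needs the partition mapping; the logical table
+// ID is mapped separately, and getTableIDMap below layers it on top.)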
+func getPartitionIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
 	tableIDMap := make(map[int64]int64)

 	if oldTable.Partition != nil && newTable.Partition != nil {
@@ -60,6 +60,12 @@ func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
 		}
 	}

+	return tableIDMap
+}
+
+// getTableIDMap creates a map mapping old tableID to new tableID.
+func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
+	tableIDMap := getPartitionIDMap(newTable, oldTable)
 	tableIDMap[oldTable.ID] = newTable.ID
 	return tableIDMap
 }
diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go
index a4f41215e6d49..213ddf3c452cb 100644
--- a/br/pkg/stream/rewrite_meta_rawkv.go
+++ b/br/pkg/stream/rewrite_meta_rawkv.go
@@ -29,6 +29,7 @@ import (
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta"
 	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/tablecodec"
 	filter "github.com/pingcap/tidb/util/table-filter"
 	"go.uber.org/zap"
 )
@@ -686,14 +687,17 @@ func (sr *SchemasReplace) restoreFromHistory(job *model.Job, isSubJob bool) erro
 	if !job.IsCancelled() {
 		switch job.Type {
 		case model.ActionAddIndex, model.ActionAddPrimaryKey:
-			if job.State == model.JobStateRollbackDone {
-				return sr.deleteRange(job)
+			// AddJob would filter out the job state
+			if err := sr.ingestRecorder.AddJob(job, isSubJob); err != nil {
+				return err
 			}
-			err := sr.ingestRecorder.AddJob(job, isSubJob)
-			return errors.Trace(err)
-		case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
-			model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn,
-			model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes:
+			return sr.deleteRange(job)
+		case model.ActionDropSchema, model.ActionDropTable,
+			model.ActionTruncateTable, model.ActionDropIndex,
+			model.ActionDropPrimaryKey,
+			model.ActionDropTablePartition, model.ActionTruncateTablePartition,
+			model.ActionDropColumn, model.ActionModifyColumn,
+			model.ActionReorganizePartition:
 			return sr.deleteRange(job)
 		case model.ActionMultiSchemaChange:
 			for _, sub := range job.MultiSchemaInfo.SubJobs {
@@ -806,12 +810,12 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 		if len(newPhysicalTableIDs) > 0 {
 			sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs)
 		}
-		return nil
+		// logical table may contain global index regions, so delete the logical table range.
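+		// (a global index on a partitioned table is encoded under the logical
+		// table ID rather than the partition IDs, so dropping only the partition
+		// ranges would leave the global index data behind.)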
} sr.insertDeleteRangeForTable(newJobID, []int64{tableReplace.TableID}) return nil - case model.ActionDropTablePartition, model.ActionTruncateTablePartition: + case model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionReorganizePartition: tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { logutil.CL(lctx).Warn( @@ -855,8 +859,14 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { return errors.Trace(err) } + tempIdxID := tablecodec.TempIndexPrefix | indexID var elementID int64 = 1 - indexIDs := []int64{indexID} + var indexIDs []int64 + if job.State == model.JobStateRollbackDone { + indexIDs = []int64{indexID, tempIdxID} + } else { + indexIDs = []int64{tempIdxID} + } if len(partitionIDs) > 0 { for _, oldPid := range partitionIDs { @@ -906,37 +916,6 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { sr.insertDeleteRangeForIndex(newJobID, &elementID, tableReplace.TableID, indexIDs) } return nil - case model.ActionDropIndexes: // // Deprecated, we use ActionMultiSchemaChange instead. - var indexIDs []int64 - var partitionIDs []int64 - if err := job.DecodeArgs(&[]model.CIStr{}, &[]bool{}, &indexIDs, &partitionIDs); err != nil { - return errors.Trace(err) - } - // Remove data in TiKV. - if len(indexIDs) == 0 { - return nil - } - - tableReplace, exist := dbReplace.TableMap[job.TableID] - if !exist { - logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) - return nil - } - - var elementID int64 = 1 - if len(partitionIDs) > 0 { - for _, oldPid := range partitionIDs { - newPid, exist := tableReplace.PartitionMap[oldPid] - if !exist { - logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) - continue - } - sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) - } - } else { - sr.insertDeleteRangeForIndex(newJobID, &elementID, tableReplace.TableID, indexIDs) - } - return nil case model.ActionDropColumn: var colName model.CIStr var ifExists bool @@ -967,35 +946,6 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { } } return nil - case model.ActionDropColumns: // Deprecated, we use ActionMultiSchemaChange instead. 
- var colNames []model.CIStr - var ifExists []bool - var indexIDs []int64 - var partitionIDs []int64 - if err := job.DecodeArgs(&colNames, &ifExists, &indexIDs, &partitionIDs); err != nil { - return errors.Trace(err) - } - if len(indexIDs) > 0 { - tableReplace, exist := dbReplace.TableMap[job.TableID] - if !exist { - logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) - return nil - } - - var elementID int64 = 1 - if len(partitionIDs) > 0 { - for _, oldPid := range partitionIDs { - newPid, exist := tableReplace.PartitionMap[oldPid] - if !exist { - logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) - continue - } - sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) - } - } else { - sr.insertDeleteRangeForIndex(newJobID, &elementID, tableReplace.TableID, indexIDs) - } - } case model.ActionModifyColumn: var indexIDs []int64 var partitionIDs []int64 diff --git a/br/tests/br_pitr/incremental_data/delete_range.sql b/br/tests/br_pitr/incremental_data/delete_range.sql index f5afde943649e..c8ee308dfd0c9 100644 --- a/br/tests/br_pitr/incremental_data/delete_range.sql +++ b/br/tests/br_pitr/incremental_data/delete_range.sql @@ -5,14 +5,19 @@ drop table table_to_be_dropped_or_truncated.t0_dropped; drop table table_to_be_dropped_or_truncated.t1_dropped; truncate table table_to_be_dropped_or_truncated.t0_truncated; truncate table table_to_be_dropped_or_truncated.t1_truncated; --- 3. Drop/Truncate Table Partition +-- 3. Drop/Truncate/Reorganize Table Partition alter table partition_to_be_dropped_or_truncated.t1_dropped drop partition p0; alter table partition_to_be_dropped_or_truncated.t1_truncated truncate partition p0; --- 4. Drop Table Index/PrimaryKey +alter table partition_to_be_dropped_or_truncated.t1_truncated reorganize partition p2 INTO (PARTITION p2 VALUES LESS THAN (20), PARTITION p3 VALUES LESS THAN MAXVALUE); +-- 4. Drop Table Index/PrimaryKey or Add Table Index/Primary alter table index_or_primarykey_to_be_dropped.t0 drop index k1; alter table index_or_primarykey_to_be_dropped.t1 drop index k1; alter table index_or_primarykey_to_be_dropped.t0 drop primary key; alter table index_or_primarykey_to_be_dropped.t1 drop primary key; +create index k1 on index_or_primarykey_to_be_dropped.t0 (name); +create index k1 on index_or_primarykey_to_be_dropped.t1 (name); +alter table index_or_primarykey_to_be_dropped.t0 add primary key; +alter table index_or_primarykey_to_be_dropped.t1 add primary key; -- 5. Drop Table Indexes alter table indexes_to_be_dropped.t0 drop index k1, drop index k2; alter table indexes_to_be_dropped.t1 drop index k1, drop index k2; diff --git a/br/tests/br_pitr/prepare_data/delete_range.sql b/br/tests/br_pitr/prepare_data/delete_range.sql index e2a20be9e45fa..9d571ddadbf0a 100644 --- a/br/tests/br_pitr/prepare_data/delete_range.sql +++ b/br/tests/br_pitr/prepare_data/delete_range.sql @@ -36,7 +36,7 @@ insert into table_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123"), (2 insert into table_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123"); insert into table_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123"); --- 3. Drop/Truncate Table Partition +-- 3. 
Drop/Truncate/Reorganize Table Partition create database partition_to_be_dropped_or_truncated; create table partition_to_be_dropped_or_truncated.t0_dropped(id int primary key, c int, name char(20)); create table partition_to_be_dropped_or_truncated.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh index 25a7fda5588f2..afe400820eb7e 100644 --- a/br/tests/br_pitr/run.sh +++ b/br/tests/br_pitr/run.sh @@ -31,6 +31,10 @@ echo "prepare the data" run_sql_file $CUR/prepare_data/delete_range.sql # ... +# check something after prepare the data +prepare_delete_range_count=$(run_sql "select count(*) DELETE_RANGE_CNT from (select * from mysql.gc_delete_range union all select * from mysql.gc_delete_range_done) del_range;" | tail -n 1 | awk '{print $2}') +echo "prepare_delete_range_count: $prepare_delete_range_count" + # start the log backup task echo "start log task" run_br --pd $PD_ADDR log start --task-name integration_test -s "local://$TEST_DIR/$PREFIX/log" @@ -44,6 +48,10 @@ echo "load the incremental data" run_sql_file $CUR/incremental_data/delete_range.sql # ... +# check something after load the incremental data +incremental_delete_range_count=$(run_sql "select count(*) DELETE_RANGE_CNT from (select * from mysql.gc_delete_range union all select * from mysql.gc_delete_range_done) del_range;" | tail -n 1 | awk '{print $2}') +echo "incremental_delete_range_count: $incremental_delete_range_count" + # wait checkpoint advance echo "wait checkpoint advance" sleep 10 @@ -93,8 +101,9 @@ run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-bac # check something in downstream cluster echo "check br log" check_contains "restore log success summary" -# check_not_contains "rewrite delete range" +check_not_contains "rewrite delete range" echo "" > $res_file echo "check sql result" -run_sql "select count(*) DELETE_RANGE_CNT from mysql.gc_delete_range group by ts order by DELETE_RANGE_CNT desc limit 1;" -check_contains "DELETE_RANGE_CNT: 46" +run_sql "select count(*) DELETE_RANGE_CNT from (select * from mysql.gc_delete_range union all select * from mysql.gc_delete_range_done) del_range group by ts order by DELETE_RANGE_CNT desc limit 1;" +expect_delete_range=$(($incremental_delete_range_count-$prepare_delete_range_count)) +check_contains "DELETE_RANGE_CNT: $expect_delete_range" From 87c934cd69ca52a2ae1d78e98cbc807a3ce43b2d Mon Sep 17 00:00:00 2001 From: Leavrth Date: Mon, 23 Oct 2023 22:42:47 +0800 Subject: [PATCH 3/6] fix unit test Signed-off-by: Leavrth --- br/pkg/stream/rewrite_meta_rawkv.go | 1 - br/pkg/stream/rewrite_meta_rawkv_test.go | 142 +++++++----------- .../br_pitr/incremental_data/delete_range.sql | 4 +- 3 files changed, 53 insertions(+), 94 deletions(-) diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go index 213ddf3c452cb..2e822488e810e 100644 --- a/br/pkg/stream/rewrite_meta_rawkv.go +++ b/br/pkg/stream/rewrite_meta_rawkv.go @@ -845,7 +845,6 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { return nil // ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled. 
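+	// (after #47740 both synced and rolled-back add-index jobs reach deleteRange,
+	// so the temp index range is cleared in either case.)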
case model.ActionAddIndex, model.ActionAddPrimaryKey: - // iff job.State = model.JobStateRollbackDone tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { logutil.CL(lctx).Warn("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID", diff --git a/br/pkg/stream/rewrite_meta_rawkv_test.go b/br/pkg/stream/rewrite_meta_rawkv_test.go index 752796c2ac574..47f28c045560b 100644 --- a/br/pkg/stream/rewrite_meta_rawkv_test.go +++ b/br/pkg/stream/rewrite_meta_rawkv_test.go @@ -11,6 +11,7 @@ import ( "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" filter "github.com/pingcap/tidb/util/table-filter" "github.com/stretchr/testify/require" @@ -638,23 +639,26 @@ var ( ) var ( - dropSchemaJob = &model.Job{Type: model.ActionDropSchema, SchemaID: mDDLJobDBOldID, RawArgs: json.RawMessage(`[[71,72,73,74,75]]`)} - dropTable0Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",[72,73,74],[""]]`)} - dropTable1Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",[],[""]]`)} - dropTable0Partition1Job = &model.Job{Type: model.ActionDropTablePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} - rollBackTable0IndexJob = &model.Job{Type: model.ActionAddIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)} - rollBackTable1IndexJob = &model.Job{Type: model.ActionAddIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)} - dropTable0IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,2,[72,73,74]]`)} - dropTable1IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,2,[]]`)} - dropTable0IndexesJob = &model.Job{Type: model.ActionDropIndexes, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[72,73,74]]`)} - dropTable1IndexesJob = &model.Job{Type: model.ActionDropIndexes, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[]]`)} - dropTable0ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,[2,3],[72,73,74]]`)} - dropTable1ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,[2,3],[]]`)} - dropTable0ColumnsJob = &model.Job{Type: model.ActionDropColumns, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[72,73,74]]`)} - dropTable1ColumnsJob = &model.Job{Type: model.ActionDropColumns, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[]]`)} - modifyTable0ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[2,3],[72,73,74]]`)} - modifyTable1ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[2,3],[]]`)} - multiSchemaChangeJob0 = &model.Job{ + 
dropSchemaJob = &model.Job{Type: model.ActionDropSchema, SchemaID: mDDLJobDBOldID, RawArgs: json.RawMessage(`[[71,72,73,74,75]]`)} + dropTable0Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",[72,73,74],[""]]`)} + dropTable1Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",[],[""]]`)} + dropTable0Partition1Job = &model.Job{Type: model.ActionDropTablePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} + reorganizeTable0Partition1Job = &model.Job{Type: model.ActionReorganizePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} + rollBackTable0IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)} + rollBackTable1IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)} + addTable0IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)} + addTable1IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)} + dropTable0IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,2,[72,73,74]]`)} + dropTable1IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,2,[]]`)} + dropTable0IndexesJob = &model.Job{Type: model.ActionDropIndexes, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[72,73,74]]`)} + dropTable1IndexesJob = &model.Job{Type: model.ActionDropIndexes, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[]]`)} + dropTable0ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,[2,3],[72,73,74]]`)} + dropTable1ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,[2,3],[]]`)} + dropTable0ColumnsJob = &model.Job{Type: model.ActionDropColumns, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[72,73,74]]`)} + dropTable1ColumnsJob = &model.Job{Type: model.ActionDropColumns, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[]]`)} + modifyTable0ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[2,3],[72,73,74]]`)} + modifyTable1ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[2,3],[]]`)} + multiSchemaChangeJob0 = &model.Job{ Type: model.ActionMultiSchemaChange, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, @@ -772,6 +776,9 @@ func TestDeleteRangeForMDDLJob(t *testing.T) { _, exist := mDDLJobALLNewPartitionIDSet[tableID] require.True(t, exist) } + 
targs = <-midr.tableCh + require.Equal(t, len(targs.tableIDs), 1) + require.Equal(t, targs.tableIDs[0], mDDLJobTable0NewID) // drop table1 err = schemaReplace.deleteRange(dropTable1Job) @@ -787,68 +794,71 @@ func TestDeleteRangeForMDDLJob(t *testing.T) { require.Equal(t, len(targs.tableIDs), 1) require.Equal(t, targs.tableIDs[0], mDDLJobPartition1NewID) + // reorganize table partition1 + err = schemaReplace.deleteRange(reorganizeTable0Partition1Job) + require.NoError(t, err) + targs = <-midr.tableCh + require.Equal(t, len(targs.tableIDs), 1) + require.Equal(t, targs.tableIDs[0], mDDLJobPartition1NewID) + // roll back add index for table0 - err = schemaReplace.deleteRange(rollBackTable0IndexJob) + err = schemaReplace.restoreFromHistory(rollBackTable0IndexJob, false) require.NoError(t, err) for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { iargs = <-midr.indexCh _, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID] require.True(t, exist) - require.Equal(t, len(iargs.indexIDs), 1) + require.Equal(t, len(iargs.indexIDs), 2) require.Equal(t, iargs.indexIDs[0], int64(2)) + require.Equal(t, iargs.indexIDs[1], int64(tablecodec.TempIndexPrefix|2)) } // roll back add index for table1 - err = schemaReplace.deleteRange(rollBackTable1IndexJob) + err = schemaReplace.restoreFromHistory(rollBackTable1IndexJob, false) require.NoError(t, err) iargs = <-midr.indexCh require.Equal(t, iargs.tableID, mDDLJobTable1NewID) - require.Equal(t, len(iargs.indexIDs), 1) + require.Equal(t, len(iargs.indexIDs), 2) require.Equal(t, iargs.indexIDs[0], int64(2)) + require.Equal(t, iargs.indexIDs[1], int64(tablecodec.TempIndexPrefix|2)) - // drop index for table0 - err = schemaReplace.deleteRange(dropTable0IndexJob) + // add index for table 0 + err = schemaReplace.restoreFromHistory(addTable0IndexJob, false) require.NoError(t, err) for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { iargs = <-midr.indexCh _, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID] require.True(t, exist) require.Equal(t, len(iargs.indexIDs), 1) - require.Equal(t, iargs.indexIDs[0], int64(2)) + require.Equal(t, iargs.indexIDs[0], int64(tablecodec.TempIndexPrefix|2)) } - // drop index for table1 - err = schemaReplace.deleteRange(dropTable1IndexJob) + // add index for table 1 + err = schemaReplace.restoreFromHistory(addTable1IndexJob, false) require.NoError(t, err) iargs = <-midr.indexCh require.Equal(t, iargs.tableID, mDDLJobTable1NewID) require.Equal(t, len(iargs.indexIDs), 1) - require.Equal(t, iargs.indexIDs[0], int64(2)) + require.Equal(t, iargs.indexIDs[0], int64(tablecodec.TempIndexPrefix|2)) - // drop indexes for table0 - err = schemaReplace.deleteRange(dropTable0IndexesJob) + // drop index for table0 + err = schemaReplace.deleteRange(dropTable0IndexJob) require.NoError(t, err) for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { iargs = <-midr.indexCh _, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID] require.True(t, exist) - require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] - require.True(t, exist) - } + require.Equal(t, len(iargs.indexIDs), 1) + require.Equal(t, iargs.indexIDs[0], int64(2)) } - // drop indexes for table1 - err = schemaReplace.deleteRange(dropTable1IndexesJob) + // drop index for table1 + err = schemaReplace.deleteRange(dropTable1IndexJob) require.NoError(t, err) iargs = <-midr.indexCh require.Equal(t, iargs.tableID, mDDLJobTable1NewID) - require.Equal(t, len(iargs.indexIDs), 
len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] - require.True(t, exist) - } + require.Equal(t, len(iargs.indexIDs), 1) + require.Equal(t, iargs.indexIDs[0], int64(2)) // drop column for table0 err = schemaReplace.deleteRange(dropTable0ColumnJob) @@ -875,56 +885,6 @@ func TestDeleteRangeForMDDLJob(t *testing.T) { require.True(t, exist) } - // drop columns for table0 - err = schemaReplace.deleteRange(dropTable0ColumnsJob) - require.NoError(t, err) - for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { - iargs = <-midr.indexCh - _, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID] - require.True(t, exist) - require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] - require.True(t, exist) - } - } - - // drop columns for table1 - err = schemaReplace.deleteRange(dropTable1ColumnsJob) - require.NoError(t, err) - iargs = <-midr.indexCh - require.Equal(t, iargs.tableID, mDDLJobTable1NewID) - require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] - require.True(t, exist) - } - - // drop columns for table0 - err = schemaReplace.deleteRange(modifyTable0ColumnJob) - require.NoError(t, err) - for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { - iargs = <-midr.indexCh - _, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID] - require.True(t, exist) - require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] - require.True(t, exist) - } - } - - // drop columns for table1 - err = schemaReplace.deleteRange(modifyTable1ColumnJob) - require.NoError(t, err) - iargs = <-midr.indexCh - require.Equal(t, iargs.tableID, mDDLJobTable1NewID) - require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] - require.True(t, exist) - } - // drop indexes(multi-schema-change) for table0 err = schemaReplace.restoreFromHistory(multiSchemaChangeJob0, false) require.NoError(t, err) diff --git a/br/tests/br_pitr/incremental_data/delete_range.sql b/br/tests/br_pitr/incremental_data/delete_range.sql index c8ee308dfd0c9..2b68e11f17054 100644 --- a/br/tests/br_pitr/incremental_data/delete_range.sql +++ b/br/tests/br_pitr/incremental_data/delete_range.sql @@ -16,8 +16,8 @@ alter table index_or_primarykey_to_be_dropped.t0 drop primary key; alter table index_or_primarykey_to_be_dropped.t1 drop primary key; create index k1 on index_or_primarykey_to_be_dropped.t0 (name); create index k1 on index_or_primarykey_to_be_dropped.t1 (name); -alter table index_or_primarykey_to_be_dropped.t0 add primary key; -alter table index_or_primarykey_to_be_dropped.t1 add primary key; +alter table index_or_primarykey_to_be_dropped.t0 add primary key (id); +alter table index_or_primarykey_to_be_dropped.t1 add primary key (id); -- 5. 
Drop Table Indexes alter table indexes_to_be_dropped.t0 drop index k1, drop index k2; alter table indexes_to_be_dropped.t1 drop index k1, drop index k2; From b29c63e748e8f1bea10ca3119fb9bea8f62cae4e Mon Sep 17 00:00:00 2001 From: Leavrth Date: Thu, 26 Oct 2023 10:56:59 +0800 Subject: [PATCH 4/6] resolve conflicts Signed-off-by: Leavrth --- br/tests/run_group.sh | 4 ---- 1 file changed, 4 deletions(-) diff --git a/br/tests/run_group.sh b/br/tests/run_group.sh index 717125b6d3552..e454fec47cfad 100755 --- a/br/tests/run_group.sh +++ b/br/tests/run_group.sh @@ -24,11 +24,7 @@ groups=( ["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable" ["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full" ["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history" -<<<<<<< HEAD - ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index' -======= ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr' ->>>>>>> c21a5cfcb33 (br: add integration test for pitr (#47740)) ["G04"]='br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index' ["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table' ["G06"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn' From 3284ef42fa013ab5e6b5874ded84f0570cbbcbe8 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Thu, 26 Oct 2023 16:27:15 +0800 Subject: [PATCH 5/6] fix integration test br_pitr Signed-off-by: Leavrth --- br/pkg/stream/rewrite_meta_rawkv.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go index 2e822488e810e..79761c3d8ddfa 100644 --- a/br/pkg/stream/rewrite_meta_rawkv.go +++ b/br/pkg/stream/rewrite_meta_rawkv.go @@ -807,10 +807,14 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { } newPhysicalTableIDs = append(newPhysicalTableIDs, newPid) } + + // logical table may contain global index regions, so delete the logical table range. + newPhysicalTableIDs = append(newPhysicalTableIDs, tableReplace.TableID) if len(newPhysicalTableIDs) > 0 { sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs) } - // logical table may contain global index regions, so delete the logical table range. 
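+		// note: newPhysicalTableIDs always contains tableReplace.TableID now, so
+		// the length guard above always inserts at least the logical table range.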
+ + return nil } sr.insertDeleteRangeForTable(newJobID, []int64{tableReplace.TableID}) From 53602dd6ccf8ca6b54870874fac1322b1b820db7 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Thu, 26 Oct 2023 16:43:33 +0800 Subject: [PATCH 6/6] fix unit test Signed-off-by: Leavrth --- br/pkg/stream/rewrite_meta_rawkv_test.go | 37 +++++++++++++++++++----- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/br/pkg/stream/rewrite_meta_rawkv_test.go b/br/pkg/stream/rewrite_meta_rawkv_test.go index 47f28c045560b..0e48f6711cff3 100644 --- a/br/pkg/stream/rewrite_meta_rawkv_test.go +++ b/br/pkg/stream/rewrite_meta_rawkv_test.go @@ -650,12 +650,8 @@ var ( addTable1IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)} dropTable0IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,2,[72,73,74]]`)} dropTable1IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,2,[]]`)} - dropTable0IndexesJob = &model.Job{Type: model.ActionDropIndexes, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[72,73,74]]`)} - dropTable1IndexesJob = &model.Job{Type: model.ActionDropIndexes, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[]]`)} dropTable0ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,[2,3],[72,73,74]]`)} dropTable1ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,[2,3],[]]`)} - dropTable0ColumnsJob = &model.Job{Type: model.ActionDropColumns, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[72,73,74]]`)} - dropTable1ColumnsJob = &model.Job{Type: model.ActionDropColumns, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[]]`)} modifyTable0ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[2,3],[72,73,74]]`)} modifyTable1ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[2,3],[]]`)} multiSchemaChangeJob0 = &model.Job{ @@ -771,14 +767,14 @@ func TestDeleteRangeForMDDLJob(t *testing.T) { err = schemaReplace.deleteRange(dropTable0Job) require.NoError(t, err) targs = <-midr.tableCh - require.Equal(t, len(targs.tableIDs), len(mDDLJobALLNewPartitionIDSet)) + require.Equal(t, len(targs.tableIDs), len(mDDLJobALLNewPartitionIDSet)+1) for _, tableID := range targs.tableIDs { _, exist := mDDLJobALLNewPartitionIDSet[tableID] + if !exist { + exist = tableID == mDDLJobTable0NewID + } require.True(t, exist) } - targs = <-midr.tableCh - require.Equal(t, len(targs.tableIDs), 1) - require.Equal(t, targs.tableIDs[0], mDDLJobTable0NewID) // drop table1 err = schemaReplace.deleteRange(dropTable1Job) @@ -885,6 +881,31 @@ func TestDeleteRangeForMDDLJob(t *testing.T) { require.True(t, exist) } + // modify column for table0 + err = schemaReplace.deleteRange(modifyTable0ColumnJob) + require.NoError(t, err) + for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { + iargs = <-midr.indexCh + _, exist := 
mDDLJobALLNewPartitionIDSet[iargs.tableID] + require.True(t, exist) + require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) + for _, indexID := range iargs.indexIDs { + _, exist := mDDLJobALLIndexesIDSet[indexID] + require.True(t, exist) + } + } + + // modify column for table1 + err = schemaReplace.deleteRange(modifyTable1ColumnJob) + require.NoError(t, err) + iargs = <-midr.indexCh + require.Equal(t, iargs.tableID, mDDLJobTable1NewID) + require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) + for _, indexID := range iargs.indexIDs { + _, exist := mDDLJobALLIndexesIDSet[indexID] + require.True(t, exist) + } + // drop indexes(multi-schema-change) for table0 err = schemaReplace.restoreFromHistory(multiSchemaChangeJob0, false) require.NoError(t, err)
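
Note on the temp-index handling introduced in PATCH 2: TiDB routes index
backfill writes through a "temp" index whose ID sets the high bits of the real
index ID. A minimal runnable sketch of that ID arithmetic (assuming
tablecodec.TempIndexPrefix = 0x7fff000000000000, its value in pingcap/tidb at
the time of this series; the constant below is a local stand-in):

	package main

	import "fmt"

	// TempIndexPrefix stands in for tablecodec.TempIndexPrefix.
	const TempIndexPrefix int64 = 0x7fff000000000000

	func main() {
		indexID := int64(2)
		tempIdxID := TempIndexPrefix | indexID
		fmt.Printf("index %d -> temp index %#x\n", indexID, tempIdxID)
		// deleteRange mirrors this: a rolled-back ActionAddIndex deletes
		// []int64{indexID, tempIdxID}, while a synced one deletes only
		// []int64{tempIdxID}, since the real index keeps its data.
	}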