Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#47740
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
Leavrth authored and ti-chi-bot committed Oct 23, 2023
1 parent 0540e51 commit c81b679
Show file tree
Hide file tree
Showing 6 changed files with 413 additions and 22 deletions.
106 changes: 84 additions & 22 deletions br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,21 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
<<<<<<< HEAD
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
filter "github.com/pingcap/tidb/util/table-filter"
=======
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/restore/ingestrec"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/model"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
>>>>>>> c21a5cfcb33 (br: add integration test for pitr (#47740))
"go.uber.org/zap"
)

Expand Down Expand Up @@ -587,10 +598,11 @@ func (sr *SchemasReplace) tryToGCJob(job *model.Job) error {
}

func (sr *SchemasReplace) deleteRange(job *model.Job) error {
lctx := logutil.ContextWithField(context.Background(), logutil.RedactAny("category", "ddl: rewrite delete range"))
dbReplace, exist := sr.DbMap[job.SchemaID]
if !exist {
// skip this mddljob, the same below
log.Debug("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID))
logutil.CL(lctx).Warn("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID))
return nil
}

Expand Down Expand Up @@ -626,21 +638,36 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
newTableIDs := make([]int64, 0, len(tableIDs))
for tableID, tableReplace := range dbReplace.TableMap {
if _, exist := argsSet[tableID]; !exist {
<<<<<<< HEAD
log.Debug("DropSchema: record a table, but it doesn't exist in job args", zap.Int64("oldTableID", tableID))
=======
logutil.CL(lctx).Warn("DropSchema: record a table, but it doesn't exist in job args",
zap.Int64("oldTableID", tableID))
>>>>>>> c21a5cfcb33 (br: add integration test for pitr (#47740))
continue
}
newTableIDs = append(newTableIDs, tableReplace.NewTableID)
for partitionID, newPartitionID := range tableReplace.PartitionMap {
if _, exist := argsSet[partitionID]; !exist {
<<<<<<< HEAD
log.Debug("DropSchema: record a partition, but it doesn't exist in job args", zap.Int64("oldPartitionID", partitionID))
=======
logutil.CL(lctx).Warn("DropSchema: record a partition, but it doesn't exist in job args",
zap.Int64("oldPartitionID", partitionID))
>>>>>>> c21a5cfcb33 (br: add integration test for pitr (#47740))
continue
}
newTableIDs = append(newTableIDs, newPartitionID)
}
}

if len(newTableIDs) != len(tableIDs) {
<<<<<<< HEAD
log.Debug("DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace")
=======
logutil.CL(lctx).Warn(
"DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace")
>>>>>>> c21a5cfcb33 (br: add integration test for pitr (#47740))
// only drop newTableIDs' ranges
}

Expand All @@ -653,7 +680,12 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
case model.ActionDropTable, model.ActionTruncateTable:
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
<<<<<<< HEAD
log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
=======
logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID",
zap.Int64("oldTableID", job.TableID))
>>>>>>> c21a5cfcb33 (br: add integration test for pitr (#47740))
return nil
}

Expand All @@ -665,17 +697,23 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
return errors.Trace(err)
}
if len(physicalTableIDs) > 0 {
// delete partition id instead of table id
for i := 0; i < len(physicalTableIDs); i++ {
newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]]
newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs))
// delete partition id
for _, oldPid := range physicalTableIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
<<<<<<< HEAD
log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", physicalTableIDs[i]))
=======
logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", oldPid))
>>>>>>> c21a5cfcb33 (br: add integration test for pitr (#47740))
continue
}
physicalTableIDs[i] = newPid
newPhysicalTableIDs = append(newPhysicalTableIDs, newPid)
}
if len(physicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, physicalTableIDs)
if len(newPhysicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs)
}
return nil
}
Expand All @@ -685,32 +723,50 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
<<<<<<< HEAD
log.Debug("DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
=======
logutil.CL(lctx).Warn(
"DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID",
zap.Int64("oldTableID", job.TableID))
>>>>>>> c21a5cfcb33 (br: add integration test for pitr (#47740))
return nil
}
var physicalTableIDs []int64
if err := job.DecodeArgs(&physicalTableIDs); err != nil {
return errors.Trace(err)
}

for i := 0; i < len(physicalTableIDs); i++ {
newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]]
newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs))
for _, oldPid := range physicalTableIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
<<<<<<< HEAD
log.Debug("DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", physicalTableIDs[i]))
=======
logutil.CL(lctx).Warn(
"DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", oldPid))
>>>>>>> c21a5cfcb33 (br: add integration test for pitr (#47740))
continue
}
physicalTableIDs[i] = newPid
newPhysicalTableIDs = append(newPhysicalTableIDs, newPid)
}
if len(physicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, physicalTableIDs)
if len(newPhysicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs)
}
return nil
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
case model.ActionAddIndex, model.ActionAddPrimaryKey:
// iff job.State = model.JobStateRollbackDone
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
<<<<<<< HEAD
log.Debug("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
=======
logutil.CL(lctx).Warn("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID",
zap.Int64("oldTableID", job.TableID))
>>>>>>> c21a5cfcb33 (br: add integration test for pitr (#47740))
return nil
}
var indexID int64
Expand All @@ -727,7 +783,13 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
<<<<<<< HEAD
log.Debug("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
=======
logutil.CL(lctx).Warn(
"AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", oldPid))
>>>>>>> c21a5cfcb33 (br: add integration test for pitr (#47740))
continue
}

Expand All @@ -740,7 +802,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -759,7 +821,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
// len(indexIDs) = 1
Expand All @@ -782,7 +844,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {

tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -791,7 +853,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand All @@ -811,7 +873,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
if len(indexIDs) > 0 {
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -820,7 +882,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand All @@ -841,7 +903,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
if len(indexIDs) > 0 {
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -850,7 +912,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand All @@ -870,7 +932,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
}
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -879,7 +941,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/stream/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func TestDateFormat(t *testing.T) {
434605479096221697,
"2022-07-15 20:32:12.734 +0800",
},
{
434605478903808000,
"2022-07-15 20:32:12 +0800",
},
}

timeZone, _ := time.LoadLocation("Asia/Shanghai")
Expand Down
25 changes: 25 additions & 0 deletions br/tests/br_pitr/incremental_data/delete_range.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- 1. Drop Schema
drop database db_to_be_dropped;
-- 2. Drop/Truncate Table
drop table table_to_be_dropped_or_truncated.t0_dropped;
drop table table_to_be_dropped_or_truncated.t1_dropped;
truncate table table_to_be_dropped_or_truncated.t0_truncated;
truncate table table_to_be_dropped_or_truncated.t1_truncated;
-- 3. Drop/Truncate Table Partition
alter table partition_to_be_dropped_or_truncated.t1_dropped drop partition p0;
alter table partition_to_be_dropped_or_truncated.t1_truncated truncate partition p0;
-- 4. Drop Table Index/PrimaryKey
alter table index_or_primarykey_to_be_dropped.t0 drop index k1;
alter table index_or_primarykey_to_be_dropped.t1 drop index k1;
alter table index_or_primarykey_to_be_dropped.t0 drop primary key;
alter table index_or_primarykey_to_be_dropped.t1 drop primary key;
-- 5. Drop Table Indexes
alter table indexes_to_be_dropped.t0 drop index k1, drop index k2;
alter table indexes_to_be_dropped.t1 drop index k1, drop index k2;
-- 6. Drop Table Column/Columns
alter table column_s_to_be_dropped.t0_column drop column name;
alter table column_s_to_be_dropped.t1_column drop column name;
alter table column_s_to_be_dropped.t0_columns drop column name, drop column c;
alter table column_s_to_be_dropped.t1_columns drop column name, drop column c;
-- 7. Modify Table Column
alter table column_to_be_modified.t0 modify column name varchar(25);
Loading

0 comments on commit c81b679

Please sign in to comment.