Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix rollback reorganize partition left intermediate state (#51631) #53469

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1961,18 +1961,20 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
job.State = model.JobStateCancelled
return ver, err
}
if partInfo.DDLType != model.PartitionTypeNone {
// ALTER TABLE ... PARTITION BY
if partInfo.Type != model.PartitionTypeNone {
// Also remove anything with the new table id
physicalTableIDs = append(physicalTableIDs, tblInfo.Partition.NewTableID)
physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
// Reset if it was normal table before
if tblInfo.Partition.Type == model.PartitionTypeNone {
if tblInfo.Partition.Type == model.PartitionTypeNone ||
tblInfo.Partition.DDLType == model.PartitionTypeNone {
tblInfo.Partition = nil
} else {
tblInfo.Partition.NewTableID = 0
tblInfo.Partition.DDLExpr = ""
tblInfo.Partition.DDLColumns = nil
tblInfo.Partition.DDLType = model.PartitionTypeNone
tblInfo.Partition.ClearReorgIntermediateInfo()
}
} else {
// REMOVE PARTITIONING
tblInfo.Partition.ClearReorgIntermediateInfo()
}

ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
Expand Down Expand Up @@ -3017,10 +3019,7 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
tblInfo.Partition = nil
} else {
// ALTER TABLE ... PARTITION BY
tblInfo.Partition.DDLType = model.PartitionTypeNone
tblInfo.Partition.DDLExpr = ""
tblInfo.Partition.DDLColumns = nil
tblInfo.Partition.NewTableID = 0
tblInfo.Partition.ClearReorgIntermediateInfo()
}
err = t.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Put(autoIDs)
if err != nil {
Expand Down
14 changes: 13 additions & 1 deletion pkg/ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,19 @@ func convertAddTablePartitionJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model
for _, pd := range addingDefinitions {
partNames = append(partNames, pd.Name.L)
}
job.Args = []interface{}{partNames}
if job.Type == model.ActionReorganizePartition ||
job.Type == model.ActionAlterTablePartitioning ||
job.Type == model.ActionRemovePartitioning {
partInfo := &model.PartitionInfo{}
var pNames []string
err = job.DecodeArgs(&pNames, &partInfo)
if err != nil {
return ver, err
}
job.Args = []any{partNames, partInfo}
} else {
job.Args = []any{partNames}
}
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/parser/model/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ go_test(
],
embed = [":model"],
flaky = True,
shard_count = 20,
shard_count = 21,
deps = [
"//pkg/parser/charset",
"//pkg/parser/mysql",
Expand Down
8 changes: 8 additions & 0 deletions pkg/parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,14 @@ func (pi *PartitionInfo) HasTruncatingPartitionID(pid int64) bool {
return false
}

// ClearReorgIntermediateInfo remove intermediate information used during reorganize partition.
func (pi *PartitionInfo) ClearReorgIntermediateInfo() {
pi.DDLType = PartitionTypeNone
pi.DDLExpr = ""
pi.DDLColumns = nil
pi.NewTableID = 0
}

// PartitionState is the state of the partition.
type PartitionState struct {
ID int64 `json:"id"`
Expand Down
13 changes: 13 additions & 0 deletions pkg/parser/model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,3 +818,16 @@ func TestTTLJobInterval(t *testing.T) {
require.NoError(t, err)
require.Equal(t, time.Hour*200, interval)
}

func TestClearReorgIntermediateInfo(t *testing.T) {
ptInfo := &PartitionInfo{}
ptInfo.DDLType = PartitionTypeHash
ptInfo.DDLExpr = "Test DDL Expr"
ptInfo.NewTableID = 1111

ptInfo.ClearReorgIntermediateInfo()
require.Equal(t, PartitionTypeNone, ptInfo.DDLType)
require.Equal(t, "", ptInfo.DDLExpr)
require.Equal(t, true, ptInfo.DDLColumns == nil)
require.Equal(t, int64(0), ptInfo.NewTableID)
}
2 changes: 1 addition & 1 deletion pkg/table/tables/test/partition/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"partition_test.go",
],
flaky = True,
shard_count = 18,
shard_count = 19,
deps = [
"//pkg/ddl",
"//pkg/domain",
Expand Down
81 changes: 81 additions & 0 deletions pkg/table/tables/test/partition/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3057,3 +3057,84 @@ func TestPointGetKeyPartitioning(t *testing.T) {
tk.MustExec(`INSERT INTO t VALUES ('Aa', 'Ab', 'Ac'), ('Ba', 'Bb', 'Bc')`)
tk.MustQuery(`SELECT * FROM t WHERE b = 'Ab'`).Check(testkit.Rows("Aa Ab Ac"))
}

// Issue TiDB #51090.
func TestAlterTablePartitionRollback(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk3 := testkit.NewTestKit(t, store)
tk4 := testkit.NewTestKit(t, store)
tk5 := testkit.NewTestKit(t, store)
tk.MustExec(`use test;`)
tk2.MustExec(`use test;`)
tk3.MustExec(`use test;`)
tk4.MustExec(`use test;`)
tk5.MustExec(`use test;`)
tk.MustExec(`create table t(a int);`)
tk.MustExec(`insert into t values(1), (2), (3);`)

alterChan := make(chan error)
alterPartition := func() {
err := tk4.ExecToErr(`alter table t partition by hash(a) partitions 3;`)
alterChan <- err
}
waitFor := func(s string) {
for {
select {
case alterErr := <-alterChan:
require.Fail(t, "Alter completed unexpectedly", "With error %v", alterErr)
default:
// Alter still running
}
res := tk5.MustQuery(`admin show ddl jobs where db_name = 'test' and table_name = 't' and job_type = 'alter table partition by'`).Rows()
if len(res) > 0 && res[0][4] == s {
logutil.BgLogger().Info("Got state", zap.String("State", s))
break
}
gotime.Sleep(10 * gotime.Millisecond)
}
dom := domain.GetDomain(tk5.Session())
// Make sure the table schema is the new schema.
require.NoError(t, dom.Reload())
}

testFunc := func(states []string) {
for i, s := range states {
if i%2 == 0 {
tk2.MustExec(`begin;`)
tk2.MustExec(`select 1 from t;`)
if i > 0 {
tk3.MustExec(`commit;`)
}
} else {
tk3.MustExec(`begin;`)
tk3.MustExec(`select 1 from t;`)
tk2.MustExec(`commit;`)
}
if i == 0 {
go alterPartition()
}
waitFor(s)
if i == len(states)-1 {
break
}
}
res := tk.MustQuery(`admin show ddl jobs where table_name = 't' and job_type = 'alter table partition by'`).Rows()
tk.MustExec(fmt.Sprintf("admin cancel ddl jobs %v", res[0][0]))
tk2.MustExec(`commit;`)
tk3.MustExec(`commit;`)
require.ErrorContains(t, <-alterChan, "[ddl:8214]Cancelled DDL job")
tk.MustQuery(`show create table t;`).Check(testkit.Rows(
"t CREATE TABLE `t` (\n" +
" `a` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustQuery(`select a from t order by a;`).Check(testkit.Rows("1", "2", "3"))
}

states := []string{"delete only", "write only", "write reorganization", "delete reorganization"}
for i := range states {
testFunc(states[:i+1])
}
}