From 64e2a23b74c605bad8aa7bab14121b72db18137e Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Mon, 3 Aug 2020 11:49:34 +0800 Subject: [PATCH] ddl: change drop partition and truncate partition's job args to support multi partition id array (#18419) (#18930) Signed-off-by: ti-srebot --- ddl/ddl_api.go | 8 ++- ddl/ddl_worker_test.go | 2 +- ddl/delete_range.go | 14 ++-- ddl/partition.go | 99 +++++++++++++++----------- ddl/partition_test.go | 154 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 226 insertions(+), 51 deletions(-) create mode 100644 ddl/partition_test.go diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 290fc460901a5..a84825ae976a9 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2564,6 +2564,7 @@ func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, sp if err != nil { return errors.Trace(err) } + pids := []int64{pid} job := &model.Job{ SchemaID: schema.ID, @@ -2571,7 +2572,7 @@ func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, sp SchemaName: schema.Name.L, Type: model.ActionTruncateTablePartition, BinlogInfo: &model.HistoryInfo{}, - Args: []interface{}{pid}, + Args: []interface{}{pids}, } err = d.doDDLJob(ctx, job) @@ -2603,7 +2604,8 @@ func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec * } partName := spec.PartitionNames[0].L - err = checkDropTablePartition(meta, partName) + partNames := []string{partName} + err = checkDropTablePartition(meta, partNames) if err != nil { if ErrDropPartitionNonExistent.Equal(err) && spec.IfExists { ctx.GetSessionVars().StmtCtx.AppendNote(err) @@ -2618,7 +2620,7 @@ func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec * SchemaName: schema.Name.L, Type: model.ActionDropTablePartition, BinlogInfo: &model.HistoryInfo{}, - Args: []interface{}{partName}, + Args: []interface{}{partNames}, } err = d.doDDLJob(ctx, job) diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index 6faf4080b2b29..3ff5c01d25624 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -792,7 +792,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) { // test truncate table partition failed caused by canceled. test = &tests[24] - truncateTblPartitionArgs := []interface{}{partitionTblInfo.Partition.Definitions[0].ID} + truncateTblPartitionArgs := []interface{}{[]int64{partitionTblInfo.Partition.Definitions[0].ID}} doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, partitionTblInfo.ID, test.act, truncateTblPartitionArgs, &test.cancelState) c.Check(checkErr, IsNil) changedTable = testGetTable(c, d, dbInfo.ID, partitionTblInfo.ID) diff --git a/ddl/delete_range.go b/ddl/delete_range.go index ad1ae3fce3af2..45059716a824d 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -289,13 +289,17 @@ func insertJobIntoDeleteRangeTable(ctx sessionctx.Context, job *model.Job) error endKey := tablecodec.EncodeTablePrefix(tableID + 1) return doInsert(s, job.ID, tableID, startKey, endKey, now) case model.ActionDropTablePartition, model.ActionTruncateTablePartition: - var physicalTableID int64 - if err := job.DecodeArgs(&physicalTableID); err != nil { + var physicalTableIDs []int64 + if err := job.DecodeArgs(&physicalTableIDs); err != nil { return errors.Trace(err) } - startKey := tablecodec.EncodeTablePrefix(physicalTableID) - endKey := tablecodec.EncodeTablePrefix(physicalTableID + 1) - return doInsert(s, job.ID, physicalTableID, startKey, endKey, now) + for _, physicalTableID := range physicalTableIDs { + startKey := tablecodec.EncodeTablePrefix(physicalTableID) + endKey := tablecodec.EncodeTablePrefix(physicalTableID + 1) + if err := doInsert(s, job.ID, physicalTableID, startKey, endKey, now); err != nil { + return errors.Trace(err) + } + } // ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled. case model.ActionAddIndex, model.ActionAddPrimaryKey: tableID := job.TableID diff --git a/ddl/partition.go b/ddl/partition.go index 481f0da5d2be2..0dd2f97c2fb02 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -562,44 +562,54 @@ func validRangePartitionType(col *model.ColumnInfo) bool { } // checkDropTablePartition checks if the partition exists and does not allow deleting the last existing partition in the table. -func checkDropTablePartition(meta *model.TableInfo, partName string) error { +func checkDropTablePartition(meta *model.TableInfo, partLowerNames []string) error { pi := meta.Partition if pi.Type != model.PartitionTypeRange && pi.Type != model.PartitionTypeList { return errOnlyOnRangeListPartition.GenWithStackByArgs("DROP") } oldDefs := pi.Definitions - for _, def := range oldDefs { - if strings.EqualFold(def.Name.L, strings.ToLower(partName)) { - if len(oldDefs) == 1 { - return errors.Trace(ErrDropLastPartition) + for _, pn := range partLowerNames { + found := false + for _, def := range oldDefs { + if def.Name.L == pn { + found = true + break } - return nil } + if !found { + return errors.Trace(ErrDropPartitionNonExistent.GenWithStackByArgs(pn)) + } + } + if len(oldDefs) == len(partLowerNames) { + return errors.Trace(ErrDropLastPartition) } - return errors.Trace(ErrDropPartitionNonExistent.GenWithStackByArgs(partName)) + return nil } // removePartitionInfo each ddl job deletes a partition. -func removePartitionInfo(tblInfo *model.TableInfo, partName string) int64 { +func removePartitionInfo(tblInfo *model.TableInfo, partLowerNames []string) []int64 { oldDefs := tblInfo.Partition.Definitions newDefs := make([]model.PartitionDefinition, 0, len(oldDefs)-1) - var pid int64 - for i := 0; i < len(oldDefs); i++ { - if !strings.EqualFold(oldDefs[i].Name.L, strings.ToLower(partName)) { - continue + var pids []int64 + for _, partName := range partLowerNames { + for i := 0; i < len(oldDefs); i++ { + if oldDefs[i].Name.L != partName { + continue + } + pids = append(pids, oldDefs[i].ID) + newDefs = append(oldDefs[:i], oldDefs[i+1:]...) + break } - pid = oldDefs[i].ID - newDefs = append(oldDefs[:i], oldDefs[i+1:]...) - break } + tblInfo.Partition.Definitions = newDefs - return pid + return pids } // onDropTablePartition deletes old partition meta. func onDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) { - var partName string - if err := job.DecodeArgs(&partName); err != nil { + var partNames []string + if err := job.DecodeArgs(&partNames); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -608,12 +618,12 @@ func onDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) { return ver, errors.Trace(err) } // If an error occurs, it returns that it cannot delete all partitions or that the partition doesn't exist. - err = checkDropTablePartition(tblInfo, partName) + err = checkDropTablePartition(tblInfo, partNames) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } - physicalTableID := removePartitionInfo(tblInfo, partName) + physicalTableIDs := removePartitionInfo(tblInfo, partNames) ver, err = updateVersionAndTableInfo(t, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) @@ -622,15 +632,15 @@ func onDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) { // Finish this job. job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) // A background job will be created to delete old partition data. - job.Args = []interface{}{physicalTableID} + job.Args = []interface{}{physicalTableIDs} return ver, nil } // onDropTablePartition truncates old partition meta. func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, error) { var ver int64 - var oldID int64 - if err := job.DecodeArgs(&oldID); err != nil { + var oldIDs []int64 + if err := job.DecodeArgs(&oldIDs); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -643,20 +653,23 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e return ver, errors.Trace(ErrPartitionMgmtOnNonpartitioned) } - var newPartition *model.PartitionDefinition - for i := 0; i < len(pi.Definitions); i++ { - def := &pi.Definitions[i] - if def.ID == oldID { - pid, err1 := t.GenGlobalID() - if err != nil { - return ver, errors.Trace(err1) + newPartitions := make([]model.PartitionDefinition, 0, len(oldIDs)) + for _, oldID := range oldIDs { + for i := 0; i < len(pi.Definitions); i++ { + def := &pi.Definitions[i] + if def.ID == oldID { + pid, err1 := t.GenGlobalID() + if err != nil { + return ver, errors.Trace(err1) + } + def.ID = pid + // Shallow copy only use the def.ID in event handle. + newPartitions = append(newPartitions, *def) + break } - def.ID = pid - newPartition = def - break } } - if newPartition == nil { + if len(newPartitions) == 0 { return ver, table.ErrUnknownPartition.GenWithStackByArgs("drop?", tblInfo.Name.O) } @@ -664,12 +677,14 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e if tblInfo.TiFlashReplica != nil { tblInfo.TiFlashReplica.Available = false // Set partition replica become unavailable. - for i, id := range tblInfo.TiFlashReplica.AvailablePartitionIDs { - if id == oldID { - newIDs := tblInfo.TiFlashReplica.AvailablePartitionIDs[:i] - newIDs = append(newIDs, tblInfo.TiFlashReplica.AvailablePartitionIDs[i+1:]...) - tblInfo.TiFlashReplica.AvailablePartitionIDs = newIDs - break + for _, oldID := range oldIDs { + for i, id := range tblInfo.TiFlashReplica.AvailablePartitionIDs { + if id == oldID { + newIDs := tblInfo.TiFlashReplica.AvailablePartitionIDs[:i] + newIDs = append(newIDs, tblInfo.TiFlashReplica.AvailablePartitionIDs[i+1:]...) + tblInfo.TiFlashReplica.AvailablePartitionIDs = newIDs + break + } } } } @@ -681,9 +696,9 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e // Finish this job. job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) - asyncNotifyEvent(d, &util.Event{Tp: model.ActionTruncateTablePartition, TableInfo: tblInfo, PartInfo: &model.PartitionInfo{Definitions: []model.PartitionDefinition{*newPartition}}}) + asyncNotifyEvent(d, &util.Event{Tp: model.ActionTruncateTablePartition, TableInfo: tblInfo, PartInfo: &model.PartitionInfo{Definitions: newPartitions}}) // A background job will be created to delete old partition data. - job.Args = []interface{}{oldID} + job.Args = []interface{}{oldIDs} return ver, nil } diff --git a/ddl/partition_test.go b/ddl/partition_test.go new file mode 100644 index 0000000000000..1cebff9200301 --- /dev/null +++ b/ddl/partition_test.go @@ -0,0 +1,154 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "context" + + . "github.com/pingcap/check" + "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/types" +) + +var _ = Suite(&testPartitionSuite{}) + +type testPartitionSuite struct { + store kv.Storage +} + +func (s *testPartitionSuite) SetUpSuite(c *C) { + s.store = testCreateStore(c, "test_store") +} + +func (s *testPartitionSuite) TearDownSuite(c *C) { + err := s.store.Close() + c.Assert(err, IsNil) +} + +func (s *testPartitionSuite) TestDropAndTruncatePartition(c *C) { + d := testNewDDLAndStart( + context.Background(), + c, + WithStore(s.store), + WithLease(testLease), + ) + defer d.Stop() + dbInfo := testSchemaInfo(c, d, "test_partition") + testCreateSchema(c, testNewContext(d), d, dbInfo) + // generate 5 partition in tableInfo. + tblInfo, partIDs := buildTableInfoWithPartition(c, d) + ctx := testNewContext(d) + testCreateTable(c, ctx, d, dbInfo, tblInfo) + + testDropPartition(c, ctx, d, dbInfo, tblInfo, []string{"p0", "p1"}) + + testTruncatePartition(c, ctx, d, dbInfo, tblInfo, []int64{partIDs[3], partIDs[4]}) +} + +func buildTableInfoWithPartition(c *C, d *ddl) (*model.TableInfo, []int64) { + tbl := &model.TableInfo{ + Name: model.NewCIStr("t"), + } + col := &model.ColumnInfo{ + Name: model.NewCIStr("c"), + Offset: 1, + State: model.StatePublic, + FieldType: *types.NewFieldType(mysql.TypeLong), + ID: allocateColumnID(tbl), + } + genIDs, err := d.genGlobalIDs(1) + c.Assert(err, IsNil) + tbl.ID = genIDs[0] + tbl.Columns = []*model.ColumnInfo{col} + tbl.Charset = "utf8" + tbl.Collate = "utf8_bin" + + partIDs, err := d.genGlobalIDs(5) + c.Assert(err, IsNil) + partInfo := &model.PartitionInfo{ + Type: model.PartitionTypeRange, + Expr: tbl.Columns[0].Name.L, + Enable: true, + Definitions: []model.PartitionDefinition{ + { + ID: partIDs[0], + Name: model.NewCIStr("p0"), + LessThan: []string{"100"}, + }, + { + ID: partIDs[1], + Name: model.NewCIStr("p1"), + LessThan: []string{"200"}, + }, + { + ID: partIDs[2], + Name: model.NewCIStr("p2"), + LessThan: []string{"300"}, + }, + { + ID: partIDs[3], + Name: model.NewCIStr("p3"), + LessThan: []string{"400"}, + }, + { + ID: partIDs[4], + Name: model.NewCIStr("p4"), + LessThan: []string{"500"}, + }, + }, + } + tbl.Partition = partInfo + return tbl, partIDs +} + +func buildDropPartitionJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, partNames []string) *model.Job { + return &model.Job{ + SchemaID: dbInfo.ID, + TableID: tblInfo.ID, + Type: model.ActionDropTablePartition, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{partNames}, + } +} + +func testDropPartition(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, partNames []string) *model.Job { + job := buildDropPartitionJob(dbInfo, tblInfo, partNames) + err := d.doDDLJob(ctx, job) + c.Assert(err, IsNil) + v := getSchemaVer(c, ctx) + checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) + return job +} + +func buildTruncatePartitionJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, pids []int64) *model.Job { + return &model.Job{ + SchemaID: dbInfo.ID, + TableID: tblInfo.ID, + Type: model.ActionTruncateTablePartition, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{pids}, + } +} + +func testTruncatePartition(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, pids []int64) *model.Job { + job := buildTruncatePartitionJob(dbInfo, tblInfo, pids) + err := d.doDDLJob(ctx, job) + c.Assert(err, IsNil) + v := getSchemaVer(c, ctx) + checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) + return job +}