Skip to content

Commit

Permalink
ddl: change drop partition and truncate partition's job args to suppo…
Browse files Browse the repository at this point in the history
…rt multi partition id array (#18419) (#18930)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Aug 3, 2020
1 parent 05a4cf6 commit 64e2a23
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 51 deletions.
8 changes: 5 additions & 3 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2564,14 +2564,15 @@ func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, sp
if err != nil {
return errors.Trace(err)
}
pids := []int64{pid}

job := &model.Job{
SchemaID: schema.ID,
TableID: meta.ID,
SchemaName: schema.Name.L,
Type: model.ActionTruncateTablePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{pid},
Args: []interface{}{pids},
}

err = d.doDDLJob(ctx, job)
Expand Down Expand Up @@ -2603,7 +2604,8 @@ func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *
}

partName := spec.PartitionNames[0].L
err = checkDropTablePartition(meta, partName)
partNames := []string{partName}
err = checkDropTablePartition(meta, partNames)
if err != nil {
if ErrDropPartitionNonExistent.Equal(err) && spec.IfExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
Expand All @@ -2618,7 +2620,7 @@ func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *
SchemaName: schema.Name.L,
Type: model.ActionDropTablePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{partName},
Args: []interface{}{partNames},
}

err = d.doDDLJob(ctx, job)
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) {

// test truncate table partition failed caused by canceled.
test = &tests[24]
truncateTblPartitionArgs := []interface{}{partitionTblInfo.Partition.Definitions[0].ID}
truncateTblPartitionArgs := []interface{}{[]int64{partitionTblInfo.Partition.Definitions[0].ID}}
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, partitionTblInfo.ID, test.act, truncateTblPartitionArgs, &test.cancelState)
c.Check(checkErr, IsNil)
changedTable = testGetTable(c, d, dbInfo.ID, partitionTblInfo.ID)
Expand Down
14 changes: 9 additions & 5 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,17 @@ func insertJobIntoDeleteRangeTable(ctx sessionctx.Context, job *model.Job) error
endKey := tablecodec.EncodeTablePrefix(tableID + 1)
return doInsert(s, job.ID, tableID, startKey, endKey, now)
case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
var physicalTableID int64
if err := job.DecodeArgs(&physicalTableID); err != nil {
var physicalTableIDs []int64
if err := job.DecodeArgs(&physicalTableIDs); err != nil {
return errors.Trace(err)
}
startKey := tablecodec.EncodeTablePrefix(physicalTableID)
endKey := tablecodec.EncodeTablePrefix(physicalTableID + 1)
return doInsert(s, job.ID, physicalTableID, startKey, endKey, now)
for _, physicalTableID := range physicalTableIDs {
startKey := tablecodec.EncodeTablePrefix(physicalTableID)
endKey := tablecodec.EncodeTablePrefix(physicalTableID + 1)
if err := doInsert(s, job.ID, physicalTableID, startKey, endKey, now); err != nil {
return errors.Trace(err)
}
}
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
case model.ActionAddIndex, model.ActionAddPrimaryKey:
tableID := job.TableID
Expand Down
99 changes: 57 additions & 42 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,44 +562,54 @@ func validRangePartitionType(col *model.ColumnInfo) bool {
}

// checkDropTablePartition checks if the partition exists and does not allow deleting the last existing partition in the table.
func checkDropTablePartition(meta *model.TableInfo, partName string) error {
func checkDropTablePartition(meta *model.TableInfo, partLowerNames []string) error {
pi := meta.Partition
if pi.Type != model.PartitionTypeRange && pi.Type != model.PartitionTypeList {
return errOnlyOnRangeListPartition.GenWithStackByArgs("DROP")
}
oldDefs := pi.Definitions
for _, def := range oldDefs {
if strings.EqualFold(def.Name.L, strings.ToLower(partName)) {
if len(oldDefs) == 1 {
return errors.Trace(ErrDropLastPartition)
for _, pn := range partLowerNames {
found := false
for _, def := range oldDefs {
if def.Name.L == pn {
found = true
break
}
return nil
}
if !found {
return errors.Trace(ErrDropPartitionNonExistent.GenWithStackByArgs(pn))
}
}
if len(oldDefs) == len(partLowerNames) {
return errors.Trace(ErrDropLastPartition)
}
return errors.Trace(ErrDropPartitionNonExistent.GenWithStackByArgs(partName))
return nil
}

// removePartitionInfo each ddl job deletes a partition.
func removePartitionInfo(tblInfo *model.TableInfo, partName string) int64 {
func removePartitionInfo(tblInfo *model.TableInfo, partLowerNames []string) []int64 {
oldDefs := tblInfo.Partition.Definitions
newDefs := make([]model.PartitionDefinition, 0, len(oldDefs)-1)
var pid int64
for i := 0; i < len(oldDefs); i++ {
if !strings.EqualFold(oldDefs[i].Name.L, strings.ToLower(partName)) {
continue
var pids []int64
for _, partName := range partLowerNames {
for i := 0; i < len(oldDefs); i++ {
if oldDefs[i].Name.L != partName {
continue
}
pids = append(pids, oldDefs[i].ID)
newDefs = append(oldDefs[:i], oldDefs[i+1:]...)
break
}
pid = oldDefs[i].ID
newDefs = append(oldDefs[:i], oldDefs[i+1:]...)
break
}

tblInfo.Partition.Definitions = newDefs
return pid
return pids
}

// onDropTablePartition deletes old partition meta.
func onDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var partName string
if err := job.DecodeArgs(&partName); err != nil {
var partNames []string
if err := job.DecodeArgs(&partNames); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
Expand All @@ -608,12 +618,12 @@ func onDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}
// If an error occurs, it returns that it cannot delete all partitions or that the partition doesn't exist.
err = checkDropTablePartition(tblInfo, partName)
err = checkDropTablePartition(tblInfo, partNames)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
physicalTableID := removePartitionInfo(tblInfo, partName)
physicalTableIDs := removePartitionInfo(tblInfo, partNames)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -622,15 +632,15 @@ func onDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) {
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
// A background job will be created to delete old partition data.
job.Args = []interface{}{physicalTableID}
job.Args = []interface{}{physicalTableIDs}
return ver, nil
}

// onDropTablePartition truncates old partition meta.
func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, error) {
var ver int64
var oldID int64
if err := job.DecodeArgs(&oldID); err != nil {
var oldIDs []int64
if err := job.DecodeArgs(&oldIDs); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
Expand All @@ -643,33 +653,38 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
return ver, errors.Trace(ErrPartitionMgmtOnNonpartitioned)
}

var newPartition *model.PartitionDefinition
for i := 0; i < len(pi.Definitions); i++ {
def := &pi.Definitions[i]
if def.ID == oldID {
pid, err1 := t.GenGlobalID()
if err != nil {
return ver, errors.Trace(err1)
newPartitions := make([]model.PartitionDefinition, 0, len(oldIDs))
for _, oldID := range oldIDs {
for i := 0; i < len(pi.Definitions); i++ {
def := &pi.Definitions[i]
if def.ID == oldID {
pid, err1 := t.GenGlobalID()
if err != nil {
return ver, errors.Trace(err1)
}
def.ID = pid
// Shallow copy only use the def.ID in event handle.
newPartitions = append(newPartitions, *def)
break
}
def.ID = pid
newPartition = def
break
}
}
if newPartition == nil {
if len(newPartitions) == 0 {
return ver, table.ErrUnknownPartition.GenWithStackByArgs("drop?", tblInfo.Name.O)
}

// Clear the tiflash replica available status.
if tblInfo.TiFlashReplica != nil {
tblInfo.TiFlashReplica.Available = false
// Set partition replica become unavailable.
for i, id := range tblInfo.TiFlashReplica.AvailablePartitionIDs {
if id == oldID {
newIDs := tblInfo.TiFlashReplica.AvailablePartitionIDs[:i]
newIDs = append(newIDs, tblInfo.TiFlashReplica.AvailablePartitionIDs[i+1:]...)
tblInfo.TiFlashReplica.AvailablePartitionIDs = newIDs
break
for _, oldID := range oldIDs {
for i, id := range tblInfo.TiFlashReplica.AvailablePartitionIDs {
if id == oldID {
newIDs := tblInfo.TiFlashReplica.AvailablePartitionIDs[:i]
newIDs = append(newIDs, tblInfo.TiFlashReplica.AvailablePartitionIDs[i+1:]...)
tblInfo.TiFlashReplica.AvailablePartitionIDs = newIDs
break
}
}
}
}
Expand All @@ -681,9 +696,9 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e

// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
asyncNotifyEvent(d, &util.Event{Tp: model.ActionTruncateTablePartition, TableInfo: tblInfo, PartInfo: &model.PartitionInfo{Definitions: []model.PartitionDefinition{*newPartition}}})
asyncNotifyEvent(d, &util.Event{Tp: model.ActionTruncateTablePartition, TableInfo: tblInfo, PartInfo: &model.PartitionInfo{Definitions: newPartitions}})
// A background job will be created to delete old partition data.
job.Args = []interface{}{oldID}
job.Args = []interface{}{oldIDs}
return ver, nil
}

Expand Down
154 changes: 154 additions & 0 deletions ddl/partition_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
)

var _ = Suite(&testPartitionSuite{})

type testPartitionSuite struct {
store kv.Storage
}

func (s *testPartitionSuite) SetUpSuite(c *C) {
s.store = testCreateStore(c, "test_store")
}

func (s *testPartitionSuite) TearDownSuite(c *C) {
err := s.store.Close()
c.Assert(err, IsNil)
}

func (s *testPartitionSuite) TestDropAndTruncatePartition(c *C) {
d := testNewDDLAndStart(
context.Background(),
c,
WithStore(s.store),
WithLease(testLease),
)
defer d.Stop()
dbInfo := testSchemaInfo(c, d, "test_partition")
testCreateSchema(c, testNewContext(d), d, dbInfo)
// generate 5 partition in tableInfo.
tblInfo, partIDs := buildTableInfoWithPartition(c, d)
ctx := testNewContext(d)
testCreateTable(c, ctx, d, dbInfo, tblInfo)

testDropPartition(c, ctx, d, dbInfo, tblInfo, []string{"p0", "p1"})

testTruncatePartition(c, ctx, d, dbInfo, tblInfo, []int64{partIDs[3], partIDs[4]})
}

func buildTableInfoWithPartition(c *C, d *ddl) (*model.TableInfo, []int64) {
tbl := &model.TableInfo{
Name: model.NewCIStr("t"),
}
col := &model.ColumnInfo{
Name: model.NewCIStr("c"),
Offset: 1,
State: model.StatePublic,
FieldType: *types.NewFieldType(mysql.TypeLong),
ID: allocateColumnID(tbl),
}
genIDs, err := d.genGlobalIDs(1)
c.Assert(err, IsNil)
tbl.ID = genIDs[0]
tbl.Columns = []*model.ColumnInfo{col}
tbl.Charset = "utf8"
tbl.Collate = "utf8_bin"

partIDs, err := d.genGlobalIDs(5)
c.Assert(err, IsNil)
partInfo := &model.PartitionInfo{
Type: model.PartitionTypeRange,
Expr: tbl.Columns[0].Name.L,
Enable: true,
Definitions: []model.PartitionDefinition{
{
ID: partIDs[0],
Name: model.NewCIStr("p0"),
LessThan: []string{"100"},
},
{
ID: partIDs[1],
Name: model.NewCIStr("p1"),
LessThan: []string{"200"},
},
{
ID: partIDs[2],
Name: model.NewCIStr("p2"),
LessThan: []string{"300"},
},
{
ID: partIDs[3],
Name: model.NewCIStr("p3"),
LessThan: []string{"400"},
},
{
ID: partIDs[4],
Name: model.NewCIStr("p4"),
LessThan: []string{"500"},
},
},
}
tbl.Partition = partInfo
return tbl, partIDs
}

func buildDropPartitionJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, partNames []string) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionDropTablePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{partNames},
}
}

func testDropPartition(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, partNames []string) *model.Job {
job := buildDropPartitionJob(dbInfo, tblInfo, partNames)
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func buildTruncatePartitionJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, pids []int64) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionTruncateTablePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{pids},
}
}

func testTruncatePartition(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, pids []int64) *model.Job {
job := buildTruncatePartitionJob(dbInfo, tblInfo, pids)
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

0 comments on commit 64e2a23

Please sign in to comment.