Skip to content

Commit

Permalink
ddl: Corrected index management during REORGANIZE PARTITION (#56786) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 16, 2024
1 parent df4b5d5 commit 6bcd156
Show file tree
Hide file tree
Showing 9 changed files with 503 additions and 157 deletions.
103 changes: 47 additions & 56 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2185,9 +2185,7 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (

var dropIndices []*model.IndexInfo
for _, indexInfo := range tblInfo.Indices {
if indexInfo.Unique &&
indexInfo.State == model.StateDeleteReorganization &&
tblInfo.Partition.DDLState == model.StateDeleteReorganization {
if indexInfo.State == model.StateWriteOnly {
dropIndices = append(dropIndices, indexInfo)
}
}
Expand Down Expand Up @@ -3043,9 +3041,6 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, job *model.Job) (v
}

func getNewGlobal(partInfo *model.PartitionInfo, idx *model.IndexInfo) bool {
if len(partInfo.DDLUpdateIndexes) == 0 {
return idx.Global
}
for _, newIdx := range partInfo.DDLUpdateIndexes {
if strings.EqualFold(idx.Name.L, newIdx.IndexName) {
return newIdx.Global
Expand Down Expand Up @@ -3151,6 +3146,9 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar
//
// Everything now looks as it should, no memory of old partitions/indexes,
// and no more double writing, since the previous state is only reading the new partitions/indexes.
//
// Note: Special handling is also required in tables.newPartitionedTable(),
// to get per partition indexes in the right state.
func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
args, err := model.GetTablePartitionArgs(job)
if err != nil {
Expand Down Expand Up @@ -3262,39 +3260,33 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
if err != nil {
return ver, errors.Trace(err)
}
if !inAllPartitionColumns {
// Currently only support Explicit Global indexes.
if !newGlobal {
job.State = model.JobStateCancelled
return ver, dbterror.ErrGlobalIndexNotExplicitlySet.GenWithStackByArgs(index.Name.O)
}
// Duplicate the unique indexes with new index ids.
// If previously was Global or will be Global:
// it must be recreated with new index ID
// TODO: Could we allow that session in StateWriteReorganization, when StateDeleteReorganization
// has started, may not find changes through the global index that sessions in StateDeleteReorganization made?
// If so, then we could avoid copying the full Global Index if it has not changed from LOCAL!
// It might be possible to use the new, not yet public partitions to access those rows?!
// Just that it would not work with explicit partition select SELECT FROM t PARTITION (p,...)
newIndex := index.Clone()
newIndex.State = model.StateDeleteOnly
newIndex.ID = AllocateIndexID(tblInfo)
newIndex.Global = true
tblInfo.Indices = append(tblInfo.Indices, newIndex)
} else {
if newGlobal {
// TODO: For the future loosen this restriction and allow global indexes for unique keys also including all partitioning columns
return ver, dbterror.ErrGeneralUnsupportedDDL.GenWithStackByArgs(fmt.Sprintf("PARTITION BY, index '%v' is unique and contains all partitioning columns, but has Global Index set", index.Name.O))
}
if index.Global {
// Index was previously Global, now it needs to be duplicated and become a local index.
newIndex := index.Clone()
newIndex.State = model.StateDeleteOnly
newIndex.ID = AllocateIndexID(tblInfo)
newIndex.Global = false
tblInfo.Indices = append(tblInfo.Indices, newIndex)
}
// Currently only support Explicit Global indexes.
if !inAllPartitionColumns && !newGlobal {
job.State = model.JobStateCancelled
return ver, dbterror.ErrGlobalIndexNotExplicitlySet.GenWithStackByArgs(index.Name.O)
}
if !index.Global && !newGlobal {
// still local index, no need to duplicate index.
continue
}
if tblInfo.Partition.DDLChangedIndex == nil {
tblInfo.Partition.DDLChangedIndex = make(map[int64]bool)
}
// Duplicate the unique indexes with new index ids.
// If previously was Global or will be Global:
// it must be recreated with new index ID
// TODO: Could we allow that session in StateWriteReorganization, when StateDeleteReorganization
// has started, may not find changes through the global index that sessions in StateDeleteReorganization made?
// If so, then we could avoid copying the full Global Index if it has not changed from LOCAL!
// It might be possible to use the new, not yet public partitions to access those rows?!
// Just that it would not work with explicit partition select SELECT FROM t PARTITION (p,...)
newIndex := index.Clone()
newIndex.State = model.StateDeleteOnly
newIndex.ID = AllocateIndexID(tblInfo)
tblInfo.Partition.DDLChangedIndex[index.ID] = false
tblInfo.Partition.DDLChangedIndex[newIndex.ID] = true
newIndex.Global = newGlobal
tblInfo.Indices = append(tblInfo.Indices, newIndex)
}
failpoint.Inject("reorgPartCancel1", func(val failpoint.Value) {
if val.(bool) {
Expand Down Expand Up @@ -3487,26 +3479,18 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
if !index.Unique {
continue
}
switch index.State {
case model.StateWriteReorganization:
isNew, ok := tblInfo.Partition.DDLChangedIndex[index.ID]
if !ok {
continue
}
if isNew {
// Newly created index, replacing old unique/global index
index.State = model.StatePublic
case model.StatePublic:
if index.Global {
// Mark the old global index as non-readable, and to be dropped
index.State = model.StateDeleteReorganization
} else {
inAllPartitionColumns, err := checkPartitionKeysConstraint(partInfo, index.Columns, tblInfo)
if err != nil {
return rollbackReorganizePartitionWithErr(jobCtx, job, err)
}
if !inAllPartitionColumns {
// Mark the old unique index as non-readable, and to be dropped,
// since it is replaced by a global index
index.State = model.StateDeleteReorganization
}
}
continue
}
// Old index, should not be visible any longer,
// but needs to be kept up-to-date in case rollback happens.
index.State = model.StateWriteOnly
}
firstPartIdx, lastPartIdx, idMap, err2 := getReplacedPartitionIDs(partNames, tblInfo.Partition)
if err2 != nil {
Expand Down Expand Up @@ -3563,14 +3547,18 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver

var dropIndices []*model.IndexInfo
for _, indexInfo := range tblInfo.Indices {
if indexInfo.Unique && indexInfo.State == model.StateDeleteReorganization {
if indexInfo.Unique && indexInfo.State == model.StateWriteOnly {
// Drop the old unique (possible global) index, see onDropIndex
indexInfo.State = model.StateNone
DropIndexColumnFlag(tblInfo, indexInfo)
RemoveDependentHiddenColumns(tblInfo, indexInfo)
dropIndices = append(dropIndices, indexInfo)
}
}
// TODO: verify that the indexes are dropped,
// and that StateDeleteOnly+StateDeleteReorganization is not needed.
// local indexes is not an issue, since they will be gone with the dropped
// partitions, but replaced global indexes should be checked!
for _, indexInfo := range dropIndices {
removeIndexInfo(tblInfo, indexInfo)
}
Expand Down Expand Up @@ -3632,6 +3620,9 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
failpoint.Return(ver, errors.New("Injected error by reorgPartFail5"))
}
})
failpoint.Inject("updateVersionAndTableInfoErrInStateDeleteReorganization", func() {
failpoint.Return(ver, errors.New("Injected error in StateDeleteReorganization"))
})
args.OldPhysicalTblIDs = physicalTableIDs
args.NewPartitionIDs = newIDs
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
Expand Down
62 changes: 20 additions & 42 deletions pkg/ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,6 @@ func convertAddTablePartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job,
if err != nil {
return ver, errors.Trace(err)
}
tblInfo.Partition.DDLState = model.StateNone
tblInfo.Partition.DDLAction = model.ActionNone
job.State = model.JobStateRollingback
return ver, errors.Trace(otherwiseErr)
}
Expand Down Expand Up @@ -371,51 +369,25 @@ func convertReorgPartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, ot
partNames = append(partNames, pd.Name.L)
}
var dropIndices []*model.IndexInfo
// When Global Index is duplicated to a non Global, we later need
// to know if if it was Global before (marked to be dropped) or not.
globalToUniqueDupMap := make(map[string]int64)
for _, indexInfo := range tblInfo.Indices {
if !indexInfo.Unique {
continue
}
switch indexInfo.State {
case model.StateWriteReorganization, model.StateDeleteOnly,
model.StateWriteOnly:
dropIndices = append(dropIndices, indexInfo)
case model.StateDeleteReorganization:
if pi.DDLState != model.StateDeleteReorganization {
continue
}
// Old index marked to be dropped, rollback by making it public again
indexInfo.State = model.StatePublic
if indexInfo.Global {
if id, ok := globalToUniqueDupMap[indexInfo.Name.L]; ok {
return ver, errors.NewNoStackErrorf("Duplicate global index names '%s', %d != %d", indexInfo.Name.O, indexInfo.ID, id)
}
globalToUniqueDupMap[indexInfo.Name.L] = indexInfo.ID
}
case model.StatePublic:
if pi.DDLState != model.StateDeleteReorganization {
continue
isNew, ok := pi.DDLChangedIndex[indexInfo.ID]
if !ok {
// non-changed index
continue
}
if !isNew {
if pi.DDLState == model.StateDeleteReorganization {
// Revert the non-public state
indexInfo.State = model.StatePublic
}
// We cannot drop the index here, we need to wait until
// the next schema version
// i.e. rollback in rollbackLikeDropPartition
// New index that became public in this state,
// mark it to be dropped in next schema version
if indexInfo.Global {
indexInfo.State = model.StateDeleteReorganization
} else {
if pi.DDLState == model.StateDeleteReorganization {
indexInfo.State = model.StateWriteOnly
} else {
// How to know if this index was created as a duplicate or not?
if id, ok := globalToUniqueDupMap[indexInfo.Name.L]; ok {
// The original index
if id >= indexInfo.ID {
return ver, errors.NewNoStackErrorf("Indexes in wrong order during rollback, '%s', %d >= %d", indexInfo.Name.O, id, indexInfo.ID)
}
indexInfo.State = model.StateDeleteReorganization
} else {
globalToUniqueDupMap[indexInfo.Name.L] = indexInfo.ID
}
dropIndices = append(dropIndices, indexInfo)
}
}
}
Expand Down Expand Up @@ -466,13 +438,19 @@ func convertReorgPartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, ot
return ver, errors.Trace(errors.New("Internal error, failed to find original partition definitions"))
}
pi.Definitions = newDefs
pi.Num = uint64(len(pi.Definitions))
} else {
// Move back to StateWriteReorganization, i.e. use the original table
// (non-partitioned or differently partitioned) as the main table to use.
// Otherwise, the Type does not match the expression.
pi.Type, pi.DDLType = pi.DDLType, pi.Type
pi.Expr, pi.DDLExpr = pi.DDLExpr, pi.Expr
pi.Columns, pi.DDLColumns = pi.DDLColumns, pi.Columns
pi.Definitions = pi.DroppingDefinitions
}
pi.Num = uint64(len(pi.Definitions))
// We should move back one state, since there might be other sessions seeing the new partitions.
job.SchemaState = model.StateWriteReorganization
pi.DDLState = job.SchemaState
}

args := jobCtx.jobArgs.(*model.TablePartitionArgs)
Expand Down
Loading

0 comments on commit 6bcd156

Please sign in to comment.