Skip to content

Commit

Permalink
ddl: Exchange part schema load fix (pingcap#46126)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjonss committed Aug 17, 2023
1 parent 070394e commit 8ad6dc7
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 22 deletions.
45 changes: 29 additions & 16 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1556,27 +1556,40 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...
diff.OldSchemaID = oldSchemaIDs[0]
diff.AffectedOpts = affects
case model.ActionExchangeTablePartition:
// From start of function: diff.SchemaID = job.SchemaID
// Old is original non partitioned table
diff.OldTableID = job.TableID
diff.OldSchemaID = job.SchemaID
// Update the partitioned table (it is only done in the last state)
var (
ptSchemaID int64
ptTableID int64
ptDefID int64
partName string // Not used
withValidation bool // Not used
)
// See ddl.ExchangeTablePartition
err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation)
if err != nil {
return 0, errors.Trace(err)
}
// This is needed for not crashing TiFlash!
// TODO: Update TiFlash, to handle StateWriteOnly
diff.AffectedOpts = []*model.AffectedOption{{
TableID: ptTableID,
}}
if job.SchemaState != model.StatePublic {
// No change, just to refresh the non-partitioned table
// with its new ExchangePartitionInfo.
diff.TableID = job.TableID
diff.SchemaID = job.SchemaID
// Keep this as Schema ID of non-partitioned table
// to avoid trigger early rename in TiFlash
diff.AffectedOpts[0].SchemaID = job.SchemaID
} else {
// Update the partitioned table (it is only done in the last state)
var (
ptSchemaID int64
ptTableID int64
ptDefID int64 // Not needed, will reload the whole table
partName string // Not used
withValidation bool // Not used
)
// See ddl.ExchangeTablePartition
err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation)
if err != nil {
return 0, errors.Trace(err)
}
diff.SchemaID = ptSchemaID
diff.TableID = ptTableID
// Swap
diff.TableID = ptDefID
// Also add correct SchemaID in case different schemas
diff.AffectedOpts[0].SchemaID = ptSchemaID
}
case model.ActionTruncateTablePartition:
diff.TableID = job.TableID
Expand Down
23 changes: 23 additions & 0 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2073,6 +2073,16 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
return ver, errors.Trace(err)
}

if defID != partDef.ID {
logutil.BgLogger().Info("Exchange partition id changed, updating to actual id", zap.String("category", "ddl"),
zap.String("job", job.String()), zap.Int64("defID", defID), zap.Int64("partDef.ID", partDef.ID))
job.Args[0] = partDef.ID
defID = partDef.ID
err = updateDDLJob2Table(w.sess, job, true)
if err != nil {
return ver, errors.Trace(err)
}
}
nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{
ExchangePartitionID: ptID,
ExchangePartitionDefID: defID,
Expand All @@ -2094,6 +2104,18 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
delayForAsyncCommit()
}

if defID != partDef.ID {
// Should never happen, should have been updated above, in previous state!
logutil.BgLogger().Error("Exchange partition id changed, updating to actual id", zap.String("category", "ddl"),
zap.String("job", job.String()), zap.Int64("defID", defID), zap.Int64("partDef.ID", partDef.ID))
job.Args[0] = partDef.ID
defID = partDef.ID
err = updateDDLJob2Table(w.sess, job, true)
if err != nil {
return ver, errors.Trace(err)
}
}

if withValidation {
err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name)
if err != nil {
Expand Down Expand Up @@ -2201,6 +2223,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
ntr := rules[ntrID]
ptr := rules[ptrID]

// This must be a bug, nt cannot be partitioned!
partIDs := getPartitionIDs(nt)

var setRules []*label.Rule
Expand Down
28 changes: 22 additions & 6 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,31 +296,46 @@ func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDi
ntID := diff.OldTableID
ptSchemaID := diff.SchemaID
ptID := diff.TableID
partID := diff.TableID
if len(diff.AffectedOpts) > 0 {
// From old version
ptID = diff.AffectedOpts[0].TableID
ptSchemaID = diff.AffectedOpts[0].SchemaID
if diff.AffectedOpts[0].SchemaID != 0 {
ptSchemaID = diff.AffectedOpts[0].SchemaID
}
}
// The normal table needs to be updated first:
// Just update the tables separately
currDiff := &model.SchemaDiff{
// This is only for the case since https://github.com/pingcap/tidb/pull/45877
// Fixed now, by adding back the AffectedOpts
// to carry the partitioned Table ID.
Type: diff.Type,
Version: diff.Version,
TableID: ntID,
SchemaID: ntSchemaID,
}
if ptID != partID {
currDiff.TableID = partID
currDiff.OldTableID = ntID
currDiff.OldSchemaID = ntSchemaID
}
ntIDs, err := b.applyTableUpdate(m, currDiff)
if err != nil {
return nil, errors.Trace(err)
}
b.markPartitionBundleShouldUpdate(ntID)
// Then the partitioned table
// partID is the new id for the non-partitioned table!
b.markTableBundleShouldUpdate(partID)
// Then the partitioned table, will re-read the whole table, including all partitions!
currDiff.TableID = ptID
currDiff.SchemaID = ptSchemaID
currDiff.OldTableID = ptID
currDiff.OldSchemaID = ptSchemaID
ptIDs, err := b.applyTableUpdate(m, currDiff)
if err != nil {
return nil, errors.Trace(err)
}
b.markTableBundleShouldUpdate(ptID)
// ntID is the new id for the partition!
b.markPartitionBundleShouldUpdate(ntID)
err = updateAutoIDForExchangePartition(b.store, ptSchemaID, ptID, ntSchemaID, ntID)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -426,7 +441,8 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6
newTableID = diff.TableID
case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
oldTableID = diff.TableID
case model.ActionTruncateTable, model.ActionCreateView:
case model.ActionTruncateTable, model.ActionCreateView,
model.ActionExchangeTablePartition:
oldTableID = diff.OldTableID
newTableID = diff.TableID
default:
Expand Down

0 comments on commit 8ad6dc7

Please sign in to comment.