From 48e22971729d39242ccf519ccdfb2efc09a9a205 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Thu, 17 Aug 2023 13:23:09 +0200 Subject: [PATCH] ddl: Exchange part schema load fix (#46126) close pingcap/tidb#45791, ref pingcap/tidb#46125 --- ddl/ddl_worker.go | 45 ++++++++++++++++++++++++++++--------------- ddl/partition.go | 23 ++++++++++++++++++++++ infoschema/builder.go | 28 +++++++++++++++++++++------ 3 files changed, 74 insertions(+), 22 deletions(-) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 2f8cd248bc48a..f055aaaeff34a 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1373,27 +1373,40 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ... diff.OldSchemaID = oldSchemaIDs[0] diff.AffectedOpts = affects case model.ActionExchangeTablePartition: + // From start of function: diff.SchemaID = job.SchemaID + // Old is original non partitioned table diff.OldTableID = job.TableID diff.OldSchemaID = job.SchemaID + // Update the partitioned table (it is only done in the last state) + var ( + ptSchemaID int64 + ptTableID int64 + ptDefID int64 + partName string // Not used + withValidation bool // Not used + ) + // See ddl.ExchangeTablePartition + err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation) + if err != nil { + return 0, errors.Trace(err) + } + // This is needed for not crashing TiFlash! + // TODO: Update TiFlash, to handle StateWriteOnly + diff.AffectedOpts = []*model.AffectedOption{{ + TableID: ptTableID, + }} if job.SchemaState != model.StatePublic { + // No change, just to refresh the non-partitioned table + // with its new ExchangePartitionInfo. diff.TableID = job.TableID - diff.SchemaID = job.SchemaID + // Keep this as Schema ID of non-partitioned table + // to avoid trigger early rename in TiFlash + diff.AffectedOpts[0].SchemaID = job.SchemaID } else { - // Update the partitioned table (it is only done in the last state) - var ( - ptSchemaID int64 - ptTableID int64 - ptDefID int64 // Not needed, will reload the whole table - partName string // Not used - withValidation bool // Not used - ) - // See ddl.ExchangeTablePartition - err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation) - if err != nil { - return 0, errors.Trace(err) - } - diff.SchemaID = ptSchemaID - diff.TableID = ptTableID + // Swap + diff.TableID = ptDefID + // Also add correct SchemaID in case different schemas + diff.AffectedOpts[0].SchemaID = ptSchemaID } case model.ActionTruncateTablePartition: diff.TableID = job.TableID diff --git a/ddl/partition.go b/ddl/partition.go index 43210a2e7a50f..96f01e02828e7 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -2419,6 +2419,16 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } + if defID != partDef.ID { + logutil.BgLogger().Info("Exchange partition id changed, updating to actual id", zap.String("category", "ddl"), + zap.String("job", job.String()), zap.Int64("defID", defID), zap.Int64("partDef.ID", partDef.ID)) + job.Args[0] = partDef.ID + defID = partDef.ID + err = updateDDLJob2Table(w.sess, job, true) + if err != nil { + return ver, errors.Trace(err) + } + } nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{ ExchangePartitionID: ptID, ExchangePartitionDefID: defID, @@ -2440,6 +2450,18 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo delayForAsyncCommit() } + if defID != partDef.ID { + // Should never happen, should have been updated above, in previous state! + logutil.BgLogger().Error("Exchange partition id changed, updating to actual id", zap.String("category", "ddl"), + zap.String("job", job.String()), zap.Int64("defID", defID), zap.Int64("partDef.ID", partDef.ID)) + job.Args[0] = partDef.ID + defID = partDef.ID + err = updateDDLJob2Table(w.sess, job, true) + if err != nil { + return ver, errors.Trace(err) + } + } + if withValidation { err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name) if err != nil { @@ -2547,6 +2569,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo ntr := rules[ntrID] ptr := rules[ptrID] + // This must be a bug, nt cannot be partitioned! partIDs := getPartitionIDs(nt) var setRules []*label.Rule diff --git a/infoschema/builder.go b/infoschema/builder.go index 90e91587d0de8..4b6c6627d100b 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -321,31 +321,46 @@ func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDi ntID := diff.OldTableID ptSchemaID := diff.SchemaID ptID := diff.TableID + partID := diff.TableID if len(diff.AffectedOpts) > 0 { - // From old version ptID = diff.AffectedOpts[0].TableID - ptSchemaID = diff.AffectedOpts[0].SchemaID + if diff.AffectedOpts[0].SchemaID != 0 { + ptSchemaID = diff.AffectedOpts[0].SchemaID + } } // The normal table needs to be updated first: // Just update the tables separately currDiff := &model.SchemaDiff{ + // This is only for the case since https://github.com/pingcap/tidb/pull/45877 + // Fixed now, by adding back the AffectedOpts + // to carry the partitioned Table ID. + Type: diff.Type, Version: diff.Version, TableID: ntID, SchemaID: ntSchemaID, } + if ptID != partID { + currDiff.TableID = partID + currDiff.OldTableID = ntID + currDiff.OldSchemaID = ntSchemaID + } ntIDs, err := b.applyTableUpdate(m, currDiff) if err != nil { return nil, errors.Trace(err) } - b.markPartitionBundleShouldUpdate(ntID) - // Then the partitioned table + // partID is the new id for the non-partitioned table! + b.markTableBundleShouldUpdate(partID) + // Then the partitioned table, will re-read the whole table, including all partitions! currDiff.TableID = ptID currDiff.SchemaID = ptSchemaID + currDiff.OldTableID = ptID + currDiff.OldSchemaID = ptSchemaID ptIDs, err := b.applyTableUpdate(m, currDiff) if err != nil { return nil, errors.Trace(err) } - b.markTableBundleShouldUpdate(ptID) + // ntID is the new id for the partition! + b.markPartitionBundleShouldUpdate(ntID) err = updateAutoIDForExchangePartition(b.store, ptSchemaID, ptID, ntSchemaID, ntID) if err != nil { return nil, errors.Trace(err) @@ -451,7 +466,8 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 newTableID = diff.TableID case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence: oldTableID = diff.TableID - case model.ActionTruncateTable, model.ActionCreateView: + case model.ActionTruncateTable, model.ActionCreateView, + model.ActionExchangeTablePartition: oldTableID = diff.OldTableID newTableID = diff.TableID default: