Skip to content

Commit

Permalink
mounter(ticdc): fix truncate table partition cause mounter failed iss…
Browse files Browse the repository at this point in the history
…ue (#10528)

close #10522
  • Loading branch information
sdojjy authored Jan 23, 2024
1 parent 7a9af98 commit ea976f4
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 7 deletions.
9 changes: 9 additions & 0 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package entry
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"math"
Expand Down Expand Up @@ -170,10 +171,18 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra
}
tableInfo, exist := snap.PhysicalTableByID(physicalTableID)
if !exist {
// for truncate table and truncate table partition DDL, the table ID is changed, but DML can be inserted to TiKV with old table ID.
// normally, cdc will close the old table pipeline and create a new one, and these invalid DMLs keys will not be pulled by CDC,
// but if redo is enabled or push based table pipeline is enabled, puller and mounter are not blocked by barrier ts.
// So some invalid DML keys will be decoded before processor removing the table pipeline
if snap.IsTruncateTableID(physicalTableID) {
log.Debug("skip the DML of truncated table", zap.Uint64("ts", raw.CRTs), zap.Int64("tableID", physicalTableID))
return nil, nil
}
log.Error("can not found table schema",
zap.Uint64("ts", raw.CRTs),
zap.String("key", hex.EncodeToString(raw.Key)),
zap.Int64("tableID", physicalTableID))
return nil, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(physicalTableID)
}
if bytes.HasPrefix(key, recordPrefix) {
Expand Down
22 changes: 18 additions & 4 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,11 +468,16 @@ func (s *Snapshot) DoHandleDDL(job *timodel.Job) error {
if err != nil {
return errors.Trace(err)
}
case timodel.ActionTruncateTablePartition,
case timodel.ActionTruncateTablePartition:
err := s.inner.updatePartition(getWrapTableInfo(job), true, job.BinlogInfo.FinishedTS)
if err != nil {
return errors.Trace(err)
}
case
timodel.ActionAddTablePartition,
timodel.ActionDropTablePartition,
timodel.ActionReorganizePartition:
err := s.inner.updatePartition(getWrapTableInfo(job), job.BinlogInfo.FinishedTS)
err := s.inner.updatePartition(getWrapTableInfo(job), false, job.BinlogInfo.FinishedTS)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -863,7 +868,7 @@ func (s *snapshot) doCreateTable(tbInfo *model.TableInfo, currentTs uint64) {
}

// updatePartition updates partition info for `tbInfo`.
func (s *snapshot) updatePartition(tbInfo *model.TableInfo, currentTs uint64) error {
func (s *snapshot) updatePartition(tbInfo *model.TableInfo, isTruncate bool, currentTs uint64) error {
oldTbInfo, ok := s.physicalTableByID(tbInfo.ID)
if !ok {
return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(tbInfo.ID)
Expand All @@ -888,13 +893,22 @@ func (s *snapshot) updatePartition(tbInfo *model.TableInfo, currentTs uint64) er
for _, partition := range oldPi.Definitions {
s.partitions.ReplaceOrInsert(newVersionedID(partition.ID, tag))
}
newPartitionIDMap := make(map[int64]struct{}, len(newPi.NewPartitionIDs))
for _, partition := range newPi.Definitions {
vid := newVersionedID(partition.ID, tag)
vid.target = tbInfo
s.partitions.ReplaceOrInsert(vid)
if ineligible {
s.ineligibleTables.ReplaceOrInsert(newVersionedID(partition.ID, tag))
}
newPartitionIDMap[partition.ID] = struct{}{}
}
if isTruncate {
for _, partition := range oldPi.Definitions {
if _, ok := newPartitionIDMap[partition.ID]; !ok {
s.truncatedTables.ReplaceOrInsert(newVersionedID(partition.ID, tag))
}
}
}
s.currentTs = currentTs

Expand Down Expand Up @@ -984,7 +998,7 @@ func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uin
// ref: https://github.com/pingcap/tidb/issues/43819
targetTable.SchemaID = oldTable.SchemaID
targetTable.TableName = oldTable.TableName
err = s.updatePartition(targetTable, currentTS)
err = s.updatePartition(targetTable, false, currentTS)
if err != nil {
return errors.Trace(err)
}
Expand Down
31 changes: 28 additions & 3 deletions cdc/entry/schema/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,19 +219,19 @@ func TestUpdatePartition(t *testing.T) {
oldTb = newTbInfo(1, "DB_1", 11)
oldTb.Partition = nil
require.Nil(t, snap.inner.createTable(oldTb, 110))
require.Error(t, snap.inner.updatePartition(newTbInfo(1, "DB_1", 11), 120))
require.Error(t, snap.inner.updatePartition(newTbInfo(1, "DB_1", 11), false, 120))

// updatePartition fails if the new table is not partitioned.
require.Nil(t, snap.inner.dropTable(11, 130))
require.Nil(t, snap.inner.createTable(newTbInfo(1, "DB_1", 11), 140))
newTb = newTbInfo(1, "DB_1", 11)
newTb.Partition = nil
require.Error(t, snap.inner.updatePartition(newTb, 150))
require.Error(t, snap.inner.updatePartition(newTb, false, 150))
snap1 = snap.Copy()

newTb = newTbInfo(1, "DB_1", 11)
newTb.Partition.Definitions[0] = timodel.PartitionDefinition{ID: 11 + 65536*2}
require.Nil(t, snap.inner.updatePartition(newTb, 160))
require.Nil(t, snap.inner.updatePartition(newTb, false, 160))
snap2 = snap.Copy()

info, _ = snap1.PhysicalTableByID(11)
Expand All @@ -253,6 +253,31 @@ func TestUpdatePartition(t *testing.T) {
require.True(t, snap2.IsIneligibleTableID(11+65536*2))
}

func TestTruncateTablePartition(t *testing.T) {
var oldTb, newTb *model.TableInfo

snap := NewEmptySnapshot(false)
require.Nil(t, snap.inner.createSchema(newDBInfo(1), 100))

// updatePartition fails if the old table is not partitioned.
oldTb = newTbInfo(1, "DB_1", 11)
oldTb.Partition = nil
require.Nil(t, snap.inner.createTable(oldTb, 110))
require.Error(t, snap.inner.updatePartition(newTbInfo(1, "DB_1", 11), false, 120))

// updatePartition fails if the new table is not partitioned.
require.Nil(t, snap.inner.dropTable(11, 130))
require.Nil(t, snap.inner.createTable(newTbInfo(1, "DB_1", 11), 140))
newTb = newTbInfo(1, "DB_1", 11)
newTb.Partition = nil
require.Error(t, snap.inner.updatePartition(newTb, false, 150))

newTb = newTbInfo(1, "DB_1", 11)
newTb.Partition.Definitions[0] = timodel.PartitionDefinition{ID: 11 + 65536*2}
require.Nil(t, snap.inner.updatePartition(newTb, true, 160))
require.True(t, snap.IsTruncateTableID(11+65536))
}

func TestExchangePartition(t *testing.T) {
var targetTb, sourceTb *model.TableInfo

Expand Down

0 comments on commit ea976f4

Please sign in to comment.