From a534d31faafbe5da6281bdb058dcd34773be8383 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Sun, 18 Jul 2021 20:11:34 +0800 Subject: [PATCH] owner: fix new owner updating checkpoint too early with pending DDL (#2252) (#2291) --- cdc/owner/changefeed.go | 8 +++++++- cdc/owner/schema.go | 5 ++++- tests/kill_owner_with_ddl/run.sh | 2 +- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index ca983210aa4..537db51db03 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -212,7 +212,10 @@ LOOP: if err != nil { return errors.Trace(err) } - c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs) + // Since we wait for checkpoint == ddlJob.FinishTs before executing the DDL, + // when there is a recovery, there is no guarantee that the DDL at the checkpoint + // has been executed. So we need to start the DDL puller from (checkpoint-1). + c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1) if err != nil { return errors.Trace(err) } @@ -317,6 +320,9 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) { case ddlJobBarrier: ddlResolvedTs, ddlJob := c.ddlPuller.FrontDDL() if ddlJob == nil || ddlResolvedTs != barrierTs { + if ddlResolvedTs < barrierTs { + return barrierTs, nil + } c.barriers.Update(ddlJobBarrier, ddlResolvedTs) return barrierTs, nil } diff --git a/cdc/owner/schema.go b/cdc/owner/schema.go index feb70898920..0cb2327edab 100644 --- a/cdc/owner/schema.go +++ b/cdc/owner/schema.go @@ -46,6 +46,9 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con return nil, errors.Trace(err) } } + // We do a snapshot read of the metadata from TiKV at (startTs-1) instead of startTs, + // because the DDL puller might send a DDL at startTs, which would cause schema conflicts if + // the DDL's result is already contained in the snapshot. schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs-1, config.ForceReplicate) if err != nil { return nil, errors.Trace(err) @@ -58,7 +61,7 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con schemaSnapshot: schemaSnap, filter: f, config: config, - ddlHandledTs: startTs - 1, + ddlHandledTs: startTs, }, nil } diff --git a/tests/kill_owner_with_ddl/run.sh b/tests/kill_owner_with_ddl/run.sh index 0913f5e555b..d157e0a171f 100755 --- a/tests/kill_owner_with_ddl/run.sh +++ b/tests/kill_owner_with_ddl/run.sh @@ -69,7 +69,7 @@ function run() { for i in $(seq 1 3); do kill_cdc_and_restart $pd_addr $WORK_DIR $CDC_BINARY - sleep 2 + sleep 8 done export GO_FAILPOINTS=''