Skip to content

Commit

Permalink
fix syncpoint and finish
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed May 18, 2023
1 parent 4667624 commit 8e11bbd
Showing 1 changed file with 4 additions and 10 deletions.
14 changes: 4 additions & 10 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,25 +900,19 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
// 1. All data before the barrierTs was sent to downstream.
// 2. No more data after barrierTs was sent to downstream.
checkpointReachBarrier := barrierTs == c.state.Status.CheckpointTs

// TODO: To check if we can remove the `barrierTs == c.state.Status.ResolvedTs` condition.
fullyBlocked := checkpointReachBarrier && barrierTs == c.state.Status.ResolvedTs
if !checkpointReachBarrier {
return barrierTs, nil
}

switch barrierTp {
case syncPointBarrier:
if !fullyBlocked {
return barrierTs, nil
}
nextSyncPointTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(barrierTs).Add(c.state.Info.Config.SyncPointInterval))
if err := c.ddlSink.emitSyncPoint(ctx, barrierTs); err != nil {
return 0, errors.Trace(err)
}
c.barriers.Update(syncPointBarrier, nextSyncPointTs)
case finishBarrier:
if fullyBlocked {
c.feedStateManager.MarkFinished()
}
return barrierTs, nil
c.feedStateManager.MarkFinished()
default:
log.Panic("Unknown barrier type", zap.Int("barrierType", int(barrierTp)))
}
Expand Down

0 comments on commit 8e11bbd

Please sign in to comment.