diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 6c1036f03f7..44bc2f8f81d 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -510,7 +510,6 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error { }) g, ctx := errgroup.WithContext(ctx) - ctx, cancelOwner := context.WithCancel(ctx) ownerCtx := cdcContext.NewContext(ctx, newGlobalVars) g.Go(func() error { return c.runEtcdWorker(ownerCtx, owner, @@ -522,8 +521,8 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error { globalState, // todo: do not use owner flush interval ownerFlushInterval, util.RoleController.String()) - // controller is exited, cancel owner to exit the loop. - cancelOwner() + // controller has exited, stop owner. + c.owner.AsyncStop() return er }) err = g.Wait()