diff --git a/cdc/owner.go b/cdc/owner.go index 23d5fae669d..d0b5a84cf47 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -976,12 +976,18 @@ func (o *Owner) Run(ctx context.Context, tickTime time.Duration) error { ctx, cancel := context.WithCancel(ctx) defer cancel() + go func() { + if err := o.watchCampaignKey(ctx); err != nil { + cancel() + } + }() + if err := o.throne(ctx); err != nil { return err } - ctx1, cancel := context.WithCancel(ctx) - defer cancel() + ctx1, cancel1 := context.WithCancel(ctx) + defer cancel1() changedFeeds := o.watchFeedChange(ctx1) ticker := time.NewTicker(tickTime) @@ -995,7 +1001,10 @@ loop: close(o.done) break loop case <-ctx.Done(): - return ctx.Err() + // FIXME: cancel the context doesn't ensure all resources are destructed, is it reasonable? + // Anyway we just break loop here to ensure the following destruction. + err = ctx.Err() + break loop case <-changedFeeds: case <-ticker.C: } @@ -1020,6 +1029,37 @@ loop: return err } +// watchCampaignKey watches the aliveness of campaign owner key in etcd +func (o *Owner) watchCampaignKey(ctx context.Context) error { + key := fmt.Sprintf("%s/%x", kv.CaptureOwnerKey, o.session.Lease()) +restart: + resp, err := o.etcdClient.Client.Get(ctx, key) + if err != nil { + return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + if resp.Count == 0 { + return cerror.ErrOwnerCampaignKeyDeleted.GenWithStackByArgs() + } + // watch the key change from the next revision relatived to the current + wch := o.etcdClient.Client.Watch(ctx, key, clientv3.WithRev(resp.Header.Revision+1)) + for resp := range wch { + err := resp.Err() + if err != nil { + if err != mvcc.ErrCompacted { + log.Error("watch owner campaign key failed, restart the watcher", zap.Error(err)) + } + goto restart + } + for _, ev := range resp.Events { + if ev.Type == clientv3.EventTypeDelete { + log.Warn("owner campaign key deleted", zap.String("key", key)) + return cerror.ErrOwnerCampaignKeyDeleted.GenWithStackByArgs() + } + } + } + return nil +} + func (o *Owner) watchFeedChange(ctx context.Context) chan struct{} { output := make(chan struct{}, 1) go func() { diff --git a/cdc/owner_test.go b/cdc/owner_test.go index 27a4720b002..4b59fd60ebe 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -16,7 +16,9 @@ package cdc import ( "context" "errors" + "fmt" "net/url" + "sync" "time" "github.com/pingcap/check" @@ -784,3 +786,53 @@ func (s *ownerSuite) TestChangefeedApplyDDLJob(c *check.C) { c.Assert(cf.tables, check.DeepEquals, expectTables[i]) } } + +func (s *ownerSuite) TestWatchCampaignKey(c *check.C) { + ctx := context.Background() + capture, err := NewCapture(ctx, []string{s.clientURL.String()}, + &security.Credential{}, "127.0.0.1:12034", &processorOpts{}) + c.Assert(err, check.IsNil) + err = capture.Campaign(ctx) + c.Assert(err, check.IsNil) + + cctx, cancel := context.WithCancel(ctx) + owner, err := NewOwner(cctx, nil, &security.Credential{}, capture.session, + DefaultCDCGCSafePointTTL, time.Millisecond*200) + c.Assert(err, check.IsNil) + + // check campaign key deleted can be detected + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := owner.watchCampaignKey(cctx) + c.Assert(cerror.ErrOwnerCampaignKeyDeleted.Equal(err), check.IsTrue) + cancel() + }() + // ensure the watch loop has started + time.Sleep(time.Millisecond * 100) + etcdCli := owner.etcdClient.Client.Unwrap() + key := fmt.Sprintf("%s/%x", kv.CaptureOwnerKey, owner.session.Lease()) + _, err = etcdCli.Delete(ctx, key) + c.Assert(err, check.IsNil) + wg.Wait() + + // check key is deleted before watch loop starts + cctx, cancel = context.WithCancel(ctx) + err = owner.watchCampaignKey(cctx) + c.Assert(cerror.ErrOwnerCampaignKeyDeleted.Equal(err), check.IsTrue) + + // check the watch routine can be canceled + err = capture.Campaign(ctx) + c.Assert(err, check.IsNil) + wg.Add(1) + go func() { + defer wg.Done() + err := owner.watchCampaignKey(cctx) + c.Assert(err, check.IsNil) + }() + // ensure the watch loop has started + time.Sleep(time.Millisecond * 100) + cancel() + wg.Wait() +} diff --git a/errors.toml b/errors.toml index 93a95563f1d..7e4c9b56ab4 100755 --- a/errors.toml +++ b/errors.toml @@ -451,6 +451,11 @@ error = ''' old value is not enabled ''' +["CDC:ErrOwnerCampaignKeyDeleted"] +error = ''' +owner campaign key deleted +''' + ["CDC:ErrOwnerChangefeedNotFound"] error = ''' changefeed %s not found in owner cache diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 5e193c413f2..ab5874fd753 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -173,4 +173,5 @@ var ( ErrChangefeedAbnormalState = errors.Normalize("changefeed in abnormal state: %s, replication status: %+v", errors.RFCCodeText("CDC:ErrChangefeedAbnormalState")) ErrInvalidAdminJobType = errors.Normalize("invalid admin job type: %d", errors.RFCCodeText("CDC:ErrInvalidAdminJobType")) ErrOwnerEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrOwnerEtcdWatch")) + ErrOwnerCampaignKeyDeleted = errors.Normalize("owner campaign key deleted", errors.RFCCodeText("CDC:ErrOwnerCampaignKeyDeleted")) )