From e7d886f0a2cddba30983c5de21fbe9e04be5b11c Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 12 Nov 2020 15:57:27 +0800 Subject: [PATCH 1/6] owner: fix multiple owners exist when owner campaign key is deleted --- cdc/owner.go | 34 +++++++++++++++++++++++++++++++--- cdc/owner_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ errors.toml | 5 +++++ pkg/errors/errors.go | 1 + 4 files changed, 81 insertions(+), 3 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index ad7cd91d92d..a269cfb05ab 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -975,12 +975,18 @@ func (o *Owner) Run(ctx context.Context, tickTime time.Duration) error { ctx, cancel := context.WithCancel(ctx) defer cancel() + go func() { + if err := o.watchCampaignKey(ctx); err != nil { + cancel() + } + }() + if err := o.throne(ctx); err != nil { return err } - ctx1, cancel := context.WithCancel(ctx) - defer cancel() + ctx1, cancel1 := context.WithCancel(ctx) + defer cancel1() changedFeeds := o.watchFeedChange(ctx1) ticker := time.NewTicker(tickTime) @@ -994,7 +1000,8 @@ loop: close(o.done) break loop case <-ctx.Done(): - return ctx.Err() + err = ctx.Err() + break loop case <-changedFeeds: case <-ticker.C: } @@ -1019,6 +1026,27 @@ loop: return err } +// watchCampaignKey watches the aliveness of campaign owner key in etcd +func (o *Owner) watchCampaignKey(ctx context.Context) error { + key := fmt.Sprintf("%s/%x", kv.CaptureOwnerKey, o.session.Lease()) +restart: + wch := o.etcdClient.Client.Watch(ctx, key) + for resp := range wch { + err := resp.Err() + if err != nil { + log.Error("watch owner campaign key failed, restart the watcher", zap.Error(err)) + goto restart + } + for _, ev := range resp.Events { + if ev.Type == clientv3.EventTypeDelete { + log.Warn("owner campaign key deleted", zap.String("key", key)) + return cerror.ErrOwnerCampaignKeyDeleted.GenWithStackByArgs() + } + } + } + return nil +} + func (o *Owner) watchFeedChange(ctx context.Context) chan struct{} { output := make(chan 
struct{}, 1) go func() { diff --git a/cdc/owner_test.go b/cdc/owner_test.go index bb3baa5b1bd..374395555e7 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -16,7 +16,9 @@ package cdc import ( "context" "errors" + "fmt" "net/url" + "sync" "time" "github.com/pingcap/check" @@ -768,3 +770,45 @@ func (s *ownerSuite) TestChangefeedApplyDDLJob(c *check.C) { c.Assert(cf.tables, check.DeepEquals, expectTables[i]) } } + +func (s *ownerSuite) TestWatchCampaignKey(c *check.C) { + ctx := context.Background() + capture, err := NewCapture(ctx, []string{s.clientURL.String()}, + &security.Credential{}, "127.0.0.1:12034", &processorOpts{}) + c.Assert(err, check.IsNil) + err = capture.Campaign(ctx) + c.Assert(err, check.IsNil) + + cctx, cancel := context.WithCancel(ctx) + owner, err := NewOwner(cctx, nil, &security.Credential{}, capture.session, + DefaultCDCGCSafePointTTL, time.Millisecond*200) + c.Assert(err, check.IsNil) + + // check campaign key deleted can be detected + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := owner.watchCampaignKey(cctx) + c.Assert(cerror.ErrOwnerCampaignKeyDeleted.Equal(err), check.IsTrue) + cancel() + }() + etcdCli := owner.etcdClient.Client.Unwrap() + key := fmt.Sprintf("%s/%x", kv.CaptureOwnerKey, owner.session.Lease()) + _, err = etcdCli.Delete(ctx, key) + c.Assert(err, check.IsNil) + wg.Wait() + + // check the watch routine can be canceled + err = capture.Campaign(ctx) + c.Assert(err, check.IsNil) + cctx, cancel = context.WithCancel(ctx) + wg.Add(1) + go func() { + defer wg.Done() + err := owner.watchCampaignKey(cctx) + c.Assert(err, check.IsNil) + }() + cancel() + wg.Wait() +} diff --git a/errors.toml b/errors.toml index 93a95563f1d..7e4c9b56ab4 100755 --- a/errors.toml +++ b/errors.toml @@ -451,6 +451,11 @@ error = ''' old value is not enabled ''' +["CDC:ErrOwnerCampaignKeyDeleted"] +error = ''' +owner campaign key deleted +''' + ["CDC:ErrOwnerChangefeedNotFound"] error = ''' changefeed %s not found in owner 
cache diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 5e193c413f2..ab5874fd753 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -173,4 +173,5 @@ var ( ErrChangefeedAbnormalState = errors.Normalize("changefeed in abnormal state: %s, replication status: %+v", errors.RFCCodeText("CDC:ErrChangefeedAbnormalState")) ErrInvalidAdminJobType = errors.Normalize("invalid admin job type: %d", errors.RFCCodeText("CDC:ErrInvalidAdminJobType")) ErrOwnerEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrOwnerEtcdWatch")) + ErrOwnerCampaignKeyDeleted = errors.Normalize("owner campaign key deleted", errors.RFCCodeText("CDC:ErrOwnerCampaignKeyDeleted")) ) From ecbcd7ac2e2d3841fbd589e8bb81fe398284fe87 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Fri, 13 Nov 2020 10:17:00 +0800 Subject: [PATCH 2/6] add comment --- cdc/owner.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cdc/owner.go b/cdc/owner.go index a269cfb05ab..2c58450e311 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -1000,6 +1000,8 @@ loop: close(o.done) break loop case <-ctx.Done(): + // FIXME: cancel the context doesn't ensure all resources are destructed, is it reasonable? + // Anyway we just break loop here to ensure the following destruction. 
err = ctx.Err() break loop case <-changedFeeds: From c6d1497559a34468eae8b5ca2df3ef4998e95483 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Fri, 13 Nov 2020 14:16:04 +0800 Subject: [PATCH 3/6] fix unit test --- cdc/owner.go | 13 +++++++++++-- cdc/owner_test.go | 10 +++++++++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index 2c58450e311..7e23694ff65 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -1031,12 +1031,21 @@ loop: // watchCampaignKey watches the aliveness of campaign owner key in etcd func (o *Owner) watchCampaignKey(ctx context.Context) error { key := fmt.Sprintf("%s/%x", kv.CaptureOwnerKey, o.session.Lease()) + resp, err := o.etcdClient.Client.Get(ctx, key) + if err != nil { + return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + if resp.Count == 0 { + return cerror.ErrOwnerCampaignKeyDeleted.GenWithStackByArgs() + } restart: - wch := o.etcdClient.Client.Watch(ctx, key) + wch := o.etcdClient.Client.Watch(ctx, key, clientv3.WithRev(resp.Header.Revision+1)) for resp := range wch { err := resp.Err() if err != nil { - log.Error("watch owner campaign key failed, restart the watcher", zap.Error(err)) + if err != mvcc.ErrCompacted { + log.Error("watch owner campaign key failed, restart the watcher", zap.Error(err)) + } goto restart } for _, ev := range resp.Events { diff --git a/cdc/owner_test.go b/cdc/owner_test.go index 374395555e7..4b47d5f9de4 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -793,22 +793,30 @@ func (s *ownerSuite) TestWatchCampaignKey(c *check.C) { c.Assert(cerror.ErrOwnerCampaignKeyDeleted.Equal(err), check.IsTrue) cancel() }() + // ensure the watch loop has started + time.Sleep(time.Millisecond * 100) etcdCli := owner.etcdClient.Client.Unwrap() key := fmt.Sprintf("%s/%x", kv.CaptureOwnerKey, owner.session.Lease()) _, err = etcdCli.Delete(ctx, key) c.Assert(err, check.IsNil) wg.Wait() + // check key is deleted before watch loop starts + cctx, cancel = context.WithCancel(ctx) + 
err = owner.watchCampaignKey(cctx) + c.Assert(cerror.ErrOwnerCampaignKeyDeleted.Equal(err), check.IsTrue) + // check the watch routine can be canceled err = capture.Campaign(ctx) c.Assert(err, check.IsNil) - cctx, cancel = context.WithCancel(ctx) wg.Add(1) go func() { defer wg.Done() err := owner.watchCampaignKey(cctx) c.Assert(err, check.IsNil) }() + // ensure the watch loop has started + time.Sleep(time.Millisecond * 100) cancel() wg.Wait() } From 273e050cdeeb69cff9bdd11140fbb323af41e651 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Fri, 13 Nov 2020 16:37:28 +0800 Subject: [PATCH 4/6] fix compaction restart revision --- cdc/owner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/owner.go b/cdc/owner.go index 7e23694ff65..97c5fb88d09 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -1031,6 +1031,7 @@ loop: // watchCampaignKey watches the aliveness of campaign owner key in etcd func (o *Owner) watchCampaignKey(ctx context.Context) error { key := fmt.Sprintf("%s/%x", kv.CaptureOwnerKey, o.session.Lease()) +restart: resp, err := o.etcdClient.Client.Get(ctx, key) if err != nil { return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) @@ -1038,7 +1039,6 @@ func (o *Owner) watchCampaignKey(ctx context.Context) error { if resp.Count == 0 { return cerror.ErrOwnerCampaignKeyDeleted.GenWithStackByArgs() } -restart: wch := o.etcdClient.Client.Watch(ctx, key, clientv3.WithRev(resp.Header.Revision+1)) for resp := range wch { err := resp.Err() From 9045f492270cfdda6e7959e2d739dd6fb8fec554 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 19 Nov 2020 14:13:19 +0800 Subject: [PATCH 5/6] Update cdc/owner.go --- cdc/owner.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/owner.go b/cdc/owner.go index 5233ea67986..cb903043b17 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -1040,6 +1040,7 @@ restart: if resp.Count == 0 { return cerror.ErrOwnerCampaignKeyDeleted.GenWithStackByArgs() } + // watch the key change from the next revision relatived to the 
current wch := o.etcdClient.Client.Watch(ctx, key, clientv3.WithRev(resp.Header.Revision+1)) for resp := range wch { err := resp.Err() From 115671aee35a563f03be97b598a5cec4e1a1c082 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 19 Nov 2020 14:17:59 +0800 Subject: [PATCH 6/6] fix make fmt --- cdc/owner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/owner.go b/cdc/owner.go index cb903043b17..d0b5a84cf47 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -1040,7 +1040,7 @@ restart: if resp.Count == 0 { return cerror.ErrOwnerCampaignKeyDeleted.GenWithStackByArgs() } - // watch the key change from the next revision relatived to the current + // watch the key change from the next revision relative to the current revision wch := o.etcdClient.Client.Watch(ctx, key, clientv3.WithRev(resp.Header.Revision+1)) for resp := range wch { err := resp.Err()