Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner: fix multiple owners exist when owner campaign key is deleted #1072

Merged
merged 8 commits into from
Nov 19, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,12 +975,18 @@ func (o *Owner) Run(ctx context.Context, tickTime time.Duration) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

go func() {
if err := o.watchCampaignKey(ctx); err != nil {
cancel()
}
}()

if err := o.throne(ctx); err != nil {
return err
}

ctx1, cancel := context.WithCancel(ctx)
defer cancel()
ctx1, cancel1 := context.WithCancel(ctx)
defer cancel1()
changedFeeds := o.watchFeedChange(ctx1)

ticker := time.NewTicker(tickTime)
Expand All @@ -994,7 +1000,10 @@ loop:
close(o.done)
break loop
case <-ctx.Done():
return ctx.Err()
// FIXME: cancel the context doesn't ensure all resources are destructed, is it reasonable?
// Anyway we just break loop here to ensure the following destruction.
err = ctx.Err()
break loop
case <-changedFeeds:
case <-ticker.C:
}
Expand All @@ -1019,6 +1028,36 @@ loop:
return err
}

// watchCampaignKey watches the aliveness of campaign owner key in etcd
func (o *Owner) watchCampaignKey(ctx context.Context) error {
key := fmt.Sprintf("%s/%x", kv.CaptureOwnerKey, o.session.Lease())
restart:
resp, err := o.etcdClient.Client.Get(ctx, key)
if err != nil {
return cerror.WrapError(cerror.ErrPDEtcdAPIError, err)
}
if resp.Count == 0 {
return cerror.ErrOwnerCampaignKeyDeleted.GenWithStackByArgs()
}
wch := o.etcdClient.Client.Watch(ctx, key, clientv3.WithRev(resp.Header.Revision+1))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why +1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change with revision resp.Header.Revision is not needed, so +1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add some comments?

amyangfei marked this conversation as resolved.
Show resolved Hide resolved
for resp := range wch {
err := resp.Err()
if err != nil {
if err != mvcc.ErrCompacted {
log.Error("watch owner campaign key failed, restart the watcher", zap.Error(err))
}
goto restart
zier-one marked this conversation as resolved.
Show resolved Hide resolved
}
for _, ev := range resp.Events {
if ev.Type == clientv3.EventTypeDelete {
log.Warn("owner campaign key deleted", zap.String("key", key))
return cerror.ErrOwnerCampaignKeyDeleted.GenWithStackByArgs()
}
}
}
return nil
}

func (o *Owner) watchFeedChange(ctx context.Context) chan struct{} {
output := make(chan struct{}, 1)
go func() {
Expand Down
52 changes: 52 additions & 0 deletions cdc/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package cdc
import (
"context"
"errors"
"fmt"
"net/url"
"sync"
"time"

"github.com/pingcap/check"
Expand Down Expand Up @@ -768,3 +770,53 @@ func (s *ownerSuite) TestChangefeedApplyDDLJob(c *check.C) {
c.Assert(cf.tables, check.DeepEquals, expectTables[i])
}
}

func (s *ownerSuite) TestWatchCampaignKey(c *check.C) {
ctx := context.Background()
capture, err := NewCapture(ctx, []string{s.clientURL.String()},
&security.Credential{}, "127.0.0.1:12034", &processorOpts{})
c.Assert(err, check.IsNil)
err = capture.Campaign(ctx)
c.Assert(err, check.IsNil)

cctx, cancel := context.WithCancel(ctx)
owner, err := NewOwner(cctx, nil, &security.Credential{}, capture.session,
DefaultCDCGCSafePointTTL, time.Millisecond*200)
c.Assert(err, check.IsNil)

// check campaign key deleted can be detected
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := owner.watchCampaignKey(cctx)
c.Assert(cerror.ErrOwnerCampaignKeyDeleted.Equal(err), check.IsTrue)
cancel()
}()
// ensure the watch loop has started
time.Sleep(time.Millisecond * 100)
etcdCli := owner.etcdClient.Client.Unwrap()
key := fmt.Sprintf("%s/%x", kv.CaptureOwnerKey, owner.session.Lease())
_, err = etcdCli.Delete(ctx, key)
c.Assert(err, check.IsNil)
wg.Wait()

// check key is deleted before watch loop starts
cctx, cancel = context.WithCancel(ctx)
err = owner.watchCampaignKey(cctx)
c.Assert(cerror.ErrOwnerCampaignKeyDeleted.Equal(err), check.IsTrue)

// check the watch routine can be canceled
err = capture.Campaign(ctx)
c.Assert(err, check.IsNil)
wg.Add(1)
go func() {
defer wg.Done()
err := owner.watchCampaignKey(cctx)
c.Assert(err, check.IsNil)
}()
// ensure the watch loop has started
time.Sleep(time.Millisecond * 100)
cancel()
wg.Wait()
}
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,11 @@ error = '''
old value is not enabled
'''

["CDC:ErrOwnerCampaignKeyDeleted"]
error = '''
owner campaign key deleted
'''

["CDC:ErrOwnerChangefeedNotFound"]
error = '''
changefeed %s not found in owner cache
Expand Down
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,5 @@ var (
ErrChangefeedAbnormalState = errors.Normalize("changefeed in abnormal state: %s, replication status: %+v", errors.RFCCodeText("CDC:ErrChangefeedAbnormalState"))
ErrInvalidAdminJobType = errors.Normalize("invalid admin job type: %d", errors.RFCCodeText("CDC:ErrInvalidAdminJobType"))
ErrOwnerEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrOwnerEtcdWatch"))
ErrOwnerCampaignKeyDeleted = errors.Normalize("owner campaign key deleted", errors.RFCCodeText("CDC:ErrOwnerCampaignKeyDeleted"))
)