Skip to content

Commit

Permalink
cherry pick pingcap#1072 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
amyangfei authored and ti-srebot committed Nov 19, 2020
1 parent a5a6dec commit 0e61f8c
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 3 deletions.
46 changes: 43 additions & 3 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,12 +976,18 @@ func (o *Owner) Run(ctx context.Context, tickTime time.Duration) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

go func() {
if err := o.watchCampaignKey(ctx); err != nil {
cancel()
}
}()

if err := o.throne(ctx); err != nil {
return err
}

ctx1, cancel := context.WithCancel(ctx)
defer cancel()
ctx1, cancel1 := context.WithCancel(ctx)
defer cancel1()
changedFeeds := o.watchFeedChange(ctx1)

ticker := time.NewTicker(tickTime)
Expand All @@ -995,7 +1001,10 @@ loop:
close(o.done)
break loop
case <-ctx.Done():
return ctx.Err()
// FIXME: cancel the context doesn't ensure all resources are destructed, is it reasonable?
// Anyway we just break loop here to ensure the following destruction.
err = ctx.Err()
break loop
case <-changedFeeds:
case <-ticker.C:
}
Expand All @@ -1020,6 +1029,37 @@ loop:
return err
}

// watchCampaignKey watches the aliveness of campaign owner key in etcd
func (o *Owner) watchCampaignKey(ctx context.Context) error {
key := fmt.Sprintf("%s/%x", kv.CaptureOwnerKey, o.session.Lease())
restart:
resp, err := o.etcdClient.Client.Get(ctx, key)
if err != nil {
return cerror.WrapError(cerror.ErrPDEtcdAPIError, err)
}
if resp.Count == 0 {
return cerror.ErrOwnerCampaignKeyDeleted.GenWithStackByArgs()
}
// watch the key change from the next revision relatived to the current
wch := o.etcdClient.Client.Watch(ctx, key, clientv3.WithRev(resp.Header.Revision+1))
for resp := range wch {
err := resp.Err()
if err != nil {
if err != mvcc.ErrCompacted {
log.Error("watch owner campaign key failed, restart the watcher", zap.Error(err))
}
goto restart
}
for _, ev := range resp.Events {
if ev.Type == clientv3.EventTypeDelete {
log.Warn("owner campaign key deleted", zap.String("key", key))
return cerror.ErrOwnerCampaignKeyDeleted.GenWithStackByArgs()
}
}
}
return nil
}

func (o *Owner) watchFeedChange(ctx context.Context) chan struct{} {
output := make(chan struct{}, 1)
go func() {
Expand Down
52 changes: 52 additions & 0 deletions cdc/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package cdc
import (
"context"
"errors"
"fmt"
"net/url"
"sync"
"time"

"github.com/pingcap/check"
Expand Down Expand Up @@ -784,3 +786,53 @@ func (s *ownerSuite) TestChangefeedApplyDDLJob(c *check.C) {
c.Assert(cf.tables, check.DeepEquals, expectTables[i])
}
}

func (s *ownerSuite) TestWatchCampaignKey(c *check.C) {
ctx := context.Background()
capture, err := NewCapture(ctx, []string{s.clientURL.String()},
&security.Credential{}, "127.0.0.1:12034", &processorOpts{})
c.Assert(err, check.IsNil)
err = capture.Campaign(ctx)
c.Assert(err, check.IsNil)

cctx, cancel := context.WithCancel(ctx)
owner, err := NewOwner(cctx, nil, &security.Credential{}, capture.session,
DefaultCDCGCSafePointTTL, time.Millisecond*200)
c.Assert(err, check.IsNil)

// check campaign key deleted can be detected
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := owner.watchCampaignKey(cctx)
c.Assert(cerror.ErrOwnerCampaignKeyDeleted.Equal(err), check.IsTrue)
cancel()
}()
// ensure the watch loop has started
time.Sleep(time.Millisecond * 100)
etcdCli := owner.etcdClient.Client.Unwrap()
key := fmt.Sprintf("%s/%x", kv.CaptureOwnerKey, owner.session.Lease())
_, err = etcdCli.Delete(ctx, key)
c.Assert(err, check.IsNil)
wg.Wait()

// check key is deleted before watch loop starts
cctx, cancel = context.WithCancel(ctx)
err = owner.watchCampaignKey(cctx)
c.Assert(cerror.ErrOwnerCampaignKeyDeleted.Equal(err), check.IsTrue)

// check the watch routine can be canceled
err = capture.Campaign(ctx)
c.Assert(err, check.IsNil)
wg.Add(1)
go func() {
defer wg.Done()
err := owner.watchCampaignKey(cctx)
c.Assert(err, check.IsNil)
}()
// ensure the watch loop has started
time.Sleep(time.Millisecond * 100)
cancel()
wg.Wait()
}
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,11 @@ error = '''
old value is not enabled
'''

["CDC:ErrOwnerCampaignKeyDeleted"]
error = '''
owner campaign key deleted
'''

["CDC:ErrOwnerChangefeedNotFound"]
error = '''
changefeed %s not found in owner cache
Expand Down
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,5 @@ var (
ErrChangefeedAbnormalState = errors.Normalize("changefeed in abnormal state: %s, replication status: %+v", errors.RFCCodeText("CDC:ErrChangefeedAbnormalState"))
ErrInvalidAdminJobType = errors.Normalize("invalid admin job type: %d", errors.RFCCodeText("CDC:ErrInvalidAdminJobType"))
ErrOwnerEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrOwnerEtcdWatch"))
ErrOwnerCampaignKeyDeleted = errors.Normalize("owner campaign key deleted", errors.RFCCodeText("CDC:ErrOwnerCampaignKeyDeleted"))
)

0 comments on commit 0e61f8c

Please sign in to comment.