Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#8884
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
asddongmen authored and ti-chi-bot committed May 6, 2023
1 parent 95b5db3 commit de50756
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 4 deletions.
35 changes: 31 additions & 4 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -355,11 +356,13 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
}
// Campaign to be an owner, it blocks until it becomes the owner
if err := c.campaign(ctx); err != nil {
switch errors.Cause(err) {
case context.Canceled:

rootErr := errors.Cause(err)
if rootErr == context.Canceled {
return nil
case mvcc.ErrCompacted:
// the revision we requested is compacted, just retry
} else if rootErr == mvcc.ErrCompacted || isErrCompacted(rootErr) {
log.Warn("campaign owner failed due to etcd revision "+
"has been compacted, retry later", zap.Error(err))
continue
}
log.Warn("campaign owner failed", zap.Error(err))
Expand Down Expand Up @@ -467,11 +470,24 @@ func (c *Capture) GetOwner() (owner.Owner, error) {
}

// campaign to be an owner.
<<<<<<< HEAD
func (c *Capture) campaign(ctx cdcContext.Context) error {
failpoint.Inject("capture-campaign-compacted-error", func() {
failpoint.Return(errors.Trace(mvcc.ErrCompacted))
})
return cerror.WrapError(cerror.ErrCaptureCampaignOwner, c.election.Campaign(ctx, c.info.ID))
=======
func (c *captureImpl) campaign(ctx context.Context) error {
// TODO: `Campaign` will get stuck when send SIGSTOP to pd leader.
// For `Campaign`, when send SIGSTOP to pd leader, cdc maybe call `cancel`
// (cause by `processor routine` exit). And inside `Campaign`, the routine
// return from `waitDeletes`(https://github.com/etcd-io/etcd/blob/main/client/v3/concurrency/election.go#L93),
// then call `Resign`(note: use `client.Ctx`) to etcd server. But the etcd server
// (the client connects to) has entered the STOP state, which means that
// the server cannot process the request, but will still maintain the GRPC
// connection. So `routine` will block 'Resign'.
return cerror.WrapError(cerror.ErrCaptureCampaignOwner, c.election.campaign(ctx, c.info.ID))
>>>>>>> 648fe8a9c6 (capture (ticdc): fix processor exit unexpectedly when some pd node fail (#8884))
}

// resign lets an owner start a new election.
Expand Down Expand Up @@ -608,3 +624,14 @@ func (c *Capture) StatusProvider() owner.StatusProvider {
}
return owner.NewStatusProvider(c.owner)
}
<<<<<<< HEAD
=======

func (c *captureImpl) IsReady() bool {
return c.migrator.IsMigrateDone()
}

func isErrCompacted(err error) bool {
return strings.Contains(err.Error(), "required revision has been compacted")
}
>>>>>>> 648fe8a9c6 (capture (ticdc): fix processor exit unexpectedly when some pd node fail (#8884))
51 changes: 51 additions & 0 deletions cdc/capture/election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package capture

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
)

// election wraps the owner election methods.
// It is useful in testing.
type election interface {
campaign(ctx context.Context, key string) error
resign(ctx context.Context) error
}

type electionImpl struct {
election *concurrency.Election
}

func newElection(sess *concurrency.Session, key string) election {
return &electionImpl{
election: concurrency.NewElection(sess, key),
}
}

func (e *electionImpl) campaign(ctx context.Context, key string) error {
failpoint.Inject("capture-campaign-compacted-error", func() {
failpoint.Return(errors.Trace(mvcc.ErrCompacted))
})
return e.election.Campaign(ctx, key)
}

func (e *electionImpl) resign(ctx context.Context) error {
return e.election.Resign(ctx)
}

0 comments on commit de50756

Please sign in to comment.