diff --git a/dm/pkg/etcdutil/etcdutil.go b/dm/pkg/etcdutil/etcdutil.go new file mode 100644 index 00000000000..f90b329e391 --- /dev/null +++ b/dm/pkg/etcdutil/etcdutil.go @@ -0,0 +1,150 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// learn from https://github.com/pingcap/pd/blob/v3.0.5/pkg/etcdutil/etcdutil.go. + +package etcdutil + +import ( + "context" + "crypto/tls" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "go.etcd.io/etcd/clientv3" + v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + "go.uber.org/zap" + + tcontext "github.com/pingcap/tiflow/dm/pkg/context" + "github.com/pingcap/tiflow/dm/pkg/log" + "github.com/pingcap/tiflow/dm/pkg/retry" + "github.com/pingcap/tiflow/pkg/errorutil" +) + +const ( + // DefaultDialTimeout is the maximum amount of time a dial will wait for a + // connection to setup. 30s is long enough for most of the network conditions. + DefaultDialTimeout = 30 * time.Second + + // DefaultRequestTimeout 10s is long enough for most of etcd clusters. + DefaultRequestTimeout = 10 * time.Second + + // DefaultRevokeLeaseTimeout is the maximum amount of time waiting for revoke etcd lease. + DefaultRevokeLeaseTimeout = 3 * time.Second +) + +var etcdDefaultTxnRetryParam = retry.Params{ + RetryCount: 5, + FirstRetryDuration: time.Second, + BackoffStrategy: retry.Stable, + IsRetryableFn: func(retryTime int, err error) bool { + return errorutil.IsRetryableEtcdError(err) + }, +} + +var etcdDefaultTxnStrategy = retry.FiniteRetryStrategy{} + +// CreateClient creates an etcd client with some default config items. +func CreateClient(endpoints []string, tlsCfg *tls.Config) (*clientv3.Client, error) { + return clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: DefaultDialTimeout, + TLS: tlsCfg, + }) +} + +// ListMembers returns a list of internal etcd members. +func ListMembers(client *clientv3.Client) (*clientv3.MemberListResponse, error) { + ctx, cancel := context.WithTimeout(client.Ctx(), DefaultRequestTimeout) + defer cancel() + return client.MemberList(ctx) +} + +// AddMember adds an etcd member. +func AddMember(client *clientv3.Client, peerAddrs []string) (*clientv3.MemberAddResponse, error) { + ctx, cancel := context.WithTimeout(client.Ctx(), DefaultRequestTimeout) + defer cancel() + return client.MemberAdd(ctx, peerAddrs) +} + +// RemoveMember removes an etcd member by the given id. +func RemoveMember(client *clientv3.Client, id uint64) (*clientv3.MemberRemoveResponse, error) { + ctx, cancel := context.WithTimeout(client.Ctx(), DefaultRequestTimeout) + defer cancel() + return client.MemberRemove(ctx, id) +} + +// DoOpsInOneTxnWithRetry do multiple etcd operations in one txn. +// TODO: add unit test to test encountered an retryable error first but then recovered. +func DoOpsInOneTxnWithRetry(cli *clientv3.Client, ops ...clientv3.Op) (*clientv3.TxnResponse, int64, error) { + ctx, cancel := context.WithTimeout(cli.Ctx(), DefaultRequestTimeout) + defer cancel() + tctx := tcontext.NewContext(ctx, log.L()) + ret, _, err := etcdDefaultTxnStrategy.Apply(tctx, etcdDefaultTxnRetryParam, func(t *tcontext.Context) (ret interface{}, err error) { + resp, err := cli.Txn(ctx).Then(ops...).Commit() + if err != nil { + return nil, errors.Trace(err) + } + return resp, nil + }) + if err != nil { + return nil, 0, err + } + resp := ret.(*clientv3.TxnResponse) + return resp, resp.Header.Revision, nil +} + +// DoOpsInOneCmpsTxnWithRetry do multiple etcd operations in one txn and with comparisons. +func DoOpsInOneCmpsTxnWithRetry(cli *clientv3.Client, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (*clientv3.TxnResponse, int64, error) { + ctx, cancel := context.WithTimeout(cli.Ctx(), DefaultRequestTimeout) + defer cancel() + tctx := tcontext.NewContext(ctx, log.L()) + + ret, _, err := etcdDefaultTxnStrategy.Apply(tctx, etcdDefaultTxnRetryParam, func(t *tcontext.Context) (ret interface{}, err error) { + failpoint.Inject("ErrNoSpace", func() { + tctx.L().Info("fail to do ops in etcd", zap.String("failpoint", "ErrNoSpace")) + failpoint.Return(nil, v3rpc.ErrNoSpace) + }) + resp, err := cli.Txn(ctx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit() + if err != nil { + return nil, err + } + return resp, nil + }) + if err != nil { + return nil, 0, err + } + resp := ret.(*clientv3.TxnResponse) + return resp, resp.Header.Revision, nil +} + +// IsRetryableError check whether error is retryable error for etcd to build again. +func IsRetryableError(err error) bool { + switch errors.Cause(err) { + case v3rpc.ErrCompacted, v3rpc.ErrNoLeader, v3rpc.ErrNoSpace, context.DeadlineExceeded: + return true + default: + return false + } +} + +// IsLimitedRetryableError check whether error is retryable error for etcd to build again in a limited number of times. +func IsLimitedRetryableError(err error) bool { + switch errors.Cause(err) { + case v3rpc.ErrNoSpace, context.DeadlineExceeded: + return true + default: + return false + } +} diff --git a/pkg/errorutil/ignore.go b/pkg/errorutil/ignore.go new file mode 100644 index 00000000000..05ecf2ad36d --- /dev/null +++ b/pkg/errorutil/ignore.go @@ -0,0 +1,70 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package errorutil + +import ( + dmysql "github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" + tddl "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser/mysql" + v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" +) + +// IsIgnorableMySQLDDLError is used to check what error can be ignored +// we can get error code from: +// infoschema's error definition: https://github.com/pingcap/tidb/blob/master/infoschema/infoschema.go +// DDL's error definition: https://github.com/pingcap/tidb/blob/master/ddl/ddl.go +// tidb/mysql error code definition: https://github.com/pingcap/tidb/blob/master/mysql/errcode.go +func IsIgnorableMySQLDDLError(err error) bool { + err = errors.Cause(err) + mysqlErr, ok := err.(*dmysql.MySQLError) + if !ok { + return false + } + + errCode := errors.ErrCode(mysqlErr.Number) + switch errCode { + case infoschema.ErrDatabaseExists.Code(), infoschema.ErrDatabaseDropExists.Code(), + infoschema.ErrTableExists.Code(), infoschema.ErrTableDropExists.Code(), + infoschema.ErrColumnExists.Code(), infoschema.ErrIndexExists.Code(), + infoschema.ErrKeyNotExists.Code(), tddl.ErrCantDropFieldOrKey.Code(), + mysql.ErrDupKeyName, mysql.ErrSameNamePartition, + mysql.ErrDropPartitionNonExistent, mysql.ErrMultiplePriKey: + return true + default: + return false + } +} + +func IsRetryableEtcdError(err error) bool { + etcdErr := errors.Cause(err) + + switch etcdErr { + // Etcd ResourceExhausted errors, may recover after some time + case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests: + return true + // Etcd Unavailable errors, may be available after some time + // https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167 + // ErrStopped: + // one of the etcd nodes stopped from failure injection + // ErrNotCapable: + // capability check has not been done (in the beginning) + case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout, + v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy: + return true + default: + return false + } +} diff --git a/pkg/errorutil/ignore_test.go b/pkg/errorutil/ignore_test.go new file mode 100644 index 00000000000..825bf7d91b6 --- /dev/null +++ b/pkg/errorutil/ignore_test.go @@ -0,0 +1,66 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package errorutil + +import ( + "errors" + "testing" + + "github.com/go-sql-driver/mysql" + "github.com/pingcap/tidb/infoschema" + tmysql "github.com/pingcap/tidb/parser/mysql" + "github.com/stretchr/testify/require" + v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" +) + +func newMysqlErr(number uint16, message string) *mysql.MySQLError { + return &mysql.MySQLError{ + Number: number, + Message: message, + } +} + +func TestIgnoreMysqlDDLError(t *testing.T) { + cases := []struct { + err error + ret bool + }{ + {errors.New("raw error"), false}, + {newMysqlErr(tmysql.ErrDupKeyName, "Error: Duplicate key name 'some_key'"), true}, + {newMysqlErr(uint16(infoschema.ErrDatabaseExists.Code()), "Can't create database"), true}, + {newMysqlErr(uint16(infoschema.ErrAccessDenied.Code()), "Access denied for user"), false}, + } + + for _, item := range cases { + require.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err)) + } +} + +func TestIsRetryableEtcdError(t *testing.T) { + cases := []struct { + err error + ret bool + }{ + {nil, false}, + {v3rpc.ErrCorrupt, false}, + + {v3rpc.ErrGRPCTimeoutDueToConnectionLost, true}, + {v3rpc.ErrTimeoutDueToLeaderFail, true}, + {v3rpc.ErrNoSpace, true}, + } + + for _, item := range cases { + require.Equal(t, item.ret, IsRetryableEtcdError(item.err)) + } +} diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index e1e286decb5..382ad8241c7 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -21,10 +21,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/errorutil" "github.com/pingcap/tiflow/pkg/retry" "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.uber.org/zap" "google.golang.org/grpc/codes" ) @@ -51,6 +52,15 @@ const ( etcdRequestProgressDuration = 1 * time.Second // etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future etcdWatchChBufferSize = 16 + // etcdTxnTimeoutDuration represents the timeout duration for committing a + // transaction to Etcd + etcdTxnTimeoutDuration = 30 * time.Second +) + +var ( + TxnEmptyCmps = []clientv3.Cmp{} + TxnEmptyOpsThen = []clientv3.Op{} + TxnEmptyOpsElse = []clientv3.Op{} ) // set to var instead of const for mocking the value to speedup test @@ -121,12 +131,17 @@ func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOpti return c.cli.Delete(ctx, key, opts...) } -// Txn delegates request to clientv3.KV.Txn -func (c *Client) Txn(ctx context.Context) clientv3.Txn { - if metric, ok := c.metrics[EtcdTxn]; ok { - metric.Inc() - } - return c.cli.Txn(ctx) +// Txn delegates request to clientv3.KV.Txn. The error returned can only be a non-retryable error, +// such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry. +func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) { + txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration) + defer cancel() + err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error { + var inErr error + resp, inErr = c.cli.Txn(txnCtx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit() + return inErr + }) + return } // Grant delegates request to clientv3.Lease.Grant @@ -144,11 +159,17 @@ func isRetryableError(rpcName string) retry.IsRetryable { if !cerrors.IsRetryableError(err) { return false } - if rpcName == EtcdRevoke { - if etcdErr, ok := err.(rpctypes.EtcdError); ok && etcdErr.Code() == codes.NotFound { - // it means the etcd lease is already expired or revoked + + switch rpcName { + case EtcdRevoke: + if etcdErr, ok := err.(v3rpc.EtcdError); ok && etcdErr.Code() == codes.NotFound { + // It means the etcd lease is already expired or revoked return false } + case EtcdTxn: + return errorutil.IsRetryableEtcdError(err) + default: + // For other types of operation, we retry directly without handling errors } return true @@ -190,7 +211,10 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR }() var lastRevision int64 watchCtx, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + // Using closures to handle changes to the cancel function + cancel() + }() watchCh := c.cli.Watch(watchCtx, key, opts...) ticker := c.clock.Ticker(etcdRequestProgressDuration) @@ -200,7 +224,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR for { select { case <-ctx.Done(): - cancel() return case response := <-watchCh: lastReceivedResponseTime = c.clock.Now() @@ -214,7 +237,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR for { select { case <-ctx.Done(): - cancel() return case outCh <- response: // it may block here break Loop @@ -233,7 +255,13 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack")) cancel() watchCtx, cancel = context.WithCancel(ctx) +<<<<<<< HEAD watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1)) +======= + // to avoid possible context leak warning from govet + _ = cancel + watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision)) +>>>>>>> 8dce39fdf (etcd/client(ticdc): add retry operation for etcd transaction api (#4248) (#4474)) // we need to reset lastReceivedResponseTime after reset Watch lastReceivedResponseTime = c.clock.Now() } diff --git a/pkg/etcd/client_test.go b/pkg/etcd/client_test.go index ba14f9e5ebe..822fb2f5c54 100644 --- a/pkg/etcd/client_test.go +++ b/pkg/etcd/client_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/pkg/util/testleak" "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" ) type clientSuite struct{} @@ -45,6 +46,10 @@ func (m *mockClient) Put(ctx context.Context, key, val string, opts ...clientv3. return nil, errors.New("mock error") } +func (m *mockClient) Txn(ctx context.Context) clientv3.Txn { + return &mockTxn{ctx: ctx} +} + type mockWatcher struct { clientv3.Watcher watchCh chan clientv3.WatchResponse @@ -76,8 +81,39 @@ func (s *clientSuite) TestRetry(c *check.C) { c.Assert(get, check.NotNil) _, err = retrycli.Put(context.TODO(), "", "") +<<<<<<< HEAD c.Assert(err, check.NotNil) c.Assert(errors.Cause(err), check.ErrorMatches, "mock error", check.Commentf("err:%v", err.Error())) +======= + require.NotNil(t, err) + require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Test Txn case + // case 0: normal + rsp, err := retrycli.Txn(ctx, nil, nil, nil) + require.Nil(t, err) + require.False(t, rsp.Succeeded) + + // case 1: errors.ErrReachMaxTry + _, err = retrycli.Txn(ctx, TxnEmptyCmps, nil, nil) + require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err) + + // case 2: errors.ErrReachMaxTry + _, err = retrycli.Txn(ctx, nil, TxnEmptyOpsThen, nil) + require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err) + + // case 3: context.DeadlineExceeded + _, err = retrycli.Txn(ctx, TxnEmptyCmps, TxnEmptyOpsThen, nil) + require.Equal(t, context.DeadlineExceeded, err) + + // other case: mock error + _, err = retrycli.Txn(ctx, TxnEmptyCmps, TxnEmptyOpsThen, TxnEmptyOpsElse) + require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error()) + +>>>>>>> 8dce39fdf (etcd/client(ticdc): add retry operation for etcd transaction api (#4248) (#4474)) maxTries = originValue } @@ -218,3 +254,44 @@ func (s *etcdSuite) TestOutChBlocked(c *check.C) { c.Check(sentRes, check.DeepEquals, receivedRes) } + +type mockTxn struct { + ctx context.Context + mode int +} + +func (txn *mockTxn) If(cs ...clientv3.Cmp) clientv3.Txn { + if cs != nil { + txn.mode += 1 + } + return txn +} + +func (txn *mockTxn) Then(ops ...clientv3.Op) clientv3.Txn { + if ops != nil { + txn.mode += 1 << 1 + } + return txn +} + +func (txn *mockTxn) Else(ops ...clientv3.Op) clientv3.Txn { + if ops != nil { + txn.mode += 1 << 2 + } + return txn +} + +func (txn *mockTxn) Commit() (*clientv3.TxnResponse, error) { + switch txn.mode { + case 0: + return &clientv3.TxnResponse{}, nil + case 1: + return nil, rpctypes.ErrNoSpace + case 2: + return nil, rpctypes.ErrTimeoutDueToLeaderFail + case 3: + return nil, context.DeadlineExceeded + default: + return nil, errors.New("mock error") + } +} diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index de783833df4..1076e69ba8e 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -22,6 +22,556 @@ import ( "go.etcd.io/etcd/embed" ) +<<<<<<< HEAD +======= +const ( + // CaptureOwnerKey is the capture owner path that is saved to etcd + CaptureOwnerKey = EtcdKeyBase + "/owner" + // CaptureInfoKeyPrefix is the capture info path that is saved to etcd + CaptureInfoKeyPrefix = EtcdKeyBase + "/capture" + // TaskKeyPrefix is the prefix of task keys + TaskKeyPrefix = EtcdKeyBase + "/task" + // TaskStatusKeyPrefix is the prefix of task status keys + TaskStatusKeyPrefix = TaskKeyPrefix + "/status" + // TaskPositionKeyPrefix is the prefix of task position keys + TaskPositionKeyPrefix = TaskKeyPrefix + "/position" + // JobKeyPrefix is the prefix of job keys + JobKeyPrefix = EtcdKeyBase + "/job" +) + +// GetEtcdKeyChangeFeedList returns the prefix key of all changefeed config +func GetEtcdKeyChangeFeedList() string { + return fmt.Sprintf("%s/changefeed/info", EtcdKeyBase) +} + +// GetEtcdKeyChangeFeedInfo returns the key of a changefeed config +func GetEtcdKeyChangeFeedInfo(changefeedID string) string { + return fmt.Sprintf("%s/%s", GetEtcdKeyChangeFeedList(), changefeedID) +} + +// GetEtcdKeyTaskPosition returns the key of a task position +func GetEtcdKeyTaskPosition(changefeedID, captureID string) string { + return TaskPositionKeyPrefix + "/" + captureID + "/" + changefeedID +} + +// GetEtcdKeyCaptureInfo returns the key of a capture info +func GetEtcdKeyCaptureInfo(id string) string { + return CaptureInfoKeyPrefix + "/" + id +} + +// GetEtcdKeyTaskStatus returns the key for the task status +func GetEtcdKeyTaskStatus(changeFeedID, captureID string) string { + return TaskStatusKeyPrefix + "/" + captureID + "/" + changeFeedID +} + +// GetEtcdKeyJob returns the key for a job status +func GetEtcdKeyJob(changeFeedID string) string { + return JobKeyPrefix + "/" + changeFeedID +} + +// CDCEtcdClient is a wrap of etcd client +type CDCEtcdClient struct { + Client *Client +} + +// NewCDCEtcdClient returns a new CDCEtcdClient +func NewCDCEtcdClient(ctx context.Context, cli *clientv3.Client) CDCEtcdClient { + captureAddr := util.CaptureAddrFromCtx(ctx) + metrics := map[string]prometheus.Counter{ + EtcdPut: etcdRequestCounter.WithLabelValues(EtcdPut, captureAddr), + EtcdGet: etcdRequestCounter.WithLabelValues(EtcdGet, captureAddr), + EtcdDel: etcdRequestCounter.WithLabelValues(EtcdDel, captureAddr), + EtcdTxn: etcdRequestCounter.WithLabelValues(EtcdTxn, captureAddr), + EtcdGrant: etcdRequestCounter.WithLabelValues(EtcdGrant, captureAddr), + EtcdRevoke: etcdRequestCounter.WithLabelValues(EtcdRevoke, captureAddr), + } + return CDCEtcdClient{Client: Wrap(cli, metrics)} +} + +// Close releases resources in CDCEtcdClient +func (c CDCEtcdClient) Close() error { + return c.Client.Unwrap().Close() +} + +// ClearAllCDCInfo delete all keys created by CDC +func (c CDCEtcdClient) ClearAllCDCInfo(ctx context.Context) error { + _, err := c.Client.Delete(ctx, EtcdKeyBase, clientv3.WithPrefix()) + return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) +} + +// GetAllCDCInfo get all keys created by CDC +func (c CDCEtcdClient) GetAllCDCInfo(ctx context.Context) ([]*mvccpb.KeyValue, error) { + resp, err := c.Client.Get(ctx, EtcdKeyBase, clientv3.WithPrefix()) + if err != nil { + return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + return resp.Kvs, nil +} + +// GetChangeFeeds returns kv revision and a map mapping from changefeedID to changefeed detail mvccpb.KeyValue +func (c CDCEtcdClient) GetChangeFeeds(ctx context.Context) (int64, map[string]*mvccpb.KeyValue, error) { + key := GetEtcdKeyChangeFeedList() + + resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) + if err != nil { + return 0, nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + revision := resp.Header.Revision + details := make(map[string]*mvccpb.KeyValue, resp.Count) + for _, kv := range resp.Kvs { + id, err := model.ExtractKeySuffix(string(kv.Key)) + if err != nil { + return 0, nil, err + } + details[id] = kv + } + return revision, details, nil +} + +// GetAllChangeFeedInfo queries all changefeed information +func (c CDCEtcdClient) GetAllChangeFeedInfo(ctx context.Context) (map[string]*model.ChangeFeedInfo, error) { + _, details, err := c.GetChangeFeeds(ctx) + if err != nil { + return nil, errors.Trace(err) + } + allFeedInfo := make(map[string]*model.ChangeFeedInfo, len(details)) + for id, rawDetail := range details { + info := &model.ChangeFeedInfo{} + if err := info.Unmarshal(rawDetail.Value); err != nil { + return nil, errors.Trace(err) + } + allFeedInfo[id] = info + } + + return allFeedInfo, nil +} + +// GetChangeFeedInfo queries the config of a given changefeed +func (c CDCEtcdClient) GetChangeFeedInfo(ctx context.Context, id string) (*model.ChangeFeedInfo, error) { + key := GetEtcdKeyChangeFeedInfo(id) + resp, err := c.Client.Get(ctx, key) + if err != nil { + return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + if resp.Count == 0 { + return nil, cerror.ErrChangeFeedNotExists.GenWithStackByArgs(key) + } + detail := &model.ChangeFeedInfo{} + err = detail.Unmarshal(resp.Kvs[0].Value) + return detail, errors.Trace(err) +} + +// DeleteChangeFeedInfo deletes a changefeed config from etcd +func (c CDCEtcdClient) DeleteChangeFeedInfo(ctx context.Context, id string) error { + key := GetEtcdKeyChangeFeedInfo(id) + _, err := c.Client.Delete(ctx, key) + return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) +} + +// GetAllChangeFeedStatus queries all changefeed job status +func (c CDCEtcdClient) GetAllChangeFeedStatus(ctx context.Context) (map[string]*model.ChangeFeedStatus, error) { + key := JobKeyPrefix + resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) + if err != nil { + return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + statuses := make(map[string]*model.ChangeFeedStatus, resp.Count) + for _, rawKv := range resp.Kvs { + changefeedID, err := model.ExtractKeySuffix(string(rawKv.Key)) + if err != nil { + return nil, err + } + status := &model.ChangeFeedStatus{} + err = status.Unmarshal(rawKv.Value) + if err != nil { + return nil, errors.Trace(err) + } + statuses[changefeedID] = status + } + return statuses, nil +} + +// GetChangeFeedStatus queries the checkpointTs and resovledTs of a given changefeed +func (c CDCEtcdClient) GetChangeFeedStatus(ctx context.Context, id string) (*model.ChangeFeedStatus, int64, error) { + key := GetEtcdKeyJob(id) + resp, err := c.Client.Get(ctx, key) + if err != nil { + return nil, 0, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + if resp.Count == 0 { + return nil, 0, cerror.ErrChangeFeedNotExists.GenWithStackByArgs(key) + } + info := &model.ChangeFeedStatus{} + err = info.Unmarshal(resp.Kvs[0].Value) + return info, resp.Kvs[0].ModRevision, errors.Trace(err) +} + +// GetCaptures returns kv revision and CaptureInfo list +func (c CDCEtcdClient) GetCaptures(ctx context.Context) (int64, []*model.CaptureInfo, error) { + key := CaptureInfoKeyPrefix + + resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) + if err != nil { + return 0, nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + revision := resp.Header.Revision + infos := make([]*model.CaptureInfo, 0, resp.Count) + for _, kv := range resp.Kvs { + info := &model.CaptureInfo{} + err := info.Unmarshal(kv.Value) + if err != nil { + return 0, nil, errors.Trace(err) + } + infos = append(infos, info) + } + return revision, infos, nil +} + +// GetCaptureInfo get capture info from etcd. +// return errCaptureNotExist if the capture not exists. +func (c CDCEtcdClient) GetCaptureInfo(ctx context.Context, id string) (info *model.CaptureInfo, err error) { + key := GetEtcdKeyCaptureInfo(id) + + resp, err := c.Client.Get(ctx, key) + if err != nil { + return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + + if len(resp.Kvs) == 0 { + return nil, cerror.ErrCaptureNotExist.GenWithStackByArgs(key) + } + + info = new(model.CaptureInfo) + err = info.Unmarshal(resp.Kvs[0].Value) + if err != nil { + return nil, errors.Trace(err) + } + + return +} + +// GetCaptureLeases returns a map mapping from capture ID to its lease +func (c CDCEtcdClient) GetCaptureLeases(ctx context.Context) (map[string]int64, error) { + key := CaptureInfoKeyPrefix + + resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) + if err != nil { + return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + leases := make(map[string]int64, resp.Count) + for _, kv := range resp.Kvs { + captureID, err := model.ExtractKeySuffix(string(kv.Key)) + if err != nil { + return nil, err + } + leases[captureID] = kv.Lease + } + return leases, nil +} + +// RevokeAllLeases revokes all leases passed from parameter +func (c CDCEtcdClient) RevokeAllLeases(ctx context.Context, leases map[string]int64) error { + for _, lease := range leases { + _, err := c.Client.Revoke(ctx, clientv3.LeaseID(lease)) + if err == nil { + continue + } else if etcdErr := err.(rpctypes.EtcdError); etcdErr.Code() == codes.NotFound { + // it means the etcd lease is already expired or revoked + continue + } + return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + return nil +} + +// CreateChangefeedInfo creates a change feed info into etcd and fails if it is already exists. +func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID string) error { + infoKey := GetEtcdKeyChangeFeedInfo(changeFeedID) + jobKey := GetEtcdKeyJob(changeFeedID) + value, err := info.Marshal() + if err != nil { + return errors.Trace(err) + } + + cmps := []clientv3.Cmp{ + clientv3.Compare(clientv3.ModRevision(infoKey), "=", 0), + clientv3.Compare(clientv3.ModRevision(jobKey), "=", 0), + } + opsThen := []clientv3.Op{ + clientv3.OpPut(infoKey, value), + } + resp, err := c.Client.Txn(ctx, cmps, opsThen, TxnEmptyOpsElse) + if err != nil { + return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + if !resp.Succeeded { + log.Warn("changefeed already exists, ignore create changefeed", + zap.String("changefeed", changeFeedID)) + return cerror.ErrChangeFeedAlreadyExists.GenWithStackByArgs(changeFeedID) + } + return errors.Trace(err) +} + +// SaveChangeFeedInfo stores change feed info into etcd +// TODO: this should be called from outer system, such as from a TiDB client +func (c CDCEtcdClient) SaveChangeFeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID string) error { + key := GetEtcdKeyChangeFeedInfo(changeFeedID) + value, err := info.Marshal() + if err != nil { + return errors.Trace(err) + } + _, err = c.Client.Put(ctx, key, value) + return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) +} + +// GetProcessors queries all processors of the cdc cluster, +// and returns a slice of ProcInfoSnap(without table info) +func (c CDCEtcdClient) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error) { + resp, err := c.Client.Get(ctx, TaskStatusKeyPrefix, clientv3.WithPrefix()) + if err != nil { + return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + infos := make([]*model.ProcInfoSnap, 0, resp.Count) + for _, rawKv := range resp.Kvs { + changefeedID, err := model.ExtractKeySuffix(string(rawKv.Key)) + if err != nil { + return nil, err + } + endIndex := len(rawKv.Key) - len(changefeedID) - 1 + captureID, err := model.ExtractKeySuffix(string(rawKv.Key[0:endIndex])) + if err != nil { + return nil, err + } + info := &model.ProcInfoSnap{ + CfID: changefeedID, + CaptureID: captureID, + } + infos = append(infos, info) + } + return infos, nil +} + +// GetAllTaskStatus queries all task status of a changefeed, and returns a map +// mapping from captureID to TaskStatus +func (c CDCEtcdClient) GetAllTaskStatus(ctx context.Context, changefeedID string) (model.ProcessorsInfos, error) { + resp, err := c.Client.Get(ctx, TaskStatusKeyPrefix, clientv3.WithPrefix()) + if err != nil { + return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + pinfo := make(map[string]*model.TaskStatus, resp.Count) + for _, rawKv := range resp.Kvs { + changeFeed, err := model.ExtractKeySuffix(string(rawKv.Key)) + if err != nil { + return nil, err + } + endIndex := len(rawKv.Key) - len(changeFeed) - 1 + captureID, err := model.ExtractKeySuffix(string(rawKv.Key[0:endIndex])) + if err != nil { + return nil, err + } + if changeFeed != changefeedID { + continue + } + info := &model.TaskStatus{} + err = info.Unmarshal(rawKv.Value) + if err != nil { + return nil, cerror.ErrDecodeFailed.GenWithStackByArgs("failed to unmarshal task status: %s", err) + } + info.ModRevision = rawKv.ModRevision + pinfo[captureID] = info + } + return pinfo, nil +} + +// GetTaskStatus queries task status from etcd, returns +// - ModRevision of the given key +// - *model.TaskStatus unmarshalled from the value +// - error if error happens +func (c CDCEtcdClient) GetTaskStatus( + ctx context.Context, + changefeedID string, + captureID string, +) (int64, *model.TaskStatus, error) { + key := GetEtcdKeyTaskStatus(changefeedID, captureID) + resp, err := c.Client.Get(ctx, key) + if err != nil { + return 0, nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + if resp.Count == 0 { + return 0, nil, cerror.ErrTaskStatusNotExists.GenWithStackByArgs(key) + } + info := &model.TaskStatus{} + err = info.Unmarshal(resp.Kvs[0].Value) + return resp.Kvs[0].ModRevision, info, errors.Trace(err) +} + +// PutTaskStatus puts task status into etcd. +func (c CDCEtcdClient) PutTaskStatus( + ctx context.Context, + changefeedID string, + captureID string, + info *model.TaskStatus, +) error { + data, err := info.Marshal() + if err != nil { + return errors.Trace(err) + } + + key := GetEtcdKeyTaskStatus(changefeedID, captureID) + + _, err = c.Client.Put(ctx, key, data) + if err != nil { + return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + + return nil +} + +// GetAllTaskPositions queries all task positions of a changefeed, and returns a map +// mapping from captureID to TaskPositions +func (c CDCEtcdClient) GetAllTaskPositions(ctx context.Context, changefeedID string) (map[string]*model.TaskPosition, error) { + resp, err := c.Client.Get(ctx, TaskPositionKeyPrefix, clientv3.WithPrefix()) + if err != nil { + return nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + positions := make(map[string]*model.TaskPosition, resp.Count) + for _, rawKv := range resp.Kvs { + changeFeed, err := model.ExtractKeySuffix(string(rawKv.Key)) + if err != nil { + return nil, err + } + endIndex := len(rawKv.Key) - len(changeFeed) - 1 + captureID, err := model.ExtractKeySuffix(string(rawKv.Key[0:endIndex])) + if err != nil { + return nil, err + } + if changeFeed != changefeedID { + continue + } + info := &model.TaskPosition{} + err = info.Unmarshal(rawKv.Value) + if err != nil { + return nil, cerror.ErrDecodeFailed.GenWithStackByArgs("failed to unmarshal task position: %s", err) + } + positions[captureID] = info + } + return positions, nil +} + +// GetTaskPosition queries task process from etcd, returns +// - ModRevision of the given key +// - *model.TaskPosition unmarshaled from the value +// - error if error happens +func (c CDCEtcdClient) GetTaskPosition( + ctx context.Context, + changefeedID string, + captureID string, +) (int64, *model.TaskPosition, error) { + key := GetEtcdKeyTaskPosition(changefeedID, captureID) + resp, err := c.Client.Get(ctx, key) + if err != nil { + return 0, nil, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + if resp.Count == 0 { + return 0, nil, cerror.ErrTaskPositionNotExists.GenWithStackByArgs(key) + } + info := &model.TaskPosition{} + err = info.Unmarshal(resp.Kvs[0].Value) + return resp.Kvs[0].ModRevision, info, errors.Trace(err) +} + +// PutTaskPositionOnChange puts task position information into etcd if the +// task position value changes or the presvious value does not exist in etcd. +// returns true if task position is written to etcd. +func (c CDCEtcdClient) PutTaskPositionOnChange( + ctx context.Context, + changefeedID string, + captureID string, + info *model.TaskPosition, +) (bool, error) { + data, err := info.Marshal() + if err != nil { + return false, errors.Trace(err) + } + + key := GetEtcdKeyTaskPosition(changefeedID, captureID) + cmps := []clientv3.Cmp{ + clientv3.Compare(clientv3.ModRevision(key), ">", 0), + clientv3.Compare(clientv3.Value(key), "=", data), + } + opsElse := []clientv3.Op{ + clientv3.OpPut(key, data), + } + resp, err := c.Client.Txn(ctx, cmps, TxnEmptyOpsThen, opsElse) + if err != nil { + return false, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + return !resp.Succeeded, nil +} + +// PutChangeFeedStatus puts changefeed synchronization status into etcd +func (c CDCEtcdClient) PutChangeFeedStatus( + ctx context.Context, + changefeedID string, + status *model.ChangeFeedStatus, +) error { + key := GetEtcdKeyJob(changefeedID) + value, err := status.Marshal() + if err != nil { + return errors.Trace(err) + } + _, err = c.Client.Put(ctx, key, value) + return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) +} + +// PutCaptureInfo put capture info into etcd. +func (c CDCEtcdClient) PutCaptureInfo(ctx context.Context, info *model.CaptureInfo, leaseID clientv3.LeaseID) error { + data, err := info.Marshal() + if err != nil { + return errors.Trace(err) + } + + key := GetEtcdKeyCaptureInfo(info.ID) + _, err = c.Client.Put(ctx, key, string(data), clientv3.WithLease(leaseID)) + return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) +} + +// DeleteCaptureInfo delete capture info from etcd. +func (c CDCEtcdClient) DeleteCaptureInfo(ctx context.Context, id string) error { + key := GetEtcdKeyCaptureInfo(id) + _, err := c.Client.Delete(ctx, key) + return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) +} + +// GetOwnerID returns the owner id by querying etcd +func (c CDCEtcdClient) GetOwnerID(ctx context.Context, key string) (string, error) { + resp, err := c.Client.Get(ctx, key, clientv3.WithFirstCreate()...) + if err != nil { + return "", cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + if len(resp.Kvs) == 0 { + return "", concurrency.ErrElectionNoLeader + } + return string(resp.Kvs[0].Value), nil +} + +// GetOwnerRevision gets the Etcd revision for the elected owner. +func (c CDCEtcdClient) GetOwnerRevision(ctx context.Context, captureID string) (rev int64, err error) { + resp, err := c.Client.Get(ctx, CaptureOwnerKey, clientv3.WithFirstCreate()...) + if err != nil { + return 0, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + if len(resp.Kvs) == 0 { + return 0, cerror.ErrOwnerNotFound.GenWithStackByArgs() + } + // Checks that the given capture is indeed the owner. + if string(resp.Kvs[0].Value) != captureID { + return 0, cerror.ErrNotOwner.GenWithStackByArgs() + } + return resp.Kvs[0].ModRevision, nil +} + +>>>>>>> 8dce39fdf (etcd/client(ticdc): add retry operation for etcd transaction api (#4248) (#4474)) // getFreeListenURLs get free ports and localhost as url. func getFreeListenURLs(n int) (urls []*url.URL, retErr error) { for i := 0; i < n; i++ { diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index cb402edac0c..a5aca86dc3b 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -36,11 +37,16 @@ import ( ) const ( +<<<<<<< HEAD // etcdTxnTimeoutDuration represents the timeout duration for committing a // transaction to Etcd etcdTxnTimeoutDuration = 30 * time.Second // etcdWorkerLogsWarnDuration when EtcdWorker commits a txn to etcd or ticks // it reactor takes more than etcdWorkerLogsWarnDuration, it will print a log +======= + // When EtcdWorker commits a txn to etcd or ticks its reactor + // takes more than etcdWorkerLogsWarnDuration, it will print a log +>>>>>>> 8dce39fdf (etcd/client(ticdc): add retry operation for etcd transaction api (#4248) (#4474)) etcdWorkerLogsWarnDuration = 1 * time.Second deletionCounterKey = "/meta/ticdc-delete-etcd-key-count" ) @@ -205,10 +211,10 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, if len(pendingPatches) > 0 { // Here we have some patches yet to be uploaded to Etcd. pendingPatches, err = worker.applyPatchGroups(ctx, pendingPatches) + if isRetryableError(err) { + continue + } if err != nil { - if cerrors.ErrEtcdTryAgain.Equal(errors.Cause(err)) { - continue - } return errors.Trace(err) } } else { @@ -255,6 +261,18 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, } } +func isRetryableError(err error) bool { + err = errors.Cause(err) + if cerrors.ErrEtcdTryAgain.Equal(err) || + context.DeadlineExceeded == err { + return true + } + // When encountering an abnormal connection with etcd, the worker will keep retrying + // until the session is done. + _, ok := err.(rpctypes.EtcdError) + return ok +} + func (worker *EtcdWorker) handleEvent(_ context.Context, event *clientv3.Event) { if worker.isDeleteCounterKey(event.Kv.Key) { switch event.Type { @@ -349,7 +367,7 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m } cmps := make([]clientv3.Cmp, 0, len(changedState)) - ops := make([]clientv3.Op, 0, len(changedState)) + opsThen := make([]clientv3.Op, 0, len(changedState)) hasDelete := false for key, value := range changedState { @@ -371,11 +389,11 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m op = clientv3.OpDelete(key.String()) hasDelete = true } - ops = append(ops, op) + opsThen = append(opsThen, op) } if hasDelete { - ops = append(ops, clientv3.OpPut(worker.prefix.String()+deletionCounterKey, fmt.Sprint(worker.deleteCounter+1))) + opsThen = append(opsThen, clientv3.OpPut(worker.prefix.String()+deletionCounterKey, fmt.Sprint(worker.deleteCounter+1))) } if worker.deleteCounter > 0 { cmps = append(cmps, clientv3.Compare(clientv3.Value(worker.prefix.String()+deletionCounterKey), "=", fmt.Sprint(worker.deleteCounter))) @@ -387,10 +405,7 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m worker.metrics.metricEtcdTxnSize.Observe(float64(size)) startTime := time.Now() - - txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration) - resp, err := worker.client.Txn(txnCtx).If(cmps...).Then(ops...).Commit() - cancel() + resp, err := worker.client.Txn(ctx, cmps, opsThen, etcd.TxnEmptyOpsElse) // For testing the situation where we have a progress notification that // has the same revision as the committed Etcd transaction. @@ -409,7 +424,7 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m return errors.Trace(err) } - logEtcdOps(ops, resp.Succeeded) + logEtcdOps(opsThen, resp.Succeeded) if resp.Succeeded { worker.barrierRev = resp.Header.GetRevision() return nil diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go index d7715eb8d0b..dc1d0ba0efa 100644 --- a/pkg/orchestrator/etcd_worker_test.go +++ b/pkg/orchestrator/etcd_worker_test.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tiflow/pkg/util/testleak" "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -762,3 +763,10 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) { _ = cli1.Unwrap().Close() _ = cli2.Unwrap().Close() } + +func TestRetryableError(t *testing.T) { + require.True(t, isRetryableError(cerrors.ErrEtcdTryAgain)) + require.True(t, isRetryableError(cerrors.ErrReachMaxTry.Wrap(rpctypes.ErrTimeoutDueToLeaderFail))) + require.True(t, isRetryableError(errors.Trace(context.DeadlineExceeded))) + require.False(t, isRetryableError(context.Canceled)) +}