diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go
index 5584ffe8a47..fcaabb38188 100644
--- a/cdc/model/changefeed.go
+++ b/cdc/model/changefeed.go
@@ -17,7 +17,6 @@ import (
 	"encoding/json"
 	"math"
 	"regexp"
-	"sort"
 	"time"
 
 	"github.com/pingcap/errors"
@@ -91,19 +90,6 @@ func (s FeedState) IsNeeded(need string) bool {
 	return need == string(s)
 }
 
-const (
-	// errorHistoryGCInterval represents how long we keep error record in changefeed info
-	errorHistoryGCInterval = time.Minute * 10
-
-	// errorHistoryCheckInterval represents time window for failure check
-	errorHistoryCheckInterval = time.Minute * 2
-
-	// ErrorHistoryThreshold represents failure upper limit in time window.
-	// Before a changefeed is initialized, check the the failure count of this
-	// changefeed, if it is less than ErrorHistoryThreshold, then initialize it.
-	ErrorHistoryThreshold = 3
-)
-
 // ChangeFeedInfo describes the detail of a ChangeFeed
 type ChangeFeedInfo struct {
 	SinkURI string `json:"sink-uri"`
@@ -121,10 +107,9 @@ type ChangeFeedInfo struct {
 	// but can be fetched for backward compatibility
 	SortDir string `json:"sort-dir"`
 
-	Config   *config.ReplicaConfig `json:"config"`
-	State    FeedState             `json:"state"`
-	ErrorHis []int64               `json:"history"`
-	Error    *RunningError         `json:"error"`
+	Config *config.ReplicaConfig `json:"config"`
+	State  FeedState             `json:"state"`
+	Error  *RunningError         `json:"error"`
 
 	SyncPointEnabled  bool          `json:"sync-point-enabled"`
 	SyncPointInterval time.Duration `json:"sync-point-interval"`
@@ -253,6 +238,7 @@ func (info *ChangeFeedInfo) VerifyAndComplete() error {
 	if info.Config.Consistent == nil {
 		info.Config.Consistent = defaultConfig.Consistent
 	}
+
 	return nil
 }
 
@@ -304,28 +290,6 @@ func (info *ChangeFeedInfo) fixState() {
 	}
 }
 
-// CheckErrorHistory checks error history of a changefeed
-// if having error record older than GC interval, set needSave to true.
-// if error counts reach threshold, set canInit to false.
-func (info *ChangeFeedInfo) CheckErrorHistory() (needSave bool, canInit bool) {
-	i := sort.Search(len(info.ErrorHis), func(i int) bool {
-		ts := info.ErrorHis[i]
-		return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryGCInterval
-	})
-	info.ErrorHis = info.ErrorHis[i:]
-
-	if i > 0 {
-		needSave = true
-	}
-
-	i = sort.Search(len(info.ErrorHis), func(i int) bool {
-		ts := info.ErrorHis[i]
-		return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryCheckInterval
-	})
-	canInit = len(info.ErrorHis)-i < ErrorHistoryThreshold
-	return
-}
-
 // HasFastFailError returns true if the error in changefeed is fast-fail
 func (info *ChangeFeedInfo) HasFastFailError() bool {
 	if info.Error == nil {
@@ -333,27 +297,3 @@ func (info *ChangeFeedInfo) HasFastFailError() bool {
 	}
 	return cerror.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code))
 }
-
-// findActiveErrors finds all errors occurring within errorHistoryCheckInterval
-func (info *ChangeFeedInfo) findActiveErrors() []int64 {
-	i := sort.Search(len(info.ErrorHis), func(i int) bool {
-		ts := info.ErrorHis[i]
-		// ts is a errors occurrence time, here to find all errors occurring within errorHistoryCheckInterval
-		return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryCheckInterval
-	})
-	return info.ErrorHis[i:]
-}
-
-// ErrorsReachedThreshold checks error history of a changefeed
-// returns true if error counts reach threshold
-func (info *ChangeFeedInfo) ErrorsReachedThreshold() bool {
-	return len(info.findActiveErrors()) >= ErrorHistoryThreshold
-}
-
-// CleanUpOutdatedErrorHistory cleans up the outdated error history
-// return true if the ErrorHis changed
-func (info *ChangeFeedInfo) CleanUpOutdatedErrorHistory() bool {
-	lastLenOfErrorHis := len(info.ErrorHis)
-	info.ErrorHis = info.findActiveErrors()
-	return lastLenOfErrorHis != len(info.ErrorHis)
-}
diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go
index ee1f03618a8..38c0555a992 100644
--- a/cdc/model/changefeed_test.go
+++ b/cdc/model/changefeed_test.go
@@ -358,34 +358,6 @@ func TestChangeFeedInfoClone(t *testing.T) {
 	require.True(t, info.Config.EnableOldValue)
 }
 
-func TestCheckErrorHistory(t *testing.T) {
-	t.Parallel()
-
-	now := time.Now()
-	info := &ChangeFeedInfo{
-		ErrorHis: []int64{},
-	}
-	for i := 0; i < 5; i++ {
-		tm := now.Add(-errorHistoryGCInterval)
-		info.ErrorHis = append(info.ErrorHis, tm.UnixNano()/1e6)
-		time.Sleep(time.Millisecond)
-	}
-	for i := 0; i < ErrorHistoryThreshold-1; i++ {
-		info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
-		time.Sleep(time.Millisecond)
-	}
-	time.Sleep(time.Millisecond)
-	needSave, canInit := info.CheckErrorHistory()
-	require.True(t, needSave)
-	require.True(t, canInit)
-	require.Equal(t, ErrorHistoryThreshold-1, len(info.ErrorHis))
-
-	info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
-	needSave, canInit = info.CheckErrorHistory()
-	require.False(t, needSave)
-	require.False(t, canInit)
-}
-
 func TestChangefeedInfoStringer(t *testing.T) {
 	t.Parallel()
 
diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go
index 2f99abf3a51..902066684a9 100644
--- a/cdc/owner/changefeed.go
+++ b/cdc/owner/changefeed.go
@@ -81,7 +81,7 @@ func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
 		id:               id,
 		scheduler:        newScheduler(),
 		barriers:         newBarriers(),
-		feedStateManager: new(feedStateManager),
+		feedStateManager: newFeedStateManager(),
 		gcManager:        gcManager,
 
 		errCh: make(chan error, defaultErrChSize),
diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go
index 256ad0e07eb..5c0467c722b 100644
--- a/cdc/owner/feed_state_manager.go
+++ b/cdc/owner/feed_state_manager.go
@@ -16,6 +16,7 @@ package owner
 import (
 	"time"
 
+	"github.com/cenkalti/backoff/v4"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
@@ -24,6 +25,24 @@ import (
 	"go.uber.org/zap"
 )
 
+const (
+	// When an error occurs and we need to back off, we start an exponential backoff
+	// with an interval from 10s to 30min (10s, 20s, 40s, 80s, 160s, 320s, 640s, 1280s, 1800s).
+	// The backoff stops after roughly 72 min (about 9 tries), because another 30min backoff
+	// would make the total duration (72+30=102min) exceed the MaxElapsedTime (90min).
+	// To avoid a thundering herd, a random factor is also added.
+	defaultBackoffInitInterval        = 10 * time.Second
+	defaultBackoffMaxInterval         = 30 * time.Minute
+	defaultBackoffMaxElapsedTime      = 90 * time.Minute
+	defaultBackoffRandomizationFactor = 0.1
+	defaultBackoffMultiplier          = 2.0
+
+	// If all states recorded in the window are 'normal', it can be assumed that the changefeed
+	// is running steadily. Then, if we enter a state other than normal at the next tick,
+	// the backoff must be reset.
+	defaultStateWindowSize = 512
+)
+
 // feedStateManager manages the ReactorState of a changefeed
 // when an error or an admin job occurs, the feedStateManager is responsible for controlling the ReactorState
 type feedStateManager struct {
@@ -34,7 +53,71 @@ type feedStateManager struct {
 	// shouldBeRemoved = false means the changefeed is paused
 	shouldBeRemoved bool
 
-	adminJobQueue []*model.AdminJob
+	adminJobQueue   []*model.AdminJob
+	stateHistory    [defaultStateWindowSize]model.FeedState
+	lastErrorTime   time.Time     // time of last error for a changefeed
+	backoffInterval time.Duration // the interval for restarting a changefeed in 'error' state
+	errBackoff      *backoff.ExponentialBackOff // an exponential backoff for restarting a changefeed
+}
+
+// newFeedStateManager creates a feedStateManager and initializes the exponential backoff
+func newFeedStateManager() *feedStateManager {
+	f := new(feedStateManager)
+
+	f.errBackoff = backoff.NewExponentialBackOff()
+	f.errBackoff.InitialInterval = defaultBackoffInitInterval
+	f.errBackoff.MaxInterval = defaultBackoffMaxInterval
+	f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime
+	f.errBackoff.Multiplier = defaultBackoffMultiplier
+	f.errBackoff.RandomizationFactor = defaultBackoffRandomizationFactor
+
+	f.resetErrBackoff()
+	f.lastErrorTime = time.Unix(0, 0)
+
+	return f
+}
+
+// newFeedStateManager4Test creates a feedStateManager for tests
+func newFeedStateManager4Test() *feedStateManager {
+	f := new(feedStateManager)
+
+	f.errBackoff = backoff.NewExponentialBackOff()
+	f.errBackoff.InitialInterval = 200 * time.Millisecond
+	f.errBackoff.MaxInterval = 1600 * time.Millisecond
+	f.errBackoff.MaxElapsedTime = 6 * time.Second
+	f.errBackoff.Multiplier = 2.0
+	f.errBackoff.RandomizationFactor = 0
+
+	f.resetErrBackoff()
+	f.lastErrorTime = time.Unix(0, 0)
+
+	return f
+}
+
+// resetErrBackoff resets the backoff-related fields
+func (m *feedStateManager) resetErrBackoff() {
+	m.errBackoff.Reset()
+	m.backoffInterval = m.errBackoff.NextBackOff()
+}
+
+// isChangefeedStable checks whether there are states other than 'normal' in the sliding window.
+func (m *feedStateManager) isChangefeedStable() bool {
+	for _, val := range m.stateHistory {
+		if val != model.StateNormal {
+			return false
+		}
+	}
+
+	return true
+}
+
+// shiftStateWindow shifts the sliding window
+func (m *feedStateManager) shiftStateWindow(state model.FeedState) {
+	for i := 0; i < defaultStateWindowSize-1; i++ {
+		m.stateHistory[i] = m.stateHistory[i+1]
+	}
+
+	m.stateHistory[defaultStateWindowSize-1] = state
 }
 
 func (m *feedStateManager) Tick(state *orchestrator.ChangefeedReactorState) {
@@ -145,13 +228,18 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
 			return
 		}
 		m.shouldBeRunning = true
+		// when the changefeed is manually resumed, we must reset the backoff
+		m.resetErrBackoff()
+		// The lastErrorTime also needs to be cleared before a fresh run.
+		m.lastErrorTime = time.Unix(0, 0)
 		jobsPending = true
 		m.patchState(model.StateNormal)
-		// remove error history to make sure the changefeed can running in next tick
 		m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
-			if info.Error != nil || len(info.ErrorHis) != 0 {
+			if info == nil {
+				return nil, false, nil
+			}
+			if info.Error != nil {
 				info.Error = nil
-				info.ErrorHis = nil
 				return info, true, nil
 			}
 			return info, false, nil
@@ -277,8 +365,6 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
 		if cerrors.ChangefeedFastFailErrorCode(errors.RFCErrorCode(err.Code)) {
 			m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
 				info.Error = err
-				info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
-				info.CleanUpOutdatedErrorHistory()
 				return info, true, nil
 			})
 			m.shouldBeRunning = false
@@ -290,16 +376,49 @@
 	m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
 		for _, err := range errs {
 			info.Error = err
-			info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
 		}
-		changed := info.CleanUpOutdatedErrorHistory()
-		return info, changed || len(errs) > 0, nil
+		return info, len(errs) > 0, nil
 	})
 
-	// if the number of errors has reached the error threshold, stop the changefeed
-	if m.state.Info.ErrorsReachedThreshold() {
+	// If we enter an abnormal state ('error', 'failed') for this changefeed now
+	// but haven't seen any abnormal state in the sliding window (512 ticks),
+	// it can be assumed that this changefeed hit a sudden failure from a stable condition.
+	// So we can reset the exponential backoff and back off again from the InitialInterval.
+	// TODO: this detection policy should be covered by a unit test.
+	if len(errs) > 0 {
+		m.lastErrorTime = time.Now()
+		if m.isChangefeedStable() {
+			m.resetErrBackoff()
+		}
+	} else {
+		if m.state.Info.State == model.StateNormal {
+			m.lastErrorTime = time.Unix(0, 0)
+		}
+	}
+	m.shiftStateWindow(m.state.Info.State)
+
+	if m.lastErrorTime == time.Unix(0, 0) {
+		return
+	}
+
+	if time.Since(m.lastErrorTime) < m.backoffInterval {
 		m.shouldBeRunning = false
 		m.patchState(model.StateError)
-		return
+	} else {
+		oldBackoffInterval := m.backoffInterval
+		m.backoffInterval = m.errBackoff.NextBackOff()
+		m.lastErrorTime = time.Unix(0, 0)
+
+		// if the duration since the backoff started exceeds MaxElapsedTime,
+		// we set the state of the changefeed to "failed" and don't let it run again unless it is manually resumed.
+		if m.backoffInterval == backoff.Stop {
+			log.Warn("changefeed will not be restarted because it has been failing for a long time period",
+				zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime))
+			m.shouldBeRunning = false
+			m.patchState(model.StateFailed)
+		} else {
+			log.Info("changefeed restart backoff interval is changed", zap.String("changefeed", m.state.ID),
+				zap.Duration("oldInterval", oldBackoffInterval), zap.Duration("newInterval", m.backoffInterval))
+		}
 	}
 }
diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go
index 6bb962938e7..f92c0b73f71 100644
--- a/cdc/owner/feed_state_manager_test.go
+++ b/cdc/owner/feed_state_manager_test.go
@@ -14,6 +14,8 @@
 package owner
 
 import (
+	"time"
+
 	"github.com/pingcap/check"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/pkg/config"
@@ -29,7 +31,7 @@ type feedStateManagerSuite struct{}
 func (s *feedStateManagerSuite) TestHandleJob(c *check.C) {
 	defer testleak.AfterTest(c)()
 	ctx := cdcContext.NewBackendContext4Test(true)
-	manager := new(feedStateManager)
+	manager := newFeedStateManager4Test()
 	state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
 	tester := orchestrator.NewReactorStateTester(c, state, nil)
 	state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
@@ -104,7 +106,7 @@
 func (s *feedStateManagerSuite) TestMarkFinished(c *check.C) {
 	defer testleak.AfterTest(c)()
 	ctx := cdcContext.NewBackendContext4Test(true)
-	manager := new(feedStateManager)
+	manager := newFeedStateManager4Test()
 	state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
 	tester := orchestrator.NewReactorStateTester(c, state, nil)
 	state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
@@ -132,7 +134,7 @@
 func (s *feedStateManagerSuite) TestCleanUpInfos(c *check.C) {
 	defer testleak.AfterTest(c)()
 	ctx := cdcContext.NewBackendContext4Test(true)
-	manager := new(feedStateManager)
+	manager := newFeedStateManager4Test()
 	state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
 	tester := orchestrator.NewReactorStateTester(c, state, nil)
 	state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
@@ -175,7 +177,7 @@
 func (s *feedStateManagerSuite) TestHandleError(c *check.C) {
 	defer testleak.AfterTest(c)()
 	ctx := cdcContext.NewBackendContext4Test(true)
-	manager := new(feedStateManager)
+	manager := newFeedStateManager4Test()
 	state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
 	tester := orchestrator.NewReactorStateTester(c, state, nil)
 	state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
@@ -189,25 +191,22 @@ func (s *feedStateManagerSuite) TestHandleError(c *check.C) {
 	state.PatchTaskStatus(ctx.GlobalVars().CaptureInfo.ID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) {
 		return &model.TaskStatus{}, true, nil
 	})
-	state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
-		return &model.TaskPosition{Error: &model.RunningError{
-			Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
-			Code:    "[CDC:ErrEtcdSessionDone]",
-			Message: "fake error for test",
-		}}, true, nil
-	})
+	state.PatchTaskWorkload(ctx.GlobalVars().CaptureInfo.ID, func(workload model.TaskWorkload) (model.TaskWorkload, bool, error) {
+		return model.TaskWorkload{}, true, nil
+	})
 	tester.MustApplyPatches()
 	manager.Tick(state)
 	tester.MustApplyPatches()
-	c.Assert(manager.ShouldRunning(), check.IsTrue)
-	// error reported by processor in task position should be cleaned
-	c.Assert(state.TaskPositions[ctx.GlobalVars().CaptureInfo.ID].Error, check.IsNil)
-	// throw error more than history threshold to turn feed state into error
-	for i := 0; i < model.ErrorHistoryThreshold; i++ {
+	// the backoff will be stopped after 4600ms because 4600ms + 1600ms > 6000ms.
+	intervals := []time.Duration{200, 400, 800, 1600, 1600}
+	for i, d := range intervals {
+		intervals[i] = d * time.Millisecond
+	}
+
+	for _, d := range intervals {
+		c.Assert(manager.ShouldRunning(), check.IsTrue)
 		state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
 			return &model.TaskPosition{Error: &model.RunningError{
 				Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
@@ -218,18 +217,37 @@ func (s *feedStateManagerSuite) TestHandleError(c *check.C) {
 		tester.MustApplyPatches()
 		manager.Tick(state)
 		tester.MustApplyPatches()
+		c.Assert(manager.ShouldRunning(), check.IsFalse)
+		time.Sleep(d)
+		manager.Tick(state)
+		tester.MustApplyPatches()
 	}
+
 	c.Assert(manager.ShouldRunning(), check.IsFalse)
 	c.Assert(manager.ShouldRemoved(), check.IsFalse)
-	c.Assert(state.Info.State, check.Equals, model.StateError)
+	c.Assert(state.Info.State, check.Equals, model.StateFailed)
 	c.Assert(state.Info.AdminJobType, check.Equals, model.AdminStop)
 	c.Assert(state.Status.AdminJobType, check.Equals, model.AdminStop)
+
+	// admin resume must retry changefeed immediately.
+	manager.PushAdminJob(&model.AdminJob{
+		CfID: ctx.ChangefeedVars().ID,
+		Type: model.AdminResume,
+		Opts: &model.AdminJobOption{ForceRemove: false},
+	})
+	manager.Tick(state)
+	tester.MustApplyPatches()
+	c.Assert(manager.ShouldRunning(), check.IsTrue)
+	c.Assert(manager.ShouldRemoved(), check.IsFalse)
+	c.Assert(state.Info.State, check.Equals, model.StateNormal)
+	c.Assert(state.Info.AdminJobType, check.Equals, model.AdminNone)
+	c.Assert(state.Status.AdminJobType, check.Equals, model.AdminNone)
 }
 
 func (s *feedStateManagerSuite) TestChangefeedStatusNotExist(c *check.C) {
 	defer testleak.AfterTest(c)()
 	ctx := cdcContext.NewBackendContext4Test(true)
-	manager := new(feedStateManager)
+	manager := newFeedStateManager4Test()
 	state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
 	tester := orchestrator.NewReactorStateTester(c, state, map[string]string{
 		"/tidb/cdc/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5": `{"id":"d563bfc0-f406-4f34-bc7d-6dc2e35a44e5","address":"172.16.6.147:8300","version":"v5.0.0-master-dirty"}`,
diff --git a/cdc/sink/codec/schema_registry.go b/cdc/sink/codec/schema_registry.go
index bca5cf6b57d..4dd3479bac2 100644
--- a/cdc/sink/codec/schema_registry.go
+++ b/cdc/sink/codec/schema_registry.go
@@ -25,7 +25,7 @@ import (
 	"sync"
 	"time"
 
-	"github.com/cenkalti/backoff"
+	"github.com/cenkalti/backoff/v4"
 	"github.com/linkedin/goavro/v2"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
diff --git a/go.mod b/go.mod
index 77977624bb0..1bf2423c279 100644
--- a/go.mod
+++ b/go.mod
@@ -11,7 +11,7 @@ require (
 	github.com/aws/aws-sdk-go v1.35.3
 	github.com/benbjohnson/clock v1.1.0
 	github.com/bradleyjkemp/grpc-tools v0.2.5
-	github.com/cenkalti/backoff v2.2.1+incompatible
+	github.com/cenkalti/backoff/v4 v4.0.2
 	github.com/chaos-mesh/go-sqlsmith v0.0.0-20211025024535-03ae33408684
 	github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
 	github.com/coreos/go-semver v0.3.0
diff --git a/go.sum b/go.sum
index d9448e91b09..462cb9bda9a 100644
--- a/go.sum
+++ b/go.sum
@@ -132,8 +132,7 @@ github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 h1:BjkPE3785EwP
 github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5/go.mod h1:jtAfVaU/2cu1+wdSRPWE2c1N2qeAA3K4RH9pYgqwets=
 github.com/carlmjohnson/flagext v0.21.0 h1:/c4uK3ie786Z7caXLcIMvePNSSiH3bQVGDvmGLMme60=
 github.com/carlmjohnson/flagext v0.21.0/go.mod h1:Eenv0epIUAr4NuedNmkzI8WmBmjIxZC239XcKxYS2ac=
-github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
-github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
+github.com/cenkalti/backoff/v4 v4.0.2 h1:JIufpQLbh4DkbQoii76ItQIUFzevQSqOLZca4eamEDs=
 github.com/cenkalti/backoff/v4 v4.0.2/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
diff --git a/integration/framework/dsl.go b/integration/framework/dsl.go
index 547e18bd625..fd71b4cebba 100644
--- a/integration/framework/dsl.go
+++ b/integration/framework/dsl.go
@@ -17,7 +17,7 @@ import (
 	"context"
 	"time"
 
-	backoff2 "github.com/cenkalti/backoff"
+	"github.com/cenkalti/backoff/v4"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"go.uber.org/zap"
@@ -90,8 +90,8 @@ func (b *basicAwaitable) Wait() Checkable {
 	}
 	defer cancel()
 
-	backoff := backoff2.NewExponentialBackOff()
-	backoff.MaxInterval = waitMaxPollInterval
+	expBackoff := backoff.NewExponentialBackOff()
+	expBackoff.MaxInterval = waitMaxPollInterval
 	for {
 		select {
 		case <-ctx.Done():
@@ -109,8 +109,8 @@ func (b *basicAwaitable) Wait() Checkable {
 			return b
 		}
 
-		interval := backoff.NextBackOff()
-		if interval == backoff2.Stop {
+		interval := expBackoff.NextBackOff()
+		if interval == backoff.Stop {
 			return &errorCheckableAndAwaitable{errors.New("Maximum retry interval reached")}
 		}
 		log.Debug("Wait(): pollable returned false, backing off", zap.Duration("interval", interval))
diff --git a/pkg/config/cyclic.go b/pkg/config/cyclic.go
index bba476912a4..bff55c546b9 100644
--- a/pkg/config/cyclic.go
+++ b/pkg/config/cyclic.go
@@ -33,7 +33,7 @@ func (c *CyclicConfig) IsEnabled() bool {
 	return c != nil && c.Enable
 }
 
-// Marshal returns the json marshal format of a ReplicationConfig
+// Marshal returns the json marshal format of a CyclicConfig
 func (c *CyclicConfig) Marshal() (string, error) {
 	cfg, err := json.Marshal(c)
 	if err != nil {
@@ -42,7 +42,7 @@ func (c *CyclicConfig) Marshal() (string, error) {
 	return string(cfg), nil
 }
 
-// Unmarshal unmarshals into *ReplicationConfig from json marshal byte slice
+// Unmarshal unmarshals into *CyclicConfig from json marshal byte slice
 func (c *CyclicConfig) Unmarshal(data []byte) error {
 	return json.Unmarshal(data, c)
 }
diff --git a/tests/integration_tests/kafka_sink_error_resume/run.sh b/tests/integration_tests/kafka_sink_error_resume/run.sh
index 1020d5cd233..7a8c5ccbb3e 100755
--- a/tests/integration_tests/kafka_sink_error_resume/run.sh
+++ b/tests/integration_tests/kafka_sink_error_resume/run.sh
@@ -39,7 +39,9 @@ function run() {
 	TOPIC_NAME="ticdc-kafka-sink-error-resume-test-$RANDOM"
 	SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760"
 
-	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/producer/kafka/KafkaSinkAsyncSendError=4*return(true)'
+	# Return a failpoint error to fail a kafka changefeed.
+	# Note that we return only one error from the failpoint; if the owner retries the changefeed frequently, it may break the test.
+	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/producer/kafka/KafkaSinkAsyncSendError=1*return(true)'
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
 	changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
 	run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
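
For reference, the standalone Go sketch below (an illustration only, not a file in this patch) shows how the backoff constants added in cdc/owner/feed_state_manager.go play out with github.com/cenkalti/backoff/v4: intervals grow 10s, 20s, 40s, ... up to the 30min cap, and NextBackOff returns backoff.Stop once the elapsed time passes MaxElapsedTime (90min), which is the point where the owner marks the changefeed "failed". The fakeClock type and the zero RandomizationFactor are assumptions made here purely to keep the printed schedule deterministic and to show the cutoff without sleeping.

// Illustrative sketch of the backoff schedule used by the new feedStateManager.
package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// fakeClock lets the sketch pretend that each returned interval was fully waited out.
type fakeClock struct{ now time.Time }

func (c *fakeClock) Now() time.Time { return c.now }

func main() {
	clk := &fakeClock{now: time.Now()}

	b := backoff.NewExponentialBackOff()
	b.InitialInterval = 10 * time.Second // defaultBackoffInitInterval
	b.MaxInterval = 30 * time.Minute     // defaultBackoffMaxInterval
	b.MaxElapsedTime = 90 * time.Minute  // defaultBackoffMaxElapsedTime
	b.Multiplier = 2.0                   // defaultBackoffMultiplier
	b.RandomizationFactor = 0            // the patch uses 0.1 to avoid a thundering herd
	b.Clock = clk
	b.Reset()

	for i := 1; ; i++ {
		d := b.NextBackOff()
		if d == backoff.Stop {
			// Mirrors the owner marking the changefeed "failed" once MaxElapsedTime is exceeded.
			fmt.Printf("try %d: backoff stopped, elapsed %v\n", i, b.GetElapsedTime())
			return
		}
		fmt.Printf("try %d: retry in %v\n", i, d)
		clk.now = clk.now.Add(d) // pretend the whole interval elapsed before the next retry
	}
}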