From 8f755fe3a275d42bb926971b50dc09382b62ba1e Mon Sep 17 00:00:00 2001 From: zhaoxinyu Date: Sat, 15 Jan 2022 00:59:42 +0800 Subject: [PATCH 1/3] This is an automated cherry-pick of #4262 Signed-off-by: ti-chi-bot --- cdc/model/changefeed.go | 111 +++++-- cdc/model/changefeed_test.go | 270 ++++++++++++++++++ cdc/owner/changefeed.go | 2 +- cdc/owner/feed_state_manager.go | 165 ++++++++++- cdc/owner/feed_state_manager_test.go | 72 ++++- cdc/sink/codec/schema_registry.go | 2 +- go.mod | 5 + go.sum | 3 +- integration/framework/dsl.go | 10 +- pkg/config/cyclic.go | 4 +- .../kafka_sink_error_resume/run.sh | 4 +- 11 files changed, 590 insertions(+), 58 deletions(-) diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 87158608f41..c4eec13acb1 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -17,7 +17,6 @@ import ( "encoding/json" "math" "regexp" - "sort" "time" "github.com/pingcap/errors" @@ -73,6 +72,7 @@ func (s FeedState) ToInt() int { return -1 } +<<<<<<< HEAD const ( // errorHistoryGCInterval represents how long we keep error record in changefeed info errorHistoryGCInterval = time.Minute * 10 @@ -85,6 +85,25 @@ const ( // changefeed, if it is less than ErrorHistoryThreshold, then initialize it. ErrorHistoryThreshold = 3 ) +======= +// IsNeeded return true if the given feedState matches the listState. +func (s FeedState) IsNeeded(need string) bool { + if need == "all" { + return true + } + if need == "" { + switch s { + case StateNormal: + return true + case StateStopped: + return true + case StateFailed: + return true + } + } + return need == string(s) +} +>>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) // ChangeFeedInfo describes the detail of a ChangeFeed type ChangeFeedInfo struct { @@ -103,10 +122,9 @@ type ChangeFeedInfo struct { // but can be fetched for backward compatibility SortDir string `json:"sort-dir"` - Config *config.ReplicaConfig `json:"config"` - State FeedState `json:"state"` - ErrorHis []int64 `json:"history"` - Error *RunningError `json:"error"` + Config *config.ReplicaConfig `json:"config"` + State FeedState `json:"state"` + Error *RunningError `json:"error"` SyncPointEnabled bool `json:"sync-point-enabled"` SyncPointInterval time.Duration `json:"sync-point-interval"` @@ -232,6 +250,13 @@ func (info *ChangeFeedInfo) VerifyAndComplete() error { if info.Config.Scheduler == nil { info.Config.Scheduler = defaultConfig.Scheduler } +<<<<<<< HEAD +======= + if info.Config.Consistent == nil { + info.Config.Consistent = defaultConfig.Consistent + } + +>>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) return nil } @@ -283,6 +308,7 @@ func (info *ChangeFeedInfo) fixState() { } } +<<<<<<< HEAD // CheckErrorHistory checks error history of a changefeed // if having error record older than GC interval, set needSave to true. // if error counts reach threshold, set canInit to false. @@ -303,6 +329,57 @@ func (info *ChangeFeedInfo) CheckErrorHistory() (needSave bool, canInit bool) { }) canInit = len(info.ErrorHis)-i < ErrorHistoryThreshold return +======= +// fixSinkProtocol attempts to fix protocol incompatible. +// We no longer support the acceptance of protocols that are not known. +// The ones that were already accepted need to be fixed. 
+func (info *ChangeFeedInfo) fixSinkProtocol() { + sinkURIParsed, err := url.Parse(info.SinkURI) + if err != nil { + log.Warn("parse sink URI failed", zap.Error(err)) + // SAFETY: It is safe to ignore this unresolvable sink URI here, + // as it is almost impossible for this to happen. + // If we ignore it when fixing it after it happens, + // it will expose the problem when starting the changefeed, + // which is easier to troubleshoot than reporting the error directly in the bootstrap process. + return + } + rawQuery := sinkURIParsed.Query() + protocolStr := rawQuery.Get(config.ProtocolKey) + + needsFix := func(protocolStr string) bool { + var protocol config.Protocol + err = protocol.FromString(protocolStr) + // There are two cases: + // 1. there is an error indicating that the old ticdc accepts + // a protocol that is not known. It needs to be fixed as open protocol. + // 2. If it is default, then it needs to be fixed as open protocol. + return err != nil || protocolStr == config.ProtocolDefault.String() + } + + openProtocolStr := config.ProtocolOpen.String() + // The sinkURI always has a higher priority. + if protocolStr != "" { + if needsFix(protocolStr) { + rawQuery.Set(config.ProtocolKey, openProtocolStr) + oldRawQuery := sinkURIParsed.RawQuery + newRawQuery := rawQuery.Encode() + sinkURIParsed.RawQuery = newRawQuery + fixedSinkURI := sinkURIParsed.String() + log.Info("handle incompatible protocol from sink URI", + zap.String("old URI query", oldRawQuery), + zap.String("fixed URI query", newRawQuery)) + info.SinkURI = fixedSinkURI + } + } else { + if needsFix(info.Config.Sink.Protocol) { + log.Info("handle incompatible protocol from sink config", + zap.String("oldProtocol", info.Config.Sink.Protocol), + zap.String("fixedProtocol", openProtocolStr)) + info.Config.Sink.Protocol = openProtocolStr + } + } +>>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) } // HasFastFailError returns true if the error in changefeed is fast-fail @@ -312,27 +389,3 @@ func (info *ChangeFeedInfo) HasFastFailError() bool { } return cerror.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) } - -// findActiveErrors finds all errors occurring within errorHistoryCheckInterval -func (info *ChangeFeedInfo) findActiveErrors() []int64 { - i := sort.Search(len(info.ErrorHis), func(i int) bool { - ts := info.ErrorHis[i] - // ts is a errors occurrence time, here to find all errors occurring within errorHistoryCheckInterval - return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryCheckInterval - }) - return info.ErrorHis[i:] -} - -// ErrorsReachedThreshold checks error history of a changefeed -// returns true if error counts reach threshold -func (info *ChangeFeedInfo) ErrorsReachedThreshold() bool { - return len(info.findActiveErrors()) >= ErrorHistoryThreshold -} - -// CleanUpOutdatedErrorHistory cleans up the outdated error history -// return true if the ErrorHis changed -func (info *ChangeFeedInfo) CleanUpOutdatedErrorHistory() bool { - lastLenOfErrorHis := len(info.ErrorHis) - info.ErrorHis = info.findActiveErrors() - return lastLenOfErrorHis != len(info.ErrorHis) -} diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index 43ea14afcae..0b9c9ee5bd3 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -500,6 +500,276 @@ func (s *changefeedSuite) TestFixState(c *check.C) { for _, tc := range testCases { tc.info.fixState() +<<<<<<< HEAD c.Assert(tc.info.State, check.Equals, tc.expectedState) +======= + 
require.Equal(t, tc.expectedState, tc.info.State) + } +} + +func TestFixSinkProtocol(t *testing.T) { + t.Parallel() + + // Test fixing the protocol in the configuration. + configTestCases := []struct { + info *ChangeFeedInfo + expectedProtocol config.Protocol + }{ + { + info: &ChangeFeedInfo{ + SinkURI: "mysql://root:test@127.0.0.1:3306/", + Config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, + }, + }, + expectedProtocol: config.ProtocolOpen, + }, + { + info: &ChangeFeedInfo{ + SinkURI: "kafka://127.0.0.1:9092/ticdc-test2", + Config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{Protocol: config.ProtocolCanal.String()}, + }, + }, + expectedProtocol: config.ProtocolCanal, + }, + { + info: &ChangeFeedInfo{ + SinkURI: "kafka://127.0.0.1:9092/ticdc-test2", + Config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, + }, + }, + expectedProtocol: config.ProtocolOpen, + }, + { + info: &ChangeFeedInfo{ + SinkURI: "kafka://127.0.0.1:9092/ticdc-test2", + Config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{Protocol: "random"}, + }, + }, + expectedProtocol: config.ProtocolOpen, + }, + } + + for _, tc := range configTestCases { + tc.info.fixSinkProtocol() + var protocol config.Protocol + err := protocol.FromString(tc.info.Config.Sink.Protocol) + require.Nil(t, err) + require.Equal(t, tc.expectedProtocol, protocol) + } + + // Test fixing the protocol in SinkURI. + sinkURITestCases := []struct { + info *ChangeFeedInfo + expectedSinkURI string + }{ + { + info: &ChangeFeedInfo{ + SinkURI: "mysql://root:test@127.0.0.1:3306/", + Config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, + }, + }, + expectedSinkURI: "mysql://root:test@127.0.0.1:3306/", + }, + { + info: &ChangeFeedInfo{ + SinkURI: "kafka://127.0.0.1:9092/ticdc-test2", + Config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{Protocol: config.ProtocolCanal.String()}, + }, + }, + expectedSinkURI: "kafka://127.0.0.1:9092/ticdc-test2", + }, + { + info: &ChangeFeedInfo{ + SinkURI: "kafka://127.0.0.1:9092/ticdc-test2?protocol=canal", + Config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, + }, + }, + expectedSinkURI: "kafka://127.0.0.1:9092/ticdc-test2?protocol=canal", + }, + { + info: &ChangeFeedInfo{ + SinkURI: "kafka://127.0.0.1:9092/ticdc-test2?protocol=random", + Config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, + }, + }, + expectedSinkURI: "kafka://127.0.0.1:9092/ticdc-test2?protocol=open-protocol", + }, + { + info: &ChangeFeedInfo{ + SinkURI: "kafka://127.0.0.1:9092/ticdc-test2?protocol=random&max-message-size=15", + Config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, + }, + }, + expectedSinkURI: "kafka://127.0.0.1:9092/ticdc-test2?max-message-size=15&protocol=open-protocol", + }, + { + info: &ChangeFeedInfo{ + SinkURI: "kafka://127.0.0.1:9092/ticdc-test2?protocol=default&max-message-size=15", + Config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, + }, + }, + expectedSinkURI: "kafka://127.0.0.1:9092/ticdc-test2?max-message-size=15&protocol=open-protocol", + }, + } + + for _, tc := range sinkURITestCases { + tc.info.fixSinkProtocol() + require.Equal(t, tc.expectedSinkURI, tc.info.SinkURI) + } +} + +func TestChangeFeedInfoClone(t *testing.T) { + t.Parallel() + + info := &ChangeFeedInfo{ + SinkURI: 
"blackhole://", + Opts: map[string]string{}, + StartTs: 417257993615179777, + Config: &config.ReplicaConfig{ + CaseSensitive: true, + EnableOldValue: true, + CheckGCSafePoint: true, + }, + } + + cloned, err := info.Clone() + require.Nil(t, err) + sinkURI := "mysql://unix:/var/run/tidb.sock" + cloned.SinkURI = sinkURI + cloned.Config.EnableOldValue = false + require.Equal(t, sinkURI, cloned.SinkURI) + require.False(t, cloned.Config.EnableOldValue) + require.Equal(t, "blackhole://", info.SinkURI) + require.True(t, info.Config.EnableOldValue) +} + +func TestChangefeedInfoStringer(t *testing.T) { + t.Parallel() + + testcases := []struct { + info *ChangeFeedInfo + expectedSinkURIRegexp string + }{ + { + &ChangeFeedInfo{ + SinkURI: "blackhole://", + StartTs: 418881574869139457, + }, + `.*blackhole:.*`, + }, + { + &ChangeFeedInfo{ + SinkURI: "kafka://127.0.0.1:9092/ticdc-test2", + StartTs: 418881574869139457, + }, + `.*kafka://\*\*\*/ticdc-test2.*`, + }, + { + &ChangeFeedInfo{ + SinkURI: "mysql://root:124567@127.0.0.1:3306/", + StartTs: 418881574869139457, + }, + `.*mysql://username:password@\*\*\*/.*`, + }, + { + &ChangeFeedInfo{ + SinkURI: "mysql://root@127.0.0.1:3306/", + StartTs: 418881574869139457, + }, + `.*mysql://username:password@\*\*\*/.*`, + }, + { + &ChangeFeedInfo{ + SinkURI: "mysql://root:test%21%23%24%25%5E%26%2A@127.0.0.1:3306/", + StartTs: 418881574869139457, + }, + `.*mysql://username:password@\*\*\*/.*`, + }, + } + + for _, tc := range testcases { + require.Regexp(t, tc.expectedSinkURIRegexp, tc.info.String()) + } +} + +func TestValidateChangefeedID(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + id string + wantErr bool + }{ + { + name: "alphabet", + id: "testTtTT", + wantErr: false, + }, + { + name: "number", + id: "01131323", + wantErr: false, + }, + { + name: "mixed", + id: "9ff52acaA-aea6-4022-8eVc4-fbee3fD2c7890", + wantErr: false, + }, + { + name: "len==128", + id: "1234567890-1234567890-1234567890-1234567890-1234567890-1234567890-1234567890-1234567890-1234567890123456789012345678901234567890", + wantErr: false, + }, + { + name: "empty string 1", + id: "", + wantErr: true, + }, + { + name: "empty string 2", + id: " ", + wantErr: true, + }, + { + name: "test_task", + id: "test_task ", + wantErr: true, + }, + { + name: "job$", + id: "job$ ", + wantErr: true, + }, + { + name: "test-", + id: "test-", + wantErr: true, + }, + { + name: "-", + id: "-", + wantErr: true, + }, + { + name: "-sfsdfdf1", + id: "-sfsdfdf1", + wantErr: true, + }, + { + name: "len==129", + id: "1234567890-1234567890-1234567890-1234567890-1234567890-1234567890-1234567890-1234567890-1234567890-123456789012345678901234567890", + wantErr: true, + }, +>>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) } } diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index f3056aa91d6..e93de2e1b28 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -75,7 +75,7 @@ func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed { id: id, scheduler: newScheduler(), barriers: newBarriers(), - feedStateManager: new(feedStateManager), + feedStateManager: newFeedStateManager(), gcManager: gcManager, errCh: make(chan error, defaultErrChSize), diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index a10376e07f1..af11cd759e6 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -16,6 +16,7 @@ package owner import ( "time" + "github.com/cenkalti/backoff/v4" 
"github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" @@ -23,13 +24,95 @@ import ( "go.uber.org/zap" ) +const ( + // When errors occurred and we need to do backoff, we start an exponential backoff + // with an interval from 10s to 30min (10s, 20s, 40s, 80s, 160s, 320s, 640s, 1280s, 1800s). + // And the backoff will be stopped after 72 min (about 9 tries) because if we do another 30min backoff, + // the total duration (72+30=102min) will exceeds the MaxElapsedTime (90min). + // To avoid thunderherd, a random factor is also added. + defaultBackoffInitInterval = 10 * time.Second + defaultBackoffMaxInterval = 30 * time.Minute + defaultBackoffMaxElapsedTime = 90 * time.Minute + defaultBackoffRandomizationFactor = 0.1 + defaultBackoffMultiplier = 2.0 + + // If all states recorded in window are 'normal', it can be assumed that the changfeed + // is running steady. And then if we enter a state other than normal at next tick, + // the backoff must be reset. + defaultStateWindowSize = 512 +) + // feedStateManager manages the ReactorState of a changefeed // when a error or a admin job occurs, the feedStateManager is responsible for controlling the ReactorState type feedStateManager struct { state *model.ChangefeedReactorState shouldBeRunning bool - adminJobQueue []*model.AdminJob + adminJobQueue []*model.AdminJob + stateHistory [defaultStateWindowSize]model.FeedState + lastErrorTime time.Time // time of last error for a changefeed + backoffInterval time.Duration // the interval for restarting a changefeed in 'error' state + errBackoff *backoff.ExponentialBackOff // an exponential backoff for restarting a changefeed +} + +// newFeedStateManager creates feedStateManager and initialize the exponential backoff +func newFeedStateManager() *feedStateManager { + f := new(feedStateManager) + + f.errBackoff = backoff.NewExponentialBackOff() + f.errBackoff.InitialInterval = defaultBackoffInitInterval + f.errBackoff.MaxInterval = defaultBackoffMaxInterval + f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime + f.errBackoff.Multiplier = defaultBackoffMultiplier + f.errBackoff.RandomizationFactor = defaultBackoffRandomizationFactor + + f.resetErrBackoff() + f.lastErrorTime = time.Unix(0, 0) + + return f +} + +// newFeedStateManager4Test creates feedStateManager for test +func newFeedStateManager4Test() *feedStateManager { + f := new(feedStateManager) + + f.errBackoff = backoff.NewExponentialBackOff() + f.errBackoff.InitialInterval = 200 * time.Millisecond + f.errBackoff.MaxInterval = 1600 * time.Millisecond + f.errBackoff.MaxElapsedTime = 6 * time.Second + f.errBackoff.Multiplier = 2.0 + f.errBackoff.RandomizationFactor = 0 + + f.resetErrBackoff() + f.lastErrorTime = time.Unix(0, 0) + + return f +} + +// resetErrBackoff reset the backoff-related fields +func (m *feedStateManager) resetErrBackoff() { + m.errBackoff.Reset() + m.backoffInterval = m.errBackoff.NextBackOff() +} + +// isChangefeedStable check if there are states other than 'normal' in this sliding window. 
+func (m *feedStateManager) isChangefeedStable() bool { + for _, val := range m.stateHistory { + if val != model.StateNormal { + return false + } + } + + return true +} + +// shiftStateWindow shift the sliding window +func (m *feedStateManager) shiftStateWindow(state model.FeedState) { + for i := 0; i < defaultStateWindowSize-1; i++ { + m.stateHistory[i] = m.stateHistory[i+1] + } + + m.stateHistory[defaultStateWindowSize-1] = state } func (m *feedStateManager) Tick(state *model.ChangefeedReactorState) { @@ -132,13 +215,22 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) { return } m.shouldBeRunning = true + // when the changefeed is manually resumed, we must reset the backoff + m.resetErrBackoff() + // The lastErrorTime also needs to be cleared before a fresh run. + m.lastErrorTime = time.Unix(0, 0) jobsPending = true m.patchState(model.StateNormal) - // remove error history to make sure the changefeed can running in next tick m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { +<<<<<<< HEAD if info.Error != nil || len(info.ErrorHis) != 0 { +======= + if info == nil { + return nil, false, nil + } + if info.Error != nil { +>>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) info.Error = nil - info.ErrorHis = nil return info, true, nil } return info, false, nil @@ -257,12 +349,33 @@ func (m *feedStateManager) errorsReportedByProcessors() []*model.RunningError { return result } +<<<<<<< HEAD func (m *feedStateManager) HandleError(errs ...*model.RunningError) { +======= +func (m *feedStateManager) handleError(errs ...*model.RunningError) { + // if there are a fastFail error in errs, we can just fastFail the changefeed + // and no need to patch other error to the changefeed info + for _, err := range errs { + if cerrors.ChangefeedFastFailErrorCode(errors.RFCErrorCode(err.Code)) { + m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + if info == nil { + return nil, false, nil + } + info.Error = err + return info, true, nil + }) + m.shouldBeRunning = false + m.patchState(model.StateFailed) + return + } + } + +>>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { for _, err := range errs { info.Error = err - info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6) } +<<<<<<< HEAD needSave := info.CleanUpOutdatedErrorHistory() return info, needSave || len(errs) > 0, nil }) @@ -279,8 +392,50 @@ func (m *feedStateManager) HandleError(errs ...*model.RunningError) { } // if the number of errors has reached the error threshold, stop the changefeed if m.state.Info.ErrorsReachedThreshold() { +======= + return info, len(errs) > 0, nil + }) + + // If we enter into an abnormal state ('error', 'failed') for this changefeed now + // but haven't seen abnormal states in a sliding window (512 ticks), + // it can be assumed that this changefeed meets a sudden change from a stable condition. + // So we can reset the exponential backoff and re-backoff from the InitialInterval. + // TODO: this detection policy should be added into unit test. 
+ if len(errs) > 0 { + m.lastErrorTime = time.Now() + if m.isChangefeedStable() { + m.resetErrBackoff() + } + } else { + if m.state.Info.State == model.StateNormal { + m.lastErrorTime = time.Unix(0, 0) + } + } + m.shiftStateWindow(m.state.Info.State) + + if m.lastErrorTime == time.Unix(0, 0) { + return + } + + if time.Since(m.lastErrorTime) < m.backoffInterval { +>>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) m.shouldBeRunning = false m.patchState(model.StateError) - return + } else { + oldBackoffInterval := m.backoffInterval + m.backoffInterval = m.errBackoff.NextBackOff() + m.lastErrorTime = time.Unix(0, 0) + + // if the duration since backoff start exceeds MaxElapsedTime, + // we set the state of changefeed to "failed" and don't let it run again unless it is manually resumed. + if m.backoffInterval == backoff.Stop { + log.Warn("changefeed will not be restarted because it has been failing for a long time period", + zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime)) + m.shouldBeRunning = false + m.patchState(model.StateFailed) + } else { + log.Info("changefeed restart backoff interval is changed", zap.String("changefeed", m.state.ID), + zap.Duration("oldInterval", oldBackoffInterval), zap.Duration("newInterval", m.backoffInterval)) + } } } diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index 4a9ec487745..4bc6073dc97 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -14,6 +14,8 @@ package owner import ( + "time" + "github.com/pingcap/check" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" @@ -30,8 +32,13 @@ type feedStateManagerSuite struct { func (s *feedStateManagerSuite) TestHandleJob(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(true) +<<<<<<< HEAD manager := new(feedStateManager) state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) +======= + manager := newFeedStateManager4Test() + state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) +>>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) tester := orchestrator.NewReactorStateTester(c, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { c.Assert(info, check.IsNil) @@ -102,8 +109,13 @@ func (s *feedStateManagerSuite) TestHandleJob(c *check.C) { func (s *feedStateManagerSuite) TestMarkFinished(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(true) +<<<<<<< HEAD manager := new(feedStateManager) state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) +======= + manager := newFeedStateManager4Test() + state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) +>>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) tester := orchestrator.NewReactorStateTester(c, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { c.Assert(info, check.IsNil) @@ -130,8 +142,13 @@ func (s *feedStateManagerSuite) TestMarkFinished(c *check.C) { func (s *feedStateManagerSuite) TestCleanUpInfos(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(true) +<<<<<<< HEAD manager := new(feedStateManager) state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) +======= + manager := newFeedStateManager4Test() + state := 
orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) +>>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) tester := orchestrator.NewReactorStateTester(c, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { c.Assert(info, check.IsNil) @@ -173,8 +190,13 @@ func (s *feedStateManagerSuite) TestCleanUpInfos(c *check.C) { func (s *feedStateManagerSuite) TestHandleError(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(true) +<<<<<<< HEAD manager := new(feedStateManager) state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) +======= + manager := newFeedStateManager4Test() + state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) +>>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) tester := orchestrator.NewReactorStateTester(c, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { c.Assert(info, check.IsNil) @@ -187,25 +209,22 @@ func (s *feedStateManagerSuite) TestHandleError(c *check.C) { state.PatchTaskStatus(ctx.GlobalVars().CaptureInfo.ID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) { return &model.TaskStatus{}, true, nil }) - state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { - return &model.TaskPosition{Error: &model.RunningError{ - Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, - Code: "[CDC:ErrEtcdSessionDone]", - Message: "fake error for test", - }}, true, nil - }) + state.PatchTaskWorkload(ctx.GlobalVars().CaptureInfo.ID, func(workload model.TaskWorkload) (model.TaskWorkload, bool, error) { return model.TaskWorkload{}, true, nil }) tester.MustApplyPatches() manager.Tick(state) tester.MustApplyPatches() - c.Assert(manager.ShouldRunning(), check.IsTrue) - // error reported by processor in task position should be cleaned - c.Assert(state.TaskPositions[ctx.GlobalVars().CaptureInfo.ID].Error, check.IsNil) - // throw error more than history threshold to turn feed state into error - for i := 0; i < model.ErrorHistoryThreshold; i++ { + // the backoff will be stopped after 4600ms because 4600ms + 1600ms > 6000ms. + intervals := []time.Duration{200, 400, 800, 1600, 1600} + for i, d := range intervals { + intervals[i] = d * time.Millisecond + } + + for _, d := range intervals { + c.Assert(manager.ShouldRunning(), check.IsTrue) state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { return &model.TaskPosition{Error: &model.RunningError{ Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, @@ -216,18 +235,47 @@ func (s *feedStateManagerSuite) TestHandleError(c *check.C) { tester.MustApplyPatches() manager.Tick(state) tester.MustApplyPatches() + c.Assert(manager.ShouldRunning(), check.IsFalse) + time.Sleep(d) + manager.Tick(state) + tester.MustApplyPatches() } + c.Assert(manager.ShouldRunning(), check.IsFalse) +<<<<<<< HEAD c.Assert(state.Info.State, check.Equals, model.StateError) +======= + c.Assert(manager.ShouldRemoved(), check.IsFalse) + c.Assert(state.Info.State, check.Equals, model.StateFailed) +>>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) c.Assert(state.Info.AdminJobType, check.Equals, model.AdminStop) c.Assert(state.Status.AdminJobType, check.Equals, model.AdminStop) + + // admin resume must retry changefeed immediately. 
+ manager.PushAdminJob(&model.AdminJob{ + CfID: ctx.ChangefeedVars().ID, + Type: model.AdminResume, + Opts: &model.AdminJobOption{ForceRemove: false}, + }) + manager.Tick(state) + tester.MustApplyPatches() + c.Assert(manager.ShouldRunning(), check.IsTrue) + c.Assert(manager.ShouldRemoved(), check.IsFalse) + c.Assert(state.Info.State, check.Equals, model.StateNormal) + c.Assert(state.Info.AdminJobType, check.Equals, model.AdminNone) + c.Assert(state.Status.AdminJobType, check.Equals, model.AdminNone) } func (s *feedStateManagerSuite) TestChangefeedStatusNotExist(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(true) +<<<<<<< HEAD manager := new(feedStateManager) state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) +======= + manager := newFeedStateManager4Test() + state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) +>>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) tester := orchestrator.NewReactorStateTester(c, state, map[string]string{ "/tidb/cdc/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5": `{"id":"d563bfc0-f406-4f34-bc7d-6dc2e35a44e5","address":"172.16.6.147:8300","version":"v5.0.0-master-dirty"}`, "/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole:///","opts":{},"create-time":"2021-06-05T00:44:15.065939487+08:00","start-ts":425381670108266496,"target-ts":0,"admin-job-type":1,"sort-engine":"unified","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"failed","history":[],"error":{"addr":"172.16.6.147:8300","code":"CDC:ErrSnapshotLostByGC","message":"[CDC:ErrSnapshotLostByGC]fail to create or maintain changefeed due to snapshot loss caused by GC. 
checkpoint-ts 425381670108266496 is earlier than GC safepoint at 0"},"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v5.0.0-master-dirty"}`, diff --git a/cdc/sink/codec/schema_registry.go b/cdc/sink/codec/schema_registry.go index bed095edf48..51e8a96ac70 100644 --- a/cdc/sink/codec/schema_registry.go +++ b/cdc/sink/codec/schema_registry.go @@ -25,7 +25,7 @@ import ( "sync" "time" - "github.com/cenkalti/backoff" + "github.com/cenkalti/backoff/v4" "github.com/linkedin/goavro/v2" "github.com/pingcap/errors" "github.com/pingcap/log" diff --git a/go.mod b/go.mod index 3353be73e2a..327411bb27a 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,12 @@ require ( github.com/apache/pulsar-client-go v0.1.1 github.com/benbjohnson/clock v1.3.0 github.com/bradleyjkemp/grpc-tools v0.2.5 +<<<<<<< HEAD github.com/cenkalti/backoff v2.2.1+incompatible +======= + github.com/cenkalti/backoff/v4 v4.0.2 + github.com/chaos-mesh/go-sqlsmith v0.0.0-20211025024535-03ae33408684 +>>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e github.com/coreos/go-semver v0.3.0 github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect diff --git a/go.sum b/go.sum index f1f665d98eb..c1a0164fce1 100644 --- a/go.sum +++ b/go.sum @@ -89,8 +89,7 @@ github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 h1:BjkPE3785EwP github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5/go.mod h1:jtAfVaU/2cu1+wdSRPWE2c1N2qeAA3K4RH9pYgqwets= github.com/carlmjohnson/flagext v0.21.0 h1:/c4uK3ie786Z7caXLcIMvePNSSiH3bQVGDvmGLMme60= github.com/carlmjohnson/flagext v0.21.0/go.mod h1:Eenv0epIUAr4NuedNmkzI8WmBmjIxZC239XcKxYS2ac= -github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= -github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cenkalti/backoff/v4 v4.0.2 h1:JIufpQLbh4DkbQoii76ItQIUFzevQSqOLZca4eamEDs= github.com/cenkalti/backoff/v4 v4.0.2/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= diff --git a/integration/framework/dsl.go b/integration/framework/dsl.go index 547e18bd625..fd71b4cebba 100644 --- a/integration/framework/dsl.go +++ b/integration/framework/dsl.go @@ -17,7 +17,7 @@ import ( "context" "time" - backoff2 "github.com/cenkalti/backoff" + "github.com/cenkalti/backoff/v4" "github.com/pingcap/errors" "github.com/pingcap/log" "go.uber.org/zap" @@ -90,8 +90,8 @@ func (b *basicAwaitable) Wait() Checkable { } defer cancel() - backoff := backoff2.NewExponentialBackOff() - backoff.MaxInterval = waitMaxPollInterval + expBackoff := backoff.NewExponentialBackOff() + expBackoff.MaxInterval = waitMaxPollInterval for { select { case <-ctx.Done(): @@ -109,8 +109,8 @@ func (b *basicAwaitable) Wait() Checkable { return b } - interval := backoff.NextBackOff() - if interval == backoff2.Stop { + interval := expBackoff.NextBackOff() + if interval == backoff.Stop { return &errorCheckableAndAwaitable{errors.New("Maximum retry interval reached")} } log.Debug("Wait(): pollable returned false, backing off", zap.Duration("interval", interval)) diff --git a/pkg/config/cyclic.go b/pkg/config/cyclic.go index bba476912a4..bff55c546b9 100644 --- a/pkg/config/cyclic.go +++ b/pkg/config/cyclic.go @@ -33,7 +33,7 
@@ func (c *CyclicConfig) IsEnabled() bool { return c != nil && c.Enable } -// Marshal returns the json marshal format of a ReplicationConfig +// Marshal returns the json marshal format of a CyclicConfig func (c *CyclicConfig) Marshal() (string, error) { cfg, err := json.Marshal(c) if err != nil { @@ -42,7 +42,7 @@ func (c *CyclicConfig) Marshal() (string, error) { return string(cfg), nil } -// Unmarshal unmarshals into *ReplicationConfig from json marshal byte slice +// Unmarshal unmarshals into *CyclicConfig from json marshal byte slice func (c *CyclicConfig) Unmarshal(data []byte) error { return json.Unmarshal(data, c) } diff --git a/tests/integration_tests/kafka_sink_error_resume/run.sh b/tests/integration_tests/kafka_sink_error_resume/run.sh index 1020d5cd233..7a8c5ccbb3e 100755 --- a/tests/integration_tests/kafka_sink_error_resume/run.sh +++ b/tests/integration_tests/kafka_sink_error_resume/run.sh @@ -39,7 +39,9 @@ function run() { TOPIC_NAME="ticdc-kafka-sink-error-resume-test-$RANDOM" SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/producer/kafka/KafkaSinkAsyncSendError=4*return(true)' + # Return an failpoint error to fail a kafka changefeed. + # Note we return one error for the failpoint, if owner retry changefeed frequently, it may break the test. + export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/producer/kafka/KafkaSinkAsyncSendError=1*return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" From 059ba2e0f82e00458095f7eb22881b74d5286db0 Mon Sep 17 00:00:00 2001 From: zhaoxinyu Date: Wed, 19 Jan 2022 15:58:38 +0800 Subject: [PATCH 2/3] address conflicts --- cdc/model/changefeed.go | 114 ----------- cdc/model/changefeed_test.go | 270 --------------------------- cdc/owner/changefeed.go | 2 +- cdc/owner/feed_state_manager.go | 29 +-- cdc/owner/feed_state_manager_test.go | 41 +--- go.mod | 5 - pkg/retry/retry.go | 2 +- 7 files changed, 8 insertions(+), 455 deletions(-) diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index c4eec13acb1..ee06b127197 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -72,39 +72,6 @@ func (s FeedState) ToInt() int { return -1 } -<<<<<<< HEAD -const ( - // errorHistoryGCInterval represents how long we keep error record in changefeed info - errorHistoryGCInterval = time.Minute * 10 - - // errorHistoryCheckInterval represents time window for failure check - errorHistoryCheckInterval = time.Minute * 2 - - // ErrorHistoryThreshold represents failure upper limit in time window. - // Before a changefeed is initialized, check the the failure count of this - // changefeed, if it is less than ErrorHistoryThreshold, then initialize it. - ErrorHistoryThreshold = 3 -) -======= -// IsNeeded return true if the given feedState matches the listState. 
-func (s FeedState) IsNeeded(need string) bool { - if need == "all" { - return true - } - if need == "" { - switch s { - case StateNormal: - return true - case StateStopped: - return true - case StateFailed: - return true - } - } - return need == string(s) -} ->>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) - // ChangeFeedInfo describes the detail of a ChangeFeed type ChangeFeedInfo struct { SinkURI string `json:"sink-uri"` @@ -250,13 +217,6 @@ func (info *ChangeFeedInfo) VerifyAndComplete() error { if info.Config.Scheduler == nil { info.Config.Scheduler = defaultConfig.Scheduler } -<<<<<<< HEAD -======= - if info.Config.Consistent == nil { - info.Config.Consistent = defaultConfig.Consistent - } - ->>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) return nil } @@ -308,80 +268,6 @@ func (info *ChangeFeedInfo) fixState() { } } -<<<<<<< HEAD -// CheckErrorHistory checks error history of a changefeed -// if having error record older than GC interval, set needSave to true. -// if error counts reach threshold, set canInit to false. -func (info *ChangeFeedInfo) CheckErrorHistory() (needSave bool, canInit bool) { - i := sort.Search(len(info.ErrorHis), func(i int) bool { - ts := info.ErrorHis[i] - return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryGCInterval - }) - info.ErrorHis = info.ErrorHis[i:] - - if i > 0 { - needSave = true - } - - i = sort.Search(len(info.ErrorHis), func(i int) bool { - ts := info.ErrorHis[i] - return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryCheckInterval - }) - canInit = len(info.ErrorHis)-i < ErrorHistoryThreshold - return -======= -// fixSinkProtocol attempts to fix protocol incompatible. -// We no longer support the acceptance of protocols that are not known. -// The ones that were already accepted need to be fixed. -func (info *ChangeFeedInfo) fixSinkProtocol() { - sinkURIParsed, err := url.Parse(info.SinkURI) - if err != nil { - log.Warn("parse sink URI failed", zap.Error(err)) - // SAFETY: It is safe to ignore this unresolvable sink URI here, - // as it is almost impossible for this to happen. - // If we ignore it when fixing it after it happens, - // it will expose the problem when starting the changefeed, - // which is easier to troubleshoot than reporting the error directly in the bootstrap process. - return - } - rawQuery := sinkURIParsed.Query() - protocolStr := rawQuery.Get(config.ProtocolKey) - - needsFix := func(protocolStr string) bool { - var protocol config.Protocol - err = protocol.FromString(protocolStr) - // There are two cases: - // 1. there is an error indicating that the old ticdc accepts - // a protocol that is not known. It needs to be fixed as open protocol. - // 2. If it is default, then it needs to be fixed as open protocol. - return err != nil || protocolStr == config.ProtocolDefault.String() - } - - openProtocolStr := config.ProtocolOpen.String() - // The sinkURI always has a higher priority. 
- if protocolStr != "" { - if needsFix(protocolStr) { - rawQuery.Set(config.ProtocolKey, openProtocolStr) - oldRawQuery := sinkURIParsed.RawQuery - newRawQuery := rawQuery.Encode() - sinkURIParsed.RawQuery = newRawQuery - fixedSinkURI := sinkURIParsed.String() - log.Info("handle incompatible protocol from sink URI", - zap.String("old URI query", oldRawQuery), - zap.String("fixed URI query", newRawQuery)) - info.SinkURI = fixedSinkURI - } - } else { - if needsFix(info.Config.Sink.Protocol) { - log.Info("handle incompatible protocol from sink config", - zap.String("oldProtocol", info.Config.Sink.Protocol), - zap.String("fixedProtocol", openProtocolStr)) - info.Config.Sink.Protocol = openProtocolStr - } - } ->>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) -} - // HasFastFailError returns true if the error in changefeed is fast-fail func (info *ChangeFeedInfo) HasFastFailError() bool { if info.Error == nil { diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index 0b9c9ee5bd3..43ea14afcae 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -500,276 +500,6 @@ func (s *changefeedSuite) TestFixState(c *check.C) { for _, tc := range testCases { tc.info.fixState() -<<<<<<< HEAD c.Assert(tc.info.State, check.Equals, tc.expectedState) -======= - require.Equal(t, tc.expectedState, tc.info.State) - } -} - -func TestFixSinkProtocol(t *testing.T) { - t.Parallel() - - // Test fixing the protocol in the configuration. - configTestCases := []struct { - info *ChangeFeedInfo - expectedProtocol config.Protocol - }{ - { - info: &ChangeFeedInfo{ - SinkURI: "mysql://root:test@127.0.0.1:3306/", - Config: &config.ReplicaConfig{ - Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, - }, - }, - expectedProtocol: config.ProtocolOpen, - }, - { - info: &ChangeFeedInfo{ - SinkURI: "kafka://127.0.0.1:9092/ticdc-test2", - Config: &config.ReplicaConfig{ - Sink: &config.SinkConfig{Protocol: config.ProtocolCanal.String()}, - }, - }, - expectedProtocol: config.ProtocolCanal, - }, - { - info: &ChangeFeedInfo{ - SinkURI: "kafka://127.0.0.1:9092/ticdc-test2", - Config: &config.ReplicaConfig{ - Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, - }, - }, - expectedProtocol: config.ProtocolOpen, - }, - { - info: &ChangeFeedInfo{ - SinkURI: "kafka://127.0.0.1:9092/ticdc-test2", - Config: &config.ReplicaConfig{ - Sink: &config.SinkConfig{Protocol: "random"}, - }, - }, - expectedProtocol: config.ProtocolOpen, - }, - } - - for _, tc := range configTestCases { - tc.info.fixSinkProtocol() - var protocol config.Protocol - err := protocol.FromString(tc.info.Config.Sink.Protocol) - require.Nil(t, err) - require.Equal(t, tc.expectedProtocol, protocol) - } - - // Test fixing the protocol in SinkURI. 
- sinkURITestCases := []struct { - info *ChangeFeedInfo - expectedSinkURI string - }{ - { - info: &ChangeFeedInfo{ - SinkURI: "mysql://root:test@127.0.0.1:3306/", - Config: &config.ReplicaConfig{ - Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, - }, - }, - expectedSinkURI: "mysql://root:test@127.0.0.1:3306/", - }, - { - info: &ChangeFeedInfo{ - SinkURI: "kafka://127.0.0.1:9092/ticdc-test2", - Config: &config.ReplicaConfig{ - Sink: &config.SinkConfig{Protocol: config.ProtocolCanal.String()}, - }, - }, - expectedSinkURI: "kafka://127.0.0.1:9092/ticdc-test2", - }, - { - info: &ChangeFeedInfo{ - SinkURI: "kafka://127.0.0.1:9092/ticdc-test2?protocol=canal", - Config: &config.ReplicaConfig{ - Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, - }, - }, - expectedSinkURI: "kafka://127.0.0.1:9092/ticdc-test2?protocol=canal", - }, - { - info: &ChangeFeedInfo{ - SinkURI: "kafka://127.0.0.1:9092/ticdc-test2?protocol=random", - Config: &config.ReplicaConfig{ - Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, - }, - }, - expectedSinkURI: "kafka://127.0.0.1:9092/ticdc-test2?protocol=open-protocol", - }, - { - info: &ChangeFeedInfo{ - SinkURI: "kafka://127.0.0.1:9092/ticdc-test2?protocol=random&max-message-size=15", - Config: &config.ReplicaConfig{ - Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, - }, - }, - expectedSinkURI: "kafka://127.0.0.1:9092/ticdc-test2?max-message-size=15&protocol=open-protocol", - }, - { - info: &ChangeFeedInfo{ - SinkURI: "kafka://127.0.0.1:9092/ticdc-test2?protocol=default&max-message-size=15", - Config: &config.ReplicaConfig{ - Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, - }, - }, - expectedSinkURI: "kafka://127.0.0.1:9092/ticdc-test2?max-message-size=15&protocol=open-protocol", - }, - } - - for _, tc := range sinkURITestCases { - tc.info.fixSinkProtocol() - require.Equal(t, tc.expectedSinkURI, tc.info.SinkURI) - } -} - -func TestChangeFeedInfoClone(t *testing.T) { - t.Parallel() - - info := &ChangeFeedInfo{ - SinkURI: "blackhole://", - Opts: map[string]string{}, - StartTs: 417257993615179777, - Config: &config.ReplicaConfig{ - CaseSensitive: true, - EnableOldValue: true, - CheckGCSafePoint: true, - }, - } - - cloned, err := info.Clone() - require.Nil(t, err) - sinkURI := "mysql://unix:/var/run/tidb.sock" - cloned.SinkURI = sinkURI - cloned.Config.EnableOldValue = false - require.Equal(t, sinkURI, cloned.SinkURI) - require.False(t, cloned.Config.EnableOldValue) - require.Equal(t, "blackhole://", info.SinkURI) - require.True(t, info.Config.EnableOldValue) -} - -func TestChangefeedInfoStringer(t *testing.T) { - t.Parallel() - - testcases := []struct { - info *ChangeFeedInfo - expectedSinkURIRegexp string - }{ - { - &ChangeFeedInfo{ - SinkURI: "blackhole://", - StartTs: 418881574869139457, - }, - `.*blackhole:.*`, - }, - { - &ChangeFeedInfo{ - SinkURI: "kafka://127.0.0.1:9092/ticdc-test2", - StartTs: 418881574869139457, - }, - `.*kafka://\*\*\*/ticdc-test2.*`, - }, - { - &ChangeFeedInfo{ - SinkURI: "mysql://root:124567@127.0.0.1:3306/", - StartTs: 418881574869139457, - }, - `.*mysql://username:password@\*\*\*/.*`, - }, - { - &ChangeFeedInfo{ - SinkURI: "mysql://root@127.0.0.1:3306/", - StartTs: 418881574869139457, - }, - `.*mysql://username:password@\*\*\*/.*`, - }, - { - &ChangeFeedInfo{ - SinkURI: "mysql://root:test%21%23%24%25%5E%26%2A@127.0.0.1:3306/", - StartTs: 418881574869139457, - }, - `.*mysql://username:password@\*\*\*/.*`, - }, - } - - for _, tc := range 
testcases { - require.Regexp(t, tc.expectedSinkURIRegexp, tc.info.String()) - } -} - -func TestValidateChangefeedID(t *testing.T) { - t.Parallel() - - tests := []struct { - name string - id string - wantErr bool - }{ - { - name: "alphabet", - id: "testTtTT", - wantErr: false, - }, - { - name: "number", - id: "01131323", - wantErr: false, - }, - { - name: "mixed", - id: "9ff52acaA-aea6-4022-8eVc4-fbee3fD2c7890", - wantErr: false, - }, - { - name: "len==128", - id: "1234567890-1234567890-1234567890-1234567890-1234567890-1234567890-1234567890-1234567890-1234567890123456789012345678901234567890", - wantErr: false, - }, - { - name: "empty string 1", - id: "", - wantErr: true, - }, - { - name: "empty string 2", - id: " ", - wantErr: true, - }, - { - name: "test_task", - id: "test_task ", - wantErr: true, - }, - { - name: "job$", - id: "job$ ", - wantErr: true, - }, - { - name: "test-", - id: "test-", - wantErr: true, - }, - { - name: "-", - id: "-", - wantErr: true, - }, - { - name: "-sfsdfdf1", - id: "-sfsdfdf1", - wantErr: true, - }, - { - name: "len==129", - id: "1234567890-1234567890-1234567890-1234567890-1234567890-1234567890-1234567890-1234567890-1234567890-123456789012345678901234567890", - wantErr: true, - }, ->>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) } } diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index e93de2e1b28..480502e4dbe 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -112,7 +112,7 @@ func (c *changefeed) Tick(ctx cdcContext.Context, state *model.ChangefeedReactor } else { code = string(cerror.ErrOwnerUnknown.RFCCode()) } - c.feedStateManager.HandleError(&model.RunningError{ + c.feedStateManager.handleError(&model.RunningError{ Addr: util.CaptureAddrFromCtx(ctx), Code: code, Message: err.Error(), diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index af11cd759e6..c3629697186 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -136,7 +136,7 @@ func (m *feedStateManager) Tick(state *model.ChangefeedReactorState) { return } errs := m.errorsReportedByProcessors() - m.HandleError(errs...) + m.handleError(errs...) 
} func (m *feedStateManager) ShouldRunning() bool { @@ -222,14 +222,10 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) { jobsPending = true m.patchState(model.StateNormal) m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { -<<<<<<< HEAD - if info.Error != nil || len(info.ErrorHis) != 0 { -======= if info == nil { return nil, false, nil } if info.Error != nil { ->>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) info.Error = nil return info, true, nil } @@ -349,9 +345,6 @@ func (m *feedStateManager) errorsReportedByProcessors() []*model.RunningError { return result } -<<<<<<< HEAD -func (m *feedStateManager) HandleError(errs ...*model.RunningError) { -======= func (m *feedStateManager) handleError(errs ...*model.RunningError) { // if there are a fastFail error in errs, we can just fastFail the changefeed // and no need to patch other error to the changefeed info @@ -370,29 +363,10 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) { } } ->>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { for _, err := range errs { info.Error = err } -<<<<<<< HEAD - needSave := info.CleanUpOutdatedErrorHistory() - return info, needSave || len(errs) > 0, nil - }) - var err *model.RunningError - if len(errs) > 0 { - err = errs[len(errs)-1] - } - // if one of the error stored by changefeed state(error in the last tick) or the error specified by this function(error in the this tick) - // is a fast-fail error, the changefeed should be failed - if m.state.Info.HasFastFailError() || (err != nil && cerrors.ChangefeedFastFailErrorCode(errors.RFCErrorCode(err.Code))) { - m.shouldBeRunning = false - m.patchState(model.StateFailed) - return - } - // if the number of errors has reached the error threshold, stop the changefeed - if m.state.Info.ErrorsReachedThreshold() { -======= return info, len(errs) > 0, nil }) @@ -418,7 +392,6 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) { } if time.Since(m.lastErrorTime) < m.backoffInterval { ->>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) m.shouldBeRunning = false m.patchState(model.StateError) } else { diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index 4bc6073dc97..19fb303fc94 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -32,13 +32,8 @@ type feedStateManagerSuite struct { func (s *feedStateManagerSuite) TestHandleJob(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(true) -<<<<<<< HEAD - manager := new(feedStateManager) - state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) -======= manager := newFeedStateManager4Test() - state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) ->>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) + state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(c, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { c.Assert(info, check.IsNil) @@ -109,13 +104,8 @@ func (s *feedStateManagerSuite) TestHandleJob(c *check.C) { func (s *feedStateManagerSuite) TestMarkFinished(c *check.C) { defer testleak.AfterTest(c)() ctx := 
cdcContext.NewBackendContext4Test(true) -<<<<<<< HEAD - manager := new(feedStateManager) - state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) -======= manager := newFeedStateManager4Test() - state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) ->>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) + state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(c, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { c.Assert(info, check.IsNil) @@ -142,13 +132,8 @@ func (s *feedStateManagerSuite) TestMarkFinished(c *check.C) { func (s *feedStateManagerSuite) TestCleanUpInfos(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(true) -<<<<<<< HEAD - manager := new(feedStateManager) - state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) -======= manager := newFeedStateManager4Test() - state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) ->>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) + state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(c, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { c.Assert(info, check.IsNil) @@ -190,13 +175,8 @@ func (s *feedStateManagerSuite) TestCleanUpInfos(c *check.C) { func (s *feedStateManagerSuite) TestHandleError(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(true) -<<<<<<< HEAD - manager := new(feedStateManager) - state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) -======= manager := newFeedStateManager4Test() - state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) ->>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) + state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(c, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { c.Assert(info, check.IsNil) @@ -242,12 +222,7 @@ func (s *feedStateManagerSuite) TestHandleError(c *check.C) { } c.Assert(manager.ShouldRunning(), check.IsFalse) -<<<<<<< HEAD - c.Assert(state.Info.State, check.Equals, model.StateError) -======= - c.Assert(manager.ShouldRemoved(), check.IsFalse) c.Assert(state.Info.State, check.Equals, model.StateFailed) ->>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) c.Assert(state.Info.AdminJobType, check.Equals, model.AdminStop) c.Assert(state.Status.AdminJobType, check.Equals, model.AdminStop) @@ -260,7 +235,6 @@ func (s *feedStateManagerSuite) TestHandleError(c *check.C) { manager.Tick(state) tester.MustApplyPatches() c.Assert(manager.ShouldRunning(), check.IsTrue) - c.Assert(manager.ShouldRemoved(), check.IsFalse) c.Assert(state.Info.State, check.Equals, model.StateNormal) c.Assert(state.Info.AdminJobType, check.Equals, model.AdminNone) c.Assert(state.Status.AdminJobType, check.Equals, model.AdminNone) @@ -269,13 +243,8 @@ func (s *feedStateManagerSuite) TestHandleError(c *check.C) { func (s *feedStateManagerSuite) TestChangefeedStatusNotExist(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(true) -<<<<<<< HEAD - manager := new(feedStateManager) - state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) 
-======= manager := newFeedStateManager4Test() - state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID) ->>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) + state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(c, state, map[string]string{ "/tidb/cdc/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5": `{"id":"d563bfc0-f406-4f34-bc7d-6dc2e35a44e5","address":"172.16.6.147:8300","version":"v5.0.0-master-dirty"}`, "/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole:///","opts":{},"create-time":"2021-06-05T00:44:15.065939487+08:00","start-ts":425381670108266496,"target-ts":0,"admin-job-type":1,"sort-engine":"unified","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"failed","history":[],"error":{"addr":"172.16.6.147:8300","code":"CDC:ErrSnapshotLostByGC","message":"[CDC:ErrSnapshotLostByGC]fail to create or maintain changefeed due to snapshot loss caused by GC. checkpoint-ts 425381670108266496 is earlier than GC safepoint at 0"},"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v5.0.0-master-dirty"}`, diff --git a/go.mod b/go.mod index 327411bb27a..385b5518ded 100644 --- a/go.mod +++ b/go.mod @@ -9,12 +9,7 @@ require ( github.com/apache/pulsar-client-go v0.1.1 github.com/benbjohnson/clock v1.3.0 github.com/bradleyjkemp/grpc-tools v0.2.5 -<<<<<<< HEAD - github.com/cenkalti/backoff v2.2.1+incompatible -======= github.com/cenkalti/backoff/v4 v4.0.2 - github.com/chaos-mesh/go-sqlsmith v0.0.0-20211025024535-03ae33408684 ->>>>>>> 58c7cc3ae (owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262)) github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e github.com/coreos/go-semver v0.3.0 github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index bd8bc41a34a..089c83a7043 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -18,7 +18,7 @@ import ( "math" "time" - "github.com/cenkalti/backoff" + "github.com/cenkalti/backoff/v4" "github.com/pingcap/errors" ) From 265b523873f5627025f6d3f164e9405415f98284 Mon Sep 17 00:00:00 2001 From: zhaoxinyu Date: Wed, 19 Jan 2022 16:25:33 +0800 Subject: [PATCH 3/3] fix a failed ut --- cdc/model/changefeed_test.go | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index 43ea14afcae..b9d69130fab 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -218,33 +218,6 @@ type changefeedSuite struct{} var _ = check.Suite(&changefeedSuite{}) -func (s *changefeedSuite) TestCheckErrorHistory(c *check.C) { - defer testleak.AfterTest(c)() - now := time.Now() - info := &ChangeFeedInfo{ - ErrorHis: []int64{}, - } - for i := 0; i < 5; i++ { - tm := now.Add(-errorHistoryGCInterval) - info.ErrorHis = append(info.ErrorHis, tm.UnixNano()/1e6) - time.Sleep(time.Millisecond) - } - for i := 0; i < ErrorHistoryThreshold-1; i++ { - info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6) - time.Sleep(time.Millisecond) - } - 
time.Sleep(time.Millisecond) - needSave, canInit := info.CheckErrorHistory() - c.Assert(needSave, check.IsTrue) - c.Assert(canInit, check.IsTrue) - c.Assert(info.ErrorHis, check.HasLen, ErrorHistoryThreshold-1) - - info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6) - needSave, canInit = info.CheckErrorHistory() - c.Assert(needSave, check.IsFalse) - c.Assert(canInit, check.IsFalse) -} - func (s *changefeedSuite) TestChangefeedInfoStringer(c *check.C) { defer testleak.AfterTest(c)() info := &ChangeFeedInfo{
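
The feed_state_manager.go hunk above introduces an exponential restart backoff whose comment spells out the intended schedule (10s, 20s, 40s, ..., 1280s, then 1800s, stopping after roughly 9 tries / 72 minutes, because one more 30-minute wait would push the total past the 90-minute MaxElapsedTime). The standalone Go sketch below only reproduces that arithmetic with the same constants; it ignores the 0.1 randomization factor and, unlike github.com/cenkalti/backoff/v4 used by the patch (which measures wall-clock time since Reset), it simply sums the waits, so treat it as an illustration of the schedule rather than of the library's exact stop condition.

package main

import (
	"fmt"
	"time"
)

// A minimal sketch of the restart schedule implied by the constants in
// feed_state_manager.go: start at 10s, double each time, cap at 30min, and
// give up once the accumulated wait would exceed MaxElapsedTime (90min).
// Randomization is ignored, and elapsed time is approximated by summing the
// waits instead of reading a wall clock as backoff/v4 does.
func main() {
	const (
		initInterval   = 10 * time.Second
		maxInterval    = 30 * time.Minute
		maxElapsedTime = 90 * time.Minute
		multiplier     = 2.0
	)

	interval := initInterval
	elapsed := time.Duration(0)
	for try := 1; ; try++ {
		if elapsed+interval > maxElapsedTime {
			fmt.Printf("give up after %d tries, about %v spent backing off\n", try-1, elapsed)
			return
		}
		elapsed += interval
		fmt.Printf("try %d: wait %v (total %v)\n", try, interval, elapsed)

		next := time.Duration(float64(interval) * multiplier)
		if next > maxInterval {
			next = maxInterval
		}
		interval = next
	}
}

Running it prints nine waits totalling about 72.5 minutes before giving up, matching the "about 9 tries" figure in the comment; plugging in the test constants from newFeedStateManager4Test (200ms initial, 1600ms cap, 6s MaxElapsedTime) yields the five waits totalling 4600ms that the interval list in TestHandleError relies on.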