owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262) (#4339)

close #3329, close #3987
ti-chi-bot authored Feb 21, 2022
1 parent 759b3ec commit 9d50aba
Showing 11 changed files with 185 additions and 135 deletions.
68 changes: 4 additions & 64 deletions cdc/model/changefeed.go
@@ -17,7 +17,6 @@ import (
"encoding/json"
"math"
"regexp"
"sort"
"time"

"github.com/pingcap/errors"
@@ -91,19 +90,6 @@ func (s FeedState) IsNeeded(need string) bool {
return need == string(s)
}

const (
// errorHistoryGCInterval represents how long we keep error record in changefeed info
errorHistoryGCInterval = time.Minute * 10

// errorHistoryCheckInterval represents time window for failure check
errorHistoryCheckInterval = time.Minute * 2

// ErrorHistoryThreshold represents failure upper limit in time window.
// Before a changefeed is initialized, check the failure count of this
// changefeed; if it is less than ErrorHistoryThreshold, then initialize it.
ErrorHistoryThreshold = 3
)

// ChangeFeedInfo describes the detail of a ChangeFeed
type ChangeFeedInfo struct {
SinkURI string `json:"sink-uri"`
@@ -121,10 +107,9 @@ type ChangeFeedInfo struct {
// but can be fetched for backward compatibility
SortDir string `json:"sort-dir"`

Config *config.ReplicaConfig `json:"config"`
State FeedState `json:"state"`
ErrorHis []int64 `json:"history"`
Error *RunningError `json:"error"`
Config *config.ReplicaConfig `json:"config"`
State FeedState `json:"state"`
Error *RunningError `json:"error"`

SyncPointEnabled bool `json:"sync-point-enabled"`
SyncPointInterval time.Duration `json:"sync-point-interval"`
@@ -253,6 +238,7 @@ func (info *ChangeFeedInfo) VerifyAndComplete() error {
if info.Config.Consistent == nil {
info.Config.Consistent = defaultConfig.Consistent
}

return nil
}

@@ -304,56 +290,10 @@ func (info *ChangeFeedInfo) fixState() {
}
}

// CheckErrorHistory checks the error history of a changefeed:
// if there is an error record older than the GC interval, needSave is set to true;
// if the error count reaches the threshold, canInit is set to false.
func (info *ChangeFeedInfo) CheckErrorHistory() (needSave bool, canInit bool) {
i := sort.Search(len(info.ErrorHis), func(i int) bool {
ts := info.ErrorHis[i]
return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryGCInterval
})
info.ErrorHis = info.ErrorHis[i:]

if i > 0 {
needSave = true
}

i = sort.Search(len(info.ErrorHis), func(i int) bool {
ts := info.ErrorHis[i]
return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryCheckInterval
})
canInit = len(info.ErrorHis)-i < ErrorHistoryThreshold
return
}

// HasFastFailError returns true if the error in changefeed is fast-fail
func (info *ChangeFeedInfo) HasFastFailError() bool {
if info.Error == nil {
return false
}
return cerror.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code))
}

// findActiveErrors finds all errors occurring within errorHistoryCheckInterval
func (info *ChangeFeedInfo) findActiveErrors() []int64 {
i := sort.Search(len(info.ErrorHis), func(i int) bool {
ts := info.ErrorHis[i]
// ts is an error occurrence time; here we find all errors occurring within errorHistoryCheckInterval
return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryCheckInterval
})
return info.ErrorHis[i:]
}

// ErrorsReachedThreshold checks the error history of a changefeed and
// returns true if the error count reaches the threshold
func (info *ChangeFeedInfo) ErrorsReachedThreshold() bool {
return len(info.findActiveErrors()) >= ErrorHistoryThreshold
}

// CleanUpOutdatedErrorHistory cleans up the outdated error history and
// returns true if ErrorHis changed
func (info *ChangeFeedInfo) CleanUpOutdatedErrorHistory() bool {
lastLenOfErrorHis := len(info.ErrorHis)
info.ErrorHis = info.findActiveErrors()
return lastLenOfErrorHis != len(info.ErrorHis)
}
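For reference, the logic removed above treated ErrorHis as a sorted slice of millisecond timestamps and used sort.Search to keep only the errors inside a time window, then compared the remaining count against ErrorHistoryThreshold. A minimal standalone sketch of that pattern (the activeErrors helper and main function below are illustrative, not code from this repository):

package main

import (
	"fmt"
	"sort"
	"time"
)

// activeErrors keeps only the timestamps (milliseconds since the epoch) that fall
// inside the given window, mirroring the removed findActiveErrors logic.
func activeErrors(errorHis []int64, window time.Duration) []int64 {
	i := sort.Search(len(errorHis), func(i int) bool {
		ts := errorHis[i]
		return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < window
	})
	return errorHis[i:]
}

func main() {
	now := time.Now().UnixNano() / 1e6 // current time in milliseconds
	his := []int64{now - 10*60*1000, now - 30*1000, now - 5*1000}
	// Only the 30s-old and 5s-old errors are inside a 2-minute window.
	fmt.Println(len(activeErrors(his, 2*time.Minute))) // 2
}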
28 changes: 0 additions & 28 deletions cdc/model/changefeed_test.go
@@ -358,34 +358,6 @@ func TestChangeFeedInfoClone(t *testing.T) {
require.True(t, info.Config.EnableOldValue)
}

func TestCheckErrorHistory(t *testing.T) {
t.Parallel()

now := time.Now()
info := &ChangeFeedInfo{
ErrorHis: []int64{},
}
for i := 0; i < 5; i++ {
tm := now.Add(-errorHistoryGCInterval)
info.ErrorHis = append(info.ErrorHis, tm.UnixNano()/1e6)
time.Sleep(time.Millisecond)
}
for i := 0; i < ErrorHistoryThreshold-1; i++ {
info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
time.Sleep(time.Millisecond)
}
time.Sleep(time.Millisecond)
needSave, canInit := info.CheckErrorHistory()
require.True(t, needSave)
require.True(t, canInit)
require.Equal(t, ErrorHistoryThreshold-1, len(info.ErrorHis))

info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
needSave, canInit = info.CheckErrorHistory()
require.False(t, needSave)
require.False(t, canInit)
}

func TestChangefeedInfoStringer(t *testing.T) {
t.Parallel()

2 changes: 1 addition & 1 deletion cdc/owner/changefeed.go
@@ -81,7 +81,7 @@ func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
id: id,
scheduler: newScheduler(),
barriers: newBarriers(),
feedStateManager: new(feedStateManager),
feedStateManager: newFeedStateManager(),
gcManager: gcManager,

errCh: make(chan error, defaultErrChSize),
143 changes: 131 additions & 12 deletions cdc/owner/feed_state_manager.go
@@ -16,6 +16,7 @@ package owner
import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
@@ -24,6 +25,24 @@ import (
"go.uber.org/zap"
)

const (
// When errors occur and we need to back off, we start an exponential backoff
// with intervals from 10s to 30min (10s, 20s, 40s, 80s, 160s, 320s, 640s, 1280s, 1800s).
// The backoff stops after about 72 min (roughly 9 retries), because another 30min backoff
// would bring the total duration to 72+30=102min, exceeding the MaxElapsedTime (90min).
// To avoid a thundering herd, a randomization factor is also added.
defaultBackoffInitInterval = 10 * time.Second
defaultBackoffMaxInterval = 30 * time.Minute
defaultBackoffMaxElapsedTime = 90 * time.Minute
defaultBackoffRandomizationFactor = 0.1
defaultBackoffMultiplier = 2.0

// If all states recorded in the window are 'normal', it can be assumed that the changefeed
// is running steadily. Then, if we enter a state other than normal at the next tick,
// the backoff must be reset.
defaultStateWindowSize = 512
)
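As a rough standalone illustration of how these constants behave (not part of the commit; it simply feeds the same values into github.com/cenkalti/backoff/v4, with the randomization factor set to 0 so the printed sequence is deterministic):

package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	b := backoff.NewExponentialBackOff()
	b.InitialInterval = 10 * time.Second
	b.MaxInterval = 30 * time.Minute
	b.MaxElapsedTime = 90 * time.Minute
	b.Multiplier = 2.0
	b.RandomizationFactor = 0 // deterministic output for this sketch
	b.Reset()

	// Prints 10s, 20s, 40s, 80s, 160s, 320s, 640s, 1280s, then 30m0s (capped by
	// MaxInterval). In the real manager, NextBackOff starts returning
	// backoff.Stop once the wall-clock time elapsed since Reset (plus the next
	// interval) would exceed MaxElapsedTime, which is what pushes the
	// changefeed into the "failed" state below.
	for i := 0; i < 10; i++ {
		fmt.Println(b.NextBackOff())
	}
}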

// feedStateManager manages the ReactorState of a changefeed
// when an error or an admin job occurs, the feedStateManager is responsible for controlling the ReactorState
type feedStateManager struct {
@@ -34,7 +53,71 @@ type feedStateManager struct {
// shouldBeRemoved = false means the changefeed is paused
shouldBeRemoved bool

adminJobQueue []*model.AdminJob
adminJobQueue []*model.AdminJob
stateHistory [defaultStateWindowSize]model.FeedState
lastErrorTime time.Time // time of last error for a changefeed
backoffInterval time.Duration // the interval for restarting a changefeed in 'error' state
errBackoff *backoff.ExponentialBackOff // an exponential backoff for restarting a changefeed
}

// newFeedStateManager creates a feedStateManager and initializes the exponential backoff
func newFeedStateManager() *feedStateManager {
f := new(feedStateManager)

f.errBackoff = backoff.NewExponentialBackOff()
f.errBackoff.InitialInterval = defaultBackoffInitInterval
f.errBackoff.MaxInterval = defaultBackoffMaxInterval
f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime
f.errBackoff.Multiplier = defaultBackoffMultiplier
f.errBackoff.RandomizationFactor = defaultBackoffRandomizationFactor

f.resetErrBackoff()
f.lastErrorTime = time.Unix(0, 0)

return f
}

// newFeedStateManager4Test creates a feedStateManager for tests
func newFeedStateManager4Test() *feedStateManager {
f := new(feedStateManager)

f.errBackoff = backoff.NewExponentialBackOff()
f.errBackoff.InitialInterval = 200 * time.Millisecond
f.errBackoff.MaxInterval = 1600 * time.Millisecond
f.errBackoff.MaxElapsedTime = 6 * time.Second
f.errBackoff.Multiplier = 2.0
f.errBackoff.RandomizationFactor = 0

f.resetErrBackoff()
f.lastErrorTime = time.Unix(0, 0)

return f
}

// resetErrBackoff resets the backoff-related fields
func (m *feedStateManager) resetErrBackoff() {
m.errBackoff.Reset()
m.backoffInterval = m.errBackoff.NextBackOff()
}

// isChangefeedStable checks whether there are states other than 'normal' in the sliding window.
func (m *feedStateManager) isChangefeedStable() bool {
for _, val := range m.stateHistory {
if val != model.StateNormal {
return false
}
}

return true
}

// shiftStateWindow shifts the sliding window
func (m *feedStateManager) shiftStateWindow(state model.FeedState) {
for i := 0; i < defaultStateWindowSize-1; i++ {
m.stateHistory[i] = m.stateHistory[i+1]
}

m.stateHistory[defaultStateWindowSize-1] = state
}

func (m *feedStateManager) Tick(state *orchestrator.ChangefeedReactorState) {
@@ -145,13 +228,18 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
return
}
m.shouldBeRunning = true
// when the changefeed is manually resumed, we must reset the backoff
m.resetErrBackoff()
// The lastErrorTime also needs to be cleared before a fresh run.
m.lastErrorTime = time.Unix(0, 0)
jobsPending = true
m.patchState(model.StateNormal)
// remove error history to make sure the changefeed can run in the next tick
m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
if info.Error != nil || len(info.ErrorHis) != 0 {
if info == nil {
return nil, false, nil
}
if info.Error != nil {
info.Error = nil
info.ErrorHis = nil
return info, true, nil
}
return info, false, nil
@@ -277,8 +365,6 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
if cerrors.ChangefeedFastFailErrorCode(errors.RFCErrorCode(err.Code)) {
m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
info.Error = err
info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
info.CleanUpOutdatedErrorHistory()
return info, true, nil
})
m.shouldBeRunning = false
Expand All @@ -290,16 +376,49 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
for _, err := range errs {
info.Error = err
info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
}
changed := info.CleanUpOutdatedErrorHistory()
return info, changed || len(errs) > 0, nil
return info, len(errs) > 0, nil
})

// if the number of errors has reached the error threshold, stop the changefeed
if m.state.Info.ErrorsReachedThreshold() {
// If the changefeed enters an abnormal state ('error', 'failed') now
// but no abnormal state has been seen in the sliding window (512 ticks),
// it can be assumed that the changefeed hit a sudden failure after running stably.
// So we reset the exponential backoff and back off from InitialInterval again.
// TODO: this detection policy should be covered by a unit test.
if len(errs) > 0 {
m.lastErrorTime = time.Now()
if m.isChangefeedStable() {
m.resetErrBackoff()
}
} else {
if m.state.Info.State == model.StateNormal {
m.lastErrorTime = time.Unix(0, 0)
}
}
m.shiftStateWindow(m.state.Info.State)

if m.lastErrorTime == time.Unix(0, 0) {
return
}

if time.Since(m.lastErrorTime) < m.backoffInterval {
m.shouldBeRunning = false
m.patchState(model.StateError)
return
} else {
oldBackoffInterval := m.backoffInterval
m.backoffInterval = m.errBackoff.NextBackOff()
m.lastErrorTime = time.Unix(0, 0)

// if the duration since backoff start exceeds MaxElapsedTime,
// we set the state of the changefeed to "failed" and don't let it run again unless it is manually resumed.
if m.backoffInterval == backoff.Stop {
log.Warn("changefeed will not be restarted because it has been failing for a long time period",
zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime))
m.shouldBeRunning = false
m.patchState(model.StateFailed)
} else {
log.Info("changefeed restart backoff interval is changed", zap.String("changefeed", m.state.ID),
zap.Duration("oldInterval", oldBackoffInterval), zap.Duration("newInterval", m.backoffInterval))
}
}
}
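Condensed, the restart policy above boils down to the decision sketched below. This paraphrase is for readability only; the restartDecision helper and its string results are invented for the sketch, while the field names and the comparison against backoff.Stop mirror the diff.

package owner

import (
	"time"

	"github.com/cenkalti/backoff/v4"
)

// restartDecision paraphrases the tail of handleError: depending on how long ago
// the last error happened and on the current backoff interval, the changefeed is
// either held in 'error', allowed to restart with a longer interval, or failed.
func restartDecision(lastErrorTime time.Time, backoffInterval time.Duration,
	errBackoff *backoff.ExponentialBackOff) string {
	if lastErrorTime == time.Unix(0, 0) {
		return "no outstanding error: nothing to do"
	}
	if time.Since(lastErrorTime) < backoffInterval {
		return "still inside the backoff window: stay stopped in the 'error' state"
	}
	if next := errBackoff.NextBackOff(); next == backoff.Stop {
		return "backoff exhausted (MaxElapsedTime): mark the changefeed 'failed' until manually resumed"
	}
	return "backoff interval elapsed: let the changefeed restart with a longer interval"
}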