Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner(ticdc): Add backoff mechanism into changefeed restart logic (#4262) #4337

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 3 additions & 64 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"encoding/json"
"math"
"regexp"
"sort"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -73,19 +72,6 @@ func (s FeedState) ToInt() int {
return -1
}

const (
// errorHistoryGCInterval represents how long we keep error record in changefeed info
errorHistoryGCInterval = time.Minute * 10

// errorHistoryCheckInterval represents time window for failure check
errorHistoryCheckInterval = time.Minute * 2

// ErrorHistoryThreshold represents failure upper limit in time window.
// Before a changefeed is initialized, check the the failure count of this
// changefeed, if it is less than ErrorHistoryThreshold, then initialize it.
ErrorHistoryThreshold = 3
)

// ChangeFeedInfo describes the detail of a ChangeFeed
type ChangeFeedInfo struct {
SinkURI string `json:"sink-uri"`
Expand All @@ -103,10 +89,9 @@ type ChangeFeedInfo struct {
// but can be fetched for backward compatibility
SortDir string `json:"sort-dir"`

Config *config.ReplicaConfig `json:"config"`
State FeedState `json:"state"`
ErrorHis []int64 `json:"history"`
Error *RunningError `json:"error"`
Config *config.ReplicaConfig `json:"config"`
State FeedState `json:"state"`
Error *RunningError `json:"error"`

SyncPointEnabled bool `json:"sync-point-enabled"`
SyncPointInterval time.Duration `json:"sync-point-interval"`
Expand Down Expand Up @@ -283,56 +268,10 @@ func (info *ChangeFeedInfo) fixState() {
}
}

// CheckErrorHistory checks error history of a changefeed
// if having error record older than GC interval, set needSave to true.
// if error counts reach threshold, set canInit to false.
func (info *ChangeFeedInfo) CheckErrorHistory() (needSave bool, canInit bool) {
i := sort.Search(len(info.ErrorHis), func(i int) bool {
ts := info.ErrorHis[i]
return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryGCInterval
})
info.ErrorHis = info.ErrorHis[i:]

if i > 0 {
needSave = true
}

i = sort.Search(len(info.ErrorHis), func(i int) bool {
ts := info.ErrorHis[i]
return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryCheckInterval
})
canInit = len(info.ErrorHis)-i < ErrorHistoryThreshold
return
}

// HasFastFailError returns true if the error in changefeed is fast-fail
func (info *ChangeFeedInfo) HasFastFailError() bool {
if info.Error == nil {
return false
}
return cerror.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code))
}

// findActiveErrors finds all errors occurring within errorHistoryCheckInterval
func (info *ChangeFeedInfo) findActiveErrors() []int64 {
i := sort.Search(len(info.ErrorHis), func(i int) bool {
ts := info.ErrorHis[i]
// ts is a errors occurrence time, here to find all errors occurring within errorHistoryCheckInterval
return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryCheckInterval
})
return info.ErrorHis[i:]
}

// ErrorsReachedThreshold checks error history of a changefeed
// returns true if error counts reach threshold
func (info *ChangeFeedInfo) ErrorsReachedThreshold() bool {
return len(info.findActiveErrors()) >= ErrorHistoryThreshold
}

// CleanUpOutdatedErrorHistory cleans up the outdated error history
// return true if the ErrorHis changed
func (info *ChangeFeedInfo) CleanUpOutdatedErrorHistory() bool {
lastLenOfErrorHis := len(info.ErrorHis)
info.ErrorHis = info.findActiveErrors()
return lastLenOfErrorHis != len(info.ErrorHis)
}
27 changes: 0 additions & 27 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,33 +218,6 @@ type changefeedSuite struct{}

var _ = check.Suite(&changefeedSuite{})

func (s *changefeedSuite) TestCheckErrorHistory(c *check.C) {
defer testleak.AfterTest(c)()
now := time.Now()
info := &ChangeFeedInfo{
ErrorHis: []int64{},
}
for i := 0; i < 5; i++ {
tm := now.Add(-errorHistoryGCInterval)
info.ErrorHis = append(info.ErrorHis, tm.UnixNano()/1e6)
time.Sleep(time.Millisecond)
}
for i := 0; i < ErrorHistoryThreshold-1; i++ {
info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
time.Sleep(time.Millisecond)
}
time.Sleep(time.Millisecond)
needSave, canInit := info.CheckErrorHistory()
c.Assert(needSave, check.IsTrue)
c.Assert(canInit, check.IsTrue)
c.Assert(info.ErrorHis, check.HasLen, ErrorHistoryThreshold-1)

info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
needSave, canInit = info.CheckErrorHistory()
c.Assert(needSave, check.IsFalse)
c.Assert(canInit, check.IsFalse)
}

func (s *changefeedSuite) TestChangefeedInfoStringer(c *check.C) {
defer testleak.AfterTest(c)()
info := &ChangeFeedInfo{
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
id: id,
scheduler: newScheduler(),
barriers: newBarriers(),
feedStateManager: new(feedStateManager),
feedStateManager: newFeedStateManager(),
gcManager: gcManager,

errCh: make(chan error, defaultErrChSize),
Expand Down Expand Up @@ -112,7 +112,7 @@ func (c *changefeed) Tick(ctx cdcContext.Context, state *model.ChangefeedReactor
} else {
code = string(cerror.ErrOwnerUnknown.RFCCode())
}
c.feedStateManager.HandleError(&model.RunningError{
c.feedStateManager.handleError(&model.RunningError{
Addr: util.CaptureAddrFromCtx(ctx),
Code: code,
Message: err.Error(),
Expand Down
163 changes: 144 additions & 19 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,103 @@ package owner
import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

const (
// When errors occurred and we need to do backoff, we start an exponential backoff
// with an interval from 10s to 30min (10s, 20s, 40s, 80s, 160s, 320s, 640s, 1280s, 1800s).
// And the backoff will be stopped after 72 min (about 9 tries) because if we do another 30min backoff,
// the total duration (72+30=102min) will exceeds the MaxElapsedTime (90min).
// To avoid thunderherd, a random factor is also added.
defaultBackoffInitInterval = 10 * time.Second
defaultBackoffMaxInterval = 30 * time.Minute
defaultBackoffMaxElapsedTime = 90 * time.Minute
defaultBackoffRandomizationFactor = 0.1
defaultBackoffMultiplier = 2.0

// If all states recorded in window are 'normal', it can be assumed that the changfeed
// is running steady. And then if we enter a state other than normal at next tick,
// the backoff must be reset.
defaultStateWindowSize = 512
)

// feedStateManager manages the ReactorState of a changefeed
// when a error or a admin job occurs, the feedStateManager is responsible for controlling the ReactorState
type feedStateManager struct {
state *model.ChangefeedReactorState
shouldBeRunning bool

adminJobQueue []*model.AdminJob
adminJobQueue []*model.AdminJob
stateHistory [defaultStateWindowSize]model.FeedState
lastErrorTime time.Time // time of last error for a changefeed
backoffInterval time.Duration // the interval for restarting a changefeed in 'error' state
errBackoff *backoff.ExponentialBackOff // an exponential backoff for restarting a changefeed
}

// newFeedStateManager creates feedStateManager and initialize the exponential backoff
func newFeedStateManager() *feedStateManager {
f := new(feedStateManager)

f.errBackoff = backoff.NewExponentialBackOff()
f.errBackoff.InitialInterval = defaultBackoffInitInterval
f.errBackoff.MaxInterval = defaultBackoffMaxInterval
f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime
f.errBackoff.Multiplier = defaultBackoffMultiplier
f.errBackoff.RandomizationFactor = defaultBackoffRandomizationFactor

f.resetErrBackoff()
f.lastErrorTime = time.Unix(0, 0)

return f
}

// newFeedStateManager4Test creates feedStateManager for test
func newFeedStateManager4Test() *feedStateManager {
f := new(feedStateManager)

f.errBackoff = backoff.NewExponentialBackOff()
f.errBackoff.InitialInterval = 200 * time.Millisecond
f.errBackoff.MaxInterval = 1600 * time.Millisecond
f.errBackoff.MaxElapsedTime = 6 * time.Second
f.errBackoff.Multiplier = 2.0
f.errBackoff.RandomizationFactor = 0

f.resetErrBackoff()
f.lastErrorTime = time.Unix(0, 0)

return f
}

// resetErrBackoff reset the backoff-related fields
func (m *feedStateManager) resetErrBackoff() {
m.errBackoff.Reset()
m.backoffInterval = m.errBackoff.NextBackOff()
}

// isChangefeedStable check if there are states other than 'normal' in this sliding window.
func (m *feedStateManager) isChangefeedStable() bool {
for _, val := range m.stateHistory {
if val != model.StateNormal {
return false
}
}

return true
}

// shiftStateWindow shift the sliding window
func (m *feedStateManager) shiftStateWindow(state model.FeedState) {
for i := 0; i < defaultStateWindowSize-1; i++ {
m.stateHistory[i] = m.stateHistory[i+1]
}

m.stateHistory[defaultStateWindowSize-1] = state
}

func (m *feedStateManager) Tick(state *model.ChangefeedReactorState) {
Expand All @@ -53,7 +136,7 @@ func (m *feedStateManager) Tick(state *model.ChangefeedReactorState) {
return
}
errs := m.errorsReportedByProcessors()
m.HandleError(errs...)
m.handleError(errs...)
}

func (m *feedStateManager) ShouldRunning() bool {
Expand Down Expand Up @@ -132,16 +215,18 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
return
}
m.shouldBeRunning = true
// when the changefeed is manually resumed, we must reset the backoff
m.resetErrBackoff()
// The lastErrorTime also needs to be cleared before a fresh run.
m.lastErrorTime = time.Unix(0, 0)
jobsPending = true
m.patchState(model.StateNormal)
// remove error history to make sure the changefeed can running in next tick
m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
if info == nil {
return nil, false, nil
}
if info.Error != nil || len(info.ErrorHis) != 0 {
if info.Error != nil {
info.Error = nil
info.ErrorHis = nil
return info, true, nil
}
return info, false, nil
Expand Down Expand Up @@ -263,33 +348,73 @@ func (m *feedStateManager) errorsReportedByProcessors() []*model.RunningError {
return result
}

func (m *feedStateManager) HandleError(errs ...*model.RunningError) {
func (m *feedStateManager) handleError(errs ...*model.RunningError) {
// if there are a fastFail error in errs, we can just fastFail the changefeed
// and no need to patch other error to the changefeed info
for _, err := range errs {
if cerrors.ChangefeedFastFailErrorCode(errors.RFCErrorCode(err.Code)) {
m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
if info == nil {
return nil, false, nil
}
info.Error = err
return info, true, nil
})
m.shouldBeRunning = false
m.patchState(model.StateFailed)
return
}
}

m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
if info == nil {
return nil, false, nil
}
for _, err := range errs {
info.Error = err
info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
}
needSave := info.CleanUpOutdatedErrorHistory()
return info, needSave || len(errs) > 0, nil
return info, len(errs) > 0, nil
})
var err *model.RunningError

// If we enter into an abnormal state ('error', 'failed') for this changefeed now
// but haven't seen abnormal states in a sliding window (512 ticks),
// it can be assumed that this changefeed meets a sudden change from a stable condition.
// So we can reset the exponential backoff and re-backoff from the InitialInterval.
// TODO: this detection policy should be added into unit test.
if len(errs) > 0 {
err = errs[len(errs)-1]
m.lastErrorTime = time.Now()
if m.isChangefeedStable() {
m.resetErrBackoff()
}
} else {
if m.state.Info.State == model.StateNormal {
m.lastErrorTime = time.Unix(0, 0)
}
}
// if one of the error stored by changefeed state(error in the last tick) or the error specified by this function(error in the this tick)
// is a fast-fail error, the changefeed should be failed
if m.state.Info.HasFastFailError() || (err != nil && cerrors.ChangefeedFastFailErrorCode(errors.RFCErrorCode(err.Code))) {
m.shouldBeRunning = false
m.patchState(model.StateFailed)
m.shiftStateWindow(m.state.Info.State)

if m.lastErrorTime == time.Unix(0, 0) {
return
}
// if the number of errors has reached the error threshold, stop the changefeed
if m.state.Info.ErrorsReachedThreshold() {

if time.Since(m.lastErrorTime) < m.backoffInterval {
m.shouldBeRunning = false
m.patchState(model.StateError)
return
} else {
oldBackoffInterval := m.backoffInterval
m.backoffInterval = m.errBackoff.NextBackOff()
m.lastErrorTime = time.Unix(0, 0)

// if the duration since backoff start exceeds MaxElapsedTime,
// we set the state of changefeed to "failed" and don't let it run again unless it is manually resumed.
if m.backoffInterval == backoff.Stop {
log.Warn("changefeed will not be restarted because it has been failing for a long time period",
zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime))
m.shouldBeRunning = false
m.patchState(model.StateFailed)
} else {
log.Info("changefeed restart backoff interval is changed", zap.String("changefeed", m.state.ID),
zap.Duration("oldInterval", oldBackoffInterval), zap.Duration("newInterval", m.backoffInterval))
}
}
}
Loading