Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner(ticdc): fix changefeed state is automatically changed bug #8331

10 changes: 10 additions & 0 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,16 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
}
}

// changefeed state from stopped to failed is allowed
// but stopped to error or normal is not allowed
if m.state.Info != nil && m.state.Info.State == model.StateStopped {
log.Warn("changefeed is stopped, ignore errors",
zap.String("changefeed", m.state.ID.ID),
zap.String("namespace", m.state.ID.Namespace),
zap.Any("errors", errs))
return
}

// we need to patch changefeed unretryable error to the changefeed info,
// so we have to iterate all errs here to check wether it is a unretryable
// error in errs
Expand Down
17 changes: 17 additions & 0 deletions cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,23 @@ func TestHandleFastFailError(t *testing.T) {
tester.MustApplyPatches()
}

func TestHandleErrorWhenChangefeedIsPaused(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test(0, 0, 0, 0)
manager.state = orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
err := &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "CDC:ErrReachMaxTry",
Message: "fake error for test",
}
manager.state.Info = &model.ChangeFeedInfo{
State: model.StateStopped,
}
manager.handleError(err)
require.Equal(t, model.StateStopped, manager.state.Info.State)
}

func TestChangefeedStatusNotExist(t *testing.T) {
changefeedInfo := `
{
Expand Down