Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(cdc): handle sink errors more fast and light (#8949) #8976

Merged
2 changes: 1 addition & 1 deletion cdc/model/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ func redoLogFromV1(rv1 *codecv1.RedoLog) (r *model.RedoLog) {
TableInfo: rv1.RedoDDL.DDL.TableInfo,
PreTableInfo: rv1.RedoDDL.DDL.PreTableInfo,
Type: rv1.RedoDDL.DDL.Type,
Done: rv1.RedoDDL.DDL.Done,
}
r.RedoDDL.DDL.Done.Store(rv1.RedoDDL.DDL.Done)
}
return
}
Expand Down
3 changes: 2 additions & 1 deletion cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"strconv"
"sync"
"sync/atomic"
"unsafe"

"github.com/pingcap/log"
Expand Down Expand Up @@ -610,7 +611,7 @@ type DDLEvent struct {
TableInfo *TableInfo `msg:"-"`
PreTableInfo *TableInfo `msg:"-"`
Type model.ActionType `msg:"-"`
Done bool `msg:"-"`
Done atomic.Bool `msg:"-"`
Charset string `msg:"-"`
Collate string `msg:"-"`
}
Expand Down
32 changes: 20 additions & 12 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ type changefeed struct {
filter filter.Filter,
) (puller.DDLPuller, error)

newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(error)) DDLSink
newSink func(
changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo,
reportError func(err error), reportWarning func(err error),
) DDLSink

newScheduler func(
ctx cdcContext.Context, up *upstream.Upstream, epoch uint64, cfg *config.SchedulerConfig,
) (scheduler.Scheduler, error)
Expand Down Expand Up @@ -179,7 +183,10 @@ func newChangefeed4Test(
schemaStorage entry.SchemaStorage,
filter filter.Filter,
) (puller.DDLPuller, error),
newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(err error)) DDLSink,
newSink func(
changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo,
reportError func(err error), reportWarning func(err error),
) DDLSink,
newScheduler func(
ctx cdcContext.Context, up *upstream.Upstream, epoch uint64, cfg *config.SchedulerConfig,
) (scheduler.Scheduler, error),
Expand Down Expand Up @@ -296,11 +303,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
return errors.Trace(err)
default:
}
// we need to wait ddl ddlSink to be ready before we do the other things
// otherwise, we may cause a nil pointer panic when we try to write to the ddl ddlSink.
if !c.ddlSink.isInitialized() {
return nil
}

// TODO: pass table checkpointTs when we support concurrent process ddl
allPhysicalTables, minTableBarrierTs, barrier, err := c.ddlManager.tick(ctx, checkpointTs, nil)
if err != nil {
Expand Down Expand Up @@ -567,7 +570,13 @@ LOOP:
zap.String("changefeed", c.id.ID),
)

c.ddlSink = c.newSink(c.id, c.state.Info, ctx.Throw)
c.ddlSink = c.newSink(c.id, c.state.Info, ctx.Throw, func(err error) {
// TODO(qupeng): report the warning.
log.Warn("ddlSink internal error",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Error(err))
})
c.ddlSink.run(cancelCtx)

c.ddlPuller, err = c.newDDLPuller(cancelCtx,
Expand All @@ -586,8 +595,7 @@ LOOP:
ctx.Throw(c.ddlPuller.Run(cancelCtx))
}()

c.downstreamObserver, err = c.newDownstreamObserver(
ctx, c.state.Info.SinkURI, c.state.Info.Config)
c.downstreamObserver, err = c.newDownstreamObserver(ctx, c.state.Info.SinkURI, c.state.Info.Config)
if err != nil {
return err
}
Expand Down Expand Up @@ -1013,8 +1021,8 @@ func (c *changefeed) tickDownstreamObserver(ctx context.Context) {
defer cancel()
if err := c.downstreamObserver.Tick(cctx); err != nil {
// Prometheus is not deployed, it happens in non production env.
if strings.Contains(err.Error(),
fmt.Sprintf(":%d", errno.ErrPrometheusAddrIsNotSet)) {
noPrometheusMsg := fmt.Sprintf(":%d", errno.ErrPrometheusAddrIsNotSet)
if strings.Contains(err.Error(), noPrometheusMsg) {
return
}
log.Warn("backend observer tick error", zap.Error(err))
Expand Down
6 changes: 1 addition & 5 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,6 @@ func (m *mockDDLSink) close(ctx context.Context) error {
return nil
}

func (m *mockDDLSink) isInitialized() bool {
return true
}

func (m *mockDDLSink) Barrier(ctx context.Context) error {
return nil
}
Expand Down Expand Up @@ -219,7 +215,7 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,
return &mockDDLPuller{resolvedTs: startTs - 1, schemaStorage: schemaStorage}, nil
},
// new ddl ddlSink
func(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(err error)) DDLSink {
func(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
return &mockDDLSink{
resetDDLDone: true,
recordDDLHistory: false,
Expand Down
Loading