Skip to content

Commit

Permalink
sink(cdc): handle sink errors more fast and light (#8949)
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <qupeng@pingcap.com>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
hicqu and ti-chi-bot[bot] authored May 16, 2023
1 parent 3b66572 commit 6594355
Show file tree
Hide file tree
Showing 25 changed files with 620 additions and 341 deletions.
2 changes: 1 addition & 1 deletion cdc/model/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ func redoLogFromV1(rv1 *codecv1.RedoLog) (r *model.RedoLog) {
TableInfo: rv1.RedoDDL.DDL.TableInfo,
PreTableInfo: rv1.RedoDDL.DDL.PreTableInfo,
Type: rv1.RedoDDL.DDL.Type,
Done: rv1.RedoDDL.DDL.Done,
}
r.RedoDDL.DDL.Done.Store(rv1.RedoDDL.DDL.Done)
}
return
}
Expand Down
3 changes: 2 additions & 1 deletion cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"strconv"
"sync"
"sync/atomic"
"unsafe"

"github.com/pingcap/log"
Expand Down Expand Up @@ -610,7 +611,7 @@ type DDLEvent struct {
TableInfo *TableInfo `msg:"-"`
PreTableInfo *TableInfo `msg:"-"`
Type model.ActionType `msg:"-"`
Done bool `msg:"-"`
Done atomic.Bool `msg:"-"`
Charset string `msg:"-"`
Collate string `msg:"-"`
}
Expand Down
32 changes: 20 additions & 12 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ type changefeed struct {
filter filter.Filter,
) (puller.DDLPuller, error)

newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(error)) DDLSink
newSink func(
changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo,
reportError func(err error), reportWarning func(err error),
) DDLSink

newScheduler func(
ctx cdcContext.Context, up *upstream.Upstream, epoch uint64, cfg *config.SchedulerConfig,
) (scheduler.Scheduler, error)
Expand Down Expand Up @@ -179,7 +183,10 @@ func newChangefeed4Test(
schemaStorage entry.SchemaStorage,
filter filter.Filter,
) (puller.DDLPuller, error),
newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(err error)) DDLSink,
newSink func(
changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo,
reportError func(err error), reportWarning func(err error),
) DDLSink,
newScheduler func(
ctx cdcContext.Context, up *upstream.Upstream, epoch uint64, cfg *config.SchedulerConfig,
) (scheduler.Scheduler, error),
Expand Down Expand Up @@ -296,11 +303,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
return errors.Trace(err)
default:
}
// we need to wait ddl ddlSink to be ready before we do the other things
// otherwise, we may cause a nil pointer panic when we try to write to the ddl ddlSink.
if !c.ddlSink.isInitialized() {
return nil
}

// TODO: pass table checkpointTs when we support processing DDLs concurrently
allPhysicalTables, minTableBarrierTs, barrier, err := c.ddlManager.tick(ctx, checkpointTs, nil)
if err != nil {
Expand Down Expand Up @@ -567,7 +570,13 @@ LOOP:
zap.String("changefeed", c.id.ID),
)

c.ddlSink = c.newSink(c.id, c.state.Info, ctx.Throw)
c.ddlSink = c.newSink(c.id, c.state.Info, ctx.Throw, func(err error) {
// TODO(qupeng): report the warning.
log.Warn("ddlSink internal error",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Error(err))
})
c.ddlSink.run(cancelCtx)

c.ddlPuller, err = c.newDDLPuller(cancelCtx,
Expand All @@ -586,8 +595,7 @@ LOOP:
ctx.Throw(c.ddlPuller.Run(cancelCtx))
}()

c.downstreamObserver, err = c.newDownstreamObserver(
ctx, c.state.Info.SinkURI, c.state.Info.Config)
c.downstreamObserver, err = c.newDownstreamObserver(ctx, c.state.Info.SinkURI, c.state.Info.Config)
if err != nil {
return err
}
Expand Down Expand Up @@ -1013,8 +1021,8 @@ func (c *changefeed) tickDownstreamObserver(ctx context.Context) {
defer cancel()
if err := c.downstreamObserver.Tick(cctx); err != nil {
// Prometheus is not deployed; this happens in non-production environments.
if strings.Contains(err.Error(),
fmt.Sprintf(":%d", errno.ErrPrometheusAddrIsNotSet)) {
noPrometheusMsg := fmt.Sprintf(":%d", errno.ErrPrometheusAddrIsNotSet)
if strings.Contains(err.Error(), noPrometheusMsg) {
return
}
log.Warn("backend observer tick error", zap.Error(err))
Expand Down
6 changes: 1 addition & 5 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,6 @@ func (m *mockDDLSink) close(ctx context.Context) error {
return nil
}

// isInitialized always reports true for the mock.
func (m *mockDDLSink) isInitialized() bool {
return true
}

// Barrier is a no-op on the mock: it ignores ctx and always returns nil.
func (m *mockDDLSink) Barrier(ctx context.Context) error {
return nil
}
Expand Down Expand Up @@ -219,7 +215,7 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,
return &mockDDLPuller{resolvedTs: startTs - 1, schemaStorage: schemaStorage}, nil
},
// new ddl ddlSink
func(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(err error)) DDLSink {
func(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
return &mockDDLSink{
resetDDLDone: true,
recordDDLHistory: false,
Expand Down
Loading

0 comments on commit 6594355

Please sign in to comment.