Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#8949
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
hicqu committed May 30, 2023
1 parent ba52f01 commit 5ac4c4e
Show file tree
Hide file tree
Showing 25 changed files with 1,597 additions and 107 deletions.
244 changes: 244 additions & 0 deletions cdc/model/codec/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package codec

import (
"encoding/binary"

timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/cdc/model"
codecv1 "github.com/pingcap/tiflow/cdc/model/codec/v1"
"github.com/tinylib/msgp/msgp"
)

const (
v1HeaderLength int = 4
versionPrefixLength int = 2
versionFieldLength int = 2

latestVersion uint16 = 2
)

// NOTE: why we need this?
//
// Before this logic is introduced, redo log is encoded into byte slice without a version field.
// This makes it hard to extend in the future.
// However, in the old format (i.e. v1 format), the first 5 bytes are always same, which can be
// confirmed in v1/codec_gen.go. So we reuse those bytes, and add a version field in them.
var (
versionPrefix = [versionPrefixLength]byte{0xff, 0xff}
)

func postUnmarshal(r *model.RedoLog) {
workaroundColumn := func(c *model.Column, redoC *model.RedoColumn) {
c.Flag = model.ColumnFlagType(redoC.Flag)
if redoC.ValueIsEmptyBytes {
c.Value = []byte{}
} else {
c.Value = redoC.Value
}
}

if r.RedoRow.Row != nil {
row := r.RedoRow.Row
for i, c := range row.Columns {
if c != nil {
workaroundColumn(c, &r.RedoRow.Columns[i])
}
}
for i, c := range row.PreColumns {
if c != nil {
workaroundColumn(c, &r.RedoRow.PreColumns[i])
}
}
r.RedoRow.Columns = nil
r.RedoRow.PreColumns = nil
}
if r.RedoDDL.DDL != nil {
r.RedoDDL.DDL.Type = timodel.ActionType(r.RedoDDL.Type)
r.RedoDDL.DDL.TableInfo = &model.TableInfo{
TableName: r.RedoDDL.TableName,
}
}
}

func preMarshal(r *model.RedoLog) {
// Workaround empty byte slice for msgp#247
workaroundColumn := func(redoC *model.RedoColumn) {
switch v := redoC.Value.(type) {
case []byte:
if len(v) == 0 {
redoC.ValueIsEmptyBytes = true
}
}
}

if r.RedoRow.Row != nil {
row := r.RedoRow.Row
r.RedoRow.Columns = make([]model.RedoColumn, 0, len(row.Columns))
r.RedoRow.PreColumns = make([]model.RedoColumn, 0, len(row.PreColumns))
for _, c := range row.Columns {
redoC := model.RedoColumn{}
if c != nil {
redoC.Value = c.Value
redoC.Flag = uint64(c.Flag)
workaroundColumn(&redoC)
}
r.RedoRow.Columns = append(r.RedoRow.Columns, redoC)
}
for _, c := range row.PreColumns {
redoC := model.RedoColumn{}
if c != nil {
redoC.Value = c.Value
redoC.Flag = uint64(c.Flag)
workaroundColumn(&redoC)
}
r.RedoRow.PreColumns = append(r.RedoRow.PreColumns, redoC)
}
}
if r.RedoDDL.DDL != nil {
r.RedoDDL.Type = byte(r.RedoDDL.DDL.Type)
if r.RedoDDL.DDL.TableInfo != nil {
r.RedoDDL.TableName = r.RedoDDL.DDL.TableInfo.TableName
}
}
}

// UnmarshalRedoLog unmarshals a RedoLog from the given byte slice.
func UnmarshalRedoLog(bts []byte) (r *model.RedoLog, o []byte, err error) {
if len(bts) < versionPrefixLength {
err = msgp.ErrShortBytes
return
}

shouldBeV1 := false
for i := 0; i < versionPrefixLength; i++ {
if bts[i] != versionPrefix[i] {
shouldBeV1 = true
break
}
}
if shouldBeV1 {
var rv1 *codecv1.RedoLog = new(codecv1.RedoLog)
if o, err = rv1.UnmarshalMsg(bts); err != nil {
return
}
codecv1.PostUnmarshal(rv1)
r = redoLogFromV1(rv1)
} else {
bts = bts[versionPrefixLength:]
version, bts := decodeVersion(bts)
if version == latestVersion {
r = new(model.RedoLog)
if o, err = r.UnmarshalMsg(bts); err != nil {
return
}
postUnmarshal(r)
} else {
panic("unsupported codec version")
}
}
return
}

// MarshalRedoLog marshals a RedoLog into bytes.
func MarshalRedoLog(r *model.RedoLog, b []byte) (o []byte, err error) {
preMarshal(r)
b = append(b, versionPrefix[:]...)
b = binary.BigEndian.AppendUint16(b, latestVersion)
o, err = r.MarshalMsg(b)
return
}

// MarshalRowAsRedoLog converts a RowChangedEvent into RedoLog, and then marshals it.
func MarshalRowAsRedoLog(r *model.RowChangedEvent, b []byte) (o []byte, err error) {
log := &model.RedoLog{
RedoRow: model.RedoRowChangedEvent{Row: r},
Type: model.RedoLogTypeRow,
}
return MarshalRedoLog(log, b)
}

// MarshalDDLAsRedoLog converts a DDLEvent into RedoLog, and then marshals it.
func MarshalDDLAsRedoLog(d *model.DDLEvent, b []byte) (o []byte, err error) {
log := &model.RedoLog{
RedoDDL: model.RedoDDLEvent{DDL: d},
Type: model.RedoLogTypeDDL,
}
return MarshalRedoLog(log, b)
}

func decodeVersion(bts []byte) (uint16, []byte) {
version := binary.BigEndian.Uint16(bts[0:versionFieldLength])
return version, bts[versionFieldLength:]
}

func redoLogFromV1(rv1 *codecv1.RedoLog) (r *model.RedoLog) {
r = &model.RedoLog{Type: (model.RedoLogType)(rv1.Type)}
if rv1.RedoRow != nil && rv1.RedoRow.Row != nil {
r.RedoRow.Row = &model.RowChangedEvent{
StartTs: rv1.RedoRow.Row.StartTs,
CommitTs: rv1.RedoRow.Row.CommitTs,
RowID: rv1.RedoRow.Row.RowID,
Table: tableNameFromV1(rv1.RedoRow.Row.Table),
ColInfos: rv1.RedoRow.Row.ColInfos,
TableInfo: rv1.RedoRow.Row.TableInfo,
Columns: make([]*model.Column, 0, len(rv1.RedoRow.Row.Columns)),
PreColumns: make([]*model.Column, 0, len(rv1.RedoRow.Row.PreColumns)),
IndexColumns: rv1.RedoRow.Row.IndexColumns,
ApproximateDataSize: rv1.RedoRow.Row.ApproximateDataSize,
SplitTxn: rv1.RedoRow.Row.SplitTxn,
ReplicatingTs: rv1.RedoRow.Row.ReplicatingTs,
}
for _, c := range rv1.RedoRow.Row.Columns {
r.RedoRow.Row.Columns = append(r.RedoRow.Row.Columns, columnFromV1(c))
}
for _, c := range rv1.RedoRow.Row.PreColumns {
r.RedoRow.Row.PreColumns = append(r.RedoRow.Row.PreColumns, columnFromV1(c))
}
}
if rv1.RedoDDL != nil && rv1.RedoDDL.DDL != nil {
r.RedoDDL.DDL = &model.DDLEvent{
StartTs: rv1.RedoDDL.DDL.StartTs,
CommitTs: rv1.RedoDDL.DDL.CommitTs,
Query: rv1.RedoDDL.DDL.Query,
TableInfo: rv1.RedoDDL.DDL.TableInfo,
PreTableInfo: rv1.RedoDDL.DDL.PreTableInfo,
Type: rv1.RedoDDL.DDL.Type,
}
r.RedoDDL.DDL.Done.Store(rv1.RedoDDL.DDL.Done)
}
return
}

func tableNameFromV1(t *codecv1.TableName) *model.TableName {
return &model.TableName{
Schema: t.Schema,
Table: t.Table,
TableID: t.TableID,
IsPartition: t.IsPartition,
}
}

func columnFromV1(c *codecv1.Column) *model.Column {
return &model.Column{
Name: c.Name,
Type: c.Type,
Charset: c.Charset,
Flag: c.Flag,
Value: c.Value,
Default: c.Default,
ApproximateBytes: c.ApproximateBytes,
}
}
3 changes: 2 additions & 1 deletion cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"strconv"
"sync"
"sync/atomic"
"unsafe"

"github.com/pingcap/log"
Expand Down Expand Up @@ -634,7 +635,7 @@ type DDLEvent struct {
TableInfo *TableInfo `msg:"-"`
PreTableInfo *TableInfo `msg:"-"`
Type model.ActionType `msg:"-"`
Done bool `msg:"-"`
Done atomic.Bool `msg:"-"`
Charset string `msg:"-"`
Collate string `msg:"-"`
}
Expand Down
62 changes: 54 additions & 8 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ type changefeed struct {
filter filter.Filter,
) (puller.DDLPuller, error)

newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(error)) DDLSink
newSink func(
changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo,
reportError func(err error), reportWarning func(err error),
) DDLSink

newScheduler func(
ctx cdcContext.Context, pdClock pdutil.Clock, epoch uint64,
) (scheduler.Scheduler, error)
Expand Down Expand Up @@ -167,7 +171,10 @@ func newChangefeed4Test(
schemaStorage entry.SchemaStorage,
filter filter.Filter,
) (puller.DDLPuller, error),
newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(err error)) DDLSink,
newSink func(
changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo,
reportError func(err error), reportWarning func(err error),
) DDLSink,
newScheduler func(
ctx cdcContext.Context, pdClock pdutil.Clock, epoch uint64,
) (scheduler.Scheduler, error),
Expand Down Expand Up @@ -278,11 +285,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
return errors.Trace(err)
default:
}
// we need to wait ddl ddlSink to be ready before we do the other things
// otherwise, we may cause a nil pointer panic when we try to write to the ddl ddlSink.
if !c.ddlSink.isInitialized() {
return nil
}

// TODO: pass table checkpointTs when we support concurrent process ddl
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
if err != nil {
Expand Down Expand Up @@ -566,7 +569,13 @@ LOOP:
zap.String("changefeed", c.id.ID),
)

c.ddlSink = c.newSink(c.id, c.state.Info, ctx.Throw)
c.ddlSink = c.newSink(c.id, c.state.Info, ctx.Throw, func(err error) {
// TODO(qupeng): report the warning.
log.Warn("ddlSink internal error",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Error(err))
})
c.ddlSink.run(cancelCtx)

c.ddlPuller, err = c.newDDLPuller(cancelCtx,
Expand All @@ -585,6 +594,15 @@ LOOP:
ctx.Throw(c.ddlPuller.Run(cancelCtx))
}()

<<<<<<< HEAD
=======
c.downstreamObserver, err = c.newDownstreamObserver(ctx, c.state.Info.SinkURI, c.state.Info.Config)
if err != nil {
return err
}
c.observerLastTick = atomic.NewTime(time.Time{})

>>>>>>> 659435573d (sink(cdc): handle sink errors more fast and light (#8949))
stdCtx := contextutil.PutChangefeedIDInCtx(cancelCtx, c.id)
c.redoDDLMgr, err = redo.NewDDLManager(stdCtx, c.state.Info.Config.Consistent, ddlStartTs)
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
Expand Down Expand Up @@ -983,3 +1001,31 @@ func (c *changefeed) checkUpstream() (skip bool, err error) {
}
return
}
<<<<<<< HEAD
=======

// tickDownstreamObserver checks whether needs to trigger tick of downstream
// observer, if needed run it in an independent goroutine with 5s timeout.
func (c *changefeed) tickDownstreamObserver(ctx context.Context) {
if time.Since(c.observerLastTick.Load()) > downstreamObserverTickDuration {
c.observerLastTick.Store(time.Now())
select {
case <-ctx.Done():
return
default:
}
go func() {
cctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
if err := c.downstreamObserver.Tick(cctx); err != nil {
// Prometheus is not deployed, it happens in non production env.
noPrometheusMsg := fmt.Sprintf(":%d", errno.ErrPrometheusAddrIsNotSet)
if strings.Contains(err.Error(), noPrometheusMsg) {
return
}
log.Warn("backend observer tick error", zap.Error(err))
}
}()
}
}
>>>>>>> 659435573d (sink(cdc): handle sink errors more fast and light (#8949))
6 changes: 1 addition & 5 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,6 @@ func (m *mockDDLSink) close(ctx context.Context) error {
return nil
}

func (m *mockDDLSink) isInitialized() bool {
return true
}

func (m *mockDDLSink) Barrier(ctx context.Context) error {
return nil
}
Expand Down Expand Up @@ -219,7 +215,7 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,
return &mockDDLPuller{resolvedTs: startTs - 1, schemaStorage: schemaStorage}, nil
},
// new ddl ddlSink
func(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(err error)) DDLSink {
func(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
return &mockDDLSink{
resetDDLDone: true,
recordDDLHistory: false,
Expand Down
Loading

0 comments on commit 5ac4c4e

Please sign in to comment.