Skip to content

Commit

Permalink
Merge branch 'master' into fix-cdc-shutdown-when-pd-scale-in-after-sc…
Browse files Browse the repository at this point in the history
…ale-out
  • Loading branch information
asddongmen authored Apr 20, 2023
2 parents 909a08f + b20e1ca commit 915c351
Show file tree
Hide file tree
Showing 10 changed files with 1,091 additions and 81 deletions.
83 changes: 52 additions & 31 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,29 +336,33 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) {

func datum2Column(
tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool,
) ([]*model.Column, []types.Datum, []int64, error) {
) ([]*model.Column, []types.Datum, []*timodel.ColumnInfo, error) {
cols := make([]*model.Column, len(tableInfo.RowColumnsOffset))
rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset))
columnIDs := make([]int64, len(tableInfo.RowColumnsOffset))
columnInfos := make([]*timodel.ColumnInfo, len(tableInfo.RowColumnsOffset))

for _, colInfo := range tableInfo.Columns {
colSize := 0
if !model.IsColCDCVisible(colInfo) {
log.Debug("skip the column which is not visible",
zap.String("table", tableInfo.Name.O), zap.String("column", colInfo.Name.O))
continue
}

colName := colInfo.Name.O
colDatums, exist := datums[colInfo.ID]
var colValue interface{}
colID := colInfo.ID
colDatums, exist := datums[colID]
if !exist && !fillWithDefaultValue {
log.Debug("column value is not found",
zap.String("table", tableInfo.Name.O), zap.String("column", colName))
continue
}
var err error
var warn string
var size int

var (
colValue interface{}
size int
warn string
err error
)
if exist {
colValue, size, warn, err = formatColVal(colDatums, colInfo)
} else if fillWithDefaultValue {
Expand All @@ -368,31 +372,33 @@ func datum2Column(
return nil, nil, nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String()))
log.Warn(warn, zap.String("table", tableInfo.TableName.String()),
zap.String("column", colInfo.Name.String()))
}

defaultValue := getDDLDefaultDefinition(colInfo)
colSize += size
rawCols[tableInfo.RowColumnsOffset[colInfo.ID]] = colDatums
cols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{
offset := tableInfo.RowColumnsOffset[colID]
rawCols[offset] = colDatums
cols[offset] = &model.Column{
Name: colName,
Type: colInfo.GetType(),
Charset: colInfo.GetCharset(),
Value: colValue,
Default: defaultValue,
Flag: tableInfo.ColumnsFlag[colInfo.ID],
Flag: tableInfo.ColumnsFlag[colID],
// ApproximateBytes = column data size + column struct size
ApproximateBytes: colSize + sizeOfEmptyColumn,
ApproximateBytes: size + sizeOfEmptyColumn,
}
columnIDs[tableInfo.RowColumnsOffset[colInfo.ID]] = colInfo.ID
columnInfos[offset] = colInfo
}
return cols, rawCols, columnIDs, nil
return cols, rawCols, columnInfos, nil
}

// return error if cannot get the expected checksum from the decoder
// return false if the checksum is not matched
// return true if the checksum is matched and the checksum is the matched one.
func (m *mounter) verifyChecksum(
columnIDs []int64, rawColumns []types.Datum, isPreRow bool,
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, isPreRow bool,
) (uint32, bool, error) {
if !m.integrity.Enabled() {
return 0, true, nil
Expand Down Expand Up @@ -420,7 +426,18 @@ func (m *mounter) verifyChecksum(
return 0, true, nil
}

checksum, err := m.encoder.Checksum(m.sctx, columnIDs, rawColumns)
columns := make([]rowcodec.ColData, 0, len(rawColumns))
for idx, col := range columnInfos {
columns = append(columns, rowcodec.ColData{
ColumnInfo: col,
Datum: &rawColumns[idx],
})
}
calculator := rowcodec.RowData{
Cols: columns,
Data: make([]byte, 0),
}
checksum, err := calculator.Checksum()
if err != nil {
log.Error("failed to calculate the checksum", zap.Error(err))
return 0, false, errors.Trace(err)
Expand Down Expand Up @@ -457,12 +474,13 @@ func (m *mounter) verifyChecksum(

func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) {
var (
rawRow model.RowChangedDatums
columnIDs []int64
matched bool
err error
rawRow model.RowChangedDatums
columnInfos []*timodel.ColumnInfo
matched bool
err error

corrupted bool
corrupted bool
checksumVersion int
)

// Decode previous columns.
Expand All @@ -479,15 +497,16 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
if row.PreRowExist {
// FIXME(leoppro): using pre table info to mounter pre column datum
// the pre column and current column in one event may using different table info
preCols, preRawCols, columnIDs, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue)
preCols, preRawCols, columnInfos, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue)
if err != nil {
return nil, rawRow, errors.Trace(err)
}

preChecksum, matched, err = m.verifyChecksum(columnIDs, preRawCols, true)
preChecksum, matched, err = m.verifyChecksum(columnInfos, preRawCols, true)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
checksumVersion = m.encoder.ChecksumVersion()

if !matched {
log.Error("previous columns checksum mismatch",
Expand Down Expand Up @@ -519,26 +538,27 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
checksum uint32
)
if row.RowExist {
cols, rawCols, columnIDs, err = datum2Column(tableInfo, row.Row, true)
cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row, true)
if err != nil {
return nil, rawRow, errors.Trace(err)
}

checksum, matched, err = m.verifyChecksum(columnIDs, rawCols, false)
checksum, matched, err = m.verifyChecksum(columnInfos, rawCols, false)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
if !matched {
log.Error("columns checksum mismatch",
zap.Uint32("checksum", preChecksum),
zap.Int64s("columnIDs", columnIDs),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID, row)
}
corrupted = true
}
checksumVersion = m.encoder.ChecksumVersion()
}

schemaName := tableInfo.TableName.Schema
Expand Down Expand Up @@ -566,9 +586,10 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
Columns: cols,
PreColumns: preCols,

Checksum: checksum,
PreChecksum: preChecksum,
Corrupted: corrupted,
Checksum: checksum,
PreChecksum: preChecksum,
Corrupted: corrupted,
ChecksumVersion: checksumVersion,

IndexColumns: tableInfo.IndexColumnsOffset,
ApproximateDataSize: dataSize,
Expand Down
32 changes: 32 additions & 0 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/entry/schema"
Expand Down Expand Up @@ -610,3 +611,34 @@ func TestProcessorLiveness(t *testing.T) {
require.Nil(t, p.Close())
tester.MustApplyPatches()
}

func TestProcessorDostNotStuckInInit(t *testing.T) {
_ = failpoint.
Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkManagerRunError",
"1*return(true)")
defer func() {
_ = failpoint.
Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkManagerRunError")
}()

ctx := cdcContext.NewBackendContext4Test(true)
liveness := model.LivenessCaptureAlive
p, tester := initProcessor4Test(ctx, t, &liveness)

// First tick for creating position.
err := p.Tick(ctx)
require.Nil(t, err)
tester.MustApplyPatches()

// Second tick for init.
err = p.Tick(ctx)
require.Nil(t, err)

// Third tick for handle error.
err = p.Tick(ctx)
require.NotNil(t, err)
require.Contains(t, err.Error(), "SinkManagerRunError")

require.Nil(t, p.Close())
tester.MustApplyPatches()
}
8 changes: 8 additions & 0 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -182,7 +183,14 @@ func (m *SinkManager) Run(ctx context.Context) (err error) {
m.changefeedInfo.SinkURI,
m.changefeedInfo.Config,
managerErrors)
failpoint.Inject("SinkManagerRunError", func() {
log.Info("failpoint SinkManagerRunError injected",
zap.String("changefeed", m.changefeedID.ID))
err = errors.New("SinkManagerRunError")
})

if err != nil {
close(m.ready)
return errors.Trace(err)
}

Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ require (
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22
github.com/pingcap/tidb v1.1.0-beta.0.20230411230700-8b7b31005148
github.com/pingcap/tidb v1.1.0-beta.0.20230418111328-47e7432054a1
github.com/pingcap/tidb-tools v6.5.1-0.20230208065359-62b90e1e24a7+incompatible
github.com/pingcap/tidb/parser v0.0.0-20230411032700-9949a54f29d8
github.com/pingcap/tidb/parser v0.0.0-20230417161919-627110332165
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/r3labs/diff v1.1.0
Expand All @@ -79,7 +79,7 @@ require (
github.com/swaggo/gin-swagger v1.2.0
github.com/swaggo/swag v1.8.3
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/tikv/client-go/v2 v2.0.7-0.20230406064257-1ec0ff5bf443
github.com/tikv/client-go/v2 v2.0.8-0.20230417065328-92db9f7b151f
github.com/tikv/pd v1.1.0-beta.0.20230203015356-248b3f0be132
github.com/tikv/pd/client v0.0.0-20230329114254-1948c247c2b1
github.com/tinylib/msgp v1.1.6
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -908,15 +908,15 @@ github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4/go.mod h1:sDCsM39c
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb v1.1.0-beta.0.20220511160835-98c31070d958/go.mod h1:luW4sIZoLHY3bCWuKqyqk2QgMvF+/M7nWOXf/me0+fY=
github.com/pingcap/tidb v1.1.0-beta.0.20230411230700-8b7b31005148 h1:Ba5QFDwEgdVyG8qsll5p0aIzxYPu3KZjiGpjzxlZM00=
github.com/pingcap/tidb v1.1.0-beta.0.20230411230700-8b7b31005148/go.mod h1:coCCXjP3wKEvEHAFAvyYDftSMEt+2abglH8K7R41u/8=
github.com/pingcap/tidb v1.1.0-beta.0.20230418111328-47e7432054a1 h1:7yuiJQ2iRU4Qc+MPUrRx7lWhw/cekzu4A8triPWghiI=
github.com/pingcap/tidb v1.1.0-beta.0.20230418111328-47e7432054a1/go.mod h1:coCCXjP3wKEvEHAFAvyYDftSMEt+2abglH8K7R41u/8=
github.com/pingcap/tidb-tools v6.5.1-0.20230208065359-62b90e1e24a7+incompatible h1:OT1Mrhe5UQInwiO+vGjbtd5Ej4r1ECjmeN4oaTdPlbE=
github.com/pingcap/tidb-tools v6.5.1-0.20230208065359-62b90e1e24a7+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e/go.mod h1:e1MGCA9Sg3T8jid8PKAEq5eYVuMMCq4n8gJ+Kqp4Plg=
github.com/pingcap/tidb/parser v0.0.0-20220511160835-98c31070d958/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI=
github.com/pingcap/tidb/parser v0.0.0-20221126021158-6b02a5d8ba7d/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI=
github.com/pingcap/tidb/parser v0.0.0-20230411032700-9949a54f29d8 h1:Eddfkf6qtugOvVREIk+nek3bQl+MzpAPOAzrni0kth4=
github.com/pingcap/tidb/parser v0.0.0-20230411032700-9949a54f29d8/go.mod h1:R0xUtp5gJK/Xtb+PIvR3Wh/Ayvmorwk0nzT4p3HLZJk=
github.com/pingcap/tidb/parser v0.0.0-20230417161919-627110332165 h1:Rtym1QmDOvMaW0jHpOJLpiv9nh/5OhkFicds1oc5Mp8=
github.com/pingcap/tidb/parser v0.0.0-20230417161919-627110332165/go.mod h1:R0xUtp5gJK/Xtb+PIvR3Wh/Ayvmorwk0nzT4p3HLZJk=
github.com/pingcap/tipb v0.0.0-20220215045658-d12dec7a7609/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pingcap/tipb v0.0.0-20230310043643-5362260ee6f7 h1:CeeMOq1aHPAhXrw4eYXtQRyWOFlbfqK1+3f9Iop4IfU=
github.com/pingcap/tipb v0.0.0-20230310043643-5362260ee6f7/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
Expand Down Expand Up @@ -1086,8 +1086,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfK
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.1-0.20220510032238-ff5e35ac2869/go.mod h1:0scaG+seu7L56apm+Gjz9vckyO7ABIzM6T7n00mrIXs=
github.com/tikv/client-go/v2 v2.0.7-0.20230406064257-1ec0ff5bf443 h1:lqlizij6n/v4jx1Ph2rLF0E/gRJUg7kz3VmO6P5Y1e0=
github.com/tikv/client-go/v2 v2.0.7-0.20230406064257-1ec0ff5bf443/go.mod h1:9JNUWtHN8cx8eynHZ9xzdPi5YY6aiN1ILQyhfPUBcMo=
github.com/tikv/client-go/v2 v2.0.8-0.20230417065328-92db9f7b151f h1:pfDrSVAnfkk2EkrOc0iOmtA4n8F6TL9oEAK8R/enC50=
github.com/tikv/client-go/v2 v2.0.8-0.20230417065328-92db9f7b151f/go.mod h1:Dkqcv2dYoCOiNMiRgnEhpTa04dUaF9E3rbcz4rXxf3U=
github.com/tikv/pd v1.1.0-beta.0.20230203015356-248b3f0be132 h1:vCVu7LxFou5WuaY6jHDMHKVeJTtwr5o2i1xWgGAdDo4=
github.com/tikv/pd v1.1.0-beta.0.20230203015356-248b3f0be132/go.mod h1:jb9oq6rN4U0U3FZdvqWlpi9rZzFJxiOlvZ3aj5BTpg8=
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710/go.mod h1:AtvppPwkiyUgQlR1W9qSqfTB+OsOIu19jDCOxOsPkmU=
Expand Down
Loading

0 comments on commit 915c351

Please sign in to comment.