Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): cherry pick sink bug fix to release 5.2 (#4083) #4119

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdc/owner/async_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (m *mockSink) Close(ctx context.Context) error {
return nil
}

func (m *mockSink) Barrier(ctx context.Context) error {
func (m *mockSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}

Expand Down
10 changes: 6 additions & 4 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ func (s *TableStatus) Store(new TableStatus) {
}

type sinkNode struct {
sink sink.Sink
status TableStatus
sink sink.Sink
status TableStatus
tableID model.TableID

resolvedTs model.Ts
checkpointTs model.Ts
Expand All @@ -78,8 +79,9 @@ type sinkNode struct {
flowController tableFlowController
}

func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
return &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
Expand Down Expand Up @@ -136,7 +138,7 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err
if err := n.emitRow2Sink(ctx); err != nil {
return errors.Trace(err)
}
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, resolvedTs)
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolvedTs)
if err != nil {
return errors.Trace(err)
}
Expand Down
20 changes: 10 additions & 10 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
panic("unreachable")
}

func (s *mockSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (s *mockSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) {
s.received = append(s.received, struct {
resolvedTs model.Ts
row *model.RowChangedEvent
Expand All @@ -92,7 +92,7 @@ func (s *mockSink) Close(ctx context.Context) error {
return nil
}

func (s *mockSink) Barrier(ctx context.Context) error {
func (s *mockSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}

Expand Down Expand Up @@ -137,7 +137,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
})

// test stop at targetTs
node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand All @@ -163,7 +163,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.CheckpointTs(), check.Equals, uint64(10))

// test the stop at ts command
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand All @@ -186,7 +186,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.CheckpointTs(), check.Equals, uint64(2))

// test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand Down Expand Up @@ -223,7 +223,7 @@ func (s *outputSuite) TestStopStatus(c *check.C) {
})

closeCh := make(chan interface{}, 1)
node := newSinkNode(&mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)
c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
Expand Down Expand Up @@ -258,7 +258,7 @@ func (s *outputSuite) TestManyTs(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand Down Expand Up @@ -379,7 +379,7 @@ func (s *outputSuite) TestIgnoreEmptyRowChangeEvent(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// empty row, no Columns and PreColumns.
Expand All @@ -399,7 +399,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenEnableOldValue(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// nil row.
Expand Down Expand Up @@ -458,7 +458,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// nil row.
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func NewTablePipeline(ctx cdcContext.Context,
if cyclicEnabled {
p.AppendNode(ctx, "cyclic", newCyclicMarkNode(replicaInfo.MarkTableID))
}
tablePipeline.sinkNode = newSinkNode(sink, replicaInfo.StartTs, targetTs, flowController)
tablePipeline.sinkNode = newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs, flowController)
p.AppendNode(ctx, "sink", tablePipeline.sinkNode)
tablePipeline.p = p
return tablePipeline
Expand Down
2 changes: 2 additions & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *model.ChangefeedReactorS
if err := p.lazyInit(ctx); err != nil {
return nil, errors.Trace(err)
}
// sink manager will return this checkpointTs to sink node if sink node resolvedTs flush failed
p.sinkManager.UpdateChangeFeedCheckpointTs(state.Info.GetCheckpointTs(state.Status))
if err := p.handleTableOperation(ctx); err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 2 additions & 0 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
tablepipeline "github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/sink"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
Expand All @@ -47,6 +48,7 @@ func newProcessor4Test(
) *processor {
p := newProcessor(ctx)
p.lazyInit = func(ctx cdcContext.Context) error { return nil }
p.sinkManager = &sink.Manager{}
p.createTablePipeline = createTablePipeline
p.schemaStorage = &mockSchemaStorage{c: c}
return p
Expand Down
6 changes: 2 additions & 4 deletions cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func newBlackHoleSink(ctx context.Context, opts map[string]string) *blackHoleSin

type blackHoleSink struct {
statistics *Statistics
checkpointTs uint64
accumulated uint64
lastAccumulated uint64
}
Expand All @@ -46,7 +45,7 @@ func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model
return nil
}

func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) {
log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolvedTs))
err := b.statistics.RecordBatchExecution(func() (int, error) {
// TODO: add some random replication latency
Expand All @@ -56,7 +55,6 @@ func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, resolvedTs ui
return int(batchSize), nil
})
b.statistics.PrintStatus(ctx)
atomic.StoreUint64(&b.checkpointTs, resolvedTs)
return resolvedTs, err
}

Expand All @@ -79,6 +77,6 @@ func (b *blackHoleSink) Close(ctx context.Context) error {
return nil
}

func (b *blackHoleSink) Barrier(ctx context.Context) error {
func (b *blackHoleSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}
91 changes: 91 additions & 0 deletions cdc/sink/buffer_sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sink

import (
"context"
"testing"
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/stretchr/testify/require"
)

func TestTableIsNotFlushed(t *testing.T) {
b := bufferSink{changeFeedCheckpointTs: 1}
require.Equal(t, uint64(1), b.getTableCheckpointTs(2))
b.UpdateChangeFeedCheckpointTs(3)
require.Equal(t, uint64(3), b.getTableCheckpointTs(2))
}

func TestFlushTable(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer func() {
cancel()
}()
b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg))
require.Equal(t, uint64(5), b.getTableCheckpointTs(2))
require.Nil(t, b.EmitRowChangedEvents(ctx))
tbl1 := &model.TableName{TableID: 1}
tbl2 := &model.TableName{TableID: 2}
tbl3 := &model.TableName{TableID: 3}
tbl4 := &model.TableName{TableID: 4}
require.Nil(t, b.EmitRowChangedEvents(ctx, []*model.RowChangedEvent{
{CommitTs: 6, Table: tbl1},
{CommitTs: 6, Table: tbl2},
{CommitTs: 6, Table: tbl3},
{CommitTs: 6, Table: tbl4},
{CommitTs: 10, Table: tbl1},
{CommitTs: 10, Table: tbl2},
{CommitTs: 10, Table: tbl3},
{CommitTs: 10, Table: tbl4},
}...))
checkpoint, err := b.FlushRowChangedEvents(ctx, 1, 7)
require.True(t, checkpoint <= 7)
require.Nil(t, err)
checkpoint, err = b.FlushRowChangedEvents(ctx, 2, 6)
require.True(t, checkpoint <= 6)
require.Nil(t, err)
checkpoint, err = b.FlushRowChangedEvents(ctx, 3, 8)
require.True(t, checkpoint <= 8)
require.Nil(t, err)
time.Sleep(200 * time.Millisecond)
require.Equal(t, uint64(7), b.getTableCheckpointTs(1))
require.Equal(t, uint64(6), b.getTableCheckpointTs(2))
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
require.Equal(t, uint64(5), b.getTableCheckpointTs(4))
b.UpdateChangeFeedCheckpointTs(6)
require.Equal(t, uint64(7), b.getTableCheckpointTs(1))
require.Equal(t, uint64(6), b.getTableCheckpointTs(2))
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
require.Equal(t, uint64(6), b.getTableCheckpointTs(4))
}

func TestFlushFailed(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg))
checkpoint, err := b.FlushRowChangedEvents(ctx, 3, 8)
require.True(t, checkpoint <= 8)
require.Nil(t, err)
time.Sleep(200 * time.Millisecond)
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
cancel()
checkpoint, _ = b.FlushRowChangedEvents(ctx, 3, 18)
require.Equal(t, uint64(8), checkpoint)
checkpoint, _ = b.FlushRowChangedEvents(ctx, 1, 18)
require.Equal(t, uint64(5), checkpoint)
time.Sleep(200 * time.Millisecond)
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
require.Equal(t, uint64(5), b.getTableCheckpointTs(1))
}
4 changes: 2 additions & 2 deletions cdc/sink/cdclog/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (f *fileSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowC
return f.emitRowChangedEvents(ctx, newTableStream, rows...)
}

func (f *fileSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (f *fileSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
log.Debug("[FlushRowChangedEvents] enter", zap.Uint64("ts", resolvedTs))
return f.flushRowChangedEvents(ctx, resolvedTs)
}
Expand Down Expand Up @@ -349,7 +349,7 @@ func (f *fileSink) Close(ctx context.Context) error {
return nil
}

func (f *fileSink) Barrier(ctx context.Context) error {
func (f *fileSink) Barrier(ctx context.Context, tableID model.TableID) error {
// Barrier does nothing because FlushRowChangedEvents in file sink has flushed
// all buffered events forcedlly.
return nil
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/cdclog/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (s *s3Sink) flushLogMeta(ctx context.Context) error {
return cerror.WrapError(cerror.ErrS3SinkWriteStorage, s.storage.WriteFile(ctx, logMetaFile, data))
}

func (s *s3Sink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (s *s3Sink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
// we should flush all events before resolvedTs, there are two kind of flush policy
// 1. flush row events to a s3 chunk: if the event size is not enough,
// TODO: when cdc crashed, we should repair these chunks to a complete file
Expand Down Expand Up @@ -347,7 +347,7 @@ func (s *s3Sink) Close(ctx context.Context) error {
return nil
}

func (s *s3Sink) Barrier(ctx context.Context) error {
func (s *s3Sink) Barrier(ctx context.Context, tableID model.TableID) error {
// Barrier does nothing because FlushRowChangedEvents in s3 sink has flushed
// all buffered events forcedlly.
return nil
Expand Down
33 changes: 12 additions & 21 deletions cdc/sink/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package common
import (
"sort"
"sync"
"sync/atomic"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -55,7 +54,6 @@ func (t *txnsWithTheSameCommitTs) Append(row *model.RowChangedEvent) {
type UnresolvedTxnCache struct {
unresolvedTxnsMu sync.Mutex
unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs
checkpointTs uint64
}

// NewUnresolvedTxnCache returns a new UnresolvedTxnCache
Expand Down Expand Up @@ -103,32 +101,27 @@ func (c *UnresolvedTxnCache) Append(filter *filter.Filter, rows ...*model.RowCha

// Resolved returns resolved txns according to resolvedTs
// The returned map contains many txns grouped by tableID. for each table, the each commitTs of txn in txns slice is strictly increasing
func (c *UnresolvedTxnCache) Resolved(resolvedTs uint64) map[model.TableID][]*model.SingleTableTxn {
if resolvedTs <= atomic.LoadUint64(&c.checkpointTs) {
return nil
}

func (c *UnresolvedTxnCache) Resolved(resolvedTsMap *sync.Map) (map[model.TableID]uint64, map[model.TableID][]*model.SingleTableTxn) {
c.unresolvedTxnsMu.Lock()
defer c.unresolvedTxnsMu.Unlock()
if len(c.unresolvedTxns) == 0 {
return nil
return nil, nil
}

_, resolvedTxnsMap := splitResolvedTxn(resolvedTs, c.unresolvedTxns)
return resolvedTxnsMap
}

// UpdateCheckpoint updates the checkpoint ts
func (c *UnresolvedTxnCache) UpdateCheckpoint(checkpointTs uint64) {
atomic.StoreUint64(&c.checkpointTs, checkpointTs)
return splitResolvedTxn(resolvedTsMap, c.unresolvedTxns)
}

func splitResolvedTxn(
resolvedTs uint64, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs,
) (minTs uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) {
resolvedTsMap *sync.Map, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs,
) (flushedResolvedTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) {
resolvedRowsMap = make(map[model.TableID][]*model.SingleTableTxn, len(unresolvedTxns))
minTs = resolvedTs
flushedResolvedTsMap = make(map[model.TableID]uint64, len(unresolvedTxns))
for tableID, txns := range unresolvedTxns {
v, ok := resolvedTsMap.Load(tableID)
if !ok {
continue
}
resolvedTs := v.(uint64)
i := sort.Search(len(txns), func(i int) bool {
return txns[i].commitTs > resolvedTs
})
Expand All @@ -154,9 +147,7 @@ func splitResolvedTxn(
}
}
resolvedRowsMap[tableID] = resolvedTxns
if len(resolvedTxnsWithTheSameCommitTs) > 0 && resolvedTxnsWithTheSameCommitTs[0].commitTs < minTs {
minTs = resolvedTxnsWithTheSameCommitTs[0].commitTs
}
flushedResolvedTsMap[tableID] = resolvedTs
}
return
}
Loading