From 9a997487583cb066d01755971695d03661de375f Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 12 Jan 2022 16:59:42 +0800 Subject: [PATCH] sink (ticdc): fix a deadlock due to checkpointTs fall back in sinkNode (#4084) (#4098) close pingcap/tiflow#4055 --- cdc/processor/pipeline/sink.go | 11 +++++- cdc/processor/pipeline/sink_test.go | 58 +++++++++++++++++++++++++++++ pkg/pipeline/test.go | 2 +- 3 files changed, 69 insertions(+), 2 deletions(-) diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 5c09a6736cf..849c2d79755 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -142,12 +142,21 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err if err != nil { return errors.Trace(err) } + + // we must call flowController.Release immediately after we call + // FlushRowChangedEvents to prevent a deadlock caused by checkpointTs + // falling back + n.flowController.Release(checkpointTs) + + // the checkpointTs may fall back in some situations, such as: + // 1. This table is newly added to the processor + // 2. 
There is one table in the processor that has a smaller + // checkpointTs than this one if checkpointTs <= n.checkpointTs { return nil } atomic.StoreUint64(&n.checkpointTs, checkpointTs) - n.flowController.Release(checkpointTs) return nil } diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 03fbeb061ef..75b12a5ba4c 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -555,3 +555,61 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) { c.Assert(node.eventBuffer[insertEventIndex].Row.Columns, check.HasLen, 2) c.Assert(node.eventBuffer[insertEventIndex].Row.PreColumns, check.HasLen, 0) } + +type flushFlowController struct { + mockFlowController + releaseCounter int +} + +func (c *flushFlowController) Release(resolvedTs uint64) { + c.releaseCounter++ +} + +type flushSink struct { + mockSink +} + +// used to simulate the situation where the resolvedTs returned from the sink manager +// falls back +var fallBackResolvedTs = uint64(10) + +func (s *flushSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) { + if resolvedTs == fallBackResolvedTs { + return 0, nil + } + return resolvedTs, nil +} + +// TestFlushSinkReleaseFlowController tests that the sinkNode.flushSink method will always +// call flowController.Release to release the memory quota of the table to avoid +// deadlock if no error occurs +func (s *outputSuite) TestFlushSinkReleaseFlowController(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + cfg := config.GetDefaultReplicaConfig() + cfg.EnableOldValue = false + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: "changefeed-id-test-flushSink", + Info: &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: cfg, + }, + }) + flowController := &flushFlowController{} + sink := &flushSink{} + // sNode is a 
sinkNode + sNode := newSinkNode(1, sink, 0, 10, flowController) + c.Assert(sNode.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) + sNode.barrierTs = 10 + + cctx := pipeline.MockNodeContext4Test(nil, pipeline.TickMessage(), nil) + err := sNode.flushSink(cctx, uint64(8)) + c.Assert(err, check.IsNil) + c.Assert(sNode.checkpointTs, check.Equals, uint64(8)) + c.Assert(flowController.releaseCounter, check.Equals, 1) + // resolvedTs will fall back in this call + err = sNode.flushSink(cctx, uint64(10)) + c.Assert(err, check.IsNil) + c.Assert(sNode.checkpointTs, check.Equals, uint64(8)) + c.Assert(flowController.releaseCounter, check.Equals, 2) +} diff --git a/pkg/pipeline/test.go b/pkg/pipeline/test.go index bfc25ac69b8..e0d3cd7f68c 100644 --- a/pkg/pipeline/test.go +++ b/pkg/pipeline/test.go @@ -30,7 +30,7 @@ func SendMessageToNode4Test(ctx context.Context, node Node, msgs []Message, outp return Message{}, nil } -// MockNodeContext4Test creates a node context with a message and a output channel for tests. +// MockNodeContext4Test creates a node context with a message and an output channel for tests. func MockNodeContext4Test(ctx context.Context, msg Message, outputCh chan Message) NodeContext { return newNodeContext(ctx, msg, outputCh) }