From be8b0c43d03464d06a2b4039f057bf928a24f4bc Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Fri, 10 Feb 2023 17:24:01 +0800 Subject: [PATCH] sinkV2(ticdc): sink manager close the sink factory at the last (#8219) close pingcap/tiflow#8216 --- cdc/processor/sinkmanager/manager.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index edc60b36cc8..62c473444f0 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -878,7 +878,6 @@ func (m *SinkManager) Close() error { } m.sinkMemQuota.Close() m.redoMemQuota.Close() - m.sinkFactory.Close() m.tableSinks.Range(func(_ tablepb.Span, value interface{}) bool { sink := value.(*tableSinkWrapper) sink.close(m.ctx) @@ -887,11 +886,15 @@ func (m *SinkManager) Close() error { } return true }) + m.wg.Wait() log.Info("All table sinks closed", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), zap.Duration("cost", time.Since(start))) - m.wg.Wait() + // todo: Add a unit test to cover this, + // Make sure all sink workers exited before closing the sink factory. + // Otherwise, it would panic in the sink when you try to write some data to a closed sink. + m.sinkFactory.Close() log.Info("Closed sink manager", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID),