diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index edc60b36cc8..62c473444f0 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -878,7 +878,6 @@ func (m *SinkManager) Close() error { } m.sinkMemQuota.Close() m.redoMemQuota.Close() - m.sinkFactory.Close() m.tableSinks.Range(func(_ tablepb.Span, value interface{}) bool { sink := value.(*tableSinkWrapper) sink.close(m.ctx) @@ -887,11 +886,15 @@ func (m *SinkManager) Close() error { } return true }) + m.wg.Wait() log.Info("All table sinks closed", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), zap.Duration("cost", time.Since(start))) - m.wg.Wait() + // todo: Add a unit test to cover this, + // Make sure all sink workers exited before closing the sink factory. + // Otherwise, it would panic in the sink when you try to write some data to a closed sink. + m.sinkFactory.Close() log.Info("Closed sink manager", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID),