diff --git a/CHANGELOG.md b/CHANGELOG.md index 906737b7682..aa42ccb8430 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ that calls shutdown to terminate it; this is done per memory limiter instance. Added memory limiter factory to cache initiated object and be reused by similar config. This guarantees a single running `checkMemLimits` per config (#4886) +- Resolved race condition in collector when calling `Shutdown` (#4878) ## v0.45.0 Beta diff --git a/service/collector.go b/service/collector.go index ace363a84ca..3c060458563 100644 --- a/service/collector.go +++ b/service/collector.go @@ -96,6 +96,8 @@ func New(set CollectorSettings) (*Collector, error) { set: set, state: state, + + shutdownChan: make(chan struct{}), }, nil } @@ -113,12 +115,14 @@ func (col *Collector) GetLogger() *zap.Logger { // Shutdown shuts down the collector server. func (col *Collector) Shutdown() { - defer func() { - if r := recover(); r != nil { - col.logger.Info("shutdownChan already closed") - } - }() - close(col.shutdownChan) + // Only shutdown if we're in a Running or Starting State else noop + state := col.GetState() + if state == Running || state == Starting { + defer func() { + recover() // nolint:errcheck + }() + close(col.shutdownChan) + } } // runAndWaitForShutdownEvent waits for one of the shutdown events that can happen. @@ -131,7 +135,6 @@ func (col *Collector) runAndWaitForShutdownEvent(ctx context.Context) error { signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM) } - col.shutdownChan = make(chan struct{}) col.setCollectorState(Running) LOOP: for { diff --git a/service/collector_test.go b/service/collector_test.go index 4d9b4ec19ed..5049ffd52ea 100644 --- a/service/collector_test.go +++ b/service/collector_test.go @@ -179,6 +179,42 @@ func TestCollector_ShutdownNoop(t *testing.T) { require.NotPanics(t, func() { col.Shutdown() }) } +func TestCollector_ShutdownBeforeRun(t *testing.T) { + // use a mock AppTelemetry struct to return an error on shutdown + preservedAppTelemetry := collectorTelemetry + collectorTelemetry = &colTelemetry{} + defer func() { collectorTelemetry = preservedAppTelemetry }() + + factories, err := testcomponents.NewDefaultFactories() + require.NoError(t, err) + + set := CollectorSettings{ + BuildInfo: component.NewDefaultBuildInfo(), + Factories: factories, + ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")}, nil), + } + col, err := New(set) + require.NoError(t, err) + + // Calling shutdown before collector is running should cause it to return quickly + col.Shutdown() + + colDone := make(chan struct{}) + go func() { + defer close(colDone) + colErr := col.Run(context.Background()) + if colErr != nil { + err = colErr + } + }() + + col.Shutdown() + <-colDone + assert.Eventually(t, func() bool { + return Closed == col.GetState() + }, time.Second*2, time.Millisecond*200) +} + type mockColTelemetry struct{} func (tel *mockColTelemetry) init(*Collector) error {