From ce7fec06ae9289a860725d7aaa45bcdaa43d917e Mon Sep 17 00:00:00 2001 From: Ivan <2103732+codebien@users.noreply.github.com> Date: Wed, 31 May 2023 10:59:13 +0200 Subject: [PATCH] Drain the SampleBuffer on stop and abort once --- output/cloud/expv2/output.go | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/output/cloud/expv2/output.go b/output/cloud/expv2/output.go index a6a5bf4f19d..f157fbf3bf2 100644 --- a/output/cloud/expv2/output.go +++ b/output/cloud/expv2/output.go @@ -38,6 +38,7 @@ type Output struct { // abort signal to interrupt immediately all background goroutines abort chan struct{} + abortOnce sync.Once testStopFunc func(error) } @@ -112,7 +113,7 @@ func (o *Output) StopWithTestError(testErr error) error { // all the queued samples even if they haven't yet passed the // wait period. o.collector.DropExpiringDelay() - o.collector.CollectSamples(nil) + o.collectSamples() o.flushMetrics() return nil @@ -164,9 +165,6 @@ func (o *Output) periodicInvoke(d time.Duration, callback func()) { func (o *Output) collectSamples() { samples := o.GetBufferedSamples() - if len(samples) < 1 { - return - } o.collector.CollectSamples(samples) // TODO: other operations with the samples containers @@ -215,21 +213,18 @@ func (o *Output) handleFlushError(err error) { // Do not close multiple times (that would panic) in the case // we hit this multiple times and/or concurrently - select { - case <-o.abort: - return - default: + o.abortOnce.Do(func() { close(o.abort) - } - if o.config.StopOnError.Bool { - serr := errext.WithAbortReasonIfNone( - errext.WithExitCodeIfNone(err, exitcodes.ExternalAbort), - errext.AbortedByOutput, - ) + if o.config.StopOnError.Bool { + serr := errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone(err, exitcodes.ExternalAbort), + errext.AbortedByOutput, + ) - if o.testStopFunc != nil { - o.testStopFunc(serr) + if o.testStopFunc != nil { + o.testStopFunc(serr) + } } - } + }) }