Skip to content

Commit

Permalink
Drain the SampleBuffer on stop and abort once
Browse files Browse the repository at this point in the history
  • Loading branch information
codebien committed Jun 1, 2023
1 parent 23ff170 commit 04a1356
Showing 1 changed file with 12 additions and 17 deletions.
29 changes: 12 additions & 17 deletions output/cloud/expv2/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Output struct {

// abort signal to interrupt immediately all background goroutines
abort chan struct{}
abortOnce sync.Once
testStopFunc func(error)
}

Expand Down Expand Up @@ -112,7 +113,7 @@ func (o *Output) StopWithTestError(testErr error) error {
// all the queued samples even if they haven't yet passed the
// wait period.
o.collector.DropExpiringDelay()
o.collector.CollectSamples(nil)
o.collectSamples()
o.flushMetrics()

return nil
Expand Down Expand Up @@ -164,9 +165,6 @@ func (o *Output) periodicInvoke(d time.Duration, callback func()) {

func (o *Output) collectSamples() {
samples := o.GetBufferedSamples()
if len(samples) < 1 {
return
}
o.collector.CollectSamples(samples)

// TODO: other operations with the samples containers
Expand Down Expand Up @@ -215,21 +213,18 @@ func (o *Output) handleFlushError(err error) {

// Do not close multiple times (that would panic) in the case
// we hit this multiple times and/or concurrently
select {
case <-o.abort:
return
default:
o.abortOnce.Do(func() {
close(o.abort)
}

if o.config.StopOnError.Bool {
serr := errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(err, exitcodes.ExternalAbort),
errext.AbortedByOutput,
)
if o.config.StopOnError.Bool {
serr := errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(err, exitcodes.ExternalAbort),
errext.AbortedByOutput,
)

if o.testStopFunc != nil {
o.testStopFunc(serr)
if o.testStopFunc != nil {
o.testStopFunc(serr)
}
}
}
})
}

0 comments on commit 04a1356

Please sign in to comment.