Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

output/cloudv2: Drain the SampleBuffer on stop and abort once #3105

Merged
merged 1 commit into from
Jun 1, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 12 additions & 17 deletions output/cloud/expv2/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Output struct {

// abort signal to interrupt immediately all background goroutines
abort chan struct{}
abortOnce sync.Once
testStopFunc func(error)
}

Expand Down Expand Up @@ -112,7 +113,7 @@ func (o *Output) StopWithTestError(testErr error) error {
// all the queued samples even if they haven't yet passed the
// wait period.
o.collector.DropExpiringDelay()
o.collector.CollectSamples(nil)
o.collectSamples()
o.flushMetrics()

return nil
Expand Down Expand Up @@ -164,9 +165,6 @@ func (o *Output) periodicInvoke(d time.Duration, callback func()) {

func (o *Output) collectSamples() {
samples := o.GetBufferedSamples()
if len(samples) < 1 {
return
}
Comment on lines 166 to -169
Copy link
Contributor Author

@codebien codebien May 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the check based on the assumption we don't expect to invoke this and get an empty buffer because the aggregation period should be wide enough, and in the case, we get it then there are two next operations:

  1. a for range: so it is skipped
  2. The expired check: we go over the list one more time but if we get some expired, then they will be removed from the list.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if those assumptions are valid, I don't see how an early return if there's no work to be done would hurt. If nothing else it would simplify understanding this logic without being aware of these nuances.

But it's up to you 🙂

Copy link
Contributor Author

@codebien codebien Jun 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how an early return if there's no work to be done would hurt.

@imiric Then the code is not obvious and I think we need to arrange it in another way. The reason why I removed it is because we need to flush the SampleBuffer + the collector's buffer. If we return we can't execute the drain of the collecort's buffer. Do you think it makes sense?

The alternative is doing something one of the following in StopWithTestError:

output.collectSamples()
output.collector.collectSamples(nil)

this has the downside to call two times the collector.CollectSamples method, so another way could be:

output.collector.collectSamples(o.GetBufferedSamples())

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I was misunderstanding where the flush is happening.

I still don't have a full grasp of the code here, but that's on me, so I don't think refactoring would help. 🙂

o.collector.CollectSamples(samples)

// TODO: other operations with the samples containers
Expand Down Expand Up @@ -215,21 +213,18 @@ func (o *Output) handleFlushError(err error) {

// Do not close multiple times (that would panic) in the case
// we hit this multiple times and/or concurrently
select {
case <-o.abort:
return
default:
o.abortOnce.Do(func() {
close(o.abort)
}

if o.config.StopOnError.Bool {
serr := errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(err, exitcodes.ExternalAbort),
errext.AbortedByOutput,
)
if o.config.StopOnError.Bool {
serr := errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(err, exitcodes.ExternalAbort),
errext.AbortedByOutput,
)

if o.testStopFunc != nil {
o.testStopFunc(serr)
if o.testStopFunc != nil {
o.testStopFunc(serr)
}
}
}
})
}