[exporterhelper] Fix potential deadlock in the batch sender #10315

Merged · 1 commit · Jun 5, 2024
19 changes: 19 additions & 0 deletions .chloggen/batchseder-fix-potential-deadlock.yaml
@@ -0,0 +1,19 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix potential deadlock in the batch sender

# One or more tracking issues or pull requests related to the change
issues: [10315]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
change_logs: [user]
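For context on the bug this entry logs: the send path used to signal the timer goroutine over an unbuffered `resetTimerCh` while still holding `bs.mu`, and the timer goroutine takes `bs.mu` on every tick before it can return to its `select`. If a tick and a flush raced, each goroutine ended up waiting on the other. Below is a minimal, self-contained sketch of that cycle; the names, timings, and structure are illustrative stand-ins, not the exporter's actual code:

```go
package main

import (
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	resetCh := make(chan struct{}) // unbuffered, like the removed resetTimerCh

	// Timer goroutine: a tick takes the lock before it can return to select.
	go func() {
		timer := time.NewTimer(10 * time.Millisecond)
		for {
			select {
			case <-timer.C:
				mu.Lock() // blocks: the sender below still holds mu
				// ... flush the active batch ...
				mu.Unlock()
				timer.Reset(10 * time.Millisecond)
			case <-resetCh:
				timer.Reset(10 * time.Millisecond)
			}
		}
	}()

	// Send path: holds the lock while signaling the timer reset.
	mu.Lock()
	time.Sleep(20 * time.Millisecond) // ensure the tick fires first
	resetCh <- struct{}{}             // nobody can receive: the timer goroutine is stuck on mu.Lock
	mu.Unlock()                       // never reached; the runtime reports "all goroutines are asleep - deadlock!"
}
```

Running this reliably trips Go's runtime deadlock detector. The fix in the diff below removes the signaling channel entirely rather than trying to buffer it.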
28 changes: 10 additions & 18 deletions exporter/exporterhelper/batch_sender.go
@@ -33,10 +33,9 @@ type batchSender struct {
 	concurrencyLimit uint64
 	activeRequests   atomic.Uint64
 
-	resetTimerCh chan struct{}
-
 	mu          sync.Mutex
 	activeBatch *batch
+	lastFlushed time.Time
 
 	logger *zap.Logger
 
@@ -57,7 +56,6 @@ func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings,
 		shutdownCh:         nil,
 		shutdownCompleteCh: make(chan struct{}),
 		stopped:            &atomic.Bool{},
-		resetTimerCh:       make(chan struct{}),
 	}
 	return bs
 }
@@ -85,16 +83,17 @@ func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
 				return
 			case <-timer.C:
 				bs.mu.Lock()
+				nextFlush := bs.cfg.FlushTimeout
 				if bs.activeBatch.request != nil {
-					bs.exportActiveBatch()
+					sinceLastFlush := time.Since(bs.lastFlushed)
+					if sinceLastFlush >= bs.cfg.FlushTimeout {
+						bs.exportActiveBatch()
+					} else {
+						nextFlush = bs.cfg.FlushTimeout - sinceLastFlush
+					}
 				}
 				bs.mu.Unlock()
-				timer.Reset(bs.cfg.FlushTimeout)
-			case <-bs.resetTimerCh:
-				if !timer.Stop() {
-					<-timer.C
-				}
-				timer.Reset(bs.cfg.FlushTimeout)
+				timer.Reset(nextFlush)
 			}
 		}
 	}()
@@ -123,15 +122,10 @@ func (bs *batchSender) exportActiveBatch() {
 		b.err = bs.nextSender.send(b.ctx, b.request)
 		close(b.done)
 	}(bs.activeBatch)
+	bs.lastFlushed = time.Now()
 	bs.activeBatch = newEmptyBatch()
 }
 
-func (bs *batchSender) resetTimer() {
-	if !bs.stopped.Load() {
-		bs.resetTimerCh <- struct{}{}
-	}
-}
-
 // isActiveBatchReady returns true if the active batch is ready to be exported.
 // The batch is ready if it has reached the minimum size or the concurrency limit is reached.
 // Caller must hold the lock.
@@ -168,7 +162,6 @@ func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) error {
 	batch := bs.activeBatch
 	if bs.isActiveBatchReady() || len(reqs) > 1 {
 		bs.exportActiveBatch()
-		bs.resetTimer()
 	}
 	bs.mu.Unlock()
 	<-batch.done
@@ -208,7 +201,6 @@ func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
 	batch := bs.activeBatch
 	if bs.isActiveBatchReady() {
 		bs.exportActiveBatch()
-		bs.resetTimer()
 	}
 	bs.mu.Unlock()
 	<-batch.done
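With the channel gone, flush coordination is one-directional: `exportActiveBatch` records `lastFlushed` under the mutex, and each timer tick either flushes or shortens its next wait to the remaining portion of `FlushTimeout`. Here is a condensed, runnable sketch of that pattern; `sender`, `pending`, and the timings are illustrative stand-ins, not the exporter's types:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// sender is an illustrative stand-in for batchSender; pending stands in for
// the active batch's item count.
type sender struct {
	mu          sync.Mutex
	lastFlushed time.Time
	pending     int
}

// flushLocked exports the pending items; the caller must hold mu.
func (s *sender) flushLocked() {
	fmt.Printf("flushed %d items\n", s.pending)
	s.pending = 0
	s.lastFlushed = time.Now() // recorded on every flush, as in exportActiveBatch
}

// timerLoop mirrors the shape of the fixed loop: no reset channel; each tick
// either flushes or waits out the remainder of the interval.
func (s *sender) timerLoop(flushTimeout time.Duration, stop <-chan struct{}) {
	timer := time.NewTimer(flushTimeout)
	for {
		select {
		case <-stop:
			return
		case <-timer.C:
			s.mu.Lock()
			next := flushTimeout
			if s.pending > 0 {
				if since := time.Since(s.lastFlushed); since >= flushTimeout {
					s.flushLocked()
				} else {
					next = flushTimeout - since // a recent flush pushed the deadline out
				}
			}
			s.mu.Unlock()
			timer.Reset(next) // only this goroutine ever touches the timer
		}
	}
}

func main() {
	s := &sender{lastFlushed: time.Now()}
	stop := make(chan struct{})
	go s.timerLoop(50*time.Millisecond, stop)

	s.mu.Lock()
	s.pending = 4 // add items without any cross-goroutine timer signaling
	s.mu.Unlock()

	time.Sleep(120 * time.Millisecond) // let a tick observe and flush the items
	close(stop)
}
```

Because only the timer goroutine ever calls `timer.Reset`, the `Stop`/drain dance from the removed `resetTimerCh` handler is no longer needed.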
85 changes: 85 additions & 0 deletions exporter/exporterhelper/batch_sender_test.go
@@ -535,6 +535,91 @@ func TestBatchSenderWithTimeout(t *testing.T) {
	assert.EqualValues(t, 12, sink.itemsCount.Load())
}

func TestBatchSenderTimerResetNoConflict(t *testing.T) {
	delayBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) {
		time.Sleep(30 * time.Millisecond)
		if r1 == nil {
			return r2, nil
		}
		fr1 := r1.(*fakeRequest)
		fr2 := r2.(*fakeRequest)
		if fr2.mergeErr != nil {
			return nil, fr2.mergeErr
		}
		return &fakeRequest{
			items:     fr1.items + fr2.items,
			sink:      fr1.sink,
			exportErr: fr2.exportErr,
			delay:     fr1.delay + fr2.delay,
		}, nil
	}
	bCfg := exporterbatcher.NewDefaultConfig()
	bCfg.MinSizeItems = 8
	bCfg.FlushTimeout = 50 * time.Millisecond
	be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender,
		WithBatcher(bCfg, WithRequestBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc)))
	require.NoError(t, err)
	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
	sink := newFakeRequestSink()

	// Send 2 concurrent requests that should be merged into one batch in the same interval as the flush timer.
	go func() {
		require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
	}()
	time.Sleep(30 * time.Millisecond)
	go func() {
		require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
	}()

	// The batch should be sent either by the flush interval or by reaching the minimum item count, with no conflict.
	assert.EventuallyWithT(t, func(c *assert.CollectT) {
		assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load())
		assert.EqualValues(c, 8, sink.itemsCount.Load())
	}, 200*time.Millisecond, 10*time.Millisecond)

	require.NoError(t, be.Shutdown(context.Background()))
}

func TestBatchSenderTimerFlush(t *testing.T) {
	bCfg := exporterbatcher.NewDefaultConfig()
	bCfg.MinSizeItems = 8
	bCfg.FlushTimeout = 100 * time.Millisecond
	be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender,
		WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
	require.NoError(t, err)
	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
	sink := newFakeRequestSink()
	time.Sleep(50 * time.Millisecond)

	// Send 2 concurrent requests that should be merged into one batch and sent immediately.
	go func() {
		require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
	}()
	go func() {
		require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
	}()
	assert.EventuallyWithT(t, func(c *assert.CollectT) {
		assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load())
		assert.EqualValues(c, 8, sink.itemsCount.Load())
	}, 30*time.Millisecond, 5*time.Millisecond)

	// Send another request that should be flushed 100ms after the last flush, not 50ms.
	go func() {
		require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
	}()

	// Confirm that it is not flushed within 50ms.
	time.Sleep(60 * time.Millisecond)
	assert.LessOrEqual(t, uint64(1), sink.requestsCount.Load())
	assert.EqualValues(t, 8, sink.itemsCount.Load())

	// Confirm that it is flushed after 100ms (using 60+50=110 here to be safe).
	time.Sleep(50 * time.Millisecond)
	assert.LessOrEqual(t, uint64(2), sink.requestsCount.Load())
	assert.EqualValues(t, 12, sink.itemsCount.Load())
	require.NoError(t, be.Shutdown(context.Background()))
}

func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter {
	be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption,
		WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]()))