diff --git a/.chloggen/batchseder-fix-potential-deadlock.yaml b/.chloggen/batchseder-fix-potential-deadlock.yaml new file mode 100644 index 00000000000..d3b89737775 --- /dev/null +++ b/.chloggen/batchseder-fix-potential-deadlock.yaml @@ -0,0 +1,19 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix potential deadlock in the batch sender + +# One or more tracking issues or pull requests related to the change +issues: [10315] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +change_logs: [user] diff --git a/exporter/exporterhelper/batch_sender.go b/exporter/exporterhelper/batch_sender.go index 8c69c4c1f61..3501fc3efb0 100644 --- a/exporter/exporterhelper/batch_sender.go +++ b/exporter/exporterhelper/batch_sender.go @@ -33,10 +33,9 @@ type batchSender struct { concurrencyLimit uint64 activeRequests atomic.Uint64 - resetTimerCh chan struct{} - mu sync.Mutex activeBatch *batch + lastFlushed time.Time logger *zap.Logger @@ -57,7 +56,6 @@ func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings, shutdownCh: nil, shutdownCompleteCh: make(chan struct{}), stopped: &atomic.Bool{}, - resetTimerCh: make(chan struct{}), } return bs } @@ -85,16 +83,17 @@ func (bs *batchSender) Start(_ context.Context, _ component.Host) error { return case <-timer.C: bs.mu.Lock() + nextFlush := bs.cfg.FlushTimeout if bs.activeBatch.request != nil { - bs.exportActiveBatch() + sinceLastFlush := time.Since(bs.lastFlushed) + if sinceLastFlush >= bs.cfg.FlushTimeout { + bs.exportActiveBatch() + } else { + nextFlush = bs.cfg.FlushTimeout - sinceLastFlush + } } bs.mu.Unlock() - timer.Reset(bs.cfg.FlushTimeout) - case <-bs.resetTimerCh: - if !timer.Stop() { - <-timer.C - } - timer.Reset(bs.cfg.FlushTimeout) + timer.Reset(nextFlush) } } }() @@ -123,15 +122,10 @@ func (bs *batchSender) exportActiveBatch() { b.err = bs.nextSender.send(b.ctx, b.request) close(b.done) }(bs.activeBatch) + bs.lastFlushed = time.Now() bs.activeBatch = newEmptyBatch() } -func (bs *batchSender) resetTimer() { - if !bs.stopped.Load() { - bs.resetTimerCh <- struct{}{} - } -} - // isActiveBatchReady returns true if the active batch is ready to be exported. // The batch is ready if it has reached the minimum size or the concurrency limit is reached. // Caller must hold the lock. @@ -168,7 +162,6 @@ func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) err batch := bs.activeBatch if bs.isActiveBatchReady() || len(reqs) > 1 { bs.exportActiveBatch() - bs.resetTimer() } bs.mu.Unlock() <-batch.done @@ -208,7 +201,6 @@ func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error { batch := bs.activeBatch if bs.isActiveBatchReady() { bs.exportActiveBatch() - bs.resetTimer() } bs.mu.Unlock() <-batch.done diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go index cfcef01711c..5a5a175bdab 100644 --- a/exporter/exporterhelper/batch_sender_test.go +++ b/exporter/exporterhelper/batch_sender_test.go @@ -535,6 +535,91 @@ func TestBatchSenderWithTimeout(t *testing.T) { assert.EqualValues(t, 12, sink.itemsCount.Load()) } +func TestBatchSenderTimerResetNoConflict(t *testing.T) { + delayBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) { + time.Sleep(30 * time.Millisecond) + if r1 == nil { + return r2, nil + } + fr1 := r1.(*fakeRequest) + fr2 := r2.(*fakeRequest) + if fr2.mergeErr != nil { + return nil, fr2.mergeErr + } + return &fakeRequest{ + items: fr1.items + fr2.items, + sink: fr1.sink, + exportErr: fr2.exportErr, + delay: fr1.delay + fr2.delay, + }, nil + } + bCfg := exporterbatcher.NewDefaultConfig() + bCfg.MinSizeItems = 8 + bCfg.FlushTimeout = 50 * time.Millisecond + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, + WithBatcher(bCfg, WithRequestBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc))) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + sink := newFakeRequestSink() + + // Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer + go func() { + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + }() + time.Sleep(30 * time.Millisecond) + go func() { + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + }() + + // The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) + assert.EqualValues(c, 8, sink.itemsCount.Load()) + }, 200*time.Millisecond, 10*time.Millisecond) + + require.NoError(t, be.Shutdown(context.Background())) +} + +func TestBatchSenderTimerFlush(t *testing.T) { + bCfg := exporterbatcher.NewDefaultConfig() + bCfg.MinSizeItems = 8 + bCfg.FlushTimeout = 100 * time.Millisecond + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, + WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + sink := newFakeRequestSink() + time.Sleep(50 * time.Millisecond) + + // Send 2 concurrent requests that should be merged in one batch and sent immediately + go func() { + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + }() + go func() { + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + }() + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) + assert.EqualValues(c, 8, sink.itemsCount.Load()) + }, 30*time.Millisecond, 5*time.Millisecond) + + // Send another request that should be flushed after 100ms instead of 50ms since last flush + go func() { + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + }() + + // Confirm that it is not flushed in 50ms + time.Sleep(60 * time.Millisecond) + assert.LessOrEqual(t, uint64(1), sink.requestsCount.Load()) + assert.EqualValues(t, 8, sink.itemsCount.Load()) + + // Confirm that it is flushed after 100ms (using 60+50=110 here to be safe) + time.Sleep(50 * time.Millisecond) + assert.LessOrEqual(t, uint64(2), sink.requestsCount.Load()) + assert.EqualValues(t, 12, sink.itemsCount.Load()) + require.NoError(t, be.Shutdown(context.Background())) +} + func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter { be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption, WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]()))