Skip to content

Commit

Permalink
Memory queue: cancel in-progress writes on queue closed, not producer…
Browse files Browse the repository at this point in the history
… closed (#38094) (#38178)

Fixes a race condition that could lead to incorrect event totals and occasional panics #37702.

Once a producer sends a get request to the memory queue, it must wait on the response unless the queue itself is closed, otherwise it can return a false failure. The previous code mistakenly waited on the done signal for the current producer rather than the queue. This PR adds the queue's done signal to the producer struct, and waits on that once the insert request is sent.

(cherry picked from commit d23b4d3)

Co-authored-by: Fae Charlton <fae.charlton@elastic.co>
  • Loading branch information
mergify[bot] and faec authored Mar 5, 2024
1 parent 8f6b9b0 commit 91fe08d
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ fields added to events containing the Beats version. {pull}37553[37553]
- Update github.com/lestrrat-go/jwx dependency. {pull}37799[37799]
- [threatintel] MISP pagination fixes {pull}37898[37898]
- Fix file handle leak when handling errors in filestream {pull}37973[37973]
- Fix a race condition that could crash Filebeat with a "negative WaitGroup counter" error {pull}38094[38094]
- Fix "failed processing S3 event for object key" error on aws-s3 input when key contains the "+" character {issue}38012[38012] {pull}38125[38125]

*Heartbeat*
Expand Down
39 changes: 27 additions & 12 deletions libbeat/publisher/queue/memqueue/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ type ackProducer struct {
}

type openState struct {
log *logp.Logger
done chan struct{}
events chan pushRequest
log *logp.Logger
done chan struct{}
queueDone <-chan struct{}
events chan pushRequest
}

// producerID stores the order of events within a single producer, so multiple
Expand All @@ -58,9 +59,10 @@ type ackHandler func(count int)

func newProducer(b *broker, cb ackHandler, dropCB func(interface{}), dropOnCancel bool) queue.Producer {
openState := openState{
log: b.logger,
done: make(chan struct{}),
events: b.pushChan,
log: b.logger,
done: make(chan struct{}),
queueDone: b.ctx.Done(),
events: b.pushChan,
}

if cb != nil {
Expand Down Expand Up @@ -143,27 +145,40 @@ func (st *openState) Close() {
func (st *openState) publish(req pushRequest) (queue.EntryID, bool) {
select {
case st.events <- req:
// If the output is blocked and the queue is full, `req` is written
// to `st.events`, however the queue never writes back to `req.resp`,
// which effectively blocks for ever. So we also need to select on the
// done channel to ensure we don't miss the shutdown signal.
// The events channel is buffered, which means we may successfully
// write to it even if the queue is shutting down. To avoid blocking
// forever during shutdown, we also have to wait on the queue's
// shutdown channel.
select {
case resp := <-req.resp:
return resp, true
case <-st.done:
case <-st.queueDone:
st.events = nil
return 0, false
}
case <-st.done:
st.events = nil
return 0, false
case <-st.queueDone:
st.events = nil
return 0, false
}
}

func (st *openState) tryPublish(req pushRequest) (queue.EntryID, bool) {
select {
case st.events <- req:
return <-req.resp, true
// The events channel is buffered, which means we may successfully
// write to it even if the queue is shutting down. To avoid blocking
// forever during shutdown, we also have to wait on the queue's
// shutdown channel.
select {
case resp := <-req.resp:
return resp, true
case <-st.queueDone:
st.events = nil
return 0, false
}
case <-st.done:
st.events = nil
return 0, false
Expand Down
121 changes: 103 additions & 18 deletions libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import (
"testing"
"time"

"gotest.tools/assert"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/publisher/queue"
Expand Down Expand Up @@ -77,17 +76,17 @@ func TestProduceConsumer(t *testing.T) {
t.Run("flush", testWith(makeTestQueue(bufferSize, batchSize/2, 100*time.Millisecond)))
}

// TestProducerDoesNotBlockWhenCancelled ensures the producer Publish
// does not block indefinitely.
// TestProducerDoesNotBlockWhenQueueClosed ensures the producer Publish
// does not block indefinitely during queue shutdown.
//
// Once we get a producer `p` from the queue we want to ensure
// Once we get a producer `p` from the queue `q` we want to ensure
// that if p.Publish is called and blocks it will unblock once
// p.Cancel is called.
// `q.Close` is called.
//
// For this test we start a queue with size 2 and try to add more
// than 2 events to it, p.Publish will block, once we call p.Cancel,
// than 2 events to it, p.Publish will block, once we call q.Close,
// we ensure the 3rd event was not successfully published.
func TestProducerDoesNotBlockWhenCancelled(t *testing.T) {
func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) {
q := NewQueue(nil, nil,
Settings{
Events: 2, // Queue size
Expand Down Expand Up @@ -138,8 +137,12 @@ func TestProducerDoesNotBlockWhenCancelled(t *testing.T) {
time.Millisecond,
"the first two events were not successfully published")

// Cancel the producer, this should unblock its Publish method
p.Cancel()
// Close the queue, this should unblock the pending Publish call.
// It's not enough to just cancel the producer: once the producer
// has successfully sent a request to the queue, it must wait for
// the response unless the queue shuts down, otherwise the pipeline
// event totals will be wrong.
q.Close()

require.Eventually(
t,
Expand All @@ -149,6 +152,88 @@ func TestProducerDoesNotBlockWhenCancelled(t *testing.T) {
"test not flagged as successful, p.Publish likely blocked indefinitely")
}

func TestProducerClosePreservesEventCount(t *testing.T) {
// Check for https://github.com/elastic/beats/issues/37702, a problem
// where canceling a producer while it was waiting on a response
// to an insert request could lead to inaccurate event totals.

var activeEvents atomic.Int64

q := NewQueue(nil, nil,
Settings{
Events: 3, // Queue size
MaxGetRequest: 2,
FlushTimeout: 10 * time.Millisecond,
}, 1)

p := q.Producer(queue.ProducerConfig{
ACK: func(count int) {
activeEvents.Add(-int64(count))
},
OnDrop: func(e interface{}) {
//activeEvents.Add(-1)
},
DropOnCancel: false,
})

// Asynchronously, send 4 events to the queue.
// Three will be enqueued, and one will be buffered,
// until we start reading from the queue.
// This needs to run in a goroutine because the buffered
// event will block until the queue handles it.
var wgProducer sync.WaitGroup
wgProducer.Add(1)
go func() {
for i := 0; i < 4; i++ {
event := i
// For proper navigation of the race conditions inherent to this
// test: increment active events before the publish attempt, then
// decrement afterwards if it failed (otherwise the event count
// could become negative even under correct queue operation).
activeEvents.Add(1)
_, ok := p.Publish(event)
if !ok {
activeEvents.Add(-1)
}
}
wgProducer.Done()
}()

// This sleep is regrettable, but there's no deterministic way to know when
// the producer code has buffered an event in the queue's channel.
// However, the test is written to produce false negatives only:
// - If this test fails, it _always_ indicates a bug.
// - If there is a bug, this test will _often_ fail.
time.Sleep(20 * time.Millisecond)

// Cancel the producer, then read and acknowledge two batches. If the
// Publish calls and the queue code are working, activeEvents should
// _usually_ end up as 0, but _always_ end up non-negative.
p.Cancel()

// The queue reads also need to be done in a goroutine, in case the
// producer cancellation signal went through before the Publish
// requests -- if only 2 events entered the queue, then the second
// Get call will block until the queue itself is cancelled.
go func() {
for i := 0; i < 2; i++ {
batch, err := q.Get(2)
// Only error to worry about is queue closing, which isn't
// a test failure.
if err == nil {
batch.Done()
}
}
}()

// One last sleep to let things percolate, then we close the queue
// to unblock any helpers and verify that the final active event
// count isn't negative.
time.Sleep(10 * time.Millisecond)
q.Close()
assert.False(t, activeEvents.Load() < 0, "active event count should never be negative")
}

func TestQueueMetricsDirect(t *testing.T) {
eventsToTest := 5
maxEvents := 10
Expand Down Expand Up @@ -190,7 +275,7 @@ func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, te

// Read events, don't yet ack them
batch, err := testQueue.Get(eventsToTest)
assert.NilError(t, err, "error in Get")
assert.NoError(t, err, "error in Get")
t.Logf("Got batch of %d events", batch.Count())

queueMetricsAreValid(t, testQueue, 5, settings.Events, 5, fmt.Sprintf("%s - Producer Getting events, no ACK", testName))
Expand All @@ -206,7 +291,7 @@ func queueMetricsAreValid(t *testing.T, q queue.Queue, evtCount, evtLimit, occup
// wait briefly to avoid races across all the queue channels
time.Sleep(time.Millisecond * 100)
testMetrics, err := q.Metrics()
assert.NilError(t, err, "error calling metrics for test %s", test)
assert.NoError(t, err, "error calling metrics for test %s", test)
assert.Equal(t, testMetrics.EventCount.ValueOr(0), uint64(evtCount), "incorrect EventCount for %s", test)
assert.Equal(t, testMetrics.EventLimit.ValueOr(0), uint64(evtLimit), "incorrect EventLimit for %s", test)
assert.Equal(t, testMetrics.UnackedConsumedEvents.ValueOr(0), uint64(occupied), "incorrect OccupiedRead for %s", test)
Expand Down Expand Up @@ -266,18 +351,18 @@ func TestEntryIDs(t *testing.T) {

for i := 0; i < entryCount; i++ {
batch, err := q.Get(1)
assert.NilError(t, err, "Queue read should succeed")
assert.NoError(t, err, "Queue read should succeed")
assert.Equal(t, batch.Count(), 1, "Returned batch should have 1 entry")

metrics, err := q.Metrics()
assert.NilError(t, err, "Queue metrics call should succeed")
assert.NoError(t, err, "Queue metrics call should succeed")
assert.Equal(t, metrics.OldestEntryID, queue.EntryID(i),
fmt.Sprintf("Oldest entry ID before ACKing event %v should be %v", i, i))

batch.Done()
waiter.waitForEvents(1)
metrics, err = q.Metrics()
assert.NilError(t, err, "Queue metrics call should succeed")
assert.NoError(t, err, "Queue metrics call should succeed")
assert.Equal(t, metrics.OldestEntryID, queue.EntryID(i+1),
fmt.Sprintf("Oldest entry ID after ACKing event %v should be %v", i, i+1))

Expand All @@ -297,7 +382,7 @@ func TestEntryIDs(t *testing.T) {

for i := 0; i < entryCount; i++ {
batch, err := q.Get(1)
assert.NilError(t, err, "Queue read should succeed")
assert.NoError(t, err, "Queue read should succeed")
assert.Equal(t, batch.Count(), 1, "Returned batch should have 1 entry")
batches = append(batches, batch)
}
Expand All @@ -318,15 +403,15 @@ func TestEntryIDs(t *testing.T) {
// the slight nondeterminism.
time.Sleep(1 * time.Millisecond)
metrics, err := q.Metrics()
assert.NilError(t, err, "Queue metrics call should succeed")
assert.NoError(t, err, "Queue metrics call should succeed")
assert.Equal(t, metrics.OldestEntryID, queue.EntryID(0),
fmt.Sprintf("Oldest entry ID after ACKing event %v should be 0", i))
}
// ACK the first batch, which should unblock all the later ones
batches[0].Done()
waiter.waitForEvents(100)
metrics, err := q.Metrics()
assert.NilError(t, err, "Queue metrics call should succeed")
assert.NoError(t, err, "Queue metrics call should succeed")
assert.Equal(t, metrics.OldestEntryID, queue.EntryID(100),
fmt.Sprintf("Oldest entry ID after ACKing event 0 should be %v", queue.EntryID(entryCount)))

Expand Down

0 comments on commit 91fe08d

Please sign in to comment.