diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 6aa5f2107b20..920d69999180 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -70,6 +70,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Fix http status phrase parsing not allow spaces. {pull}5312[5312] - Fix missing length check in the PostgreSQL module. {pull}5457[5457] +- Fix panic in ACK handler if event is dropped on blocked queue {issue}5524[5524] *Winlogbeat* diff --git a/libbeat/publisher/queue/memqueue/ackloop.go b/libbeat/publisher/queue/memqueue/ackloop.go index 58722514470e..21af8bf95c46 100644 --- a/libbeat/publisher/queue/memqueue/ackloop.go +++ b/libbeat/publisher/queue/memqueue/ackloop.go @@ -53,13 +53,20 @@ func (l *ackLoop) run() { count, events := lst.count() l.lst.concat(&lst) - // log.Debugf("ackloop: scheduledACKs count=%v events=%v\n", count, events) + // log.Debug("ACK List:") + // for current := l.lst.head; current != nil; current = current.next { + // log.Debugf(" ack entry(seq=%v, start=%v, count=%v", + // current.seq, current.start, current.count) + // } + l.batchesSched += uint64(count) l.totalSched += uint64(events) case <-l.sig: acked += l.handleBatchSig() - acks = l.broker.acks + if acked > 0 { + acks = l.broker.acks + } } // log.Debug("ackloop INFO") @@ -87,12 +94,14 @@ func (l *ackLoop) handleBatchSig() int { count += current.count } - if e := l.broker.eventer; e != nil { - e.OnACK(count) - } + if count > 0 { + if e := l.broker.eventer; e != nil { + e.OnACK(count) + } - // report acks to waiting clients - l.processACK(lst, count) + // report acks to waiting clients + l.processACK(lst, count) + } for !lst.empty() { releaseACKChan(lst.pop()) @@ -110,6 +119,7 @@ func (l *ackLoop) collectAcked() chanList { lst := chanList{} acks := l.lst.pop() + l.onACK(acks) lst.append(acks) done := false @@ -117,8 +127,7 @@ func (l *ackLoop) collectAcked() chanList { acks := l.lst.front() select { case <-acks.ch: - l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count) - l.batchesACKed++ + l.onACK(acks) lst.append(l.lst.pop()) default: @@ -128,3 +137,8 @@ func (l *ackLoop) collectAcked() chanList { return lst } + +func (l *ackLoop) onACK(acks *ackChan) { + l.batchesACKed++ + l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count) +} diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index a74d294c1958..9a27a36ec926 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -200,6 +200,10 @@ func (l *chanList) prepend(ch *ackChan) { } func (l *chanList) concat(other *chanList) { + if other.head == nil { + return + } + if l.head == nil { *l = *other return diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index ccf6b568a36f..a10a2e7e8b82 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -21,6 +21,7 @@ type ackProducer struct { } type openState struct { + log logger isOpen atomic.Bool done chan struct{} events chan pushRequest @@ -37,6 +38,7 @@ type ackHandler func(count int) func newProducer(b *Broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel bool) queue.Producer { openState := openState{ + log: b.logger, isOpen: atomic.MakeBool(true), done: make(chan struct{}), events: b.events, @@ -69,11 +71,18 @@ func (p *forgetfullProducer) Cancel() int { } func (p *ackProducer) Publish(event publisher.Event) bool { - return p.openState.publish(p.makeRequest(event)) + return p.updSeq(p.openState.publish(p.makeRequest(event))) } func (p *ackProducer) TryPublish(event publisher.Event) bool { - return p.openState.tryPublish(p.makeRequest(event)) + return p.updSeq(p.openState.tryPublish(p.makeRequest(event))) +} + +func (p *ackProducer) updSeq(ok bool) bool { + if ok { + p.seq++ + } + return ok } func (p *ackProducer) makeRequest(event publisher.Event) pushRequest { @@ -82,7 +91,6 @@ func (p *ackProducer) makeRequest(event publisher.Event) pushRequest { seq: p.seq, state: &p.state, } - p.seq++ return req } @@ -126,6 +134,7 @@ func (st *openState) tryPublish(req pushRequest) bool { st.events = nil return false default: + st.log.Debugf("Dropping event, queue is blocked (seq=%v) ", req.seq) return false } }