diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 17381dfa5a63..1257274d5a79 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -28,6 +28,8 @@ https://github.com/elastic/beats/compare/v5.6.4...5.6[Check the HEAD diff] *Affecting all Beats* +- Fix duplicate batches of events in retry queue. {pull}5520[5520] + *Filebeat* *Heartbeat* diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 9475d693f00c..2f6379064333 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -107,15 +107,7 @@ func (c *asyncClient) AsyncPublishEvents( return nil } - ref := &msgRef{ - client: c, - count: 1, - batch: data, - batchSize: len(data), - win: c.win, - cb: cb, - err: nil, - } + ref := newMsgRef(c, data, cb) defer ref.dec() for len(data) > 0 { @@ -171,7 +163,7 @@ func (c *asyncClient) sendEvents(ref *msgRef, data []outputs.Data) error { for i, d := range data { window[i] = d } - atomic.AddInt32(&ref.count, 1) + ref.inc() return c.client.Send(ref.callback, window) } @@ -183,7 +175,33 @@ func (r *msgRef) callback(seq uint32, err error) { } } +func newMsgRef( + client *asyncClient, + data []outputs.Data, + cb func([]outputs.Data, error), +) *msgRef { + r := &msgRef{ + client: client, + count: 1, + batch: data, + batchSize: len(data), + win: client.win, + cb: cb, + err: nil, + } + + debug("msgref(%p) new: batch=%p, cb=%p", r, &r.batch[0], cb) + return r +} + +func (r *msgRef) inc() { + count := atomic.AddInt32(&r.count, 1) + debug("msgref(%p) inc -> %v", r, count) +} + func (r *msgRef) done(n uint32) { + debug("msgref(%p) done(%v)", r, n) + ackedEvents.Add(int64(n)) r.batch = r.batch[n:] if r.win != nil { @@ -193,6 +211,8 @@ func (r *msgRef) done(n uint32) { } func (r *msgRef) fail(n uint32, err error) { + debug("msgref(%p) fail(%v, %v)", r, n, err) + ackedEvents.Add(int64(n)) if r.err == nil { r.err = err @@ -206,6 +226,7 @@ func (r *msgRef) fail(n uint32, err error) { func (r *msgRef) dec() { i := atomic.AddInt32(&r.count, -1) + debug("msgref(%p) dec -> %v", r, i) if i > 0 { return } @@ -214,9 +235,11 @@ func (r *msgRef) dec() { if err != nil { eventsNotAcked.Add(int64(len(r.batch))) logp.Err("Failed to publish events (host: %v) caused by: %v", r.client.host, err) + debug("msgref(%p) exec callback(%p, %v)", r, &r.batch[0], err) r.cb(r.batch, err) return } + debug("msgref(%p) exec callback(nil, nil)", r) r.cb(nil, nil) } diff --git a/libbeat/outputs/mode/lb/async_worker.go b/libbeat/outputs/mode/lb/async_worker.go index ed6cbc39e411..abd619002237 100644 --- a/libbeat/outputs/mode/lb/async_worker.go +++ b/libbeat/outputs/mode/lb/async_worker.go @@ -111,25 +111,10 @@ func (w *asyncWorker) sendLoop() (done bool) { } func (w *asyncWorker) onMessage(msg eventsMessage) error { - var err error if msg.datum.Event != nil { - err = w.client.AsyncPublishEvent(w.handleResult(msg), msg.datum) - } else { - err = w.client.AsyncPublishEvents(w.handleResults(msg), msg.data) + return w.client.AsyncPublishEvent(w.handleResult(msg), msg.datum) } - - if err != nil { - if msg.attemptsLeft > 0 { - msg.attemptsLeft-- - } - - // asynchronously retry to insert message (if attempts left), so worker can not - // deadlock on retries channel if client puts multiple failed outstanding - // events into the pipeline - w.onFail(msg, err) - } - - return err + return w.client.AsyncPublishEvents(w.handleResults(msg), msg.data) } func (w *asyncWorker) handleResult(msg eventsMessage) func(error) { @@ -193,7 +178,7 @@ func (w *asyncWorker) handleResults(msg eventsMessage) func([]outputs.Data, erro } // all events published -> signal success - debugf("async bulk publish success") + debugf("async bulk publish success (signaler=%v)", msg.signaler) op.SigCompleted(msg.signaler) } } diff --git a/libbeat/outputs/mode/modetest/callbacks.go b/libbeat/outputs/mode/modetest/callbacks.go index 245f083c3291..0693efd2a096 100644 --- a/libbeat/outputs/mode/modetest/callbacks.go +++ b/libbeat/outputs/mode/modetest/callbacks.go @@ -120,6 +120,7 @@ func AsyncPublishFailStartWith( inc := makeCounter(n, err) return func(cb func([]outputs.Data, error), data []outputs.Data) error { if err := inc(); err != nil { + cb(data, err) return err } return pub(cb, data)