Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duplicate batches in retry queue #5520

Merged
merged 2 commits into from
Nov 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ https://github.com/elastic/beats/compare/v5.6.4...5.6[Check the HEAD diff]

*Affecting all Beats*

- Fix duplicate batches of events in retry queue. {pull}5520[5520]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably mention deadlock, duplicates can be seen in other scenario.


*Filebeat*

*Heartbeat*
Expand Down
43 changes: 33 additions & 10 deletions libbeat/outputs/logstash/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,7 @@ func (c *asyncClient) AsyncPublishEvents(
return nil
}

ref := &msgRef{
client: c,
count: 1,
batch: data,
batchSize: len(data),
win: c.win,
cb: cb,
err: nil,
}
ref := newMsgRef(c, data, cb)
defer ref.dec()

for len(data) > 0 {
Expand Down Expand Up @@ -171,7 +163,7 @@ func (c *asyncClient) sendEvents(ref *msgRef, data []outputs.Data) error {
for i, d := range data {
window[i] = d
}
atomic.AddInt32(&ref.count, 1)
ref.inc()
return c.client.Send(ref.callback, window)
}

Expand All @@ -183,7 +175,33 @@ func (r *msgRef) callback(seq uint32, err error) {
}
}

func newMsgRef(
client *asyncClient,
data []outputs.Data,
cb func([]outputs.Data, error),
) *msgRef {
r := &msgRef{
client: client,
count: 1,
batch: data,
batchSize: len(data),
win: client.win,
cb: cb,
err: nil,
}

debug("msgref(%p) new: batch=%p, cb=%p", r, &r.batch[0], cb)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will create a lot of debug messages. I know that this info is for debugging, but I start to get the feeling we need something in between info and debug, something like info --verbose :-)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, some extra-debug would be nice to have. Normally the debug messages create about 4 new debug messages per batch (2048) events. Not too bad, but super helpful to identify potential issues, should we still face some problems.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel more like a trace level to me, but I am ok with debug since the noise wont be that much.

return r
}

func (r *msgRef) inc() {
count := atomic.AddInt32(&r.count, 1)
debug("msgref(%p) inc -> %v", r, count)
}

func (r *msgRef) done(n uint32) {
debug("msgref(%p) done(%v)", r, n)

ackedEvents.Add(int64(n))
r.batch = r.batch[n:]
if r.win != nil {
Expand All @@ -193,6 +211,8 @@ func (r *msgRef) done(n uint32) {
}

func (r *msgRef) fail(n uint32, err error) {
debug("msgref(%p) fail(%v, %v)", r, n, err)

ackedEvents.Add(int64(n))
if r.err == nil {
r.err = err
Expand All @@ -206,6 +226,7 @@ func (r *msgRef) fail(n uint32, err error) {

func (r *msgRef) dec() {
i := atomic.AddInt32(&r.count, -1)
debug("msgref(%p) dec -> %v", r, i)
if i > 0 {
return
}
Expand All @@ -214,9 +235,11 @@ func (r *msgRef) dec() {
if err != nil {
eventsNotAcked.Add(int64(len(r.batch)))
logp.Err("Failed to publish events (host: %v) caused by: %v", r.client.host, err)
debug("msgref(%p) exec callback(%p, %v)", r, &r.batch[0], err)
r.cb(r.batch, err)
return
}

debug("msgref(%p) exec callback(nil, nil)", r)
r.cb(nil, nil)
}
21 changes: 3 additions & 18 deletions libbeat/outputs/mode/lb/async_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,25 +111,10 @@ func (w *asyncWorker) sendLoop() (done bool) {
}

func (w *asyncWorker) onMessage(msg eventsMessage) error {
var err error
if msg.datum.Event != nil {
err = w.client.AsyncPublishEvent(w.handleResult(msg), msg.datum)
} else {
err = w.client.AsyncPublishEvents(w.handleResults(msg), msg.data)
return w.client.AsyncPublishEvent(w.handleResult(msg), msg.datum)
}

if err != nil {
if msg.attemptsLeft > 0 {
msg.attemptsLeft--
}

// asynchronously retry to insert message (if attempts left), so worker can not
// deadlock on retries channel if client puts multiple failed outstanding
// events into the pipeline
w.onFail(msg, err)
}

return err
return w.client.AsyncPublishEvents(w.handleResults(msg), msg.data)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change in semantics is here. We always require the w.client instance to use the callback build via handleResult to report success/failure within a batch. This allows the output client to decide on sync or async error reporting.
The async client as provided by go-lumber does require full async reporting, but the Logstash output did some sync reporting as well, leading to duplicates. PR changes to have go-lumber only trigger the async reporting, indirectly via msgRef.

}

func (w *asyncWorker) handleResult(msg eventsMessage) func(error) {
Expand Down Expand Up @@ -193,7 +178,7 @@ func (w *asyncWorker) handleResults(msg eventsMessage) func([]outputs.Data, erro
}

// all events published -> signal success
debugf("async bulk publish success")
debugf("async bulk publish success (signaler=%v)", msg.signaler)
op.SigCompleted(msg.signaler)
}
}
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/mode/modetest/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func AsyncPublishFailStartWith(
inc := makeCounter(n, err)
return func(cb func([]outputs.Data, error), data []outputs.Data) error {
if err := inc(); err != nil {
cb(data, err)
return err
}
return pub(cb, data)
Expand Down