diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index faf275d26d8..95c9e8a0df0 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -41,6 +41,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d ==== Bugfixes *Affecting all Beats* +- Fix logstash output handles error twice when asynchronous sending fails. {pull}2441[2441] - Fix Elasticsearch structured error response parsing error. {issue}2229[2229] - Fixed the run script to allow the overriding of the configuration file. {issue}2171[2171] - Fix logstash output crash if no hosts are configured. {issue}2325[2325] diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index b7d02743895..a2b4f11027a 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -106,6 +106,7 @@ func (c *asyncClient) AsyncPublishEvents( cb: cb, err: nil, } + defer ref.dec() for len(data) > 0 { n, err := c.publishWindowed(ref, data) @@ -115,15 +116,10 @@ func (c *asyncClient) AsyncPublishEvents( data = data[n:] if err != nil { - c.win.shrinkWindow() _ = c.Close() - - logp.Err("Failed to publish events caused by: %v", err) - eventsNotAcked.Add(int64(len(data))) return err } } - ref.dec() return nil } @@ -191,6 +187,7 @@ func (r *msgRef) dec() { err := r.err if err != nil { eventsNotAcked.Add(int64(len(r.batch))) + logp.Err("Failed to publish events caused by: %v", err) r.cb(r.batch, err) } else { r.cb(nil, nil)