diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index c041804459a..f092ff8a627 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -165,6 +165,7 @@ func (r *msgRef) callback(seq uint32, err error) { func (r *msgRef) done(n uint32) { ackedEvents.Add(int64(n)) r.batch = r.batch[n:] + r.win.tryGrowWindow(r.batchSize) r.dec() } @@ -172,6 +173,7 @@ func (r *msgRef) fail(n uint32, err error) { ackedEvents.Add(int64(n)) r.err = err r.batch = r.batch[n:] + r.win.shrinkWindow() r.dec() } @@ -184,10 +186,8 @@ func (r *msgRef) dec() { err := r.err if err != nil { eventsNotAcked.Add(int64(len(r.batch))) - r.win.shrinkWindow() r.cb(r.batch, err) } else { - r.win.tryGrowWindow(r.batchSize) r.cb(nil, nil) } }