Skip to content

Commit

Permalink
Merge pull request #598 from urso/enh/logstash-output-win-size
Browse files Browse the repository at this point in the history
Logstash client window size enhancement
  • Loading branch information
ruflin committed Dec 30, 2015
2 parents 604615e + b9ac89c commit f4b25bb
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
==== Bugfixes

*Affecting all Beats*
- Fix logstash window size of 1 not increasing. {pull}598[598]

*Packetbeat*
- Fix setting direction to out and use its value to decide when dropping events if ignore_outgoing is enabled {pull}557[557]
Expand Down
79 changes: 56 additions & 23 deletions libbeat/outputs/logstash/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"expvar"
"io"
"math"
"net"
"time"

Expand Down Expand Up @@ -71,6 +72,16 @@ func newLumberjackClient(
}
}

func (l *lumberjackClient) Connect(timeout time.Duration) error {
logp.Debug("logstash", "connect")
return l.TransportClient.Connect(timeout)
}

func (l *lumberjackClient) Close() error {
logp.Debug("logstash", "close connection")
return l.TransportClient.Close()
}

func (l *lumberjackClient) PublishEvent(event common.MapStr) error {
_, err := l.PublishEvents([]common.MapStr{event})
return err
Expand Down Expand Up @@ -106,10 +117,12 @@ func (l *lumberjackClient) publishWindowed(events []common.MapStr) (int, error)
return 0, nil
}

logp.Debug("logstash", "Try to publish %v events to logstash with window size %v", len(events), l.windowSize)
batchSize := len(events)

logp.Debug("logstash", "Try to publish %v events to logstash with window size %v", batchSize, l.windowSize)

// prepare message payload
if len(events) > l.windowSize {
if batchSize > l.windowSize {
events = events[:l.windowSize]
}
count, payload, err := l.compressEvents(events)
Expand Down Expand Up @@ -146,29 +159,13 @@ func (l *lumberjackClient) publishWindowed(events []common.MapStr) (int, error)
}
}

// success: increase window size by factor 1.5 until max window size
// (window size grows exponentially)
// TODO: use duration until ACK to estimate an ok max window size value
if l.maxOkWindowSize < l.windowSize {
l.maxOkWindowSize = l.windowSize

if l.windowSize < l.maxWindowSize {
l.windowSize = l.windowSize + l.windowSize/2
if l.windowSize > l.maxWindowSize {
l.windowSize = l.maxWindowSize
}
}
} else if l.windowSize < l.maxOkWindowSize {
l.windowSize = l.windowSize + l.windowSize/2
if l.windowSize > l.maxOkWindowSize {
l.windowSize = l.maxOkWindowSize
}
}

l.tryGrowWindowSize(batchSize)
return len(events), nil
}

func (l *lumberjackClient) onFail(n int, err error) (int, error) {
l.shrinkWindow()

// if timeout error, back off and ignore error
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
Expand All @@ -184,14 +181,50 @@ func (l *lumberjackClient) onFail(n int, err error) (int, error) {
return n, err
}

// timeout error. reduce window size and return 0 published events. Send
// timeout error. events. Send
// mode might try to publish again with reduce window size or ask another
// client to send events
return n, nil
}

// Increase window size by factor 1.5 until max window size
// (window size grows exponentially)
// TODO: use duration until ACK to estimate an ok max window size value
func (l *lumberjackClient) tryGrowWindowSize(batchSize int) {
if l.windowSize <= batchSize {
if l.maxOkWindowSize < l.windowSize {
logp.Debug("logstash", "update max ok window size: %v < %v", l.maxOkWindowSize, l.windowSize)
l.maxOkWindowSize = l.windowSize

newWindowSize := int(math.Ceil(1.5 * float64(l.windowSize)))
logp.Debug("logstash", "increase window size to: %v", newWindowSize)

if l.windowSize <= batchSize && batchSize < newWindowSize {
logp.Debug("logstash", "set to batchSize: %v", batchSize)
newWindowSize = batchSize
}
if newWindowSize > l.maxWindowSize {
logp.Debug("logstash", "set to max window size: %v", l.maxWindowSize)
newWindowSize = l.maxWindowSize
}
l.windowSize = newWindowSize
} else if l.windowSize < l.maxOkWindowSize {
logp.Debug("logstash", "update current window size: %v", l.windowSize)

l.windowSize = int(math.Ceil(1.5 * float64(l.windowSize)))
if l.windowSize > l.maxOkWindowSize {
logp.Debug("logstash", "set to max ok window size: %v", l.maxOkWindowSize)
l.windowSize = l.maxOkWindowSize
}
}
}
}

func (l *lumberjackClient) shrinkWindow() {
l.windowSize = l.windowSize / 2
if l.windowSize < minWindowSize {
l.windowSize = minWindowSize
}
return n, nil
}

func (l *lumberjackClient) compressEvents(
Expand Down
58 changes: 58 additions & 0 deletions libbeat/outputs/logstash/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/streambuf"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/mode"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -392,3 +393,60 @@ func TestStructuredEvent(t *testing.T) {
assert.Equal(t, true, msg.doc.get("struct.field2"))
assert.Equal(t, 2.0, msg.doc.get("struct.field5.sub1"))
}

func enableLogging(selectors []string) {
logp.LogInit(logp.LOG_DEBUG, "", false, true, selectors)
}

func TestGrowWindowSizeUpToBatchSizes(t *testing.T) {
batchSize := 114
windowSize := 1024
testGrowWindowSize(t, 10, 0, windowSize, batchSize, batchSize)
}

func TestGrowWindowSizeUpToMax(t *testing.T) {
batchSize := 114
windowSize := 64
testGrowWindowSize(t, 10, 0, windowSize, batchSize, windowSize)
}

func TestGrowWindowSizeOf1(t *testing.T) {
batchSize := 114
windowSize := 1024
testGrowWindowSize(t, 1, 0, windowSize, batchSize, batchSize)
}

func TestGrowWindowSizeToMaxOKOnly(t *testing.T) {
batchSize := 114
windowSize := 1024
maxOK := 71
testGrowWindowSize(t, 1, maxOK, windowSize, batchSize, maxOK)
}

func testGrowWindowSize(t *testing.T,
initial, maxOK, windowSize, batchSize, expected int,
) {
enableLogging([]string{"logstash"})
c := newLumberjackClient(nil, windowSize, 1*time.Second)
c.windowSize = initial
c.maxOkWindowSize = maxOK
for i := 0; i < 100; i++ {
c.tryGrowWindowSize(batchSize)
}

assert.Equal(t, expected, c.windowSize)
assert.Equal(t, expected, c.maxOkWindowSize)
}

func TestShrinkWindowSizeNeverZero(t *testing.T) {
enableLogging([]string{"logstash"})

windowSize := 124
c := newLumberjackClient(nil, windowSize, 1*time.Second)
c.windowSize = windowSize
for i := 0; i < 100; i++ {
c.shrinkWindow()
}

assert.Equal(t, 1, c.windowSize)
}

0 comments on commit f4b25bb

Please sign in to comment.