Skip to content

Commit

Permalink
Logstash output configurable compression level
Browse files Browse the repository at this point in the history
  • Loading branch information
urso committed Jan 5, 2016
1 parent 87d5da8 commit 2ab60ed
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 33 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
==== Added

*Affecting all Beats*
- Make logstash output compression level configurable. {pull}630[630]

*Packetbeat*

*Topbeat*
- group all cpu usage per core statistics and export them optionally if cpu_per_core is configured {pull}496[496]
- Group all cpu usage per core statistics and export them optionally if cpu_per_core is configured {pull}496[496]

*Filebeat*
- Add multiline support for combining multiple related lines into one event. {issue}461[461]
Expand Down
6 changes: 6 additions & 0 deletions libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,12 @@ The list of known Logstash servers to connect to. All entries in this list can
contain a port number. If no port number is given, the value specified for <<port>>
is used as the default port number.

===== compression_level

Set gzip compression level. Setting this value to values <=0 disables compression.
The compression level must be in the range of 1 (best-speed) to 9 (best compression).
The default value is 3.

===== worker

The number of workers per configured host publishing events to Logstash. This
Expand Down
77 changes: 60 additions & 17 deletions libbeat/outputs/logstash/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type lumberjackClient struct {
maxWindowSize int
timeout time.Duration
countTimeoutErr int
compressLevel int
}

const (
Expand All @@ -60,15 +61,38 @@ var (

func newLumberjackClient(
conn TransportClient,
compressLevel int,
maxWindowSize int,
timeout time.Duration,
) *lumberjackClient {
) (*lumberjackClient, error) {

// validate by creating and discarding zlib writer with configured level
if compressLevel > 0 {
tmp := bytes.NewBuffer(nil)
w, err := zlib.NewWriterLevel(tmp, compressLevel)
if err != nil {
return nil, err
}
w.Close()
}

return &lumberjackClient{
TransportClient: conn,
windowSize: defaultStartMaxWindowSize,
timeout: timeout,
maxWindowSize: maxWindowSize,
}
compressLevel: compressLevel,
}, nil
}

func (l *lumberjackClient) Connect(timeout time.Duration) error {
logp.Debug("logstash", "connect")
return l.TransportClient.Connect(timeout)
}

func (l *lumberjackClient) Close() error {
logp.Debug("logstash", "close connection")
return l.TransportClient.Close()
}

func (l *lumberjackClient) PublishEvent(event common.MapStr) error {
Expand Down Expand Up @@ -106,13 +130,17 @@ func (l *lumberjackClient) publishWindowed(events []common.MapStr) (int, error)
return 0, nil
}

logp.Debug("logstash", "Try to publish %v events to logstash with window size %v", len(events), l.windowSize)
batchSize := len(events)
debug("Try to publish %v events to logstash with window size %v",
batchSize, l.windowSize)

// prepare message payload
if len(events) > l.windowSize {
events = events[:l.windowSize]
}
count, payload, err := l.compressEvents(events)

// serialize all raw events into output buffer, removing all events encoding failed for
count, payload, err := l.serializeEvents(events)
if err != nil {
return 0, err
}
Expand All @@ -130,8 +158,12 @@ func (l *lumberjackClient) publishWindowed(events []common.MapStr) (int, error)
return l.onFail(0, err)
}

// send payload
if err = l.sendCompressed(payload); err != nil {
if l.compressLevel > 0 {
err = l.sendCompressed(payload)
} else {
_, err = l.Write(payload)
}
if err != nil {
return l.onFail(0, err)
}

Expand Down Expand Up @@ -194,29 +226,39 @@ func (l *lumberjackClient) onFail(n int, err error) (int, error) {
return n, nil
}

func (l *lumberjackClient) compressEvents(
func (l *lumberjackClient) serializeEvents(
events []common.MapStr,
) (uint32, []byte, error) {
buf := bytes.NewBuffer(nil)

// compress events
compressor, _ := zlib.NewWriterLevel(buf, 3) // todo make compression level configurable?
if l.compressLevel > 0 {
w, _ := zlib.NewWriterLevel(buf, l.compressLevel)
count, err := l.doSerializeEvents(w, events)
if err != nil {
return 0, nil, err
}
if err := w.Close(); err != nil {
debug("Finalizing zlib compression failed with: %s", err)
return 0, nil, err
}
return count, buf.Bytes(), nil
}

count, err := l.doSerializeEvents(buf, events)
return count, buf.Bytes(), err
}

func (l *lumberjackClient) doSerializeEvents(out io.Writer, events []common.MapStr) (uint32, error) {
var sequence uint32
for _, event := range events {
sequence++
err := l.writeDataFrame(event, sequence, compressor)
err := l.writeDataFrame(event, sequence, out)
if err != nil {
logp.Critical("failed to encode event: %v", err)
sequence-- //forget this last broken event and continue
}
}
if err := compressor.Close(); err != nil {
debug("Finalizing zlib compression failed with: %s", err)
return 0, nil, err
}
payload := buf.Bytes()

return sequence, payload, nil
return sequence, nil
}

func (l *lumberjackClient) readACK() (uint32, error) {
Expand Down Expand Up @@ -256,6 +298,7 @@ func (l *lumberjackClient) sendCompressed(payload []byte) error {
if err := l.SetDeadline(time.Now().Add(l.timeout)); err != nil {
return err
}

if _, err := l.Write(codeCompressed); err != nil {
return err
}
Expand Down
17 changes: 11 additions & 6 deletions libbeat/outputs/logstash/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ type mockTransport struct {
control chan mockTransportCommand
}

func newLumberjackTestClient(conn TransportClient) *lumberjackClient {
c, err := newLumberjackClient(conn, 3, testMaxWindowSize, 5*time.Second)
if err != nil {
panic(err)
}
return c
}

func newClientTestDriver(client mode.ProtocolClient) *testClientDriver {
driver := &testClientDriver{
client: client,
Expand Down Expand Up @@ -300,8 +308,7 @@ const testMaxWindowSize = 64

func TestSendZero(t *testing.T) {
transp := newMockTransport()
client := newClientTestDriver(
newLumberjackClient(transp, testMaxWindowSize, 5*time.Second))
client := newClientTestDriver(newLumberjackTestClient(transp))

client.Publish(make([]common.MapStr, 0))

Expand All @@ -315,8 +322,7 @@ func TestSendZero(t *testing.T) {

func TestSimpleEvent(t *testing.T) {
transp := newMockTransport()
client := newClientTestDriver(
newLumberjackClient(transp, testMaxWindowSize, 5*time.Second))
client := newClientTestDriver(newLumberjackTestClient(transp))

event := common.MapStr{"name": "me", "line": 10}
client.Publish([]common.MapStr{event})
Expand Down Expand Up @@ -348,8 +354,7 @@ func TestSimpleEvent(t *testing.T) {

func TestStructuredEvent(t *testing.T) {
transp := newMockTransport()
client := newClientTestDriver(
newLumberjackClient(transp, testMaxWindowSize, 5*time.Second))
client := newClientTestDriver(newLumberjackTestClient(transp))
event := common.MapStr{
"name": "test",
"struct": common.MapStr{
Expand Down
33 changes: 24 additions & 9 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
logstasDefaultMaxTimeout = 90 * time.Second
defaultSendRetries = 3
defaultMaxWindowSize = 1024
defaultCompressionLevel = 3
)

var waitRetry = time.Duration(1) * time.Second
Expand Down Expand Up @@ -72,6 +73,11 @@ func (lj *logstash) init(
maxWindowSize = *config.BulkMaxSize
}

compressLevel := defaultCompressionLevel
if config.CompressionLevel != nil {
compressLevel = *config.CompressionLevel
}

var clients []mode.ProtocolClient
var err error
if useTLS {
Expand All @@ -82,16 +88,12 @@ func (lj *logstash) init(
}

clients, err = mode.MakeClients(config,
makeClientFactory(maxWindowSize, timeout,
func(host string) (TransportClient, error) {
return newTLSClient(host, defaultPort, tlsConfig)
}))
makeClientFactory(maxWindowSize, compressLevel, timeout,
makeTLSClient(defaultPort, tlsConfig)))
} else {
clients, err = mode.MakeClients(config,
makeClientFactory(maxWindowSize, timeout,
func(host string) (TransportClient, error) {
return newTCPClient(host, defaultPort)
}))
makeClientFactory(maxWindowSize, compressLevel, timeout,
makeTCPClient(defaultPort)))
}
if err != nil {
return err
Expand Down Expand Up @@ -133,6 +135,7 @@ func (lj *logstash) init(

func makeClientFactory(
maxWindowSize int,
compressLevel int,
timeout time.Duration,
makeTransp func(string) (TransportClient, error),
) func(string) (mode.ProtocolClient, error) {
Expand All @@ -141,7 +144,19 @@ func makeClientFactory(
if err != nil {
return nil, err
}
return newLumberjackClient(transp, maxWindowSize, timeout), nil
return newLumberjackClient(transp, compressLevel, maxWindowSize, timeout)
}
}

func makeTCPClient(port int) func(string) (TransportClient, error) {
return func(host string) (TransportClient, error) {
return newTCPClient(host, port)
}
}

func makeTLSClient(port int, tls *tls.Config) func(string) (TransportClient, error) {
return func(host string) (TransportClient, error) {
return newTLSClient(host, port, tls)
}
}

Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type MothershipConfig struct {
Pretty *bool `yaml:"pretty"`
TLS *TLSConfig
Worker int
CompressionLevel *int `yaml:"compression_level"`
}

type Outputer interface {
Expand Down

0 comments on commit 2ab60ed

Please sign in to comment.