From 2ab60ede1ac5e7b6648fdbf1c1c89d5efb42b6ce Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 24 Dec 2015 03:03:44 +0100 Subject: [PATCH] Logstash output configurable compression level --- CHANGELOG.asciidoc | 3 +- libbeat/docs/outputconfig.asciidoc | 6 ++ libbeat/outputs/logstash/client.go | 77 +++++++++++++++++++------ libbeat/outputs/logstash/client_test.go | 17 ++++-- libbeat/outputs/logstash/logstash.go | 33 ++++++++--- libbeat/outputs/outputs.go | 1 + 6 files changed, 104 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index d8f90211197..5b8ac6928eb 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -41,11 +41,12 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff] ==== Added *Affecting all Beats* +- Make logstash output compression level configurable. {pull}630[630] *Packetbeat* *Topbeat* -- group all cpu usage per core statistics and export them optionally if cpu_per_core is configured {pull}496[496] +- Group all cpu usage per core statistics and export them optionally if cpu_per_core is configured {pull}496[496] *Filebeat* - Add multiline support for combining multiple related lines into one event. {issue}461[461] diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc index ff092600684..6e740103aec 100644 --- a/libbeat/docs/outputconfig.asciidoc +++ b/libbeat/docs/outputconfig.asciidoc @@ -326,6 +326,12 @@ The list of known Logstash servers to connect to. All entries in this list can contain a port number. If no port number is given, the value specified for <<port>> is used as the default port number. +===== compression_level + +The gzip compression level. Setting this value to 0 or less disables compression. +The compression level must be in the range of 1 (best speed) to 9 (best compression). +The default value is 3. + ===== worker The number of workers per configured host publishing events to Logstash. 
This diff --git a/libbeat/outputs/logstash/client.go b/libbeat/outputs/logstash/client.go index 38bfbaa3709..01b5e8d3aed 100644 --- a/libbeat/outputs/logstash/client.go +++ b/libbeat/outputs/logstash/client.go @@ -35,6 +35,7 @@ type lumberjackClient struct { maxWindowSize int timeout time.Duration countTimeoutErr int + compressLevel int } const ( @@ -60,15 +61,38 @@ var ( func newLumberjackClient( conn TransportClient, + compressLevel int, maxWindowSize int, timeout time.Duration, -) *lumberjackClient { +) (*lumberjackClient, error) { + + // validate by creating and discarding zlib writer with configured level + if compressLevel > 0 { + tmp := bytes.NewBuffer(nil) + w, err := zlib.NewWriterLevel(tmp, compressLevel) + if err != nil { + return nil, err + } + w.Close() + } + return &lumberjackClient{ TransportClient: conn, windowSize: defaultStartMaxWindowSize, timeout: timeout, maxWindowSize: maxWindowSize, - } + compressLevel: compressLevel, + }, nil +} + +func (l *lumberjackClient) Connect(timeout time.Duration) error { + logp.Debug("logstash", "connect") + return l.TransportClient.Connect(timeout) +} + +func (l *lumberjackClient) Close() error { + logp.Debug("logstash", "close connection") + return l.TransportClient.Close() } func (l *lumberjackClient) PublishEvent(event common.MapStr) error { @@ -106,13 +130,17 @@ func (l *lumberjackClient) publishWindowed(events []common.MapStr) (int, error) return 0, nil } - logp.Debug("logstash", "Try to publish %v events to logstash with window size %v", len(events), l.windowSize) + batchSize := len(events) + debug("Try to publish %v events to logstash with window size %v", + batchSize, l.windowSize) // prepare message payload if len(events) > l.windowSize { events = events[:l.windowSize] } - count, payload, err := l.compressEvents(events) + + // serialize all raw events into output buffer, removing all events encoding failed for + count, payload, err := l.serializeEvents(events) if err != nil { return 0, err } @@ -130,8 
+158,12 @@ func (l *lumberjackClient) publishWindowed(events []common.MapStr) (int, error) return l.onFail(0, err) } - // send payload - if err = l.sendCompressed(payload); err != nil { + if l.compressLevel > 0 { + err = l.sendCompressed(payload) + } else { + _, err = l.Write(payload) + } + if err != nil { return l.onFail(0, err) } @@ -194,29 +226,39 @@ func (l *lumberjackClient) onFail(n int, err error) (int, error) { return n, nil } -func (l *lumberjackClient) compressEvents( +func (l *lumberjackClient) serializeEvents( events []common.MapStr, ) (uint32, []byte, error) { buf := bytes.NewBuffer(nil) - // compress events - compressor, _ := zlib.NewWriterLevel(buf, 3) // todo make compression level configurable? + if l.compressLevel > 0 { + w, _ := zlib.NewWriterLevel(buf, l.compressLevel) + count, err := l.doSerializeEvents(w, events) + if err != nil { + return 0, nil, err + } + if err := w.Close(); err != nil { + debug("Finalizing zlib compression failed with: %s", err) + return 0, nil, err + } + return count, buf.Bytes(), nil + } + + count, err := l.doSerializeEvents(buf, events) + return count, buf.Bytes(), err +} + +func (l *lumberjackClient) doSerializeEvents(out io.Writer, events []common.MapStr) (uint32, error) { var sequence uint32 for _, event := range events { sequence++ - err := l.writeDataFrame(event, sequence, compressor) + err := l.writeDataFrame(event, sequence, out) if err != nil { logp.Critical("failed to encode event: %v", err) sequence-- //forget this last broken event and continue } } - if err := compressor.Close(); err != nil { - debug("Finalizing zlib compression failed with: %s", err) - return 0, nil, err - } - payload := buf.Bytes() - - return sequence, payload, nil + return sequence, nil } func (l *lumberjackClient) readACK() (uint32, error) { @@ -256,6 +298,7 @@ func (l *lumberjackClient) sendCompressed(payload []byte) error { if err := l.SetDeadline(time.Now().Add(l.timeout)); err != nil { return err } + if _, err := 
l.Write(codeCompressed); err != nil { return err } diff --git a/libbeat/outputs/logstash/client_test.go b/libbeat/outputs/logstash/client_test.go index 8f548b2d306..dd92dce89ac 100644 --- a/libbeat/outputs/logstash/client_test.go +++ b/libbeat/outputs/logstash/client_test.go @@ -63,6 +63,14 @@ type mockTransport struct { control chan mockTransportCommand } +func newLumberjackTestClient(conn TransportClient) *lumberjackClient { + c, err := newLumberjackClient(conn, 3, testMaxWindowSize, 5*time.Second) + if err != nil { + panic(err) + } + return c +} + func newClientTestDriver(client mode.ProtocolClient) *testClientDriver { driver := &testClientDriver{ client: client, @@ -300,8 +308,7 @@ const testMaxWindowSize = 64 func TestSendZero(t *testing.T) { transp := newMockTransport() - client := newClientTestDriver( - newLumberjackClient(transp, testMaxWindowSize, 5*time.Second)) + client := newClientTestDriver(newLumberjackTestClient(transp)) client.Publish(make([]common.MapStr, 0)) @@ -315,8 +322,7 @@ func TestSendZero(t *testing.T) { func TestSimpleEvent(t *testing.T) { transp := newMockTransport() - client := newClientTestDriver( - newLumberjackClient(transp, testMaxWindowSize, 5*time.Second)) + client := newClientTestDriver(newLumberjackTestClient(transp)) event := common.MapStr{"name": "me", "line": 10} client.Publish([]common.MapStr{event}) @@ -348,8 +354,7 @@ func TestSimpleEvent(t *testing.T) { func TestStructuredEvent(t *testing.T) { transp := newMockTransport() - client := newClientTestDriver( - newLumberjackClient(transp, testMaxWindowSize, 5*time.Second)) + client := newClientTestDriver(newLumberjackTestClient(transp)) event := common.MapStr{ "name": "test", "struct": common.MapStr{ diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index 5db95330c9c..4e1ca1d82ab 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -45,6 +45,7 @@ const ( logstasDefaultMaxTimeout = 90 * time.Second 
defaultSendRetries = 3 defaultMaxWindowSize = 1024 + defaultCompressionLevel = 3 ) var waitRetry = time.Duration(1) * time.Second @@ -72,6 +73,11 @@ func (lj *logstash) init( maxWindowSize = *config.BulkMaxSize } + compressLevel := defaultCompressionLevel + if config.CompressionLevel != nil { + compressLevel = *config.CompressionLevel + } + var clients []mode.ProtocolClient var err error if useTLS { @@ -82,16 +88,12 @@ func (lj *logstash) init( } clients, err = mode.MakeClients(config, - makeClientFactory(maxWindowSize, timeout, - func(host string) (TransportClient, error) { - return newTLSClient(host, defaultPort, tlsConfig) - })) + makeClientFactory(maxWindowSize, compressLevel, timeout, + makeTLSClient(defaultPort, tlsConfig))) } else { clients, err = mode.MakeClients(config, - makeClientFactory(maxWindowSize, timeout, - func(host string) (TransportClient, error) { - return newTCPClient(host, defaultPort) - })) + makeClientFactory(maxWindowSize, compressLevel, timeout, + makeTCPClient(defaultPort))) } if err != nil { return err @@ -133,6 +135,7 @@ func (lj *logstash) init( func makeClientFactory( maxWindowSize int, + compressLevel int, timeout time.Duration, makeTransp func(string) (TransportClient, error), ) func(string) (mode.ProtocolClient, error) { @@ -141,7 +144,19 @@ func makeClientFactory( if err != nil { return nil, err } - return newLumberjackClient(transp, maxWindowSize, timeout), nil + return newLumberjackClient(transp, compressLevel, maxWindowSize, timeout) + } +} + +func makeTCPClient(port int) func(string) (TransportClient, error) { + return func(host string) (TransportClient, error) { + return newTCPClient(host, port) + } +} + +func makeTLSClient(port int, tls *tls.Config) func(string) (TransportClient, error) { + return func(host string) (TransportClient, error) { + return newTLSClient(host, port, tls) } } diff --git a/libbeat/outputs/outputs.go b/libbeat/outputs/outputs.go index f76d0ec2c8d..0570dba6d80 100644 --- a/libbeat/outputs/outputs.go 
+++ b/libbeat/outputs/outputs.go @@ -33,6 +33,7 @@ type MothershipConfig struct { Pretty *bool `yaml:"pretty"` TLS *TLSConfig Worker int + CompressionLevel *int `yaml:"compression_level"` } type Outputer interface {