From 006fd05214c139d1a126423a7530429751bf3d66 Mon Sep 17 00:00:00 2001 From: urso Date: Sat, 20 Aug 2016 13:03:00 +0200 Subject: [PATCH] Fix logstash output panic without hosts - Fix logstash output configuration test if no hosts are configured --- CHANGELOG.asciidoc | 2 +- libbeat/outputs/elasticsearch/output.go | 9 ++- libbeat/outputs/kafka/kafka.go | 14 ++--- libbeat/outputs/logstash/logstash.go | 58 ++++++++++++------- libbeat/outputs/mode/modeutil/modeutil.go | 35 ++++++----- .../outputs/mode/modeutil/modeutil_test.go | 2 +- libbeat/outputs/redis/redis.go | 9 ++- 7 files changed, 81 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 96fbb7f1665..0f294da5897 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -37,8 +37,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d *Affecting all Beats* - Fix Elasticsearch structured error response parsing error. {issue}2229[2229] - - Fixed the run script to allow the overriding of the configuration file. {issue}2171[2171] +- Fix logstash output crash if no hosts are configured. {issue}2325[2325] *Metricbeat* - Fix module filters to work properly with drop_event filter. {issue}2249[2249] diff --git a/libbeat/outputs/elasticsearch/output.go b/libbeat/outputs/elasticsearch/output.go index 272e164a0b7..4e49a362eff 100644 --- a/libbeat/outputs/elasticsearch/output.go +++ b/libbeat/outputs/elasticsearch/output.go @@ -132,8 +132,13 @@ func (out *elasticsearchOutput) init( out.clients = clients loadBalance := config.LoadBalance - m, err := modeutil.NewConnectionMode(clients, !loadBalance, - maxAttempts, waitRetry, config.Timeout, maxWaitRetry) + m, err := modeutil.NewConnectionMode(clients, modeutil.Settings{ + Failover: !loadBalance, + MaxAttempts: maxAttempts, + Timeout: config.Timeout, + WaitRetry: waitRetry, + MaxWaitRetry: maxWaitRetry, + }) if err != nil { return err } diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 9d9613f657b..0e3322de1fb 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -143,13 +143,13 @@ func (k *kafka) initMode(guaranteed bool) (mode.ConnectionMode, error) { maxAttempts = 0 } - mode, err := modeutil.NewAsyncConnectionMode( - clients, - false, - maxAttempts, - defaultWaitRetry, - libCfg.Net.WriteTimeout, - defaultMaxWaitRetry) + mode, err := modeutil.NewAsyncConnectionMode(clients, modeutil.Settings{ + Failover: false, + MaxAttempts: maxAttempts, + WaitRetry: defaultWaitRetry, + Timeout: libCfg.Net.WriteTimeout, + MaxWaitRetry: defaultMaxWaitRetry, + }) if err != nil { logp.Err("Failed to configure kafka connection: %v", err) return nil, err diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index 1f581e198b4..04837425f27 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -70,12 +70,6 @@ func (lj *logstash) init(cfg *common.Config) error { return err } - sendRetries := config.MaxRetries - maxAttempts := sendRetries + 1 - if sendRetries < 0 { - maxAttempts = 0 - } - tls, err := outputs.LoadTLSConfig(config.TLS) if err != nil { return err @@ -93,22 +87,8 @@ func (lj *logstash) init(cfg *common.Config) error { }, } - logp.Info("Max Retries set to: %v", sendRetries) - var m mode.ConnectionMode - if config.Pipelining == 0 { - clients, err := modeutil.MakeClients(cfg, makeClientFactory(&config, transp)) - if err == nil { - m, err = modeutil.NewConnectionMode(clients, !config.LoadBalance, - maxAttempts, defaultWaitRetry, config.Timeout, defaultMaxWaitRetry) - } - } else { - clients, err := modeutil.MakeAsyncClients(cfg, - makeAsyncClientFactory(&config, transp)) - if err == nil { - m, err = modeutil.NewAsyncConnectionMode(clients, !config.LoadBalance, - maxAttempts, defaultWaitRetry, config.Timeout, defaultMaxWaitRetry) - } - } + logp.Info("Max Retries set to: %v", config.MaxRetries) + m, err := initConnectionMode(cfg, &config, transp) if err != nil { return err } @@ -119,6 +99,40 @@ func (lj *logstash) init(cfg *common.Config) error { return nil } +func initConnectionMode( + cfg *common.Config, + config *logstashConfig, + transp *transport.Config, +) (mode.ConnectionMode, error) { + sendRetries := config.MaxRetries + maxAttempts := sendRetries + 1 + if sendRetries < 0 { + maxAttempts = 0 + } + + settings := modeutil.Settings{ + Failover: !config.LoadBalance, + MaxAttempts: maxAttempts, + Timeout: config.Timeout, + WaitRetry: defaultWaitRetry, + MaxWaitRetry: defaultMaxWaitRetry, + } + + if config.Pipelining == 0 { + clients, err := modeutil.MakeClients(cfg, makeClientFactory(config, transp)) + if err != nil { + return nil, err + } + return modeutil.NewConnectionMode(clients, settings) + } + + clients, err := modeutil.MakeAsyncClients(cfg, makeAsyncClientFactory(config, transp)) + if err != nil { + return nil, err + } + return modeutil.NewAsyncConnectionMode(clients, settings) +} + func makeClientFactory( cfg *logstashConfig, tcfg *transport.Config, diff --git a/libbeat/outputs/mode/modeutil/modeutil.go b/libbeat/outputs/mode/modeutil/modeutil.go index 925d6fc897f..6663141a800 100644 --- a/libbeat/outputs/mode/modeutil/modeutil.go +++ b/libbeat/outputs/mode/modeutil/modeutil.go @@ -13,32 +13,41 @@ type ClientFactory func(host string) (mode.ProtocolClient, error) type AsyncClientFactory func(string) (mode.AsyncProtocolClient, error) +type Settings struct { + Failover bool + MaxAttempts int + WaitRetry time.Duration + Timeout time.Duration + MaxWaitRetry time.Duration +} + func NewConnectionMode( clients []mode.ProtocolClient, - failover bool, - maxAttempts int, - waitRetry, timeout, maxWaitRetry time.Duration, + s Settings, ) (mode.ConnectionMode, error) { - if failover { + if s.Failover { clients = NewFailoverClient(clients) } + maxSend := s.MaxAttempts + wait := s.WaitRetry + maxWait := s.MaxWaitRetry + to := s.Timeout + if len(clients) == 1 { - return single.New(clients[0], maxAttempts, waitRetry, timeout, maxWaitRetry) + return single.New(clients[0], maxSend, wait, to, maxWait) } - return lb.NewSync(clients, maxAttempts, waitRetry, timeout, maxWaitRetry) + return lb.NewSync(clients, maxSend, wait, to, maxWait) } func NewAsyncConnectionMode( clients []mode.AsyncProtocolClient, - failover bool, - maxAttempts int, - waitRetry, timeout, maxWaitRetry time.Duration, + s Settings, ) (mode.ConnectionMode, error) { - if failover { + if s.Failover { clients = NewAsyncFailoverClient(clients) } - return lb.NewAsync(clients, maxAttempts, waitRetry, timeout, maxWaitRetry) + return lb.NewAsync(clients, s.MaxAttempts, s.WaitRetry, s.Timeout, s.MaxWaitRetry) } // MakeClients will create a list from of ProtocolClient instances from @@ -99,8 +108,8 @@ func MakeAsyncClients( func ReadHostList(cfg *common.Config) ([]string, error) { config := struct { - Hosts []string `config:"hosts"` - Worker int `config:"worker"` + Hosts []string `config:"hosts" validate:"required"` + Worker int `config:"worker" validate:"min=1"` }{ Worker: 1, } diff --git a/libbeat/outputs/mode/modeutil/modeutil_test.go b/libbeat/outputs/mode/modeutil/modeutil_test.go index 1aecf269d17..6a74a79d237 100644 --- a/libbeat/outputs/mode/modeutil/modeutil_test.go +++ b/libbeat/outputs/mode/modeutil/modeutil_test.go @@ -34,7 +34,7 @@ func makeTestClients(c map[string]interface{}, func TestMakeEmptyClientFail(t *testing.T) { config := map[string]interface{}{} clients, err := makeTestClients(config, dummyMockClientFactory) - assert.Equal(t, mode.ErrNoHostsConfigured, err) + assert.Error(t, err) assert.Equal(t, 0, len(clients)) } diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index 52f0ba2927e..3d0b988ec56 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -131,8 +131,13 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error { } logp.Info("Max Retries set to: %v", sendRetries) - m, err := modeutil.NewConnectionMode(clients, !config.LoadBalance, - maxAttempts, defaultWaitRetry, config.Timeout, defaultMaxWaitRetry) + m, err := modeutil.NewConnectionMode(clients, modeutil.Settings{ + Failover: !config.LoadBalance, + MaxAttempts: maxAttempts, + Timeout: config.Timeout, + WaitRetry: defaultWaitRetry, + MaxWaitRetry: defaultMaxWaitRetry, + }) if err != nil { return err }