Skip to content

Commit

Permalink
Fix logstash output panic without hosts (#2326)
Browse files Browse the repository at this point in the history
- Fix logstash output configuration test if no hosts are configured
  • Loading branch information
Steffen Siering authored and ruflin committed Aug 22, 2016
1 parent b51f0b1 commit 0f104d0
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 48 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d

*Affecting all Beats*
- Fix Elasticsearch structured error response parsing error. {issue}2229[2229]

- Fixed the run script to allow the overriding of the configuration file. {issue}2171[2171]
- Fix logstash output crash if no hosts are configured. {issue}2325[2325]

*Metricbeat*
- Fix module filters to work properly with drop_event filter. {issue}2249[2249]
Expand Down
9 changes: 7 additions & 2 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,13 @@ func (out *elasticsearchOutput) init(

out.clients = clients
loadBalance := config.LoadBalance
m, err := modeutil.NewConnectionMode(clients, !loadBalance,
maxAttempts, waitRetry, config.Timeout, maxWaitRetry)
m, err := modeutil.NewConnectionMode(clients, modeutil.Settings{
Failover: !loadBalance,
MaxAttempts: maxAttempts,
Timeout: config.Timeout,
WaitRetry: waitRetry,
MaxWaitRetry: maxWaitRetry,
})
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,13 @@ func (k *kafka) initMode(guaranteed bool) (mode.ConnectionMode, error) {
maxAttempts = 0
}

mode, err := modeutil.NewAsyncConnectionMode(
clients,
false,
maxAttempts,
defaultWaitRetry,
libCfg.Net.WriteTimeout,
defaultMaxWaitRetry)
mode, err := modeutil.NewAsyncConnectionMode(clients, modeutil.Settings{
Failover: false,
MaxAttempts: maxAttempts,
WaitRetry: defaultWaitRetry,
Timeout: libCfg.Net.WriteTimeout,
MaxWaitRetry: defaultMaxWaitRetry,
})
if err != nil {
logp.Err("Failed to configure kafka connection: %v", err)
return nil, err
Expand Down
58 changes: 36 additions & 22 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,6 @@ func (lj *logstash) init(cfg *common.Config) error {
return err
}

sendRetries := config.MaxRetries
maxAttempts := sendRetries + 1
if sendRetries < 0 {
maxAttempts = 0
}

tls, err := outputs.LoadTLSConfig(config.TLS)
if err != nil {
return err
Expand All @@ -93,22 +87,8 @@ func (lj *logstash) init(cfg *common.Config) error {
},
}

logp.Info("Max Retries set to: %v", sendRetries)
var m mode.ConnectionMode
if config.Pipelining == 0 {
clients, err := modeutil.MakeClients(cfg, makeClientFactory(&config, transp))
if err == nil {
m, err = modeutil.NewConnectionMode(clients, !config.LoadBalance,
maxAttempts, defaultWaitRetry, config.Timeout, defaultMaxWaitRetry)
}
} else {
clients, err := modeutil.MakeAsyncClients(cfg,
makeAsyncClientFactory(&config, transp))
if err == nil {
m, err = modeutil.NewAsyncConnectionMode(clients, !config.LoadBalance,
maxAttempts, defaultWaitRetry, config.Timeout, defaultMaxWaitRetry)
}
}
logp.Info("Max Retries set to: %v", config.MaxRetries)
m, err := initConnectionMode(cfg, &config, transp)
if err != nil {
return err
}
Expand All @@ -119,6 +99,40 @@ func (lj *logstash) init(cfg *common.Config) error {
return nil
}

func initConnectionMode(
cfg *common.Config,
config *logstashConfig,
transp *transport.Config,
) (mode.ConnectionMode, error) {
sendRetries := config.MaxRetries
maxAttempts := sendRetries + 1
if sendRetries < 0 {
maxAttempts = 0
}

settings := modeutil.Settings{
Failover: !config.LoadBalance,
MaxAttempts: maxAttempts,
Timeout: config.Timeout,
WaitRetry: defaultWaitRetry,
MaxWaitRetry: defaultMaxWaitRetry,
}

if config.Pipelining == 0 {
clients, err := modeutil.MakeClients(cfg, makeClientFactory(config, transp))
if err != nil {
return nil, err
}
return modeutil.NewConnectionMode(clients, settings)
}

clients, err := modeutil.MakeAsyncClients(cfg, makeAsyncClientFactory(config, transp))
if err != nil {
return nil, err
}
return modeutil.NewAsyncConnectionMode(clients, settings)
}

func makeClientFactory(
cfg *logstashConfig,
tcfg *transport.Config,
Expand Down
35 changes: 22 additions & 13 deletions libbeat/outputs/mode/modeutil/modeutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,41 @@ type ClientFactory func(host string) (mode.ProtocolClient, error)

type AsyncClientFactory func(string) (mode.AsyncProtocolClient, error)

type Settings struct {
Failover bool
MaxAttempts int
WaitRetry time.Duration
Timeout time.Duration
MaxWaitRetry time.Duration
}

func NewConnectionMode(
clients []mode.ProtocolClient,
failover bool,
maxAttempts int,
waitRetry, timeout, maxWaitRetry time.Duration,
s Settings,
) (mode.ConnectionMode, error) {
if failover {
if s.Failover {
clients = NewFailoverClient(clients)
}

maxSend := s.MaxAttempts
wait := s.WaitRetry
maxWait := s.MaxWaitRetry
to := s.Timeout

if len(clients) == 1 {
return single.New(clients[0], maxAttempts, waitRetry, timeout, maxWaitRetry)
return single.New(clients[0], maxSend, wait, to, maxWait)
}
return lb.NewSync(clients, maxAttempts, waitRetry, timeout, maxWaitRetry)
return lb.NewSync(clients, maxSend, wait, to, maxWait)
}

func NewAsyncConnectionMode(
clients []mode.AsyncProtocolClient,
failover bool,
maxAttempts int,
waitRetry, timeout, maxWaitRetry time.Duration,
s Settings,
) (mode.ConnectionMode, error) {
if failover {
if s.Failover {
clients = NewAsyncFailoverClient(clients)
}
return lb.NewAsync(clients, maxAttempts, waitRetry, timeout, maxWaitRetry)
return lb.NewAsync(clients, s.MaxAttempts, s.WaitRetry, s.Timeout, s.MaxWaitRetry)
}

// MakeClients will create a list from of ProtocolClient instances from
Expand Down Expand Up @@ -99,8 +108,8 @@ func MakeAsyncClients(

func ReadHostList(cfg *common.Config) ([]string, error) {
config := struct {
Hosts []string `config:"hosts"`
Worker int `config:"worker"`
Hosts []string `config:"hosts" validate:"required"`
Worker int `config:"worker" validate:"min=1"`
}{
Worker: 1,
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/mode/modeutil/modeutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func makeTestClients(c map[string]interface{},
func TestMakeEmptyClientFail(t *testing.T) {
config := map[string]interface{}{}
clients, err := makeTestClients(config, dummyMockClientFactory)
assert.Equal(t, mode.ErrNoHostsConfigured, err)
assert.Error(t, err)
assert.Equal(t, 0, len(clients))
}

Expand Down
9 changes: 7 additions & 2 deletions libbeat/outputs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,13 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error {
}

logp.Info("Max Retries set to: %v", sendRetries)
m, err := modeutil.NewConnectionMode(clients, !config.LoadBalance,
maxAttempts, defaultWaitRetry, config.Timeout, defaultMaxWaitRetry)
m, err := modeutil.NewConnectionMode(clients, modeutil.Settings{
Failover: !config.LoadBalance,
MaxAttempts: maxAttempts,
Timeout: config.Timeout,
WaitRetry: defaultWaitRetry,
MaxWaitRetry: defaultMaxWaitRetry,
})
if err != nil {
return err
}
Expand Down

0 comments on commit 0f104d0

Please sign in to comment.