Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix logstash output panic without hosts #2326

Merged
merged 1 commit into from
Aug 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d

*Affecting all Beats*
- Fix Elasticsearch structured error response parsing error. {issue}2229[2229]

- Fixed the run script to allow the overriding of the configuration file. {issue}2171[2171]
- Fix logstash output crash if no hosts are configured. {issue}2325[2325]

*Metricbeat*
- Fix module filters to work properly with drop_event filter. {issue}2249[2249]
Expand Down
9 changes: 7 additions & 2 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,13 @@ func (out *elasticsearchOutput) init(

out.clients = clients
loadBalance := config.LoadBalance
m, err := modeutil.NewConnectionMode(clients, !loadBalance,
maxAttempts, waitRetry, config.Timeout, maxWaitRetry)
m, err := modeutil.NewConnectionMode(clients, modeutil.Settings{
Failover: !loadBalance,
MaxAttempts: maxAttempts,
Timeout: config.Timeout,
WaitRetry: waitRetry,
MaxWaitRetry: maxWaitRetry,
})
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,13 @@ func (k *kafka) initMode(guaranteed bool) (mode.ConnectionMode, error) {
maxAttempts = 0
}

mode, err := modeutil.NewAsyncConnectionMode(
clients,
false,
maxAttempts,
defaultWaitRetry,
libCfg.Net.WriteTimeout,
defaultMaxWaitRetry)
mode, err := modeutil.NewAsyncConnectionMode(clients, modeutil.Settings{
Failover: false,
MaxAttempts: maxAttempts,
WaitRetry: defaultWaitRetry,
Timeout: libCfg.Net.WriteTimeout,
MaxWaitRetry: defaultMaxWaitRetry,
})
if err != nil {
logp.Err("Failed to configure kafka connection: %v", err)
return nil, err
Expand Down
58 changes: 36 additions & 22 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,6 @@ func (lj *logstash) init(cfg *common.Config) error {
return err
}

sendRetries := config.MaxRetries
maxAttempts := sendRetries + 1
if sendRetries < 0 {
maxAttempts = 0
}

tls, err := outputs.LoadTLSConfig(config.TLS)
if err != nil {
return err
Expand All @@ -93,22 +87,8 @@ func (lj *logstash) init(cfg *common.Config) error {
},
}

logp.Info("Max Retries set to: %v", sendRetries)
var m mode.ConnectionMode
if config.Pipelining == 0 {
clients, err := modeutil.MakeClients(cfg, makeClientFactory(&config, transp))
if err == nil {
m, err = modeutil.NewConnectionMode(clients, !config.LoadBalance,
maxAttempts, defaultWaitRetry, config.Timeout, defaultMaxWaitRetry)
}
} else {
clients, err := modeutil.MakeAsyncClients(cfg,
makeAsyncClientFactory(&config, transp))
if err == nil {
m, err = modeutil.NewAsyncConnectionMode(clients, !config.LoadBalance,
maxAttempts, defaultWaitRetry, config.Timeout, defaultMaxWaitRetry)
}
}
logp.Info("Max Retries set to: %v", config.MaxRetries)
m, err := initConnectionMode(cfg, &config, transp)
if err != nil {
return err
}
Expand All @@ -119,6 +99,40 @@ func (lj *logstash) init(cfg *common.Config) error {
return nil
}

func initConnectionMode(
cfg *common.Config,
config *logstashConfig,
transp *transport.Config,
) (mode.ConnectionMode, error) {
sendRetries := config.MaxRetries
maxAttempts := sendRetries + 1
if sendRetries < 0 {
maxAttempts = 0
}

settings := modeutil.Settings{
Failover: !config.LoadBalance,
MaxAttempts: maxAttempts,
Timeout: config.Timeout,
WaitRetry: defaultWaitRetry,
MaxWaitRetry: defaultMaxWaitRetry,
}

if config.Pipelining == 0 {
clients, err := modeutil.MakeClients(cfg, makeClientFactory(config, transp))
if err != nil {
return nil, err
}
return modeutil.NewConnectionMode(clients, settings)
}

clients, err := modeutil.MakeAsyncClients(cfg, makeAsyncClientFactory(config, transp))
if err != nil {
return nil, err
}
return modeutil.NewAsyncConnectionMode(clients, settings)
}

func makeClientFactory(
cfg *logstashConfig,
tcfg *transport.Config,
Expand Down
35 changes: 22 additions & 13 deletions libbeat/outputs/mode/modeutil/modeutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,41 @@ type ClientFactory func(host string) (mode.ProtocolClient, error)

type AsyncClientFactory func(string) (mode.AsyncProtocolClient, error)

type Settings struct {
Failover bool
MaxAttempts int
WaitRetry time.Duration
Timeout time.Duration
MaxWaitRetry time.Duration
}

func NewConnectionMode(
clients []mode.ProtocolClient,
failover bool,
maxAttempts int,
waitRetry, timeout, maxWaitRetry time.Duration,
s Settings,
) (mode.ConnectionMode, error) {
if failover {
if s.Failover {
clients = NewFailoverClient(clients)
}

maxSend := s.MaxAttempts
wait := s.WaitRetry
maxWait := s.MaxWaitRetry
to := s.Timeout

if len(clients) == 1 {
return single.New(clients[0], maxAttempts, waitRetry, timeout, maxWaitRetry)
return single.New(clients[0], maxSend, wait, to, maxWait)
}
return lb.NewSync(clients, maxAttempts, waitRetry, timeout, maxWaitRetry)
return lb.NewSync(clients, maxSend, wait, to, maxWait)
}

func NewAsyncConnectionMode(
clients []mode.AsyncProtocolClient,
failover bool,
maxAttempts int,
waitRetry, timeout, maxWaitRetry time.Duration,
s Settings,
) (mode.ConnectionMode, error) {
if failover {
if s.Failover {
clients = NewAsyncFailoverClient(clients)
}
return lb.NewAsync(clients, maxAttempts, waitRetry, timeout, maxWaitRetry)
return lb.NewAsync(clients, s.MaxAttempts, s.WaitRetry, s.Timeout, s.MaxWaitRetry)
}

// MakeClients will create a list from of ProtocolClient instances from
Expand Down Expand Up @@ -99,8 +108,8 @@ func MakeAsyncClients(

func ReadHostList(cfg *common.Config) ([]string, error) {
config := struct {
Hosts []string `config:"hosts"`
Worker int `config:"worker"`
Hosts []string `config:"hosts" validate:"required"`
Worker int `config:"worker" validate:"min=1"`
}{
Worker: 1,
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/mode/modeutil/modeutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func makeTestClients(c map[string]interface{},
func TestMakeEmptyClientFail(t *testing.T) {
config := map[string]interface{}{}
clients, err := makeTestClients(config, dummyMockClientFactory)
assert.Equal(t, mode.ErrNoHostsConfigured, err)
assert.Error(t, err)
assert.Equal(t, 0, len(clients))
}

Expand Down
9 changes: 7 additions & 2 deletions libbeat/outputs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,13 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error {
}

logp.Info("Max Retries set to: %v", sendRetries)
m, err := modeutil.NewConnectionMode(clients, !config.LoadBalance,
maxAttempts, defaultWaitRetry, config.Timeout, defaultMaxWaitRetry)
m, err := modeutil.NewConnectionMode(clients, modeutil.Settings{
Failover: !config.LoadBalance,
MaxAttempts: maxAttempts,
Timeout: config.Timeout,
WaitRetry: defaultWaitRetry,
MaxWaitRetry: defaultMaxWaitRetry,
})
if err != nil {
return err
}
Expand Down