Merge pull request #2190 from urso/upd/sarama-v1.10.0
Update to sarama v1.10.0
andrewkroh authored Aug 10, 2016
2 parents 6c2e3c9 + 44da82f commit 1851dc8
Showing 69 changed files with 1,414 additions and 325 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.asciidoc
@@ -53,7 +53,12 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
- Configurable redis `keys` using filters and format strings. {pull}2169[2169]
- Add format string support to `output.kafka.topic`. {pull}2188[2188]
- Add `output.kafka.topics` for more advanced kafka topic selection per event. {pull}2188[2188]

-- Add support for kafka 0.10.
+- Add support for kafka 0.10. {pull}2190[2190]
+- Add SASL/PLAIN authentication support to kafka output. {pull}2190[2190]
+- Make Kafka metadata update configurable. {pull}2190[2190]
+- Add kafka version setting (optional) enabling kafka broker version support. {pull}2190[2190]
+- Add kafka message timestamp if at least version 0.10 is configured. {pull}2190[2190]

*Metricbeat*

21 changes: 21 additions & 0 deletions filebeat/filebeat.full.yml
@@ -487,6 +487,27 @@ output.elasticsearch:
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Authentication details. Password is required if username is set.
#username: ''
#password: ''

# Kafka version filebeat is assumed to run against. Defaults to the oldest
# supported stable version (currently version 0.8.2.0)
#version: 0.8.2

# Metadata update configuration. The metadata contain leader information
# used to decide which broker to use when publishing.
#metadata:
# Max metadata request retry attempts when the cluster is in the middle of a
# leader election. Defaults to 3 retries.
#retry.max: 3

# Waiting time between retries during leader elections. Default is 250ms.
#retry.backoff: 250ms

# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

4 changes: 3 additions & 1 deletion glide.yaml
@@ -59,11 +59,13 @@ import:
- package: github.com/miekg/dns
version: 5d001d020961ae1c184f9f8152fdc73810481677
- package: github.com/Shopify/sarama
-  version: v1.9.0
+  version: v1.10.0
- package: github.com/klauspost/crc32
version: v1.0
- package: github.com/golang/snappy
version: d9eb7a3d35ec988b8585d4a0068e462c27d28380
- package: github.com/eapache/go-xerial-snappy
version: bb955e01b9346ac19dc29eb16586c90ded99a98c
- package: github.com/eapache/queue
version: ded5959c0d4e360646dc9e9908cff48666781367
- package: github.com/eapache/go-resiliency
21 changes: 21 additions & 0 deletions libbeat/_meta/config.full.yml
@@ -261,6 +261,27 @@ output.elasticsearch:
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Authentication details. Password is required if username is set.
#username: ''
#password: ''

# Kafka version beatname is assumed to run against. Defaults to the oldest
# supported stable version (currently version 0.8.2.0)
#version: 0.8.2

# Metadata update configuration. The metadata contain leader information
# used to decide which broker to use when publishing.
#metadata:
# Max metadata request retry attempts when the cluster is in the middle of a
# leader election. Defaults to 3 retries.
#retry.max: 3

# Waiting time between retries during leader elections. Default is 250ms.
#retry.backoff: 250ms

# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

29 changes: 29 additions & 0 deletions libbeat/docs/outputconfig.asciidoc
@@ -541,6 +541,24 @@ The default value is true.
The list of Kafka broker addresses from where to fetch the cluster metadata.
The cluster metadata contain the actual Kafka brokers that events are published to.

===== version

Kafka version ${beatname_lc} is assumed to run against. Defaults to the oldest
supported stable version (currently version 0.8.2.0).

Event timestamps are added to produced messages if version 0.10.0.0 or newer is configured.

Valid values are `0.8.2.0`, `0.8.2.1`, `0.8.2.2`, `0.8.2`, `0.8`, `0.9.0.0`,
`0.9.0.1`, `0.9.0`, `0.9`, `0.10.0.0`, `0.10.0`, and `0.10`.
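As an illustration of how these strings resolve, here is a stdlib-only sketch of the lookup. The real output maps them to `sarama.KafkaVersion` constants (see the `kafkaVersions` table in kafka.go); plain strings stand in for those constants here:

```go
package main

import "fmt"

// Simplified sketch of the kafka output's version lookup. An empty
// string selects the default (oldest supported stable version), and
// version prefixes resolve to the newest matching patch release.
var kafkaVersions = map[string]string{
	"": "0.8.2.0", // default

	"0.8.2.0": "0.8.2.0",
	"0.8.2.1": "0.8.2.1",
	"0.8.2.2": "0.8.2.2",
	"0.8.2":   "0.8.2.2",
	"0.8":     "0.8.2.2",

	"0.9.0.0": "0.9.0.0",
	"0.9.0.1": "0.9.0.1",
	"0.9.0":   "0.9.0.1",
	"0.9":     "0.9.0.1",

	"0.10.0.0": "0.10.0.0",
	"0.10.0":   "0.10.0.0",
	"0.10":     "0.10.0.0",
}

// resolveVersion returns the concrete version for a config string, or
// an error for unknown values — mirroring kafkaConfig.Validate.
func resolveVersion(s string) (string, error) {
	v, ok := kafkaVersions[s]
	if !ok {
		return "", fmt.Errorf("unknown/unsupported kafka version '%v'", s)
	}
	return v, nil
}

func main() {
	v, _ := resolveVersion("0.9")
	fmt.Println(v) // 0.9.0.1
	if _, err := resolveVersion("0.7"); err != nil {
		fmt.Println("rejected:", err)
	}
}
```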

===== username

The username for connecting to Kafka. If a username is configured, the password must be configured as well. Only SASL/PLAIN is supported.

===== password

The password for connecting to Kafka.
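A minimal sketch of how these two settings are wired into the client, modeled on the `newKafkaConfig` change in this commit. The nested struct merely mirrors sarama's `Config.Net.SASL` fields for illustration:

```go
package main

import "fmt"

// netConfig mirrors the shape of sarama's Config.Net.SASL section so
// this sketch compiles without the sarama dependency.
type netConfig struct {
	SASL struct {
		Enable   bool
		User     string
		Password string
	}
}

// applySASL enables SASL/PLAIN only when a username is configured,
// as newKafkaConfig does; with no username the connection stays
// unauthenticated.
func applySASL(net *netConfig, username, password string) {
	if username != "" {
		net.SASL.Enable = true
		net.SASL.User = username
		net.SASL.Password = password
	}
}

func main() {
	var net netConfig
	applySASL(&net, "beats", "secret")
	fmt.Println(net.SASL.Enable) // true
}
```

The missing-password case never reaches this point: config validation rejects a username without a password before the client is built.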

===== topic

The Kafka topic used for produced events. The setting can be a format string
@@ -554,6 +572,17 @@ The configurable ClientID used for logging, debugging, and auditing purposes. Th

The number of concurrent load-balanced Kafka output workers.

===== metadata

Kafka metadata update settings. The metadata contain information about
brokers, topics, partitions, and the active leaders to use for publishing.

*`refresh_frequency`*:: Metadata refresh interval. Defaults to 10 minutes.

*`retry.max`*:: Total number of metadata update retries when the cluster is in the middle of a leader election. The default is 3.

*`retry.backoff`*:: Waiting time between retries during leader elections. Default is 250ms.

===== max_retries

The number of times to retry publishing an event after a publishing failure.
20 changes: 17 additions & 3 deletions libbeat/outputs/kafka/client.go
@@ -108,16 +108,30 @@ func (c *client) AsyncPublishEvents(
for _, event := range events {
topic, err := c.topic.Select(event)

var ts time.Time

// message timestamps have been added to kafka with version 0.10.0.0
if c.config.Version.IsAtLeast(sarama.V0_10_0_0) {
if tsRaw, ok := event["@timestamp"]; ok {
if tmp, ok := tsRaw.(common.Time); ok {
ts = time.Time(tmp)
} else if tmp, ok := tsRaw.(time.Time); ok {
ts = tmp
}
}
}

jsonEvent, err := json.Marshal(event)
if err != nil {
ref.done()
continue
}

msg := &sarama.ProducerMessage{
-Metadata: ref,
-Topic: topic,
-Value: sarama.ByteEncoder(jsonEvent),
+Metadata: ref,
+Topic: topic,
+Value: sarama.ByteEncoder(jsonEvent),
+Timestamp: ts,
}

ch <- msg
40 changes: 36 additions & 4 deletions libbeat/outputs/kafka/config.go
@@ -14,30 +14,54 @@ type kafkaConfig struct {
TLS *outputs.TLSConfig `config:"tls"`
Timeout time.Duration `config:"timeout" validate:"min=1"`
Worker int `config:"worker" validate:"min=1"`
Metadata metaConfig `config:"metadata"`
KeepAlive time.Duration `config:"keep_alive" validate:"min=0"`
MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"`
RequiredACKs *int `config:"required_acks" validate:"min=-1"`
BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"`
Compression string `config:"compression"`
Version string `config:"version"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
ClientID string `config:"client_id"`
ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"`
Username string `config:"username"`
Password string `config:"password"`
}

type metaConfig struct {
Retry metaRetryConfig `config:"retry"`
RefreshFreq time.Duration `config:"refresh_frequency" validate:"min=0"`
}

type metaRetryConfig struct {
Max int `config:"max" validate:"min=0"`
Backoff time.Duration `config:"backoff" validate:"min=0"`
}

var (
defaultConfig = kafkaConfig{
-Hosts: nil,
-TLS: nil,
-Timeout: 30 * time.Second,
-Worker: 1,
+Hosts: nil,
+TLS: nil,
+Timeout: 30 * time.Second,
+Worker: 1,
+Metadata: metaConfig{
+Retry: metaRetryConfig{
+Max: 3,
+Backoff: 250 * time.Millisecond,
+},
+RefreshFreq: 10 * time.Minute,
+},
KeepAlive: 0,
MaxMessageBytes: nil, // use library default
RequiredACKs: nil, // use library default
BrokerTimeout: 10 * time.Second,
Compression: "gzip",
Version: "",
MaxRetries: 3,
ClientID: "beats",
ChanBufferSize: 256,
Username: "",
Password: "",
}
)

@@ -50,5 +74,13 @@ func (c *kafkaConfig) Validate() error {
return fmt.Errorf("compression mode '%v' unknown", c.Compression)
}

if _, ok := kafkaVersions[c.Version]; !ok {
return fmt.Errorf("unknown/unsupported kafka version '%v'", c.Version)
}

if c.Username != "" && c.Password == "" {
return fmt.Errorf("password must be set when username is configured")
}

return nil
}
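The two validation rules this commit adds to `kafkaConfig.Validate` can be exercised standalone. A reduced sketch follows — the version set here is abbreviated for illustration; the full table lives in kafka.go:

```go
package main

import "fmt"

// knownVersions is a reduced stand-in for the kafkaVersions map in
// kafka.go ("" selects the default, i.e. the oldest supported version).
var knownVersions = map[string]bool{
	"": true, "0.8.2.0": true, "0.8.2": true, "0.8": true,
	"0.9.0.1": true, "0.9": true, "0.10.0.0": true, "0.10": true,
}

// validate mirrors the two checks added in this commit: the version
// string must be known, and a username without a password is rejected.
func validate(version, username, password string) error {
	if !knownVersions[version] {
		return fmt.Errorf("unknown/unsupported kafka version '%v'", version)
	}
	if username != "" && password == "" {
		return fmt.Errorf("password must be set when username is configured")
	}
	return nil
}

func main() {
	fmt.Println(validate("0.10", "beats", "secret")) // <nil>
	fmt.Println(validate("0.10", "beats", ""))
}
```

Validation runs before the sarama client is built, so a bad version or missing password fails fast at config load rather than at publish time.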
39 changes: 37 additions & 2 deletions libbeat/outputs/kafka/kafka.go
@@ -53,6 +53,25 @@ var (
"gzip": sarama.CompressionGZIP,
"snappy": sarama.CompressionSnappy,
}

kafkaVersions = map[string]sarama.KafkaVersion{
"": sarama.V0_8_2_0,

"0.8.2.0": sarama.V0_8_2_0,
"0.8.2.1": sarama.V0_8_2_1,
"0.8.2.2": sarama.V0_8_2_2,
"0.8.2": sarama.V0_8_2_2,
"0.8": sarama.V0_8_2_2,

"0.9.0.0": sarama.V0_9_0_0,
"0.9.0.1": sarama.V0_9_0_1,
"0.9.0": sarama.V0_9_0_1,
"0.9": sarama.V0_9_0_1,

"0.10.0.0": sarama.V0_10_0_0,
"0.10.0": sarama.V0_10_0_0,
"0.10": sarama.V0_10_0_0,
}
)

// New instantiates a new kafka output instance.
@@ -211,8 +230,16 @@ func newKafkaConfig(config *kafkaConfig) (*sarama.Config, error) {
k.Net.TLS.Enable = tls != nil
k.Net.TLS.Config = tls

-// TODO: configure metadata level properties
-// use lib defaults
+if config.Username != "" {
+k.Net.SASL.Enable = true
+k.Net.SASL.User = config.Username
+k.Net.SASL.Password = config.Password
+}
+
+// configure metadata update properties
+k.Metadata.Retry.Max = config.Metadata.Retry.Max
+k.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
+k.Metadata.RefreshFrequency = config.Metadata.RefreshFreq

// configure producer API properties
if config.MaxMessageBytes != nil {
@@ -237,6 +264,7 @@ func newKafkaConfig(config *kafkaConfig) (*sarama.Config, error) {
retryMax = 1000
}
k.Producer.Retry.Max = retryMax
// TODO: k.Producer.Retry.Backoff = ?

// configure per broker go channel buffering
k.ChannelBufferSize = config.ChanBufferSize
@@ -247,5 +275,12 @@ func newKafkaConfig(config *kafkaConfig) (*sarama.Config, error) {
logp.Err("Invalid kafka configuration: %v", err)
return nil, err
}

version, ok := kafkaVersions[config.Version]
if !ok {
return nil, fmt.Errorf("unknown/unsupported kafka version: %v", config.Version)
}
k.Version = version

return k, nil
}
21 changes: 21 additions & 0 deletions metricbeat/metricbeat.full.yml
@@ -409,6 +409,27 @@ output.elasticsearch:
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Authentication details. Password is required if username is set.
#username: ''
#password: ''

# Kafka version metricbeat is assumed to run against. Defaults to the oldest
# supported stable version (currently version 0.8.2.0)
#version: 0.8.2

# Metadata update configuration. The metadata contain leader information
# used to decide which broker to use when publishing.
#metadata:
# Max metadata request retry attempts when the cluster is in the middle of a
# leader election. Defaults to 3 retries.
#retry.max: 3

# Waiting time between retries during leader elections. Default is 250ms.
#retry.backoff: 250ms

# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

21 changes: 21 additions & 0 deletions packetbeat/packetbeat.full.yml
@@ -679,6 +679,27 @@ output.elasticsearch:
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Authentication details. Password is required if username is set.
#username: ''
#password: ''

# Kafka version packetbeat is assumed to run against. Defaults to the oldest
# supported stable version (currently version 0.8.2.0)
#version: 0.8.2

# Metadata update configuration. The metadata contain leader information
# used to decide which broker to use when publishing.
#metadata:
# Max metadata request retry attempts when the cluster is in the middle of a
# leader election. Defaults to 3 retries.
#retry.max: 3

# Waiting time between retries during leader elections. Default is 250ms.
#retry.backoff: 250ms

# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

19 changes: 19 additions & 0 deletions vendor/github.com/Shopify/sarama/.github/ISSUE_TEMPLATE.md
