
Commit

Enhance kafka topic selection (#2188)
Steffen Siering authored and ruflin committed Aug 10, 2016
1 parent 894e479 commit e7211c5
Showing 11 changed files with 54 additions and 65 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.asciidoc
@@ -14,6 +14,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d

*Affecting all Beats*
- Change Elasticsearch output index configuration to be based on format strings. If index has been configured, no date will be appended anymore to the index name. {pull}2119[2119]
- Replace `output.kafka.use_type` by `output.kafka.topic` accepting a format string. {pull}2188[2188]

*Metricbeat*
- Change field type system.process.cpu.start_time from keyword to date. {issue}1565[1565]
@@ -50,6 +51,9 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
- Add script to generate the Kibana index-pattern from fields.yml. {pull}2122[2122]
- Enhance redis output key selection based on format string. {pull}2169[2169]
- Configurable redis `keys` using filters and format strings. {pull}2169[2169]
- Add format string support to `output.kafka.topic`. {pull}2188[2188]
- Add `output.kafka.topics` for more advanced kafka topic selection per event. {pull}2188[2188]
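
For readers skimming the changelog, moving from the removed `use_type` flag to the new format string is a one-line configuration change. A minimal sketch (broker address illustrative):

```yaml
output.kafka:
  hosts: ["localhost:9092"]
  # before: use_type: true
  # after: derive the topic from the event's document type
  topic: '%{[type]}'
```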


*Metricbeat*

8 changes: 2 additions & 6 deletions filebeat/filebeat.full.yml
@@ -483,14 +483,10 @@ output.elasticsearch:
# to.
#hosts: ["localhost:9092"]

# The Kafka topic used for produced events. If use_type is set to true, the
# topic will not be used.
# The Kafka topic used for produced events. The setting can be a format string
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Set Kafka topic by event type. If use_type is false, the topic option must
# be configured. The default is false.
#use_type: false

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

8 changes: 2 additions & 6 deletions libbeat/_meta/config.full.yml
@@ -257,14 +257,10 @@ output.elasticsearch:
# to.
#hosts: ["localhost:9092"]

# The Kafka topic used for produced events. If use_type is set to true, the
# topic will not be used.
# The Kafka topic used for produced events. The setting can be a format string
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Set Kafka topic by event type. If use_type is false, the topic option must
# be configured. The default is false.
#use_type: false

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

7 changes: 2 additions & 5 deletions libbeat/docs/outputconfig.asciidoc
@@ -543,11 +543,8 @@ The cluster metadata contain the actual Kafka brokers events are published to.

===== topic

The Kafka topic used for produced events. If `use_type` is set to true, the topic will not be used.

===== use_type

Set Kafka topic by event type. If `use_type` is false, the `topic` option must be configured. The default is false.
The Kafka topic used for produced events. The setting can be a format string
using any event field. To set the topic from document type use `%{[type]}`.
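
The companion `topics` setting added by this commit is not documented in this hunk. By analogy with the redis `keys` selection from #2169, a per-event rule configuration might look roughly like the following; the rule syntax here is an assumption, not taken from this diff:

```yaml
output.kafka:
  hosts: ["localhost:9092"]
  # default topic, resolved per event from the format string
  topic: '%{[type]}'
  # assumed rule-based selection, mirroring the redis `keys` setting
  topics:
    - topic: "critical"
      when.contains:
        message: "CRITICAL"
```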

===== client_id

26 changes: 13 additions & 13 deletions libbeat/outputs/kafka/client.go
@@ -11,13 +11,13 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/outil"
)

type client struct {
hosts []string
topic string
useType bool
config sarama.Config
hosts []string
topic outil.Selector
config sarama.Config

producer sarama.AsyncProducer

@@ -38,12 +38,15 @@
publishEventsCallCount = expvar.NewInt("libbeat.kafka.call_count.PublishEvents")
)

func newKafkaClient(hosts []string, topic string, useType bool, cfg *sarama.Config) (*client, error) {
func newKafkaClient(
hosts []string,
topic outil.Selector,
cfg *sarama.Config,
) (*client, error) {
c := &client{
hosts: hosts,
useType: useType,
topic: topic,
config: *cfg,
hosts: hosts,
topic: topic,
config: *cfg,
}
return c, nil
}
@@ -103,10 +106,7 @@ func (c *client) AsyncPublishEvents(
ch := c.producer.Input()

for _, event := range events {
topic := c.topic
if c.useType {
topic = event["type"].(string)
}
topic, err := c.topic.Select(event)

jsonEvent, err := json.Marshal(event)
if err != nil {
8 changes: 0 additions & 8 deletions libbeat/outputs/kafka/config.go
@@ -14,8 +14,6 @@ type kafkaConfig struct {
TLS *outputs.TLSConfig `config:"tls"`
Timeout time.Duration `config:"timeout" validate:"min=1"`
Worker int `config:"worker" validate:"min=1"`
UseType bool `config:"use_type"`
Topic string `config:"topic"`
KeepAlive time.Duration `config:"keep_alive" validate:"min=0"`
MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"`
RequiredACKs *int `config:"required_acks" validate:"min=-1"`
@@ -32,8 +30,6 @@ var (
TLS: nil,
Timeout: 30 * time.Second,
Worker: 1,
UseType: false,
Topic: "",
KeepAlive: 0,
MaxMessageBytes: nil, // use library default
RequiredACKs: nil, // use library default
@@ -50,10 +46,6 @@ func (c *kafkaConfig) Validate() error {
return errors.New("no hosts configured")
}

if c.UseType == false && c.Topic == "" {
return errors.New("use_type must be true or topic must be set")
}

if _, ok := compressionModes[strings.ToLower(c.Compression)]; !ok {
return fmt.Errorf("compression mode '%v' unknown", c.Compression)
}
20 changes: 16 additions & 4 deletions libbeat/outputs/kafka/kafka.go
@@ -14,10 +14,12 @@ import (
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/mode"
"github.com/elastic/beats/libbeat/outputs/mode/modeutil"
"github.com/elastic/beats/libbeat/outputs/outil"
)

type kafka struct {
config kafkaConfig
topic outil.Selector

modeRetry mode.ConnectionMode
modeGuaranteed mode.ConnectionMode
@@ -71,7 +73,18 @@ func (k *kafka) init(cfg *common.Config) error {
return err
}

_, err := newKafkaConfig(&k.config)
var err error
k.topic, err = outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "topic",
MultiKey: "topics",
EnableSingleOnly: true,
FailEmpty: true,
})
if err != nil {
return err
}

_, err = newKafkaConfig(&k.config)
if err != nil {
return err
}
@@ -96,10 +109,9 @@ func (k *kafka) initMode(guaranteed bool) (mode.ConnectionMode, error) {

var clients []mode.AsyncProtocolClient
hosts := k.config.Hosts
topic := k.config.Topic
useType := k.config.UseType
topic := k.topic
for i := 0; i < worker; i++ {
client, err := newKafkaClient(hosts, topic, useType, libCfg)
client, err := newKafkaClient(hosts, topic, libCfg)
if err != nil {
logp.Err("Failed to create kafka client: %v", err)
return nil, err
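
To make the new plumbing concrete, here is a self-contained sketch of how a selector built the way `kafka.init` builds it resolves a topic per event. The event contents and expected output are illustrative, and `Select` returning `(string, error)` follows its use in `client.go`:

```go
package main

import (
	"fmt"

	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/outputs/outil"
)

func main() {
	// Build a selector from an output.kafka-style config section, using the
	// same settings as kafka.init: "topic" for a single format string,
	// "topics" for rule-based selection per event.
	cfg, err := common.NewConfigFrom(map[string]interface{}{
		"topic": "%{[type]}",
	})
	if err != nil {
		panic(err)
	}

	sel, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
		Key:              "topic",
		MultiKey:         "topics",
		EnableSingleOnly: true,
		FailEmpty:        true,
	})
	if err != nil {
		panic(err)
	}

	// Per published event the selector resolves the topic from event fields,
	// replacing the old `if useType { topic = event["type"].(string) }` branch.
	topic, err := sel.Select(common.MapStr{"type": "log"})
	fmt.Println(topic, err) // expected: "log" <nil>
}
```
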
14 changes: 9 additions & 5 deletions libbeat/outputs/kafka/kafka_integration_test.go
@@ -14,6 +14,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/stretchr/testify/assert"
)

@@ -47,7 +48,8 @@ func newTestKafkaClient(t *testing.T, topic string) *client {
hosts := []string{getTestKafkaHost()}
t.Logf("host: %v", hosts)

client, err := newKafkaClient(hosts, topic, false, nil)
sel := outil.MakeSelector(outil.ConstSelectorExpr(topic))
client, err := newKafkaClient(hosts, sel, nil)
if err != nil {
t.Fatal(err)
}
@@ -57,11 +59,13 @@

func newTestKafkaOutput(t *testing.T, topic string, useType bool) outputs.Outputer {

if useType {
topic = "%{[type]}"
}
config := map[string]interface{}{
"hosts": []string{getTestKafkaHost()},
"timeout": "1s",
"topic": topic,
"use_type": useType,
"hosts": []string{getTestKafkaHost()},
"timeout": "1s",
"topic": topic,
}

cfg, err := common.NewConfigFrom(config)
8 changes: 2 additions & 6 deletions metricbeat/metricbeat.full.yml
@@ -405,14 +405,10 @@ output.elasticsearch:
# to.
#hosts: ["localhost:9092"]

# The Kafka topic used for produced events. If use_type is set to true, the
# topic will not be used.
# The Kafka topic used for produced events. The setting can be a format string
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Set Kafka topic by event type. If use_type is false, the topic option must
# be configured. The default is false.
#use_type: false

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

8 changes: 2 additions & 6 deletions packetbeat/packetbeat.full.yml
@@ -675,14 +675,10 @@ output.elasticsearch:
# to.
#hosts: ["localhost:9092"]

# The Kafka topic used for produced events. If use_type is set to true, the
# topic will not be used.
# The Kafka topic used for produced events. The setting can be a format string
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Set Kafka topic by event type. If use_type is false, the topic option must
# be configured. The default is false.
#use_type: false

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

8 changes: 2 additions & 6 deletions winlogbeat/winlogbeat.full.yml
@@ -292,14 +292,10 @@ output.elasticsearch:
# to.
#hosts: ["localhost:9092"]

# The Kafka topic used for produced events. If use_type is set to true, the
# topic will not be used.
# The Kafka topic used for produced events. The setting can be a format string
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Set Kafka topic by event type. If use_type is false, the topic option must
# be configured. The default is false.
#use_type: false

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

