From 68e339247f44a87ba3ecf332eb289ae78cc234c4 Mon Sep 17 00:00:00 2001 From: urso Date: Mon, 8 Aug 2016 18:06:15 +0200 Subject: [PATCH] Enhance kafka topic selection --- CHANGELOG.asciidoc | 4 +++ filebeat/filebeat.full.yml | 8 ++---- libbeat/_meta/config.full.yml | 8 ++---- libbeat/docs/outputconfig.asciidoc | 7 ++--- libbeat/outputs/kafka/client.go | 26 +++++++++---------- libbeat/outputs/kafka/config.go | 8 ------ libbeat/outputs/kafka/kafka.go | 20 +++++++++++--- .../outputs/kafka/kafka_integration_test.go | 14 ++++++---- metricbeat/metricbeat.full.yml | 8 ++---- packetbeat/packetbeat.full.yml | 8 ++---- winlogbeat/winlogbeat.full.yml | 8 ++---- 11 files changed, 54 insertions(+), 65 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index c26a69b8e89..5175480c9f0 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -14,6 +14,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d *Affecting all Beats* - Change Elasticsearch output index configuration to be based on format strings. If index has been configured, no date will be appended anymore to the index name. {pull}2119[2119] +- Replace `output.kafka.use_type` by `output.kafka.topic` accepting a format string. {pull}2188[2188] *Metricbeat* - Change field type system.process.cpu.start_time from keyword to date. {issue}1565[1565] @@ -48,6 +49,9 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d - Add script to generate the Kibana index-pattern from fields.yml. {pull}2122[2122] - Enhance redis output key selection based on format string. {pull}2169[2169] - Configurable redis `keys` using filters and format strings. {pull}2169[2169] +- Add format string support to `output.kafka.topic`. {pull}2188[2188] +- Add `output.kafka.topics` for more advanced kafka topic selection per event. {pull}2188[2188] + *Metricbeat* diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index 8eb4db682db..a67d54b4c24 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -483,14 +483,10 @@ output.elasticsearch: # to. #hosts: ["localhost:9092"] - # The Kafka topic used for produced events. If use_type is set to true, the - # topic will not be used. + # The Kafka topic used for produced events. The setting can be a format string + # using any event field. To set the topic from document type use `%{[type]}`. #topic: beats - # Set Kafka topic by event type. If use_type is false, the topic option must - # be configured. The default is false. - #use_type: false - # The number of concurrent load-balanced Kafka output workers. #worker: 1 diff --git a/libbeat/_meta/config.full.yml b/libbeat/_meta/config.full.yml index d688ff9fbe8..6d10fc225be 100644 --- a/libbeat/_meta/config.full.yml +++ b/libbeat/_meta/config.full.yml @@ -257,14 +257,10 @@ output.elasticsearch: # to. #hosts: ["localhost:9092"] - # The Kafka topic used for produced events. If use_type is set to true, the - # topic will not be used. + # The Kafka topic used for produced events. The setting can be a format string + # using any event field. To set the topic from document type use `%{[type]}`. #topic: beats - # Set Kafka topic by event type. If use_type is false, the topic option must - # be configured. The default is false. - #use_type: false - # The number of concurrent load-balanced Kafka output workers. #worker: 1 diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc index a1489cb38d0..c6e830e779e 100644 --- a/libbeat/docs/outputconfig.asciidoc +++ b/libbeat/docs/outputconfig.asciidoc @@ -543,11 +543,8 @@ The cluster metadata contain the actual Kafka brokers events are published to. ===== topic -The Kafka topic used for produced events. If `use_type` is set to true, the topic will not be used. - -===== use_type - -Set Kafka topic by event type. If `use_type` is false, the `topic` option must be configured. The default is false. +The Kafka topic used for produced events. The setting can be a format string +using any event field. To set the topic from document type use `%{[type]}`. ===== client_id diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 642b3745225..bf780e84e6d 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -11,13 +11,13 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs/outil" ) type client struct { - hosts []string - topic string - useType bool - config sarama.Config + hosts []string + topic outil.Selector + config sarama.Config producer sarama.AsyncProducer @@ -38,12 +38,15 @@ var ( publishEventsCallCount = expvar.NewInt("libbeat.kafka.call_count.PublishEvents") ) -func newKafkaClient(hosts []string, topic string, useType bool, cfg *sarama.Config) (*client, error) { +func newKafkaClient( + hosts []string, + topic outil.Selector, + cfg *sarama.Config, +) (*client, error) { c := &client{ - hosts: hosts, - useType: useType, - topic: topic, - config: *cfg, + hosts: hosts, + topic: topic, + config: *cfg, } return c, nil } @@ -103,10 +106,7 @@ func (c *client) AsyncPublishEvents( ch := c.producer.Input() for _, event := range events { - topic := c.topic - if c.useType { - topic = event["type"].(string) - } + topic, err := c.topic.Select(event) jsonEvent, err := json.Marshal(event) if err != nil { diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index d0122419192..883e8736c3e 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -14,8 +14,6 @@ type kafkaConfig struct { TLS *outputs.TLSConfig `config:"tls"` Timeout time.Duration `config:"timeout" validate:"min=1"` Worker int `config:"worker" validate:"min=1"` - UseType bool `config:"use_type"` - Topic string `config:"topic"` KeepAlive time.Duration `config:"keep_alive" validate:"min=0"` MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"` RequiredACKs *int `config:"required_acks" validate:"min=-1"` @@ -32,8 +30,6 @@ var ( TLS: nil, Timeout: 30 * time.Second, Worker: 1, - UseType: false, - Topic: "", KeepAlive: 0, MaxMessageBytes: nil, // use library default RequiredACKs: nil, // use library default @@ -50,10 +46,6 @@ func (c *kafkaConfig) Validate() error { return errors.New("no hosts configured") } - if c.UseType == false && c.Topic == "" { - return errors.New("use_type must be true or topic must be set") - } - if _, ok := compressionModes[strings.ToLower(c.Compression)]; !ok { return fmt.Errorf("compression mode '%v' unknown", c.Compression) } diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index ee01c98f66e..0ae5efaeaba 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -14,10 +14,12 @@ import ( "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/mode" "github.com/elastic/beats/libbeat/outputs/mode/modeutil" + "github.com/elastic/beats/libbeat/outputs/outil" ) type kafka struct { config kafkaConfig + topic outil.Selector modeRetry mode.ConnectionMode modeGuaranteed mode.ConnectionMode @@ -71,7 +73,18 @@ func (k *kafka) init(cfg *common.Config) error { return err } - _, err := newKafkaConfig(&k.config) + var err error + k.topic, err = outil.BuildSelectorFromConfig(cfg, outil.Settings{ + Key: "topic", + MultiKey: "topics", + EnableSingleOnly: true, + FailEmpty: true, + }) + if err != nil { + return err + } + + _, err = newKafkaConfig(&k.config) if err != nil { return err } @@ -96,10 +109,9 @@ func (k *kafka) initMode(guaranteed bool) (mode.ConnectionMode, error) { var clients []mode.AsyncProtocolClient hosts := k.config.Hosts - topic := k.config.Topic - useType := k.config.UseType + topic := k.topic for i := 0; i < worker; i++ { - client, err := newKafkaClient(hosts, topic, useType, libCfg) + client, err := newKafkaClient(hosts, topic, libCfg) if err != nil { logp.Err("Failed to create kafka client: %v", err) return nil, err diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index f4689a5891e..d9bed8a91b9 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -14,6 +14,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" + "github.com/elastic/beats/libbeat/outputs/outil" "github.com/stretchr/testify/assert" ) @@ -47,7 +48,8 @@ func newTestKafkaClient(t *testing.T, topic string) *client { hosts := []string{getTestKafkaHost()} t.Logf("host: %v", hosts) - client, err := newKafkaClient(hosts, topic, false, nil) + sel := outil.MakeSelector(outil.ConstSelectorExpr(topic)) + client, err := newKafkaClient(hosts, sel, nil) if err != nil { t.Fatal(err) } @@ -57,11 +59,13 @@ func newTestKafkaClient(t *testing.T, topic string) *client { func newTestKafkaOutput(t *testing.T, topic string, useType bool) outputs.Outputer { + if useType { + topic = "%{[type]}" + } config := map[string]interface{}{ - "hosts": []string{getTestKafkaHost()}, - "timeout": "1s", - "topic": topic, - "use_type": useType, + "hosts": []string{getTestKafkaHost()}, + "timeout": "1s", + "topic": topic, } cfg, err := common.NewConfigFrom(config) diff --git a/metricbeat/metricbeat.full.yml b/metricbeat/metricbeat.full.yml index f7b5dabe6a2..83d00880e2e 100644 --- a/metricbeat/metricbeat.full.yml +++ b/metricbeat/metricbeat.full.yml @@ -405,14 +405,10 @@ output.elasticsearch: # to. #hosts: ["localhost:9092"] - # The Kafka topic used for produced events. If use_type is set to true, the - # topic will not be used. + # The Kafka topic used for produced events. The setting can be a format string + # using any event field. To set the topic from document type use `%{[type]}`. #topic: beats - # Set Kafka topic by event type. If use_type is false, the topic option must - # be configured. The default is false. - #use_type: false - # The number of concurrent load-balanced Kafka output workers. #worker: 1 diff --git a/packetbeat/packetbeat.full.yml b/packetbeat/packetbeat.full.yml index a86b2c2f907..9e390003854 100644 --- a/packetbeat/packetbeat.full.yml +++ b/packetbeat/packetbeat.full.yml @@ -675,14 +675,10 @@ output.elasticsearch: # to. #hosts: ["localhost:9092"] - # The Kafka topic used for produced events. If use_type is set to true, the - # topic will not be used. + # The Kafka topic used for produced events. The setting can be a format string + # using any event field. To set the topic from document type use `%{[type]}`. #topic: beats - # Set Kafka topic by event type. If use_type is false, the topic option must - # be configured. The default is false. - #use_type: false - # The number of concurrent load-balanced Kafka output workers. #worker: 1 diff --git a/winlogbeat/winlogbeat.full.yml b/winlogbeat/winlogbeat.full.yml index 8f426d14952..4f8707f8cf6 100644 --- a/winlogbeat/winlogbeat.full.yml +++ b/winlogbeat/winlogbeat.full.yml @@ -292,14 +292,10 @@ output.elasticsearch: # to. #hosts: ["localhost:9092"] - # The Kafka topic used for produced events. If use_type is set to true, the - # topic will not be used. + # The Kafka topic used for produced events. The setting can be a format string + # using any event field. To set the topic from document type use `%{[type]}`. #topic: beats - # Set Kafka topic by event type. If use_type is false, the topic option must - # be configured. The default is false. - #use_type: false - # The number of concurrent load-balanced Kafka output workers. #worker: 1