From 964f78647d40677730b16901c022f5f95cd500b9 Mon Sep 17 00:00:00 2001
From: Pier-Hugues Pellerin
Date: Mon, 11 Feb 2019 15:16:33 -0500
Subject: [PATCH 1/4] Expose the metadata refresh strategy for the Kafka
 output.

By default, the Kafka output periodically refreshes the metadata
information for all the available topics. This is the default strategy
that Beats use. But if you have strict permissions in place for your
Kafka cluster, this can lead to errors in your logs when the client
tries to fetch information for a topic that you don't have permission
to access.

This commit keeps the default behavior but allows changing the strategy
from the config file.
---
 libbeat/docs/outputconfig.asciidoc | 28 ++++++++++++++++------------
 libbeat/outputs/kafka/config.go    |  3 +++
 2 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc
index c6254625434..c353e6786ab 100644
--- a/libbeat/docs/outputconfig.asciidoc
+++ b/libbeat/docs/outputconfig.asciidoc
@@ -249,7 +249,7 @@ endif::no_dashboards[]
 You can set the index dynamically by using a format string to access any event
 field. For example, this configuration uses a custom field, `fields.log_type`,
-to set the index: 
+to set the index:

 ["source","yaml",subs="attributes"]
 ------------------------------------------------------------------------------
 output.elasticsearch:
 <1> We recommend including +{beat_version_key}+ in the name to avoid mapping
 issues when you upgrade.

-With this configuration, all events with `log_type: normal` are sent to an 
+With this configuration, all events with `log_type: normal` are sent to an
 index named +normal-{version}-{localdate}+, and all events with
 `log_type: critical` are sent to an index named +critical-{version}-{localdate}+.

 TIP: To learn how to add custom fields to events, see the
-<> option. 
+<> option.

 See the <> setting for other ways to set the index dynamically.

@@ -285,7 +285,7 @@ matches, the <> setting is used.

 Rule settings:

 *`index`*:: The index format string to use. If this string contains field
-references, such as `%{[fields.name]}`, the fields must exist, or the rule fails. 
+references, such as `%{[fields.name]}`, the fields must exist, or the rule fails.

 *`mappings`*:: A dictionary that takes the value returned by `index` and maps
 it to a new name.

@@ -347,7 +347,7 @@ ifndef::no_ilm[]
 [[ilm-es]]
 ===== `ilm`

-Configuration options for index lifecycle management. 
+Configuration options for index lifecycle management.

 See <> for more information.
 endif::no_ilm[]

@@ -369,7 +369,7 @@ For more information, see <>.
 You can set the ingest node pipeline dynamically by using a format string to
 access any event field. For example, this configuration uses a custom field,
-`fields.log_type`, to set the pipeline for each event: 
+`fields.log_type`, to set the pipeline for each event:

 ["source","yaml",subs="attributes"]
 ------------------------------------------------------------------------------
 output.elasticsearch:
 named `normal_pipeline`, and all events with `log_type: critical` are sent to
 a pipeline named `critical_pipeline`.

 TIP: To learn how to add custom fields to events, see the
-<> option. 
+<> option.

 See the <> setting for other ways to set the ingest node pipeline
 dynamically.

@@ -403,7 +403,7 @@ Rule settings:

 *`pipeline`*:: The pipeline format string to use. If this string contains
 field references, such as `%{[fields.name]}`, the fields must exist, or the rule
-fails. 
+fails.
 *`mappings`*:: A dictionary that takes the value returned by `pipeline` and
 maps it to a new name.

@@ -870,7 +870,7 @@ topic: '%{[fields.log_topic]}'
 -----

 TIP: To learn how to add custom fields to events, see the
-<> option. 
+<> option.

 See the <> setting for other ways to set the topic dynamically.

@@ -889,7 +889,7 @@ Rule settings:

 *`topic`*:: The topic format string to use. If this string contains field
 references, such as `%{[fields.name]}`, the fields must exist, or the rule
-fails. 
+fails.

 *`mappings`*:: A dictionary that takes the value returned by `topic` and maps
 it to a new name.

@@ -901,7 +901,7 @@ match.
 ifndef::no-processors[]
 All the <> supported by processors are also supported here.
-endif::no-processors[] 
+endif::no-processors[]

 ===== `key`

@@ -955,6 +955,10 @@ brokers, topics, partition, and active leaders to use for publishing.

 *`refresh_frequency`*:: Metadata refresh interval. Defaults to 10 minutes.

+*`full`*:: Strategy to use when fetching metadata. When this option is `true`, the client maintains
+a full set of metadata for all the available topics; if this option is set to `false`, it refreshes only the
+metadata for the configured topics. The default is `true`.
+
 *`retry.max`*:: Total number of metadata update retries when cluster is in
 middle of leader election. The default is 3.

 *`retry.backoff`*:: Waiting time between retries during leader elections.
 Default is 250ms.

@@ -1099,7 +1103,7 @@ output.redis:

 TIP: To learn how to add custom fields to events, see the
-<> option. 
+<> option.

 See the <> setting for other ways to set the key dynamically.

diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go
index 9cf81d34261..8bf8425c355 100644
--- a/libbeat/outputs/kafka/config.go
+++ b/libbeat/outputs/kafka/config.go
@@ -62,6 +62,7 @@ type kafkaConfig struct {
 type metaConfig struct {
 	Retry       metaRetryConfig `config:"retry"`
 	RefreshFreq time.Duration   `config:"refresh_frequency" validate:"min=0"`
+	Full        bool            `config:"full"`
 }

 type metaRetryConfig struct {
@@ -90,6 +91,7 @@ func defaultConfig() kafkaConfig {
 			Backoff: 250 * time.Millisecond,
 		},
 		RefreshFreq: 10 * time.Minute,
+		Full:        true,
 	},
 	KeepAlive:       0,
 	MaxMessageBytes: nil, // use library default
@@ -169,6 +171,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
 	k.Metadata.Retry.Max = config.Metadata.Retry.Max
 	k.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
 	k.Metadata.RefreshFrequency = config.Metadata.RefreshFreq
+	k.Metadata.Full = config.Metadata.Full

 	// configure producer API properties
 	if config.MaxMessageBytes != nil {

From bfd9984b934fa21cf88bbc7384848300a3b26ef3 Mon Sep 17 00:00:00 2001
From: Pier-Hugues Pellerin
Date: Mon, 11 Feb 2019 15:24:09 -0500
Subject: [PATCH 2/4] update configuration yaml

---
 auditbeat/auditbeat.reference.yml              | 7 +++++--
 filebeat/filebeat.reference.yml                | 7 +++++--
 heartbeat/heartbeat.reference.yml              | 7 +++++--
 journalbeat/journalbeat.reference.yml          | 7 +++++--
 libbeat/_meta/config.reference.yml             | 7 +++++--
 metricbeat/metricbeat.reference.yml            | 7 +++++--
 packetbeat/packetbeat.reference.yml            | 7 +++++--
 winlogbeat/winlogbeat.reference.yml            | 7 +++++--
 x-pack/auditbeat/auditbeat.reference.yml       | 7 +++++--
 x-pack/filebeat/filebeat.reference.yml         | 7 +++++--
 x-pack/functionbeat/functionbeat.reference.yml | 7 +++++--
 x-pack/metricbeat/metricbeat.reference.yml     | 7 +++++--
 12 files changed, 60 insertions(+), 24 deletions(-)

diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml
index 534778657a1..381b3c04b3d 100644
--- a/auditbeat/auditbeat.reference.yml
+++ b/auditbeat/auditbeat.reference.yml
@@ -228,7 +228,7 @@ auditbeat.modules:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -319,7 +319,7 @@ auditbeat.modules:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -621,6 +621,9 @@ output.elasticsearch:
 # Refresh metadata interval. Defaults to every 10 minutes.
 #refresh_frequency: 10m

+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
 # The number of concurrent load-balanced Kafka output workers.
 #worker: 1

diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml
index 187730cd3ab..62891a2167c 100644
--- a/filebeat/filebeat.reference.yml
+++ b/filebeat/filebeat.reference.yml
@@ -939,7 +939,7 @@ filebeat.inputs:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -1030,7 +1030,7 @@ filebeat.inputs:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -1332,6 +1332,9 @@ output.elasticsearch:
 # Refresh metadata interval. Defaults to every 10 minutes.
 #refresh_frequency: 10m

+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
 # The number of concurrent load-balanced Kafka output workers.
 #worker: 1

diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml
index be2ee368934..2914a427fbd 100644
--- a/heartbeat/heartbeat.reference.yml
+++ b/heartbeat/heartbeat.reference.yml
@@ -372,7 +372,7 @@ heartbeat.scheduler:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -463,7 +463,7 @@ heartbeat.scheduler:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -765,6 +765,9 @@ output.elasticsearch:
 # Refresh metadata interval. Defaults to every 10 minutes.
 #refresh_frequency: 10m

+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
 # The number of concurrent load-balanced Kafka output workers.
 #worker: 1

diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml
index 006bfe9324c..f72f40318a5 100644
--- a/journalbeat/journalbeat.reference.yml
+++ b/journalbeat/journalbeat.reference.yml
@@ -168,7 +168,7 @@ setup.template.settings:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -259,7 +259,7 @@ setup.template.settings:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -561,6 +561,9 @@ output.elasticsearch:
 # Refresh metadata interval. Defaults to every 10 minutes.
 #refresh_frequency: 10m

+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
 # The number of concurrent load-balanced Kafka output workers.
 #worker: 1

diff --git a/libbeat/_meta/config.reference.yml b/libbeat/_meta/config.reference.yml
index c88d26a22f1..88145350c82 100644
--- a/libbeat/_meta/config.reference.yml
+++ b/libbeat/_meta/config.reference.yml
@@ -116,7 +116,7 @@
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -207,7 +207,7 @@
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -509,6 +509,9 @@ output.elasticsearch:
 # Refresh metadata interval. Defaults to every 10 minutes.
 #refresh_frequency: 10m

+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
 # The number of concurrent load-balanced Kafka output workers.
 #worker: 1

diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml
index 3d844a437ad..ed5ae3b61e4 100644
--- a/metricbeat/metricbeat.reference.yml
+++ b/metricbeat/metricbeat.reference.yml
@@ -835,7 +835,7 @@ metricbeat.modules:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -926,7 +926,7 @@ metricbeat.modules:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -1228,6 +1228,9 @@ output.elasticsearch:
 # Refresh metadata interval. Defaults to every 10 minutes.
 #refresh_frequency: 10m

+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
 # The number of concurrent load-balanced Kafka output workers.
 #worker: 1

diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml
index 09e21d901db..1eb03dae601 100644
--- a/packetbeat/packetbeat.reference.yml
+++ b/packetbeat/packetbeat.reference.yml
@@ -596,7 +596,7 @@ packetbeat.ignore_outgoing: false
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -687,7 +687,7 @@ packetbeat.ignore_outgoing: false
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -989,6 +989,9 @@ output.elasticsearch:
 # Refresh metadata interval. Defaults to every 10 minutes.
 #refresh_frequency: 10m

+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
 # The number of concurrent load-balanced Kafka output workers.
 #worker: 1

diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml
index ec534c84fcf..b13c6253ff8 100644
--- a/winlogbeat/winlogbeat.reference.yml
+++ b/winlogbeat/winlogbeat.reference.yml
@@ -145,7 +145,7 @@ winlogbeat.event_logs:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -236,7 +236,7 @@ winlogbeat.event_logs:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -538,6 +538,9 @@ output.elasticsearch:
 # Refresh metadata interval. Defaults to every 10 minutes.
 #refresh_frequency: 10m

+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
 # The number of concurrent load-balanced Kafka output workers.
 #worker: 1

diff --git a/x-pack/auditbeat/auditbeat.reference.yml b/x-pack/auditbeat/auditbeat.reference.yml
index 239da5691a1..25eb815890f 100644
--- a/x-pack/auditbeat/auditbeat.reference.yml
+++ b/x-pack/auditbeat/auditbeat.reference.yml
@@ -263,7 +263,7 @@ auditbeat.modules:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -354,7 +354,7 @@ auditbeat.modules:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -656,6 +656,9 @@ output.elasticsearch:
 # Refresh metadata interval. Defaults to every 10 minutes.
 #refresh_frequency: 10m

+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
 # The number of concurrent load-balanced Kafka output workers.
 #worker: 1

diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml
index 087cbb9fd72..55c1c1454ad 100644
--- a/x-pack/filebeat/filebeat.reference.yml
+++ b/x-pack/filebeat/filebeat.reference.yml
@@ -1003,7 +1003,7 @@ filebeat.inputs:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -1094,7 +1094,7 @@ filebeat.inputs:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -1396,6 +1396,9 @@ output.elasticsearch:
 # Refresh metadata interval. Defaults to every 10 minutes.
 #refresh_frequency: 10m

+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
 # The number of concurrent load-balanced Kafka output workers.
 #worker: 1

diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml
index 05feb66beec..7184c2f40ac 100644
--- a/x-pack/functionbeat/functionbeat.reference.yml
+++ b/x-pack/functionbeat/functionbeat.reference.yml
@@ -261,7 +261,7 @@ functionbeat.provider.aws.functions:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -352,7 +352,7 @@ functionbeat.provider.aws.functions:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -654,6 +654,9 @@ output.elasticsearch:
 # Refresh metadata interval. Defaults to every 10 minutes.
 #refresh_frequency: 10m

+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
 # The number of concurrent load-balanced Kafka output workers.
 #worker: 1

diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml
index 68362dd1147..3f4157c7838 100644
--- a/x-pack/metricbeat/metricbeat.reference.yml
+++ b/x-pack/metricbeat/metricbeat.reference.yml
@@ -853,7 +853,7 @@ metricbeat.modules:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -944,7 +944,7 @@ metricbeat.modules:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -1246,6 +1246,9 @@ output.elasticsearch:
 # Refresh metadata interval. Defaults to every 10 minutes.
 #refresh_frequency: 10m

+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
 # The number of concurrent load-balanced Kafka output workers.
 #worker: 1

From 80fdfa3015a41bfca21a53201a191f48619b4018 Mon Sep 17 00:00:00 2001
From: Pier-Hugues Pellerin
Date: Mon, 11 Feb 2019 15:30:28 -0500
Subject: [PATCH 3/4] changelog

---
 CHANGELOG.next.asciidoc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index cafbbbce152..7528ed968b7 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -170,6 +170,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Fix a issue when remote and local configuration didn't match when fetching configuration from Central Management. {issue}10587[10587]
 - Fix unauthorized error when loading dashboards by adding username and password into kibana config. {issue}10513[10513] {pull}10675[10675]
 - Ensure all beat commands respect configured settings. {pull}10721[10721]
+- Allow configuring the kafka fetching strategy for the topic metadata. {pull}10682[10682]

 *Auditbeat*

From 1864a188b88ff5abec177848df6c7d7dd7262cad Mon Sep 17 00:00:00 2001
From: Pier-Hugues Pellerin
Date: Mon, 18 Feb 2019 10:39:09 -0500
Subject: [PATCH 4/4] kafka -> Kafka

---
 CHANGELOG.next.asciidoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 7528ed968b7..415d4c59f6b 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -170,7 +170,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Fix a issue when remote and local configuration didn't match when fetching configuration from Central Management. {issue}10587[10587]
 - Fix unauthorized error when loading dashboards by adding username and password into kibana config. {issue}10513[10513] {pull}10675[10675]
 - Ensure all beat commands respect configured settings. {pull}10721[10721]
-- Allow configuring the kafka fetching strategy for the topic metadata. {pull}10682[10682]
+- Allow configuring the Kafka fetching strategy for the topic metadata. {pull}10682[10682]

 *Auditbeat*
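
A note on usage: below is a minimal sketch of how the new option is meant to
be set from a Beat's configuration file. The broker addresses and topic name
are illustrative placeholders, and the nesting of `full` under `metadata`,
next to the existing `refresh_frequency` and `retry.*` settings, is assumed
to follow the `metadata` section documented in outputconfig.asciidoc above.

    output.kafka:
      # Placeholder broker list and topic; replace with your own values.
      hosts: ["kafka1:9092", "kafka2:9092"]
      topic: "beats"

      metadata:
        # New option from this patch: when false, the client only refreshes
        # metadata for the configured topics instead of maintaining metadata
        # for every topic in the cluster.
        full: false

        # Existing settings, shown here with their defaults.
        refresh_frequency: 10m
        retry.max: 3
        retry.backoff: 250ms

With `full: false`, the periodic metadata requests only cover the topics the
Beat publishes to, which is what avoids the permission errors described in the
commit message when the cluster enforces strict per-topic permissions.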