diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index f64aeb5186d..c6908806c54 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -129,6 +129,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Fix a issue when remote and local configuration didn't match when fetching configuration from Central Management. {issue}10587[10587]
 - Fix unauthorized error when loading dashboards by adding username and password into kibana config. {issue}10513[10513] {pull}10675[10675]
 - Ensure all beat commands respect configured settings. {pull}10721[10721]
+- Allow configuring the Kafka fetching strategy for topic metadata. {pull}10682[10682]
 - Using an environment variable for the password when enrolling a beat will now raise an error if the variable doesn't exist. {pull}10936[10936]
 
 *Auditbeat*
diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml
index 534778657a1..381b3c04b3d 100644
--- a/auditbeat/auditbeat.reference.yml
+++ b/auditbeat/auditbeat.reference.yml
@@ -228,7 +228,7 @@ auditbeat.modules:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -319,7 +319,7 @@ auditbeat.modules:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -621,6 +621,9 @@ output.elasticsearch:
     # Refresh metadata interval. Defaults to every 10 minutes.
     #refresh_frequency: 10m
 
+    # Strategy for fetching the topics metadata from the broker. Set to false to fetch metadata only for the configured topics. Default is true.
+    #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml
index 8086b26949d..c99d05c9554 100644
--- a/filebeat/filebeat.reference.yml
+++ b/filebeat/filebeat.reference.yml
@@ -939,7 +939,7 @@ filebeat.inputs:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -1030,7 +1030,7 @@ filebeat.inputs:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -1332,6 +1332,9 @@ output.elasticsearch:
     # Refresh metadata interval. Defaults to every 10 minutes.
     #refresh_frequency: 10m
 
+    # Strategy for fetching the topics metadata from the broker. Set to false to fetch metadata only for the configured topics. Default is true.
+    #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml
index be2ee368934..2914a427fbd 100644
--- a/heartbeat/heartbeat.reference.yml
+++ b/heartbeat/heartbeat.reference.yml
@@ -372,7 +372,7 @@ heartbeat.scheduler:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -463,7 +463,7 @@ heartbeat.scheduler:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -765,6 +765,9 @@ output.elasticsearch:
     # Refresh metadata interval. Defaults to every 10 minutes.
     #refresh_frequency: 10m
 
+    # Strategy for fetching the topics metadata from the broker. Set to false to fetch metadata only for the configured topics. Default is true.
+    #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml
index 006bfe9324c..f72f40318a5 100644
--- a/journalbeat/journalbeat.reference.yml
+++ b/journalbeat/journalbeat.reference.yml
@@ -168,7 +168,7 @@ setup.template.settings:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -259,7 +259,7 @@ setup.template.settings:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -561,6 +561,9 @@ output.elasticsearch:
     # Refresh metadata interval. Defaults to every 10 minutes.
     #refresh_frequency: 10m
 
+    # Strategy for fetching the topics metadata from the broker. Set to false to fetch metadata only for the configured topics. Default is true.
+    #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/libbeat/_meta/config.reference.yml b/libbeat/_meta/config.reference.yml
index c88d26a22f1..88145350c82 100644
--- a/libbeat/_meta/config.reference.yml
+++ b/libbeat/_meta/config.reference.yml
@@ -116,7 +116,7 @@
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -207,7 +207,7 @@
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -509,6 +509,9 @@ output.elasticsearch:
     # Refresh metadata interval. Defaults to every 10 minutes.
     #refresh_frequency: 10m
 
+    # Strategy for fetching the topics metadata from the broker. Set to false to fetch metadata only for the configured topics. Default is true.
+    #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc
index c6254625434..c353e6786ab 100644
--- a/libbeat/docs/outputconfig.asciidoc
+++ b/libbeat/docs/outputconfig.asciidoc
@@ -249,7 +249,7 @@ endif::no_dashboards[]
 
 You can set the index dynamically by using a format string to access any event field.
 For example, this configuration uses a custom field, `fields.log_type`,
-to set the index: 
+to set the index:
 
 ["source","yaml",subs="attributes"]
 ------------------------------------------------------------------------------
@@ -261,13 +261,13 @@ output.elasticsearch:
 <1> We recommend including +{beat_version_key}+ in the name to avoid mapping
 issues when you upgrade.
 
-With this configuration, all events with `log_type: normal` are sent to an 
+With this configuration, all events with `log_type: normal` are sent to an
 index named +normal-{version}-{localdate}+, and all events with `log_type:
 critical` are sent to an index named +critical-{version}-{localdate}+.
 
 TIP: To learn how to add custom fields to events, see the
-<<libbeat-configuration-fields,`fields`>> option. 
+<<libbeat-configuration-fields,`fields`>> option.
 
 See the <<indices-option-es,`indices`>> setting for other ways to set the index
 dynamically.
 
@@ -285,7 +285,7 @@ matches, the <<index-option-es,`index`>> setting is used.
 Rule settings:
 
 *`index`*:: The index format string to use. If this string contains field
-references, such as `%{[fields.name]}`, the fields must exist, or the rule fails. 
+references, such as `%{[fields.name]}`, the fields must exist, or the rule fails.
 
 *`mappings`*:: A dictionary that takes the value returned by `index` and maps
 it to a new name.
@@ -347,7 +347,7 @@ ifndef::no_ilm[]
 [[ilm-es]]
 ===== `ilm`
 
-Configuration options for index lifecycle management. 
+Configuration options for index lifecycle management.
 See <<ilm>> for more information.
 
 endif::no_ilm[]
@@ -369,7 +369,7 @@ For more information, see <<configuring-ingest-node>>.
 
 You can set the ingest node pipeline dynamically by using a format string to
 access any event field. For example, this configuration uses a custom field,
-`fields.log_type`, to set the pipeline for each event: 
+`fields.log_type`, to set the pipeline for each event:
 
 ["source","yaml",subs="attributes"]
 ------------------------------------------------------------------------------
@@ -384,7 +384,7 @@ named `normal_pipeline`, and all events with `log_type: critical` are sent to a
 pipeline named `critical_pipeline`.
 
 TIP: To learn how to add custom fields to events, see the
-<<libbeat-configuration-fields,`fields`>> option. 
+<<libbeat-configuration-fields,`fields`>> option.
 
 See the <<pipelines-option-es,`pipelines`>> setting for other ways to set the
 ingest node pipeline dynamically.
@@ -403,7 +403,7 @@ Rule settings:
 
 *`pipeline`*:: The pipeline format string to use. If this string contains
 field references, such as `%{[fields.name]}`, the fields must exist, or the rule
-fails. 
+fails.
 
 *`mappings`*:: A dictionary that takes the value returned by `pipeline` and
 maps it to a new name.
@@ -870,7 +870,7 @@ topic: '%{[fields.log_topic]}'
 -----
 
 TIP: To learn how to add custom fields to events, see the
-<<libbeat-configuration-fields,`fields`>> option. 
+<<libbeat-configuration-fields,`fields`>> option.
 
 See the <<topics-option-kafka,`topics`>> setting for other ways to set the
 topic dynamically.
@@ -889,7 +889,7 @@ Rule settings:
 
 *`topic`*:: The topic format string to use. If this string contains field
 references, such as `%{[fields.name]}`, the fields must exist, or the rule
-fails. 
+fails.
 
 *`mappings`*:: A dictionary that takes the value returned by `topic` and maps
 it to a new name.
@@ -901,7 +901,7 @@ match.
 
 ifndef::no-processors[]
 All the <<conditions,conditions>> supported by processors are also supported here.
-endif::no-processors[] 
+endif::no-processors[]
 
 ===== `key`
 
@@ -955,6 +955,10 @@ brokers, topics, partition, and active leaders to use for publishing.
 *`refresh_frequency`*:: Metadata refresh interval. Defaults to 10 minutes.
 
+*`full`*:: Strategy to use when fetching metadata. When `true`, the client maintains
+a full set of metadata for all the available topics; when `false`, it refreshes only
+the metadata for the configured topics. The default is `true`.
+
 *`retry.max`*:: Total number of metadata update retries when cluster is in
 middle of leader election. The default is 3.
 
 *`retry.backoff`*:: Waiting time between retries during leader elections. Default is 250ms.
@@ -1099,7 +1103,7 @@ output.redis:
 
 TIP: To learn how to add custom fields to events, see the
-<<libbeat-configuration-fields,`fields`>> option. 
+<<libbeat-configuration-fields,`fields`>> option.
 
 See the <<keys-option-redis,`keys`>> setting for other ways to set the key
 dynamically.
 
diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go
index 48a46491f91..b042a3038be 100644
--- a/libbeat/outputs/kafka/config.go
+++ b/libbeat/outputs/kafka/config.go
@@ -62,6 +62,7 @@ type kafkaConfig struct {
 type metaConfig struct {
 	Retry       metaRetryConfig `config:"retry"`
 	RefreshFreq time.Duration   `config:"refresh_frequency" validate:"min=0"`
+	Full        bool            `config:"full"`
 }
 
 type metaRetryConfig struct {
@@ -90,6 +91,7 @@ func defaultConfig() kafkaConfig {
 				Backoff: 250 * time.Millisecond,
 			},
 			RefreshFreq: 10 * time.Minute,
+			Full:        true,
 		},
 		KeepAlive:       0,
 		MaxMessageBytes: nil, // use library default
@@ -177,6 +179,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
 	k.Metadata.Retry.Max = config.Metadata.Retry.Max
 	k.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
 	k.Metadata.RefreshFrequency = config.Metadata.RefreshFreq
+	k.Metadata.Full = config.Metadata.Full
 
 	// configure producer API properties
 	if config.MaxMessageBytes != nil {
diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml
index 3d844a437ad..ed5ae3b61e4 100644
--- a/metricbeat/metricbeat.reference.yml
+++ b/metricbeat/metricbeat.reference.yml
@@ -835,7 +835,7 @@ metricbeat.modules:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -926,7 +926,7 @@ metricbeat.modules:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -1228,6 +1228,9 @@ output.elasticsearch:
     # Refresh metadata interval. Defaults to every 10 minutes.
     #refresh_frequency: 10m
 
+    # Strategy for fetching the topics metadata from the broker. Set to false to fetch metadata only for the configured topics. Default is true.
+    #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml
index 09e21d901db..1eb03dae601 100644
--- a/packetbeat/packetbeat.reference.yml
+++ b/packetbeat/packetbeat.reference.yml
@@ -596,7 +596,7 @@ packetbeat.ignore_outgoing: false
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -687,7 +687,7 @@ packetbeat.ignore_outgoing: false
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -989,6 +989,9 @@ output.elasticsearch:
     # Refresh metadata interval. Defaults to every 10 minutes.
     #refresh_frequency: 10m
 
+    # Strategy for fetching the topics metadata from the broker. Set to false to fetch metadata only for the configured topics. Default is true.
+    #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml
index ec534c84fcf..b13c6253ff8 100644
--- a/winlogbeat/winlogbeat.reference.yml
+++ b/winlogbeat/winlogbeat.reference.yml
@@ -145,7 +145,7 @@ winlogbeat.event_logs:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -236,7 +236,7 @@ winlogbeat.event_logs:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -538,6 +538,9 @@ output.elasticsearch:
     # Refresh metadata interval. Defaults to every 10 minutes.
     #refresh_frequency: 10m
 
+    # Strategy for fetching the topics metadata from the broker. Set to false to fetch metadata only for the configured topics. Default is true.
+    #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/x-pack/auditbeat/auditbeat.reference.yml b/x-pack/auditbeat/auditbeat.reference.yml
index 239da5691a1..25eb815890f 100644
--- a/x-pack/auditbeat/auditbeat.reference.yml
+++ b/x-pack/auditbeat/auditbeat.reference.yml
@@ -263,7 +263,7 @@ auditbeat.modules:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -354,7 +354,7 @@ auditbeat.modules:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -656,6 +656,9 @@ output.elasticsearch:
     # Refresh metadata interval. Defaults to every 10 minutes.
     #refresh_frequency: 10m
 
+    # Strategy for fetching the topics metadata from the broker. Set to false to fetch metadata only for the configured topics. Default is true.
+    #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml
index 1ac2e2cb14d..0a48f185b87 100644
--- a/x-pack/filebeat/filebeat.reference.yml
+++ b/x-pack/filebeat/filebeat.reference.yml
@@ -1013,7 +1013,7 @@ filebeat.inputs:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -1104,7 +1104,7 @@ filebeat.inputs:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -1406,6 +1406,9 @@ output.elasticsearch:
     # Refresh metadata interval. Defaults to every 10 minutes.
     #refresh_frequency: 10m
 
+    # Strategy for fetching the topics metadata from the broker. Set to false to fetch metadata only for the configured topics. Default is true.
+    #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml
index 05feb66beec..7184c2f40ac 100644
--- a/x-pack/functionbeat/functionbeat.reference.yml
+++ b/x-pack/functionbeat/functionbeat.reference.yml
@@ -261,7 +261,7 @@ functionbeat.provider.aws.functions:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -352,7 +352,7 @@ functionbeat.provider.aws.functions:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -654,6 +654,9 @@ output.elasticsearch:
     # Refresh metadata interval. Defaults to every 10 minutes.
     #refresh_frequency: 10m
 
+    # Strategy for fetching the topics metadata from the broker. Set to false to fetch metadata only for the configured topics. Default is true.
+    #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml
index 6f5f1327b1e..a588f13d2d9 100644
--- a/x-pack/metricbeat/metricbeat.reference.yml
+++ b/x-pack/metricbeat/metricbeat.reference.yml
@@ -855,7 +855,7 @@ metricbeat.modules:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -946,7 +946,7 @@ metricbeat.modules:
 # match_pids: ["system.process.ppid"]
 # target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -1248,6 +1248,9 @@ output.elasticsearch:
     # Refresh metadata interval. Defaults to every 10 minutes.
     #refresh_frequency: 10m
 
+    # Strategy for fetching the topics metadata from the broker. Set to false to fetch metadata only for the configured topics. Default is true.
+    #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
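
Usage sketch (an editorial note, not part of the patch): judging from the `config:"full"` struct tag on `metaConfig` in libbeat/outputs/kafka/config.go, the new option nests under the Kafka output's `metadata` section. The hosts and topic below are placeholder values.

["source","yaml"]
------------------------------------------------------------------------------
output.kafka:
  hosts: ["localhost:9092"]
  topic: "filebeat"
  metadata:
    refresh_frequency: 10m
    # Refresh metadata only for the configured topics rather than for every
    # topic in the cluster; the default (true) keeps the previous behavior of
    # maintaining a full metadata set.
    full: false
------------------------------------------------------------------------------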
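For reference, a minimal standalone sketch of what the `newSaramaConfig` change amounts to: the Beats-level value is copied onto the Sarama client configuration, whose `Metadata.Full` field controls whether periodic refreshes request metadata for all topics. This assumes the upstream `github.com/Shopify/sarama` client that the Kafka output builds on; the other values mirror the Beats defaults.

["source","go"]
------------------------------------------------------------------------------
package main

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	// Mirror the Beats metadata defaults from defaultConfig().
	k := sarama.NewConfig()
	k.Metadata.Retry.Max = 3
	k.Metadata.Retry.Backoff = 250 * time.Millisecond
	k.Metadata.RefreshFrequency = 10 * time.Minute

	// The new knob: false limits metadata refreshes to the topics the client
	// actually uses; true (the default) fetches metadata for all topics.
	k.Metadata.Full = false

	fmt.Println("fetch full metadata:", k.Metadata.Full)
}
------------------------------------------------------------------------------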