Expose the Metadata Refresh strategy for the Kafka output. #10682

Merged (5 commits) on Feb 27, 2019
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -129,6 +129,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix an issue where remote and local configuration didn't match when fetching configuration from Central Management. {issue}10587[10587]
- Fix unauthorized error when loading dashboards by adding username and password into kibana config. {issue}10513[10513] {pull}10675[10675]
- Ensure all beat commands respect configured settings. {pull}10721[10721]
- Allow configuring the fetch strategy for Kafka topic metadata. {pull}10682[10682]
- Using an environment variable for the password when enrolling a beat will now raise an error if the variable doesn't exist. {pull}10936[10936]

*Auditbeat*
7 changes: 5 additions & 2 deletions auditbeat/auditbeat.reference.yml
@@ -228,7 +228,7 @@ auditbeat.modules:
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields,
# decode_json_fields, and add_cloud_metadata.
#
# For example, you can use the following processors to keep the fields that
@@ -319,7 +319,7 @@ auditbeat.modules:
# match_pids: ["system.process.ppid"]
# target: system.process.parent
#
# The following example decodes fields containing JSON strings
# and replaces the strings with valid JSON objects.
#
#processors:
@@ -621,6 +621,9 @@ output.elasticsearch:
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topics metadata from the broker. When true (default),
# metadata for all topics is fetched; when false, only for the configured topics.
#full: true

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

7 changes: 5 additions & 2 deletions filebeat/filebeat.reference.yml
@@ -939,7 +939,7 @@ filebeat.inputs:
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields,
# decode_json_fields, and add_cloud_metadata.
#
# For example, you can use the following processors to keep the fields that
@@ -1030,7 +1030,7 @@ filebeat.inputs:
# match_pids: ["system.process.ppid"]
# target: system.process.parent
#
# The following example decodes fields containing JSON strings
# and replaces the strings with valid JSON objects.
#
#processors:
@@ -1332,6 +1332,9 @@ output.elasticsearch:
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topics metadata from the broker. When true (default),
# metadata for all topics is fetched; when false, only for the configured topics.
#full: true

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

7 changes: 5 additions & 2 deletions heartbeat/heartbeat.reference.yml
@@ -372,7 +372,7 @@ heartbeat.scheduler:
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields,
# decode_json_fields, and add_cloud_metadata.
#
# For example, you can use the following processors to keep the fields that
@@ -463,7 +463,7 @@ heartbeat.scheduler:
# match_pids: ["system.process.ppid"]
# target: system.process.parent
#
# The following example decodes fields containing JSON strings
# and replaces the strings with valid JSON objects.
#
#processors:
@@ -765,6 +765,9 @@ output.elasticsearch:
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topics metadata from the broker. When true (default),
# metadata for all topics is fetched; when false, only for the configured topics.
#full: true

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

7 changes: 5 additions & 2 deletions journalbeat/journalbeat.reference.yml
@@ -168,7 +168,7 @@ setup.template.settings:
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields,
# decode_json_fields, and add_cloud_metadata.
#
# For example, you can use the following processors to keep the fields that
@@ -259,7 +259,7 @@ setup.template.settings:
# match_pids: ["system.process.ppid"]
# target: system.process.parent
#
# The following example decodes fields containing JSON strings
# and replaces the strings with valid JSON objects.
#
#processors:
@@ -561,6 +561,9 @@ output.elasticsearch:
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topics metadata from the broker. When true (default),
# metadata for all topics is fetched; when false, only for the configured topics.
#full: true

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

7 changes: 5 additions & 2 deletions libbeat/_meta/config.reference.yml
@@ -116,7 +116,7 @@
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields,
# decode_json_fields, and add_cloud_metadata.
#
# For example, you can use the following processors to keep the fields that
@@ -207,7 +207,7 @@
# match_pids: ["system.process.ppid"]
# target: system.process.parent
#
# The following example decodes fields containing JSON strings
# and replaces the strings with valid JSON objects.
#
#processors:
@@ -509,6 +509,9 @@ output.elasticsearch:
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topics metadata from the broker. When true (default),
# metadata for all topics is fetched; when false, only for the configured topics.
#full: true

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

28 changes: 16 additions & 12 deletions libbeat/docs/outputconfig.asciidoc
@@ -249,7 +249,7 @@ endif::no_dashboards[]

You can set the index dynamically by using a format string to access any event
field. For example, this configuration uses a custom field, `fields.log_type`,
to set the index:

["source","yaml",subs="attributes"]
------------------------------------------------------------------------------
@@ -261,13 +261,13 @@ output.elasticsearch:
<1> We recommend including +{beat_version_key}+ in the name to avoid mapping issues
when you upgrade.

With this configuration, all events with `log_type: normal` are sent to an
index named +normal-{version}-{localdate}+, and all events with
`log_type: critical` are sent to an index named
+critical-{version}-{localdate}+.

TIP: To learn how to add custom fields to events, see the
<<libbeat-configuration-fields,`fields`>> option.

See the <<indices-option-es,`indices`>> setting for other ways to set the index
dynamically.
@@ -285,7 +285,7 @@ matches, the <<index-option-es,`index`>> setting is used.
Rule settings:

*`index`*:: The index format string to use. If this string contains field
references, such as `%{[fields.name]}`, the fields must exist, or the rule fails.

*`mappings`*:: A dictionary that takes the value returned by `index` and maps it
to a new name.
Expand Down Expand Up @@ -347,7 +347,7 @@ ifndef::no_ilm[]
[[ilm-es]]
===== `ilm`

Configuration options for index lifecycle management.

See <<ilm>> for more information.
endif::no_ilm[]
@@ -369,7 +369,7 @@ For more information, see <<configuring-ingest-node>>.

You can set the ingest node pipeline dynamically by using a format string to
access any event field. For example, this configuration uses a custom field,
`fields.log_type`, to set the pipeline for each event:

["source","yaml",subs="attributes"]
------------------------------------------------------------------------------
@@ -384,7 +384,7 @@ named `normal_pipeline`, and all events with `log_type: critical` are sent to a
pipeline named `critical_pipeline`.

TIP: To learn how to add custom fields to events, see the
<<libbeat-configuration-fields,`fields`>> option.

See the <<pipelines-option-es,`pipelines`>> setting for other ways to set the
ingest node pipeline dynamically.
@@ -403,7 +403,7 @@ Rule settings:

*`pipeline`*:: The pipeline format string to use. If this string contains field
references, such as `%{[fields.name]}`, the fields must exist, or the rule
fails.

*`mappings`*:: A dictionary that takes the value returned by `pipeline` and maps
it to a new name.
@@ -870,7 +870,7 @@ topic: '%{[fields.log_topic]}'
-----

TIP: To learn how to add custom fields to events, see the
<<libbeat-configuration-fields,`fields`>> option.

See the <<topics-option-kafka,`topics`>> setting for other ways to set the
topic dynamically.
@@ -889,7 +889,7 @@ Rule settings:

*`topic`*:: The topic format string to use. If this string contains field
references, such as `%{[fields.name]}`, the fields must exist, or the rule
fails.

*`mappings`*:: A dictionary that takes the value returned by `topic` and maps it
to a new name.
@@ -901,7 +901,7 @@ match.
ifndef::no-processors[]
All the <<conditions,conditions>> supported by processors are also supported
here.
endif::no-processors[]


===== `key`
@@ -955,6 +955,10 @@ brokers, topics, partition, and active leaders to use for publishing.

*`refresh_frequency`*:: Metadata refresh interval. Defaults to 10 minutes.

*`full`*:: Strategy to use when fetching metadata. When this option is `true`,
the client maintains a full set of metadata for all available topics; when it is
set to `false`, only the metadata for the configured topics is refreshed. The
default is `true`. A configuration sketch illustrating this option follows these
settings.

*`retry.max`*:: Total number of metadata update retries when the cluster is in the middle of a leader election. The default is 3.

*`retry.backoff`*:: Waiting time between retries during leader elections. Default is 250ms.
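
As an illustration, here is a minimal sketch of a Kafka output that limits
metadata refreshes to the configured topics. The broker address and topic name
are placeholders, and the remaining settings keep their default values:

["source","yaml",subs="attributes"]
------------------------------------------------------------------------------
output.kafka:
  # Placeholder broker address and topic name.
  hosts: ["localhost:9092"]
  topic: "beats"
  metadata:
    # Only keep metadata for the configured topics instead of all topics
    # available on the cluster.
    full: false
    # Metadata refresh interval (default).
    refresh_frequency: 10m
------------------------------------------------------------------------------

Keeping a full metadata set is convenient when topics are created dynamically,
while restricting it to the configured topics can reduce overhead on clusters
with many topics.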
@@ -1099,7 +1103,7 @@ output.redis:


TIP: To learn how to add custom fields to events, see the
<<libbeat-configuration-fields,`fields`>> option.

See the <<keys-option-redis,`keys`>> setting for other ways to set the key
dynamically.
3 changes: 3 additions & 0 deletions libbeat/outputs/kafka/config.go
@@ -62,6 +62,7 @@ type kafkaConfig struct {
type metaConfig struct {
Retry metaRetryConfig `config:"retry"`
RefreshFreq time.Duration `config:"refresh_frequency" validate:"min=0"`
Full bool `config:"full"`
}

type metaRetryConfig struct {
@@ -90,6 +91,7 @@ func defaultConfig() kafkaConfig {
Backoff: 250 * time.Millisecond,
},
RefreshFreq: 10 * time.Minute,
Full: true,
},
KeepAlive: 0,
MaxMessageBytes: nil, // use library default
@@ -177,6 +179,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
k.Metadata.Retry.Max = config.Metadata.Retry.Max
k.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
k.Metadata.RefreshFrequency = config.Metadata.RefreshFreq
k.Metadata.Full = config.Metadata.Full

// configure producer API properties
if config.MaxMessageBytes != nil {
7 changes: 5 additions & 2 deletions metricbeat/metricbeat.reference.yml
@@ -835,7 +835,7 @@ metricbeat.modules:
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields,
# decode_json_fields, and add_cloud_metadata.
#
# For example, you can use the following processors to keep the fields that
@@ -926,7 +926,7 @@ metricbeat.modules:
# match_pids: ["system.process.ppid"]
# target: system.process.parent
#
# The following example decodes fields containing JSON strings
# and replaces the strings with valid JSON objects.
#
#processors:
@@ -1228,6 +1228,9 @@ output.elasticsearch:
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topics metadata from the broker. When true (default),
# metadata for all topics is fetched; when false, only for the configured topics.
#full: true

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

7 changes: 5 additions & 2 deletions packetbeat/packetbeat.reference.yml
@@ -596,7 +596,7 @@ packetbeat.ignore_outgoing: false
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields,
# decode_json_fields, and add_cloud_metadata.
#
# For example, you can use the following processors to keep the fields that
@@ -687,7 +687,7 @@ packetbeat.ignore_outgoing: false
# match_pids: ["system.process.ppid"]
# target: system.process.parent
#
# The following example decodes fields containing JSON strings
# and replaces the strings with valid JSON objects.
#
#processors:
@@ -989,6 +989,9 @@ output.elasticsearch:
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topics metadata from the broker. When true (default),
# metadata for all topics is fetched; when false, only for the configured topics.
#full: true

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

7 changes: 5 additions & 2 deletions winlogbeat/winlogbeat.reference.yml
@@ -145,7 +145,7 @@ winlogbeat.event_logs:
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields,
# decode_json_fields, and add_cloud_metadata.
#
# For example, you can use the following processors to keep the fields that
@@ -236,7 +236,7 @@ winlogbeat.event_logs:
# match_pids: ["system.process.ppid"]
# target: system.process.parent
#
# The following example decodes fields containing JSON strings
# and replaces the strings with valid JSON objects.
#
#processors:
@@ -538,6 +538,9 @@ output.elasticsearch:
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topics metadata from the broker. When true (default),
# metadata for all topics is fetched; when false, only for the configured topics.
#full: true

# The number of concurrent load-balanced Kafka output workers.
#worker: 1
