Update broker/pipeline setup #4650
Conversation
# Do not modify this value.
#bulk_queue_size: 0

# Internal queue configuration for buffering events to be published.
#queue:
It's a bit strange that we call it here queue and in the code broker. This will need some docs and changelog entry. But can happen in a follow up PR. Does this have to be in this PR?
These are the user-visible settings. I decided to rename the broker package to queue, but first I want to get the changes to the settings in and documented, so we won't introduce BC-breaking changes in case the package rename PR gets merged later.
fe27bb3 to cc174ea
auditbeat/auditbeat.reference.yml
Outdated
# Hints the minimum number of events stored in the queue,
# before providing a batch of events to the outputs.
# A value of 0 (the default) ensures events are immediately available
# to be send to the outputs.
s/send/sent/
"to be sent" is correct
Thanks!
auditbeat/auditbeat.reference.yml
Outdated
# Queue type by name (default 'mem')
#mem:
  # Max number of events to store in the queue.
  #events: 4096
What happens when this is reached? Does a flush happen? Wondering if there should be `flush.min`, `flush.max`, and `flush.timeout` or similar?
The semantics are a little different from the old spooler here. The queue/broker does not flush its buffers. By default it basically streams events right through to the outputs (if the output takes somewhat longer, it will be presented with all available events, up to bulk_max_size).
By configuring min_events, the queue will block the output until:
- at least min_events events are buffered
- the timeout triggers
The min_events setting is more of a hint, as the output might still receive a smaller buffer.
The events setting is the maximum number of events the queue can hold. It's independent of min_events and flush timer. Only min_events must be < events.
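A hedged sketch of how these settings fit together in the reference config. The setting names (`queue.mem.events`, `queue.mem.flush.min_events`, `queue.mem.flush.timeout`) come from this PR; the non-default values and comments are illustrative, not the shipped defaults:

```yaml
queue.mem:
  events: 4096        # maximum number of events the queue can hold
  flush:
    min_events: 512   # hint: block the output until at least this many events are buffered...
    timeout: 1s       # ...or until this timeout triggers
```

With `min_events: 0` (the default), events stream straight through to the outputs without any buffering delay.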
Naming is a little weird here... as it's more like 'block-processing' settings...
@urso what happens when `events` is reached? I assume it will block from adding more events? In the case of Metricbeat, assuming sending events is slower than generating events but all events are still acked, what will happen?
Once `events` is reached, the queue will block (no more space). Upon ACK of N events, another N events can be stored in the queue (N producers will be unblocked).
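The blocking behavior described above can be sketched with a buffered channel. This is an illustrative stand-in, not the actual broker code; the constant name `events` mirrors `queue.mem.events`:

```go
package main

import "fmt"

// Sketch of the queue's blocking semantics: once `events` slots are
// full, producers block until an ACK frees space.
func main() {
	const events = 4 // stands in for queue.mem.events
	queue := make(chan int, events)

	// Fill the queue to capacity.
	for i := 0; i < events; i++ {
		queue <- i
	}

	// A further publish would block; select/default lets us observe that
	// without actually blocking.
	select {
	case queue <- 99:
		fmt.Println("accepted")
	default:
		fmt.Println("queue full: producer would block")
	}

	// ACK of one event frees one slot, unblocking one producer.
	<-queue
	select {
	case queue <- 99:
		fmt.Println("accepted after ACK")
	default:
		fmt.Println("still full")
	}
}
```

Running this prints "queue full: producer would block" followed by "accepted after ACK", matching the ACK-frees-space behavior described above.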
}

brokerFactory := broker.FindFactory(brokerType)
p, err := pipeline.New(brokerFactory, out, settings)
As quickly discussed, later we should probably rename the `broker` package to `queue` to make the relation to the config more obvious.
func NewBroker(eventer broker.Eventer, sz int, waitOnClose bool) *Broker {
func NewBroker(
	eventer broker.Eventer,
	sz int,
Seems like the 3 params could be a config struct for brokers.
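A hypothetical sketch of that suggestion: folding the constructor's parameters into a settings struct. Field and type names here are illustrative, not the actual beats API, and the `broker.Eventer` parameter is omitted to keep the sketch self-contained:

```go
package main

import "fmt"

// Settings bundles the constructor parameters into one struct,
// as suggested above (names are illustrative).
type Settings struct {
	Events      int  // queue capacity (the old sz parameter)
	WaitOnClose bool // whether Close waits for pending events
}

// NewBroker would build a *Broker in the real code; here it just
// summarizes the settings so the sketch runs on its own.
func NewBroker(s Settings) string {
	return fmt.Sprintf("broker(events=%d, waitOnClose=%v)", s.Events, s.WaitOnClose)
}

func main() {
	fmt.Println(NewBroker(Settings{Events: 4096, WaitOnClose: true}))
}
```

A struct also makes call sites self-documenting and lets new options be added later without breaking existing callers.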
# Do not modify this value.
#bulk_queue_size: 0

# Internal queue configuration for buffering events to be published.
#queue:
I'll open a PR to delete this block of code from Winlogbeat so that `queue` isn't rejected.
hehe, thanks.
this PR is adjusting winlogbeat.
57756c6 to b3f1b82
@@ -19,6 +19,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d
- The `scripts/import_dashboards` is removed from packages. Use the `setup` command instead. {pull}4586[4586]
- Change format of the saved kibana dashboards to have a single JSON file for each dashboard {pull}4413[4413]
- Rename `configtest` command to `test config`. {pull}4590[4590]
- Remove setting `queue_size` and `bulk_queue_size`. {pull}4650[4650]
Should we add a message to the 5.6 release that these values will change?
@@ -81,6 +83,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d
- Add udp prospector type. {pull}4452[4452]
- Enabled Cgo which means libc is dynamically compiled. {pull}4546[4546]
- Add Beta module config reloading mechanism {pull}4566[4566]
- Remove spooler and publisher components and settings. {pull}4644[4644]
Same here, we should probably have a message in 5.6 or even better, not let the beat start in 6.0 if these values are set?
# new batches.
#pipelining: 0
#pipelining: 5
Is there a reason we changed the default to 5? Should probably be mentioned in the changelog?
- the pipeline can operate completely async -> take advantage of this fact
- with the default settings we have a high chance of publishing very small batches (no flush timeout -> reduced latency) if producers don't have a high enough rate. In the Logstash case we can easily compensate for this by pushing more batches onto the link
Updated changelog.
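For illustration, the new default corresponds to a reference config like the following. The `pipelining` setting name and its default of 5 come from this PR; the `hosts` value is a made-up example:

```yaml
output.logstash:
  hosts: ["localhost:5044"]
  # Number of batches published asynchronously without waiting
  # for an ACK of the previous batch. 0 disables pipelining.
  #pipelining: 5
```

With pipelining enabled, several small batches can be in flight at once, compensating for the reduced-latency behavior described above.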
@@ -20,9 +20,6 @@ const (

type Config struct {
	Prospectors  []*common.Config `config:"prospectors"`
	SpoolSize    uint64           `config:"spool_size" validate:"min=1"`
	PublishAsync bool             `config:"publish_async"`
Glad to see this one removed ;-)
type config struct {
	Events int `config:"events" validate:"min=32"`
Kind of surprised to see a minimum here.
Well, we need to buffer some data for pushing to the outputs. Without a queue, the pipeline won't be able to operate -> enforce a minimum so users don't use 1 or 0.
- remove setting queue_size
- remove unused setting bulk_queue_size
- introduce settings namespace `queue` with default `mem.events: 4096`
- add settings queue.mem.flush.min_events and queue.mem.flush.timeout
- change default output.logstash.pipelining to 5
- remove spooler settings from filebeat config object
- update winlogbeat config validate check
Translating old configurations:

Old settings: `filebeat.spooler`, `filebeat.publish_async`, `filebeat.idle_timeout`, `queue_size`, `bulk_queue_size`
New settings: `queue.<queue_type>` with `events`, `flush.min_events`, `flush.timeout`

Note: docs will be updated in a separate PR.

I'm removing the needs_docs label here because I believe we've covered the doc requirements with the topic https://www.elastic.co/guide/en/beats/filebeat/current/configuring-internal-queue.html and the removed settings that are documented in the breaking changes for the release.
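As an illustrative sketch of the translation, assuming the old `queue_size` maps to the new `queue.mem.events` (consistent with the `mem.events: 4096` default introduced by this PR; the mapping itself is inferred, not quoted from the PR):

```yaml
# Old (pre-6.0) settings, now removed:
#queue_size: 4096
#bulk_queue_size: 0

# New equivalent under the queue namespace:
queue.mem:
  events: 4096
```

`bulk_queue_size` has no new counterpart; it is simply dropped.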