-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow Beats to increase publisher internal queue size #22650
Conversation
f9f065b
to
2a74ac5
Compare
💚 Build Succeeded
Expand to view the summary
Build stats
Test stats 🧪
Steps errorsExpand to view the steps failures
|
Test | Results |
---|---|
Failed | 0 |
Passed | 16895 |
Skipped | 1373 |
Total | 18268 |
Pinging @elastic/security-external-integrations (Team:Security-External Integrations) |
libbeat/publisher/queue/queue.go
Outdated
@@ -122,3 +127,15 @@ type Batch interface { | |||
Events() []publisher.Event | |||
ACK() | |||
} | |||
|
|||
// AdjustInternalQueueSize decides the size for the internal queue used by most queue implementations. | |||
func AdjustInternalQueueSize(requested, mainQueueSize int) (actual int) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, sorry for the shuffle but I think I'd prefer if this was back within memqueue
... this calculation is nearly a noop for the spool, which behaves very differently (and will be going away in the future anyway), so the real use of this helper is just to cap the intake size for the memory queue to something reasonable.
In an ideal world I'd actually like the flag itself to be in memqueue.Settings
(like the analogous WriteAheadLimit
flag is on the disk queue), but right now there's no clean way to do that without exposing the setting to the user, which seems undesirable. I don't want to gate the change on fixing that plumbing, so it makes sense to me to put it on the Beat config, but I'd still prefer to keep the scope narrow if possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved this to memque. Also renamed all references to "InternalQueueSize" to "InputQueueSize", which I think it better defines it, "internal queue" being what we call the main event queue (mem, spool...) in our docs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, missed when this was updated again -- looks good, and thanks!
d59e132
to
43a288c
Compare
This is a mitigation to fix a problem with Beats that use PublishMode=DropIfFull (only Packetbeat). This parameter is meant to drop events when the configured queue is full. However, due to the way the mem and spool queues are implemented, this is actually dropping events when the internal "publisher" queue is full. Due to the small hardcoded size in this queue (20 events), it can get full easily when the publisher queue is not full, just because a burst of events is published and the consumer goroutine doesn't react fast enough to consume them. Changes in this PR allows Beats to set a parameter in instance.Settings to request a larger internal queue and Packetbeat is updated to request a queue for up to 400 events. All other Beats remain unchanged. (cherry picked from commit 1657b5a)
This is a mitigation to fix a problem with Beats that use PublishMode=DropIfFull (only Packetbeat). This parameter is meant to drop events when the configured queue is full. However, due to the way the mem and spool queues are implemented, this is actually dropping events when the internal "publisher" queue is full. Due to the small hardcoded size in this queue (20 events), it can get full easily when the publisher queue is not full, just because a burst of events is published and the consumer goroutine doesn't react fast enough to consume them. Changes in this PR allows Beats to set a parameter in instance.Settings to request a larger internal queue and Packetbeat is updated to request a queue for up to 400 events. All other Beats remain unchanged. (cherry picked from commit 1657b5a)
What does this PR do?
This is a mitigation to fix a problem with Beats that use PublishMode=DropIfFull (only Packetbeat).
This parameter is meant to drop events when the configured queue is full. However, due to the way the
mem
andspool
queues are implemented, this is actually dropping events when the internal "publisher" queue is full. Due to the small hardcoded size in this queue (20 events), it can get full easily when the publisher queue is not full, just because a burst of events is published and the consumer goroutine doesn't react fast enough to consume them.Changes in this PR allows Beats to set a parameter in instance.Settings to request a larger internal queue and Packetbeat is updated to request a queue for up to 400 events. All other Beats remain unchanged.
Why is it important?
Checklist
CHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.Author's Checklist
How to test this PR locally
Related issues
Use cases
Screenshots
Logs