-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for closing publisher.Client #1402
Conversation
After this is merged it would be nice to have a small diagram in the dev guide or in ascii version on the of publisher on how client, pipeline, output, publisher etc. are coupled. |
- refactor signaling moving duplicated code from publisher/outputs to libbeat/common/op - do not wait until pipelines are emptied by outputs on shutdown (clients must be disconnected before shutdown) - fix cancel signaler in broker worker (in pipeline) - proper signal cancel if publisher.Client has been disconnected
fd02fc2
to
18be356
Compare
publisher pipeline stop method requires all clients having disconnected before shutting down the pipeline. If this is not the case we will generated a panic.
Quite a change in semantics on shutdown here. Once a publisher.Client disconnects, it's request to publish events are cancelled. On cancel there is no guarantee about events being published or dropped (e.g. when mixed into bulk requests from other connections or already handled by output plugin). It's a way for client saying: I'm not interested in the result of publishing these events. This goes well with send-at-most-once and send-at-least-once semantics in beats. Before this PR we have had to wait for all queues in publisher pipeline being processed by output. If output has become unresponsive or slow, it would appear like beats hang on shutdown (beside eventually shutting down in the future, once output becomes responsive) requiring people for force kill beats. With this PR in place, beats can shut down immediately dropping the internal pipeline its state instead of waiting for output. |
Mostly reverts changes in #1075 |
@@ -204,6 +217,7 @@ func (p *asyncLogPublisher) collect() bool { | |||
// Tell the registrar that we've successfully sent these events | |||
select { | |||
case <-p.done: | |||
logp.Info("Shutting down - No registrar update for successfully published batch.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment 2 lines above says differently :-) Why don't we update the registrar here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we're forwarding registrar updates via queue. If queue is blocking, but stop signal has been send (p.done being closed), we do not wait for registrar to finish writing, but instead prefer to shut-down and resend the same line on next restart. This is old behavior, I just added info message notifying users on shutdown registrar will not be updated anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me, but we should probably update the comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
improved code comment
} else { | ||
logp.Warn("EventLog[%s] Failed to publish %d events", | ||
api.Name(), numEvents) | ||
publishedEvents.Add("failures", 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changes look good. Since "failures" is never used (line 137), let's remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
LGTM |
libbeat API changed at: elastic/beats#1402
- Fixes #4 - libbeat API changed at: elastic/beats#1402
No description provided.