Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deregister pipeline loader callback when inputsRunner is stopped #7893

Conversation

kvch
Copy link
Contributor

@kvch kvch commented Aug 7, 2018

From now on connectCallbacks of Elasticsearch can be deregistered.
Pipeline loader callbacks are deregistered when inputsRunner is stopped, so unnecessary callback calls are eliminated after reload and during autodiscovery. This also lets the GC to collect leftover states e.g. moduleRegistry.

Closes #7891

@@ -55,6 +56,7 @@ type inputsRunner struct {
moduleRegistry *ModuleRegistry
inputs []*input.Runner
pipelineLoaderFactory PipelineLoaderFactory
pipelineCallbackId int

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

struct field pipelineCallbackId should be pipelineCallbackID

@@ -46,6 +46,7 @@ type Factory struct {
beatVersion string
pipelineLoaderFactory PipelineLoaderFactory
overwritePipelines bool
pipelineCallbackId int

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

struct field pipelineCallbackId should be pipelineCallbackID

@kvch kvch force-pushed the fix/libbeat/deregister-elasticsearch-onconnect-callbacks branch from cbcc5db to d566eec Compare August 7, 2018 10:18
@kvch
Copy link
Contributor Author

kvch commented Aug 7, 2018

The failing tests are unrelated.

@kvch kvch added the needs_backport PR is waiting to be backported to other branches. label Aug 7, 2018
@kvch kvch requested review from urso and exekias August 7, 2018 13:31
@@ -130,7 +134,7 @@ func (p *inputsRunner) Start() {
callback := func(esClient *elasticsearch.Client) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading the code on line 124 and 129: In case of an error, should we register a callback at all? Not introduced in this PR but probably something we should clean up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this code is correct. The lines you are referring to are trying to load pipelines immediately in Start. It's possible it fails due to e.g ES is not being not available.
But it's the whole point of using OnConnect callbacks. If previously pipeline loading fails, it can be attempted again when connecting to ES again.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kvch I got tricked by this as well. Can you add a comment why the callback must always be registered:

  • if pipeline loading fails, retry on reload
  • always retry on reload in case of someone removing it by accident or new ES cluster in place

@@ -50,19 +50,40 @@ var (
)

type callbacksRegistry struct {
callbacks []connectCallback
callbacks map[int]connectCallback
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about using a uuid here instead of an int as identifier? Like this the incrementing and storing of the current key would not be needed.

@@ -50,19 +52,38 @@ var (
)

type callbacksRegistry struct {
callbacks []connectCallback
callbacks map[uuid.UUID]connectCallback
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes order in which callbacks will be called. I don't think we don't have any ordering requirements yet, but it's something we need to be aware of in the future: Callbacks must not depend on results of a former callback.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add a comment, so it's less likely we forget it.


key := uuid.NewV4()
connectCallbackRegistry.callbacks[key] = callback
return key
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: while it's very very unlikely to have collissions, we might account for the chance of having collisions by checking the key does not exist.

@kvch kvch force-pushed the fix/libbeat/deregister-elasticsearch-onconnect-callbacks branch from 6530daf to a48c6bb Compare August 8, 2018 14:01
@urso
Copy link

urso commented Aug 8, 2018

WFG

@adriansr adriansr merged commit 65ef265 into elastic:master Aug 8, 2018
@kvch kvch added v6.4.0 and removed needs_backport PR is waiting to be backported to other branches. labels Aug 8, 2018
@kvch kvch added the v6.5.0 label Aug 8, 2018
kvch added a commit to kvch/beats that referenced this pull request Aug 9, 2018
urso pushed a commit that referenced this pull request Aug 9, 2018
kvch added a commit to kvch/beats that referenced this pull request Aug 10, 2018
adriansr pushed a commit that referenced this pull request Aug 10, 2018
leweafan pushed a commit to leweafan/beats that referenced this pull request Apr 28, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants