Stopping / tracking harvester implementation #964
Conversation
Force-pushed ca6e3ed to cd68248
@urso This implementation currently has the problem that if the output is not responding, PublishEvent blocks and filebeat does not stop. Any recommendation on how to handle this case?
This should also get #1148 to green.
logp.Info("Starting prospector of type: %v", p.ProspectorConfig.Harvester.InputType)
defer func() {
Just use defer wg.Done(). There's a TODO statement a few lines up that can be removed now that this is deferred.
Done
Force-pushed 14ce2a9 to 2b746c5
This PR simplifies elastic#964 by applying the unrelated parts ahead of time.

* Remove unnecessary for loop in readline
go h.Stop()
}
logp.Debug("prospector", "Waiting for %d harvesters to stop", len(p.harvesters))
p.harvestersWaitGroup.Wait()
How is p.done used? The prospector loop starting new harvesters must be stopped before shutting down the harvesters.
Good point. close(p.done) should be moved up before the stopping, and it seems like each prospector needs its own waitgroup or channel to notify when all Run methods are stopped.
Shutdown still seems to be somewhat incomplete (subject to hanging). Once a harvester is running, the event flows like this (each step running in its own goroutine, communicating via channels):
On shutdown:

* Ensure the prospector loop looking for new files to be harvested is shut down before stopping the harvesters.
* After stopping the harvesters, stop the spooler, and next the filebeat publisher.
* The registrar must be stopped last.

Not sure if filebeat is stopping its own publisher worker yet. When stopping the spooler, make the publisher break the queue at line 148. The publisher in filebeat has a Stop method, but I'm not sure it's called; Stop on the publisher must be called in between spooler and registrar shutdown. Unfortunately the publisher itself might hang on shutdown in
The publisher worker is initialized and started at line 96, but never killed.
Force-pushed 2b746c5 to e181f96
@urso Most comments applied, except for:
I'm not sure how to break the queue here, as in case it already hangs on line 148, the only thing I know of to stop it is closing the channel, which leads to a panic. I'm sure you had a better way in mind :-)
Waiting for #1402.
Force-pushed 69ff414 to e89777a
The test is skipped until elastic#964 is addressed.
The test is skipped until #964 is addressed.
* Add clean prospector shutdown
* Add harvester waitgroup to prospectors
* Refactor harvester run function
* Stop publisher
Force-pushed e89777a to fe8b9dc
Closing as this one got replaced by #1604
The goal of this PR is to properly track and shut down harvesters.