chain head listener: workaround watcher deadlock #2982
if !sending_to_watcher.load(atomic::Ordering::SeqCst) {
    let sending_to_watcher = sending_to_watcher.cheap_clone();
    let sender = watcher.sender.cheap_clone();
    tokio::task::spawn_blocking(move || {
Can that lead to a large number of dead threads hanging around? Maybe we need another `watcher_is_live: Arc<AtomicBool>` that is set to `false` just before the call to `send` and set to `true` right afterwards, where we do not try to send if `watcher_is_live` is `false`.
I believe the `sending_to_watcher` flag is already serving as the check you propose.
Yes, of course, you are right.
@@ -173,9 +178,19 @@ impl ChainHeadUpdateListener {

// If there are subscriptions for this network, notify them.
if let Some(watcher) = watchers.read(&logger).get(&update.network_name) {
    debug!(logger, "sending chain head update"; "network" => &update.network_name);
    watcher.send();
We didn't `unwrap` this call before?
let sender = watcher.sender.cheap_clone();
tokio::task::spawn_blocking(move || {
    sending_to_watcher.store(true, atomic::Ordering::SeqCst);
    sender.send(()).unwrap();
I see that `watcher.send()` didn't receive any arguments before. Why is it receiving a unit `()` value now?
Before, we were using the `send` method on our watcher struct, which is now dead code until we can revert this hack.
@lutter @otaviopace thanks for the review, comments addressed.
With the logs we were able to confirm that this deadlocks within the tokio watcher implementation. This has been reported upstream as tokio-rs/tokio#4246. This PR hacks around that until it is fixed, by detecting a deadlocked watcher and continuing to pull notifications, if only to discard them.
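
For readers who want the shape of the workaround in isolation, here is a minimal standard-library sketch of the "skip the send while a previous send is still in flight" guard. It is not the code in this PR: `send_unless_in_flight` and `in_flight` are hypothetical names, `std::thread::spawn` stands in for `tokio::task::spawn_blocking`, and the flag is claimed with `compare_exchange` before spawning rather than with the load-then-store sequence shown in the diff.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical helper (not part of the PR): runs `send` on its own thread
/// unless a previous send has not returned yet. Returns the thread handle
/// when a send was started, or `None` when the notification was discarded.
fn send_unless_in_flight<F>(
    in_flight: &Arc<AtomicBool>,
    send: F,
) -> Option<thread::JoinHandle<()>>
where
    F: FnOnce() + Send + 'static,
{
    // Atomically claim the flag. If it was already `true`, an earlier send is
    // still running (possibly deadlocked), so this notification is dropped
    // and the caller can keep pulling notifications.
    if in_flight
        .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
        .is_err()
    {
        return None;
    }

    let in_flight = Arc::clone(in_flight);
    Some(thread::spawn(move || {
        // This call may block indefinitely; only this thread is stuck then.
        send();
        // Reached only if `send` returned: allow the next send to proceed.
        in_flight.store(false, Ordering::SeqCst);
    }))
}
```

With this guard, at most one thread can be stuck in a blocked send at any time, which is the concern raised in the review about dead threads accumulating. If `send` panics or never returns, the flag stays set and every later notification is discarded.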