From d7205996c79f2f6ed7b5407a559e1bfa41770312 Mon Sep 17 00:00:00 2001 From: Nylonicious <50183564+nylonicious@users.noreply.github.com> Date: Fri, 10 Dec 2021 12:05:28 +0100 Subject: [PATCH 01/11] sync: add `watch::Sender::send_modify` method --- tokio/src/sync/watch.rs | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 7e45c116c82..91c988575b3 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -437,6 +437,36 @@ impl Sender { Ok(()) } + /// Modifies watched value, notifying all receivers. + /// + /// This can useful for modyfing the watched value, without + /// having to allocate a new instance. Additionally, this + /// method permits sending values even when there are no receivers. + /// + /// # Examples + /// ``` + /// use tokio::sync::watch; + /// + /// struct State { + /// counter: usize, + /// } + /// let (state_tx, state_rx) = watch::channel(State { counter: 0 }); + /// state_tx.send_modify(|state| state.counter += 1); + /// assert_eq!(state_rx.borrow().counter, 1); + /// ``` + pub fn send_modify(&self, func: F) + where + F: FnOnce(&mut T), + { + { + let mut lock = self.shared.value.write().unwrap(); + func(&mut lock); + self.shared.state.increment_version(); + } + + self.shared.notify_rx.notify_waiters(); + } + /// Sends a new value via the channel, notifying all receivers and returning /// the previous value in the channel. /// From e1e73da313d65d9772083d452543c25a08e97953 Mon Sep 17 00:00:00 2001 From: Nylonicious <50183564+nylonicious@users.noreply.github.com> Date: Fri, 10 Dec 2021 12:11:49 +0100 Subject: [PATCH 02/11] fix typo --- tokio/src/sync/watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 91c988575b3..b43aebb03a7 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -439,7 +439,7 @@ impl Sender { /// Modifies watched value, notifying all receivers. /// - /// This can useful for modyfing the watched value, without + /// This can useful for modifying the watched value, without /// having to allocate a new instance. Additionally, this /// method permits sending values even when there are no receivers. /// From aeca6b1f61c50641f3aa72b70288f01d9795b9ed Mon Sep 17 00:00:00 2001 From: Nylonicious <50183564+nylonicious@users.noreply.github.com> Date: Fri, 10 Dec 2021 12:54:12 +0100 Subject: [PATCH 03/11] Include comments from `send_replace` method --- tokio/src/sync/watch.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index b43aebb03a7..86c9c3b4bdf 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -459,9 +459,19 @@ impl Sender { F: FnOnce(&mut T), { { + // Acquire the write lock and update the value. let mut lock = self.shared.value.write().unwrap(); + // Update the value. func(&mut lock); + self.shared.state.increment_version(); + + // Release the write lock. + // + // Incrementing the version counter while holding the lock ensures + // that receivers are able to figure out the version number of the + // value they are currently looking at. + drop(lock); } self.shared.notify_rx.notify_waiters(); From d629aa4f83cd3d84b8577c0641f6c361905f6b58 Mon Sep 17 00:00:00 2001 From: Nylonicious <50183564+nylonicious@users.noreply.github.com> Date: Fri, 10 Dec 2021 13:14:53 +0100 Subject: [PATCH 04/11] Replace impl of `send_replace` with call to `send_modify` --- tokio/src/sync/watch.rs | 25 ++++--------------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 86c9c3b4bdf..aeacb952f04 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -493,28 +493,11 @@ impl Sender { /// assert_eq!(tx.send_replace(2), 1); /// assert_eq!(tx.send_replace(3), 2); /// ``` - pub fn send_replace(&self, value: T) -> T { - let old = { - // Acquire the write lock and update the value. - let mut lock = self.shared.value.write().unwrap(); - let old = mem::replace(&mut *lock, value); - - self.shared.state.increment_version(); - - // Release the write lock. - // - // Incrementing the version counter while holding the lock ensures - // that receivers are able to figure out the version number of the - // value they are currently looking at. - drop(lock); - - old - }; - - // Notify all watchers - self.shared.notify_rx.notify_waiters(); + pub fn send_replace(&self, mut value: T) -> T { + // swap old watched value with the new one + self.send_modify(|old| mem::swap(old, &mut value)); - old + value } /// Returns a reference to the most recently sent value From 02c1a2acbe4ff96fe7cbb7409c0c470615a7cd67 Mon Sep 17 00:00:00 2001 From: Nylonicious <50183564+nylonicious@users.noreply.github.com> Date: Fri, 10 Dec 2021 14:04:18 +0100 Subject: [PATCH 05/11] Catch and propagate panics inside func to the caller --- tokio/src/sync/watch.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index aeacb952f04..7029bef2581 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -60,6 +60,7 @@ use crate::loom::sync::atomic::Ordering::Relaxed; use crate::loom::sync::{Arc, RwLock, RwLockReadGuard}; use std::mem; use std::ops; +use std::panic; /// Receives values from the associated [`Sender`](struct@Sender). /// @@ -461,8 +462,14 @@ impl Sender { { // Acquire the write lock and update the value. let mut lock = self.shared.value.write().unwrap(); - // Update the value. - func(&mut lock); + // Update the value and catch possible panic inside func. + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { + func(&mut lock); + })); + // If the func panicked return the panic to the caller. + if let Err(error) = result { + panic::resume_unwind(error); + } self.shared.state.increment_version(); From 460c3e9b30cee16177445c04c389e532ad7cef7a Mon Sep 17 00:00:00 2001 From: Nylonicious <50183564+nylonicious@users.noreply.github.com> Date: Fri, 10 Dec 2021 14:21:31 +0100 Subject: [PATCH 06/11] Drop lock before we resume unwinding --- tokio/src/sync/watch.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 7029bef2581..d616cf4a730 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -468,6 +468,8 @@ impl Sender { })); // If the func panicked return the panic to the caller. if let Err(error) = result { + // Drop the lock to avoid poisoning it. + drop(lock); panic::resume_unwind(error); } From d1de3e053786765b6c07472ef7b67dfbb0e42d4b Mon Sep 17 00:00:00 2001 From: Nylonicious <50183564+nylonicious@users.noreply.github.com> Date: Fri, 10 Dec 2021 14:55:32 +0100 Subject: [PATCH 07/11] `Sender::send` call `send_modify` instead `send_replace` --- tokio/src/sync/watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index d616cf4a730..53aa8e2bae3 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -434,7 +434,7 @@ impl Sender { return Err(error::SendError(value)); } - self.send_replace(value); + self.send_modify(|old| *old = value); Ok(()) } From dcf9ffc8cf9f7947be16c921f0e03ec18bbc89a1 Mon Sep 17 00:00:00 2001 From: Nylonicious <50183564+nylonicious@users.noreply.github.com> Date: Fri, 10 Dec 2021 15:58:49 +0100 Subject: [PATCH 08/11] Add test --- tokio/tests/sync_watch.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index b7bbaf721c1..e4b9452858d 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -201,3 +201,30 @@ fn reopened_after_subscribe() { drop(rx); assert!(tx.is_closed()); } + +#[test] +fn send_modify_panic() { + let (tx, mut rx) = watch::channel("one"); + + tx.send_modify(|old| *old = "two"); + assert_eq!(*rx.borrow_and_update(), "two"); + + let mut rx2 = rx.clone(); + assert_eq!(*rx2.borrow_and_update(), "two"); + + let mut task = spawn(rx2.changed()); + + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + tx.send_modify(|old| { + *old = "panicked"; + panic!(); + }) + })); + assert!(result.is_err()); + + assert_pending!(task.poll()); + + tx.send_modify(|old| *old = "three"); + assert_ready_ok!(task.poll()); + assert_eq!(*rx.borrow_and_update(), "three"); +} From b142f96c37e52d009f9ec9253f96505cafad8bea Mon Sep 17 00:00:00 2001 From: Nylonicious <50183564+nylonicious@users.noreply.github.com> Date: Fri, 10 Dec 2021 16:00:48 +0100 Subject: [PATCH 09/11] Document panic behaviour --- tokio/src/sync/watch.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 53aa8e2bae3..abd9ec15271 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -444,7 +444,15 @@ impl Sender { /// having to allocate a new instance. Additionally, this /// method permits sending values even when there are no receivers. /// + /// # Panics + /// + /// This function panics if calling `func` results in a panic. + /// No receivers are notified if panic occurred. + /// If the panic is caught, this might leave the watched value + /// in an inconsistent state. + /// /// # Examples + /// /// ``` /// use tokio::sync::watch; /// From 69cb652d41b1367eb940884d34352ad459106912 Mon Sep 17 00:00:00 2001 From: Nylonicious <50183564+nylonicious@users.noreply.github.com> Date: Fri, 10 Dec 2021 16:16:45 +0100 Subject: [PATCH 10/11] Improve documentation and check for value after panic Co-authored-by: Alice Ryhl --- tokio/src/sync/watch.rs | 5 ++--- tokio/tests/sync_watch.rs | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index abd9ec15271..e15c26080b0 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -447,9 +447,8 @@ impl Sender { /// # Panics /// /// This function panics if calling `func` results in a panic. - /// No receivers are notified if panic occurred. - /// If the panic is caught, this might leave the watched value - /// in an inconsistent state. + /// No receivers are notified if panic occurred, but if the closure has modified + /// the value, that change is still visible to future calls to `borrow`. /// /// # Examples /// diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index e4b9452858d..7ce83871814 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -223,6 +223,7 @@ fn send_modify_panic() { assert!(result.is_err()); assert_pending!(task.poll()); + assert_eq!(*rx.borrow(), "panicked"); tx.send_modify(|old| *old = "three"); assert_ready_ok!(task.poll()); From fc551f57c436222a40beadb326dfd4bd95da5671 Mon Sep 17 00:00:00 2001 From: Nylonicious <50183564+nylonicious@users.noreply.github.com> Date: Mon, 10 Jan 2022 12:48:10 +0100 Subject: [PATCH 11/11] Restore old watch::Sender::send behaviour --- tokio/src/sync/watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index e15c26080b0..f03afb03e14 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -434,7 +434,7 @@ impl Sender { return Err(error::SendError(value)); } - self.send_modify(|old| *old = value); + self.send_replace(value); Ok(()) }