diff --git a/src/util_types/sync/tokio/atomic_mutex.rs b/src/util_types/sync/tokio/atomic_mutex.rs index 212498346..cfd659d8a 100644 --- a/src/util_types/sync/tokio/atomic_mutex.rs +++ b/src/util_types/sync/tokio/atomic_mutex.rs @@ -1,10 +1,12 @@ +use super::{LockAcquisition, LockCallbackFn, LockCallbackInfo, LockEvent, LockType}; use futures::future::BoxFuture; +use std::ops::{Deref, DerefMut}; use std::sync::Arc; use tokio::sync::{Mutex, MutexGuard}; /// An `Arc>` wrapper to make data thread-safe and easy to work with. /// -/// # Example +/// # Examples /// ``` /// # use neptune_core::util_types::sync::tokio::AtomicMutex; /// struct Car { @@ -12,46 +14,180 @@ use tokio::sync::{Mutex, MutexGuard}; /// }; /// # tokio_test::block_on(async { /// let atomic_car = AtomicMutex::from(Car{year: 2016}); -/// atomic_car.lock(|c| println!("year: {}", c.year)).await; -/// atomic_car.lock_mut(|mut c| c.year = 2023).await; +/// atomic_car.lock(|c| {println!("year: {}", c.year)}).await; +/// atomic_car.lock_mut(|mut c| {c.year = 2023}).await; /// # }) /// ``` -#[derive(Debug, Default, Clone)] -pub struct AtomicMutex(Arc>); +/// +/// It is also possible to provide a name and callback fn +/// during instantiation. In this way, the application +/// can easily trace lock acquisitions. +/// +/// # Examples +/// ``` +/// # use neptune_core::util_types::sync::tokio::{AtomicMutex, LockEvent, LockCallbackFn}; +/// struct Car { +/// year: u16, +/// }; +/// +/// pub fn log_lock_event(lock_event: LockEvent) { +/// let (event, info, acquisition) = +/// match lock_event { +/// LockEvent::TryAcquire{info, acquisition} => ("TryAcquire", info, acquisition), +/// LockEvent::Acquire{info, acquisition} => ("Acquire", info, acquisition), +/// LockEvent::Release{info, acquisition} => ("Release", info, acquisition), +/// }; +/// +/// println!( +/// "{} lock `{}` of type `{}` for `{}` by\n\t|-- thread {}, `{:?}`", +/// event, +/// info.name().unwrap_or("?"), +/// info.lock_type(), +/// acquisition, +/// std::thread::current().name().unwrap_or("?"), +/// std::thread::current().id(), +/// ); +/// } +/// const LOG_LOCK_EVENT_CB: LockCallbackFn = log_lock_event; +/// +/// # tokio_test::block_on(async { +/// let atomic_car = AtomicMutex::::from((Car{year: 2016}, Some("car"), Some(LOG_LOCK_EVENT_CB))); +/// atomic_car.lock(|c| {println!("year: {}", c.year)}).await; +/// atomic_car.lock_mut(|mut c| {c.year = 2023}).await; +/// # }) +/// ``` +/// +/// results in: +/// ```text +/// TryAcquire lock `car` of type `Mutex` for `Read` by +/// |-- thread main, `ThreadId(1)` +/// Acquire lock `car` of type `Mutex` for `Read` by +/// |-- thread main, `ThreadId(1)` +/// year: 2016 +/// Release lock `car` of type `Mutex` for `Read` by +/// |-- thread main, `ThreadId(1)` +/// TryAcquire lock `car` of type `Mutex` for `Write` by +/// |-- thread main, `ThreadId(1)` +/// Acquire lock `car` of type `Mutex` for `Write` by +/// |-- thread main, `ThreadId(1)` +/// Release lock `car` of type `Mutex` for `Write` by +/// |-- thread main, `ThreadId(1)` +/// ``` +#[derive(Debug)] +pub struct AtomicMutex { + inner: Arc>, + lock_callback_info: LockCallbackInfo, +} + +impl Default for AtomicMutex { + fn default() -> Self { + Self { + inner: Default::default(), + lock_callback_info: LockCallbackInfo::new(LockType::Mutex, None, None), + } + } +} + impl From for AtomicMutex { #[inline] fn from(t: T) -> Self { - Self(Arc::new(Mutex::new(t))) + Self { + inner: Arc::new(Mutex::new(t)), + lock_callback_info: LockCallbackInfo::new(LockType::Mutex, None, None), + } + } +} +impl From<(T, Option, Option)> for AtomicMutex { + /// Create from an optional name and an optional callback function, which + /// is called when a lock event occurs. + #[inline] + fn from(v: (T, Option, Option)) -> Self { + Self { + inner: Arc::new(Mutex::new(v.0)), + lock_callback_info: LockCallbackInfo::new(LockType::Mutex, v.1, v.2), + } + } +} +impl From<(T, Option<&str>, Option)> for AtomicMutex { + /// Create from a name ref and an optional callback function, which + /// is called when a lock event occurs. + #[inline] + fn from(v: (T, Option<&str>, Option)) -> Self { + Self { + inner: Arc::new(Mutex::new(v.0)), + lock_callback_info: LockCallbackInfo::new( + LockType::Mutex, + v.1.map(|s| s.to_owned()), + v.2, + ), + } + } +} + +impl Clone for AtomicMutex { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + lock_callback_info: self.lock_callback_info.clone(), + } } } impl From> for AtomicMutex { #[inline] fn from(t: Mutex) -> Self { - Self(Arc::new(t)) + Self { + inner: Arc::new(t), + lock_callback_info: LockCallbackInfo::new(LockType::Mutex, None, None), + } + } +} +impl From<(Mutex, Option, Option)> for AtomicMutex { + /// Create from an Mutex plus an optional name + /// and an optional callback function, which is called + /// when a lock event occurs. + #[inline] + fn from(v: (Mutex, Option, Option)) -> Self { + Self { + inner: Arc::new(v.0), + lock_callback_info: LockCallbackInfo::new(LockType::Mutex, v.1, v.2), + } } } impl TryFrom> for Mutex { type Error = Arc>; - - #[inline] fn try_from(t: AtomicMutex) -> Result, Self::Error> { - Arc::>::try_unwrap(t.0) + Arc::>::try_unwrap(t.inner) } } impl From>> for AtomicMutex { #[inline] fn from(t: Arc>) -> Self { - Self(t) + Self { + inner: t, + lock_callback_info: LockCallbackInfo::new(LockType::Mutex, None, None), + } + } +} +impl From<(Arc>, Option, Option)> for AtomicMutex { + /// Create from an `Arc>` plus an optional name and + /// an optional callback function, which is called when a lock + /// event occurs. + #[inline] + fn from(v: (Arc>, Option, Option)) -> Self { + Self { + inner: v.0, + lock_callback_info: LockCallbackInfo::new(LockType::Mutex, v.1, v.2), + } } } impl From> for Arc> { #[inline] fn from(t: AtomicMutex) -> Self { - t.0 + t.inner } } @@ -67,17 +203,19 @@ impl AtomicMutex { /// year: u16, /// }; /// # tokio_test::block_on(async { - /// let atomic_car = AtomicMutex::from(Car{year: 2016}); - /// atomic_car.lock_guard_mut().await.year = 2022; + /// let atomic_car = AtomicMutex::from(Car{year: 2016}); + /// let year = atomic_car.lock_guard().await.year; /// # }) /// ``` - pub async fn lock_guard_mut(&self) -> MutexGuard { - self.0.lock().await + pub async fn lock_guard(&self) -> AtomicMutexGuard { + self.try_acquire_read_cb(); + let guard = self.inner.lock().await; + AtomicMutexGuard::new(guard, &self.lock_callback_info, LockAcquisition::Read) } - /// Immutably access the data of type `T` in a closure and return a result + /// Acquire write lock and return an `AtomicMutexGuard` /// - /// # Example + /// # Examples /// ``` /// # use neptune_core::util_types::sync::tokio::AtomicMutex; /// struct Car { @@ -85,21 +223,43 @@ impl AtomicMutex { /// }; /// # tokio_test::block_on(async { /// let atomic_car = AtomicMutex::from(Car{year: 2016}); - /// atomic_car.lock(|c| println!("year: {}", c.year)); - /// let year = atomic_car.lock(|c| c.year).await; + /// atomic_car.lock_guard_mut().await.year = 2022; /// # }) /// ``` + pub async fn lock_guard_mut(&self) -> AtomicMutexGuard { + self.try_acquire_write_cb(); + let guard = self.inner.lock().await; + AtomicMutexGuard::new(guard, &self.lock_callback_info, LockAcquisition::Write) + } + + /// Immutably access the data of type `T` in a closure and possibly return a result of type `R` + /// + /// # Examples + /// ``` + /// # use neptune_core::util_types::sync::tokio::AtomicMutex; + /// struct Car { + /// year: u16, + /// }; + /// # tokio_test::block_on(async { + /// let atomic_car = AtomicMutex::from(Car{year: 2016}); + /// atomic_car.lock(|c| println!("year: {}", c.year)).await; + /// let year = atomic_car.lock(|c| c.year).await; + /// }) + /// ``` pub async fn lock(&self, f: F) -> R where F: FnOnce(&T) -> R, { - let mut lock = self.0.lock().await; - f(&mut lock) + self.try_acquire_read_cb(); + let inner_guard = self.inner.lock().await; + let guard = + AtomicMutexGuard::new(inner_guard, &self.lock_callback_info, LockAcquisition::Read); + f(&guard) } - /// Mutably access the data of type `T` in a closure and return a result + /// Mutably access the data of type `T` in a closure and possibly return a result of type `R` /// - /// # Example + /// # Examples /// ``` /// # use neptune_core::util_types::sync::tokio::AtomicMutex; /// struct Car { @@ -109,14 +269,20 @@ impl AtomicMutex { /// let atomic_car = AtomicMutex::from(Car{year: 2016}); /// atomic_car.lock_mut(|mut c| c.year = 2022).await; /// let year = atomic_car.lock_mut(|mut c| {c.year = 2023; c.year}).await; - /// # }) + /// }) /// ``` pub async fn lock_mut(&self, f: F) -> R where F: FnOnce(&mut T) -> R, { - let mut lock = self.0.lock().await; - f(&mut lock) + self.try_acquire_write_cb(); + let inner_guard = self.inner.lock().await; + let mut guard = AtomicMutexGuard::new( + inner_guard, + &self.lock_callback_info, + LockAcquisition::Write, + ); + f(&mut guard) } /// Immutably access the data of type `T` in an async closure and possibly return a result of type `R` @@ -139,8 +305,11 @@ impl AtomicMutex { /// ``` // design background: https://stackoverflow.com/a/77657788/10087197 pub async fn lock_async(&self, f: impl FnOnce(&T) -> BoxFuture<'_, R>) -> R { - let lock = self.0.lock().await; - f(&lock).await + self.try_acquire_read_cb(); + let inner_guard = self.inner.lock().await; + let guard = + AtomicMutexGuard::new(inner_guard, &self.lock_callback_info, LockAcquisition::Read); + f(&guard).await } /// Mutably access the data of type `T` in an async closure and possibly return a result of type `R` @@ -163,16 +332,95 @@ impl AtomicMutex { /// ``` // design background: https://stackoverflow.com/a/77657788/10087197 pub async fn lock_mut_async(&self, f: impl FnOnce(&mut T) -> BoxFuture<'_, R>) -> R { - let mut lock = self.0.lock().await; - f(&mut lock).await + self.try_acquire_write_cb(); + let inner_guard = self.inner.lock().await; + let mut guard = AtomicMutexGuard::new( + inner_guard, + &self.lock_callback_info, + LockAcquisition::Write, + ); + f(&mut guard).await + } + + fn try_acquire_read_cb(&self) { + if let Some(cb) = self.lock_callback_info.lock_callback_fn { + cb(LockEvent::TryAcquire { + info: self.lock_callback_info.lock_info_owned.as_lock_info(), + acquisition: LockAcquisition::Read, + }); + } + } + + fn try_acquire_write_cb(&self) { + if let Some(cb) = self.lock_callback_info.lock_callback_fn { + cb(LockEvent::TryAcquire { + info: self.lock_callback_info.lock_info_owned.as_lock_info(), + acquisition: LockAcquisition::Write, + }); + } + } +} + +/// A wrapper for [MutexGuard](tokio::sync::MutexGuard) that +/// can optionally call a callback to notify when the +/// lock event occurs. +pub struct AtomicMutexGuard<'a, T> { + guard: MutexGuard<'a, T>, + lock_callback_info: &'a LockCallbackInfo, + acquisition: LockAcquisition, +} + +impl<'a, T> AtomicMutexGuard<'a, T> { + fn new( + guard: MutexGuard<'a, T>, + lock_callback_info: &'a LockCallbackInfo, + acquisition: LockAcquisition, + ) -> Self { + if let Some(cb) = lock_callback_info.lock_callback_fn { + cb(LockEvent::Acquire { + info: lock_callback_info.lock_info_owned.as_lock_info(), + acquisition, + }); + } + Self { + guard, + lock_callback_info, + acquisition, + } + } +} + +impl<'a, T> Drop for AtomicMutexGuard<'a, T> { + fn drop(&mut self) { + let lock_callback_info = self.lock_callback_info; + if let Some(cb) = lock_callback_info.lock_callback_fn { + cb(LockEvent::Release { + info: lock_callback_info.lock_info_owned.as_lock_info(), + acquisition: self.acquisition, + }); + } + } +} + +impl<'a, T> Deref for AtomicMutexGuard<'a, T> { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.guard + } +} + +impl<'a, T> DerefMut for AtomicMutexGuard<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.guard } } /* - note: commenting until async-traits are supported in stable rust. - It is supposed to be available in 1.75.0 on Dec 28, 2023. - See: https://releases.rs/docs/1.75.0/ +note: commenting until async-traits are supported in stable rust. + It is supposed to be available in 1.75.0 on Dec 28, 2023. + See: https://releases.rs/docs/1.75.0/ impl Atomic for AtomicMutex { + #[inline] async fn lock(&self, f: F) -> R where F: FnOnce(&T) -> R, @@ -180,6 +428,7 @@ impl Atomic for AtomicMutex { AtomicMutex:::.lock(self, f).await } + #[inline] async fn lock_mut(&self, f: F) -> R where F: FnOnce(&mut T) -> R, @@ -187,7 +436,7 @@ impl Atomic for AtomicMutex { AtomicMutex:::.lock_mut(self, f).await } } - */ +*/ #[cfg(test)] mod tests { @@ -195,13 +444,13 @@ mod tests { use futures::future::FutureExt; #[tokio::test] - // Verify (compile-time) that AtomicMutex:.lock() and :.lock_mut() accept mutable values. (FnOnce) + // Verify (compile-time) that AtomicMutex:.lock() and :.lock_mut() accept mutable values. (FnMut) async fn mutable_assignment() { let name = "Jim".to_string(); let atomic_name = AtomicMutex::from(name); let mut new_name: String = Default::default(); - atomic_name.lock(|n| new_name = n.to_string()).await; + atomic_name.lock_mut(|n| *n = "Sally".to_string()).await; atomic_name.lock_mut(|n| new_name = n.to_string()).await; } diff --git a/src/util_types/sync/tokio/atomic_rw.rs b/src/util_types/sync/tokio/atomic_rw.rs index 06580b560..ba26b9a49 100644 --- a/src/util_types/sync/tokio/atomic_rw.rs +++ b/src/util_types/sync/tokio/atomic_rw.rs @@ -1,10 +1,12 @@ +use super::{LockAcquisition, LockCallbackFn, LockCallbackInfo, LockEvent, LockType}; use futures::future::BoxFuture; +use std::ops::{Deref, DerefMut}; use std::sync::Arc; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; /// An `Arc>` wrapper to make data thread-safe and easy to work with. /// -/// # Example +/// # Examples /// ``` /// # use neptune_core::util_types::sync::tokio::AtomicRw; /// struct Car { @@ -16,53 +18,183 @@ use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; /// atomic_car.lock_mut(|mut c| {c.year = 2023}).await; /// # }) /// ``` -#[derive(Debug, Default)] -pub struct AtomicRw(Arc>); +/// +/// It is also possible to provide a name and callback fn +/// during instantiation. In this way, the application +/// can easily trace lock acquisitions. +/// +/// # Examples +/// ``` +/// # use neptune_core::util_types::sync::tokio::{AtomicRw, LockEvent, LockCallbackFn}; +/// struct Car { +/// year: u16, +/// }; +/// +/// pub fn log_lock_event(lock_event: LockEvent) { +/// let (event, info, acquisition) = +/// match lock_event { +/// LockEvent::TryAcquire{info, acquisition} => ("TryAcquire", info, acquisition), +/// LockEvent::Acquire{info, acquisition} => ("Acquire", info, acquisition), +/// LockEvent::Release{info, acquisition} => ("Release", info, acquisition), +/// }; +/// +/// println!( +/// "{} lock `{}` of type `{}` for `{}` by\n\t|-- thread {}, `{:?}`", +/// event, +/// info.name().unwrap_or("?"), +/// info.lock_type(), +/// acquisition, +/// std::thread::current().name().unwrap_or("?"), +/// std::thread::current().id(), +/// ); +/// } +/// const LOG_LOCK_EVENT_CB: LockCallbackFn = log_lock_event; +/// +/// # tokio_test::block_on(async { +/// let atomic_car = AtomicRw::::from((Car{year: 2016}, Some("car"), Some(LOG_LOCK_EVENT_CB))); +/// atomic_car.lock(|c| {println!("year: {}", c.year)}).await; +/// atomic_car.lock_mut(|mut c| {c.year = 2023}).await; +/// # }) +/// ``` +/// +/// results in: +/// ```text +/// TryAcquire lock `car` of type `RwLock` for `Read` by +/// |-- thread main, `ThreadId(1)` +/// Acquire lock `car` of type `RwLock` for `Read` by +/// |-- thread main, `ThreadId(1)` +/// year: 2016 +/// Release lock `car` of type `RwLock` for `Read` by +/// |-- thread main, `ThreadId(1)` +/// TryAcquire lock `car` of type `RwLock` for `Write` by +/// |-- thread main, `ThreadId(1)` +/// Acquire lock `car` of type `RwLock` for `Write` by +/// |-- thread main, `ThreadId(1)` +/// Release lock `car` of type `RwLock` for `Write` by +/// |-- thread main, `ThreadId(1)` +/// ``` +#[derive(Debug)] +pub struct AtomicRw { + inner: Arc>, + lock_callback_info: LockCallbackInfo, +} + +impl Default for AtomicRw { + fn default() -> Self { + Self { + inner: Default::default(), + lock_callback_info: LockCallbackInfo::new(LockType::RwLock, None, None), + } + } +} + impl From for AtomicRw { #[inline] fn from(t: T) -> Self { - Self(Arc::new(RwLock::new(t))) + Self { + inner: Arc::new(RwLock::new(t)), + lock_callback_info: LockCallbackInfo::new(LockType::Mutex, None, None), + } + } +} +impl From<(T, Option, Option)> for AtomicRw { + /// Create from an optional name and an optional callback function, which + /// is called when a lock event occurs. + #[inline] + fn from(v: (T, Option, Option)) -> Self { + Self { + inner: Arc::new(RwLock::new(v.0)), + lock_callback_info: LockCallbackInfo::new(LockType::Mutex, v.1, v.2), + } + } +} +impl From<(T, Option<&str>, Option)> for AtomicRw { + /// Create from a name ref and an optional callback function, which + /// is called when a lock event occurs. + #[inline] + fn from(v: (T, Option<&str>, Option)) -> Self { + Self { + inner: Arc::new(RwLock::new(v.0)), + lock_callback_info: LockCallbackInfo::new( + LockType::Mutex, + v.1.map(|s| s.to_owned()), + v.2, + ), + } } } impl Clone for AtomicRw { fn clone(&self) -> Self { - Self(self.0.clone()) + Self { + inner: self.inner.clone(), + lock_callback_info: self.lock_callback_info.clone(), + } } } impl From> for AtomicRw { #[inline] fn from(t: RwLock) -> Self { - Self(Arc::new(t)) + Self { + inner: Arc::new(t), + lock_callback_info: LockCallbackInfo::new(LockType::Mutex, None, None), + } + } +} +impl From<(RwLock, Option, Option)> for AtomicRw { + /// Create from an RwLock plus an optional name + /// and an optional callback function, which is called + /// when a lock event occurs. + #[inline] + fn from(v: (RwLock, Option, Option)) -> Self { + Self { + inner: Arc::new(v.0), + lock_callback_info: LockCallbackInfo::new(LockType::Mutex, v.1, v.2), + } } } impl TryFrom> for RwLock { type Error = Arc>; fn try_from(t: AtomicRw) -> Result, Self::Error> { - Arc::>::try_unwrap(t.0) + Arc::>::try_unwrap(t.inner) } } impl From>> for AtomicRw { #[inline] fn from(t: Arc>) -> Self { - Self(t) + Self { + inner: t, + lock_callback_info: LockCallbackInfo::new(LockType::Mutex, None, None), + } + } +} +impl From<(Arc>, Option, Option)> for AtomicRw { + /// Create from an `Arc>` plus an optional name and + /// an optional callback function, which is called when a lock + /// event occurs. + #[inline] + fn from(v: (Arc>, Option, Option)) -> Self { + Self { + inner: v.0, + lock_callback_info: LockCallbackInfo::new(LockType::Mutex, v.1, v.2), + } } } impl From> for Arc> { #[inline] fn from(t: AtomicRw) -> Self { - t.0 + t.inner } } // note: we impl the Atomic trait methods here also so they // can be used without caller having to use the trait. impl AtomicRw { - /// Acquire read lock and return an `RwLockReadGuard` + /// Acquire read lock and return an `AtomicRwReadGuard` /// /// # Examples /// ``` @@ -75,11 +207,13 @@ impl AtomicRw { /// let year = atomic_car.lock_guard().await.year; /// # }) /// ``` - pub async fn lock_guard(&self) -> RwLockReadGuard { - self.0.read().await + pub async fn lock_guard(&self) -> AtomicRwReadGuard { + self.try_acquire_read_cb(); + let guard = self.inner.read().await; + AtomicRwReadGuard::new(guard, &self.lock_callback_info) } - /// Acquire write lock and return an `RwLockWriteGuard` + /// Acquire write lock and return an `AtomicRwWriteGuard` /// /// # Examples /// ``` @@ -92,8 +226,10 @@ impl AtomicRw { /// atomic_car.lock_guard_mut().await.year = 2022; /// # }) /// ``` - pub async fn lock_guard_mut(&self) -> RwLockWriteGuard { - self.0.write().await + pub async fn lock_guard_mut(&self) -> AtomicRwWriteGuard { + self.try_acquire_write_cb(); + let guard = self.inner.write().await; + AtomicRwWriteGuard::new(guard, &self.lock_callback_info) } /// Immutably access the data of type `T` in a closure and possibly return a result of type `R` @@ -114,8 +250,10 @@ impl AtomicRw { where F: FnOnce(&T) -> R, { - let lock = self.0.read().await; - f(&lock) + self.try_acquire_read_cb(); + let inner_guard = self.inner.read().await; + let guard = AtomicRwReadGuard::new(inner_guard, &self.lock_callback_info); + f(&guard) } /// Mutably access the data of type `T` in a closure and possibly return a result of type `R` @@ -136,8 +274,10 @@ impl AtomicRw { where F: FnOnce(&mut T) -> R, { - let mut lock = self.0.write().await; - f(&mut lock) + self.try_acquire_write_cb(); + let inner_guard = self.inner.write().await; + let mut guard = AtomicRwWriteGuard::new(inner_guard, &self.lock_callback_info); + f(&mut guard) } /// Immutably access the data of type `T` in an async closure and possibly return a result of type `R` @@ -160,8 +300,10 @@ impl AtomicRw { /// ``` // design background: https://stackoverflow.com/a/77657788/10087197 pub async fn lock_async(&self, f: impl FnOnce(&T) -> BoxFuture<'_, R>) -> R { - let lock = self.0.read().await; - f(&lock).await + self.try_acquire_read_cb(); + let inner_guard = self.inner.read().await; + let guard = AtomicRwReadGuard::new(inner_guard, &self.lock_callback_info); + f(&guard).await } /// Mutably access the data of type `T` in an async closure and possibly return a result of type `R` @@ -184,8 +326,118 @@ impl AtomicRw { /// ``` // design background: https://stackoverflow.com/a/77657788/10087197 pub async fn lock_mut_async(&self, f: impl FnOnce(&mut T) -> BoxFuture<'_, R>) -> R { - let mut lock = self.0.write().await; - f(&mut lock).await + self.try_acquire_write_cb(); + let inner_guard = self.inner.write().await; + let mut guard = AtomicRwWriteGuard::new(inner_guard, &self.lock_callback_info); + f(&mut guard).await + } + + fn try_acquire_read_cb(&self) { + if let Some(cb) = self.lock_callback_info.lock_callback_fn { + cb(LockEvent::TryAcquire { + info: self.lock_callback_info.lock_info_owned.as_lock_info(), + acquisition: LockAcquisition::Read, + }); + } + } + + fn try_acquire_write_cb(&self) { + if let Some(cb) = self.lock_callback_info.lock_callback_fn { + cb(LockEvent::TryAcquire { + info: self.lock_callback_info.lock_info_owned.as_lock_info(), + acquisition: LockAcquisition::Write, + }); + } + } +} + +/// A wrapper for [RwLockReadGuard](tokio::sync::RwLockReadGuard) that +/// can optionally call a callback to notify when the +/// lock event occurs. +pub struct AtomicRwReadGuard<'a, T> { + guard: RwLockReadGuard<'a, T>, + lock_callback_info: &'a LockCallbackInfo, +} + +impl<'a, T> AtomicRwReadGuard<'a, T> { + fn new(guard: RwLockReadGuard<'a, T>, lock_callback_info: &'a LockCallbackInfo) -> Self { + if let Some(cb) = lock_callback_info.lock_callback_fn { + cb(LockEvent::Acquire { + info: lock_callback_info.lock_info_owned.as_lock_info(), + acquisition: LockAcquisition::Read, + }); + } + Self { + guard, + lock_callback_info, + } + } +} + +impl<'a, T> Drop for AtomicRwReadGuard<'a, T> { + fn drop(&mut self) { + let lock_callback_info = self.lock_callback_info; + if let Some(cb) = lock_callback_info.lock_callback_fn { + cb(LockEvent::Release { + info: lock_callback_info.lock_info_owned.as_lock_info(), + acquisition: LockAcquisition::Read, + }); + } + } +} + +impl<'a, T> Deref for AtomicRwReadGuard<'a, T> { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.guard + } +} + +/// A wrapper for [RwLockWriteGuard](tokio::sync::RwLockWriteGuard) that +/// can optionally call a callback to notify when the +/// lock event occurs. +pub struct AtomicRwWriteGuard<'a, T> { + guard: RwLockWriteGuard<'a, T>, + lock_callback_info: &'a LockCallbackInfo, +} + +impl<'a, T> AtomicRwWriteGuard<'a, T> { + fn new(guard: RwLockWriteGuard<'a, T>, lock_callback_info: &'a LockCallbackInfo) -> Self { + if let Some(cb) = lock_callback_info.lock_callback_fn { + cb(LockEvent::Acquire { + info: lock_callback_info.lock_info_owned.as_lock_info(), + acquisition: LockAcquisition::Write, + }); + } + Self { + guard, + lock_callback_info, + } + } +} + +impl<'a, T> Drop for AtomicRwWriteGuard<'a, T> { + fn drop(&mut self) { + let lock_callback_info = self.lock_callback_info; + if let Some(cb) = lock_callback_info.lock_callback_fn { + cb(LockEvent::Release { + info: lock_callback_info.lock_info_owned.as_lock_info(), + acquisition: LockAcquisition::Write, + }); + } + } +} + +impl<'a, T> Deref for AtomicRwWriteGuard<'a, T> { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.guard + } +} + +impl<'a, T> DerefMut for AtomicRwWriteGuard<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.guard } } diff --git a/src/util_types/sync/tokio/mod.rs b/src/util_types/sync/tokio/mod.rs index 5f5e7320a..deb716ee8 100644 --- a/src/util_types/sync/tokio/mod.rs +++ b/src/util_types/sync/tokio/mod.rs @@ -8,7 +8,11 @@ mod atomic_mutex; mod atomic_rw; +mod shared; pub mod traits; pub use atomic_mutex::AtomicMutex; pub use atomic_rw::AtomicRw; +pub use shared::{LockAcquisition, LockCallbackFn, LockEvent, LockInfo, LockType}; + +use shared::LockCallbackInfo; diff --git a/src/util_types/sync/tokio/shared.rs b/src/util_types/sync/tokio/shared.rs new file mode 100644 index 000000000..26e27dcd7 --- /dev/null +++ b/src/util_types/sync/tokio/shared.rs @@ -0,0 +1,108 @@ +/// Indicates the lock's underlying type +#[derive(Debug, Clone, Copy)] +pub enum LockType { + Mutex, + RwLock, +} + +impl std::fmt::Display for LockType { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Mutex => write!(f, "Mutex"), + Self::RwLock => write!(f, "RwLock"), + } + } +} + +/// Indicates how a lock was acquired. +#[derive(Debug, Clone, Copy)] +pub enum LockAcquisition { + Read, + Write, +} + +impl std::fmt::Display for LockAcquisition { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Read => write!(f, "Read"), + Self::Write => write!(f, "Write"), + } + } +} + +#[derive(Debug, Clone)] +pub(super) struct LockInfoOwned { + pub name: Option, + pub lock_type: LockType, +} +impl LockInfoOwned { + #[inline] + pub fn as_lock_info(&self) -> LockInfo<'_> { + LockInfo { + name: self.name.as_deref(), + lock_type: self.lock_type, + } + } +} + +/// Contains metadata about a lock +#[derive(Debug, Clone)] +pub struct LockInfo<'a> { + name: Option<&'a str>, + lock_type: LockType, +} +impl<'a> LockInfo<'a> { + /// get the lock's name + #[inline] + pub fn name(&self) -> Option<&str> { + self.name + } + + /// get the lock's type + #[inline] + pub fn lock_type(&self) -> LockType { + self.lock_type + } +} + +#[derive(Debug, Clone)] +pub(super) struct LockCallbackInfo { + pub lock_info_owned: LockInfoOwned, + pub lock_callback_fn: Option, +} +impl LockCallbackInfo { + #[inline] + pub fn new( + lock_type: LockType, + name: Option, + lock_callback_fn: Option, + ) -> Self { + Self { + lock_info_owned: LockInfoOwned { name, lock_type }, + lock_callback_fn, + } + } +} + +/// Represents an event (acquire/release) for a lock +#[derive(Debug, Clone)] +pub enum LockEvent<'a> { + TryAcquire { + info: LockInfo<'a>, + acquisition: LockAcquisition, + }, + Acquire { + info: LockInfo<'a>, + acquisition: LockAcquisition, + }, + Release { + info: LockInfo<'a>, + acquisition: LockAcquisition, + }, +} + +/// A callback fn for receiving [LockEvent] event +/// each time a lock is acquired or released. +pub type LockCallbackFn = fn(lock_event: LockEvent);