Skip to content

Commit

Permalink
Fix concurrency issue with oneshot_broadcast (sigp#3596)
Browse files Browse the repository at this point in the history
## Issue Addressed

NA

## Proposed Changes

Fixes an issue found during testing with sigp#3595.

## Additional Info

NA
  • Loading branch information
paulhauner authored and Woodpile37 committed Jan 6, 2024
1 parent 44228c4 commit ea1e3ff
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions common/oneshot_broadcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//!
//! This implementation may not be blazingly fast but it should be simple enough to be reliable.
use parking_lot::{Condvar, Mutex};
use std::sync::{Arc, Weak};
use std::sync::Arc;

#[derive(Copy, Clone, Debug, PartialEq)]
pub enum Error {
Expand All @@ -13,9 +13,10 @@ pub enum Error {
enum Future<T> {
/// The future is ready and the item may be consumed.
Ready(T),
/// Future is not ready. The contained `Weak` is a reference to the `Sender` that may be used to
/// detect when the channel is disconnected.
NotReady(Weak<()>),
/// Future is not ready.
NotReady,
/// The sender has been dropped without sending a message.
SenderDropped,
}

struct MutexCondvar<T> {
Expand All @@ -24,7 +25,7 @@ struct MutexCondvar<T> {
}

/// The sending pair of the `oneshot` channel.
pub struct Sender<T>(Arc<MutexCondvar<T>>, Option<Arc<()>>);
pub struct Sender<T>(Arc<MutexCondvar<T>>);

impl<T> Sender<T> {
/// Send a message, consuming `self` and delivering the message to *all* receivers.
Expand All @@ -35,11 +36,15 @@ impl<T> Sender<T> {
}

impl<T> Drop for Sender<T> {
/// Drop the `Arc` and notify all receivers so they can't upgrade their `Weak`s and know that
/// the sender has been dropped.
/// Flag the sender as dropped and notify all receivers.
fn drop(&mut self) {
self.1 = None;
let mut lock = self.0.mutex.lock();
if !matches!(*lock, Future::Ready(_)) {
*lock = Future::SenderDropped
}
self.0.condvar.notify_all();
// The lock must be held whilst the condvar is notified.
drop(lock);
}
}

Expand All @@ -59,8 +64,8 @@ impl<T: Clone> Receiver<T> {
pub fn try_recv(&self) -> Result<Option<T>, Error> {
match &*self.0.mutex.lock() {
Future::Ready(item) => Ok(Some(item.clone())),
Future::NotReady(weak) if weak.upgrade().is_some() => Ok(None),
Future::NotReady(_) => Err(Error::SenderDropped),
Future::NotReady => Ok(None),
Future::SenderDropped => Err(Error::SenderDropped),
}
}

Expand All @@ -71,10 +76,8 @@ impl<T: Clone> Receiver<T> {
loop {
match &*lock {
Future::Ready(item) => return Ok(item.clone()),
Future::NotReady(weak) if weak.upgrade().is_some() => {
self.0.condvar.wait(&mut lock)
}
Future::NotReady(_) => return Err(Error::SenderDropped),
Future::NotReady => self.0.condvar.wait(&mut lock),
Future::SenderDropped => return Err(Error::SenderDropped),
}
}
}
Expand All @@ -84,13 +87,12 @@ impl<T: Clone> Receiver<T> {
///
/// The sender may send *only one* message which will be received by *all* receivers.
pub fn oneshot<T: Clone>() -> (Sender<T>, Receiver<T>) {
let sender_ref = Arc::new(());
let mutex_condvar = Arc::new(MutexCondvar {
mutex: Mutex::new(Future::NotReady(Arc::downgrade(&sender_ref))),
mutex: Mutex::new(Future::NotReady),
condvar: Condvar::new(),
});
let receiver = Receiver(mutex_condvar.clone());
let sender = Sender(mutex_condvar, Some(sender_ref));
let sender = Sender(mutex_condvar);
(sender, receiver)
}

Expand Down

0 comments on commit ea1e3ff

Please sign in to comment.