Skip to content

Commit

Permalink
re-impl Sync for RingSender and RingReceiver
Browse files Browse the repository at this point in the history
This reverts commit 972691a and closes #69.
  • Loading branch information
brunocodutra committed Dec 15, 2021
1 parent 55e6a2c commit 0807bfd
Showing 1 changed file with 33 additions and 20 deletions.
53 changes: 33 additions & 20 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct RingSender<T> {
}

unsafe impl<T: Send> Send for RingSender<T> {}
unsafe impl<T: Send> Sync for RingSender<T> {}

impl<T> RingSender<T> {
fn new(handle: ManuallyDrop<ControlBlockRef<T>>) -> Self {
Expand Down Expand Up @@ -108,6 +109,7 @@ pub struct RingReceiver<T> {
}

unsafe impl<T: Send> Send for RingReceiver<T> {}
unsafe impl<T: Send> Sync for RingReceiver<T> {}

impl<T> RingReceiver<T> {
fn new(handle: ManuallyDrop<ControlBlockRef<T>>) -> Self {
Expand Down Expand Up @@ -340,20 +342,31 @@ mod tests {
assert!(!s.handle.connected.load(Ordering::Relaxed));
}

#[derive(Clone)]
enum Endpoint<T> {
Sender(RingSender<T>),
Receiver(RingReceiver<T>),
}

#[proptest]
fn endpoints_are_safe_to_send_across_threads(
#[strategy(1..=100usize)] m: usize,
#[strategy(1..=100usize)] n: usize,
) {
#[derive(Clone)]
enum Endpoint<T> {
Sender(RingSender<T>),
Receiver(RingReceiver<T>),
}
let (s, r) = ring_channel::<()>(NonZeroUsize::new(1).unwrap());
let ls = repeatn(s, m).map(Endpoint::Sender);
let rs = repeatn(r, n).map(Endpoint::Receiver);
ls.chain(rs).for_each(drop);
}

#[proptest]
fn endpoints_are_safe_to_share_across_threads(
#[strategy(1..=100usize)] m: usize,
#[strategy(1..=100usize)] n: usize,
) {
let (s, r) = ring_channel::<()>(NonZeroUsize::new(1).unwrap());
let ls = repeatn(Endpoint::Sender(s), m);
let rs = repeatn(Endpoint::Receiver(r), n);
let ls = repeatn(&s, m).cloned().map(Endpoint::Sender);
let rs = repeatn(&r, n).cloned().map(Endpoint::Receiver);
ls.chain(rs).for_each(drop);
}

Expand Down Expand Up @@ -396,7 +409,7 @@ mod tests {
}

assert_eq!(
iter::from_fn(move || r.handle.buffer.pop()).collect::<Vec<_>>(),
iter::from_fn(|| r.handle.buffer.pop()).collect::<Vec<_>>(),
msgs.into_iter().skip(overwritten).collect::<Vec<_>>()
);
}
Expand Down Expand Up @@ -458,7 +471,7 @@ mod tests {
#[strategy(1..=100usize)] n: usize,
) {
let (_, r) = ring_channel::<()>(NonZeroUsize::new(capacity).unwrap());
repeatn(r, n).for_each(move |mut r| {
repeatn(r, n).for_each(|mut r| {
assert_eq!(r.recv(), Err(RecvError::Disconnected));
});
}
Expand Down Expand Up @@ -529,7 +542,7 @@ mod tests {
#[strategy(1..=100usize)] n: usize,
) {
let (_, r) = ring_channel::<()>(NonZeroUsize::new(capacity).unwrap());
repeatn(r, n).for_each(move |mut r| {
repeatn(r, n).for_each(|mut r| {
assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
});
}
Expand All @@ -551,7 +564,7 @@ mod tests {
drop(tx); // hang-up

assert_eq!(
iter::from_fn(move || rx.try_recv().ok()).collect::<Vec<_>>(),
iter::from_fn(|| rx.try_recv().ok()).collect::<Vec<_>>(),
msgs.into_iter().skip(overwritten).collect::<Vec<_>>()
);
}
Expand Down Expand Up @@ -582,13 +595,13 @@ mod tests {
fn stream_wakes_on_disconnect(#[strategy(1..=100usize)] n: usize) {
let (tx, rx) = ring_channel::<()>(NonZeroUsize::new(1).unwrap());

rayon::scope(move |s| {
rayon::scope(|s| {
for _ in 0..n {
let rx = rx.clone();
s.spawn(move |_| assert_eq!(block_on(rx.collect::<Vec<_>>()), vec![]));
s.spawn(|_| assert_eq!(block_on(rx.collect::<Vec<_>>()), vec![]));
}

s.spawn(move |_| drop(tx));
s.spawn(|_| drop(tx));
});
}

Expand All @@ -597,7 +610,7 @@ mod tests {
fn stream_wakes_on_send(#[strategy(1..=100usize)] n: usize) {
let (tx, rx) = ring_channel(NonZeroUsize::new(n).unwrap());

rayon::scope(move |s| {
rayon::scope(|s| {
for _ in 0..n {
let tx = tx.clone();
let mut rx = rx.clone();
Expand All @@ -619,7 +632,7 @@ mod tests {
fn stream_wakes_on_send_all(#[strategy(1..=100usize)] n: usize) {
let (mut tx, rx) = ring_channel(NonZeroUsize::new(n).unwrap());

rayon::scope(move |s| {
rayon::scope(|s| {
for _ in 0..n {
let tx = tx.clone();
let mut rx = rx.clone();
Expand All @@ -639,13 +652,13 @@ mod tests {
fn recv_wakes_on_disconnect(#[strategy(1..=100usize)] n: usize) {
let (tx, rx) = ring_channel::<()>(NonZeroUsize::new(1).unwrap());

rayon::scope(move |s| {
rayon::scope(|s| {
for _ in 0..n {
let mut rx = rx.clone();
s.spawn(move |_| assert_eq!(rx.recv(), Err(RecvError::Disconnected)));
}

s.spawn(move |_| drop(tx));
s.spawn(|_| drop(tx));
});
}

Expand All @@ -654,7 +667,7 @@ mod tests {
fn recv_wakes_on_send(#[strategy(1..=100usize)] n: usize) {
let (tx, rx) = ring_channel(NonZeroUsize::new(n).unwrap());

rayon::scope(move |s| {
rayon::scope(|s| {
for _ in 0..n {
let tx = tx.clone();
let mut rx = rx.clone();
Expand All @@ -676,7 +689,7 @@ mod tests {
fn recv_wakes_on_send_all(#[strategy(1..=100usize)] n: usize) {
let (mut tx, rx) = ring_channel(NonZeroUsize::new(n).unwrap());

rayon::scope(move |s| {
rayon::scope(|s| {
for _ in 0..n {
let tx = tx.clone();
let mut rx = rx.clone();
Expand Down

0 comments on commit 0807bfd

Please sign in to comment.