Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ptr-int transmute in std::sync::mpsc #95621

Merged
merged 1 commit into from
Apr 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions library/std/src/sync/mpsc/blocking.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Generic support for building blocking abstractions.

use crate::mem;
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::Arc;
use crate::thread::{self, Thread};
Expand Down Expand Up @@ -47,18 +46,18 @@ impl SignalToken {
wake
}

/// Converts to an unsafe usize value. Useful for storing in a pipe's state
/// Converts to an unsafe raw pointer. Useful for storing in a pipe's state
/// flag.
#[inline]
pub unsafe fn cast_to_usize(self) -> usize {
mem::transmute(self.inner)
pub unsafe fn to_raw(self) -> *mut u8 {
Arc::into_raw(self.inner) as *mut u8
}

/// Converts from an unsafe usize value. Useful for retrieving a pipe's state
/// Converts from an unsafe raw pointer. Useful for retrieving a pipe's state
/// flag.
#[inline]
pub unsafe fn cast_from_usize(signal_ptr: usize) -> SignalToken {
SignalToken { inner: mem::transmute(signal_ptr) }
pub unsafe fn from_raw(signal_ptr: *mut u8) -> SignalToken {
SignalToken { inner: Arc::from_raw(signal_ptr as *mut Inner) }
}
}

Expand Down
24 changes: 12 additions & 12 deletions library/std/src/sync/mpsc/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ pub use self::UpgradeResult::*;

use crate::cell::UnsafeCell;
use crate::ptr;
use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::atomic::{AtomicPtr, Ordering};
use crate::sync::mpsc::blocking::{self, SignalToken};
use crate::sync::mpsc::Receiver;
use crate::time::Instant;

// Various states you can find a port in.
const EMPTY: usize = 0; // initial state: no data, no blocked receiver
const DATA: usize = 1; // data ready for receiver to take
const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded
const EMPTY: *mut u8 = ptr::invalid_mut::<u8>(0); // initial state: no data, no blocked receiver
const DATA: *mut u8 = ptr::invalid_mut::<u8>(1); // data ready for receiver to take
const DISCONNECTED: *mut u8 = ptr::invalid_mut::<u8>(2); // channel is disconnected OR upgraded
// Any other value represents a pointer to a SignalToken value. The
// protocol ensures that when the state moves *to* a pointer,
// ownership of the token is given to the packet, and when the state
Expand All @@ -44,7 +44,7 @@ const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded

pub struct Packet<T> {
// Internal state of the chan/port pair (stores the blocked thread as well)
state: AtomicUsize,
state: AtomicPtr<u8>,
// One-shot data slot location
data: UnsafeCell<Option<T>>,
// when used for the second time, a oneshot channel must be upgraded, and
Expand Down Expand Up @@ -75,7 +75,7 @@ impl<T> Packet<T> {
Packet {
data: UnsafeCell::new(None),
upgrade: UnsafeCell::new(NothingSent),
state: AtomicUsize::new(EMPTY),
state: AtomicPtr::new(EMPTY),
}
}

Expand Down Expand Up @@ -108,7 +108,7 @@ impl<T> Packet<T> {
// There is a thread waiting on the other end. We leave the 'DATA'
// state inside so it'll pick it up on the other end.
ptr => {
SignalToken::cast_from_usize(ptr).signal();
SignalToken::from_raw(ptr).signal();
Ok(())
}
}
Expand All @@ -126,7 +126,7 @@ impl<T> Packet<T> {
// like we're not empty, then immediately go through to `try_recv`.
if self.state.load(Ordering::SeqCst) == EMPTY {
let (wait_token, signal_token) = blocking::tokens();
let ptr = unsafe { signal_token.cast_to_usize() };
let ptr = unsafe { signal_token.to_raw() };

// race with senders to enter the blocking state
if self.state.compare_exchange(EMPTY, ptr, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
Expand All @@ -142,7 +142,7 @@ impl<T> Packet<T> {
}
} else {
// drop the signal token, since we never blocked
drop(unsafe { SignalToken::cast_from_usize(ptr) });
drop(unsafe { SignalToken::from_raw(ptr) });
}
}

Expand Down Expand Up @@ -218,7 +218,7 @@ impl<T> Packet<T> {
}

// If someone's waiting, we gotta wake them up
ptr => UpWoke(SignalToken::cast_from_usize(ptr)),
ptr => UpWoke(SignalToken::from_raw(ptr)),
}
}
}
Expand All @@ -229,7 +229,7 @@ impl<T> Packet<T> {

// If someone's waiting, we gotta wake them up
ptr => unsafe {
SignalToken::cast_from_usize(ptr).signal();
SignalToken::from_raw(ptr).signal();
},
}
}
Expand Down Expand Up @@ -301,7 +301,7 @@ impl<T> Packet<T> {

// We woke ourselves up from select.
ptr => unsafe {
drop(SignalToken::cast_from_usize(ptr));
drop(SignalToken::from_raw(ptr));
Ok(false)
},
}
Expand Down
31 changes: 16 additions & 15 deletions library/std/src/sync/mpsc/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use core::intrinsics::abort;

use crate::cell::UnsafeCell;
use crate::ptr;
use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
use crate::sync::mpsc::blocking::{self, SignalToken};
use crate::sync::mpsc::mpsc_queue as mpsc;
use crate::sync::{Mutex, MutexGuard};
Expand All @@ -29,12 +29,13 @@ const MAX_REFCOUNT: usize = (isize::MAX) as usize;
const MAX_STEALS: isize = 5;
#[cfg(not(test))]
const MAX_STEALS: isize = 1 << 20;
const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver

pub struct Packet<T> {
queue: mpsc::Queue<T>,
cnt: AtomicIsize, // How many items are on this channel
steals: UnsafeCell<isize>, // How many times has a port received without blocking?
to_wake: AtomicUsize, // SignalToken for wake up
to_wake: AtomicPtr<u8>, // SignalToken for wake up

// The number of channels which are currently using this packet.
channels: AtomicUsize,
Expand Down Expand Up @@ -68,7 +69,7 @@ impl<T> Packet<T> {
queue: mpsc::Queue::new(),
cnt: AtomicIsize::new(0),
steals: UnsafeCell::new(0),
to_wake: AtomicUsize::new(0),
to_wake: AtomicPtr::new(EMPTY),
channels: AtomicUsize::new(2),
port_dropped: AtomicBool::new(false),
sender_drain: AtomicIsize::new(0),
Expand All @@ -93,8 +94,8 @@ impl<T> Packet<T> {
pub fn inherit_blocker(&self, token: Option<SignalToken>, guard: MutexGuard<'_, ()>) {
if let Some(token) = token {
assert_eq!(self.cnt.load(Ordering::SeqCst), 0);
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
self.to_wake.store(unsafe { token.cast_to_usize() }, Ordering::SeqCst);
assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
self.to_wake.store(unsafe { token.to_raw() }, Ordering::SeqCst);
self.cnt.store(-1, Ordering::SeqCst);

// This store is a little sketchy. What's happening here is that
Expand Down Expand Up @@ -250,10 +251,10 @@ impl<T> Packet<T> {
unsafe {
assert_eq!(
self.to_wake.load(Ordering::SeqCst),
0,
EMPTY,
"This is a known bug in the Rust standard library. See https://github.com/rust-lang/rust/issues/39364"
);
let ptr = token.cast_to_usize();
let ptr = token.to_raw();
self.to_wake.store(ptr, Ordering::SeqCst);

let steals = ptr::replace(self.steals.get(), 0);
Expand All @@ -272,8 +273,8 @@ impl<T> Packet<T> {
}
}

self.to_wake.store(0, Ordering::SeqCst);
drop(SignalToken::cast_from_usize(ptr));
self.to_wake.store(EMPTY, Ordering::SeqCst);
drop(SignalToken::from_raw(ptr));
Abort
}
}
Expand Down Expand Up @@ -415,9 +416,9 @@ impl<T> Packet<T> {
// Consumes ownership of the 'to_wake' field.
fn take_to_wake(&self) -> SignalToken {
let ptr = self.to_wake.load(Ordering::SeqCst);
self.to_wake.store(0, Ordering::SeqCst);
assert!(ptr != 0);
unsafe { SignalToken::cast_from_usize(ptr) }
self.to_wake.store(EMPTY, Ordering::SeqCst);
assert!(ptr != EMPTY);
unsafe { SignalToken::from_raw(ptr) }
}

////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -462,15 +463,15 @@ impl<T> Packet<T> {
let prev = self.bump(steals + 1);

if prev == DISCONNECTED {
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
true
} else {
let cur = prev + steals + 1;
assert!(cur >= 0);
if prev < 0 {
drop(self.take_to_wake());
} else {
while self.to_wake.load(Ordering::SeqCst) != 0 {
while self.to_wake.load(Ordering::SeqCst) != EMPTY {
thread::yield_now();
}
}
Expand All @@ -494,7 +495,7 @@ impl<T> Drop for Packet<T> {
// `to_wake`, so this assert cannot be removed with also removing
// the `to_wake` assert.
assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
assert_eq!(self.channels.load(Ordering::SeqCst), 0);
}
}
31 changes: 16 additions & 15 deletions library/std/src/sync/mpsc/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::ptr;
use crate::thread;
use crate::time::Instant;

use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, Ordering};
use crate::sync::mpsc::blocking::{self, SignalToken};
use crate::sync::mpsc::spsc_queue as spsc;
use crate::sync::mpsc::Receiver;
Expand All @@ -27,15 +27,16 @@ const DISCONNECTED: isize = isize::MIN;
const MAX_STEALS: isize = 5;
#[cfg(not(test))]
const MAX_STEALS: isize = 1 << 20;
const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver

pub struct Packet<T> {
// internal queue for all messages
queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
}

struct ProducerAddition {
cnt: AtomicIsize, // How many items are on this channel
to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
cnt: AtomicIsize, // How many items are on this channel
to_wake: AtomicPtr<u8>, // SignalToken for the blocked thread to wake up

port_dropped: AtomicBool, // flag if the channel has been destroyed.
}
Expand Down Expand Up @@ -71,7 +72,7 @@ impl<T> Packet<T> {
128,
ProducerAddition {
cnt: AtomicIsize::new(0),
to_wake: AtomicUsize::new(0),
to_wake: AtomicPtr::new(EMPTY),

port_dropped: AtomicBool::new(false),
},
Expand Down Expand Up @@ -147,17 +148,17 @@ impl<T> Packet<T> {
// Consumes ownership of the 'to_wake' field.
fn take_to_wake(&self) -> SignalToken {
let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
assert!(ptr != 0);
unsafe { SignalToken::cast_from_usize(ptr) }
self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst);
assert!(ptr != EMPTY);
unsafe { SignalToken::from_raw(ptr) }
}

// Decrements the count on the channel for a sleeper, returning the sleeper
// back if it shouldn't sleep. Note that this is the location where we take
// steals into account.
fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
let ptr = unsafe { token.cast_to_usize() };
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
let ptr = unsafe { token.to_raw() };
self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);

let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
Expand All @@ -176,8 +177,8 @@ impl<T> Packet<T> {
}
}

self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
Err(unsafe { SignalToken::cast_from_usize(ptr) })
self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst);
Err(unsafe { SignalToken::from_raw(ptr) })
}

pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
Expand Down Expand Up @@ -376,7 +377,7 @@ impl<T> Packet<T> {
// of time until the data is actually sent.
if was_upgrade {
assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
return Ok(true);
}

Expand All @@ -389,7 +390,7 @@ impl<T> Packet<T> {
// If we were previously disconnected, then we know for sure that there
// is no thread in to_wake, so just keep going
let has_data = if prev == DISCONNECTED {
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
true // there is data, that data is that we're disconnected
} else {
let cur = prev + steals + 1;
Expand All @@ -412,7 +413,7 @@ impl<T> Packet<T> {
if prev < 0 {
drop(self.take_to_wake());
} else {
while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 {
while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != EMPTY {
thread::yield_now();
}
}
Expand Down Expand Up @@ -451,6 +452,6 @@ impl<T> Drop for Packet<T> {
// `to_wake`, so this assert cannot be removed with also removing
// the `to_wake` assert.
assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
}
}