From fd76552a4b41d132701dc0857a9fee9a3850088e Mon Sep 17 00:00:00 2001 From: joboet Date: Wed, 18 May 2022 12:18:51 +0200 Subject: [PATCH 1/4] std: use an event flag based thread parker on SOLID --- library/std/src/sys/itron/abi.rs | 48 +++++++++- library/std/src/sys/itron/wait_flag.rs | 67 +++++++++++++ library/std/src/sys/solid/mod.rs | 2 + .../std/src/sys_common/thread_parker/mod.rs | 7 +- .../src/sys_common/thread_parker/wait_flag.rs | 96 +++++++++++++++++++ 5 files changed, 214 insertions(+), 6 deletions(-) create mode 100644 library/std/src/sys/itron/wait_flag.rs create mode 100644 library/std/src/sys_common/thread_parker/wait_flag.rs diff --git a/library/std/src/sys/itron/abi.rs b/library/std/src/sys/itron/abi.rs index f99ee4fa897e..5eb14bb7e534 100644 --- a/library/std/src/sys/itron/abi.rs +++ b/library/std/src/sys/itron/abi.rs @@ -30,15 +30,32 @@ pub type ER = int_t; /// Error code type, `ID` on success pub type ER_ID = int_t; +/// Service call operational mode +pub type MODE = uint_t; + +/// OR waiting condition for an eventflag +pub const TWF_ORW: MODE = 0x01; + +/// Object attributes +pub type ATR = uint_t; + +/// FIFO wait order +pub const TA_FIFO: ATR = 0; +/// Only one task is allowed to be in the waiting state for the eventflag +pub const TA_WSGL: ATR = 0; +/// The eventflag’s bit pattern is cleared when a task is released from the +/// waiting state for that eventflag. +pub const TA_CLR: ATR = 0x04; + +/// Bit pattern of an eventflag +pub type FLGPTN = uint_t; + /// Task or interrupt priority pub type PRI = int_t; /// The special value of `PRI` representing the current task's priority. pub const TPRI_SELF: PRI = 0; -/// Object attributes -pub type ATR = uint_t; - /// Use the priority inheritance protocol #[cfg(target_os = "solid_asp3")] pub const TA_INHERIT: ATR = 0x02; @@ -90,6 +107,13 @@ pub struct T_CSEM { pub maxsem: uint_t, } +#[derive(Clone, Copy)] +#[repr(C)] +pub struct T_CFLG { + pub flgatr: ATR, + pub iflgptn: FLGPTN, +} + #[derive(Clone, Copy)] #[repr(C)] pub struct T_CMTX { @@ -139,6 +163,24 @@ extern "C" { pub fn sns_dsp() -> bool_t; #[link_name = "__asp3_get_tim"] pub fn get_tim(p_systim: *mut SYSTIM) -> ER; + #[link_name = "__asp3_acre_flg"] + pub fn acre_flg(pk_cflg: *const T_CFLG) -> ER_ID; + #[link_name = "__asp3_del_flg"] + pub fn del_flg(flgid: ID) -> ER; + #[link_name = "__asp3_set_flg"] + pub fn set_flg(flgid: ID, setptn: FLGPTN) -> ER; + #[link_name = "__asp3_clr_flg"] + pub fn clr_flg(flgid: ID, clrptn: FLGPTN) -> ER; + #[link_name = "__asp3_wai_flg"] + pub fn wai_flg(flgid: ID, waiptn: FLGPTN, wfmode: MODE, p_flgptn: *mut FLGPTN) -> ER; + #[link_name = "__asp3_twai_flg"] + pub fn twai_flg( + flgid: ID, + waiptn: FLGPTN, + wfmode: MODE, + p_flgptn: *mut FLGPTN, + tmout: TMO, + ) -> ER; #[link_name = "__asp3_acre_mtx"] pub fn acre_mtx(pk_cmtx: *const T_CMTX) -> ER_ID; #[link_name = "__asp3_del_mtx"] diff --git a/library/std/src/sys/itron/wait_flag.rs b/library/std/src/sys/itron/wait_flag.rs new file mode 100644 index 000000000000..805f85a69b6c --- /dev/null +++ b/library/std/src/sys/itron/wait_flag.rs @@ -0,0 +1,67 @@ +use crate::mem::MaybeUninit; +use crate::time::Duration; + +use super::{ + abi, + error::{expect_success, fail}, + time::with_tmos, +}; + +const CLEAR: abi::FLGPTN = 0; +const RAISED: abi::FLGPTN = 1; + +/// A thread parking primitive that is not susceptible to race conditions, +/// but provides no atomic ordering guarantees and allows only one `raise` per wait. +pub struct WaitFlag { + flag: abi::ID, +} + +impl WaitFlag { + /// Creates a new wait flag. + pub fn new() -> WaitFlag { + let flag = expect_success( + unsafe { + abi::acre_flg(&abi::T_CFLG { + flgatr: abi::TA_FIFO | abi::TA_WSGL | abi::TA_CLR, + iflgptn: CLEAR, + }) + }, + &"acre_flg", + ); + + WaitFlag { flag } + } + + /// Wait for the wait flag to be raised. + pub fn wait(&self) { + let mut token = MaybeUninit::uninit(); + expect_success( + unsafe { abi::wai_flg(self.flag, RAISED, abi::TWF_ORW, token.as_mut_ptr()) }, + &"wai_flg", + ); + } + + /// Wait for the wait flag to be raised or the timeout to occur. + pub fn wait_timeout(&self, dur: Duration) { + let mut token = MaybeUninit::uninit(); + let er = with_tmos(dur, |tmout| unsafe { + abi::twai_flg(self.flag, RAISED, abi::TWF_ORW, token.as_mut_ptr(), tmout) + }); + if er != abi::E_OK && er != abi::E_TMOUT { + fail(er, &"twai_flg"); + } + } + + /// Raise the wait flag. + /// + /// Calls to this function should be balanced with the number of successful waits. + pub fn raise(&self) { + expect_success(unsafe { abi::set_flg(self.flag, RAISED) }, &"set_flg"); + } +} + +impl Drop for WaitFlag { + fn drop(&mut self) { + expect_success(unsafe { abi::del_flg(self.flag) }, &"del_flg"); + } +} diff --git a/library/std/src/sys/solid/mod.rs b/library/std/src/sys/solid/mod.rs index 5ffa381f2e50..2d21e4764fc2 100644 --- a/library/std/src/sys/solid/mod.rs +++ b/library/std/src/sys/solid/mod.rs @@ -15,6 +15,7 @@ mod itron { pub mod thread; pub(super) mod time; use super::unsupported; + pub mod wait_flag; } pub mod alloc; @@ -43,6 +44,7 @@ pub mod memchr; pub mod thread_local_dtor; pub mod thread_local_key; pub mod time; +pub use self::itron::wait_flag; mod rwlock; diff --git a/library/std/src/sys_common/thread_parker/mod.rs b/library/std/src/sys_common/thread_parker/mod.rs index c789a388e05a..79d5a498e054 100644 --- a/library/std/src/sys_common/thread_parker/mod.rs +++ b/library/std/src/sys_common/thread_parker/mod.rs @@ -9,9 +9,10 @@ cfg_if::cfg_if! { ))] { mod futex; pub use futex::Parker; - } else if #[cfg(windows)] { - pub use crate::sys::thread_parker::Parker; - } else if #[cfg(target_family = "unix")] { + } else if #[cfg(target_os = "solid_asp3")] { + mod wait_flag; + pub use wait_flag::Parker; + } else if #[cfg(any(windows, target_family = "unix"))] { pub use crate::sys::thread_parker::Parker; } else { mod generic; diff --git a/library/std/src/sys_common/thread_parker/wait_flag.rs b/library/std/src/sys_common/thread_parker/wait_flag.rs new file mode 100644 index 000000000000..39a0df1cd3de --- /dev/null +++ b/library/std/src/sys_common/thread_parker/wait_flag.rs @@ -0,0 +1,96 @@ +//! A wait-flag-based thread parker. +//! +//! Some operating systems provide low-level parking primitives like wait counts, +//! event flags or semaphores which are not susceptible to race conditions (meaning +//! the wakeup can occure before the wait operation). To implement the `std` thread +//! parker on top of these primitives, we only have to ensure that parking is fast +//! when the thread token is available, the atomic ordering guarantees are maintained +//! and spurious wakeups are minimized. +//! +//! To achieve this, this parker uses an atomic variable with three states: `EMPTY`, +//! `PARKED` and `NOTIFIED`: +//! * `EMPTY` means the token has not been made available, but the thread is not +//! currently waiting on it. +//! * `PARKED` means the token is not available and the thread is parked. +//! * `NOTIFIED` means the token is available. +//! +//! `park` and `park_timeout` change the state from `EMPTY` to `PARKED` and from +//! `NOTIFIED` to `EMPTY`. If the state was `NOTIFIED`, the thread was unparked and +//! execution can continue without calling into the OS. If the state was `EMPTY`, +//! the token is not available and the thread waits on the primitive (here called +//! "wait flag"). +//! +//! `unpark` changes the state to `NOTIFIED`. If the state was `PARKED`, the thread +//! is or will be sleeping on the wait flag, so we raise it. Only the first thread +//! to call `unpark` will raise the wait flag, so spurious wakeups are avoided +//! (this is especially important for semaphores). + +use crate::pin::Pin; +use crate::sync::atomic::AtomicI8; +use crate::sync::atomic::Ordering::SeqCst; +use crate::sys::wait_flag::WaitFlag; +use crate::time::Duration; + +const EMPTY: i8 = 0; +const PARKED: i8 = -1; +const NOTIFIED: i8 = 1; + +pub struct Parker { + state: AtomicI8, + wait_flag: WaitFlag, +} + +impl Parker { + /// Construct a parker for the current thread. The UNIX parker + /// implementation requires this to happen in-place. + pub unsafe fn new(parker: *mut Parker) { + parker.write(Parker { state: AtomicI8::new(EMPTY), wait_flag: WaitFlag::new() }) + } + + // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. + pub unsafe fn park(self: Pin<&Self>) { + // The state values are chosen so that this subtraction changes + // `NOTIFIED` to `EMPTY` and `EMPTY` to `PARKED`. + let state = self.state.fetch_sub(1, SeqCst); + match state { + EMPTY => (), + NOTIFIED => return, + _ => panic!("inconsistent park state"), + } + + self.wait_flag.wait(); + + // We need to do a load here to use `Acquire` ordering. + self.state.swap(EMPTY, SeqCst); + } + + // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. + pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { + let state = self.state.fetch_sub(1, SeqCst); + match state { + EMPTY => (), + NOTIFIED => return, + _ => panic!("inconsistent park state"), + } + + self.wait_flag.wait_timeout(dur); + let state = self.state.swap(EMPTY, SeqCst); + if state == NOTIFIED { + // The token was made available after the timeout occurred, but before + // we reset the state, so we need to reset the wait flag to avoid + // spurious wakeups. This wait has no timeout, but we know it will + // return quickly, as the unparking thread will definitely raise the + // flag if it has not already done so. + self.wait_flag.wait(); + } + } + + // This implementation doesn't require `Pin`, but other implementations do. + pub fn unpark(self: Pin<&Self>) { + let state = self.state.swap(NOTIFIED, SeqCst); + + if state == PARKED { + self.wait_flag.raise(); + } + } +} From 3b6ae15058dbb68710f92697265580c7e957629f Mon Sep 17 00:00:00 2001 From: joboet Date: Thu, 19 May 2022 14:37:29 +0200 Subject: [PATCH 2/4] std: fix deadlock in `Parker` --- library/std/src/sys/itron/wait_flag.rs | 13 +++++++++---- .../std/src/sys_common/thread_parker/wait_flag.rs | 8 ++++---- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/library/std/src/sys/itron/wait_flag.rs b/library/std/src/sys/itron/wait_flag.rs index 805f85a69b6c..e432edd20775 100644 --- a/library/std/src/sys/itron/wait_flag.rs +++ b/library/std/src/sys/itron/wait_flag.rs @@ -42,13 +42,18 @@ impl WaitFlag { } /// Wait for the wait flag to be raised or the timeout to occur. - pub fn wait_timeout(&self, dur: Duration) { + /// + /// Returns whether the flag was raised (`true`) or the operation timed out (`false`). + pub fn wait_timeout(&self, dur: Duration) -> bool { let mut token = MaybeUninit::uninit(); - let er = with_tmos(dur, |tmout| unsafe { + let res = with_tmos(dur, |tmout| unsafe { abi::twai_flg(self.flag, RAISED, abi::TWF_ORW, token.as_mut_ptr(), tmout) }); - if er != abi::E_OK && er != abi::E_TMOUT { - fail(er, &"twai_flg"); + + match res { + abi::E_OK => true, + abi::E_TMOUT => false, + error => fail(error, &"twai_flg"), } } diff --git a/library/std/src/sys_common/thread_parker/wait_flag.rs b/library/std/src/sys_common/thread_parker/wait_flag.rs index 39a0df1cd3de..f9581ff5d577 100644 --- a/library/std/src/sys_common/thread_parker/wait_flag.rs +++ b/library/std/src/sys_common/thread_parker/wait_flag.rs @@ -2,7 +2,7 @@ //! //! Some operating systems provide low-level parking primitives like wait counts, //! event flags or semaphores which are not susceptible to race conditions (meaning -//! the wakeup can occure before the wait operation). To implement the `std` thread +//! the wakeup can occur before the wait operation). To implement the `std` thread //! parker on top of these primitives, we only have to ensure that parking is fast //! when the thread token is available, the atomic ordering guarantees are maintained //! and spurious wakeups are minimized. @@ -73,10 +73,10 @@ impl Parker { _ => panic!("inconsistent park state"), } - self.wait_flag.wait_timeout(dur); + let wakeup = self.wait_flag.wait_timeout(dur); let state = self.state.swap(EMPTY, SeqCst); - if state == NOTIFIED { - // The token was made available after the timeout occurred, but before + if state == NOTIFIED && !wakeup { + // The token was made available after the wait timed out, but before // we reset the state, so we need to reset the wait flag to avoid // spurious wakeups. This wait has no timeout, but we know it will // return quickly, as the unparking thread will definitely raise the From b9660de664cc491b3f22a5c1dadee5a03e506b2a Mon Sep 17 00:00:00 2001 From: joboet Date: Sat, 4 Jun 2022 20:57:25 +0200 Subject: [PATCH 3/4] std: solve priority issue for Parker --- .../src/sys_common/thread_parker/wait_flag.rs | 55 +++++++++++-------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/library/std/src/sys_common/thread_parker/wait_flag.rs b/library/std/src/sys_common/thread_parker/wait_flag.rs index f9581ff5d577..8db12693ef73 100644 --- a/library/std/src/sys_common/thread_parker/wait_flag.rs +++ b/library/std/src/sys_common/thread_parker/wait_flag.rs @@ -21,13 +21,11 @@ //! "wait flag"). //! //! `unpark` changes the state to `NOTIFIED`. If the state was `PARKED`, the thread -//! is or will be sleeping on the wait flag, so we raise it. Only the first thread -//! to call `unpark` will raise the wait flag, so spurious wakeups are avoided -//! (this is especially important for semaphores). +//! is or will be sleeping on the wait flag, so we raise it. use crate::pin::Pin; use crate::sync::atomic::AtomicI8; -use crate::sync::atomic::Ordering::SeqCst; +use crate::sync::atomic::Ordering::{Relaxed, SeqCst}; use crate::sys::wait_flag::WaitFlag; use crate::time::Duration; @@ -49,39 +47,48 @@ impl Parker { // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. pub unsafe fn park(self: Pin<&Self>) { - // The state values are chosen so that this subtraction changes - // `NOTIFIED` to `EMPTY` and `EMPTY` to `PARKED`. - let state = self.state.fetch_sub(1, SeqCst); - match state { - EMPTY => (), + match self.state.fetch_sub(1, SeqCst) { + // NOTIFIED => EMPTY NOTIFIED => return, + // EMPTY => PARKED + EMPTY => (), _ => panic!("inconsistent park state"), } - self.wait_flag.wait(); + // Avoid waking up from spurious wakeups (these are quite likely, see below). + loop { + self.wait_flag.wait(); - // We need to do a load here to use `Acquire` ordering. - self.state.swap(EMPTY, SeqCst); + match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, Relaxed) { + Ok(_) => return, + Err(PARKED) => (), + Err(_) => panic!("inconsistent park state"), + } + } } // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { - let state = self.state.fetch_sub(1, SeqCst); - match state { - EMPTY => (), + match self.state.fetch_sub(1, SeqCst) { NOTIFIED => return, + EMPTY => (), _ => panic!("inconsistent park state"), } - let wakeup = self.wait_flag.wait_timeout(dur); - let state = self.state.swap(EMPTY, SeqCst); - if state == NOTIFIED && !wakeup { - // The token was made available after the wait timed out, but before - // we reset the state, so we need to reset the wait flag to avoid - // spurious wakeups. This wait has no timeout, but we know it will - // return quickly, as the unparking thread will definitely raise the - // flag if it has not already done so. - self.wait_flag.wait(); + self.wait_flag.wait_timeout(dur); + + // Either a wakeup or a timeout occurred. Wakeups may be spurious, as there can be + // a race condition when `unpark` is performed between receiving the timeout and + // resetting the state, resulting in the eventflag being set unnecessarily. `park` + // is protected against this by looping until the token is actually given, but + // here we cannot easily tell. + + // Use `swap` to provide acquire ordering (not strictly necessary, but all other + // implementations do). + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED => (), + PARKED => (), + _ => panic!("inconsistent park state"), } } From caff72361f9a3d9938032be703295ef7a0c0dd5d Mon Sep 17 00:00:00 2001 From: joboet Date: Tue, 7 Jun 2022 11:06:19 +0200 Subject: [PATCH 4/4] std: relax memory orderings in `Parker` Co-authored-by: Tomoaki Kawada --- .../std/src/sys_common/thread_parker/wait_flag.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/library/std/src/sys_common/thread_parker/wait_flag.rs b/library/std/src/sys_common/thread_parker/wait_flag.rs index 8db12693ef73..6561c186655a 100644 --- a/library/std/src/sys_common/thread_parker/wait_flag.rs +++ b/library/std/src/sys_common/thread_parker/wait_flag.rs @@ -25,7 +25,7 @@ use crate::pin::Pin; use crate::sync::atomic::AtomicI8; -use crate::sync::atomic::Ordering::{Relaxed, SeqCst}; +use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; use crate::sys::wait_flag::WaitFlag; use crate::time::Duration; @@ -47,7 +47,7 @@ impl Parker { // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. pub unsafe fn park(self: Pin<&Self>) { - match self.state.fetch_sub(1, SeqCst) { + match self.state.fetch_sub(1, Acquire) { // NOTIFIED => EMPTY NOTIFIED => return, // EMPTY => PARKED @@ -59,7 +59,7 @@ impl Parker { loop { self.wait_flag.wait(); - match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, Relaxed) { + match self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Relaxed) { Ok(_) => return, Err(PARKED) => (), Err(_) => panic!("inconsistent park state"), @@ -69,7 +69,7 @@ impl Parker { // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { - match self.state.fetch_sub(1, SeqCst) { + match self.state.fetch_sub(1, Acquire) { NOTIFIED => return, EMPTY => (), _ => panic!("inconsistent park state"), @@ -83,9 +83,8 @@ impl Parker { // is protected against this by looping until the token is actually given, but // here we cannot easily tell. - // Use `swap` to provide acquire ordering (not strictly necessary, but all other - // implementations do). - match self.state.swap(EMPTY, SeqCst) { + // Use `swap` to provide acquire ordering. + match self.state.swap(EMPTY, Acquire) { NOTIFIED => (), PARKED => (), _ => panic!("inconsistent park state"), @@ -94,7 +93,7 @@ impl Parker { // This implementation doesn't require `Pin`, but other implementations do. pub fn unpark(self: Pin<&Self>) { - let state = self.state.swap(NOTIFIED, SeqCst); + let state = self.state.swap(NOTIFIED, Release); if state == PARKED { self.wait_flag.raise();