From e428500630e2aa2af44e411a4ed10a5cff16d168 Mon Sep 17 00:00:00 2001 From: white-axe Date: Thu, 21 Dec 2023 13:57:44 -0500 Subject: [PATCH 1/3] Add spinlock implementation without `std::thread::sleep` --- CHANGELOG.md | 2 ++ Cargo.toml | 2 ++ src/lib.rs | 26 ++++++++++++++++---------- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 37692e1..6b82f08 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- `spin-plain` feature that switches the locking implementation to a plain spinlock without `std::thread::sleep` + ### Removed ### Changed diff --git a/Cargo.toml b/Cargo.toml index c57b895..6be8b33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,8 @@ exclude = [ [features] # Use a spinlock internally (may be faster on some platforms) spin = [] +# Use a spinlock internally without `std::thread::sleep` (for platforms that don't support blocking) +spin-plain = [] select = [] async = ["futures-sink", "futures-core"] eventual-fairness = ["select", "nanorand"] diff --git a/src/lib.rs b/src/lib.rs index 3b79610..bc13827 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,7 +47,7 @@ use std::{ fmt, }; -#[cfg(feature = "spin")] +#[cfg(any(feature = "spin", feature = "spin-plain"))] use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard}; use crate::signal::{Signal, SyncSignal}; @@ -257,13 +257,13 @@ enum TryRecvTimeoutError { } // TODO: Investigate some sort of invalidation flag for timeouts -#[cfg(feature = "spin")] +#[cfg(any(feature = "spin", feature = "spin-plain"))] struct Hook(Option>>, S); -#[cfg(not(feature = "spin"))] +#[cfg(not(any(feature = "spin", feature = "spin-plain")))] struct Hook(Option>>, S); -#[cfg(feature = "spin")] +#[cfg(any(feature = "spin", feature = "spin-plain"))] impl Hook { pub fn slot(msg: Option, signal: S) -> Arc where @@ -277,7 +277,7 @@ impl Hook { } } -#[cfg(not(feature = "spin"))] +#[cfg(not(any(feature = "spin", feature = "spin-plain")))] impl Hook { pub fn slot(msg: Option, signal: S) -> Arc where @@ -392,7 +392,7 @@ impl Hook { } } -#[cfg(feature = "spin")] +#[cfg(all(feature = "spin", not(feature = "spin-plain")))] #[inline] fn wait_lock(lock: &Spinlock) -> SpinlockGuard { let mut i = 4; @@ -409,18 +409,24 @@ fn wait_lock(lock: &Spinlock) -> SpinlockGuard { } } -#[cfg(not(feature = "spin"))] +#[cfg(feature = "spin-plain")] +#[inline] +fn wait_lock(lock: &Spinlock) -> SpinlockGuard { + lock.lock() +} + +#[cfg(not(any(feature = "spin", feature = "spin-plain")))] #[inline] fn wait_lock<'a, T>(lock: &'a Mutex) -> MutexGuard<'a, T> { lock.lock().unwrap() } -#[cfg(not(feature = "spin"))] +#[cfg(not(any(feature = "spin", feature = "spin-plain")))] use std::sync::{Mutex, MutexGuard}; -#[cfg(feature = "spin")] +#[cfg(any(feature = "spin", feature = "spin-plain"))] type ChanLock = Spinlock; -#[cfg(not(feature = "spin"))] +#[cfg(not(any(feature = "spin", feature = "spin-plain")))] type ChanLock = Mutex; From 01b53952604ec2d9c2d6cc0855a9cf553c32cfac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E7=9A=93?= Date: Sat, 27 Jul 2024 19:23:33 -0400 Subject: [PATCH 2/3] Disable use of `std::thread::sleep` except on Unix and Windows --- CHANGELOG.md | 3 +-- Cargo.toml | 2 -- src/lib.rs | 50 ++++++++++++++++++++++++++------------------------ 3 files changed, 27 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b82f08..c88fbf6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,13 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- `spin-plain` feature that switches the locking implementation to a plain spinlock without `std::thread::sleep` - ### Removed ### Changed - `WeakSender` is now `Clone` +- `spin` feature no longer uses `std::thread::sleep` for locking except on Unix-like operating systems and Windows ### Fixed diff --git a/Cargo.toml b/Cargo.toml index 6be8b33..c57b895 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,8 +18,6 @@ exclude = [ [features] # Use a spinlock internally (may be faster on some platforms) spin = [] -# Use a spinlock internally without `std::thread::sleep` (for platforms that don't support blocking) -spin-plain = [] select = [] async = ["futures-sink", "futures-core"] eventual-fairness = ["select", "nanorand"] diff --git a/src/lib.rs b/src/lib.rs index bc13827..2e52179 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,7 +47,7 @@ use std::{ fmt, }; -#[cfg(any(feature = "spin", feature = "spin-plain"))] +#[cfg(feature = "spin")] use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard}; use crate::signal::{Signal, SyncSignal}; @@ -257,13 +257,13 @@ enum TryRecvTimeoutError { } // TODO: Investigate some sort of invalidation flag for timeouts -#[cfg(any(feature = "spin", feature = "spin-plain"))] +#[cfg(feature = "spin")] struct Hook(Option>>, S); -#[cfg(not(any(feature = "spin", feature = "spin-plain")))] +#[cfg(not(feature = "spin"))] struct Hook(Option>>, S); -#[cfg(any(feature = "spin", feature = "spin-plain"))] +#[cfg(feature = "spin")] impl Hook { pub fn slot(msg: Option, signal: S) -> Arc where @@ -277,7 +277,7 @@ impl Hook { } } -#[cfg(not(any(feature = "spin", feature = "spin-plain")))] +#[cfg(not(feature = "spin"))] impl Hook { pub fn slot(msg: Option, signal: S) -> Arc where @@ -392,41 +392,43 @@ impl Hook { } } -#[cfg(all(feature = "spin", not(feature = "spin-plain")))] +#[cfg(feature = "spin")] #[inline] fn wait_lock(lock: &Spinlock) -> SpinlockGuard { - let mut i = 4; - loop { - for _ in 0..10 { - if let Some(guard) = lock.try_lock() { - return guard; + // Some targets don't support `thread::sleep` (e.g. the `wasm32-unknown-unknown` target when + // running in the main thread of a web browser) so we only use it on targets where we know it + // will work + #[cfg(any(target_family = "unix", target_family = "windows"))] + { + let mut i = 4; + loop { + for _ in 0..10 { + if let Some(guard) = lock.try_lock() { + return guard; + } + thread::yield_now(); } - thread::yield_now(); + // Sleep for at most ~1 ms + thread::sleep(Duration::from_nanos(1 << i.min(20))); + i += 1; } - // Sleep for at most ~1 ms - thread::sleep(Duration::from_nanos(1 << i.min(20))); - i += 1; } -} - -#[cfg(feature = "spin-plain")] -#[inline] -fn wait_lock(lock: &Spinlock) -> SpinlockGuard { + #[cfg(not(any(target_family = "unix", target_family = "windows")))] lock.lock() } -#[cfg(not(any(feature = "spin", feature = "spin-plain")))] +#[cfg(not(feature = "spin"))] #[inline] fn wait_lock<'a, T>(lock: &'a Mutex) -> MutexGuard<'a, T> { lock.lock().unwrap() } -#[cfg(not(any(feature = "spin", feature = "spin-plain")))] +#[cfg(not(feature = "spin"))] use std::sync::{Mutex, MutexGuard}; -#[cfg(any(feature = "spin", feature = "spin-plain"))] +#[cfg(feature = "spin")] type ChanLock = Spinlock; -#[cfg(not(any(feature = "spin", feature = "spin-plain")))] +#[cfg(not(feature = "spin"))] type ChanLock = Mutex; From eff9e706fffa67ad4a768ea73ee970890f58f10a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E7=9A=93?= Date: Tue, 13 Aug 2024 15:11:57 -0400 Subject: [PATCH 3/3] Add wasm32 build to CI --- .github/workflows/rust.yml | 16 ++++++++++++++-- Cargo.toml | 1 + examples/async.rs | 4 ++-- tests/async.rs | 20 ++++++++++---------- tests/stream.rs | 18 +++++++++--------- 5 files changed, 36 insertions(+), 23 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 494f9e3..f4cd6ba 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -13,34 +13,45 @@ jobs: RUST_BACKTRACE: 1 strategy: matrix: - build: [linux64, macos, win32, win64] + build: [linux64, macos, win32, win64, wasm32] include: - build: linux64 os: ubuntu-latest channel: stable + toolchain: x86_64-unknown-linux-gnu target: x86_64-unknown-linux-gnu #- build: linux32 # os: ubuntu-latest # channel: stable + # toolchain: i686-unknown-linux-gnu # target: i686-unknown-linux-gnu - build: macos os: macos-latest channel: stable + toolchain: x86_64-apple-darwin target: x86_64-apple-darwin - build: win32 os: windows-latest channel: stable + toolchain: i686-pc-windows-msvc target: i686-pc-windows-msvc - build: win64 os: windows-latest channel: stable + toolchain: x86_64-pc-windows-msvc target: x86_64-pc-windows-msvc + - build: wasm32 + os: ubuntu-latest + channel: stable + toolchain: x86_64-unknown-linux-gnu + target: wasm32-unknown-unknown steps: - uses: actions/checkout@v2 - run: | - TOOLCHAIN=${{ matrix.channel }}-${{ matrix.target }} + TOOLCHAIN=${{ matrix.channel }}-${{ matrix.toolchain }} rustup toolchain install --no-self-update $TOOLCHAIN rustup default $TOOLCHAIN + rustup target add ${{ matrix.target }} shell: bash - name: Rust version run: | @@ -53,6 +64,7 @@ jobs: - run: cargo check --target ${{ matrix.target }} - run: cargo build --target ${{ matrix.target }} - run: cargo test --target ${{ matrix.target }} + if: ${{ matrix.target != 'wasm32-unknown-unknown' }} # FIXME(#41): Some timeout/deadline tests make more sense to run in release mode. #- run: cargo test --release --target ${{ matrix.target }} - run: cargo build --all-targets --target ${{ matrix.target }} diff --git a/Cargo.toml b/Cargo.toml index c57b895..cb51bd0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ async-std = { version = "1.9.0", features = ["attributes", "unstable"] } futures = { version = "^0.3", features = ["std"] } waker-fn = "1.1.0" tokio = { version = "^1.16.1", features = ["rt", "macros"] } +getrandom = { version = "0.2.15", features = ["js"] } [[bench]] name = "basic" diff --git a/examples/async.rs b/examples/async.rs index a562700..f4a2f12 100644 --- a/examples/async.rs +++ b/examples/async.rs @@ -1,4 +1,4 @@ -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[async_std::main] async fn main() { let (tx, rx) = flume::bounded(1); @@ -17,5 +17,5 @@ async fn main() { t.await; } -#[cfg(not(feature = "async"))] +#[cfg(any(not(feature = "async"), target_os = "unknown"))] fn main() {} diff --git a/tests/async.rs b/tests/async.rs index 6c2c7f2..f74cefe 100644 --- a/tests/async.rs +++ b/tests/async.rs @@ -1,4 +1,4 @@ -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] use { flume::*, futures::{stream::FuturesUnordered, StreamExt, TryFutureExt, Future}, @@ -7,7 +7,7 @@ use { std::{time::Duration, sync::{atomic::{AtomicUsize, Ordering}, Arc}}, }; -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[test] fn r#async_recv() { let (tx, rx) = unbounded(); @@ -24,7 +24,7 @@ fn r#async_recv() { t.join().unwrap(); } -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[test] fn r#async_send() { let (tx, rx) = bounded(1); @@ -41,7 +41,7 @@ fn r#async_send() { t.join().unwrap(); } -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[test] fn r#async_recv_disconnect() { let (tx, rx) = bounded::(0); @@ -58,7 +58,7 @@ fn r#async_recv_disconnect() { t.join().unwrap(); } -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[test] fn r#async_send_disconnect() { let (tx, rx) = bounded(0); @@ -75,7 +75,7 @@ fn r#async_send_disconnect() { t.join().unwrap(); } -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[test] fn r#async_recv_drop_recv() { let (tx, rx) = bounded::(10); @@ -103,7 +103,7 @@ fn r#async_recv_drop_recv() { assert_eq!(t.join().unwrap(), Ok(42)) } -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[async_std::test] async fn r#async_send_1_million_no_drop_or_reorder() { #[derive(Debug)] @@ -137,7 +137,7 @@ async fn r#async_send_1_million_no_drop_or_reorder() { assert_eq!(count, 1_000_000) } -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[async_std::test] async fn parallel_async_receivers() { let (tx, rx) = flume::unbounded(); @@ -175,7 +175,7 @@ async fn parallel_async_receivers() { println!("recv end"); } -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[test] fn change_waker() { let (tx, rx) = flume::bounded(1); @@ -246,7 +246,7 @@ fn change_waker() { } } -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[test] fn spsc_single_threaded_value_ordering() { async fn test() { diff --git a/tests/stream.rs b/tests/stream.rs index e3b32cd..c8463a8 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -1,4 +1,4 @@ -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] use { flume::*, futures::{stream::FuturesUnordered, StreamExt, TryFutureExt}, @@ -7,7 +7,7 @@ use { }; use futures::{stream, Stream}; -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[test] fn stream_recv() { let (tx, rx) = unbounded(); @@ -28,7 +28,7 @@ fn stream_recv() { t.join().unwrap(); } -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[test] fn stream_recv_disconnect() { let (tx, rx) = bounded::(0); @@ -48,7 +48,7 @@ fn stream_recv_disconnect() { t.join().unwrap(); } -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[test] fn stream_recv_drop_recv() { let (tx, rx) = bounded::(10); @@ -80,7 +80,7 @@ fn stream_recv_drop_recv() { assert_eq!(t.join().unwrap(), Some(42)) } -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[test] fn r#stream_drop_send_disconnect() { let (tx, rx) = bounded::(1); @@ -98,7 +98,7 @@ fn r#stream_drop_send_disconnect() { t.join().unwrap(); } -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[async_std::test] async fn stream_send_1_million_no_drop_or_reorder() { #[derive(Debug)] @@ -133,7 +133,7 @@ async fn stream_send_1_million_no_drop_or_reorder() { assert_eq!(count, 1_000_000) } -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[async_std::test] async fn parallel_streams_and_async_recv() { let (tx, rx) = flume::unbounded(); @@ -174,7 +174,7 @@ async fn parallel_streams_and_async_recv() { .unwrap(); } -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[test] fn stream_no_double_wake() { use std::sync::atomic::{AtomicUsize, Ordering}; @@ -220,7 +220,7 @@ fn stream_no_double_wake() { assert_eq!(count.load(Ordering::SeqCst), 1); } -#[cfg(feature = "async")] +#[cfg(all(feature = "async", not(target_os = "unknown")))] #[async_std::test] async fn stream_forward_issue_55() { // https://github.com/zesterer/flume/issues/55 fn dummy_stream() -> impl Stream {