Skip to content

Commit

Permalink
Merge pull request #138 from white-axe/plain
Browse files Browse the repository at this point in the history
Add spinlock implementation without `std::thread::sleep`
  • Loading branch information
zesterer authored Aug 15, 2024
2 parents bb5623b + eff9e70 commit 3c47bcc
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 32 deletions.
16 changes: 14 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,45 @@ jobs:
RUST_BACKTRACE: 1
strategy:
matrix:
build: [linux64, macos, win32, win64]
build: [linux64, macos, win32, win64, wasm32]
include:
- build: linux64
os: ubuntu-latest
channel: stable
toolchain: x86_64-unknown-linux-gnu
target: x86_64-unknown-linux-gnu
#- build: linux32
# os: ubuntu-latest
# channel: stable
# toolchain: i686-unknown-linux-gnu
# target: i686-unknown-linux-gnu
- build: macos
os: macos-latest
channel: stable
toolchain: x86_64-apple-darwin
target: x86_64-apple-darwin
- build: win32
os: windows-latest
channel: stable
toolchain: i686-pc-windows-msvc
target: i686-pc-windows-msvc
- build: win64
os: windows-latest
channel: stable
toolchain: x86_64-pc-windows-msvc
target: x86_64-pc-windows-msvc
- build: wasm32
os: ubuntu-latest
channel: stable
toolchain: x86_64-unknown-linux-gnu
target: wasm32-unknown-unknown
steps:
- uses: actions/checkout@v2
- run: |
TOOLCHAIN=${{ matrix.channel }}-${{ matrix.target }}
TOOLCHAIN=${{ matrix.channel }}-${{ matrix.toolchain }}
rustup toolchain install --no-self-update $TOOLCHAIN
rustup default $TOOLCHAIN
rustup target add ${{ matrix.target }}
shell: bash
- name: Rust version
run: |
Expand All @@ -53,6 +64,7 @@ jobs:
- run: cargo check --target ${{ matrix.target }}
- run: cargo build --target ${{ matrix.target }}
- run: cargo test --target ${{ matrix.target }}
if: ${{ matrix.target != 'wasm32-unknown-unknown' }}
# FIXME(#41): Some timeout/deadline tests make more sense to run in release mode.
#- run: cargo test --release --target ${{ matrix.target }}
- run: cargo build --all-targets --target ${{ matrix.target }}
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- `WeakSender` is now `Clone`
- `spin` feature no longer uses `std::thread::sleep` for locking except on Unix-like operating systems and Windows

### Fixed

Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ async-std = { version = "1.9.0", features = ["attributes", "unstable"] }
futures = { version = "^0.3", features = ["std"] }
waker-fn = "1.1.0"
tokio = { version = "^1.16.1", features = ["rt", "macros"] }
getrandom = { version = "0.2.15", features = ["js"] }

[[bench]]
name = "basic"
Expand Down
4 changes: 2 additions & 2 deletions examples/async.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[async_std::main]
async fn main() {
let (tx, rx) = flume::bounded(1);
Expand All @@ -17,5 +17,5 @@ async fn main() {
t.await;
}

#[cfg(not(feature = "async"))]
#[cfg(any(not(feature = "async"), target_os = "unknown"))]
fn main() {}
26 changes: 17 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,18 +395,26 @@ impl<T> Hook<T, SyncSignal> {
#[cfg(feature = "spin")]
#[inline]
fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<T> {
let mut i = 4;
loop {
for _ in 0..10 {
if let Some(guard) = lock.try_lock() {
return guard;
// Some targets don't support `thread::sleep` (e.g. the `wasm32-unknown-unknown` target when
// running in the main thread of a web browser) so we only use it on targets where we know it
// will work
#[cfg(any(target_family = "unix", target_family = "windows"))]
{
let mut i = 4;
loop {
for _ in 0..10 {
if let Some(guard) = lock.try_lock() {
return guard;
}
thread::yield_now();
}
thread::yield_now();
// Sleep for at most ~1 ms
thread::sleep(Duration::from_nanos(1 << i.min(20)));
i += 1;
}
// Sleep for at most ~1 ms
thread::sleep(Duration::from_nanos(1 << i.min(20)));
i += 1;
}
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
lock.lock()
}

#[cfg(not(feature = "spin"))]
Expand Down
20 changes: 10 additions & 10 deletions tests/async.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
use {
flume::*,
futures::{stream::FuturesUnordered, StreamExt, TryFutureExt, Future},
Expand All @@ -7,7 +7,7 @@ use {
std::{time::Duration, sync::{atomic::{AtomicUsize, Ordering}, Arc}},
};

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn r#async_recv() {
let (tx, rx) = unbounded();
Expand All @@ -24,7 +24,7 @@ fn r#async_recv() {
t.join().unwrap();
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn r#async_send() {
let (tx, rx) = bounded(1);
Expand All @@ -41,7 +41,7 @@ fn r#async_send() {
t.join().unwrap();
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn r#async_recv_disconnect() {
let (tx, rx) = bounded::<i32>(0);
Expand All @@ -58,7 +58,7 @@ fn r#async_recv_disconnect() {
t.join().unwrap();
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn r#async_send_disconnect() {
let (tx, rx) = bounded(0);
Expand All @@ -75,7 +75,7 @@ fn r#async_send_disconnect() {
t.join().unwrap();
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn r#async_recv_drop_recv() {
let (tx, rx) = bounded::<i32>(10);
Expand Down Expand Up @@ -103,7 +103,7 @@ fn r#async_recv_drop_recv() {
assert_eq!(t.join().unwrap(), Ok(42))
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[async_std::test]
async fn r#async_send_1_million_no_drop_or_reorder() {
#[derive(Debug)]
Expand Down Expand Up @@ -137,7 +137,7 @@ async fn r#async_send_1_million_no_drop_or_reorder() {
assert_eq!(count, 1_000_000)
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[async_std::test]
async fn parallel_async_receivers() {
let (tx, rx) = flume::unbounded();
Expand Down Expand Up @@ -175,7 +175,7 @@ async fn parallel_async_receivers() {
println!("recv end");
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn change_waker() {
let (tx, rx) = flume::bounded(1);
Expand Down Expand Up @@ -246,7 +246,7 @@ fn change_waker() {
}
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn spsc_single_threaded_value_ordering() {
async fn test() {
Expand Down
18 changes: 9 additions & 9 deletions tests/stream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
use {
flume::*,
futures::{stream::FuturesUnordered, StreamExt, TryFutureExt},
Expand All @@ -7,7 +7,7 @@ use {
};
use futures::{stream, Stream};

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn stream_recv() {
let (tx, rx) = unbounded();
Expand All @@ -28,7 +28,7 @@ fn stream_recv() {
t.join().unwrap();
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn stream_recv_disconnect() {
let (tx, rx) = bounded::<i32>(0);
Expand All @@ -48,7 +48,7 @@ fn stream_recv_disconnect() {
t.join().unwrap();
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn stream_recv_drop_recv() {
let (tx, rx) = bounded::<i32>(10);
Expand Down Expand Up @@ -80,7 +80,7 @@ fn stream_recv_drop_recv() {
assert_eq!(t.join().unwrap(), Some(42))
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn r#stream_drop_send_disconnect() {
let (tx, rx) = bounded::<i32>(1);
Expand All @@ -98,7 +98,7 @@ fn r#stream_drop_send_disconnect() {
t.join().unwrap();
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[async_std::test]
async fn stream_send_1_million_no_drop_or_reorder() {
#[derive(Debug)]
Expand Down Expand Up @@ -133,7 +133,7 @@ async fn stream_send_1_million_no_drop_or_reorder() {
assert_eq!(count, 1_000_000)
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[async_std::test]
async fn parallel_streams_and_async_recv() {
let (tx, rx) = flume::unbounded();
Expand Down Expand Up @@ -174,7 +174,7 @@ async fn parallel_streams_and_async_recv() {
.unwrap();
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn stream_no_double_wake() {
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -220,7 +220,7 @@ fn stream_no_double_wake() {
assert_eq!(count.load(Ordering::SeqCst), 1);
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[async_std::test]
async fn stream_forward_issue_55() { // https://github.com/zesterer/flume/issues/55
fn dummy_stream() -> impl Stream<Item = usize> {
Expand Down

0 comments on commit 3c47bcc

Please sign in to comment.