Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(maitake): use stdlib pin! and poll_fn #486

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion maitake-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ tracing = { version = "0.1", default_features = false }

[dev-dependencies]
futures-util = "0.3"
futures = "0.3"
tokio-test = "0.4"
tracing = { version = "0.1", default_features = false, features = ["std"] }
tracing-subscriber = { version = "0.3.11", features = ["fmt", "env-filter"] }
Expand Down
5 changes: 2 additions & 3 deletions maitake-sync/src/wait_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,23 +262,22 @@ impl<'map, 'wait, K: PartialEq, V, Lock: ScopedRawMutex> Wait<'map, K, V, Lock>
///
/// ```ignore
/// use std::sync::Arc;
/// use std::pin::pin;
/// use maitake::scheduler;
/// use maitake_sync::wait_map::{WaitMap, WakeOutcome};
/// use futures_util::pin_mut;
///
/// let scheduler = Scheduler::new();
/// let q = Arc::new(WaitMap::new());
///
/// let q2 = q.clone();
/// scheduler.spawn(async move {
/// let wait = q2.wait(0);
/// let mut wait = pin!(q2.wait(0));
///
/// // At this point, we have created the future, but it has not yet
/// // been added to the queue. We could immediately await 'wait',
/// // but then we would be unable to progress further. We must
/// // first pin the `wait` future, to ensure that it does not move
/// // until it has been completed.
/// pin_mut!(wait);
/// wait.as_mut().subscribe().await.unwrap();
///
/// // We now know the waiter has been enqueued, at this point we could
Expand Down
27 changes: 12 additions & 15 deletions maitake-sync/src/wait_map/tests/alloc_tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use super::super::*;
use crate::loom::sync::Arc;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use futures::{future::poll_fn, pin_mut, select_biased, FutureExt};
use core::{
future,
pin::pin,
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
};
use futures_util::{select_biased, FutureExt};
use tokio_test::{assert_pending, assert_ready, assert_ready_err, task};

#[test]
Expand Down Expand Up @@ -31,9 +35,7 @@ fn enqueue() {
let mut waiter2 = task::spawn({
let q = q.clone();
async move {
let wait = q.wait(1);

pin_mut!(wait);
let mut wait = pin!(q.wait(1));
wait.as_mut().subscribe().await.unwrap();
ENQUEUED.fetch_add(1, Ordering::Relaxed);

Expand Down Expand Up @@ -78,9 +80,7 @@ fn duplicate() {
let mut waiter1 = task::spawn({
let q = q.clone();
async move {
let wait = q.wait(0);

pin_mut!(wait);
let mut wait = pin!(q.wait(0));
wait.as_mut().subscribe().await.unwrap();
ENQUEUED.fetch_add(1, Ordering::Relaxed);

Expand All @@ -95,9 +95,7 @@ fn duplicate() {
let q = q.clone();
async move {
// Duplicate key!
let wait = q.wait(0);

pin_mut!(wait);
let mut wait = pin!(q.wait(0));
wait.as_mut().subscribe().await
}
});
Expand Down Expand Up @@ -345,19 +343,18 @@ fn drop_wake_bailed() {
.map(|i| {
let q = q.clone();
task::spawn(async move {
let mut bail_fut = poll_fn(|_| match BAIL.load(Ordering::Relaxed) {
let mut bail_fut = future::poll_fn(|_| match BAIL.load(Ordering::Relaxed) {
false => Poll::Pending,
true => Poll::Ready(()),
})
.fuse();

let wait_fut = q
let mut wait_fut = pin!(q
.wait(CountDropKey {
idx: i,
cnt: &KEY_DROPS,
})
.fuse();
pin_mut!(wait_fut);
.fuse());

// NOTE: `select_baised is used specifically to ensure the bail
// future is processed first.
Expand Down
2 changes: 1 addition & 1 deletion maitake-sync/src/wait_map/tests/loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ fn wake_close() {

#[test]
fn wake_and_drop() {
use futures::FutureExt;
use futures_util::FutureExt;
loom::model(|| {
// use `Arc`s as the value type to ensure their destructors are run.
let q = Arc::new(WaitMap::<usize, Arc<()>>::new());
Expand Down
6 changes: 3 additions & 3 deletions maitake-sync/src/wait_queue/tests/loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ fn wake_mixed() {

#[test]
fn drop_wait_future() {
use futures_util::future::poll_fn;
use std::future::Future;
use std::future::poll_fn;
use std::pin::pin;
use std::task::Poll;

loom::model(|| {
Expand All @@ -200,7 +200,7 @@ fn drop_wait_future() {
let thread1 = thread::spawn({
let q = q.clone();
move || {
let mut wait = Box::pin(q.wait());
let mut wait = pin!(q.wait());

future::block_on(poll_fn(|cx| {
if wait.as_mut().poll(cx).is_ready() {
Expand Down
2 changes: 0 additions & 2 deletions maitake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ version = "0.1.35"
optional = true

[dev-dependencies]
futures-util = "0.3"
futures = "0.3"
tokio-test = "0.4"

[target.'cfg(not(loom))'.dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions maitake/src/future.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub(crate) mod yield_future;
pub use self::yield_future::{yield_now, Yield};
pub use core::future::*;
7 changes: 3 additions & 4 deletions maitake/src/scheduler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
loom::sync::atomic::{AtomicUsize, Ordering::Relaxed},
sync::WaitCell,
};
use core::task::Poll;
use core::{pin::pin, task::Poll};
use std::sync::Arc;

#[cfg(all(feature = "alloc", not(loom)))]
Expand Down Expand Up @@ -35,13 +35,12 @@ impl Chan {
pub(crate) async fn wait(self: Arc<Chan>) {
let this = Arc::downgrade(&self);
drop(self);
futures_util::future::poll_fn(move |cx| {
future::poll_fn(move |cx| {
let Some(this) = this.upgrade() else {
return Poll::Ready(());
};

let res = this.task.wait();
futures_util::pin_mut!(res);
let res = pin!(this.task.wait());

if this.num_notify == this.num.load(Relaxed) {
return Poll::Ready(());
Expand Down
5 changes: 3 additions & 2 deletions maitake/src/task/tests/alloc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ fn task_is_valid_for_casts() {
fn empty_task_size() {
use core::{
any::type_name,
future,
mem::{size_of, size_of_val},
};

type Future = futures::future::Ready<()>;
type Future = future::Ready<()>;
type EmptyTask = Task<NopSchedule, Future, BoxStorage>;

println!(
Expand All @@ -65,7 +66,7 @@ fn empty_task_size() {
size_of::<EmptyTask>() - size_of::<Future>()
);

let task = Task::<Scheduler, Future, BoxStorage>::new(futures::future::ready(()));
let task = Task::<Scheduler, Future, BoxStorage>::new(future::ready(()));
println!("\nTask {{ // {}B", size_of_val(&task));
println!(
" schedulable: Schedulable {{ // {}B",
Expand Down
10 changes: 4 additions & 6 deletions maitake/src/time/timer/tests/concurrent.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use super::*;
use crate::loom::{sync::Arc, thread};
use core::{
future::Future,
future::{self, Future},
pin::pin,
task::{Context, Poll},
};
use futures_util::{future, pin_mut};

#[cfg(loom)]
use loom::future::block_on;
Expand Down Expand Up @@ -116,8 +116,7 @@ fn two_sleeps_sequential() {
fn cancel_polled_sleeps() {
fn poll_and_cancel(timer: Arc<Timer>) {
block_on(async move {
let sleep = timer.sleep_ticks(15);
pin_mut!(sleep);
let mut sleep = pin!(timer.sleep_ticks(15));
future::poll_fn(move |cx| {
// poll once to register the sleep with the timer wheel, and
// then return `Ready` so it gets canceled.
Expand Down Expand Up @@ -164,8 +163,7 @@ fn reregister_waker() {
let clock = clock.test_clock();
move || {
let _clock = clock.enter();
let sleep = timer.sleep(Duration::from_secs(1));
pin_mut!(sleep);
let mut sleep = pin!(timer.sleep(Duration::from_secs(1)));
// poll the sleep future initially with a no-op waker.
let _ = sleep.as_mut().poll(&mut Context::from_waker(
futures_util::task::noop_waker_ref(),
Expand Down
Loading