Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: new internal semaphore based on intrusive lists #2325

Merged
merged 97 commits into from
Mar 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
bfc4414
sync: adds Notify for basic task notification
carllerche Feb 3, 2020
1890645
fix warnings
carllerche Feb 3, 2020
5e82767
wip
hawkw Feb 17, 2020
ea8ba09
wip2
hawkw Feb 18, 2020
a888cb7
let's see just how broken it is
hawkw Feb 19, 2020
9ae6fba
works but is horrible
hawkw Feb 20, 2020
ea499fe
wip
hawkw Feb 21, 2020
3b4a14f
fix weird logic
hawkw Feb 21, 2020
8836c38
fix close behavior
hawkw Feb 27, 2020
4c81c51
Merge branch 'master' into eliza/semaphore-2
hawkw Feb 27, 2020
5ee03b1
update to use new linked list
hawkw Feb 27, 2020
c4c685b
bring back old semaphore_ll so channels can work
hawkw Feb 29, 2020
1337d8e
add loom tests
hawkw Mar 6, 2020
19e0157
decr assigned permits when assigning to self
hawkw Mar 6, 2020
2764ba2
closed means REALLY closed
hawkw Mar 7, 2020
d5ed440
broken
hawkw Mar 9, 2020
cb472fd
wip broken
hawkw Mar 9, 2020
66e419b
Merge branch 'master' into eliza/semaphore-2
hawkw Mar 9, 2020
bd1c5db
update after merge
hawkw Mar 9, 2020
1880121
almost evherything works
hawkw Mar 10, 2020
c7c08ff
wip
hawkw Mar 10, 2020
1a4f733
ALL LOOM TESTS PASS
hawkw Mar 11, 2020
a320d56
fix typoed fail ordering
hawkw Mar 11, 2020
df1e7bb
remove some warnings
hawkw Mar 11, 2020
b957af8
bring back try_acquire
hawkw Mar 12, 2020
ec2b4e7
misc cleanup
hawkw Mar 12, 2020
770d859
add test for release of leftover permits
hawkw Mar 12, 2020
38f654f
simplify `poll_acquire` behavior
hawkw Mar 12, 2020
9046b04
rm unneeded AtomicWaker
hawkw Mar 12, 2020
187e4d7
remove unused imports
hawkw Mar 12, 2020
c2450a2
add test for cancelling an acquire future
hawkw Mar 12, 2020
ddc1726
dropping an incomplete `Acquire` releases permits
hawkw Mar 12, 2020
1aa8088
no need to lock if unqueued
hawkw Mar 12, 2020
88b5e86
remove dbg/println
hawkw Mar 12, 2020
cf27256
remove more dbg/printlns
hawkw Mar 12, 2020
cb73e3f
add comments/remove dbgs
hawkw Mar 12, 2020
ce66762
add comments/cleanup
hawkw Mar 13, 2020
9f49f80
rm tests for stuff you can't do w new semaphore
hawkw Mar 13, 2020
fea9fd2
clean up & add comments
hawkw Mar 13, 2020
df6c556
rewrap comments, fix unused warnings
hawkw Mar 16, 2020
201db12
more wrapping
hawkw Mar 16, 2020
9f3f544
style/clean up
hawkw Mar 16, 2020
8628723
revert unneeded MPSC changes
hawkw Mar 16, 2020
49ed5dd
revert unneeded MPSC test changes
hawkw Mar 16, 2020
f5f5bc1
Apply suggestions from code review
hawkw Mar 16, 2020
24479c5
remove printlns from tests
hawkw Mar 16, 2020
6275513
checks to ensure API types remain Send/Sync/Unpin
hawkw Mar 16, 2020
f86147a
style: more consistent imports
hawkw Mar 16, 2020
9206558
make docs more accurate
hawkw Mar 16, 2020
0bcb271
Merge branch 'eliza/semaphore-2' of github.com:tokio-rs/tokio into el…
hawkw Mar 16, 2020
b412284
comments style improvements
hawkw Mar 16, 2020
3b4f416
make LinkedList::is_linked more misuse-resistant
hawkw Mar 16, 2020
c2c3e04
fix feature flag unhappiness
hawkw Mar 16, 2020
c9e122f
quick rwlock benches
hawkw Mar 17, 2020
034e141
quick semaphore benches
hawkw Mar 17, 2020
bffa18b
Merge branch 'eliza/bench-sync' into eliza/semaphore-2
hawkw Mar 17, 2020
0e0e13d
add `LinkedList::split_back` method
hawkw Mar 17, 2020
705afe1
add DoubleEndedIterator for linked_list::Iter~
hawkw Mar 17, 2020
c9e4800
(WIP) move notification outside of lock
hawkw Mar 17, 2020
389e231
clean up
hawkw Mar 17, 2020
a18d569
fixup
hawkw Mar 17, 2020
f758fb6
rm unused
hawkw Mar 18, 2020
8231eb7
Merge branch 'master' into eliza/semaphore-2
hawkw Mar 18, 2020
90f98c9
put back coop yielding in acquire futures
hawkw Mar 18, 2020
32a12bb
(hopefully) fix feature flags
hawkw Mar 18, 2020
5c76e67
move waiter state inside of lock
hawkw Mar 19, 2020
95bb327
remove dbg
hawkw Mar 19, 2020
c9699dc
cleanup
hawkw Mar 19, 2020
da1b027
remove unneeded SeqCst
hawkw Mar 20, 2020
819a59f
notify outside of lock on close, too
hawkw Mar 20, 2020
29fea22
carl-style feature flags
hawkw Mar 20, 2020
bdce157
ag
hawkw Mar 20, 2020
d9bf8b2
simplify permit struct significantly
hawkw Mar 20, 2020
7406d54
simplify lock/cas loop interaction
hawkw Mar 20, 2020
1f8afa6
simplify lock/cas loop interaction, episode II
hawkw Mar 20, 2020
2d8d413
remove permit struct entirely
hawkw Mar 20, 2020
cc2afac
unweaken cas
hawkw Mar 20, 2020
5573c65
fix backwards assertions in linked-list tests
hawkw Mar 20, 2020
e3c4b9f
rustfmt
hawkw Mar 21, 2020
23257c1
fixup loom test
hawkw Mar 21, 2020
4de0d58
LinkedList fmt::Debug impls shouldn't need T: Debug
hawkw Mar 21, 2020
f22a180
fix racy close behavior
hawkw Mar 21, 2020
9c9354b
run rustfmt on file cargo fmt doesn't see
hawkw Mar 21, 2020
2f1ead5
don't set unqueued when errored
hawkw Mar 22, 2020
077a61f
bring back atomic permit counter
hawkw Mar 22, 2020
be77a72
simplify a few things
hawkw Mar 22, 2020
d20a93a
expect instead of unwrap
hawkw Mar 22, 2020
be8df9e
ensure node pointers don't dangle on panics
hawkw Mar 22, 2020
af8273f
cleanup/docs
hawkw Mar 22, 2020
4bb9fb1
fix max permits, use low bits for flags
hawkw Mar 23, 2020
67fe420
const-ify permit shift amount
hawkw Mar 23, 2020
4d84ec7
check before reregistering waker/reduce unsafe scope
hawkw Mar 23, 2020
f1e3c4c
add overflow checks
hawkw Mar 23, 2020
a82c043
add comments explaining notification
hawkw Mar 23, 2020
88fa1aa
fix backwards assertion
hawkw Mar 23, 2020
0a01a59
Merge branch 'master' into eliza/semaphore-2
hawkw Mar 23, 2020
449f8b1
document future causalcell improvement
hawkw Mar 23, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,14 @@ harness = false
name = "scheduler"
path = "scheduler.rs"
harness = false


[[bench]]
name = "sync_rwlock"
path = "sync_rwlock.rs"
harness = false

[[bench]]
name = "sync_semaphore"
path = "sync_semaphore.rs"
harness = false
147 changes: 147 additions & 0 deletions benches/sync_rwlock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use bencher::{black_box, Bencher};
use std::sync::Arc;
use tokio::{sync::RwLock, task};

fn read_uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
.unwrap();

let lock = Arc::new(RwLock::new(()));
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
for _ in 0..6 {
let read = lock.read().await;
black_box(read);
}
})
});
}

fn read_concurrent_uncontended_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
.unwrap();

async fn task(lock: Arc<RwLock<()>>) {
let read = lock.read().await;
black_box(read);
}

let lock = Arc::new(RwLock::new(()));
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
let j = tokio::try_join! {
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone()))
};
j.unwrap();
})
});
}

fn read_concurrent_uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();

async fn task(lock: Arc<RwLock<()>>) {
let read = lock.read().await;
black_box(read);
}

let lock = Arc::new(RwLock::new(()));
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
tokio::join! {
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone())
};
})
});
}

fn read_concurrent_contended_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
.unwrap();

async fn task(lock: Arc<RwLock<()>>) {
let read = lock.read().await;
black_box(read);
}

let lock = Arc::new(RwLock::new(()));
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
let write = lock.write().await;
let j = tokio::try_join! {
async move { drop(write); Ok(()) },
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
};
j.unwrap();
})
});
}

fn read_concurrent_contended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();

async fn task(lock: Arc<RwLock<()>>) {
let read = lock.read().await;
black_box(read);
}

let lock = Arc::new(RwLock::new(()));
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
let write = lock.write().await;
tokio::join! {
async move { drop(write) },
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
};
})
});
}

bencher::benchmark_group!(
sync_rwlock,
read_uncontended,
read_concurrent_uncontended,
read_concurrent_uncontended_multi,
read_concurrent_contended,
read_concurrent_contended_multi
);

bencher::benchmark_main!(sync_rwlock);
130 changes: 130 additions & 0 deletions benches/sync_semaphore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use bencher::Bencher;
use std::sync::Arc;
use tokio::{sync::Semaphore, task};

fn uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
.unwrap();

let s = Arc::new(Semaphore::new(10));
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
for _ in 0..6 {
let permit = s.acquire().await;
drop(permit);
}
})
});
}

async fn task(s: Arc<Semaphore>) {
let permit = s.acquire().await;
drop(permit);
}

fn uncontended_concurrent_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
.unwrap();

let s = Arc::new(Semaphore::new(10));
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
let j = tokio::try_join! {
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone()))
};
j.unwrap();
})
});
}

fn uncontended_concurrent_single(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();

let s = Arc::new(Semaphore::new(10));
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
tokio::join! {
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone())
};
})
});
}

fn contended_concurrent_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
.unwrap();

let s = Arc::new(Semaphore::new(5));
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
let j = tokio::try_join! {
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone()))
};
j.unwrap();
})
});
}

fn contended_concurrent_single(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();

let s = Arc::new(Semaphore::new(5));
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
tokio::join! {
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone())
};
})
});
}

bencher::benchmark_group!(
sync_semaphore,
uncontended,
uncontended_concurrent_multi,
uncontended_concurrent_single,
contended_concurrent_multi,
contended_concurrent_single
);

bencher::benchmark_main!(sync_semaphore);
70 changes: 70 additions & 0 deletions tokio/src/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
// NOTE: The doctests in this module are ignored since the whole module is (currently) private.

use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Constant used to determine how much "work" a task is allowed to do without yielding.
Expand Down Expand Up @@ -250,6 +252,74 @@ pub async fn proceed() {
poll_fn(|cx| poll_proceed(cx)).await;
}

pin_project_lite::pin_project! {
/// A future that cooperatively yields to the task scheduler when polling,
/// if the task's budget is exhausted.
///
/// Internally, this is simply a future combinator which calls
/// [`poll_proceed`] in its `poll` implementation before polling the wrapped
/// future.
///
/// # Examples
///
/// ```rust,ignore
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::coop::CoopFutureExt;
///
/// async { /* ... */ }
/// .cooperate()
/// .await;
/// # }
/// ```
///
/// [`poll_proceed`]: fn.poll_proceed.html
#[derive(Debug)]
#[allow(unreachable_pub, dead_code)]
pub struct CoopFuture<F> {
#[pin]
future: F,
}
}

impl<F: Future> Future for CoopFuture<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(poll_proceed(cx));
self.project().future.poll(cx)
}
}

impl<F: Future> CoopFuture<F> {
/// Returns a new `CoopFuture` wrapping the given future.
///
#[allow(unreachable_pub, dead_code)]
pub fn new(future: F) -> Self {
Self { future }
}
}

// Currently only used by `tokio::sync`; and if we make this combinator public,
// it should probably be on the `FutureExt` trait instead.
cfg_sync! {
/// Extension trait providing `Future::cooperate` extension method.
///
/// Note: if/when the co-op API becomes public, this method should probably be
/// provided by `FutureExt`, instead.
pub(crate) trait CoopFutureExt: Future {
/// Wrap `self` to cooperatively yield to the scheduler when polling, if the
/// task's budget is exhausted.
fn cooperate(self) -> CoopFuture<Self>
where
Self: Sized,
{
CoopFuture::new(self)
}
}

impl<F> CoopFutureExt for F where F: Future {}
}

#[cfg(all(test, not(loom)))]
mod test {
use super::*;
Expand Down
Loading