Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace unnecessary atomics with non-atomic operations #94

Merged
merged 2 commits into from
Feb 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 22 additions & 26 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use std::future::Future;
use std::marker::PhantomData;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock, TryLockError};
use std::task::{Poll, Waker};

Expand Down Expand Up @@ -243,7 +243,7 @@ impl<'a> Executor<'a> {
/// assert_eq!(res, 6);
/// ```
pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
let runner = Runner::new(self.state());
let mut runner = Runner::new(self.state());
let mut rng = fastrand::Rng::new();

// A future that runs tasks forever.
Expand Down Expand Up @@ -639,29 +639,26 @@ struct Ticker<'a> {
/// 1) Woken.
/// 2a) Sleeping and unnotified.
/// 2b) Sleeping and notified.
sleeping: AtomicUsize,
sleeping: usize,
}

impl Ticker<'_> {
/// Creates a ticker.
fn new(state: &State) -> Ticker<'_> {
Ticker {
state,
sleeping: AtomicUsize::new(0),
}
Ticker { state, sleeping: 0 }
}

/// Moves the ticker into sleeping and unnotified state.
///
/// Returns `false` if the ticker was already sleeping and unnotified.
fn sleep(&self, waker: &Waker) -> bool {
fn sleep(&mut self, waker: &Waker) -> bool {
let mut sleepers = self.state.sleepers.lock().unwrap();

match self.sleeping.load(Ordering::SeqCst) {
match self.sleeping {
// Move to sleeping state.
0 => self
.sleeping
.store(sleepers.insert(waker), Ordering::SeqCst),
0 => {
self.sleeping = sleepers.insert(waker);
}

// Already sleeping, check if notified.
id => {
Expand All @@ -679,25 +676,25 @@ impl Ticker<'_> {
}

/// Moves the ticker into woken state.
fn wake(&self) {
let id = self.sleeping.swap(0, Ordering::SeqCst);
if id != 0 {
fn wake(&mut self) {
if self.sleeping != 0 {
let mut sleepers = self.state.sleepers.lock().unwrap();
sleepers.remove(id);
sleepers.remove(self.sleeping);

self.state
.notified
.swap(sleepers.is_notified(), Ordering::SeqCst);
}
self.sleeping = 0;
}

/// Waits for the next runnable task to run.
async fn runnable(&self) -> Runnable {
async fn runnable(&mut self) -> Runnable {
self.runnable_with(|| self.state.queue.pop().ok()).await
}

/// Waits for the next runnable task to run, given a function that searches for a task.
async fn runnable_with(&self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
future::poll_fn(|cx| {
loop {
match search() {
Expand Down Expand Up @@ -728,10 +725,9 @@ impl Ticker<'_> {
impl Drop for Ticker<'_> {
fn drop(&mut self) {
// If this ticker is in sleeping state, it must be removed from the sleepers list.
let id = self.sleeping.swap(0, Ordering::SeqCst);
if id != 0 {
if self.sleeping != 0 {
let mut sleepers = self.state.sleepers.lock().unwrap();
let notified = sleepers.remove(id);
let notified = sleepers.remove(self.sleeping);

self.state
.notified
Expand Down Expand Up @@ -760,7 +756,7 @@ struct Runner<'a> {
local: Arc<ConcurrentQueue<Runnable>>,

/// Bumped every time a runnable task is found.
ticks: AtomicUsize,
ticks: usize,
}

impl Runner<'_> {
Expand All @@ -770,7 +766,7 @@ impl Runner<'_> {
state,
ticker: Ticker::new(state),
local: Arc::new(ConcurrentQueue::bounded(512)),
ticks: AtomicUsize::new(0),
ticks: 0,
};
state
.local_queues
Expand All @@ -781,7 +777,7 @@ impl Runner<'_> {
}

/// Waits for the next runnable task to run.
async fn runnable(&self, rng: &mut fastrand::Rng) -> Runnable {
async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable {
let runnable = self
.ticker
.runnable_with(|| {
Expand Down Expand Up @@ -824,9 +820,9 @@ impl Runner<'_> {
.await;

// Bump the tick counter.
let ticks = self.ticks.fetch_add(1, Ordering::SeqCst);
self.ticks += 1;

if ticks % 64 == 0 {
if self.ticks % 64 == 0 {
// Steal tasks from the global queue to ensure fair task scheduling.
steal(&self.state.queue, &self.local);
}
Expand Down
Loading