Skip to content

Commit

Permalink
Re-add TIMER_QUEUED
Browse files Browse the repository at this point in the history
  • Loading branch information
bugadani committed Dec 10, 2024
1 parent 20cda81 commit 515666d
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 11 deletions.
24 changes: 24 additions & 0 deletions embassy-executor/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,30 @@ impl TaskRef {
&self.header().timer_queue_item
}

/// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
///
/// Entering this state prevents the task from being respawned while in a timer queue.
///
/// Safety:
///
/// This functions should only be called by the timer queue implementation, before
/// enqueueing the timer item.
#[cfg(feature = "integrated-timers")]
pub unsafe fn timer_enqueue(&self) -> bool {
self.header().state.timer_enqueue()
}

/// Unmark the task as timer-queued.
///
/// Safety:
///
/// This functions should only be called by the timer queue implementation, after the task has
/// been removed from the timer queue.
#[cfg(feature = "integrated-timers")]
pub unsafe fn timer_dequeue(&self) {
self.header().state.timer_dequeue()
}

/// The returned pointer is valid for the entire TaskStorage.
pub(crate) fn as_ptr(self) -> *const TaskHeader {
self.ptr.as_ptr()
Expand Down
17 changes: 17 additions & 0 deletions embassy-executor/src/raw/state_atomics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use core::sync::atomic::{AtomicU32, Ordering};
pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
/// Task is in the executor run queue
pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
/// Task is in the executor timer queue
#[cfg(feature = "integrated-timers")]
pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;

pub(crate) struct State {
state: AtomicU32,
Expand Down Expand Up @@ -52,4 +55,18 @@ impl State {
let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
state & STATE_SPAWNED != 0
}

/// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
#[cfg(feature = "integrated-timers")]
#[inline(always)]
pub fn timer_enqueue(&self) -> bool {
self.state.fetch_or(STATE_TIMER_QUEUED, Ordering::Relaxed) & STATE_TIMER_QUEUED == 0
}

/// Unmark the task as timer-queued.
#[cfg(feature = "integrated-timers")]
#[inline(always)]
pub fn timer_dequeue(&self) {
self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::Relaxed);
}
}
19 changes: 17 additions & 2 deletions embassy-executor/src/raw/state_atomics_arm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ pub(crate) struct State {
spawned: AtomicBool,
/// Task is in the executor run queue
run_queued: AtomicBool,
/// Task is in the executor timer queue
timer_queued: AtomicBool,
pad: AtomicBool,
pad2: AtomicBool,
}

impl State {
pub const fn new() -> State {
Self {
spawned: AtomicBool::new(false),
run_queued: AtomicBool::new(false),
timer_queued: AtomicBool::new(false),
pad: AtomicBool::new(false),
pad2: AtomicBool::new(false),
}
}

Expand Down Expand Up @@ -85,4 +86,18 @@ impl State {
self.run_queued.store(false, Ordering::Relaxed);
r
}

/// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
#[cfg(feature = "integrated-timers")]
#[inline(always)]
pub fn timer_enqueue(&self) -> bool {
!self.timer_queued.swap(true, Ordering::Relaxed)
}

/// Unmark the task as timer-queued.
#[cfg(feature = "integrated-timers")]
#[inline(always)]
pub fn timer_dequeue(&self) {
self.timer_queued.store(false, Ordering::Relaxed);
}
}
21 changes: 21 additions & 0 deletions embassy-executor/src/raw/state_critical_section.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use critical_section::Mutex;
pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
/// Task is in the executor run queue
pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
/// Task is in the executor timer queue
#[cfg(feature = "integrated-timers")]
pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;

pub(crate) struct State {
state: Mutex<Cell<u32>>,
Expand Down Expand Up @@ -69,4 +72,22 @@ impl State {
ok
})
}

/// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
#[cfg(feature = "integrated-timers")]
#[inline(always)]
pub fn timer_enqueue(&self) -> bool {
self.update(|s| {
let ok = *s & STATE_TIMER_QUEUED == 0;
*s |= STATE_TIMER_QUEUED;
ok
})
}

/// Unmark the task as timer-queued.
#[cfg(feature = "integrated-timers")]
#[inline(always)]
pub fn timer_dequeue(&self) {
self.update(|s| *s &= !STATE_TIMER_QUEUED);
}
}
17 changes: 8 additions & 9 deletions embassy-time-queue-driver/src/queue_integrated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ impl TimerQueue {
/// a new alarm for that time.
pub fn schedule_wake(&mut self, at: u64, p: TaskRef) -> bool {
let item = p.timer_queue_item();
if at <= item.expires_at.get() {
if unsafe { p.timer_enqueue() } {
// If not in the queue, add it and update.
let prev = self.head.replace(Some(p));
item.next.set(prev);
item.expires_at.set(at);
true
} else if at <= item.expires_at.get() {
// If expiration is sooner than previously set, update.

if item.expires_at.get() == u64::MAX {
// If not in the queue, add it and update.
let prev = self.head.replace(Some(p));
item.next.set(prev);
}

item.expires_at.set(at);
true
} else {
Expand Down Expand Up @@ -73,7 +72,7 @@ impl TimerQueue {
} else {
// Remove it
prev.set(item.next.get());
item.expires_at.set(u64::MAX);
unsafe { p.timer_dequeue() };
}
}
}
Expand Down

0 comments on commit 515666d

Please sign in to comment.