From 515666d8d4bf6ed01546c1df0fd66d8099c9a74b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Mon, 9 Dec 2024 22:46:13 +0100 Subject: [PATCH] Re-add TIMER_QUEUED --- embassy-executor/src/raw/mod.rs | 24 +++++++++++++++++++ embassy-executor/src/raw/state_atomics.rs | 17 +++++++++++++ embassy-executor/src/raw/state_atomics_arm.rs | 19 +++++++++++++-- .../src/raw/state_critical_section.rs | 21 ++++++++++++++++ .../src/queue_integrated.rs | 17 +++++++------ 5 files changed, 87 insertions(+), 11 deletions(-) diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index f9c6509f18..ad9ea82ef6 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -88,6 +88,30 @@ impl TaskRef { &self.header().timer_queue_item } + /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) + /// + /// Entering this state prevents the task from being respawned while in a timer queue. + /// + /// Safety: + /// + /// This functions should only be called by the timer queue implementation, before + /// enqueueing the timer item. + #[cfg(feature = "integrated-timers")] + pub unsafe fn timer_enqueue(&self) -> bool { + self.header().state.timer_enqueue() + } + + /// Unmark the task as timer-queued. + /// + /// Safety: + /// + /// This functions should only be called by the timer queue implementation, after the task has + /// been removed from the timer queue. + #[cfg(feature = "integrated-timers")] + pub unsafe fn timer_dequeue(&self) { + self.header().state.timer_dequeue() + } + /// The returned pointer is valid for the entire TaskStorage. pub(crate) fn as_ptr(self) -> *const TaskHeader { self.ptr.as_ptr() diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs index e4127897ef..cfa4eb0053 100644 --- a/embassy-executor/src/raw/state_atomics.rs +++ b/embassy-executor/src/raw/state_atomics.rs @@ -4,6 +4,9 @@ use core::sync::atomic::{AtomicU32, Ordering}; pub(crate) const STATE_SPAWNED: u32 = 1 << 0; /// Task is in the executor run queue pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; +/// Task is in the executor timer queue +#[cfg(feature = "integrated-timers")] +pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; pub(crate) struct State { state: AtomicU32, @@ -52,4 +55,18 @@ impl State { let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); state & STATE_SPAWNED != 0 } + + /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) + #[cfg(feature = "integrated-timers")] + #[inline(always)] + pub fn timer_enqueue(&self) -> bool { + self.state.fetch_or(STATE_TIMER_QUEUED, Ordering::Relaxed) & STATE_TIMER_QUEUED == 0 + } + + /// Unmark the task as timer-queued. + #[cfg(feature = "integrated-timers")] + #[inline(always)] + pub fn timer_dequeue(&self) { + self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::Relaxed); + } } diff --git a/embassy-executor/src/raw/state_atomics_arm.rs b/embassy-executor/src/raw/state_atomics_arm.rs index b673c73595..e4dfe50938 100644 --- a/embassy-executor/src/raw/state_atomics_arm.rs +++ b/embassy-executor/src/raw/state_atomics_arm.rs @@ -11,8 +11,9 @@ pub(crate) struct State { spawned: AtomicBool, /// Task is in the executor run queue run_queued: AtomicBool, + /// Task is in the executor timer queue + timer_queued: AtomicBool, pad: AtomicBool, - pad2: AtomicBool, } impl State { @@ -20,8 +21,8 @@ impl State { Self { spawned: AtomicBool::new(false), run_queued: AtomicBool::new(false), + timer_queued: AtomicBool::new(false), pad: AtomicBool::new(false), - pad2: AtomicBool::new(false), } } @@ -85,4 +86,18 @@ impl State { self.run_queued.store(false, Ordering::Relaxed); r } + + /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) + #[cfg(feature = "integrated-timers")] + #[inline(always)] + pub fn timer_enqueue(&self) -> bool { + !self.timer_queued.swap(true, Ordering::Relaxed) + } + + /// Unmark the task as timer-queued. + #[cfg(feature = "integrated-timers")] + #[inline(always)] + pub fn timer_dequeue(&self) { + self.timer_queued.store(false, Ordering::Relaxed); + } } diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs index b92eed006e..c3cc1b0b77 100644 --- a/embassy-executor/src/raw/state_critical_section.rs +++ b/embassy-executor/src/raw/state_critical_section.rs @@ -6,6 +6,9 @@ use critical_section::Mutex; pub(crate) const STATE_SPAWNED: u32 = 1 << 0; /// Task is in the executor run queue pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; +/// Task is in the executor timer queue +#[cfg(feature = "integrated-timers")] +pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; pub(crate) struct State { state: Mutex>, @@ -69,4 +72,22 @@ impl State { ok }) } + + /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before) + #[cfg(feature = "integrated-timers")] + #[inline(always)] + pub fn timer_enqueue(&self) -> bool { + self.update(|s| { + let ok = *s & STATE_TIMER_QUEUED == 0; + *s |= STATE_TIMER_QUEUED; + ok + }) + } + + /// Unmark the task as timer-queued. + #[cfg(feature = "integrated-timers")] + #[inline(always)] + pub fn timer_dequeue(&self) { + self.update(|s| *s &= !STATE_TIMER_QUEUED); + } } diff --git a/embassy-time-queue-driver/src/queue_integrated.rs b/embassy-time-queue-driver/src/queue_integrated.rs index 9df404526e..0a4a6218e8 100644 --- a/embassy-time-queue-driver/src/queue_integrated.rs +++ b/embassy-time-queue-driver/src/queue_integrated.rs @@ -21,15 +21,14 @@ impl TimerQueue { /// a new alarm for that time. pub fn schedule_wake(&mut self, at: u64, p: TaskRef) -> bool { let item = p.timer_queue_item(); - if at <= item.expires_at.get() { + if unsafe { p.timer_enqueue() } { + // If not in the queue, add it and update. + let prev = self.head.replace(Some(p)); + item.next.set(prev); + item.expires_at.set(at); + true + } else if at <= item.expires_at.get() { // If expiration is sooner than previously set, update. - - if item.expires_at.get() == u64::MAX { - // If not in the queue, add it and update. - let prev = self.head.replace(Some(p)); - item.next.set(prev); - } - item.expires_at.set(at); true } else { @@ -73,7 +72,7 @@ impl TimerQueue { } else { // Remove it prev.set(item.next.get()); - item.expires_at.set(u64::MAX); + unsafe { p.timer_dequeue() }; } } }