Prevent pending run_pending_tasks of future::Cache from causing busy loop in schedule_write_op #415

Merged · 5 commits · Apr 11, 2024
22 changes: 22 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,26 @@
# Moka Cache — Change Log

## Version 0.12.6

### Fixed

- Fixed a bug in `future::Cache` where pending `run_pending_tasks` calls could cause
an infinite busy loop in the internal `schedule_write_op` method
([#412][gh-issue-0412]):
- This bug was introduced in `v0.12.0` when the background threads were removed
from `future::Cache`.
- This bug can occur when the `run_pending_tasks` method is called by user code while
the cache is receiving a very high number of concurrent write operations
(e.g. `insert`, `get_with`, `invalidate`, etc.).
- When it occurs, the `schedule_write_op` method spins in a busy loop forever,
causing high CPU usage and starving all other async tasks.

### Changed

- Upgraded `async-lock` crate used by `future::Cache` from `v2.4` to the latest
`v3.3`.


## Version 0.12.5

### Added
@@ -828,6 +849,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (Mar 25, 2021).
[gh-Swatinem]: https://github.com/Swatinem
[gh-tinou98]: https://github.com/tinou98

[gh-issue-0412]: https://github.com/moka-rs/moka/issues/412/
[gh-issue-0385]: https://github.com/moka-rs/moka/issues/385/
[gh-issue-0329]: https://github.com/moka-rs/moka/issues/329/
[gh-issue-0322]: https://github.com/moka-rs/moka/issues/322/
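To make the failure mode concrete, here is a minimal sketch of the workload shape the changelog entry describes: many concurrent writer tasks plus user code that calls `run_pending_tasks` on its own. It is illustrative only (the capacity, task counts, and timing are made up, and it assumes moka with the `future` feature on a Tokio runtime); it is not the reproducer from issue #412. Before this fix, such a workload could leave `schedule_write_op` spinning; with the fix, waiters park on an event until the write op channel has room.

```rust
use std::time::Duration;

use moka::future::Cache;

#[tokio::main]
async fn main() {
    let cache: Cache<u64, u64> = Cache::new(10_000);

    // Many writer tasks keep the internal write op channel close to full.
    let writers: Vec<_> = (0..16u64)
        .map(|t| {
            let cache = cache.clone();
            tokio::spawn(async move {
                for i in 0..100_000u64 {
                    cache.insert(t * 1_000_000 + i, i).await;
                }
            })
        })
        .collect();

    // Meanwhile, user code calls `run_pending_tasks` on its own schedule.
    let maintenance = {
        let cache = cache.clone();
        tokio::spawn(async move {
            loop {
                cache.run_pending_tasks().await;
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
        })
    };

    for w in writers {
        let _ = w.await;
    }
    maintenance.abort();
}
```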
7 changes: 4 additions & 3 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "moka"
version = "0.12.5"
version = "0.12.6"
edition = "2021"
# Rust 1.65 was released on Nov 3, 2022.
rust-version = "1.65"
@@ -22,7 +22,7 @@ default = ["atomic64", "quanta"]
sync = []

# Enable this feature to use `moka::future::Cache`.
future = ["async-lock", "async-trait", "futures-util"]
future = ["async-lock", "async-trait", "event-listener", "futures-util"]

# Enable this feature to activate optional logging from caches.
# Currently cache will emit log only when it encounters a panic in user provided
@@ -64,8 +64,9 @@ triomphe = { version = "0.1.3", default-features = false }
quanta = { version = "0.12.2", optional = true }

# Optional dependencies (future)
async-lock = { version = "2.4", optional = true }
async-lock = { version = "3.3", optional = true }
async-trait = { version = "0.1.58", optional = true }
event-listener = { version = "5.3", optional = true }
futures-util = { version = "0.3.17", optional = true }

# Optional dependencies (logging)
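The new `event-listener` dependency provides the notify/listen primitive that replaces the `RwLock`-based waiting. The snippet below is a small, self-contained sketch of that primitive, not moka code; only the `Event`, `listen`, and `notify` calls are the real crate API, and the Tokio scaffolding around them is illustrative.

```rust
use std::sync::Arc;
use std::time::Duration;

use event_listener::Event;

#[tokio::main]
async fn main() {
    let event = Arc::new(Event::new());

    // A waiter task: register a listener, then await a notification.
    let waiter = {
        let event = Arc::clone(&event);
        tokio::spawn(async move {
            // Registering the listener before re-checking shared state avoids
            // missing a notification that fires in between.
            let listener = event.listen();
            listener.await;
            println!("woken up");
        })
    };

    tokio::time::sleep(Duration::from_millis(10)).await;

    // Wake up to one waiting listener. `notify(usize::MAX)` would wake all.
    event.notify(1);

    waiter.await.unwrap();
}
```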
75 changes: 44 additions & 31 deletions src/future/base_cache.rs
@@ -121,8 +121,8 @@ impl<K, V, S> BaseCache<K, V, S> {
}

#[inline]
pub(crate) fn maintenance_task_lock(&self) -> &RwLock<()> {
&self.inner.maintenance_task_lock
pub(crate) fn write_op_ch_ready_event(&self) -> &event_listener::Event<()> {
&self.inner.write_op_ch_ready_event
}

pub(crate) fn notify_invalidate(
@@ -618,7 +618,7 @@ where
pub(crate) async fn schedule_write_op(
inner: &Arc<impl InnerSync + Send + Sync + 'static>,
ch: &Sender<WriteOp<K, V>>,
maintenance_task_lock: &RwLock<()>,
ch_ready_event: &event_listener::Event<()>,
op: WriteOp<K, V>,
ts: Instant,
housekeeper: Option<&HouseKeeperArc>,
@@ -638,13 +638,16 @@
let mut op = op;
let mut spin_loop_attempts = 0u8;
loop {
// Run the `Inner::do_run_pending_tasks` method if needed.
BaseCache::<K, V, S>::apply_reads_writes_if_needed(
Arc::clone(inner),
ch,
ts,
housekeeper,
)
.await;

// Try to send our op to the write op channel.
match ch.try_send(op) {
Ok(()) => return Ok(()),
Err(TrySendError::Full(op1)) => {
@@ -666,25 +669,22 @@
std::hint::spin_loop();
}
} else {
// Wait for a shared reader lock to become available. The exclusive
// writer lock will be already held by another async task that is
// currently calling `do_run_pending_tasks` method via
// `apply_reads_writes_if_needed` method above.
spin_loop_attempts = 0;

// Yield the async runtime scheduler to other async tasks and wait
// for a channel ready event. This event will be sent when one of the
// following conditions is met:
//
// `do_run_pending_tasks` will receive some of the ops from the
// channel and apply them to the data structures for the cache
// policies, so the channel will have some room for the new ops.
// - The `Inner::do_run_pending_tasks` method has removed some ops
//   from the write op channel.
// - The `Housekeeper`'s `run_pending_tasks` or
//   `try_run_pending_tasks` methods have released the lock on the
//   `current_task`.
//
// A shared lock will become available once the async task has
// returned from `do_run_pending_tasks`. We release the lock
// immediately after we acquire it.
let _ = maintenance_task_lock.read().await;
spin_loop_attempts = 0;
ch_ready_event.listen().await;

// We are going to retry. If the write op channel has enough room, we
// will be able to send our op to the channel and we are done. If
// not, we (or somebody else) will become the next exclusive writer
// when we (or somebody) call `apply_reads_writes_if_needed` above.
// We are going to retry. Now the channel may have some space and/or
// one of us is allowed to run the `do_run_pending_tasks` method.
}
}
}
@@ -717,10 +717,10 @@

// Retry to schedule the write op.
let ts = cancel_guard.ts;
let lock = self.maintenance_task_lock();
let event = self.write_op_ch_ready_event();
let op = cancel_guard.op.as_ref().cloned().unwrap();
let hk = self.housekeeper.as_ref();
Self::schedule_write_op(&self.inner, &self.write_op_ch, lock, op, ts, hk, false)
Self::schedule_write_op(&self.inner, &self.write_op_ch, event, op, ts, hk, false)
.await
.expect("Failed to reschedule a write op");

@@ -1042,7 +1042,7 @@ pub(crate) struct Inner<K, V, S> {
frequency_sketch_enabled: AtomicBool,
read_op_ch: Receiver<ReadOp<K, V>>,
write_op_ch: Receiver<WriteOp<K, V>>,
maintenance_task_lock: RwLock<()>,
write_op_ch_ready_event: event_listener::Event,
eviction_policy: EvictionPolicyConfig,
expiration_policy: ExpirationPolicy<K, V>,
valid_after: AtomicInstant,
@@ -1250,7 +1250,7 @@
frequency_sketch_enabled: AtomicBool::default(),
read_op_ch,
write_op_ch,
maintenance_task_lock: RwLock::default(),
write_op_ch_ready_event: event_listener::Event::default(),
eviction_policy: eviction_policy.config,
expiration_policy,
valid_after: AtomicInstant::default(),
@@ -1412,6 +1412,12 @@
self.do_run_pending_tasks(max_repeats).await;
}

/// Notifies all the async tasks waiting in `BaseCache::schedule_write_op` method
/// for the write op channel to have enough room.
fn notify_write_op_ch_is_ready(&self) {
self.write_op_ch_ready_event.notify(usize::MAX);
}

fn now(&self) -> Instant {
self.current_time_from_expiration_clock()
}
@@ -1429,10 +1435,6 @@
}

// Acquire some locks.

// SAFETY: the write lock below should never be starved, because the lock
// strategy of async_lock::RwLock is write-preferring.
let write_op_ch_lock = self.maintenance_task_lock.write().await;
let mut deqs = self.deques.lock().await;
let mut timer_wheel = self.timer_wheel.lock().await;

@@ -1462,9 +1464,21 @@
self.enable_frequency_sketch(&eviction_state.counters).await;
}

let w_len = self.write_op_ch.len();

// If there are any async tasks waiting in `BaseCache::schedule_write_op`
// method for the write op channel to have enough room, notify them.
let listeners = self.write_op_ch_ready_event.total_listeners();
if listeners > 0 {
let n = listeners.min(WRITE_LOG_SIZE - w_len);
// Notify the `n` listeners. The `notify` method accepts 0, so no
// need to check if `n` is greater than 0.
self.write_op_ch_ready_event.notify(n);
}

calls += 1;
should_process_logs = self.read_op_ch.len() >= READ_LOG_FLUSH_POINT
|| self.write_op_ch.len() >= WRITE_LOG_FLUSH_POINT;
should_process_logs =
self.read_op_ch.len() >= READ_LOG_FLUSH_POINT || w_len >= WRITE_LOG_FLUSH_POINT;
}

if timer_wheel.is_enabled() {
@@ -1527,9 +1541,8 @@

crossbeam_epoch::pin().flush();

// Ensure some of the locks are held until here.
// Ensure this lock is held until here.
drop(deqs);
drop(write_op_ch_lock);
}
}

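The retry logic in `schedule_write_op` can be summarized in a standalone form. The sketch below mirrors the shape of the new loop under simplified assumptions: `crossbeam_channel` stands in for the internal write op channel, the spin constants are made up, and the housekeeper trigger and cancellation handling are omitted.

```rust
use crossbeam_channel::{Sender, TrySendError};
use event_listener::Event;

/// Try to enqueue `op`; spin briefly while the channel is full, then park on
/// `ready` until the maintenance task signals that the channel has room.
async fn send_with_backoff<T>(ch: &Sender<T>, ready: &Event, mut op: T) -> Result<(), T> {
    let mut spin_loop_attempts = 0u8;
    loop {
        match ch.try_send(op) {
            Ok(()) => return Ok(()),
            // The channel is full; take the op back and retry below.
            Err(TrySendError::Full(rejected)) => op = rejected,
            // The receiver side is gone; give the op back to the caller.
            Err(TrySendError::Disconnected(rejected)) => return Err(rejected),
        }

        if spin_loop_attempts < 4 {
            // Short exponential spin: 16, 32, 64, then 128 iterations.
            spin_loop_attempts += 1;
            for _ in 0..(8u32 << spin_loop_attempts) {
                std::hint::spin_loop();
            }
        } else {
            // Stop spinning and sleep until the maintenance task drains the
            // channel and fires the event; then loop and try again.
            spin_loop_attempts = 0;
            ready.listen().await;
        }
    }
}
```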
8 changes: 4 additions & 4 deletions src/future/cache.rs
@@ -1820,12 +1820,12 @@
}

let hk = self.base.housekeeper.as_ref();
let lock = self.base.maintenance_task_lock();
let event = self.base.write_op_ch_ready_event();

BaseCache::<K, V, S>::schedule_write_op(
&self.base.inner,
&self.base.write_op_ch,
lock,
event,
op,
ts,
hk,
@@ -1986,13 +1986,13 @@
should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
}

let lock = self.base.maintenance_task_lock();
let event = self.base.write_op_ch_ready_event();
let hk = self.base.housekeeper.as_ref();

BaseCache::<K, V, S>::schedule_write_op(
&self.base.inner,
&self.base.write_op_ch,
lock,
event,
op,
now,
hk,
27 changes: 23 additions & 4 deletions src/future/housekeeper.rs
@@ -27,6 +27,11 @@ use futures_util::future::{BoxFuture, Shared};
#[async_trait]
pub(crate) trait InnerSync {
async fn run_pending_tasks(&self, max_sync_repeats: usize);

/// Notifies all the async tasks waiting in `BaseCache::schedule_write_op` method
/// for the write op channel to have enough room.
fn notify_write_op_ch_is_ready(&self);

fn now(&self) -> Instant;
}

@@ -75,19 +80,33 @@ impl Housekeeper {
T: InnerSync + Send + Sync + 'static,
{
let mut current_task = self.current_task.lock().await;
self.do_run_pending_tasks(cache, &mut current_task).await;
self.do_run_pending_tasks(Arc::clone(&cache), &mut current_task)
.await;

drop(current_task);

// If there are any async tasks waiting in `BaseCache::schedule_write_op`
// method for the write op channel, notify them.
cache.notify_write_op_ch_is_ready();
}

pub(crate) async fn try_run_pending_tasks<T>(&self, cache: Arc<T>) -> bool
where
T: InnerSync + Send + Sync + 'static,
{
if let Some(mut current_task) = self.current_task.try_lock() {
self.do_run_pending_tasks(cache, &mut current_task).await;
true
self.do_run_pending_tasks(Arc::clone(&cache), &mut current_task)
.await;
} else {
false
return false;
}

// The `current_task` lock should be free now.

// If there are any async tasks waiting in `BaseCache::schedule_write_op`
// method for the write op channel, notify them.
cache.notify_write_op_ch_is_ready();
true
}

async fn do_run_pending_tasks<T>(
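The housekeeper changes establish an ordering worth spelling out: finish the maintenance work, release the `current_task` lock, and only then wake the tasks parked in `schedule_write_op`, so a woken task can immediately enqueue its op or take the lock itself. The sketch below condenses that ordering; the struct layouts are stripped-down stand-ins for the real `Housekeeper` and `Inner` types, not the actual moka definitions.

```rust
use std::sync::Arc;

use async_lock::Mutex;
use event_listener::Event;

struct Housekeeper {
    current_task: Mutex<()>,
}

struct Inner {
    write_op_ch_ready_event: Event,
}

impl Inner {
    fn notify_write_op_ch_is_ready(&self) {
        // Wake every task parked in `schedule_write_op`.
        self.write_op_ch_ready_event.notify(usize::MAX);
    }
}

impl Housekeeper {
    async fn run_pending_tasks(&self, inner: Arc<Inner>) {
        let current_task = self.current_task.lock().await;
        // ... drain the write op channel and apply the ops here ...
        drop(current_task);

        // Notify only after the lock has been released.
        inner.notify_write_op_ch_is_ready();
    }
}
```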