diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 2cd07db0..64e2f049 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -60,6 +60,19 @@ jobs: RUSTFLAGS: '--cfg skeptic' - name: Run tests (future) + uses: actions-rs/cargo@v1 + if: ${{ matrix.rust != '1.45.2' }} + with: + command: test + args: --features future + + - name: Run tests (release, no features) + uses: actions-rs/cargo@v1 + with: + command: test + args: --release + + - name: Run tests (release, future) uses: actions-rs/cargo@v1 if: ${{ matrix.rust != '1.45.2' }} with: diff --git a/.vscode/settings.json b/.vscode/settings.json index 5fc7c13c..e05e0ee2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -15,7 +15,9 @@ "deqs", "Deque", "Deques", + "Einziger", "else's", + "Eytan", "getrandom", "Hasher", "Kawano", @@ -24,6 +26,7 @@ "MSRV", "nanos", "nocapture", + "Ohad", "peekable", "preds", "reqwest", @@ -32,6 +35,7 @@ "RUSTFLAGS", "rustfmt", "semver", + "smallvec", "structs", "Tatsuya", "thiserror", diff --git a/CHANGELOG.md b/CHANGELOG.md index b71ea760..854e407c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ # Moka — Change Log +## Version 0.7.0 + +### Added + +- Add support for weight-based (size aware) cache management. + ([#24][gh-pull-0024]) +- Add support for unbound cache. 
([#24][gh-pull-0024]) + + ## Version 0.6.3 ### Fixed @@ -182,6 +191,7 @@ [gh-pull-0033]: https://github.com/moka-rs/moka/pull/33/ [gh-pull-0030]: https://github.com/moka-rs/moka/pull/30/ [gh-pull-0028]: https://github.com/moka-rs/moka/pull/28/ +[gh-pull-0024]: https://github.com/moka-rs/moka/pull/24/ [gh-pull-0023]: https://github.com/moka-rs/moka/pull/23/ [gh-pull-0022]: https://github.com/moka-rs/moka/pull/22/ [gh-pull-0020]: https://github.com/moka-rs/moka/pull/20/ diff --git a/Cargo.toml b/Cargo.toml index 8b3170b6..f4435530 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "moka" -version = "0.6.3" +version = "0.7.0" authors = ["Tatsuya Kawano "] edition = "2018" @@ -33,12 +33,14 @@ atomic64 = [] [dependencies] crossbeam-channel = "0.5" +crossbeam-utils = "0.8" moka-cht = "0.4.2" num_cpus = "1.13" once_cell = "1.7" parking_lot = "0.11" quanta = "0.9.3" scheduled-thread-pool = "0.2" +smallvec = "1.6" thiserror = "1.0" uuid = { version = "0.8", features = ["v4"] } diff --git a/LICENSE-APACHE b/LICENSE-APACHE index 166c0fda..fad13c8d 100644 --- a/LICENSE-APACHE +++ b/LICENSE-APACHE @@ -186,7 +186,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright 2020 - 2021 Tatsuya Kawano + Copyright 2020 - 2022 Tatsuya Kawano Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
diff --git a/LICENSE-MIT b/LICENSE-MIT index d9ec411c..aaf52b58 100644 --- a/LICENSE-MIT +++ b/LICENSE-MIT @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2020 - 2021 Tatsuya Kawano +Copyright (c) 2020 - 2022 Tatsuya Kawano Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 0395a7d5..21a2b324 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,8 @@ [![license][license-badge]](#license) [![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Fmoka-rs%2Fmoka.svg?type=shield)](https://app.fossa.com/projects/git%2Bgithub.com%2Fmoka-rs%2Fmoka?ref=badge_shield) -Moka is a fast, concurrent cache library for Rust. Moka is inspired by -[Caffeine][caffeine-git] (Java). +Moka is a fast, concurrent cache library for Rust. Moka is inspired by the +[Caffeine][caffeine-git] library for Java. Moka provides cache implementations on top of hash maps. They support full concurrency of retrievals and a high expected concurrency for updates. Moka also @@ -42,7 +42,9 @@ algorithm to determine which entries to evict when the capacity is exceeded. - Synchronous caches that can be shared across OS threads. - An asynchronous (futures aware) cache that can be accessed inside and outside of asynchronous contexts. -- Caches are bounded by the maximum number of entries. +- A cache can be bounded by one of the followings: + - The maximum number of entries. + - The total weighted size of entries. - Maintains good hit rate by using an entry replacement algorithms inspired by [Caffeine][caffeine-git]: - Admission to a cache is controlled by the Least Frequently Used (LFU) policy. @@ -54,15 +56,15 @@ algorithm to determine which entries to evict when the capacity is exceeded. ## Moka in Production -Moka is powering production services as well as embedded devices like home routers. 
-Here are some highlights: +Moka is powering production services as well as embedded Linux devices like home +routers. Here are some highlights: - [crates.io](https://crates.io/): The official crate registry has been using Moka in its API service to reduce the loads on PostgreSQL. Moka is maintaining [cache hit rates of ~85%][gh-discussions-51] for the high-traffic download endpoint. (Moka used: Nov 2021 — present) - [aliyundrive-webdav][aliyundrive-webdav-git]: This WebDAV gateway for a cloud drive - may have been deployed in hundreds of home WiFi routers, including inexpensive + may have been deployed in hundreds of home Wi-Fi routers, including inexpensive models with 32-bit MIPS or ARMv5TE-based SoCs. Moka is used to cache the metadata of remote files. (Moka used: Aug 2021 — present) @@ -76,14 +78,14 @@ Add this to your `Cargo.toml`: ```toml [dependencies] -moka = "0.6" +moka = "0.7" ``` To use the asynchronous cache, enable a crate feature called "future". ```toml [dependencies] -moka = { version = "0.6", features = ["future"] } +moka = { version = "0.7", features = ["future"] } ``` @@ -91,8 +93,8 @@ moka = { version = "0.6", features = ["future"] } The thread-safe, synchronous caches are defined in the `sync` module. -Cache entries are manually added using `insert` method, and are stored in the cache -until either evicted or manually invalidated. +Cache entries are manually added using `insert` or `get_or_insert_with` method, and +are stored in the cache until either evicted or manually invalidated. Here's an example of reading and updating a cache by using multiple threads: @@ -152,6 +154,12 @@ fn main() { } ``` +If you want to atomically initialize and insert a value when the key is not present, +you might want to check [the document][doc-sync-cache] for other insertion methods +`get_or_insert_with` and `get_or_try_insert_with`. 
+ +[doc-sync-cache]: https://docs.rs/moka/*/moka/sync/struct.Cache.html#method.get_or_insert_with + ## Example: Asynchronous Cache @@ -179,7 +187,7 @@ Here is a similar program to the previous example, but using asynchronous cache // Cargo.toml // // [dependencies] -// moka = { version = "0.6", features = ["future"] } +// moka = { version = "0.7", features = ["future"] } // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } // futures = "0.3" @@ -239,6 +247,12 @@ async fn main() { } ``` +If you want to atomically initialize and insert a value when the key is not present, +you might want to check [the document][doc-future-cache] for other insertion methods +`get_or_insert_with` and `get_or_try_insert_with`. + +[doc-future-cache]: https://docs.rs/moka/*/moka/future/struct.Cache.html#method.get_or_insert_with + ## Avoiding to clone the value at `get` @@ -270,6 +284,34 @@ cache.get(&key); ``` +## Example: Bounding a Cache with Weighted Size of Entry + +A `weigher` closure can be set at the cache creation time. It will calculate and +return a weighted size (relative size) of an entry. When it is set, a cache tries to +evict entries when the total weighted size exceeds its `max_capacity`. + +```rust +use std::convert::TryInto; +use moka::sync::Cache; + +fn main() { + let cache = Cache::builder() + // A weigher closure takes &K and &V and returns a u32 representing the + // relative size of the entry. Here, we use the byte length of the value + // String as the size. + .weigher(|_key, value: &String| -> u32 { + value.len().try_into().unwrap_or(u32::MAX) + }) + // This cache will hold up to 32MiB of values. + .max_capacity(32 * 1024 * 1024) + .build(); + cache.insert(0, "zero".to_string()); +} +``` + +Note that weighted sizes are not used when making eviction selections. 
+ + ## Example: Expiration Policies Moka supports the following expiration policies: @@ -282,12 +324,11 @@ Moka supports the following expiration policies: To set them, use the `CacheBuilder`. ```rust -use moka::sync::CacheBuilder; - +use moka::sync::Cache; use std::time::Duration; fn main() { - let cache = CacheBuilder::new(10_000) // Max 10,000 elements + let cache = Cache::builder() // Time to live (TTL): 30 minutes .time_to_live(Duration::from_secs(30 * 60)) // Time to idle (TTI): 5 minutes @@ -385,9 +426,9 @@ to the dependency declaration. ```toml:Cargo.toml [dependencies] -moka = { version = "0.6", default-feautures = false } +moka = { version = "0.7", default-features = false } # Or -moka = { version = "0.6", default-feautures = false, features = ["future"] } +moka = { version = "0.7", default-features = false, features = ["future"] } ``` This will make Moka to switch to a fall-back implementation, so it will compile. @@ -415,8 +456,18 @@ $ RUSTFLAGS='--cfg skeptic --cfg trybuild' cargo test \ ## Road Map - [x] `async` optimized caches. (`v0.2.0`) -- [ ] Weight based cache management ([#24](https://github.com/moka-rs/moka/pull/24)) +- [x] Bounding a cache with weighted size of entry. + (`v0.7.0` via [#24](https://github.com/moka-rs/moka/pull/24)) +- [ ] API stabilization. (Smaller core API, shorter names for frequently used + methods) + - e.g. + - `get(&Q)` → `get_if_present(&Q)` + - `get_or_insert_with(K, F)` → `get(K, F)` + - `get_or_try_insert_with(K, F)` → `try_get(K, F)` + - `blocking_insert(K, V)` → `blocking().insert(K, V)`. + - `time_to_live()` → `config().time_to_live()` - [ ] Cache statistics. (Hit rate, etc.) +- [ ] Notifications on eviction, etc. - [ ] Upgrade TinyLFU to Window TinyLFU. - [ ] The variable (per-entry) expiration, using a hierarchical timer wheel. 
@@ -426,6 +477,14 @@ $ RUSTFLAGS='--cfg skeptic --cfg trybuild' cargo test \ Moka is named after the [moka pot][moka-pot-wikipedia], a stove-top coffee maker that brews espresso-like coffee using boiling water pressurized by steam. +This name would imply the following facts and hopes: + +- Moka is a part of the Java Caffeine cache family. +- It is written in Rust. (Many moka pots are made of aluminum alloy or stainless + steel. We know they don't rust though) +- It should be fast. ("Espresso" in Italian means express) +- It should be easy to use, like a moka pot. + [moka-pot-wikipedia]: https://en.wikipedia.org/wiki/Moka_pot diff --git a/src/common.rs b/src/common.rs index ea5d4add..4a67c013 100644 --- a/src/common.rs +++ b/src/common.rs @@ -14,12 +14,3 @@ pub(crate) mod unsafe_weak_pointer; pub(crate) mod atomic_time; pub(crate) mod time; - -use time::Instant; - -pub(crate) trait AccessTime { - fn last_accessed(&self) -> Option; - fn set_last_accessed(&mut self, timestamp: Instant); - fn last_modified(&self) -> Option; - fn set_last_modified(&mut self, timestamp: Instant); -} diff --git a/src/common/deque.rs b/src/common/deque.rs index 0bdb01c5..f55f0391 100644 --- a/src/common/deque.rs +++ b/src/common/deque.rs @@ -49,6 +49,10 @@ impl DeqNode { element, } } + + pub(crate) fn next_node(&self) -> Option<&DeqNode> { + self.next.as_ref().map(|node| unsafe { node.as_ref() }) + } } /// Cursor is used to remember the current iterating position. 
@@ -650,6 +654,51 @@ mod tests { assert!((&mut deque).next().is_none()); } + #[test] + fn next_node() { + let mut deque: Deque = Deque::new(MainProbation); + + let node1 = DeqNode::new(MainProbation, "a".into()); + deque.push_back(Box::new(node1)); + let node2 = DeqNode::new(MainProbation, "b".into()); + let node2_ptr = deque.push_back(Box::new(node2)); + let node3 = DeqNode::new(MainProbation, "c".into()); + let node3_ptr = deque.push_back(Box::new(node3)); + + // ------------------------------------------------------- + // First iteration. + // peek_front() -> node1 + let node1a = deque.peek_front().unwrap(); + assert_eq!(node1a.element, "a".to_string()); + let node2a = node1a.next_node().unwrap(); + assert_eq!(node2a.element, "b".to_string()); + let node3a = node2a.next_node().unwrap(); + assert_eq!(node3a.element, "c".to_string()); + assert!(node3a.next_node().is_none()); + + // ------------------------------------------------------- + // Iterate after a move_to_back. + // Move "b" to the back. So now "a" -> "c" -> "b". + unsafe { deque.move_to_back(node2_ptr) }; + let node1a = deque.peek_front().unwrap(); + assert_eq!(node1a.element, "a".to_string()); + let node3a = node1a.next_node().unwrap(); + assert_eq!(node3a.element, "c".to_string()); + let node2a = node3a.next_node().unwrap(); + assert_eq!(node2a.element, "b".to_string()); + assert!(node2a.next_node().is_none()); + + // ------------------------------------------------------- + // Iterate after an unlink. + // Unlink the second node "c". Now "a" -> "b". 
+ unsafe { deque.unlink(node3_ptr) }; + let node1a = deque.peek_front().unwrap(); + assert_eq!(node1a.element, "a".to_string()); + let node2a = node1a.next_node().unwrap(); + assert_eq!(node2a.element, "b".to_string()); + assert!(node2a.next_node().is_none()); + } + #[test] fn drop() { use std::{cell::RefCell, rc::Rc}; diff --git a/src/future/builder.rs b/src/future/builder.rs index a0e76baa..47711da8 100644 --- a/src/future/builder.rs +++ b/src/future/builder.rs @@ -1,10 +1,11 @@ use super::Cache; -use crate::common::builder_utils; +use crate::{common::builder_utils, sync::Weigher}; use std::{ collections::hash_map::RandomState, hash::{BuildHasher, Hash}, marker::PhantomData, + sync::Arc, time::Duration, }; @@ -15,54 +16,78 @@ use std::{ /// # Examples /// /// ```rust -/// use moka::future::CacheBuilder; +/// // Cargo.toml +/// // +/// // [dependencies] +/// // moka = { version = "0.7", features = ["future"] } +/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } +/// // futures = "0.3" /// +/// use moka::future::Cache; /// use std::time::Duration; /// -/// let cache = CacheBuilder::new(10_000) // Max 10,000 elements -/// // Time to live (TTL): 30 minutes -/// .time_to_live(Duration::from_secs(30 * 60)) -/// // Time to idle (TTI): 5 minutes -/// .time_to_idle(Duration::from_secs( 5 * 60)) -/// // Create the cache. -/// .build(); +/// #[tokio::main] +/// async fn main() { +/// let cache = Cache::builder() +/// // Max 10,000 entries +/// .max_capacity(10_000) +/// // Time to live (TTL): 30 minutes +/// .time_to_live(Duration::from_secs(30 * 60)) +/// // Time to idle (TTI): 5 minutes +/// .time_to_idle(Duration::from_secs( 5 * 60)) +/// // Create the cache. +/// .build(); /// -/// // This entry will expire after 5 minutes (TTI) if there is no get(). -/// cache.insert(0, "zero"); +/// // This entry will expire after 5 minutes (TTI) if there is no get(). 
+/// cache.insert(0, "zero").await; /// -/// // This get() will extend the entry life for another 5 minutes. -/// cache.get(&0); +/// // This get() will extend the entry life for another 5 minutes. +/// cache.get(&0); /// -/// // Even though we keep calling get(), the entry will expire -/// // after 30 minutes (TTL) from the insert(). +/// // Even though we keep calling get(), the entry will expire +/// // after 30 minutes (TTL) from the insert(). +/// } /// ``` /// -pub struct CacheBuilder { - max_capacity: usize, +pub struct CacheBuilder { + max_capacity: Option, initial_capacity: Option, - // num_segments: Option, + weigher: Option>, time_to_live: Option, time_to_idle: Option, invalidator_enabled: bool, cache_type: PhantomData, } -impl CacheBuilder> +impl Default for CacheBuilder> where K: Eq + Hash + Send + Sync + 'static, V: Clone + Send + Sync + 'static, { - /// Construct a new `CacheBuilder` that will be used to build a `Cache` holding - /// up to `max_capacity` entries. - pub fn new(max_capacity: usize) -> Self { + fn default() -> Self { Self { - max_capacity, + max_capacity: None, initial_capacity: None, - // num_segments: None, + weigher: None, time_to_live: None, time_to_idle: None, invalidator_enabled: false, - cache_type: PhantomData::default(), + cache_type: Default::default(), + } + } +} + +impl CacheBuilder> +where + K: Eq + Hash + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, +{ + /// Construct a new `CacheBuilder` that will be used to build a `Cache` holding + /// up to `max_capacity` entries. 
+ pub fn new(max_capacity: usize) -> Self { + Self { + max_capacity: Some(max_capacity), + ..Default::default() } } @@ -80,6 +105,7 @@ where self.max_capacity, self.initial_capacity, build_hasher, + self.weigher, self.time_to_live, self.time_to_idle, self.invalidator_enabled, @@ -102,6 +128,7 @@ where self.max_capacity, self.initial_capacity, hasher, + self.weigher, self.time_to_live, self.time_to_idle, self.invalidator_enabled, @@ -109,11 +136,30 @@ where } } -impl CacheBuilder { - /// Sets the initial capacity of the cache. - pub fn initial_capacity(self, capacity: usize) -> Self { +impl CacheBuilder { + /// Sets the max capacity of the cache. + pub fn max_capacity(self, max_capacity: usize) -> Self { + Self { + max_capacity: Some(max_capacity), + ..self + } + } + + /// Sets the initial capacity (number of entries) of the cache. + pub fn initial_capacity(self, number_of_entries: usize) -> Self { + Self { + initial_capacity: Some(number_of_entries), + ..self + } + } + + /// Sets the weigher closure of the cache. + /// + /// The closure should take `&K` and `&V` as the arguments and returns a `u32` + /// representing the relative size of the entry. 
+ pub fn weigher(self, weigher: impl Fn(&K, &V) -> u32 + Send + Sync + 'static) -> Self { Self { - initial_capacity: Some(capacity), + weigher: Some(Arc::new(weigher)), ..self } } @@ -171,7 +217,6 @@ impl CacheBuilder { mod tests { use super::CacheBuilder; - use super::Cache; use std::time::Duration; #[tokio::test] @@ -179,7 +224,7 @@ mod tests { // Cache let cache = CacheBuilder::new(100).build(); - assert_eq!(cache.max_capacity(), 100); + assert_eq!(cache.max_capacity(), Some(100)); assert_eq!(cache.time_to_live(), None); assert_eq!(cache.time_to_idle(), None); assert_eq!(cache.num_segments(), 1); @@ -192,7 +237,7 @@ mod tests { .time_to_idle(Duration::from_secs(15 * 60)) .build(); - assert_eq!(cache.max_capacity(), 100); + assert_eq!(cache.max_capacity(), Some(100)); assert_eq!(cache.time_to_live(), Some(Duration::from_secs(45 * 60))); assert_eq!(cache.time_to_idle(), Some(Duration::from_secs(15 * 60))); assert_eq!(cache.num_segments(), 1); @@ -205,7 +250,7 @@ mod tests { #[should_panic(expected = "time_to_live is longer than 1000 years")] async fn build_cache_too_long_ttl() { let thousand_years_secs: u64 = 1000 * 365 * 24 * 3600; - let builder: CacheBuilder> = CacheBuilder::new(100); + let builder: CacheBuilder = CacheBuilder::new(100); let duration = Duration::from_secs(thousand_years_secs); builder .time_to_live(duration + Duration::from_secs(1)) @@ -216,7 +261,7 @@ mod tests { #[should_panic(expected = "time_to_idle is longer than 1000 years")] async fn build_cache_too_long_tti() { let thousand_years_secs: u64 = 1000 * 365 * 24 * 3600; - let builder: CacheBuilder> = CacheBuilder::new(100); + let builder: CacheBuilder = CacheBuilder::new(100); let duration = Duration::from_secs(thousand_years_secs); builder .time_to_idle(duration + Duration::from_secs(1)) diff --git a/src/future/cache.rs b/src/future/cache.rs index c8310176..6e352432 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -1,12 +1,12 @@ use super::{ value_initializer::{InitResult, 
ValueInitializer}, - ConcurrentCacheExt, + CacheBuilder, ConcurrentCacheExt, }; use crate::{ sync::{ base_cache::{BaseCache, HouseKeeperArc, MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS}, housekeeper::InnerSync, - PredicateId, WriteOp, + PredicateId, Weigher, WriteOp, }, PredicateError, }; @@ -42,8 +42,9 @@ use std::{ /// cache until either evicted or manually invalidated: /// /// - Inside an async context (`async fn` or `async` block), use -/// [`insert`](#method.insert) or [`invalidate`](#method.invalidate) method for -/// updating the cache and `await` them. +/// [`insert`](#method.insert), [`get_or_insert_with`](#method.get_or_insert_with) +/// or [`invalidate`](#method.invalidate) method for updating the cache and `await` +/// them. /// - Outside any async context, use [`blocking_insert`](#method.blocking_insert) or /// [`blocking_invalidate`](#method.blocking_invalidate) methods. They will block /// for a short time under heavy updates. @@ -57,7 +58,7 @@ use std::{ /// // Cargo.toml /// // /// // [dependencies] -/// // moka = { version = "0.6", features = ["future"] } +/// // moka = { version = "0.7", features = ["future"] } /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// // futures = "0.3" /// @@ -116,28 +117,10 @@ use std::{ /// } /// ``` /// -/// # Thread Safety -/// -/// All methods provided by the `Cache` are considered thread-safe, and can be safely -/// accessed by multiple concurrent threads. -/// -/// - `Cache` requires trait bounds `Send`, `Sync` and `'static` for `K` -/// (key), `V` (value) and `S` (hasher state). -/// - `Cache` will implement `Send` and `Sync`. -/// -/// # Sharing a cache across asynchronous tasks -/// -/// To share a cache across async tasks (or OS threads), do one of the followings: -/// -/// - Create a clone of the cache by calling its `clone` method and pass it to other -/// task. 
-/// - Wrap the cache by a `sync::OnceCell` or `sync::Lazy` from -/// [once_cell][once-cell-crate] create, and set it to a `static` variable. -/// -/// Cloning is a cheap operation for `Cache` as it only creates thread-safe -/// reference-counted pointers to the internal data structures. -/// -/// [once-cell-crate]: https://crates.io/crates/once_cell +/// If you want to atomically initialize and insert a value when the key is not +/// present, you might want to check other insertion methods +/// [`get_or_insert_with`](#method.get_or_insert_with) and +/// [`get_or_try_insert_with`](#method.get_or_try_insert_with). /// /// # Avoiding to clone the value at `get` /// @@ -154,7 +137,67 @@ use std::{ /// /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html /// -/// # Expiration Policies +/// # Evictions +/// +/// `Cache` provides two types of eviction: size-based eviction and time-based +/// eviction. +/// +/// ## Size-based +/// +/// ```rust +/// // Cargo.toml +/// // +/// // [dependencies] +/// // moka = { version = "0.7", features = ["future"] } +/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } +/// // futures = "0.3" +/// +/// use std::convert::TryInto; +/// use moka::future::Cache; +/// +/// #[tokio::main] +/// async fn main() { +/// // Evict based on the number of entries in the cache. +/// let cache = Cache::builder() +/// // Up to 10,000 entries. +/// .max_capacity(10_000) +/// // Create the cache. +/// .build(); +/// cache.insert(1, "one".to_string()).await; +/// +/// // Evict based on the byte length of strings in the cache. +/// let cache = Cache::builder() +/// // A weigher closure takes &K and &V and returns a u32 +/// // representing the relative size of the entry. +/// .weigher(|_key, value: &String| -> u32 { +/// value.len().try_into().unwrap_or(u32::MAX) +/// }) +/// // This cache will hold up to 32MiB of values. 
+/// .max_capacity(32 * 1024 * 1024) +/// .build(); +/// cache.insert(2, "two".to_string()).await; +/// } +/// ``` +/// +/// If your cache should not grow beyond a certain size, use the `max_capacity` +/// method of the [`CacheBuilder`][builder-struct] to set the upper bound. The cache +/// will try to evict entries that have not been used recently or very often. +/// +/// At the cache creation time, a weigher closure can be set by the `weigher` method +/// of the `CacheBuilder`. A weigher closure takes `&K` and `&V` as the arguments and +/// returns a `u32` representing the relative size of the entry: +/// +/// - If the `weigher` is _not_ set, the cache will treat each entry as having +/// the same size of `1`. This means the cache will be bounded by the number of entries. +/// - If the `weigher` is set, the cache will call the weigher to calculate the +/// weighted size (relative size) of an entry. This means the cache will be bounded +/// by the total weighted size of entries. +/// +/// Note that weighted sizes are not used when making eviction selections. +/// +/// [builder-struct]: ./struct.CacheBuilder.html +/// +/// ## Time-based (Expirations) /// /// `Cache` supports the following expiration policies: /// @@ -163,10 +206,60 @@ use std::{ /// - **Time to idle**: A cached entry will be expired after the specified duration /// past from `get` or `insert`. /// -/// See the [`CacheBuilder`][builder-struct]'s doc for how to configure a cache -/// with them. 
+/// ```rust +/// // Cargo.toml +/// // +/// // [dependencies] +/// // moka = { version = "0.7", features = ["future"] } +/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } +/// // futures = "0.3" /// -/// [builder-struct]: ./struct.CacheBuilder.html +/// use moka::future::Cache; +/// use std::time::Duration; +/// +/// #[tokio::main] +/// async fn main() { +/// let cache = Cache::builder() +/// // Time to live (TTL): 30 minutes +/// .time_to_live(Duration::from_secs(30 * 60)) +/// // Time to idle (TTI): 5 minutes +/// .time_to_idle(Duration::from_secs( 5 * 60)) +/// // Create the cache. +/// .build(); +/// +/// // This entry will expire after 5 minutes (TTI) if there is no get(). +/// cache.insert(0, "zero").await; +/// +/// // This get() will extend the entry life for another 5 minutes. +/// cache.get(&0); +/// +/// // Even though we keep calling get(), the entry will expire +/// // after 30 minutes (TTL) from the insert(). +/// } +/// ``` +/// +/// # Thread Safety +/// +/// All methods provided by the `Cache` are considered thread-safe, and can be safely +/// accessed by multiple concurrent threads. +/// +/// - `Cache` requires trait bounds `Send`, `Sync` and `'static` for `K` +/// (key), `V` (value) and `S` (hasher state). +/// - `Cache` will implement `Send` and `Sync`. +/// +/// # Sharing a cache across asynchronous tasks +/// +/// To share a cache across async tasks (or OS threads), do one of the followings: +/// +/// - Create a clone of the cache by calling its `clone` method and pass it to other +/// task. +/// - Wrap the cache by a `sync::OnceCell` or `sync::Lazy` from +/// [once_cell][once-cell-crate] create, and set it to a `static` variable. +/// +/// Cloning is a cheap operation for `Cache` as it only creates thread-safe +/// reference-counted pointers to the internal data structures. 
+/// +/// [once-cell-crate]: https://crates.io/crates/once_cell /// /// # Hashing Algorithm /// @@ -216,7 +309,7 @@ where K: Hash + Eq + Send + Sync + 'static, V: Clone + Send + Sync + 'static, { - /// Constructs a new `Cache` that will store up to the `max_capacity` entries. + /// Constructs a new `Cache` that will store up to the `max_capacity`. /// /// To adjust various configuration knobs such as `initial_capacity` or /// `time_to_live`, use the [`CacheBuilder`][builder-struct]. @@ -224,7 +317,23 @@ where /// [builder-struct]: ./struct.CacheBuilder.html pub fn new(max_capacity: usize) -> Self { let build_hasher = RandomState::default(); - Self::with_everything(max_capacity, None, build_hasher, None, None, false) + Self::with_everything( + Some(max_capacity), + None, + build_hasher, + None, + None, + None, + false, + ) + } + + /// Returns a [`CacheBuilder`][builder-struct], which can builds a `Cache` with + /// various configuration knobs. + /// + /// [builder-struct]: ./struct.CacheBuilder.html + pub fn builder() -> CacheBuilder> { + CacheBuilder::default() } } @@ -235,9 +344,10 @@ where S: BuildHasher + Clone + Send + Sync + 'static, { pub(crate) fn with_everything( - max_capacity: usize, + max_capacity: Option, initial_capacity: Option, build_hasher: S, + weigher: Option>, time_to_live: Option, time_to_idle: Option, invalidator_enabled: bool, @@ -247,6 +357,7 @@ where max_capacity, initial_capacity, build_hasher.clone(), + weigher, time_to_live, time_to_idle, invalidator_enabled, @@ -287,7 +398,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.6", features = ["future"] } + /// // moka = { version = "0.7", features = ["future"] } /// // futures = "0.3" /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } /// use moka::future::Cache; @@ -378,7 +489,7 @@ where /// // Cargo.toml /// // /// // [dependencies] - /// // moka = { version = "0.6", features = ["future"] } + /// // moka = { version = "0.7", 
features = ["future"] } /// // futures = "0.3" /// // reqwest = "0.11" /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } @@ -494,8 +605,8 @@ where Arc: Borrow, Q: Hash + Eq + ?Sized, { - if let Some(entry) = self.base.remove(key) { - let op = WriteOp::Remove(entry); + if let Some(kv) = self.base.remove_entry(key) { + let op = WriteOp::Remove(kv); let hk = self.base.housekeeper.as_ref(); Self::schedule_write_op(&self.base.write_op_ch, op, hk) .await @@ -513,8 +624,8 @@ where Arc: Borrow, Q: Hash + Eq + ?Sized, { - if let Some(entry) = self.base.remove(key) { - let op = WriteOp::Remove(entry); + if let Some(kv) = self.base.remove_entry(key) { + let op = WriteOp::Remove(kv); let hk = self.base.housekeeper.as_ref(); Self::blocking_schedule_write_op(&self.base.write_op_ch, op, hk) .expect("Failed to remove"); @@ -568,7 +679,7 @@ where } /// Returns the `max_capacity` of this cache. - pub fn max_capacity(&self) -> usize { + pub fn max_capacity(&self) -> Option { self.base.max_capacity() } @@ -796,7 +907,7 @@ mod tests { assert_eq!(cache.get(&"d"), None); // d -> 2 // "d" should be admitted and "c" should be evicted - // because d's frequency is higher then c's. + // because d's frequency is higher than c's. cache.insert("d", "dennis").await; cache.sync(); assert_eq!(cache.get(&"a"), Some("alice")); @@ -843,7 +954,7 @@ mod tests { assert_eq!(cache.get(&"d"), None); // d -> 2 // "d" should be admitted and "c" should be evicted - // because d's frequency is higher then c's. + // because d's frequency is higher than c's. 
cache.blocking_insert("d", "dennis"); cache.sync(); assert_eq!(cache.get(&"a"), Some("alice")); @@ -855,6 +966,74 @@ mod tests { assert_eq!(cache.get(&"b"), None); } + #[tokio::test] + async fn size_aware_eviction() { + let weigher = |_k: &&str, v: &(&str, u32)| v.1; + + let alice = ("alice", 10); + let bob = ("bob", 15); + let bill = ("bill", 20); + let cindy = ("cindy", 5); + let david = ("david", 15); + let dennis = ("dennis", 15); + + let mut cache = Cache::builder().max_capacity(31).weigher(weigher).build(); + cache.reconfigure_for_testing(); + + // Make the cache exterior immutable. + let cache = cache; + + cache.insert("a", alice).await; + cache.insert("b", bob).await; + assert_eq!(cache.get(&"a"), Some(alice)); + assert_eq!(cache.get(&"b"), Some(bob)); + cache.sync(); + // order (LRU -> MRU) and counts: a -> 1, b -> 1 + + cache.insert("c", cindy).await; + assert_eq!(cache.get(&"c"), Some(cindy)); + // order and counts: a -> 1, b -> 1, c -> 1 + cache.sync(); + + assert_eq!(cache.get(&"a"), Some(alice)); + assert_eq!(cache.get(&"b"), Some(bob)); + cache.sync(); + // order and counts: c -> 1, a -> 2, b -> 2 + + // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10). + // "d" must have higher count than 3, which is the aggregated count + // of "a" and "c". + cache.insert("d", david).await; // count: d -> 0 + cache.sync(); + assert_eq!(cache.get(&"d"), None); // d -> 1 + + cache.insert("d", david).await; + cache.sync(); + assert_eq!(cache.get(&"d"), None); // d -> 2 + + cache.insert("d", david).await; + cache.sync(); + assert_eq!(cache.get(&"d"), None); // d -> 3 + + cache.insert("d", david).await; + cache.sync(); + assert_eq!(cache.get(&"d"), None); // d -> 4 + + // Finally "d" should be admitted by evicting "c" and "a". 
+ cache.insert("d", dennis).await; + cache.sync(); + assert_eq!(cache.get(&"a"), None); + assert_eq!(cache.get(&"b"), Some(bob)); + assert_eq!(cache.get(&"c"), None); + assert_eq!(cache.get(&"d"), Some(dennis)); + + // Update "b" with "bill" (w: 20). This should evict "d" (w: 15). + cache.insert("b", bill).await; + cache.sync(); + assert_eq!(cache.get(&"b"), Some(bill)); + assert_eq!(cache.get(&"d"), None); + } + #[tokio::test] async fn basic_multi_async_tasks() { let num_threads = 4; diff --git a/src/lib.rs b/src/lib.rs index 056317d0..f2ef571a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,13 +2,12 @@ #![warn(rust_2018_idioms)] //! Moka is a fast, concurrent cache library for Rust. Moka is inspired by -//! [Caffeine][caffeine-git] (Java). +//! the [Caffeine][caffeine-git] library for Java. //! //! Moka provides in-memory concurrent cache implementations on top of hash maps. //! They support full concurrency of retrievals and a high expected concurrency for -//! updates. -//! They utilize a lock-free concurrent hash table `SegmentedHashMap` from the -//! [moka-cht][moka-cht-crate] crate for the central key-value storage. +//! updates. They utilize a lock-free concurrent hash table `SegmentedHashMap` from +//! the [moka-cht][moka-cht-crate] crate for the central key-value storage. //! //! Moka also provides an in-memory, non-thread-safe cache implementation for single //! thread applications. @@ -26,7 +25,9 @@ //! - Synchronous caches that can be shared across OS threads. //! - An asynchronous (futures aware) cache that can be accessed inside and //! outside of asynchronous contexts. -//! - Caches are bounded by the maximum number of entries. +//! - A cache can be bounded by one of the followings: +//! - The maximum number of entries. +//! - The total weighted size of entries. //! - Maintains good hit rate by using entry replacement algorithms inspired by //! [Caffeine][caffeine-git]: //! 
- Admission to a cache is controlled by the Least Frequently Used (LFU) policy. diff --git a/src/sync.rs b/src/sync.rs index a1c931cb..c71fc09c 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,20 +1,15 @@ //! Provides thread-safe, blocking cache implementations. -use crate::common::{atomic_time::AtomicInstant, deque::DeqNode, time::Instant, AccessTime}; +use crate::common::{deque::DeqNode, time::Instant}; use parking_lot::Mutex; -use std::{ - ptr::NonNull, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, -}; +use std::{ptr::NonNull, sync::Arc}; pub(crate) mod base_cache; mod builder; mod cache; mod deques; +mod entry_info; pub(crate) mod housekeeper; mod invalidator; mod segment; @@ -24,6 +19,8 @@ pub use builder::CacheBuilder; pub use cache::Cache; pub use segment::SegmentedCache; +use self::entry_info::EntryInfo; + /// The type of the unique ID to identify a predicate used by /// [`Cache#invalidate_entries_if`][invalidate-if] method. /// @@ -40,6 +37,15 @@ pub trait ConcurrentCacheExt { fn sync(&self); } +pub(crate) type Weigher = Arc u32 + Send + Sync + 'static>; + +pub(crate) trait AccessTime { + fn last_accessed(&self) -> Option; + fn set_last_accessed(&self, timestamp: Instant); + fn last_modified(&self) -> Option; + fn set_last_modified(&self, timestamp: Instant); +} + pub(crate) struct KeyHash { pub(crate) key: Arc, pub(crate) hash: u64, @@ -61,34 +67,104 @@ impl Clone for KeyHash { } pub(crate) struct KeyDate { - pub(crate) key: Arc, - pub(crate) timestamp: Arc, + key: Arc, + entry_info: EntryInfo, } impl KeyDate { - pub(crate) fn new(key: Arc, timestamp: Arc) -> Self { - Self { key, timestamp } + pub(crate) fn new(key: Arc, entry_info: &EntryInfo) -> Self { + Self { + key, + entry_info: entry_info.clone(), + } + } + + pub(crate) fn key(&self) -> &Arc { + &self.key } - pub(crate) fn timestamp(&self) -> Option { - self.timestamp.instant() + pub(crate) fn last_modified(&self) -> Option { + self.entry_info.last_modified() } } pub(crate) struct 
KeyHashDate { - pub(crate) key: Arc, - pub(crate) hash: u64, - pub(crate) timestamp: Arc, + key: Arc, + hash: u64, + entry_info: EntryInfo, } impl KeyHashDate { - pub(crate) fn new(kh: KeyHash, timestamp: Arc) -> Self { + pub(crate) fn new(kh: KeyHash, entry_info: &EntryInfo) -> Self { Self { key: kh.key, hash: kh.hash, - timestamp, + entry_info: entry_info.clone(), } } + + pub(crate) fn key(&self) -> &Arc { + &self.key + } + + pub(crate) fn entry_info(&self) -> &EntryInfo { + &self.entry_info + } +} + +pub(crate) struct KvEntry { + pub(crate) key: Arc, + pub(crate) entry: Arc>, +} + +impl KvEntry { + pub(crate) fn new(key: Arc, entry: Arc>) -> Self { + Self { key, entry } + } +} + +impl AccessTime for DeqNode> { + #[inline] + fn last_accessed(&self) -> Option { + None + } + + #[inline] + fn set_last_accessed(&self, _timestamp: Instant) { + unreachable!(); + } + + #[inline] + fn last_modified(&self) -> Option { + self.element.entry_info.last_modified() + } + + #[inline] + fn set_last_modified(&self, timestamp: Instant) { + self.element.entry_info.set_last_modified(timestamp); + } +} + +impl AccessTime for DeqNode> { + #[inline] + fn last_accessed(&self) -> Option { + self.element.entry_info.last_accessed() + } + + #[inline] + fn set_last_accessed(&self, timestamp: Instant) { + self.element.entry_info.set_last_accessed(timestamp); + } + + #[inline] + fn last_modified(&self) -> Option { + None + } + + #[inline] + fn set_last_modified(&self, _timestamp: Instant) { + unreachable!(); + } } // DeqNode for an access order queue. 
@@ -107,19 +183,15 @@ unsafe impl Send for DeqNodes {} pub(crate) struct ValueEntry { pub(crate) value: V, - is_admitted: Arc, - last_accessed: Arc, - last_modified: Arc, + info: EntryInfo, nodes: Mutex>, } impl ValueEntry { - pub(crate) fn new(value: V) -> Self { + fn new(value: V, entry_info: EntryInfo) -> Self { Self { value, - is_admitted: Arc::new(AtomicBool::new(false)), - last_accessed: Default::default(), - last_modified: Default::default(), + info: entry_info, nodes: Mutex::new(DeqNodes { access_order_q_node: None, write_order_q_node: None, @@ -127,7 +199,7 @@ impl ValueEntry { } } - pub(crate) fn new_with(value: V, other: &Self) -> Self { + fn new_from(value: V, entry_info: EntryInfo, other: &Self) -> Self { let nodes = { let other_nodes = other.nodes.lock(); DeqNodes { @@ -135,36 +207,32 @@ impl ValueEntry { write_order_q_node: other_nodes.write_order_q_node, } }; - let last_accessed = Arc::clone(&other.last_accessed); - let last_modified = Arc::clone(&other.last_modified); - // To prevent this updated ValueEntry from being evicted by a expiration policy, + // To prevent this updated ValueEntry from being evicted by an expiration policy, // set the max value to the timestamps. They will be replaced with the real // timestamps when applying writes. 
- last_accessed.reset(); - last_modified.reset(); + entry_info.reset_timestamps(); Self { value, - is_admitted: Arc::clone(&other.is_admitted), - last_accessed, - last_modified, + info: entry_info, nodes: Mutex::new(nodes), } } - pub(crate) fn is_admitted(&self) -> bool { - self.is_admitted.load(Ordering::Acquire) + pub(crate) fn entry_info(&self) -> &EntryInfo { + &self.info } - pub(crate) fn set_is_admitted(&self, value: bool) { - self.is_admitted.store(value, Ordering::Release); + pub(crate) fn is_admitted(&self) -> bool { + self.info.is_admitted() } - pub(crate) fn raw_last_accessed(&self) -> Arc { - Arc::clone(&self.last_accessed) + pub(crate) fn set_is_admitted(&self, value: bool) { + self.info.set_is_admitted(value); } - pub(crate) fn raw_last_modified(&self) -> Arc { - Arc::clone(&self.last_modified) + #[inline] + pub(crate) fn policy_weight(&self) -> u32 { + self.info.policy_weight() } pub(crate) fn access_order_q_node(&self) -> Option> { @@ -201,75 +269,77 @@ impl ValueEntry { impl AccessTime for Arc> { #[inline] fn last_accessed(&self) -> Option { - self.last_accessed.instant() + self.info.last_accessed() } #[inline] - fn set_last_accessed(&mut self, timestamp: Instant) { - self.last_accessed.set_instant(timestamp); + fn set_last_accessed(&self, timestamp: Instant) { + self.info.set_last_accessed(timestamp); } #[inline] fn last_modified(&self) -> Option { - self.last_modified.instant() + self.info.last_modified() } #[inline] - fn set_last_modified(&mut self, timestamp: Instant) { - self.last_modified.set_instant(timestamp); + fn set_last_modified(&self, timestamp: Instant) { + self.info.set_last_modified(timestamp); } } -impl AccessTime for DeqNode> { - #[inline] - fn last_accessed(&self) -> Option { - None - } - - #[inline] - fn set_last_accessed(&mut self, _timestamp: Instant) { - unreachable!(); - } - - #[inline] - fn last_modified(&self) -> Option { - self.element.timestamp.instant() - } +#[derive(Clone, Copy, Debug)] +pub(crate) enum CacheFeatures { 
+ Plain, + Weighted, +} - #[inline] - fn set_last_modified(&mut self, timestamp: Instant) { - self.element.timestamp.set_instant(timestamp); +impl CacheFeatures { + pub(crate) fn new(is_weighter_defined: bool) -> Self { + if is_weighter_defined { + Self::Weighted + } else { + Self::Plain + } } } -impl AccessTime for DeqNode> { - #[inline] - fn last_accessed(&self) -> Option { - self.element.timestamp.instant() - } +pub(crate) struct ValueEntryBuilder(CacheFeatures); - #[inline] - fn set_last_accessed(&mut self, timestamp: Instant) { - self.element.timestamp.set_instant(timestamp); +impl ValueEntryBuilder { + pub(crate) fn new(features: CacheFeatures) -> Self { + Self(features) } - #[inline] - fn last_modified(&self) -> Option { - None + pub(crate) fn build(&self, value: V, policy_weight: u32) -> ValueEntry { + let info = EntryInfo::new(self.0, policy_weight); + ValueEntry::new(value, info) } - #[inline] - fn set_last_modified(&mut self, _timestamp: Instant) { - unreachable!(); + pub(crate) fn build_from( + &self, + value: V, + policy_weight: u32, + other: &ValueEntry, + ) -> ValueEntry { + let info = other.info.clone(); + info.set_policy_weight(policy_weight); + ValueEntry::new_from(value, info, other) } } pub(crate) enum ReadOp { + // u64 is the hash of the key. 
Hit(u64, Arc>, Instant), Miss(u64), } pub(crate) enum WriteOp { - Upsert(KeyHash, Arc>), - Remove(Arc>), + Upsert { + key_hash: KeyHash, + value_entry: Arc>, + old_weight: u32, + new_weight: u32, + }, + Remove(KvEntry), } diff --git a/src/sync/base_cache.rs b/src/sync/base_cache.rs index 3c6beca2..990d54f5 100644 --- a/src/sync/base_cache.rs +++ b/src/sync/base_cache.rs @@ -2,7 +2,8 @@ use super::{ deques::Deques, housekeeper::{Housekeeper, InnerSync, SyncPace}, invalidator::{GetOrRemoveEntry, InvalidationResult, Invalidator, KeyDateLite, PredicateFun}, - KeyDate, KeyHash, KeyHashDate, PredicateId, ReadOp, ValueEntry, WriteOp, + AccessTime, CacheFeatures, KeyDate, KeyHash, KeyHashDate, KvEntry, PredicateId, ReadOp, + ValueEntry, ValueEntryBuilder, Weigher, WriteOp, }; use crate::{ common::{ @@ -10,13 +11,13 @@ use crate::{ deque::{CacheRegion, DeqNode, Deque}, frequency_sketch::FrequencySketch, time::{CheckedTimeOps, Clock, Instant}, - AccessTime, }, PredicateError, }; - use crossbeam_channel::{Receiver, Sender, TrySendError}; +use crossbeam_utils::atomic::AtomicCell; use parking_lot::{Mutex, RwLock}; +use smallvec::SmallVec; use std::{ borrow::Borrow, collections::hash_map::RandomState, @@ -85,9 +86,10 @@ where S: BuildHasher + Clone + Send + Sync + 'static, { pub(crate) fn new( - max_capacity: usize, + max_capacity: Option, initial_capacity: Option, build_hasher: S, + weigher: Option>, time_to_live: Option, time_to_idle: Option, invalidator_enabled: bool, @@ -98,6 +100,7 @@ where max_capacity, initial_capacity, build_hasher, + weigher, r_rcv, w_rcv, time_to_live, @@ -163,12 +166,12 @@ where } #[inline] - pub(crate) fn remove(&self, key: &Q) -> Option>> + pub(crate) fn remove_entry(&self, key: &Q) -> Option> where Arc: Borrow, Q: Hash + Eq + ?Sized, { - self.inner.remove(key) + self.inner.remove_entry(key) } #[inline] @@ -198,7 +201,7 @@ where self.inner.register_invalidation_predicate(predicate, now) } - pub(crate) fn max_capacity(&self) -> usize { + pub(crate) 
fn max_capacity(&self) -> Option { self.inner.max_capacity() } @@ -233,6 +236,7 @@ where #[inline] pub(crate) fn do_insert_with_hash(&self, key: Arc, hash: u64, value: V) -> WriteOp { + let weight = self.inner.weigh(&key, &value); let op_cnt1 = Rc::new(AtomicU8::new(0)); let op_cnt2 = Rc::clone(&op_cnt1); let mut op1 = None; @@ -250,22 +254,38 @@ where Arc::clone(&key), // on_insert || { - let entry = Arc::new(ValueEntry::new(value.clone())); + let entry = self.new_value_entry(value.clone(), weight); let cnt = op_cnt1.fetch_add(1, Ordering::Relaxed); op1 = Some(( cnt, - WriteOp::Upsert(KeyHash::new(Arc::clone(&key), hash), Arc::clone(&entry)), + WriteOp::Upsert { + key_hash: KeyHash::new(Arc::clone(&key), hash), + value_entry: Arc::clone(&entry), + old_weight: 0, + new_weight: weight, + }, )); entry }, // on_modify |_k, old_entry| { - let entry = Arc::new(ValueEntry::new_with(value.clone(), old_entry)); + // NOTES on `new_value_entry_from` method: + // 1. The internal EntryInfo will be shared between the old and new ValueEntries. + // 2. This method will set the last_accessed and last_modified to the max value to + // prevent this new ValueEntry from being evicted by an expiration policy. + // 3. This method will update the policy_weight with the new weight. 
+ let old_weight = old_entry.policy_weight(); + let entry = self.new_value_entry_from(value.clone(), weight, old_entry); let cnt = op_cnt2.fetch_add(1, Ordering::Relaxed); op2 = Some(( cnt, Arc::clone(old_entry), - WriteOp::Upsert(KeyHash::new(Arc::clone(&key), hash), Arc::clone(&entry)), + WriteOp::Upsert { + key_hash: KeyHash::new(Arc::clone(&key), hash), + value_entry: Arc::clone(&entry), + old_weight, + new_weight: weight, + }, )); entry }, @@ -289,6 +309,25 @@ where } } + #[inline] + fn new_value_entry(&self, value: V, policy_weight: u32) -> Arc> { + Arc::new(self.inner.value_entry_builder.build(value, policy_weight)) + } + + #[inline] + fn new_value_entry_from( + &self, + value: V, + policy_weight: u32, + other: &ValueEntry, + ) -> Arc> { + Arc::new( + self.inner + .value_entry_builder + .build_from(value, policy_weight, other), + ) + } + #[inline] fn apply_reads_if_needed(&self) { let len = self.read_op_ch.len(); @@ -345,14 +384,70 @@ where } } +struct WeightedSize(u64); + +impl WeightedSize { + #[inline] + fn saturating_add(&mut self, weight: u32) { + let total = &mut self.0; + *total = total.saturating_add(weight as u64); + } + + #[inline] + fn saturating_sub(&mut self, weight: u32) { + let total = &mut self.0; + *total = total.saturating_sub(weight as u64); + } +} + +#[derive(Default)] +struct EntrySizeAndFrequency { + policy_weight: u64, + freq: u32, +} + +impl EntrySizeAndFrequency { + fn new(policy_weight: u32) -> Self { + Self { + policy_weight: policy_weight as u64, + ..Default::default() + } + } + + fn add_policy_weight(&mut self, weight: u32) { + self.policy_weight += weight as u64; + } + + fn add_frequency(&mut self, freq: &FrequencySketch, hash: u64) { + self.freq += freq.frequency(hash) as u32; + } +} + +// Access-Order Queue Node +type AoqNode = NonNull>>; + +enum AdmissionResult { + Admitted { + victim_nodes: SmallVec<[AoqNode; 8]>, + skipped_nodes: SmallVec<[AoqNode; 4]>, + }, + Rejected { + skipped_nodes: SmallVec<[AoqNode; 4]>, + }, +} + 
type CacheStore = moka_cht::SegmentedHashMap, Arc>, S>; type CacheEntry = (Arc, Arc>); +// type BoxedValueEntryBuilder = Box + Send + Sync + 'static>; + pub(crate) struct Inner { - max_capacity: usize, + max_capacity: Option, + weighted_size: AtomicCell, cache: CacheStore, build_hasher: S, + value_entry_builder: ValueEntryBuilder, deques: Mutex>, frequency_sketch: RwLock, read_op_ch: Receiver>, @@ -360,6 +455,7 @@ pub(crate) struct Inner { time_to_live: Option, time_to_idle: Option, valid_after: AtomicInstant, + weigher: Option>, invalidator_enabled: bool, invalidator: RwLock>>, has_expiration_clock: AtomicBool, @@ -369,16 +465,18 @@ pub(crate) struct Inner { // functions/methods used by BaseCache impl Inner where - K: Hash + Eq, + K: Hash + Eq + Send + Sync + 'static, + V: Send + Sync + 'static, S: BuildHasher + Clone, { // Disable a Clippy warning for having more than seven arguments. // https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments #[allow(clippy::too_many_arguments)] fn new( - max_capacity: usize, + max_capacity: Option, initial_capacity: Option, build_hasher: S, + weigher: Option>, read_op_ch: Receiver>, write_op_ch: Receiver>, time_to_live: Option, @@ -395,17 +493,22 @@ where build_hasher.clone(), ); + let features = CacheFeatures::new(weigher.is_some()); + let value_entry_builder = ValueEntryBuilder::new(features); + // Ensure skt_capacity fits in a range of `128u32..=u32::MAX`. let skt_capacity = max_capacity - .try_into() // Convert to u32. - .unwrap_or(u32::MAX) + .map(|n| n.try_into().unwrap_or(u32::MAX)) // Convert to u32. 
+ .unwrap_or_default() .max(128); let frequency_sketch = FrequencySketch::with_capacity(skt_capacity); Self { - max_capacity, + max_capacity: max_capacity.map(|n| n as u64), + weighted_size: AtomicCell::default(), cache, build_hasher, + value_entry_builder, deques: Mutex::new(Deques::default()), frequency_sketch: RwLock::new(frequency_sketch), read_op_ch, @@ -413,6 +516,7 @@ where time_to_live, time_to_idle, valid_after: AtomicInstant::default(), + weigher, invalidator_enabled, // When enabled, this field will be set later via the set_invalidator method. invalidator: RwLock::new(None), @@ -446,16 +550,18 @@ where } #[inline] - fn remove(&self, key: &Q) -> Option>> + fn remove_entry(&self, key: &Q) -> Option> where Arc: Borrow, Q: Hash + Eq + ?Sized, { - self.cache.remove(key) + self.cache + .remove_entry(key) + .map(|(key, entry)| KvEntry::new(key, entry)) } - fn max_capacity(&self) -> usize { - self.max_capacity + fn max_capacity(&self) -> Option { + self.max_capacity.map(|n| n as usize) } #[inline] @@ -516,6 +622,11 @@ where false } + #[inline] + fn weigh(&self, key: &K, value: &V) -> u32 { + self.weigher.as_ref().map(|w| w(key, value)).unwrap_or(1) + } + #[inline] fn current_time_from_expiration_clock(&self) -> Instant { if self.has_expiration_clock.load(Ordering::Relaxed) { @@ -549,6 +660,12 @@ where } } +// TODO: Divide this method into smaller methods so that unit tests can do more +// precise testing. 
+// - sync_reads +// - sync_writes +// - evict +// - invalidate_entries impl InnerSync for Inner where K: Hash + Eq + Send + Sync + 'static, @@ -563,6 +680,8 @@ where let mut calls = 0; let mut should_sync = true; + let current_ws = self.weighted_size.load(); + let mut ws = WeightedSize(current_ws); while should_sync && calls <= max_repeats { let r_len = self.read_op_ch.len(); if r_len > 0 { @@ -571,7 +690,7 @@ where let w_len = self.write_op_ch.len(); if w_len > 0 { - self.apply_writes(&mut deqs, w_len); + self.apply_writes(&mut deqs, w_len, &mut ws); } calls += 1; should_sync = self.read_op_ch.len() >= READ_LOG_FLUSH_POINT @@ -579,17 +698,31 @@ where } if self.has_expiry() || self.has_valid_after() { - self.evict(&mut deqs, EVICTION_BATCH_SIZE); + self.evict_expired(&mut deqs, EVICTION_BATCH_SIZE, &mut ws); } if self.invalidator_enabled { if let Some(invalidator) = &*self.invalidator.read() { if !invalidator.is_empty() && !invalidator.is_task_running() { - self.invalidate_entries(invalidator, &mut deqs, INVALIDATION_BATCH_SIZE); + self.invalidate_entries( + invalidator, + &mut deqs, + INVALIDATION_BATCH_SIZE, + &mut ws, + ); } } } + // Evict if this cache has more entries than its capacity. 
+ let weights_to_evict = self.weights_to_evict(&ws); + if weights_to_evict > 0 { + self.evict_lru_entries(&mut deqs, EVICTION_BATCH_SIZE, weights_to_evict, &mut ws); + } + + debug_assert_eq!(self.weighted_size.load(), current_ws); + self.weighted_size.store(ws.0); + if should_sync { Some(SyncPace::Fast) } else if self.write_op_ch.len() <= WRITE_LOG_LOW_WATER_MARK { @@ -610,13 +743,25 @@ where V: Send + Sync + 'static, S: BuildHasher + Clone + Send + Sync + 'static, { + fn has_enough_capacity(&self, candidate_weight: u32, ws: &WeightedSize) -> bool { + self.max_capacity + .map(|limit| ws.0 + candidate_weight as u64 <= limit) + .unwrap_or(true) + } + + fn weights_to_evict(&self, ws: &WeightedSize) -> u64 { + self.max_capacity + .map(|limit| ws.0.saturating_sub(limit)) + .unwrap_or_default() + } + fn apply_reads(&self, deqs: &mut Deques, count: usize) { use ReadOp::*; let mut freq = self.frequency_sketch.write(); let ch = &self.read_op_ch; for _ in 0..count { match ch.try_recv() { - Ok(Hit(hash, mut entry, timestamp)) => { + Ok(Hit(hash, entry, timestamp)) => { freq.increment(hash); entry.set_last_accessed(timestamp); deqs.move_to_back_ao(&entry) @@ -627,7 +772,7 @@ where } } - fn apply_writes(&self, deqs: &mut Deques, count: usize) { + fn apply_writes(&self, deqs: &mut Deques, count: usize, ws: &mut WeightedSize) { use WriteOp::*; let freq = self.frequency_sketch.read(); let ch = &self.write_op_ch; @@ -635,140 +780,203 @@ where for _ in 0..count { match ch.try_recv() { - Ok(Upsert(kh, entry)) => self.handle_upsert(kh, entry, ts, deqs, &freq), - Ok(Remove(entry)) => Self::handle_remove(deqs, entry), + Ok(Upsert { + key_hash: kh, + value_entry: entry, + old_weight, + new_weight, + }) => self.handle_upsert(kh, entry, old_weight, new_weight, ts, deqs, &freq, ws), + Ok(Remove(KvEntry { key: _key, entry })) => Self::handle_remove(deqs, entry, ws), Err(_) => break, }; } } + #[allow(clippy::too_many_arguments)] fn handle_upsert( &self, kh: KeyHash, - mut entry: Arc>, + 
entry: Arc>, + old_weight: u32, + new_weight: u32, timestamp: Instant, deqs: &mut Deques, freq: &FrequencySketch, + ws: &mut WeightedSize, ) { - const MAX_RETRY: usize = 5; - let mut tries = 0; - let mut done = false; - entry.set_last_accessed(timestamp); entry.set_last_modified(timestamp); - let last_accessed = entry.raw_last_accessed(); - let last_modified = entry.raw_last_modified(); - - while tries < MAX_RETRY { - tries += 1; - - if entry.is_admitted() { - // The entry has been already admitted, so treat this as an update. - deqs.move_to_back_ao(&entry); - deqs.move_to_back_wo(&entry); - } else if self.cache.len() <= self.max_capacity { - // There are some room in the cache. Add the candidate to the deques. - self.handle_admit(kh.clone(), &entry, last_accessed, last_modified, deqs); - } else { - let victim = match Self::find_cache_victim(deqs, freq) { - // Found a victim. - Some(node) => node, - // Not found a victim. This condition should be unreachable - // because there was no room in the cache. But rather than - // panicking here, admit the candidate as there might be some - // room in te cache now. - None => { - self.handle_admit(kh.clone(), &entry, last_accessed, last_modified, deqs); - done = true; - break; - } - }; - if Self::admit(kh.hash, victim, freq) { - // The candidate is admitted. Try to remove the victim from the - // cache (hash map). - if let Some(vic_entry) = self.cache.remove(&victim.element.key) { + if entry.is_admitted() { + // The entry has been already admitted, so treat this as an update. + ws.saturating_add(new_weight - old_weight); + deqs.move_to_back_ao(&entry); + deqs.move_to_back_wo(&entry); + return; + } + + if self.has_enough_capacity(new_weight, ws) { + // There are enough room in the cache (or the cache is unbounded). + // Add the candidate to the deques. 
+ self.handle_admit(kh, &entry, new_weight, deqs, ws); + return; + } + + if let Some(max) = self.max_capacity { + if new_weight as u64 > max { + // The candidate is too big to fit in the cache. Reject it. + self.cache.remove(&Arc::clone(&kh.key)); + return; + } + } + + let skipped_nodes; + let mut candidate = EntrySizeAndFrequency::new(new_weight); + candidate.add_frequency(freq, kh.hash); + + // Try to admit the candidate. + match Self::admit(&candidate, &self.cache, deqs, freq) { + AdmissionResult::Admitted { + victim_nodes, + skipped_nodes: mut skipped, + } => { + // Try to remove the victims from the cache (hash map). + for victim in victim_nodes { + if let Some((_vic_key, vic_entry)) = self + .cache + .remove_entry(unsafe { &victim.as_ref().element.key }) + { // And then remove the victim from the deques. - Self::handle_remove(deqs, vic_entry); + Self::handle_remove(deqs, vic_entry, ws); } else { // Could not remove the victim from the cache. Skip this // victim node as its ValueEntry might have been - // invalidated. Since the invalidated ValueEntry (which - // should be still in the write op queue) has a pointer to - // this node, we move the node to the back of the deque - // instead of unlinking (dropping) it. - let victim = NonNull::from(victim); - unsafe { deqs.probation.move_to_back(victim) }; - - continue; // Retry + // invalidated. Add it to the skipped nodes. + skipped.push(victim); } - // Add the candidate to the deques. - self.handle_admit( - kh.clone(), - &entry, - Arc::clone(&last_accessed), - Arc::clone(&last_modified), - deqs, - ); - } else { - // The candidate is not admitted. Remove it from the cache (hash map). - self.cache.remove(&Arc::clone(&kh.key)); } + skipped_nodes = skipped; + + // Add the candidate to the deques. + self.handle_admit(kh, &entry, new_weight, deqs, ws); } - done = true; - break; - } + AdmissionResult::Rejected { skipped_nodes: s } => { + skipped_nodes = s; + // Remove the candidate from the cache (hash map). 
+ self.cache.remove(&Arc::clone(&kh.key)); + } + }; - if !done { - // Too mary retries. Remove the candidate from the cache. - self.cache.remove(&Arc::clone(&kh.key)); + // Move the skipped nodes to the back of the deque. We do not unlink (drop) + // them because ValueEntries in the write op queue should be pointing them. + for node in skipped_nodes { + unsafe { deqs.probation.move_to_back(node) }; } } - #[inline] - fn find_cache_victim<'a>( - deqs: &'a Deques, - _freq: &FrequencySketch, - ) -> Option<&'a DeqNode>> { - // TODO: Check its frequency. If it is not very low, maybe we should - // check frequencies of next few others and pick from them. - deqs.probation.peek_front() - } - + /// Performs size-aware admission explained in the paper: + /// [Lightweight Robust Size Aware Cache Management][size-aware-cache-paper] + /// by Gil Einziger, Ohad Eytan, Roy Friedman, Ben Manes. + /// + /// [size-aware-cache-paper]: https://arxiv.org/abs/2105.08770 + /// + /// There are some modifications in this implementation: + /// - To admit to the main space, candidate's frequency must be higher than + /// the aggregated frequencies of the potential victims. (In the paper, + /// `>=` operator is used rather than `>`) The `>` operator will do a better + /// job to prevent the main space from polluting. + /// - When a candidate is rejected, the potential victims will stay at the LRU + /// position of the probation access-order queue. (In the paper, they will be + /// promoted (to the MRU position?) to force the eviction policy to select a + /// different set of victims for the next candidate). We may implement the + /// paper's behavior later? 
+ /// #[inline] fn admit( - candidate_hash: u64, - victim: &DeqNode>, + candidate: &EntrySizeAndFrequency, + cache: &CacheStore, + deqs: &Deques, freq: &FrequencySketch, - ) -> bool { + ) -> AdmissionResult { + const MAX_CONSECUTIVE_RETRIES: usize = 5; + let mut retries = 0; + + let mut victims = EntrySizeAndFrequency::default(); + let mut victim_nodes = SmallVec::default(); + let mut skipped_nodes = SmallVec::default(); + + // Get first potential victim at the LRU position. + let mut next_victim = deqs.probation.peek_front(); + + // Aggregate potential victims. + while victims.policy_weight < candidate.policy_weight { + if candidate.freq < victims.freq { + break; + } + if let Some(victim) = next_victim.take() { + next_victim = victim.next_node(); + + if let Some(vic_entry) = cache.get(&victim.element.key) { + victims.add_policy_weight(vic_entry.policy_weight()); + victims.add_frequency(freq, victim.element.hash); + victim_nodes.push(NonNull::from(victim)); + retries = 0; + } else { + // Could not get the victim from the cache (hash map). Skip this node + // as its ValueEntry might have been invalidated. + skipped_nodes.push(NonNull::from(victim)); + + retries += 1; + if retries > MAX_CONSECUTIVE_RETRIES { + break; + } + } + } else { + // No more potential victims. + break; + } + } + + // Admit or reject the candidate. + // TODO: Implement some randomness to mitigate hash DoS attack. // See Caffeine's implementation. 
- freq.frequency(candidate_hash) > freq.frequency(victim.element.hash) + + if victims.policy_weight >= candidate.policy_weight && candidate.freq > victims.freq { + AdmissionResult::Admitted { + victim_nodes, + skipped_nodes, + } + } else { + AdmissionResult::Rejected { skipped_nodes } + } } fn handle_admit( &self, kh: KeyHash, entry: &Arc>, - raw_last_accessed: Arc, - raw_last_modified: Arc, + policy_weight: u32, deqs: &mut Deques, + ws: &mut WeightedSize, ) { let key = Arc::clone(&kh.key); + ws.saturating_add(policy_weight); deqs.push_back_ao( CacheRegion::MainProbation, - KeyHashDate::new(kh, raw_last_accessed), + KeyHashDate::new(kh, entry.entry_info()), entry, ); if self.is_write_order_queue_enabled() { - deqs.push_back_wo(KeyDate::new(key, raw_last_modified), entry); + deqs.push_back_wo(KeyDate::new(key, entry.entry_info()), entry); } entry.set_is_admitted(true); } - fn handle_remove(deqs: &mut Deques, entry: Arc>) { + fn handle_remove(deqs: &mut Deques, entry: Arc>, ws: &mut WeightedSize) { if entry.is_admitted() { entry.set_is_admitted(false); + ws.saturating_sub(entry.policy_weight()); deqs.unlink_ao(&entry); Deques::unlink_wo(&mut deqs.write_order, &entry); } @@ -780,20 +988,22 @@ where ao_deq: &mut Deque>, wo_deq: &mut Deque>, entry: Arc>, + ws: &mut WeightedSize, ) { if entry.is_admitted() { entry.set_is_admitted(false); + ws.saturating_sub(entry.policy_weight()); Deques::unlink_ao_from_deque(ao_deq_name, ao_deq, &entry); Deques::unlink_wo(wo_deq, &entry); } entry.unset_q_nodes(); } - fn evict(&self, deqs: &mut Deques, batch_size: usize) { + fn evict_expired(&self, deqs: &mut Deques, batch_size: usize, ws: &mut WeightedSize) { let now = self.current_time_from_expiration_clock(); if self.is_write_order_queue_enabled() { - self.remove_expired_wo(deqs, batch_size, now); + self.remove_expired_wo(deqs, batch_size, now, ws); } if self.time_to_idle.is_some() || self.has_valid_after() { @@ -805,7 +1015,7 @@ where ); let mut rm_expired_ao = - |name, deq| 
self.remove_expired_ao(name, deq, wo, batch_size, now); + |name, deq| self.remove_expired_ao(name, deq, wo, batch_size, now, ws); rm_expired_ao("window", window); rm_expired_ao("probation", probation); @@ -821,24 +1031,19 @@ where write_order_deq: &mut Deque>, batch_size: usize, now: Instant, + ws: &mut WeightedSize, ) { let tti = &self.time_to_idle; let va = &self.valid_after(); for _ in 0..batch_size { // Peek the front node of the deque and check if it is expired. - let (key, _ts) = deq - .peek_front() - .and_then(|node| { - if is_expired_entry_ao(tti, va, &*node, now) { - Some(( - Some(Arc::clone(&node.element.key)), - Some(&node.element.timestamp), - )) - } else { - None - } - }) - .unwrap_or_default(); + let key = deq.peek_front().and_then(|node| { + if is_expired_entry_ao(tti, va, &*node, now) { + Some(Arc::clone(node.element.key())) + } else { + None + } + }); if key.is_none() { break; @@ -855,49 +1060,62 @@ where .remove_if(key, |_, v| is_expired_entry_ao(tti, va, v, now)); if let Some(entry) = maybe_entry { - Self::handle_remove_with_deques(deq_name, deq, write_order_deq, entry); - } else if let Some(entry) = self.cache.get(key) { - let ts = entry.last_accessed(); - if ts.is_none() { - // The key exists and the entry has been updated. - Deques::move_to_back_ao_in_deque(deq_name, deq, &entry); - Deques::move_to_back_wo_in_deque(write_order_deq, &entry); - } else { - // The key exists but something unexpected. Break. - break; - } + Self::handle_remove_with_deques(deq_name, deq, write_order_deq, entry, ws); + } else if !self.try_skip_updated_entry(key, deq_name, deq, write_order_deq) { + break; + } + } + } + + #[inline] + fn try_skip_updated_entry( + &self, + key: &K, + deq_name: &str, + deq: &mut Deque>, + write_order_deq: &mut Deque>, + ) -> bool { + if let Some(entry) = self.cache.get(key) { + if entry.last_accessed().is_none() { + // The key exists and the entry has been updated. 
+ Deques::move_to_back_ao_in_deque(deq_name, deq, &entry); + Deques::move_to_back_wo_in_deque(write_order_deq, &entry); + true } else { - // Skip this entry as the key might have been invalidated. Since the - // invalidated ValueEntry (which should be still in the write op - // queue) has a pointer to this node, move the node to the back of - // the deque instead of popping (dropping) it. - if let Some(node) = deq.peek_front() { - let node = NonNull::from(node); - unsafe { deq.move_to_back(node) }; - } + // The key exists but something unexpected. + false + } + } else { + // Skip this entry as the key might have been invalidated. Since the + // invalidated ValueEntry (which should be still in the write op + // queue) has a pointer to this node, move the node to the back of + // the deque instead of popping (dropping) it. + if let Some(node) = deq.peek_front() { + let node = NonNull::from(node); + unsafe { deq.move_to_back(node) }; } + true } } #[inline] - fn remove_expired_wo(&self, deqs: &mut Deques, batch_size: usize, now: Instant) { + fn remove_expired_wo( + &self, + deqs: &mut Deques, + batch_size: usize, + now: Instant, + ws: &mut WeightedSize, + ) { let ttl = &self.time_to_live; let va = &self.valid_after(); for _ in 0..batch_size { - let (key, _ts) = deqs - .write_order - .peek_front() - .and_then(|node| { - if is_expired_entry_wo(ttl, va, &*node, now) { - Some(( - Some(Arc::clone(&node.element.key)), - Some(&node.element.timestamp), - )) - } else { - None - } - }) - .unwrap_or_default(); + let key = deqs.write_order.peek_front().and_then(|node| { + if is_expired_entry_wo(ttl, va, &*node, now) { + Some(Arc::clone(node.element.key())) + } else { + None + } + }); if key.is_none() { break; @@ -910,10 +1128,9 @@ where .remove_if(key, |_, v| is_expired_entry_wo(ttl, va, v, now)); if let Some(entry) = maybe_entry { - Self::handle_remove(deqs, entry); + Self::handle_remove(deqs, entry, ws); } else if let Some(entry) = self.cache.get(key) { - let ts = 
entry.last_modified(); - if ts.is_none() { + if entry.last_modified().is_none() { deqs.move_to_back_ao(&entry); deqs.move_to_back_wo(&entry); } else { @@ -938,8 +1155,9 @@ where invalidator: &Invalidator, deqs: &mut Deques, batch_size: usize, + ws: &mut WeightedSize, ) { - self.process_invalidation_result(invalidator, deqs); + self.process_invalidation_result(invalidator, deqs, ws); self.submit_invalidation_task(invalidator, &mut deqs.write_order, batch_size); } @@ -947,14 +1165,15 @@ where &self, invalidator: &Invalidator, deqs: &mut Deques, + ws: &mut WeightedSize, ) { if let Some(InvalidationResult { invalidated, is_done, }) = invalidator.task_result() { - for entry in invalidated { - Self::handle_remove(deqs, entry); + for KvEntry { key: _, entry } in invalidated { + Self::handle_remove(deqs, entry, ws); } if is_done { deqs.write_order.reset_cursor(); @@ -983,7 +1202,7 @@ where while len < batch_size { if let Some(kd) = iter.next() { - if let Some(ts) = kd.timestamp() { + if let Some(ts) = kd.last_modified() { candidates.push(KeyDateLite::new(&kd.key, ts)); len += 1; } @@ -997,6 +1216,59 @@ where invalidator.submit_task(candidates, is_truncated); } } + + fn evict_lru_entries( + &self, + deqs: &mut Deques, + batch_size: usize, + weights_to_evict: u64, + ws: &mut WeightedSize, + ) { + const DEQ_NAME: &str = "probation"; + let mut evicted = 0u64; + let (deq, write_order_deq) = (&mut deqs.probation, &mut deqs.write_order); + + for _ in 0..batch_size { + if evicted >= weights_to_evict { + break; + } + + let maybe_key_and_ts = deq.peek_front().map(|node| { + ( + Arc::clone(node.element.key()), + node.element.entry_info().last_modified(), + ) + }); + + let (key, ts) = match maybe_key_and_ts { + Some((key, Some(ts))) => (key, ts), + Some((key, None)) => { + if self.try_skip_updated_entry(&key, DEQ_NAME, deq, write_order_deq) { + continue; + } else { + break; + } + } + None => break, + }; + + let maybe_entry = self.cache.remove_if(&key, |_, v| { + if let Some(lm) = 
v.last_modified() { + lm == ts + } else { + false + } + }); + + if let Some(entry) = maybe_entry { + let weight = entry.policy_weight(); + Self::handle_remove_with_deques(DEQ_NAME, deq, write_order_deq, entry, ws); + evicted = evicted.saturating_add(weight as u64); + } else if !self.try_skip_updated_entry(&key, DEQ_NAME, deq, write_order_deq) { + break; + } + } + } } // @@ -1097,11 +1369,12 @@ mod tests { let ensure_sketch_len = |max_capacity, len, name| { let cache = BaseCache::::new( - max_capacity, + Some(max_capacity), None, RandomState::default(), None, None, + None, false, ); assert_eq!( diff --git a/src/sync/builder.rs b/src/sync/builder.rs index 7ac67017..081fcd45 100644 --- a/src/sync/builder.rs +++ b/src/sync/builder.rs @@ -1,10 +1,11 @@ -use super::{Cache, SegmentedCache}; +use super::{Cache, SegmentedCache, Weigher}; use crate::common::builder_utils; use std::{ collections::hash_map::RandomState, hash::{BuildHasher, Hash}, marker::PhantomData, + sync::Arc, time::Duration, }; @@ -17,11 +18,12 @@ use std::{ /// # Examples /// /// ```rust -/// use moka::sync::CacheBuilder; -/// +/// use moka::sync::Cache; /// use std::time::Duration; /// -/// let cache = CacheBuilder::new(10_000) // Max 10,000 elements +/// let cache = Cache::builder() +/// // Max 10,000 entries +/// .max_capacity(10_000) /// // Time to live (TTL): 30 minutes /// .time_to_live(Duration::from_secs(30 * 60)) /// // Time to idle (TTI): 5 minutes @@ -39,32 +41,47 @@ use std::{ /// // after 30 minutes (TTL) from the insert(). 
/// ``` /// -pub struct CacheBuilder { - max_capacity: usize, +pub struct CacheBuilder { + max_capacity: Option, initial_capacity: Option, num_segments: Option, + weigher: Option>, time_to_live: Option, time_to_idle: Option, invalidator_enabled: bool, cache_type: PhantomData, } -impl CacheBuilder> +impl Default for CacheBuilder> where K: Eq + Hash + Send + Sync + 'static, V: Clone + Send + Sync + 'static, { - /// Construct a new `CacheBuilder` that will be used to build a `Cache` or - /// `SegmentedCache` holding up to `max_capacity` entries. - pub fn new(max_capacity: usize) -> Self { + fn default() -> Self { Self { - max_capacity, + max_capacity: None, initial_capacity: None, num_segments: None, + weigher: None, time_to_live: None, time_to_idle: None, invalidator_enabled: false, - cache_type: PhantomData::default(), + cache_type: Default::default(), + } + } +} + +impl CacheBuilder> +where + K: Eq + Hash + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, +{ + /// Construct a new `CacheBuilder` that will be used to build a `Cache` or + /// `SegmentedCache` holding up to `max_capacity` entries. + pub fn new(max_capacity: usize) -> Self { + Self { + max_capacity: Some(max_capacity), + ..Default::default() } } @@ -72,14 +89,18 @@ where /// /// # Panics /// - /// Panics if `num_segments` is less than or equals to 1. - pub fn segments(self, num_segments: usize) -> CacheBuilder> { - assert!(num_segments > 1); + /// Panics if `num_segments` is zero. 
+ pub fn segments( + self, + num_segments: usize, + ) -> CacheBuilder> { + assert!(num_segments != 0); CacheBuilder { max_capacity: self.max_capacity, initial_capacity: self.initial_capacity, num_segments: Some(num_segments), + weigher: None, time_to_live: self.time_to_live, time_to_idle: self.time_to_idle, invalidator_enabled: self.invalidator_enabled, @@ -104,6 +125,7 @@ where self.max_capacity, self.initial_capacity, build_hasher, + self.weigher, self.time_to_live, self.time_to_idle, self.invalidator_enabled, @@ -129,6 +151,7 @@ where self.max_capacity, self.initial_capacity, hasher, + self.weigher, self.time_to_live, self.time_to_idle, self.invalidator_enabled, @@ -136,7 +159,7 @@ where } } -impl CacheBuilder> +impl CacheBuilder> where K: Eq + Hash + Send + Sync + 'static, V: Clone + Send + Sync + 'static, @@ -159,6 +182,7 @@ where self.initial_capacity, self.num_segments.unwrap(), build_hasher, + self.weigher, self.time_to_live, self.time_to_idle, self.invalidator_enabled, @@ -185,6 +209,7 @@ where self.initial_capacity, self.num_segments.unwrap(), hasher, + self.weigher, self.time_to_live, self.time_to_idle, self.invalidator_enabled, @@ -192,11 +217,30 @@ where } } -impl CacheBuilder { - /// Sets the initial capacity of the cache. - pub fn initial_capacity(self, capacity: usize) -> Self { +impl CacheBuilder { + /// Sets the max capacity of the cache. + pub fn max_capacity(self, max_capacity: usize) -> Self { + Self { + max_capacity: Some(max_capacity), + ..self + } + } + + /// Sets the initial capacity (number of entries) of the cache. + pub fn initial_capacity(self, number_of_entries: usize) -> Self { + Self { + initial_capacity: Some(number_of_entries), + ..self + } + } + + /// Sets the weigher closure of the cache. + /// + /// The closure should take `&K` and `&V` as the arguments and returns a `u32` + /// representing the relative size of the entry. 
+ pub fn weigher(self, weigher: impl Fn(&K, &V) -> u32 + Send + Sync + 'static) -> Self { Self { - initial_capacity: Some(capacity), + weigher: Some(Arc::new(weigher)), ..self } } @@ -252,7 +296,6 @@ impl CacheBuilder { #[cfg(test)] mod tests { - use super::Cache; use super::CacheBuilder; use std::time::Duration; @@ -262,7 +305,7 @@ mod tests { // Cache let cache = CacheBuilder::new(100).build(); - assert_eq!(cache.max_capacity(), 100); + assert_eq!(cache.max_capacity(), Some(100)); assert_eq!(cache.time_to_live(), None); assert_eq!(cache.time_to_idle(), None); assert_eq!(cache.num_segments(), 1); @@ -275,7 +318,7 @@ mod tests { .time_to_idle(Duration::from_secs(15 * 60)) .build(); - assert_eq!(cache.max_capacity(), 100); + assert_eq!(cache.max_capacity(), Some(100)); assert_eq!(cache.time_to_live(), Some(Duration::from_secs(45 * 60))); assert_eq!(cache.time_to_idle(), Some(Duration::from_secs(15 * 60))); assert_eq!(cache.num_segments(), 1); @@ -289,7 +332,7 @@ mod tests { // SegmentCache let cache = CacheBuilder::new(100).segments(16).build(); - assert_eq!(cache.max_capacity(), 100); + assert_eq!(cache.max_capacity(), Some(100)); assert_eq!(cache.time_to_live(), None); assert_eq!(cache.time_to_idle(), None); assert_eq!(cache.num_segments(), 16_usize.next_power_of_two()); @@ -303,7 +346,7 @@ mod tests { .time_to_idle(Duration::from_secs(15 * 60)) .build(); - assert_eq!(cache.max_capacity(), 100); + assert_eq!(cache.max_capacity(), Some(100)); assert_eq!(cache.time_to_live(), Some(Duration::from_secs(45 * 60))); assert_eq!(cache.time_to_idle(), Some(Duration::from_secs(15 * 60))); assert_eq!(cache.num_segments(), 16_usize.next_power_of_two()); @@ -316,7 +359,7 @@ mod tests { #[should_panic(expected = "time_to_live is longer than 1000 years")] fn build_cache_too_long_ttl() { let thousand_years_secs: u64 = 1000 * 365 * 24 * 3600; - let builder: CacheBuilder> = CacheBuilder::new(100); + let builder: CacheBuilder = CacheBuilder::new(100); let duration = 
Duration::from_secs(thousand_years_secs); builder .time_to_live(duration + Duration::from_secs(1)) @@ -327,7 +370,7 @@ mod tests { #[should_panic(expected = "time_to_idle is longer than 1000 years")] fn build_cache_too_long_tti() { let thousand_years_secs: u64 = 1000 * 365 * 24 * 3600; - let builder: CacheBuilder> = CacheBuilder::new(100); + let builder: CacheBuilder = CacheBuilder::new(100); let duration = Duration::from_secs(thousand_years_secs); builder .time_to_idle(duration + Duration::from_secs(1)) diff --git a/src/sync/cache.rs b/src/sync/cache.rs index ba1c3ae2..2f277de1 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -2,7 +2,7 @@ use super::{ base_cache::{BaseCache, HouseKeeperArc, MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS}, housekeeper::InnerSync, value_initializer::ValueInitializer, - ConcurrentCacheExt, PredicateId, WriteOp, + CacheBuilder, ConcurrentCacheExt, PredicateId, Weigher, WriteOp, }; use crate::{sync::value_initializer::InitResult, PredicateError}; @@ -30,7 +30,8 @@ use std::{ /// /// # Examples /// -/// Cache entries are manually added using `insert` method, and are stored in the +/// Cache entries are manually added using [`insert`](#method.insert) or +/// [`get_or_insert_with`](#method.get_or_insert_with) method, and are stored in the /// cache until either evicted or manually invalidated. /// /// Here's an example of reading and updating a cache by using multiple threads: @@ -88,28 +89,10 @@ use std::{ /// } /// ``` /// -/// # Thread Safety -/// -/// All methods provided by the `Cache` are considered thread-safe, and can be safely -/// accessed by multiple concurrent threads. -/// -/// - `Cache` requires trait bounds `Send`, `Sync` and `'static` for `K` -/// (key), `V` (value) and `S` (hasher state). -/// - `Cache` will implement `Send` and `Sync`. 
-/// -/// # Sharing a cache across threads -/// -/// To share a cache across threads, do one of the followings: -/// -/// - Create a clone of the cache by calling its `clone` method and pass it to other -/// thread. -/// - Wrap the cache by a `sync::OnceCell` or `sync::Lazy` from -/// [once_cell][once-cell-crate] create, and set it to a `static` variable. -/// -/// Cloning is a cheap operation for `Cache` as it only creates thread-safe -/// reference-counted pointers to the internal data structures. -/// -/// [once-cell-crate]: https://crates.io/crates/once_cell +/// If you want to atomically initialize and insert a value when the key is not +/// present, you might want to check other insertion methods +/// [`get_or_insert_with`](#method.get_or_insert_with) and +/// [`get_or_try_insert_with`](#method.get_or_try_insert_with). /// /// # Avoiding to clone the value at `get` /// @@ -126,7 +109,57 @@ use std::{ /// /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html /// -/// # Expiration Policies +/// # Evictions +/// +/// `Cache` provides two types of eviction: size-based eviction and time-based +/// eviction. +/// +/// ## Size-based +/// +/// ```rust +/// use std::convert::TryInto; +/// use moka::sync::Cache; +/// +/// // Evict based on the number of entries in the cache. +/// let cache = Cache::builder() +/// // Up to 10,000 entries. +/// .max_capacity(10_000) +/// // Create the cache. +/// .build(); +/// cache.insert(1, "one".to_string()); +/// +/// // Evict based on the byte length of strings in the cache. +/// let cache = Cache::builder() +/// // A weigher closure takes &K and &V and returns a u32 +/// // representing the relative size of the entry. +/// .weigher(|_key, value: &String| -> u32 { +/// value.len().try_into().unwrap_or(u32::MAX) +/// }) +/// // This cache will hold up to 32MiB of values. 
+/// .max_capacity(32 * 1024 * 1024)
+/// .build();
+/// cache.insert(2, "two".to_string());
+/// ```
+///
+/// If your cache should not grow beyond a certain size, use the `max_capacity`
+/// method of the [`CacheBuilder`][builder-struct] to set the upper bound. The cache
+/// will try to evict entries that have not been used recently or very often.
+///
+/// At the cache creation time, a weigher closure can be set by the `weigher` method
+/// of the `CacheBuilder`. A weigher closure takes `&K` and `&V` as the arguments and
+/// returns a `u32` representing the relative size of the entry:
+///
+/// - If the `weigher` is _not_ set, the cache will treat each entry as having the same
+/// size of `1`. This means the cache will be bounded by the number of entries.
+/// - If the `weigher` is set, the cache will call the weigher to calculate the
+/// weighted size (relative size) on an entry. This means the cache will be bounded
+/// by the total weighted size of entries.
+///
+/// Note that weighted sizes are not used when making eviction selections.
+///
+/// [builder-struct]: ./struct.CacheBuilder.html
+///
+/// ## Time-based (Expirations)
 ///
 /// `Cache` supports the following expiration policies:
 ///
@@ -135,10 +168,50 @@ use std::{
 /// - **Time to idle**: A cached entry will be expired after the specified duration
 /// past from `get` or `insert`.
 ///
-/// See the [`CacheBuilder`][builder-struct]'s doc for how to configure a cache
-/// with them.
+/// ```rust
+/// use moka::sync::Cache;
+/// use std::time::Duration;
+///
+/// let cache = Cache::builder()
+/// // Time to live (TTL): 30 minutes
+/// .time_to_live(Duration::from_secs(30 * 60))
+/// // Time to idle (TTI): 5 minutes
+/// .time_to_idle(Duration::from_secs( 5 * 60))
+/// // Create the cache.
+/// .build();
 ///
-/// [builder-struct]: ./struct.CacheBuilder.html
+/// // This entry will expire after 5 minutes (TTI) if there is no get().
+/// cache.insert(0, "zero");
+///
+/// // This get() will extend the entry life for another 5 minutes.
+/// cache.get(&0);
+///
+/// // Even though we keep calling get(), the entry will expire
+/// // after 30 minutes (TTL) from the insert().
+/// ```
+///
+/// # Thread Safety
+///
+/// All methods provided by the `Cache` are considered thread-safe, and can be safely
+/// accessed by multiple concurrent threads.
+///
+/// - `Cache` requires trait bounds `Send`, `Sync` and `'static` for `K`
+/// (key), `V` (value) and `S` (hasher state).
+/// - `Cache` will implement `Send` and `Sync`.
+///
+/// # Sharing a cache across threads
+///
+/// To share a cache across threads, do one of the following:
+///
+/// - Create a clone of the cache by calling its `clone` method and pass it to another
+/// thread.
+/// - Wrap the cache by a `sync::OnceCell` or `sync::Lazy` from
+/// [once_cell][once-cell-crate] crate, and set it to a `static` variable.
+///
+/// Cloning is a cheap operation for `Cache` as it only creates thread-safe
+/// reference-counted pointers to the internal data structures.
+///
+/// [once-cell-crate]: https://crates.io/crates/once_cell
 ///
 /// # Hashing Algorithm
 ///
@@ -188,7 +261,7 @@ where
 K: Hash + Eq + Send + Sync + 'static,
 V: Clone + Send + Sync + 'static,
 {
- /// Constructs a new `Cache` that will store up to the `max_capacity` entries.
+ /// Constructs a new `Cache` that will store up to the `max_capacity`.
 ///
 /// To adjust various configuration knobs such as `initial_capacity` or
 /// `time_to_live`, use the [`CacheBuilder`][builder-struct].
@@ -196,7 +269,23 @@ where
 /// [builder-struct]: ./struct.CacheBuilder.html
 pub fn new(max_capacity: usize) -> Self {
 let build_hasher = RandomState::default();
- Self::with_everything(max_capacity, None, build_hasher, None, None, false)
+ Self::with_everything(
+ Some(max_capacity),
+ None,
+ build_hasher,
+ None,
+ None,
+ None,
+ false,
+ )
+ }
+
+ /// Returns a [`CacheBuilder`][builder-struct], which can build a `Cache` or
+ /// `SegmentedCache` with various configuration knobs.
+ ///
+ /// [builder-struct]: ./struct.CacheBuilder.html
+ pub fn builder() -> CacheBuilder> {
+ CacheBuilder::default()
+ }
 }
 }
@@ -207,9 +296,10 @@ where
 S: BuildHasher + Clone + Send + Sync + 'static,
 {
 pub(crate) fn with_everything(
- max_capacity: usize,
+ max_capacity: Option,
 initial_capacity: Option,
 build_hasher: S,
+ weigher: Option>,
 time_to_live: Option,
 time_to_idle: Option,
 invalidator_enabled: bool,
@@ -219,6 +309,7 @@ where
 max_capacity,
 initial_capacity,
 build_hasher.clone(),
+ weigher,
 time_to_live,
 time_to_idle,
 invalidator_enabled,
@@ -500,8 +591,8 @@ where
 Arc: Borrow,
 Q: Hash + Eq + ?Sized,
 {
- if let Some(entry) = self.base.remove(key) {
- let op = WriteOp::Remove(entry);
+ if let Some(kv) = self.base.remove_entry(key) {
+ let op = WriteOp::Remove(kv);
 let hk = self.base.housekeeper.as_ref();
 Self::schedule_write_op(&self.base.write_op_ch, op, hk).expect("Failed to remove");
 }
@@ -564,7 +655,7 @@ where
 }
 
 /// Returns the `max_capacity` of this cache.
- pub fn max_capacity(&self) -> usize {
+ pub fn max_capacity(&self) -> Option {
 self.base.max_capacity()
 }
 
@@ -703,7 +794,7 @@ mod tests {
 assert_eq!(cache.get(&"d"), None); // d -> 2
 
 // "d" should be admitted and "c" should be evicted
- // because d's frequency is higher then c's.
+ // because d's frequency is higher than c's.
cache.insert("d", "dennis"); cache.sync(); assert_eq!(cache.get(&"a"), Some("alice")); @@ -715,6 +806,74 @@ mod tests { assert_eq!(cache.get(&"b"), None); } + #[test] + fn size_aware_eviction() { + let weigher = |_k: &&str, v: &(&str, u32)| v.1; + + let alice = ("alice", 10); + let bob = ("bob", 15); + let bill = ("bill", 20); + let cindy = ("cindy", 5); + let david = ("david", 15); + let dennis = ("dennis", 15); + + let mut cache = Cache::builder().max_capacity(31).weigher(weigher).build(); + cache.reconfigure_for_testing(); + + // Make the cache exterior immutable. + let cache = cache; + + cache.insert("a", alice); + cache.insert("b", bob); + assert_eq!(cache.get(&"a"), Some(alice)); + assert_eq!(cache.get(&"b"), Some(bob)); + cache.sync(); + // order (LRU -> MRU) and counts: a -> 1, b -> 1 + + cache.insert("c", cindy); + assert_eq!(cache.get(&"c"), Some(cindy)); + // order and counts: a -> 1, b -> 1, c -> 1 + cache.sync(); + + assert_eq!(cache.get(&"a"), Some(alice)); + assert_eq!(cache.get(&"b"), Some(bob)); + cache.sync(); + // order and counts: c -> 1, a -> 2, b -> 2 + + // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10). + // "d" must have higher count than 3, which is the aggregated count + // of "a" and "c". + cache.insert("d", david); // count: d -> 0 + cache.sync(); + assert_eq!(cache.get(&"d"), None); // d -> 1 + + cache.insert("d", david); + cache.sync(); + assert_eq!(cache.get(&"d"), None); // d -> 2 + + cache.insert("d", david); + cache.sync(); + assert_eq!(cache.get(&"d"), None); // d -> 3 + + cache.insert("d", david); + cache.sync(); + assert_eq!(cache.get(&"d"), None); // d -> 4 + + // Finally "d" should be admitted by evicting "c" and "a". + cache.insert("d", dennis); + cache.sync(); + assert_eq!(cache.get(&"a"), None); + assert_eq!(cache.get(&"b"), Some(bob)); + assert_eq!(cache.get(&"c"), None); + assert_eq!(cache.get(&"d"), Some(dennis)); + + // Update "b" with "bill" (w: 20). This should evict "d" (w: 15). 
+ cache.insert("b", bill); + cache.sync(); + assert_eq!(cache.get(&"b"), Some(bill)); + assert_eq!(cache.get(&"d"), None); + } + #[test] fn basic_multi_threads() { let num_threads = 4; diff --git a/src/sync/entry_info.rs b/src/sync/entry_info.rs new file mode 100644 index 00000000..f10ba67c --- /dev/null +++ b/src/sync/entry_info.rs @@ -0,0 +1,145 @@ +use std::sync::{ + atomic::{AtomicBool, AtomicU32, Ordering}, + Arc, +}; + +use super::{AccessTime, CacheFeatures}; +use crate::common::{atomic_time::AtomicInstant, time::Instant}; + +// We use enum-based dynamic dispatch here, rather than using trait-object-based +// dynamic dispatch. Our benchmark programs showed enum-based dispatch was slightly +// (1% or 2%) faster than other in our use cases. + +pub(crate) enum EntryInfo { + Plain(Arc), + Weighted(Arc), +} + +#[derive(Default)] +pub(crate) struct Plain { + is_admitted: AtomicBool, + last_accessed: AtomicInstant, + last_modified: AtomicInstant, +} + +pub(crate) struct Weighted { + is_admitted: AtomicBool, + last_accessed: AtomicInstant, + last_modified: AtomicInstant, + policy_weight: AtomicU32, +} + +impl Weighted { + pub(crate) fn new(policy_weight: u32) -> Self { + Self { + is_admitted: Default::default(), + last_accessed: Default::default(), + last_modified: Default::default(), + policy_weight: AtomicU32::new(policy_weight), + } + } +} + +impl Clone for EntryInfo { + fn clone(&self) -> Self { + match self { + Self::Plain(ei) => Self::Plain(Arc::clone(ei)), + Self::Weighted(ei) => Self::Weighted(Arc::clone(ei)), + } + } +} + +impl EntryInfo { + #[inline] + pub(crate) fn new(features: CacheFeatures, policy_weight: u32) -> Self { + match features { + CacheFeatures::Plain => Self::Plain(Arc::new(Plain::default())), + CacheFeatures::Weighted => Self::Weighted(Arc::new(Weighted::new(policy_weight))), + } + } + + #[inline] + pub(crate) fn is_admitted(&self) -> bool { + let v = match self { + Self::Plain(ei) => &ei.is_admitted, + Self::Weighted(ei) => &ei.is_admitted, 
+ };
+ v.load(Ordering::Acquire)
+ }
+
+ #[inline]
+ pub(crate) fn set_is_admitted(&self, value: bool) {
+ let v = match self {
+ Self::Plain(ei) => &ei.is_admitted,
+ Self::Weighted(ei) => &ei.is_admitted,
+ };
+ v.store(value, Ordering::Release);
+ }
+
+ #[inline]
+ pub(crate) fn reset_timestamps(&self) {
+ match self {
+ Self::Plain(ei) => {
+ ei.last_accessed.reset();
+ ei.last_modified.reset();
+ }
+ Self::Weighted(ei) => {
+ ei.last_accessed.reset();
+ ei.last_modified.reset();
+ }
+ }
+ }
+
+ #[inline]
+ pub(crate) fn policy_weight(&self) -> u32 {
+ match self {
+ Self::Plain(_) => 1,
+ Self::Weighted(ei) => ei.policy_weight.load(Ordering::Acquire),
+ }
+ }
+
+ pub(crate) fn set_policy_weight(&self, size: u32) {
+ match self {
+ Self::Plain(_) => (),
+ Self::Weighted(ei) => ei.policy_weight.store(size, Ordering::Release),
+ }
+ }
+}
+
+impl AccessTime for EntryInfo {
+ #[inline]
+ fn last_accessed(&self) -> Option {
+ let v = match self {
+ Self::Plain(ei) => &ei.last_accessed,
+ Self::Weighted(ei) => &ei.last_accessed,
+ };
+ v.instant()
+ }
+
+ #[inline]
+ fn set_last_accessed(&self, timestamp: Instant) {
+ let v = match self {
+ Self::Plain(ei) => &ei.last_accessed,
+ Self::Weighted(ei) => &ei.last_accessed,
+ };
+ v.set_instant(timestamp);
+ }
+
+ #[inline]
+ fn last_modified(&self) -> Option {
+ let v = match self {
+ Self::Plain(ei) => &ei.last_modified,
+ Self::Weighted(ei) => &ei.last_modified,
+ };
+ v.instant()
+ }
+
+ #[inline]
+ fn set_last_modified(&self, timestamp: Instant) {
+ let v = match self {
+ Self::Plain(ei) => &ei.last_modified,
+ Self::Weighted(ei) => &ei.last_modified,
+ };
+ v.set_instant(timestamp);
+ }
+}
diff --git a/src/sync/invalidator.rs b/src/sync/invalidator.rs
index e0d77d9b..861a1c3e 100644
--- a/src/sync/invalidator.rs
+++ b/src/sync/invalidator.rs
@@ -5,12 +5,11 @@ use crate::{
 thread_pool::{PoolName, ThreadPool, ThreadPoolRegistry},
 time::Instant,
 unsafe_weak_pointer::UnsafeWeakPointer,
- AccessTime,
 },
 PredicateError,
}; -use super::{base_cache::Inner, PredicateId, PredicateIdStr, ValueEntry}; +use super::{base_cache::Inner, AccessTime, KvEntry, PredicateId, PredicateIdStr, ValueEntry}; use parking_lot::{Mutex, RwLock}; use std::{ @@ -59,12 +58,12 @@ impl KeyDateLite { } pub(crate) struct InvalidationResult { - pub(crate) invalidated: Vec>>, + pub(crate) invalidated: Vec>, pub(crate) is_done: bool, } impl InvalidationResult { - fn new(invalidated: Vec>>, is_done: bool) -> Self { + fn new(invalidated: Vec>, is_done: bool) -> Self { Self { invalidated, is_done, @@ -399,7 +398,10 @@ where let ts = candidate.timestamp; if Self::apply(&predicates, cache, key, ts) { if let Some(entry) = Self::invalidate(cache, key, ts) { - invalidated.push(entry) + invalidated.push(KvEntry { + key: Arc::clone(key), + entry, + }) } } newest_timestamp = Some(ts); @@ -447,7 +449,7 @@ where } struct ScanResult { - invalidated: Vec>>, + invalidated: Vec>, is_truncated: bool, newest_timestamp: Option, } diff --git a/src/sync/segment.rs b/src/sync/segment.rs index d6d69adb..824837ba 100644 --- a/src/sync/segment.rs +++ b/src/sync/segment.rs @@ -1,4 +1,4 @@ -use super::{cache::Cache, ConcurrentCacheExt}; +use super::{cache::Cache, CacheBuilder, ConcurrentCacheExt, Weigher}; use crate::PredicateError; use std::{ @@ -60,7 +60,7 @@ where V: Clone + Send + Sync + 'static, { /// Constructs a new `SegmentedCache` that has multiple internal - /// segments and will store up to the `max_capacity` entries. + /// segments and will store up to the `max_capacity`. /// /// To adjust various configuration knobs such as `initial_capacity` or /// `time_to_live`, use the [`CacheBuilder`][builder-struct]. 
@@ -73,15 +73,24 @@ where
 pub fn new(max_capacity: usize, num_segments: usize) -> Self {
 let build_hasher = RandomState::default();
 Self::with_everything(
- max_capacity,
+ Some(max_capacity),
 None,
 num_segments,
 build_hasher,
 None,
 None,
+ None,
 false,
 )
 }
+
+ /// Returns a [`CacheBuilder`][builder-struct], which can build a
+ /// `SegmentedCache` with various configuration knobs.
+ ///
+ /// [builder-struct]: ./struct.CacheBuilder.html
+ pub fn builder(num_segments: usize) -> CacheBuilder> {
+ CacheBuilder::default().segments(num_segments)
+ }
 }
 
 impl SegmentedCache
@@ -93,11 +102,13 @@ where
 /// # Panics
 ///
 /// Panics if `num_segments` is 0.
+ #[allow(clippy::too_many_arguments)]
 pub(crate) fn with_everything(
- max_capacity: usize,
+ max_capacity: Option,
 initial_capacity: Option,
 num_segments: usize,
 build_hasher: S,
+ weigher: Option>,
 time_to_live: Option,
 time_to_idle: Option,
 invalidator_enabled: bool,
@@ -108,6 +119,7 @@ where
 initial_capacity,
 num_segments,
 build_hasher,
+ weigher,
 time_to_live,
 time_to_idle,
 invalidator_enabled,
@@ -244,7 +256,7 @@ where
 }
 
 /// Returns the `max_capacity` of this cache.
- pub fn max_capacity(&self) -> usize {
+ pub fn max_capacity(&self) -> Option {
 self.inner.desired_capacity
 }
 
@@ -345,7 +357,7 @@ impl MockExpirationClock {
 }
 
 struct Inner {
- desired_capacity: usize,
+ desired_capacity: Option,
 segments: Box<[Cache]>,
 build_hasher: S,
 segment_shift: u32,
@@ -360,11 +372,13 @@ where
 /// # Panics
 ///
 /// Panics if `num_segments` is 0.
+ #[allow(clippy::too_many_arguments)]
 fn new(
- max_capacity: usize,
+ max_capacity: Option,
 initial_capacity: Option,
 num_segments: usize,
 build_hasher: S,
+ weigher: Option>,
 time_to_live: Option,
 time_to_idle: Option,
 invalidator_enabled: bool,
@@ -374,16 +388,17 @@ where
 let actual_num_segments = num_segments.next_power_of_two();
 let segment_shift = 64 - actual_num_segments.trailing_zeros();
 // TODO: Round up.
- let seg_capacity = max_capacity / actual_num_segments; + let seg_max_capacity = max_capacity.map(|n| n / actual_num_segments); let seg_init_capacity = initial_capacity.map(|cap| cap / actual_num_segments); // NOTE: We cannot initialize the segments as `vec![cache; actual_num_segments]` // because Cache::clone() does not clone its inner but shares the same inner. let segments = (0..num_segments) .map(|_| { Cache::with_everything( - seg_capacity, + seg_max_capacity, seg_init_capacity, build_hasher.clone(), + weigher.as_ref().map(Arc::clone), time_to_live, time_to_idle, invalidator_enabled, @@ -467,7 +482,7 @@ mod tests { assert_eq!(cache.get(&"d"), None); // d -> 2 // "d" should be admitted and "c" should be evicted - // because d's frequency is higher then c's. + // because d's frequency is higher than c's. cache.insert("d", "dennis"); cache.sync(); assert_eq!(cache.get(&"a"), Some("alice")); @@ -478,6 +493,77 @@ mod tests { cache.invalidate(&"b"); } + #[test] + fn size_aware_eviction() { + let weigher = |_k: &&str, v: &(&str, u32)| v.1; + + let alice = ("alice", 10); + let bob = ("bob", 15); + let bill = ("bill", 20); + let cindy = ("cindy", 5); + let david = ("david", 15); + let dennis = ("dennis", 15); + + let mut cache = SegmentedCache::builder(1) + .max_capacity(31) + .weigher(weigher) + .build(); + cache.reconfigure_for_testing(); + + // Make the cache exterior immutable. + let cache = cache; + + cache.insert("a", alice); + cache.insert("b", bob); + assert_eq!(cache.get(&"a"), Some(alice)); + assert_eq!(cache.get(&"b"), Some(bob)); + cache.sync(); + // order (LRU -> MRU) and counts: a -> 1, b -> 1 + + cache.insert("c", cindy); + assert_eq!(cache.get(&"c"), Some(cindy)); + // order and counts: a -> 1, b -> 1, c -> 1 + cache.sync(); + + assert_eq!(cache.get(&"a"), Some(alice)); + assert_eq!(cache.get(&"b"), Some(bob)); + cache.sync(); + // order and counts: c -> 1, a -> 2, b -> 2 + + // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10). 
+ // "d" must have higher count than 3, which is the aggregated count + // of "a" and "c". + cache.insert("d", david); // count: d -> 0 + cache.sync(); + assert_eq!(cache.get(&"d"), None); // d -> 1 + + cache.insert("d", david); + cache.sync(); + assert_eq!(cache.get(&"d"), None); // d -> 2 + + cache.insert("d", david); + cache.sync(); + assert_eq!(cache.get(&"d"), None); // d -> 3 + + cache.insert("d", david); + cache.sync(); + assert_eq!(cache.get(&"d"), None); // d -> 4 + + // Finally "d" should be admitted by evicting "c" and "a". + cache.insert("d", dennis); + cache.sync(); + assert_eq!(cache.get(&"a"), None); + assert_eq!(cache.get(&"b"), Some(bob)); + assert_eq!(cache.get(&"c"), None); + assert_eq!(cache.get(&"d"), Some(dennis)); + + // Update "b" with "bill" (w: 20). This should evict "d" (w: 15). + cache.insert("b", bill); + cache.sync(); + assert_eq!(cache.get(&"b"), Some(bill)); + assert_eq!(cache.get(&"d"), None); + } + #[test] fn basic_multi_threads() { let num_threads = 4; diff --git a/src/unsync.rs b/src/unsync.rs index 64dd284e..e1052fa0 100644 --- a/src/unsync.rs +++ b/src/unsync.rs @@ -9,7 +9,16 @@ use std::{ptr::NonNull, rc::Rc}; pub use builder::CacheBuilder; pub use cache::Cache; -use crate::common::{deque::DeqNode, time::Instant, AccessTime}; +use crate::common::{deque::DeqNode, time::Instant}; + +pub(crate) type Weigher = Box u32>; + +pub(crate) trait AccessTime { + fn last_accessed(&self) -> Option; + fn set_last_accessed(&mut self, timestamp: Instant); + fn last_modified(&self) -> Option; + fn set_last_modified(&mut self, timestamp: Instant); +} pub(crate) struct KeyDate { pub(crate) key: Rc, @@ -44,61 +53,73 @@ type KeyDeqNodeAo = NonNull>>; // DeqNode for the write order queue. 
type KeyDeqNodeWo = NonNull>>; -struct DeqNodes { +struct EntryInfo { access_order_q_node: Option>, write_order_q_node: Option>, + policy_weight: u32, } pub(crate) struct ValueEntry { pub(crate) value: V, - deq_nodes: DeqNodes, + info: EntryInfo, } impl ValueEntry { - pub(crate) fn new(value: V) -> Self { + pub(crate) fn new(value: V, policy_weight: u32) -> Self { Self { value, - deq_nodes: DeqNodes { + info: EntryInfo { access_order_q_node: None, write_order_q_node: None, + policy_weight, }, } } #[inline] pub(crate) fn replace_deq_nodes_with(&mut self, mut other: Self) { - self.deq_nodes.access_order_q_node = other.deq_nodes.access_order_q_node.take(); - self.deq_nodes.write_order_q_node = other.deq_nodes.write_order_q_node.take(); + self.info.access_order_q_node = other.info.access_order_q_node.take(); + self.info.write_order_q_node = other.info.write_order_q_node.take(); } #[inline] pub(crate) fn access_order_q_node(&self) -> Option> { - self.deq_nodes.access_order_q_node + self.info.access_order_q_node } #[inline] pub(crate) fn set_access_order_q_node(&mut self, node: Option>) { - self.deq_nodes.access_order_q_node = node; + self.info.access_order_q_node = node; } #[inline] pub(crate) fn take_access_order_q_node(&mut self) -> Option> { - self.deq_nodes.access_order_q_node.take() + self.info.access_order_q_node.take() } #[inline] pub(crate) fn write_order_q_node(&self) -> Option> { - self.deq_nodes.write_order_q_node + self.info.write_order_q_node } #[inline] pub(crate) fn set_write_order_q_node(&mut self, node: Option>) { - self.deq_nodes.write_order_q_node = node; + self.info.write_order_q_node = node; } #[inline] pub(crate) fn take_write_order_q_node(&mut self) -> Option> { - self.deq_nodes.write_order_q_node.take() + self.info.write_order_q_node.take() + } + + #[inline] + pub(crate) fn policy_weight(&self) -> u32 { + self.info.policy_weight + } + + #[inline] + pub(crate) fn set_policy_weight(&mut self, policy_weight: u32) { + self.info.policy_weight = 
policy_weight; } } @@ -111,7 +132,7 @@ impl AccessTime for ValueEntry { #[inline] fn set_last_accessed(&mut self, timestamp: Instant) { - if let Some(mut node) = self.deq_nodes.access_order_q_node { + if let Some(mut node) = self.info.access_order_q_node { unsafe { node.as_mut() }.set_last_accessed(timestamp); } } @@ -124,7 +145,7 @@ impl AccessTime for ValueEntry { #[inline] fn set_last_modified(&mut self, timestamp: Instant) { - if let Some(mut node) = self.deq_nodes.write_order_q_node { + if let Some(mut node) = self.info.write_order_q_node { unsafe { node.as_mut() }.set_last_modified(timestamp); } } diff --git a/src/unsync/builder.rs b/src/unsync/builder.rs index 4cc5424a..f8b79eee 100644 --- a/src/unsync/builder.rs +++ b/src/unsync/builder.rs @@ -1,4 +1,4 @@ -use super::Cache; +use super::{Cache, Weigher}; use crate::common::builder_utils; use std::{ @@ -15,11 +15,12 @@ use std::{ /// # Examples /// /// ```rust -/// use moka::unsync::CacheBuilder; -/// +/// use moka::unsync::Cache; /// use std::time::Duration; /// -/// let mut cache = CacheBuilder::new(10_000) // Max 10,000 elements +/// let mut cache = Cache::builder() +/// // Max 10,000 elements +/// .max_capacity(10_000) /// // Time to live (TTL): 30 minutes /// .time_to_live(Duration::from_secs(30 * 60)) /// // Time to idle (TTI): 5 minutes @@ -37,27 +38,41 @@ use std::{ /// // after 30 minutes (TTL) from the insert(). /// ``` /// -pub struct CacheBuilder { - max_capacity: usize, +pub struct CacheBuilder { + max_capacity: Option, initial_capacity: Option, + weigher: Option>, time_to_live: Option, time_to_idle: Option, cache_type: PhantomData, } -impl CacheBuilder> +impl Default for CacheBuilder> where K: Eq + Hash, { - /// Construct a new `CacheBuilder` that will be used to build a `Cache` holding - /// up to `max_capacity` entries. 
- pub fn new(max_capacity: usize) -> Self { + fn default() -> Self { Self { - max_capacity, + max_capacity: None, initial_capacity: None, + weigher: None, time_to_live: None, time_to_idle: None, - cache_type: PhantomData::default(), + cache_type: Default::default(), + } + } +} + +impl CacheBuilder> +where + K: Eq + Hash, +{ + /// Construct a new `CacheBuilder` that will be used to build a `Cache` holding + /// up to `max_capacity` entries. + pub fn new(max_capacity: usize) -> Self { + Self { + max_capacity: Some(max_capacity), + ..Default::default() } } @@ -75,6 +90,7 @@ where self.max_capacity, self.initial_capacity, build_hasher, + self.weigher, self.time_to_live, self.time_to_idle, ) @@ -96,17 +112,37 @@ where self.max_capacity, self.initial_capacity, hasher, + self.weigher, self.time_to_live, self.time_to_idle, ) } } -impl CacheBuilder { - /// Sets the initial capacity of the cache. - pub fn initial_capacity(self, capacity: usize) -> Self { +impl CacheBuilder { + /// Sets the max capacity of the cache. + pub fn max_capacity(self, max_capacity: usize) -> Self { + Self { + max_capacity: Some(max_capacity), + ..self + } + } + + /// Sets the initial capacity (number of entries) of the cache. + pub fn initial_capacity(self, number_of_entries: usize) -> Self { + Self { + initial_capacity: Some(number_of_entries), + ..self + } + } + + /// Sets the weigher closure of the cache. + /// + /// The closure should take `&K` and `&V` as the arguments and returns a `u32` + /// representing the relative size of the entry. 
+ pub fn weigher(self, weigher: impl FnMut(&K, &V) -> u32 + 'static) -> Self { Self { - initial_capacity: Some(capacity), + weigher: Some(Box::new(weigher)), ..self } } @@ -148,7 +184,6 @@ impl CacheBuilder { #[cfg(test)] mod tests { - use super::Cache; use super::CacheBuilder; use std::time::Duration; @@ -158,7 +193,7 @@ mod tests { // Cache let mut cache = CacheBuilder::new(100).build(); - assert_eq!(cache.max_capacity(), 100); + assert_eq!(cache.max_capacity(), Some(100)); assert_eq!(cache.time_to_live(), None); assert_eq!(cache.time_to_idle(), None); @@ -170,7 +205,7 @@ mod tests { .time_to_idle(Duration::from_secs(15 * 60)) .build(); - assert_eq!(cache.max_capacity(), 100); + assert_eq!(cache.max_capacity(), Some(100)); assert_eq!(cache.time_to_live(), Some(Duration::from_secs(45 * 60))); assert_eq!(cache.time_to_idle(), Some(Duration::from_secs(15 * 60))); @@ -182,7 +217,7 @@ mod tests { #[should_panic(expected = "time_to_live is longer than 1000 years")] fn build_cache_too_long_ttl() { let thousand_years_secs: u64 = 1000 * 365 * 24 * 3600; - let builder: CacheBuilder> = CacheBuilder::new(100); + let builder: CacheBuilder = CacheBuilder::new(100); let duration = Duration::from_secs(thousand_years_secs); builder .time_to_live(duration + Duration::from_secs(1)) @@ -193,7 +228,7 @@ mod tests { #[should_panic(expected = "time_to_idle is longer than 1000 years")] fn build_cache_too_long_tti() { let thousand_years_secs: u64 = 1000 * 365 * 24 * 3600; - let builder: CacheBuilder> = CacheBuilder::new(100); + let builder: CacheBuilder = CacheBuilder::new(100); let duration = Duration::from_secs(thousand_years_secs); builder .time_to_idle(duration + Duration::from_secs(1)) diff --git a/src/unsync/cache.rs b/src/unsync/cache.rs index 88e7ec49..d84b21df 100644 --- a/src/unsync/cache.rs +++ b/src/unsync/cache.rs @@ -1,11 +1,11 @@ -use super::{deques::Deques, KeyDate, KeyHashDate, ValueEntry}; +use super::{deques::Deques, AccessTime, CacheBuilder, KeyDate, KeyHashDate, 
ValueEntry, Weigher}; use crate::common::{ deque::{CacheRegion, DeqNode, Deque}, frequency_sketch::FrequencySketch, time::{CheckedTimeOps, Clock, Instant}, - AccessTime, }; +use smallvec::SmallVec; use std::{ borrow::Borrow, collections::{hash_map::RandomState, HashMap}, @@ -16,6 +16,8 @@ use std::{ time::Duration, }; +const EVICTION_BATCH_SIZE: usize = 100; + type CacheStore = std::collections::HashMap, ValueEntry, S>; /// An in-memory cache that is _not_ thread-safe. @@ -109,9 +111,11 @@ type CacheStore = std::collections::HashMap, ValueEntry, S> /// [ahash-crate]: https://crates.io/crates/ahash /// pub struct Cache { - max_capacity: usize, + max_capacity: Option, + weighted_size: u64, cache: CacheStore, build_hasher: S, + weigher: Option>, deques: Deques, frequency_sketch: FrequencySketch, time_to_live: Option, @@ -131,7 +135,15 @@ where /// [builder-struct]: ./struct.CacheBuilder.html pub fn new(max_capacity: usize) -> Self { let build_hasher = RandomState::default(); - Self::with_everything(max_capacity, None, build_hasher, None, None) + Self::with_everything(Some(max_capacity), None, build_hasher, None, None, None) + } + + /// Returns a [`CacheBuilder`][builder-struct], which can builds a `Cache` with + /// various configuration knobs. + /// + /// [builder-struct]: ./struct.CacheBuilder.html + pub fn builder() -> CacheBuilder> { + CacheBuilder::default() } } @@ -144,9 +156,10 @@ where S: BuildHasher + Clone, { pub(crate) fn with_everything( - max_capacity: usize, + max_capacity: Option, initial_capacity: Option, build_hasher: S, + weigher: Option>, time_to_live: Option, time_to_idle: Option, ) -> Self { @@ -157,14 +170,16 @@ where // Ensure skt_capacity fits in a range of `128u32..=u32::MAX`. let skt_capacity = max_capacity - .try_into() // Convert to u32. - .unwrap_or(u32::MAX) + .map(|n| n.try_into().unwrap_or(u32::MAX)) // Convert to u32. 
+ .unwrap_or_default() .max(128); let frequency_sketch = FrequencySketch::with_capacity(skt_capacity); Self { - max_capacity, + max_capacity: max_capacity.map(|n| n as u64), + weighted_size: 0, cache, build_hasher, + weigher, deques: Deques::default(), frequency_sketch, time_to_live, @@ -184,7 +199,8 @@ where Rc: Borrow, Q: Hash + Eq + ?Sized, { - let timestamp = self.evict_if_needed(); + let timestamp = self.evict_expired_if_needed(); + self.evict_lru_entries(); self.frequency_sketch.increment(self.hash(key)); match (self.cache.get_mut(key), timestamp, &mut self.deques) { @@ -213,15 +229,17 @@ where /// /// If the cache has this key present, the value is updated. pub fn insert(&mut self, key: K, value: V) { - let timestamp = self.evict_if_needed(); + let timestamp = self.evict_expired_if_needed(); + self.evict_lru_entries(); + let policy_weight = weigh(&mut self.weigher, &key, &value); let key = Rc::new(key); - let entry = ValueEntry::new(value); + let entry = ValueEntry::new(value, policy_weight); if let Some(old_entry) = self.cache.insert(Rc::clone(&key), entry) { - self.handle_update(key, timestamp, old_entry); + self.handle_update(key, timestamp, policy_weight, old_entry); } else { let hash = self.hash(&key); - self.handle_insert(key, hash, timestamp); + self.handle_insert(key, hash, policy_weight, timestamp); } } @@ -234,11 +252,14 @@ where Rc: Borrow, Q: Hash + Eq + ?Sized, { - self.evict_if_needed(); + self.evict_expired_if_needed(); + self.evict_lru_entries(); if let Some(mut entry) = self.cache.remove(key) { + let weight = entry.policy_weight(); self.deques.unlink_ao(&mut entry); - Deques::unlink_wo(&mut self.deques.write_order, &mut entry) + Deques::unlink_wo(&mut self.deques.write_order, &mut entry); + self.saturating_sub_from_total_weight(weight as u64); } } @@ -250,6 +271,7 @@ where pub fn invalidate_all(&mut self) { self.cache.clear(); self.deques.clear(); + self.weighted_size = 0; } /// Discards cached values that satisfy a predicate. 
@@ -280,17 +302,22 @@ where
                 .map(|(key, _)| Rc::clone(key))
                 .collect::<Vec<_>>();
 
+        let mut invalidated = 0u64;
+
         keys_to_invalidate.into_iter().for_each(|k| {
             if let Some(mut entry) = cache.remove(&k) {
+                let weight = entry.policy_weight();
                 deques.unlink_ao(&mut entry);
                 Deques::unlink_wo(&mut deques.write_order, &mut entry);
+                invalidated = invalidated.saturating_add(weight as u64);
             }
         });
+        self.saturating_sub_from_total_weight(invalidated);
     }
 
     /// Returns the `max_capacity` of this cache.
-    pub fn max_capacity(&self) -> usize {
-        self.max_capacity
+    pub fn max_capacity(&self) -> Option<usize> {
+        self.max_capacity.map(|n| n as usize)
     }
 
     /// Returns the `time_to_live` of this cache.
@@ -329,10 +356,10 @@ where
     }
 
     #[inline]
-    fn evict_if_needed(&mut self) -> Option<Instant> {
+    fn evict_expired_if_needed(&mut self) -> Option<Instant> {
         if self.has_expiry() {
             let ts = self.current_time_from_expiration_clock();
-            self.evict(ts);
+            self.evict_expired(ts);
             Some(ts)
         } else {
             None
@@ -387,9 +414,37 @@ where
         deques.move_to_back_ao(entry)
     }
 
+    fn has_enough_capacity(&self, candidate_weight: u32, ws: u64) -> bool {
+        self.max_capacity
+            .map(|limit| ws + candidate_weight as u64 <= limit)
+            .unwrap_or(true)
+    }
+
+    fn weights_to_evict(&self) -> u64 {
+        self.max_capacity
+            .map(|limit| self.weighted_size.saturating_sub(limit))
+            .unwrap_or_default()
+    }
+
+    fn saturating_add_to_total_weight(&mut self, weight: u64) {
+        let total = &mut self.weighted_size;
+        *total = total.saturating_add(weight);
+    }
+
+    fn saturating_sub_from_total_weight(&mut self, weight: u64) {
+        let total = &mut self.weighted_size;
+        *total = total.saturating_sub(weight);
+    }
+
     #[inline]
-    fn handle_insert(&mut self, key: Rc<K>, hash: u64, timestamp: Option<Instant>) {
-        let has_free_space = self.cache.len() <= self.max_capacity;
+    fn handle_insert(
+        &mut self,
+        key: Rc<K>,
+        hash: u64,
+        policy_weight: u32,
+        timestamp: Option<Instant>,
+    ) {
+        let has_free_space = self.has_enough_capacity(policy_weight, self.weighted_size);
         let (cache, deqs, freq)
= (&mut self.cache, &mut self.deques, &self.frequency_sketch);
 
         if has_free_space {
@@ -404,26 +459,39 @@ where
             if self.time_to_live.is_some() {
                 deqs.push_back_wo(KeyDate::new(key, timestamp), entry);
             }
-        } else {
-            let victim = Self::find_cache_victim(deqs, freq);
-            if Self::admit(hash, victim, freq) {
-                // Remove the victim from the cache and deque.
-                //
-                // TODO: Check if the selected victim was actually removed. If not,
-                // maybe we should find another victim. This can happen because it
-                // could have been already removed from the cache but the removal
-                // from the deque is still on the write operations queue and is not
-                // yet executed.
-                if let Some(mut vic_entry) = cache.remove(&victim.element.key) {
+            self.saturating_add_to_total_weight(policy_weight as u64);
+            return;
+        }
+
+        if let Some(max) = self.max_capacity {
+            if policy_weight as u64 > max {
+                // The candidate is too big to fit in the cache. Reject it.
+                cache.remove(&key);
+                return;
+            }
+        }
+
+        let mut candidate = EntrySizeAndFrequency::new(policy_weight as u64);
+        candidate.add_frequency(freq, hash);
+
+        match Self::admit(&candidate, cache, deqs, freq, &mut self.weigher) {
+            AdmissionResult::Admitted {
+                victim_nodes,
+                victims_weight,
+            } => {
+                // Remove the victims from the cache (hash map) and deque.
+                for victim in victim_nodes {
+                    // Remove the victim from the hash map.
+                    let mut vic_entry = cache
+                        .remove(unsafe { &victim.as_ref().element.key })
+                        .expect("Cannot remove a victim from the hash map");
+                    // And then remove the victim from the deques.
                     deqs.unlink_ao(&mut vic_entry);
                     Deques::unlink_wo(&mut deqs.write_order, &mut vic_entry);
-                } else {
-                    let victim = NonNull::from(victim);
-                    deqs.unlink_node_ao(victim);
                 }
+                // Add the candidate to the deque.
let entry = cache.get_mut(&key).unwrap(); - let key = Rc::clone(&key); deqs.push_back_ao( CacheRegion::MainProbation, @@ -433,56 +501,114 @@ where if self.time_to_live.is_some() { deqs.push_back_wo(KeyDate::new(key, timestamp), entry); } - } else { + + Self::saturating_sub_from_total_weight(self, victims_weight); + Self::saturating_add_to_total_weight(self, policy_weight as u64); + } + AdmissionResult::Rejected => { // Remove the candidate from the cache. cache.remove(&key); } } } - #[inline] - fn find_cache_victim<'a>( - deqs: &'a mut Deques, - _freq: &FrequencySketch, - ) -> &'a DeqNode> { - // TODO: Check its frequency. If it is not very low, maybe we should - // check frequencies of next few others and pick from them. - deqs.probation.peek_front().expect("No victim found") - } - + /// Performs size-aware admission explained in the paper: + /// [Lightweight Robust Size Aware Cache Management][size-aware-cache-paper] + /// by Gil Einziger, Ohad Eytan, Roy Friedman, Ben Manes. + /// + /// [size-aware-cache-paper]: https://arxiv.org/abs/2105.08770 + /// + /// There are some modifications in this implementation: + /// - To admit to the main space, candidate's frequency must be higher than + /// the aggregated frequencies of the potential victims. (In the paper, + /// `>=` operator is used rather than `>`) The `>` operator will do a better + /// job to prevent the main space from polluting. + /// - When a candidate is rejected, the potential victims will stay at the LRU + /// position of the probation access-order queue. (In the paper, they will be + /// promoted (to the MRU position?) to force the eviction policy to select a + /// different set of victims for the next candidate). We may implement the + /// paper's behavior later? 
+    ///
     #[inline]
     fn admit(
-        candidate_hash: u64,
-        victim: &DeqNode<KeyHashDate<K>>,
+        candidate: &EntrySizeAndFrequency,
+        cache: &CacheStore<K, V, S>,
+        deqs: &Deques<K>,
         freq: &FrequencySketch,
-    ) -> bool {
+        weigher: &mut Option<Weigher<K, V>>,
+    ) -> AdmissionResult {
+        let mut victims = EntrySizeAndFrequency::default();
+        let mut victim_nodes = SmallVec::default();
+
+        // Get first potential victim at the LRU position.
+        let mut next_victim = deqs.probation.peek_front();
+
+        // Aggregate potential victims.
+        while victims.weight < candidate.weight {
+            if candidate.freq < victims.freq {
+                break;
+            }
+            if let Some(victim) = next_victim.take() {
+                next_victim = victim.next_node();
+
+                let vic_entry = cache
+                    .get(&victim.element.key)
+                    .expect("Cannot get a victim entry");
+                victims.add_policy_weight(victim.element.key.as_ref(), &vic_entry.value, weigher);
+                victims.add_frequency(freq, victim.element.hash);
+                victim_nodes.push(NonNull::from(victim));
+            } else {
+                // No more potential victims.
+                break;
+            }
+        }
+
+        // Admit or reject the candidate.
+
+        // TODO: Implement some randomness to mitigate hash DoS attack.
         // See Caffeine's implementation.
- freq.frequency(candidate_hash) > freq.frequency(victim.element.hash) + + if victims.weight >= candidate.weight && candidate.freq > victims.freq { + AdmissionResult::Admitted { + victim_nodes, + victims_weight: victims.weight, + } + } else { + AdmissionResult::Rejected + } } fn handle_update( &mut self, key: Rc, timestamp: Option, + policy_weight: u32, old_entry: ValueEntry, ) { + let old_policy_weight = old_entry.policy_weight(); + let entry = self.cache.get_mut(&key).unwrap(); entry.replace_deq_nodes_with(old_entry); if let Some(ts) = timestamp { entry.set_last_accessed(ts); entry.set_last_modified(ts); } + entry.set_policy_weight(policy_weight); + let deqs = &mut self.deques; deqs.move_to_back_ao(entry); - deqs.move_to_back_wo(entry) - } + if self.time_to_live.is_some() { + deqs.move_to_back_wo(entry); + } - fn evict(&mut self, now: Instant) { - const EVICTION_BATCH_SIZE: usize = 100; + self.saturating_sub_from_total_weight(old_policy_weight as u64); + self.saturating_add_to_total_weight(policy_weight as u64); + } + fn evict_expired(&mut self, now: Instant) { if self.time_to_live.is_some() { - self.remove_expired_wo(EVICTION_BATCH_SIZE, now); + let evicted = self.remove_expired_wo(EVICTION_BATCH_SIZE, now); + self.saturating_sub_from_total_weight(evicted); } if self.time_to_idle.is_some() { @@ -508,9 +634,13 @@ where ) }; - rm_expired_ao("window", window); - rm_expired_ao("probation", probation); - rm_expired_ao("protected", protected); + let evicted1 = rm_expired_ao("window", window); + let evicted2 = rm_expired_ao("probation", probation); + let evicted3 = rm_expired_ao("protected", protected); + + self.saturating_sub_from_total_weight(evicted1); + self.saturating_sub_from_total_weight(evicted2); + self.saturating_sub_from_total_weight(evicted3); } } @@ -523,7 +653,9 @@ where time_to_idle: &Option, batch_size: usize, now: Instant, - ) { + ) -> u64 { + let mut evicted = 0u64; + for _ in 0..batch_size { let key = deq .peek_front() @@ -540,18 +672,26 @@ where 
                break;
            }
 
-            if let Some(mut entry) = cache.remove(&key.unwrap()) {
+            let key = key.unwrap();
+
+            if let Some(mut entry) = cache.remove(&key) {
+                let weight = entry.policy_weight();
                 Deques::unlink_ao_from_deque(deq_name, deq, &mut entry);
                 Deques::unlink_wo(write_order_deq, &mut entry);
+                evicted = evicted.saturating_add(weight as u64);
             } else {
                 deq.pop_front();
             }
         }
+
+        evicted
     }
 
     #[inline]
-    fn remove_expired_wo(&mut self, batch_size: usize, now: Instant) {
+    fn remove_expired_wo(&mut self, batch_size: usize, now: Instant) -> u64 {
+        let mut evicted = 0u64;
         let time_to_live = &self.time_to_live;
+
         for _ in 0..batch_size {
             let key = self
                 .deques
@@ -570,13 +710,59 @@ where
                 break;
             }
 
-            if let Some(mut entry) = self.cache.remove(&key.unwrap()) {
+            let key = key.unwrap();
+
+            if let Some(mut entry) = self.cache.remove(&key) {
+                let weight = entry.policy_weight();
                 self.deques.unlink_ao(&mut entry);
                 Deques::unlink_wo(&mut self.deques.write_order, &mut entry);
+                evicted = evicted.saturating_add(weight as u64);
             } else {
                 self.deques.write_order.pop_front();
             }
         }
+
+        evicted
+    }
+
+    #[inline]
+    fn evict_lru_entries(&mut self) {
+        const DEQ_NAME: &str = "probation";
+
+        let weights_to_evict = self.weights_to_evict();
+        let mut evicted = 0u64;
+
+        {
+            let deqs = &mut self.deques;
+            let (probation, wo, cache) =
+                (&mut deqs.probation, &mut deqs.write_order, &mut self.cache);
+
+            for _ in 0..EVICTION_BATCH_SIZE {
+                if evicted >= weights_to_evict {
+                    break;
+                }
+
+                let key = probation
+                    .peek_front()
+                    .map(|node| Rc::clone(&node.element.key));
+
+                if key.is_none() {
+                    break;
+                }
+                let key = key.unwrap();
+
+                if let Some(mut entry) = cache.remove(&key) {
+                    let weight = entry.policy_weight();
+                    Deques::unlink_ao_from_deque(DEQ_NAME, probation, &mut entry);
+                    Deques::unlink_wo(wo, &mut entry);
+                    evicted = evicted.saturating_add(weight as u64);
+                } else {
+                    probation.pop_front();
+                }
+            }
+        }
+
+        self.saturating_sub_from_total_weight(evicted);
     }
 }
+#[derive(Default)] +struct EntrySizeAndFrequency { + weight: u64, + freq: u32, +} + +impl EntrySizeAndFrequency { + fn new(policy_weight: u64) -> Self { + Self { + weight: policy_weight, + ..Default::default() + } + } + + fn add_policy_weight(&mut self, key: &K, value: &V, weigher: &mut Option>) { + self.weight += weigh(weigher, key, value) as u64; + } + + fn add_frequency(&mut self, freq: &FrequencySketch, hash: u64) { + self.freq += freq.frequency(hash) as u32; + } +} + +// Access-Order Queue Node +type AoqNode = NonNull>>; + +enum AdmissionResult { + Admitted { + victim_nodes: SmallVec<[AoqNode; 8]>, + victims_weight: u64, + }, + Rejected, +} + +// +// private free-standing functions +// +#[inline] +fn weigh(weigher: &mut Option>, key: &K, value: &V) -> u32 { + weigher.as_mut().map(|w| w(key, value)).unwrap_or(1) +} + // To see the debug prints, run test as `cargo test -- --nocapture` #[cfg(test)] mod tests { @@ -628,7 +856,7 @@ mod tests { assert_eq!(cache.get(&"d"), None); // d -> 2 // "d" should be admitted and "c" should be evicted - // because d's frequency is higher then c's. + // because d's frequency is higher than c's. 
cache.insert("d", "dennis"); assert_eq!(cache.get(&"a"), Some(&"alice")); assert_eq!(cache.get(&"b"), Some(&"bob")); @@ -639,6 +867,61 @@ mod tests { assert_eq!(cache.get(&"b"), None); } + #[test] + fn size_aware_eviction() { + let weigher = |_k: &&str, v: &(&str, u32)| v.1; + + let alice = ("alice", 10); + let bob = ("bob", 15); + let bill = ("bill", 20); + let cindy = ("cindy", 5); + let david = ("david", 15); + let dennis = ("dennis", 15); + + let mut cache = Cache::builder().max_capacity(31).weigher(weigher).build(); + + cache.insert("a", alice); + cache.insert("b", bob); + assert_eq!(cache.get(&"a"), Some(&alice)); + assert_eq!(cache.get(&"b"), Some(&bob)); + // order (LRU -> MRU) and counts: a -> 1, b -> 1 + + cache.insert("c", cindy); + assert_eq!(cache.get(&"c"), Some(&cindy)); + // order and counts: a -> 1, b -> 1, c -> 1 + + assert_eq!(cache.get(&"a"), Some(&alice)); + assert_eq!(cache.get(&"b"), Some(&bob)); + // order and counts: c -> 1, a -> 2, b -> 2 + + // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10). + // "d" must have higher count than 3, which is the aggregated count + // of "a" and "c". + cache.insert("d", david); // count: d -> 0 + assert_eq!(cache.get(&"d"), None); // d -> 1 + + cache.insert("d", david); + assert_eq!(cache.get(&"d"), None); // d -> 2 + + cache.insert("d", david); + assert_eq!(cache.get(&"d"), None); // d -> 3 + + cache.insert("d", david); + assert_eq!(cache.get(&"d"), None); // d -> 4 + + // Finally "d" should be admitted by evicting "c" and "a". + cache.insert("d", dennis); + assert_eq!(cache.get(&"a"), None); + assert_eq!(cache.get(&"b"), Some(&bob)); + assert_eq!(cache.get(&"c"), None); + assert_eq!(cache.get(&"d"), Some(&dennis)); + + // Update "b" with "bill" (w: 20). This should evict "d" (w: 15). + cache.insert("b", bill); + assert_eq!(cache.get(&"b"), Some(&bill)); + assert_eq!(cache.get(&"d"), None); + } + #[test] fn invalidate_all() { let mut cache = Cache::new(100);