Skip to content

Commit

Permalink
Remove the thread pool from future::Cache
Browse files Browse the repository at this point in the history
Update the integration tests for async runtimes.
  • Loading branch information
tatsuya6502 committed Aug 7, 2023
1 parent ca59ce8 commit 9e83dcc
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 72 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ futures-util = { version = "0.3", optional = true }
log = { version = "0.4", optional = true }

[dev-dependencies]
actix-rt = { version = "2.7", default-features = false }
actix-rt = "2.8"
ahash = "0.8.3"
anyhow = "1.0.19"
async-std = { version = "1.11", features = ["attributes"] }
Expand Down
88 changes: 58 additions & 30 deletions tests/runtime_actix_rt2.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
#![cfg(all(test, feature = "future"))]

use actix_rt::Runtime;
use std::sync::Arc;

use actix_rt::System;
use moka::future::Cache;
use tokio::sync::Barrier;

#[test]
fn main() -> Result<(), Box<dyn std::error::Error>> {
const NUM_TASKS: usize = 16;
#[actix_rt::test]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
const NUM_TASKS: usize = 12;
const NUM_THREADS: usize = 4;
const NUM_KEYS_PER_TASK: usize = 64;

fn value(n: usize) -> String {
Expand All @@ -15,59 +19,83 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create a cache that can store up to 10,000 entries.
let cache = Cache::new(10_000);

// Create Actix Runtime
let rt = Runtime::new()?;
let barrier = Arc::new(Barrier::new(NUM_THREADS + NUM_TASKS));

// Spawn async tasks and write to and read from the cache.
// NOTE: Actix Runtime is single threaded.
let tasks: Vec<_> = (0..NUM_TASKS)
.map(|i| {
// To share the same cache across the async tasks, clone it.
// This is a cheap operation.
// To share the same cache across the async tasks and OS threads, clone
// it. This is a cheap operation.
let my_cache = cache.clone();
let my_barrier = Arc::clone(&barrier);
let start = i * NUM_KEYS_PER_TASK;
let end = (i + 1) * NUM_KEYS_PER_TASK;

// NOTE: Actix Runtime is single threaded.
rt.spawn(async move {
actix_rt::spawn(async move {
// Wait for the all async tasks and threads to be spawned.
my_barrier.wait().await;

// Insert 64 entries. (NUM_KEYS_PER_TASK = 64)
for key in start..end {
if key % 8 == 0 {
// TODO: Use async runtime's `block_on`.
// my_cache.blocking().insert(key, value(key));
my_cache.insert(key, value(key)).await;
} else {
// insert() is an async method, so await it
my_cache.insert(key, value(key)).await;
}
// get() returns Option<String>, a clone of the stored value.
my_cache.insert(key, value(key)).await;
assert_eq!(my_cache.get(&key).await, Some(value(key)));
}

// Invalidate every 4 element of the inserted entries.
for key in (start..end).step_by(4) {
if key % 8 == 0 {
// TODO: Use async runtime's `block_on`.
// my_cache.blocking().invalidate(&key);
my_cache.invalidate(&key).await;
} else {
// invalidate() is an async method, so await it
my_cache.invalidate(&key).await;
}
my_cache.invalidate(&key).await;
}
})
})
.collect();

// Spawn OS threads and write to and read from the cache.
let threads: Vec<_> = (0..NUM_THREADS)
.map(|i| i + NUM_TASKS)
.map(|i| {
let my_cache = cache.clone();
let my_barrier = Arc::clone(&barrier);
let start = i * NUM_KEYS_PER_TASK;
let end = (i + 1) * NUM_KEYS_PER_TASK;

std::thread::spawn(move || {
// It seems there is no way to get a SystemRunner from the current
// System (`System::current()`). So, create a new System.
let runner = System::new(); // Returns a SystemRunner.

// Wait for the all async tasks and threads to be spawned.
runner.block_on(my_barrier.wait());

// Insert 64 entries. (NUM_KEYS_PER_TASK = 64)
for key in start..end {
runner.block_on(my_cache.insert(key, value(key)));
assert_eq!(runner.block_on(my_cache.get(&key)), Some(value(key)));
}

// Invalidate every 4 element of the inserted entries.
for key in (start..end).step_by(4) {
runner.block_on(my_cache.invalidate(&key));
}
})
})
.collect();

rt.block_on(futures_util::future::join_all(tasks));
futures_util::future::join_all(tasks).await;
for t in threads {
t.join().unwrap();
}

// Verify the result.
for key in 0..(NUM_TASKS * NUM_KEYS_PER_TASK) {
if key % 4 == 0 {
assert_eq!(rt.block_on(cache.get(&key)), None);
assert_eq!(cache.get(&key).await, None);
} else {
assert_eq!(rt.block_on(cache.get(&key)), Some(value(key)));
assert_eq!(cache.get(&key).await, Some(value(key)));
}
}

System::current().stop();

Ok(())
}
69 changes: 49 additions & 20 deletions tests/runtime_async_std.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
#![cfg(all(test, feature = "future"))]

use std::sync::Arc;

// Use async_lock's Barrier instead of async_std's Barrier as the latter requires
// `unstable` feature (v1.12.0).
use async_lock::Barrier;
use moka::future::Cache;

#[async_std::test]
async fn main() {
const NUM_TASKS: usize = 16;
const NUM_TASKS: usize = 12;
const NUM_THREADS: usize = 4;
const NUM_KEYS_PER_TASK: usize = 64;

fn value(n: usize) -> String {
Expand All @@ -14,47 +20,70 @@ async fn main() {
// Create a cache that can store up to 10,000 entries.
let cache = Cache::new(10_000);

let barrier = Arc::new(Barrier::new(NUM_THREADS + NUM_TASKS));

// Spawn async tasks and write to and read from the cache.
let tasks: Vec<_> = (0..NUM_TASKS)
.map(|i| {
// To share the same cache across the async tasks, clone it.
// This is a cheap operation.
// To share the same cache across the async tasks and OS threads, clone
// it. This is a cheap operation.
let my_cache = cache.clone();
let my_barrier = Arc::clone(&barrier);
let start = i * NUM_KEYS_PER_TASK;
let end = (i + 1) * NUM_KEYS_PER_TASK;

async_std::task::spawn(async move {
// Wait for the all async tasks and threads to be spawned.
my_barrier.wait().await;

// Insert 64 entries. (NUM_KEYS_PER_TASK = 64)
for key in start..end {
if key % 8 == 0 {
// TODO: Use async runtime's `block_on`.
// my_cache.blocking().insert(key, value(key));
my_cache.insert(key, value(key)).await;
} else {
// insert() is an async method, so await it
my_cache.insert(key, value(key)).await;
}
// get() returns Option<String>, a clone of the stored value.
my_cache.insert(key, value(key)).await;
assert_eq!(my_cache.get(&key).await, Some(value(key)));
}

// Invalidate every 4 element of the inserted entries.
for key in (start..end).step_by(4) {
if key % 8 == 0 {
// TODO: Use async runtime's `block_on`.
// my_cache.blocking().invalidate(&key);
my_cache.invalidate(&key).await;
} else {
// invalidate() is an async method, so await it
my_cache.invalidate(&key).await;
}
my_cache.invalidate(&key).await;
}
})
})
.collect();

// Spawn threads and write to and read from the cache.
let threads: Vec<_> = (0..NUM_THREADS)
.map(|i| i + NUM_TASKS)
.map(|i| {
let my_cache = cache.clone();
let my_barrier = Arc::clone(&barrier);
let start = i * NUM_KEYS_PER_TASK;
let end = (i + 1) * NUM_KEYS_PER_TASK;

std::thread::spawn(move || {
use async_std::task::block_on;

// Wait for the all async tasks and threads to be spawned.
block_on(my_barrier.wait());

// Insert 64 entries. (NUM_KEYS_PER_TASK = 64)
for key in start..end {
block_on(my_cache.insert(key, value(key)));
assert_eq!(block_on(my_cache.get(&key)), Some(value(key)));
}

// Invalidate every 4 element of the inserted entries.
for key in (start..end).step_by(4) {
block_on(my_cache.invalidate(&key));
}
})
})
.collect();

// Wait for all tasks to complete.
futures_util::future::join_all(tasks).await;
for t in threads {
t.join().unwrap();
}

// Verify the result.
for key in 0..(NUM_TASKS * NUM_KEYS_PER_TASK) {
Expand Down
68 changes: 47 additions & 21 deletions tests/runtime_tokio.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
#![cfg(all(test, feature = "future"))]

use std::sync::Arc;

use moka::future::Cache;
use tokio::sync::Barrier;

#[tokio::test]
async fn main() {
const NUM_TASKS: usize = 16;
const NUM_TASKS: usize = 12;
const NUM_THREADS: usize = 4;
const NUM_KEYS_PER_TASK: usize = 64;

fn value(n: usize) -> String {
Expand All @@ -14,47 +18,69 @@ async fn main() {
// Create a cache that can store up to 10,000 entries.
let cache = Cache::new(10_000);

let barrier = Arc::new(Barrier::new(NUM_THREADS + NUM_TASKS));

// Spawn async tasks and write to and read from the cache.
let tasks: Vec<_> = (0..NUM_TASKS)
.map(|i| {
// To share the same cache across the async tasks, clone it.
// This is a cheap operation.
// To share the same cache across the async tasks and OS threads, clone
// it. This is a cheap operation.
let my_cache = cache.clone();
let my_barrier = Arc::clone(&barrier);
let start = i * NUM_KEYS_PER_TASK;
let end = (i + 1) * NUM_KEYS_PER_TASK;

tokio::spawn(async move {
// Wait for the all async tasks and threads to be spawned.
my_barrier.wait().await;

// Insert 64 entries. (NUM_KEYS_PER_TASK = 64)
for key in start..end {
if key % 8 == 0 {
// TODO: Use async runtime's `block_on`.
// my_cache.blocking().insert(key, value(key));
my_cache.insert(key, value(key)).await;
} else {
// insert() is an async method, so await it
my_cache.insert(key, value(key)).await;
}
// get() returns Option<String>, a clone of the stored value.
my_cache.insert(key, value(key)).await;
assert_eq!(my_cache.get(&key).await, Some(value(key)));
}

// Invalidate every 4 element of the inserted entries.
for key in (start..end).step_by(4) {
if key % 8 == 0 {
// TODO: Use async runtime's `block_on`.
// my_cache.blocking().invalidate(&key);
my_cache.invalidate(&key).await;
} else {
// invalidate() is an async method, so await it
my_cache.invalidate(&key).await;
}
my_cache.invalidate(&key).await;
}
})
})
.collect();

// Wait for all tasks to complete.
// Spawn OS threads and write to and read from the cache.
let threads: Vec<_> = (0..NUM_THREADS)
.map(|i| i + NUM_TASKS)
.map(|i| {
let my_cache = cache.clone();
let my_barrier = Arc::clone(&barrier);
let start = i * NUM_KEYS_PER_TASK;
let end = (i + 1) * NUM_KEYS_PER_TASK;
let rt = tokio::runtime::Handle::current();

std::thread::spawn(move || {
// Wait for the all async tasks and threads to be spawned.
rt.block_on(my_barrier.wait());

// Insert 64 entries. (NUM_KEYS_PER_TASK = 64)
for key in start..end {
rt.block_on(my_cache.insert(key, value(key)));
assert_eq!(rt.block_on(my_cache.get(&key)), Some(value(key)));
}

// Invalidate every 4 element of the inserted entries.
for key in (start..end).step_by(4) {
rt.block_on(my_cache.invalidate(&key));
}
})
})
.collect();

// Wait for all tasks and threads to complete.
futures_util::future::join_all(tasks).await;
for t in threads {
t.join().unwrap();
}

// Verify the result.
for key in 0..(NUM_TASKS * NUM_KEYS_PER_TASK) {
Expand Down

0 comments on commit 9e83dcc

Please sign in to comment.