Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EMA based statistically adaptive thread pool design #108

Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions benches/blocking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#![feature(test)]

extern crate test;

use async_std::task;
use async_std::task::blocking::JoinHandle;
use futures::future::join_all;
use std::thread;
use std::time::Duration;
use test::Bencher;

#[bench]
fn blocking(b: &mut Bencher) {
b.iter(|| {
let handles = (0..10_000)
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
.map(|_| {
task::blocking::spawn(async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
})
})
.collect::<Vec<JoinHandle<()>>>();

task::block_on(join_all(handles));
});
}
93 changes: 74 additions & 19 deletions src/task/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,19 @@ use crate::future::Future;
use crate::task::{Context, Poll};
use crate::utils::abort_on_panic;

const LOW_WATERMARK: u64 = 2;
const MAX_THREADS: u64 = 10_000;

static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
// Pool task frequency calculation variables
static AVR_FREQUENCY: AtomicU64 = AtomicU64::new(0);
static FREQUENCY: AtomicU64 = AtomicU64::new(0);

// Pool speedup calculation variables
static SPEEDUP: AtomicU64 = AtomicU64::new(0);

// Pool size variables
static EXPECTED_POOL_SIZE: AtomicU64 = AtomicU64::new(LOW_WATERMARK);
static CURRENT_POOL_SIZE: AtomicU64 = AtomicU64::new(LOW_WATERMARK);

struct Pool {
sender: Sender<async_task::Task<()>>,
Expand All @@ -24,12 +34,13 @@ struct Pool {

lazy_static! {
static ref POOL: Pool = {
for _ in 0..2 {
for _ in 0..LOW_WATERMARK {
thread::Builder::new()
.name("async-blocking-driver".to_string())
.spawn(|| abort_on_panic(|| {
for task in &POOL.receiver {
task.run();
calculate_dispatch_frequency();
}
}))
.expect("cannot start a thread driving blocking tasks");
Expand All @@ -47,18 +58,34 @@ lazy_static! {
};
}

// Create up to MAX_THREADS dynamic blocking task worker threads.
// Dynamic threads will terminate themselves if they don't
// receive any work after between one and ten seconds.
fn maybe_create_another_blocking_thread() {
// We use a `Relaxed` atomic operation because
// it's just a heuristic, and would not lose correctness
// even if it's random.
let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed);
if workers >= MAX_THREADS {
return;
fn calculate_dispatch_frequency() {
// Calculate current message processing rate here
let current_freq = FREQUENCY.fetch_sub(1, Ordering::Relaxed);
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
let avr_freq = AVR_FREQUENCY.load(Ordering::Relaxed);
spacejam marked this conversation as resolved.
Show resolved Hide resolved
let current_pool_size = CURRENT_POOL_SIZE.load(Ordering::Relaxed);
spacejam marked this conversation as resolved.
Show resolved Hide resolved
let frequency = (avr_freq as f64 + current_freq as f64 / current_pool_size as f64) as u64;
AVR_FREQUENCY.store(frequency, Ordering::Relaxed);
spacejam marked this conversation as resolved.
Show resolved Hide resolved

// Adapt the thread count of pool
let speedup = SPEEDUP.load(Ordering::Relaxed);
spacejam marked this conversation as resolved.
Show resolved Hide resolved
if frequency > speedup {
// Speedup can be gained. Scale the pool up here.
SPEEDUP.store(frequency, Ordering::Relaxed);
spacejam marked this conversation as resolved.
Show resolved Hide resolved
EXPECTED_POOL_SIZE.store(current_pool_size + 1, Ordering::Relaxed);
spacejam marked this conversation as resolved.
Show resolved Hide resolved
} else {
// There is no need for the extra threads, schedule them to be closed.
let expected = EXPECTED_POOL_SIZE.load(Ordering::Relaxed);
spacejam marked this conversation as resolved.
Show resolved Hide resolved
if 2 * LOW_WATERMARK < expected {
// Substract amount of low watermark
EXPECTED_POOL_SIZE.fetch_sub(LOW_WATERMARK, Ordering::Relaxed);
spacejam marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

// Creates yet another thread to receive tasks.
// Dynamic threads will terminate themselves if they don't
// receive any work after between one and ten seconds.
fn create_blocking_thread() {
// We want to avoid having all threads terminate at
// exactly the same time, causing thundering herd
// effects. We want to stagger their destruction over
Expand All @@ -73,25 +100,53 @@ fn maybe_create_another_blocking_thread() {
.spawn(move || {
let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);

DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
CURRENT_POOL_SIZE.fetch_add(1, Ordering::SeqCst);
while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
abort_on_panic(|| task.run());
calculate_dispatch_frequency();
}
DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
CURRENT_POOL_SIZE.fetch_sub(1, Ordering::SeqCst);
})
.expect("cannot start a dynamic thread driving blocking tasks");
}

// Enqueues work, attempting to send to the threadpool in a
// nonblocking way and spinning up another worker thread if
// there is not a thread ready to accept the work.
// nonblocking way and spinning up needed amount of threads
// based on the previous statistics without relying on
// if there is not a thread ready to accept the work or not.
fn schedule(t: async_task::Task<()>) {
// Add up for every incoming task schedule
FREQUENCY.fetch_add(1, Ordering::Relaxed);
vertexclique marked this conversation as resolved.
Show resolved Hide resolved

// Calculate the amount of threads needed to spin up
// then retry sending while blocking. It doesn't spin if
// expected pool size is above the MAX_THREADS (which is a
// case won't happen)
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
let pool_size = EXPECTED_POOL_SIZE.load(Ordering::Relaxed);
spacejam marked this conversation as resolved.
Show resolved Hide resolved
let current_pool_size = CURRENT_POOL_SIZE.load(Ordering::SeqCst);
let reward = (AVR_FREQUENCY.load(Ordering::Relaxed) as f64 / 2.0_f64) as u64;
vertexclique marked this conversation as resolved.
Show resolved Hide resolved

if pool_size > current_pool_size && pool_size <= MAX_THREADS {
let needed = pool_size.saturating_sub(current_pool_size);

// For safety, check boundaries before spawning threads.
// This also won't be expected to happen. But better safe than sorry.
if needed > 0 && (needed < pool_size || needed < current_pool_size) {
(0..needed).for_each(|_| {
create_blocking_thread();
});
}
}

if let Err(err) = POOL.sender.try_send(t) {
// We were not able to send to the channel without
// blocking. Try to spin up another thread and then
// retry sending while blocking.
maybe_create_another_blocking_thread();
// blocking.
POOL.sender.send(err.into_inner()).unwrap();
} else {
// Every successful dispatch, rewarded with negative
if reward + (2 * LOW_WATERMARK) < pool_size {
EXPECTED_POOL_SIZE.fetch_sub(reward, Ordering::Relaxed);
spacejam marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ mod pool;
mod sleep;
mod task;

pub(crate) mod blocking;
pub mod blocking;