EMA based statistically adaptive thread pool design #108

Closed
38 changes: 38 additions & 0 deletions benches/blocking.rs
@@ -0,0 +1,38 @@
#![feature(test)]

extern crate test;

use async_std::task;
use async_std::task::blocking::JoinHandle;
use futures::future::join_all;
use std::thread;
use std::time::Duration;
use test::Bencher;

// Benchmark for a 10K burst task spawn
#[bench]
fn blocking(b: &mut Bencher) {
b.iter(|| {
let handles = (0..10_000)
.map(|_| {
task::blocking::spawn(async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
})
})
.collect::<Vec<JoinHandle<()>>>();

task::block_on(join_all(handles));
});
}

// Benchmark for a single blocking task spawn
#[bench]
fn blocking_single(b: &mut Bencher) {
b.iter(|| {
task::blocking::spawn(async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
})
});
}
261 changes: 236 additions & 25 deletions src/task/blocking.rs
@@ -1,5 +1,57 @@
//! A thread pool for running blocking functions asynchronously.
//!
//! The blocking thread pool consists of four components:
//! * Frequency Detector
//! * Trend Estimator
//! * Predictive Upscaler
//! * Time-based Downscaler
//!
//! ## Frequency Detector
//! Detects how many tasks are submitted from the scheduler to the thread pool in a given time frame.
//! The pool manager thread does this sampling every 200 milliseconds.
//! This value feeds the trend estimation phase.
//!
//! ## Trend Estimator
//! Holds up to a fixed number of frequency samples to create an estimation.
//! The trend estimator holds 10 frequencies at a time.
//! This value is stored as a constant in [FREQUENCY_QUEUE_SIZE](constant.FREQUENCY_QUEUE_SIZE.html).
//! Estimation and prediction use the Exponentially Weighted Moving Average algorithm.
//!
//! This algorithm is adapted from [A Novel Predictive and Self–Adaptive Dynamic Thread Pool Management](https://doi.org/10.1109/ISPA.2011.61)
//! and altered to:
//! * utilize thread redundancy (the sum of the differences between the predicted and observed values) instead of a heavy trend calculation.
//! * use exponential rather than linear trend estimation, with the formula:
//! ```text
//! LOW_WATERMARK * (predicted - observed) + LOW_WATERMARK
//! ```
//! *NOTE:* To tweak this algorithm, increase [LOW_WATERMARK](constant.LOW_WATERMARK.html); the dynamic thread spawn count adapts automatically.
//! * operate without watermarking by timestamps (used in the paper to measure the algorithm's own performance during execution).
//! * operate without extensive subsampling, since extensive subsampling congests the pool manager thread.
//! * operate without keeping track of thread idle time or a job-out queue, unlike the TEMA and FOPS implementations.
//!
//! ## Predictive Upscaler
//! The upscaler handles three cases (also described in the paper):
//! * The rate slightly increases and there are many idle threads.
//! * The number of worker threads tends to be reduced since the workload of the system is descending.
//! * The system has no requests or is stalled. (Our case here is when the current tasks block further tasks from being processed: throughput hogs.)
//!
//! For the first two cases, EMA calculation and exponential trend estimation give good performance.
//! For the last case, the upscaler selects the upscaling amount from the number of tasks mapped when throughput hogs happen.
//!
//! **Example scenario:** Say we have 10_000 tasks, each blocking for 1 second. The scheduler will map plenty of tasks, but they will get rejected.
//! This makes the estimation nearly 0 for both the entering and exiting parts. When this happens and we still see tasks mapped from the scheduler,
//! we start to slowly increase the thread count linearly by the frequency amount. A sharp increase of this value would either hit the thread
//! threshold on some OSes or congest the program's other thread usage through context switches.
//!
//! Throughput hogs are determined by a combination of the job-in/job-out frequency and the current scheduler task assignment frequency.
//! The EMA difference threshold is bounded by machine epsilon to absorb floating-point arithmetic errors.
//!
//! ## Time-based Downscaler
//! When threads become idle, they will not shut down immediately.
//! Instead, they wait a random amount of time between 1 and 11 seconds
//! to even out the load.
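As a quick illustration of the exponential trend estimation formula above, here is a minimal standalone sketch (the function name is hypothetical; `LOW_WATERMARK` mirrors the constant introduced in this patch):

```rust
// Hypothetical stand-in mirroring the LOW_WATERMARK constant in this patch.
const LOW_WATERMARK: f64 = 2.0;

// Exponential trend estimation:
// LOW_WATERMARK * (predicted - observed) + LOW_WATERMARK
fn upscale_amount(predicted: f64, observed: f64) -> u64 {
    (LOW_WATERMARK * (predicted - observed) + LOW_WATERMARK) as u64
}

fn main() {
    // A predicted frequency of 3.0 against an observed 1.0 asks for
    // 2.0 * (3.0 - 1.0) + 2.0 = 6 additional threads.
    assert_eq!(upscale_amount(3.0, 1.0), 6);
    // With no trend difference, only the LOW_WATERMARK baseline is spawned.
    assert_eq!(upscale_amount(2.0, 2.0), 2);
}
```

This also shows why raising `LOW_WATERMARK` automatically adapts the spawn count, as the NOTE above describes: it scales both the slope and the intercept of the formula.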

use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
@@ -10,21 +62,44 @@ use crossbeam_channel::{bounded, Receiver, Sender};
use lazy_static::lazy_static;

use crate::future::Future;
use crate::io::ErrorKind;
use crate::task::{Context, Poll};
use crate::utils::abort_on_panic;
use std::sync::Mutex;

const MAX_THREADS: u64 = 10_000;
/// Low watermark value, defines the bare minimum of the pool.
/// Spawns initial thread set.
const LOW_WATERMARK: u64 = 2;

static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
/// Pool managers interval time (milliseconds).
/// This is the actual interval which makes adaptation calculation.
const MANAGER_POLL_INTERVAL: u64 = 200;

/// Frequency histogram's sliding window size.
/// Defines how many frequencies will be considered for adaptation.
const FREQUENCY_QUEUE_SIZE: usize = 10;

/// Exponential moving average smoothing coefficient for limited window.
/// Smoothing factor is estimated with: 2 / (N + 1) where N is sample size.
const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);

/// Pool task frequency variable.
/// Holds scheduled tasks onto the thread pool for the calculation time window.
static FREQUENCY: AtomicU64 = AtomicU64::new(0);

/// Possible max threads (without OS contract).
static MAX_THREADS: AtomicU64 = AtomicU64::new(10_000);

/// Pool interface between the scheduler and thread pool
struct Pool {
sender: Sender<async_task::Task<()>>,
receiver: Receiver<async_task::Task<()>>,
}

lazy_static! {
/// Blocking pool with static starting thread count.
static ref POOL: Pool = {
for _ in 0..2 {
for _ in 0..LOW_WATERMARK {
thread::Builder::new()
.name("async-blocking-driver".to_string())
.spawn(|| abort_on_panic(|| {
@@ -35,6 +110,19 @@ lazy_static! {
.expect("cannot start a thread driving blocking tasks");
}

// Pool manager to check frequency of task rates
// and take action by scaling the pool accordingly.
thread::Builder::new()
.name("async-pool-manager".to_string())
.spawn(|| abort_on_panic(|| {
let poll_interval = Duration::from_millis(MANAGER_POLL_INTERVAL);
loop {
scale_pool();
thread::sleep(poll_interval);
}
}))
.expect("thread pool manager cannot be started");

// We want to use an unbuffered channel here to help
// us drive our dynamic control. In effect, the
// kernel's scheduler becomes the queue, reducing
@@ -45,52 +133,175 @@ lazy_static! {
let (sender, receiver) = bounded(0);
Pool { sender, receiver }
};

/// Sliding window for pool task frequency calculation
static ref FREQ_QUEUE: Mutex<VecDeque<u64>> = {
Mutex::new(VecDeque::with_capacity(FREQUENCY_QUEUE_SIZE.saturating_add(1)))
};

/// Dynamic pool thread count variable
static ref POOL_SIZE: Mutex<u64> = Mutex::new(LOW_WATERMARK);
}

// Create up to MAX_THREADS dynamic blocking task worker threads.
// Dynamic threads will terminate themselves if they don't
// receive any work after between one and ten seconds.
fn maybe_create_another_blocking_thread() {
// We use a `Relaxed` atomic operation because
// it's just a heuristic, and would not lose correctness
// even if it's random.
let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed);
if workers >= MAX_THREADS {
return;
/// Exponentially Weighted Moving Average calculation
///
/// This allows us to find the EMA value.
/// This value represents the trend of tasks mapped onto the thread pool.
/// The calculation is as follows:
/// ```text
/// +--------+-----------------+----------------------------------+
/// | Symbol | Identifier | Explanation |
/// +--------+-----------------+----------------------------------+
/// | α | EMA_COEFFICIENT | smoothing factor between 0 and 1 |
/// | Yt | freq | frequency sample at time t |
/// | St | acc | EMA at time t |
/// +--------+-----------------+----------------------------------+
/// ```
/// Under these definitions, the formula is:
/// ```text
/// St = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 + ... ]
/// ```
/// # Arguments
///
/// * `freq_queue` - Sliding window of frequency samples
#[inline]
[Inline review thread on `#[inline]`]

@spacejam (Sep 2, 2019): In general, humans are bad at guessing whether something should be inlined. Usually in Rust it's better to leave these attributes out unless some code is going to be executed in a super hot path and you've done work to ensure the improvement is measurable.

@vertexclique (author): I've made the benchmarks already with and without, and decided to leave it inlined. You can see below my last run again after your comment. Mean of 10 consecutive runs of the blocking benchmark: 43057207.9 ns without inlining, 41816626.1 ns with inlining. That is also better for users of this library indirectly.

@spacejam (Sep 2, 2019): I'm a bit skeptical that change is related to the inline here, because this is a function that gets called 5 times per second, and that measurement skew is ~1 ms. For a workload that lasts ~40 ms, it's not clear to me how measuring things that happen on the order of once every 200 ms is relevant. I'm asking you to simplify your proposed code because we are going to have to keep it working over time, and it's important not to add bits that add complexity for humans without clear benefits.
fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
    freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
        acc + (*freq as f64) * (1_f64 - EMA_COEFFICIENT).powi(i as i32)
    }) * EMA_COEFFICIENT
}
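For intuition, the fold can be exercised as a standalone, self-contained copy (constants copied from this patch; `powi` is used for the integer exponent):

```rust
use std::collections::VecDeque;

const FREQUENCY_QUEUE_SIZE: usize = 10;
const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);

// Sample i is weighted by (1 - α)^i and the weighted sum is scaled by α.
fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
    freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
        acc + (*freq as f64) * (1_f64 - EMA_COEFFICIENT).powi(i as i32)
    }) * EMA_COEFFICIENT
}

fn main() {
    // With N = 10 samples, α = 2 / (10 + 1) ≈ 0.1818.
    assert!((EMA_COEFFICIENT - 2.0 / 11.0).abs() < f64::EPSILON);

    // A lone sample of 11 at index 0 contributes 11 * α = 2.0.
    let q: VecDeque<u64> = vec![11, 0, 0].into();
    assert!((calculate_ema(&q) - 2.0).abs() < 1e-9);

    // An empty window yields zero.
    assert_eq!(calculate_ema(&VecDeque::new()), 0.0);
}
```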

/// Adaptive pool scaling function
///
/// This allows spawning new threads to make room for incoming task pressure.
/// Works in the background detached from the pool system and scales up the pool based
/// on the request rate.
///
/// It uses a frequency-based calculation to define work, utilizing the average processing rate.
fn scale_pool() {
[Inline review thread on `scale_pool`]

@killercup (Aug 28, 2019): In another context there was some discussion around adding tracing (using log's trace! macro) to async-std. This function seems like a good place for that, too.

(reply): log::trace or the new tracing library? Wondering about how much overhead each would add.

@killercup: I just know that the idea was thrown around, and it was probably an opt-in feature, too. @stjepang or @yoshuawuyts would know more :)

(reply): log now has kv support, which we're already using in other places too. tracing can pick these up too, so using log is probably the best choice here overall (:

(reply): I think we should get the kv support in tracing-log soon-ish? However I don't believe that log supports the span-like macros, so async-std might need to have its own, internal span-like macro and dispatch conditionally to async-log or tracing. I haven't considered how the differences between async-log's span and tracing's span can be bridged, but I think it's feasible if tracing's span!().enter() is used to mimic async-log's nested span block.

@yoshuawuyts (Sep 1, 2019): @davidbarsky we could use async-log::span for this, probably. Though there's still a few open issues on the repo, the interface wouldn't change. From convos with Eliza during RustConf there's def a desire to bridge between the two approaches, and we mostly need to figure out how to go about it. async-rs/async-log#7 is what I've currently got (repo incoming soon), but need to check how well that works (:

@davidbarsky (Sep 1, 2019): @yoshuawuyts Yep! I think we're on the same page about wanting to bridge the two libraries. I'm spitballing one mechanism on bridging that gap, which consists of using mutually exclusive feature flags in a build.rs to dispatch to either async-log or tracing. This would entail using an async-std-specific span macro until the gaps (if any exist) between async-std's span and tracing's span are closed. I'm personally excited to dig into rust-lang/log#353! I'm sorry if what I communicated wasn't clear!
    // Fetch the current frequency; the ordering of operations matters in this approach.
let current_frequency = FREQUENCY.swap(0, Ordering::SeqCst);
let mut freq_queue = FREQ_QUEUE.lock().unwrap();

    // Seed the window so the first EMA calculation has a sample to work with
    if freq_queue.is_empty() {
freq_queue.push_back(0);
}

// Calculate message rate for the given time window
let frequency = (current_frequency as f64 / MANAGER_POLL_INTERVAL as f64) as u64;

    // Calculate the previous time window's EMA value (without the latest sample)
let prev_ema_frequency = calculate_ema(&freq_queue);

// Add seen frequency data to the frequency histogram.
freq_queue.push_back(frequency);
if freq_queue.len() == FREQUENCY_QUEUE_SIZE.saturating_add(1) {
freq_queue.pop_front();
}

// Calculates current time window's EMA value (including last sample)
let curr_ema_frequency = calculate_ema(&freq_queue);

    // Adapt the thread count of the pool.
    //
    // The pool manager keeps a sliding window of the frequencies it has visited
    // and computes EMA values for the previous and the current window.
    // Comparing them determines the scaling amount based on the trend.
    // If the current EMA value is bigger, we scale up.
if curr_ema_frequency > prev_ema_frequency {
// "Scale by" amount can be seen as "how much load is coming".
// "Scale" amount is "how many threads we should spawn".
let scale_by: f64 = curr_ema_frequency - prev_ema_frequency;
let scale = num_cpus::get()
.min(((LOW_WATERMARK as f64 * scale_by) + LOW_WATERMARK as f64) as usize);

// It is time to scale the pool!
(0..scale).for_each(|_| {
create_blocking_thread();
});
} else if (curr_ema_frequency - prev_ema_frequency).abs() < std::f64::EPSILON
&& current_frequency != 0
{
        // Throughput is low. Allocate more threads to unblock the flow.
        // If we reach this case, the scheduler is congested by longhauling tasks.
        // To unblock the flow we should add some threads to the pool, but not so
        // many that we stagger the program's operation.
(0..LOW_WATERMARK).for_each(|_| {
create_blocking_thread();
});
}
}
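A condensed sketch of the two upscaling branches may help (names and the CPU-count stand-in are hypothetical; the real code calls `num_cpus::get()` and spawns threads as a side effect instead of returning a count):

```rust
const LOW_WATERMARK: u64 = 2;
const CPU_COUNT: usize = 8; // hypothetical stand-in for num_cpus::get()

// Returns how many threads the manager would spawn in one 200 ms tick.
fn threads_to_spawn(prev_ema: f64, curr_ema: f64, current_frequency: u64) -> usize {
    if curr_ema > prev_ema {
        // Rising trend: scale proportionally to the EMA difference,
        // capped at the CPU count.
        let scale_by = curr_ema - prev_ema;
        CPU_COUNT.min(((LOW_WATERMARK as f64 * scale_by) + LOW_WATERMARK as f64) as usize)
    } else if (curr_ema - prev_ema).abs() < f64::EPSILON && current_frequency != 0 {
        // Flat EMA while tasks keep arriving: longhauling tasks are hogging
        // throughput, so unblock the flow with LOW_WATERMARK extra threads.
        LOW_WATERMARK as usize
    } else {
        // Descending trend or an idle system: spawn nothing.
        0
    }
}

fn main() {
    assert_eq!(threads_to_spawn(1.0, 2.5, 40), 5); // 2 * 1.5 + 2 = 5
    assert_eq!(threads_to_spawn(0.0, 0.0, 40), 2); // stalled but busy
    assert_eq!(threads_to_spawn(2.0, 1.0, 0), 0);  // cooling down
}
```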

/// Creates a blocking thread to receive tasks.
/// Dynamic threads terminate themselves if they don't
/// receive any work within a randomized 1 to 11 second window.
fn create_blocking_thread() {
    // Check that a thread can be spawned.
    // If we've hit the OS limit, don't spawn another.
{
let pool_size = *POOL_SIZE.lock().unwrap();
if pool_size >= MAX_THREADS.load(Ordering::SeqCst) {
MAX_THREADS.store(10_000, Ordering::SeqCst);
return;
}
}
// We want to avoid having all threads terminate at
// exactly the same time, causing thundering herd
// effects. We want to stagger their destruction over
// 10 seconds or so to make the costs fade into
// background noise.
//
// Generate a simple random number of milliseconds
let rand_sleep_ms = u64::from(random(10_000));
let rand_sleep_ms = 1000_u64
.checked_add(u64::from(random(10_000)))
.expect("shouldn't overflow");

thread::Builder::new()
let _ = thread::Builder::new()
.name("async-blocking-driver-dynamic".to_string())
.spawn(move || {
let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);
let wait_limit = Duration::from_millis(rand_sleep_ms);

DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
// Adjust the pool size counter before and after spawn
*POOL_SIZE.lock().unwrap() += 1;
while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
abort_on_panic(|| task.run());
}
DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
*POOL_SIZE.lock().unwrap() -= 1;
})
.expect("cannot start a dynamic thread driving blocking tasks");
.map_err(|err| {
match err.kind() {
ErrorKind::WouldBlock => {
                    // The maximum number of threads per process varies from system to system.
                    // Some systems enforce it (like macOS) and some don't (Linux).
                    // This case is not expected to happen,
                    // but when it does it shouldn't cause a panic.
let guarded_count = POOL_SIZE
.lock()
.unwrap()
.checked_sub(1)
.expect("shouldn't underflow");
MAX_THREADS.store(guarded_count, Ordering::SeqCst);
}
_ => eprintln!(
"cannot start a dynamic thread driving blocking tasks: {}",
err
),
}
});
}

// Enqueues work, attempting to send to the threadpool in a
// nonblocking way and spinning up another worker thread if
// there is not a thread ready to accept the work.
/// Enqueues work, attempting to send it to the thread pool in a
/// nonblocking way. The needed number of threads is spun up
/// based on previous statistics, without relying on
/// whether a thread is ready to accept the work or not.
fn schedule(t: async_task::Task<()>) {
    // Count each incoming scheduled task
FREQUENCY.fetch_add(1, Ordering::Acquire);

if let Err(err) = POOL.sender.try_send(t) {
// We were not able to send to the channel without
// blocking. Try to spin up another thread and then
// retry sending while blocking.
maybe_create_another_blocking_thread();
// blocking.
POOL.sender.send(err.into_inner()).unwrap();
}
}
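The try-then-block handoff can be demonstrated with std's zero-capacity `sync_channel`, which behaves like the unbuffered crossbeam channel used here (names are hypothetical; this sketch spawns a plain thread instead of a pool worker):

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::thread;

// Attempt a nonblocking handoff; if no worker is parked on the other
// side of the rendezvous channel, start one and fall back to a blocking send.
fn handoff(sender: &SyncSender<u32>, receiver: Receiver<u32>, value: u32) -> u32 {
    match sender.try_send(value) {
        // No receiver is waiting yet, so try_send hands the value back.
        Err(TrySendError::Full(v)) => {
            let worker = thread::spawn(move || receiver.recv().unwrap());
            // Blocking send: completes once the worker reaches recv().
            sender.send(v).unwrap();
            worker.join().unwrap()
        }
        // With capacity 0 and no waiting receiver, no other arm is possible.
        _ => unreachable!("zero-capacity channel accepted a nonblocking send"),
    }
}

fn main() {
    let (sender, receiver) = sync_channel::<u32>(0);
    assert_eq!(handoff(&sender, receiver, 42), 42);
}
```

With capacity zero, the channel acts as a pure rendezvous: the kernel scheduler effectively becomes the queue, which is exactly the property the comment in the `lazy_static!` block relies on.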
2 changes: 1 addition & 1 deletion src/task/mod.rs
Original file line number Diff line number Diff line change
@@ -34,4 +34,4 @@ mod pool;
mod sleep;
mod task;

pub(crate) mod blocking;
pub mod blocking;