Skip to content

Commit

Permalink
executor: rewrite the work-stealing thread pool (#1657)
Browse files Browse the repository at this point in the history
This patch is a ground up rewrite of the existing work-stealing thread
pool. The goal is to reduce overhead while simplifying code when
possible.

At a high level, the following architectural changes were made:

- The local run queues were switched to bounded circular buffer queues.
- Reduce cross-thread synchronization.
- Refactor task constructs to use a single allocation and always include
  a join handle (#887).
- Simplify logic around putting workers to sleep and waking them up.

**Local run queues**

Move away from crossbeam's implementation of the Chase-Lev deque. This
implementation included unnecessary overhead as it supported
capabilities that are not needed for the work-stealing thread pool.
Instead, a fixed-size circular buffer is used for the local queue. When
the local queue is full, half of the tasks contained in it are moved to
the global run queue.

**Reduce cross-thread synchronization**

This is done via many small improvements. Primarily, an upper bound is
placed on the number of concurrent stealers. Limiting the number of
stealers results in lower contention. Secondly, the rate at which
workers are notified and woken up is throttled. This also reduces
contention by preventing many threads from racing to steal work.

**Refactor task structure**

Now that Tokio is able to target a Rust version that supports
`std::alloc` as well as `std::task`, the pool is able to optimize how
the task structure is laid out. Now, a single allocation per task is
required and a join handle is always provided enabling the spawner to
retrieve the result of the task (#887).

**Simplifying logic**

When possible, complexity is reduced in the implementation. This is done
by using locks and other simpler constructs in cold paths. The set of
sleeping workers is now represented as a `Mutex<VecDeque<usize>>`.
Instead of optimizing access to this structure, we reduce how often the
pool must access it.

Secondly, we have (temporarily) removed `threadpool::blocking`. This
capability will come back later, but the original implementation was way
more complicated than necessary.

**Results**

The thread pool benchmarks have improved significantly:

Old thread pool:

```
test chained_spawn ... bench:   2,019,796 ns/iter (+/- 302,168)
test ping_pong     ... bench:   1,279,948 ns/iter (+/- 154,365)
test spawn_many    ... bench:  10,283,608 ns/iter (+/- 1,284,275)
test yield_many    ... bench:  21,450,748 ns/iter (+/- 1,201,337)
```

New thread pool:

```
test chained_spawn ... bench:     147,943 ns/iter (+/- 6,673)
test ping_pong     ... bench:     537,744 ns/iter (+/- 20,928)
test spawn_many    ... bench:   7,454,898 ns/iter (+/- 283,449)
test yield_many    ... bench:  16,771,113 ns/iter (+/- 733,424)
```

Real-world benchmarks improve significantly as well. This is testing the hyper hello
world server using: `wrk -t1 -c50 -d10`:

Old scheduler:

```
Running 10s test @ http://127.0.0.1:3000
  1 threads and 50 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   371.53us   99.05us   1.97ms   60.53%
    Req/Sec   114.61k     8.45k  133.85k    67.00%
  1139307 requests in 10.00s, 95.61MB read
Requests/sec: 113923.19
Transfer/sec:      9.56MB
```

New scheduler:

```
Running 10s test @ http://127.0.0.1:3000
  1 threads and 50 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   275.05us   69.81us   1.09ms   73.57%
    Req/Sec   153.17k    10.68k  171.51k    71.00%
  1522671 requests in 10.00s, 127.79MB read
Requests/sec: 152258.70
Transfer/sec:     12.78MB
```
  • Loading branch information
carllerche authored Oct 19, 2019
1 parent 2a18132 commit ed5a94e
Show file tree
Hide file tree
Showing 100 changed files with 7,408 additions and 6,795 deletions.
9 changes: 8 additions & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
tokio-codec: []
tokio-executor:
- current-thread
- threadpool
- thread-pool
tokio-io:
- util
tokio-sync:
Expand Down Expand Up @@ -94,6 +94,13 @@ jobs:
- tokio-no-features
- tokio-with-net

# Run loom tests
- template: ci/azure-loom.yml
parameters:
rust: beta
crates:
- tokio-executor

# Try cross compiling
- template: ci/azure-cross-compile.yml
parameters:
Expand Down
18 changes: 18 additions & 0 deletions ci/azure-loom.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
jobs:
- job: loom
displayName: Loom tests
pool:
vmImage: ubuntu-16.04

steps:
- template: azure-install-rust.yml
parameters:
rust_version: ${{ parameters.rust }}

- ${{ each crate in parameters.crates }}:
- script: RUSTFLAGS="--cfg loom" cargo test --lib --release
env:
LOOM_MAX_PREEMPTIONS: 2
CI: 'True'
displayName: test ${{ crate }}
workingDirectory: $(Build.SourcesDirectory)/${{ crate }}
23 changes: 7 additions & 16 deletions tokio-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,40 +23,31 @@ categories = ["concurrency", "asynchronous"]
[features]
blocking = ["tokio-sync", "lazy_static"]
current-thread = ["crossbeam-channel"]
threadpool = [
"tokio-sync",
"crossbeam-deque",
"crossbeam-queue",
"crossbeam-utils",
"futures-core-preview",
"num_cpus",
"lazy_static",
"slab",
]
thread-pool = ["num_cpus"]

[dependencies]
futures-util-preview = { version = "=0.3.0-alpha.19", features = ["channel"] }
tokio-sync = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-sync" }

tracing = { version = "0.1.5", optional = true }
futures-util-preview = { version = "=0.3.0-alpha.19", features = ["channel"] }

# current-thread dependencies
crossbeam-channel = { version = "0.3.8", optional = true }

# threadpool dependencies
crossbeam-deque = { version = "0.7.0", optional = true }
crossbeam-queue = { version = "0.1.0", optional = true }
crossbeam-utils = { version = "0.6.4", optional = true }
futures-core-preview = { version = "=0.3.0-alpha.19", optional = true }
num_cpus = { version = "1.2", optional = true }

# blocking
futures-core-preview = { version = "=0.3.0-alpha.19", optional = true }
lazy_static = { version = "1", optional = true }
slab = { version = "0.4.1", optional = true }

[dev-dependencies]
tokio = { version = "=0.2.0-alpha.6", path = "../tokio" }
tokio-sync = { version = "=0.2.0-alpha.6", path = "../tokio-sync" }
tokio-test = { version = "=0.2.0-alpha.6", path = "../tokio-test" }

futures-core-preview = "=0.3.0-alpha.19"
loom = { version = "0.2.9", features = ["futures", "checkpoint"] }
rand = "0.7"

[package.metadata.docs.rs]
Expand Down
133 changes: 0 additions & 133 deletions tokio-executor/benches/blocking.rs

This file was deleted.

161 changes: 161 additions & 0 deletions tokio-executor/benches/thread_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#![feature(test)]

extern crate test;

use tokio_executor::thread_pool::{Builder, Spawner, ThreadPool};
use tokio_sync::oneshot;

use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{mpsc, Arc};
use std::task::{Context, Poll};

/// Future that yields control back to the scheduler a fixed number of
/// times before resolving. Used by the benchmarks to generate repeated
/// re-polls of a task.
struct Backoff(usize);

impl Future for Backoff {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        match self.0 {
            // All yields consumed: the future resolves.
            0 => Poll::Ready(()),
            remaining => {
                self.0 = remaining - 1;
                // Immediately re-schedule ourselves so the executor
                // polls this future again on a later tick.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}

const NUM_THREADS: usize = 6;

/// Measures raw spawn throughput: launch `NUM_SPAWN` trivial tasks per
/// iteration and wait until the last one has run.
#[bench]
fn spawn_many(b: &mut test::Bencher) {
    // Total number of tasks launched per benchmark iteration.
    const NUM_SPAWN: usize = 10_000;

    let pool = Builder::new().num_threads(NUM_THREADS).build();

    // The final task to complete signals the bench loop over this channel.
    let (done_tx, done_rx) = mpsc::sync_channel(1000);
    let remaining = Arc::new(AtomicUsize::new(0));

    b.iter(|| {
        remaining.store(NUM_SPAWN, Relaxed);

        for _ in 0..NUM_SPAWN {
            let done_tx = done_tx.clone();
            let remaining = remaining.clone();

            pool.spawn(async move {
                // `fetch_sub` returns the previous value, so the task
                // observing 1 is the last to finish.
                if remaining.fetch_sub(1, Relaxed) == 1 {
                    done_tx.send(()).unwrap();
                }
            });
        }

        done_rx.recv().unwrap();
    });
}

/// Measures scheduler churn: a batch of tasks each yield `NUM_YIELD`
/// times before completing.
#[bench]
fn yield_many(b: &mut test::Bencher) {
    // How many times each task re-schedules itself before finishing.
    const NUM_YIELD: usize = 1_000;
    const TASKS_PER_CPU: usize = 50;

    let pool = Builder::new().num_threads(NUM_THREADS).build();

    let num_tasks = TASKS_PER_CPU * num_cpus::get_physical();
    let (done_tx, done_rx) = mpsc::sync_channel(num_tasks);

    b.iter(move || {
        for _ in 0..num_tasks {
            let done_tx = done_tx.clone();

            pool.spawn(async move {
                // Yield NUM_YIELD times, then report completion.
                Backoff(NUM_YIELD).await;
                done_tx.send(()).unwrap();
            });
        }

        // Wait for the entire batch to drain.
        for _ in 0..num_tasks {
            done_rx.recv().unwrap();
        }
    });
}

/// Measures cross-task messaging: each iteration runs NUM_PINGS ping/pong
/// exchanges, where every exchange is a pair of spawned tasks signalling
/// each other over oneshot channels.
#[bench]
fn ping_pong(b: &mut test::Bencher) {
    const NUM_PINGS: usize = 1_000;

    let threadpool = Builder::new().num_threads(NUM_THREADS).build();

    // Signals the bench loop once all ping/pong pairs have completed.
    let (done_tx, done_rx) = mpsc::sync_channel(1000);
    // Counts ping/pong pairs still outstanding in the current iteration.
    let rem = Arc::new(AtomicUsize::new(0));

    b.iter(|| {
        let done_tx = done_tx.clone();
        let rem = rem.clone();
        rem.store(NUM_PINGS, Relaxed);

        let spawner = threadpool.spawner().clone();

        // Root task fans out all NUM_PINGS exchanges.
        threadpool.spawn(async move {
            for _ in 0..NUM_PINGS {
                let rem = rem.clone();
                let done_tx = done_tx.clone();

                // Separate clone for the inner ("pong") task below.
                let spawner2 = spawner.clone();

                spawner.spawn(async move {
                    let (tx1, rx1) = oneshot::channel();
                    let (tx2, rx2) = oneshot::channel();

                    // "Pong" task: wait for the ping, then reply.
                    spawner2.spawn(async move {
                        rx1.await.unwrap();
                        tx2.send(()).unwrap();
                    });

                    // "Ping" side: send, then await the reply.
                    tx1.send(()).unwrap();
                    rx2.await.unwrap();

                    // `fetch_sub` returns the previous value; the last
                    // pair to finish observes 1 and signals the bench loop.
                    if 1 == rem.fetch_sub(1, Relaxed) {
                        done_tx.send(()).unwrap();
                    }
                });
            }
        });

        done_rx.recv().unwrap();
    });
}

/// Measures spawn latency down a dependency chain: each task spawns its
/// successor until the chain depth `ITER` is exhausted.
#[bench]
fn chained_spawn(b: &mut test::Bencher) {
    // Depth of the spawn chain per benchmark iteration.
    const ITER: usize = 1_000;

    let pool = Builder::new().num_threads(NUM_THREADS).build();

    // Recursive step: spawn the next link, or signal completion when the
    // chain bottoms out.
    fn iter(spawner: Spawner, done_tx: mpsc::SyncSender<()>, n: usize) {
        match n {
            0 => done_tx.send(()).unwrap(),
            _ => {
                let next = spawner.clone();
                spawner.spawn(async move {
                    iter(next, done_tx, n - 1);
                });
            }
        }
    }

    let (done_tx, done_rx) = mpsc::sync_channel(1000);

    b.iter(move || {
        let done_tx = done_tx.clone();
        let spawner = pool.spawner().clone();

        pool.spawn(async move {
            iter(spawner, done_tx, ITER);
        });

        done_rx.recv().unwrap();
    });
}
Loading

0 comments on commit ed5a94e

Please sign in to comment.