Dynamically scale fuse threads #411
Conversation
If you want to think more about testing, you could factor out the `Session::run` part of the worker pool into a shared closure, and write a stress test with a different `run` function. Could even use shuttle for that if you were inclined.
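A rough sketch of that factoring; `WorkerPool` and the closure signature here are hypothetical stand-ins for whatever the PR actually uses:

```rust
use std::io;
use std::sync::Arc;
use std::thread;

// Hypothetical: the pool is generic over the per-worker loop, so production
// code passes a closure wrapping `Session::run` while a stress test (or a
// shuttle test) substitutes a synthetic workload.
struct WorkerPool<F> {
    run: Arc<F>,
}

impl<F> WorkerPool<F>
where
    F: Fn() -> io::Result<()> + Send + Sync + 'static,
{
    fn new(run: F) -> Self {
        Self { run: Arc::new(run) }
    }

    fn spawn_worker(&self) {
        let run = Arc::clone(&self.run);
        thread::spawn(move || {
            // Each worker just drives the shared loop until it returns.
            let _ = run();
        });
    }
}
```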
```rust
/// Run the session loop that receives kernel requests and dispatches them to method
/// calls into the filesystem.
/// Version with before/after_dispatch callbacks. TODO: review/refactor
pub fn run_with_callbacks<FA, FB>(&self, mut before_dispatch: FB, mut after_dispatch: FA) -> io::Result<()>
```
Makes sense to me, but we should change `run` to just call this with empty callbacks.
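Something like this minimal sketch, assuming the callbacks take the request by reference:

```rust
// Hypothetical: `run` delegates to the callback version with no-op callbacks.
pub fn run(&self) -> io::Result<()> {
    self.run_with_callbacks(|_| {}, |_| {})
}
```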
mountpoint-s3/src/fuse/session.rs
```rust
state: Arc<WorkerPoolState<FS>>,
workers: Sender<WorkerHandle>,
max_workers: usize,
max_idle_workers: Option<usize>,
```
Kind of think we shouldn't bother with the scaling down idle workers stuff -- in practice threads are cheap (the CRT already spawns a bunch) and it's a tricky balance to decide when to scale down. The real goal here is just to make concurrent workloads work automatically, which only scaling upwards should achieve.
(libfuse encourages not setting a max idle workers)
Scaling down removed
mountpoint-s3/src/fuse/session.rs
```rust
if !req.is_forget() {
    self.state.idle_worker_count.fetch_sub(1, Ordering::SeqCst);
}

if self.state.idle_worker_count.load(Ordering::SeqCst) == 0 {
    if let Err(error) = self.try_add_worker() {
        warn!(?error, "unable to spawn fuse worker");
    }
}
```
The decrement and load need to happen atomically here, otherwise two racing requests could both see 0 and spawn two threads instead of one. `fetch_sub` returns the previous value, so probably just move the whole thing inside `if !req.is_forget()` and compare that to 1 to see if you should spawn something.
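A minimal sketch of that suggestion, using the field and method names from the snippet above:

```rust
if !req.is_forget() {
    // fetch_sub returns the value *before* the decrement, so comparing the
    // result to 1 atomically detects "this request consumed the last idle
    // worker", and exactly one racing request wins the right to spawn.
    if self.state.idle_worker_count.fetch_sub(1, Ordering::SeqCst) == 1 {
        if let Err(error) = self.try_add_worker() {
            warn!(?error, "unable to spawn fuse worker");
        }
    }
}
```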
Also, a comment on why we exclude `forget`.
mountpoint-s3/src/fuse/session.rs
```rust
if !pool.try_add_worker()? {
    return Err(anyhow::anyhow!("reached max worker threads"));
}
```
Should be impossible because we asserted `max_workers > 0`, right?
```diff
@@ -135,7 +135,7 @@ struct CliArgs {
     #[clap(
         long,
-        help = "Number of FUSE daemon threads",
+        help = "Maximum number of FUSE daemon threads",
```
should probably rename the actual flag to `--max-threads` too
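For illustration, the renamed field might look something like this; the field name, type, and attribute layout are hypothetical:

```rust
#[clap(
    long = "max-threads",
    help = "Maximum number of FUSE daemon threads",
    value_name = "N",
    default_value = "16", // per the default suggested later in this thread
)]
max_threads: usize,
```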
Factored out the `Session::run` part into a shared closure.
Just a couple suggestions. The `fuser` changes look good to me, but we don't commit those directly to mainline because we've been trying to maintain fuser history separately. Instead, pull those into their own commit on the `fuser/fork` branch (you can push that without reviewing), and then run `vendor-fuser.sh` on the PR branch to pull the change in.
mountpoint-s3/src/fuse/session.rs
```rust
#[test_case(10, 10)]
#[test_case(10, 30)]
#[test_case(30, 10)]
fn test_worker_pool(max_worker_threads: usize, concurrent_messages: usize) {
```
It took me a long time to parse what this test is doing, so probably deserves a comment. Here's my understanding: it tests that the spawning logic never under-spawns threads, by assigning each worker thread a work item that only completes when a flag is flipped, and then arranges for the flag to flip only once `max_worker_threads` have been spawned. Neat!

I thought maybe you could use `std::sync::Barrier` to make this simpler (set the barrier to `max_worker_threads + 1`), but I guess it doesn't have a `wait_timeout` so you'd block forever if the test broke.
Also, I think we should write one other, simpler test: the work item is just incrementing a shared counter. We check that the counter got incremented exactly as many times as we expected, and that the number of spawned threads was no greater than `max_worker_threads`.
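A self-contained sketch of that test's shape. It uses a fixed-size stand-in for the pool built from plain threads and a channel; the real test would drive the actual `WorkerPool` from this PR instead:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

#[test]
fn test_worker_pool_counter() {
    let max_worker_threads = 8;
    let num_messages = 100;

    let counter = Arc::new(AtomicUsize::new(0));
    let spawned = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = mpsc::channel::<()>();
    let rx = Arc::new(Mutex::new(rx));

    // Stand-in workers: each drains messages from the shared channel and
    // bumps the counter once per message.
    let handles: Vec<_> = (0..max_worker_threads)
        .map(|_| {
            spawned.fetch_add(1, Ordering::SeqCst);
            let (counter, rx) = (Arc::clone(&counter), Arc::clone(&rx));
            thread::spawn(move || loop {
                if rx.lock().unwrap().recv().is_err() {
                    break; // channel closed: no more work
                }
                counter.fetch_add(1, Ordering::SeqCst);
            })
        })
        .collect();

    for _ in 0..num_messages {
        tx.send(()).unwrap();
    }
    drop(tx); // close the channel so workers exit
    for handle in handles {
        handle.join().unwrap();
    }

    // Exactly one increment per message, and no over-spawning.
    assert_eq!(counter.load(Ordering::SeqCst), num_messages);
    assert!(spawned.load(Ordering::SeqCst) <= max_worker_threads);
}
```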
```rust
    } else {
        assert_eq!(workers.len(), min_expected_workers);
    }
}
```
Wasn't too hard to turn this into a Shuttle test as well:
```rust
#[cfg(feature = "shuttle")]
mod shuttle_tests {
    use shuttle::rand::Rng;
    use shuttle::{check_pct, check_random};

    fn test_worker_pool_helper() {
        let mut rng = shuttle::rand::thread_rng();
        let num_worker_threads = rng.gen_range(1..=8);
        let num_concurrent_messages = rng.gen_range(1..=16);
        super::test_worker_pool(num_worker_threads, num_concurrent_messages);
    }

    #[test]
    fn test_worker_pool() {
        check_random(test_worker_pool_helper, 10000);
        check_pct(test_worker_pool_helper, 10000, 3);
    }
}
```
Thanks! Added together with the new test.
```rust
trait Work: Send + Sync + 'static {
    type Result: Send;

    fn run<FB, FA>(&self, before: FB, after: FA) -> Self::Result
```
Add a brief comment about the semantics of this function.
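For instance, something along these lines (hypothetical wording, based on the before/after-dispatch semantics described below):

```rust
/// Runs this unit of work, calling `before` just before each kernel request
/// is dispatched to the filesystem and `after` once dispatch completes.
fn run<FB, FA>(&self, before: FB, after: FA) -> Self::Result
```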
vendor/fuser/src/session.rs
```rust
/// calls into the filesystem.
/// This version also notifies callers of kernel requests before and after they
/// are dispatched to the filesystem.
pub fn run_and_notify<FA, FB>(&self, mut before_dispatch: FB, mut after_dispatch: FA) -> io::Result<()>
```
naming nit: I'd probably call it `run_with_callbacks` or something.

edit: lol, that's exactly what you called it the first time around
reverted!
mountpoint-s3/src/main.rs
```diff
@@ -135,13 +135,13 @@ struct CliArgs {
     #[clap(
         long,
-        help = "Number of FUSE daemon threads",
+        help = "Maximum number of FUSE daemon threads",
         value_name = "N",
         default_value = "1",
```
Let's make the default here 16 (just a number I made up, feel free to make up your own).
Description of change

Introduce a pool of fuse worker threads that will scale dynamically up to a `max_workers` limit when receiving kernel requests.

The implementation relies on the following changes in `fuser/fork`:

- `Session::run_with_callbacks()`: version of `Session::run()` with callbacks before/after dispatch.
- `Request::is_forget()`: we do not want to spawn new threads on spikes of forget/batch_forget requests.

Relevant issues: #7
Does this change impact existing behavior?

The new `--max-threads` option replaces `--thread-count` and sets the maximum number of threads to spawn.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the Developer Certificate of Origin (DCO).