From d8234620109cf0f69089f37ca94c4fab22c0560f Mon Sep 17 00:00:00 2001 From: The 8472 Date: Thu, 9 Jun 2022 20:52:17 +0200 Subject: [PATCH 1/3] add cgroupv1 support to available_parallelism --- library/std/src/lib.rs | 1 + library/std/src/sys/unix/thread.rs | 173 +++++++++++++++++++++-------- library/std/src/thread/mod.rs | 2 +- 3 files changed, 128 insertions(+), 48 deletions(-) diff --git a/library/std/src/lib.rs b/library/std/src/lib.rs index b1c68ec43bc99..05a2815632bd1 100644 --- a/library/std/src/lib.rs +++ b/library/std/src/lib.rs @@ -274,6 +274,7 @@ #![feature(hasher_prefixfree_extras)] #![feature(hashmap_internals)] #![feature(int_error_internals)] +#![feature(is_some_with)] #![feature(maybe_uninit_slice)] #![feature(maybe_uninit_write_slice)] #![feature(mixed_integer_ops)] diff --git a/library/std/src/sys/unix/thread.rs b/library/std/src/sys/unix/thread.rs index d191e1fe7a650..c5b905685700b 100644 --- a/library/std/src/sys/unix/thread.rs +++ b/library/std/src/sys/unix/thread.rs @@ -285,7 +285,7 @@ pub fn available_parallelism() -> io::Result { ))] { #[cfg(any(target_os = "android", target_os = "linux"))] { - let quota = cgroup2_quota().max(1); + let quota = cgroups::quota().max(1); let mut set: libc::cpu_set_t = unsafe { mem::zeroed() }; unsafe { if libc::sched_getaffinity(0, mem::size_of::(), &mut set) == 0 { @@ -379,49 +379,77 @@ pub fn available_parallelism() -> io::Result { } } -/// Returns cgroup CPU quota in core-equivalents, rounded down, or usize::MAX if the quota cannot -/// be determined or is not set. #[cfg(any(target_os = "android", target_os = "linux"))] -fn cgroup2_quota() -> usize { +mod cgroups { use crate::ffi::OsString; use crate::fs::{try_exists, File}; use crate::io::Read; use crate::os::unix::ffi::OsStringExt; use crate::path::PathBuf; + use crate::str::from_utf8; - let mut quota = usize::MAX; - if cfg!(miri) { - // Attempting to open a file fails under default flags due to isolation. - // And Miri does not have parallelism anyway. - return quota; - } - - let _: Option<()> = try { - let mut buf = Vec::with_capacity(128); - // find our place in the cgroup hierarchy - File::open("/proc/self/cgroup").ok()?.read_to_end(&mut buf).ok()?; - let cgroup_path = buf - .split(|&c| c == b'\n') - .filter_map(|line| { - let mut fields = line.splitn(3, |&c| c == b':'); - // expect cgroupv2 which has an empty 2nd field - if fields.nth(1) != Some(b"") { - return None; - } - let path = fields.last()?; - // skip leading slash - Some(path[1..].to_owned()) - }) - .next()?; - let cgroup_path = PathBuf::from(OsString::from_vec(cgroup_path)); + enum Cgroup { + V1, + V2, + } + + /// Returns cgroup CPU quota in core-equivalents, rounded down, or usize::MAX if the quota cannot + /// be determined or is not set. + pub(super) fn quota() -> usize { + let mut quota = usize::MAX; + if cfg!(miri) { + // Attempting to open a file fails under default flags due to isolation. + // And Miri does not have parallelism anyway. + return quota; + } + + let _: Option<()> = try { + let mut buf = Vec::with_capacity(128); + // find our place in the cgroup hierarchy + File::open("/proc/self/cgroup").ok()?.read_to_end(&mut buf).ok()?; + let (cgroup_path, version) = buf + .split(|&c| c == b'\n') + .filter_map(|line| { + let mut fields = line.splitn(3, |&c| c == b':'); + // 2nd field is a list of controllers for v1 or empty for v2 + let version = match fields.nth(1) { + Some(b"") => Some(Cgroup::V2), + Some(controllers) + if from_utf8(controllers) + .is_ok_and(|c| c.split(",").any(|c| c == "cpu")) => + { + Some(Cgroup::V1) + } + _ => None, + }?; + + let path = fields.last()?; + // skip leading slash + Some((path[1..].to_owned(), version)) + }) + .next()?; + let cgroup_path = PathBuf::from(OsString::from_vec(cgroup_path)); + + quota = match version { + Cgroup::V1 => quota_v1(cgroup_path), + Cgroup::V2 => quota_v2(cgroup_path), + }; + }; + + quota + } + + fn quota_v2(group_path: PathBuf) -> usize { + let mut quota = usize::MAX; let mut path = PathBuf::with_capacity(128); let mut read_buf = String::with_capacity(20); + // standard mount location defined in file-hierarchy(7) manpage let cgroup_mount = "/sys/fs/cgroup"; path.push(cgroup_mount); - path.push(&cgroup_path); + path.push(&group_path); path.push("cgroup.controllers"); @@ -432,30 +460,81 @@ fn cgroup2_quota() -> usize { path.pop(); - while path.starts_with(cgroup_mount) { - path.push("cpu.max"); + let _: Option<()> = try { + while path.starts_with(cgroup_mount) { + path.push("cpu.max"); + + read_buf.clear(); + + if File::open(&path).and_then(|mut f| f.read_to_string(&mut read_buf)).is_ok() { + let raw_quota = read_buf.lines().next()?; + let mut raw_quota = raw_quota.split(' '); + let limit = raw_quota.next()?; + let period = raw_quota.next()?; + match (limit.parse::(), period.parse::()) { + (Ok(limit), Ok(period)) => { + quota = quota.min(limit / period); + } + _ => {} + } + } - read_buf.clear(); + path.pop(); // pop filename + path.pop(); // pop dir + } + }; - if File::open(&path).and_then(|mut f| f.read_to_string(&mut read_buf)).is_ok() { - let raw_quota = read_buf.lines().next()?; - let mut raw_quota = raw_quota.split(' '); - let limit = raw_quota.next()?; - let period = raw_quota.next()?; - match (limit.parse::(), period.parse::()) { - (Ok(limit), Ok(period)) => { - quota = quota.min(limit / period); - } + quota + } + + fn quota_v1(group_path: PathBuf) -> usize { + let mut quota = usize::MAX; + let mut path = PathBuf::with_capacity(128); + let mut read_buf = String::with_capacity(20); + + // Hardcode commonly used locations mentioned in the cgroups(7) manpage + // since scanning mountinfo can be expensive on some systems. + // This isn't exactly standardized since cgroupv1 was meant to allow flexibly + // mixing and matching controller hierarchies. + let mounts = ["/sys/fs/cgroup/cpu", "/sys/fs/cgroup/cpu,cpuacct"]; + + for mount in mounts { + path.clear(); + path.push(mount); + path.push(&group_path); + + // skip if we guessed the mount incorrectly + if matches!(try_exists(&path), Err(_) | Ok(false)) { + continue; + } + + while path.starts_with(mount) { + let mut parse_file = |name| { + path.push(name); + read_buf.clear(); + + let mut f = File::open(&path).ok()?; + f.read_to_string(&mut read_buf).ok()?; + let parsed = read_buf.trim().parse::().ok()?; + + path.pop(); + Some(parsed) + }; + + let limit = parse_file("cpu.cfs_quota_us"); + let period = parse_file("cpu.cfs_period_us"); + + match (limit, period) { + (Some(limit), Some(period)) => quota = quota.min(limit / period), _ => {} } - } - path.pop(); // pop filename - path.pop(); // pop dir + path.pop(); + } } - }; - quota + quota + } } #[cfg(all( diff --git a/library/std/src/thread/mod.rs b/library/std/src/thread/mod.rs index 7f9b297e9dc3a..f02a7de04e5be 100644 --- a/library/std/src/thread/mod.rs +++ b/library/std/src/thread/mod.rs @@ -1571,7 +1571,7 @@ fn _assert_sync_and_send() { /// /// On Linux: /// - It may overcount the amount of parallelism available when limited by a -/// process-wide affinity mask or cgroup quotas and cgroup2 fs or `sched_getaffinity()` can't be +/// process-wide affinity mask or cgroup quotas and `sched_getaffinity()` or cgroup fs can't be /// queried, e.g. due to sandboxing. /// - It may undercount the amount of parallelism if the current thread's affinity mask /// does not reflect the process' cpuset, e.g. due to pinned threads. From b2c410ec572de2e0f3b0b2af0fc6546d0bcd626b Mon Sep 17 00:00:00 2001 From: The 8472 Date: Wed, 15 Jun 2022 23:14:40 +0200 Subject: [PATCH 2/3] scan mountinfo when hardcoded cgroupv1 mountpoints don't work --- library/std/src/sys/unix/thread.rs | 102 +++++++++++++++++++++++------ 1 file changed, 83 insertions(+), 19 deletions(-) diff --git a/library/std/src/sys/unix/thread.rs b/library/std/src/sys/unix/thread.rs index c5b905685700b..af281a255cc26 100644 --- a/library/std/src/sys/unix/thread.rs +++ b/library/std/src/sys/unix/thread.rs @@ -381,19 +381,27 @@ pub fn available_parallelism() -> io::Result { #[cfg(any(target_os = "android", target_os = "linux"))] mod cgroups { + //! Currently not covered + //! * cgroup v2 in non-standard mountpoints + //! * paths containing control characters or spaces, since those would be escaped in procfs + //! output and we don't unescape + use crate::borrow::Cow; use crate::ffi::OsString; use crate::fs::{try_exists, File}; use crate::io::Read; + use crate::io::{BufRead, BufReader}; use crate::os::unix::ffi::OsStringExt; + use crate::path::Path; use crate::path::PathBuf; use crate::str::from_utf8; + #[derive(PartialEq)] enum Cgroup { V1, V2, } - /// Returns cgroup CPU quota in core-equivalents, rounded down, or usize::MAX if the quota cannot + /// Returns cgroup CPU quota in core-equivalents, rounded down or usize::MAX if the quota cannot /// be determined or is not set. pub(super) fn quota() -> usize { let mut quota = usize::MAX; @@ -407,27 +415,30 @@ mod cgroups { let mut buf = Vec::with_capacity(128); // find our place in the cgroup hierarchy File::open("/proc/self/cgroup").ok()?.read_to_end(&mut buf).ok()?; - let (cgroup_path, version) = buf - .split(|&c| c == b'\n') - .filter_map(|line| { + let (cgroup_path, version) = + buf.split(|&c| c == b'\n').fold(None, |previous, line| { let mut fields = line.splitn(3, |&c| c == b':'); // 2nd field is a list of controllers for v1 or empty for v2 let version = match fields.nth(1) { - Some(b"") => Some(Cgroup::V2), + Some(b"") => Cgroup::V2, Some(controllers) if from_utf8(controllers) .is_ok_and(|c| c.split(",").any(|c| c == "cpu")) => { - Some(Cgroup::V1) + Cgroup::V1 } - _ => None, - }?; + _ => return previous, + }; + + // already-found v1 trumps v2 since it explicitly specifies its controllers + if previous.is_some() && version == Cgroup::V2 { + return previous; + } let path = fields.last()?; // skip leading slash Some((path[1..].to_owned(), version)) - }) - .next()?; + })?; let cgroup_path = PathBuf::from(OsString::from_vec(cgroup_path)); quota = match version { @@ -493,14 +504,21 @@ mod cgroups { let mut read_buf = String::with_capacity(20); // Hardcode commonly used locations mentioned in the cgroups(7) manpage - // since scanning mountinfo can be expensive on some systems. - // This isn't exactly standardized since cgroupv1 was meant to allow flexibly - // mixing and matching controller hierarchies. - let mounts = ["/sys/fs/cgroup/cpu", "/sys/fs/cgroup/cpu,cpuacct"]; + // if that doesn't work scan mountinfo and adjust `group_path` for bind-mounts + let mounts: &[fn(&Path) -> Option<(_, &Path)>] = &[ + |p| Some((Cow::Borrowed("/sys/fs/cgroup/cpu"), p)), + |p| Some((Cow::Borrowed("/sys/fs/cgroup/cpu,cpuacct"), p)), + // this can be expensive on systems with tons of mountpoints + // but we only get to this point when /proc/self/cgroups explicitly indicated + // this process belongs to a cpu-controller cgroup v1 and the defaults didn't work + find_mountpoint, + ]; for mount in mounts { + let Some((mount, group_path)) = mount(&group_path) else { continue }; + path.clear(); - path.push(mount); + path.push(mount.as_ref()); path.push(&group_path); // skip if we guessed the mount incorrectly @@ -508,16 +526,16 @@ mod cgroups { continue; } - while path.starts_with(mount) { + while path.starts_with(mount.as_ref()) { let mut parse_file = |name| { path.push(name); read_buf.clear(); - let mut f = File::open(&path).ok()?; - f.read_to_string(&mut read_buf).ok()?; + let f = File::open(&path); + path.pop(); // restore buffer before any early returns + f.ok()?.read_to_string(&mut read_buf).ok()?; let parsed = read_buf.trim().parse::().ok()?; - path.pop(); Some(parsed) }; @@ -531,10 +549,56 @@ mod cgroups { path.pop(); } + + // we passed the try_exists above so we should have traversed the correct hierarchy + // when reaching this line + break; } quota } + + /// Scan mountinfo for cgroup v1 mountpoint with a cpu controller + /// + /// If the cgroupfs is a bind mount then `group_path` is adjusted to skip + /// over the already-included prefix + fn find_mountpoint(group_path: &Path) -> Option<(Cow<'static, str>, &Path)> { + let mut reader = BufReader::new(File::open("/proc/self/mountinfo").ok()?); + let mut line = String::with_capacity(256); + loop { + line.clear(); + if reader.read_line(&mut line).ok()? == 0 { + break; + } + + let line = line.trim(); + let mut items = line.split(' '); + + let sub_path = items.nth(3)?; + let mount_point = items.next()?; + let mount_opts = items.next_back()?; + let filesystem_type = items.nth_back(1)?; + + if filesystem_type != "cgroup" || !mount_opts.split(',').any(|opt| opt == "cpu") { + // not a cgroup / not a cpu-controller + continue; + } + + let sub_path = Path::new(sub_path).strip_prefix("/").ok()?; + + if !group_path.starts_with(sub_path) { + // this is a bind-mount and the bound subdirectory + // does not contain the cgroup this process belongs to + continue; + } + + let trimmed_group_path = group_path.strip_prefix(sub_path).ok()?; + + return Some((Cow::Owned(mount_point.to_owned()), trimmed_group_path)); + } + + None + } } #[cfg(all( From 2e33c812e8b1bea563893f17ff5edcb7023d5d41 Mon Sep 17 00:00:00 2001 From: the8472 Date: Fri, 22 Jul 2022 22:18:07 +0200 Subject: [PATCH 3/3] [review] mention that runtime may scale with # of mountpoints Co-authored-by: Josh Triplett --- library/std/src/thread/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/library/std/src/thread/mod.rs b/library/std/src/thread/mod.rs index f02a7de04e5be..add93b618a6dc 100644 --- a/library/std/src/thread/mod.rs +++ b/library/std/src/thread/mod.rs @@ -1575,6 +1575,11 @@ fn _assert_sync_and_send() { /// queried, e.g. due to sandboxing. /// - It may undercount the amount of parallelism if the current thread's affinity mask /// does not reflect the process' cpuset, e.g. due to pinned threads. +/// - If the process is in a cgroup v1 cpu controller, this may need to +/// scan mountpoints to find the corresponding cgroup v1 controller, +/// which may take time on systems with large numbers of mountpoints. +/// (This does not apply to cgroup v2, or to processes not in a +/// cgroup.) /// /// On all targets: /// - It may overcount the amount of parallelism available when running in a VM