From 5c884a106c29920e056d179983721b931994180f Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Tue, 11 Jun 2019 21:03:31 +1200 Subject: [PATCH] Limit RAM use during unpacking. Heavily deferred directories can lead to very large amounts of RAM holding pending IOs, and under low memory limits this caused failures. Reduce the stack size for IO worker threads to 1MB, as the default of 8MB on Linux is wasteful for IO worker threads. --- README.md | 8 +++- src/diskio/threaded.rs | 2 + src/dist/component/package.rs | 73 ++++++++++++++++++++++++++++++++--- 3 files changed, 76 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 62aae280a45..7d663fc5031 100644 --- a/README.md +++ b/README.md @@ -591,6 +591,7 @@ Command | Description ## Environment variables + - `RUSTUP_HOME` (default: `~/.rustup` or `%USERPROFILE%/.rustup`) Sets the root rustup folder, used for storing installed toolchains and configuration options. @@ -611,18 +612,21 @@ Command | Description - `RUSTUP_UPDATE_ROOT` (default `https://static.rust-lang.org/rustup`) Sets the root URL for downloading self-updates. -- `RUSTUP_IO_THREADS` (defaults to reported cpu count). Sets the +- `RUSTUP_IO_THREADS` *unstable* (defaults to reported cpu count). Sets the number of threads to perform close IO in. Set to `disabled` to force single-threaded IO for troubleshooting, or an arbitrary number to override automatic detection. -- `RUSTUP_TRACE_DIR` (default: no tracing) +- `RUSTUP_TRACE_DIR` *unstable* (default: no tracing) Enables tracing and determines the directory that traces will be written too. Traces are of the form PID.trace. Traces can be read by the Catapult project [tracing viewer][tv]. [tv]: (https://github.com/catapult-project/catapult/blob/master/tracing/README.md) +- `RUSTUP_UNPACK_RAM` *unstable* (default 400M, min 100M) + Caps the amount of RAM rustup will use for IO tasks while unpacking. The value is a whole number of bytes (for example `419430400` for the 400M default). 
+ ## Other installation methods The primary installation method, as described at https://rustup.rs, differs by platform: diff --git a/src/diskio/threaded.rs b/src/diskio/threaded.rs index 842fdb48955..fb1f71bf910 100644 --- a/src/diskio/threaded.rs +++ b/src/diskio/threaded.rs @@ -40,6 +40,7 @@ impl<'a> Threaded<'a> { // more threads to get more IO dispatched at this stage in the process. let pool = threadpool::Builder::new() .thread_name("CloseHandle".into()) + .thread_stack_size(1_048_576) .build(); let (tx, rx) = channel(); Threaded { @@ -62,6 +63,7 @@ impl<'a> Threaded<'a> { let pool = threadpool::Builder::new() .thread_name("CloseHandle".into()) .num_threads(thread_count) + .thread_stack_size(1_048_576) .build(); let (tx, rx) = channel(); Threaded { diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index e195d161f5e..91a027dbd2b 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -11,6 +11,7 @@ use crate::utils::notifications::Notification; use crate::utils::utils; use std::collections::{HashMap, HashSet}; +use std::env; use std::fmt; use std::io::{self, ErrorKind as IOErrorKind, Read}; use std::iter::FromIterator; @@ -157,6 +158,51 @@ impl<'a> TarPackage<'a> { } } +struct MemoryBudget { + limit: usize, + used: usize, +} + +// Probably this should live in diskio but ¯\_(ツ)_/¯ +impl MemoryBudget { + fn new(max_file_size: usize) -> MemoryBudget { + const DEFAULT_UNPACK_RAM: usize = 400 * 1024 * 1024; + let unpack_ram = if let Ok(budget_str) = env::var("RUSTUP_UNPACK_RAM") { + if let Ok(budget) = budget_str.parse::<usize>() { + budget + } else { + DEFAULT_UNPACK_RAM + } + } else { + DEFAULT_UNPACK_RAM + }; + if max_file_size > unpack_ram { + panic!("RUSTUP_UNPACK_RAM must be larger than {}", max_file_size); + } + MemoryBudget { + limit: unpack_ram, + used: 0, + } + } + fn reclaim(&mut self, op: &Item) { + match &op.kind { + Kind::Directory => {} + Kind::File(content) => self.used -= content.len(), + }; + } + + fn 
claim(&mut self, op: &Item) { + match &op.kind { + Kind::Directory => {} + Kind::File(content) => self.used += content.len(), + }; + } + + fn available(&self) -> usize { + self.limit - self.used + } +} + /// Handle the async result of io operations /// Replaces op.result with Ok(()) fn filter_result(op: &mut Item) -> io::Result<()> { @@ -187,6 +233,7 @@ fn filter_result(op: &mut Item) -> io::Result<()> { fn trigger_children( io_executor: &mut dyn Executor, directories: &mut HashMap, + budget: &mut MemoryBudget, item: Item, ) -> Result { let mut result = 0; @@ -206,8 +253,9 @@ fn trigger_children( for pending_item in pending.into_iter() { for mut item in Vec::from_iter(io_executor.execute(pending_item)) { // TODO capture metrics + budget.reclaim(&item); filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; - result += trigger_children(io_executor, directories, item)?; + result += trigger_children(io_executor, directories, budget, item)?; } } }; @@ -229,6 +277,9 @@ fn unpack_without_first_dir<'a, R: Read>( let entries = archive .entries() .chain_err(|| ErrorKind::ExtractingPackage)?; + const MAX_FILE_SIZE: u64 = 100_000_000; + let mut budget = MemoryBudget::new(MAX_FILE_SIZE as usize); + let mut directories: HashMap = HashMap::new(); // Path is presumed to exist. Call it a precondition. 
directories.insert(path.to_owned(), DirStatus::Exists); @@ -239,8 +290,9 @@ fn unpack_without_first_dir<'a, R: Read>( // our unpacked item is pending dequeue) for mut item in Vec::from_iter(io_executor.completed()) { // TODO capture metrics + budget.reclaim(&item); filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; - trigger_children(&mut *io_executor, &mut directories, item)?; + trigger_children(&mut *io_executor, &mut directories, &mut budget, item)?; } let mut entry = entry.chain_err(|| ErrorKind::ExtractingPackage)?; @@ -266,9 +318,17 @@ fn unpack_without_first_dir<'a, R: Read>( } let size = entry.header().size()?; - if size > 100_000_000 { + if size > MAX_FILE_SIZE { return Err(format!("File too big {} {}", relpath.display(), size).into()); } + while size > budget.available() as u64 { + for mut item in Vec::from_iter(io_executor.completed()) { + // TODO capture metrics + budget.reclaim(&item); + filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; + trigger_children(&mut *io_executor, &mut directories, &mut budget, item)?; + } + } // Bail out if we get hard links, device nodes or any other unusual content // - it is most likely an attack, as rusts cross-platform nature precludes // such artifacts @@ -309,6 +369,7 @@ fn unpack_without_first_dir<'a, R: Read>( } _ => return Err(ErrorKind::UnsupportedKind(format!("{:?}", kind)).into()), }; + budget.claim(&item); let item = loop { // Create the full path to the entry if it does not exist already @@ -343,8 +404,9 @@ fn unpack_without_first_dir<'a, R: Read>( for mut item in Vec::from_iter(io_executor.execute(item)) { // TODO capture metrics + budget.reclaim(&item); filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; - trigger_children(&mut *io_executor, &mut directories, item)?; + trigger_children(&mut *io_executor, &mut directories, &mut budget, item)?; } } @@ -353,8 +415,9 @@ fn unpack_without_first_dir<'a, R: Read>( for mut item in 
Vec::from_iter(io_executor.join()) { // handle final IOs // TODO capture metrics + budget.reclaim(&item); filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; - triggered += trigger_children(&mut *io_executor, &mut directories, item)?; + triggered += trigger_children(&mut *io_executor, &mut directories, &mut budget, item)?; } if triggered == 0 { // None of the IO submitted before the prior join triggered any new