diff --git a/README.md b/README.md index 62aae280a45..7d663fc5031 100644 --- a/README.md +++ b/README.md @@ -591,6 +591,7 @@ Command | Description ## Environment variables + - `RUSTUP_HOME` (default: `~/.rustup` or `%USERPROFILE%/.rustup`) Sets the root rustup folder, used for storing installed toolchains and configuration options. @@ -611,18 +612,21 @@ Command | Description - `RUSTUP_UPDATE_ROOT` (default `https://static.rust-lang.org/rustup`) Sets the root URL for downloading self-updates. -- `RUSTUP_IO_THREADS` (defaults to reported cpu count). Sets the +- `RUSTUP_IO_THREADS` *unstable* (defaults to reported cpu count). Sets the number of threads to perform close IO in. Set to `disabled` to force single-threaded IO for troubleshooting, or an arbitrary number to override automatic detection. -- `RUSTUP_TRACE_DIR` (default: no tracing) +- `RUSTUP_TRACE_DIR` *unstable* (default: no tracing) Enables tracing and determines the directory that traces will be written too. Traces are of the form PID.trace. Traces can be read by the Catapult project [tracing viewer][tv]. [tv]: (https://github.com/catapult-project/catapult/blob/master/tracing/README.md) +- `RUSTUP_UNPACK_RAM` *unstable* (default 400M, min 100M) + Caps the amount of RAM rustup will use for IO tasks while unpacking. The value is a plain integer number of bytes (e.g. `419430400`); a value that fails to parse is ignored and the default is used. + ## Other installation methods The primary installation method, as described at https://rustup.rs, differs by platform: diff --git a/src/diskio/threaded.rs b/src/diskio/threaded.rs index 842fdb48955..fb1f71bf910 100644 --- a/src/diskio/threaded.rs +++ b/src/diskio/threaded.rs @@ -40,6 +40,7 @@ impl<'a> Threaded<'a> { // more threads to get more IO dispatched at this stage in the process. 
let pool = threadpool::Builder::new() .thread_name("CloseHandle".into()) + .thread_stack_size(1_048_576) .build(); let (tx, rx) = channel(); Threaded { @@ -62,6 +63,7 @@ impl<'a> Threaded<'a> { let pool = threadpool::Builder::new() .thread_name("CloseHandle".into()) .num_threads(thread_count) + .thread_stack_size(1_048_576) .build(); let (tx, rx) = channel(); Threaded { diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index e195d161f5e..91a027dbd2b 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -11,6 +11,7 @@ use crate::utils::notifications::Notification; use crate::utils::utils; use std::collections::{HashMap, HashSet}; +use std::env; use std::fmt; use std::io::{self, ErrorKind as IOErrorKind, Read}; use std::iter::FromIterator; @@ -157,6 +158,51 @@ impl<'a> TarPackage<'a> { } } +struct MemoryBudget { + limit: usize, + used: usize, +} + +// Probably this should live in diskio but ¯\_(ツ)_/¯ +impl MemoryBudget { + fn new(max_file_size: usize) -> MemoryBudget { + const DEFAULT_UNPACK_RAM: usize = 400 * 1024 * 1024; + let unpack_ram = if let Ok(budget_str) = env::var("RUSTUP_UNPACK_RAM") { + if let Ok(budget) = budget_str.parse::<usize>() { + budget + } else { + DEFAULT_UNPACK_RAM + } + } else { + DEFAULT_UNPACK_RAM + }; + if max_file_size > unpack_ram { + panic!("RUSTUP_UNPACK_RAM must be larger than {}", max_file_size); + } + MemoryBudget { + limit: unpack_ram, + used: 0, + } + } + fn reclaim(&mut self, op: &Item) { + match &op.kind { + Kind::Directory => {} + Kind::File(content) => self.used -= content.len(), + }; + } + + fn claim(&mut self, op: &Item) { + match &op.kind { + Kind::Directory => {} + Kind::File(content) => self.used += content.len(), + }; + } + + fn available(&self) -> usize { + self.limit - self.used + } +} + /// Handle the async result of io operations /// Replaces op.result with Ok(()) fn filter_result(op: &mut Item) -> io::Result<()> { @@ -187,6 +233,7 @@ fn filter_result(op: &mut Item) -> 
io::Result<()> { fn trigger_children( io_executor: &mut dyn Executor, directories: &mut HashMap, + budget: &mut MemoryBudget, item: Item, ) -> Result { let mut result = 0; @@ -206,8 +253,9 @@ fn trigger_children( for pending_item in pending.into_iter() { for mut item in Vec::from_iter(io_executor.execute(pending_item)) { // TODO capture metrics + budget.reclaim(&item); filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; - result += trigger_children(io_executor, directories, item)?; + result += trigger_children(io_executor, directories, budget, item)?; } } }; @@ -229,6 +277,9 @@ fn unpack_without_first_dir<'a, R: Read>( let entries = archive .entries() .chain_err(|| ErrorKind::ExtractingPackage)?; + const MAX_FILE_SIZE: u64 = 100_000_000; + let mut budget = MemoryBudget::new(MAX_FILE_SIZE as usize); + let mut directories: HashMap = HashMap::new(); // Path is presumed to exist. Call it a precondition. directories.insert(path.to_owned(), DirStatus::Exists); @@ -239,8 +290,9 @@ fn unpack_without_first_dir<'a, R: Read>( // our unpacked item is pending dequeue) for mut item in Vec::from_iter(io_executor.completed()) { // TODO capture metrics + budget.reclaim(&item); filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; - trigger_children(&mut *io_executor, &mut directories, item)?; + trigger_children(&mut *io_executor, &mut directories, &mut budget, item)?; } let mut entry = entry.chain_err(|| ErrorKind::ExtractingPackage)?; @@ -266,9 +318,17 @@ fn unpack_without_first_dir<'a, R: Read>( } let size = entry.header().size()?; - if size > 100_000_000 { + if size > MAX_FILE_SIZE { return Err(format!("File too big {} {}", relpath.display(), size).into()); } + while size > budget.available() as u64 { + for mut item in Vec::from_iter(io_executor.completed()) { + // TODO capture metrics + budget.reclaim(&item); + filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; + trigger_children(&mut *io_executor, &mut directories, &mut 
budget, item)?; + } + } // Bail out if we get hard links, device nodes or any other unusual content // - it is most likely an attack, as rusts cross-platform nature precludes // such artifacts @@ -309,6 +369,7 @@ fn unpack_without_first_dir<'a, R: Read>( } _ => return Err(ErrorKind::UnsupportedKind(format!("{:?}", kind)).into()), }; + budget.claim(&item); let item = loop { // Create the full path to the entry if it does not exist already @@ -343,8 +404,9 @@ fn unpack_without_first_dir<'a, R: Read>( for mut item in Vec::from_iter(io_executor.execute(item)) { // TODO capture metrics + budget.reclaim(&item); filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; - trigger_children(&mut *io_executor, &mut directories, item)?; + trigger_children(&mut *io_executor, &mut directories, &mut budget, item)?; } } @@ -353,8 +415,9 @@ fn unpack_without_first_dir<'a, R: Read>( for mut item in Vec::from_iter(io_executor.join()) { // handle final IOs // TODO capture metrics + budget.reclaim(&item); filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; - triggered += trigger_children(&mut *io_executor, &mut directories, item)?; + triggered += trigger_children(&mut *io_executor, &mut directories, &mut budget, item)?; } if triggered == 0 { // None of the IO submitted before the prior join triggered any new