Skip to content

Commit

Permalink
Limit RAM use during unpacking.
Browse files Browse the repository at this point in the history
Heavily deferred directories can lead to very large amounts of RAM
holding pending IOs, and under low memory limits this caused failures.

Reduce the stack size for IO worker threads to 1MB, as the Linux
default of 8MB is wasteful for such threads.
  • Loading branch information
rbtcollins committed Jun 11, 2019
1 parent 8064cdd commit 5c884a1
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 7 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ Command | Description

## Environment variables


- `RUSTUP_HOME` (default: `~/.rustup` or `%USERPROFILE%/.rustup`)
Sets the root rustup folder, used for storing installed
toolchains and configuration options.
Expand All @@ -611,18 +612,21 @@ Command | Description
- `RUSTUP_UPDATE_ROOT` (default `https://static.rust-lang.org/rustup`)
Sets the root URL for downloading self-updates.

- `RUSTUP_IO_THREADS` (defaults to reported cpu count). Sets the
- `RUSTUP_IO_THREADS` *unstable* (defaults to reported cpu count). Sets the
number of threads to perform close IO in. Set to `disabled` to force
single-threaded IO for troubleshooting, or an arbitrary number to
override automatic detection.

- `RUSTUP_TRACE_DIR` (default: no tracing)
- `RUSTUP_TRACE_DIR` *unstable* (default: no tracing)
Enables tracing and determines the directory that traces will be
written to. Traces are of the form PID.trace. Traces can be read
by the Catapult project [tracing viewer][tv].

[tv]: https://github.com/catapult-project/catapult/blob/master/tracing/README.md

- `RUSTUP_UNPACK_RAM` *unstable* (default 400M, min 100M)
Caps the amount of RAM rustup will use for IO tasks while unpacking.

## Other installation methods

The primary installation method, as described at https://rustup.rs, differs by platform:
Expand Down
2 changes: 2 additions & 0 deletions src/diskio/threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl<'a> Threaded<'a> {
// more threads to get more IO dispatched at this stage in the process.
let pool = threadpool::Builder::new()
.thread_name("CloseHandle".into())
.thread_stack_size(1_048_576)
.build();
let (tx, rx) = channel();
Threaded {
Expand All @@ -62,6 +63,7 @@ impl<'a> Threaded<'a> {
let pool = threadpool::Builder::new()
.thread_name("CloseHandle".into())
.num_threads(thread_count)
.thread_stack_size(1_048_576)
.build();
let (tx, rx) = channel();
Threaded {
Expand Down
73 changes: 68 additions & 5 deletions src/dist/component/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::utils::notifications::Notification;
use crate::utils::utils;

use std::collections::{HashMap, HashSet};
use std::env;
use std::fmt;
use std::io::{self, ErrorKind as IOErrorKind, Read};
use std::iter::FromIterator;
Expand Down Expand Up @@ -157,6 +158,51 @@ impl<'a> TarPackage<'a> {
}
}

/// Tracks how much RAM is held by pending unpack IO so that dispatch
/// can be throttled before exceeding a configured cap.
struct MemoryBudget {
    // Maximum bytes of file content allowed in flight at once.
    limit: usize,
    // Bytes currently claimed by dispatched-but-incomplete IO items.
    used: usize,
}

// NOTE(review): this type arguably belongs in the diskio module.
impl MemoryBudget {
    /// Create a budget capped by `RUSTUP_UNPACK_RAM` (bytes) when set and
    /// parseable, falling back to a 400 MiB default otherwise.
    ///
    /// Panics when the cap is smaller than `max_file_size`, since a single
    /// file of that size could then never be dispatched.
    fn new(max_file_size: usize) -> MemoryBudget {
        const DEFAULT_UNPACK_RAM: usize = 400 * 1024 * 1024;
        // An unset or unparseable override silently uses the default.
        let unpack_ram = env::var("RUSTUP_UNPACK_RAM")
            .ok()
            .and_then(|budget_str| budget_str.parse::<usize>().ok())
            .unwrap_or(DEFAULT_UNPACK_RAM);
        if max_file_size > unpack_ram {
            panic!("RUSTUP_UNPACK_RAM must be larger than {}", max_file_size);
        }
        MemoryBudget {
            limit: unpack_ram,
            used: 0,
        }
    }

    /// Release the bytes an item held, once its IO has completed.
    fn reclaim(&mut self, op: &Item) {
        // Directories carry no buffered content, so only files count.
        if let Kind::File(content) = &op.kind {
            self.used -= content.len();
        }
    }

    /// Record the bytes an item will hold while its IO is pending.
    fn claim(&mut self, op: &Item) {
        if let Kind::File(content) = &op.kind {
            self.used += content.len();
        }
    }

    /// Bytes still available before the cap is reached.
    fn available(&self) -> usize {
        self.limit - self.used
    }
}

/// Handle the async result of io operations
/// Replaces op.result with Ok(())
fn filter_result(op: &mut Item) -> io::Result<()> {
Expand Down Expand Up @@ -187,6 +233,7 @@ fn filter_result(op: &mut Item) -> io::Result<()> {
fn trigger_children(
io_executor: &mut dyn Executor,
directories: &mut HashMap<PathBuf, DirStatus>,
budget: &mut MemoryBudget,
item: Item,
) -> Result<usize> {
let mut result = 0;
Expand All @@ -206,8 +253,9 @@ fn trigger_children(
for pending_item in pending.into_iter() {
for mut item in Vec::from_iter(io_executor.execute(pending_item)) {
// TODO capture metrics
budget.reclaim(&item);
filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
result += trigger_children(io_executor, directories, item)?;
result += trigger_children(io_executor, directories, budget, item)?;
}
}
};
Expand All @@ -229,6 +277,9 @@ fn unpack_without_first_dir<'a, R: Read>(
let entries = archive
.entries()
.chain_err(|| ErrorKind::ExtractingPackage)?;
const MAX_FILE_SIZE: u64 = 100_000_000;
let mut budget = MemoryBudget::new(MAX_FILE_SIZE as usize);

let mut directories: HashMap<PathBuf, DirStatus> = HashMap::new();
// Path is presumed to exist. Call it a precondition.
directories.insert(path.to_owned(), DirStatus::Exists);
Expand All @@ -239,8 +290,9 @@ fn unpack_without_first_dir<'a, R: Read>(
// our unpacked item is pending dequeue)
for mut item in Vec::from_iter(io_executor.completed()) {
// TODO capture metrics
budget.reclaim(&item);
filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
trigger_children(&mut *io_executor, &mut directories, item)?;
trigger_children(&mut *io_executor, &mut directories, &mut budget, item)?;
}

let mut entry = entry.chain_err(|| ErrorKind::ExtractingPackage)?;
Expand All @@ -266,9 +318,17 @@ fn unpack_without_first_dir<'a, R: Read>(
}

let size = entry.header().size()?;
if size > 100_000_000 {
if size > MAX_FILE_SIZE {
return Err(format!("File too big {} {}", relpath.display(), size).into());
}
while size > budget.available() as u64 {
for mut item in Vec::from_iter(io_executor.completed()) {
// TODO capture metrics
budget.reclaim(&item);
filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
trigger_children(&mut *io_executor, &mut directories, &mut budget, item)?;
}
}
// Bail out if we get hard links, device nodes or any other unusual content
// - it is most likely an attack, as rusts cross-platform nature precludes
// such artifacts
Expand Down Expand Up @@ -309,6 +369,7 @@ fn unpack_without_first_dir<'a, R: Read>(
}
_ => return Err(ErrorKind::UnsupportedKind(format!("{:?}", kind)).into()),
};
budget.claim(&item);

let item = loop {
// Create the full path to the entry if it does not exist already
Expand Down Expand Up @@ -343,8 +404,9 @@ fn unpack_without_first_dir<'a, R: Read>(

for mut item in Vec::from_iter(io_executor.execute(item)) {
// TODO capture metrics
budget.reclaim(&item);
filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
trigger_children(&mut *io_executor, &mut directories, item)?;
trigger_children(&mut *io_executor, &mut directories, &mut budget, item)?;
}
}

Expand All @@ -353,8 +415,9 @@ fn unpack_without_first_dir<'a, R: Read>(
for mut item in Vec::from_iter(io_executor.join()) {
// handle final IOs
// TODO capture metrics
budget.reclaim(&item);
filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
triggered += trigger_children(&mut *io_executor, &mut directories, item)?;
triggered += trigger_children(&mut *io_executor, &mut directories, &mut budget, item)?;
}
if triggered == 0 {
// None of the IO submitted before the prior join triggered any new
Expand Down

0 comments on commit 5c884a1

Please sign in to comment.