Skip to content

Commit

Permalink
Stream files into the local store while capturing them (cherrypick of #12563) (#12572)
Browse files Browse the repository at this point in the history

A private repository experienced OOM issues in CI with a large number of tests. Allocation profiling highlighted that a large batch of `32MB`/`64MB` allocations was occurring around the same time, all caused by the fact that `PosixFS::read_file` slurps an entire file into memory as `FileContent`. Because local process execution uses `PosixFS::read_file` to capture sandbox outputs, a bunch of processes finishing at once could lead to memory usage spikes.

To fix that, this change removes `PosixFS::read_file` (which was only ever used to digest files), and replaces it with `{Store,local::ByteStore,ShardedLmdb}::store*` methods which make two-ish passes over a `Read` instance in order to digest and capture it.

The methods take an `immutable_data` parameter so that callers can indicate that they know that data will not change, which allows for avoiding hashing it on the second pass. Captures from process sandboxes are treated as immutable, while memoized captures from the workspace are not.

----

```
snapshot_capture/snapshot_capture((100, 100, false, 100))
                        time:   [1.8909 s 1.9000 s 1.9109 s]
                        change: [-27.459% -25.841% -24.085%] (p = 0.00 < 0.05)
                        Performance has improved.
snapshot_capture/snapshot_capture((20, 10000000, true, 10))
                        time:   [1.2539 s 1.2655 s 1.2782 s]
                        change: [-62.138% -61.318% -60.505%] (p = 0.00 < 0.05)
                        Performance has improved.
snapshot_capture/snapshot_capture((1, 200000000, true, 10))
                        time:   [3.5281 s 3.5773 s 3.6299 s]
                        change: [-13.420% -11.985% -10.434%] (p = 0.00 < 0.05)
                        Performance has improved.
```

[ci skip-rust]

[ci skip-build-wheels]
  • Loading branch information
stuhood committed Aug 15, 2021
1 parent 2b6985e commit b6b7932
Show file tree
Hide file tree
Showing 13 changed files with 290 additions and 114 deletions.
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions src/rust/engine/fs/fs_util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,11 @@ async fn execute(top_match: &clap::ArgMatches<'_>) -> Result<(), ExitError> {
.ok_or_else(|| format!("Tried to save file {:?} but it did not exist", path))?;
match file {
fs::Stat::File(f) => {
let digest = store::OneOffStoreFileByDigest::new(store.clone(), Arc::new(posix_fs))
.store_by_digest(f)
.await
.unwrap();
let digest =
store::OneOffStoreFileByDigest::new(store.clone(), Arc::new(posix_fs), false)
.store_by_digest(f)
.await
.unwrap();

let report = ensure_uploaded_to_remote(&store, store_has_remote, digest)
.await
Expand Down Expand Up @@ -438,7 +439,7 @@ async fn execute(top_match: &clap::ArgMatches<'_>) -> Result<(), ExitError> {

let snapshot = Snapshot::from_path_stats(
store_copy.clone(),
store::OneOffStoreFileByDigest::new(store_copy, posix_fs),
store::OneOffStoreFileByDigest::new(store_copy, posix_fs, false),
paths,
)
.await?;
Expand Down
29 changes: 3 additions & 26 deletions src/rust/engine/fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub use crate::glob_matching::{
};

use std::cmp::min;
use std::io::{self, Read};
use std::io;
use std::ops::Deref;
use std::os::unix::fs::PermissionsExt;
use std::path::{Component, Path, PathBuf};
Expand Down Expand Up @@ -522,31 +522,8 @@ impl PosixFS {
self.ignore.is_ignored(stat)
}

pub async fn read_file(&self, file: &File) -> Result<FileContent, io::Error> {
let path = file.path.clone();
let path_abs = self.root.0.join(&file.path);
self
.executor
.spawn_blocking(move || {
let is_executable = path_abs.metadata()?.permissions().mode() & 0o100 == 0o100;
std::fs::File::open(&path_abs)
.and_then(|mut f| {
let mut content = Vec::new();
f.read_to_end(&mut content)?;
Ok(FileContent {
path: path,
content: Bytes::from(content),
is_executable,
})
})
.map_err(|e| {
io::Error::new(
e.kind(),
format!("Failed to read file {:?}: {}", path_abs, e),
)
})
})
.await
pub fn file_path(&self, file: &File) -> PathBuf {
self.root.0.join(&file.path)
}

pub async fn read_link(&self, link: &Link) -> Result<PathBuf, io::Error> {
Expand Down
35 changes: 7 additions & 28 deletions src/rust/engine/fs/src/posixfs_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,37 +29,16 @@ async fn is_executable_true() {
}

#[tokio::test]
async fn read_file() {
async fn file_path() {
let dir = tempfile::TempDir::new().unwrap();
let path = PathBuf::from("marmosets");
let content = "cute".as_bytes().to_vec();
make_file(
&std::fs::canonicalize(dir.path()).unwrap().join(&path),
&content,
0o600,
);
let fs = new_posixfs(&dir.path());
let file_content = fs
.read_file(&File {
path: path.clone(),
is_executable: false,
})
.await
.unwrap();
assert_eq!(file_content.path, path);
assert_eq!(file_content.content, content);
}

#[tokio::test]
async fn read_file_missing() {
let dir = tempfile::TempDir::new().unwrap();
new_posixfs(&dir.path())
.read_file(&File {
path: PathBuf::from("marmosets"),
is_executable: false,
})
.await
.expect_err("Expected error");
let expected_path = std::fs::canonicalize(dir.path()).unwrap().join(&path);
let actual_path = fs.file_path(&File {
path: path.clone(),
is_executable: false,
});
assert_eq!(actual_path, expected_path);
}

#[tokio::test]
Expand Down
32 changes: 30 additions & 2 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ use sharded_lmdb::DEFAULT_LEASE_TIME;
use tryfuture::try_future;

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Debug;
use std::fs::OpenOptions;
use std::io::Write;
use std::io::{self, Read, Write};
use std::os::unix::fs::OpenOptionsExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
Expand Down Expand Up @@ -352,7 +353,10 @@ impl Store {
}

///
/// Store a file locally.
/// A convenience method for storing a file.
///
/// NB: This method should not be used for large blobs: prefer to stream them from their source
/// using `store_file`.
///
pub async fn store_file_bytes(
&self,
Expand All @@ -365,6 +369,30 @@ impl Store {
.await
}

///
/// Store a file locally by streaming its contents, rather than loading them fully into
/// memory (contrast with `store_file_bytes` above, which takes an in-memory `Bytes`).
///
/// `data_provider` must be able to open a fresh `Read` instance each time it is called,
/// because the underlying store makes multiple passes over the data: one to digest it,
/// and one to capture it. `data_is_immutable` indicates that the caller knows the data
/// cannot change between passes, which allows the store to skip re-hashing it on the
/// second pass.
///
/// Returns the `Digest` of the stored file content.
///
pub async fn store_file<F, R>(
&self,
initial_lease: bool,
data_is_immutable: bool,
data_provider: F,
) -> Result<Digest, String>
where
R: Read + Debug,
F: Fn() -> Result<R, io::Error> + Send + 'static,
{
// Delegates to the local ByteStore, tagging the entry as a File (vs a Directory).
self
.local
.store(
EntryType::File,
initial_lease,
data_is_immutable,
data_provider,
)
.await
}

/// Store a digest under a given file path, returning a Snapshot
pub async fn snapshot_of_one_file(
&self,
Expand Down
33 changes: 24 additions & 9 deletions src/rust/engine/fs/store/src/local.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use super::{EntryType, ShrinkBehavior};

use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::io::{self, Read};
use std::path::Path;
use std::sync::Arc;
use std::time::{self, Duration};

use bytes::Bytes;
use bytes::{Buf, Bytes};
use futures::future;
use hashing::{Digest, Fingerprint, EMPTY_DIGEST};
use lmdb::Error::NotFound;
Expand Down Expand Up @@ -250,18 +252,31 @@ impl ByteStore {
bytes: Bytes,
initial_lease: bool,
) -> Result<Digest, String> {
self
.store(entry_type, initial_lease, true, move || {
Ok(bytes.clone().reader())
})
.await
}

pub async fn store<F, R>(
&self,
entry_type: EntryType,
initial_lease: bool,
data_is_immutable: bool,
data_provider: F,
) -> Result<Digest, String>
where
R: Read + Debug,
F: Fn() -> Result<R, io::Error> + Send + 'static,
{
let dbs = match entry_type {
EntryType::Directory => self.inner.directory_dbs.clone(),
EntryType::File => self.inner.file_dbs.clone(),
};
let bytes2 = bytes.clone();
let digest = self
.inner
.executor
.spawn_blocking(move || Digest::of_bytes(&bytes))
.await;
dbs?.store_bytes(digest.hash, bytes2, initial_lease).await?;
Ok(digest)
dbs?
.store(initial_lease, data_is_immutable, data_provider)
.await
}

///
Expand Down
42 changes: 15 additions & 27 deletions src/rust/engine/fs/store/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
// Copyright 2017 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).

use std::collections::HashMap;
use std::ffi::OsString;
use std::fmt;
use std::iter::Iterator;
use std::path::{Path, PathBuf};
use std::path::Path;
use std::sync::Arc;

use bazel_protos::gen::build::bazel::remote::execution::v2 as remexec;
Expand Down Expand Up @@ -270,7 +269,7 @@ impl Snapshot {
.map_err(|err| format!("Error expanding globs: {}", err))?;
Snapshot::from_path_stats(
store.clone(),
OneOffStoreFileByDigest::new(store, posix_fs),
OneOffStoreFileByDigest::new(store, posix_fs, true),
path_stats,
)
.await
Expand Down Expand Up @@ -325,48 +324,37 @@ pub trait StoreFileByDigest<Error> {
}

///
/// A StoreFileByDigest which reads with a PosixFS and writes to a Store, with no caching.
/// A StoreFileByDigest which reads immutable files with a PosixFS and writes to a Store, with no
/// caching.
///
#[derive(Clone)]
pub struct OneOffStoreFileByDigest {
// Destination Store into which captured file contents are written.
store: Store,
// Used to resolve a `File`'s relative path to an absolute path for reading.
posix_fs: Arc<PosixFS>,
// Whether files read through this instance may be treated as immutable, allowing the
// store to skip re-hashing the data on its second (capture) pass.
immutable: bool,
}

impl OneOffStoreFileByDigest {
pub fn new(store: Store, posix_fs: Arc<PosixFS>) -> OneOffStoreFileByDigest {
OneOffStoreFileByDigest { store, posix_fs }
pub fn new(store: Store, posix_fs: Arc<PosixFS>, immutable: bool) -> OneOffStoreFileByDigest {
OneOffStoreFileByDigest {
store,
posix_fs,
immutable,
}
}
}

impl StoreFileByDigest<String> for OneOffStoreFileByDigest {
fn store_by_digest(&self, file: File) -> future::BoxFuture<'static, Result<Digest, String>> {
let store = self.store.clone();
let posix_fs = self.posix_fs.clone();
let immutable = self.immutable;
let res = async move {
let content = posix_fs
.read_file(&file)
let path = posix_fs.file_path(&file);
store
.store_file(true, immutable, move || std::fs::File::open(&path))
.await
.map_err(move |err| format!("Error reading file {:?}: {:?}", file, err))?;
store.store_file_bytes(content.content, true).await
};
res.boxed()
}
}

#[derive(Clone)]
pub struct StoreManyFileDigests {
pub hash: HashMap<PathBuf, Digest>,
}

impl StoreFileByDigest<String> for StoreManyFileDigests {
fn store_by_digest(&self, file: File) -> future::BoxFuture<'static, Result<Digest, String>> {
future::ready(self.hash.get(&file.path).copied().ok_or_else(|| {
format!(
"Could not find file {} when storing file by digest",
file.path.display()
)
}))
.boxed()
}
}
2 changes: 1 addition & 1 deletion src/rust/engine/fs/store/src/snapshot_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub fn setup() -> (
let dir = tempfile::Builder::new().prefix("root").tempdir().unwrap();
let ignorer = GitignoreStyleExcludes::create(vec![]).unwrap();
let posix_fs = Arc::new(PosixFS::new(dir.path(), ignorer, executor).unwrap());
let file_saver = OneOffStoreFileByDigest::new(store.clone(), posix_fs.clone());
let file_saver = OneOffStoreFileByDigest::new(store.clone(), posix_fs.clone(), true);
(store, dir, posix_fs, file_saver)
}

Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl CommandRunner {
.await?;
Snapshot::from_path_stats(
store.clone(),
OneOffStoreFileByDigest::new(store, posix_fs),
OneOffStoreFileByDigest::new(store, posix_fs, true),
path_stats,
)
.await
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/sharded_lmdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ task_executor = { path = "../task_executor" }
tempfile = "3"

[dev-dependencies]
parking_lot = "0.11"
tokio = { version = "1.4", features = ["macros"] }
Loading

0 comments on commit b6b7932

Please sign in to comment.