Skip to content

Commit

Permalink
Port to tokio 0.2, and to stdlib futures for fs and task_executor (pa…
Browse files Browse the repository at this point in the history
…ntsbuild#9071)

We're on an older version of tokio, which doesn't smoothly support usage of async/await.

Switch to tokio 0.2, which supports directly spawning and awaiting (via its macros) stdlib futures, which is an important step toward being able to utilize async/await more broadly. Additionally, port the `fs` and `task_executor` crates to stdlib futures.

Finally, transitively fixup for the new APIs: in particular, since both `task_executor` and `tokio` now consume stdlib futures to spawn tasks, we switch all relevant tests and main methods to use the `tokio::main` and `tokio::test` macros, which annotate async methods and spawn a runtime to allow for `await`ing futures inline.

Progress toward more usage of async/await!
  • Loading branch information
stuhood authored and Henry Fuller committed May 5, 2020
1 parent 98c8589 commit 8861e32
Show file tree
Hide file tree
Showing 51 changed files with 3,064 additions and 2,644 deletions.
9 changes: 9 additions & 0 deletions src/rust/engine/ASYNC.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

# async-await port notes

Many functions at the boundary between ported async-await, stdlib futures code and legacy
future 0.1 code temporarily return futures 0.3 BoxFuture and use explicit lifetimes, because that
is easier for a futures 0.1 consumer. stdlib futures consumers can easily call async functions with
references (because they can remain "on the stack"), but an 0.1 future cannot. These methods can be
swapped back to async once all callers are using async-await.

747 changes: 254 additions & 493 deletions src/rust/engine/Cargo.lock

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,34 +81,38 @@ default-members = [
]

[dependencies]
async-trait = "0.1"
boxfuture = { path = "boxfuture" }
bytes = "0.4.5"
concrete_time = { path = "concrete_time" }
fnv = "1.0.5"
fs = { path = "fs" }
futures01 = { package = "futures", version = "0.1" }
futures = { version = "0.3", features = ["compat"] }
graph = { path = "graph" }
hashing = { path = "hashing" }
indexmap = "1.0.2"
itertools = "0.8.2"
lazy_static = "1"
log = "0.4"
logging = { path = "logging" }
num_enum = "0.1.1"
num_cpus = "1"
num_enum = "0.4"
parking_lot = "0.6"
process_execution = { path = "process_execution" }
rand = "0.6"
reqwest = { version = "0.9.22", default_features = false, features = ["rustls-tls"] }
reqwest = { version = "0.10", default_features = false, features = ["stream", "rustls-tls"] }
rule_graph = { path = "rule_graph" }
sharded_lmdb = { path = "sharded_lmdb" }
smallvec = "0.6"
store = { path = "fs/store" }
task_executor = { path = "task_executor" }
tempfile = "3"
time = "0.1.40"
tokio = { version = "0.2", features = ["rt-threaded"] }
ui = { path = "ui" }
url = "1.7.1"
url = "2.1"
uuid = { version = "0.7", features = ["v4"] }
task_executor = { path = "task_executor" }
workunit_store = { path = "workunit_store" }

[patch.crates-io]
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/engine_cffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ crate-type = ["cdylib"]
[dependencies]
engine = { path = ".." }
futures01 = { package = "futures", version = "0.1" }
futures = { version = "0.3", features = ["compat"] }
hashing = { path = "../hashing" }
log = "0.4"
logging = { path = "../logging" }
Expand Down
34 changes: 16 additions & 18 deletions src/rust/engine/engine_cffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use engine::{
externs, nodes, Core, ExecutionRequest, ExecutionTermination, Function, Handle, Key, Params,
RootResult, Rule, Scheduler, Session, Tasks, TypeId, Types, Value,
};
use futures::compat::Future01CompatExt;
use futures01::{future, Future};
use hashing::{Digest, EMPTY_DIGEST};
use log::{error, warn, Log};
Expand Down Expand Up @@ -896,7 +897,8 @@ pub extern "C" fn capture_snapshots(
})
.collect::<Vec<_>>(),
)
.map(|values| externs::store_tuple(&values)),
.map(|values| externs::store_tuple(&values))
.compat(),
)
})
.into()
Expand Down Expand Up @@ -926,11 +928,10 @@ pub extern "C" fn merge_directories(
scheduler
.core
.executor
.block_on(store::Snapshot::merge_directories(
scheduler.core.store(),
digests,
workunit_store,
))
.block_on(
store::Snapshot::merge_directories(scheduler.core.store(), digests, workunit_store)
.compat(),
)
.map(|dir| nodes::Snapshot::store_directory(&scheduler.core, &dir))
.into()
})
Expand Down Expand Up @@ -975,13 +976,11 @@ pub extern "C" fn run_local_interactive_process(
None => unreachable!()
};

let write_operation = scheduler.core.store().materialize_directory(
scheduler.core.store().materialize_directory(
destination,
digest,
session.workunit_store(),
);

scheduler.core.executor.spawn_on_io_pool(write_operation).wait()?;
).wait()?;
}
}

Expand Down Expand Up @@ -1058,7 +1057,7 @@ pub extern "C" fn materialize_directories(
let types = &scheduler.core.types;
let construct_materialize_directories_results = types.construct_materialize_directories_results;
let construct_materialize_directory_result = types.construct_materialize_directory_result;
let work_future = future::join_all(
future::join_all(
digests_and_path_prefixes
.into_iter()
.map(|(digest, path_prefix)| {
Expand Down Expand Up @@ -1105,9 +1104,8 @@ pub extern "C" fn materialize_directories(
&[externs::store_tuple(&entries)],
);
output
});

scheduler.core.executor.spawn_on_io_pool(work_future).wait()
})
.wait()
})
.into()
}
Expand All @@ -1120,7 +1118,7 @@ pub extern "C" fn init_logging(level: u64, show_rust_3rdparty_logs: bool) {

#[no_mangle]
pub extern "C" fn setup_pantsd_logger(log_file_ptr: *const raw::c_char, level: u64) -> PyResult {
logging::set_destination(Destination::Pantsd);
logging::set_thread_destination(Destination::Pantsd);

let path_str = unsafe { CStr::from_ptr(log_file_ptr).to_string_lossy().into_owned() };
let path = PathBuf::from(path_str);
Expand All @@ -1134,7 +1132,7 @@ pub extern "C" fn setup_pantsd_logger(log_file_ptr: *const raw::c_char, level: u
// Might be called before externs are set, therefore can't return a PyResult
#[no_mangle]
pub extern "C" fn setup_stderr_logger(level: u64) {
logging::set_destination(Destination::Stderr);
logging::set_thread_destination(Destination::Stderr);
LOGGER
.set_stderr_logger(level)
.expect("Error setting up STDERR logger");
Expand Down Expand Up @@ -1173,7 +1171,7 @@ pub extern "C" fn flush_log() {

#[no_mangle]
pub extern "C" fn override_thread_logging_destination(destination: Destination) {
logging::set_destination(destination);
logging::set_thread_destination(destination);
}

fn graph_full(scheduler: &Scheduler, subject_types: Vec<TypeId>) -> RuleGraph<Rule> {
Expand Down Expand Up @@ -1202,7 +1200,7 @@ where
F: FnOnce(&Scheduler) -> T,
{
let scheduler = unsafe { Box::from_raw(scheduler_ptr) };
let t = f(&scheduler);
let t = scheduler.core.runtime.enter(|| f(&scheduler));
mem::forget(scheduler);
t
}
Expand Down
6 changes: 3 additions & 3 deletions src/rust/engine/fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ authors = [ "Pants Build <pantsbuild@gmail.com>" ]
publish = false

[dependencies]
boxfuture = { path = "../boxfuture" }
async-trait = "0.1"
bytes = "0.4.5"
futures01 = { package = "futures", version = "0.1" }
futures = "0.3"
glob = "0.2.11"
ignore = "0.4.4"
lazy_static = "1"
Expand All @@ -19,4 +19,4 @@ tempfile = "3"

[dev-dependencies]
testutil = { path = "../testutil" }
tokio = "0.1"
tokio = { version = "0.2", features = ["rt-core", "macros"] }
3 changes: 2 additions & 1 deletion src/rust/engine/fs/brfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ env_logger = "0.5.4"
errno = "0.2.3"
fuse = "0.3.1"
futures01 = { package = "futures", version = "0.1" }
futures = { version = "0.3", features = ["compat"] }
hashing = { path = "../../hashing" }
libc = "0.2.39"
log = "0.4.1"
Expand All @@ -22,7 +23,7 @@ serverset = { path = "../../serverset" }
store = { path = "../store" }
task_executor = { path = "../../task_executor" }
time = "0.1.39"
tokio = "0.1"
tokio = { version = "0.2", features = ["rt-threaded", "macros"] }
workunit_store = { path = "../../workunit_store" }

[dev-dependencies]
Expand Down
Loading

0 comments on commit 8861e32

Please sign in to comment.