Fix #86: future_promise spuriously reporting unhandled errors #90

Merged: 8 commits, Feb 7, 2023
1 change: 0 additions & 1 deletion DESCRIPTION
@@ -33,7 +33,6 @@ LinkingTo: later,
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.2.3
Encoding: UTF-8
LazyData: true
VignetteBuilder: knitr
URL: https://rstudio.github.io/promises/, https://github.com/rstudio/promises
BugReports: https://github.com/rstudio/promises/issues
4 changes: 3 additions & 1 deletion NEWS.md
@@ -1,7 +1,9 @@
promises (development version)
==============

* `future_promise()` recevied a speed improvement when submitting many requests with a minimal number of `{future}` workers. If `future_promise()` runs out of available `{future}` workers, then `future_promise()` will preemptively return for the remainder of the current `{later}` execution. While it is possible for `{future}` to finish a job before submitting all of the `future_promise()` requests, the time saved by not asking `{future}`'s worker availablity will be faster overall than if a few jobs were submitted early. (#78)
* `future_promise()` received a speed improvement when submitting many requests with a minimal number of `{future}` workers. If `future_promise()` runs out of available `{future}` workers, then `future_promise()` will preemptively return for the remainder of the current `{later}` execution. While it is possible for `{future}` to finish a job before submitting all of the `future_promise()` requests, the time saved by not asking `{future}`'s worker availability will be faster overall than if a few jobs were submitted early. (#78)

* Fixed #86: `future_promise()` spuriously reports unhandled errors. (#90)

* Move `{fastmap}` from `Suggests` to `Imports` for better `{renv}` discovery. (#87)

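A minimal sketch of the behaviour the `Fixed #86` entry refers to (illustrative only, not part of the diff; assumes a multisession `{future}` plan with two workers). The rejection below is handled with `catch()`, so after this change no spurious unhandled-error warning should be reported once the event loop drains:

library(promises)
library(future)
plan(multisession, workers = 2)

# The rejection is handled explicitly, so no unhandled-error warning is expected.
p <- future_promise(stop("boom")) %>%
  catch(function(err) message("handled: ", conditionMessage(err)))

# Drain the later event loop so the promise settles.
while (!later::loop_empty()) later::run_now()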
11 changes: 9 additions & 2 deletions R/future_promise.R
@@ -253,10 +253,17 @@ WorkQueue <- R6::R6Class("WorkQueue",
future_job <- work_fn()

# Try to attempt work immediately after the future job has finished
finally(future_job, function() {
# (whether it succeeds or fails doesn't matter)
continue_work <- function() {
debug_msg("finished work. queue size: ", private$queue$size())
private$attempt_work()
})
}

# We're not using finally() here because we don't want rejections to be
# propagated (which would result in a warning). Any warnings will be
# handled by the user using a different promise object.
# https://github.com/rstudio/promises/issues/86
then(future_job, onFulfilled = continue_work, onRejected = continue_work)

return()
}
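The new comment above is the heart of the fix: `finally()` returns a new promise that re-propagates the rejection, and if nothing ever attaches a handler to that internal promise, `{promises}` warns about an unhandled error even though the user may have handled the original rejection elsewhere. A rough standalone sketch of the difference (illustrative only, not code from this PR; `handled` is a throwaway helper):

library(promises)

handled <- function(...) invisible(NULL)

# finally() re-propagates the rejection: p2 is itself rejected, and if nobody
# attaches a handler to it, an unhandled-error warning eventually fires.
p  <- promise(~ reject("boom"))
p2 <- finally(p, ~ message("cleanup"))

# then() with both handlers consumes the rejection: q2 resolves, so the
# internal bookkeeping promise never looks unhandled.
q  <- promise(~ reject("boom"))
q2 <- then(q, onFulfilled = handled, onRejected = handled)

while (!later::loop_empty()) later::run_now()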
22 changes: 15 additions & 7 deletions R/promise.R
@@ -77,11 +77,11 @@ Promise <- R6::R6Class("Promise",
private$state <- "rejected"

later::later(function() {
lapply(private$onRejected, function(f) {
private$rejectionHandled <- TRUE
f(private$value)
})
private$onRejected <- list()
lapply(private$onRejected, function(f) {
private$rejectionHandled <- TRUE
f(private$value)
})
private$onRejected <- list()

later::later(~{
if (!private$rejectionHandled) {
@@ -217,8 +217,8 @@
)

normalizeOnFulfilled <- function(onFulfilled) {
if (!is.function(onFulfilled))
if (!is.function(onFulfilled)) {
if (!is.null(onFulfilled)) {
warning("`onFulfilled` must be a function or `NULL`")
}
return(NULL)
}

args <- formals(onFulfilled)
arg_count <- length(args)
@@ -241,8 +245,12 @@ normalizeOnFulfilled <- function(onFulfilled) {
}

normalizeOnRejected <- function(onRejected) {
if (!is.function(onRejected))
if (!is.function(onRejected)) {
if (!is.null(onRejected)) {
warning("`onRejected` must be a function or `NULL`")
}
return(NULL)
}

args <- formals(onRejected)
arg_count <- length(args)
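With the `normalizeOnFulfilled()` / `normalizeOnRejected()` changes above, a handler that is neither a function nor `NULL` is still ignored, but now triggers a warning instead of failing silently. A small usage sketch (illustrative, not from the diff):

library(promises)

p <- promise(~ resolve(1))

p$then(10)    # still ignored, but now warns:
#> Warning: `onFulfilled` must be a function or `NULL`

p$then(NULL)  # NULL handlers remain silently accepted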
5 changes: 5 additions & 0 deletions src/RcppExports.cpp
@@ -5,6 +5,11 @@

using namespace Rcpp;

#ifdef RCPP_USE_GLOBAL_ROSTREAM
Rcpp::Rostream<true>& Rcpp::Rcout = Rcpp::Rcpp_cout_get();
Rcpp::Rostream<false>& Rcpp::Rcerr = Rcpp::Rcpp_cerr_get();
#endif

// asyncFib
void asyncFib(Rcpp::Function resolve, Rcpp::Function reject, double x);
RcppExport SEXP _promises_asyncFib(SEXP resolveSEXP, SEXP rejectSEXP, SEXP xSEXP) {
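The `RcppExports.cpp` hunk is the usual churn from regenerating the file with a newer Rcpp release, which emits the `Rostream` definitions guarded by `RCPP_USE_GLOBAL_ROSTREAM`. To reproduce it, regeneration is a one-liner run from the package root:

# Regenerates src/RcppExports.cpp and R/RcppExports.R from the
# // [[Rcpp::export]] attributes under src/.
Rcpp::compileAttributes()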
30 changes: 28 additions & 2 deletions tests/testthat/test-methods.R
@@ -31,7 +31,20 @@ describe("then()", {
expect_identical(result$visible, FALSE)
})
it("method ignores non-functions or NULL...", {
p1 <- promise(~resolve(1))$then(10)$then(NULL)
p1 <- promise(~resolve(1))
expect_warning(
{
p1 <- p1$then(10)
},
"`onFulfilled` must be a function or `NULL`",
fixed = TRUE
)
expect_warning(
{
p1 <- p1$then(NULL)
},
NA
)
expect_identical(extract(p1), 1)
})
it("...but function only ignores NULL, not non-functions", {
@@ -72,7 +85,20 @@ describe("catch()", {
expect_error(extract(p), "^bar$")
})
it("method ignores non-functions or NULL...", {
p1 <- promise(~resolve(1))$catch(10)$catch(NULL)
p1 <- promise(~ resolve(1))
expect_warning(
{
p1 <- p1$catch(10)
},
"`onRejected` must be a function or `NULL`",
fixed = TRUE
)
expect_warning(
{
p1 <- p1$catch(NULL)
},
NA
)
expect_identical(extract(p1), 1)
})
it("...but function only ignores NULL, not non-functions", {
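One testthat idiom worth noting in the tests above: passing `NA` as the expected message to `expect_warning()` asserts that no warning is emitted at all, which is how the `$then(NULL)` and `$catch(NULL)` cases are checked. A tiny illustration (not from this PR):

library(testthat)

expect_warning(log(1), NA)   # passes: no warning is raised
expect_warning(log(-1), NA)  # fails: log(-1) warns "NaNs produced"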
29 changes: 24 additions & 5 deletions tests/testthat/test-zzz-future_promise.R
@@ -9,6 +9,7 @@ source(test_path("common.R"))
local({
## Setup ##

n_workers <- 2
# Set up a plan with 2 future workers
with_test_workers <- function(code) {
# (Can not use a variable for workers if in a local({}))
@@ -44,7 +45,7 @@

worker_jobs <- 8
worker_job_time <- 1
expected_total_time <- worker_jobs * worker_job_time / 2 # 2 workers
expected_total_time <- worker_jobs * worker_job_time / n_workers

do_future_test <- function(
prom_fn = future_promise,
@@ -74,11 +75,11 @@
future_exec_times <- c()
if (block_mid_session) {
# Have `future` block the main R session 1 second into execution
lapply(1:8, function(i) {
lapply(seq_len(worker_jobs), function(i) {
later::later(
function() {
future::future({
Sys.sleep(1)
Sys.sleep(worker_job_time)
time_diff()
}) %...>% {
future_exec_times <<- c(future_exec_times, .)
@@ -89,7 +90,6 @@
})
}


exec_times <- NA
lapply(seq_len(worker_jobs), function(i) {
prom_fn({
@@ -112,7 +112,7 @@
expect_equal(all(exec_times_lag < (2 * worker_job_time)), expect_reasonable_exec_lag_time)

# post_lapply_time_diff should be ~ 0s
expect_equal(post_lapply_time_diff < worker_job_time, expect_immediate_lapply)
expect_equal(post_lapply_time_diff < (worker_job_time * ((worker_jobs - n_workers) / n_workers)), expect_immediate_lapply)

# time_diffs should never grow by more than 1s; (Expected 0.1)
time_diffs_lag <- time_diffs[-1] - time_diffs[-length(time_diffs)]
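For reference, a quick worked check of the relaxed timing bound on the changed expectation above, using the constants this test file defines (8 jobs of 1 second each on 2 workers):

worker_jobs     <- 8
worker_job_time <- 1
n_workers       <- 2

# New upper bound on post_lapply_time_diff:
worker_job_time * ((worker_jobs - n_workers) / n_workers)
#> [1] 3

The old bound was a flat worker_job_time (1 second); the new one scales with how many jobs must wait for a free worker.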
@@ -165,4 +165,23 @@
)
})

test_that("future_promise reports unhandled errors", {
with_test_workers({
err <- capture.output(type="message", {
future_promise(stop("boom1"))
wait_for_it()
})
expect_match(err, "boom1")
})
})

test_that("future_promise doesn't report errors that have been handled", {
with_test_workers({
err <- capture.output(type="message", {
future_promise(stop("boom1")) %>% then(onRejected = ~{})
wait_for_it()
})
expect_equal(err, character(0))
})
})
})