Skip to content

Commit

Permalink
Fix #86: future_promise spuriously reporting unhandled errors (#90)
Browse files Browse the repository at this point in the history
Co-authored-by: Barret Schloerke <barret@posit.co>
  • Loading branch information
jcheng5 and schloerke authored Feb 7, 2023
1 parent e52ed2b commit 66bfca2
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 18 deletions.
1 change: 0 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ LinkingTo: later,
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.2.3
Encoding: UTF-8
LazyData: true
VignetteBuilder: knitr
URL: https://rstudio.github.io/promises/, https://github.com/rstudio/promises
BugReports: https://github.com/rstudio/promises/issues
Expand Down
4 changes: 3 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
promises (development version)
==============

* `future_promise()` recevied a speed improvement when submitting many requests with a minimal number of `{future}` workers. If `future_promise()` runs out of available `{future}` workers, then `future_promise()` will preemptively return for the remainder of the current `{later}` execution. While it is possible for `{future}` to finish a job before submitting all of the `future_promise()` requests, the time saved by not asking `{future}`'s worker availablity will be faster overall than if a few jobs were submitted early. (#78)
* `future_promise()` received a speed improvement when submitting many requests with a minimal number of `{future}` workers. If `future_promise()` runs out of available `{future}` workers, then `future_promise()` will preemptively return for the remainder of the current `{later}` execution. While it is possible for `{future}` to finish a job before submitting all of the `future_promise()` requests, the time saved by not asking `{future}`'s worker availability will be faster overall than if a few jobs were submitted early. (#78)

* Fixed #86: `future_promise()` spuriously reports unhandled errors. (#90)

* Move `{fastmap}` from `Suggests` to `Imports` for better `{renv}` discovery. (#87)

Expand Down
11 changes: 9 additions & 2 deletions R/future_promise.R
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,17 @@ WorkQueue <- R6::R6Class("WorkQueue",
future_job <- work_fn()

# Try to attempt work immediately after the future job has finished
finally(future_job, function() {
# (whether it succeeds or fails doesn't matter)
continue_work <- function() {
debug_msg("finished work. queue size: ", private$queue$size())
private$attempt_work()
})
}

# We're not using finally() here because we don't want rejections to be
# propagated (which would result in a warning). Any warnings will be
# handled by the user using a different promise object.
# https://github.com/rstudio/promises/issues/86
then(future_job, onFulfilled = continue_work, onRejected = continue_work)

return()
}
Expand Down
22 changes: 15 additions & 7 deletions R/promise.R
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ Promise <- R6::R6Class("Promise",
private$state <- "rejected"

later::later(function() {
lapply(private$onRejected, function(f) {
private$rejectionHandled <- TRUE
f(private$value)
})
private$onRejected <- list()
lapply(private$onRejected, function(f) {
private$rejectionHandled <- TRUE
f(private$value)
})
private$onRejected <- list()

later::later(~{
if (!private$rejectionHandled) {
Expand Down Expand Up @@ -217,8 +217,12 @@ Promise <- R6::R6Class("Promise",
)

normalizeOnFulfilled <- function(onFulfilled) {
if (!is.function(onFulfilled))
if (!is.function(onFulfilled)) {
if (!is.null(onFulfilled)) {
warning("`onFulfilled` must be a function or `NULL`")
}
return(NULL)
}

args <- formals(onFulfilled)
arg_count <- length(args)
Expand All @@ -241,8 +245,12 @@ normalizeOnFulfilled <- function(onFulfilled) {
}

normalizeOnRejected <- function(onRejected) {
if (!is.function(onRejected))
if (!is.function(onRejected)) {
if (!is.null(onRejected)) {
warning("`onRejected` must be a function or `NULL`")
}
return(NULL)
}

args <- formals(onRejected)
arg_count <- length(args)
Expand Down
5 changes: 5 additions & 0 deletions src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@

using namespace Rcpp;

#ifdef RCPP_USE_GLOBAL_ROSTREAM
Rcpp::Rostream<true>& Rcpp::Rcout = Rcpp::Rcpp_cout_get();
Rcpp::Rostream<false>& Rcpp::Rcerr = Rcpp::Rcpp_cerr_get();
#endif

// asyncFib
void asyncFib(Rcpp::Function resolve, Rcpp::Function reject, double x);
RcppExport SEXP _promises_asyncFib(SEXP resolveSEXP, SEXP rejectSEXP, SEXP xSEXP) {
Expand Down
30 changes: 28 additions & 2 deletions tests/testthat/test-methods.R
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,20 @@ describe("then()", {
expect_identical(result$visible, FALSE)
})
it("method ignores non-functions or NULL...", {
p1 <- promise(~resolve(1))$then(10)$then(NULL)
p1 <- promise(~resolve(1))
expect_warning(
{
p1 <- p1$then(10)
},
"`onFulfilled` must be a function or `NULL`",
fixed = TRUE
)
expect_warning(
{
p1 <- p1$then(NULL)
},
NA
)
expect_identical(extract(p1), 1)
})
it("...but function only ignores NULL, not non-functions", {
Expand Down Expand Up @@ -72,7 +85,20 @@ describe("catch()", {
expect_error(extract(p), "^bar$")
})
it("method ignores non-functions or NULL...", {
p1 <- promise(~resolve(1))$catch(10)$catch(NULL)
p1 <- promise(~ resolve(1))
expect_warning(
{
p1 <- p1$catch(10)
},
"`onRejected` must be a function or `NULL`",
fixed = TRUE
)
expect_warning(
{
p1 <- p1$catch(NULL)
},
NA
)
expect_identical(extract(p1), 1)
})
it("...but function only ignores NULL, not non-functions", {
Expand Down
29 changes: 24 additions & 5 deletions tests/testthat/test-zzz-future_promise.R
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ source(test_path("common.R"))
local({
## Setup ##

n_workers <- 2
# Set up a plan with 2 future workers
with_test_workers <- function(code) {
# (Can not use a variable for workers if in a local({}))
Expand Down Expand Up @@ -44,7 +45,7 @@ local({

worker_jobs <- 8
worker_job_time <- 1
expected_total_time <- worker_jobs * worker_job_time / 2 # 2 workers
expected_total_time <- worker_jobs * worker_job_time / n_workers

do_future_test <- function(
prom_fn = future_promise,
Expand Down Expand Up @@ -74,11 +75,11 @@ local({
future_exec_times <- c()
if (block_mid_session) {
# Have `future` block the main R session 1 second into execution
lapply(1:8, function(i) {
lapply(seq_len(worker_jobs), function(i) {
later::later(
function() {
future::future({
Sys.sleep(1)
Sys.sleep(worker_job_time)
time_diff()
}) %...>% {
future_exec_times <<- c(future_exec_times, .)
Expand All @@ -89,7 +90,6 @@ local({
})
}


exec_times <- NA
lapply(seq_len(worker_jobs), function(i) {
prom_fn({
Expand All @@ -112,7 +112,7 @@ local({
expect_equal(all(exec_times_lag < (2 * worker_job_time)), expect_reasonable_exec_lag_time)

# post_lapply_time_diff should be ~ 0s
expect_equal(post_lapply_time_diff < worker_job_time, expect_immediate_lapply)
expect_equal(post_lapply_time_diff < (worker_job_time * ((worker_jobs - n_workers) / n_workers)), expect_immediate_lapply)

# time_diffs should never grow by more than 1s; (Expected 0.1)
time_diffs_lag <- time_diffs[-1] - time_diffs[-length(time_diffs)]
Expand Down Expand Up @@ -165,4 +165,23 @@ local({
)
})

test_that("future_promise reports unhandled errors", {
with_test_workers({
err <- capture.output(type="message", {
future_promise(stop("boom1"))
wait_for_it()
})
expect_match(err, "boom1")
})
})

test_that("future_promise doesn't report errors that have been handled", {
with_test_workers({
err <- capture.output(type="message", {
future_promise(stop("boom1")) %>% then(onRejected = ~{})
wait_for_it()
})
expect_equal(err, character(0))
})
})
})

0 comments on commit 66bfca2

Please sign in to comment.