From 8c924f46eb3554fd43fe3aceb4e7683fd018fc0e Mon Sep 17 00:00:00 2001 From: Joe Cheng Date: Tue, 7 Feb 2023 11:00:01 -0800 Subject: [PATCH 1/7] Fix #86: future_promise spuriously reporting unhandled errors This was because we were doing a finally() which creates a new promise, and not subsequently handling errors on that promise. --- NEWS.md | 4 +++- R/future_promise.R | 11 +++++++++-- R/promise.R | 18 +++++++++++------- src/RcppExports.cpp | 5 +++++ tests/testthat/test-zzz-future_promise.R | 19 +++++++++++++++++++ 5 files changed, 47 insertions(+), 10 deletions(-) diff --git a/NEWS.md b/NEWS.md index 7405d1b..cb32846 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,7 +1,9 @@ promises (development version) ============== -* `future_promise()` recevied a speed improvement when submitting many requests with a minimal number of `{future}` workers. If `future_promise()` runs out of available `{future}` workers, then `future_promise()` will preemptively return for the remainder of the current `{later}` execution. While it is possible for `{future}` to finish a job before submitting all of the `future_promise()` requests, the time saved by not asking `{future}`'s worker availablity will be faster overall than if a few jobs were submitted early. (#78) +* `future_promise()` received a speed improvement when submitting many requests with a minimal number of `{future}` workers. If `future_promise()` runs out of available `{future}` workers, then `future_promise()` will preemptively return for the remainder of the current `{later}` execution. While it is possible for `{future}` to finish a job before submitting all of the `future_promise()` requests, the time saved by not asking `{future}`'s worker availability will be faster overall than if a few jobs were submitted early. (#78) + +* Fixed #86: future_promise() spuriously reports unhandled errors. promises 1.2.0.1 diff --git a/R/future_promise.R b/R/future_promise.R index 911c995..d5f705c 100644 --- a/R/future_promise.R +++ b/R/future_promise.R @@ -253,10 +253,17 @@ WorkQueue <- R6::R6Class("WorkQueue", future_job <- work_fn() # Try to attempt work immediately after the future job has finished - finally(future_job, function() { + # (whether it succeeds or fails doesn't matter) + continue_work <- function() { debug_msg("finished work. queue size: ", private$queue$size()) private$attempt_work() - }) + } + + # We're not using finally() here because we don't want rejections to be + # propagated (which would result in a warning). Any warnings will be + # handled by the user using a different promise object. + # https://github.com/rstudio/promises/issues/86 + then(future_job, onFulfilled = continue_work, onRejected = continue_work) return() } diff --git a/R/promise.R b/R/promise.R index 51582f8..3c89ed6 100644 --- a/R/promise.R +++ b/R/promise.R @@ -77,11 +77,11 @@ Promise <- R6::R6Class("Promise", private$state <- "rejected" later::later(function() { - lapply(private$onRejected, function(f) { - private$rejectionHandled <- TRUE - f(private$value) - }) - private$onRejected <- list() + lapply(private$onRejected, function(f) { + private$rejectionHandled <- TRUE + f(private$value) + }) + private$onRejected <- list() later::later(~{ if (!private$rejectionHandled) { @@ -217,8 +217,10 @@ Promise <- R6::R6Class("Promise", ) normalizeOnFulfilled <- function(onFulfilled) { - if (!is.function(onFulfilled)) + if (!is.function(onFulfilled)) { + # TODO: Warn? return(NULL) + } args <- formals(onFulfilled) arg_count <- length(args) @@ -241,8 +243,10 @@ normalizeOnFulfilled <- function(onFulfilled) { } normalizeOnRejected <- function(onRejected) { - if (!is.function(onRejected)) + if (!is.function(onRejected)) { + # TODO: Warn? return(NULL) + } args <- formals(onRejected) arg_count <- length(args) diff --git a/src/RcppExports.cpp b/src/RcppExports.cpp index b21e757..cb7c374 100644 --- a/src/RcppExports.cpp +++ b/src/RcppExports.cpp @@ -5,6 +5,11 @@ using namespace Rcpp; +#ifdef RCPP_USE_GLOBAL_ROSTREAM +Rcpp::Rostream& Rcpp::Rcout = Rcpp::Rcpp_cout_get(); +Rcpp::Rostream& Rcpp::Rcerr = Rcpp::Rcpp_cerr_get(); +#endif + // asyncFib void asyncFib(Rcpp::Function resolve, Rcpp::Function reject, double x); RcppExport SEXP _promises_asyncFib(SEXP resolveSEXP, SEXP rejectSEXP, SEXP xSEXP) { diff --git a/tests/testthat/test-zzz-future_promise.R b/tests/testthat/test-zzz-future_promise.R index a2b1408..810c0bb 100644 --- a/tests/testthat/test-zzz-future_promise.R +++ b/tests/testthat/test-zzz-future_promise.R @@ -165,4 +165,23 @@ local({ ) }) + test_that("future_promise reports unhandled errors", { + with_test_workers({ + err <- capture.output(type="message", { + future_promise(stop("boom1")) + wait_for_it() + }) + expect_match(err, "boom1") + }) + }) + + test_that("future_promise doesn't report errors that have been handled", { + with_test_workers({ + err <- capture.output(type="message", { + future_promise(stop("boom1")) %>% then(onRejected = ~{}) + wait_for_it() + }) + expect_equal(err, character(0)) + }) + }) }) From b2159abb736390d9dade047af159cc70c51a85b8 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Tue, 7 Feb 2023 14:32:03 -0500 Subject: [PATCH 2/7] Update NEWS.md --- NEWS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NEWS.md b/NEWS.md index cb32846..33960c0 100644 --- a/NEWS.md +++ b/NEWS.md @@ -3,7 +3,7 @@ promises (development version) * `future_promise()` received a speed improvement when submitting many requests with a minimal number of `{future}` workers. If `future_promise()` runs out of available `{future}` workers, then `future_promise()` will preemptively return for the remainder of the current `{later}` execution. While it is possible for `{future}` to finish a job before submitting all of the `future_promise()` requests, the time saved by not asking `{future}`'s worker availability will be faster overall than if a few jobs were submitted early. (#78) -* Fixed #86: future_promise() spuriously reports unhandled errors. +* Fixed #86: `future_promise()` spuriously reports unhandled errors. (#90) promises 1.2.0.1 From cebdf258ce039e6293d1d2309ea91d66ca205d14 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Tue, 7 Feb 2023 14:41:06 -0500 Subject: [PATCH 3/7] Add warning when `onFulfilled` or `onRejected` are not functions or `NULL` --- R/promise.R | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/R/promise.R b/R/promise.R index 3c89ed6..4617cb5 100644 --- a/R/promise.R +++ b/R/promise.R @@ -218,7 +218,9 @@ Promise <- R6::R6Class("Promise", normalizeOnFulfilled <- function(onFulfilled) { if (!is.function(onFulfilled)) { - # TODO: Warn? + if (!is.null(onFulfilled)) { + warning("`onFulfilled` must be a function or `NULL`") + } return(NULL) } @@ -244,7 +246,9 @@ normalizeOnFulfilled <- function(onFulfilled) { normalizeOnRejected <- function(onRejected) { if (!is.function(onRejected)) { - # TODO: Warn? + if (!is.null(onRejected)) { + warning("`onRejected` must be a function or `NULL`") + } return(NULL) } From 6c48379045ff96ce708bded240198498541e6ff3 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Tue, 7 Feb 2023 14:41:53 -0500 Subject: [PATCH 4/7] Add test for non NULL/function `onFulfilled` and `onRejected` values --- tests/testthat/test-methods.R | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/tests/testthat/test-methods.R b/tests/testthat/test-methods.R index f725011..5d0438c 100644 --- a/tests/testthat/test-methods.R +++ b/tests/testthat/test-methods.R @@ -31,7 +31,20 @@ describe("then()", { expect_identical(result$visible, FALSE) }) it("method ignores non-functions or NULL...", { - p1 <- promise(~resolve(1))$then(10)$then(NULL) + p1 <- promise(~resolve(1)) + expect_warning( + { + p1 <- p1$then(10) + }, + "`onFulfilled` must be a function or `NULL`", + fixed = TRUE + ) + expect_warning( + { + p1 <- p1$then(NULL) + }, + NA + ) expect_identical(extract(p1), 1) }) it("...but function only ignores NULL, not non-functions", { @@ -72,7 +85,20 @@ describe("catch()", { expect_error(extract(p), "^bar$") }) it("method ignores non-functions or NULL...", { - p1 <- promise(~resolve(1))$catch(10)$catch(NULL) + p1 <- promise(~ resolve(1)) + expect_warning( + { + p1 <- p1$catch(10) + }, + "`onRejected` must be a function or `NULL`", + fixed = TRUE + ) + expect_warning( + { + p1 <- p1$catch(NULL) + }, + NA + ) expect_identical(extract(p1), 1) }) it("...but function only ignores NULL, not non-functions", { From 0c2ff82095439e95236693efa932ecaabdb09aaf Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Tue, 7 Feb 2023 14:42:19 -0500 Subject: [PATCH 5/7] Bump roxygen version / document --- DESCRIPTION | 2 +- man/WorkQueue.Rd | 18 +++++++++--------- man/future_promise.Rd | 6 +++--- man/then.Rd | 18 ++++++++++++------ 4 files changed, 25 insertions(+), 19 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 2238b47..4b3440c 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -31,7 +31,7 @@ Suggests: LinkingTo: later, Rcpp Roxygen: list(markdown = TRUE) -RoxygenNote: 7.1.2 +RoxygenNote: 7.2.3 Encoding: UTF-8 LazyData: true VignetteBuilder: knitr diff --git a/man/WorkQueue.Rd b/man/WorkQueue.Rd index 9133837..54dcf4b 100644 --- a/man/WorkQueue.Rd +++ b/man/WorkQueue.Rd @@ -35,14 +35,14 @@ However, it is possible to use a private loop inside a user-defined \code{WorkQu \section{Methods}{ \subsection{Public methods}{ \itemize{ -\item \href{#method-new}{\code{WorkQueue$new()}} -\item \href{#method-schedule_work}{\code{WorkQueue$schedule_work()}} -\item \href{#method-clone}{\code{WorkQueue$clone()}} +\item \href{#method-WorkQueue-new}{\code{WorkQueue$new()}} +\item \href{#method-WorkQueue-schedule_work}{\code{WorkQueue$schedule_work()}} +\item \href{#method-WorkQueue-clone}{\code{WorkQueue$clone()}} } } \if{html}{\out{
}} -\if{html}{\out{}} -\if{latex}{\out{\hypertarget{method-new}{}}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-WorkQueue-new}{}}} \subsection{Method \code{new()}}{ Create a new \code{WorkQueue} \subsection{Usage}{ @@ -67,8 +67,8 @@ Schedule work} } } \if{html}{\out{
}} -\if{html}{\out{}} -\if{latex}{\out{\hypertarget{method-schedule_work}{}}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-WorkQueue-schedule_work}{}}} \subsection{Method \code{schedule_work()}}{ \subsection{Usage}{ \if{html}{\out{
}}\preformatted{WorkQueue$schedule_work(fn)}\if{html}{\out{
}} @@ -83,8 +83,8 @@ Schedule work} } } \if{html}{\out{
}} -\if{html}{\out{}} -\if{latex}{\out{\hypertarget{method-clone}{}}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-WorkQueue-clone}{}}} \subsection{Method \code{clone()}}{ The objects of this class are cloneable with this method. \subsection{Usage}{ diff --git a/man/future_promise.Rd b/man/future_promise.Rd index 62b8af1..b2a4bff 100644 --- a/man/future_promise.Rd +++ b/man/future_promise.Rd @@ -55,11 +55,11 @@ For more details and examples, please see the \href{https://rstudio.github.io/pr } \section{Functions}{ \itemize{ -\item \code{future_promise_queue}: Default \code{future_promise()} work queue to use. This function returns a \link{WorkQueue} that is cached per R session. +\item \code{future_promise_queue()}: Default \code{future_promise()} work queue to use. This function returns a \link{WorkQueue} that is cached per R session. -\item \code{future_promise}: Creates a \code{\link[=promise]{promise()}} that will execute the \code{expr} using \code{\link[future:future]{future::future()}}. -}} +\item \code{future_promise()}: Creates a \code{\link[=promise]{promise()}} that will execute the \code{expr} using \code{\link[future:future]{future::future()}}. +}} \examples{ \donttest{# Relative start time start <- Sys.time() diff --git a/man/then.Rd b/man/then.Rd index 2adb0a8..e43383e 100644 --- a/man/then.Rd +++ b/man/then.Rd @@ -63,9 +63,11 @@ the return value of \code{then} is also a (newly created) promise. This new promise waits for the original promise to be fulfilled or rejected, and for \code{onFulfilled} or \code{onRejected} to be called. The result of (or error raised by) calling \code{onFulfilled}/\code{onRejected} will be used to fulfill (reject) the -new promise.\preformatted{promise_a <- get_data_frame_async() +new promise. + +\if{html}{\out{
}}\preformatted{promise_a <- get_data_frame_async() promise_b <- then(promise_a, onFulfilled = head) -} +}\if{html}{\out{
}} In this example, assuming \code{get_data_frame_async} returns a promise that eventually resolves to a data frame, \code{promise_b} will eventually resolve to @@ -76,7 +78,9 @@ whether \code{onFulfilled}/\code{onRejected} returns a value or throws an error, whether the original promise was fulfilled or rejected. In other words, it's possible to turn failure to success and success to failure. Consider this example, where we expect \code{some_async_operation} to fail, and want to consider -it an error if it doesn't:\preformatted{promise_c <- some_async_operation() +it an error if it doesn't: + +\if{html}{\out{
}}\preformatted{promise_c <- some_async_operation() promise_d <- then(promise_c, onFulfilled = function(value) \{ stop("That's strange, the operation didn't fail!") @@ -86,20 +90,22 @@ promise_d <- then(promise_c, NULL \} ) -} +}\if{html}{\out{
}} Now, \code{promise_d} will be rejected if \code{promise_c} is fulfilled, and vice versa. \strong{Warning:} Be very careful not to accidentally turn failure into success, -if your error handling code is not the last item in a chain!\preformatted{some_async_operation() \%>\% +if your error handling code is not the last item in a chain! + +\if{html}{\out{
}}\preformatted{some_async_operation() \%>\% catch(function(reason) \{ warning("An error occurred: ", reason) \}) \%>\% then(function() \{ message("I guess we succeeded...?") # No! \}) -} +}\if{html}{\out{
}} In this example, the \code{catch} callback does not itself throw an error, so the subsequent \code{then} call will consider its promise fulfilled! From aefe2dcd938db6d16152faac71835e3db2d97eb2 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Tue, 7 Feb 2023 15:13:45 -0500 Subject: [PATCH 6/7] Remove `LazyData` from DESCRIPTION --- DESCRIPTION | 1 - 1 file changed, 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index ec2422a..abab9aa 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -33,7 +33,6 @@ LinkingTo: later, Roxygen: list(markdown = TRUE) RoxygenNote: 7.2.3 Encoding: UTF-8 -LazyData: true VignetteBuilder: knitr URL: https://rstudio.github.io/promises/, https://github.com/rstudio/promises BugReports: https://github.com/rstudio/promises/issues From 3939a3c0b9a6ca28141a257b08446f14d9fedae5 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Tue, 7 Feb 2023 15:27:21 -0500 Subject: [PATCH 7/7] Attempt to make test more robust on mac by allowing for more time --- tests/testthat/test-zzz-future_promise.R | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/testthat/test-zzz-future_promise.R b/tests/testthat/test-zzz-future_promise.R index 810c0bb..9885a10 100644 --- a/tests/testthat/test-zzz-future_promise.R +++ b/tests/testthat/test-zzz-future_promise.R @@ -9,6 +9,7 @@ source(test_path("common.R")) local({ ## Setup ## + n_workers <- 2 # Set up a plan with 2 future workers with_test_workers <- function(code) { # (Can not use a variable for workers if in a local({})) @@ -44,7 +45,7 @@ local({ worker_jobs <- 8 worker_job_time <- 1 - expected_total_time <- worker_jobs * worker_job_time / 2 # 2 workers + expected_total_time <- worker_jobs * worker_job_time / n_workers do_future_test <- function( prom_fn = future_promise, @@ -74,11 +75,11 @@ local({ future_exec_times <- c() if (block_mid_session) { # Have `future` block the main R session 1 second into execution - lapply(1:8, function(i) { + lapply(seq_len(worker_jobs), function(i) { later::later( function() { future::future({ - Sys.sleep(1) + Sys.sleep(worker_job_time) time_diff() }) %...>% { future_exec_times <<- c(future_exec_times, .) @@ -89,7 +90,6 @@ local({ }) } - exec_times <- NA lapply(seq_len(worker_jobs), function(i) { prom_fn({ @@ -112,7 +112,7 @@ local({ expect_equal(all(exec_times_lag < (2 * worker_job_time)), expect_reasonable_exec_lag_time) # post_lapply_time_diff should be ~ 0s - expect_equal(post_lapply_time_diff < worker_job_time, expect_immediate_lapply) + expect_equal(post_lapply_time_diff < (worker_job_time * ((worker_jobs - n_workers) / n_workers)), expect_immediate_lapply) # time_diffs should never grow by more than 1s; (Expected 0.1) time_diffs_lag <- time_diffs[-1] - time_diffs[-length(time_diffs)]