-
Notifications
You must be signed in to change notification settings - Fork 27
/
foreach.r
64 lines (56 loc) · 2.45 KB
/
foreach.r
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#' Register clustermq as `foreach` parallel handler
#'
#' @param ... List of arguments passed to the `Q` function, e.g. n_jobs
#' @export
register_dopar_cmq = function(...) {
dots = list(...)
workers = NA
if ("n_jobs" %in% names(dots))
workers = dots$n_jobs
info = function(data, item) {
switch(item,
name = "clustermq",
version = utils::packageVersion("clustermq"),
workers = workers)
}
foreach::setDoPar(cmq_foreach, data=dots, info=info)
}
#' clustermq foreach handler
#'
#' @param obj Returned from foreach::foreach, containing the following variables:
#' args : Arguments passed, each as a call
#' argnames: character vector of arguments passed
#' evalenv : Environment where to evaluate the arguments
#' export : character vector of variable names to export to nodes
#' packages: character vector of required packages
#' verbose : whether to print status messages [logical]
#' errorHandling: string of function name to call error with, e.g. "stop"
#' @param expr An R expression in curly braces
#' @param envir Environment where to evaluate the arguments
#' @param data Common arguments passed by register_dopcar_cmq(), e.g. n_jobs
#' @keywords internal
cmq_foreach = function(obj, expr, envir, data) {
stopifnot(inherits(obj, "foreach"))
stopifnot(inherits(envir, "environment"))
it = iterators::iter(obj)
args_df = do.call(rbind, as.list(it))
# if we call a function by name, add it to the export list
if (is.call(expr) && as.character(expr[[1]]) != "{")
obj$export = c(as.character(expr[[1]]), obj$export)
# wrap whatever we call in a function for use with Q(...)
fun = function(...) NULL
add = stats::setNames(replicate(ncol(args_df), substitute()), obj$argnames)
formals(fun) = c(add, formals(fun))
body(fun) = expr
# scan 'expr' for exports, eval and add objects ref'd in '.export'
globs = globals::globalsOf(expr, envir=envir, mustExist=FALSE)
globs = globs[! names(globs) %in% c(names(formals(fun)), ls(baseenv()))]
data$export = utils::modifyList(as.list(data$export), globs, keep.null=TRUE)
# make sure packages are loaded on the dopar target
if (length(obj$packages) > 0)
data$pkgs = unique(c(data$pkgs, obj$packages))
result = do.call(Q_rows, c(list(df=args_df, fun=fun), data))
accum = foreach::makeAccum(it)
accum(result, tags=seq_along(result))
foreach::getResult(it)
}