Skip to content

Commit

Permalink
Do not leave zombie processes behind in parallel. Improvements to signal
Browse files Browse the repository at this point in the history
handling and termination of forked processes in parallel.


git-svn-id: https://svn.r-project.org/R/trunk@73775 00db46b3-68df-0310-9c12-caf00c1e9a41
  • Loading branch information
kalibera committed Nov 22, 2017
1 parent 438f071 commit eb46800
Show file tree
Hide file tree
Showing 10 changed files with 672 additions and 285 deletions.
4 changes: 4 additions & 0 deletions doc/NEWS.Rd
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,10 @@

\item Subassignment with zero length vectors now coerces as
documented. (\PR{17344})

\item \code{mclapply}, \code{pvec} and \code{mcparallel} (when
\code{mccollect} is used to collect results) no longer leave zombie
processes behind.
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/library/parallel/R/unix/forkCluster.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# File src/library/parallel/R/unix/forkCluster.R
# Part of the R package, https://www.R-project.org
#
# Copyright (C) 1995-2014 The R Core Team
# Copyright (C) 1995-2017 The R Core Team
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -36,7 +36,7 @@ newForkNode <- function(..., options = defaultClusterOptions, rank)
timeout <- getClusterOption("timeout", options)
renice <- getClusterOption("renice", options)

f <- mcfork()
f <- mcfork(TRUE)
if (inherits(f, "masterProcess")) { # the slave
on.exit(mcexit(1L, structure("fatal error in wrapper code",
class = "try-error")))
Expand All @@ -55,8 +55,6 @@ newForkNode <- function(..., options = defaultClusterOptions, rank)
Sys.getpid(), paste(master, port, sep = ":"),
format(Sys.time(), "%H:%M:%OS3"))
cat(msg)
## allow this to quit when the loop is done.
tools::pskill(Sys.getpid(), tools::SIGUSR1)
if(!is.na(renice) && renice) ## ignore 0
tools::psnice(Sys.getpid(), renice)
slaveLoop(makeSOCKmaster(master, port, timeout))
Expand Down
41 changes: 34 additions & 7 deletions src/library/parallel/R/unix/mcfork.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# File src/library/parallel/R/unix/mcfork.R
# Part of the R package, https://www.R-project.org
#
# Copyright (C) 1995-2013 The R Core Team
# Copyright (C) 1995-2017 The R Core Team
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand All @@ -24,8 +24,9 @@

## registered as finalizer in .onLoad() to kill all child processes
clean_pids <- function(e)
if(length(pids <- sapply(children(), function(o) o$pid))) tools::pskill(pids, tools::SIGKILL)
cleanup(kill = tools::SIGKILL, detach = TRUE, shutdown = TRUE)

## used in mclapply, mcparallel, newWorkNode
mcfork <- function(estranged = FALSE) {
r <- .Call(C_mc_fork, estranged)

Expand Down Expand Up @@ -56,21 +57,24 @@ selectChildren <- function(children = NULL, timeout = 0)
if (!length(children)) children <- integer()
if (inherits(children, "process")) children <- processID(children)
if (is.list(children))
children <- unlist(lapply(children, function(x) if (inherits(x, "process")) x$pid
else stop("'children' must be a list of processes or a single process")))
children <- unlist(lapply(children, function(x)
if (inherits(x, "process")) processID(x)
else stop("'children' must be a list of processes or a single process")
))
if (!is.numeric(children))
stop("'children' must be a list of processes or a single process")
.Call(C_mc_select_children, as.double(timeout), as.integer(children))
}

## not used
rmChild <- function(child)
{
if (inherits(child, "process")) child <- processID(child)
if (!is.numeric(child)) stop("invalid 'child' argument")
.Call(C_mc_rm_child, as.integer(child))
}

## used in pvec, mclapply
## not used
mckill <- function(process, signal = 2L)
{
process <- processID(process)
Expand All @@ -87,14 +91,15 @@ sendMaster <- function(what)
.Call(C_mc_send_master, what)
}

## used widely, not exported
processID <- function(process) {
if (inherits(process, "process")) process$pid
else if (is.list(process)) unlist(lapply(process, processID))
else stop(gettextf("'process' must be of class %s", dQuote("process")),
domain = NA)
}

# unused in the package
# not used
sendChildStdin <- function(child, what)
{
if (inherits(child, "process") || is.list(child)) child <- processID(child)
Expand All @@ -107,7 +112,7 @@ sendChildStdin <- function(child, what)
.Call(C_mc_send_child_stdin, p, what))))
}

## used by mcparallel, mclapply
## used by mcparallel, mclapply, newForkNode
mcexit <- function(exit.code = 0L, send = NULL)
{
if (!is.null(send)) try(sendMaster(send), silent = TRUE)
Expand All @@ -124,17 +129,26 @@ children <- function(select)
structure(list(pid = x), class = c("childProcess", "process")))
}

## not used
childrenDescriptors <- function(index = 0L)
.Call(C_mc_fds, as.integer(index))

## not used
masterDescriptor <- function() .Call(C_mc_master_fd)

## used by mclapply
isChild <- function() .Call(C_mc_is_child)

## used by mccollect, mclapply
closeStdout <- function(to.null=FALSE) .Call(C_mc_close_stdout, to.null)

## not used
closeStderr <- function(to.null=FALSE) .Call(C_mc_close_stderr, to.null)

## not used
closeFD <- function(fds) .Call(C_mc_close_fds, as.integer(fds))

## not used
closeAll <- function(includeStd = FALSE)
{
if (!isChild()) {
Expand All @@ -148,3 +162,16 @@ closeAll <- function(includeStd = FALSE)
## close all but those that we actually use
closeFD((1:mf)[-fds])
}

# used by mcparallel, mclapply, mcmapply
mcaffinity <- function(affinity = NULL) .Call(C_mc_affinity, affinity)

# used by mcparallel
mcinteractive <- function(interactive) .Call(C_mc_interactive, interactive)

# used by mclapply, pvec
prepareCleanup <- function() .Call(C_mc_prepare_cleanup)

# used by mclapply, pvec, mccollect
cleanup <- function(kill = TRUE, detach = TRUE, shutdown = FALSE)
.Call(C_mc_cleanup, kill, detach, shutdown)
38 changes: 15 additions & 23 deletions src/library/parallel/R/unix/mclapply.R
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,9 @@ mclapply <- function(X, FUN, ..., mc.preschedule = TRUE, mc.set.seed = TRUE,
}

jobs <- list()
cleanup <- function() {
## kill children if cleanup is requested
if (length(jobs) && mc.cleanup) {
## first take care of uncollected children
mccollect(children(jobs), FALSE)
mckill(children(jobs),
if (is.integer(mc.cleanup)) mc.cleanup else tools::SIGTERM)
mccollect(children(jobs))
}
if (length(jobs)) {
## just in case there are zombies
mccollect(children(jobs), FALSE)

## just in case there are open file descriptors
sapply(children(jobs), function(x) rmChild(x$pid))
}
}
on.exit(cleanup())
## all processes created from now on will be terminated by cleanup
prepareCleanup()
on.exit(cleanup(mc.cleanup))
## Follow lapply
if(!is.vector(X) || is.object(X)) X <- as.list(X)
if(!is.null(affinity.list) && length(affinity.list) < length(X))
Expand Down Expand Up @@ -115,8 +100,8 @@ mclapply <- function(X, FUN, ..., mc.preschedule = TRUE, mc.set.seed = TRUE,
jobsp <- processID(jobs)
has.errors <- 0L
while (!all(fin)) {
s <- selectChildren(jobs, 0.5)
if (is.null(s)) break # no children -> no hope
s <- selectChildren(jobs[!is.na(jobsp)], -1)
if (is.null(s)) break # no children -> no hope (should not happen)
if (is.integer(s))
for (ch in s) {
ji <- match(TRUE, jobsp == ch)
Expand All @@ -131,6 +116,9 @@ mclapply <- function(X, FUN, ..., mc.preschedule = TRUE, mc.set.seed = TRUE,
if (!is.null(child.res)) res[[ci]] <- child.res
} else {
fin[ci] <- TRUE
## the job has finished, so we must not run
## select on its fds again
jobsp[ji] <- jobid[ji] <- NA
if (any(ava)) { # still something to do,
## look for first job which is allowed to
## run on the now idling core and spawn it
Expand Down Expand Up @@ -182,15 +170,15 @@ mclapply <- function(X, FUN, ..., mc.preschedule = TRUE, mc.set.seed = TRUE,
mcexit(0L)
}
jobs[[core]] <<- ch[[core]] <<- f
cp[core] <<- f$pid
cp[core] <<- processID(f)
NULL
}
job.res <- lapply(seq_len(cores), inner.do)
ac <- cp[cp > 0]
has.errors <- integer(0)
while (!all(fin)) {
s <- selectChildren(ac, 1)
if (is.null(s)) break # no children -> no hope we get anything
s <- selectChildren(ac[!fin], -1)
if (is.null(s)) break # no children -> no hope we get anything (should not happen)
if (is.integer(s))
for (ch in s) {
a <- readChild(ch)
Expand All @@ -203,6 +191,10 @@ mclapply <- function(X, FUN, ..., mc.preschedule = TRUE, mc.set.seed = TRUE,
if (inherits(ijr, "try-error"))
has.errors <- c(has.errors, core)
dr[core] <- TRUE
} else if (is.null(a)) {
# the child no longer exists (should not happen)
core <- which(cp == ch)
fin[core] <- TRUE
}
}
}
Expand Down
15 changes: 8 additions & 7 deletions src/library/parallel/R/unix/mcparallel.R
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

### Derived from multicore version 0.1-6 by Simon Urbanek

mcaffinity <- function(affinity = NULL) .Call(C_mc_affinity, affinity)

mcparallel <- function(expr, name, mc.set.seed = TRUE, silent = FALSE, mc.affinity = NULL, mc.interactive = FALSE, detached = FALSE)
{
f <- mcfork(detached)
Expand All @@ -30,9 +28,9 @@ mcparallel <- function(expr, name, mc.set.seed = TRUE, silent = FALSE, mc.affini
class = "try-error")))
if (isTRUE(mc.set.seed)) mc.set.stream()
mc.interactive <- as.logical(mc.interactive)
if (isTRUE(mc.interactive)) .Call(C_mc_interactive, TRUE)
if (isTRUE(!mc.interactive)) .Call(C_mc_interactive, FALSE)
if (!is.null(mc.affinity)) .Call(C_mc_affinity, mc.affinity)
if (isTRUE(mc.interactive)) mcinteractive(TRUE)
if (isTRUE(!mc.interactive)) mcinteractive(FALSE)
if (!is.null(mc.affinity)) mcaffinity(mc.affinity)
if (isTRUE(silent)) closeStdout(TRUE)
if (detached) {
on.exit(mcexit(1L))
Expand Down Expand Up @@ -76,7 +74,7 @@ mccollect <- function(jobs, wait = TRUE, timeout = 0, intermediate = FALSE)
names(res) <- pnames
fin <- rep(FALSE, length(jobs))
while (!all(fin)) {
s <- selectChildren(pids, 0.5)
s <- selectChildren(pids[!fin], -1)
if (is.integer(s)) {
for (pid in s) {
r <- readChild(pid)
Expand All @@ -85,8 +83,11 @@ mccollect <- function(jobs, wait = TRUE, timeout = 0, intermediate = FALSE)
res[which(pid == pids)] <- list(unserialize(r))
}
if (is.function(intermediate)) intermediate(res)
} else if (all(is.na(match(pids, processID(children()))))) break
} else
## should not happen
if (all(is.na(match(pids, processID(children()))))) break
}
}
cleanup(kill = FALSE, detach = FALSE) # compact children
res
}
19 changes: 2 additions & 17 deletions src/library/parallel/R/unix/pvec.R
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,8 @@ pvec <- function(v, FUN, ..., mc.set.seed = TRUE, mc.silent = FALSE,
lapply(seq_len(cores), function(ix) v[si[ix]:se[ix]])
}
jobs <- NULL
cleanup <- function() {
## kill children if cleanup is requested
if (length(jobs) && mc.cleanup) {
## first take care of uncollected children
mccollect(children(jobs), FALSE)
mckill(children(jobs),
if (is.integer(mc.cleanup)) mc.cleanup else 15L)
mccollect(children(jobs))
}
if (length(jobs)) {
## just in case there are zombies
mccollect(children(jobs), FALSE)

## just in case there are open file descriptors
sapply(children(jobs), function(x) rmChild(x$pid))
}
}
## all processes created from now on will be terminated by cleanup
prepareCleanup()
on.exit(cleanup())
FUN <- match.fun(FUN)
## may have more cores than tasks ....
Expand Down
13 changes: 13 additions & 0 deletions src/library/parallel/man/unix/mcparallel.Rd
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ mccollect(jobs, wait = TRUE, timeout = 0, intermediate = FALSE)
terminating job that has sent its results already after which the
job is no longer available.

Jobs are identified by process IDs (even when referred to as job objects),
which are reused by the operating system. Detached jobs created by
\code{mcparallel} can thus never be safely referred to by their process
IDs nor job objects. Non-detached jobs are guaranteed to exist until
collected by \code{mccollect}, even if crashed or terminated by a signal.
Once collected by \code{mccollect}, a job is regarded as detached, and
thus must no longer be referred to by its process ID nor its job object.
With \code{wait = TRUE}, all jobs passed to \code{mccollect} are
collected. With \code{wait = FALSE}, the collected jobs are given as
names of the result vector, and thus in subsequent calls to
\code{mccollect} these jobs must be excluded. Job objects should be used
in preference of process IDs whenever accepted by the API.

The \code{mc.affinity} parameter can be used to try to restrict
the child process to specific CPUs. The availability and the extent of
this feature is system-dependent (e.g., some systems will only
Expand Down
Loading

0 comments on commit eb46800

Please sign in to comment.