Work hangs: mclapply() parallelism within clustermq jobs #103

Closed
wlandau opened this issue Aug 27, 2018 · 4 comments

wlandau commented Aug 27, 2018

The following little drake pipeline sends jobs to an SGE cluster, and each job uses mclapply() to parallelize its own work. It hangs when mc.cores is greater than 1, and it completes normally (and very quickly) when mc.cores equals 1. I am using ropensci/drake@c6395ee and clustermq@ecfdb9d. Other session info is here.

The following little drake pipeline sends jobs to an SGE cluster, and each job uses mclapply() to parallelize its own work. It hangs when mc.cores is greater than 1, and it completes normally (and very quickly) when mc.cores equals 1. I am using ropensci/drake@c6395ee, and ecfdb9d. Other session info is here.

library(drake)
library(magrittr)
options(
  clustermq.scheduler = "sge",
  clustermq.template = "sge_clustermq.tmpl"
)
plan <- drake_plan(
  x = parallel::mclapply(1:4 + i__, sqrt, mc.cores = 4),
  y = mean(unlist(x_i__))
) %>%
  evaluate_plan(wildcard = "i__", values = 1:8)
clean(destroy = TRUE)
make(plan, verbose = 4, parallelism = "clustermq", jobs = 8)

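For comparison (a hypothetical variant I am adding for illustration, not part of the original reprex): the same pipeline finishes normally when the inner fork-based parallelism is disabled.

# Hypothetical variant of the plan above: with mc.cores = 1 there is no
# forking inside the clustermq workers, and the pipeline completes.
plan_serial <- drake_plan(
  x = parallel::mclapply(1:4 + i__, sqrt, mc.cores = 1),
  y = mean(unlist(x_i__))
) %>%
  evaluate_plan(wildcard = "i__", values = 1:8)
make(plan_serial, verbose = 4, parallelism = "clustermq", jobs = 8)
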
The template file makes sure each job gets 4 cores.

# From https://github.com/mschubert/clustermq/wiki/SGE
#$ -N {{ job_name }}               # job name
#$ -t 1-{{ n_jobs }}               # submit jobs as array
#$ -j y                            # combine stdout/error in one file
#$ -o {{ log_file | /dev/null }}   # output file
#$ -cwd                            # use pwd as work dir
#$ -V                              # export environment variables to the job
#$ -pe smp 4                       # request 4 cores per job
module load R
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

We get pretty far along in the workflow, but it hangs before starting x_8.

cache /lrlhps/users/c240390/reprex-cmq/.drake
analyze environment
analyze 6 imports: ld, wd, td, spell_check_ignore, plan, dl
analyze 16 targets: x_1, x_2, x_3, x_4, x_5, x_6, x_7, x_8, y_1, y_2, y_3, y_...
construct graph edges
construct vertex attributes
construct graph
import parallel::mclapply
import sqrt
import mean
import unlist
Submitting 8 worker jobs (ID: 7227) ...
target x_1
target x_2
target x_3
target x_4
target x_5
target x_6
target x_7
target x_8
target y_1
load 1 item: x_1
target y_2
unload 1 item: x_1
load 1 item: x_2
target y_4
unload 1 item: x_2
load 1 item: x_4
target y_3
unload 1 item: x_4
load 1 item: x_3
target y_6
unload 1 item: x_3
load 1 item: x_6
target y_7
unload 1 item: x_6
load 1 item: x_7

qstat shows that some, but not all, of the workers are still running.

job-ID     prior   name       user         state submit/start at     queue                          jclass                         slots ja-task-ID
------------------------------------------------------------------------------------------------------------------------------------------------
  37636485 0.55350 cmq7227    CENSORED     r     08/27/2018 15:27:52 CENSORED                                         4 3
  37636485 0.55350 cmq7227    CENSORED     r     08/27/2018 15:27:54 CENSORED                                         4 4
  37636485 0.55350 cmq7227    CENSORED     r     08/27/2018 15:27:54 CENSORED                                         4 5
  37636485 0.55350 cmq7227    CENSORED     r     08/27/2018 15:28:03 CENSORED                                         4 6
  37636485 0.55350 cmq7227    CENSORED     r     08/27/2018 15:28:06 CENSORED                                         4 7
  37636485 0.55350 cmq7227    CENSORED     r     08/27/2018 15:28:10 CENSORED                                         4 8

mschubert (Owner) commented:

Can you try this with clustermq only and post the worker log file?

Q(..., template=list(log_file="..."))

wlandau commented Aug 28, 2018

options(
  clustermq.scheduler = "sge",
  clustermq.template = "sge_clustermq.tmpl"
)
library(clustermq)
f <- function(i){
  parallel::mclapply(1:4 + i, sqrt, mc.cores = 4)
}
Q(f, 1:8, n_jobs = 8, template = list(log_file = "log.txt"))
#> Submitting 8 worker jobs (ID: 6424) ...
#> Running 8 calculations (1 calls/chunk) ...
#> [===============================>--------------------]  62% (4/4 wrk) eta:  6s

At this point, the work hung, so I sent SIGINT with CTRL-C.

^CError in rzmq::poll.socket(list(private$socket), list("read"), timeout = msec) :
  The operation was interrupted by delivery of a signal before any events were available.
Calls: Q ... master -> <Anonymous> -> <Anonymous> -> <Anonymous>
^CExecution halted

Log file:

> clustermq:::worker("tcp://CLUSTER-LOGIN-NODE:6424")
Master: tcp://CLUSTER-LOGIN-NODE:6424
WORKER_UP to: tcp://CLUSTER-LOGIN-NODE:6424
> DO_SETUP (0.000s wait)
token from msg: ubust
> WORKER_STOP (0.000s wait)
shutting down worker

Total: 0 in 0.00s [user], 0.00s [system], 0.01s [elapsed]

mschubert (Owner) commented:

Thank you, I could reproduce this now: it was caused by poll.socket() returning NULL on a non-critical interrupt, which was not handled properly by the worker (from mschubert:rzmq/signal)

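For illustration (a minimal sketch with assumed names, not clustermq's actual worker code): the fix amounts to treating a NULL result from rzmq::poll.socket() as "poll again" rather than as a readable socket, so the interrupts generated by mclapply()'s forks no longer stall the worker.

# Hypothetical helper sketching the idea: if rzmq::poll.socket() returns
# NULL because a non-critical signal (e.g. SIGCHLD from an mclapply() fork)
# interrupted it before any events arrived, simply poll again.
poll_until_event <- function(socket, timeout) {
  repeat {
    events <- rzmq::poll.socket(list(socket), list("read"), timeout = timeout)
    if (!is.null(events)) {
      return(events)  # genuine poll result; caller inspects events[[1]]$read
    }
    # NULL: interrupted by a signal before any events were available, retry.
  }
}
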

wlandau commented Aug 31, 2018

Fixed on my end. Thanks very much.
