Skip to content

Commit

Permalink
Start sketching #30 and #31
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Mar 5, 2023
1 parent 14a193c commit 1397660
Show file tree
Hide file tree
Showing 14 changed files with 126 additions and 253 deletions.
5 changes: 1 addition & 4 deletions NAMESPACE
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
# Generated by roxygen2: do not edit by hand

S3method(is_controller,crew_class_controller)
S3method(is_controller,default)
S3method(is_launcher,crew_class_launcher_callr)
S3method(is_launcher,default)
S3method(is_router,crew_class_router)
S3method(is_router,default)
export(crew_class_controller)
export(crew_class_launcher_callr)
export(crew_class_multi_controller)
Expand All @@ -17,6 +13,7 @@ export(crew_launcher_callr)
export(crew_multi_controller)
export(crew_router)
export(crew_wait)
export(is_controller.crew_class_controller)
importFrom(R6,R6Class)
importFrom(callr,r_bg)
importFrom(getip,getip)
Expand Down
2 changes: 1 addition & 1 deletion R/crew_controller.R
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ crew_class_controller <- R6::R6Class(
validate = function() {
true(is.list(self$queue))
true(is.list(self$results))
true(is_router(self$router))
true(inherits(self$router, "crew_class_router"))
true(is_launcher(self$launcher))
self$router$validate()
self$launcher$validate()
Expand Down
4 changes: 2 additions & 2 deletions R/crew_controller_callr.R
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ crew_controller_callr <- function(
name = NULL,
workers = 1L,
host = NULL,
ports = NULL,
port = NULL,
router_timeout = 5,
router_wait = 0.1,
idle_time = Inf,
Expand All @@ -36,7 +36,7 @@ crew_controller_callr <- function(
name = name,
workers = workers,
host = host,
ports = ports,
port = port,
router_timeout = router_timeout,
router_wait = router_wait
)
Expand Down
9 changes: 2 additions & 7 deletions R/crew_launcher_callr.R
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ crew_class_launcher_callr <- R6::R6Class(
classname = c("crew_class_launcher_callr"),
cloneable = FALSE,
public = list(
#' @field sockets TCP sockets for listening to the workers.
#' @field sockets Websockets for listening to the workers.
sockets = NULL,
#' @field workers List of `callr::r_bg()` handles for workers.
#' The handle is `NA` if it is not yet called.
Expand Down Expand Up @@ -172,7 +172,7 @@ crew_class_launcher_callr <- R6::R6Class(
},
#' @description Populate workers.
#' @return `NULL` (invisibly).
#' @param sockets Character vector of local TCP sockets that the workers
#' @param sockets Character vector of local websockets that the workers
#' will use to dial in to receive tasks.
populate = function(sockets = character(0)) {
true(sockets, length(.) > 0L, is.character(.), nzchar(.), !anyNA(.))
Expand All @@ -181,11 +181,6 @@ crew_class_launcher_callr <- R6::R6Class(
self$validate()
invisible()
},
#' @description Count the number of running workers.
#' @return Number of running workers.
running = function() {
sum(map_lgl(self$workers, ~process_running(.x)))
},
#' @description Launch one or more workers.
#' @details The actual number of newly launched workers
#' is less than or equal to the number of sockets
Expand Down
2 changes: 1 addition & 1 deletion R/crew_multi_controller.R
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ crew_class_multi_controller <- R6::R6Class(
#' @return `NULL` (invisibly).
validate = function() {
true(
map_lgl(self$controllers, is_controller),
map_lgl(self$controllers, ~inherits(.x, "crew_class_controller")),
message = "All objects in a multi-controller must be controllers."
)
out <- unname(map_chr(self$controllers, ~.x$router$name))
Expand Down
141 changes: 49 additions & 92 deletions R/crew_router.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,65 +2,53 @@
#' @export
#' @keywords internal
#' @family routers
#' @description Create an `R6` object to route tasks to workers
#' on the local network.
#' @section Ports:
#' In the `mirai`-based task scheduling in `crew`, each parallel worker
#' dials into a different TCP port on the local machine.
#' If you launch hundreds of workers, then hundreds of ports will
#' not be available to other users or processes.
#' Large numbers of workers on shared machines or clusters may
#' seriously disrupt the tasks of other users, so please be careful.
#' @param name Name of the `mirai` router.
#' Defaults to a string from `ids::proquint()`.
#' @description Create an `R6` object to manage the `mirai` task scheduler
#' client.
#' @param name Name of the router object. If `NULL`, a name is automatically
#' generated.
#' @param workers Integer, maximum number of parallel workers to run.
#' `crew` will reserve one ephemeral port for each worker. See the
#' Ports section for an important cautionary note.
#' @param host IP address of the client process that the workers can dial
#' into inside the local network.
#' If a character string, the router uses the specified IP address.
#' If `NULL`, the IP address defaults to `getip::getip(type = "local")`.
#' @param ports Optional integer vector of TCP port numbers.
#' Supersedes `workers` if supplied. Except for 0,
#' which defers to NNG to automatically assign a port, each unique port
#' value corresponds to a local port where a worker will dial in
#' to accept tasks.
#' @param port TCP port to listen for the workers. If `NULL`,
#' then an available port is supplied through `parallelly::freePort()`.
#' @param router_timeout Number of seconds to time out waiting for the `mirai`
#' router to (dis)connect.
#' client to (dis)connect.
#' @param router_wait Number of seconds to wait between iterations checking
#' if the `mirai` router is (dis)connected.
#' if the `mirai` client is (dis)connected.
#' @examples
#' if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
#' router <- crew_router()
#' router$sockets_listening() # character(0)
#' router$connect()
#' router$sockets_listening() # "tcp://xx.xx.xx:xxxxx"
#' router$sockets_listening() # "ws://xx.xx.xx:xxxxx/x"
#' router$disconnect()
#' router$sockets_listening() # character(0)
#' }
crew_router <- function(
name = NULL,
workers = 1L,
host = NULL,
ports = NULL,
port = NULL,
router_timeout = 5,
router_wait = 0.1
) {
true(workers, is.numeric(.), length(.) == 1L, . > 0L, !anyNA(.))
name <- as.character(name %|||% random_name())
host <- as.character(host %|||% local_ipv4())
ports <- as.integer(ports %|||% rep(0, workers))
port <- as.integer(port %|||% free_port())
true(name, is.character(.), length(.) == 1L, nzchar(.), !anyNA(.))
true(host, is.character(.), length(.) == 1L, nzchar(.), !anyNA(.))
true(ports, is.integer(.), length(.) > 0L, !anyNA(.))
true(ports, . >= 0L, . <= 65535L)
true(port, is.integer(.), length(.) == 1L, !anyNA(.))
true(port, . >= 0L, . <= 65535L)
true(router_timeout, is.numeric(.), length(.) == 1L, !is.na(.), . >= 0)
true(router_wait, is.numeric(.), length(.) == 1L, !is.na(.), . >= 0)
true(router_timeout >= router_wait)
sockets <- web_sockets(host = host, port = port, n = workers)
router <- crew_class_router$new(
name = name,
host = host,
ports = ports,
sockets = sockets,
router_timeout = router_timeout,
router_wait = router_wait
)
Expand All @@ -78,7 +66,7 @@ crew_router <- function(
#' router <- crew_router()
#' router$sockets_listening() # character(0)
#' router$connect()
#' router$sockets_listening() # "tcp://xx.xx.xx:xxxxx"
#' router$sockets_listening() # "ws://xx.xx.xx:xxxxx/x"
#' router$disconnect()
#' router$sockets_listening() # character(0)
#' }
Expand All @@ -88,52 +76,46 @@ crew_class_router <- R6::R6Class(
public = list(
#' @field name Name of the router.
name = NULL,
#' @field host Local IP address.
host = NULL,
#' @field ports TCP ports to listen to workers.
ports = NULL,
#' @field router_timeout Timeout in seconds
#' (dis)connecting the `mirai` router.
#' @field sockets Websockets to listen for workers.
sockets = NULL,
#' @field router_timeout Timeout in seconds for checking the `mirai`
#' client connection.
router_timeout = NULL,
#' @field router_wait Polling interval in seconds checking if the `mirai`
#' router successfully (dis)connected.
#' @field router_wait Polling interval in seconds for checking the
#' `mirai` client connection.
router_wait = NULL,
#' @description `mirai` router constructor.
#' @return An `R6` object with the router.
#' @param name Argument passed from [crew_router()].
#' @param host Argument passed from [crew_router()].
#' @param ports Argument passed from [crew_router()].
#' @param sockets Argument passed from [crew_router()].
#' @param router_timeout Argument passed from [crew_router()].
#' @param router_wait Argument passed from [crew_router()].
#' @examples
#' if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
#' router <- crew_router()
#' router$sockets_listening() # character(0)
#' router$connect()
#' router$sockets_listening() # "tcp://xx.xx.xx:xxxxx"
#' router$sockets_listening() # "ws://xx.xx.xx:xxxxx/x"
#' router$disconnect()
#' router$sockets_listening() # character(0)
#' }
initialize = function(
name = NULL,
host = NULL,
ports = NULL,
sockets = NULL,
router_timeout = NULL,
router_wait = NULL
) {
self$name <- name
self$host <- host
self$ports <- ports
self$sockets <- sockets
self$router_timeout <- router_timeout
self$router_wait <- router_wait
},
#' @description Validate the router.
#' @return `NULL` (invisibly).
validate = function() {
true(self$name, is.character(.), length(.) == 1L, nzchar(.), !anyNA(.))
true(self$host, is.character(.), length(.) == 1L, nzchar(.), !anyNA(.))
true(self$ports, is.integer(.), length(.) > 0L, !anyNA(.))
true(self$ports, . >= 0L, . <= 65535L)
true(self$sockets, is.character(.), nzchar(.), !anyNA(.))
true(all(grepl(pattern = "^ws\\:\\/\\/", x = self$sockets)))
true(
self$router_timeout,
is.numeric(.),
Expand All @@ -144,62 +126,43 @@ crew_class_router <- R6::R6Class(
true(self$router_timeout >= self$router_wait)
invisible()
},
#' @description Get all sockets
#' @return Character vector of all sockets if connected, or
#' `character(0)` if the router is not connected
#' or polling the sockets is unsuccessful
#' (which may happen if the sockets are busy at the moment).
sockets_listening = function() {
nodes <- mirai::daemons(.compute = self$name)$nodes
if_any(
mirai::is_error_value(nodes),
character(0),
as.character(names(nodes))
)
},
#' @description Show worker connections.
#' @return Character vector of TCP sockets where workers
#' @description Get occupied worker sockets.
#' @return Character vector of websockets sockets where workers
#' are currently dialed in.
#' Returns `character(0)` if the router is not connected
#' or polling the sockets is unsuccessful
#' (which may happen if the sockets are busy at the moment).
sockets_occupied = function() {
occupied = function() {
nodes <- mirai::daemons(.compute = self$name)$nodes
if_any(
mirai::is_error_value(nodes),
character(0),
as.character(names(nodes)[nodes > 0L])
)
as.character(names(nodes)[nodes == 1L])
},
#' @description Get TCP sockets that are available for workers to dial in.
#' @return Character string with available TCP sockets.
#' A return value of `character(0)` may indicate that
#' the router is not connected
#' or polling the sockets is unsuccessful
#' (which may happen if the sockets are busy at the moment).
sockets_available = function() {
as.character(setdiff(self$sockets_listening(), self$sockets_occupied()))
#' @description Get worker sockets that are unoccupied and
#' available for workers to dial in.
#' @return Character string with available websockets.
unoccupied = function() {
nodes <- mirai::daemons(.compute = self$name)$nodes
as.character(names(nodes)[nodes == 0L])
},
#' @description Check if the router is connected.
#' @details This method may stall and time out if there are
#' tasks in the queue. Methods `connect()` and `disconnect()`
#' call `is_connected()` to manage the connection before
#' call `connected()` to manage the connection before
#' and after the entire workload, respectively.
#' @return `TRUE` if successfully listening for dialed-in workers,
#' `FALSE` otherwise.
is_connected = function() {
connected = function() {
out <- mirai::daemons(.compute = self$name)$connections
(length(out) == 1L) && !anyNA(out) && is.numeric(out) && out > 0L
},
#' @description Start listening for workers on the available sockets.
#' @return `NULL` (invisibly).
connect = function() {
if (isFALSE(self$is_connected())) {
tcp <- tcp_sockets(host = self$host, ports = self$ports)
args <- list(value = tcp, nodes = length(tcp), .compute = self$name)
if (isFALSE(self$connected())) {
args <- list(
value = self$sockets,
nodes = length(self$sockets),
.compute = self$name
)
do.call(what = mirai::daemons, args = args)
crew_wait(
fun = ~isTRUE(self$is_connected()),
fun = ~isTRUE(self$connected()),
timeout = self$router_timeout,
wait = self$router_wait,
message = "mirai client cannot connect."
Expand All @@ -210,11 +173,11 @@ crew_class_router <- R6::R6Class(
#' @description Disconnect the router.
#' @return `NULL` (invisibly).
disconnect = function() {
if (isTRUE(self$is_connected())) {
if (isTRUE(self$connected())) {
try(mirai::daemons(value = 0L, .compute = self$name), silent = TRUE)
try(
crew_wait(
fun = ~isFALSE(self$is_connected()),
fun = ~isFALSE(self$connected()),
timeout = self$router_timeout,
wait = self$router_wait,
message = "mirai client cannot disconnect."
Expand All @@ -226,9 +189,3 @@ crew_class_router <- R6::R6Class(
}
)
)

#' @export
#' @keywords internal
is_router.crew_class_router <- function(x) {
TRUE
}
11 changes: 9 additions & 2 deletions R/utils_network.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ local_ipv4 <- function() {
getip::getip(type = "local")
}

tcp_sockets <- function(host, ports) {
sprintf("tcp://%s:%s", host, ports)
web_sockets <- function(host, port, n = 1L) {
sprintf("ws://%s:%s/worker%s", host, port, seq_len(n))
}

free_port <- function() {
parallelly::freePort(
ports = seq(from = 49152L, to = 65535L, by = 1L),
default = NA_integer_
)
}
20 changes: 0 additions & 20 deletions R/utils_oop.R
Original file line number Diff line number Diff line change
@@ -1,23 +1,3 @@
is_controller <- function(x) {
UseMethod("is_controller")
}

#' @export
#' @keywords internal
is_controller.default <- function(x) {
FALSE
}

is_router <- function(x) {
UseMethod("is_router")
}

#' @export
#' @keywords internal
is_router.default <- function(x) {
FALSE
}

is_launcher <- function(x) {
UseMethod("is_launcher")
}
Expand Down
Loading

0 comments on commit 1397660

Please sign in to comment.