
Persistent mirai sessions #3

Closed
wlandau opened this issue Feb 24, 2022 · 14 comments

Labels
enhancement New feature or request

Comments

@wlandau

wlandau commented Feb 24, 2022

Have you considered a persistent mirai where a single R session can send and receive multiple jobs? R startup time could be a bottleneck if there are numerous short-lived tasks. I am picturing something like callr::r_session as demonstrated at https://www.tidyverse.org/blog/2019/09/callr-task-q/. cc @krlmlr.
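Roughly what I am picturing, purely to illustrate the request (a minimal sketch using callr's r_session methods; none of this is mirai code):

library(callr)

# One persistent background R session: the startup cost is paid only once.
rs <- r_session$new()

# Multiple short-lived jobs are then sent to the same process.
rs$run(function() Sys.getpid())               # same worker PID every time
rs$run(function(x) x + 1L, args = list(41L))  # returns 42L

rs$close()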

@shikokuchuo
Owner

I have come across that article before. For my main uses, R startup time is not really an issue as it is async anyway. But you probably have a specific use case in mind?

I am basically in the middle of re-writing the non-blocking call for a more optimal solution. So leave this with me for now...

@shikokuchuo
Owner

So there is potentially an even lighter all-NNG solution to this. You see, I don't really need a queue, as NNG itself is a queue in that all messages are buffered, and I believe there is a topology using the scalability protocols that achieves what you want. I just need to test that it does what I think it does.

Having to maintain so much state when working with NNG just felt intuitively wrong :)

I have limited bandwidth right now and re-implementing the non-blocking call is priority. But stay tuned...

@shikokuchuo added the enhancement (New feature or request) label Feb 25, 2022
@wlandau
Author

wlandau commented Feb 25, 2022

Great point, and that reminded me that clustermq is actually almost a task queue: https://github.com/mschubert/clustermq. To be a task queue like Gabor's, I think it just needs (1) a way to keep a backlog of queued jobs, and (2) childproofing so users don't need to think about the ZeroMQ ping-pong event loop.

@shikokuchuo
Owner

shikokuchuo commented Feb 27, 2022

Re-writing the non-blocking call took much less time than expected. This is now handled entirely in C and is as robust as any other part of the package.

Fortunately for you, there was also an internal request for similar functionality on our side so I have put together a prototype in 0d983b0 that might cover your bases as well. NNG worked exactly as expected, hence implementation was straightforward.

Basic usage docs are included in the Rd file, but I thought it worth pointing out a few things:

Firstly, this offers a minimal approach.

If you were after a maximal approach where you want to monitor the status of each task, that can also be achieved using NNG (there is even an entire statistics interface if you wanted to go there), but I think it runs against the whole ethos of NNG. The library was designed to handle concurrency precisely so that we don't have to do it ourselves.

Also, as you mentioned ZeroMQ, I just wanted to make sure you are aware that NNG is not ZeroMQ. There has been an evolution from ZeroMQ to nanomsg to NNG, the last being a complete re-write by Garrett D'Amore as, in his view, it should have been done (https://nng.nanomsg.org/RATIONALE.html). The two are not compatible over the wire.

On your two points: (1) backlog: yes, and optimal load-balancing / automatic distribution of tasks is handled by NNG at the C library level, with processes that can be created or destroyed at will; (2) child-proofing: no changes to existing functions, just a new daemons() function with a minimal user API.

You should be able to do something like:

library(mirai)
daemons(4L)    # launch 4 persistent background R processes

x <- 8L        # number of tasks; task i sleeps for i seconds
res <- vector(mode = "list", length = x)
start <- .Internal(Sys.time())
for (i in seq_len(x)) {
  res[[i]] <- eval_mirai({      # dispatch asynchronously; returns immediately
    st <- .Internal(Sys.time())
    .Internal(Sys.sleep(x))
    .Internal(Sys.time()) - st
  }, x = i)                     # 'x' inside the expression is bound to i
}
.Internal(Sys.time()) - start   # elapsed time to dispatch all tasks
lapply(res, call_mirai)         # block until every mirai resolves
.Internal(Sys.time()) - start   # total elapsed wall time
unlist(lapply(res, .subset2, "data"))  # collect the evaluated results

@shikokuchuo
Owner

Just adding a note that the nanonext Aio user interface has been streamlined now that the non-blocking call code has been finalised.

This means a 'mirai' no longer needs to be expressly called. Just access the field $data directly (unfortunately this will be a breaking change from the current $value). It will return either the evaluated result or an NA 'unresolved value' if the result is not yet available.
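For example (a minimal sketch based on the above; exact timing and the precise form of the unresolved value may differ slightly):

library(mirai)
daemons(1L)

m <- eval_mirai({ Sys.sleep(2); "finished" })

m$data         # immediately after dispatch: an NA 'unresolved value'
Sys.sleep(3)
m$data         # once evaluation has completed: "finished"

call_mirai(m)  # calling explicitly still works if you want to block until resolution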

@shikokuchuo
Owner

shikokuchuo commented Mar 4, 2022

Got round to updating the package today after implementing a whole host of new features in nanonext. The new daemons 'control panel' now has a logical interface which is modular by design and which I can live with.

Coming back to this request: the current prototype performs well for similar, homogeneous tasks, but it is not a true queue. This is because NNG does load-balancing at the messaging level and does not maintain the state of each request (whether or not each has received a reply). Coupled with buffering that is tricky to turn off, this makes a true queue non-trivial at this level.

Demonstrated below:

library(mirai)
daemons(4L)

x <- c(1L, 2L, 6L, 5L, 8L, 7L, 3L, 4L)  # per-task sleep times in seconds
res <- vector(mode = "list", length = length(x))
start <- .Internal(Sys.time())
for (i in seq_along(x)) {
  res[[i]] <- eval_mirai({
    st <- .Internal(Sys.time())
    .Internal(Sys.sleep(x))
    .Internal(Sys.time()) - st
  }, x = x[i])
}
.Internal(Sys.time()) - start
lapply(res, call_mirai)
.Internal(Sys.time()) - start
unlist(lapply(res, .subset2, "data"))

The above should give you an optimal 9s, but the below a non-optimal 12s. (With four daemons and tasks handed out in order, the first vector happens to split into 9s of work per daemon: 1+8, 2+7, 6+3 and 5+4. In the second example, tasks 1, 5 and 9 all go to the same daemon, giving it 10+1+1 = 12s of work, even though by the time tasks 5 and 9 are submitted the other daemons are already free.)

daemons(4L)

x <- c(10L, 5L, 1L, 1L, 1L, 1L, 1L, 1L, 1L)
res <- vector(mode = "list", length = length(x))
start <- .Internal(Sys.time())
for (i in seq_along(x)) {
  if(i == 5L) .Internal(Sys.sleep(6))  # pause before submitting task 5
  if(i == 9L) .Internal(Sys.sleep(2))  # pause again before submitting task 9
  res[[i]] <- eval_mirai({
    st <- .Internal(Sys.time())
    .Internal(Sys.sleep(x))
    .Internal(Sys.time()) - st
  }, x = x[i])
}
.Internal(Sys.time()) - start
lapply(res, call_mirai)
.Internal(Sys.time()) - start
unlist(lapply(res, .subset2, "data"))

A true 'queue' would allocate the later requests to the already finished and free processes.

@shikokuchuo
Owner

It seems it wouldn't be too much work to add on a queue similar to Gabor's, but I wonder if that's really the right solution.

I think I would prefer a more decentralised design: perhaps the scheduler broadcasts when it has a new job and idle processes respond (a surveyor/respondent pattern in NNG terms). Even better if the scheduler sits in another thread in the background (NNG creates portable threads, and I have tested passing SEXPs to them, but I have not tried messaging from them!). This is just one of many possibilities.

But it seems the less state we track the more resilient the solution becomes. If a process dies but we have a dataframe that says it is idle...
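For a flavour of the surveyor/respondent idea, a very rough sketch using nanonext's socket(), send() and recv() with their defaults (survey timeouts, serialisation of real tasks and error handling are all omitted):

library(nanonext)

# On the scheduler: announce that a new job is available and take the first volunteer.
sur <- socket("surveyor", listen = "tcp://127.0.0.1:5555")
send(sur, "job available")   # broadcast the survey to all connected respondents
volunteer <- recv(sur)       # first idle process to reply gets the job

# On each idle worker process: wait for announcements and volunteer.
res <- socket("respondent", dial = "tcp://127.0.0.1:5555")
recv(res)                    # blocks until the scheduler announces a job
send(res, Sys.getpid())      # volunteer by replying with an identifier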

@shikokuchuo
Owner

@wlandau A simple approach as per the article seems to break down, as it would only update upon user interaction. If, for example, you set 100 tasks to run and then go away, you may come back to find that most of them remain queued.

The more general solution seems to necessitate C code to have the scheduler run on another thread. This would store the state of daemons along with pointers to the language objects that are awaiting execution. These would also have to be duplicated. Not sure if this is what you had in mind.

In any case I'm not going to hold up the next release of 'mirai' for this as the current setup will be useful in many cases and will likely remain one of the options going forward anyway.

@wlandau
Author

wlandau commented Mar 14, 2022

A simple approach as per the article seems to break down, as it would only update upon user interaction. If, for example, you set 100 tasks to run and then go away, you may come back to find that most of them remain queued.

For a task queue like Gabor's, I would expect to need a custom event loop that polls the scheduler so queued tasks regularly ship to workers. Not prohibitive, but I agree, not ideal either.
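Something along these lines is what I mean, purely as an illustration (the worker_is_idle() and submit_task() helpers are hypothetical placeholders, not functions from mirai or callr):

# Hypothetical user-level event loop: poll for free workers and ship queued
# tasks to them. Progress only happens while this loop is actually running.
poll_queue <- function(queue, workers, interval = 0.05) {
  while (length(queue) > 0L) {
    idle <- which(vapply(workers, worker_is_idle, logical(1)))  # hypothetical helper
    for (w in idle) {
      if (length(queue) == 0L) break
      submit_task(workers[[w]], queue[[1L]])                    # hypothetical helper
      queue[[1L]] <- NULL
    }
    Sys.sleep(interval)
  }
}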

The more general solution seems to necessitate C code to have the scheduler run on another thread. This would store the state of daemons along with pointers to the language objects that are awaiting execution. These would also have to be duplicated. Not sure if this is what you had in mind.

Marshaling those jobs to a separate process with a separate event loop is tricky enough that I have not attempted it myself. Pretty convenient if it happens to exist already.

In any case I'm not going to hold up the next release of 'mirai' for this as the current setup will be useful in many cases and will likely remain one of the options going forward anyway.

Glad to hear I am not slowing you down. Thanks for the discussion and features!

@shikokuchuo
Owner

@wlandau You're welcome! mirai 0.1.1 is out. It turned out not to be 0.2.0, as there were no breaking changes.

Unfortunately the full task manager doesn't exist yet. My thoughts keep changing on this, so I think I'll let it settle for a while. Contributing something upstream might be the answer. I think all we basically need is for messages to be buffered at the sender if the daemons have yet to reply. The need for this kind of pattern should be quite common.

@shikokuchuo
Owner

@wlandau I have come back to this, and the task queue is now implemented using a simple all-R solution. A separate process polls for changes every 5ms, which is perhaps a bit excessive... It is still a bit rough around the edges, so I welcome any comments.

In any case, my previous example now returns the optimal result of (just over) 10s:

library(mirai)
daemons(4, q = TRUE)  # q = TRUE enables the new task queue

x <- c(10L, 5L, 1L, 1L, 1L, 1L, 1L, 1L, 1L)
res <- vector(mode = "list", length = length(x))
start <- .Internal(Sys.time())
for (i in seq_along(x)) {
  if(i == 5L) .Internal(Sys.sleep(6))
  if(i == 9L) .Internal(Sys.sleep(2))
  res[[i]] <- eval_mirai({
    st <- .Internal(Sys.time())
    .Internal(Sys.sleep(x))
    .Internal(Sys.time()) - st
  }, x = x[i])
}
.Internal(Sys.time()) - start
lapply(res, call_mirai)
.Internal(Sys.time()) - start
unlist(lapply(res, .subset2, "data"))

@wlandau
Author

wlandau commented Feb 13, 2023

This is fantastic, @shikokuchuo! Very powerful as a local multi-process task queue.

Since we last spoke, I have been thinking more about the future direction of https://github.com/wlandau/crew. My goal is to build on top of an existing local-network task queue and make it easier to launch daemons/server processes on different kinds of platforms, from SLURM to AWS. From the update you shared, mirai seems much closer to being an alternative to https://github.com/mrc-ide/rrq, which could support wlandau/crew#21. It would be really nice to drop the dependency on Redis.

For crew, I would like to be able to auto-scale server processes in response to an increasing backlog of tasks. It would be great to be able to launch a server process on a remote node (e.g. as a SLURM job) which connects back to the client on the local network (mirai::server() can already do this, right?). Then, if the workload subsides, it would be nice to automatically drop workers that idle for too long. This seems like it would need some way of detecting which TCP sockets are open and working.

Would this set of features be possible or in scope for mirai? I know you intend mirai to be lightweight, so I totally understand if some or all of this will not be on your roadmap.

@shikokuchuo
Owner

For the remote queue I think I just need to add another interface to specify an array of URLs for the task queue to listen to. Then each remote daemon will dial into its unique URL using the same server() function as before.
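Purely as an illustration of that topology (entirely hypothetical: the URL interface to daemons() does not exist yet and the argument name is invented; server() dialling a URL is as described above):

# On the client: listen on one URL per prospective remote daemon (hypothetical interface).
library(mirai)
daemons(url = c("tcp://10.0.0.1:5555", "tcp://10.0.0.1:5556"))

# On each remote node (e.g. inside a Slurm job): dial back into its own URL.
library(mirai)
server("tcp://10.0.0.1:5555")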

Autoscaling requires a bit more work: killing workers when they idle for too long is easy, but there is currently no functionality to start up remote workers automatically within the package, as that would be use-dependent (e.g. Slurm / AWS) and I did not want to get into specifics when there are probably better existing tools.

It sounds like crew is designed to abstract over remote implementations so I think logically that functionality should sit there. I will have to think about how easy it will be to have a scalable queue size though as it is fixed at the moment.

@shikokuchuo
Owner

shikokuchuo commented Feb 13, 2023

I just implemented (and reverted) the local queue / remote cluster setup as I think the interface becomes too unwieldy.

For now I'd like to keep the options as either all local, started automatically via daemons(q = TRUE), or all remote (remote queue + remote cluster) simply by swapping out server() for serverq().

The benefit of the current simplicity is that you can actually do autoscaling to some extent by adding serverq() clusters that dial into the same client URL, with tasks distributed evenly between them. This isn't perfect load-balancing, of course, but it is probably good enough in many cases, and it does allow resources to be added or removed very easily.

Repository owner locked and limited conversation to collaborators Feb 15, 2023
@shikokuchuo converted this issue into discussion #18 Feb 15, 2023

This issue was moved to a discussion.

You can continue the conversation there.
