
Check for running workers using the sockets instead of the worker-specific API #31

Closed
wlandau opened this issue Mar 5, 2023 · 66 comments


@wlandau
Owner

wlandau commented Mar 5, 2023

shikokuchuo/mirai#33 (comment)

@wlandau
Owner Author

wlandau commented Mar 6, 2023

Current thinking on logic for launching and auto-scaling:

  • Each worker gets a user-specified startup time. During that window, the worker is considered alive even if it is not connected. Afterwards, it is only considered alive if it is connected to the websocket.
  • The launcher object keeps track of the active worker at each socket. When there is a definite decision to launch, any previous worker at the socket is explicitly terminated.
  • When auto-scaling up, we first check unoccupied sockets. Here, a socket is unoccupied if there is no connection and there is no worker trying to start up at that socket. This piece of logic probably deserves its own function in the controller because it will be used more than once and requires both the launcher and the router.

@wlandau
Owner Author

wlandau commented Mar 6, 2023

I like this because it does not require waiting for workers to start, and relies completely on the connection and the startup time to tell if a worker is running.

@wlandau
Owner Author

wlandau commented Mar 7, 2023

On second thought, maximum startup time is not very reliable. It can guard against nightmare scenarios, but so many things can affect the startup time of a worker, and it is too hard to predict even in specific cases. I think crew needs to trust that a worker will start up and join (within an extremely generous startup time period). Here, a worker joins if:

  1. mirai::daemons() shows an active connection, or
  2. The worker completed a task since it was started.

crew could keep track of (2) by setting a CREW_SOCKET environment variable in the process that runs mirai::server(), and then crew_eval() could grab that environment variable and return it with the rest of the metadata. Then when the task is collected in the collect() method of the connector, we can log the socket in the launcher object.
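A minimal sketch of that idea, purely for illustration (the wrapper below and its handling of CREW_SOCKET are hypothetical, not the actual crew functions):

# Process that runs the server: record the socket before launching.
start_worker <- function(socket, ...) {
  Sys.setenv(CREW_SOCKET = socket)
  mirai::server(url = socket, ...)
}

# Inside the task evaluation: return the socket with the task metadata.
task_socket_metadata <- function() {
  list(socket = Sys.getenv("CREW_SOCKET", unset = NA_character_))
}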

@wlandau
Owner Author

wlandau commented Mar 7, 2023

cf. #32 (comment)

@wlandau wlandau mentioned this issue Mar 7, 2023
@shikokuchuo
Contributor

On second thought, maximum startup time is not very reliable. It can guard against nightmare scenarios, but so many things can affect the startup time of a worker, and it is too hard to predict even in specific cases. I think crew needs to trust that a worker will start up and join (within an extremely generous startup time period). Here, a worker joins if:

1. `mirai::daemons()` shows an active connection, or

2. The worker completed a task since it was started.

crew could keep track of (2) by setting a CREW_SOCKET environment variable in the process that runs mirai::server(), and then crew_eval() could grab that environment variable and return it with the rest of the metadata. Then when the task is collected in the collect() method of the connector, we can log the socket in the launcher object.

Would it help you if I simply added counts to each server - tasks started and tasks completed for each node? Then you could just get these by calling daemons() along with the current connection status. Would this give you everything you need?

@wlandau
Owner Author

wlandau commented Mar 7, 2023

That would help so much! Tasks started would let me really know how many workers can accept new tasks (if I also take into account the "expected" workers which are starting up but have not yet connected to the client). And tasks completed is such useful load balancing data. In the case of persistent workers, it could really help users figure out if e.g. they really need 50 workers or they can scale down to 25.

@wlandau
Owner Author

wlandau commented Mar 7, 2023

As long as those counts are refreshed if the server() at the given socket restarts, this would be amazing.

@shikokuchuo
Contributor

Give shikokuchuo/mirai@ba5e84e (v0.7.2.9026) a try. Should give you everything you need. As a happy side effect, the active queue keeps getting more efficient even as we add more features.

@wlandau
Owner Author

wlandau commented Mar 7, 2023

Fantastic! I will test as soon as I have another free moment. (Trying to juggle other projects too, so this one may take me a few days.)

@shikokuchuo
Contributor

Cool. You'll want to pick up shikokuchuo/mirai@3777610 (v0.7.2.9028) instead. Having gone through #32 (comment) in more detail, I think this now has what you need.

@wlandau
Owner Author

wlandau commented Mar 8, 2023

I tested the specific worker counters with shikokuchuo/mirai@51f2f80, and they work perfectly.
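For reference, a sketch of how crew might snapshot those per-socket counters (the column names follow the daemons()$nodes output from this development version of mirai; the helper name is hypothetical):

library(mirai)

# Hypothetical helper: take a snapshot of the per-socket worker counters.
worker_counts <- function() {
  nodes <- daemons()$nodes
  data.frame(
    socket   = rownames(nodes),
    online   = nodes[, "status_online"],
    busy     = nodes[, "status_busy"],
    assigned = nodes[, "tasks_assigned"],
    complete = nodes[, "tasks_complete"],
    row.names = NULL
  )
}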

@wlandau
Owner Author

wlandau commented Mar 9, 2023

I did more thinking about auto-scaling logic, and I care a lot about whether a mirai server is active or inactive. Here, a server is active if it deserves an opportunity to do tasks. Conversely, a worker is inactive if it is definitely broken, in which case we should force-terminate it and relaunch if needed.

To determine if a server is active, I need three more definitions:

  1. Connected: The server is connected to its websocket. status_online from mirai::daemons()$nodes is 1. This is not the same as active because a server can take a long time to start.
  2. Discovered: The server may or may not be connected right now, but at some point in its lifecycle, it dialed into the client and changed something in mirai::daemons()$nodes.
  3. Launching: The server process began within the last few minutes and may need more time to initialize and connect to the mirai client.

If the server is connected, then it is automatically active. If it is disconnected, then we should only consider it active if it is launching and not yet discovered.

(Screenshot, 2023-03-09: decision diagram for classifying a worker as active or inactive.)
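A minimal sketch of that decision rule, assuming the launcher records a start time and a discovered flag for each socket (all names here are illustrative, not the crew API):

# Does the worker at this socket deserve an opportunity to do tasks?
worker_active <- function(connected, discovered, start_time, startup_window = 300) {
  if (connected) {
    return(TRUE)  # A connected server is always active.
  }
  launching <- !is.na(start_time) &&
    as.numeric(difftime(Sys.time(), start_time, units = "secs")) < startup_window
  # Disconnected: active only while still launching and not yet discovered.
  launching && !discovered
}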

@wlandau
Owner Author

wlandau commented Mar 9, 2023

Scenario for slow-to-launch transient workers:

  1. Start a server and note the start time.
  2. Push a task and auto-scale.
    • Notice that the server is disconnected and discovered. Now we know the worker is inactive, so it should no longer count as launching.
    • Force-terminate the worker.
    • Reset the start time to NA_real_ to force the worker into a non-launching state.
    • Synchronize the crew launcher object with daemons()$nodes so the worker is no longer marked as discovered. The worker is still dubbed inactive, and now crew is prepared to discover the next worker that dials into the same socket (see the sketch after this list).
  3. Subsequent auto-scaling operations may or may not choose to relaunch another worker at this same socket, but we are confident the worker is inactive, and the re-launch can happen in a different method call than (2).
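A rough sketch of the bookkeeping in step 2, assuming the launcher stores per-socket fields such as handle, start_time, and discovered (these names, and terminate_worker(), are illustrative):

# Retire an inactive worker at one socket; relaunching happens in a later call.
retire_worker <- function(launcher, socket) {
  terminate_worker(launcher$handle[[socket]])  # platform-specific termination (hypothetical)
  launcher$start_time[[socket]] <- NA_real_    # the worker no longer counts as launching
  launcher$discovered[[socket]] <- FALSE       # ready to discover the next worker here
  launcher
}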

@wlandau
Owner Author

wlandau commented Mar 9, 2023

Unfortunately, the counts in daemons()$nodes do not quite cover the scenario in #31 (comment). I will post a feature request to mirai.

@shikokuchuo
Contributor

Re. your diagram, I am not sure you need to be so explicit. I believe the daemons() call should be sufficient.

If status is online, you have an active server.

Otherwise you know that a server has either never connected (zero tasks columns) or disconnected (non-zero tasks columns). In both cases you check when it was launched by crew, and if it is past the 'expiry time' then you kill it and launch another if needed - here you may also utilise the stats to inform the decision.

@wlandau
Owner Author

wlandau commented Mar 9, 2023

Otherwise you know that a server has either never connected (zero tasks columns) or disconnected (non-zero tasks columns).

If the server ran and then disconnected, I would strongly prefer not to wait for the rest of the expiry time. The expiry time in crew needs to be large enough to accommodate circumstances like a randomly busy cluster or a long wait time for AWS to find a spot instance within the user's price threshold. So waiting for the remainder of that time would be costly for transient workers that exit almost as soon as they dial in.

Unfortunately, I cannot currently tell if this is happening. A lot of the time, I call daemons()$nodes and see these counts:

                    status_online status_busy tasks_assigned tasks_complete
ws://127.0.0.1:5000             0           0              1              1

If we are still inside the expiry time, then I cannot tell if the server already connected and disconnected, or if the worker is starting and these counts are left over from the previous server that used websocket ws://127.0.0.1:5000.

Does this make sense? Would it be possible to add a new websocket-specific counter in daemons()$nodes that increments every time a different server process dials into the socket? As per the logic in #31 (comment), I can watch that counter for changes. This would allow crew to detect connected-then-disconnected servers at any time in the expiry window.

@shikokuchuo
Contributor

I think you have to look at this from the perspective that mirai is not trying to keep state - calling daemons() offers a snapshot that enables crew to do so.

  1. crew launches the first server. Online status is zero - you know you are waiting for it to start. If successful, you observe it become 1 at some point; otherwise you reach the expiry time while it is still zero, in which case you kill it and repeat.

  2. After this you observe the status become zero again. You know the server has disconnected. Perhaps you look at the number of completed tasks vs other servers and use some rule to decide whether to re-launch or not.

Once you have re-launched then you are effectively back to step 1. You should know your state at all times.

@wlandau
Owner Author

wlandau commented Mar 9, 2023

  1. crew launches the first server. Online status is zero - you know you are waiting for it to start. If successful, you observe it become 1

For a quick task and tasklimit = 1, the server online status may go from 0 to 1 to 0 too fast for crew to ever observe the 1. In other words, the task for the tasklimit = 1 worker may be too quick for me to ever observe a change in online status. crew has no daemons of its own to periodically poll for status_online = 1, and even if it did, the narrow window of the 1 would have to overlap with the polling interval, which is not guaranteed for short tasks.

Previously I thought of a workaround where I would make each task tell me which websocket it came from. This would solve some scenarios, but not a scenario where idletime is small and timerstart = 0. In this latter case, the worker could start and vanish before I notice, with no tasks assigned, and I would have to wait until the end of the long expiry time to find out what happened.

If state is an obstacle for mirai, what if each server could present some UUID or other ID which would differ from connection to connection at the websocket? Then mirai would not need to keep track of state for the counter, and crew would watch the UUID for changes.

@shikokuchuo
Contributor

  1. crew launches the first server. Online status is zero - you know you are waiting for it to start. If successful, you observe it become 1

For a quick task and tasklimit = 1, the server online status may go from 0 to 1 to 0 too fast for crew to ever observe the 1. In other words, the task for the tasklimit = 1 worker may be too quick for me to ever observe a change in online status. crew has no daemons of its own to periodically poll for status_online = 1, and even if it did, the narrow window of the 1 would have to overlap with the polling interval, which is not guaranteed for short tasks.

Online status may go from 0 to 1 to 0 again, but the snapshot will also show non-zero tasks against the zero status. So you know the server has completed one task in this case and disconnected. You may be missing the fact that tasks only get zeroed when a new server connects. So if you don't choose to start up a new server to connect to this port, the stats will never change.

@shikokuchuo
Contributor

Previously I thought of a workaround where I would make each task tell me which websocket it came from. This would solve some scenarios, but not a scenario where idletime is small and timerstart = 0. In this latter case, the worker could start and vanish before I notice, with no tasks assigned, and I would have to wait until the end of the long expiry time to find out what happened.

This does not sound very plausible to me - you have something that takes potentially a very long time to spin up, stays online for a very short time, and exits without carrying out a task.

As per my last answer - at each point you want to start up a new server, you have access to all the stats you need. You don't have to poll outside of those points. You won't lose stats because somehow you weren't quick enough to catch them - only a new server connection clears out the previous stats.

@shikokuchuo
Contributor

If state is an obstacle for mirai, what if each server could present some UUID or other ID which would differ from connection to connection at the websocket? Then mirai would not need to keep track of state for the counter, and crew would watch the UUID for changes.

It is not at all an obstacle, but there must be some part of the crew workings that I am not getting, because I am not seeing the need.

@wlandau
Owner Author

wlandau commented Mar 10, 2023

Online status may go from 0 to 1 to 0 again, but the snapshot will also show non-zero tasks against the zero status. So you know the server has completed one task in this case and disconnected. You may be missing the fact that tasks only get zeroed when a new server connects. So if you don't choose to start up a new server to connect to this port, the stats will never change.

This is exactly where I am struggling: as you say, the tasks only get zeroed when a new server starts. So if I start my second server at the websocket and observe status_online = 0 with tasks_completed = 1, are those counts left over from the first server, or did the second server run and then disconnect before I could check online status? The result is the same, so I cannot tell the difference. That means I do not know if the second server is starting up, which means I have to wait until the end of the expiry time. Does that make sense?

A major goal of crew is to provide a smooth continuum between persistent and transient workers, so even in the case with daemons, I am trying to make transient workers function as efficiently and responsively as possible.

This does not sound very plausible to me - you have something that takes potentially a very long time to spin up, stays online for a very short time, and exits without carrying out a task.

And I could definitely avoid it by requiring tasklimit >= 1 at the level of crew, but I think some users may want tasklimit = 0 with an extremely small idletime because resources are expensive. I plan to build crew into targets, and I have learned there is a wide variety of preferences and scenarios in the user base.

It is not at all an obstacle, but there must be some part of the crew workings that I am not getting, because I am not seeing the need.

I could just as easily be missing something; I am finding it challenging (and interesting) to wrap my head around this problem. Thank you for sticking with me on this. If the first part of my answer does not make sense, please let me know and I may be able to describe it a different way.

@wlandau
Owner Author

wlandau commented Mar 10, 2023

Online status may go from 0 to 1 to 0 again, but the snapshot will also show non-zero tasks against the zero status. So you know the server has completed one task in this case and disconnected. You may be missing the fact that tasks only get zeroed when a new server connects. So if you don't choose to start up a new server to connect to this port, the stats will never change.

Just to be clear, this is totally fine for the first server that dials into a websocket. The delay due to leftover counts only happens when subsequent servers launch at the same websocket long after the first server disconnects. But this is important: crew cares about auto-scaling, so multiple servers could connect to the same websocket (one after the other) over the course of a pipeline.

Just to make sure crew's auto-scaling behavior is clear, the following is a likely scenario:

  1. The user needs to run tasks on a busy SLURM cluster. Given how busy it is, they set an expiry startup time of 30 minutes.
  2. The user submits enough tasks that crew decides to scale up the number of mirai servers.
  3. crew submits a SLURM job to run mirai server A.
  4. The SLURM job for mirai server A spends 5 minutes in the SLURM task queue.
  5. The SLURM job dequeues, and mirai server A dials into ws://192.168.0.2:5000/23.
  6. Most of the tasks complete, and few tasks are left in the queue. mirai server A exits and disconnects from ws://192.168.0.2:5000/23 because the idle time limit is reached. Tasks assigned and tasks completed both equal 2 for socket ws://192.168.0.2:5000/23 at this point.
  7. After several minutes, the user submits another load of tasks to the crew queue, and crew decides to scale up the mirai servers again. crew launches mirai server B to connect to the same socket at ws://192.168.0.2:5000/23.
  8. Tasks are distributed among multiple servers, and 5 minutes later, crew notices for ws://192.168.0.2:5000/23 that the online status is 0, tasks completed is 2, and tasks assigned is 2. Did mirai server B finish its tasks and idle out, or is it still trying to start and connect to ws://192.168.0.2:5000/23? The counts are exactly the same either way, and crew is not going to poll the SLURM job queue to find out because doing so on a regular basis would overburden squeue. (This kind of platform-specific polling would also be expensive and slow in the general case, e.g. AWS, where each one would have to be an HTTP/REST API call.)
  9. Wait an extra 25 minutes because we are not sure if mirai server B ever connected at all. (Recall the expiry startup time of 30 minutes from (1).)

@wlandau
Owner Author

wlandau commented Mar 10, 2023

To quote @brendanf from #32 (comment):

Using targets with clustermq on Slurm, I sometimes get extreme queue times for some of the workers, on the order of days.

So the posited 30-minute expiry time from #31 (comment) may not be nearly enough for some users.

@wlandau
Owner Author

wlandau commented Mar 10, 2023

You know what? I may be able to handle all this in crew without needing to burden mirai with it, and the end product may actually be cleaner and easier on my end. I could submit the crew worker with a known UUID that I keep track of, then send the UUID back when the mirai server is done. If I receive the same UUID that I sent the worker with, then I know the worker finished.

crew_worker <- function(task_socket, done_socket, uuid, ...) {
  done_connection <- nanonext::socket(protocol = "req", dial = done_socket)
  on.exit(nanonext::send(con = done_connection, data = uuid)) # Tell the client when the server is done.
  mirai::server(url = task_socket, ...) # Serve tasks at the usual mirai websocket.
}

On the server process:

crew_worker("ws://192.168.0.2:5000/finished_servers", uuid = "MY_UUID", idletime = 100, tasklimit = 0)

On the client:

sock <- nanonext::socket(protocol = "rep", listen = "ws://192.168.0.2:5000/finished_servers")
# ... Do some other work, do not poll at regular intervals.
uuid <- nanonext::recv(sock) # Did any workers finish since last I checked? 
# ... Check the uuid against the known set of UUIDs I submitted workers with.

There is still a slim possibility that mirai::server() connects but the subsequent manual nanonext::send() fails, but I don't think that will come up as much, and we can rely on expiry time to detect those rare failures.

@wlandau
Owner Author

wlandau commented Mar 10, 2023

Would I need an additional TCP port for this? I see:

library(mirai)
library(nanonext)
daemons("ws://127.0.0.1:5000/1", nodes = 1)
#> [1] 1
connection_for_done_worker <- nanonext::socket(protocol = "rep", listen = "ws://127.0.0.1:5000/done")
#> Error in nanonext::socket(protocol = "rep", listen = "ws://127.0.0.1:5000/done") : 
#>   10 | Address in use

@shikokuchuo
Contributor

Shouldn't do. mirai has listeners on each of the ws addresses - no different.

@wlandau
Owner Author

wlandau commented Mar 10, 2023

In that case, how would you recommend I listen to ws://127.0.0.1:5000/done using nanonext?

@shikokuchuo
Contributor

Well, the error message says the address is in use, so have you tried other random paths? No path? I don't see anything obviously wrong.

@wlandau
Owner Author

wlandau commented Mar 10, 2023

I just tried with different paths/ports, both on the loopback address and with getip::getip(type = "local"), and I saw the same result. Example:

library(mirai)
library(nanonext)
daemons("ws://127.0.0.1:61903/path1", nodes = 1)
#> [1] 0
connection_for_done_worker <- socket(protocol = "rep", listen = "ws://127.0.0.1:61903/path2")
#> Error in socket(protocol = "rep", listen = "ws://127.0.0.1:61903/path2") : 
#>   10 | Address in use

@shikokuchuo
Contributor

Oh, because you are listening using mirai from another process. I guess that is why. In that case you need another port.

@shikokuchuo
Contributor

I will throw in this suggestion though, as it will be less error-prone, and that is to just establish a connection and not actually send any messages.

First use the 'bus' protocol as that is the lightest:

connection_for_done_worker[[1L]] <- socket(protocol = "bus", listen = "ws://127.0.0.1:61903/UUID")

etc.

stat(connection_for_done_worker[[1L]]$listener[[1L]], "accept")

will give you the total number of connections accepted at the listener (1 if the server has dialled in).

stat(connection_for_done_worker[[1L]]$listener[[1L]], "pipes")

will give you the number of current connections. (0 if server has disconnected).

So a combination of 1 and 0 above means the server has dialled in and disconnected after presumably finishing its tasks.
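Putting those pieces together, a sketch of how crew might classify a worker from those listener statistics (one bus socket per launched worker as suggested above; the accessor follows the form used in this comment, and the helper name is illustrative):

library(nanonext)

# One bus socket per launched worker, listening at a per-launch path.
monitor <- socket(protocol = "bus", listen = "ws://127.0.0.1:61903/UUID")

# Hypothetical classifier based on the listener statistics.
worker_state <- function(con) {
  listener <- con$listener[[1L]]
  accepted <- stat(listener, "accept")  # total connections ever accepted
  current  <- stat(listener, "pipes")   # connections open right now
  if (current > 0L) "connected"
  else if (accepted > 0L) "finished"    # dialled in and disconnected afterwards
  else "launching"                      # has not connected yet
}

worker_state(monitor)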

@wlandau
Owner Author

wlandau commented Mar 10, 2023

Wow! This is so much better than trying to catch messages! So much easier. Thank you so much!

@wlandau
Owner Author

wlandau commented Mar 10, 2023

Notes to self:

  • The router object still takes a single host and port.
  • Globally set the port for all launcher objects using function crew_port_set(). (Also create crew_port_get().)
  • The workers in the launcher should all set an NNG bus socket at the path of the UUID on launch.

@wlandau
Owner Author

wlandau commented Mar 11, 2023

Re #31 (comment),

First use the 'bus' protocol as that is the lightest:

I am actually thinking of using these custom sockets to also send common data for #33. Would I use the push/pull protocol for that? Would the client use a send() and the server use a blocking recv()?

@shikokuchuo
Contributor

Re #31 (comment),

First use the 'bus' protocol as that is the lightest:

I am actually thinking of using these custom sockets to also send common data for #33. Would I use the push/pull protocol for that? Would the client use a send() and the server use a blocking recv()?

You wouldn't use a push/pull unless you had a very specific need to ensure flow is only one way. The fewer semantics the better (unless you need guaranteed delivery in which case use req/rep). If it is one-to-one I would just use 'bus', or 'pair' if you want to be sure it is one-to-one (this won't allow 2 processes dialing into one side for example).

I'm not sure how exactly you plan to implement #33, but a typical pattern that works well is for one party to request an asynchronous receive with recv_aio(). The other party can then send a message at whatever time, and the 'recvAio' will automatically resolve when the message is received.
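A minimal sketch of that pattern with nanonext bus sockets (the URL and variable names are illustrative):

library(nanonext)

# One side listens and requests an asynchronous receive.
s <- socket(protocol = "bus", listen = "ws://127.0.0.1:61904/common")
incoming <- recv_aio(s)  # returns immediately; resolves when a message arrives

# The other side (normally another process) dials and sends whenever ready.
d <- socket(protocol = "bus", dial = "ws://127.0.0.1:61904/common")
send(d, data = list(common = mtcars))

# Back on the first side: the 'recvAio' resolves on its own.
call_aio(incoming)  # optionally block until resolved
incoming$data       # the received data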

@shikokuchuo
Contributor

On this point - I have not been following what this automatic updating is about. But simply sending the .GlobalEnv or its contents should be trivial using the current mirai interface, right, if you wanted to do so.

@wlandau
Owner Author

wlandau commented Mar 11, 2023

True, but would it be fast? I am thinking of a persistent workers scenario where all tasks operate on a shared common large in-memory object that is slow to serialize. I would rather send that large object once per persistent worker (infrequently) rather than once per task (much more frequently).

@wlandau
Owner Author

wlandau commented Mar 11, 2023

If e.g. daemons(n = 8, common_data = list(...)) were to send the common data for servers to pick up, then I could definitely rely on mirai for this.

@shikokuchuo
Contributor

True, but would it be fast? I am thinking of a persistent workers scenario where all tasks operate on a shared common large in-memory object that is slow to serialize. I would rather send that large object once per persistent worker (infrequently) rather than once per task (much more frequently).

How large is large to give me some idea?

@wlandau
Owner Author

wlandau commented Mar 11, 2023

In the general case for a targets pipeline, it could be a large fraction of the allowable R session memory, e.g. 2-4 GB. Small enough to all be in the global env, but slow to serialize and send over the local network.

@shikokuchuo
Contributor

In the general case for a targets pipeline, it could be a large fraction of the allowable R session memory, e.g. 2-4 GB. Small enough to all be in the global env, but slow to serialize and send over the local network.

Right, I would estimate that would take a couple of seconds, a few seconds perhaps. Not an obstacle I'd say, in the absence of a better alternative.

@shikokuchuo
Contributor

If you send this common object, is it meant to be immutable then, and if so how do you ensure that? Maybe you know what the right answer is already, but to me there are many potential pitfalls!

@wlandau
Owner Author

wlandau commented Mar 11, 2023

Yes, it is meant to be immutable. Although immutability is hard to strictly enforce here, targets (and clustermq) have lightweight protections that work well enough for all but the most extreme cases.

In targets, the user assigns shared globals to the tar_option_get("envir") environment, which is usually .GlobalEnv. These globals are passed to the export field of clustermq and treated as common data (sent only once per worker). On top of that, each target has its own set of objects in its own temporary target-specific non-global environment which inherits from .GlobalEnv. This temporary target-specific non-global environment is the one supplied to eval(), so except for extreme anti-patterns like attach(), the target has access to global objects while minimizing the risk of accidentally modifying them.

mirai aims to be as fast and efficient as possible, so common data seems like it would be a good fit for your package, both inside and outside of crew.
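A minimal sketch of that environment pattern (simplified for illustration; not the actual targets internals):

# Shared globals live in the global environment and are sent once per worker.
assign("big_object", mtcars, envir = globalenv())

# Each target evaluates in a temporary environment that inherits from .GlobalEnv,
# so it can read the shared object while its own assignments stay local.
target_envir <- new.env(parent = globalenv())
target_envir$scale <- 2
eval(quote(nrow(big_object) * scale), envir = target_envir)  # reads big_object from .GlobalEnv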

@shikokuchuo
Contributor

It sounds like you have found a way that works which is good. I put a heavy premium on correctness so it would take a lot to convince me any of this is worthwhile. I would willingly spend the extra 5s per task.

@wlandau
Owner Author

wlandau commented Mar 11, 2023

Fair enough, I will go ahead with my original plan to implement common data in crew using bus sockets.

@shikokuchuo
Contributor

Fair enough, I will go ahead with my original plan to implement common data in crew using bus sockets.

Is your plan to put this common data into the global environment of all the servers?

@wlandau
Owner Author

wlandau commented Mar 11, 2023

Pretty much. I am writing a wrapper that will recv() the data, assign it to the global env, then call mirai::server().

Looking back at your comments, maybe I do need guaranteed delivery for this (is it exactly what it sounds like?), which puts me with rep/req. Should I use rep/listen for the client and req/dial for the servers?
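A rough sketch of such a wrapper with req/rep, as discussed (the URLs and names are illustrative, not the eventual crew implementation):

# Worker side: request the common data, populate the global environment,
# then start the mirai server on the task websocket.
crew_worker_common <- function(task_url, data_url, ...) {
  con <- nanonext::socket(protocol = "req", dial = data_url)
  on.exit(close(con), add = TRUE)
  nanonext::send(con, data = "ready")          # req/rep: send the request first
  common <- nanonext::recv(con, block = TRUE)  # blocking receive of the reply
  list2env(common, envir = globalenv())        # make the data available to tasks
  mirai::server(url = task_url, ...)
}

# Client-side counterpart (rep/listen): wait for a request, reply with the data.
# host <- nanonext::socket(protocol = "rep", listen = data_url)
# nanonext::recv(host, block = TRUE)
# nanonext::send(host, data = list(big_object = mtcars))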

@wlandau wlandau mentioned this issue Mar 12, 2023
@wlandau
Copy link
Owner Author

wlandau commented Mar 12, 2023

Closing this thread because it digressed to #33, and before that, @shikokuchuo solved it with #31 (comment).
