Enhancement Request - Dask Workers lifetime option not waiting for job to finish #3141

Open · ameetshah1983 opened this issue Oct 11, 2019 · 16 comments · May be fixed by #8752

@ameetshah1983

When applying the worker lifetime option with restart, it looks like the worker moves ahead with the restart even if it is still running a job.

I applied the lifetime restart option every 60 seconds with a single worker and ran a job that simply sleeps for twice that amount of time. The restart still takes place while the worker is running the job.

For a graceful restart, I expected the worker to wait for a long-running task/job to finish and restart itself only when idle. That way, even a long-running task is not interrupted by the auto-restart option.
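
For anyone wanting to reproduce this, here is a minimal sketch along the lines of the setup above, assuming a local cluster and assuming lifetime/lifetime_restart are forwarded from LocalCluster to the workers; the 60s lifetime and 120s sleep mirror the description:

import time

from dask.distributed import Client, LocalCluster

# One worker that is asked to restart itself every 60 seconds.
cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=1,
    lifetime="60s",          # close the worker gracefully after 60 seconds
    lifetime_restart=True,   # have the nanny bring it back up afterwards
)
client = Client(cluster)

# A single task that sleeps for twice the worker lifetime. With the current
# behaviour, the lifetime restart kicks in while this task is still running.
future = client.submit(time.sleep, 120)
future.result()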

@TomAugspurger
Member

Just to clarify, you want the --lifetime=60s to mean "60s or until I don't have any tasks, whichever is longer"? How would this be exposed in the CLI?

And does adaptive better suit your needs?

@mrocklin
Member

Lifetime was intended to provide a mechanism for a periodic reset of workers, such as might be useful when dealing with libraries that leak memory and so benefit from cycling worker processes. It is orthogonal to adaptive.

> Just to clarify, you want the --lifetime=60s to mean "60s or until I don't have any tasks, whichever is longer"? How would this be exposed in the CLI?

I think that @ameetshah1983's request is in scope and is probably the intent of the keyword. I think what you say above, Tom, is correct, and that should be the meaning of the current spelling in the CLI.

To fix this, we probably want to modify the lifetime code to close down the worker to new tasks, wait until the thread pool clears out of current tasks (or some large timeout), and then close down.

@simaster123

Not sure what the status is on this feature request, but for what it's worth, I'm using --lifetime to deal with sporadic worker stalls. So, in my case, there is a task stuck on a worker indefinitely, and that is exactly what I'd like to restart. Perhaps, when this feature is added, it could include the flexibility to either (1) wait for tasks to finish, or (2) restart regardless of the status of tasks.

@karims

karims commented Jan 19, 2020

> To fix this, we probably want to modify the lifetime code to close down the worker to new tasks, wait until the thread pool clears out of current tasks (or some large timeout), and then close down.

Any update on this feature request?

@TomAugspurger
Member

I don’t see any. Are you interested in working on it?

@karims

karims commented Jan 30, 2020

> I don’t see any. Are you interested in working on it?

Sure!

@EvanKomp

What is the safest mechanism for closing the worker? I attempted to write a WorkerPlugin that closed the worker on task completion using the same mechanism as --lifetime, e.g. worker.io_loop.call_later(time, worker.close_gracefully), but that ended up causing an unacceptable number of task duplications. I am happy to work on something and create a pull request, but given the tests I have run in that issue, I'm not sure that having the worker close itself after a task doesn't create a bunch of other issues.

See the issue dask/dask-jobqueue#597
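
For anyone exploring the same route, a minimal sketch of the plugin approach described above (the plugin name and delay are illustrative; it just schedules the same graceful close that --lifetime uses):

from distributed import WorkerPlugin

CLOSE_AFTER_SECONDS = 3600  # illustrative lifetime


class CloseAfterLifetime(WorkerPlugin):
    def setup(self, worker):
        # Same mechanism as --lifetime: schedule a graceful close on the
        # worker's event loop after a fixed delay. As noted above, closing a
        # worker that still holds tasks can cause those tasks to be rerun
        # elsewhere.
        worker.io_loop.call_later(CLOSE_AFTER_SECONDS, worker.close_gracefully)

# Registered from the client, e.g. client.register_worker_plugin(CloseAfterLifetime())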

@tjgalvin

tjgalvin commented Apr 9, 2023

Hi all - is there any update on this request? I am also curious about what the best way to do this would be; with some guidance I might be able to implement it. @EvanKomp - did you ever figure out a solution? I am wanting to use this in a SLURM setting.

@electronsandstuff

Just wanted to add that this would be helpful for my use case too. I am running in a SLURM environment and trying to keep my workers within the walltime limit of the cluster without having jobs fail.

@fjetter
Member

fjetter commented May 23, 2023

The current mechanism is to "gracefully downscale" a worker. This typically evicts all data and runnable tasks but does not wait for the currently executing ones to finish.

Instead of using Worker.close_gracefully here

self.io_loop.call_later(
    lifetime, self.close_gracefully, reason="worker-lifetime-reached"
)

We'd need a method that is almost identical to close_gracefully, but one that waits until all threads are idle. It could be something like the following:

async def lifetime_close_gracefully(...):
    # Same as close_gracefully, but wait until all threads are idle
    ...
    while self.state.executing_count:
        await asyncio.sleep(0.01)
    await self.close(...)

Anybody is welcome to pick this up and create a PR (with a unit test). If you are struggling to complete it, I suggest opening a draft PR with how far you got and we can help you push it over the finish line. Any volunteers?
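
As a starting point, here is a rough sketch of the kind of unit test this might need, assuming the new behaviour is that a worker whose lifetime expires finishes its in-flight task before closing (the lifetime, sleep duration, and test name are illustrative):

import time

from distributed.utils_test import gen_cluster


@gen_cluster(
    client=True,
    nthreads=[("127.0.0.1", 1)],
    worker_kwargs={"lifetime": "1s"},
)
async def test_lifetime_waits_for_running_task(c, s, a):
    # The task outlives the 1s lifetime; with the proposed change the worker
    # should only close once the task has finished, so the result comes back
    # without the task being cancelled or rerun.
    future = c.submit(time.sleep, 3)
    assert await future is None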

@tjgalvin

tjgalvin commented May 23, 2023 via email

@AlecThomson

Hi all,

I was talking offline with @tjgalvin, and he implemented a potential drain option for dask worker. I've taken his work and added a few tweaks, and I think I've got the expected behaviour working.

The changes are available in https://github.com/AlecThomson/distributed/tree/drainclose

If the maintainers are happy, I can go ahead and open a PR.

Here's a demo of firing up a scheduler + worker with a drain and it handling some work that will go over the lifetime:

(demo video: drain_1080.mov)

@AlecThomson

BTW - my motivation for using this feature is with dask-jobqueue in an HPC environment and using .adapt(). We have some tasks submitted to Dask that are long-running but fit inside the walltime of a dask-jobqueue job. However, depending on how busy the HPC queue is, more than one of these tasks can be submitted to a given worker. This means work starts on a second long-running task, which will subsequently be killed by the lifetime and/or the HPC walltime. My thinking is that I would like to set a conservative lifetime with the drain option implemented by @tjgalvin. This will let a bunch of small tasks run, but will trigger a shutdown at the end of a long-running task.

@AlecThomson linked a pull request Jul 6, 2024 that will close this issue
@fjetter
Member

fjetter commented Jul 9, 2024

I'm not thrilled about the drain option and would prefer teaching lifetime / retire_workers not to kill workers that are still running something.
I haven't tested this, but it may be as simple as adding this line:

diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py
index 724bfc189..9c4d53760 100644
--- a/distributed/active_memory_manager.py
+++ b/distributed/active_memory_manager.py
@@ -736,4 +736,4 @@ class RetireWorker(ActiveMemoryManagerPolicy):
         ws = self.manager.scheduler.workers.get(self.address)
         if ws is None:
             return True
-        return all(len(ts.who_has or ()) > 1 for ts in ws.has_what)
+        return all(len(ts.who_has or ()) > 1 for ts in ws.has_what) and not ws.processing

I could imagine a bunch of tests tripping over this (and I'd like to see a new one testing this behavior), but generally speaking this addition to graceful downscaling would be nice.
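
For reference, graceful downscaling is usually triggered from the client roughly like this (addresses are illustrative); with the change sketched above, the call would also wait for tasks still executing on the retired worker:

from dask.distributed import Client

client = Client("tcp://scheduler-host:8786")  # illustrative address

# Today this migrates the worker's data elsewhere and then closes it, without
# waiting for tasks that are currently executing on it.
client.retire_workers(workers=["tcp://10.0.0.5:40331"], close_workers=True)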

@tjgalvin

For what it's worth, I added the --drain option initially because I saw this as changing what some might consider expected behavior. My thinking was that if the current behavior has come to be expected (whether documented or not), perhaps an opt-in was required, at least initially.

I might be able to look at this problem again.

@AlecThomson

Another quick thought on this - the `drain` state exists in batch management systems like Slurm, the idea being that the worker/job finishes its current work but stops accepting new work. This seems like exactly the kind of behaviour @ameetshah1983's request was asking for.
