Instability when replicating/rebalancing on an active cluster #3024

Closed
bnaul opened this issue Sep 4, 2019 · 8 comments
Labels: memory, stability (Issue or feature related to cluster stability, e.g. deadlock)

Comments

bnaul (Contributor) commented Sep 4, 2019

Maybe tangentially related to #391: I've noticed with long-running jobs that sometimes one worker will stop making progress and its tasks will remain in the "Processing" state indefinitely. I've tried to figure out what might be going on but am at a loss:

  • All of the remaining tasks are "Processing" and all of their dependencies are ready
  • No relevant information in the client/scheduler/worker logs
  • The call stack for the stuck worker is completely empty
  • The worker machine is responsive (I can SSH into the worker k8s pod and see 0 CPU on top)

I assumed this was due to some peculiarity of my task function, but the fact that the call stack is completely empty (in this case with 5 of the same task "Processing") makes me think it might be dask-related instead. Any suggestions on how to go about debugging why a worker might be stuck the next time this happens?
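
For reference, the checks above correspond roughly to these client calls (a sketch, assuming client is connected to the affected cluster):

# Keys the scheduler thinks each worker is currently processing
processing = client.processing()

# Live call stacks on the workers (empty in this case)
stacks = client.call_stack()

# Recent scheduler/worker log lines, in case anything relevant shows up
scheduler_logs = client.get_scheduler_logs()
worker_logs = client.get_worker_logs()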

EDIT: renaming the issue since we decided the culprit was replicate.

TomAugspurger (Member) commented

Can you inspect the WorkerState objects to see if the scheduler and workers agree on what is currently being run?

For the workers:

client = Client()
worker_tasks = client.run(lambda dask_worker: dask_worker.tasks)

And you can get the scheduler's state from Scheduler.workers.
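
A rough sketch of comparing the two views (attribute details vary a bit across distributed versions, so treat this as an approximation rather than exact API):

# Worker-side view: the keys each worker thinks it knows about
worker_view = client.run(lambda dask_worker: set(dask_worker.tasks))

# Scheduler-side view: the keys the scheduler thinks each worker holds
scheduler_view = client.run_on_scheduler(
    lambda dask_scheduler: {
        addr: {ts.key for ts in ws.has_what}
        for addr, ws in dask_scheduler.workers.items()
    }
)

# Keys the scheduler expects on a worker that the worker itself doesn't report
for addr, expected in scheduler_view.items():
    missing = expected - worker_view.get(addr, set())
    if missing:
        print(addr, missing)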

bnaul (Contributor, Author) commented Sep 4, 2019

It looks like they don't agree:

>>> worker_tasks = client.run(lambda dask_worker: dask_worker.tasks)
>>> list(worker_tasks['tcp://10.46.184.8:39345'].keys())
["('to_records-5dadf1179be500355e42659220cffc41', 21)",
 "('to_records-5dadf1179be500355e42659220cffc41', 921)",
 "('to_records-5dadf1179be500355e42659220cffc41', 621)",
 "('to_records-5dadf1179be500355e42659220cffc41', 321)",
 'generate_population_activities-7f4214639b58c94a82be6be246f0cffa',
 'generate_population_activities-9fac04427dcb373174cb69208e5c9c69',
 'generate_population_activities-dea9ac4b2ee996d6a7b9ef8ced3f7dc7',
 'generate_population_activities-80dd5d43380da05d1713099510da9543',
 '_load_network_link_data-31d51e38402f1166e7e21c57613a6d2b']

vs

>>> w = client.run_on_scheduler(lambda dask_scheduler: dask_scheduler.workers)['tcp://10.46.184.8:39345']
>>> w.has_what
{<Task "('to_records-5dadf1179be500355e42659220cffc41', 21)" memory>,
 <Task "('to_records-5dadf1179be500355e42659220cffc41', 321)" memory>,
 <Task "('to_records-5dadf1179be500355e42659220cffc41', 621)" memory>,
 <Task "('to_records-5dadf1179be500355e42659220cffc41', 921)" memory>,
 <Task '_load_cells_to_geos-4caa504af91fe72983ecd65dee2e9209' memory>,
 <Task '_load_default_models-3c753d14e2b278cc5637bec8af5518dc' memory>}

to_records and _load_... are data dependencies, generate_population_activities is downstream; I guess the workers aren't able to get the data needed to start computation?

I am calling replicate(broadcast=True) on the _load_... futures to distribute them to the workers, so I guess something must be going wrong there that's causing the inconsistency. Is there any other kind of safeguard against reaching this state, or is it just bad luck due to some kind of race condition?
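
For concreteness, the pattern in question looks roughly like this (a sketch with hypothetical loader functions standing in for the _load_* tasks; Client.replicate copies to every worker by default when n is not given):

# Hypothetical loaders standing in for the _load_* tasks above
models = client.submit(load_default_models)
cells = client.submit(load_cells_to_geos)
links = client.submit(load_network_link_data)

# Copy the finished results to every worker
client.replicate([models, cells, links])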

TomAugspurger (Member) commented

Which task is the one that's stuck in processing? _load_default_models?

I am calling replicate(broadcast=True) on the _load_... futures to distribute them to the workers, so I guess something must be going wrong there that's causing the inconsistency.

That's useful info, thanks.

Is there any other kind of safeguard against reaching this state, or is it just bad luck due to some kind of race condition?

I'm not sure, but this looks like a bug. I don't think it should be possible to get the cluster into this state (cc @mrocklin, in case you know offhand).

bnaul (Contributor, Author) commented Sep 4, 2019

_load_default_models, _load_cells_to_geos, _load_network_link_data all finish successfully and are (supposed to be) replicated across the cluster; there are ~20k generate_population_activities tasks that all depend on all 3 _loads (each also depends on a to_records array chunk), and all but 4 of those tasks end up with all the data they need and finish successfully.
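
In graph terms, continuing the hypothetical sketch above (with records standing for the dask array produced by to_records):

# Futures for the ~20k to_records chunks
chunk_futures = client.compute(list(records.to_delayed().flatten()))

# Every generate_population_activities task depends on its chunk plus all three loads
results = [
    client.submit(generate_population_activities, chunk, models, cells, links)
    for chunk in chunk_futures
]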

mrocklin (Member) commented Sep 5, 2019

Inside I'm cheering that you two are having this conversation without my involvement :)

If the scheduler's and workers' views of the tasks remain different for a non-trivial amount of time, then yes, that seems like a bug to me.

replicate(broadcast=True)

As a warning, replicate isn't super robust, especially when things are actively running. It could stand to be improved or replaced.

bnaul (Contributor, Author) commented Sep 6, 2019

I've played around a bit more and it seems replicate() is definitely the culprit; it times out before all of the transfers are complete, and the scheduler/worker state remains inconsistent indefinitely.

I'm not sure how common the replicate-to-all-workers pattern is in practice; I would actually be equally happy to re-load my data on each worker, but .replicate seemed like the easiest way to go. I also played around with a solution along the lines of https://stackoverflow.com/questions/48299356/override-dask-scheduler-to-concurrently-load-data-on-multiple-workers, forcing the data to load on all workers and manually updating the scheduler's state.
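
As a sketch of that kind of workaround (not the exact code from the linked answer; load_default_models is a hypothetical loader):

# Option 1: load once on the client and scatter a copy to every worker,
# keeping the data in Dask's bookkeeping without calling replicate()
[models_future] = client.scatter([load_default_models()], broadcast=True)

# Option 2: load out-of-band on each worker and stash it on the Worker object,
# bypassing the scheduler's tracking entirely
def warm_cache(dask_worker):
    dask_worker.my_models = load_default_models()

client.run(warm_cache)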

If you think there's any low-hanging fruit in the current replicate implementation I'd be interested in playing around, but I don't think I have the expertise to really rework it in a substantive way.

mrocklin (Member) commented Sep 6, 2019

I think that the current replicate and rebalance implementations only really work robustly if there isn't any work going on in the cluster. They're not resilient to everything that can happen on an active cluster.

Instead, I think that we need to design something from the ground up. It's not a trivial task, but probably has more to do with general design than the internals of Dask.

bnaul changed the title from "Sporadic task deadlock" to "Instability when replicating/rebalancing on an active cluster" on Nov 21, 2019
crusaderky (Collaborator) commented

Closing as a duplicate of #4906 / #6578

crusaderky added the memory and stability labels and removed the performance label on Jun 15, 2022