
Rebalance during a moving cluster #4906

Open
mrocklin opened this issue Jun 11, 2021 · 10 comments

@mrocklin
Member

This is part of Active Memory Management

We would like to be able to intentionally move data around the cluster. Today we do this with methods like rebalance and replicate; however, these do not work well while the cluster is active. We would like to improve robustness here.

@mrocklin
Member Author

In a side-conversation @crusaderky said the following:

I'm working on running rebalance() while a computation is in progress.
I am inclined towards this design philosophy:

  • coarsely preempt the biggest, most common causes of race conditions between computations and rebalancing
  • finely implement graceful handling of race conditions if and when they happen, at the cost of suboptimal performance

The fundamental idea behind it is that it would be prohibitive to think about and deterministically preempt every use case that could trigger a race condition; such an approach would have a very poor cost/benefit ratio. If, however, we can gracefully deal with the occasional key that is moved where it shouldn't be, when it shouldn't be, and just have the computation bring it back to its rightful place, things will be a lot easier and more robust overall.

As for 1 (coarse preemption), I am inclined to simply blacklist from rebalancing all keys that are an input of a queued or running task. It is a very greedy strategy, and I appreciate that people will come up with use cases where a key should be moved even if it's part of a computation, but I really want to KISS (Keep It Simple & Stupid) in the initial delivery, push it through, let people play with it, and then analyse in real-life scenarios where it falls short and potentially add iterative refinements over time. Also, such a conservative approach would guarantee that we can quickly deliver something that is better than nothing, as opposed to a regression which actively hampers computations.
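
A minimal sketch of this coarse preemption: skip any in-memory key that a queued or running dependent might need. The `TaskState` fields and state names below are simplified assumptions for illustration, not the scheduler's actual data structures.

```python
from dataclasses import dataclass, field


@dataclass
class TaskState:
    """Simplified stand-in for a scheduler-side task record."""
    key: str
    state: str                      # e.g. "memory", "processing", "waiting"
    dependents: list = field(default_factory=list)


def rebalance_candidates(tasks):
    """Yield keys that are in memory and not needed by any queued or running dependent."""
    for ts in tasks:
        if ts.state != "memory":
            continue
        if any(dep.state in ("processing", "waiting", "queued") for dep in ts.dependents):
            continue  # blacklisted: some dependent may need this key soon
        yield ts.key
```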

@mrocklin
Member Author

In general I'm supportive of simple 90% solutions :)

However, I also want to propose an alternate way of looking at things.

We're never going to find data that is safe to move around. For example, even if a task isn't currently processing, it might end up processing in the next cycle (because a worker just went down, for example). Trying to segment data into safe and unsafe may not be a feasible way of looking at the world.

Instead, what we did for work stealing (which may or may not be a good example) is that we developed a protocol/handshake to move things around in a safe way. An example of such a handshake is ...

Example conversation

  • Scheduler: Worker-A, I think that you should get Data-X from Worker-B. Let me know if that works out.
  • A: Hey Worker-B, can I have Data-X?
  • B: Sure! Here you go. I'm not deleting it or anything though.
  • A: Hey Scheduler, I've gotten Data-X from Worker-B
  • Scheduler: Hey Worker-B, you can get rid of Data-X now
  • B: You got it boss.

Example conversation with break

  • Scheduler: Worker-A, I think that you should get Data-X from Worker-B. Let me know if that works out.
  • A: Hey Worker-B, can I have Data-X?
  • Something goes wrong
  • Scheduler: Hrm, I haven't heard from anyone. Oh well, fortunately I haven't made any destructive changes. Hopefully whoever wanted Data-X moved asks for something similar again

I think that if every transfer we do passes through a safe cycle like this, and if no individual transfer ever strictly needs to occur (because, for example, we call rebalance every second or so), then we're good.

I think that generally it's useful to solve resiliency problems in the small, where we think about how to do a small step safely, rather than trying to think about resiliency in the large, where we try to keep a global view of things.
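
Below is a minimal, self-contained sketch of the non-destructive handshake described in the conversations above, using toy `Worker` and `move_key` stand-ins rather than the real scheduler/worker comms. The key property is that the scheduler records the new replica and asks B to drop its copy only after A confirms the transfer; if anything breaks earlier, no destructive change has happened.

```python
import asyncio


class Worker:
    """Toy stand-in for a worker's in-memory data store."""

    def __init__(self, name):
        self.name = name
        self.data = {}

    async def fetch(self, key, source):
        # Copy the key; the source keeps its replica until told otherwise.
        self.data[key] = source.data[key]

    def release(self, key):
        self.data.pop(key, None)


async def move_key(who_has, key, a, b, timeout=5):
    """Scheduler side: ask A to get `key` from B; free B's copy only on success."""
    try:
        await asyncio.wait_for(a.fetch(key, b), timeout)
    except (asyncio.TimeoutError, KeyError):
        # Something went wrong, but nothing destructive happened: B still holds the key.
        return False
    who_has[key].add(a.name)   # record the new replica first
    b.release(key)             # only now drop the old copy
    who_has[key].discard(b.name)
    return True


async def main():
    a, b = Worker("A"), Worker("B")
    b.data["x"] = 42
    who_has = {"x": {"B"}}
    moved = await move_key(who_has, "x", a, b)
    print(moved, who_has, a.data, b.data)  # True {'x': {'A'}} {'x': 42} {}


asyncio.run(main())
```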

@fjetter
Member

fjetter commented Jun 15, 2021

From a discussion we had yesterday, here are a few notes I still have in mind. @mrocklin @crusaderky feel free to add on in case I missed something.

  • Once the scheduler decides to move a key, it should probably operate as if the key had already been removed from the worker for follow-up scheduling decisions. This may require us to work with some kind of tombstone to mark data in who_has as deleted on the scheduler until the confirmation is received (a rough sketch of this idea follows the list).

  • Workers might be in a position where they would like to reject a delete-data request, since the scheduler acted on outdated information and the worker, by now, might require the data itself; in that case, the worker would refuse to delete the key.

  • This will very likely require a handshake mechanism, similar to the work-stealing confirm-steal signal.

  • We might want or need to put this into a batched stream to avoid comm overhead if these messages are very frequent.

  • There was some confusion on my end about how workers manage their data and replicas. In particular, I was concerned about workers deciding on their own to delete data. This turned out to be a false assumption, with the exception of erred tasks, which pointed to No longer hold dependencies of erred tasks in memory #4918
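
A rough sketch of the tombstone idea from the first bullet, assuming the scheduler keeps a plain who_has mapping; the class and method names are illustrative only, not the scheduler's API.

```python
class ReplicaView:
    """Scheduler-side bookkeeping with tombstones for pending drops."""

    def __init__(self):
        self.who_has = {}       # key -> set of workers believed to hold it
        self.pending_drop = {}  # key -> set of workers asked to drop it (tombstones)

    def request_drop(self, key, worker):
        # Hide the replica from scheduling decisions as soon as the drop is requested
        self.pending_drop.setdefault(key, set()).add(worker)

    def visible_holders(self, key):
        return self.who_has.get(key, set()) - self.pending_drop.get(key, set())

    def confirm_drop(self, key, worker):
        # The worker confirmed: now the replica is really gone
        self.who_has.get(key, set()).discard(worker)
        self.pending_drop.get(key, set()).discard(worker)

    def reject_drop(self, key, worker):
        # The worker still needs the data (it acted on fresher information):
        # remove the tombstone so the replica becomes visible again
        self.pending_drop.get(key, set()).discard(worker)
```

Scheduling decisions would consult visible_holders(), so a replica stops being counted as soon as its drop is requested, while the actual removal from who_has waits for the worker's confirmation; a rejection simply clears the tombstone.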

@bolliger32

Came upon this issue when I was seeing some behavior that I think(?) might be related. I was using the secede/rejoin workflow to submit tasks from tasks. In some cases, a worker was not doing any work after seceding and I realized it was because there were no other tasks in its list to process. These were long-running tasks, and there were only a few left. For example, my workers looked something like this:

Worker 1: 1 task processing
Worker 2: 3 tasks processing
Worker 3: 1 task processing
Worker 4: 2 tasks processing

It was Worker 1 that had seceded when running its one task, but it did not have any more work to do, so it was sitting idle. I think the ideal situation would be to pull in the not-yet-started tasks from Worker 2 or Worker 4, but that would require some rebalancing on the fly. I'm not sure whether that is related to this discussion; it could also just be that Dask determined that the data movement required to shift tasks to Worker 1 was not worth it (I'm not super familiar with the inner scheduler logic). If it's unrelated to this discussion, feel free to ignore. I just wanted to point out a potentially relevant use case.

@fjetter
Member

fjetter commented Jun 16, 2021

@bolliger32 what you are describing sounds like a work-stealing issue. Work stealing is the process of moving not-yet-executed tasks to other workers when those workers have spare capacity for computation.

In this issue, we're discussing already-computed task results and the resulting data replicas.

@bolliger32

ahh thanks for the clarification @fjetter ... carry on :) I'll do some more investigation to see why the work was not stolen in this situation when (I'm pretty sure) it would help

@crusaderky
Collaborator

Status update:

The Active Memory Manager core machinery is live and is currently being used for worker retirement and (optionally) for replica reduction.
rebalance() has been reimplemented to be O(1) with respect to the total number of keys on the cluster and O(n) with respect to the number of keys that need to be rebalanced; however, it has not been migrated to AMM and is still unsafe to run on a busy cluster.

The effort required is relatively mild: mostly cut-and-paste, plus rewriting some tests.
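
For reference, a hedged example of enabling the AMM machinery mentioned above, including the optional ReduceReplicas policy, from Python. The config keys follow the distributed documentation for recent releases; verify them against the installed version, and set them in the scheduler's environment before the scheduler starts.

```python
# Assumed config keys per recent `distributed` docs; verify for your version.
import dask

dask.config.set({
    "distributed.scheduler.active-memory-manager.start": True,
    "distributed.scheduler.active-memory-manager.interval": "2s",
    "distributed.scheduler.active-memory-manager.policies": [
        {"class": "distributed.active_memory_manager.ReduceReplicas"},
    ],
})
```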

@crusaderky
Collaborator

Evidence of potential benefits of AMM Rebalance

I looked at the coiled-runtime benchmarks for evidence of severe memory imbalances between workers, which rebalancing the cluster every 2 seconds would potentially smooth out.
When a worker holds substantially more memory than the rest of the cluster, and that worker is spilling to disk today, AMM Rebalance would most likely cause an improvement in end-to-end runtime.

Below is an analysis of use cases in coiled-runtime CI that would either benefit straight away (as they are currently spilling on 1-2 workers), would benefit if one increased the size of the dataset, or would not benefit at all.

All the use cases that would benefit show a large gap between worst worker (or 4th quantile) and mean. AMM Rebalance would take memory away from the worst-case workers (max) and move it to the best-case ones (min).
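
As a toy illustration of that idea (not distributed's actual rebalance algorithm), one can think of it as pairing workers above the cluster mean with workers below it, moving the largest keys first and never pushing a sender below the mean. The plan_moves helper and its inputs are hypothetical.

```python
def plan_moves(worker_memory, keys_by_worker, tolerance=0.05):
    """Suggest (key, from_worker, to_worker) moves that push workers toward the mean."""
    worker_memory = dict(worker_memory)  # work on a copy
    mean = sum(worker_memory.values()) / len(worker_memory)
    senders = sorted(
        (w for w, m in worker_memory.items() if m > mean * (1 + tolerance)),
        key=worker_memory.get,
        reverse=True,
    )
    recipients = sorted(
        (w for w, m in worker_memory.items() if m < mean * (1 - tolerance)),
        key=worker_memory.get,
    )
    moves = []
    for sender in senders:
        # Offload the largest keys first, but never push the sender below the mean
        for key, size in sorted(keys_by_worker[sender].items(), key=lambda kv: -kv[1]):
            if not recipients:
                return moves
            if worker_memory[sender] - size < mean:
                continue
            recipient = recipients[0]
            moves.append((key, sender, recipient))
            worker_memory[sender] -= size
            worker_memory[recipient] += size
            if worker_memory[recipient] >= mean * (1 - tolerance):
                recipients.pop(0)  # this worker is now close enough to the mean
    return moves


# Example: one overloaded worker sends its biggest key to the emptiest one
print(plan_moves(
    {"w1": 10_000, "w2": 2_000, "w3": 3_000},
    {"w1": {"x": 4_000, "y": 2_000}, "w2": {}, "w3": {"z": 1_000}},
))  # [('x', 'w1', 'w2')]
```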

I've omitted the test cases that generate trivial amounts of data.

All tests were run on 10 workers with 2 threads per worker and 8 GiB of RAM each, which start spilling at 4.8 GiB of managed memory.

benchmarks/test_array.py::test_anom_mean

Data is already balanced; AMM Rebalance would do nothing.

benchmarks/test_array.py::test_basic_sum

Looks like test_anom_mean. No benefit.

benchmarks/test_array.py::test_dot_product

Mild imbalance and no spilling. AMM Rebalance would not improve the runtime of the test; the same test on a larger dataset could receive a minor runtime improvement.

benchmarks/test_array.py::test_double_diff

Substantial imbalance and mild spilling. AMM Rebalance would most likely cause improvements for larger datasets.

benchmarks/test_array.py::test_map_overlap_sample

The test runs in 7s end to end, while Grafana has a sample period of 5s. Inconclusive data.

benchmarks/test_array.py::test_vorticity

Substantial imbalance and spilling in the second half of the run. The use case would most likely benefit from AMM Rebalance.

benchmarks/test_csv.py::test_csv_basic

No imbalance and no spilling.

benchmarks/test_custom.py::test_jobqueue

No imbalance and no spilling.

benchmarks/test_dataframe.py::test_dataframe_align

Major imbalance and spilling. To my understanding this is a shuffle - interaction with AMM Rebalance is somewhat less predictable here.

benchmarks/test_dataframe.py::test_shuffle

No imbalance, no spilling, no benefit. I don't understand why the graph looks so different from test_dataframe_align; I need to investigate further.

benchmarks/test_join.py::test_join_big[0.1]

No imbalance, no spilling, no benefit.

@fjetter
Member

fjetter commented Jul 31, 2023

@crusaderky in #4906 (comment) you mentioned the effort for this would be "relatively mild". Can you try to be a bit more specific? Hours? Days? Weeks? Months?

@crusaderky
Collaborator

5 to 8 days
