Worker key reference counting #3641
Conversation
Thanks for providing the test @quasiben!
Thank you @jacobtomlinson. I left two comments. I also tried running @cjnolet's example and I don't think it's passing quite yet.
@gen_cluster(client=True)
async def test_task_unique_groups_scatter(c, s, a, b):
Not sure why, but occasionally when I run it I get a TimeoutError:
E tornado.util.TimeoutError: Operation timed out after 20 seconds
How occasionally? I'm not seeing this after 10 runs.
I was seeing it after 3 runs on my machine
I tried converting Corey's example into a test and now I see that TimeoutError every time. I wonder if pytest is masking the CancelledError somehow.
from distributed import wait
from distributed.utils_test import gen_cluster


@gen_cluster(client=True)
async def test_task_unique_groups_scatter_tree_reduce(c, s, a, b):
    from toolz import first

    def tree_reduce(objs):
        # Pairwise-sum the futures until only one remains
        while len(objs) > 1:
            new_objs = []
            n_objs = len(objs)
            for i in range(0, n_objs, 2):
                inputs = objs[i : i + 2]
                obj = c.submit(sum, inputs)
                new_objs.append(obj)
            wait(new_objs)
            objs = new_objs
        return first(objs)

    for n_parts in [1, 2, 5, 10, 15]:
        a = await c.scatter(range(n_parts))
        b = tree_reduce(a)
        assert sum(range(n_parts)) == await b
        del a
I've been seeing this timeout exception as well and what's very strange is that I've even increased the timeout to 50s and I get the exception long before 50s has passed. This is specifically true in the case of dask/dask#6037
I'm getting it almost immediately. Within a second.
It ran a couple of times without error, but not every time.
The test is now consistently failing. The worker reference counting seems to have helped with the scattered items being deleted, but not with other tasks which depend on the scattered items. I'll also look at adding some reference counting in the scheduler to try and avoid this.
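To make the worker-side idea concrete, here is a minimal sketch of per-key reference counting of the sort discussed in this PR. All names here (KeyRefCounter, add, release) are hypothetical illustrations, not the PR's actual implementation.

from collections import defaultdict


class KeyRefCounter:
    def __init__(self):
        self.counts = defaultdict(int)
        self.data = {}

    def add(self, key, value):
        # A quickly re-created key bumps the count instead of racing a pending delete
        self.counts[key] += 1
        self.data[key] = value

    def release(self, key):
        self.counts[key] -= 1
        if self.counts[key] <= 0:
            # Only drop the data once every outstanding reference is gone
            del self.counts[key]
            self.data.pop(key, None)


rc = KeyRefCounter()
rc.add("x", 1)         # original creation
rc.add("x", 1)         # key quickly re-created
rc.release("x")        # stale delete for the first creation arrives late
assert "x" in rc.data  # the re-created key survives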
I think that the problem here might be on the client/scheduler side rather than on the worker (or at least there may be another problem there). I think that we can delete a future on the client, then scatter the same data, and then, because these two messages are sent on different channels, the scheduler can get these messages out of order. To demonstrate this, here is a diff:
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -1132,14 +1132,17 @@ class Client(Node):
         self.close()
 
     def _inc_ref(self, key):
+        print("inc", key, self.refcount[key])
         with self._refcount_lock:
             self.refcount[key] += 1
 
     def _dec_ref(self, key):
         with self._refcount_lock:
+            print("dec", key, self.refcount[key])
             self.refcount[key] -= 1
             if self.refcount[key] == 0:
                 del self.refcount[key]
+                print("release", key)
                 self._release_key(key)
 
     def _release_key(self, key):
@@ -1149,6 +1152,7 @@ class Client(Node):
         if st is not None:
             st.cancel()
         if self.status != "closed":
+            print("key in refcount", key in self.refcount)
             self._send_to_scheduler(
                 {"op": "client-releases-keys", "keys": [key], "client": self.id}
             )
@@ -1985,6 +1989,7 @@ class Client(Node):
         if w.scheduler.address == self.scheduler.address:
             direct = True
 
+        print("scatter data")
         if local_worker:  # running within task
             local_worker.update_data(data=data, report=False)
Which results in this output
We scatter the second round of data before the first release gets sent, so the client is releasing the data that it sent the second time. For background, there are two channels of communication between the client and the scheduler (or between any pair, really): a batched stream of small administrative messages, which carries messages like client-releases-keys, and direct request/response RPC calls, which operations like scatter use. Messages on the two channels are not ordered relative to each other.
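Here is a rough sketch of the client-side pattern that triggers the race described above. It assumes a local cluster; it is not a deterministic reproducer and may hang on affected versions.

import asyncio
from dask.distributed import Client


async def main():
    async with Client(asynchronous=True, processes=False) as c:
        futures = await c.scatter(list(range(5)))
        del futures  # queues client-releases-keys on the batched stream
        # Scattering the same values again produces the same hashed keys,
        # so a late release from the first round can free data we still need.
        futures = await c.scatter(list(range(5)))
        total = await c.submit(sum, futures)
        print(total)  # expected: 10


asyncio.run(main())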
One approach here would be to stop reference counting on the client, and instead keep a list of Future identifiers on the scheduler. So if a client had two different Futures pointing to the same key then the scheduler would be aware of that fact, and would track the deletion of each future independently.
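A rough sketch of that idea, with hypothetical names (FutureTracker, register, forget) rather than actual scheduler API:

from collections import defaultdict


class FutureTracker:
    def __init__(self):
        # key -> set of unique Future identifiers that refer to it
        self.futures_by_key = defaultdict(set)

    def register(self, key, future_id):
        self.futures_by_key[key].add(future_id)

    def forget(self, key, future_id):
        holders = self.futures_by_key.get(key)
        if holders is None:
            return False
        holders.discard(future_id)
        if not holders:
            del self.futures_by_key[key]
            return True  # no Futures left; the key can really be released
        return False

With this, two Futures pointing at the same key register two identifiers, so forgetting one of them does not release the key.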
We should increment the reference counter for scattered keys as soon as scatter is called, before the data transfer completes. Otherwise there is a period where the user has requested that data exists, but we haven't incremented the reference counter yet. This way we've stated that yes, these keys are important to us, even while we wait for the data to transfer. This stops the client from sending an erroneous release message to the scheduler.
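As a sketch of that ordering, here is a toy client (not distributed's Client) that claims keys before the simulated transfer, so a stale release cannot drop the count to zero in the meantime:

import asyncio
from collections import defaultdict


class ToyClient:
    def __init__(self):
        self.refcount = defaultdict(int)
        self.released = []

    def _inc_ref(self, key):
        self.refcount[key] += 1

    def _dec_ref(self, key):
        self.refcount[key] -= 1
        if self.refcount[key] == 0:
            del self.refcount[key]
            self.released.append(key)  # stands in for client-releases-keys

    async def scatter(self, keys):
        # Claim the keys before the (simulated) transfer begins
        for key in keys:
            self._inc_ref(key)
        await asyncio.sleep(0.01)  # simulated data transfer
        return list(keys)


async def main():
    c = ToyClient()
    keys = await c.scatter(["int-a", "int-b"])     # counts are now 1
    task = asyncio.ensure_future(c.scatter(keys))  # re-scatter the same keys
    await asyncio.sleep(0)                         # let the re-scatter claim the keys first
    c._dec_ref("int-a")                            # stale release for the first round
    await task
    assert "int-a" not in c.released               # the key survives the stale release


asyncio.run(main())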
After trying this for a bit I'm not sure that it's a good idea. In any event, I think that only one of the scheduler and the client should be authoritative about whether a future is active. My guess now is that this is the client. Allowing stale client-releases-keys messages pending in the …
This seems to have gone stale, suggest closing.
Add reference counting to allow workers to keep track of key deletions.
Avoids race conditions when keys are quickly deleted and recreated.
Fixes dask/dask#6027.