Nested scatter calls lead to KeyError #3703

Open
pierreglaser opened this issue Apr 13, 2020 · 14 comments

@pierreglaser
Contributor

pierreglaser commented Apr 13, 2020

Hi All,

I am currently working on improving the joblib-dask integration.
It turns out that nested Parallel calls in joblib using the dask backend tend to error out with either KeyError or CancelledError.

I narrowed it down using only dask and numpy, and it seems that the issue comes from nested scatter calls.

Here is a reproducer: it submits functions that rely on scattered arrays. Each of these functions submits small arithmetic operations to be computed on scattered slices of its original input.

import logging

import numpy as np

from distributed import LocalCluster, Client, get_client, secede, rejoin


NUM_INNER_TASKS = 10
NUM_OUTER_TASKS = 10


def my_sum(x, i, j):
    print(f"running inner task {j} of outer task {i}")
    return np.sum(x)


def outer_function(array, i):
    print(f"running outer task {i}")
    client = get_client()
    slices = [array[i + j :] for j in range(NUM_INNER_TASKS)]

    # commenting this line makes the code run successfully
    slices = client.scatter(slices, broadcast=True)  

    futures = client.map(my_sum, slices, [i] * NUM_INNER_TASKS, range(NUM_INNER_TASKS))

    secede()
    results = client.gather(futures)
    rejoin()
    return sum(results)


if __name__ == "__main__":
    my_arrays = [np.ones(100000) for _ in range(10)]

    cluster = LocalCluster(
        n_workers=1, threads_per_worker=1, silence_logs=logging.WARNING
    )
    client = Client(cluster)

    future_arrays = client.scatter(my_arrays, direct=False)


    # using .map() instead of .submit() makes the code run successfully.
    # futures = client.map(outer_function, future_arrays, range(10))

    futures = []
    for i, arr in enumerate(future_arrays):
        future = client.submit(outer_function, arr, i)
        futures.append(future)

    results = client.gather(futures)
    print(results)

Two remarks:

  • As noted in the code comments, using client.map instead of client.submit makes the code run successfully.
  • Not scattering the slices inside the outer functions also makes the code run successfully.

My guess as of now is that dynamically creating new compute resources through secede/rejoin calls might interact badly with the data-locality logic of distributed. I'm investigating this on my own, but I'm not familiar enough with the dask/distributed codebase to trace this back efficiently.

Is this behavior supported? Is there a clear anti-pattern that I'm missing? Any pointer would be helpful.

@mrocklin
Member

I am currently working on improving the joblib-dask integration.

Thank you for this.

For others, a traceback. It looks like this might be caused by using replicate many times concurrently (replicate is notoriously not robust).

Traceback
(base) mrocklin@carbon-7:~$ python foo.py 
running outer task 3
running outer task 9
running inner task 0 of outer task 3
running inner task 1 of outer task 3
running inner task 0 of outer task 9
running inner task 1 of outer task 9
running inner task 2 of outer task 9
running inner task 2 of outer task 3
running inner task 3 of outer task 9
running inner task 3 of outer task 3
running inner task 4 of outer task 9
running inner task 4 of outer task 3
running inner task 5 of outer task 9
running inner task 5 of outer task 3
running inner task 6 of outer task 9
running inner task 6 of outer task 3
running inner task 7 of outer task 9
running inner task 7 of outer task 3
running inner task 8 of outer task 9
running inner task 8 of outer task 3
running inner task 9 of outer task 9
running inner task 9 of outer task 3
running outer task 8
running outer task 7
distributed.core - ERROR - 'ndarray-e13a0cd21557a3394c2c32361a932aea'
Traceback (most recent call last):
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 408, in handle_comm
    result = await result
  File "/home/mrocklin/workspace/distributed/distributed/scheduler.py", line 3179, in replicate
    tasks = {self.tasks[k] for k in keys}
  File "/home/mrocklin/workspace/distributed/distributed/scheduler.py", line 3179, in <setcomp>
    tasks = {self.tasks[k] for k in keys}
KeyError: 'ndarray-e13a0cd21557a3394c2c32361a932aea'
distributed.worker - WARNING -  Compute Failed
Function:  outer_function
args:      (array([1., 1., 1., ..., 1., 1., 1.]), 7)
kwargs:    {}
Exception: KeyError('ndarray-e13a0cd21557a3394c2c32361a932aea')

running inner task 0 of outer task 8
running inner task 1 of outer task 8
running inner task 2 of outer task 8
Traceback (most recent call last):
  File "foo.py", line 52, in <module>
    results = client.gather(futures)
  File "/home/mrocklin/workspace/distributed/distributed/client.py", line 1892, in gather
running inner task 3 of outer task 8
    asynchronous=asynchronous,
running inner task 4 of outer task 8
  File "/home/mrocklin/workspace/distributed/distributed/client.py", line 778, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/mrocklin/workspace/distributed/distributed/utils.py", line 348, in sync
    raise exc.with_traceback(tb)
  File "/home/mrocklin/workspace/distributed/distributed/utils.py", line 332, in f
    result[0] = yield future
  File "/home/mrocklin/miniconda/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
running inner task 5 of outer task 8
    value = future.result()
  File "/home/mrocklin/workspace/distributed/distributed/client.py", line 1751, in _gather
running inner task 6 of outer task 8
    raise exception.with_traceback(traceback)
  File "foo.py", line 23, in outer_function
    slices = client.scatter(slices, broadcast=True)
  File "/home/mrocklin/workspace/distributed/distributed/client.py", line 2098, in scatter
running inner task 7 of outer task 8
    hash=hash,
  File "/home/mrocklin/workspace/distributed/distributed/client.py", line 778, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/mrocklin/workspace/distributed/distributed/utils.py", line 348, in sync
    raise exc.with_traceback(tb)
  File "/home/mrocklin/workspace/distributed/distributed/utils.py", line 332, in f
running inner task 8 of outer task 8
    result[0] = yield future
  File "/home/mrocklin/miniconda/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
running inner task 9 of outer task 8
    value = future.result()
  File "/home/mrocklin/workspace/distributed/distributed/client.py", line 1994, in _scatter
running outer task 6
    await self._replicate(list(out.values()), workers=workers, n=n)
  File "/home/mrocklin/workspace/distributed/distributed/client.py", line 3045, in _replicate
    keys=list(keys), n=n, workers=workers, branching_factor=branching_factor
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 755, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 554, in send_recv
    raise exc.with_traceback(tb)
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 408, in handle_comm
    result = await result
  File "/home/mrocklin/workspace/distributed/distributed/scheduler.py", line 3179, in replicate
    tasks = {self.tasks[k] for k in keys}
  File "/home/mrocklin/workspace/distributed/distributed/scheduler.py", line 3179, in <setcomp>
    tasks = {self.tasks[k] for k in keys}
KeyError: 'ndarray-e13a0cd21557a3394c2c32361a932aea'

If we turn off broadcast=True (which activates the not-robust replicate code), then things work well on my machine. Is this a possibility for joblib + dask?

@pierreglaser
Contributor Author

pierreglaser commented Apr 13, 2020

For others, a traceback

My bad. Thanks for adding it.

If we turn off broadcast=True (which activates the not-robust replicate code) then things work well on my machine

If I turn off broadcasting, I personally get non-deterministic CancelledErrors (you might want to run the code a few times).

Here is the traceback.

running inner task 6 of outer task 7
running inner task 6 of outer task 8
running inner task 5 of outer task 8
distributed.worker - WARNING -  Compute Failed
Function:  outer_function
args:      (array([1., 1., 1., ..., 1., 1., 1.]), 7)
kwargs:    {}
Exception: CancelledError()

Traceback (most recent call last):
  File "test_scatter_simplified.py", line 61, in <module>
    results = client.gather(futures)
  File "/home/pierreglaser/repos/distributed/distributed/client.py", line 1892, in gather
    asynchronous=asynchronous,
  File "/home/pierreglaser/repos/distributed/distributed/client.py", line 778, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/pierreglaser/repos/distributed/distributed/utils.py", line 348, in sync
    raise exc.with_traceback(tb)
  File "/home/pierreglaser/repos/distributed/distributed/utils.py", line 332, in f
    result[0] = yield future
  File "/home/pierreglaser/.virtualenvs/joblib_py36/lib/python3.6/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
concurrent.futures._base.CancelledError

PS: the use of direct=False in the scatter call is a leftover from debugging.

@mrocklin
Member

Hrm, I tried this and can reproduce. I don't have an immediate answer about what is going on.

To debug this further, the next thing I would try is to ...

  1. Run the main LocalCluster/Client code in async mode so that we have more access to the scheduler (see the sketch after this list)
  2. Make sure that we still get the same exception
  3. Find which task is failing
  4. Look at Scheduler.story(key) for that task to see what happened.
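
Not from the original comment: a minimal sketch of steps 1 and 4, assuming distributed's public API of the time (Client(asynchronous=True), LocalCluster.scheduler, Scheduler.story); the trivial np.sum task is a stand-in for the reproducer's nested calls.

import asyncio

import numpy as np

from distributed import Client, LocalCluster


async def main():
    # Run the cluster and client in async mode so the in-process Scheduler
    # object is reachable from the same event loop.
    async with LocalCluster(
        n_workers=1, threads_per_worker=1, asynchronous=True
    ) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            scheduler = cluster.scheduler  # the Scheduler instance itself

            [data] = await client.scatter([np.ones(100000)])
            future = client.submit(np.sum, data)
            try:
                print(await client.gather(future))
            finally:
                # Step 4: replay every transition the scheduler recorded
                # for this key.
                print(scheduler.story(future.key))


if __name__ == "__main__":
    asyncio.run(main())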

cc'ing @jrbourbeau @lr4d in case either of them have an interest in diving into scheduler internals.

@pierreglaser
Contributor Author

Thank you for the hints. I'll post an update once I figure this out (if others don't do it before me).

@lr4d
Contributor

lr4d commented Apr 15, 2020

in case either of them have an interest in diving into scheduler internals.

if only I had the time...

@jrbourbeau
Member

Thanks for the ping @mrocklin, I will look into this issue over the next couple of days.

@mrocklin
Member

mrocklin commented Apr 25, 2020

For general education, here is how I'm debugging this.

I rewrote this using the gen_cluster testing harness. This is mostly so that I can get pdb access into the scheduler.

@gen_cluster(client=True)
async def test_nested_scatter(c, s, a, b):
    np = pytest.importorskip("numpy")
    from joblib import Parallel, delayed, parallel_backend

    NUM_INNER_TASKS = 10
    NUM_OUTER_TASKS = 10


    def my_sum(x, i, j):
        print(f"running inner task {j} of outer task {i}")
        return np.sum(x)


    def outer_function(array, i):
        print(f"running outer task {i}") 
        client = get_client()
        slices = [array[i + j :] for j in range(NUM_INNER_TASKS)]

        # commenting this line makes the code run successfully
        slices = client.scatter(slices, broadcast=True)

        futures = client.map(my_sum, slices, [i] * NUM_INNER_TASKS, range(NUM_INNER_TASKS))

        secede()
        results = client.gather(futures)
        rejoin()
        return sum(results)

    my_arrays = [np.ones(100000) for _ in range(10)]

    future_arrays = await c.scatter(my_arrays, direct=False)

    # using .map() instead of .submit() makes the code run successfully.
    # futures = client.map(outer_function, future_arrays, range(10))

    futures = []
    for i, arr in enumerate(future_arrays):
        future = c.submit(outer_function, arr, i)
        futures.append(future)

    results = await c.gather(futures)
    print(results)

We're getting a CancelledError. This is actually hidden by the gen_cluster infrastructure, but using try-excepts and binary search I isolated it to this line:

            results = client.gather(futures)

Putting in a breakpoint there

        before = str(futures)
        secede()
        try:
            results = client.gather(futures)
        except Exception:
            breakpoint()
        rejoin()

I see that indeed, some of the futures have been cancelled

(Pdb) pp before
('[<Future: pending, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-0>, '
 '<Future: pending, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-1>, '
 '<Future: pending, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-2>, '
 '<Future: pending, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-3>, '
 '<Future: pending, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-4>, '
 '<Future: pending, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-5>, '
 '<Future: pending, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-6>, '
 '<Future: pending, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-7>, '
 '<Future: pending, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-8>, '
 '<Future: pending, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-9>]')
(Pdb) pp futures
[<Future: finished, type: numpy.float64, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-0>,
 <Future: finished, type: numpy.float64, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-1>,
 <Future: finished, type: numpy.float64, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-2>,
 <Future: finished, type: numpy.float64, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-3>,
 <Future: finished, type: numpy.float64, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-4>,
 <Future: finished, type: numpy.float64, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-5>,
 <Future: finished, type: numpy.float64, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-6>,
 <Future: cancelled, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-7>,
 <Future: cancelled, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-8>,
 <Future: cancelled, key: my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-9>]

I want to get access to the scheduler within this function, but I need to be a little bit clever about it: I don't want to capture the scheduler in the function's scope, because it would then get serialized when we move the function from the client to the workers (even though they're both in the same process). So I attach the scheduler to the distributed module as a global and reference it inside the function.

@gen_cluster(client=True, timeout=None)
async def test_nested_scatter(c, s, a, b):
    print("main client", c)
    np = pytest.importorskip("numpy")
    from joblib import Parallel, delayed, parallel_backend
    import distributed
    distributed.s = s

...

    def outer_function(array, i):
        import distributed
        s = distributed.s

This is a bit of a hack, but it works.

I find that, oddly, the scheduler never received any news about this task

(Pdb) pp s.story(futures[0].key)  # this is what we expect
[('my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-0',
  'released',
  'waiting',
  OrderedDict([('my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-0',
                'processing')]),
  1587833466.9215913),
 ('my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-0',
  'waiting',
  'processing',
  {},
  1587833466.9216352),
 ('my_sum-07fd0280-4a8d-426a-b293-f03334e8015d-0',
  'processing',
  'memory',
  OrderedDict(),
  1587833466.950933)]
(Pdb) pp s.story(futures[-1].key)  # this is what we get
[]

This is odd. I check whether the number of tasks is divisible by ten, and learn that it isn't:

(Pdb) pp len(s.tasks)
27

So I put this code in update_graph, the method used whenever the scheduler receives new tasks

        if len(tasks) % 10 != 0 and len(tasks) != 1:
            breakpoint()

And this leads me to this section of code

        while len(tasks) != n:  # walk through new tasks, cancel any bad deps
            n = len(tasks)
            for k, deps in list(dependencies.items()):
                if any(
                    dep not in self.tasks and dep not in tasks for dep in deps
                ):  # bad key
                    logger.info("User asked for computation on lost data, %s", k)
                    del tasks[k]
                    del dependencies[k]
                    if k in keys:
                        keys.remove(k)
                    self.report({"op": "cancelled-key", "key": k}, client=client)
                    self.client_releases_keys(keys=[k], client=client)

And indeed, we've had the "computation on lost data" message in logs all this time. I should look more often at logs.

And we find that the scattered data on which this task depends has just recently been released

distributed.scheduler - INFO - User asked for computation on lost data, my_sum-9007cd9a-b94f-407c-b700-f62aae1388aa-9
> /home/mrocklin/workspace/distributed/distributed/scheduler.py(1820)update_graph()
-> del tasks[k]
(Pdb) pp k
'my_sum-9007cd9a-b94f-407c-b700-f62aae1388aa-9'
(Pdb) pp deps
('ndarray-f4ddcddd3c5cfa043c2cfc1169a0ee51',)
(Pdb) pp self.story(*deps)
[('ndarray-f4ddcddd3c5cfa043c2cfc1169a0ee51',
  'memory',
  'released',
  OrderedDict([('ndarray-f4ddcddd3c5cfa043c2cfc1169a0ee51', 'forgotten')]),
  1587834183.6691344),
 ('ndarray-f4ddcddd3c5cfa043c2cfc1169a0ee51',
  'released',
  'forgotten',
  {},
  1587834183.6691456)]

This is, I think, a duplicate of https://github.com/dask/dask/issues/6027. See also #3641

Fully worked test
from distributed.utils_test import gen_cluster
from dask.distributed import secede, rejoin, get_client
import pytest   
import distributed 
        
@gen_cluster(client=True, timeout=None)
async def test_nested_scatter(c, s, a, b):
    np = pytest.importorskip("numpy")
    distributed.s = s

    NUM_INNER_TASKS = 10
    NUM_OUTER_TASKS = 10


    def my_sum(x, i, j):
        print(f"running inner task {j} of outer task {i}")
        return np.sum(x)


    def outer_function(array, i):
        import distributed
        s = distributed.s
        print(f"running outer task {i}")
        client = get_client()
        slices = [array[i + j :] for j in range(NUM_INNER_TASKS)]

        # commenting this line makes the code run successfully
        slices = client.scatter(slices, direct=False)

        futures = client.map(my_sum, slices, [i] * NUM_INNER_TASKS, range(NUM_INNER_TASKS))
        before = str(futures)

        secede()
        try:
            results = client.gather(futures)
        except Exception as e:
            f = e
            breakpoint()
        rejoin()
        return sum(results)
        
    my_arrays = [np.ones(100000) for _ in range(10)]
    
    future_arrays = await c.scatter(my_arrays, direct=False)

    # using .map() instead of .submit() makes the code run successfully.
    # futures = client.map(outer_function, future_arrays, range(10))

    futures = []
    for i, arr in enumerate(future_arrays):
        future = c.submit(outer_function, arr, i)
        futures.append(future)
        
    results = await c.gather(futures)
    print(results)

@mrocklin
Member

Short term these problems also just go away if you use the hash=False keyword to client.scatter. This avoids any sort of collision between the different clients. It may also mean increased memory use, but maybe not given that the work is likely to be done locally anyway.
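
Not part of the comment itself: a minimal standalone sketch of the workaround; in the reproducer above, the same keyword would go on the inner client.scatter(slices, ...) call.

import numpy as np

from distributed import Client, LocalCluster

if __name__ == "__main__":
    client = Client(LocalCluster(n_workers=1, threads_per_worker=1))

    data = [np.ones(100000) for _ in range(2)]
    # hash=False gives each scatter call fresh, unique keys instead of keys
    # derived from the data's content, so concurrent clients scattering equal
    # data can no longer collide on (and release) each other's keys.
    futures = client.scatter(data, hash=False)
    print(client.gather(client.map(np.sum, futures)))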

@mrocklin
Member

Another approach for Dask + Joblib is for Joblib, when nested, to use the ThreadPoolExecutor on the worker.

def my_func(seq):
    # run the nested work on the worker's own thread pool
    w = dask.distributed.get_worker()
    futures = [w.executor.submit(func, x) for x in seq]
    secede()
    results = [future.result() for future in futures]
    rejoin()
    return aggregate(results)
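
A self-contained sketch of that pattern (mine, not from the comment), with a plain numpy sum standing in for func/aggregate and Worker.executor used as in the snippet above:

import numpy as np

from distributed import Client, LocalCluster, get_worker, rejoin, secede


def inner(x):
    return np.sum(x)


def outer(array, n_slices):
    # Run the nested work on the worker's own thread pool instead of
    # submitting new tasks to the scheduler.
    worker = get_worker()
    futures = [worker.executor.submit(inner, array[j:]) for j in range(n_slices)]
    secede()  # free this worker slot while we block on the thread-pool results
    results = [f.result() for f in futures]
    rejoin()
    return sum(results)


if __name__ == "__main__":
    client = Client(LocalCluster(n_workers=2, threads_per_worker=2))
    arrays = client.scatter([np.ones(100000) for _ in range(4)])
    print(client.gather([client.submit(outer, a, 5) for a in arrays]))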

@mrocklin
Member

There is a broader point here: one doesn't have to use Dask for everything. At some point it makes sense to switch to more standard Python libraries.

@ogrisel
Contributor

ogrisel commented Jun 9, 2020

If the outer parallel loop cannot saturate the nodes, using the ThreadPoolExecutor for the inner loop will lead to under-subscription of the dask cluster.

For instance, the outer loop is 5-fold cross-validation and the inner loop is an embarrassingly parallel model fit (e.g. a random forest with hundreds of trees). But maybe this case is not so frequent.

@mrocklin
Member

mrocklin commented Jun 9, 2020

Ah, that makes sense. I think that short term the solution of not hashing data in scatter is probably best. It's a little bit unclean, but I suspect that it actually has better performance because locally scattering data is entirely free.

@seanlaw

seanlaw commented Dec 7, 2020

Short term these problems also just go away if you use the hash=False keyword to client.scatter. This avoids any sort of collision between the different clients. It may also mean increased memory use, but maybe not given that the work is likely to be done locally anyway.

@mrocklin Thank you for pointing out hash=False! I'm not sure how I've missed this option, but this tip helped me.

@maxbane
Contributor

maxbane commented Dec 12, 2022

Hi. I think I might be hitting this same race condition without even using nested scatters. The following minimal example fails reliably for me with either CancelledError (most of the time) or, more worryingly, KilledWorker (less frequently, but still more than once in my experiments). Adding hash=False to the scatter calls fixes it.

I kind of agree with @mrocklin's earlier tone of urgency about this issue, since it seems so basic. I believe I've encountered this race condition a couple of times in my career using Dask (once at a previous employer and now again with this non-nested variant), and both times I googled it, added hash=False, shrugged, and moved on with my life, without really contributing to this issue (my bad). Maybe most people are like me and that's why there haven't been as many support issues as you might expect... or maybe most people are not directly calling submit/scatter in complex/nested ways (using the higher level APIs instead). Anyway, my 2c.

from dask import distributed
cluster = distributed.LocalCluster(
    n_workers=4, threads_per_worker=2, host="0.0.0.0",
    scheduler_port=0, dashboard_address=':0'
)
client = distributed.Client(cluster)

def nullity(x):
    return None

def identity(x, y):
    return x

for i in range(100):
    try:
        y = client.submit(
            nullity,
            client.scatter("smoochies")
        ).result()

        client.submit(
            identity,
            client.scatter("smoochies"),
            y
        ).result()
    finally:
        print(f"Iteration {i}")

After a few iterations it usually dies with CancelledError; example traceback:

CancelledError                            Traceback (most recent call last)
Cell In [6], line 18
      8 try:
      9     y = client.submit(
     10         nullity,
     11         client.scatter("smoochies")
     12     ).result()
     14     client.submit(
     15         identity,
     16         client.scatter("smoochies"),
     17         y
---> 18     ).result()
     19 finally:
     20     print(f"Iteration {i}")

File ~/.../lib/python3.10/site-packages/distributed/client.py:281, in Future.result(self, timeout)
    279     raise exc.with_traceback(tb)
    280 elif self.status == "cancelled":
--> 281     raise result
    282 else:
    283     return result

CancelledError: identity-56f9b83c4799f6e32ed58d11c86cb08d

More rarely, it dies with KilledWorker. Example traceback below. Anecdotally, if you got CancelledError the first time, you might have to kill your kernel and start over to have a chance of getting KilledWorker (that's my impression, but who knows? 🤷🏻)

2022-12-12 22:40:26,177 - distributed.worker - ERROR - unsupported operand type(s) for +: 'NoneType' and 'tuple'
Traceback (most recent call last):
  File "/home/max/.../.venv-dev/lib/python3.10/site-packages/distributed/worker.py", line 179, in wrapper
    return await method(self, *args, **kwargs)  # type: ignore
  File "/home/max/.../.venv-dev/lib/python3.10/site-packages/distributed/worker.py", line 1260, in handle_scheduler
    await self.handle_stream(comm)
  File "/home/max/.../.venv-dev/lib/python3.10/site-packages/distributed/core.py", line 843, in handle_stream
    handler(**merge(extra, msg))
  File "/home/max/.../.venv-dev/lib/python3.10/site-packages/distributed/worker.py", line 1907, in _
    self.handle_stimulus(event)
  File "/home/max/.../.venv-dev/lib/python3.10/site-packages/distributed/worker.py", line 192, in wrapper
    return method(self, *args, **kwargs)
  File "/home/max/.../.venv-dev/lib/python3.10/site-packages/distributed/worker.py", line 1932, in handle_stimulus
    super().handle_stimulus(*stims)
  File "/home/max/.../.venv-dev/lib/python3.10/site-packages/distributed/worker_state_machine.py", line 3627, in handle_stimulus
    instructions = self.state.handle_stimulus(*stims)
  File "/home/max/.../.venv-dev/lib/python3.10/site-packages/distributed/worker_state_machine.py", line 1329, in handle_stimulus
    recs, instr = self._handle_event(stim)
  File "/usr/lib/python3.10/functools.py", line 926, in _method
    return method.__get__(obj, cls)(*args, **kwargs)
  File "/home/max/.../.venv-dev/lib/python3.10/site-packages/distributed/worker_state_machine.py", line 2875, in _handle_compute_task
    priority = ev.priority + (self.generation,)
TypeError: unsupported operand type(s) for +: 'NoneType' and 'tuple'
2022-12-12 22:40:26,182 - distributed.nanny - ERROR - Worker process died unexpectedly
2022-12-12 22:40:26,206 - distributed.nanny - WARNING - Restarting worker
Died on iteration 0
---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
Cell In [2], line 20
      8 try:
      9     y = client.submit(
     10         nullity,
     11         client.scatter("smoochies")
     12     ).result()
     13     (
     14         client
     15         .submit(
     16             identity,
     17             client.scatter("smoochies"),
     18             y
     19         )
---> 20         .result()
     21     )
     22 finally:
     23     print(f"Died on iteration {i}")

File ~/frankenfit/.venv-dev/lib/python3.10/site-packages/distributed/client.py:279, in Future.result(self, timeout)
    277 if self.status == "error":
    278     typ, exc, tb = result
--> 279     raise exc.with_traceback(tb)
    280 elif self.status == "cancelled":
    281     raise result

KilledWorker: Attempted to run task str-621d9abe95ce2c476d2fc23582808907 on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://10.0.2.82:43681. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.
2022-12-12 22:40:26,824 - distributed.nanny - WARNING - Restarting worker
