diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 182938c9092..bebed5993c6 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2602,26 +2602,25 @@ def report(self, msg, ts=None, client=None): If the message contains a key then we only send the message to those comms that care about the key. """ - comms = set() - if client is not None: - try: - comms.add(self.client_comms[client]) - except KeyError: - pass - if ts is None and "key" in msg: ts = self.tasks.get(msg["key"]) + if ts is None: # Notify all clients - comms.update(self.client_comms.values()) - else: + client_keys = list(self.client_comms) + elif client is None: # Notify clients interested in key - comms.update( - self.client_comms[c.client_key] - for c in ts.who_wants - if c.client_key in self.client_comms - ) - for c in comms: + client_keys = [c.client_key for c in ts.who_wants] + else: + # Notify clients interested in key (including `client`) + client_keys = [c.client_key for c in ts.who_wants if c.client_key != client] + client_keys.append(client) + + for k in client_keys: + try: + c = self.client_comms[k] + except KeyError: + continue try: c.send(msg) # logger.debug("Scheduler sends message to client %s", msg)