From 1f3958d9a5e62a756e18d1ff585d2069b6f307ae Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Tue, 24 Nov 2020 15:58:12 -0800 Subject: [PATCH 01/13] Collect `comms` in `dict` Instead of collecting a `set` of `comms` where `BatchedSend` objects will need to be `hash`ed, use a `dict` with keys based on `Client` IDs and their `BatchedSend` object as values. This should still ensure that we only pick up each `Client`'s `BatchedSend` object once. Though this will hash the `Client` IDs (`str`s) instead of `BatchedSend` objects, which should be more performant. --- distributed/scheduler.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 6848110e43f..e80048b6498 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2584,10 +2584,10 @@ def report(self, msg, ts=None, client=None): If the message contains a key then we only send the message to those comms that care about the key. """ - comms = set() + comms = dict() if client is not None: try: - comms.add(self.client_comms[client]) + comms[client] = self.client_comms[client] except KeyError: pass @@ -2595,15 +2595,17 @@ def report(self, msg, ts=None, client=None): ts = self.tasks.get(msg["key"]) if ts is None: # Notify all clients - comms.update(self.client_comms.values()) + comms.update(self.client_comms) else: # Notify clients interested in key comms.update( - self.client_comms[c.client_key] - for c in ts.who_wants - if c.client_key in self.client_comms + { + c.client_key: self.client_comms[c.client_key] + for c in ts.who_wants + if c.client_key in self.client_comms + } ) - for c in comms: + for c in comms.values(): try: c.send(msg) # logger.debug("Scheduler sends message to client %s", msg) From 4b6f4e0c7b31d1fb47e2b5ea649ff046e08e3fc1 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Tue, 24 Nov 2020 16:04:56 -0800 Subject: [PATCH 02/13] Only check for `c.client_key` once Previously we looped through all `Client`s interested in a `TaskState` and collected their `BatchedSend` comms object (if we had it). This results in checking for the `Client` in our collection of `BatchedSend` once and then retrieving it. IOW this is two calls to `__getitem__`. We can cut this down to one call of `__getitem__` by simply trying to grab the `Client` (whether it exists or not) and simply skipping over it if we don't have it. The end result is the same, but the number of steps performed is reduced. --- distributed/scheduler.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index e80048b6498..8b676fca2af 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2598,13 +2598,9 @@ def report(self, msg, ts=None, client=None): comms.update(self.client_comms) else: # Notify clients interested in key - comms.update( - { - c.client_key: self.client_comms[c.client_key] - for c in ts.who_wants - if c.client_key in self.client_comms - } - ) + for c in ts.who_wants: + with suppress(KeyError): + comms[c.client_key] = self.client_comms[c.client_key] for c in comms.values(): try: c.send(msg) From 78d1994a63e30ab3ad7b1ae2ac723a1c70413183 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Tue, 24 Nov 2020 16:19:42 -0800 Subject: [PATCH 03/13] Add blank line for readability --- distributed/scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 8b676fca2af..62248343d52 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2601,6 +2601,7 @@ def report(self, msg, ts=None, client=None): for c in ts.who_wants: with suppress(KeyError): comms[c.client_key] = self.client_comms[c.client_key] + for c in comms.values(): try: c.send(msg) From 7dee040c975a399dd759052ecd4e2a2b1bdfd44a Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Tue, 24 Nov 2020 16:27:12 -0800 Subject: [PATCH 04/13] Collect `Client` keys in a `set` instead Per James' suggestion try collecting a `set` of `Client` keys instead. This simplifies a bit of the logic earlier on. Plus we can fold the handling of `Client`s whose comms we don't have into our existing `for`-loop below. --- distributed/scheduler.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 62248343d52..27c9842c310 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2584,25 +2584,24 @@ def report(self, msg, ts=None, client=None): If the message contains a key then we only send the message to those comms that care about the key. """ - comms = dict() + client_keys = set() if client is not None: - try: - comms[client] = self.client_comms[client] - except KeyError: - pass + client_keys.add(client) if ts is None and "key" in msg: ts = self.tasks.get(msg["key"]) if ts is None: # Notify all clients - comms.update(self.client_comms) + client_keys.update(self.client_comms) else: # Notify clients interested in key - for c in ts.who_wants: - with suppress(KeyError): - comms[c.client_key] = self.client_comms[c.client_key] + client_keys.update(c.client_key for c in ts.who_wants) - for c in comms.values(): + for k in client_keys: + try: + c = self.client_comms[k] + except KeyError: + continue try: c.send(msg) # logger.debug("Scheduler sends message to client %s", msg) From 7657f5850413b884fa8a465b4b2e94f45e730958 Mon Sep 17 00:00:00 2001 From: jakirkham Date: Tue, 24 Nov 2020 16:48:32 -0800 Subject: [PATCH 05/13] Use a `list` collect `client_keys` As `self.client_comms` is a `dict`, we know none of its keys will repeat. Similarly as we know `TaskState`'s `who_wants` is a set, none of its values will repeat. So these can be added to a `list` without creating duplicates. The only possible duplicate is the `client` key passed into the function, which we can just check against the `client_keys` once produced. So just do this instead thus avoiding a bunch of checks for collisions that won't occur. --- distributed/scheduler.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 27c9842c310..658ac81c64d 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2584,18 +2584,19 @@ def report(self, msg, ts=None, client=None): If the message contains a key then we only send the message to those comms that care about the key. """ - client_keys = set() - if client is not None: - client_keys.add(client) + client_keys = [] if ts is None and "key" in msg: ts = self.tasks.get(msg["key"]) if ts is None: # Notify all clients - client_keys.update(self.client_comms) + client_keys.extend(self.client_comms) else: # Notify clients interested in key - client_keys.update(c.client_key for c in ts.who_wants) + client_keys.extend(c.client_key for c in ts.who_wants) + + if client is not None and client not in client_keys: + client_keys.append(client) for k in client_keys: try: From 5ec687f4b56a14c61d1779407eb4527b391e4356 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Tue, 24 Nov 2020 17:31:03 -0800 Subject: [PATCH 06/13] Add blank line for readability --- distributed/scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 658ac81c64d..eb11036f73f 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2588,6 +2588,7 @@ def report(self, msg, ts=None, client=None): if ts is None and "key" in msg: ts = self.tasks.get(msg["key"]) + if ts is None: # Notify all clients client_keys.extend(self.client_comms) From a3a1db9a3fce5d99cc8a062d6986684622787f0f Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Tue, 24 Nov 2020 17:31:25 -0800 Subject: [PATCH 07/13] Assign `client_keys` directly As `client_keys` is blank to begin with, just assign directly to it instead of `extend`ing it. --- distributed/scheduler.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index eb11036f73f..aafbe0a0595 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2584,17 +2584,15 @@ def report(self, msg, ts=None, client=None): If the message contains a key then we only send the message to those comms that care about the key. """ - client_keys = [] - if ts is None and "key" in msg: ts = self.tasks.get(msg["key"]) if ts is None: # Notify all clients - client_keys.extend(self.client_comms) + client_keys = list(self.client_comms) else: # Notify clients interested in key - client_keys.extend(c.client_key for c in ts.who_wants) + client_keys = [c.client_key for c in ts.who_wants] if client is not None and client not in client_keys: client_keys.append(client) From cb069faa2b0746fb63fb5ea05e24d3ef662bcd5f Mon Sep 17 00:00:00 2001 From: jakirkham Date: Tue, 24 Nov 2020 17:50:01 -0800 Subject: [PATCH 08/13] Only add `client` when it may not be included In the case where all `Client`s are notified, there is no need to add the `client` argument given to `report` as it is either in `self.client_comms`, from which we included all keys, or it is not, in which case we skip it. Thus we only need to try and added `client` when only some of the `Client`s may have been notified, which may or may not include the `client` argument given. Hence this branch is moved into the `else` case. --- distributed/scheduler.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index aafbe0a0595..eb5edb4c5bd 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2593,9 +2593,8 @@ def report(self, msg, ts=None, client=None): else: # Notify clients interested in key client_keys = [c.client_key for c in ts.who_wants] - - if client is not None and client not in client_keys: - client_keys.append(client) + if client is not None and client not in client_keys: + client_keys.append(client) for k in client_keys: try: From 6380d793b5a950a0123f017c26260eb6a177e556 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Tue, 24 Nov 2020 18:11:30 -0800 Subject: [PATCH 09/13] Only add keys that don't duplicate `client` --- distributed/scheduler.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index eb5edb4c5bd..c55c32aa0b0 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2592,9 +2592,16 @@ def report(self, msg, ts=None, client=None): client_keys = list(self.client_comms) else: # Notify clients interested in key - client_keys = [c.client_key for c in ts.who_wants] - if client is not None and client not in client_keys: + client_keys = [] + if client is not None: client_keys.append(client) + for c in ts.who_wants: + k = c.client_key + if k != client: + client_keys.append(k) + else: + for c in ts.who_wants: + client_keys.append(c.client_key) for k in client_keys: try: From f37199edf28190025780f38243b3f10ad22556a7 Mon Sep 17 00:00:00 2001 From: jakirkham Date: Tue, 24 Nov 2020 18:22:14 -0800 Subject: [PATCH 10/13] Use `list` comprehensions instead of `for`-loops --- distributed/scheduler.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index c55c32aa0b0..6ae2ccfd849 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2592,16 +2592,13 @@ def report(self, msg, ts=None, client=None): client_keys = list(self.client_comms) else: # Notify clients interested in key - client_keys = [] if client is not None: + client_keys = [ + c.client_key for c in ts.who_wants if c.client_key != client + ] client_keys.append(client) - for c in ts.who_wants: - k = c.client_key - if k != client: - client_keys.append(k) else: - for c in ts.who_wants: - client_keys.append(c.client_key) + client_keys = [c.client_key for c in ts.who_wants] for k in client_keys: try: From 3041a83dfdaa4f4dc50fae6b0d5475b66e33aba9 Mon Sep 17 00:00:00 2001 From: jakirkham Date: Tue, 24 Nov 2020 18:33:57 -0800 Subject: [PATCH 11/13] Flatten branch structure --- distributed/scheduler.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 6ae2ccfd849..0d9acff3f3e 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2590,15 +2590,15 @@ def report(self, msg, ts=None, client=None): if ts is None: # Notify all clients client_keys = list(self.client_comms) + elif client is not None: + # Notify clients interested in key (including `client`) + client_keys = [ + c.client_key for c in ts.who_wants if c.client_key != client + ] + client_keys.append(client) else: # Notify clients interested in key - if client is not None: - client_keys = [ - c.client_key for c in ts.who_wants if c.client_key != client - ] - client_keys.append(client) - else: - client_keys = [c.client_key for c in ts.who_wants] + client_keys = [c.client_key for c in ts.who_wants] for k in client_keys: try: From c5acd1c1540b9496c19f1c75cda58979385b1126 Mon Sep 17 00:00:00 2001 From: jakirkham Date: Tue, 24 Nov 2020 18:35:46 -0800 Subject: [PATCH 12/13] Run `black` --- distributed/scheduler.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 0d9acff3f3e..a30d53801ef 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2592,9 +2592,7 @@ def report(self, msg, ts=None, client=None): client_keys = list(self.client_comms) elif client is not None: # Notify clients interested in key (including `client`) - client_keys = [ - c.client_key for c in ts.who_wants if c.client_key != client - ] + client_keys = [c.client_key for c in ts.who_wants if c.client_key != client] client_keys.append(client) else: # Notify clients interested in key From 3b8741f6389ccc8d6044ed6fb471545c3dc8e101 Mon Sep 17 00:00:00 2001 From: jakirkham Date: Tue, 24 Nov 2020 19:08:54 -0800 Subject: [PATCH 13/13] Flip `client` cases --- distributed/scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a30d53801ef..10bca0a50f7 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2590,13 +2590,13 @@ def report(self, msg, ts=None, client=None): if ts is None: # Notify all clients client_keys = list(self.client_comms) - elif client is not None: + elif client is None: + # Notify clients interested in key + client_keys = [c.client_key for c in ts.who_wants] + else: # Notify clients interested in key (including `client`) client_keys = [c.client_key for c in ts.who_wants if c.client_key != client] client_keys.append(client) - else: - # Notify clients interested in key - client_keys = [c.client_key for c in ts.who_wants] for k in client_keys: try: