diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 3c8f75fd9f..4d8289c904 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -233,15 +233,15 @@ async def write(self, msg, serializers=None, on_error="message"): ) try: + nframes = len(frames) lengths = [nbytes(frame) for frame in frames] - length_bytes = [struct.pack("Q", len(frames))] + [ - struct.pack("Q", x) for x in lengths - ] + length_bytes = struct.pack(f"Q{nframes}Q", nframes, *lengths) if sum(lengths) < 2 ** 17: # 128kiB - b = b"".join(length_bytes + frames) # small enough, send in one go - stream.write(b) + # small enough, send in one go + stream.write(b"".join([length_bytes, *frames])) else: - stream.write(b"".join(length_bytes)) # avoid large memcpy, send in many + # avoid large memcpy, send in many + stream.write(length_bytes) for frame, frame_bytes in zip(frames, lengths): # Can't wait for the write() Future as it may be lost @@ -398,7 +398,7 @@ def __init__( deserialize=True, allow_offload=True, default_port=0, - **connection_args + **connection_args, ): self._check_encryption(address, connection_args) self.ip, self.port = parse_host_port(address, default_port)