diff --git a/distributed/protocol/serialize.py b/distributed/protocol/serialize.py index 4fa85d66a5..f97ba6dd3b 100644 --- a/distributed/protocol/serialize.py +++ b/distributed/protocol/serialize.py @@ -1,6 +1,6 @@ from array import array +from collections import deque from functools import partial -from itertools import repeat import traceback import importlib from enum import Enum @@ -445,34 +445,46 @@ def extract_serialize(x): >>> extract_serialize(msg) ({'op': 'update'}, {('data',): }, set()) """ - x2 = type(x)() - ser = {} - bytestrings = set() - _extract_serialize(x, x2, ser, bytestrings) - return x2, ser, bytestrings - - -def _extract_serialize(x, x2, ser, bytestrings, path=()): typ_x = type(x) if typ_x is dict: x_items = x.items() + x2 = {} elif typ_x is list: x_items = enumerate(x) - x2.extend(repeat(None, len(x))) - - for k, v in x_items: - path_k = path + (k,) - typ_v = type(v) - if typ_v is dict or typ_v is list: - x2[k] = v2 = typ_v() - _extract_serialize(v, v2, ser, bytestrings, path_k) - elif typ_v is Serialize or typ_v is Serialized: - ser[path_k] = v - elif (typ_v is bytes or typ_v is bytearray) and len(v) > 2 ** 16: - ser[path_k] = to_serialize(v) - bytestrings.add(path_k) + x2 = len(x) * [None] + + ser = {} + bytestrings = set() + _extract_serialize(x_items, x2, ser, bytestrings) + return x2, ser, bytestrings + + +def _extract_serialize(x_items, x2, ser, bytestrings, path=()): + q = deque() + while True: + for k, v in x_items: + path_k = path + (k,) + typ_v = type(v) + if typ_v is dict: + v_items = v.items() + x2[k] = v2 = {} + q.append((v_items, v2, path_k)) + elif typ_v is list: + v_items = enumerate(v) + x2[k] = v2 = len(v) * [None] + q.append((v_items, v2, path_k)) + elif typ_v is Serialize or typ_v is Serialized: + ser[path_k] = v + elif (typ_v is bytes or typ_v is bytearray) and len(v) > 2 ** 16: + ser[path_k] = to_serialize(v) + bytestrings.add(path_k) + else: + x2[k] = v + + if q: + x_items, x2, path = q.pop() else: - x2[k] = v + break def nested_deserialize(x):