diff --git a/distributed/stealing.py b/distributed/stealing.py index 098b9e61ce8..22ed5c08107 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -10,7 +10,7 @@ from .utils import key_split, log_errors from .diagnostics.plugin import SchedulerPlugin -BANDWIDTH = 200e6 +BANDWIDTH = 100e6 logger = logging.getLogger(__name__) @@ -18,7 +18,7 @@ class WorkStealing(SchedulerPlugin): def __init__(self, scheduler): self.scheduler = scheduler - self.stealable = [set() for i in range(12)] + self.stealable = [set() for i in range(15)] self.key_stealable = dict() self.stealable_unknown_durations = defaultdict(set) @@ -94,12 +94,14 @@ def steal_time_ratio(self, key, bandwidth=BANDWIDTH, split=None): ratio = compute_time / transfer_time except ZeroDivisionError: ratio = 10000 - if ratio > 8: + if ratio == 10000: loc = 0 + elif ratio > 32: + loc = 1 elif ratio < 2**-8: loc = -1 else: - loc = int(-round(log(ratio) / log(2), 0) + 3) + loc = int(-round(log(ratio) / log(2), 0) + 5) + 1 return ratio, loc def move_task(self, key, victim, thief): @@ -146,7 +148,7 @@ def balance(self): original = stealable - ratio = 2 ** (level - 3) + ratio = 2 ** (level - 5) duration_if_hold = len(stealable) / len(self.scheduler.saturated) duration_if_steal = ratio diff --git a/distributed/worker.py b/distributed/worker.py index 097abfd0bb9..f91dcc16f5f 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -208,7 +208,7 @@ def __init__(self, scheduler_ip, scheduler_port, ip=None, ncores=None, } super(WorkerBase, self).__init__(handlers, io_loop=self.loop, - connection_limit=128, **kwargs) + connection_limit=64, **kwargs) self.heartbeat_callback = PeriodicCallback(self.heartbeat, self.heartbeat_interval,