attempt multiple times before giving up on worker
mrocklin committed Dec 15, 2016
1 parent 5756cc5 commit 6d8bdcb
Showing 3 changed files with 114 additions and 117 deletions.
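
The heart of this commit is in distributed/worker.py: the single-shot gather_dep is split into gather_from_worker (one transfer from one worker) and a new gather_dep that makes up to two rounds of attempts, shuffling the candidate workers and re-querying the scheduler's who_has between rounds, before cancelling dependent keys. Below is a minimal sketch of that retry pattern, lifted out of the worker's real state machine; the names gather_with_retries, locate, and fetch_from are hypothetical stand-ins for gather_dep, query_who_has, and gather_from_worker.

import random


def gather_with_retries(dep, locate, fetch_from, attempts=2):
    """Try every known holder of `dep`, refreshing the location list
    between rounds; give up only after `attempts` full rounds fail."""
    for _ in range(attempts):  # number of attempts, as in the commit
        workers = locate(dep)    # ask the scheduler who has `dep`
        if not workers:          # genuinely no one has this
            return None
        random.shuffle(workers)  # spread load across the holders
        for worker in workers:
            try:
                return fetch_from(worker, dep)
            except EnvironmentError:
                continue         # dead or unreachable worker: try the next
    return None
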
5 changes: 3 additions & 2 deletions distributed/core.py
@@ -580,8 +580,9 @@ def __init__(self, ip, port, pool):
 
     def __getattr__(self, key):
         @gen.coroutine
-        def send_recv_from_rpc(**kwargs):
-            stream = yield self.pool.connect(self.ip, self.port)
+        def send_recv_from_rpc(timeout=3, **kwargs):
+            stream = yield self.pool.connect(self.ip, self.port,
+                                             timeout=timeout)
             try:
                 result = yield send_recv(stream=stream, op=key,
                                          deserialize=self.pool.deserialize, **kwargs)
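
With this change every method proxied through rpc() accepts a per-call connect timeout (in seconds, default 3), which send_recv_from_rpc consumes and forwards to the pool's connect. A hypothetical call site inside a Tornado coroutine on the worker, assuming worker_address and keys exist:

from tornado import gen

@gen.coroutine
def fetch(self, worker_address, keys):
    # The timeout keyword is stripped off by send_recv_from_rpc and
    # passed to the connection pool, so a dead worker stalls this
    # call for at most ~3 seconds instead of blocking forever.
    response = yield self.rpc(worker_address).get_data(
        keys=keys, who=self.address, timeout=3)
    raise gen.Return(response)
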
6 changes: 3 additions & 3 deletions distributed/stealing.py
@@ -105,10 +105,10 @@ def steal_time_ratio(self, key, bandwidth=BANDWIDTH, split=None):
     def move_task(self, key, victim, thief):
         with log_errors():
             if self.scheduler.validate:
-                if victim not in self.scheduler.rprocessing[key]:
-                    import pdb; pdb.set_trace()
                 assert victim in self.scheduler.rprocessing[key]
                 assert thief not in self.scheduler.rprocessing[key]
 
-            logger.info("Moved %s, %s: %2f -> %s: %2f", key,
+            logger.debug("Moved %s, %s: %2f -> %s: %2f", key,
                         victim, self.scheduler.occupancy[victim],
                         thief, self.scheduler.occupancy[thief])
220 changes: 108 additions & 112 deletions distributed/worker.py
@@ -1126,122 +1126,124 @@ def put_key_in_memory(self, key, value):
                 self.transition(dep, 'ready')
 
     @gen.coroutine
-    def gather_dep(self, dep, slot, cause=None):
-        failures = 5
-        del self.connections[slot]
-        try:
-            if self.validate:
-                self.validate_state()
-
-            if not self.who_has.get(dep):
-                if dep not in self.dependents:
-                    return
-                failures += 1
-                result = self.query_who_has(dep)
-                if not result or failures > 5:
-                    for key in list(self.dependents[dep]):
-                        if dep in self.executing:
-                            continue
-                        if dep in self.waiting_for_data.get(key, ()):
-                            self.cancel_key(key)
-                    return
-                else:
-                    assert self.who_has.get(dep)
-
-            if dep in self.in_flight:
-                return
-
-            worker = random.choice(list(self.who_has[dep]))
-
-            deps = {dep}
-
-            total_bytes = self.nbytes[dep]
-            L = self.pending_data_per_worker[worker]
-
-            while L:
-                d = L.popleft()
-                if (d in self.data or
-                    d in self.in_flight or
-                    d in self.executing or
-                    d not in self.nbytes):  # no longer tracking
-                    continue
-                if total_bytes + self.nbytes[d] > self.target_message_size:
-                    break
-                deps.add(d)
-                total_bytes += self.nbytes[d]
-
-            token = object()
-            for d in deps:
-                assert d not in self.in_flight
-                self.in_flight[d] = token
-            self.log.append(('request-dep', dep, worker, deps))
-            self.connections[token] = deps
-
-            try:
-                start = time()
-                logger.debug("Request %d keys and %d bytes", len(deps),
-                             total_bytes)
-                response = yield self.rpc(worker).get_data(keys=list(deps), who=self.address)
-                stop = time()
-                deps2 = list(response)
-
-                if cause:
-                    self.response[cause].update({'transfer_start': start,
-                                                 'transfer_stop': stop})
-
-                total_bytes = sum(self.nbytes.get(dep, 0) for dep in deps2)
-                duration = (stop - start) or 0.5
-                self.incoming_transfer_log.append({
-                    'start': start,
-                    'stop': stop,
-                    'middle': (start + stop) / 2.0,
-                    'duration': duration,
-                    'keys': {dep: self.nbytes.get(dep, None) for dep in deps2},
-                    'total': total_bytes,
-                    'bandwidth': total_bytes / duration,
-                    'who': worker
-                })
-                if self.digests is not None:
-                    self.digests['transfer-bandwidth'].add(total_bytes / duration)
-                    self.digests['transfer-duration'].add(duration)
-                self.counters['transfer-count'].add(len(deps2))
-                self.incoming_count += 1
-            except EnvironmentError as e:
-                logger.error("Worker stream died during communication: %s",
-                             worker)
-                response = {}
-                self.log.append(('receive-dep-failed', worker))
-            finally:
-                del self.connections[token]
-
-            self.log.append(('receive-dep', worker, list(response)))
-
-            assert len(self.connections) < self.total_connections
-
-            for d in deps:
-                del self.in_flight[d]
-
-            for d, v in response.items():
-                self.put_key_in_memory(d, v)
-
-            if response:
-                self.batched_stream.send({'op': 'add-keys',
-                                          'keys': list(response)})
-
-            for d in deps:
-                if d not in response and d in self.dependents:
-                    self.log.append(('missing-dep', d))
-                    try:
-                        self.who_has[d].remove(worker)
-                    except KeyError:
-                        pass
-                    try:
-                        self.has_what[worker].remove(d)
-                    except KeyError:
-                        pass
-                    for key in self.dependents.get(d, ()):
-                        if key in self.waiting_for_data:
-                            self.data_needed.appendleft(key)
+    def gather_from_worker(self, worker, dep, token, cause=None):
+        self.connections[token] = None
+        deps = {dep}
+
+        total_bytes = self.nbytes[dep]
+        L = self.pending_data_per_worker[worker]
+
+        while L:
+            d = L.popleft()
+            if (d in self.data or
+                d in self.in_flight or
+                d in self.executing or
+                d not in self.nbytes):  # no longer tracking
+                continue
+            if total_bytes + self.nbytes[d] > self.target_message_size:
+                break
+            deps.add(d)
+            total_bytes += self.nbytes[d]
+
+        for d in deps:
+            assert d not in self.in_flight
+            self.in_flight[d] = token
+        self.log.append(('request-dep', dep, worker, deps))
+        self.connections[token] = deps
+
+        try:
+            start = time()
+            logger.debug("Request %d keys and %d bytes", len(deps),
+                         total_bytes)
+            response = yield self.rpc(worker).get_data(keys=list(deps),
+                                                       who=self.address)
+            self.log.append(('receive-dep', worker, list(response)))
+            stop = time()
+        finally:
+            del self.connections[token]
+            assert len(self.connections) < self.total_connections
+            for d in deps:
+                del self.in_flight[d]
+
+        if dep in response:
+            self.response[cause].update({'transfer_start': start,
+                                         'transfer_stop': stop})
+
+        if response:
+            total_bytes = sum(self.nbytes.get(dep, 0) for dep in response)
+            duration = (stop - start) or 0.5
+            self.incoming_transfer_log.append({
+                'start': start,
+                'stop': stop,
+                'middle': (start + stop) / 2.0,
+                'duration': duration,
+                'keys': {dep: self.nbytes.get(dep, None) for dep in response},
+                'total': total_bytes,
+                'bandwidth': total_bytes / duration,
+                'who': worker
+            })
+            if self.digests is not None:
+                self.digests['transfer-bandwidth'].add(total_bytes / duration)
+                self.digests['transfer-duration'].add(duration)
+            self.counters['transfer-count'].add(len(response))
+            self.incoming_count += 1
+
+            self.batched_stream.send({'op': 'add-keys',
+                                      'keys': list(response)})
+
+            for d, v in response.items():
+                self.put_key_in_memory(d, v)
+
+        for d in deps:
+            if d not in response and d in self.dependents:
+                self.log.append(('missing-dep', d))
+                try:
+                    self.who_has[d].remove(worker)
+                except KeyError:
+                    pass
+                try:
+                    self.has_what[worker].remove(d)
+                except KeyError:
+                    pass
+                for key in self.dependents.get(d, ()):
+                    if key in self.waiting_for_data:
+                        self.data_needed.appendleft(key)
+
+    @gen.coroutine
+    def gather_dep(self, dep, token, cause=None):
+        try:
+            if self.validate:
+                self.validate_state()
+
+            response = {}
+
+            for i in range(2):  # number of attempts
+                if not self.who_has.get(dep):
+                    result = yield self.query_who_has(dep)
+                    if not result:  # genuinely no one has this
+                        for key in list(self.dependents[dep]):
+                            if dep in self.executing:
+                                continue
+                            if dep in self.waiting_for_data.get(key, ()):
+                                self.cancel_key(key)
+                        return
+
+                workers = list(self.who_has[dep])
+
+                random.shuffle(workers)
+
+                for worker in workers:
+                    if dep in self.in_flight or not dep in self.dependents:
+                        return
+                    try:
+                        yield self.gather_from_worker(worker, dep, token,
+                                                      cause=cause)
+                    except EnvironmentError:
+                        continue
+                    else:
+                        break
+                if dep in self.data:
+                    break
 
             if self.validate:
                 self.validate_state()
@@ -1256,15 +1258,9 @@ def gather_dep(self, dep, slot, cause=None):
 
     @gen.coroutine
     def query_who_has(self, *deps):
-        try:
-            response = yield self.scheduler.who_has(keys=deps)
-            self.update_who_has(response)
-            raise gen.Return(response)
-        except Exception as e:
-            logger.exception(e)
-            if LOG_PDB:
-                import pdb; pdb.set_trace()
-            raise
+        response = yield self.scheduler.who_has(keys=deps)
+        self.update_who_has(response)
+        raise gen.Return(response)
 
     def update_who_has(self, who_has):
         try:
