Various cleanups in semaphore (#5885)
fjetter authored Jan 12, 2023
1 parent 34f2c13 commit 6d3182e
Showing 2 changed files with 21 additions and 44 deletions.
9 changes: 6 additions & 3 deletions distributed/semaphore.py
```diff
@@ -64,15 +64,17 @@ def __init__(self, scheduler):
             dask.config.get("distributed.scheduler.locks.lease-validation-interval"),
             default="s",
         )
-        self._pc_lease_timeout = PeriodicCallback(
+        self.scheduler.periodic_callbacks[
+            "semaphore-lease-timeout"
+        ] = pc = PeriodicCallback(
             self._check_lease_timeout, validation_callback_time * 1000
         )
-        self._pc_lease_timeout.start()
+        pc.start()
         self.lease_timeout = parse_timedelta(
             dask.config.get("distributed.scheduler.locks.lease-timeout"), default="s"
         )
 
-    async def get_value(self, name=None):
+    def get_value(self, name=None):
         return len(self.leases[name])
 
     # `comm` here is required by the handler interface
```
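This hunk moves the lease-validation PeriodicCallback off a private attribute and into the scheduler's `periodic_callbacks` registry. A minimal sketch of that pattern, assuming a running Tornado IOLoop; the callback body and the 100 ms interval here are illustrative, not from the commit:

```python
import asyncio

from tornado.ioloop import PeriodicCallback

periodic_callbacks = {}  # stands in for scheduler.periodic_callbacks


async def main():
    def check_leases():
        print("validating leases")  # placeholder for _check_lease_timeout

    # PeriodicCallback takes its interval in milliseconds.
    periodic_callbacks["semaphore-lease-timeout"] = pc = PeriodicCallback(
        check_leases, 100
    )
    pc.start()  # requires a running IOLoop, provided here by asyncio.run
    await asyncio.sleep(0.35)  # the callback fires roughly three times

    # Central teardown: stop every registered callback in one place.
    for cb in periodic_callbacks.values():
        cb.stop()


asyncio.run(main())
```

Keeping all periodic callbacks in one dict lets the scheduler's normal shutdown path stop them uniformly, instead of each extension tracking and stopping its own.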
```diff
@@ -527,6 +529,7 @@ def __setstate__(self, state):
         )
 
     def close(self):
+        self.refresh_callback.stop()
         return self.sync(self.scheduler.semaphore_close, name=self.name)
 
     def __del__(self):
```
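This hunk stops the client-side lease-refresh callback before closing. A hedged sketch of why the ordering matters; this is not the library's implementation, and `LeaseHolder` is illustrative:

```python
from tornado.ioloop import PeriodicCallback


class LeaseHolder:
    """Illustrative stand-in for the client-side Semaphore object."""

    def __init__(self, refresh, interval_ms=250):
        # refresh() would renew the lease with the scheduler; starting
        # the callback assumes a running IOLoop.
        self.refresh_callback = PeriodicCallback(refresh, interval_ms)
        self.refresh_callback.start()

    def close(self):
        # Stop refreshing first, so no refresh can fire between here and
        # the server-side teardown (semaphore_close in the diff) and try
        # to renew a lease that is being discarded.
        self.refresh_callback.stop()
        # ...then release server-side state.
```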
56 changes: 15 additions & 41 deletions distributed/tests/test_semaphore.py
```diff
@@ -129,11 +129,11 @@ async def test_async_ctx(s, a, b):
     assert await sem.acquire()
 
 
-@pytest.mark.slow
 def test_worker_dies(loop):
     with cluster(
         config={
-            "distributed.scheduler.locks.lease-timeout": "0.1s",
+            "distributed.scheduler.locks.lease-timeout": "50ms",
+            "distributed.scheduler.locks.lease-validation-interval": "10ms",
         }
     ) as (scheduler, workers):
         with Client(scheduler["address"], loop=loop) as client:
```
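The shorter timeouts make a lost lease observable within tens of milliseconds, so the test no longer needs the slow marker. A hedged sketch of setting the same options in user code; the keys are the real dask configuration options used in the diff, and the values are the ones the commit chose:

```python
import dask

with dask.config.set(
    {
        "distributed.scheduler.locks.lease-timeout": "50ms",
        "distributed.scheduler.locks.lease-validation-interval": "10ms",
    }
):
    ...  # start the cluster inside this context so the scheduler
    ...  # reads these values when its semaphore extension initializes
```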
```diff
@@ -191,9 +191,8 @@ def f(x, release=True):
     assert result.count(False) == 9
 
 
-@pytest.mark.slow
-@gen_cluster(client=True, timeout=120)
-async def test_close_async(c, s, a, b):
+@gen_cluster(client=True, nthreads=[("", 1)])
+async def test_close_async(c, s, a):
     sem = await Semaphore(name="test")
 
     assert await sem.acquire()
```
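The test now asks `gen_cluster` for a single one-thread worker instead of the default two workers plus a long timeout. A minimal sketch of the decorator's shape; the test name and body are illustrative:

```python
from distributed.utils_test import gen_cluster


# client=True injects a Client as the first argument; nthreads is a list
# of (address, nthreads) pairs, so [("", 1)] asks for one single-threaded
# worker. The coroutine receives (client, scheduler, *workers).
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_single_worker(c, s, a):
    assert len(s.workers) == 1
```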
```diff
@@ -212,7 +211,7 @@ async def test_close_async(c, s, a, b):
     assert await sem2.acquire()
 
     def f(sem_):
-        return sem_.acquire()
+        sem_.acquire(timeout="0.5s")
 
     semaphore_object = s.extensions["semaphores"]
     fire_and_forget(c.submit(f, sem_=sem2))
```
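`f` now acquires with a timeout, so a task that can never get the lease returns instead of blocking its worker thread forever. A hedged sketch of the timeout behaviour; the semaphore name is illustrative and a reachable cluster is assumed:

```python
from distributed import Client, Semaphore

client = Client()  # assumes a cluster is reachable (or starts a local one)
sem = Semaphore(max_leases=1, name="resource")

assert sem.acquire(timeout="0.5s") is True   # first lease succeeds
assert sem.acquire(timeout="0.5s") is False  # no lease free; gives up
sem.release()
client.close()
```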
```diff
@@ -517,7 +516,7 @@ def access_limited(val, sem):
         assert len(protected_resource) == 0
         protected_resource.append(val)
         # Interact with the DB
-        time.sleep(0.2)
+        time.sleep(0.01)
         protected_resource.remove(val)
 
     client.gather(client.map(access_limited, range(10), sem=sem))
```
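`access_limited` holds the semaphore via `with sem:`; shrinking the sleep keeps the same contention pattern while cutting test runtime. A hedged sketch of that context-manager pattern; the name "db" and the task body are illustrative:

```python
from distributed import Client, Semaphore

client = Client()  # assumes a cluster is reachable (or starts a local one)
sem = Semaphore(max_leases=2, name="db")  # at most two concurrent holders


def limited_task(i, sem):
    with sem:  # lease acquired on entry, released on exit, even on error
        return i * 2  # stands in for the real database interaction


print(client.gather(client.map(limited_task, range(10), sem=sem)))
```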
```diff
@@ -556,50 +555,25 @@ async def test_release_retry(c, s, a, b):
         "distributed.scheduler.locks.lease-validation-interval": "100ms",
     },
 )
-async def test_release_failure(c, s, a, b):
+async def test_release_failure(c, s, a, b, caplog):
     """Don't raise even if release fails: lease will be cleaned up by the
     lease-validation after a specified interval anyway (see config parameters used).
     """
 
     with dask.config.set({"distributed.comm.retry.count": 1}):
         pool = await FlakyConnectionPool(failing_connections=5)
 
+        ext = s.extensions["semaphores"]
+        name = "foo"
         semaphore = await Semaphore(
             max_leases=2,
-            name="resource_we_want_to_limit",
+            name=name,
             scheduler_rpc=pool(s.address),
         )
         await semaphore.acquire()
         pool.activate()  # Comm chaos starts
-
-        # Release fails (after a single retry) because of broken connections
-        with captured_logger(
-            "distributed.semaphore", level=logging.ERROR
-        ) as semaphore_log:
-            with captured_logger("distributed.utils_comm") as retry_log:
-                assert await semaphore.release() is False
-
-        with captured_logger(
-            "distributed.semaphore", level=logging.DEBUG
-        ) as semaphore_cleanup_log:
-            pool.deactivate()  # comm chaos stops
-            assert await semaphore.get_value() == 1  # lease is still registered
-            await asyncio.sleep(0.2)  # Wait for lease to be cleaned up
-
-        # Check release was retried
-        retry_log = retry_log.getvalue().split("\n")[0]
-        assert retry_log.startswith(
-            "Retrying semaphore release:"
-        ) and retry_log.endswith("after exception in attempt 0/1: ")
-        # Check release failed
-        semaphore_log = semaphore_log.getvalue().split("\n")[0]
-        assert semaphore_log.startswith(
-            "Release failed for id="
-        ) and semaphore_log.endswith("Cluster network might be unstable?")
-
-        # Check lease has timed out
-        assert any(
-            log.startswith("Lease") and "timed out after" in log
-            for log in semaphore_cleanup_log.getvalue().split("\n")
-        )
-        assert await semaphore.get_value() == 0
+        assert await semaphore.release() is False
+        pool.deactivate()  # comm chaos stops
+        assert ext.get_value(name) == 1  # lease is still registered
+        while not (await semaphore.get_value() == 0):
+            await asyncio.sleep(0.01)
```
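The rewritten test polls scheduler state instead of scraping log output after a fixed sleep, which is what made the old version slow and brittle. A hedged sketch of that poll-until pattern as a reusable helper; `wait_for` here is illustrative, not distributed's own utility:

```python
import asyncio
import time


async def wait_for(predicate, timeout=5.0, interval=0.01):
    """Poll an async predicate until it is true or the deadline passes."""
    deadline = time.monotonic() + timeout
    while not await predicate():
        if time.monotonic() > deadline:
            raise TimeoutError("condition not met in time")
        await asyncio.sleep(interval)


# Mirroring the loop in the test above (semaphore assumed to exist):
#
#     async def lease_gone():
#         return await semaphore.get_value() == 0
#
#     await wait_for(lease_gone)
```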
