diff --git a/distributed/semaphore.py b/distributed/semaphore.py
index d1de970d72..2698e653da 100644
--- a/distributed/semaphore.py
+++ b/distributed/semaphore.py
@@ -64,15 +64,17 @@ def __init__(self, scheduler):
             dask.config.get("distributed.scheduler.locks.lease-validation-interval"),
             default="s",
         )
-        self._pc_lease_timeout = PeriodicCallback(
+        self.scheduler.periodic_callbacks[
+            "semaphore-lease-timeout"
+        ] = pc = PeriodicCallback(
             self._check_lease_timeout, validation_callback_time * 1000
         )
-        self._pc_lease_timeout.start()
+        pc.start()
         self.lease_timeout = parse_timedelta(
             dask.config.get("distributed.scheduler.locks.lease-timeout"), default="s"
         )
 
-    async def get_value(self, name=None):
+    def get_value(self, name=None):
         return len(self.leases[name])
 
     # `comm` here is required by the handler interface
@@ -527,6 +529,7 @@ def __setstate__(self, state):
         )
 
     def close(self):
+        self.refresh_callback.stop()
         return self.sync(self.scheduler.semaphore_close, name=self.name)
 
     def __del__(self):
diff --git a/distributed/tests/test_semaphore.py b/distributed/tests/test_semaphore.py
index 88a24e18a0..09b4bb0467 100644
--- a/distributed/tests/test_semaphore.py
+++ b/distributed/tests/test_semaphore.py
@@ -129,11 +129,11 @@ async def test_async_ctx(s, a, b):
     assert await sem.acquire()
 
 
-@pytest.mark.slow
 def test_worker_dies(loop):
     with cluster(
         config={
-            "distributed.scheduler.locks.lease-timeout": "0.1s",
+            "distributed.scheduler.locks.lease-timeout": "50ms",
+            "distributed.scheduler.locks.lease-validation-interval": "10ms",
         }
     ) as (scheduler, workers):
         with Client(scheduler["address"], loop=loop) as client:
@@ -191,9 +191,8 @@ def f(x, release=True):
     assert result.count(False) == 9
 
 
-@pytest.mark.slow
-@gen_cluster(client=True, timeout=120)
-async def test_close_async(c, s, a, b):
+@gen_cluster(client=True, nthreads=[("", 1)])
+async def test_close_async(c, s, a):
     sem = await Semaphore(name="test")
     assert await sem.acquire()
 
@@ -212,7 +211,7 @@ async def test_close_async(c, s, a, b):
     assert await sem2.acquire()
 
     def f(sem_):
-        return sem_.acquire()
+        sem_.acquire(timeout="0.5s")
 
     semaphore_object = s.extensions["semaphores"]
     fire_and_forget(c.submit(f, sem_=sem2))
@@ -517,7 +516,7 @@ def access_limited(val, sem):
                     assert len(protected_resource) == 0
                     protected_resource.append(val)
                     # Interact with the DB
-                    time.sleep(0.2)
+                    time.sleep(0.01)
                     protected_resource.remove(val)
 
             client.gather(client.map(access_limited, range(10), sem=sem))
@@ -556,50 +555,25 @@ async def test_release_retry(c, s, a, b):
         "distributed.scheduler.locks.lease-validation-interval": "100ms",
     },
 )
-async def test_release_failure(c, s, a, b):
+async def test_release_failure(c, s, a, b, caplog):
     """Don't raise even if release fails: lease will be cleaned up by the
     lease-validation after a specified interval anyway (see config parameters
     used).
     """
     with dask.config.set({"distributed.comm.retry.count": 1}):
         pool = await FlakyConnectionPool(failing_connections=5)
-
+        ext = s.extensions["semaphores"]
+        name = "foo"
         semaphore = await Semaphore(
             max_leases=2,
-            name="resource_we_want_to_limit",
+            name=name,
             scheduler_rpc=pool(s.address),
         )
         await semaphore.acquire()
         pool.activate()  # Comm chaos starts
+        assert await semaphore.release() is False
 
-        # Release fails (after a single retry) because of broken connections
-        with captured_logger(
-            "distributed.semaphore", level=logging.ERROR
-        ) as semaphore_log:
-            with captured_logger("distributed.utils_comm") as retry_log:
-                assert await semaphore.release() is False
-
-        with captured_logger(
-            "distributed.semaphore", level=logging.DEBUG
-        ) as semaphore_cleanup_log:
-            pool.deactivate()  # comm chaos stops
-            assert await semaphore.get_value() == 1  # lease is still registered
-            await asyncio.sleep(0.2)  # Wait for lease to be cleaned up
-
-        # Check release was retried
-        retry_log = retry_log.getvalue().split("\n")[0]
-        assert retry_log.startswith(
-            "Retrying semaphore release:"
-        ) and retry_log.endswith("after exception in attempt 0/1: ")
-        # Check release failed
-        semaphore_log = semaphore_log.getvalue().split("\n")[0]
-        assert semaphore_log.startswith(
-            "Release failed for id="
-        ) and semaphore_log.endswith("Cluster network might be unstable?")
-
-        # Check lease has timed out
-        assert any(
-            log.startswith("Lease") and "timed out after" in log
-            for log in semaphore_cleanup_log.getvalue().split("\n")
-        )
-        assert await semaphore.get_value() == 0
+        pool.deactivate()  # comm chaos stops
+        assert ext.get_value(name) == 1  # lease is still registered
+        while not (await semaphore.get_value() == 0):
+            await asyncio.sleep(0.01)