diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 8a7edfd110..25b43e2e2c 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,3 +1,5 @@
+# NOTE: autoupdate does not pick up flake8-bugbear since it is a transitive
+# dependency. Make sure to update flake8-bugbear manually on a regular basis.
 repos:
   - repo: https://github.com/MarcoGorelli/absolufy-imports
     rev: v0.3.1
@@ -28,6 +30,10 @@ repos:
     hooks:
       - id: flake8
        language_version: python3
+        additional_dependencies:
+          # NOTE: autoupdate does not pick up flake8-bugbear since it is a transitive
+          # dependency. Make sure to update flake8-bugbear manually on a regular basis.
+          - flake8-bugbear==22.7.1
   - repo: https://github.com/pre-commit/mirrors-mypy
     rev: v0.971
     hooks:
diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index df891cb48d..8f15072855 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -3896,10 +3896,10 @@ def test_threaded_get_within_distributed(c):
     for get in [dask.local.get_sync, dask.multiprocessing.get, dask.threaded.get]:
 
-        def f():
+        def f(get):
             return get({"x": (lambda: 1,)}, "x")
 
-        future = c.submit(f)
+        future = c.submit(f, get)
         assert future.result() == 1
 
 
@@ -4017,8 +4017,8 @@ async def test_serialize_future(s, a, b):
         result = await future
 
         for ci in (c1, c2):
-            for ctxman in ci.as_current, lambda: temp_default_client(ci):
-                with ctxman():
+            for ctxman in lambda ci: ci.as_current(), lambda ci: temp_default_client(ci):
+                with ctxman(ci):
                     future2 = pickle.loads(pickle.dumps(future))
                     assert future2.client is ci
                     assert stringify(future2.key) in ci.futures
diff --git a/distributed/worker.py b/distributed/worker.py
index 5ff76f728e..a1da5f78c9 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1542,7 +1542,7 @@ async def close(  # type: ignore
                 if executor is utils._offload_executor:
                     continue  # Never shutdown the offload executor
 
-                def _close(wait):
+                def _close(executor, wait):
                     if isinstance(executor, ThreadPoolExecutor):
                         executor._work_queue.queue.clear()
                         executor.shutdown(wait=wait, timeout=timeout)
@@ -1556,17 +1556,19 @@ def _close(wait):
                     if is_python_shutting_down():
                         # If we're shutting down there is no need to wait for daemon
                         # threads to finish
-                        _close(wait=False)
+                        _close(executor=executor, wait=False)
                     else:
                         try:
-                            await to_thread(_close, wait=executor_wait)
+                            await to_thread(_close, executor=executor, wait=executor_wait)
                         except RuntimeError:  # Are we shutting down the process?
                             logger.error(
                                 "Could not close executor %r by dispatching to thread. Trying synchronously.",
                                 executor,
                                 exc_info=True,
                             )
-                            _close(wait=executor_wait)  # Just run it directly
+                            _close(
+                                executor=executor, wait=executor_wait
+                            )  # Just run it directly
 
                 self.stop()
                 await self.rpc.close()
diff --git a/setup.cfg b/setup.cfg
index 6cd9f78dae..e070168957 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -5,11 +5,12 @@
 # Note: there cannot be spaces after commas here
 exclude = __init__.py,versioneer.py,distributed/_concurrent_futures_thread.py
+extend-select = B950
 ignore =
     # Ignores below are aligned with black https://github.com/psf/black/blob/main/.flake8
     E203,  # Whitespace before ':'
     E266,  # Too many leading '#' for block comment
-    E501,  # Line too long
+    E501,  # Line too long, ignored in favor of B950
     W503,  # Line break occurred before a binary operator
 per-file-ignores =
     **/tests/*:
@@ -20,8 +21,11 @@ per-file-ignores =
         E741,
         # Local variable name is assigned to but never used
         F841,
+        # Do not call assert False since python -O removes these calls
+        B011,
 
-max-line-length = 88
+# B950 will only trigger at 88
+max-line-length = 80
 
 [isort]
 sections = FUTURE,STDLIB,THIRDPARTY,DISTRIBUTED,FIRSTPARTY,LOCALFOLDER