Skip to content

Commit

Permalink
deprecate passing stopped loops to LoopRunner (and therefore Client/C…
Browse files Browse the repository at this point in the history
…luster) (#6680)
  • Loading branch information
graingert authored Aug 1, 2022
1 parent df28343 commit 172e37f
Show file tree
Hide file tree
Showing 16 changed files with 244 additions and 65 deletions.
4 changes: 2 additions & 2 deletions distributed/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ def __reduce__(self):
@property
def _io_loop(self):
if self._worker:
return self._worker.io_loop
return self._worker.loop
else:
return self._client.io_loop
return self._client.loop

@property
def _scheduler_rpc(self):
Expand Down
38 changes: 35 additions & 3 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
except ImportError:
single_key = first
from tornado import gen
from tornado.ioloop import PeriodicCallback
from tornado.ioloop import IOLoop, PeriodicCallback

import distributed.utils
from distributed import cluster_dump, preloading
Expand Down Expand Up @@ -763,6 +763,7 @@ class Client(SyncMethodMixin):
_default_event_handlers = {"print": _handle_print, "warn": _handle_warn}

preloads: list[preloading.Preload]
__loop: IOLoop | None = None

def __init__(
self,
Expand Down Expand Up @@ -875,7 +876,6 @@ def __init__(

self._asynchronous = asynchronous
self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous)
self.io_loop = self.loop = self._loop_runner.loop
self._connecting_to_scheduler = False

self._gather_keys = None
Expand Down Expand Up @@ -947,6 +947,38 @@ def __init__(

ReplayTaskClient(self)

@property
def io_loop(self) -> IOLoop | None:
warnings.warn(
"The io_loop property is deprecated", DeprecationWarning, stacklevel=2
)
return self.loop

@io_loop.setter
def io_loop(self, value: IOLoop) -> None:
warnings.warn(
"The io_loop property is deprecated", DeprecationWarning, stacklevel=2
)
self.loop = value

@property
def loop(self) -> IOLoop | None:
loop = self.__loop
if loop is None:
# If the loop is not running when this is called, the LoopRunner.loop
# property will raise a DeprecationWarning
# However subsequent calls might occur - eg atexit, where a stopped
# loop is still acceptable - so we cache access to the loop.
self.__loop = loop = self._loop_runner.loop
return loop

@loop.setter
def loop(self, value: IOLoop) -> None:
warnings.warn(
"setting the loop property is deprecated", DeprecationWarning, stacklevel=2
)
self.__loop = value

@contextmanager
def as_current(self):
"""Thread-local, Task-local context manager that causes the Client.current
Expand Down Expand Up @@ -1359,7 +1391,7 @@ def __exit__(self, exc_type, exc_value, traceback):
def __del__(self):
# If the loop never got assigned, we failed early in the constructor,
# nothing to do
if hasattr(self, "loop"):
if self.__loop is not None:
self.close()

def _inc_ref(self, key):
Expand Down
24 changes: 22 additions & 2 deletions distributed/deploy/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from inspect import isawaitable
from typing import Any

from tornado.ioloop import PeriodicCallback
from tornado.ioloop import IOLoop, PeriodicCallback

import dask.config
from dask.utils import _deprecated, format_bytes, parse_timedelta, typename
Expand Down Expand Up @@ -55,6 +55,7 @@ class Cluster(SyncMethodMixin):
"""

_supports_scaling = True
__loop: IOLoop | None = None

def __init__(
self,
Expand All @@ -65,7 +66,6 @@ def __init__(
scheduler_sync_interval=1,
):
self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous)
self.loop = self._loop_runner.loop

self.scheduler_info = {"workers": {}}
self.periodic_callbacks = {}
Expand All @@ -89,6 +89,26 @@ def __init__(
}
self.status = Status.created

@property
def loop(self) -> IOLoop | None:
loop = self.__loop
if loop is None:
# If the loop is not running when this is called, the LoopRunner.loop
# property will raise a DeprecationWarning
# However subsequent calls might occur - eg atexit, where a stopped
# loop is still acceptable - so we cache access to the loop.
self.__loop = loop = self._loop_runner.loop
return loop

@loop.setter
def loop(self, value: IOLoop) -> None:
warnings.warn(
"setting the loop property is deprecated", DeprecationWarning, stacklevel=2
)
if value is None:
raise ValueError("expected an IOLoop, got None")
self.__loop = value

@property
def name(self):
return self._cluster_info["name"]
Expand Down
17 changes: 14 additions & 3 deletions distributed/deploy/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import TYPE_CHECKING, Any, ClassVar

from tornado import gen
from tornado.ioloop import IOLoop

import dask
from dask.utils import parse_bytes, parse_timedelta
Expand Down Expand Up @@ -230,6 +231,9 @@ def __init__(
shutdown_on_close=True,
scheduler_sync_interval=1,
):
if loop is None and asynchronous:
loop = IOLoop.current()

self._created = weakref.WeakSet()

self.scheduler_spec = copy.copy(scheduler)
Expand Down Expand Up @@ -259,7 +263,14 @@ def __init__(
scheduler_sync_interval=scheduler_sync_interval,
)

if not self.asynchronous:
try:
called_from_running_loop = (
getattr(loop, "asyncio_loop", None) is asyncio.get_running_loop()
)
except RuntimeError:
called_from_running_loop = asynchronous

if not called_from_running_loop:
self._loop_runner.start()
self.sync(self._start)
try:
Expand Down Expand Up @@ -659,7 +670,7 @@ async def run_spec(spec: dict[str, Any], *args: Any) -> dict[str, Worker | Nanny
@atexit.register
def close_clusters():
for cluster in list(SpecCluster._instances):
if cluster.shutdown_on_close:
if getattr(cluster, "shutdown_on_close", False):
with suppress(gen.TimeoutError, TimeoutError):
if cluster.status != Status.closed:
if getattr(cluster, "status", Status.closed) != Status.closed:
cluster.close(timeout=10)
1 change: 1 addition & 0 deletions distributed/deploy/tests/test_adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ async def test_no_more_workers_than_tasks():
assert len(cluster.scheduler.workers) <= 1


@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning")
def test_basic_no_loop(cleanup):
loop = None
try:
Expand Down
17 changes: 17 additions & 0 deletions distributed/deploy/tests/test_cluster.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import pytest
from tornado.ioloop import IOLoop

from distributed.deploy.cluster import Cluster
from distributed.utils_test import gen_test
Expand Down Expand Up @@ -35,3 +36,19 @@ async def test_logs_deprecated():
async with Cluster(asynchronous=True) as cluster:
with pytest.warns(FutureWarning, match="get_logs"):
cluster.logs()


@gen_test()
async def test_deprecated_loop_properties():
class ExampleCluster(Cluster):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.loop = self.io_loop = IOLoop.current()

with pytest.warns(DeprecationWarning) as warninfo:
async with ExampleCluster(asynchronous=True, loop=IOLoop.current()):
pass

assert [(w.category, *w.message.args) for w in warninfo] == [
(DeprecationWarning, "setting the loop property is deprecated")
]
1 change: 1 addition & 0 deletions distributed/deploy/tests/test_spec_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def test_spec_sync(loop):
assert result == 11


@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning")
def test_loop_started_in_constructor(cleanup):
# test that SpecCluster.__init__ starts a loop in another thread
cluster = SpecCluster(worker_spec, scheduler=scheduler, loop=None)
Expand Down
24 changes: 22 additions & 2 deletions distributed/diagnostics/progressbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import html
import logging
import sys
import warnings
import weakref
from contextlib import suppress
from timeit import default_timer
Expand Down Expand Up @@ -111,6 +112,8 @@ def __del__(self):


class TextProgressBar(ProgressBar):
__loop: IOLoop | None = None

def __init__(
self,
keys,
Expand All @@ -122,14 +125,31 @@ def __init__(
start=True,
**kwargs,
):
self._loop_runner = loop_runner = LoopRunner(loop=loop)
super().__init__(keys, scheduler, interval, complete)
self.width = width
self.loop = loop or IOLoop()

if start:
loop_runner = LoopRunner(self.loop)
loop_runner.run_sync(self.listen)

@property
def loop(self) -> IOLoop | None:
loop = self.__loop
if loop is None:
# If the loop is not running when this is called, the LoopRunner.loop
# property will raise a DeprecationWarning
# However subsequent calls might occur - eg atexit, where a stopped
# loop is still acceptable - so we cache access to the loop.
self.__loop = loop = self._loop_runner.loop
return loop

@loop.setter
def loop(self, value: IOLoop) -> None:
warnings.warn(
"setting the loop property is deprecated", DeprecationWarning, stacklevel=2
)
self.__loop = value

def _draw_bar(self, remaining, all, **kwargs):
frac = (1 - remaining / all) if all else 1.0
bar = "#" * int(self.width * frac)
Expand Down
18 changes: 18 additions & 0 deletions distributed/diagnostics/tests/test_progressbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

from time import sleep

import pytest
from tornado.ioloop import IOLoop

from distributed.diagnostics.progressbar import TextProgressBar, progress
from distributed.metrics import time
from distributed.utils_test import div, gen_cluster, inc
Expand Down Expand Up @@ -72,3 +75,18 @@ def test_progress_function_w_kwargs(client, capsys):

progress(f, interval="20ms", loop=client.loop)
check_bar_completed(capsys)


@gen_cluster(client=True, nthreads=[])
async def test_deprecated_loop_properties(c, s):
class ExampleTextProgressBar(TextProgressBar):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.loop = self.io_loop = IOLoop.current()

with pytest.warns(DeprecationWarning) as warninfo:
ExampleTextProgressBar(client=c, keys=[], start=False, loop=IOLoop.current())

assert [(w.category, *w.message.args) for w in warninfo] == [
(DeprecationWarning, "setting the loop property is deprecated")
]
2 changes: 1 addition & 1 deletion distributed/semaphore.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ def __init__(
except ValueError:
client = get_client()
self.scheduler = scheduler_rpc or client.scheduler
self.loop = loop or client.io_loop
self.loop = loop or client.loop

self.name = name or "semaphore-" + uuid.uuid4().hex
self.max_leases = max_leases
Expand Down
28 changes: 27 additions & 1 deletion distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import pytest
import yaml
from tlz import concat, first, identity, isdistinct, merge, pluck, valmap
from tornado.ioloop import IOLoop

import dask
import dask.bag as db
Expand Down Expand Up @@ -2873,6 +2874,7 @@ async def test_startup_close_startup(s, a, b):
await c.close()


@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning")
def test_startup_close_startup_sync(loop):
with cluster() as (s, [a, b]):
with Client(s["address"], loop=loop) as c:
Expand Down Expand Up @@ -5532,13 +5534,18 @@ async def test_future_auto_inform(c, s, a, b):
await client.close()


@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning")
def test_client_async_before_loop_starts(cleanup):
async def close():
async with client:
pass

with pristine_loop() as loop:
client = Client(asynchronous=True, loop=loop)
with pytest.warns(
DeprecationWarning,
match=r"Constructing LoopRunner\(loop=loop\) without a running loop is deprecated",
):
client = Client(asynchronous=True, loop=loop)
assert client.asynchronous
assert isinstance(client.close(), NoOpAwaitable)
loop.run_sync(close) # TODO: client.close() does not unset global client
Expand Down Expand Up @@ -6837,6 +6844,7 @@ async def test_workers_collection_restriction(c, s, a, b):
assert a.data and not b.data


@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning")
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)])
async def test_get_client_functions_spawn_clusters(c, s, a):
# see gh4565
Expand Down Expand Up @@ -7550,3 +7558,21 @@ def test_quiet_close_process(processes, tmp_path):

assert not out
assert not err


@gen_cluster(client=False, nthreads=[])
async def test_deprecated_loop_properties(s):
class ExampleClient(Client):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.loop = self.io_loop = IOLoop.current()

with pytest.warns(DeprecationWarning) as warninfo:
async with ExampleClient(s.address, asynchronous=True, loop=IOLoop.current()):
pass

assert [(w.category, *w.message.args) for w in warninfo] == [
(DeprecationWarning, "setting the loop property is deprecated"),
(DeprecationWarning, "The io_loop property is deprecated"),
(DeprecationWarning, "setting the loop property is deprecated"),
]
Loading

0 comments on commit 172e37f

Please sign in to comment.