diff --git a/distributed/_version.py b/distributed/_version.py
index cb3e10b6ff..a37b7c1e8d 100644
--- a/distributed/_version.py
+++ b/distributed/_version.py
@@ -113,7 +113,7 @@ def versions_from_parentdir(parentdir_prefix, root, verbose):
     """
     rootdirs = []
 
-    for i in range(3):
+    for _ in range(3):
         dirname = os.path.basename(root)
         if dirname.startswith(parentdir_prefix):
             return {
@@ -520,7 +520,7 @@ def get_versions():
        # versionfile_source is the relative path from the top of the source
        # tree (where the .git directory might live) to this file. Invert
        # this to find the root from __file__.
-        for i in cfg.versionfile_source.split("/"):
+        for _ in cfg.versionfile_source.split("/"):
             root = os.path.dirname(root)
     except NameError:
         return {
diff --git a/distributed/client.py b/distributed/client.py
index 3588217520..cf7e06e009 100644
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -1185,7 +1185,7 @@ async def _start(self, timeout=no_default, **kwargs):
         elif self.scheduler_file is not None:
             while not os.path.exists(self.scheduler_file):
                 await asyncio.sleep(0.01)
-            for i in range(10):
+            for _ in range(10):
                 try:
                     with open(self.scheduler_file) as f:
                         cfg = json.load(f)
diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py
index e9acb7086e..9de3c0cf4e 100644
--- a/distributed/comm/tcp.py
+++ b/distributed/comm/tcp.py
@@ -519,7 +519,7 @@ async def start(self):
         self.tcp_server = TCPServer(max_buffer_size=MAX_BUFFER_SIZE, **self.server_args)
         self.tcp_server.handle_stream = self._handle_stream
         backlog = int(dask.config.get("distributed.comm.socket-backlog"))
-        for i in range(5):
+        for _ in range(5):
             try:
                 # When shuffling data between workers, there can
                 # really be O(cluster size) connection requests
diff --git a/distributed/comm/tests/test_comms.py b/distributed/comm/tests/test_comms.py
index 49369255b2..50982c20fc 100644
--- a/distributed/comm/tests/test_comms.py
+++ b/distributed/comm/tests/test_comms.py
@@ -339,7 +339,7 @@ async def test_comm_failure_threading(tcp):
 
     async def sleep_for_60ms():
         max_thread_count = 0
-        for x in range(60):
+        for _ in range(60):
             await asyncio.sleep(0.001)
             thread_count = threading.active_count()
             if thread_count > max_thread_count:
@@ -381,7 +381,7 @@ async def handle_comm(comm):
         try:
             assert comm.peer_address.startswith("inproc://" + addr_head)
             client_addresses.add(comm.peer_address)
-            for i in range(N_MSGS):
+            for _ in range(N_MSGS):
                 msg = await comm.read()
                 msg["op"] = "pong"
                 await comm.write(msg)
@@ -401,7 +401,7 @@ async def client_communicate(key, delay=0):
         comm = await connect(listener.contact_address)
         try:
             assert comm.peer_address == "inproc://" + listener_addr
-            for i in range(N_MSGS):
+            for _ in range(N_MSGS):
                 await comm.write({"op": "ping", "data": key})
                 if delay:
                     await asyncio.sleep(delay)
@@ -1029,7 +1029,7 @@ async def handle_comm(comm):
 
     listeners = []
     N = 100
-    for i in range(N):
+    for _ in range(N):
         listener = await listen(addr, handle_comm)
         listeners.append(listener)
 
diff --git a/distributed/comm/tests/test_ucx.py b/distributed/comm/tests/test_ucx.py
index f5a63bbb5b..581e17b802 100644
--- a/distributed/comm/tests/test_ucx.py
+++ b/distributed/comm/tests/test_ucx.py
@@ -302,7 +302,7 @@ async def test_stress(
     x = x.persist()
     await wait(x)
 
-    for i in range(10):
+    for _ in range(10):
         x = x.rechunk((chunksize, -1))
         x = x.rechunk((-1, chunksize))
         x = x.persist()
diff --git a/distributed/core.py b/distributed/core.py
index ea0c8c38a4..14e6949278 100644
--- a/distributed/core.py
+++ b/distributed/core.py
@@ -1399,7 +1399,7 @@ def collect(self):
             self.active,
             len(self._connecting),
         )
-        for addr, comms in self.available.items():
+        for comms in self.available.values():
             for comm in comms:
                 IOLoop.current().add_callback(comm.close)
         self.semaphore.release()
diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py
index 130bcfe8a2..3d89bd7a25 100644
--- a/distributed/dashboard/components/scheduler.py
+++ b/distributed/dashboard/components/scheduler.py
@@ -1440,7 +1440,7 @@ def __init__(self, scheduler, **kwargs):
 
     def update(self):
         agg_times = defaultdict(float)
-        for key, ts in self.scheduler.task_prefixes.items():
+        for ts in self.scheduler.task_prefixes.values():
             for action, t in ts.all_durations.items():
                 agg_times[action] += t
 
@@ -2539,7 +2539,7 @@ def update(self):
 
         durations = set()
         nbytes = set()
-        for key, tg in self.scheduler.task_groups.items():
+        for tg in self.scheduler.task_groups.values():
             if tg.duration and tg.nbytes_total:
                 durations.add(tg.duration)
 
@@ -3495,8 +3495,8 @@ def __init__(self, scheduler, width=800, **kwargs):
     @without_property_validation
     def update(self):
         data = {name: [] for name in self.names + self.extra_names}
-        for i, (addr, ws) in enumerate(
-            sorted(self.scheduler.workers.items(), key=lambda kv: str(kv[1].name))
+        for i, ws in enumerate(
+            sorted(self.scheduler.workers.values(), key=lambda ws: str(ws.name))
         ):
             minfo = ws.memory
 
diff --git a/distributed/deploy/old_ssh.py b/distributed/deploy/old_ssh.py
index a27e64c969..a92be5c20c 100644
--- a/distributed/deploy/old_ssh.py
+++ b/distributed/deploy/old_ssh.py
@@ -428,7 +428,7 @@ def __init__(
 
         # Start worker nodes
        self.workers = []
-        for i, addr in enumerate(worker_addrs):
+        for addr in worker_addrs:
             self.add_worker(addr)
 
     @gen.coroutine
diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py
index 051f551e11..1248db0f0d 100644
--- a/distributed/deploy/tests/test_adaptive.py
+++ b/distributed/deploy/tests/test_adaptive.py
@@ -74,7 +74,7 @@ async def test_adaptive_local_cluster_multi_workers():
             await asyncio.sleep(0.01)
 
         # no workers for a while
-        for i in range(10):
+        for _ in range(10):
             assert not cluster.scheduler.workers
             await asyncio.sleep(0.05)
 
@@ -236,7 +236,7 @@ async def test_adapt_quickly():
 
     # Don't scale up for large sequential computations
    x = await client.scatter(1)
-    for i in range(100):
+    for _ in range(100):
         x = client.submit(slowinc, x)
 
     await asyncio.sleep(0.1)
diff --git a/distributed/deploy/tests/test_spec_cluster.py b/distributed/deploy/tests/test_spec_cluster.py
index 9121abb45f..31b95e2802 100644
--- a/distributed/deploy/tests/test_spec_cluster.py
+++ b/distributed/deploy/tests/test_spec_cluster.py
@@ -465,7 +465,7 @@ async def test_MultiWorker():
 
         adapt = cluster.adapt(minimum=0, maximum=4)
 
-        for i in range(adapt.wait_count):  # relax down to 0 workers
+        for _ in range(adapt.wait_count):  # relax down to 0 workers
             await adapt.adapt()
         await cluster
         assert not s.workers
diff --git a/distributed/diagnostics/tests/test_progress_stream.py b/distributed/diagnostics/tests/test_progress_stream.py
index f6afea1eb1..73a5be81ab 100644
--- a/distributed/diagnostics/tests/test_progress_stream.py
+++ b/distributed/diagnostics/tests/test_progress_stream.py
@@ -63,7 +63,7 @@ async def test_progress_stream(c, s, a, b):
     futures = c.map(div, [1] * 10, range(10))
 
     x = 1
-    for i in range(5):
+    for _ in range(5):
         x = delayed(inc)(x)
     future = c.compute(x)
 
diff --git a/distributed/http/routing.py b/distributed/http/routing.py
index 3304396437..abce83c573 100644
--- a/distributed/http/routing.py
+++ b/distributed/http/routing.py
@@ -20,7 +20,7 @@ def _descend_routes(router, routers=None, out=None):
         if issubclass(rule.target, tornado.web.StaticFileHandler):
             prefix = rule.matcher.regex.pattern.rstrip("(.*)$").rstrip("/")
             path = rule.target_kwargs["path"]
-            for d, dirs, files in os.walk(path):
+            for d, _, files in os.walk(path):
                 for fn in files:
                     fullpath = d + "/" + fn
                     ourpath = fullpath.replace(path, prefix).replace("\\", "/")
diff --git a/distributed/protocol/tests/test_serialize.py b/distributed/protocol/tests/test_serialize.py
index edc580db7e..900a233885 100644
--- a/distributed/protocol/tests/test_serialize.py
+++ b/distributed/protocol/tests/test_serialize.py
@@ -450,7 +450,7 @@ async def test_profile_nested_sizeof():
 
     original = outer = {}
     inner = {}
-    for i in range(n):
+    for _ in range(n):
         outer["children"] = inner
         outer, inner = inner, {}
 
diff --git a/distributed/queues.py b/distributed/queues.py
index 23090a7d22..668b04e89d 100644
--- a/distributed/queues.py
+++ b/distributed/queues.py
@@ -110,7 +110,7 @@ def process(record):
                     "integer batch sizes and timeouts"
                 )
                 raise NotImplementedError(msg)
-            for i in range(batch):
+            for _ in range(batch):
                 record = await q.get()
                 out.append(record)
             out = [process(o) for o in out]
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index e69efb8090..2f3559c34f 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -6764,7 +6764,7 @@ def add_resources(self, worker: str, resources=None):
 
     def remove_resources(self, worker):
         ws: WorkerState = self.workers[worker]
-        for resource, quantity in ws.resources.items():
+        for resource in ws.resources:
             dr: dict = self.resources.get(resource, None)
             if dr is None:
                 self.resources[resource] = dr = {}
@@ -6889,7 +6889,7 @@ async def get_profile_metadata(
                 tt = t // dt * dt
                 if tt > last:
                     last = tt
-                    for k, v in keys.items():
+                    for v in keys.values():
                         v.append([tt, 0])
                 for k, v in d.items():
                     keys[k][-1][1] += v
@@ -7147,7 +7147,7 @@ async def reevaluate_occupancy(self, worker_index: int = 0):
                 workers: list = list(self.workers.values())
                 nworkers: int = len(workers)
                 i: int
-                for i in range(nworkers):
+                for _ in range(nworkers):
                     ws: WorkerState = workers[worker_index % nworkers]
                     worker_index += 1
                     try:
diff --git a/distributed/shuffle/tests/test_multi_file.py b/distributed/shuffle/tests/test_multi_file.py
index 1483608cea..16ebc7562c 100644
--- a/distributed/shuffle/tests/test_multi_file.py
+++ b/distributed/shuffle/tests/test_multi_file.py
@@ -43,7 +43,7 @@ async def test_many(tmp_path, count):
     with MultiFile(directory=tmp_path, dump=dump, load=load) as mf:
         d = {i: [str(i).encode() * 100] for i in range(count)}
 
-        for i in range(10):
+        for _ in range(10):
             await mf.put(d)
 
         await mf.flush()
diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py
index 781727f26d..48f0194b15 100644
--- a/distributed/shuffle/tests/test_shuffle.py
+++ b/distributed/shuffle/tests/test_shuffle.py
@@ -236,7 +236,7 @@ def test_processing_chain():
 
     filesystem = defaultdict(io.BytesIO)
 
-    for worker, partitions in splits_by_worker.items():
+    for partitions in splits_by_worker.values():
         for partition, batches in partitions.items():
             for batch in batches:
                 dump_batch(batch, filesystem[partition], schema)
diff --git a/distributed/tests/make_tls_certs.py b/distributed/tests/make_tls_certs.py
index 2e47ab3269..b963cb46c4 100644
--- a/distributed/tests/make_tls_certs.py
+++ b/distributed/tests/make_tls_certs.py
@@ -74,7 +74,7 @@ def make_cert_key(hostname, sign=False):
 
     print("creating cert for " + hostname)
     tempnames = []
-    for i in range(3):
+    for _ in range(3):
         with tempfile.NamedTemporaryFile(delete=False) as f:
             tempnames.append(f.name)
     req_file, cert_file, key_file = tempnames
diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py
index bf07c0474f..9c0f86bc4f 100644
--- a/distributed/tests/test_actor.py
+++ b/distributed/tests/test_actor.py
@@ -298,7 +298,7 @@ async def test_failed_worker(c, s, a, b):
 async def bench(c, s, a, b):
     counter = await c.submit(Counter, actor=True)
 
-    for i in range(1000):
+    for _ in range(1000):
         await counter.increment()
 
 
@@ -357,7 +357,7 @@ async def test_many_computations(c, s, a, b):
     counter = await c.submit(Counter, actor=True)
 
     def add(n, counter):
-        for i in range(n):
+        for _ in range(n):
             counter.increment().result()
 
     futures = c.map(add, range(10), counter=counter)
@@ -383,7 +383,7 @@ def f(self):
             assert self.n == 0
             self.n += 1
 
-            for i in range(20):
+            for _ in range(20):
                 sleep(0.002)
                 assert self.n == 1
             self.n = 0
@@ -484,7 +484,7 @@ async def test_compute(c, s, a, b):
     @dask.delayed
     def f(n, counter):
         assert isinstance(counter, Actor)
-        for i in range(n):
+        for _ in range(n):
             counter.increment().result()
 
     @dask.delayed
@@ -506,7 +506,7 @@ def test_compute_sync(client):
     @dask.delayed
     def f(n, counter):
         assert isinstance(counter, Actor), type(counter)
-        for i in range(n):
+        for _ in range(n):
             counter.increment().result()
 
     @dask.delayed
@@ -544,7 +544,7 @@ def sleep(self, time):
 
     sleeper = await c.submit(Sleeper, actor=True)
 
-    for i in range(5):
+    for _ in range(5):
         await sleeper.sleep(0.200)
         if (
             list(a.profile_recent["children"])[0].startswith("sleep")
diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index ec8333b12c..beb31235f5 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -2120,7 +2120,7 @@ async def test_forget_in_flight(e, s, A, B):
     x, y = e.compute([ac, acab])
     s.validate_state()
 
-    for i in range(5):
+    for _ in range(5):
         await asyncio.sleep(0.01)
         s.validate_state()
 
@@ -2856,7 +2856,7 @@ def test_client_num_fds(loop):
         proc = psutil.Process()
         with Client(s["address"], loop=loop) as c:  # first client to start loop
             before = proc.num_fds()  # measure
-            for i in range(4):
+            for _ in range(4):
                 with Client(s["address"], loop=loop):  # start more clients
                     pass
             start = time()
@@ -3268,16 +3268,13 @@ async def test_scheduler_saturates_cores(c, s, a, b):
 
 @gen_cluster(client=True, nthreads=[("127.0.0.1", 20)] * 2)
 async def test_scheduler_saturates_cores_random(c, s, a, b):
-    for delay in [0, 0.01, 0.1]:
-        futures = c.map(randominc, range(100), scale=0.1)
-        while not s.tasks:
-            if s.tasks:
-                assert all(
-                    len(p) >= 20
-                    for w in s.workers.values()
-                    for p in w.processing.values()
-                )
-            await asyncio.sleep(0.01)
+    futures = c.map(randominc, range(100), scale=0.1)
+    while not s.tasks:
+        if s.tasks:
+            assert all(
+                len(p) >= 20 for w in s.workers.values() for p in w.processing.values()
+            )
+        await asyncio.sleep(0.01)
 
 
 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 4)
@@ -3741,7 +3738,7 @@ def test_open_close_many_workers(loop, worker, count, repeat):
     status = True
 
     async def start_worker(sleep, duration, repeat=1):
-        for i in range(repeat):
+        for _ in range(repeat):
             await asyncio.sleep(sleep)
             if not status:
                 return
@@ -3757,7 +3754,7 @@ async def start_worker(sleep, duration, repeat=1):
             await asyncio.sleep(0)
         done.release()
 
-    for i in range(count):
+    for _ in range(count):
         loop.add_callback(
             start_worker, random.random() / 5, random.random() / 5, repeat=repeat
         )
@@ -3765,7 +3762,7 @@ async def start_worker(sleep, duration, repeat=1):
     with Client(s["address"], loop=loop) as c:
         sleep(1)
 
-        for i in range(count):
+        for _ in range(count):
             done.acquire(timeout=5)
             gc.collect()
             if not running:
@@ -3869,7 +3866,7 @@ def test_get_versions_sync(c):
     assert v["scheduler"] is not None
     assert v["client"] is not None
     assert len(v["workers"]) == 2
-    for k, v in v["workers"].items():
+    for v in v["workers"].values():
         assert v is not None
 
     c.get_versions(check=True)
@@ -4972,7 +4969,7 @@ async def test_close(s, a, b):
 def test_threadsafe(c):
     def f(_):
         d = deque(maxlen=50)
-        for i in range(100):
+        for _ in range(100):
             future = c.submit(inc, random.randint(0, 100))
             d.append(future)
             sleep(0.001)
@@ -4995,7 +4992,7 @@ def test_threadsafe_get(c):
 
     def f(_):
         total = 0
-        for i in range(20):
+        for _ in range(20):
             total += (x + random.randint(0, 20)).sum().compute()
             sleep(0.001)
         return total
@@ -5014,7 +5011,7 @@ def test_threadsafe_compute(c):
 
     def f(_):
         total = 0
-        for i in range(20):
+        for _ in range(20):
             future = c.compute((x + random.randint(0, 20)).sum())
             total += future.result()
             sleep(0.001)
@@ -7391,7 +7388,7 @@ async def test_dump_cluster_state_exclude_default(c, s, a, b, tmp_path):
 
     assert "workers" in state
     assert len(state["workers"]) == len(s.workers)
-    for worker, worker_dump in state["workers"].items():
+    for worker_dump in state["workers"].values():
         for k, task_dump in worker_dump["tasks"].items():
             assert not any(blocked in task_dump for blocked in excluded_by_default)
             assert k in s.tasks
@@ -7414,7 +7411,7 @@ async def test_dump_cluster_state_exclude_default(c, s, a, b, tmp_path):
 
     assert "workers" in state
     assert len(state["workers"]) == len(s.workers)
-    for worker, worker_dump in state["workers"].items():
+    for worker_dump in state["workers"].values():
         for k, task_dump in worker_dump["tasks"].items():
             assert all(blocked in task_dump for blocked in excluded_by_default)
             assert k in s.tasks
diff --git a/distributed/tests/test_client_executor.py b/distributed/tests/test_client_executor.py
index adddbebe89..49c1fc884a 100644
--- a/distributed/tests/test_client_executor.py
+++ b/distributed/tests/test_client_executor.py
@@ -152,7 +152,7 @@ def test_map(client):
         N = 10
         # Not consuming the iterator will cancel remaining tasks
         it = e.map(slowinc, range(N), [0.3] * N)
-        for x in take(2, it):
+        for _ in take(2, it):
             pass
         # Some tasks still processing
         assert number_of_processing_tasks(client) > 0
diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py
index 3a0b9b4db8..8690d499d1 100644
--- a/distributed/tests/test_core.py
+++ b/distributed/tests/test_core.py
@@ -86,7 +86,7 @@ def test_async_task_group_initialization():
 
 
 async def _wait_for_n_loop_cycles(n):
-    for i in range(n):
+    for _ in range(n):
         await asyncio.sleep(0)
 
 
@@ -541,14 +541,14 @@ async def test_rpc_message_lifetime_inproc():
 
 async def check_rpc_with_many_connections(listen_arg):
     async def g():
-        for i in range(10):
+        for _ in range(10):
             await remote.ping()
 
     server = await Server({"ping": pingpong})
     await server.listen(listen_arg)
 
     async with rpc(server.address) as remote:
-        for i in range(10):
+        for _ in range(10):
             await g()
 
         server.stop()
@@ -939,7 +939,7 @@ async def test_counters():
     await server.listen("tcp://")
 
     async with rpc(server.address) as r:
-        for i in range(2):
+        for _ in range(2):
             await r.identity()
         with pytest.raises(ZeroDivisionError):
             await r.div(x=1, y=0)
diff --git a/distributed/tests/test_counter.py b/distributed/tests/test_counter.py
index e6e6809418..aa1f0fa746 100644
--- a/distributed/tests/test_counter.py
+++ b/distributed/tests/test_counter.py
@@ -44,6 +44,6 @@ def test_counter(loop):
     c = Counter(loop=loop)
     c.add(1)
 
-    for i in range(5):
+    for _ in range(5):
         c.shift()
     assert abs(sum(cc[1] for cc in c.components) - 1) < 1e-13
diff --git a/distributed/tests/test_diskutils.py b/distributed/tests/test_diskutils.py
index 8a210c9214..ccf30507a5 100644
--- a/distributed/tests/test_diskutils.py
+++ b/distributed/tests/test_diskutils.py
@@ -24,7 +24,7 @@ def assert_directory_contents(dir_path, expected, trials=2):
 
     expected = [os.path.join(dir_path, p) for p in expected]
 
-    for i in range(trials):
+    for _ in range(trials):
         actual = [
             os.path.join(dir_path, p)
             for p in os.listdir(dir_path)
diff --git a/distributed/tests/test_metrics.py b/distributed/tests/test_metrics.py
index f33a425d0b..55e3d9fc1b 100644
--- a/distributed/tests/test_metrics.py
+++ b/distributed/tests/test_metrics.py
@@ -9,7 +9,7 @@
 
 @pytest.mark.parametrize("name", ["time", "monotonic"])
 def test_wall_clock(name):
-    for i in range(3):
+    for _ in range(3):
         time.sleep(0.01)
         t = getattr(time, name)()
         samples = [getattr(metrics, name)() for _ in range(100)]
diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py
index b908236c4a..504ade763c 100644
--- a/distributed/tests/test_nanny.py
+++ b/distributed/tests/test_nanny.py
@@ -176,7 +176,7 @@ async def test_num_fds(s):
 
     before = proc.num_fds()
 
-    for i in range(3):
+    for _ in range(3):
         async with Nanny(s.address):
             await asyncio.sleep(0.1)
 
diff --git a/distributed/tests/test_profile.py b/distributed/tests/test_profile.py
index 1d417cb19c..5312ddd2cc 100644
--- a/distributed/tests/test_profile.py
+++ b/distributed/tests/test_profile.py
@@ -34,7 +34,7 @@ def test_h():
         sleep(0.02)
 
     def test_f():
-        for i in range(100):
+        for _ in range(100):
             test_g()
             test_h()
 
@@ -44,7 +44,7 @@ def test_f():
 
     state = create()
 
-    for i in range(100):
+    for _ in range(100):
         sleep(0.02)
         frame = sys._current_frames()[thread.ident]
         process(frame, None, state)
@@ -75,7 +75,7 @@ def test_basic_low_level():
 
     state = create()
 
-    for i in range(100):
+    for _ in range(100):
         sleep(0.02)
         frame = sys._current_frames()[threading.get_ident()]
         llframes = {threading.get_ident(): ll_get_stack(threading.get_ident())}
diff --git a/distributed/tests/test_pubsub.py b/distributed/tests/test_pubsub.py
index b3e05c7b74..3a59765830 100644
--- a/distributed/tests/test_pubsub.py
+++ b/distributed/tests/test_pubsub.py
@@ -32,7 +32,7 @@ def pingpong(a, b, start=False, n=1000, msg=1):
     if start:
         pub.put(msg)  # other sub may not have started yet
 
-    for i in range(n):
+    for _ in range(n):
         msg = next(sub)
         pub.put(msg)
         # if i % 100 == 0:
@@ -85,7 +85,7 @@ def f(x):
     await wait(futures)
 
     L = []
-    for i in range(10):
+    for _ in range(10):
         result = await sub.get()
         L.append(result)
 
diff --git a/distributed/tests/test_queues.py b/distributed/tests/test_queues.py
index 1554b71215..4bcb3bb94f 100644
--- a/distributed/tests/test_queues.py
+++ b/distributed/tests/test_queues.py
@@ -145,12 +145,12 @@ async def test_same_futures(c, s, a, b):
     q = Queue("x")
     future = await c.scatter(123)
 
-    for i in range(5):
+    for _ in range(5):
         await q.put(future)
 
     assert {ts.key for ts in s.clients["queue-x"].wants_what} == {future.key}
 
-    for i in range(4):
+    for _ in range(4):
         future2 = await q.get()
         assert {ts.key for ts in s.clients["queue-x"].wants_what} == {future.key}
         await asyncio.sleep(0.05)
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 437d45e9a8..3eb41a20b9 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -200,7 +200,7 @@ def random(**kwargs):
     # Check that each chunk-row of the array is (mostly) stored on the same worker
     primary_worker_key_fractions = []
     secondary_worker_key_fractions = []
-    for i, keys in enumerate(x.__dask_keys__()):
+    for keys in x.__dask_keys__():
         # Iterate along rows of the array.
         keys = {stringify(k) for k in keys}
 
@@ -428,7 +428,7 @@ def func(scheduler):
     comm = await connect(s.address)
     await comm.write({"op": "feed", "function": dumps(func), "interval": 0.01})
 
-    for i in range(5):
+    for _ in range(5):
         response = await comm.read()
         expected = dict(s.workers)
         assert cloudpickle.loads(response) == expected
@@ -459,7 +459,7 @@ def teardown(scheduler, state):
         }
     )
 
-    for i in range(5):
+    for _ in range(5):
         response = await comm.read()
         assert response == "OK"
 
@@ -483,7 +483,7 @@ def func(scheduler):
     comm = await connect(s.address)
     await comm.write({"op": "feed", "function": dumps(func), "interval": 0.05})
 
-    for i in range(5):
+    for _ in range(5):
         response = await comm.read()
         assert response is True
 
@@ -1125,7 +1125,7 @@ async def test_file_descriptors(c, s):
     await c.close()
 
     assert not s.rpc.open
-    for addr, occ in c.rpc.occupied.items():
+    for occ in c.rpc.occupied.values():
         for comm in occ:
             assert comm.closed() or comm.peer_address != s.address, comm
     assert not s.stream_comms
@@ -1330,7 +1330,7 @@ async def test_close_nanny(s, a, b):
     assert not a.is_alive()
     assert a.pid is None
 
-    for i in range(10):
+    for _ in range(10):
         await asyncio.sleep(0.1)
         assert len(s.workers) == 1
         assert not a.is_alive()
@@ -2401,7 +2401,7 @@ async def test_retire_state_change(c, s, a, b):
     y = c.map(lambda x: x**2, range(10))
     await c.scatter(y)
     coros = []
-    for x in range(2):
+    for _ in range(2):
         v = c.map(lambda i: i * np.random.randint(1000), y)
         k = c.map(lambda i: i * np.random.randint(1000), v)
         foo = c.map(lambda j: j * 6, k)
diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py
index 29d6320f8c..8d7a4e8dde 100644
--- a/distributed/tests/test_steal.py
+++ b/distributed/tests/test_steal.py
@@ -674,7 +674,7 @@ async def assert_balanced(inp, expected, c, s, *workers):
     while len([ts for ts in s.tasks.values() if ts.processing_on]) < len(futures):
         await asyncio.sleep(0.001)
 
-    for i in range(10):
+    for _ in range(10):
         steal.balance()
 
         while steal.in_flight:
@@ -955,7 +955,7 @@ class Foo:
 async def test_lose_task(c, s, a, b):
     with captured_logger("distributed.stealing") as log:
         s.periodic_callbacks["stealing"].interval = 1
-        for i in range(100):
+        for _ in range(100):
             futures = c.map(
                 slowinc,
                 range(10),
diff --git a/distributed/tests/test_stress.py b/distributed/tests/test_stress.py
index 3f9b8cd0e8..adce31c100 100644
--- a/distributed/tests/test_stress.py
+++ b/distributed/tests/test_stress.py
@@ -51,7 +51,7 @@ def test_stress_gc(loop, func, n):
     with cluster() as (s, [a, b]):
         with Client(s["address"], loop=loop) as c:
             x = c.submit(func, 1)
-            for i in range(n):
+            for _ in range(n):
                 x = c.submit(func, x)
 
             assert x.result() == n + 2
@@ -66,7 +66,7 @@ async def test_cancel_stress(c, s, *workers):
     await wait([x])
     y = (x.sum(axis=0) + x.sum(axis=1) + 1).std()
     n_todo = len(y.dask) - len(x.dask)
-    for i in range(5):
+    for _ in range(5):
         f = c.compute(y)
         while (
             len([ts for ts in s.tasks.values() if ts.waiting_on])
@@ -84,7 +84,7 @@ def test_cancel_stress_sync(loop):
             x = c.persist(x)
             y = (x.sum(axis=0) + x.sum(axis=1) + 1).std()
             wait(x)
-            for i in range(5):
+            for _ in range(5):
                 f = c.compute(y)
                 sleep(random.random())
                 c.cancel(f)
@@ -199,7 +199,7 @@ async def test_stress_steal(c, s, *workers):
     dinc = delayed(slowinc)
     L = [delayed(slowinc)(i, delay=0.005) for i in range(100)]
 
-    for i in range(5):
+    for _ in range(5):
         L = [delayed(slowsum)(part, delay=0.005) for part in sliding_window(5, L)]
 
     total = delayed(sum)(L)
@@ -207,7 +207,7 @@ async def test_stress_steal(c, s, *workers):
     while future.status != "finished":
         await asyncio.sleep(0.1)
 
-        for i in range(3):
+        for _ in range(3):
             a = random.choice(workers)
             b = random.choice(workers)
             if a is not b:
@@ -227,7 +227,7 @@ async def test_close_connections(c, s, *workers):
     da = pytest.importorskip("dask.array")
     x = da.random.random(size=(1000, 1000), chunks=(1000, 1))
 
-    for i in range(3):
+    for _ in range(3):
         x = x.rechunk((1, 1000))
         x = x.rechunk((1000, 1))
 
diff --git a/distributed/tests/test_system_monitor.py b/distributed/tests/test_system_monitor.py
index 76d413c139..28d7e2ffa9 100644
--- a/distributed/tests/test_system_monitor.py
+++ b/distributed/tests/test_system_monitor.py
@@ -29,7 +29,7 @@ def test_count():
     sm.update()
     assert sm.count == 2
 
-    for i in range(10):
+    for _ in range(10):
         sm.update()
 
     assert sm.count == 12
@@ -50,7 +50,7 @@ def test_range_query():
     assert all(len(v) == 4 for v in sm.range_query(0).values())
     assert all(len(v) == 3 for v in sm.range_query(1).values())
 
-    for i in range(10):
+    for _ in range(10):
         sm.update()
 
     assert all(len(v) == 4 for v in sm.range_query(10).values())
diff --git a/distributed/tests/test_threadpoolexecutor.py b/distributed/tests/test_threadpoolexecutor.py
index 3ce23144c4..163a64acbd 100644
--- a/distributed/tests/test_threadpoolexecutor.py
+++ b/distributed/tests/test_threadpoolexecutor.py
@@ -122,7 +122,7 @@ def test_rejoin_idempotent():
 
     def f():
         secede()
-        for i in range(5):
+        for _ in range(5):
             rejoin()
         return 1
 
diff --git a/distributed/tests/test_utils_perf.py b/distributed/tests/test_utils_perf.py
index bee305f5af..daf83714ed 100644
--- a/distributed/tests/test_utils_perf.py
+++ b/distributed/tests/test_utils_perf.py
@@ -48,7 +48,7 @@ def check_fraction(timer, ft):
 
     timer = RandomTimer()
     ft = FractionalTimer(n_samples=N, timer=timer)
-    for i in range(N):
+    for _ in range(N):
         ft.start_timing()
         ft.stop_timing()
     assert len(timer.timings) == N * 2
@@ -60,7 +60,7 @@ def check_fraction(timer, ft):
     assert ft.running_fraction is not None
     check_fraction(timer, ft)
 
-    for i in range(N * 10):
+    for _ in range(N * 10):
         ft.start_timing()
         ft.stop_timing()
     check_fraction(timer, ft)
@@ -90,7 +90,7 @@ def test_gc_diagnosis_cpu_time():
 
     with enable_gc_diagnosis_and_log(diag, level="WARN") as sio:
         # Spend some CPU time doing only full GCs
-        for i in range(diag.N_SAMPLES):
+        for _ in range(diag.N_SAMPLES):
             gc.collect()
         assert not sio.getvalue()
         gc.collect()
@@ -104,7 +104,7 @@ def test_gc_diagnosis_cpu_time():
 
     with enable_gc_diagnosis_and_log(diag, level="WARN") as sio:
         # Spend half the CPU time doing full GCs
-        for i in range(diag.N_SAMPLES + 1):
+        for _ in range(diag.N_SAMPLES + 1):
             t1 = thread_time()
             gc.collect()
             dt = thread_time() - t1
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 0d3f900cd9..75985e8df7 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -741,7 +741,7 @@ async def test_restrictions(c, s, a, b):
 @gen_cluster(client=True)
 async def test_clean_nbytes(c, s, a, b):
     L = [delayed(inc)(i) for i in range(10)]
-    for i in range(5):
+    for _ in range(5):
         L = [delayed(add)(x, y) for x, y in sliding_window(2, L)]
 
     total = delayed(sum)(L)
diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py
index 843c0ef55c..bf52d7cd8a 100644
--- a/distributed/tests/test_worker_client.py
+++ b/distributed/tests/test_worker_client.py
@@ -206,7 +206,7 @@ def f(x):
         loop=loop, processes=False, set_as_default=True, dashboard_address=":0"
     ) as c:
         assert dask.base.get_scheduler() == c.get
-        for i in range(2):
+        for _ in range(2):
             b2.compute()
 
         assert dask.base.get_scheduler() == c.get
diff --git a/distributed/utils.py b/distributed/utils.py
index 654b6f96f5..1898d0982a 100644
--- a/distributed/utils.py
+++ b/distributed/utils.py
@@ -1162,7 +1162,7 @@ def json_load_robust(fn, load=json.load):
     """Reads a JSON file from disk that may be being written as we read"""
     while not os.path.exists(fn):
         sleep(0.01)
-    for i in range(10):
+    for _ in range(10):
         try:
             with open(fn) as f:
                 cfg = load(f)
diff --git a/distributed/utils_test.py b/distributed/utils_test.py
index 9cbf720b57..dbf5a36352 100644
--- a/distributed/utils_test.py
+++ b/distributed/utils_test.py
@@ -462,10 +462,10 @@ async def _():
                 validate=True, host="127.0.0.1", port=port, **kwargs
             )
         except Exception as exc:
-            for i in range(nputs):
+            for _ in range(nputs):
                 q.put(exc)
         else:
-            for i in range(nputs):
+            for _ in range(nputs):
                 q.put(scheduler.address)
             await scheduler.finished()
 
@@ -908,7 +908,7 @@ def check_invalid_worker_transitions(s: Scheduler) -> None:
     if not s.events.get("invalid-worker-transition"):
         return
 
-    for timestamp, msg in s.events["invalid-worker-transition"]:
+    for _, msg in s.events["invalid-worker-transition"]:
         worker = msg.pop("worker")
         print("Worker:", worker)
         print(InvalidTransition(**msg))
@@ -922,7 +922,7 @@ def check_invalid_task_states(s: Scheduler) -> None:
     if not s.events.get("invalid-worker-task-state"):
         return
 
-    for timestamp, msg in s.events["invalid-worker-task-state"]:
+    for _, msg in s.events["invalid-worker-task-state"]:
         print("Worker:", msg["worker"])
         print("State:", msg["state"])
         for line in msg["story"]:
@@ -935,7 +935,7 @@ def check_worker_fail_hard(s: Scheduler) -> None:
     if not s.events.get("worker-fail-hard"):
         return
 
-    for timestamp, msg in s.events["worker-fail-hard"]:
+    for _, msg in s.events["worker-fail-hard"]:
         msg = msg.copy()
         worker = msg.pop("worker")
         msg["exception"] = deserialize(msg["exception"].header, msg["exception"].frames)
@@ -1840,7 +1840,7 @@ def check_instances():
         assert time() < start + 10
     Worker._initialized_clients.clear()
 
-    for i in range(5):
+    for _ in range(5):
         if all(c.closed() for c in Comm._instances):
             break
         else: