From d34c680b4251e4ab2ca7ca74b8ab0d9de847f053 Mon Sep 17 00:00:00 2001
From: Lars Holmberg
Date: Mon, 21 Feb 2022 18:39:42 +0100
Subject: [PATCH 1/2] Ensure users are distributed evenly across hosts during
 ramp up.

---
 locust/dispatch.py           | 12 ++++++-
 locust/test/test_dispatch.py | 64 ++++++++++++++++++++++++++++++++++++
 2 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/locust/dispatch.py b/locust/dispatch.py
index 6f72bce6ac..bdba0dae6a 100644
--- a/locust/dispatch.py
+++ b/locust/dispatch.py
@@ -1,3 +1,4 @@
+from collections import defaultdict
 import contextlib
 import itertools
 import math
@@ -58,8 +59,17 @@ def __init__(self, worker_nodes: "List[WorkerNode]", user_classes: List[Type[Use
         # This is especially important when iterating over a dictionary which, prior to py3.7, was
         # completely unordered. For >=Py3.7, a dictionary keeps the insertion order. Even then,
         # it is safer to sort the keys when repeatable behaviour is required.
-        self._worker_nodes = sorted(worker_nodes, key=lambda w: w.id)
+        worker_nodes_by_id = sorted(worker_nodes, key=lambda w: w.id)
+
+        # NOTE: Rather than just sort by worker name, we now sort by hostname too, so that
+        # workers from the same host are spread out
+        workers_per_host = defaultdict(lambda: 0)
+        for worker_node in worker_nodes_by_id:
+            host = worker_node.id.split("_")[0]
+            worker_node._index_within_host = workers_per_host[host]
+            workers_per_host[host] = workers_per_host[host] + 1
+
+        self._worker_nodes = sorted(worker_nodes, key=lambda worker: (worker._index_within_host, worker.id))
         self._user_classes = sorted(user_classes, key=attrgetter("__name__"))
 
         assert len(user_classes) > 0
diff --git a/locust/test/test_dispatch.py b/locust/test/test_dispatch.py
index 5dbb5c1a00..294aec7dfd 100644
--- a/locust/test/test_dispatch.py
+++ b/locust/test/test_dispatch.py
@@ -782,6 +782,70 @@ class User3(User):
         delta = time.perf_counter() - ts
         self.assertTrue(0 <= delta <= _TOLERANCE, delta)
 
+    def test_users_are_distributed_evenly_across_hosts(self):
+        class User1(User):
+            weight = 1
+
+        class User2(User):
+            weight = 1
+
+        class User3(User):
+            weight = 1
+
+        worker_node1 = WorkerNode("hostname1_worker1")
+        worker_node2 = WorkerNode("hostname1_worker2")
+        worker_node3 = WorkerNode("hostname2_worker1")
+        worker_node4 = WorkerNode("hostname2_worker2")
+
+        sleep_time = 0.2  # Speed-up test
+
+        users_dispatcher = UsersDispatcher(
+            worker_nodes=[worker_node1, worker_node2, worker_node3, worker_node4], user_classes=[User1, User2, User3]
+        )
+        users_dispatcher.new_dispatch(target_user_count=6, spawn_rate=2)
+        users_dispatcher._wait_between_dispatch = sleep_time
+
+        ts = time.perf_counter()
+        self.assertDictEqual(
+            next(users_dispatcher),
+            {
+                "hostname1_worker1": {"User1": 1, "User2": 0, "User3": 0},
+                "hostname1_worker2": {"User1": 0, "User2": 0, "User3": 0},
+                "hostname2_worker1": {"User1": 0, "User2": 1, "User3": 0},
+                "hostname2_worker2": {"User1": 0, "User2": 0, "User3": 0},
+            },
+        )
+        delta = time.perf_counter() - ts
+        self.assertTrue(0 <= delta <= _TOLERANCE, delta)
+
+        ts = time.perf_counter()
+        self.assertDictEqual(
+            next(users_dispatcher),
+            {
+                "hostname1_worker1": {"User1": 1, "User2": 0, "User3": 0},
+                "hostname1_worker2": {"User1": 0, "User2": 0, "User3": 1},
+                "hostname2_worker1": {"User1": 0, "User2": 1, "User3": 0},
+                "hostname2_worker2": {"User1": 1, "User2": 0, "User3": 0},
+            },
+        )
+        delta = time.perf_counter() - ts
+        self.assertTrue(sleep_time - _TOLERANCE <= delta <= sleep_time + _TOLERANCE, delta)
+
+        ts = time.perf_counter()
+        self.assertDictEqual(
+            next(users_dispatcher),
+            {
+                "hostname1_worker1": {"User1": 1, "User2": 1, "User3": 0},
+                "hostname1_worker2": {"User1": 0, "User2": 0, "User3": 1},
+                "hostname2_worker1": {"User1": 0, "User2": 1, "User3": 1},
+                "hostname2_worker2": {"User1": 1, "User2": 0, "User3": 0},
+            },
+        )
+        ts = time.perf_counter()
+        self.assertRaises(StopIteration, lambda: next(users_dispatcher))
+        delta = time.perf_counter() - ts
+        self.assertTrue(0 <= delta <= _TOLERANCE, delta)
+
 
 class TestWaitBetweenDispatch(unittest.TestCase):
     def test_wait_between_dispatch(self):
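
As a quick illustration of the ordering the first patch produces, here is a minimal
standalone sketch (not the locust module itself). It assumes worker ids of the form
"hostname_workerN", as used in the tests above, and uses SimpleNamespace in place of
a real WorkerNode:

    from collections import defaultdict
    from types import SimpleNamespace

    def interleave_by_host(worker_nodes):
        # Sort by id first so the per-host indices are assigned deterministically.
        worker_nodes_by_id = sorted(worker_nodes, key=lambda w: w.id)

        # _index_within_host counts how many workers on the same host came before this one.
        workers_per_host = defaultdict(int)
        for worker_node in worker_nodes_by_id:
            host = worker_node.id.split("_")[0]  # assumed "hostname_workerN" id format
            worker_node._index_within_host = workers_per_host[host]
            workers_per_host[host] += 1

        # Sorting by the per-host index first interleaves hosts: every host's first
        # worker comes before any host's second worker.
        return sorted(worker_nodes, key=lambda w: (w._index_within_host, w.id))

    workers = [
        SimpleNamespace(id=worker_id)
        for worker_id in ("hostname1_worker1", "hostname1_worker2", "hostname2_worker1", "hostname2_worker2")
    ]
    print([w.id for w in interleave_by_host(workers)])
    # ['hostname1_worker1', 'hostname2_worker1', 'hostname1_worker2', 'hostname2_worker2']

Because the dispatcher assigns users over the worker list in that order, the early
dispatch iterations touch each host once before returning to the same host, which is
what the first test above asserts.
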
From 33db8991fdef5d54a0528fd35ce4193090dc6feb Mon Sep 17 00:00:00 2001
From: Lars Holmberg
Date: Tue, 22 Feb 2022 10:59:29 +0100
Subject: [PATCH 2/2] Ensure even distribution across hosts even when workers
 get added during the run. Add test for that too.

---
 locust/dispatch.py           | 33 +++++++++++++++++----------------
 locust/test/test_dispatch.py | 19 +++++++++++++++++--
 2 files changed, 34 insertions(+), 18 deletions(-)

diff --git a/locust/dispatch.py b/locust/dispatch.py
index bdba0dae6a..3c9e703dba 100644
--- a/locust/dispatch.py
+++ b/locust/dispatch.py
@@ -55,21 +55,8 @@ def __init__(self, worker_nodes: "List[WorkerNode]", user_classes: List[Type[Use
         :param worker_nodes: List of worker nodes
         :param user_classes: The user classes
         """
-        # NOTE: We use "sorted" to ensure repeatable behaviour.
-        # This is especially important when iterating over a dictionary which, prior to py3.7, was
-        # completely unordered. For >=Py3.7, a dictionary keeps the insertion order. Even then,
-        # it is safer to sort the keys when repeatable behaviour is required.
-        worker_nodes_by_id = sorted(worker_nodes, key=lambda w: w.id)
-
-        # NOTE: Rather than just sort by worker name, we now sort by hostname too, so that
-        # workers from the same host are spread out
-        workers_per_host = defaultdict(lambda: 0)
-        for worker_node in worker_nodes_by_id:
-            host = worker_node.id.split("_")[0]
-            worker_node._index_within_host = workers_per_host[host]
-            workers_per_host[host] = workers_per_host[host] + 1
-
-        self._worker_nodes = sorted(worker_nodes, key=lambda worker: (worker._index_within_host, worker.id))
+        self._worker_nodes = worker_nodes
+        self._sort_workers()
         self._user_classes = sorted(user_classes, key=attrgetter("__name__"))
 
         assert len(user_classes) > 0
@@ -126,6 +113,20 @@ def __next__(self) -> Dict[str, Dict[str, int]]:
         # it won't be mutated by external code?
         return self._fast_users_on_workers_copy(users_on_workers)
 
+    def _sort_workers(self):
+        # Sorting workers ensures repeatable behaviour
+        worker_nodes_by_id = sorted(self._worker_nodes, key=lambda w: w.id)
+
+        # Give every worker an index indicating how many workers came before it on that host
+        workers_per_host = defaultdict(lambda: 0)
+        for worker_node in worker_nodes_by_id:
+            host = worker_node.id.split("_")[0]
+            worker_node._index_within_host = workers_per_host[host]
+            workers_per_host[host] = workers_per_host[host] + 1
+
+        # Sort again, first by index within host, to ensure Users get started evenly across hosts
+        self._worker_nodes = sorted(self._worker_nodes, key=lambda worker: (worker._index_within_host, worker.id))
+
     def _dispatcher(self) -> Generator[Dict[str, Dict[str, int]], None, None]:
         self._dispatch_in_progress = True
 
@@ -194,7 +195,7 @@ def add_worker(self, worker_node: "WorkerNode") -> None:
         :param worker_node: The worker node to add.
""" self._worker_nodes.append(worker_node) - self._worker_nodes = sorted(self._worker_nodes, key=lambda w: w.id) + self._sort_workers() self._prepare_rebalance() def remove_worker(self, worker_node: "WorkerNode") -> None: diff --git a/locust/test/test_dispatch.py b/locust/test/test_dispatch.py index 294aec7dfd..5ae192e645 100644 --- a/locust/test/test_dispatch.py +++ b/locust/test/test_dispatch.py @@ -2758,13 +2758,17 @@ class User3(User): user_classes = [User1, User2, User3] - worker_nodes = [WorkerNode(str(i + 1)) for i in range(3)] + worker_nodes = [ + WorkerNode("hostname1_worker1"), + WorkerNode("hostname1_worker2"), + WorkerNode("hostname2_worker1"), + ] users_dispatcher = UsersDispatcher(worker_nodes=[worker_nodes[0], worker_nodes[2]], user_classes=user_classes) sleep_time = 0.2 # Speed-up test - users_dispatcher.new_dispatch(target_user_count=9, spawn_rate=3) + users_dispatcher.new_dispatch(target_user_count=11, spawn_rate=3) users_dispatcher._wait_between_dispatch = sleep_time # Dispatch iteration 1 @@ -2815,6 +2819,17 @@ class User3(User): self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 3) self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3) + # Dispatch iteration 4 + ts = time.perf_counter() + dispatched_users = next(users_dispatcher) + delta = time.perf_counter() - ts + self.assertTrue(sleep_time - _TOLERANCE <= delta <= sleep_time + _TOLERANCE, delta) + self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 4, "User2": 4, "User3": 3}) + self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 4) + # without host-based balancing the following two values would be reversed + self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 3) + self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4) + def test_add_two_workers_during_ramp_up(self): class User1(User): weight = 1