Merge pull request #2025 from locustio/ensure-even-distribution-across-worker-hosts

Ensure users are distributed evenly across hosts during ramp up
cyberw authored Feb 22, 2022
2 parents 2c07551 + 33db899 commit 1b9c607
Showing 2 changed files with 99 additions and 9 deletions.
25 changes: 18 additions & 7 deletions locust/dispatch.py
@@ -1,3 +1,4 @@
+from collections import defaultdict
 import contextlib
 import itertools
 import math
@@ -54,12 +55,8 @@ def __init__(self, worker_nodes: "List[WorkerNode]", user_classes: List[Type[Use
         :param worker_nodes: List of worker nodes
         :param user_classes: The user classes
         """
-        # NOTE: We use "sorted" to ensure repeatable behaviour.
-        # This is especially important when iterating over a dictionary which, prior to py3.7, was
-        # completely unordered. For >=Py3.7, a dictionary keeps the insertion order. Even then,
-        # it is safer to sort the keys when repeatable behaviour is required.
-        self._worker_nodes = sorted(worker_nodes, key=lambda w: w.id)
-
+        self._worker_nodes = worker_nodes
+        self._sort_workers()
         self._user_classes = sorted(user_classes, key=attrgetter("__name__"))

         assert len(user_classes) > 0
@@ -116,6 +113,20 @@ def __next__(self) -> Dict[str, Dict[str, int]]:
         # it won't be mutated by external code?
         return self._fast_users_on_workers_copy(users_on_workers)

+    def _sort_workers(self):
+        # Sorting workers ensures repeatable behaviour
+        worker_nodes_by_id = sorted(self._worker_nodes, key=lambda w: w.id)
+
+        # Give every worker an index indicating how many workers came before it on that host
+        workers_per_host = defaultdict(lambda: 0)
+        for worker_node in worker_nodes_by_id:
+            host = worker_node.id.split("_")[0]
+            worker_node._index_within_host = workers_per_host[host]
+            workers_per_host[host] = workers_per_host[host] + 1
+
+        # Sort again, first by index within host, to ensure Users get started evenly across hosts
+        self._worker_nodes = sorted(self._worker_nodes, key=lambda worker: (worker._index_within_host, worker.id))
+
     def _dispatcher(self) -> Generator[Dict[str, Dict[str, int]], None, None]:
         self._dispatch_in_progress = True

@@ -184,7 +195,7 @@ def add_worker(self, worker_node: "WorkerNode") -> None:
         :param worker_node: The worker node to add.
         """
         self._worker_nodes.append(worker_node)
-        self._worker_nodes = sorted(self._worker_nodes, key=lambda w: w.id)
+        self._sort_workers()
         self._prepare_rebalance()

     def remove_worker(self, worker_node: "WorkerNode") -> None:
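The new _sort_workers method assumes worker ids of the form "<hostname>_<suffix>" (everything before the first underscore is treated as the host) and reorders workers so that consecutive entries alternate between hosts. A minimal standalone sketch of that interleaving, using a stand-in WorkerNode class in place of the real one from locust.runners:

    from collections import defaultdict

    class WorkerNode:  # stand-in for illustration; the real class lives in locust.runners
        def __init__(self, id):
            self.id = id

    def sort_workers(worker_nodes):
        # First pass: sort by id for repeatable behaviour
        by_id = sorted(worker_nodes, key=lambda w: w.id)
        # Second pass: index each worker by how many earlier workers share its host
        workers_per_host = defaultdict(int)
        for w in by_id:
            host = w.id.split("_")[0]
            w._index_within_host = workers_per_host[host]
            workers_per_host[host] += 1
        # Workers with the lowest per-host index sort first, so hosts alternate
        return sorted(worker_nodes, key=lambda w: (w._index_within_host, w.id))

    workers = [WorkerNode(i) for i in (
        "hostname1_worker1", "hostname1_worker2",
        "hostname2_worker1", "hostname2_worker2",
    )]
    print([w.id for w in sort_workers(workers)])
    # ['hostname1_worker1', 'hostname2_worker1', 'hostname1_worker2', 'hostname2_worker2']

Because the dispatcher hands out users to workers in this order during ramp-up, starting fewer users than there are workers still touches every host instead of saturating hostname1's workers first.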
83 changes: 81 additions & 2 deletions locust/test/test_dispatch.py
@@ -782,6 +782,70 @@ class User3(User):
         delta = time.perf_counter() - ts
         self.assertTrue(0 <= delta <= _TOLERANCE, delta)

+    def test_users_are_distributed_evenly_across_hosts(self):
+        class User1(User):
+            weight = 1
+
+        class User2(User):
+            weight = 1
+
+        class User3(User):
+            weight = 1
+
+        worker_node1 = WorkerNode("hostname1_worker1")
+        worker_node2 = WorkerNode("hostname1_worker2")
+        worker_node3 = WorkerNode("hostname2_worker1")
+        worker_node4 = WorkerNode("hostname2_worker2")
+
+        sleep_time = 0.2  # Speed-up test
+
+        users_dispatcher = UsersDispatcher(
+            worker_nodes=[worker_node1, worker_node2, worker_node3, worker_node4], user_classes=[User1, User2, User3]
+        )
+        users_dispatcher.new_dispatch(target_user_count=6, spawn_rate=2)
+        users_dispatcher._wait_between_dispatch = sleep_time
+
+        ts = time.perf_counter()
+        self.assertDictEqual(
+            next(users_dispatcher),
+            {
+                "hostname1_worker1": {"User1": 1, "User2": 0, "User3": 0},
+                "hostname1_worker2": {"User1": 0, "User2": 0, "User3": 0},
+                "hostname2_worker1": {"User1": 0, "User2": 1, "User3": 0},
+                "hostname2_worker2": {"User1": 0, "User2": 0, "User3": 0},
+            },
+        )
+        delta = time.perf_counter() - ts
+        self.assertTrue(0 <= delta <= _TOLERANCE, delta)
+
+        ts = time.perf_counter()
+        self.assertDictEqual(
+            next(users_dispatcher),
+            {
+                "hostname1_worker1": {"User1": 1, "User2": 0, "User3": 0},
+                "hostname1_worker2": {"User1": 0, "User2": 0, "User3": 1},
+                "hostname2_worker1": {"User1": 0, "User2": 1, "User3": 0},
+                "hostname2_worker2": {"User1": 1, "User2": 0, "User3": 0},
+            },
+        )
+        delta = time.perf_counter() - ts
+        self.assertTrue(sleep_time - _TOLERANCE <= delta <= sleep_time + _TOLERANCE, delta)
+
+        ts = time.perf_counter()
+        self.assertDictEqual(
+            next(users_dispatcher),
+            {
+                "hostname1_worker1": {"User1": 1, "User2": 1, "User3": 0},
+                "hostname1_worker2": {"User1": 0, "User2": 0, "User3": 1},
+                "hostname2_worker1": {"User1": 0, "User2": 1, "User3": 1},
+                "hostname2_worker2": {"User1": 1, "User2": 0, "User3": 0},
+            },
+        )
+        ts = time.perf_counter()
+        self.assertRaises(StopIteration, lambda: next(users_dispatcher))
+        delta = time.perf_counter() - ts
+        self.assertTrue(0 <= delta <= _TOLERANCE, delta)
+

 class TestWaitBetweenDispatch(unittest.TestCase):
     def test_wait_between_dispatch(self):
@@ -2694,13 +2758,17 @@ class User3(User):

         user_classes = [User1, User2, User3]

-        worker_nodes = [WorkerNode(str(i + 1)) for i in range(3)]
+        worker_nodes = [
+            WorkerNode("hostname1_worker1"),
+            WorkerNode("hostname1_worker2"),
+            WorkerNode("hostname2_worker1"),
+        ]

         users_dispatcher = UsersDispatcher(worker_nodes=[worker_nodes[0], worker_nodes[2]], user_classes=user_classes)

         sleep_time = 0.2  # Speed-up test

-        users_dispatcher.new_dispatch(target_user_count=9, spawn_rate=3)
+        users_dispatcher.new_dispatch(target_user_count=11, spawn_rate=3)
         users_dispatcher._wait_between_dispatch = sleep_time

         # Dispatch iteration 1
@@ -2751,6 +2819,17 @@ class User3(User):
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 3)
         self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3)

+        # Dispatch iteration 4
+        ts = time.perf_counter()
+        dispatched_users = next(users_dispatcher)
+        delta = time.perf_counter() - ts
+        self.assertTrue(sleep_time - _TOLERANCE <= delta <= sleep_time + _TOLERANCE, delta)
+        self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 4, "User2": 4, "User3": 3})
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 4)
+        # without host-based balancing the following two values would be reversed
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 3)
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)
+
     def test_add_two_workers_during_ramp_up(self):
         class User1(User):
             weight = 1
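The per-worker assertions above leave the host balance implicit; collapsing a dispatch result into per-host totals shows it directly. A hypothetical helper for that (not part of this PR; users_per_host is an illustrative name, reusing the same "<hostname>_<suffix>" id convention as _sort_workers):

    from collections import Counter

    def users_per_host(dispatched):
        # Collapse {worker_id: {user_class: count}} into {host: total users}
        totals = Counter()
        for worker_id, user_counts in dispatched.items():
            totals[worker_id.split("_")[0]] += sum(user_counts.values())
        return dict(totals)

    # First dispatch from test_users_are_distributed_evenly_across_hosts above
    first_dispatch = {
        "hostname1_worker1": {"User1": 1, "User2": 0, "User3": 0},
        "hostname1_worker2": {"User1": 0, "User2": 0, "User3": 0},
        "hostname2_worker1": {"User1": 0, "User2": 1, "User3": 0},
        "hostname2_worker2": {"User1": 0, "User2": 0, "User3": 0},
    }
    print(users_per_host(first_dispatch))  # {'hostname1': 1, 'hostname2': 1}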
