Ensure users are distributed evenly across hosts during ramp up.
cyberw committed Feb 21, 2022
1 parent e806b6f commit d34c680
Showing 2 changed files with 75 additions and 1 deletion.
locust/dispatch.py (11 additions, 1 deletion)
@@ -1,3 +1,4 @@
+from collections import defaultdict
 import contextlib
 import itertools
 import math
@@ -58,8 +59,17 @@ def __init__(self, worker_nodes: "List[WorkerNode]", user_classes: List[Type[Use
         # This is especially important when iterating over a dictionary which, prior to py3.7, was
         # completely unordered. For >=Py3.7, a dictionary keeps the insertion order. Even then,
         # it is safer to sort the keys when repeatable behaviour is required.
-        self._worker_nodes = sorted(worker_nodes, key=lambda w: w.id)
+        worker_nodes_by_id = sorted(worker_nodes, key=lambda w: w.id)
+
+        # NOTE: Rather than just sort by worker name, we now sort by hostname too, so that
+        # workers from the same host are spread out
+        workers_per_host = defaultdict(lambda: 0)
+        for worker_node in worker_nodes_by_id:
+            host = worker_node.id.split("_")[0]
+            worker_node._index_within_host = workers_per_host[host]
+            workers_per_host[host] = workers_per_host[host] + 1
+
+        self._worker_nodes = sorted(worker_nodes, key=lambda worker: (worker._index_within_host, worker.id))
         self._user_classes = sorted(user_classes, key=attrgetter("__name__"))

         assert len(user_classes) > 0
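To see what the new sort achieves, here is a minimal, runnable sketch (not part of the commit; FakeWorker is a hypothetical stand-in for locust's WorkerNode, and worker ids are assumed to follow the hostnameN_workerN pattern used in the test below). Numbering each worker within its host and sorting on that index first interleaves the hosts, so a slow ramp-up reaches every host before placing a second user on any of them:

from collections import defaultdict

class FakeWorker:
    # Hypothetical stand-in for WorkerNode; only the id attribute matters here.
    def __init__(self, id):
        self.id = id

workers = [
    FakeWorker(name)
    for name in ["hostname1_worker1", "hostname1_worker2", "hostname2_worker1", "hostname2_worker2"]
]

# Same logic as the commit: number each worker within its host...
workers_per_host = defaultdict(lambda: 0)
for worker in sorted(workers, key=lambda w: w.id):
    host = worker.id.split("_")[0]
    worker._index_within_host = workers_per_host[host]
    workers_per_host[host] += 1

# ...then sort on that index first, so consecutive workers come from different hosts.
print([w.id for w in sorted(workers, key=lambda w: (w._index_within_host, w.id))])
# ['hostname1_worker1', 'hostname2_worker1', 'hostname1_worker2', 'hostname2_worker2']

With users handed out round-robin over this interleaved order, the first two users land on different hosts, which is exactly what the new test below asserts.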
locust/test/test_dispatch.py (64 additions, 0 deletions)
@@ -782,6 +782,70 @@ class User3(User):
         delta = time.perf_counter() - ts
         self.assertTrue(0 <= delta <= _TOLERANCE, delta)

+    def test_users_are_distributed_evenly_across_hosts(self):
+        class User1(User):
+            weight = 1
+
+        class User2(User):
+            weight = 1
+
+        class User3(User):
+            weight = 1
+
+        worker_node1 = WorkerNode("hostname1_worker1")
+        worker_node2 = WorkerNode("hostname1_worker2")
+        worker_node3 = WorkerNode("hostname2_worker1")
+        worker_node4 = WorkerNode("hostname2_worker2")
+
+        sleep_time = 0.2  # Speed-up test
+
+        users_dispatcher = UsersDispatcher(
+            worker_nodes=[worker_node1, worker_node2, worker_node3, worker_node4], user_classes=[User1, User2, User3]
+        )
+        users_dispatcher.new_dispatch(target_user_count=6, spawn_rate=2)
+        users_dispatcher._wait_between_dispatch = sleep_time
+
+        ts = time.perf_counter()
+        self.assertDictEqual(
+            next(users_dispatcher),
+            {
+                "hostname1_worker1": {"User1": 1, "User2": 0, "User3": 0},
+                "hostname1_worker2": {"User1": 0, "User2": 0, "User3": 0},
+                "hostname2_worker1": {"User1": 0, "User2": 1, "User3": 0},
+                "hostname2_worker2": {"User1": 0, "User2": 0, "User3": 0},
+            },
+        )
+        delta = time.perf_counter() - ts
+        self.assertTrue(0 <= delta <= _TOLERANCE, delta)
+
+        ts = time.perf_counter()
+        self.assertDictEqual(
+            next(users_dispatcher),
+            {
+                "hostname1_worker1": {"User1": 1, "User2": 0, "User3": 0},
+                "hostname1_worker2": {"User1": 0, "User2": 0, "User3": 1},
+                "hostname2_worker1": {"User1": 0, "User2": 1, "User3": 0},
+                "hostname2_worker2": {"User1": 1, "User2": 0, "User3": 0},
+            },
+        )
+        delta = time.perf_counter() - ts
+        self.assertTrue(sleep_time - _TOLERANCE <= delta <= sleep_time + _TOLERANCE, delta)
+
+        ts = time.perf_counter()
+        self.assertDictEqual(
+            next(users_dispatcher),
+            {
+                "hostname1_worker1": {"User1": 1, "User2": 1, "User3": 0},
+                "hostname1_worker2": {"User1": 0, "User2": 0, "User3": 1},
+                "hostname2_worker1": {"User1": 0, "User2": 1, "User3": 1},
+                "hostname2_worker2": {"User1": 1, "User2": 0, "User3": 0},
+            },
+        )
+        ts = time.perf_counter()
+        self.assertRaises(StopIteration, lambda: next(users_dispatcher))
+        delta = time.perf_counter() - ts
+        self.assertTrue(0 <= delta <= _TOLERANCE, delta)
+

 class TestWaitBetweenDispatch(unittest.TestCase):
     def test_wait_between_dispatch(self):
