diff --git a/locust/dispatch.py b/locust/dispatch.py index 5569f2962a..d8c676a9e8 100644 --- a/locust/dispatch.py +++ b/locust/dispatch.py @@ -1,8 +1,10 @@ from __future__ import annotations import contextlib +import functools import itertools import math +import sys import time from collections import defaultdict from operator import attrgetter @@ -16,6 +18,20 @@ from locust.runners import WorkerNode +def compatible_math_gcd(*args: int) -> int: + """ + This function is a workaround for the fact that `math.gcd` in: + - 3.5 <= Python < 3.9 doesn't accept more than two arguments. + - 3.9 <= Python can accept more than two arguments. + See more at https://docs.python.org/3.9/library/math.html#math.gcd + """ + if (3, 5) <= sys.version_info < (3, 9): + return functools.reduce(math.gcd, args) + elif sys.version_info >= (3, 9): + return math.gcd(*args) + raise NotImplementedError("This function is only implemented for Python from 3.5") + + # To profile line-by-line, uncomment the code below (i.e. `import line_profiler ...`) and # place `@profile` on the functions/methods you wish to profile. Then, in the unit test you are # running, use `from locust.dispatch import profile; profile.print_stats()` at the end of the unit test. @@ -366,18 +382,37 @@ def infinite_cycle_gen(users: list[tuple[type[User], int]]) -> itertools.cycle: if not users: return itertools.cycle([None]) - # Normalize the weights so that the smallest weight will be equal to "target_min_weight". - # The value "2" was experimentally determined because it gave a better distribution especially - # when dealing with weights which are close to each others, e.g. 1.5, 2, 2.4, etc. - target_min_weight = 2 - - # 'Value' here means weight or fixed count + def _get_order_of_magnitude(n: float) -> int: + """Get how many times we need to multiply `n` to get an integer-like number. + For example: + 0.1 would return 10, + 0.04 would return 100, + 0.0007 would return 10000. + """ + if n <= 0: + raise ValueError("To get the order of magnitude, the number must be greater than 0.") + + counter = 0 + while n < 1: + n *= 10 + counter += 1 + return 10**counter + + # Get maximum order of magnitude to "normalize the weights". + # "Normalizing the weights" is to multiply all weights by the same number so that + # they become integers. Then we can find the largest common divisor of all the + # weights, divide them by it and get the smallest possible numbers with the same + # ratio as the numbers originally had. + max_order_of_magnitude = _get_order_of_magnitude(min(abs(u[1]) for u in users)) + weights = tuple(int(u[1] * max_order_of_magnitude) for u in users) + + greatest_common_divisor = compatible_math_gcd(*weights) normalized_values = [ ( - user.__name__, - round(target_min_weight * value / min(u[1] for u in users)), + user[0].__name__, + normalized_weight // greatest_common_divisor, ) - for user, value in users + for user, normalized_weight in zip(users, weights) ] generation_length_to_get_proper_distribution = sum( normalized_val[1] for normalized_val in normalized_values diff --git a/locust/test/test_dispatch.py b/locust/test/test_dispatch.py index 64e7b839d8..35d3481375 100644 --- a/locust/test/test_dispatch.py +++ b/locust/test/test_dispatch.py @@ -847,6 +847,75 @@ class User3(User): delta = time.perf_counter() - ts self.assertTrue(0 <= delta <= _TOLERANCE, delta) + def test_implementation_of_dispatch_distribution_with_gcd(self): + class User1(User): + weight = 4 + + class User2(User): + weight = 5 + + user_classes = [User1, User2] + worker_node1 = WorkerNode("1") + + sleep_time = 0.2 # Speed-up test + + users_dispatcher = UsersDispatcher(worker_nodes=[worker_node1], user_classes=user_classes) + users_dispatcher.new_dispatch(target_user_count=9, spawn_rate=9) + + users_dispatcher._wait_between_dispatch = sleep_time + + ts = time.perf_counter() + self.assertDictEqual( + next(users_dispatcher), + { + "1": {"User1": 4, "User2": 5}, + }, + ) + delta = time.perf_counter() - ts + self.assertTrue(0 <= delta <= _TOLERANCE, delta) + + ts = time.perf_counter() + self.assertRaises(StopIteration, lambda: next(users_dispatcher)) + delta = time.perf_counter() - ts + self.assertTrue(0 <= delta <= _TOLERANCE, delta) + + def test_implementation_of_dispatch_distribution_with_gcd_float_weight(self): + class User1(User): + weight = 0.8 + + class User2(User): + weight = 1 + + normalized_weights_to_min_int = 5 # User1: 0.8 * 5 = 4; User2: 1 * 5 = 5 + + user_classes = [User1, User2] + worker_node1 = WorkerNode("1") + + sleep_time = 0.2 # Speed-up test + + users_dispatcher = UsersDispatcher(worker_nodes=[worker_node1], user_classes=user_classes) + users_dispatcher.new_dispatch(target_user_count=18, spawn_rate=18) + + users_dispatcher._wait_between_dispatch = sleep_time + + ts = time.perf_counter() + self.assertDictEqual( + next(users_dispatcher), + { + "1": { + "User1": int(normalized_weights_to_min_int * User1.weight * 2), + "User2": int(normalized_weights_to_min_int * User2.weight * 2), + }, + }, + ) + delta = time.perf_counter() - ts + self.assertTrue(0 <= delta <= _TOLERANCE, delta) + + ts = time.perf_counter() + self.assertRaises(StopIteration, lambda: next(users_dispatcher)) + delta = time.perf_counter() - ts + self.assertTrue(0 <= delta <= _TOLERANCE, delta) + class TestWaitBetweenDispatch(unittest.TestCase): def test_wait_between_dispatch(self):