Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dispatch): UserClasses weight distribution with gcd #2663

Merged
merged 3 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 44 additions & 9 deletions locust/dispatch.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

import contextlib
import functools
import itertools
import math
import sys
import time
from collections import defaultdict
from operator import attrgetter
Expand All @@ -16,6 +18,20 @@
from locust.runners import WorkerNode


def compatible_math_gcd(*args: int) -> int:
"""
This function is a workaround for the fact that `math.gcd` in:
- 3.5 <= Python < 3.9 doesn't accept more than two arguments.
- 3.9 <= Python can accept more than two arguments.
See more at https://docs.python.org/3.9/library/math.html#math.gcd
"""
if (3, 5) <= sys.version_info < (3, 9):
return functools.reduce(math.gcd, args)
elif sys.version_info >= (3, 9):
return math.gcd(*args)
raise NotImplementedError("This function is only implemented for Python from 3.5")


# To profile line-by-line, uncomment the code below (i.e. `import line_profiler ...`) and
# place `@profile` on the functions/methods you wish to profile. Then, in the unit test you are
# running, use `from locust.dispatch import profile; profile.print_stats()` at the end of the unit test.
Expand Down Expand Up @@ -366,18 +382,37 @@ def infinite_cycle_gen(users: list[tuple[type[User], int]]) -> itertools.cycle:
if not users:
return itertools.cycle([None])

# Normalize the weights so that the smallest weight will be equal to "target_min_weight".
# The value "2" was experimentally determined because it gave a better distribution especially
# when dealing with weights which are close to each others, e.g. 1.5, 2, 2.4, etc.
target_min_weight = 2

# 'Value' here means weight or fixed count
def _get_order_of_magnitude(n: float) -> int:
"""Get how many times we need to multiply `n` to get an integer-like number.
For example:
0.1 would return 10,
0.04 would return 100,
0.0007 would return 10000.
"""
if n <= 0:
raise ValueError("To get the order of magnitude, the number must be greater than 0.")

counter = 0
while n < 1:
n *= 10
counter += 1
return 10**counter

# Get maximum order of magnitude to "normalize the weights".
# "Normalizing the weights" is to multiply all weights by the same number so that
# they become integers. Then we can find the largest common divisor of all the
# weights, divide them by it and get the smallest possible numbers with the same
# ratio as the numbers originally had.
max_order_of_magnitude = _get_order_of_magnitude(min(abs(u[1]) for u in users))
weights = tuple(int(u[1] * max_order_of_magnitude) for u in users)

greatest_common_divisor = compatible_math_gcd(*weights)
normalized_values = [
(
user.__name__,
round(target_min_weight * value / min(u[1] for u in users)),
user[0].__name__,
normalized_weight // greatest_common_divisor,
)
for user, value in users
for user, normalized_weight in zip(users, weights)
]
generation_length_to_get_proper_distribution = sum(
normalized_val[1] for normalized_val in normalized_values
Expand Down
69 changes: 69 additions & 0 deletions locust/test/test_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,75 @@ class User3(User):
delta = time.perf_counter() - ts
self.assertTrue(0 <= delta <= _TOLERANCE, delta)

def test_implementation_of_dispatch_distribution_with_gcd(self):
class User1(User):
weight = 4

class User2(User):
weight = 5

user_classes = [User1, User2]
worker_node1 = WorkerNode("1")

sleep_time = 0.2 # Speed-up test

users_dispatcher = UsersDispatcher(worker_nodes=[worker_node1], user_classes=user_classes)
users_dispatcher.new_dispatch(target_user_count=9, spawn_rate=9)

users_dispatcher._wait_between_dispatch = sleep_time

ts = time.perf_counter()
self.assertDictEqual(
next(users_dispatcher),
{
"1": {"User1": 4, "User2": 5},
},
)
delta = time.perf_counter() - ts
self.assertTrue(0 <= delta <= _TOLERANCE, delta)

ts = time.perf_counter()
self.assertRaises(StopIteration, lambda: next(users_dispatcher))
delta = time.perf_counter() - ts
self.assertTrue(0 <= delta <= _TOLERANCE, delta)

def test_implementation_of_dispatch_distribution_with_gcd_float_weight(self):
class User1(User):
weight = 0.8

class User2(User):
weight = 1

normalized_weights_to_min_int = 5 # User1: 0.8 * 5 = 4; User2: 1 * 5 = 5

user_classes = [User1, User2]
worker_node1 = WorkerNode("1")

sleep_time = 0.2 # Speed-up test

users_dispatcher = UsersDispatcher(worker_nodes=[worker_node1], user_classes=user_classes)
users_dispatcher.new_dispatch(target_user_count=18, spawn_rate=18)

users_dispatcher._wait_between_dispatch = sleep_time

ts = time.perf_counter()
self.assertDictEqual(
next(users_dispatcher),
{
"1": {
"User1": int(normalized_weights_to_min_int * User1.weight * 2),
"User2": int(normalized_weights_to_min_int * User2.weight * 2),
},
},
)
delta = time.perf_counter() - ts
self.assertTrue(0 <= delta <= _TOLERANCE, delta)

ts = time.perf_counter()
self.assertRaises(StopIteration, lambda: next(users_dispatcher))
delta = time.perf_counter() - ts
self.assertTrue(0 <= delta <= _TOLERANCE, delta)


class TestWaitBetweenDispatch(unittest.TestCase):
def test_wait_between_dispatch(self):
Expand Down
Loading