-
Notifications
You must be signed in to change notification settings - Fork 3k
/
custom_messages.py
70 lines (50 loc) · 2.43 KB
/
custom_messages.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from locust import HttpUser, between, events, task
from locust.runners import MasterRunner, WorkerRunner
import gevent
# Shared pool of user names: setup_test_users() appends the names it receives
# in 'test_users' messages, and WebsiteUser.__init__ pops one per instance.
usernames = []
def setup_test_users(environment, msg, **kwargs):
    """Store the names carried by a 'test_users' message and reply to the sender.

    Fired when the worker receives a message of type 'test_users'. Each entry
    of ``msg.data`` is indexed with ``"name"`` and the result is appended to
    the module-level ``usernames`` list. Two messages are then sent through
    ``environment.runner``: an 'acknowledge_users' message reporting how many
    entries arrived, and a 'concurrent_message'.

    Args:
        environment: Object whose ``runner`` exposes ``send_message``.
        msg: Incoming message; ``msg.data`` is iterated and passed to ``len``.
        **kwargs: Extra handler arguments; ignored.
    """
    # A generator keeps the lazy, item-by-item consumption of the payload.
    usernames.extend(entry["name"] for entry in msg.data)

    runner = environment.runner
    runner.send_message("acknowledge_users", f"Thanks for the {len(msg.data)} users!")
    runner.send_message("concurrent_message", "Message to concurrent handler")
def on_acknowledge(msg, **kwargs):
    """Print the payload of an 'acknowledge_users' message.

    Fired when the master receives a message of type 'acknowledge_users'.

    Args:
        msg: Incoming message; only ``msg.data`` is read.
        **kwargs: Extra handler arguments; ignored.
    """
    payload = msg.data
    print(payload)
def on_concurrent_message(msg, **kwargs):
    """Handle a 'concurrent_message' with a deliberately slow body.

    Prints the received payload, calls ``gevent.sleep(10)``, then prints a
    completion line.

    Args:
        msg: Incoming message; only ``msg.data`` is read.
        **kwargs: Extra handler arguments; ignored.
    """
    print(f"concurrent_message received with data: '{msg.data}'")
    # If this handler was run with concurrent=False, this sleep would halt
    # the message handling loop in locust.
    gevent.sleep(10)
    print("finished processing concurrent_message")
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    """Register the custom message handlers that apply to this runner.

    * Any runner that is not a ``MasterRunner`` handles 'test_users'.
    * Any runner that is not a ``WorkerRunner`` handles 'acknowledge_users'
      and 'concurrent_message'; the latter is registered with
      ``concurrent=True``.

    Args:
        environment: Locust environment; ``environment.runner`` is inspected
            and receives the ``register_message`` calls.
        **_kwargs: Extra event arguments; ignored.
    """
    runner = environment.runner

    if not isinstance(runner, MasterRunner):
        runner.register_message("test_users", setup_test_users)

    # Workers need none of the remaining handlers.
    if isinstance(runner, WorkerRunner):
        return

    runner.register_message("acknowledge_users", on_acknowledge)
    runner.register_message("concurrent_message", on_concurrent_message, concurrent=True)
@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
    """Hand every worker its own contiguous slice of generated test users.

    Does nothing on a ``WorkerRunner``. Otherwise one ``{"name": "User<i>"}``
    record is built per target user and the list is cut into chunks of
    ``len(users) // worker_count``. Each entry of ``runner.clients`` is sent
    its chunk as a 'test_users' message; the last counted worker also gets
    the remainder when the users do not divide evenly, so that every user
    name is handed out.

    Args:
        environment: Locust environment; ``environment.runner`` must provide
            ``target_user_count``, ``worker_count``, ``clients`` and
            ``send_message``.
        **_kwargs: Extra event arguments; ignored.
    """
    runner = environment.runner
    if isinstance(runner, WorkerRunner):
        return

    users = [{"name": f"User{i}"} for i in range(runner.target_user_count)]

    worker_count = runner.worker_count
    if not worker_count:
        # No workers to distribute to; returning here also avoids a
        # ZeroDivisionError when computing the chunk size.
        return

    # Floor division keeps the arithmetic in integers (no float round-trip).
    chunk_size = len(users) // worker_count
    total = len(users)

    for i, worker in enumerate(runner.clients):
        start_index = i * chunk_size
        # Every worker but the last gets exactly chunk_size users; the last
        # one takes everything that is left.
        end_index = start_index + chunk_size if i + 1 < worker_count else total
        runner.send_message("test_users", users[start_index:end_index], worker)
class WebsiteUser(HttpUser):
    """Simulated user that prints a name taken from the shared ``usernames`` list."""

    host = "http://127.0.0.1:8089"
    wait_time = between(2, 5)

    def __init__(self, parent):
        # Take the last name off the shared list before initialising the base
        # class; list.pop() raises IndexError if no names are available.
        claimed_name = usernames.pop()
        self.username = claimed_name
        super().__init__(parent)

    @task
    def task(self):
        """Print the name assigned to this user."""
        print(self.username)