diff --git a/generate_changelog.py b/generate_changelog.py index 927ef5774a..ae93eaa09d 100755 --- a/generate_changelog.py +++ b/generate_changelog.py @@ -22,7 +22,7 @@ "-p", "locust", "--exclude-labels", - "duplicate,question,invalid,wontfix,cantfix,stale", + "duplicate,question,invalid,wontfix,cantfix,stale,no-changelog", "--header-label", "# Detailed changelog\nThe most important changes can also be found in [the documentation](https://docs.locust.io/en/latest/changelog.html).", "--since-tag", diff --git a/locust/exception.py b/locust/exception.py index 66ae64ad10..6ba264be21 100644 --- a/locust/exception.py +++ b/locust/exception.py @@ -69,6 +69,10 @@ class RPCReceiveError(Exception): When raised from zmqrpc, client connection should be reestablished. """ + def __init__(self, *args: object, addr=None) -> None: + super().__init__(*args) + self.addr = addr + class AuthCredentialsError(ValueError): """ diff --git a/locust/rpc/zmqrpc.py b/locust/rpc/zmqrpc.py index 2ec3b7c000..868dfccd85 100644 --- a/locust/rpc/zmqrpc.py +++ b/locust/rpc/zmqrpc.py @@ -49,7 +49,7 @@ def recv_from_client(self): try: msg = Message.unserialize(data[1]) except (UnicodeDecodeError, msgerr.ExtraData) as e: - raise RPCReceiveError("ZMQ interrupted or corrupted message") from e + raise RPCReceiveError("ZMQ interrupted or corrupted message", addr=addr) from e return addr, msg def close(self, linger=None): diff --git a/locust/runners.py b/locust/runners.py index aa4c3f0159..63f3465139 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -988,9 +988,21 @@ def client_listener(self) -> NoReturn: try: client_id, msg = self.server.recv_from_client() except RPCReceiveError as e: - # TODO: Add proper reconnect if https://github.com/zeromq/pyzmq/issues/1809 fixed - logger.error(f"Unrecognized message detected: {e}") - continue + client_id = e.addr + + if client_id and client_id in self.clients: + logger.error(f"RPCError when receiving from client: {e}. Will reset client {client_id}.") + try: + self.server.send_to_client(Message("reconnect", None, client_id)) + except Exception as error: + logger.error(f"Error sending reconnect message to worker: {error}. Will reset RPC server.") + self.connection_broken = True + gevent.sleep(FALLBACK_INTERVAL) + continue + else: + message = f"{e}" if not client_id else f"{e} from {client_id}" + logger.error(f"Unrecognized message detected: {message}") + continue except RPCSendError as e: logger.error(f"Error sending reconnect message to worker: {e}. Will reset RPC server.") self.connection_broken = True diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index 21fea1c697..2042d32dcb 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -47,6 +47,8 @@ NETWORK_BROKEN = "network broken" BAD_MESSAGE = "bad message" +UNRECOGNIZED_HOST_MESSAGE = "unrecognized host message" +UNRECOGNIZED_MESSAGE = "unrecognized message" def mocked_rpc(raise_on_close=True): @@ -82,7 +84,11 @@ def recv_from_client(self): if msg.data == NETWORK_BROKEN: raise RPCError() if msg.data == BAD_MESSAGE: - raise RPCReceiveError("Bad message") + raise RPCReceiveError(BAD_MESSAGE, addr=msg.node_id) + if msg.data == UNRECOGNIZED_HOST_MESSAGE: + raise RPCReceiveError(UNRECOGNIZED_HOST_MESSAGE, addr="FAKE") + if msg.data == UNRECOGNIZED_MESSAGE: + raise RPCReceiveError(UNRECOGNIZED_MESSAGE) return msg.node_id, msg def close(self, linger=None): @@ -3040,6 +3046,57 @@ def my_task(self): master.start(10, 10) sleep(0.1) server.mocked_send(Message("stats", BAD_MESSAGE, "zeh_fake_client1")) + self.assertEqual(4, len(server.outbox)) + + # Expected message order in outbox: ack, spawn, reconnect, ack + self.assertEqual( + "reconnect", server.outbox[2][1].type, "Master didn't send worker reconnect message when expected." + ) + + def test_worker_sends_unrecognized_message_to_master(self): + """ + Validate master ignores message from worker when it cannot parse adddress info. + """ + + class TestUser(User): + @task + def my_task(self): + pass + + with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: + master = self.get_runner(user_classes=[TestUser]) + server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1")) + self.assertEqual(1, len(master.clients)) + self.assertTrue( + "zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict" + ) + + master.start(10, 10) + sleep(0.1) + server.mocked_send(Message("stats", UNRECOGNIZED_MESSAGE, "zeh_fake_client1")) + self.assertEqual(2, len(server.outbox)) + + def test_unknown_host_sends_message_to_master(self): + """ + Validate master ignores message that is sent from unknown host + """ + + class TestUser(User): + @task + def my_task(self): + pass + + with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: + master = self.get_runner(user_classes=[TestUser]) + server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1")) + self.assertEqual(1, len(master.clients)) + self.assertTrue( + "zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict" + ) + + master.start(10, 10) + sleep(0.1) + server.mocked_send(Message("stats", UNRECOGNIZED_HOST_MESSAGE, "unknown_host")) self.assertEqual(2, len(server.outbox))