Skip to content

Commit

Permalink
Merge pull request #2096 from Nosibb/Bugfix/2036_multiple_resetting_c…
Browse files Browse the repository at this point in the history
…onnection_after_RPCError

Fix multiple resetting connection after RPCError
  • Loading branch information
cyberw authored May 19, 2022
2 parents be10968 + 2e6d3ae commit 9e81130
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
2 changes: 1 addition & 1 deletion locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@ def reset_connection(self):
try:
self.server.close()
self.server = rpc.Server(self.master_bind_host, self.master_bind_port)
self.connection_broken = False
except RPCError as e:
logger.error(f"Temporary failure when resetting connection: {e}, will retry later.")

Expand All @@ -905,7 +906,6 @@ def client_listener(self):
self.connection_broken = True
gevent.sleep(FALLBACK_INTERVAL)
continue
self.connection_broken = False
msg.node_id = client_id
if msg.type == "client_ready":
if not msg.data:
Expand Down
27 changes: 23 additions & 4 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@
NETWORK_BROKEN = "network broken"


def mocked_rpc():
def mocked_rpc(raise_on_close=True):
class MockedRpcServerClient:
queue = Queue()
outbox = []
raise_error_on_close = raise_on_close

def __init__(self, *args, **kwargs):
pass
Expand Down Expand Up @@ -85,7 +86,10 @@ def recv_from_client(self):
return msg.node_id, msg

def close(self):
raise RPCError()
if self.raise_error_on_close:
raise RPCError()
else:
pass

return MockedRpcServerClient

Expand Down Expand Up @@ -2728,17 +2732,32 @@ class MyUser(User):
def test_master_reset_connection(self):
"""Test that connection will be reset when network issues found"""
with mock.patch("locust.runners.FALLBACK_INTERVAL", new=0.1):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
with mock.patch("locust.rpc.rpc.Server", mocked_rpc(raise_on_close=False)) as server:
master = self.get_runner()
self.assertEqual(0, len(master.clients))
server.mocked_send(Message("client_ready", NETWORK_BROKEN, "fake_client"))
self.assertTrue(master.connection_broken)
server.mocked_send(Message("client_ready", __version__, "fake_client"))
sleep(0.2)
sleep(1)
self.assertFalse(master.connection_broken)
self.assertEqual(1, len(master.clients))
master.quit()

def test_reset_connection_after_RPCError(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc(raise_on_close=False)) as server:
master = self.get_runner()
server.mocked_send(Message("client_ready", __version__, "fake_client"))
sleep(0.2)
self.assertFalse(master.connection_broken)
self.assertEqual(1, len(master.clients))

# Trigger RPCError
server.mocked_send(Message("lets_trigger_RPCError", NETWORK_BROKEN, "fake_client"))
self.assertTrue(master.connection_broken)
sleep(1)
self.assertFalse(master.connection_broken)
master.quit()

def test_attributes_populated_when_calling_start(self):
class MyUser1(User):
@task
Expand Down

0 comments on commit 9e81130

Please sign in to comment.