diff --git a/docs/testing-other-systems.rst b/docs/testing-other-systems.rst index 20fe3e50c3..c93dde5112 100644 --- a/docs/testing-other-systems.rst +++ b/docs/testing-other-systems.rst @@ -42,9 +42,10 @@ Dummy server to test: .. literalinclude:: ../examples/grpc/hello_server.py -gRPC client, base User and example usage: +gRPC client, base GrpcUser, interceptor for sending events to locust and example usage: .. literalinclude:: ../examples/grpc/locustfile.py +As base class for interceptor is used `grpc-interceptor ` library. For more examples of user types, see `locust-plugins `_ (it has users for WebSocket/SocketIO, Kafka, Selenium/WebDriver and more). \ No newline at end of file diff --git a/examples/grpc/hello_server.py b/examples/grpc/hello_server.py index 1307eaf651..6137bf6e7b 100644 --- a/examples/grpc/hello_server.py +++ b/examples/grpc/hello_server.py @@ -22,3 +22,7 @@ def start_server(): server.start() logger.info("gRPC server started") server.wait_for_termination() + + +if __name__ == "__main__": + start_server() diff --git a/examples/grpc/locustfile.py b/examples/grpc/locustfile.py index d494e1a031..c3cbee0d69 100644 --- a/examples/grpc/locustfile.py +++ b/examples/grpc/locustfile.py @@ -1,18 +1,21 @@ # make sure you use grpc version 1.39.0 or later, # because of https://github.com/grpc/grpc/issues/15880 that affected earlier versions +from typing import Callable, Any +import time + import grpc -import hello_pb2_grpc -import hello_pb2 +import grpc.experimental.gevent as grpc_gevent +import gevent from locust import events, User, task from locust.exception import LocustError -from locust.user.task import LOCUST_STATE_STOPPING +from grpc_interceptor import ClientInterceptor + +import hello_pb2_grpc +import hello_pb2 + from hello_server import start_server -import gevent -import time # patch grpc so that it uses gevent instead of asyncio -import grpc.experimental.gevent as grpc_gevent - grpc_gevent.init_gevent() @@ -22,36 +25,38 @@ def run_grpc_server(environment, **_kwargs): gevent.spawn(start_server) -class GrpcClient: - def __init__(self, environment, stub): +class LocustInterceptor(ClientInterceptor): + def __init__(self, environment, *args, **kwargs): + super().__init__(*args, **kwargs) + self.env = environment - self._stub_class = stub.__class__ - self._stub = stub - - def __getattr__(self, name): - func = self._stub_class.__getattribute__(self._stub, name) - - def wrapper(*args, **kwargs): - request_meta = { - "request_type": "grpc", - "name": name, - "start_time": time.time(), - "response_length": 0, - "exception": None, - "context": None, - "response": None, - } - start_perf_counter = time.perf_counter() - try: - request_meta["response"] = func(*args, **kwargs) - request_meta["response_length"] = len(request_meta["response"].message) - except grpc.RpcError as e: - request_meta["exception"] = e - request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000 - self.env.events.request.fire(**request_meta) - return request_meta["response"] - - return wrapper + + def intercept( + self, + method: Callable, + request_or_iterator: Any, + call_details: grpc.ClientCallDetails, + ): + response = None + exception = None + start_perf_counter = time.perf_counter() + response_length = 0 + try: + response = method(request_or_iterator, call_details) + response_length = response.result().ByteSize() + except grpc.RpcError as e: + exception = e + + self.env.events.request.fire( + request_type="grpc", + name=call_details.method, + response_time=(time.perf_counter() - start_perf_counter) * 1000, + response_length=response_length, + response=response, + context=None, + exception=exception, + ) + return response class GrpcUser(User): @@ -64,10 +69,12 @@ def __init__(self, environment): for attr_value, attr_name in ((self.host, "host"), (self.stub_class, "stub_class")): if attr_value is None: raise LocustError(f"You must specify the {attr_name}.") + self._channel = grpc.insecure_channel(self.host) - self._channel_closed = False - stub = self.stub_class(self._channel) - self.client = GrpcClient(environment, stub) + interceptor = LocustInterceptor(environment=environment) + self._channel = grpc.intercept_channel(self._channel, interceptor) + + self.stub = self.stub_class(self._channel) class HelloGrpcUser(GrpcUser): @@ -76,6 +83,5 @@ class HelloGrpcUser(GrpcUser): @task def sayHello(self): - if not self._channel_closed: - self.client.SayHello(hello_pb2.HelloRequest(name="Test")) + self.stub.SayHello(hello_pb2.HelloRequest(name="Test")) time.sleep(1)