Asyncio support #1251

Open
pawamoy opened this issue Jan 31, 2020 · 17 comments

Comments

@pawamoy

pawamoy commented Jan 31, 2020

Follow-up to #924 and #1079.

Is your feature request related to a problem? Please describe.

Sometimes one needs to simulate an asynchronous user behavior. Hatching more users does not solve the problem.

Describe the solution you'd like

Support defining async tasks or something similar.

Describe alternatives you've considered

One could also write a custom client, which might solve the problem. It would be great to collect examples or snippets showing how to do so in this issue, and to add them to https://docs.locust.io/en/stable/testing-other-systems.html or another doc page.

@heyman
Member

heyman commented Jan 31, 2020

You can achieve this by spawning greenlets within your locust tasks. Here's a small example:

from gevent.pool import Pool
from locust import HttpLocust, TaskSet, task, constant

# Note: HttpLocust was renamed to HttpUser in Locust 1.0.
class MyLocust(HttpLocust):
    host = "https://docs.locust.io"
    wait_time = constant(5)

    class task_set(TaskSet):
        @task
        def dual_greenlet_task(self):
            def do_http_request_or_whatever():
                print("yay, running in separate greenlet")
                response = self.client.get("/")
                print("status code:", response.status_code)

            # Run the request twice, each in its own greenlet,
            # and wait for both greenlets to finish before the task returns.
            pool = Pool()
            pool.spawn(do_http_request_or_whatever)
            pool.spawn(do_http_request_or_whatever)
            pool.join()

Locust relies heavily on gevent, and as far as I know gevent and Python's asyncio are not 100% compatible. Therefore I don't see Locust supporting asyncio any time soon. There's been discussion about it in the gevent project: gevent/gevent#982

@pawamoy
Author

pawamoy commented Jan 31, 2020

Alright thanks for the explanation, example and link 🙂

@redpublic

Are there any reasons not to switch to asyncio completely?

@cyberw
Collaborator

cyberw commented Dec 7, 2020

The main reason is that it would be a ton of work to rewrite everything. Apart from that I think it is a great idea :)

@Dalas

Dalas commented Jan 6, 2021

AFAIK using different threads to make many parallel requests with requests.Session is not a good idea, because requests.Session is not thread-safe (psf/requests#1871). You need to use another client to do this.

@heyman
Member

heyman commented Jan 6, 2021

From the description of that issue it sounds like it should only be a problem if you're using the same client to connect to many different hosts. In that case one could create another HttpSession instance.

@Dalas

Dalas commented Jan 11, 2021

@heyman in my case it happens when I'm executing many requests on the same host (as in your example).

@Dalas

Dalas commented Jan 11, 2021

@heyman sorry about the confusion, it looks like it was my mistake. Everything is OK with requests.Session.

@Monica-Ji

Is asyncio supported in Locust tasks now?

@sergeyglazyrindev

Hey maintainers, theoretically, do you see any potential issues with using the asgiref project to execute async functions as sync ones?

@cyberw
Collaborator

cyberw commented Dec 12, 2023

Haven't tried it, would love to see a proof of concept!

@cyberw
Collaborator

cyberw commented Dec 12, 2023

Most asyncio wrappers have issues coexisting with gevent, though. But if it works, it would be awesome!
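
For readers unfamiliar with the idea in the question above, "executing an async function as a sync one" means roughly the following. This is a standard-library-only sketch: `run_sync` and `fetch_value` are hypothetical names, it is not asgiref's implementation, and it does not show whether such a wrapper coexists with gevent, which is the open question in this thread.

```python
import asyncio
import functools


def run_sync(async_fn):
    """Hypothetical helper: make a coroutine function callable from synchronous code."""
    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        # asyncio.run() creates a fresh event loop, runs the coroutine
        # to completion, and closes the loop again.
        return asyncio.run(async_fn(*args, **kwargs))
    return wrapper


@run_sync
async def fetch_value(delay, value):
    """Stand-in for real async I/O (assumption for illustration)."""
    await asyncio.sleep(delay)
    return value


# Called like an ordinary blocking function.
result = fetch_value(0.01, "done")
```

Note that `asyncio.run()` raises a `RuntimeError` when called from a thread that already has a running event loop, so a wrapper like this only fits code that is otherwise synchronous.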

@jimdowling

gevent now has an asyncio version, so that should make integration easier:
https://github.com/gfmio/asyncio-gevent

@cyberw
Collaborator

cyberw commented Mar 15, 2024

Unfortunately that repo hasn't been updated in two years, so I'm not sure it still works.

I think it would be doable to implement an asyncio version of the locust worker, but it'll take some work.

@thx123

thx123 commented Nov 18, 2024

Is there a more concrete example of calling an asyncio coroutine inside a Locust gevent task? Sorry, but it's not immediately obvious after reading the code examples above (I can't find where asyncio coroutines are called in them).

@cyberw
Collaborator

cyberw commented Nov 18, 2024

Asyncio is not supported right now.

We're planning to build a completely asyncio-based worker implementation, but unless there's a simple way to allow gevent and asyncio to coexist (like asyncio-gevent seemed to promise before it went silent), it might be a while.

If someone wants to take it upon themselves to reimplement the worker on asyncio, it would be most welcome :)
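
To give a rough feel for what "asyncio-based" could mean for simulated users, here is a toy sketch using only the standard library. It is not Locust code and not the maintainers' planned design; `simulated_request`, `user_loop`, and `run_users` are hypothetical names, and the request is a sleep standing in for real async I/O.

```python
import asyncio
import time


async def simulated_request(name):
    """Stand-in for a real async request (assumption for illustration)."""
    await asyncio.sleep(0.01)
    return name


async def user_loop(user_id, iterations, timings):
    """One simulated user: run a task, record its duration, then wait."""
    for _ in range(iterations):
        start = time.perf_counter()
        await simulated_request(f"user-{user_id}")
        timings.append(time.perf_counter() - start)
        await asyncio.sleep(0.005)  # wait time between tasks


async def run_users(user_count, iterations):
    """Run all simulated users concurrently on a single event loop."""
    timings = []
    await asyncio.gather(
        *(user_loop(i, iterations, timings) for i in range(user_count))
    )
    return timings


timings = asyncio.run(run_users(user_count=5, iterations=3))
print(f"recorded {len(timings)} timings")
```

Each user here is a coroutine rather than a greenlet, so any client used inside a task would itself have to be asyncio-based.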

@cyberw reopened this Nov 18, 2024

@thx123

thx123 commented Nov 19, 2024

Thanks @cyberw for your prompt reply. After days of struggle, I finally found a way to make asyncio work in Locust. Here is a working example. The basic idea is to create a dedicated thread running an asyncio event loop in each process (Locust can start multiple workers on multiple CPU cores, each in a separate process), and to define a helper function that takes a coroutine function and its arguments and runs it on the event loop of the process it belongs to.

import asyncio
import logging
import os
import threading
import time
from typing import Any, Awaitable, Callable

import gevent.event  # explicit submodule import; thread_func uses gevent.event.Event
from locust import User, task, between, events

# Set the logging level for the root logger to WARNING to suppress INFO messages
logging.getLogger().setLevel(logging.WARNING)

def thread_func(loop, coro: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> Any:
    """Run coro(*args, **kwargs) on the asyncio loop owned by another thread and return its result."""
    logging.debug("In thread_func()")
    try:
        assert loop.is_running(), "Asyncio event loop is not running!"
        # Hand the coroutine to the loop's thread; this returns a concurrent.futures.Future.
        future = asyncio.run_coroutine_threadsafe(coro(*args, **kwargs), loop)
        # Wait on a gevent Event instead of blocking in future.result();
        # the intent is to let other greenlets keep running in the meantime.
        event = gevent.event.Event()
        future.add_done_callback(lambda _: event.set())
        event.wait()
        # The future is done once the event is set, so this returns (or raises) immediately.
        return future.result(timeout=3)
    except TimeoutError as te:
        logging.exception(f'TimeoutError: {te}')
        future.cancel()
        raise
    except RuntimeError as rte:
        logging.exception(f'RuntimeError: {rte}')
        raise
    except Exception as e:
        logging.exception(f'Other Exception: {e}')
        raise

def run_asyncio(loop, coro: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> Any:
    """Entry point used by Locust tasks; delegates to thread_func."""
    logging.debug("In run_asyncio()")
    return thread_func(loop, coro, *args, **kwargs)


async def async_foo(param1, param2):
    """asyncio coroutine function to be tested"""
    ...


class AsyncioInLocustTest(User):
    shortest_secs, longest_secs = 0.1, 1
    wait_time = between(shortest_secs, longest_secs)

    # Class-level variables to track process initialization
    shared_loop = None
    shared_thread = None
    initialized_pid = None  # Store the PID of the process where initialization has occurred

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Detect newly forked processes
        current_pid = os.getpid()
        if AsyncioInLocustTest.initialized_pid != current_pid:
            # Run initialization for the current process
            self.init_process_resources(current_pid)

        # Use the shared event loop and thread
        self.loop = AsyncioInLocustTest.shared_loop

    @classmethod
    def init_process_resources(cls, current_pid):
        """Initialize resources once for each new process."""
        cls.initialized_pid = current_pid  # Mark the process as initialized
        print(f"Initializing resources for process PID: {current_pid}")

        # Create a shared asyncio event loop and thread for this process
        cls.shared_loop = asyncio.new_event_loop()
        cls.shared_thread = threading.Thread(target=cls.shared_loop.run_forever, daemon=True)
        cls.shared_thread.start()

    @task
    def asyncio_test(self):
        start_time = time.time()
        try:
            # foo_param1 and foo_param2 are placeholders for async_foo's arguments.
            run_asyncio(self.loop, async_foo, foo_param1, foo_param2)

            # Fire success event
            events.request.fire(
                request_type="foo",
                name="bar",
                response_time=int((time.time() - start_time) * 1000),
                response_length=0,
                exception=None
            )
        except Exception as e:
            # Fire failure event
            events.request.fire(
                request_type="foo",
                name="bar",
                response_time=int((time.time() - start_time) * 1000),
                response_length=0,
                exception=e
            )
            raise e
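
Stripped of the Locust and gevent parts, the core of the approach above (one event loop running in a dedicated thread, with coroutines submitted to it from synchronous code) can be sketched with the standard library alone. `BackgroundLoop` and `add_later` are hypothetical names used for illustration. Unlike the example above, this sketch blocks the calling thread in `future.result()` rather than waiting on a gevent event, so it shows the threading pattern only, not the gevent integration.

```python
import asyncio
import threading


class BackgroundLoop:
    """Hypothetical helper: owns one asyncio event loop running in a daemon thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()

    def run(self, coro_fn, *args, timeout=3.0, **kwargs):
        """Schedule coro_fn(*args, **kwargs) on the loop and block until it finishes."""
        future = asyncio.run_coroutine_threadsafe(coro_fn(*args, **kwargs), self.loop)
        try:
            return future.result(timeout=timeout)
        except Exception:
            future.cancel()  # has no effect if the coroutine already finished
            raise

    def stop(self):
        """Stop the loop from outside its thread, then release its resources."""
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()


async def add_later(a, b):
    """Stand-in for the coroutine under test (assumption for illustration)."""
    await asyncio.sleep(0.01)
    return a + b


background = BackgroundLoop()
total = background.run(add_later, 2, 3)
background.stop()
```

`asyncio.run_coroutine_threadsafe` is the documented way to submit a coroutine to a loop running in another thread. It returns a `concurrent.futures.Future`, which is why the result can be collected from plain synchronous code.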
