Skip to content

Commit

Permalink
Raise exception in tests when blocking code is called in event loop
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Oct 26, 2024
1 parent 3131c0c commit 1449e96
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 2 deletions.
6 changes: 4 additions & 2 deletions src/backend/base/langflow/services/telemetry/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(self, settings_service: SettingsService):
self._stopping = False

self.ot = OpenTelemetry(prometheus_enabled=settings_service.settings.prometheus_enabled)
self.architecture = None

# Check for do-not-track settings
self.do_not_track = (
Expand Down Expand Up @@ -93,15 +94,16 @@ async def _queue_event(self, payload) -> None:
async def log_package_version(self) -> None:
python_version = ".".join(platform.python_version().split(".")[:2])
version_info = get_version_info()
architecture = platform.architecture()[0]
if self.architecture is None:
self.architecture = (await asyncio.to_thread(platform.architecture))[0]
payload = VersionPayload(
package=version_info["package"].lower(),
version=version_info["version"],
platform=platform.platform(),
python=python_version,
cache_type=self.settings_service.settings.cache_type,
backend_only=self.settings_service.settings.backend_only,
arch=architecture,
arch=self.architecture,
auto_login=self.settings_service.auth_settings.AUTO_LOGIN,
)
await self._queue_event((self.send_telemetry_data, payload, None))
Expand Down
70 changes: 70 additions & 0 deletions src/backend/tests/blockbuster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import asyncio
import os
import ssl
import time
from socket import socket as _socket


class BlockingError(Exception): ...


def _raise_if_blocking(func):
def wrapper(*args, **kwargs):
try:
asyncio.get_running_loop()
except RuntimeError:
return func(*args, **kwargs)
msg = f"Blocking call to {func.__module__}.{func.__qualname__}"
raise BlockingError(msg)

return wrapper


def _wrap_os_blocking(func):
def os_op(fd, *args, **kwargs):
try:
asyncio.get_running_loop()
except RuntimeError:
return func(fd, *args, **kwargs)
if os.get_blocking(fd):
msg = f"Blocking call to {func.__module__}.{func.__qualname__}"
raise BlockingError(msg)
return func(fd, *args, **kwargs)

return os_op


def _wrap_socket_blocking(func):
def socket_op(self, *args, **kwargs):
try:
asyncio.get_running_loop()
except RuntimeError:
return func(self, *args, **kwargs)
if self.getblocking():
msg = f"Blocking call to {func.__module__}.{func.__qualname__}"
raise BlockingError(msg)
return func(self, *args, **kwargs)

return socket_op


def init():
time.sleep = _raise_if_blocking(time.sleep)

os.read = _wrap_os_blocking(os.read)
os.write = _wrap_os_blocking(os.write)

_socket.send = _wrap_socket_blocking(_socket.send)
_socket.sendall = _wrap_socket_blocking(_socket.sendall)
_socket.sendto = _wrap_socket_blocking(_socket.sendto)
_socket.recv = _wrap_socket_blocking(_socket.recv)
_socket.recv_into = _wrap_socket_blocking(_socket.recv_into)
_socket.recvfrom = _wrap_socket_blocking(_socket.recvfrom)
_socket.recvfrom_into = _wrap_socket_blocking(_socket.recvfrom_into)
_socket.recvmsg = _wrap_socket_blocking(_socket.recvmsg)
_socket.recvmsg_into = _wrap_socket_blocking(_socket.recvmsg_into)

ssl.SSLSocket.write = _wrap_socket_blocking(ssl.SSLSocket.write)
ssl.SSLSocket.send = _wrap_socket_blocking(ssl.SSLSocket.send)
ssl.SSLSocket.read = _wrap_socket_blocking(ssl.SSLSocket.read)
ssl.SSLSocket.recv = _wrap_socket_blocking(ssl.SSLSocket.recv)
2 changes: 2 additions & 0 deletions src/backend/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@
from sqlmodel.pool import StaticPool
from typer.testing import CliRunner

from tests import blockbuster
from tests.api_keys import get_openai_api_key

if TYPE_CHECKING:
from langflow.services.database.service import DatabaseService


load_dotenv()
blockbuster.init()


def pytest_configure(config):
Expand Down

0 comments on commit 1449e96

Please sign in to comment.