Skip to content

Commit

Permalink
Delegated the implementations of Lock and Semaphore to the async back…
Browse files Browse the repository at this point in the history
…end class (#761)

Also added the `fast_acquire` parameter for Lock and Semaphore.
  • Loading branch information
agronholm authored Sep 5, 2024
1 parent ee8165b commit 0c8ad51
Show file tree
Hide file tree
Showing 7 changed files with 618 additions and 94 deletions.
13 changes: 13 additions & 0 deletions docs/synchronization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ Example::

run(main)

.. tip:: If the performance of semaphores is critical for you, you could pass
``fast_acquire=True`` to :class:`Semaphore`. This has the effect of skipping the
:func:`~.lowlevel.cancel_shielded_checkpoint` call in :meth:`Semaphore.acquire` if
there is no contention (acquisition succeeds immediately). This could, in some cases,
lead to the task never yielding control back to to the event loop if you use the
semaphore in a loop that does not have other yield points.

Locks
-----

Expand All @@ -92,6 +99,12 @@ Example::

run(main)

.. tip:: If the performance of locks is critical for you, you could pass
``fast_acquire=True`` to :class:`Lock`. This has the effect of skipping the
:func:`~.lowlevel.cancel_shielded_checkpoint` call in :meth:`Lock.acquire` if there
is no contention (acquisition succeeds immediately). This could, in some cases, lead
to the task never yielding control back to to the event loop if use the lock in a
loop that does not have other yield points.

Conditions
----------
Expand Down
5 changes: 5 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.

**UNRELEASED**

- Improved the performance of ``anyio.Lock`` and ``anyio.Semaphore`` on asyncio (even up
to 50 %)
- Added the ``fast_acquire`` parameter to ``anyio.Lock`` and ``anyio.Semaphore`` to
further boost performance at the expense of safety (``acquire()`` will not yield
control back if there is no contention)
- Fixed ``__repr__()`` of ``MemoryObjectItemReceiver``, when ``item`` is not defined
(`#767 <https://github.com/agronholm/anyio/pulls/767>`_; PR by @Danipulok)
- Added support for the ``from_uri()``, ``full_match()``, ``parser`` methods/properties
Expand Down
181 changes: 178 additions & 3 deletions src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,13 @@

import sniffio

from .. import CapacityLimiterStatistics, EventStatistics, TaskInfo, abc
from .. import (
CapacityLimiterStatistics,
EventStatistics,
LockStatistics,
TaskInfo,
abc,
)
from .._core._eventloop import claim_worker_thread, threadlocals
from .._core._exceptions import (
BrokenResourceError,
Expand All @@ -70,9 +76,16 @@
)
from .._core._sockets import convert_ipv6_sockaddr
from .._core._streams import create_memory_object_stream
from .._core._synchronization import CapacityLimiter as BaseCapacityLimiter
from .._core._synchronization import (
CapacityLimiter as BaseCapacityLimiter,
)
from .._core._synchronization import Event as BaseEvent
from .._core._synchronization import ResourceGuard
from .._core._synchronization import Lock as BaseLock
from .._core._synchronization import (
ResourceGuard,
SemaphoreStatistics,
)
from .._core._synchronization import Semaphore as BaseSemaphore
from .._core._tasks import CancelScope as BaseCancelScope
from ..abc import (
AsyncBackend,
Expand Down Expand Up @@ -1658,6 +1671,154 @@ def statistics(self) -> EventStatistics:
return EventStatistics(len(self._event._waiters))


class Lock(BaseLock):
def __new__(cls, *, fast_acquire: bool = False) -> Lock:
return object.__new__(cls)

def __init__(self, *, fast_acquire: bool = False) -> None:
self._fast_acquire = fast_acquire
self._owner_task: asyncio.Task | None = None
self._waiters: deque[tuple[asyncio.Task, asyncio.Future]] = deque()

async def acquire(self) -> None:
if self._owner_task is None and not self._waiters:
await AsyncIOBackend.checkpoint_if_cancelled()
self._owner_task = current_task()

# Unless on the "fast path", yield control of the event loop so that other
# tasks can run too
if not self._fast_acquire:
try:
await AsyncIOBackend.cancel_shielded_checkpoint()
except CancelledError:
self.release()
raise

return

task = cast(asyncio.Task, current_task())
fut: asyncio.Future[None] = asyncio.Future()
item = task, fut
self._waiters.append(item)
try:
await fut
except CancelledError:
self._waiters.remove(item)
if self._owner_task is task:
self.release()

raise

self._waiters.remove(item)

def acquire_nowait(self) -> None:
if self._owner_task is None and not self._waiters:
self._owner_task = current_task()
return

raise WouldBlock

def locked(self) -> bool:
return self._owner_task is not None

def release(self) -> None:
if self._owner_task != current_task():
raise RuntimeError("The current task is not holding this lock")

for task, fut in self._waiters:
if not fut.cancelled():
self._owner_task = task
fut.set_result(None)
return

self._owner_task = None

def statistics(self) -> LockStatistics:
task_info = AsyncIOTaskInfo(self._owner_task) if self._owner_task else None
return LockStatistics(self.locked(), task_info, len(self._waiters))


class Semaphore(BaseSemaphore):
def __new__(
cls,
initial_value: int,
*,
max_value: int | None = None,
fast_acquire: bool = False,
) -> Semaphore:
return object.__new__(cls)

def __init__(
self,
initial_value: int,
*,
max_value: int | None = None,
fast_acquire: bool = False,
):
super().__init__(initial_value, max_value=max_value)
self._value = initial_value
self._max_value = max_value
self._fast_acquire = fast_acquire
self._waiters: deque[asyncio.Future[None]] = deque()

async def acquire(self) -> None:
if self._value > 0 and not self._waiters:
await AsyncIOBackend.checkpoint_if_cancelled()
self._value -= 1

# Unless on the "fast path", yield control of the event loop so that other
# tasks can run too
if not self._fast_acquire:
try:
await AsyncIOBackend.cancel_shielded_checkpoint()
except CancelledError:
self.release()
raise

return

fut: asyncio.Future[None] = asyncio.Future()
self._waiters.append(fut)
try:
await fut
except CancelledError:
try:
self._waiters.remove(fut)
except ValueError:
self.release()

raise

def acquire_nowait(self) -> None:
if self._value == 0:
raise WouldBlock

self._value -= 1

def release(self) -> None:
if self._max_value is not None and self._value == self._max_value:
raise ValueError("semaphore released too many times")

for fut in self._waiters:
if not fut.cancelled():
fut.set_result(None)
self._waiters.remove(fut)
return

self._value += 1

@property
def value(self) -> int:
return self._value

@property
def max_value(self) -> int | None:
return self._max_value

def statistics(self) -> SemaphoreStatistics:
return SemaphoreStatistics(len(self._waiters))


class CapacityLimiter(BaseCapacityLimiter):
_total_tokens: float = 0

Expand Down Expand Up @@ -2108,6 +2269,20 @@ def create_task_group(cls) -> abc.TaskGroup:
def create_event(cls) -> abc.Event:
return Event()

@classmethod
def create_lock(cls, *, fast_acquire: bool) -> abc.Lock:
return Lock(fast_acquire=fast_acquire)

@classmethod
def create_semaphore(
cls,
initial_value: int,
*,
max_value: int | None = None,
fast_acquire: bool = False,
) -> abc.Semaphore:
return Semaphore(initial_value, max_value=max_value, fast_acquire=fast_acquire)

@classmethod
def create_capacity_limiter(cls, total_tokens: float) -> abc.CapacityLimiter:
return CapacityLimiter(total_tokens)
Expand Down
Loading

0 comments on commit 0c8ad51

Please sign in to comment.