diff --git a/dynamo/_types.py b/dynamo/_types.py index 85866b9..33b44b8 100644 --- a/dynamo/_types.py +++ b/dynamo/_types.py @@ -22,13 +22,17 @@ V = TypeVar("V", bound="View", covariant=True) -class WrappedCoroutine[**P, R](Protocol): +class WrappedCoroutine[**P, T](Protocol): + """A coroutine that has been wrapped by a decorator.""" + __name__: str - __call__: Callable[P, Coroutine[Any, Any, R]] + __call__: Callable[P, Coroutine[Any, Any, T]] + +class DecoratedCoroutine[**P, T](Protocol): + """A decorator that has been applied to a coroutine.""" -class DecoratedCoroutine[**P, R](Protocol): - __call__: Callable[[WrappedCoroutine[P, R]], R] + __call__: Callable[[WrappedCoroutine[P, T]], T] class NotFoundWithHelp(commands.CommandError): ... @@ -59,8 +63,27 @@ class NotFoundWithHelp(commands.CommandError): ... } -class _MISSING: +class MissingSentinel: + """ + Represents a sentinel value to indicate that something is missing or not provided. + + This class is not meant to be instantiated. It should be used as a type for + comparison or as a default value in function signatures. + """ + __slots__ = () + def __eq__(self, other: Any) -> bool: + return False + + def __bool__(self) -> bool: + return False + + def __hash__(self) -> int: + return 0 + + def __repr__(self): + return "..." + -MISSING: Any = _MISSING() +MISSING: Any = MissingSentinel() diff --git a/dynamo/extensions/cogs/events.py b/dynamo/extensions/cogs/events.py index d1b0b4e..3a9d5bd 100644 --- a/dynamo/extensions/cogs/events.py +++ b/dynamo/extensions/cogs/events.py @@ -54,7 +54,8 @@ async def on_timeout(self) -> None: for i in self.children: item = cast(EventsDropdown[EventsView], i) item.disabled = True - await self.message.edit(view=self) + await self.message.edit(content="Expired!", view=self) + await self.message.delete(delay=10) class InterestedDropdown(EventsDropdown[EventsView]): @@ -88,53 +89,54 @@ async def get_interested(event: discord.ScheduledEvent) -> str: return f"`[{event.name}]({event.url}) {" ".join(u.mention for u in users) or "No users found"}`" -class Events(DynamoCog): - """Scheduled event related commands""" +async def fetch_events(guild: discord.Guild) -> list[discord.ScheduledEvent]: + events: list[discord.ScheduledEvent] = [] + try: + events = await guild.fetch_scheduled_events(with_counts=False) + except discord.HTTPException: + return [] + return sorted(events, key=lambda e: e.start_time) - def __init__(self, bot: Dynamo) -> None: - super().__init__(bot) - async def fetch_events(self, guild: discord.Guild) -> list[discord.ScheduledEvent]: - events: list[discord.ScheduledEvent] = [] +@async_cache(ttl=1800) +async def event_check(guild: discord.Guild, event_id: int | None = None) -> str | list[discord.ScheduledEvent]: + """|coro| + + Get a list of members subscribed to an event. If event is provided, get attendees of that event if it exists. + If no event is provided, get a list of all events in the guild. In both cases if neither events nor attendees + are found, return a failure message. + + Parameters + ---------- + guild : discord.Guild + The guild to fetch events from + event_id : int | None, optional + The id of a specific event to fetch, by default None + + Returns + ------- + str | list[discord.ScheduledEvent] + A string if an event is not found, otherwise a list of events + """ + if event_id is not None: try: - events = await guild.fetch_scheduled_events(with_counts=False) - except discord.HTTPException: - self.log.exception("Failed to fetch events for guild %s", guild.id) - return sorted(events, key=lambda e: e.start_time) + ev = await guild.fetch_scheduled_event(event_id, with_counts=False) + except discord.NotFound: + return f"No event with id: {event_id}" + return await get_interested(ev) - @async_cache(ttl=1800) - async def event_check( - self, guild: discord.Guild, event_id: int | None = None - ) -> str | list[discord.ScheduledEvent]: - """|coro| + events = await fetch_events(guild) + return events or f"{Context.Status.FAILURE} No events found!" - Get a list of members subscribed to an event. If event is provided, get attendees of that event if it exists. - If no event is provided, get a list of all events in the guild. In both cases if neither events nor attendees - are found, return a failure message. - Parameters - ---------- - guild : discord.Guild - The guild to fetch events from - event_id : int | None, optional - The id of a specific event to fetch, by default None - - Returns - ------- - str | list[discord.ScheduledEvent] - _description_ - """ - if event_id is not None: - try: - ev = await guild.fetch_scheduled_event(event_id, with_counts=False) - except discord.NotFound: - return f"No event with id: {event_id}" - return await get_interested(ev) +class Events(DynamoCog): + """Scheduled event related commands""" - return await self.fetch_events(guild) or f"{Context.Status.FAILURE} No events found!" + def __init__(self, bot: Dynamo) -> None: + super().__init__(bot) + self.active_users: set[int] = set() @commands.hybrid_command(name="event") - @commands.cooldown(1, 35, commands.BucketType.user) @commands.guild_only() async def event(self, ctx: Context, event: int | None = None) -> None: """Get a list of members subscribed to an event @@ -144,24 +146,30 @@ async def event(self, ctx: Context, event: int | None = None) -> None: event: int | None, optional The event ID to get attendees of """ - if ctx.guild is None: + if ctx.guild is None or ctx.author.id in self.active_users: return - message = await ctx.send(f"{self.bot.app_emojis.get("loading2", "⏳")}\tFetching events...") + # Prevent invokation when a view is already active by invoking user + self.active_users.add(ctx.author.id) + + guild_not_cached = event_check.get_containing(ctx.guild, event) is None + fetch_message = "Events not cached, fetching..." if guild_not_cached else "Fetching events..." + message = await ctx.send(f"{self.bot.app_emojis.get("loading2", "⏳")}\t{fetch_message}") - event_check: str | list[discord.ScheduledEvent] = await self.event_check(ctx.guild, event) - if isinstance(event_check, str): - await message.edit(content=event_check) + event_exists: str | list[discord.ScheduledEvent] = await event_check(ctx.guild, event) + if isinstance(event_exists, str): + self.active_users.remove(ctx.author.id) + await message.edit(content=event_exists) await message.delete(delay=10) return - view = EventsView(ctx.author.id, event_check, InterestedDropdown, timeout=25) + view = EventsView(ctx.author.id, event_exists, InterestedDropdown, timeout=25) + view.message = message await message.edit(content=f"Events in {ctx.guild.name}:", view=view) await view.wait() - await message.edit(content="Expired!", view=None) - await message.delete(delay=10) + self.active_users.remove(ctx.author.id) async def setup(bot: Dynamo) -> None: diff --git a/dynamo/utils/cache.py b/dynamo/utils/cache.py index d166786..f13d628 100644 --- a/dynamo/utils/cache.py +++ b/dynamo/utils/cache.py @@ -28,6 +28,7 @@ class CachedTask[**P, T](Protocol): cache_info: Callable[[], CacheInfo] cache_clear: Callable[[], None] cache_parameters: Callable[[], dict[str, int | float | None]] + get_containing: Callable[P, asyncio.Task[T] | None] DecoratedCoroutine = Callable[[WrappedCoroutine[P, T]], CachedTask[P, T]] @@ -247,10 +248,10 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> asyncio.Task[T]: if ttl is not None: - def evict(k: Hashable) -> None: + def evict(k: Hashable, default: Any = MISSING) -> None: log.debug("Eviction: TTL expired for %s", k) with lock: - internal_cache.pop(k, sentinel) + internal_cache.pop(k, default) call_after_ttl = partial(asyncio.get_running_loop().call_later, ttl, evict, key) task.add_done_callback(call_after_ttl) @@ -264,7 +265,13 @@ def cache_clear() -> None: internal_cache.clear() _cache_info.clear() + def get_containing(*args: P.args, **kwargs: P.kwargs) -> asyncio.Task[T] | None: + key = make_key(args, kwargs) + result = cache_get(key, sentinel) + return result if result is not sentinel else None + _wrapper = cast(CachedTask[P, T], wrapper) _wrapper.cache_info = cache_info _wrapper.cache_clear = cache_clear + _wrapper.get_containing = get_containing return _wrapper