diff --git a/seraphsix/bot.py b/seraphsix/bot.py index 147bc5f..e087e89 100644 --- a/seraphsix/bot.py +++ b/seraphsix/bot.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import aioredis import asyncio -import arq import discord import io import logging @@ -21,6 +20,7 @@ from seraphsix.errors import ( InvalidCommandError, InvalidGameModeError, InvalidMemberError, NotRegisteredError, ConfigurationError, MissingTimezoneError, MaintenanceError) +from seraphsix.tasks.core import create_redis_jobs_pool from seraphsix.tasks.discord import store_sherpas, update_sherpa log = logging.getLogger(__name__) @@ -101,8 +101,7 @@ def __init__(self, config): @tasks.loop(minutes=5.0) async def update_last_active(self): - tasks = [] - guilds = await self.database.execute(Guild.select()) + guilds = await self.ext_conns['database'].execute(Guild.select()) if not guilds: return for guild in guilds: @@ -110,27 +109,9 @@ async def update_last_active(self): discord_guild = await self.fetch_guild(guild.guild_id) guild_name = str(discord_guild) log.info(f"Queueing task to find last active dates for all members of {guild_name} ({guild_id})") - - try: - tasks.extend([ - self.ext_conns['redis_jobs'].enqueue_job( - 'store_last_active', guild_id, guild_name, _job_id=f'store_last_active-{guild_id}') - ]) - except AttributeError: - log.exception("Redis connection not found") - await self.log_channel.send("Redis connection not found") - break - - try: - await asyncio.gather(*tasks) - except MaintenanceError as e: - if not self.bungie_maintenance: - log.info(f"Bungie maintenance is ongoing: {e}") - self.bungie_maintenance = True - else: - if self.bungie_maintenance: - self.bungie_maintenance = False - log.info("Bungie maintenance has ended") + await self.ext_conns['redis_jobs'].enqueue_job( + 'store_last_active', guild_id, guild_name, _job_id=f'store_last_active-{guild_id}' + ) @update_last_active.before_loop async def before_update_last_active(self): @@ -138,40 +119,23 @@ async def before_update_last_active(self): @tasks.loop(hours=1.0) async def update_member_games(self): - await asyncio.sleep(constants.TIME_MIN_SECONDS) - - guilds = await self.database.execute(Guild.select()) + guilds = await self.ext_conns['database'].execute(Guild.select()) if not guilds: return - - tasks = [] for guild in guilds: guild_id = guild.guild_id discord_guild = await self.fetch_guild(guild_id) - tasks.append( - self.ext_conns['redis_jobs'].enqueue_job( - 'store_all_games', guild_id, str(discord_guild), - _job_id=f'store_all_games-{guild_id}' - ) + guild_name = str(discord_guild) + await self.ext_conns['redis_jobs'].enqueue_job( + 'store_all_games', guild_id, guild_name, _job_id=f'store_all_games-{guild_id}' ) - try: - await asyncio.gather(*tasks) - except MaintenanceError as e: - if not self.bungie_maintenance: - log.info(f"Bungie maintenance is ongoing: {e}") - self.bungie_maintenance = True - else: - if self.bungie_maintenance: - self.bungie_maintenance = False - log.info("Bungie maintenance has ended") - @update_member_games.before_loop async def before_update_member_games(self): await self.wait_until_ready() async def update_sherpa_roles(self): - guilds = await self.database.execute(Guild.select()) + guilds = await self.ext_conns['database'].execute(Guild.select()) if not guilds: return tasks = [store_sherpas(self, guild) for guild in guilds if guild.track_sherpas] @@ -180,7 +144,7 @@ async def update_sherpa_roles(self): async def process_tweet(self, tweet): # pylint: disable=assignment-from-no-return query = TwitterChannel.select().where(TwitterChannel.twitter_id == tweet.user.id) - channels = await self.database.execute(query) + channels = await self.ext_conns['database'].execute(query) if not channels: log.info( @@ -209,14 +173,11 @@ async def track_tweets(self): async def connect_redis(self): self.redis = await aioredis.create_redis_pool(self.config.redis_url) self.ext_conns['redis_cache'] = self.redis - self.ext_conns['redis_jobs'] = await arq.create_pool(self.config.arq_redis) + self.ext_conns['redis_jobs'] = await create_redis_jobs_pool(self.config.arq_redis) @tasks.loop(hours=1.0) async def cache_clan_members(self): - database = self.ext_conns['database'] - redis_jobs = self.ext_conns['redis_jobs'] - - guilds = await database.execute(Guild.select()) + guilds = await self.ext_conns['database'].execute(Guild.select()) if not guilds: return @@ -225,7 +186,7 @@ async def cache_clan_members(self): discord_guild = await self.fetch_guild(guild.guild_id) guild_name = str(discord_guild) log.info(f"Queueing task to update cached members of {guild_name} ({guild_id})") - await redis_jobs.enqueue_job( + await self.ext_conns['redis_jobs'].enqueue_job( 'set_cached_members', guild_id, guild_name, _job_id=f'set_cached_members-{guild_id}' ) diff --git a/seraphsix/tasks/core.py b/seraphsix/tasks/core.py index cfae6b3..0050e7f 100644 --- a/seraphsix/tasks/core.py +++ b/seraphsix/tasks/core.py @@ -1,3 +1,4 @@ +import arq import asyncio import backoff import logging @@ -12,6 +13,14 @@ log = logging.getLogger(__name__) +async def create_redis_jobs_pool(config): + return await arq.create_pool( + config, + job_serializer=lambda b: msgpack.packb(b, default=encode_datetime), + job_deserializer=lambda b: msgpack.unpackb(b, object_hook=decode_datetime) + ) + + def backoff_handler(details): if details['wait'] > 30 or details['tries'] > 10: log.debug( @@ -24,7 +33,6 @@ def backoff_handler(details): backoff.expo, (PydestPrivateHistoryException, PydestMaintenanceException), max_tries=1, logger=None) -# @backoff.on_exception(backoff.expo, asyncio.TimeoutError, max_tries=1, logger=None) @backoff.on_exception(backoff.expo, (PydestException, asyncio.TimeoutError), logger=None, on_backoff=backoff_handler) async def execute_pydest(function, *args, **kwargs): retval = None