Ensure the redis jobs pool is created correctly
henworth committed Apr 9, 2021
1 parent c4974e5 commit a42e64c
Showing 2 changed files with 23 additions and 54 deletions.
67 changes: 14 additions & 53 deletions seraphsix/bot.py
@@ -1,7 +1,6 @@
 #!/usr/bin/env python3
 import aioredis
 import asyncio
-import arq
 import discord
 import io
 import logging
@@ -21,6 +20,7 @@
 from seraphsix.errors import (
     InvalidCommandError, InvalidGameModeError, InvalidMemberError,
     NotRegisteredError, ConfigurationError, MissingTimezoneError, MaintenanceError)
+from seraphsix.tasks.core import create_redis_jobs_pool
 from seraphsix.tasks.discord import store_sherpas, update_sherpa

 log = logging.getLogger(__name__)
@@ -101,77 +101,41 @@ def __init__(self, config):

     @tasks.loop(minutes=5.0)
     async def update_last_active(self):
-        tasks = []
-        guilds = await self.database.execute(Guild.select())
+        guilds = await self.ext_conns['database'].execute(Guild.select())
         if not guilds:
             return
         for guild in guilds:
             guild_id = guild.guild_id
             discord_guild = await self.fetch_guild(guild.guild_id)
             guild_name = str(discord_guild)
             log.info(f"Queueing task to find last active dates for all members of {guild_name} ({guild_id})")
-
-            try:
-                tasks.extend([
-                    self.ext_conns['redis_jobs'].enqueue_job(
-                        'store_last_active', guild_id, guild_name, _job_id=f'store_last_active-{guild_id}')
-                ])
-            except AttributeError:
-                log.exception("Redis connection not found")
-                await self.log_channel.send("Redis connection not found")
-                break
-
-        try:
-            await asyncio.gather(*tasks)
-        except MaintenanceError as e:
-            if not self.bungie_maintenance:
-                log.info(f"Bungie maintenance is ongoing: {e}")
-                self.bungie_maintenance = True
-        else:
-            if self.bungie_maintenance:
-                self.bungie_maintenance = False
-                log.info("Bungie maintenance has ended")
+            await self.ext_conns['redis_jobs'].enqueue_job(
+                'store_last_active', guild_id, guild_name, _job_id=f'store_last_active-{guild_id}'
+            )

     @update_last_active.before_loop
     async def before_update_last_active(self):
         await self.wait_until_ready()

     @tasks.loop(hours=1.0)
     async def update_member_games(self):
         await asyncio.sleep(constants.TIME_MIN_SECONDS)

-        guilds = await self.database.execute(Guild.select())
+        guilds = await self.ext_conns['database'].execute(Guild.select())
         if not guilds:
             return

-        tasks = []
         for guild in guilds:
             guild_id = guild.guild_id
             discord_guild = await self.fetch_guild(guild_id)
-            tasks.append(
-                self.ext_conns['redis_jobs'].enqueue_job(
-                    'store_all_games', guild_id, str(discord_guild),
-                    _job_id=f'store_all_games-{guild_id}'
-                )
+            guild_name = str(discord_guild)
+            await self.ext_conns['redis_jobs'].enqueue_job(
+                'store_all_games', guild_id, guild_name, _job_id=f'store_all_games-{guild_id}'
             )

-        try:
-            await asyncio.gather(*tasks)
-        except MaintenanceError as e:
-            if not self.bungie_maintenance:
-                log.info(f"Bungie maintenance is ongoing: {e}")
-                self.bungie_maintenance = True
-        else:
-            if self.bungie_maintenance:
-                self.bungie_maintenance = False
-                log.info("Bungie maintenance has ended")
-
     @update_member_games.before_loop
     async def before_update_member_games(self):
         await self.wait_until_ready()

     async def update_sherpa_roles(self):
-        guilds = await self.database.execute(Guild.select())
+        guilds = await self.ext_conns['database'].execute(Guild.select())
         if not guilds:
             return
         tasks = [store_sherpas(self, guild) for guild in guilds if guild.track_sherpas]
@@ -180,7 +144,7 @@ async def update_sherpa_roles(self):
     async def process_tweet(self, tweet):
         # pylint: disable=assignment-from-no-return
         query = TwitterChannel.select().where(TwitterChannel.twitter_id == tweet.user.id)
-        channels = await self.database.execute(query)
+        channels = await self.ext_conns['database'].execute(query)

         if not channels:
             log.info(
@@ -209,14 +173,11 @@ async def track_tweets(self):
     async def connect_redis(self):
         self.redis = await aioredis.create_redis_pool(self.config.redis_url)
         self.ext_conns['redis_cache'] = self.redis
-        self.ext_conns['redis_jobs'] = await arq.create_pool(self.config.arq_redis)
+        self.ext_conns['redis_jobs'] = await create_redis_jobs_pool(self.config.arq_redis)

     @tasks.loop(hours=1.0)
     async def cache_clan_members(self):
-        database = self.ext_conns['database']
-        redis_jobs = self.ext_conns['redis_jobs']
-
-        guilds = await database.execute(Guild.select())
+        guilds = await self.ext_conns['database'].execute(Guild.select())
         if not guilds:
             return

@@ -225,7 +186,7 @@ async def cache_clan_members(self):
             discord_guild = await self.fetch_guild(guild.guild_id)
             guild_name = str(discord_guild)
             log.info(f"Queueing task to update cached members of {guild_name} ({guild_id})")
-            await redis_jobs.enqueue_job(
+            await self.ext_conns['redis_jobs'].enqueue_job(
                 'set_cached_members', guild_id, guild_name, _job_id=f'set_cached_members-{guild_id}'
             )
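Side note on the enqueue_job calls in bot.py above: each one passes a _job_id derived from the guild id, which arq treats as a deduplication key, so a loop iteration that fires again before the previous job has finished does not queue a second copy. A minimal, hypothetical sketch of that behaviour (the guild id and name here are made up and the pool is built plainly, not via create_redis_jobs_pool):

import asyncio

from arq import create_pool
from arq.connections import RedisSettings


async def main():
    # Bare pool for illustration only; seraphsix builds its pool with custom serializers.
    redis_jobs = await create_pool(RedisSettings())

    first = await redis_jobs.enqueue_job(
        'store_last_active', 1234, 'Example Guild', _job_id='store_last_active-1234')
    second = await redis_jobs.enqueue_job(
        'store_last_active', 1234, 'Example Guild', _job_id='store_last_active-1234')

    print(first)   # arq Job handle for the queued task
    print(second)  # None, because a job with this _job_id is already queued


asyncio.run(main())
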
10 changes: 9 additions & 1 deletion seraphsix/tasks/core.py
@@ -1,3 +1,4 @@
+import arq
 import asyncio
 import backoff
 import logging
@@ -12,6 +13,14 @@
 log = logging.getLogger(__name__)


+async def create_redis_jobs_pool(config):
+    return await arq.create_pool(
+        config,
+        job_serializer=lambda b: msgpack.packb(b, default=encode_datetime),
+        job_deserializer=lambda b: msgpack.unpackb(b, object_hook=decode_datetime)
+    )
+
+
 def backoff_handler(details):
     if details['wait'] > 30 or details['tries'] > 10:
         log.debug(
@@ -24,7 +33,6 @@ def backoff_handler(details):
     backoff.expo,
     (PydestPrivateHistoryException, PydestMaintenanceException),
     max_tries=1, logger=None)
-# @backoff.on_exception(backoff.expo, asyncio.TimeoutError, max_tries=1, logger=None)
 @backoff.on_exception(backoff.expo, (PydestException, asyncio.TimeoutError), logger=None, on_backoff=backoff_handler)
 async def execute_pydest(function, *args, **kwargs):
     retval = None
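For context on the new helper in tasks/core.py: arq's create_pool accepts job_serializer and job_deserializer callables, and whatever worker consumes these jobs has to apply the same msgpack encoding or it cannot decode the payloads. The worker settings are not part of this diff; the sketch below is a hypothetical illustration with stand-in datetime helpers and a placeholder task, since the real encode_datetime/decode_datetime and task functions are referenced but not shown here.

import datetime

import msgpack
from arq.connections import RedisSettings


# Stand-ins for seraphsix's encode_datetime/decode_datetime helpers; the real
# implementations live elsewhere in the package and are not part of this commit.
def encode_datetime(obj):
    if isinstance(obj, datetime.datetime):
        return {'__datetime__': True, 'value': obj.isoformat()}
    return obj


def decode_datetime(obj):
    if '__datetime__' in obj:
        return datetime.datetime.fromisoformat(obj['value'])
    return obj


# Placeholder for a real seraphsix task such as store_last_active (not shown in this diff).
async def store_last_active(ctx, guild_id, guild_name):
    print(f'would update last-active dates for {guild_name} ({guild_id})')


class WorkerSettings:
    redis_settings = RedisSettings()  # would mirror config.arq_redis in seraphsix
    functions = [store_last_active]
    # Must match the serializers passed to arq.create_pool in create_redis_jobs_pool,
    # otherwise enqueued payloads cannot be round-tripped between bot and worker.
    job_serializer = lambda b: msgpack.packb(b, default=encode_datetime)
    job_deserializer = lambda b: msgpack.unpackb(b, object_hook=decode_datetime)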
