Skip to content

Commit

Permalink
Merge pull request #79 from henworth:ratelimit_destiny
Browse files Browse the repository at this point in the history
Implement rate limiting on the Destiny API
  • Loading branch information
henworth authored Apr 17, 2021
2 parents dc4f0a3 + a775880 commit e3ab9fb
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 54 deletions.
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ urllib3 = ">=1.26.4"
jinja2 = ">=2.11.3"
aiohttp = ">=3.7.4"
dataclasses-json = "*"
pyrate-limiter = "*"

[pipenv]
allow_prereleases = false
Expand Down
79 changes: 43 additions & 36 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ peony-twitter==1.1.7
psycopg2-binary==2.8.6
pycparser==2.20; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
pydantic==1.8.1; python_full_version >= '3.6.1'
pyrate-limiter==2.3.0
pytz==2021.1
redis==3.5.3
requests-oauth2==0.3.0
Expand Down
6 changes: 2 additions & 4 deletions seraphsix/tasks/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,14 @@ async def get_activity_list(ctx, platform_id, member_id, characters, count, full


async def get_last_active(ctx, member_db):
acct_last_active = None
platform_id = member_db.platform_id
member_id, _ = parse_platform(member_db.member, platform_id)

acct_last_active = None
profile = await execute_pydest(
ctx['destiny'].api.get_profile, platform_id, member_id, [constants.COMPONENT_PROFILES])

if not profile.response:
log.error(f"Could not get character data for {platform_id}-{member_id}")
return acct_last_active
log.error(f"Could not get character data for {platform_id}-{member_id}: {profile.message}")
else:
acct_last_active = destiny_date_as_utc(profile.response['profile']['data']['dateLastPlayed'])
log.debug(f"Found last active date for {platform_id}-{member_id}: {acct_last_active}")
Expand Down
19 changes: 11 additions & 8 deletions seraphsix/tasks/clan.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ async def member_sync(bot, guild_id, guild_name):
)

# Figure out if there are any members to add
member_added_dbs = []
members_added = bungie_member_set - db_member_set
for member_hash in members_added:
member_info = bungie_members[member_hash]
Expand All @@ -125,14 +126,7 @@ async def member_sync(bot, guild_id, guild_name):
await bot.database.create(
ClanMember, clan=clan_db, member=member_db, **member_details)

# Ensure we bust the member cache before queueing jobs
await set_cached_members(bot.ext_conns, guild_id, guild_name)

# Kick off activity scans for each of the added members
await bot.ext_conns['redis_jobs'].enqueue_job(
'store_member_history', member_db.id, guild_id, guild_name, full_sync=True,
_job_id=f'store_member_history-{member_db.id}')

member_added_dbs.append(member_db)
member_changes[clan_db.clan_id]['added'].append(member_hash)

# Figure out if there are any members to remove
Expand All @@ -148,8 +142,17 @@ async def member_sync(bot, guild_id, guild_name):
await bot.database.delete(clanmember_db)
member_changes[clan_db.clan_id]['removed'].append(member_hash)

# Ensure we bust the member cache before queueing jobs
await set_cached_members(bot.ext_conns, guild_id, guild_name)

for clan, changes in member_changes.items():
if len(changes['added']):
# Kick off activity scans for each of the added members
for member_db in member_added_dbs.append(member_db):
await bot.ext_conns['redis_jobs'].enqueue_job(
'store_member_history', member_db.id, guild_id, guild_name, full_sync=True,
_job_id=f'store_member_history-{member_db.id}')

changes['added'] = await sort_members(bot.database, changes['added'])
log.info(f"Added members {changes['added']}")
if len(changes['removed']):
Expand Down
29 changes: 28 additions & 1 deletion seraphsix/tasks/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from dataclasses import dataclass, asdict
from datetime import datetime
from get_docker_secret import get_docker_secret
from pyrate_limiter import Duration, RequestRate, Limiter, RedisBucket
from redis import ConnectionPool
from seraphsix.constants import LOG_FORMAT_MSG, DESTINY_DATE_FORMAT, DB_MAX_CONNECTIONS, ROOT_LOG_LEVEL


Expand Down Expand Up @@ -79,8 +81,24 @@ def asdict(self):
return asdict(self)


class Borg:
_shared_state = {}

def __init__(self):
self.__dict__ = self._shared_state

def __hash__(self):
return 1

def __eq__(self, other):
try:
return self.__dict__ is other.__dict__
except Exception:
return 0


@dataclass
class Config:
class Config(Borg):
destiny: DestinyConfig
the100: The100Config
twitter: TwitterConfig
Expand All @@ -98,6 +116,8 @@ class Config:
root_log_level: str

def __init__(self):
Borg.__init__(self)

database_user = get_docker_secret('seraphsix_pg_db_user', default='seraphsix')
database_password = get_docker_secret('seraphsix_pg_db_pass')
database_host = get_docker_secret('seraphsix_pg_db_host', default='localhost')
Expand Down Expand Up @@ -131,3 +151,10 @@ def __init__(self):
self.activity_cutoff = datetime.strptime(self.activity_cutoff, '%Y-%m-%d').astimezone(tz=pytz.utc)

self.root_log_level = get_docker_secret('root_log_level', default=ROOT_LOG_LEVEL, cast_to=str)

bucket_kwargs = {
"redis_pool": ConnectionPool.from_url(self.redis_url),
"bucket_name": "ratelimit"
}
destiny_api_rate = RequestRate(20, Duration.SECOND)
self.destiny_api_limiter = Limiter(destiny_api_rate, bucket_class=RedisBucket, bucket_kwargs=bucket_kwargs)
22 changes: 17 additions & 5 deletions seraphsix/tasks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@

from playhouse.shortcuts import model_to_dict
from pydest.pydest import PydestException
from pyrate_limiter import BucketFullException
from seraphsix import constants
from seraphsix.tasks.config import Config
from seraphsix.errors import MaintenanceError, PrivateHistoryError
from seraphsix.models.destiny import DestinyResponse, DestinyTokenResponse, DestinyTokenErrorResponse
from seraphsix.tasks.parsing import decode_datetime, encode_datetime

log = logging.getLogger(__name__)
config = Config()


async def create_redis_jobs_pool(config):
Expand All @@ -30,12 +33,16 @@ def backoff_handler(details):
)


@backoff.on_exception(backoff.expo, (PrivateHistoryError, MaintenanceError), max_tries=0, logger=None)
@backoff.on_exception(backoff.expo, (PydestException, asyncio.TimeoutError), logger=None, on_backoff=backoff_handler)
@backoff.on_exception(backoff.constant, (PrivateHistoryError, MaintenanceError), max_tries=1, logger=None)
@backoff.on_exception(
backoff.expo, (PydestException, asyncio.TimeoutError, BucketFullException), logger=None, on_backoff=backoff_handler)
async def execute_pydest(function, *args, **kwargs):
retval = None
log.debug(f"{function} {args} {kwargs}")
data = await function(*args, **kwargs)

async with config.destiny_api_limiter.ratelimit('destiny_api', delay=True):
data = await function(*args, **kwargs)

log.debug(f"{function} {args} {kwargs} - {data}")

try:
Expand All @@ -45,16 +52,21 @@ async def execute_pydest(function, *args, **kwargs):
res = DestinyTokenResponse.from_dict(data)
except KeyError:
res = DestinyTokenErrorResponse.from_dict(data)
except Exception:
raise RuntimeError(f"Cannot parse Destiny API response {data}")
else:
if res.error_status != 'Success':
log.error(f"Error running {function} {args} {kwargs} - {res}")
# https://bungie-net.github.io/#/components/schemas/Exceptions.PlatformErrorCodes
if res.error_status == 'SystemDisabled':
raise MaintenanceError
elif res.error_status == 'PerEndpointRequestThrottleExceeded':
elif res.error_status in ['PerEndpointRequestThrottleExceeded', 'DestinyDirectBabelClientTimeout']:
raise PydestException
elif res.error_status == 'DestinyPrivacyRestriction':
raise PrivateHistoryError
else:
log.error(f"Error running {function} {args} {kwargs} - {res}")
if res.error_status in ['DestinyAccountNotFound']:
raise PydestException
retval = res
log.debug(f"{function} {args} {kwargs} - {res}")
return retval
Expand Down

0 comments on commit e3ab9fb

Please sign in to comment.