Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Production Deploy 8/19/2024 #1278

Merged
merged 17 commits into from
Aug 19, 2024
147 changes: 92 additions & 55 deletions app/aws/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,52 @@
JOBS_CACHE_HITS = "JOBS_CACHE_HITS"
JOBS_CACHE_MISSES = "JOBS_CACHE_MISSES"

# Module-level cache for the boto3 S3 client and S3 resource. Both start as
# None and are populated on first use by get_s3_client() / get_s3_resource(),
# which then return the same object on every later call.
s3_client = None
s3_resource = None


def get_s3_client():
    """Return the shared S3 client, creating it on first use.

    The client is built from the ``CSV_UPLOAD_BUCKET`` credentials and region
    found in ``current_app.config`` and stored in the module-level
    ``s3_client`` so later calls reuse it instead of opening a new session.

    Returns:
        The cached S3 client.

    Raises:
        KeyError: if ``CSV_UPLOAD_BUCKET`` or one of its credential/region
            keys is missing from the app config.
    """
    global s3_client
    if s3_client is None:
        bucket_config = current_app.config["CSV_UPLOAD_BUCKET"]
        session = Session(
            aws_access_key_id=bucket_config["access_key_id"],
            aws_secret_access_key=bucket_config["secret_access_key"],
            region_name=bucket_config["region"],
        )
        # Pass AWS_CLIENT_CONFIG just like get_s3_resource() does. Callers
        # such as download_from_s3 used to build their client with this
        # config, so omitting it here would silently drop those settings.
        s3_client = session.client("s3", config=AWS_CLIENT_CONFIG)
    return s3_client


def get_s3_resource():
    """Return the shared S3 resource, building it lazily on the first call.

    Credentials and region are read from the ``CSV_UPLOAD_BUCKET`` entry of
    ``current_app.config``. The resource is created with ``AWS_CLIENT_CONFIG``
    and kept in the module-level ``s3_resource`` for reuse.

    Returns:
        The cached S3 resource.
    """
    global s3_resource

    # Fast path: a previous call already created the resource.
    if s3_resource is not None:
        return s3_resource

    bucket_config = current_app.config["CSV_UPLOAD_BUCKET"]
    boto_session = Session(
        aws_access_key_id=bucket_config["access_key_id"],
        aws_secret_access_key=bucket_config["secret_access_key"],
        region_name=bucket_config["region"],
    )
    s3_resource = boto_session.resource("s3", config=AWS_CLIENT_CONFIG)
    return s3_resource


def list_s3_objects():
bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
session = Session(
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
region_name=region,
)
s3 = session.client("s3")

bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
s3_client = get_s3_client()
try:
response = s3.list_objects_v2(Bucket=bucket_name)
response = s3_client.list_objects_v2(Bucket=bucket_name)
while True:
for obj in response.get("Contents", []):
yield obj["Key"]
if "NextContinuationToken" in response:
response = s3.list_objects_v2(
response = s3_client.list_objects_v2(
Bucket=bucket_name,
ContinuationToken=response["NextContinuationToken"],
)
Expand All @@ -51,19 +77,11 @@ def list_s3_objects():


def get_s3_files():
current_app.logger.info("Regenerate job cache #notify-admin-1200")

bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
session = Session(
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
region_name=region,
)
objects = list_s3_objects()

s3res = session.resource("s3", config=AWS_CLIENT_CONFIG)
s3res = get_s3_resource()
current_app.logger.info(
f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200"
)
Expand Down Expand Up @@ -99,12 +117,8 @@ def get_s3_file(bucket_name, file_location, access_key, secret_key, region):
def download_from_s3(
bucket_name, s3_key, local_filename, access_key, secret_key, region
):
session = Session(
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
region_name=region,
)
s3 = session.client("s3", config=AWS_CLIENT_CONFIG)

s3 = get_s3_client()
result = None
try:
result = s3.download_file(bucket_name, s3_key, local_filename)
Expand All @@ -123,27 +137,28 @@ def download_from_s3(


def get_s3_object(bucket_name, file_location, access_key, secret_key, region):
session = Session(
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
region_name=region,
)
s3 = session.resource("s3", config=AWS_CLIENT_CONFIG)
return s3.Object(bucket_name, file_location)

s3 = get_s3_resource()
try:
return s3.Object(bucket_name, file_location)
except botocore.exceptions.ClientError:
current_app.logger.error(
f"Can't retrieve S3 Object from {file_location}", exc_info=True
)


def purge_bucket(bucket_name, access_key, secret_key, region):
session = Session(
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
region_name=region,
)
s3 = session.resource("s3", config=AWS_CLIENT_CONFIG)
s3 = get_s3_resource()
bucket = s3.Bucket(bucket_name)
bucket.objects.all().delete()


def file_exists(bucket_name, file_location, access_key, secret_key, region):
def file_exists(file_location):
bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]

try:
# try and access metadata of object
get_s3_object(
Expand Down Expand Up @@ -172,9 +187,25 @@ def get_job_and_metadata_from_s3(service_id, job_id):


def get_job_from_s3(service_id, job_id):
"""
If and only if we hit a throttling exception of some kind, we want to try
exponential backoff. However, if we are getting NoSuchKey or something
that indicates things are permanently broken, we want to give up right away
to save time.
"""
# We have to make sure the retries don't take up too much time, because
# we might be retrieving dozens of jobs. retries is incremented before the
# sleep is computed, so the max time spent sleeping is:
# 0.4 + 0.8 + 1.6 + 3.2 = 6.0 seconds
retries = 0
max_retries = 3
backoff_factor = 1
max_retries = 4
backoff_factor = 0.2

if not file_exists(FILE_LOCATION_STRUCTURE.format(service_id, job_id)):
current_app.logger.error(
f"This file does not exist {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}"
)
return None

while retries < max_retries:

try:
Expand All @@ -186,24 +217,34 @@ def get_job_from_s3(service_id, job_id):
"RequestTimeout",
"SlowDown",
]:
current_app.logger.error(
f"Retrying job fetch {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} retry_count={retries}",
exc_info=True,
)
retries += 1
sleep_time = backoff_factor * (2**retries) # Exponential backoff
time.sleep(sleep_time)
continue
else:
# Typically this is "NoSuchKey"
current_app.logger.error(
f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} from bucket",
f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}",
exc_info=True,
)
return None

except Exception:
current_app.logger.error(
f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} from bucket",
f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} retry_count={retries}",
exc_info=True,
)
return None

raise Exception("Failed to get object after 3 attempts")
current_app.logger.error(
f"Never retrieved job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}",
exc_info=True,
)
return None


def incr_jobs_cache_misses():
Expand Down Expand Up @@ -274,19 +315,15 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
if job is None:
current_app.logger.info(f"job {job_id} was not in the cache")
job = get_job_from_s3(service_id, job_id)
# Even if it is None, put it here to avoid KeyErrors
JOBS[job_id] = job
incr_jobs_cache_misses()
else:
incr_jobs_cache_hits()

# If the job is None after our attempt to retrieve it from s3, it
# probably means the job is old and has been deleted from s3, in
# which case there is nothing we can do. It's unlikely to run into
# this, but it could theoretically happen, especially if we ever
# change the task schedules
if job is None:
current_app.logger.warning(
f"Couldnt find phone for job_id {job_id} row number {job_row_number} because job is missing"
current_app.logger.error(
f"Couldnt find phone for job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} because job is missing"
)
return "Unavailable"

Expand Down Expand Up @@ -331,7 +368,7 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
# change the task schedules
if job is None:
current_app.logger.warning(
"Couldnt find personalisation for job_id {job_id} row number {job_row_number} because job is missing"
f"Couldnt find personalisation for job_id {job_id} row number {job_row_number} because job is missing"
)
return {}

Expand Down
2 changes: 0 additions & 2 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

class QueueNames(object):
PERIODIC = "periodic-tasks"
PRIORITY = "priority-tasks"
DATABASE = "database-tasks"
SEND_SMS = "send-sms-tasks"
CHECK_SMS = "check-sms_tasks"
Expand All @@ -30,7 +29,6 @@ class QueueNames(object):
@staticmethod
def all_queues():
return [
QueueNames.PRIORITY,
QueueNames.PERIODIC,
QueueNames.DATABASE,
QueueNames.SEND_SMS,
Expand Down
24 changes: 2 additions & 22 deletions app/dao/provider_details_dao.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import datetime

from flask import current_app
from sqlalchemy import asc, desc, func
from sqlalchemy import desc, func

from app import db
from app.dao.dao_utils import autocommit
Expand Down Expand Up @@ -33,20 +33,6 @@ def dao_get_provider_versions(provider_id):
)


def _adjust_provider_priority(provider, new_priority):
    """Set ``provider.priority`` to ``new_priority`` and stage the change.

    Logs the old and new priority, attributes the change to the configured
    ``NOTIFY_USER_ID``, and hands the provider to
    ``_update_provider_details_without_commit``.
    """
    log_message = (
        f"Adjusting provider priority - {provider.identifier} going from {provider.priority} to {new_priority}"
    )
    current_app.logger.info(log_message)

    provider.priority = new_priority

    # This is an automatic update, so record the notify user as its author.
    notify_user_id = current_app.config["NOTIFY_USER_ID"]
    provider.created_by_id = notify_user_id

    # Update without committing so that both rows can be changed without
    # ending the transaction and releasing the for_update lock.
    _update_provider_details_without_commit(provider)


def _get_sms_providers_for_update(time_threshold):
"""
Returns a list of providers, while holding a for_update lock on the provider details table, guaranteeing that those
Expand Down Expand Up @@ -86,11 +72,7 @@ def get_provider_details_by_notification_type(
if supports_international:
filters.append(ProviderDetails.supports_international == supports_international)

return (
ProviderDetails.query.filter(*filters)
.order_by(asc(ProviderDetails.priority))
.all()
)
return ProviderDetails.query.filter(*filters).all()


@autocommit
Expand Down Expand Up @@ -135,7 +117,6 @@ def dao_get_provider_stats():
ProviderDetails.id,
ProviderDetails.display_name,
ProviderDetails.identifier,
ProviderDetails.priority,
ProviderDetails.notification_type,
ProviderDetails.active,
ProviderDetails.updated_at,
Expand All @@ -149,7 +130,6 @@ def dao_get_provider_stats():
.outerjoin(User, ProviderDetails.created_by_id == User.id)
.order_by(
ProviderDetails.notification_type,
ProviderDetails.priority,
)
.all()
)
Expand Down
2 changes: 0 additions & 2 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1297,7 +1297,6 @@ class ProviderDetails(db.Model):
id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
display_name = db.Column(db.String, nullable=False)
identifier = db.Column(db.String, nullable=False)
priority = db.Column(db.Integer, nullable=False)
notification_type = enum_column(NotificationType, nullable=False)
active = db.Column(db.Boolean, default=False, nullable=False)
version = db.Column(db.Integer, default=1, nullable=False)
Expand All @@ -1322,7 +1321,6 @@ class ProviderDetailsHistory(db.Model, HistoryModel):
id = db.Column(UUID(as_uuid=True), primary_key=True, nullable=False)
display_name = db.Column(db.String, nullable=False)
identifier = db.Column(db.String, nullable=False)
priority = db.Column(db.Integer, nullable=False)
notification_type = enum_column(NotificationType, nullable=False)
active = db.Column(db.Boolean, nullable=False)
version = db.Column(db.Integer, primary_key=True, nullable=False)
Expand Down
9 changes: 2 additions & 7 deletions app/notifications/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

from app import api_user, authenticated_service
from app.aws.s3 import get_personalisation_from_s3, get_phone_number_from_s3
from app.config import QueueNames
from app.dao import notifications_dao
from app.enums import KeyType, NotificationType, TemplateProcessType
from app.enums import KeyType, NotificationType
from app.errors import InvalidRequest, register_errors
from app.notifications.process_notifications import (
persist_notification,
Expand Down Expand Up @@ -168,11 +167,7 @@ def send_notification(notification_type):
reply_to_text=template.reply_to_text,
)
if not simulated:
queue_name = (
QueueNames.PRIORITY
if template.process_type == TemplateProcessType.PRIORITY
else None
)
queue_name = None
send_notification_to_queue(notification=notification_model, queue=queue_name)

else:
Expand Down
1 change: 0 additions & 1 deletion app/provider_details/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ def get_providers():
"id": row.id,
"display_name": row.display_name,
"identifier": row.identifier,
"priority": row.priority,
"notification_type": row.notification_type,
"active": row.active,
"updated_at": row.updated_at,
Expand Down
Loading
Loading