
Merge pull request #863 from GSA/message-send-flow-docs
Message send flow docs
ccostino authored Mar 28, 2024
2 parents 8d11133 + 622a8b9 commit 6396dbe
Showing 5 changed files with 35 additions and 10 deletions.
`app/celery/provider_tasks.py` (2 additions, 1 deletion)
@@ -91,6 +91,7 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
    bind=True, name="deliver_sms", max_retries=48, default_retry_delay=300
)
def deliver_sms(self, notification_id):
+    """Branch off to the final step in delivering the notification to AWS SNS and get delivery receipts."""
    try:
        current_app.logger.info(
            "Start sending SMS for notification id: {}".format(notification_id)
@@ -108,7 +109,7 @@ def deliver_sms(self, notification_id):
        current_app.logger.warning(
            ansi_green + f"AUTHENTICATION CODE: {notification.content}" + ansi_reset
        )
-
+        # Code branches off to send_to_providers.py
        message_id = send_to_providers.send_sms_to_provider(notification)
        # We have to put it in UTC. For other timezones, the delay
        # will be ignored and it will fire immediately (although this probably only affects developer testing)
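The decorator above registers `deliver_sms` with Celery to retry up to 48 times at 300-second intervals. Below is a minimal sketch of the same pattern, not this repo's actual wiring; the broker URL and the stub provider call are placeholders:

```python
from celery import Celery

# Placeholder broker URL; the real app configures Celery elsewhere.
celery_app = Celery("notifications", broker="redis://localhost:6379/0")


def send_sms_to_provider(notification_id):
    """Stand-in for send_to_providers.send_sms_to_provider (AWS SNS)."""
    return f"message-id-for-{notification_id}"


@celery_app.task(bind=True, name="deliver_sms", max_retries=48, default_retry_delay=300)
def deliver_sms(self, notification_id):
    try:
        # Hand off to the provider layer; the returned message_id can later
        # be matched against delivery receipts.
        return send_sms_to_provider(notification_id)
    except Exception as exc:
        # Transient failures retry up to max_retries, 300 seconds apart.
        raise self.retry(exc=exc)
```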
`app/celery/tasks.py` (13 additions, 1 deletion)
@@ -31,6 +31,7 @@

@notify_celery.task(name="process-job")
def process_job(job_id, sender_id=None):
+    """Update job status, get CSV data from S3, and begin processing CSV rows."""
    start = datetime.utcnow()
    job = dao_get_job_by_id(job_id)
    current_app.logger.info(
@@ -74,6 +75,7 @@ def process_job(job_id, sender_id=None):
    for row in recipient_csv.get_rows():
        process_row(row, template, job, service, sender_id=sender_id)

+    # Exit point for the message send flow.
    job_complete(job, start=start)


@@ -109,6 +111,7 @@ def get_recipient_csv_and_template_and_sender_id(job):


def process_row(row, template, job, service, sender_id=None):
+    """Branch off based on notification type, SMS or email."""
    template_type = template.template_type
    encrypted = encryption.encrypt(
        {
@@ -121,6 +124,8 @@
        }
    )

+    # Both save_sms and save_email have the same general
+    # persist logic.
    send_fns = {NotificationType.SMS: save_sms, NotificationType.EMAIL: save_email}

    send_fn = send_fns[template_type]
@@ -130,6 +135,7 @@
        task_kwargs["sender_id"] = sender_id

    notification_id = create_uuid()
+    # Kick off persisting the notification in save_sms/save_email.
    send_fn.apply_async(
        (
            str(service.id),
@@ -163,7 +169,11 @@ def __total_sending_limits_for_job_exceeded(service, job, job_id):

@notify_celery.task(bind=True, name="save-sms", max_retries=5, default_retry_delay=300)
def save_sms(self, service_id, notification_id, encrypted_notification, sender_id=None):
+    """Persist the notification to the db and place it on the queue to send to AWS SNS."""
    notification = encryption.decrypt(encrypted_notification)
+    # The SerialisedService and SerialisedTemplate classes are
+    # used here to grab the service and template from the cache
+    # to improve performance.
    service = SerialisedService.from_id(service_id)
    template = SerialisedTemplate.from_id_and_service_id(
        notification["template"],
@@ -177,7 +187,8 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_id=None):
        ).sms_sender
    else:
        reply_to_text = template.reply_to_text
-
+    # service_allowed_to_send_to returns False when a trial-mode service
+    # tries to send to recipients who are not on the team and not simulated.
    if not service_allowed_to_send_to(notification["to"], service, KeyType.NORMAL):
        current_app.logger.debug(
            "SMS {} failed as restricted service".format(notification_id)
@@ -208,6 +219,7 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_id=None):
        reply_to_text=reply_to_text,
    )

+    # Kick off the SNS process in provider_tasks.py
    provider_tasks.deliver_sms.apply_async(
        [str(saved_notification.id)], queue=QueueNames.SEND_SMS
    )
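The `send_fns` mapping in `process_row` is a plain dispatch table: pick `save_sms` or `save_email` by template type, then enqueue it with an encrypted row payload. Below is a runnable sketch of that shape, with stand-ins for the app's encryption helper and Celery task objects:

```python
import base64
import json
import uuid
from enum import Enum


class NotificationType(str, Enum):
    SMS = "sms"
    EMAIL = "email"


def encrypt(payload):
    # Stand-in for the app's encryption module; base64-encoded JSON keeps
    # the sketch self-contained (it is not real encryption).
    return base64.b64encode(json.dumps(payload).encode()).decode()


class StubTask:
    """Stands in for a Celery task so the sketch runs without a broker."""

    def __init__(self, name):
        self.name = name

    def apply_async(self, args, kwargs=None, queue=None):
        print(f"enqueued {self.name} on {queue}: args={args} kwargs={kwargs}")


save_fns = {
    NotificationType.SMS: StubTask("save_sms"),
    NotificationType.EMAIL: StubTask("save_email"),
}


def process_row(row, template_type, service_id, sender_id=None):
    encrypted = encrypt({"to": row["to"], "row_number": row["row_number"]})
    notification_id = str(uuid.uuid4())
    task_kwargs = {"sender_id": sender_id} if sender_id else {}
    # Branch on notification type; both tasks share the same persist logic.
    save_fns[NotificationType(template_type)].apply_async(
        (service_id, notification_id, encrypted), task_kwargs, queue="database-tasks"
    )
    return notification_id


process_row({"to": "+15551234567", "row_number": 1}, "sms", "service-1")
```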
`app/delivery/send_to_providers.py` (6 additions, 0 deletions)
@@ -22,6 +22,11 @@


def send_sms_to_provider(notification):
+    """Final step in the message send flow.
+
+    Get data for the recipient, template, and notification
+    and send it to AWS SNS.
+    """
    # we no longer store the personalisation in the db,
    # need to retrieve it from s3 before generating content
    # However, we are still sending the initial verify code through personalisation
@@ -41,6 +46,7 @@ def send_sms_to_provider(notification):
        return

    if notification.status == NotificationStatus.CREATED:
+        # We get the provider here (currently only AWS SNS).
        provider = provider_to_use(NotificationType.SMS, notification.international)
        if not provider:
            technical_failure(notification=notification)
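Here is a hedged sketch of this exit point using real `boto3` calls: fetch the personalisation from S3, render the content, and publish through SNS. The bucket name, key layout, and notification shape are illustrative assumptions, not this repo's actuals:

```python
import json

import boto3


def send_sms_to_provider(notification):
    # Personalisation is no longer stored in the db; fetch it from S3.
    # Bucket and key layout here are hypothetical.
    s3 = boto3.client("s3")
    obj = s3.get_object(
        Bucket="notify-personalisation",
        Key=f"{notification['service_id']}/{notification['id']}.json",
    )
    personalisation = json.loads(obj["Body"].read())

    # Render the template content with the row's personalisation values.
    message = notification["content"].format(**personalisation)

    # Publish through the provider, which here is only AWS SNS.
    sns = boto3.client("sns")
    response = sns.publish(PhoneNumber=notification["to"], Message=message)
    return response["MessageId"]
```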
`app/job/rest.py` (2 additions, 1 deletion)
@@ -166,6 +166,7 @@ def get_jobs_by_service(service_id):

@job_blueprint.route("", methods=["POST"])
def create_job(service_id):
+    """Entry point from the UI for one-off messages as well as CSV uploads."""
    service = dao_fetch_service_by_id(service_id)
    if not service.active:
        raise InvalidRequest("Create job is not allowed: service is inactive ", 403)
@@ -204,7 +205,7 @@ def create_job(service_id):
    dao_create_job(job)

    sender_id = data.get("sender_id")
-
+    # Kick off the job in tasks.py
    if job.job_status == JobStatus.PENDING:
        process_job.apply_async(
            [str(job.id)], {"sender_id": sender_id}, queue=QueueNames.JOBS
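The `create_job` endpoint above is a standard Flask blueprint handler that persists a `Job` row and enqueues `process_job`. A minimal runnable sketch under stated assumptions; the route shape, DAO, and task object here are stubs, and the real schema validation and CSV handling are omitted:

```python
import uuid

from flask import Blueprint, jsonify, request

job_blueprint = Blueprint("job", __name__)


def dao_create_job(service_id, data):
    # Stub for the real DAO, which writes the Job row to PostgreSQL.
    return {"id": str(uuid.uuid4()), "service_id": service_id, "job_status": "pending"}


class StubTask:
    def apply_async(self, args, kwargs=None, queue=None):
        print(f"enqueued process_job on {queue}: args={args} kwargs={kwargs}")


process_job = StubTask()


@job_blueprint.route("/service/<service_id>/job", methods=["POST"])
def create_job(service_id):
    data = request.get_json()
    job = dao_create_job(service_id, data)
    if job["job_status"] == "pending":
        # Hand the job to Celery; workers pull it from the jobs queue.
        process_job.apply_async(
            [job["id"]], {"sender_id": data.get("sender_id")}, queue="job-tasks"
        )
    return jsonify(data=job), 201
```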
`docs/all.md` (12 additions, 7 deletions)
@@ -542,19 +542,24 @@ All commands use the `-g` or `--generate` to determine how many instances to load

# How messages are queued and sent

+Services used during the message-send flow:
+1. AWS S3
+2. AWS SNS
+3. AWS CloudWatch
+4. Redis
+5. PostgreSQL
+
There are several ways for notifications to come into the API.

- Messages sent through the API enter through `app/notifications/post_notifications.py`
-- One-off messages sent from the UI enter through `create_one_off_notification` in `app/service/rest.py`
-- CSV uploads enter through `app/job/rest.py`
+- One-off messages and CSV uploads both enter from the UI through `app/job/rest.py:create_job`

-API messages and one-off UI messages come in one at a time, and take slightly-separate routes
-that both end up at `persist_notification`, which writes to the database, and `provider_tasks.deliver_sms`,
+API messages come in one at a time and end up at `persist_notification`, which writes to the database, and `provider_tasks.deliver_sms`,
which enqueues the sending.

-For CSV uploads, the CSV is first stored in S3 and queued as a `Job`. When the job runs, it iterates
-through the rows, running `process_job.save_sms` to send notifications through `persist_notification` and
-`provider_tasks.deliver_sms`.
+One-off messages and batch messages both upload a CSV, which is then stored in S3 and queued as a `Job`. When the job runs, it iterates
+through the rows from `tasks.py:process_row`, running `tasks.py:save_sms` (email notifications branch off through `tasks.py:save_email`) to write to the db with `persist_notification` and begin delivering the notification to the provider
+through `provider_tasks.deliver_sms`. The exit point to the provider is `send_to_providers.py:send_sms_to_provider`.
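
Read end to end, the CSV path is one call chain. The sketch below collapses the Celery hops into direct calls purely for illustration; in the real flow each hop is an `apply_async` onto a named queue:

```python
def send_sms_to_provider(row):       # app/delivery/send_to_providers.py, exits to AWS SNS
    print(f"published to SNS for {row['to']}")


def deliver_sms(row):                # app/celery/provider_tasks.py
    send_sms_to_provider(row)


def save_sms(row):                   # persists via persist_notification, then enqueues
    deliver_sms(row)


def process_row(row):                # branches to save_sms or save_email
    save_sms(row)


def process_job(job):                # app/celery/tasks.py; reads the CSV back from S3
    for row in job["rows"]:
        process_row(row)


def create_job(csv_rows):            # app/job/rest.py; entry point from the UI
    process_job({"rows": csv_rows})  # the real code stores the CSV in S3 first


create_job([{"to": "+15551234567"}])
```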

# Writing public APIs

