Skip to content

Commit

Permalink
Refactor queue reader and start stubbing out polling queue message ha…
Browse files Browse the repository at this point in the history
…ndler (#82)

* Refactor import specific message handling out of QueueHandler
* update tests
* placeholder for polling message handler and some tidying

Co-authored-by: James Herr <jherr@flexion.us>
Co-authored-by: jcrichlake <145698165+jcrichlake@users.noreply.github.com>
Co-authored-by: pluckyswan <96704946+pluckyswan@users.noreply.github.com>
Co-authored-by: saquino0827 <saquino@flexion.us>
Co-authored-by: jherrflexion <118225331+jherrflexion@users.noreply.github.com>
  • Loading branch information
6 people authored Jul 17, 2024
1 parent e8ca6ea commit 15e5814
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 88 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,6 @@ local.settings.json
__blobstorage__
__queuestorage__
__azurite_db*__.json

# Apple nonsense
.DS_Store
Binary file removed azure_functions/.DS_Store
Binary file not shown.
10 changes: 7 additions & 3 deletions src/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ func main() {

go setupHealthCheck()

queueHandler, err := orchestration.NewQueueHandler()
importMessageHandler, err := orchestration.NewImportMessageHandler()
if err != nil {
slog.Warn("Failed to create queueHandler", slog.Any(utils.ErrorKey, err))
slog.Warn("Failed to create importMessageHandler", slog.Any(utils.ErrorKey, err))
}
importQueueHandler, err := orchestration.NewQueueHandler(importMessageHandler, "message-import")
if err != nil {
slog.Warn("Failed to create importQueueHandler", slog.Any(utils.ErrorKey, err))
}

// TODO - move calls to SFTP into whatever timer/trigger we set up later
Expand All @@ -40,7 +44,7 @@ func main() {

// TODO - add another queue listener for the other queue? Or maybe one listener for all queues but different message handling?
// ListenToQueue is not split into a separate Go Routine since it is the core driver of the application
queueHandler.ListenToQueue()
importQueueHandler.ListenToQueue()
}

func setupLogging() {
Expand Down
71 changes: 71 additions & 0 deletions src/orchestration/import_message_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package orchestration

import (
"encoding/base64"
"errors"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/eventgrid/azeventgrid"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
"github.com/CDCgov/reportstream-sftp-ingestion/usecases"
"github.com/CDCgov/reportstream-sftp-ingestion/utils"
"log/slog"
)

type ImportMessageHandler struct {
usecase usecases.ReadAndSend
}

func NewImportMessageHandler() (ImportMessageHandler, error) {
usecase, err := usecases.NewReadAndSendUsecase()

if err != nil {
slog.Error("Unable to create Usecase", slog.Any(utils.ErrorKey, err))
return ImportMessageHandler{}, err
}

return ImportMessageHandler{usecase: &usecase}, nil
}

func (receiver ImportMessageHandler) HandleMessageContents(message azqueue.DequeuedMessage) error {
sourceUrl, err := getUrlFromMessage(*message.MessageText)

if err != nil {
slog.Error("Failed to get the file URL", slog.Any(utils.ErrorKey, err))
return err
}

return receiver.usecase.ReadAndSend(sourceUrl)
}

func getUrlFromMessage(messageText string) (string, error) {
eventBytes, err := base64.StdEncoding.DecodeString(messageText)
if err != nil {
slog.Error("Failed to decode message text", slog.Any(utils.ErrorKey, err))
return "", err
}

// Map bytes json to Event object format (shape)
var event azeventgrid.Event
err = event.UnmarshalJSON(eventBytes)
if err != nil {
slog.Error("Failed to unmarshal event", slog.Any(utils.ErrorKey, err))
return "", err
}

// Data is an 'any' type. We need to tell Go that it's a map
eventData, ok := event.Data.(map[string]any)

if !ok {
slog.Error("Could not assert event data to a map", slog.Any("event", event))
return "", errors.New("could not assert event data to a map")
}

// Extract blob url from Event's data
eventUrl, ok := eventData["url"].(string)

if !ok {
slog.Error("Could not assert event data url to a string", slog.Any("event", event))
return "", errors.New("could not assert event data url to a string")
}

return eventUrl, nil
}
20 changes: 20 additions & 0 deletions src/orchestration/polling_message_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package orchestration

import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
)

type PollingMessageHandler struct {
// TODO - add SFTP handler
}

func NewPollingMessageHandler() (PollingMessageHandler, error) {
// TODO - add SFTP handler

return PollingMessageHandler{}, nil
}

func (receiver PollingMessageHandler) HandleMessageContents(message azqueue.DequeuedMessage) error {
// TODO - use SFTP handler
return nil
}
80 changes: 10 additions & 70 deletions src/orchestration/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package orchestration

import (
"context"
"encoding/base64"
"errors"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/eventgrid/azeventgrid"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
"github.com/CDCgov/reportstream-sftp-ingestion/usecases"
"github.com/CDCgov/reportstream-sftp-ingestion/utils"
"log/slog"
"os"
Expand All @@ -18,7 +15,7 @@ import (
type QueueHandler struct {
queueClient QueueClient
deadLetterQueueClient QueueClient
usecase usecases.ReadAndSend
messageContentHandler MessageContentHandler
}

type QueueClient interface {
Expand All @@ -27,71 +24,28 @@ type QueueClient interface {
EnqueueMessage(ctx context.Context, content string, o *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error)
}

// TODO - pass in queue names? Have different queue handlers for the polling queue and the import queues?
// labeled each function based on whether it's specific to the import flow or not
type MessageContentHandler interface {
HandleMessageContents(message azqueue.DequeuedMessage) error
}

func NewQueueHandler() (QueueHandler, error) {
func NewQueueHandler(messageContentHandler MessageContentHandler, queueBaseName string) (QueueHandler, error) {
azureQueueConnectionString := os.Getenv("AZURE_STORAGE_CONNECTION_STRING")

client, err := azqueue.NewQueueClientFromConnectionString(azureQueueConnectionString, "message-import-queue", nil)
client, err := azqueue.NewQueueClientFromConnectionString(azureQueueConnectionString, queueBaseName+"-queue", nil)
if err != nil {
slog.Error("Unable to create Azure Queue Client for primary queue", slog.Any(utils.ErrorKey, err))
return QueueHandler{}, err
}

dlqClient, err := azqueue.NewQueueClientFromConnectionString(azureQueueConnectionString, "message-import-dead-letter-queue", nil)
dlqClient, err := azqueue.NewQueueClientFromConnectionString(azureQueueConnectionString, queueBaseName+"-dead-letter-queue", nil)
if err != nil {
slog.Error("Unable to create Azure Queue Client for dead letter queue", slog.Any(utils.ErrorKey, err))
return QueueHandler{}, err
}

// TODO - this is only relevant for the import queue, not the polling one
usecase, err := usecases.NewReadAndSendUsecase()

if err != nil {
slog.Error("Unable to create Usecase", slog.Any(utils.ErrorKey, err))
return QueueHandler{}, err
}

return QueueHandler{queueClient: client, deadLetterQueueClient: dlqClient, usecase: &usecase}, nil
return QueueHandler{queueClient: client, deadLetterQueueClient: dlqClient, messageContentHandler: messageContentHandler}, nil
}

// TODO - import-specific
func getUrlFromMessage(messageText string) (string, error) {
eventBytes, err := base64.StdEncoding.DecodeString(messageText)
if err != nil {
slog.Error("Failed to decode message text", slog.Any(utils.ErrorKey, err))
return "", err
}

// Map bytes json to Event object format (shape)
var event azeventgrid.Event
err = event.UnmarshalJSON(eventBytes)
if err != nil {
slog.Error("Failed to unmarshal event", slog.Any(utils.ErrorKey, err))
return "", err
}

// Data is an 'any' type. We need to tell Go that it's a map
eventData, ok := event.Data.(map[string]any)

if !ok {
slog.Error("Could not assert event data to a map", slog.Any("event", event))
return "", errors.New("could not assert event data to a map")
}

// Extract blob url from Event's data
eventUrl, ok := eventData["url"].(string)

if !ok {
slog.Error("Could not assert event data url to a string", slog.Any("event", event))
return "", errors.New("could not assert event data url to a string")
}

return eventUrl, nil
}

// TODO - NOT import-specific
func (receiver QueueHandler) deleteMessage(message azqueue.DequeuedMessage) error {
messageId := *message.MessageID
popReceipt := *message.PopReceipt
Expand All @@ -107,7 +61,6 @@ func (receiver QueueHandler) deleteMessage(message azqueue.DequeuedMessage) erro
return nil
}

// TODO - partly import-specific. Will need updates or a new version that kicks off the SFTP code
func (receiver QueueHandler) handleMessage(message azqueue.DequeuedMessage) error {

slog.Info("Handling message", slog.String("id", *message.MessageID))
Expand All @@ -117,20 +70,11 @@ func (receiver QueueHandler) handleMessage(message azqueue.DequeuedMessage) erro
return errors.New("message delivery threshold exceeded")
}

sourceUrl, err := getUrlFromMessage(*message.MessageText)

if err != nil {
slog.Error("Failed to get the file URL", slog.Any(utils.ErrorKey, err))
return err
}

err = receiver.usecase.ReadAndSend(sourceUrl)
err := receiver.messageContentHandler.HandleMessageContents(message)

if err != nil {
slog.Warn("Failed to read/send file", slog.Any(utils.ErrorKey, err))
slog.Warn("Failed to handle message", slog.Any(utils.ErrorKey, err))
} else {
// Only delete message if file successfully sent to ReportStream
// (or if there's a known non-transient error and we've moved the file to `failure`)
err = receiver.deleteMessage(message)
if err != nil {
slog.Warn("Failed to delete message", slog.Any(utils.ErrorKey, err))
Expand All @@ -141,7 +85,6 @@ func (receiver QueueHandler) handleMessage(message azqueue.DequeuedMessage) erro
return nil
}

// TODO - NOT import-specific
// overDeliveryThreshold checks whether the max delivery attempts for the message have been reached.
// If the threshold has been reached, the message should go to dead letter storage.
// Return true if we're over the threshold and should stop processing, else return false
Expand All @@ -164,7 +107,6 @@ func (receiver QueueHandler) overDeliveryThreshold(message azqueue.DequeuedMessa
return false
}

// TODO - NOT import-specific
func (receiver QueueHandler) deadLetter(message azqueue.DequeuedMessage) error {

// a TimeToLive of -1 means the message will not expire
Expand All @@ -186,7 +128,6 @@ func (receiver QueueHandler) deadLetter(message azqueue.DequeuedMessage) error {
return nil
}

// TODO - NOT import-specific
func (receiver QueueHandler) ListenToQueue() {
for {
err := receiver.receiveQueue()
Expand All @@ -197,7 +138,6 @@ func (receiver QueueHandler) ListenToQueue() {
}
}

// TODO - NOT import-specific
func (receiver QueueHandler) receiveQueue() error {

slog.Info("Trying to dequeue")
Expand Down
Loading

0 comments on commit 15e5814

Please sign in to comment.