Unzip files after retrieving from external SFTP site #74

Merged
merged 12 commits into from
Jul 5, 2024
9 changes: 7 additions & 2 deletions README.md
@@ -105,8 +105,13 @@ match your newly-created file
4. The app should now read this message and attempt to process it

For the external SFTP call, we've set up a file in docker-compose that's copied to the local SFTP server. The service
-then copies it to local Azurite. As of 6/28/24, nothing happens to this file after copy since it's not placed in
-an `import` folder.
+then copies it to local Azurite. You can add additional files by placing them in `localdata/data/sftp` before running
+`docker-compose`.
+
+As of 7/3/24, when we copy a file from the local SFTP server, we try to unzip it
+(using the password in `mock_credentials/mock_ca_dph_zip_password.txt` if it's protected). We then place the unzipped
+files into the import folder, and if there are any errors, we upload an error file for the zip. If the original file is
+not a zip, we just copy it into the import folder.

#### Manual cloud testing
To trigger file ingestion in a deployed environment, go to the `cdcrssftp{env}` storage account in the Azure portal.
15 changes: 15 additions & 0 deletions adr/008-test-naming-convention.md
@@ -0,0 +1,15 @@
# 8. Test Naming Convention

Date: 2024-07-02

## Decision

For consistency, we'll name tests like
`Test_TheMethodBeingTested_TheCoveredScenario_TheExpectedResult`, e.g.
`Test_Unzip_FileIsPasswordProtected_UnzipsSuccessfully`. For a baseline scenario test,
it's okay to do something like `Test_TheMethodBeingTested_TheExpectedResult`
e.g. `Test_getToken_ReturnsAccessToken`.

## Status

Accepted.
5 changes: 3 additions & 2 deletions docker-compose.yml
@@ -11,10 +11,11 @@ services:
FLEXION_CLIENT_NAME: flexion.simulated-hospital
QUEUE_MAX_DELIVERY_ATTEMPTS: 5
SFTP_USER: ti_user
-SFTP_PASSWORD: ti_password #TODO - update #pragma: allowlist secret
-SFTP_KEY_NAME: sftp_server_user_id_rsa.pem #TODO - update #pragma: allowlist secret
+SFTP_PASSWORD: ti_password #pragma: allowlist secret
+SFTP_KEY_NAME: sftp_server_user_id_rsa.pem #pragma: allowlist secret
SFTP_SERVER_ADDRESS: host.docker.internal:2223 # no http since this is sftp
SFTP_SERVER_PUBLIC_KEY_NAME: ssh_host_rsa_key.pub
+CA_DPH_ZIP_PASSWORD_NAME: mock_ca_dph_zip_password.txt #pragma: allowlist secret
volumes:
# map to Azurite data objects to the build directory
- ./localdata/data/reportstream:/localdata
1 change: 1 addition & 0 deletions mock_credentials/mock_ca_dph_zip_password.txt
@@ -0,0 +1 @@
test123
1 change: 1 addition & 0 deletions operations/template/app.tf
@@ -71,6 +71,7 @@ resource "azurerm_linux_web_app" "sftp" {
AZURE_KEY_VAULT_URI = azurerm_key_vault.key_storage.vault_uri
FLEXION_CLIENT_NAME = "flexion.simulated-lab"
QUEUE_MAX_DELIVERY_ATTEMPTS = azurerm_eventgrid_system_topic_event_subscription.topic_sub.retry_policy.0.max_delivery_attempts # making the Azure container <-> queue retry count be in sync with the queue <-> application retry count..
+CA_DPH_ZIP_PASSWORD_NAME = azurerm_key_vault_secret.ca_dph_zip_password.name
}

identity {
2 changes: 1 addition & 1 deletion operations/template/event.tf
@@ -28,7 +28,7 @@ resource "azurerm_eventgrid_system_topic_event_subscription" "topic_sub" {
advanced_filter {
string_contains {
key = "subject"
-values = ["import"]
+values = ["/import/"]
}
}

12 changes: 12 additions & 0 deletions operations/template/key.tf
@@ -45,3 +45,15 @@ resource "azurerm_key_vault_secret" "mock_public_health_lab_private_key" {
}
depends_on = [azurerm_key_vault_access_policy.allow_github_deployer] //wait for the permission that allows our deployer to write the secret
}
+
+resource "azurerm_key_vault_secret" "ca_dph_zip_password" {
+name = "ca-dph-zip-password-${var.environment}"
+value = "dogcow"
+
+key_vault_id = azurerm_key_vault.key_storage.id
+
+lifecycle {
+ignore_changes = [value]
+}
+depends_on = [azurerm_key_vault_access_policy.allow_github_deployer] //wait for the permission that allows our deployer to write the secret
+}
9 changes: 5 additions & 4 deletions src/cmd/main.go
@@ -3,6 +3,7 @@ package main
import (
"github.com/CDCgov/reportstream-sftp-ingestion/orchestration"
"github.com/CDCgov/reportstream-sftp-ingestion/sftp"
+"github.com/CDCgov/reportstream-sftp-ingestion/utils"
"io"
"log/slog"
"net/http"
@@ -19,13 +20,13 @@ func main() {

queueHandler, err := orchestration.NewQueueHandler()
if err != nil {
-slog.Warn("Failed to create queueHandler", slog.Any("error", err))
+slog.Warn("Failed to create queueHandler", slog.Any(utils.ErrorKey, err))
}

// TODO - move calls to SFTP into whatever timer/trigger we set up later
sftpHandler, err := sftp.NewSftpHandler()
if err != nil {
-slog.Error("ope, failed to create sftp handler", slog.Any("error", err))
+slog.Error("ope, failed to create sftp handler", slog.Any(utils.ErrorKey, err))
// Don't return, we want to let things keep going for now
}
defer sftpHandler.Close()
@@ -58,12 +59,12 @@ func setupHealthCheck() {

_, err := io.WriteString(response, "Operational")
if err != nil {
-slog.Error("Failed to respond to health check", slog.Any("error", err))
+slog.Error("Failed to respond to health check", slog.Any(utils.ErrorKey, err))
}
})

err := http.ListenAndServe(":8080", nil)
if err != nil {
-slog.Error("Failed to start health check", slog.Any("error", err))
+slog.Error("Failed to start health check", slog.Any(utils.ErrorKey, err))
}
}
1 change: 1 addition & 0 deletions src/go.mod
@@ -13,6 +13,7 @@ require (
github.com/google/uuid v1.6.0
github.com/pkg/sftp v1.13.6
github.com/stretchr/testify v1.9.0
+github.com/yeka/zip v0.0.0-20231116150916-03d6312748a9
golang.org/x/crypto v0.24.0

)
2 changes: 2 additions & 0 deletions src/go.sum
@@ -51,6 +51,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/yeka/zip v0.0.0-20231116150916-03d6312748a9 h1:K8gF0eekWPEX+57l30ixxzGhHH/qscI3JCnuhbN6V4M=
+github.com/yeka/zip v0.0.0-20231116150916-03d6312748a9/go.mod h1:9BnoKCcgJ/+SLhfAXj15352hTOuVmG5Gzo8xNRINfqI=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
19 changes: 19 additions & 0 deletions src/mocks/mock_credential_getter.go
@@ -0,0 +1,19 @@
package mocks

import (
"crypto/rsa"
"github.com/stretchr/testify/mock"
)

type MockCredentialGetter struct {
mock.Mock
}

func (m *MockCredentialGetter) GetPrivateKey(privateKeyName string) (*rsa.PrivateKey, error) {
args := m.Called(privateKeyName)
return args.Get(0).(*rsa.PrivateKey), args.Error(1)
}
func (m *MockCredentialGetter) GetSecret(secretName string) (string, error) {
args := m.Called(secretName)
return args.Get(0).(string), args.Error(1)
}
Binary file added src/mocks/test_data/non-zip-file.zip
Binary file not shown.
Binary file added src/mocks/test_data/passworded.zip
Binary file not shown.
Binary file added src/mocks/test_data/unprotected.zip
Binary file not shown.
33 changes: 17 additions & 16 deletions src/orchestration/queue.go
@@ -8,6 +8,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/eventgrid/azeventgrid"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
"github.com/CDCgov/reportstream-sftp-ingestion/usecases"
+"github.com/CDCgov/reportstream-sftp-ingestion/utils"
"log/slog"
"os"
"strconv"
@@ -31,20 +32,20 @@ func NewQueueHandler() (QueueHandler, error) {

client, err := azqueue.NewQueueClientFromConnectionString(azureQueueConnectionString, "blob-message-queue", nil)
if err != nil {
-slog.Error("Unable to create Azure Queue Client for primary queue", slog.Any("error", err))
+slog.Error("Unable to create Azure Queue Client for primary queue", slog.Any(utils.ErrorKey, err))
return QueueHandler{}, err
}

dlqClient, err := azqueue.NewQueueClientFromConnectionString(azureQueueConnectionString, "blob-message-dead-letter-queue", nil)
if err != nil {
-slog.Error("Unable to create Azure Queue Client for dead letter queue", slog.Any("error", err))
+slog.Error("Unable to create Azure Queue Client for dead letter queue", slog.Any(utils.ErrorKey, err))
return QueueHandler{}, err
}

usecase, err := usecases.NewReadAndSendUsecase()

if err != nil {
-slog.Error("Unable to create Usecase", slog.Any("error", err))
+slog.Error("Unable to create Usecase", slog.Any(utils.ErrorKey, err))
return QueueHandler{}, err
}

@@ -54,15 +55,15 @@ func NewQueueHandler() (QueueHandler, error) {
func getUrlFromMessage(messageText string) (string, error) {
eventBytes, err := base64.StdEncoding.DecodeString(messageText)
if err != nil {
-slog.Error("Failed to decode message text", slog.Any("error", err))
+slog.Error("Failed to decode message text", slog.Any(utils.ErrorKey, err))
return "", err
}

// Map bytes json to Event object format (shape)
var event azeventgrid.Event
err = event.UnmarshalJSON(eventBytes)
if err != nil {
-slog.Error("Failed to unmarshal event", slog.Any("error", err))
+slog.Error("Failed to unmarshal event", slog.Any(utils.ErrorKey, err))
return "", err
}

@@ -91,7 +92,7 @@ func (receiver QueueHandler) deleteMessage(message azqueue.DequeuedMessage) erro

deleteResponse, err := receiver.queueClient.DeleteMessage(context.Background(), messageId, popReceipt, nil)
if err != nil {
-slog.Error("Unable to delete message", slog.Any("error", err))
+slog.Error("Unable to delete message", slog.Any(utils.ErrorKey, err))
return err
}

@@ -112,20 +113,20 @@ func (receiver QueueHandler) handleMessage(message azqueue.DequeuedMessage) erro
sourceUrl, err := getUrlFromMessage(*message.MessageText)

if err != nil {
-slog.Error("Failed to get the file URL", slog.Any("error", err))
+slog.Error("Failed to get the file URL", slog.Any(utils.ErrorKey, err))
return err
}

err = receiver.usecase.ReadAndSend(sourceUrl)

if err != nil {
-slog.Warn("Failed to read/send file", slog.Any("error", err))
+slog.Warn("Failed to read/send file", slog.Any(utils.ErrorKey, err))
} else {
// Only delete message if file successfully sent to ReportStream
// (or if there's a known non-transient error and we've moved the file to `failure`)
err = receiver.deleteMessage(message)
if err != nil {
-slog.Warn("Failed to delete message", slog.Any("error", err))
+slog.Warn("Failed to delete message", slog.Any(utils.ErrorKey, err))
return err
}
}
@@ -141,14 +142,14 @@ func (receiver QueueHandler) overDeliveryThreshold(message azqueue.DequeuedMessa

if err != nil {
maxDeliveryCount = 5
-slog.Warn("Failed to parse QUEUE_MAX_DELIVERY_ATTEMPTS, defaulting to 5", slog.Any("error", err))
+slog.Warn("Failed to parse QUEUE_MAX_DELIVERY_ATTEMPTS, defaulting to 5", slog.Any(utils.ErrorKey, err))
}

if *message.DequeueCount > maxDeliveryCount {
slog.Error("Message reached maximum number of delivery attempts", slog.Any("message", message))
err := receiver.deadLetter(message)
if err != nil {
-slog.Error("Failed to move message to the DLQ", slog.Any("message", message), slog.Any("error", err))
+slog.Error("Failed to move message to the DLQ", slog.Any("message", message), slog.Any(utils.ErrorKey, err))
}
return true
}
@@ -161,13 +162,13 @@ func (receiver QueueHandler) deadLetter(message azqueue.DequeuedMessage) error {
opts := &azqueue.EnqueueMessageOptions{TimeToLive: to.Ptr(int32(-1))}
_, err := receiver.deadLetterQueueClient.EnqueueMessage(context.Background(), *message.MessageText, opts)
if err != nil {
-slog.Error("Failed to add the message to the DLQ", slog.Any("error", err))
+slog.Error("Failed to add the message to the DLQ", slog.Any(utils.ErrorKey, err))
return err
}

err = receiver.deleteMessage(message)
if err != nil {
-slog.Error("Failed to delete the message to the original queue after adding it to the DLQ", slog.Any("error", err))
+slog.Error("Failed to delete the message to the original queue after adding it to the DLQ", slog.Any(utils.ErrorKey, err))
return err
}

@@ -180,7 +181,7 @@ func (receiver QueueHandler) ListenToQueue() {
for {
err := receiver.receiveQueue()
if err != nil {
-slog.Error("Failed to receive message", slog.Any("error", err))
+slog.Error("Failed to receive message", slog.Any(utils.ErrorKey, err))
}
time.Sleep(10 * time.Second)
}
@@ -192,7 +193,7 @@ func (receiver QueueHandler) receiveQueue() error {

messageResponse, err := receiver.queueClient.DequeueMessage(context.Background(), nil)
if err != nil {
-slog.Error("Unable to dequeue messages", slog.Any("error", err))
+slog.Error("Unable to dequeue messages", slog.Any(utils.ErrorKey, err))
return err
}

@@ -201,7 +202,7 @@ func (receiver QueueHandler) receiveQueue() error {
go func() {
err := receiver.handleMessage(message)
if err != nil {
-slog.Error("Unable to handle message", slog.Any("error", err))
+slog.Error("Unable to handle message", slog.Any(utils.ErrorKey, err))
}
}()
}