Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1080: Update Read and send errors #124

Merged
merged 4 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/senders/report_stream_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ func (sender Sender) SendMessage(message []byte) (string, error) {
slog.Info("status", slog.Any("code", res.StatusCode), slog.String("status", res.Status))
// The response body from ReportStream may include additional error details. See examples in json_responses.go
slog.Info("response body", slog.String("responseBodyBytes", string(responseBodyBytes)))
if res.StatusCode >= 400 && res.StatusCode < 500 {
return "", errors.New(utils.ReportStreamNonTransientFailure)
}
return "", errors.New(res.Status)
}

Expand Down
165 changes: 134 additions & 31 deletions src/senders/report_stream_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,11 @@ func (suite *SenderTestSuite) Test_NewSender_EnvIsEmpty_ReturnsSenderWithLocalCr
func (suite *SenderTestSuite) Test_GenerateJWT_ReturnsJWT() {

sender, err := NewSender()
assert.NoError(suite.T(), err)

mockCredentialGetter := new(mocks.MockCredentialGetter)
sender.credentialGetter = mockCredentialGetter

testKey, err := rsa.GenerateKey(rand.Reader, 2048)
assert.NoError(suite.T(), err)

mockCredentialGetter.On("GetPrivateKey", "key").Return(testKey, nil)
jwt, err := sender.generateJwt()

Expand All @@ -74,7 +71,6 @@ func (suite *SenderTestSuite) Test_GenerateJWT_ReturnsJWT() {
func (suite *SenderTestSuite) Test_GenerateJWT_UnableToGetPrivateKey_ReturnsError() {

sender, err := NewSender()
assert.NoError(suite.T(), err)

mockCredentialGetter := new(mocks.MockCredentialGetter)
sender.credentialGetter = mockCredentialGetter
Expand All @@ -87,14 +83,11 @@ func (suite *SenderTestSuite) Test_GenerateJWT_UnableToGetPrivateKey_ReturnsErro

func (suite *SenderTestSuite) Test_getToken_ReturnsAccessToken() {
sender, err := NewSender()
assert.NoError(suite.T(), err)

mockCredentialGetter := new(mocks.MockCredentialGetter)
sender.credentialGetter = mockCredentialGetter

testKey, err := rsa.GenerateKey(rand.Reader, 2048)
assert.NoError(suite.T(), err)

mockCredentialGetter.On("GetPrivateKey", "key").Return(testKey, nil)

// Set up a test server for ReportStream
Expand Down Expand Up @@ -122,7 +115,6 @@ func (suite *SenderTestSuite) Test_getToken_ReturnsAccessToken() {

func (suite *SenderTestSuite) Test_getToken_UnableToGenerateJWT_ReturnsError() {
sender, err := NewSender()
assert.NoError(suite.T(), err)

mockCredentialGetter := new(mocks.MockCredentialGetter)
sender.credentialGetter = mockCredentialGetter
Expand All @@ -137,14 +129,11 @@ func (suite *SenderTestSuite) Test_getToken_UnableToGenerateJWT_ReturnsError() {
func (suite *SenderTestSuite) Test_getToken_UnableToCallTokenEndpoint_ReturnsError() {
os.Setenv("REPORT_STREAM_URL_PREFIX", "this is not a good URL")
sender, err := NewSender()
assert.NoError(suite.T(), err)

mockCredentialGetter := new(mocks.MockCredentialGetter)
sender.credentialGetter = mockCredentialGetter

testKey, err := rsa.GenerateKey(rand.Reader, 2048)
assert.NoError(suite.T(), err)

mockCredentialGetter.On("GetPrivateKey", "key").Return(testKey, nil)

token, err := sender.getToken()
Expand All @@ -155,14 +144,11 @@ func (suite *SenderTestSuite) Test_getToken_UnableToCallTokenEndpoint_ReturnsErr

func (suite *SenderTestSuite) Test_getToken_ReportStreamResponseStatusIsInvalid_ReturnsError() {
sender, err := NewSender()
assert.NoError(suite.T(), err)

mockCredentialGetter := new(mocks.MockCredentialGetter)
sender.credentialGetter = mockCredentialGetter

testKey, err := rsa.GenerateKey(rand.Reader, 2048)
assert.NoError(suite.T(), err)

mockCredentialGetter.On("GetPrivateKey", "key").Return(testKey, nil)

// Set up a test server for ReportStream
Expand Down Expand Up @@ -190,14 +176,11 @@ func (suite *SenderTestSuite) Test_getToken_ReportStreamResponseStatusIsInvalid_

func (suite *SenderTestSuite) Test_getToken_UnableToMarshallResponseBody_ReturnsError() {
sender, err := NewSender()
assert.NoError(suite.T(), err)

mockCredentialGetter := new(mocks.MockCredentialGetter)
sender.credentialGetter = mockCredentialGetter

testKey, err := rsa.GenerateKey(rand.Reader, 2048)
assert.NoError(suite.T(), err)

mockCredentialGetter.On("GetPrivateKey", "key").Return(testKey, nil)

// Set up a test server for ReportStream
Expand All @@ -221,14 +204,11 @@ func (suite *SenderTestSuite) Test_getToken_UnableToMarshallResponseBody_Returns

func (suite *SenderTestSuite) Test_SendMessage_MessageSentToReportStream_ReturnsReportId() {
sender, err := NewSender()
assert.NoError(suite.T(), err)

mockCredentialGetter := new(mocks.MockCredentialGetter)
sender.credentialGetter = mockCredentialGetter

testKey, err := rsa.GenerateKey(rand.Reader, 2048)
assert.NoError(suite.T(), err)

mockCredentialGetter.On("GetPrivateKey", "key").Return(testKey, nil)

// Set up a test server for ReportStream
Expand Down Expand Up @@ -288,14 +268,11 @@ func (suite *SenderTestSuite) Test_SendMessage_MessageSentToReportStream_Returns

func (suite *SenderTestSuite) Test_SendMessage_UnableToGetToken_ReturnsError() {
sender, err := NewSender()
assert.NoError(suite.T(), err)

mockCredentialGetter := new(mocks.MockCredentialGetter)
sender.credentialGetter = mockCredentialGetter

testKey, err := rsa.GenerateKey(rand.Reader, 2048)
assert.NoError(suite.T(), err)

mockCredentialGetter.On("GetPrivateKey", "key").Return(testKey, errors.New(utils.ErrorKey))

message, _ := os.ReadFile(filepath.Join("..", "..", "mock_data", "order_message.hl7"))
Expand All @@ -309,14 +286,11 @@ func (suite *SenderTestSuite) Test_SendMessage_UnableToGetToken_ReturnsError() {

func (suite *SenderTestSuite) Test_SendMessage_UnableToCallTokenEndpoint_ReturnsError() {
sender, err := NewSender()
assert.NoError(suite.T(), err)

mockCredentialGetter := new(mocks.MockCredentialGetter)
sender.credentialGetter = mockCredentialGetter

testKey, err := rsa.GenerateKey(rand.Reader, 2048)
assert.NoError(suite.T(), err)

mockCredentialGetter.On("GetPrivateKey", "key").Return(testKey, nil)

// Set up a test server for ReportStream
Expand Down Expand Up @@ -351,14 +325,77 @@ func (suite *SenderTestSuite) Test_SendMessage_UnableToCallTokenEndpoint_Returns

func (suite *SenderTestSuite) Test_SendMessage_StatusCodeIsAbove300_ReturnsError() {
sender, err := NewSender()
assert.NoError(suite.T(), err)

mockCredentialGetter := new(mocks.MockCredentialGetter)
sender.credentialGetter = mockCredentialGetter

testKey, err := rsa.GenerateKey(rand.Reader, 2048)
assert.NoError(suite.T(), err)
mockCredentialGetter.On("GetPrivateKey", "key").Return(testKey, nil)

// Set up a test server for ReportStream
// Response parts: Body, Status Code, Access Token (part of body), Error (part of body)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

if r.URL.Path == "/api/token" {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{
"sub": "flexion.*.report_e6b68103-dd38-420e-8118-2b2f6c9fa3c4",
"access_token": "eyJhbGciOiJIUzM4NCJ9.eyJleHAiOjE3MTk1MjcyNzgsInNjb3BlIjoiZmxleGlvbi4qLnJlcG9ydCIsInN1YiI6ImZsZXhpb24uKi5yZXBvcnRfZTZiNjgxMDMtZGQzOC00MjBlLTgxMTgtMmIyZjZjOWZhM2M0In0.liHv9SJYxztgMmCPKGIF2lzcMMMzFAoatLlIC33uz5jbA5wSJa8iIa5yzJ1ZaECI",
"token_type": "bearer",
"expires_in": 300,
"expires_at_seconds": 1719527278,
"scope": "flexion.*.report"
}
`))
} else {
w.WriteHeader(http.StatusFound)
w.Write([]byte(`
{
"id" : "78809588-1193-4861-a6a7-52493f7dd254",
"submissionId" : 26,
"overallStatus" : "Received",
"timestamp" : "2024-05-20T21:11:36.144Z",
"plannedCompletionAt" : null,
"actualCompletionAt" : null,
"sender" : "flexion.simulated-hospital",
"reportItemCount" : 1,
"errorCount" : 0,
"warningCount" : 0,
"httpStatus" : 201,
"destinations" : [ ],
"actionName" : "receive",
"externalName" : null,
"reportId" : "78809588-1193-4861-a6a7-52493f7dd254",
"topic" : "etor-ti",
"bodyFormat" : "",
"errors" : [ ],
"warnings" : [ ],
"destinationCount" : 0,
"fileName" : ""
}
`))
}

}))
defer server.Close()

sender.baseUrl = server.URL
message, _ := os.ReadFile(filepath.Join("..", "..", "mock_data", "order_message.hl7"))

reportId, err := sender.SendMessage(message)

assert.Error(suite.T(), err)
assert.Equal(suite.T(), "302 Found", err.Error())
assert.Equal(suite.T(), "", reportId)
}

func (suite *SenderTestSuite) Test_SendMessage_StatusCodeIs400_ReturnsNonTransientError() {
sender, err := NewSender()

mockCredentialGetter := new(mocks.MockCredentialGetter)
sender.credentialGetter = mockCredentialGetter

testKey, err := rsa.GenerateKey(rand.Reader, 2048)
mockCredentialGetter.On("GetPrivateKey", "key").Return(testKey, nil)

// Set up a test server for ReportStream
Expand All @@ -377,7 +414,7 @@ func (suite *SenderTestSuite) Test_SendMessage_StatusCodeIsAbove300_ReturnsError
}
`))
} else {
w.WriteHeader(http.StatusNotFound)
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(`
{
"id" : "78809588-1193-4861-a6a7-52493f7dd254",
Expand Down Expand Up @@ -413,12 +450,12 @@ func (suite *SenderTestSuite) Test_SendMessage_StatusCodeIsAbove300_ReturnsError

reportId, err := sender.SendMessage(message)

assert.Equal(suite.T(), "404 Not Found", err.Error())
assert.Error(suite.T(), err)
assert.Equal(suite.T(), utils.ReportStreamNonTransientFailure, err.Error())
assert.Equal(suite.T(), "", reportId)
}

func (suite *SenderTestSuite) Test_SendMessage_UnableToParseResponseBody_ReturnsError() {
func (suite *SenderTestSuite) Test_SendMessage_StatusCodeIsAbove499_ReturnsError() {
sender, err := NewSender()
assert.NoError(suite.T(), err)

Expand All @@ -430,6 +467,72 @@ func (suite *SenderTestSuite) Test_SendMessage_UnableToParseResponseBody_Returns

mockCredentialGetter.On("GetPrivateKey", "key").Return(testKey, nil)

// Set up a test server for ReportStream
// Response parts: Body, Status Code, Access Token (part of body), Error (part of body)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

if r.URL.Path == "/api/token" {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{
"sub": "flexion.*.report_e6b68103-dd38-420e-8118-2b2f6c9fa3c4",
"access_token": "eyJhbGciOiJIUzM4NCJ9.eyJleHAiOjE3MTk1MjcyNzgsInNjb3BlIjoiZmxleGlvbi4qLnJlcG9ydCIsInN1YiI6ImZsZXhpb24uKi5yZXBvcnRfZTZiNjgxMDMtZGQzOC00MjBlLTgxMTgtMmIyZjZjOWZhM2M0In0.liHv9SJYxztgMmCPKGIF2lzcMMMzFAoatLlIC33uz5jbA5wSJa8iIa5yzJ1ZaECI",
"token_type": "bearer",
"expires_in": 300,
"expires_at_seconds": 1719527278,
"scope": "flexion.*.report"
}
`))
} else {
w.WriteHeader(http.StatusBadGateway)
w.Write([]byte(`
{
"id" : "78809588-1193-4861-a6a7-52493f7dd254",
"submissionId" : 26,
"overallStatus" : "Received",
"timestamp" : "2024-05-20T21:11:36.144Z",
"plannedCompletionAt" : null,
"actualCompletionAt" : null,
"sender" : "flexion.simulated-hospital",
"reportItemCount" : 1,
"errorCount" : 0,
"warningCount" : 0,
"httpStatus" : 201,
"destinations" : [ ],
"actionName" : "receive",
"externalName" : null,
"reportId" : "78809588-1193-4861-a6a7-52493f7dd254",
"topic" : "etor-ti",
"bodyFormat" : "",
"errors" : [ ],
"warnings" : [ ],
"destinationCount" : 0,
"fileName" : ""
}
`))
}

}))
defer server.Close()

sender.baseUrl = server.URL
message, _ := os.ReadFile(filepath.Join("..", "..", "mock_data", "order_message.hl7"))

reportId, err := sender.SendMessage(message)

assert.Error(suite.T(), err)
assert.Equal(suite.T(), "502 Bad Gateway", err.Error())
assert.Equal(suite.T(), "", reportId)
}

func (suite *SenderTestSuite) Test_SendMessage_UnableToParseResponseBody_ReturnsError() {
sender, err := NewSender()

mockCredentialGetter := new(mocks.MockCredentialGetter)
sender.credentialGetter = mockCredentialGetter

testKey, err := rsa.GenerateKey(rand.Reader, 2048)
mockCredentialGetter.On("GetPrivateKey", "key").Return(testKey, nil)

// Set up a test server for ReportStream
// Response parts: Body, Status Code, Access Token (part of body), Error (part of body)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down
4 changes: 2 additions & 2 deletions src/usecases/read_and_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func (receiver *ReadAndSendUsecase) ReadAndSend(sourceUrl string) error {
if err != nil {
slog.Error("Failed to send the file to ReportStream", slog.Any(utils.ErrorKey, err), slog.String("sourceUrl", sourceUrl))

// As of June 2024, only the 400 response triggers a move to the `failure` folder. Returning `nil` will let
// queue.go delete the queue message so that it will stop retrying
// As of August 2024, we trigger on any http status code >= 400 and < 500 and move to the `failure` folder.
// Returning `nil` will let queue.go delete the queue message so that it will stop retrying
// We're treating all other errors as unexpected (and possibly transient) for now
if strings.Contains(err.Error(), utils.ReportStreamNonTransientFailure) {
receiver.moveFile(sourceUrl, utils.FailureFolder)
Expand Down
4 changes: 2 additions & 2 deletions src/usecases/read_and_send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ func Test_ReadAndSend_FailsToReadBlob_ReturnsError(t *testing.T) {
assert.Error(t, err)
}

func Test_ReadAndSend_400FromReportStream_MovesFileToFailureFolder(t *testing.T) {
func Test_ReadAndSend_NonTransientFailureFromReportStream_MovesFileToFailureFolder(t *testing.T) {
mockBlobHandler := &mocks.MockBlobHandler{}
mockBlobHandler.On("FetchFile", utils.SourceUrl).Return([]byte("The DogCow went Moof!"), nil)
mockBlobHandler.On("MoveFile", utils.SourceUrl, utils.FailureSourceUrl).Return(nil)

mockMessageSender := &MockMessageSender{}
mockMessageSender.On("SendMessage", mock.Anything).Return("", errors.New("400 Bad Request"))
mockMessageSender.On("SendMessage", mock.Anything).Return("", errors.New(utils.ReportStreamNonTransientFailure))

usecase := ReadAndSendUsecase{blobHandler: mockBlobHandler, messageSender: mockMessageSender}

Expand Down
2 changes: 1 addition & 1 deletion src/utils/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const FailureFolder = "failure"
const UnzipFolder = "unzip"

// In read_and_send, move files to the `FailureFolder` when we get the below response from ReportStream
const ReportStreamNonTransientFailure = "400"
const ReportStreamNonTransientFailure = "reportStreamNonTransientFailure"

// Use this when logging an error.
// E.g. `slog.Warn("Failed to construct the ReportStream senders", slog.Any(utils.ErrorKey, err))`
Expand Down
Loading