From c0d3af98e08959bcd842212257d7ac31e006b8c6 Mon Sep 17 00:00:00 2001 From: Morgan Taylor Date: Fri, 6 Sep 2024 15:23:04 -0400 Subject: [PATCH 01/12] flight finalizes with outputs, getResult returns signed urls --- .../pipelines/common/utils/FileUtils.java | 21 ++-- .../dependencies/gcs/GcsService.java | 41 ++++++++ .../dependencies/rawls/RawlsService.java | 14 +++ .../service/PipelineRunsService.java | 16 +++- .../imputation/RunImputationGcpJobFlight.java | 8 ++ .../gcp/FetchOutputsFromDataTableStep.java | 95 +++++++++++++++++++ .../pipelines/common/utils/FileUtilsTest.java | 43 +++++++-- .../service/PipelineRunsServiceTest.java | 9 +- .../RunImputationGcpFlightTest.java | 4 +- .../FetchOutputsFromDataTableStepTest.java | 90 ++++++++++++++++++ .../terra/pipelines/testutils/TestUtils.java | 2 +- 11 files changed, 320 insertions(+), 23 deletions(-) create mode 100644 service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java create mode 100644 service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java diff --git a/service/src/main/java/bio/terra/pipelines/common/utils/FileUtils.java b/service/src/main/java/bio/terra/pipelines/common/utils/FileUtils.java index 7a754745..26dbdaa2 100644 --- a/service/src/main/java/bio/terra/pipelines/common/utils/FileUtils.java +++ b/service/src/main/java/bio/terra/pipelines/common/utils/FileUtils.java @@ -12,23 +12,26 @@ private FileUtils() { private static final String USER_PROVIDED_FILE_INPUT_DIRECTORY = "user-input-files"; /** - * Extract the blob name from the full file path, using the workspaceId as a delimiter. + * Extract the blob name from the full file path, using a defined delimiter. * - *

For example, `https://lz123.blob.core.windows.net/sc-{workspaceId}/path/to/file` becomes - * `path/to/file` + *

For example, in Azure, with workspaceId as the delimiter, + * `https://lz123.blob.core.windows.net/sc-{workspaceDelimiter}/path/to/file` becomes + * `path/to/file`. + * + *

In GCP, with workspaceStorageContainerName as the delimiter + * `gs://{workspaceDelimiter}/path/to/file` becomes `path/to/file`. * * @param blobHttpUrl - * @param workspaceId + * @param delimiter * @return blobName */ public static String getBlobNameFromTerraWorkspaceStorageHttpUrl( - String blobHttpUrl, UUID workspaceId) { - if (!blobHttpUrl.contains(workspaceId.toString())) { + String blobHttpUrl, String delimiter) { + if (!blobHttpUrl.contains(delimiter)) { throw new InternalServerErrorException( - "File path and workspaceId do not match. Cannot extract blob name."); + "File path and delimiter do not match. Cannot extract blob name."); } - return blobHttpUrl.substring( - blobHttpUrl.indexOf(workspaceId.toString()) + workspaceId.toString().length() + 1); + return blobHttpUrl.substring(blobHttpUrl.indexOf(delimiter) + delimiter.length() + 1); } /** diff --git a/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java b/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java index bb2eeda7..7e7d4c1d 100644 --- a/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java +++ b/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java @@ -70,11 +70,52 @@ public URL generatePutObjectSignedUrl(String projectId, String bucketName, Strin Storage.SignUrlOption.withExtHeaders(extensionHeaders), Storage.SignUrlOption.withV4Signature())); + // remove the signature from the URL before logging + String cleanUrl = url.toString().split("X-Goog-Signature=")[0] + "X-Goog-Signature=REDACTED"; logger.info("Generated PUT signed URL: {}", url); return url; } + /** + * Generates and returns a GET (read-only) signed url for a specific object in a bucket. See + * documentation on signed urls here. + * + *

The output URL can be used with a curl command to download an object: `curl '{url}' > + * {local_file_name}` + * + * @param projectId Google project id + * @param bucketName without a prefix + * @param objectName should include the full path of the object (subdirectories + file name) + * @return url that can be used to download an object to GCS + */ + public URL generateGetObjectSignedUrl(String projectId, String bucketName, String objectName) + throws StorageException { + // define target blob object resource + BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, objectName)).build(); + + // generate signed URL + URL url = + executionWithRetryTemplate( + listenerResetRetryTemplate, + () -> + gcsClient + .getStorageService(projectId) + .signUrl( + blobInfo, + gcsConfiguration.signedUrlGetDurationHours(), + TimeUnit.HOURS, + Storage.SignUrlOption.httpMethod(HttpMethod.GET), + Storage.SignUrlOption.withV4Signature())); + + // remove the signature from the URL before logging + String cleanUrl = url.toString().split("X-Goog-Signature=")[0] + "X-Goog-Signature=REDACTED"; + logger.info("Generated GET signed URL: {}", cleanUrl); + + return url; + } + interface GcsAction { T execute(); } diff --git a/service/src/main/java/bio/terra/pipelines/dependencies/rawls/RawlsService.java b/service/src/main/java/bio/terra/pipelines/dependencies/rawls/RawlsService.java index 4c81c77e..555af61f 100644 --- a/service/src/main/java/bio/terra/pipelines/dependencies/rawls/RawlsService.java +++ b/service/src/main/java/bio/terra/pipelines/dependencies/rawls/RawlsService.java @@ -94,6 +94,20 @@ public Entity upsertDataTableEntity( .createEntity(entity, workspaceNamespace, workspaceName)); } + public Entity getDataTableEntity( + String accessToken, + String workspaceNamespace, + String workspaceName, + String entityType, + String entityName) { + return executionWithRetryTemplate( + listenerResetRetryTemplate, + () -> + rawlsClient + .getEntitiesApi(accessToken) + 
.getEntity(workspaceNamespace, workspaceName, entityType, entityName, null, null)); + } + // returns true if submission is in a running state public static boolean submissionIsRunning(Submission submission) { return !FINAL_RUN_STATES.contains(submission.getStatus()); diff --git a/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java b/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java index f17e2a17..4c5f498b 100644 --- a/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java +++ b/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java @@ -1,6 +1,7 @@ package bio.terra.pipelines.service; import static bio.terra.pipelines.common.utils.FileUtils.constructDestinationBlobNameForUserInputFile; +import static bio.terra.pipelines.common.utils.FileUtils.getBlobNameFromTerraWorkspaceStorageHttpUrl; import static java.util.Collections.emptyList; import static org.springframework.data.domain.PageRequest.ofSize; @@ -397,8 +398,8 @@ public PipelineRun markPipelineRunSuccessAndWriteOutputs( // methods to interact with and format pipeline run outputs /** - * Extract the pipeline outputs from a pipelineRun object and return an ApiPipelineRunOutputs - * object with the outputs. + * Extract the pipeline outputs from a pipelineRun object, create signed GET (read-only) urls for + * each file, and return an ApiPipelineRunOutputs object with the outputs. 
* * @param pipelineRun object from the pipelineRunsRepository * @return ApiPipelineRunOutputs @@ -408,6 +409,17 @@ public ApiPipelineRunOutputs formatPipelineRunOutputs(PipelineRun pipelineRun) { pipelineRunOutputsAsMap( pipelineOutputsRepository.findPipelineOutputsByJobId(pipelineRun.getId()).getOutputs()); + // currently all outputs are paths that will need a SAS token + String workspaceStorageContainerName = pipelineRun.getWorkspaceStorageContainerName(); + outputsMap.replaceAll( + (k, v) -> + gcsService + .generateGetObjectSignedUrl( + pipelineRun.getWorkspaceGoogleProject(), + workspaceStorageContainerName, + getBlobNameFromTerraWorkspaceStorageHttpUrl(v, workspaceStorageContainerName)) + .toString()); + ApiPipelineRunOutputs apiPipelineRunOutputs = new ApiPipelineRunOutputs(); apiPipelineRunOutputs.putAll(outputsMap); return apiPipelineRunOutputs; diff --git a/service/src/main/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpJobFlight.java b/service/src/main/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpJobFlight.java index fcc38080..cf1b7f4a 100644 --- a/service/src/main/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpJobFlight.java +++ b/service/src/main/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpJobFlight.java @@ -5,8 +5,10 @@ import bio.terra.pipelines.common.utils.FlightUtils; import bio.terra.pipelines.common.utils.PipelinesEnum; import bio.terra.pipelines.dependencies.stairway.JobMapKeys; +import bio.terra.pipelines.stairway.imputation.steps.CompletePipelineRunStep; import bio.terra.pipelines.stairway.imputation.steps.PrepareImputationInputsStep; import bio.terra.pipelines.stairway.imputation.steps.gcp.AddDataTableRowStep; +import bio.terra.pipelines.stairway.imputation.steps.gcp.FetchOutputsFromDataTableStep; import bio.terra.pipelines.stairway.imputation.steps.gcp.PollCromwellSubmissionStatusStep; import bio.terra.pipelines.stairway.imputation.steps.gcp.SubmitCromwellSubmissionStep; import 
bio.terra.stairway.Flight; @@ -71,5 +73,11 @@ public RunImputationGcpJobFlight(FlightMap inputParameters, Object beanBag) { flightBeanBag.getRawlsService(), flightBeanBag.getImputationConfiguration()), dbRetryRule); + + addStep( + new FetchOutputsFromDataTableStep( + flightBeanBag.getRawlsService(), flightBeanBag.getSamService())); + + addStep(new CompletePipelineRunStep(flightBeanBag.getPipelineRunsService()), dbRetryRule); } } diff --git a/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java b/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java new file mode 100644 index 00000000..15c3bb38 --- /dev/null +++ b/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java @@ -0,0 +1,95 @@ +package bio.terra.pipelines.stairway.imputation.steps.gcp; + +import bio.terra.pipelines.common.utils.FlightUtils; +import bio.terra.pipelines.common.utils.PipelinesEnum; +import bio.terra.pipelines.db.entities.PipelineOutputDefinition; +import bio.terra.pipelines.dependencies.rawls.RawlsService; +import bio.terra.pipelines.dependencies.rawls.RawlsServiceApiException; +import bio.terra.pipelines.dependencies.sam.SamService; +import bio.terra.pipelines.dependencies.stairway.JobMapKeys; +import bio.terra.pipelines.stairway.imputation.RunImputationJobFlightMapKeys; +import bio.terra.rawls.model.Entity; +import bio.terra.stairway.FlightContext; +import bio.terra.stairway.FlightMap; +import bio.terra.stairway.Step; +import bio.terra.stairway.StepResult; +import bio.terra.stairway.StepStatus; +import com.fasterxml.jackson.core.type.TypeReference; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This step calls Rawls to fetch outputs from a data table row for a given job and stores them in + * the flight's working map. 
These outputs are considered raw in that they are cloud paths and not + * signed urls. + */ +public class FetchOutputsFromDataTableStep implements Step { + + private final RawlsService rawlsService; + private final SamService samService; + + public FetchOutputsFromDataTableStep(RawlsService rawlsService, SamService samService) { + this.rawlsService = rawlsService; + this.samService = samService; + } + + @Override + @SuppressWarnings( + "java:S2259") // suppress warning for possible NPE when calling pipelineName.getValue(), + // since we do validate that pipelineName is not null in `validateRequiredEntries` + public StepResult doStep(FlightContext flightContext) { + String jobId = flightContext.getFlightId(); + + // validate and extract parameters from input map + var inputParameters = flightContext.getInputParameters(); + FlightUtils.validateRequiredEntries( + inputParameters, + JobMapKeys.PIPELINE_NAME.getKeyName(), + RunImputationJobFlightMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT, + RunImputationJobFlightMapKeys.CONTROL_WORKSPACE_NAME, + RunImputationJobFlightMapKeys.PIPELINE_OUTPUT_DEFINITIONS); + + String controlWorkspaceBillingProject = + inputParameters.get( + RunImputationJobFlightMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT, String.class); + String controlWorkspaceName = + inputParameters.get(RunImputationJobFlightMapKeys.CONTROL_WORKSPACE_NAME, String.class); + PipelinesEnum pipelineName = + inputParameters.get(JobMapKeys.PIPELINE_NAME.getKeyName(), PipelinesEnum.class); + List outputDefinitions = + inputParameters.get( + RunImputationJobFlightMapKeys.PIPELINE_OUTPUT_DEFINITIONS, new TypeReference<>() {}); + + Entity entity; + try { + entity = + rawlsService.getDataTableEntity( + samService.getTeaspoonsServiceAccountToken(), + controlWorkspaceBillingProject, + controlWorkspaceName, + pipelineName.getValue(), + jobId); + } catch (RawlsServiceApiException e) { + return new StepResult(StepStatus.STEP_RESULT_FAILURE_RETRY, e); + } + + Map outputs = new 
HashMap<>(); + for (PipelineOutputDefinition outputDefinition : outputDefinitions) { + String keyName = outputDefinition.getName(); + String wdlVariableName = outputDefinition.getWdlVariableName(); + outputs.put(keyName, entity.getAttributes().get(wdlVariableName).toString()); + } + + FlightMap workingMap = flightContext.getWorkingMap(); + workingMap.put(RunImputationJobFlightMapKeys.PIPELINE_RUN_OUTPUTS, outputs); + + return StepResult.getStepResultSuccess(); + } + + @Override + public StepResult undoStep(FlightContext flightContext) { + // nothing to undo + return StepResult.getStepResultSuccess(); + } +} diff --git a/service/src/test/java/bio/terra/pipelines/common/utils/FileUtilsTest.java b/service/src/test/java/bio/terra/pipelines/common/utils/FileUtilsTest.java index cf18a1cd..3c009c0f 100644 --- a/service/src/test/java/bio/terra/pipelines/common/utils/FileUtilsTest.java +++ b/service/src/test/java/bio/terra/pipelines/common/utils/FileUtilsTest.java @@ -11,26 +11,57 @@ class FileUtilsTest extends BaseTest { @Test - void getBlobNameFromTerraWorkspaceStorageHttpUrl() { + void getBlobNameFromTerraWorkspaceStorageHttpUrlAzure() { String fullPath = "https://lze96253b07f13c61ef712bb.blob.core.windows.net/sc-68a43bd8-e744-4f1e-87a5-c44ecef157a3/workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file"; - UUID controlWorkspaceId = UUID.fromString("68a43bd8-e744-4f1e-87a5-c44ecef157a3"); + String controlWorkspaceIdForDelimiter = "68a43bd8-e744-4f1e-87a5-c44ecef157a3"; String expectedBlobName = "workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file"; assertEquals( expectedBlobName, - FileUtils.getBlobNameFromTerraWorkspaceStorageHttpUrl(fullPath, controlWorkspaceId)); + FileUtils.getBlobNameFromTerraWorkspaceStorageHttpUrl( + 
fullPath, controlWorkspaceIdForDelimiter)); } @Test - void getBlobNameFromTerraWorkspaceStorageHttpUrlDifferentWorkspace() { + void getBlobNameFromTerraWorkspaceStorageHttpUrlDifferentWorkspaceAzure() { String fullPath = "https://lze96253b07f13c61ef712bb.blob.core.windows.net/sc-68a43bd8-e744-4f1e-87a5-c44ecef157a3/workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file"; - UUID wrongWorkspaceId = UUID.fromString("11111111-1111-1111-1111-111111111111"); + String wrongWorkspaceIdForDelimiter = "11111111-1111-1111-1111-111111111111"; + + assertThrows( + InternalServerErrorException.class, + () -> + FileUtils.getBlobNameFromTerraWorkspaceStorageHttpUrl( + fullPath, wrongWorkspaceIdForDelimiter)); + } + + @Test + void getBlobNameFromTerraWorkspaceStorageHttpUrlGcp() { + String fullPath = + "gs://fc-secure-68a43bd8-e744-4f1e-87a5-c44ecef157a3/workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file"; + String controlWorkspaceStorageContainerNameForDelimiter = + "fc-secure-68a43bd8-e744-4f1e-87a5-c44ecef157a3"; + String expectedBlobName = + "workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file"; + assertEquals( + expectedBlobName, + FileUtils.getBlobNameFromTerraWorkspaceStorageHttpUrl( + fullPath, controlWorkspaceStorageContainerNameForDelimiter)); + } + + @Test + void getBlobNameFromTerraWorkspaceStorageHttpUrlDifferentWorkspaceGcp() { + String fullPath = + "gs://fc-secure-68a43bd8-e744-4f1e-87a5-c44ecef157a3/workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file"; + String 
wrongWorkspaceStorageContainerNameForDelimiter = + "fc-secure-11111111-1111-1111-1111-111111111111"; assertThrows( InternalServerErrorException.class, - () -> FileUtils.getBlobNameFromTerraWorkspaceStorageHttpUrl(fullPath, wrongWorkspaceId)); + () -> + FileUtils.getBlobNameFromTerraWorkspaceStorageHttpUrl( + fullPath, wrongWorkspaceStorageContainerNameForDelimiter)); } @Test diff --git a/service/src/test/java/bio/terra/pipelines/service/PipelineRunsServiceTest.java b/service/src/test/java/bio/terra/pipelines/service/PipelineRunsServiceTest.java index b41447e8..540d2b90 100644 --- a/service/src/test/java/bio/terra/pipelines/service/PipelineRunsServiceTest.java +++ b/service/src/test/java/bio/terra/pipelines/service/PipelineRunsServiceTest.java @@ -534,7 +534,7 @@ void createImputationRunStairwayError() { } @Test - void formatPipelineRunOutputs() { + void formatPipelineRunOutputs() throws MalformedURLException { PipelineRun pipelineRun = createNewRunWithJobId(testJobId); pipelineRunsRepository.save(pipelineRun); @@ -544,13 +544,14 @@ void formatPipelineRunOutputs() { pipelineRunsService.pipelineRunOutputsAsString(TestUtils.TEST_PIPELINE_OUTPUTS)); pipelineOutputsRepository.save(pipelineOutput); - ApiPipelineRunOutputs expectedOutput = new ApiPipelineRunOutputs(); - expectedOutput.putAll(TestUtils.TEST_PIPELINE_OUTPUTS); + URL fakeUrl = new URL("https://storage.googleapis.com/signed-url-stuff"); + // mock GCS service + when(mockGcsService.generateGetObjectSignedUrl(any(), any(), any())).thenReturn(fakeUrl); ApiPipelineRunOutputs apiPipelineRunOutputs = pipelineRunsService.formatPipelineRunOutputs(pipelineRun); - assertEquals(expectedOutput, apiPipelineRunOutputs); + assertEquals(fakeUrl.toString(), apiPipelineRunOutputs.get("testFileOutputKey")); } @Test diff --git a/service/src/test/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpFlightTest.java b/service/src/test/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpFlightTest.java index 
e10c7bf1..371f2d78 100644 --- a/service/src/test/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpFlightTest.java +++ b/service/src/test/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpFlightTest.java @@ -33,7 +33,9 @@ class RunImputationGcpFlightTest extends BaseEmbeddedDbTest { "PrepareImputationInputsStep", "AddDataTableRowStep", "SubmitCromwellSubmissionStep", - "PollCromwellSubmissionStatusStep"); + "PollCromwellSubmissionStatusStep", + "CompletePipelineRunStep", + "FetchOutputsFromDataTableStep"); @Autowired FlightBeanBag flightBeanBag; private SimpleMeterRegistry meterRegistry; diff --git a/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java b/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java new file mode 100644 index 00000000..76e33574 --- /dev/null +++ b/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java @@ -0,0 +1,90 @@ +package bio.terra.pipelines.stairway.imputation.steps.gcp; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import bio.terra.pipelines.dependencies.rawls.RawlsService; +import bio.terra.pipelines.dependencies.rawls.RawlsServiceApiException; +import bio.terra.pipelines.dependencies.rawls.RawlsServiceException; +import bio.terra.pipelines.dependencies.sam.SamService; +import bio.terra.pipelines.stairway.imputation.RunImputationJobFlightMapKeys; +import bio.terra.pipelines.testutils.BaseEmbeddedDbTest; +import bio.terra.pipelines.testutils.StairwayTestUtils; +import bio.terra.rawls.model.Entity; +import bio.terra.stairway.FlightContext; +import bio.terra.stairway.FlightMap; +import bio.terra.stairway.StepResult; +import bio.terra.stairway.StepStatus; +import java.util.HashMap; +import java.util.Map; +import 
org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; + +class FetchOutputsFromDataTableStepTest extends BaseEmbeddedDbTest { + + @Mock RawlsService rawlsService; + @Mock SamService samService; + @Mock private FlightContext flightContext; + + @BeforeEach + void setup() { + var inputParameters = new FlightMap(); + var workingMap = new FlightMap(); + + when(flightContext.getInputParameters()).thenReturn(inputParameters); + when(flightContext.getWorkingMap()).thenReturn(workingMap); + } + + @Test + void doStepSuccess() throws RawlsServiceException { + // setup + StairwayTestUtils.constructCreateJobInputs(flightContext.getInputParameters()); + + // outputs to match the test output definitions + Map entityAttributes = new HashMap<>(Map.of("output_name", "some/file.vcf.gz")); + Entity entity = new Entity().attributes(entityAttributes); + + when(rawlsService.getDataTableEntity(any(), any(), any(), any(), any())).thenReturn(entity); + + FetchOutputsFromDataTableStep fetchOutputsFromDataTableStep = + new FetchOutputsFromDataTableStep(rawlsService, samService); + StepResult result = fetchOutputsFromDataTableStep.doStep(flightContext); + + assertEquals(StepStatus.STEP_RESULT_SUCCESS, result.getStepStatus()); + + // in the step we translate the snake_case output keys to camelCase + Map expectedOutputsFromWorkingMap = Map.of("outputName", "some/file.vcf.gz"); + + assertEquals( + expectedOutputsFromWorkingMap, + flightContext + .getWorkingMap() + .get(RunImputationJobFlightMapKeys.PIPELINE_RUN_OUTPUTS, Map.class)); + } + + @Test + void doStepFailureRetry() throws RawlsServiceException { + // setup + StairwayTestUtils.constructCreateJobInputs(flightContext.getInputParameters()); + + when(rawlsService.getDataTableEntity(any(), any(), any(), any(), any())) + .thenThrow(new RawlsServiceApiException("Rawls Service Api Exception")); + + FetchOutputsFromDataTableStep fetchOutputsFromDataTableStep = + new 
FetchOutputsFromDataTableStep(rawlsService, samService); + StepResult result = fetchOutputsFromDataTableStep.doStep(flightContext); + + assertEquals(StepStatus.STEP_RESULT_FAILURE_RETRY, result.getStepStatus()); + } + + @Test + void undoStepSuccess() { + FetchOutputsFromDataTableStep fetchOutputsFromDataTableStep = + new FetchOutputsFromDataTableStep(rawlsService, samService); + StepResult result = fetchOutputsFromDataTableStep.undoStep(flightContext); + + assertEquals(StepStatus.STEP_RESULT_SUCCESS, result.getStepStatus()); + } +} diff --git a/service/src/test/java/bio/terra/pipelines/testutils/TestUtils.java b/service/src/test/java/bio/terra/pipelines/testutils/TestUtils.java index f1300482..5016a507 100644 --- a/service/src/test/java/bio/terra/pipelines/testutils/TestUtils.java +++ b/service/src/test/java/bio/terra/pipelines/testutils/TestUtils.java @@ -47,7 +47,7 @@ public class TestUtils { new HashMap( Map.of( "testFileOutputKey", - "https://lz123.stuff/sc-%s/testFileOutputValue".formatted(CONTROL_WORKSPACE_ID))); + "gs://fc-secure-%s/testFileOutputValue".formatted(CONTROL_WORKSPACE_ID))); public static final List TEST_PIPELINE_INPUTS_DEFINITION_LIST = new ArrayList<>( From cfad601ca14550e6879538dc1f1ee6b2fada723c Mon Sep 17 00:00:00 2001 From: Morgan Taylor Date: Fri, 6 Sep 2024 15:50:10 -0400 Subject: [PATCH 02/12] add tests --- .../dependencies/gcs/GcsServiceTest.java | 16 ++++++++++++++++ .../dependencies/rawls/RawlsServiceTest.java | 19 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java b/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java index 48e62b13..f3f4019f 100644 --- a/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java +++ b/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java @@ -66,6 +66,22 @@ void generatePutObjectSignedUrl() throws MalformedURLException { 
assertEquals(fakeURL, generatedURL); } + @Test + void generateGetObjectSignedUrl() throws MalformedURLException { + URL fakeURL = new URL("https://storage.googleapis.com/signed-url-stuff"); + when(mockStorageService.signUrl( + any(BlobInfo.class), + anyLong(), + any(TimeUnit.class), + any(Storage.SignUrlOption.class), + any(Storage.SignUrlOption.class))) + .thenReturn(fakeURL); + + URL generatedURL = + gcsService.generateGetObjectSignedUrl("projectId", "bucketName", "objectName"); + assertEquals(fakeURL, generatedURL); + } + @Test void socketExceptionRetriesEventuallySucceed() throws Exception { URL fakeURL = new URL("https://storage.googleapis.com/signed-url-stuff"); diff --git a/service/src/test/java/bio/terra/pipelines/dependencies/rawls/RawlsServiceTest.java b/service/src/test/java/bio/terra/pipelines/dependencies/rawls/RawlsServiceTest.java index 504758ff..80a512f9 100644 --- a/service/src/test/java/bio/terra/pipelines/dependencies/rawls/RawlsServiceTest.java +++ b/service/src/test/java/bio/terra/pipelines/dependencies/rawls/RawlsServiceTest.java @@ -216,6 +216,25 @@ void upsertDataTableEntity() throws Exception { assertEquals(expectedResponse, rawlsService.upsertDataTableEntity(any(), any(), any(), any())); } + @Test + void getDataTableEntity() throws Exception { + Entity expectedResponse = new Entity().name("entityName").entityType("entityType"); + + rawlsClient = mock(RawlsClient.class); + EntitiesApi entitiesApi = mock(EntitiesApi.class); + when(entitiesApi.getEntity(any(), any(), any(), any(), any(), any())) + .thenReturn(expectedResponse); + + rawlsService = spy(new RawlsService(rawlsClient, template)); + + doReturn(entitiesApi).when(rawlsClient).getEntitiesApi(any()); + + assertEquals( + expectedResponse, + rawlsService.getDataTableEntity( + "token", "billingProject", "workspace", "entityType", "entityName")); + } + @Test void submissionIsNotRunning() { assertFalse(RawlsService.submissionIsRunning(new Submission().status(SubmissionStatus.DONE))); From 
526bef74940c7e33754f3276c2d411e788a7bed2 Mon Sep 17 00:00:00 2001 From: Morgan Taylor Date: Fri, 6 Sep 2024 15:58:53 -0400 Subject: [PATCH 03/12] oops use the cleanUrl in logs --- .../java/bio/terra/pipelines/dependencies/gcs/GcsService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java b/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java index 7e7d4c1d..17425440 100644 --- a/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java +++ b/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java @@ -72,7 +72,7 @@ public URL generatePutObjectSignedUrl(String projectId, String bucketName, Strin // remove the signature from the URL before logging String cleanUrl = url.toString().split("X-Goog-Signature=")[0] + "X-Goog-Signature=REDACTED"; - logger.info("Generated PUT signed URL: {}", url); + logger.info("Generated PUT signed URL: {}", cleanUrl); return url; } From 08611f85b908834bbef4d6823a8c01211d1d991f Mon Sep 17 00:00:00 2001 From: Morgan Taylor Date: Mon, 9 Sep 2024 11:38:13 -0400 Subject: [PATCH 04/12] pr comments --- .../pipelines/common/utils/FileUtils.java | 44 ++++++++++++++----- .../dependencies/gcs/GcsService.java | 39 +++++++++++++--- .../service/PipelineRunsService.java | 6 +-- .../pipelines/common/utils/FileUtilsTest.java | 16 +++---- .../dependencies/gcs/GcsServiceTest.java | 26 +++++++++++ 5 files changed, 104 insertions(+), 27 deletions(-) diff --git a/service/src/main/java/bio/terra/pipelines/common/utils/FileUtils.java b/service/src/main/java/bio/terra/pipelines/common/utils/FileUtils.java index 26dbdaa2..c2e2b188 100644 --- a/service/src/main/java/bio/terra/pipelines/common/utils/FileUtils.java +++ b/service/src/main/java/bio/terra/pipelines/common/utils/FileUtils.java @@ -12,26 +12,50 @@ private FileUtils() { private static final String USER_PROVIDED_FILE_INPUT_DIRECTORY = "user-input-files"; /** 
- * Extract the blob name from the full file path, using a defined delimiter. + * Extract the blob name from the full Azure file path, using the workspaceId as a delimiter. * - *

For example, in Azure, with workspaceId as the delimiter, + *

For example, with workspaceId as the workspaceSubstringStart, * `https://lz123.blob.core.windows.net/sc-{workspaceDelimiter}/path/to/file` becomes * `path/to/file`. * - *

In GCP, with workspaceStorageContainerName as the delimiter + * @param blobUrl + * @param workspaceId + */ + public static String getBlobNameFromTerraWorkspaceStorageUrlAzure( + String blobUrl, String workspaceId) { + return getBlobNameFromTerraWorkspaceStorageUrl(blobUrl, workspaceId); + } + + /** + * Extract the blob name from the full GCP file path, using the workspaceStorageContainerName as a + * delimiter. + * + *

For example, with workspaceStorageContainerName as the workspaceSubstringStart, * `gs://{workspaceDelimiter}/path/to/file` becomes `path/to/file`. * - * @param blobHttpUrl - * @param delimiter + * @param blobUrl + * @param workspaceStorageContainerName + */ + public static String getBlobNameFromTerraWorkspaceStorageUrlGcp( + String blobUrl, String workspaceStorageContainerName) { + return getBlobNameFromTerraWorkspaceStorageUrl(blobUrl, workspaceStorageContainerName); + } + + /** + * Extract the blob name from the full file path, using a defined workspaceSubstringStart. + * + * @param blobUrl + * @param workspaceSubstringStart * @return blobName */ - public static String getBlobNameFromTerraWorkspaceStorageHttpUrl( - String blobHttpUrl, String delimiter) { - if (!blobHttpUrl.contains(delimiter)) { + private static String getBlobNameFromTerraWorkspaceStorageUrl( + String blobUrl, String workspaceSubstringStart) { + if (!blobUrl.contains(workspaceSubstringStart)) { throw new InternalServerErrorException( - "File path and delimiter do not match. Cannot extract blob name."); + "File path and workspaceSubstringStart do not match. 
Cannot extract blob name."); } - return blobHttpUrl.substring(blobHttpUrl.indexOf(delimiter) + delimiter.length() + 1); + return blobUrl.substring( + blobUrl.indexOf(workspaceSubstringStart) + workspaceSubstringStart.length() + 1); } /** diff --git a/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java b/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java index 17425440..6f04629e 100644 --- a/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java +++ b/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java @@ -7,6 +7,7 @@ import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageException; import java.net.URL; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -70,9 +71,7 @@ public URL generatePutObjectSignedUrl(String projectId, String bucketName, Strin Storage.SignUrlOption.withExtHeaders(extensionHeaders), Storage.SignUrlOption.withV4Signature())); - // remove the signature from the URL before logging - String cleanUrl = url.toString().split("X-Goog-Signature=")[0] + "X-Goog-Signature=REDACTED"; - logger.info("Generated PUT signed URL: {}", cleanUrl); + logger.info("Generated PUT signed URL: {}", cleanSignedUrl(url)); return url; } @@ -109,13 +108,41 @@ public URL generateGetObjectSignedUrl(String projectId, String bucketName, Strin Storage.SignUrlOption.httpMethod(HttpMethod.GET), Storage.SignUrlOption.withV4Signature())); - // remove the signature from the URL before logging - String cleanUrl = url.toString().split("X-Goog-Signature=")[0] + "X-Goog-Signature=REDACTED"; - logger.info("Generated GET signed URL: {}", cleanUrl); + logger.info("Generated GET signed URL: {}", cleanSignedUrl(url)); return url; } + /** + * Redact the X-Google-Signature element's value from the signed url and return the cleaned result + * as a string. 
+ * + * @param signedUrl + * @return + */ + public static String cleanSignedUrl(URL signedUrl) { + String signedUrlString = signedUrl.toString(); + String cleanUrl = signedUrlString; + String signatureDelimiter = "X-Goog-Signature="; + if (signedUrlString.contains(signatureDelimiter)) { + String cleanUrlUpToSignature = + signedUrlString.split(signatureDelimiter)[0] + signatureDelimiter + "REDACTED"; + String[] cleanUrlElementsAfterSignature = + signedUrlString.split(signatureDelimiter)[1].split("&"); + if (cleanUrlElementsAfterSignature.length == 1) { + cleanUrl = cleanUrlUpToSignature; + } else { + String cleanUrlAfterSignature = + String.join( + "&", + Arrays.copyOfRange( + cleanUrlElementsAfterSignature, 1, cleanUrlElementsAfterSignature.length)); + cleanUrl = "%s&%s".formatted(cleanUrlUpToSignature, cleanUrlAfterSignature); + } + } + return cleanUrl; + } + interface GcsAction { T execute(); } diff --git a/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java b/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java index 4c5f498b..dbc20647 100644 --- a/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java +++ b/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java @@ -1,7 +1,7 @@ package bio.terra.pipelines.service; import static bio.terra.pipelines.common.utils.FileUtils.constructDestinationBlobNameForUserInputFile; -import static bio.terra.pipelines.common.utils.FileUtils.getBlobNameFromTerraWorkspaceStorageHttpUrl; +import static bio.terra.pipelines.common.utils.FileUtils.getBlobNameFromTerraWorkspaceStorageUrlGcp; import static java.util.Collections.emptyList; import static org.springframework.data.domain.PageRequest.ofSize; @@ -409,7 +409,7 @@ public ApiPipelineRunOutputs formatPipelineRunOutputs(PipelineRun pipelineRun) { pipelineRunOutputsAsMap( pipelineOutputsRepository.findPipelineOutputsByJobId(pipelineRun.getId()).getOutputs()); - // currently all outputs are paths that 
will need a SAS token + // currently all outputs are paths that will need a signed url String workspaceStorageContainerName = pipelineRun.getWorkspaceStorageContainerName(); outputsMap.replaceAll( (k, v) -> @@ -417,7 +417,7 @@ public ApiPipelineRunOutputs formatPipelineRunOutputs(PipelineRun pipelineRun) { .generateGetObjectSignedUrl( pipelineRun.getWorkspaceGoogleProject(), workspaceStorageContainerName, - getBlobNameFromTerraWorkspaceStorageHttpUrl(v, workspaceStorageContainerName)) + getBlobNameFromTerraWorkspaceStorageUrlGcp(v, workspaceStorageContainerName)) .toString()); ApiPipelineRunOutputs apiPipelineRunOutputs = new ApiPipelineRunOutputs(); diff --git a/service/src/test/java/bio/terra/pipelines/common/utils/FileUtilsTest.java b/service/src/test/java/bio/terra/pipelines/common/utils/FileUtilsTest.java index 3c009c0f..dcf891ac 100644 --- a/service/src/test/java/bio/terra/pipelines/common/utils/FileUtilsTest.java +++ b/service/src/test/java/bio/terra/pipelines/common/utils/FileUtilsTest.java @@ -11,7 +11,7 @@ class FileUtilsTest extends BaseTest { @Test - void getBlobNameFromTerraWorkspaceStorageHttpUrlAzure() { + void getBlobNameFromTerraWorkspaceStorageUrlAzure() { String fullPath = "https://lze96253b07f13c61ef712bb.blob.core.windows.net/sc-68a43bd8-e744-4f1e-87a5-c44ecef157a3/workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file"; String controlWorkspaceIdForDelimiter = "68a43bd8-e744-4f1e-87a5-c44ecef157a3"; @@ -19,12 +19,12 @@ void getBlobNameFromTerraWorkspaceStorageHttpUrlAzure() { "workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file"; assertEquals( expectedBlobName, - FileUtils.getBlobNameFromTerraWorkspaceStorageHttpUrl( + FileUtils.getBlobNameFromTerraWorkspaceStorageUrlAzure( fullPath, 
controlWorkspaceIdForDelimiter)); } @Test - void getBlobNameFromTerraWorkspaceStorageHttpUrlDifferentWorkspaceAzure() { + void getBlobNameFromTerraWorkspaceStorageUrlDifferentWorkspaceAzure() { String fullPath = "https://lze96253b07f13c61ef712bb.blob.core.windows.net/sc-68a43bd8-e744-4f1e-87a5-c44ecef157a3/workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file"; String wrongWorkspaceIdForDelimiter = "11111111-1111-1111-1111-111111111111"; @@ -32,12 +32,12 @@ void getBlobNameFromTerraWorkspaceStorageHttpUrlDifferentWorkspaceAzure() { assertThrows( InternalServerErrorException.class, () -> - FileUtils.getBlobNameFromTerraWorkspaceStorageHttpUrl( + FileUtils.getBlobNameFromTerraWorkspaceStorageUrlAzure( fullPath, wrongWorkspaceIdForDelimiter)); } @Test - void getBlobNameFromTerraWorkspaceStorageHttpUrlGcp() { + void getBlobNameFromTerraWorkspaceStorageUrlGcp() { String fullPath = "gs://fc-secure-68a43bd8-e744-4f1e-87a5-c44ecef157a3/workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file"; String controlWorkspaceStorageContainerNameForDelimiter = @@ -46,12 +46,12 @@ void getBlobNameFromTerraWorkspaceStorageHttpUrlGcp() { "workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file"; assertEquals( expectedBlobName, - FileUtils.getBlobNameFromTerraWorkspaceStorageHttpUrl( + FileUtils.getBlobNameFromTerraWorkspaceStorageUrlGcp( fullPath, controlWorkspaceStorageContainerNameForDelimiter)); } @Test - void getBlobNameFromTerraWorkspaceStorageHttpUrlDifferentWorkspaceGcp() { + void getBlobNameFromTerraWorkspaceStorageUrlDifferentWorkspaceGcp() { String fullPath = 
"gs://fc-secure-68a43bd8-e744-4f1e-87a5-c44ecef157a3/workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file"; String wrongWorkspaceStorageContainerNameForDelimiter = @@ -60,7 +60,7 @@ void getBlobNameFromTerraWorkspaceStorageHttpUrlDifferentWorkspaceGcp() { assertThrows( InternalServerErrorException.class, () -> - FileUtils.getBlobNameFromTerraWorkspaceStorageHttpUrl( + FileUtils.getBlobNameFromTerraWorkspaceStorageUrlGcp( fullPath, wrongWorkspaceStorageContainerNameForDelimiter)); } diff --git a/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java b/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java index f3f4019f..0f03fbf7 100644 --- a/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java +++ b/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java @@ -141,4 +141,30 @@ void storageExceptionDoNotRetry() { gcsService.generatePutObjectSignedUrl("projectId", "bucketName", "objectName"); }); } + + @Test + void cleanSignedUrl() throws MalformedURLException { + // signed URL with X-Goog-Signature as last element + URL fakeURLSignatureLast = + new URL( + "https://storage.googleapis.com/fc-secure-6970c3a9-dc92-436d-af3d-917bcb4cf05a/user-input-files/ffaffa12-5717-4562-b3fc-2c963f66afa6/TEST.vcf.gz?X-Goog-Date=20240823T170006Z&X-Goog-Expires=900&X-Goog-SignedHeaders=content-type%3Bhost&X-Goog-Signature=12345"); + String expectedCleanedURLSignatureLast = + "https://storage.googleapis.com/fc-secure-6970c3a9-dc92-436d-af3d-917bcb4cf05a/user-input-files/ffaffa12-5717-4562-b3fc-2c963f66afa6/TEST.vcf.gz?X-Goog-Date=20240823T170006Z&X-Goog-Expires=900&X-Goog-SignedHeaders=content-type%3Bhost&X-Goog-Signature=REDACTED"; + assertEquals(expectedCleanedURLSignatureLast, GcsService.cleanSignedUrl(fakeURLSignatureLast)); + + // signed URL with no X-Goog-Signature 
should not throw an exception + URL fakeURLNoSignature = + new URL( + "https://storage.googleapis.com/fc-secure-6970c3a9-dc92-436d-af3d-917bcb4cf05a/user-input-files/ffaffa12-5717-4562-b3fc-2c963f66afa6/TEST.vcf.gz"); + assertEquals(fakeURLNoSignature.toString(), GcsService.cleanSignedUrl(fakeURLNoSignature)); + + // signed URL with X-Goog-Signature in the middle + URL fakeURLSignatureMiddle = + new URL( + "https://storage.googleapis.com/fc-secure-6970c3a9-dc92-436d-af3d-917bcb4cf05a/user-input-files/ffaffa12-5717-4562-b3fc-2c963f66afa6/TEST.vcf.gz?X-Goog-Date=20240823T170006Z&X-Goog-Expires=900&X-Goog-Signature=12345&X-Goog-SignedHeaders=content-type%3Bhost&Last-Element=foobar"); + String expectedCleanedURLSignatureMiddle = + "https://storage.googleapis.com/fc-secure-6970c3a9-dc92-436d-af3d-917bcb4cf05a/user-input-files/ffaffa12-5717-4562-b3fc-2c963f66afa6/TEST.vcf.gz?X-Goog-Date=20240823T170006Z&X-Goog-Expires=900&X-Goog-Signature=REDACTED&X-Goog-SignedHeaders=content-type%3Bhost&Last-Element=foobar"; + assertEquals( + expectedCleanedURLSignatureMiddle, GcsService.cleanSignedUrl(fakeURLSignatureMiddle)); + } } From ef779fde05f248b27392f37e5d35cd0eac72fbff Mon Sep 17 00:00:00 2001 From: Morgan Taylor Date: Mon, 9 Sep 2024 12:03:26 -0400 Subject: [PATCH 05/12] refactor cleanSignedUrl --- .../dependencies/gcs/GcsService.java | 33 ++++++++++--------- .../dependencies/gcs/GcsServiceTest.java | 2 +- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java b/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java index 6f04629e..12a177f1 100644 --- a/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java +++ b/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java @@ -7,8 +7,8 @@ import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageException; import java.net.URL; -import java.util.Arrays; import 
java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -122,23 +122,24 @@ public URL generateGetObjectSignedUrl(String projectId, String bucketName, Strin */ public static String cleanSignedUrl(URL signedUrl) { String signedUrlString = signedUrl.toString(); - String cleanUrl = signedUrlString; + List signedUrlElements = List.of(signedUrlString.split("\\?")[1].split("&")); + String signatureDelimiter = "X-Goog-Signature="; + + String cleanUrl = signedUrlString; if (signedUrlString.contains(signatureDelimiter)) { - String cleanUrlUpToSignature = - signedUrlString.split(signatureDelimiter)[0] + signatureDelimiter + "REDACTED"; - String[] cleanUrlElementsAfterSignature = - signedUrlString.split(signatureDelimiter)[1].split("&"); - if (cleanUrlElementsAfterSignature.length == 1) { - cleanUrl = cleanUrlUpToSignature; - } else { - String cleanUrlAfterSignature = - String.join( - "&", - Arrays.copyOfRange( - cleanUrlElementsAfterSignature, 1, cleanUrlElementsAfterSignature.length)); - cleanUrl = "%s&%s".formatted(cleanUrlUpToSignature, cleanUrlAfterSignature); - } + // if element contains signatureDelimiter, redact the value, otherwise keep the full element + List cleanUrlElements = + signedUrlElements.stream() + .map( + signedUrlElement -> { + if (signedUrlElement.contains(signatureDelimiter)) { + return signatureDelimiter + "REDACTED"; + } + return signedUrlElement; + }) + .toList(); + cleanUrl = signedUrlString.split("\\?")[0] + "?" 
+ String.join("&", cleanUrlElements); } return cleanUrl; } diff --git a/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java b/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java index 0f03fbf7..ee62f591 100644 --- a/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java +++ b/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java @@ -155,7 +155,7 @@ void cleanSignedUrl() throws MalformedURLException { // signed URL with no X-Goog-Signature should not throw an exception URL fakeURLNoSignature = new URL( - "https://storage.googleapis.com/fc-secure-6970c3a9-dc92-436d-af3d-917bcb4cf05a/user-input-files/ffaffa12-5717-4562-b3fc-2c963f66afa6/TEST.vcf.gz"); + "https://storage.googleapis.com/fc-secure-6970c3a9-dc92-436d-af3d-917bcb4cf05a/user-input-files/ffaffa12-5717-4562-b3fc-2c963f66afa6/TEST.vcf.gz?X-Goog-Date=20240823T170006Z&X-Goog-Expires=900"); assertEquals(fakeURLNoSignature.toString(), GcsService.cleanSignedUrl(fakeURLNoSignature)); // signed URL with X-Goog-Signature in the middle From 30ce7e5175c09b76ad62ce9a0850c35a94ab9b38 Mon Sep 17 00:00:00 2001 From: Morgan Taylor Date: Mon, 9 Sep 2024 13:29:11 -0400 Subject: [PATCH 06/12] another refactor for cleanSignedUrl --- .../dependencies/gcs/GcsService.java | 30 ++++++++++--------- .../dependencies/gcs/GcsServiceTest.java | 2 +- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java b/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java index 12a177f1..7124470f 100644 --- a/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java +++ b/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java @@ -11,6 +11,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; import org.springframework.retry.support.RetryTemplate; @@ -122,24 +123,25 @@ public URL generateGetObjectSignedUrl(String projectId, String bucketName, Strin */ public static String cleanSignedUrl(URL signedUrl) { String signedUrlString = signedUrl.toString(); - List signedUrlElements = List.of(signedUrlString.split("\\?")[1].split("&")); + String[] signedUrlParts = signedUrlString.split("\\?"); + String elementDelimiter = "&"; + List signedUrlElements = List.of(signedUrlParts[1].split(elementDelimiter)); - String signatureDelimiter = "X-Goog-Signature="; + String signatureKey = "X-Goog-Signature"; String cleanUrl = signedUrlString; - if (signedUrlString.contains(signatureDelimiter)) { - // if element contains signatureDelimiter, redact the value, otherwise keep the full element - List cleanUrlElements = + if (signedUrlString.contains(signatureKey)) { + String urlElementsWithoutSignature = signedUrlElements.stream() - .map( - signedUrlElement -> { - if (signedUrlElement.contains(signatureDelimiter)) { - return signatureDelimiter + "REDACTED"; - } - return signedUrlElement; - }) - .toList(); - cleanUrl = signedUrlString.split("\\?")[0] + "?" + String.join("&", cleanUrlElements); + .filter(signedUrlElement -> !signedUrlElement.contains(signatureKey)) + .collect(Collectors.joining(elementDelimiter)); + cleanUrl = + signedUrlParts[0] + + "?" 
+ + urlElementsWithoutSignature + + elementDelimiter + + signatureKey + + "=REDACTED"; } return cleanUrl; } diff --git a/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java b/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java index ee62f591..3c8b9441 100644 --- a/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java +++ b/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java @@ -163,7 +163,7 @@ void cleanSignedUrl() throws MalformedURLException { new URL( "https://storage.googleapis.com/fc-secure-6970c3a9-dc92-436d-af3d-917bcb4cf05a/user-input-files/ffaffa12-5717-4562-b3fc-2c963f66afa6/TEST.vcf.gz?X-Goog-Date=20240823T170006Z&X-Goog-Expires=900&X-Goog-Signature=12345&X-Goog-SignedHeaders=content-type%3Bhost&Last-Element=foobar"); String expectedCleanedURLSignatureMiddle = - "https://storage.googleapis.com/fc-secure-6970c3a9-dc92-436d-af3d-917bcb4cf05a/user-input-files/ffaffa12-5717-4562-b3fc-2c963f66afa6/TEST.vcf.gz?X-Goog-Date=20240823T170006Z&X-Goog-Expires=900&X-Goog-Signature=REDACTED&X-Goog-SignedHeaders=content-type%3Bhost&Last-Element=foobar"; + "https://storage.googleapis.com/fc-secure-6970c3a9-dc92-436d-af3d-917bcb4cf05a/user-input-files/ffaffa12-5717-4562-b3fc-2c963f66afa6/TEST.vcf.gz?X-Goog-Date=20240823T170006Z&X-Goog-Expires=900&X-Goog-SignedHeaders=content-type%3Bhost&Last-Element=foobar&X-Goog-Signature=REDACTED"; assertEquals( expectedCleanedURLSignatureMiddle, GcsService.cleanSignedUrl(fakeURLSignatureMiddle)); } From ec522ea72ded6723cc0c92f04bdcbb294c10c0e6 Mon Sep 17 00:00:00 2001 From: Morgan Taylor Date: Mon, 9 Sep 2024 13:49:09 -0400 Subject: [PATCH 07/12] fix tests, add externalServiceRetryRule --- .../imputation/RunImputationGcpJobFlight.java | 21 ++++++++++++---- .../dependencies/gcs/GcsServiceTest.java | 24 ++++++++++++------- 2 files changed, 33 insertions(+), 12 deletions(-) diff --git 
a/service/src/main/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpJobFlight.java b/service/src/main/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpJobFlight.java index cf1b7f4a..e4eac500 100644 --- a/service/src/main/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpJobFlight.java +++ b/service/src/main/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpJobFlight.java @@ -14,6 +14,7 @@ import bio.terra.stairway.Flight; import bio.terra.stairway.FlightMap; import bio.terra.stairway.RetryRule; +import bio.terra.stairway.RetryRuleExponentialBackoff; import bio.terra.stairway.RetryRuleFixedInterval; import bio.terra.stairway.Step; @@ -23,6 +24,16 @@ public class RunImputationGcpJobFlight extends Flight { private final RetryRule dbRetryRule = new RetryRuleFixedInterval(/*intervalSeconds= */ 1, /* maxCount= */ 5); + /** + * Use for a short exponential backoff retry, for operations that should be completable within a + * few seconds. + */ + private final RetryRule externalServiceRetryRule = + // maxOperationTimeSeconds must be larger than socket timeout (20s), otherwise a socket + // timeout + // won't be retried. 
+ new RetryRuleExponentialBackoff(1, 8, /* maxOperationTimeSeconds */ 30); + // addStep is protected in Flight, so make an override that is public @Override public void addStep(Step step, RetryRule retryRule) { @@ -58,25 +69,27 @@ public RunImputationGcpJobFlight(FlightMap inputParameters, Object beanBag) { dbRetryRule); addStep( - new AddDataTableRowStep(flightBeanBag.getRawlsService(), flightBeanBag.getSamService())); + new AddDataTableRowStep(flightBeanBag.getRawlsService(), flightBeanBag.getSamService()), + externalServiceRetryRule); addStep( new SubmitCromwellSubmissionStep( flightBeanBag.getRawlsService(), flightBeanBag.getSamService(), flightBeanBag.getImputationConfiguration()), - dbRetryRule); + externalServiceRetryRule); addStep( new PollCromwellSubmissionStatusStep( flightBeanBag.getSamService(), flightBeanBag.getRawlsService(), flightBeanBag.getImputationConfiguration()), - dbRetryRule); + externalServiceRetryRule); addStep( new FetchOutputsFromDataTableStep( - flightBeanBag.getRawlsService(), flightBeanBag.getSamService())); + flightBeanBag.getRawlsService(), flightBeanBag.getSamService()), + externalServiceRetryRule); addStep(new CompletePipelineRunStep(flightBeanBag.getPipelineRunsService()), dbRetryRule); } diff --git a/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java b/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java index 3c8b9441..3313bfff 100644 --- a/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java +++ b/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java @@ -49,9 +49,17 @@ void setup() { when(gcsClient.getStorageService(any())).thenReturn(mockStorageService); } + private URL getFakeURL() { + try { + return new URL("https://storage.googleapis.com/signed-url-stuff?X-Goog-Signature=12345"); + } catch (MalformedURLException e) { + return null; + } + } + @Test - void generatePutObjectSignedUrl() throws MalformedURLException { - URL 
fakeURL = new URL("https://storage.googleapis.com/signed-url-stuff"); + void generatePutObjectSignedUrl() { + URL fakeURL = getFakeURL(); when(mockStorageService.signUrl( any(BlobInfo.class), anyLong(), @@ -67,8 +75,8 @@ void generatePutObjectSignedUrl() throws MalformedURLException { } @Test - void generateGetObjectSignedUrl() throws MalformedURLException { - URL fakeURL = new URL("https://storage.googleapis.com/signed-url-stuff"); + void generateGetObjectSignedUrl() { + URL fakeURL = getFakeURL(); when(mockStorageService.signUrl( any(BlobInfo.class), anyLong(), @@ -83,8 +91,8 @@ void generateGetObjectSignedUrl() throws MalformedURLException { } @Test - void socketExceptionRetriesEventuallySucceed() throws Exception { - URL fakeURL = new URL("https://storage.googleapis.com/signed-url-stuff"); + void socketExceptionRetriesEventuallySucceed() { + URL fakeURL = getFakeURL(); when(mockStorageService.signUrl( any(BlobInfo.class), @@ -102,8 +110,8 @@ void socketExceptionRetriesEventuallySucceed() throws Exception { } @Test - void socketExceptionRetriesEventuallyFail() throws Exception { - URL fakeURL = new URL("https://storage.googleapis.com/signed-url-stuff"); + void socketExceptionRetriesEventuallyFail() { + URL fakeURL = getFakeURL(); when(mockStorageService.signUrl( any(BlobInfo.class), From 38c24bdde60e39c8ad1fed8430b5a521580d9f58 Mon Sep 17 00:00:00 2001 From: Morgan Taylor Date: Mon, 9 Sep 2024 13:57:50 -0400 Subject: [PATCH 08/12] appease sqube --- .../bio/terra/pipelines/dependencies/gcs/GcsService.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java b/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java index 7124470f..4966bbaf 100644 --- a/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java +++ b/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java @@ -72,7 +72,8 @@ public URL 
generatePutObjectSignedUrl(String projectId, String bucketName, Strin Storage.SignUrlOption.withExtHeaders(extensionHeaders), Storage.SignUrlOption.withV4Signature())); - logger.info("Generated PUT signed URL: {}", cleanSignedUrl(url)); + String cleanSignedUrlString = cleanSignedUrl(url); + logger.info("Generated PUT signed URL: {}", cleanSignedUrlString); return url; } @@ -109,7 +110,8 @@ public URL generateGetObjectSignedUrl(String projectId, String bucketName, Strin Storage.SignUrlOption.httpMethod(HttpMethod.GET), Storage.SignUrlOption.withV4Signature())); - logger.info("Generated GET signed URL: {}", cleanSignedUrl(url)); + String cleanSignedUrlString = cleanSignedUrl(url); + logger.info("Generated GET signed URL: {}", cleanSignedUrlString); return url; } From d4265c28fe009b6732bbba2dfcf81f87f6935aa1 Mon Sep 17 00:00:00 2001 From: Morgan Taylor Date: Tue, 10 Sep 2024 09:45:11 -0400 Subject: [PATCH 09/12] wip, test failing --- .../service/PipelineRunsService.java | 27 +++++++++++ .../imputation/RunImputationGcpJobFlight.java | 4 +- .../gcp/FetchOutputsFromDataTableStep.java | 19 +++++--- .../service/PipelineRunsServiceTest.java | 46 +++++++++++++++++++ .../FetchOutputsFromDataTableStepTest.java | 46 +++++++++++++++---- 5 files changed, 124 insertions(+), 18 deletions(-) diff --git a/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java b/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java index dbc20647..ab42aa90 100644 --- a/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java +++ b/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java @@ -20,6 +20,7 @@ import bio.terra.pipelines.db.entities.PipelineInput; import bio.terra.pipelines.db.entities.PipelineInputDefinition; import bio.terra.pipelines.db.entities.PipelineOutput; +import bio.terra.pipelines.db.entities.PipelineOutputDefinition; import bio.terra.pipelines.db.entities.PipelineRun; import 
bio.terra.pipelines.db.exception.DuplicateObjectException; import bio.terra.pipelines.db.repositories.PipelineInputsRepository; @@ -32,6 +33,7 @@ import bio.terra.pipelines.generated.model.ApiPipelineRunOutputs; import bio.terra.pipelines.stairway.imputation.RunImputationGcpJobFlight; import bio.terra.pipelines.stairway.imputation.RunImputationJobFlightMapKeys; +import bio.terra.rawls.model.Entity; import bio.terra.stairway.Flight; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; @@ -397,6 +399,31 @@ public PipelineRun markPipelineRunSuccessAndWriteOutputs( // methods to interact with and format pipeline run outputs + /** + * Extract pipeline outputs from a Rawls entity object, converting wdlVariableName (typically + * snake_case) to outputName (camelCase). Throw an error if any outputs are missing from the + * entity or empty. + * + * @param pipelineOutputDefinitions + * @param entity + * @return a map of pipeline outputs + */ + public Map extractPipelineOutputsFromEntity( + List pipelineOutputDefinitions, Entity entity) { + Map outputs = new HashMap<>(); + for (PipelineOutputDefinition outputDefinition : pipelineOutputDefinitions) { + String keyName = outputDefinition.getName(); + String wdlVariableName = outputDefinition.getWdlVariableName(); + String outputValue = (String) entity.getAttributes().get(wdlVariableName); + if (outputValue == null || outputValue.isEmpty()) { + throw new InternalServerErrorException( + "Output %s is empty or missing".formatted(wdlVariableName)); + } + outputs.put(keyName, outputValue); + } + return outputs; + } + /** * Extract the pipeline outputs from a pipelineRun object, create signed GET (read-only) urls for * each file, and return an ApiPipelineRunOutputs object with the outputs. 
diff --git a/service/src/main/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpJobFlight.java b/service/src/main/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpJobFlight.java index e4eac500..d151e5ff 100644 --- a/service/src/main/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpJobFlight.java +++ b/service/src/main/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpJobFlight.java @@ -88,7 +88,9 @@ public RunImputationGcpJobFlight(FlightMap inputParameters, Object beanBag) { addStep( new FetchOutputsFromDataTableStep( - flightBeanBag.getRawlsService(), flightBeanBag.getSamService()), + flightBeanBag.getRawlsService(), + flightBeanBag.getSamService(), + flightBeanBag.getPipelineRunsService()), externalServiceRetryRule); addStep(new CompletePipelineRunStep(flightBeanBag.getPipelineRunsService()), dbRetryRule); diff --git a/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java b/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java index 15c3bb38..9279dc27 100644 --- a/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java +++ b/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java @@ -1,5 +1,6 @@ package bio.terra.pipelines.stairway.imputation.steps.gcp; +import bio.terra.common.exception.InternalServerErrorException; import bio.terra.pipelines.common.utils.FlightUtils; import bio.terra.pipelines.common.utils.PipelinesEnum; import bio.terra.pipelines.db.entities.PipelineOutputDefinition; @@ -7,6 +8,7 @@ import bio.terra.pipelines.dependencies.rawls.RawlsServiceApiException; import bio.terra.pipelines.dependencies.sam.SamService; import bio.terra.pipelines.dependencies.stairway.JobMapKeys; +import bio.terra.pipelines.service.PipelineRunsService; import 
bio.terra.pipelines.stairway.imputation.RunImputationJobFlightMapKeys; import bio.terra.rawls.model.Entity; import bio.terra.stairway.FlightContext; @@ -15,7 +17,6 @@ import bio.terra.stairway.StepResult; import bio.terra.stairway.StepStatus; import com.fasterxml.jackson.core.type.TypeReference; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -28,10 +29,13 @@ public class FetchOutputsFromDataTableStep implements Step { private final RawlsService rawlsService; private final SamService samService; + private final PipelineRunsService pipelineRunsService; - public FetchOutputsFromDataTableStep(RawlsService rawlsService, SamService samService) { + public FetchOutputsFromDataTableStep( + RawlsService rawlsService, SamService samService, PipelineRunsService pipelineRunsService) { this.rawlsService = rawlsService; this.samService = samService; + this.pipelineRunsService = pipelineRunsService; } @Override @@ -74,11 +78,12 @@ public StepResult doStep(FlightContext flightContext) { return new StepResult(StepStatus.STEP_RESULT_FAILURE_RETRY, e); } - Map outputs = new HashMap<>(); - for (PipelineOutputDefinition outputDefinition : outputDefinitions) { - String keyName = outputDefinition.getName(); - String wdlVariableName = outputDefinition.getWdlVariableName(); - outputs.put(keyName, entity.getAttributes().get(wdlVariableName).toString()); + // this will throw an error if any of the output definitions are not found or empty + Map outputs; + try { + outputs = pipelineRunsService.extractPipelineOutputsFromEntity(outputDefinitions, entity); + } catch (InternalServerErrorException e) { + return new StepResult(StepStatus.STEP_RESULT_FAILURE_FATAL, e); } FlightMap workingMap = flightContext.getWorkingMap(); diff --git a/service/src/test/java/bio/terra/pipelines/service/PipelineRunsServiceTest.java b/service/src/test/java/bio/terra/pipelines/service/PipelineRunsServiceTest.java index 540d2b90..149deba6 100644 --- 
a/service/src/test/java/bio/terra/pipelines/service/PipelineRunsServiceTest.java +++ b/service/src/test/java/bio/terra/pipelines/service/PipelineRunsServiceTest.java @@ -16,6 +16,7 @@ import bio.terra.pipelines.db.entities.Pipeline; import bio.terra.pipelines.db.entities.PipelineInput; import bio.terra.pipelines.db.entities.PipelineOutput; +import bio.terra.pipelines.db.entities.PipelineOutputDefinition; import bio.terra.pipelines.db.entities.PipelineRun; import bio.terra.pipelines.db.exception.DuplicateObjectException; import bio.terra.pipelines.db.repositories.PipelineInputsRepository; @@ -29,6 +30,7 @@ import bio.terra.pipelines.stairway.imputation.RunImputationAzureJobFlight; import bio.terra.pipelines.testutils.BaseEmbeddedDbTest; import bio.terra.pipelines.testutils.TestUtils; +import bio.terra.rawls.model.Entity; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; @@ -533,6 +535,50 @@ void createImputationRunStairwayError() { Optional.empty(), pipelineRunsRepository.findByJobIdAndUserId(testJobId, testUserId)); } + @Test + void extractPipelineOutputsFromEntity() { + // test that the method correctly extracts the outputs from the entity + List outputDefinitions = + TestUtils.TEST_PIPELINE_OUTPUTS_DEFINITION_LIST; + Entity entity = new Entity(); + entity.setAttributes( + Map.of("output_name", "gs://bucket/file1", "testNonOutputKey", "doesn't matter")); + + Map extractedOutputs = + pipelineRunsService.extractPipelineOutputsFromEntity(outputDefinitions, entity); + + assertEquals(1, extractedOutputs.size()); + // the method should also have converted the wdlVariableName key to the camelCase outputName + // key + assertEquals("gs://bucket/file1", extractedOutputs.get("outputName")); + } + + @Test + void extractPipelineOutputsFromEntityMissingOutput() { + // test that the method correctly throws an error if an output is missing + List outputDefinitions = +
TestUtils.TEST_PIPELINE_OUTPUTS_DEFINITION_LIST; + Entity entity = new Entity(); + entity.setAttributes(Map.of("testNonOutputKey", "doesn't matter")); + + assertThrows( + InternalServerErrorException.class, + () -> pipelineRunsService.extractPipelineOutputsFromEntity(outputDefinitions, entity)); + } + + @Test + void extractPipelineOutputsFromEntityEmptyOutput() { + // test that the method correctly throws an error if an output is empty + List outputDefinitions = + TestUtils.TEST_PIPELINE_OUTPUTS_DEFINITION_LIST; + Entity entity = new Entity(); + entity.setAttributes(Map.of("outputName", "")); + + assertThrows( + InternalServerErrorException.class, + () -> pipelineRunsService.extractPipelineOutputsFromEntity(outputDefinitions, entity)); + } + @Test void formatPipelineRunOutputs() throws MalformedURLException { PipelineRun pipelineRun = createNewRunWithJobId(testJobId); diff --git a/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java b/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java index 76e33574..d40451e3 100644 --- a/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java +++ b/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java @@ -4,10 +4,12 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; +import bio.terra.common.exception.InternalServerErrorException; import bio.terra.pipelines.dependencies.rawls.RawlsService; import bio.terra.pipelines.dependencies.rawls.RawlsServiceApiException; import bio.terra.pipelines.dependencies.rawls.RawlsServiceException; import bio.terra.pipelines.dependencies.sam.SamService; +import bio.terra.pipelines.service.PipelineRunsService; import bio.terra.pipelines.stairway.imputation.RunImputationJobFlightMapKeys; import 
bio.terra.pipelines.testutils.BaseEmbeddedDbTest; import bio.terra.pipelines.testutils.StairwayTestUtils; @@ -26,6 +28,7 @@ class FetchOutputsFromDataTableStepTest extends BaseEmbeddedDbTest { @Mock RawlsService rawlsService; @Mock SamService samService; + @Mock PipelineRunsService pipelineRunsService; @Mock private FlightContext flightContext; @BeforeEach @@ -47,25 +50,29 @@ void doStepSuccess() throws RawlsServiceException { Entity entity = new Entity().attributes(entityAttributes); when(rawlsService.getDataTableEntity(any(), any(), any(), any(), any())).thenReturn(entity); + Map outputsRetrievedFromEntity = Map.of("output_name", "some/file.vcf.gz"); + when(pipelineRunsService.extractPipelineOutputsFromEntity(any(), any())) + .thenReturn(outputsRetrievedFromEntity); FetchOutputsFromDataTableStep fetchOutputsFromDataTableStep = - new FetchOutputsFromDataTableStep(rawlsService, samService); + new FetchOutputsFromDataTableStep(rawlsService, samService, pipelineRunsService); StepResult result = fetchOutputsFromDataTableStep.doStep(flightContext); assertEquals(StepStatus.STEP_RESULT_SUCCESS, result.getStepStatus()); - // in the step we translate the snake_case output keys to camelCase - Map expectedOutputsFromWorkingMap = Map.of("outputName", "some/file.vcf.gz"); - - assertEquals( - expectedOutputsFromWorkingMap, + Map outputsMap = flightContext .getWorkingMap() - .get(RunImputationJobFlightMapKeys.PIPELINE_RUN_OUTPUTS, Map.class)); + .get(RunImputationJobFlightMapKeys.PIPELINE_RUN_OUTPUTS, Map.class); + + assertEquals(outputsRetrievedFromEntity, outputsMap); + // flightContext + // .getWorkingMap() + // .get(RunImputationJobFlightMapKeys.PIPELINE_RUN_OUTPUTS, Map.class)); } @Test - void doStepFailureRetry() throws RawlsServiceException { + void doStepRawlsFailureRetry() throws RawlsServiceException { // setup StairwayTestUtils.constructCreateJobInputs(flightContext.getInputParameters()); @@ -73,16 +80,35 @@ void doStepFailureRetry() throws RawlsServiceException { 
.thenThrow(new RawlsServiceApiException("Rawls Service Api Exception")); FetchOutputsFromDataTableStep fetchOutputsFromDataTableStep = - new FetchOutputsFromDataTableStep(rawlsService, samService); + new FetchOutputsFromDataTableStep(rawlsService, samService, pipelineRunsService); StepResult result = fetchOutputsFromDataTableStep.doStep(flightContext); assertEquals(StepStatus.STEP_RESULT_FAILURE_RETRY, result.getStepStatus()); } + @Test + void doStepOutputsFailureNoRetry() throws InternalServerErrorException { + // setup + StairwayTestUtils.constructCreateJobInputs(flightContext.getInputParameters()); + + Map entityAttributes = new HashMap<>(Map.of("output_name", "some/file.vcf.gz")); + Entity entity = new Entity().attributes(entityAttributes); + + when(rawlsService.getDataTableEntity(any(), any(), any(), any(), any())).thenReturn(entity); + when(pipelineRunsService.extractPipelineOutputsFromEntity(any(), any())) + .thenThrow(new InternalServerErrorException("Internal Server Error")); + + FetchOutputsFromDataTableStep fetchOutputsFromDataTableStep = + new FetchOutputsFromDataTableStep(rawlsService, samService, pipelineRunsService); + StepResult result = fetchOutputsFromDataTableStep.doStep(flightContext); + + assertEquals(StepStatus.STEP_RESULT_FAILURE_FATAL, result.getStepStatus()); + } + @Test void undoStepSuccess() { FetchOutputsFromDataTableStep fetchOutputsFromDataTableStep = - new FetchOutputsFromDataTableStep(rawlsService, samService); + new FetchOutputsFromDataTableStep(rawlsService, samService, pipelineRunsService); StepResult result = fetchOutputsFromDataTableStep.undoStep(flightContext); assertEquals(StepStatus.STEP_RESULT_SUCCESS, result.getStepStatus()); From 1ef207bce51fd4e86f4797e1c940195f261e2739 Mon Sep 17 00:00:00 2001 From: Morgan Taylor Date: Tue, 10 Sep 2024 10:56:08 -0400 Subject: [PATCH 10/12] it works now --- .../gcp/FetchOutputsFromDataTableStep.java | 12 +++------ .../FetchOutputsFromDataTableStepTest.java | 26 +++++++++---------- 2 files 
changed, 17 insertions(+), 21 deletions(-) diff --git a/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java b/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java index 9279dc27..0d2d963d 100644 --- a/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java +++ b/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java @@ -1,6 +1,5 @@ package bio.terra.pipelines.stairway.imputation.steps.gcp; -import bio.terra.common.exception.InternalServerErrorException; import bio.terra.pipelines.common.utils.FlightUtils; import bio.terra.pipelines.common.utils.PipelinesEnum; import bio.terra.pipelines.db.entities.PipelineOutputDefinition; @@ -78,13 +77,10 @@ public StepResult doStep(FlightContext flightContext) { return new StepResult(StepStatus.STEP_RESULT_FAILURE_RETRY, e); } - // this will throw an error if any of the output definitions are not found or empty - Map outputs; - try { - outputs = pipelineRunsService.extractPipelineOutputsFromEntity(outputDefinitions, entity); - } catch (InternalServerErrorException e) { - return new StepResult(StepStatus.STEP_RESULT_FAILURE_FATAL, e); - } + // this will throw an error and fail without retries if any of the output definitions are not + // found or empty + Map outputs = + pipelineRunsService.extractPipelineOutputsFromEntity(outputDefinitions, entity); FlightMap workingMap = flightContext.getWorkingMap(); workingMap.put(RunImputationJobFlightMapKeys.PIPELINE_RUN_OUTPUTS, outputs); diff --git a/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java b/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java index d40451e3..349ccd35 100644 --- 
a/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java +++ b/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java @@ -1,6 +1,7 @@ package bio.terra.pipelines.stairway.imputation.steps.gcp; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; @@ -13,6 +14,7 @@ import bio.terra.pipelines.stairway.imputation.RunImputationJobFlightMapKeys; import bio.terra.pipelines.testutils.BaseEmbeddedDbTest; import bio.terra.pipelines.testutils.StairwayTestUtils; +import bio.terra.pipelines.testutils.TestUtils; import bio.terra.rawls.model.Entity; import bio.terra.stairway.FlightContext; import bio.terra.stairway.FlightMap; @@ -50,9 +52,11 @@ void doStepSuccess() throws RawlsServiceException { Entity entity = new Entity().attributes(entityAttributes); when(rawlsService.getDataTableEntity(any(), any(), any(), any(), any())).thenReturn(entity); - Map outputsRetrievedFromEntity = Map.of("output_name", "some/file.vcf.gz"); - when(pipelineRunsService.extractPipelineOutputsFromEntity(any(), any())) - .thenReturn(outputsRetrievedFromEntity); + Map outputsProcessedFromEntity = + new HashMap<>(Map.of("outputName", "some/file.vcf.gz")); + when(pipelineRunsService.extractPipelineOutputsFromEntity( + TestUtils.TEST_PIPELINE_OUTPUTS_DEFINITION_LIST, entity)) + .thenReturn(outputsProcessedFromEntity); FetchOutputsFromDataTableStep fetchOutputsFromDataTableStep = new FetchOutputsFromDataTableStep(rawlsService, samService, pipelineRunsService); @@ -60,15 +64,11 @@ void doStepSuccess() throws RawlsServiceException { assertEquals(StepStatus.STEP_RESULT_SUCCESS, result.getStepStatus()); - Map outputsMap = + assertEquals( + outputsProcessedFromEntity, flightContext .getWorkingMap() - 
.get(RunImputationJobFlightMapKeys.PIPELINE_RUN_OUTPUTS, Map.class); - - assertEquals(outputsRetrievedFromEntity, outputsMap); - // flightContext - // .getWorkingMap() - // .get(RunImputationJobFlightMapKeys.PIPELINE_RUN_OUTPUTS, Map.class)); + .get(RunImputationJobFlightMapKeys.PIPELINE_RUN_OUTPUTS, Map.class)); } @Test @@ -100,9 +100,9 @@ void doStepOutputsFailureNoRetry() throws InternalServerErrorException { FetchOutputsFromDataTableStep fetchOutputsFromDataTableStep = new FetchOutputsFromDataTableStep(rawlsService, samService, pipelineRunsService); - StepResult result = fetchOutputsFromDataTableStep.doStep(flightContext); - - assertEquals(StepStatus.STEP_RESULT_FAILURE_FATAL, result.getStepStatus()); + assertThrows( + InternalServerErrorException.class, + () -> fetchOutputsFromDataTableStep.doStep(flightContext)); } @Test From 67a51507c357540629078089b3879b651108ac80 Mon Sep 17 00:00:00 2001 From: Morgan Taylor Date: Tue, 10 Sep 2024 13:34:59 -0400 Subject: [PATCH 11/12] ok it really works this time --- .../imputation/steps/gcp/FetchOutputsFromDataTableStep.java | 5 +++-- .../steps/gcp/FetchOutputsFromDataTableStepTest.java | 5 ++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java b/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java index 0d2d963d..efbfcdda 100644 --- a/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java +++ b/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java @@ -77,8 +77,9 @@ public StepResult doStep(FlightContext flightContext) { return new StepResult(StepStatus.STEP_RESULT_FAILURE_RETRY, e); } - // this will throw an error and fail without retries if any of the output definitions are not - // found or empty + // this will throw an error and fail the task 
without retries if any of the output definitions + // are + // missing or empty Map outputs = pipelineRunsService.extractPipelineOutputsFromEntity(outputDefinitions, entity); diff --git a/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java b/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java index 349ccd35..c5650371 100644 --- a/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java +++ b/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; import bio.terra.common.exception.InternalServerErrorException; @@ -14,7 +15,6 @@ import bio.terra.pipelines.stairway.imputation.RunImputationJobFlightMapKeys; import bio.terra.pipelines.testutils.BaseEmbeddedDbTest; import bio.terra.pipelines.testutils.StairwayTestUtils; -import bio.terra.pipelines.testutils.TestUtils; import bio.terra.rawls.model.Entity; import bio.terra.stairway.FlightContext; import bio.terra.stairway.FlightMap; @@ -54,8 +54,7 @@ void doStepSuccess() throws RawlsServiceException { when(rawlsService.getDataTableEntity(any(), any(), any(), any(), any())).thenReturn(entity); Map outputsProcessedFromEntity = new HashMap<>(Map.of("outputName", "some/file.vcf.gz")); - when(pipelineRunsService.extractPipelineOutputsFromEntity( - TestUtils.TEST_PIPELINE_OUTPUTS_DEFINITION_LIST, entity)) + when(pipelineRunsService.extractPipelineOutputsFromEntity(any(), eq(entity))) .thenReturn(outputsProcessedFromEntity); FetchOutputsFromDataTableStep fetchOutputsFromDataTableStep = From 5fe062419725cdc0193c2510e4c9cff7e58c6ebc Mon Sep 
17 00:00:00 2001 From: Morgan Taylor Date: Tue, 10 Sep 2024 13:59:46 -0400 Subject: [PATCH 12/12] remove unnecessary condition --- .../bio/terra/pipelines/service/PipelineRunsService.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java b/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java index ab42aa90..3bd9ee57 100644 --- a/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java +++ b/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java @@ -414,8 +414,13 @@ public Map extractPipelineOutputsFromEntity( for (PipelineOutputDefinition outputDefinition : pipelineOutputDefinitions) { String keyName = outputDefinition.getName(); String wdlVariableName = outputDefinition.getWdlVariableName(); - String outputValue = (String) entity.getAttributes().get(wdlVariableName); - if (outputValue == null || outputValue.isEmpty()) { + String outputValue = + (String) + entity + .getAttributes() + .get(wdlVariableName); // .get() returns null if the key is missing, or if the + // value is empty + if (outputValue == null) { throw new InternalServerErrorException( "Output %s is empty or missing".formatted(wdlVariableName)); }