diff --git a/service/src/main/java/bio/terra/pipelines/common/utils/FileUtils.java b/service/src/main/java/bio/terra/pipelines/common/utils/FileUtils.java
index 7a754745..c2e2b188 100644
--- a/service/src/main/java/bio/terra/pipelines/common/utils/FileUtils.java
+++ b/service/src/main/java/bio/terra/pipelines/common/utils/FileUtils.java
@@ -12,23 +12,50 @@ private FileUtils() {
private static final String USER_PROVIDED_FILE_INPUT_DIRECTORY = "user-input-files";
/**
- * Extract the blob name from the full file path, using the workspaceId as a delimiter.
+ * Extract the blob name from the full Azure file path, using the workspaceId as a delimiter.
*
- * <p>For example, `https://lz123.blob.core.windows.net/sc-{workspaceId}/path/to/file` becomes
- * `path/to/file`
+ * <p>For example, with workspaceId as the workspaceSubstringStart,
+ * `https://lz123.blob.core.windows.net/sc-{workspaceDelimiter}/path/to/file` becomes
+ * `path/to/file`.
*
- * @param blobHttpUrl
+ * @param blobUrl
* @param workspaceId
+ */
+ public static String getBlobNameFromTerraWorkspaceStorageUrlAzure(
+     String blobUrl, String workspaceId) {
+   // for Azure paths the workspaceId is the marker that precedes the blob name in the url
+   String blobName = getBlobNameFromTerraWorkspaceStorageUrl(blobUrl, workspaceId);
+   return blobName;
+ }
+
+ /**
+  * Extract the blob name from the full GCP file path, using the workspaceStorageContainerName as
+  * the delimiter that marks where the blob name starts.
+  *
+  * <p>For example, with workspaceStorageContainerName as the workspaceSubstringStart,
+  * `gs://{workspaceDelimiter}/path/to/file` becomes `path/to/file`.
+  *
+  * @param blobUrl full path of the file; must contain workspaceStorageContainerName
+  * @param workspaceStorageContainerName delimiter to search for in blobUrl
+  * @return the part of blobUrl that follows workspaceStorageContainerName and the single
+  *     character after it
+  * @throws InternalServerErrorException if blobUrl does not contain workspaceStorageContainerName
+  */
+ public static String getBlobNameFromTerraWorkspaceStorageUrlGcp(
+     String blobUrl, String workspaceStorageContainerName) {
+   String blobName =
+       getBlobNameFromTerraWorkspaceStorageUrl(blobUrl, workspaceStorageContainerName);
+   return blobName;
+ }
+
+ /**
+ * Extract the blob name from the full file path, using a defined workspaceSubstringStart.
+ *
+ * @param blobUrl
+ * @param workspaceSubstringStart
* @return blobName
*/
- public static String getBlobNameFromTerraWorkspaceStorageHttpUrl(
- String blobHttpUrl, UUID workspaceId) {
- if (!blobHttpUrl.contains(workspaceId.toString())) {
+ private static String getBlobNameFromTerraWorkspaceStorageUrl(
+ String blobUrl, String workspaceSubstringStart) {
+ if (!blobUrl.contains(workspaceSubstringStart)) {
throw new InternalServerErrorException(
- "File path and workspaceId do not match. Cannot extract blob name.");
+ "File path and workspaceSubstringStart do not match. Cannot extract blob name.");
}
- return blobHttpUrl.substring(
- blobHttpUrl.indexOf(workspaceId.toString()) + workspaceId.toString().length() + 1);
+ return blobUrl.substring(
+ blobUrl.indexOf(workspaceSubstringStart) + workspaceSubstringStart.length() + 1);
}
/**
diff --git a/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java b/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java
index bb2eeda7..4966bbaf 100644
--- a/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java
+++ b/service/src/main/java/bio/terra/pipelines/dependencies/gcs/GcsService.java
@@ -8,8 +8,10 @@
import com.google.cloud.storage.StorageException;
import java.net.URL;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.support.RetryTemplate;
@@ -70,11 +72,82 @@ public URL generatePutObjectSignedUrl(String projectId, String bucketName, Strin
Storage.SignUrlOption.withExtHeaders(extensionHeaders),
Storage.SignUrlOption.withV4Signature()));
- logger.info("Generated PUT signed URL: {}", url);
+ String cleanSignedUrlString = cleanSignedUrl(url);
+ logger.info("Generated PUT signed URL: {}", cleanSignedUrlString);
return url;
}
+ /**
+  * Generates and returns a GET (read-only) signed url for a specific object in a bucket. See the
+  * Google Cloud Storage documentation on signed urls for background.
+  *
+  * <p>The output URL can be used with a curl command to download an object: `curl '{url}' >
+  * {local_file_name}`
+  *
+  * @param projectId Google project id
+  * @param bucketName without a prefix
+  * @param objectName should include the full path of the object (subdirectories + file name)
+  * @return url that can be used to download an object from GCS
+  * @throws StorageException declared by this method for failures of the signing call
+  */
+ public URL generateGetObjectSignedUrl(String projectId, String bucketName, String objectName)
+     throws StorageException {
+   // the object (bucket + full object path) that the signed url will point at
+   BlobId targetBlobId = BlobId.of(bucketName, objectName);
+   BlobInfo targetBlob = BlobInfo.newBuilder(targetBlobId).build();
+
+   // V4-signed url restricted to HTTP GET, valid for the configured number of hours
+   URL signedUrl =
+       executionWithRetryTemplate(
+           listenerResetRetryTemplate,
+           () ->
+               gcsClient
+                   .getStorageService(projectId)
+                   .signUrl(
+                       targetBlob,
+                       gcsConfiguration.signedUrlGetDurationHours(),
+                       TimeUnit.HOURS,
+                       Storage.SignUrlOption.httpMethod(HttpMethod.GET),
+                       Storage.SignUrlOption.withV4Signature()));
+
+   // only the redacted form of the url is written to the log
+   logger.info("Generated GET signed URL: {}", cleanSignedUrl(signedUrl));
+
+   return signedUrl;
+ }
+
+ /**
+  * Redact the X-Goog-Signature element's value from the signed url and return the cleaned result
+  * as a string, so the url can be logged without exposing the signature.
+  *
+  * <p>Every query element that mentions X-Goog-Signature is dropped and a single
+  * `X-Goog-Signature=REDACTED` element is appended after the remaining elements, so the redacted
+  * element is always last regardless of where the signature originally appeared. A url with no
+  * query string, or whose query string has no X-Goog-Signature element, is returned unchanged.
+  *
+  * @param signedUrl the signed url to clean
+  * @return the string form of signedUrl with the signature value replaced by REDACTED
+  */
+ public static String cleanSignedUrl(URL signedUrl) {
+   String signedUrlString = signedUrl.toString();
+   String signatureKey = "X-Goog-Signature";
+   String elementDelimiter = "&";
+
+   // a url without a query string has nothing to redact; blindly indexing the second part of
+   // split("\\?") would throw ArrayIndexOutOfBoundsException for such a url
+   int queryStart = signedUrlString.indexOf('?');
+   if (queryStart < 0) {
+     return signedUrlString;
+   }
+
+   String baseUrl = signedUrlString.substring(0, queryStart);
+   String query = signedUrlString.substring(queryStart + 1);
+   if (!query.contains(signatureKey)) {
+     return signedUrlString;
+   }
+
+   String elementsWithoutSignature =
+       List.of(query.split(elementDelimiter)).stream()
+           .filter(element -> !element.contains(signatureKey))
+           .collect(Collectors.joining(elementDelimiter));
+   String redactedSignature = signatureKey + "=REDACTED";
+
+   // avoid a dangling "?&" when the signature was the only query element
+   String cleanQuery =
+       elementsWithoutSignature.isEmpty()
+           ? redactedSignature
+           : elementsWithoutSignature + elementDelimiter + redactedSignature;
+   return baseUrl + "?" + cleanQuery;
+ }
+
interface GcsAction {
T execute();
}
diff --git a/service/src/main/java/bio/terra/pipelines/dependencies/rawls/RawlsService.java b/service/src/main/java/bio/terra/pipelines/dependencies/rawls/RawlsService.java
index 4c81c77e..555af61f 100644
--- a/service/src/main/java/bio/terra/pipelines/dependencies/rawls/RawlsService.java
+++ b/service/src/main/java/bio/terra/pipelines/dependencies/rawls/RawlsService.java
@@ -94,6 +94,20 @@ public Entity upsertDataTableEntity(
.createEntity(entity, workspaceNamespace, workspaceName));
}
+ /**
+  * Fetch a single data table entity from a workspace through the Rawls entities API, running
+  * the call through the retry template.
+  *
+  * @param accessToken token used to obtain the Rawls entities API client
+  * @param workspaceNamespace namespace of the workspace holding the entity
+  * @param workspaceName name of the workspace holding the entity
+  * @param entityType type of the entity to fetch
+  * @param entityName name of the entity to fetch
+  * @return the Entity returned by Rawls
+  */
+ public Entity getDataTableEntity(
+     String accessToken,
+     String workspaceNamespace,
+     String workspaceName,
+     String entityType,
+     String entityName) {
+   return executionWithRetryTemplate(
+       listenerResetRetryTemplate,
+       () -> {
+         var entitiesApi = rawlsClient.getEntitiesApi(accessToken);
+         // NOTE(review): the two trailing arguments of getEntity are left unset (null) -
+         // confirm their meaning against the Rawls client
+         return entitiesApi.getEntity(
+             workspaceNamespace, workspaceName, entityType, entityName, null, null);
+       });
+ }
+
// returns true if submission is in a running state
public static boolean submissionIsRunning(Submission submission) {
return !FINAL_RUN_STATES.contains(submission.getStatus());
diff --git a/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java b/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java
index f17e2a17..3bd9ee57 100644
--- a/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java
+++ b/service/src/main/java/bio/terra/pipelines/service/PipelineRunsService.java
@@ -1,6 +1,7 @@
package bio.terra.pipelines.service;
import static bio.terra.pipelines.common.utils.FileUtils.constructDestinationBlobNameForUserInputFile;
+import static bio.terra.pipelines.common.utils.FileUtils.getBlobNameFromTerraWorkspaceStorageUrlGcp;
import static java.util.Collections.emptyList;
import static org.springframework.data.domain.PageRequest.ofSize;
@@ -19,6 +20,7 @@
import bio.terra.pipelines.db.entities.PipelineInput;
import bio.terra.pipelines.db.entities.PipelineInputDefinition;
import bio.terra.pipelines.db.entities.PipelineOutput;
+import bio.terra.pipelines.db.entities.PipelineOutputDefinition;
import bio.terra.pipelines.db.entities.PipelineRun;
import bio.terra.pipelines.db.exception.DuplicateObjectException;
import bio.terra.pipelines.db.repositories.PipelineInputsRepository;
@@ -31,6 +33,7 @@
import bio.terra.pipelines.generated.model.ApiPipelineRunOutputs;
import bio.terra.pipelines.stairway.imputation.RunImputationGcpJobFlight;
import bio.terra.pipelines.stairway.imputation.RunImputationJobFlightMapKeys;
+import bio.terra.rawls.model.Entity;
import bio.terra.stairway.Flight;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -397,8 +400,38 @@ public PipelineRun markPipelineRunSuccessAndWriteOutputs(
// methods to interact with and format pipeline run outputs
/**
- * Extract the pipeline outputs from a pipelineRun object and return an ApiPipelineRunOutputs
- * object with the outputs.
+ * Extract pipeline outputs from a Rawls entity object, converting wdlVariableName (typically
+ * snake_case) to outputName (camelCase). Throw an error if any outputs are missing from the
+ * entity or empty.
+ *
+ * @param pipelineOutputDefinitions
+ * @param entity
+ * @return a map of pipeline outputs
+ */
+ public Map<String, String> extractPipelineOutputsFromEntity(
+     List<PipelineOutputDefinition> pipelineOutputDefinitions, Entity entity) {
+   Map<String, String> outputs = new HashMap<>();
+   for (PipelineOutputDefinition outputDefinition : pipelineOutputDefinitions) {
+     // the value is looked up on the entity by wdlVariableName but returned under the output
+     // definition's name
+     String keyName = outputDefinition.getName();
+     String wdlVariableName = outputDefinition.getWdlVariableName();
+     String outputValue = (String) entity.getAttributes().get(wdlVariableName);
+     // Map.get() returns null only when the key is missing (or mapped to null); an empty string
+     // is returned as-is, so it must be rejected explicitly to honor "empty or missing"
+     if (outputValue == null || outputValue.isEmpty()) {
+       throw new InternalServerErrorException(
+           "Output %s is empty or missing".formatted(wdlVariableName));
+     }
+     outputs.put(keyName, outputValue);
+   }
+   return outputs;
+ }
+
+ /**
+ * Extract the pipeline outputs from a pipelineRun object, create signed GET (read-only) urls for
+ * each file, and return an ApiPipelineRunOutputs object with the outputs.
*
* @param pipelineRun object from the pipelineRunsRepository
* @return ApiPipelineRunOutputs
@@ -408,6 +441,17 @@ public ApiPipelineRunOutputs formatPipelineRunOutputs(PipelineRun pipelineRun) {
pipelineRunOutputsAsMap(
pipelineOutputsRepository.findPipelineOutputsByJobId(pipelineRun.getId()).getOutputs());
+ // currently all outputs are paths that will need a signed url
+ String workspaceStorageContainerName = pipelineRun.getWorkspaceStorageContainerName();
+ outputsMap.replaceAll(
+ (k, v) ->
+ gcsService
+ .generateGetObjectSignedUrl(
+ pipelineRun.getWorkspaceGoogleProject(),
+ workspaceStorageContainerName,
+ getBlobNameFromTerraWorkspaceStorageUrlGcp(v, workspaceStorageContainerName))
+ .toString());
+
ApiPipelineRunOutputs apiPipelineRunOutputs = new ApiPipelineRunOutputs();
apiPipelineRunOutputs.putAll(outputsMap);
return apiPipelineRunOutputs;
diff --git a/service/src/main/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpJobFlight.java b/service/src/main/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpJobFlight.java
index fcc38080..d151e5ff 100644
--- a/service/src/main/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpJobFlight.java
+++ b/service/src/main/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpJobFlight.java
@@ -5,13 +5,16 @@
import bio.terra.pipelines.common.utils.FlightUtils;
import bio.terra.pipelines.common.utils.PipelinesEnum;
import bio.terra.pipelines.dependencies.stairway.JobMapKeys;
+import bio.terra.pipelines.stairway.imputation.steps.CompletePipelineRunStep;
import bio.terra.pipelines.stairway.imputation.steps.PrepareImputationInputsStep;
import bio.terra.pipelines.stairway.imputation.steps.gcp.AddDataTableRowStep;
+import bio.terra.pipelines.stairway.imputation.steps.gcp.FetchOutputsFromDataTableStep;
import bio.terra.pipelines.stairway.imputation.steps.gcp.PollCromwellSubmissionStatusStep;
import bio.terra.pipelines.stairway.imputation.steps.gcp.SubmitCromwellSubmissionStep;
import bio.terra.stairway.Flight;
import bio.terra.stairway.FlightMap;
import bio.terra.stairway.RetryRule;
+import bio.terra.stairway.RetryRuleExponentialBackoff;
import bio.terra.stairway.RetryRuleFixedInterval;
import bio.terra.stairway.Step;
@@ -21,6 +24,16 @@ public class RunImputationGcpJobFlight extends Flight {
private final RetryRule dbRetryRule =
new RetryRuleFixedInterval(/*intervalSeconds= */ 1, /* maxCount= */ 5);
+ /**
+ * Use for a short exponential backoff retry, for operations that should be completable within a
+ * few seconds.
+ */
+ private final RetryRule externalServiceRetryRule =
+ // maxOperationTimeSeconds must be larger than the socket timeout (20s), otherwise a socket
+ // timeout won't be retried.
+ new RetryRuleExponentialBackoff(1, 8, /* maxOperationTimeSeconds */ 30);
+
// addStep is protected in Flight, so make an override that is public
@Override
public void addStep(Step step, RetryRule retryRule) {
@@ -56,20 +69,30 @@ public RunImputationGcpJobFlight(FlightMap inputParameters, Object beanBag) {
dbRetryRule);
addStep(
- new AddDataTableRowStep(flightBeanBag.getRawlsService(), flightBeanBag.getSamService()));
+ new AddDataTableRowStep(flightBeanBag.getRawlsService(), flightBeanBag.getSamService()),
+ externalServiceRetryRule);
addStep(
new SubmitCromwellSubmissionStep(
flightBeanBag.getRawlsService(),
flightBeanBag.getSamService(),
flightBeanBag.getImputationConfiguration()),
- dbRetryRule);
+ externalServiceRetryRule);
addStep(
new PollCromwellSubmissionStatusStep(
flightBeanBag.getSamService(),
flightBeanBag.getRawlsService(),
flightBeanBag.getImputationConfiguration()),
- dbRetryRule);
+ externalServiceRetryRule);
+
+ addStep(
+ new FetchOutputsFromDataTableStep(
+ flightBeanBag.getRawlsService(),
+ flightBeanBag.getSamService(),
+ flightBeanBag.getPipelineRunsService()),
+ externalServiceRetryRule);
+
+ addStep(new CompletePipelineRunStep(flightBeanBag.getPipelineRunsService()), dbRetryRule);
}
}
diff --git a/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java b/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java
new file mode 100644
index 00000000..efbfcdda
--- /dev/null
+++ b/service/src/main/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStep.java
@@ -0,0 +1,97 @@
+package bio.terra.pipelines.stairway.imputation.steps.gcp;
+
+import bio.terra.pipelines.common.utils.FlightUtils;
+import bio.terra.pipelines.common.utils.PipelinesEnum;
+import bio.terra.pipelines.db.entities.PipelineOutputDefinition;
+import bio.terra.pipelines.dependencies.rawls.RawlsService;
+import bio.terra.pipelines.dependencies.rawls.RawlsServiceApiException;
+import bio.terra.pipelines.dependencies.sam.SamService;
+import bio.terra.pipelines.dependencies.stairway.JobMapKeys;
+import bio.terra.pipelines.service.PipelineRunsService;
+import bio.terra.pipelines.stairway.imputation.RunImputationJobFlightMapKeys;
+import bio.terra.rawls.model.Entity;
+import bio.terra.stairway.FlightContext;
+import bio.terra.stairway.FlightMap;
+import bio.terra.stairway.Step;
+import bio.terra.stairway.StepResult;
+import bio.terra.stairway.StepStatus;
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Stairway step that asks Rawls for the data table entity belonging to this job and copies the
+ * pipeline outputs found on it into the flight's working map. The stored outputs are raw values:
+ * cloud paths rather than signed urls.
+ */
+public class FetchOutputsFromDataTableStep implements Step {
+
+  private final RawlsService rawlsService;
+  private final SamService samService;
+  private final PipelineRunsService pipelineRunsService;
+
+  public FetchOutputsFromDataTableStep(
+      RawlsService rawlsService, SamService samService, PipelineRunsService pipelineRunsService) {
+    this.rawlsService = rawlsService;
+    this.samService = samService;
+    this.pipelineRunsService = pipelineRunsService;
+  }
+
+  /**
+   * Looks up the entity whose type is the pipeline name and whose name is the flight id, extracts
+   * the pipeline outputs from it and stores them in the working map under PIPELINE_RUN_OUTPUTS.
+   *
+   * @param flightContext context supplying the flight id, input parameters and working map
+   * @return a retryable failure if the Rawls call throws RawlsServiceApiException, success
+   *     otherwise
+   */
+  @Override
+  @SuppressWarnings(
+      "java:S2259") // pipelineName.getValue() is flagged as a possible NPE, but pipelineName is
+  // validated as present by `validateRequiredEntries` before it is read
+  public StepResult doStep(FlightContext flightContext) {
+    // make sure every key read below is present in the input map
+    var inputParameters = flightContext.getInputParameters();
+    FlightUtils.validateRequiredEntries(
+        inputParameters,
+        JobMapKeys.PIPELINE_NAME.getKeyName(),
+        RunImputationJobFlightMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT,
+        RunImputationJobFlightMapKeys.CONTROL_WORKSPACE_NAME,
+        RunImputationJobFlightMapKeys.PIPELINE_OUTPUT_DEFINITIONS);
+
+    PipelinesEnum pipelineName =
+        inputParameters.get(JobMapKeys.PIPELINE_NAME.getKeyName(), PipelinesEnum.class);
+    String billingProject =
+        inputParameters.get(
+            RunImputationJobFlightMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT, String.class);
+    String workspaceName =
+        inputParameters.get(RunImputationJobFlightMapKeys.CONTROL_WORKSPACE_NAME, String.class);
+    List<PipelineOutputDefinition> outputDefinitions =
+        inputParameters.get(
+            RunImputationJobFlightMapKeys.PIPELINE_OUTPUT_DEFINITIONS, new TypeReference<>() {});
+
+    // the entity is named after the flight id
+    String jobId = flightContext.getFlightId();
+
+    Entity jobEntity;
+    try {
+      jobEntity =
+          rawlsService.getDataTableEntity(
+              samService.getTeaspoonsServiceAccountToken(),
+              billingProject,
+              workspaceName,
+              pipelineName.getValue(),
+              jobId);
+    } catch (RawlsServiceApiException e) {
+      // Rawls API errors are reported as retryable
+      return new StepResult(StepStatus.STEP_RESULT_FAILURE_RETRY, e);
+    }
+
+    // an exception thrown by the extraction (an output that is missing or empty) is not caught
+    // here, so it is not turned into a retryable result by this step
+    Map<String, String> extractedOutputs =
+        pipelineRunsService.extractPipelineOutputsFromEntity(outputDefinitions, jobEntity);
+
+    FlightMap workingMap = flightContext.getWorkingMap();
+    workingMap.put(RunImputationJobFlightMapKeys.PIPELINE_RUN_OUTPUTS, extractedOutputs);
+
+    return StepResult.getStepResultSuccess();
+  }
+
+  /** This step only reads from Rawls and writes to the working map, so there is nothing to undo. */
+  @Override
+  public StepResult undoStep(FlightContext flightContext) {
+    return StepResult.getStepResultSuccess();
+  }
+}
diff --git a/service/src/test/java/bio/terra/pipelines/common/utils/FileUtilsTest.java b/service/src/test/java/bio/terra/pipelines/common/utils/FileUtilsTest.java
index cf18a1cd..dcf891ac 100644
--- a/service/src/test/java/bio/terra/pipelines/common/utils/FileUtilsTest.java
+++ b/service/src/test/java/bio/terra/pipelines/common/utils/FileUtilsTest.java
@@ -11,26 +11,57 @@
class FileUtilsTest extends BaseTest {
@Test
- void getBlobNameFromTerraWorkspaceStorageHttpUrl() {
+ void getBlobNameFromTerraWorkspaceStorageUrlAzure() {
String fullPath =
"https://lze96253b07f13c61ef712bb.blob.core.windows.net/sc-68a43bd8-e744-4f1e-87a5-c44ecef157a3/workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file";
- UUID controlWorkspaceId = UUID.fromString("68a43bd8-e744-4f1e-87a5-c44ecef157a3");
+ String controlWorkspaceIdForDelimiter = "68a43bd8-e744-4f1e-87a5-c44ecef157a3";
String expectedBlobName =
"workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file";
assertEquals(
expectedBlobName,
- FileUtils.getBlobNameFromTerraWorkspaceStorageHttpUrl(fullPath, controlWorkspaceId));
+ FileUtils.getBlobNameFromTerraWorkspaceStorageUrlAzure(
+ fullPath, controlWorkspaceIdForDelimiter));
}
@Test
- void getBlobNameFromTerraWorkspaceStorageHttpUrlDifferentWorkspace() {
+ void getBlobNameFromTerraWorkspaceStorageUrlDifferentWorkspaceAzure() {
String fullPath =
"https://lze96253b07f13c61ef712bb.blob.core.windows.net/sc-68a43bd8-e744-4f1e-87a5-c44ecef157a3/workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file";
- UUID wrongWorkspaceId = UUID.fromString("11111111-1111-1111-1111-111111111111");
+ String wrongWorkspaceIdForDelimiter = "11111111-1111-1111-1111-111111111111";
+
+ assertThrows(
+ InternalServerErrorException.class,
+ () ->
+ FileUtils.getBlobNameFromTerraWorkspaceStorageUrlAzure(
+ fullPath, wrongWorkspaceIdForDelimiter));
+ }
+
+ @Test
+ void getBlobNameFromTerraWorkspaceStorageUrlGcp() {
+ String fullPath =
+ "gs://fc-secure-68a43bd8-e744-4f1e-87a5-c44ecef157a3/workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file";
+ String controlWorkspaceStorageContainerNameForDelimiter =
+ "fc-secure-68a43bd8-e744-4f1e-87a5-c44ecef157a3";
+ String expectedBlobName =
+ "workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file";
+ assertEquals(
+ expectedBlobName,
+ FileUtils.getBlobNameFromTerraWorkspaceStorageUrlGcp(
+ fullPath, controlWorkspaceStorageContainerNameForDelimiter));
+ }
+
+ @Test
+ void getBlobNameFromTerraWorkspaceStorageUrlDifferentWorkspaceGcp() {
+ String fullPath =
+ "gs://fc-secure-68a43bd8-e744-4f1e-87a5-c44ecef157a3/workspace-services/cbas/terra-app-b1740821-d6e9-44b5-b53b-960953dea218/ImputationBeagle/1adb690d-3d02-4d4a-9dfa-17a31edd74f3/call-WriteEmptyFile/cacheCopy/execution/empty_file";
+ String wrongWorkspaceStorageContainerNameForDelimiter =
+ "fc-secure-11111111-1111-1111-1111-111111111111";
assertThrows(
InternalServerErrorException.class,
- () -> FileUtils.getBlobNameFromTerraWorkspaceStorageHttpUrl(fullPath, wrongWorkspaceId));
+ () ->
+ FileUtils.getBlobNameFromTerraWorkspaceStorageUrlGcp(
+ fullPath, wrongWorkspaceStorageContainerNameForDelimiter));
}
@Test
diff --git a/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java b/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java
index 48e62b13..3313bfff 100644
--- a/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java
+++ b/service/src/test/java/bio/terra/pipelines/dependencies/gcs/GcsServiceTest.java
@@ -49,9 +49,17 @@ void setup() {
when(gcsClient.getStorageService(any())).thenReturn(mockStorageService);
}
+ // Builds the stand-in signed url returned by the mocked storage service; it carries an
+ // X-Goog-Signature query element.
+ private URL getFakeURL() {
+   try {
+     return new URL("https://storage.googleapis.com/signed-url-stuff?X-Goog-Signature=12345");
+   } catch (MalformedURLException e) {
+     // fail loudly with the cause instead of returning null, which would only surface later
+     // as an unrelated NullPointerException or a misleading assertion failure
+     throw new IllegalStateException("Hard-coded fake signed URL is malformed", e);
+   }
+ }
+
@Test
- void generatePutObjectSignedUrl() throws MalformedURLException {
- URL fakeURL = new URL("https://storage.googleapis.com/signed-url-stuff");
+ void generatePutObjectSignedUrl() {
+ URL fakeURL = getFakeURL();
when(mockStorageService.signUrl(
any(BlobInfo.class),
anyLong(),
@@ -67,8 +75,24 @@ void generatePutObjectSignedUrl() throws MalformedURLException {
}
@Test
- void socketExceptionRetriesEventuallySucceed() throws Exception {
- URL fakeURL = new URL("https://storage.googleapis.com/signed-url-stuff");
+ void generateGetObjectSignedUrl() {
+ URL fakeURL = getFakeURL();
+ when(mockStorageService.signUrl(
+ any(BlobInfo.class),
+ anyLong(),
+ any(TimeUnit.class),
+ any(Storage.SignUrlOption.class),
+ any(Storage.SignUrlOption.class)))
+ .thenReturn(fakeURL);
+
+ URL generatedURL =
+ gcsService.generateGetObjectSignedUrl("projectId", "bucketName", "objectName");
+ assertEquals(fakeURL, generatedURL);
+ }
+
+ @Test
+ void socketExceptionRetriesEventuallySucceed() {
+ URL fakeURL = getFakeURL();
when(mockStorageService.signUrl(
any(BlobInfo.class),
@@ -86,8 +110,8 @@ void socketExceptionRetriesEventuallySucceed() throws Exception {
}
@Test
- void socketExceptionRetriesEventuallyFail() throws Exception {
- URL fakeURL = new URL("https://storage.googleapis.com/signed-url-stuff");
+ void socketExceptionRetriesEventuallyFail() {
+ URL fakeURL = getFakeURL();
when(mockStorageService.signUrl(
any(BlobInfo.class),
@@ -125,4 +149,30 @@ void storageExceptionDoNotRetry() {
gcsService.generatePutObjectSignedUrl("projectId", "bucketName", "objectName");
});
}
+
+ @Test
+ void cleanSignedUrl() throws MalformedURLException {
+ // signed URL with X-Goog-Signature as last element
+ URL fakeURLSignatureLast =
+ new URL(
+ "https://storage.googleapis.com/fc-secure-6970c3a9-dc92-436d-af3d-917bcb4cf05a/user-input-files/ffaffa12-5717-4562-b3fc-2c963f66afa6/TEST.vcf.gz?X-Goog-Date=20240823T170006Z&X-Goog-Expires=900&X-Goog-SignedHeaders=content-type%3Bhost&X-Goog-Signature=12345");
+ String expectedCleanedURLSignatureLast =
+ "https://storage.googleapis.com/fc-secure-6970c3a9-dc92-436d-af3d-917bcb4cf05a/user-input-files/ffaffa12-5717-4562-b3fc-2c963f66afa6/TEST.vcf.gz?X-Goog-Date=20240823T170006Z&X-Goog-Expires=900&X-Goog-SignedHeaders=content-type%3Bhost&X-Goog-Signature=REDACTED";
+ assertEquals(expectedCleanedURLSignatureLast, GcsService.cleanSignedUrl(fakeURLSignatureLast));
+
+ // signed URL with no X-Goog-Signature should not throw an exception
+ URL fakeURLNoSignature =
+ new URL(
+ "https://storage.googleapis.com/fc-secure-6970c3a9-dc92-436d-af3d-917bcb4cf05a/user-input-files/ffaffa12-5717-4562-b3fc-2c963f66afa6/TEST.vcf.gz?X-Goog-Date=20240823T170006Z&X-Goog-Expires=900");
+ assertEquals(fakeURLNoSignature.toString(), GcsService.cleanSignedUrl(fakeURLNoSignature));
+
+ // signed URL with X-Goog-Signature in the middle
+ URL fakeURLSignatureMiddle =
+ new URL(
+ "https://storage.googleapis.com/fc-secure-6970c3a9-dc92-436d-af3d-917bcb4cf05a/user-input-files/ffaffa12-5717-4562-b3fc-2c963f66afa6/TEST.vcf.gz?X-Goog-Date=20240823T170006Z&X-Goog-Expires=900&X-Goog-Signature=12345&X-Goog-SignedHeaders=content-type%3Bhost&Last-Element=foobar");
+ String expectedCleanedURLSignatureMiddle =
+ "https://storage.googleapis.com/fc-secure-6970c3a9-dc92-436d-af3d-917bcb4cf05a/user-input-files/ffaffa12-5717-4562-b3fc-2c963f66afa6/TEST.vcf.gz?X-Goog-Date=20240823T170006Z&X-Goog-Expires=900&X-Goog-SignedHeaders=content-type%3Bhost&Last-Element=foobar&X-Goog-Signature=REDACTED";
+ assertEquals(
+ expectedCleanedURLSignatureMiddle, GcsService.cleanSignedUrl(fakeURLSignatureMiddle));
+ }
}
diff --git a/service/src/test/java/bio/terra/pipelines/dependencies/rawls/RawlsServiceTest.java b/service/src/test/java/bio/terra/pipelines/dependencies/rawls/RawlsServiceTest.java
index 504758ff..80a512f9 100644
--- a/service/src/test/java/bio/terra/pipelines/dependencies/rawls/RawlsServiceTest.java
+++ b/service/src/test/java/bio/terra/pipelines/dependencies/rawls/RawlsServiceTest.java
@@ -216,6 +216,25 @@ void upsertDataTableEntity() throws Exception {
assertEquals(expectedResponse, rawlsService.upsertDataTableEntity(any(), any(), any(), any()));
}
+ @Test
+ void getDataTableEntity() throws Exception {
+ Entity expectedResponse = new Entity().name("entityName").entityType("entityType");
+
+ rawlsClient = mock(RawlsClient.class);
+ EntitiesApi entitiesApi = mock(EntitiesApi.class);
+ when(entitiesApi.getEntity(any(), any(), any(), any(), any(), any()))
+ .thenReturn(expectedResponse);
+
+ rawlsService = spy(new RawlsService(rawlsClient, template));
+
+ doReturn(entitiesApi).when(rawlsClient).getEntitiesApi(any());
+
+ assertEquals(
+ expectedResponse,
+ rawlsService.getDataTableEntity(
+ "token", "billingProject", "workspace", "entityType", "entityName"));
+ }
+
@Test
void submissionIsNotRunning() {
assertFalse(RawlsService.submissionIsRunning(new Submission().status(SubmissionStatus.DONE)));
diff --git a/service/src/test/java/bio/terra/pipelines/service/PipelineRunsServiceTest.java b/service/src/test/java/bio/terra/pipelines/service/PipelineRunsServiceTest.java
index b41447e8..149deba6 100644
--- a/service/src/test/java/bio/terra/pipelines/service/PipelineRunsServiceTest.java
+++ b/service/src/test/java/bio/terra/pipelines/service/PipelineRunsServiceTest.java
@@ -16,6 +16,7 @@
import bio.terra.pipelines.db.entities.Pipeline;
import bio.terra.pipelines.db.entities.PipelineInput;
import bio.terra.pipelines.db.entities.PipelineOutput;
+import bio.terra.pipelines.db.entities.PipelineOutputDefinition;
import bio.terra.pipelines.db.entities.PipelineRun;
import bio.terra.pipelines.db.exception.DuplicateObjectException;
import bio.terra.pipelines.db.repositories.PipelineInputsRepository;
@@ -29,6 +30,7 @@
import bio.terra.pipelines.stairway.imputation.RunImputationAzureJobFlight;
import bio.terra.pipelines.testutils.BaseEmbeddedDbTest;
import bio.terra.pipelines.testutils.TestUtils;
+import bio.terra.rawls.model.Entity;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
@@ -534,7 +536,51 @@ void createImputationRunStairwayError() {
}
@Test
- void formatPipelineRunOutputs() {
+ void extractPipelineOutputsFromEntity() {
+ // test that the method correctly extracts the outputs from the entity
+ List outputDefinitions =
+ TestUtils.TEST_PIPELINE_OUTPUTS_DEFINITION_LIST;
+ Entity entity = new Entity();
+ entity.setAttributes(
+ Map.of("output_name", "gs://bucket/file1", "testNonOutputKey", "doesn't matter"));
+
+ Map extractedOutputs =
+ pipelineRunsService.extractPipelineOutputsFromEntity(outputDefinitions, entity);
+
+ assertEquals(1, extractedOutputs.size());
+ // the method should also have converted the wdlVariableName key to the camelCase outputName
+ // key
+ assertEquals("gs://bucket/file1", extractedOutputs.get("outputName"));
+ }
+
+ @Test
+ void extractPipelineOutputsFromEntityMissingOutput() {
+ // test that the method correctly throws an error if an output is missing
+ List outputDefinitions =
+ TestUtils.TEST_PIPELINE_OUTPUTS_DEFINITION_LIST;
+ Entity entity = new Entity();
+ entity.setAttributes(Map.of("testNonOutputKey", "doesn't matter"));
+
+ assertThrows(
+ InternalServerErrorException.class,
+ () -> pipelineRunsService.extractPipelineOutputsFromEntity(outputDefinitions, entity));
+ }
+
+ @Test
+ void extractPipelineOutputsFromEntityEmptyOutput() {
+ // test that the method correctly throws an error if an output is empty
+ // NOTE(review): the attribute key used here is "outputName", whereas the happy-path test stores
+ // its value under "output_name" and reads it back as "outputName". If "output_name" is the
+ // wdlVariableName, this entity has no matching key at all, so the exception would come from
+ // the missing-key path and the empty-value case would not be exercised - confirm against
+ // TestUtils.TEST_PIPELINE_OUTPUTS_DEFINITION_LIST and consider using "output_name" here.
+ List outputDefinitions =
+ TestUtils.TEST_PIPELINE_OUTPUTS_DEFINITION_LIST;
+ Entity entity = new Entity();
+ entity.setAttributes(Map.of("outputName", ""));
+
+ assertThrows(
+ InternalServerErrorException.class,
+ () -> pipelineRunsService.extractPipelineOutputsFromEntity(outputDefinitions, entity));
+ }
+
+ @Test
+ void formatPipelineRunOutputs() throws MalformedURLException {
PipelineRun pipelineRun = createNewRunWithJobId(testJobId);
pipelineRunsRepository.save(pipelineRun);
@@ -544,13 +590,14 @@ void formatPipelineRunOutputs() {
pipelineRunsService.pipelineRunOutputsAsString(TestUtils.TEST_PIPELINE_OUTPUTS));
pipelineOutputsRepository.save(pipelineOutput);
- ApiPipelineRunOutputs expectedOutput = new ApiPipelineRunOutputs();
- expectedOutput.putAll(TestUtils.TEST_PIPELINE_OUTPUTS);
+ URL fakeUrl = new URL("https://storage.googleapis.com/signed-url-stuff");
+ // mock GCS service
+ when(mockGcsService.generateGetObjectSignedUrl(any(), any(), any())).thenReturn(fakeUrl);
ApiPipelineRunOutputs apiPipelineRunOutputs =
pipelineRunsService.formatPipelineRunOutputs(pipelineRun);
- assertEquals(expectedOutput, apiPipelineRunOutputs);
+ assertEquals(fakeUrl.toString(), apiPipelineRunOutputs.get("testFileOutputKey"));
}
@Test
diff --git a/service/src/test/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpFlightTest.java b/service/src/test/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpFlightTest.java
index e10c7bf1..371f2d78 100644
--- a/service/src/test/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpFlightTest.java
+++ b/service/src/test/java/bio/terra/pipelines/stairway/imputation/RunImputationGcpFlightTest.java
@@ -33,7 +33,9 @@ class RunImputationGcpFlightTest extends BaseEmbeddedDbTest {
"PrepareImputationInputsStep",
"AddDataTableRowStep",
"SubmitCromwellSubmissionStep",
- "PollCromwellSubmissionStatusStep");
+ "PollCromwellSubmissionStatusStep",
+ "CompletePipelineRunStep",
+ "FetchOutputsFromDataTableStep");
@Autowired FlightBeanBag flightBeanBag;
private SimpleMeterRegistry meterRegistry;
diff --git a/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java b/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java
new file mode 100644
index 00000000..c5650371
--- /dev/null
+++ b/service/src/test/java/bio/terra/pipelines/stairway/imputation/steps/gcp/FetchOutputsFromDataTableStepTest.java
@@ -0,0 +1,115 @@
+package bio.terra.pipelines.stairway.imputation.steps.gcp;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+import bio.terra.common.exception.InternalServerErrorException;
+import bio.terra.pipelines.dependencies.rawls.RawlsService;
+import bio.terra.pipelines.dependencies.rawls.RawlsServiceApiException;
+import bio.terra.pipelines.dependencies.rawls.RawlsServiceException;
+import bio.terra.pipelines.dependencies.sam.SamService;
+import bio.terra.pipelines.service.PipelineRunsService;
+import bio.terra.pipelines.stairway.imputation.RunImputationJobFlightMapKeys;
+import bio.terra.pipelines.testutils.BaseEmbeddedDbTest;
+import bio.terra.pipelines.testutils.StairwayTestUtils;
+import bio.terra.rawls.model.Entity;
+import bio.terra.stairway.FlightContext;
+import bio.terra.stairway.FlightMap;
+import bio.terra.stairway.StepResult;
+import bio.terra.stairway.StepStatus;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+class FetchOutputsFromDataTableStepTest extends BaseEmbeddedDbTest {
+ // Tests for FetchOutputsFromDataTableStep, built from Mockito mocks of its collaborators.
+ @Mock RawlsService rawlsService;
+ @Mock SamService samService;
+ @Mock PipelineRunsService pipelineRunsService;
+ @Mock private FlightContext flightContext;
+ // Back the mocked FlightContext with fresh, empty FlightMaps before every test.
+ @BeforeEach
+ void setup() {
+ var inputParameters = new FlightMap();
+ var workingMap = new FlightMap();
+
+ when(flightContext.getInputParameters()).thenReturn(inputParameters);
+ when(flightContext.getWorkingMap()).thenReturn(workingMap);
+ }
+
+ @Test
+ void doStepSuccess() throws RawlsServiceException {
+ // setup: fill the flight's input parameters via the shared StairwayTestUtils helper
+ StairwayTestUtils.constructCreateJobInputs(flightContext.getInputParameters());
+
+ // outputs to match the test output definitions
+ Map entityAttributes = new HashMap<>(Map.of("output_name", "some/file.vcf.gz"));
+ Entity entity = new Entity().attributes(entityAttributes);
+ // stub Rawls to return the entity, and the service to return processed outputs for it
+ when(rawlsService.getDataTableEntity(any(), any(), any(), any(), any())).thenReturn(entity);
+ Map outputsProcessedFromEntity =
+ new HashMap<>(Map.of("outputName", "some/file.vcf.gz"));
+ when(pipelineRunsService.extractPipelineOutputsFromEntity(any(), eq(entity)))
+ .thenReturn(outputsProcessedFromEntity);
+
+ FetchOutputsFromDataTableStep fetchOutputsFromDataTableStep =
+ new FetchOutputsFromDataTableStep(rawlsService, samService, pipelineRunsService);
+ StepResult result = fetchOutputsFromDataTableStep.doStep(flightContext);
+ // the step should succeed and leave the processed outputs in the working map
+ assertEquals(StepStatus.STEP_RESULT_SUCCESS, result.getStepStatus());
+
+ assertEquals(
+ outputsProcessedFromEntity,
+ flightContext
+ .getWorkingMap()
+ .get(RunImputationJobFlightMapKeys.PIPELINE_RUN_OUTPUTS, Map.class));
+ }
+
+ @Test
+ void doStepRawlsFailureRetry() throws RawlsServiceException {
+ // setup: fill the flight's input parameters via the shared StairwayTestUtils helper
+ StairwayTestUtils.constructCreateJobInputs(flightContext.getInputParameters());
+ // a Rawls API exception while fetching the entity should give a retryable failure
+ when(rawlsService.getDataTableEntity(any(), any(), any(), any(), any()))
+ .thenThrow(new RawlsServiceApiException("Rawls Service Api Exception"));
+
+ FetchOutputsFromDataTableStep fetchOutputsFromDataTableStep =
+ new FetchOutputsFromDataTableStep(rawlsService, samService, pipelineRunsService);
+ StepResult result = fetchOutputsFromDataTableStep.doStep(flightContext);
+
+ assertEquals(StepStatus.STEP_RESULT_FAILURE_RETRY, result.getStepStatus());
+ }
+
+ @Test
+ void doStepOutputsFailureNoRetry() throws InternalServerErrorException {
+ // setup: fill the flight's input parameters via the shared StairwayTestUtils helper
+ StairwayTestUtils.constructCreateJobInputs(flightContext.getInputParameters());
+
+ Map entityAttributes = new HashMap<>(Map.of("output_name", "some/file.vcf.gz"));
+ Entity entity = new Entity().attributes(entityAttributes);
+ // Rawls returns the entity, but extracting its outputs throws
+ when(rawlsService.getDataTableEntity(any(), any(), any(), any(), any())).thenReturn(entity);
+ when(pipelineRunsService.extractPipelineOutputsFromEntity(any(), any()))
+ .thenThrow(new InternalServerErrorException("Internal Server Error"));
+ // the exception is expected to escape doStep instead of being turned into a StepResult
+ FetchOutputsFromDataTableStep fetchOutputsFromDataTableStep =
+ new FetchOutputsFromDataTableStep(rawlsService, samService, pipelineRunsService);
+ assertThrows(
+ InternalServerErrorException.class,
+ () -> fetchOutputsFromDataTableStep.doStep(flightContext));
+ }
+
+ @Test
+ void undoStepSuccess() {
+ FetchOutputsFromDataTableStep fetchOutputsFromDataTableStep =
+ new FetchOutputsFromDataTableStep(rawlsService, samService, pipelineRunsService);
+ StepResult result = fetchOutputsFromDataTableStep.undoStep(flightContext);
+ // undoStep is expected to report success; no stubbing beyond setup() is configured here
+ assertEquals(StepStatus.STEP_RESULT_SUCCESS, result.getStepStatus());
+ }
+}
diff --git a/service/src/test/java/bio/terra/pipelines/testutils/TestUtils.java b/service/src/test/java/bio/terra/pipelines/testutils/TestUtils.java
index f1300482..5016a507 100644
--- a/service/src/test/java/bio/terra/pipelines/testutils/TestUtils.java
+++ b/service/src/test/java/bio/terra/pipelines/testutils/TestUtils.java
@@ -47,7 +47,7 @@ public class TestUtils {
new HashMap(
Map.of(
"testFileOutputKey",
- "https://lz123.stuff/sc-%s/testFileOutputValue".formatted(CONTROL_WORKSPACE_ID)));
+ "gs://fc-secure-%s/testFileOutputValue".formatted(CONTROL_WORKSPACE_ID)));
public static final List TEST_PIPELINE_INPUTS_DEFINITION_LIST =
new ArrayList<>(