Skip to content

Commit

Permalink
TSPS-306 getResult returns signed urls for all outputs (#120)
Browse files Browse the repository at this point in the history
  • Loading branch information
mmorgantaylor authored Sep 10, 2024
1 parent d3bd62f commit d1285a6
Show file tree
Hide file tree
Showing 13 changed files with 576 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,50 @@ private FileUtils() {
private static final String USER_PROVIDED_FILE_INPUT_DIRECTORY = "user-input-files";

/**
* Extract the blob name from the full file path, using the workspaceId as a delimiter.
* Extract the blob name from the full Azure file path, using the workspaceId as a delimiter.
*
* <p>For example, `https://lz123.blob.core.windows.net/sc-{workspaceId}/path/to/file` becomes
* `path/to/file`
* <p>For example, with workspaceId as the workspaceSubstringStart,
* `https://lz123.blob.core.windows.net/sc-{workspaceDelimiter}/path/to/file` becomes
* `path/to/file`.
*
* @param blobHttpUrl
* @param blobUrl
* @param workspaceId
*/
public static String getBlobNameFromTerraWorkspaceStorageUrlAzure(
String blobUrl, String workspaceId) {
return getBlobNameFromTerraWorkspaceStorageUrl(blobUrl, workspaceId);
}

/**
* Extract the blob name from the full GCP file path, using the workspaceStorageContainerName as a
* delimiter.
*
* <p>For example, with workspaceStorageContainerName as the workspaceSubstringStart,
* `gs://{workspaceDelimiter}/path/to/file` becomes `path/to/file`.
*
* @param blobUrl
* @param workspaceStorageContainerName
*/
public static String getBlobNameFromTerraWorkspaceStorageUrlGcp(
String blobUrl, String workspaceStorageContainerName) {
return getBlobNameFromTerraWorkspaceStorageUrl(blobUrl, workspaceStorageContainerName);
}

/**
* Extract the blob name from the full file path, using a defined workspaceSubstringStart.
*
* @param blobUrl
* @param workspaceSubstringStart
* @return blobName
*/
public static String getBlobNameFromTerraWorkspaceStorageHttpUrl(
String blobHttpUrl, UUID workspaceId) {
if (!blobHttpUrl.contains(workspaceId.toString())) {
private static String getBlobNameFromTerraWorkspaceStorageUrl(
String blobUrl, String workspaceSubstringStart) {
if (!blobUrl.contains(workspaceSubstringStart)) {
throw new InternalServerErrorException(
"File path and workspaceId do not match. Cannot extract blob name.");
"File path and workspaceSubstringStart do not match. Cannot extract blob name.");
}
return blobHttpUrl.substring(
blobHttpUrl.indexOf(workspaceId.toString()) + workspaceId.toString().length() + 1);
return blobUrl.substring(
blobUrl.indexOf(workspaceSubstringStart) + workspaceSubstringStart.length() + 1);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import com.google.cloud.storage.StorageException;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.support.RetryTemplate;
Expand Down Expand Up @@ -70,11 +72,82 @@ public URL generatePutObjectSignedUrl(String projectId, String bucketName, Strin
Storage.SignUrlOption.withExtHeaders(extensionHeaders),
Storage.SignUrlOption.withV4Signature()));

logger.info("Generated PUT signed URL: {}", url);
String cleanSignedUrlString = cleanSignedUrl(url);
logger.info("Generated PUT signed URL: {}", cleanSignedUrlString);

return url;
}

/**
* Generates and returns a GET (read-only) signed url for a specific object in a bucket. See
* documentation on signed urls <a
* href="https://cloud.google.com/storage/docs/access-control/signed-urls">here</a>.
*
* <p>The output URL can be used with a curl command to download an object: `curl '{url}' >
* {local_file_name}`
*
* @param projectId Google project id
* @param bucketName without a prefix
* @param objectName should include the full path of the object (subdirectories + file name)
* @return url that can be used to download an object to GCS
*/
public URL generateGetObjectSignedUrl(String projectId, String bucketName, String objectName)
throws StorageException {
// define target blob object resource
BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, objectName)).build();

// generate signed URL
URL url =
executionWithRetryTemplate(
listenerResetRetryTemplate,
() ->
gcsClient
.getStorageService(projectId)
.signUrl(
blobInfo,
gcsConfiguration.signedUrlGetDurationHours(),
TimeUnit.HOURS,
Storage.SignUrlOption.httpMethod(HttpMethod.GET),
Storage.SignUrlOption.withV4Signature()));

String cleanSignedUrlString = cleanSignedUrl(url);
logger.info("Generated GET signed URL: {}", cleanSignedUrlString);

return url;
}

/**
* Redact the X-Google-Signature element's value from the signed url and return the cleaned result
* as a string.
*
* @param signedUrl
* @return
*/
public static String cleanSignedUrl(URL signedUrl) {
String signedUrlString = signedUrl.toString();
String[] signedUrlParts = signedUrlString.split("\\?");
String elementDelimiter = "&";
List<String> signedUrlElements = List.of(signedUrlParts[1].split(elementDelimiter));

String signatureKey = "X-Goog-Signature";

String cleanUrl = signedUrlString;
if (signedUrlString.contains(signatureKey)) {
String urlElementsWithoutSignature =
signedUrlElements.stream()
.filter(signedUrlElement -> !signedUrlElement.contains(signatureKey))
.collect(Collectors.joining(elementDelimiter));
cleanUrl =
signedUrlParts[0]
+ "?"
+ urlElementsWithoutSignature
+ elementDelimiter
+ signatureKey
+ "=REDACTED";
}
return cleanUrl;
}

interface GcsAction<T> {
T execute();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,20 @@ public Entity upsertDataTableEntity(
.createEntity(entity, workspaceNamespace, workspaceName));
}

public Entity getDataTableEntity(
String accessToken,
String workspaceNamespace,
String workspaceName,
String entityType,
String entityName) {
return executionWithRetryTemplate(
listenerResetRetryTemplate,
() ->
rawlsClient
.getEntitiesApi(accessToken)
.getEntity(workspaceNamespace, workspaceName, entityType, entityName, null, null));
}

// returns true if submission is in a running state
public static boolean submissionIsRunning(Submission submission) {
return !FINAL_RUN_STATES.contains(submission.getStatus());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bio.terra.pipelines.service;

import static bio.terra.pipelines.common.utils.FileUtils.constructDestinationBlobNameForUserInputFile;
import static bio.terra.pipelines.common.utils.FileUtils.getBlobNameFromTerraWorkspaceStorageUrlGcp;
import static java.util.Collections.emptyList;
import static org.springframework.data.domain.PageRequest.ofSize;

Expand All @@ -19,6 +20,7 @@
import bio.terra.pipelines.db.entities.PipelineInput;
import bio.terra.pipelines.db.entities.PipelineInputDefinition;
import bio.terra.pipelines.db.entities.PipelineOutput;
import bio.terra.pipelines.db.entities.PipelineOutputDefinition;
import bio.terra.pipelines.db.entities.PipelineRun;
import bio.terra.pipelines.db.exception.DuplicateObjectException;
import bio.terra.pipelines.db.repositories.PipelineInputsRepository;
Expand All @@ -31,6 +33,7 @@
import bio.terra.pipelines.generated.model.ApiPipelineRunOutputs;
import bio.terra.pipelines.stairway.imputation.RunImputationGcpJobFlight;
import bio.terra.pipelines.stairway.imputation.RunImputationJobFlightMapKeys;
import bio.terra.rawls.model.Entity;
import bio.terra.stairway.Flight;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -400,8 +403,38 @@ public PipelineRun markPipelineRunSuccessAndWriteOutputs(
// methods to interact with and format pipeline run outputs

/**
* Extract the pipeline outputs from a pipelineRun object and return an ApiPipelineRunOutputs
* object with the outputs.
* Extract pipeline outputs from a Rawls entity object, converting wdlVariableName (typically
* snake_case) to outputName (camelCase). Throw an error if any outputs are missing from the
* entity or empty.
*
* @param pipelineOutputDefinitions
* @param entity
* @return a map of pipeline outputs
*/
public Map<String, String> extractPipelineOutputsFromEntity(
List<PipelineOutputDefinition> pipelineOutputDefinitions, Entity entity) {
Map<String, String> outputs = new HashMap<>();
for (PipelineOutputDefinition outputDefinition : pipelineOutputDefinitions) {
String keyName = outputDefinition.getName();
String wdlVariableName = outputDefinition.getWdlVariableName();
String outputValue =
(String)
entity
.getAttributes()
.get(wdlVariableName); // .get() returns null if the key is missing, or if the
// value is empty
if (outputValue == null) {
throw new InternalServerErrorException(
"Output %s is empty or missing".formatted(wdlVariableName));
}
outputs.put(keyName, outputValue);
}
return outputs;
}

/**
* Extract the pipeline outputs from a pipelineRun object, create signed GET (read-only) urls for
* each file, and return an ApiPipelineRunOutputs object with the outputs.
*
* @param pipelineRun object from the pipelineRunsRepository
* @return ApiPipelineRunOutputs
Expand All @@ -411,6 +444,17 @@ public ApiPipelineRunOutputs formatPipelineRunOutputs(PipelineRun pipelineRun) {
pipelineRunOutputsAsMap(
pipelineOutputsRepository.findPipelineOutputsByJobId(pipelineRun.getId()).getOutputs());

// currently all outputs are paths that will need a signed url
String workspaceStorageContainerName = pipelineRun.getWorkspaceStorageContainerName();
outputsMap.replaceAll(
(k, v) ->
gcsService
.generateGetObjectSignedUrl(
pipelineRun.getWorkspaceGoogleProject(),
workspaceStorageContainerName,
getBlobNameFromTerraWorkspaceStorageUrlGcp(v, workspaceStorageContainerName))
.toString());

ApiPipelineRunOutputs apiPipelineRunOutputs = new ApiPipelineRunOutputs();
apiPipelineRunOutputs.putAll(outputsMap);
return apiPipelineRunOutputs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
import bio.terra.pipelines.common.utils.FlightUtils;
import bio.terra.pipelines.common.utils.PipelinesEnum;
import bio.terra.pipelines.dependencies.stairway.JobMapKeys;
import bio.terra.pipelines.stairway.imputation.steps.CompletePipelineRunStep;
import bio.terra.pipelines.stairway.imputation.steps.PrepareImputationInputsStep;
import bio.terra.pipelines.stairway.imputation.steps.gcp.AddDataTableRowStep;
import bio.terra.pipelines.stairway.imputation.steps.gcp.FetchOutputsFromDataTableStep;
import bio.terra.pipelines.stairway.imputation.steps.gcp.PollCromwellSubmissionStatusStep;
import bio.terra.pipelines.stairway.imputation.steps.gcp.SubmitCromwellSubmissionStep;
import bio.terra.stairway.Flight;
import bio.terra.stairway.FlightMap;
import bio.terra.stairway.RetryRule;
import bio.terra.stairway.RetryRuleExponentialBackoff;
import bio.terra.stairway.RetryRuleFixedInterval;
import bio.terra.stairway.Step;

Expand All @@ -21,6 +24,16 @@ public class RunImputationGcpJobFlight extends Flight {
private final RetryRule dbRetryRule =
new RetryRuleFixedInterval(/*intervalSeconds= */ 1, /* maxCount= */ 5);

/**
* Use for a short exponential backoff retry, for operations that should be completable within a
* few seconds.
*/
private final RetryRule externalServiceRetryRule =
// maxOperationTimeSeconds must be larger than socket timeout (20s), otherwise a socket
// timeout
// won't be retried.
new RetryRuleExponentialBackoff(1, 8, /* maxOperationTimeSeconds */ 30);

// addStep is protected in Flight, so make an override that is public
@Override
public void addStep(Step step, RetryRule retryRule) {
Expand Down Expand Up @@ -56,20 +69,30 @@ public RunImputationGcpJobFlight(FlightMap inputParameters, Object beanBag) {
dbRetryRule);

addStep(
new AddDataTableRowStep(flightBeanBag.getRawlsService(), flightBeanBag.getSamService()));
new AddDataTableRowStep(flightBeanBag.getRawlsService(), flightBeanBag.getSamService()),
externalServiceRetryRule);

addStep(
new SubmitCromwellSubmissionStep(
flightBeanBag.getRawlsService(),
flightBeanBag.getSamService(),
flightBeanBag.getImputationConfiguration()),
dbRetryRule);
externalServiceRetryRule);

addStep(
new PollCromwellSubmissionStatusStep(
flightBeanBag.getSamService(),
flightBeanBag.getRawlsService(),
flightBeanBag.getImputationConfiguration()),
dbRetryRule);
externalServiceRetryRule);

addStep(
new FetchOutputsFromDataTableStep(
flightBeanBag.getRawlsService(),
flightBeanBag.getSamService(),
flightBeanBag.getPipelineRunsService()),
externalServiceRetryRule);

addStep(new CompletePipelineRunStep(flightBeanBag.getPipelineRunsService()), dbRetryRule);
}
}
Loading

0 comments on commit d1285a6

Please sign in to comment.