-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TSPS-306 getResult returns signed urls for all outputs #120
Changes from 8 commits
c0d3af9
cfad601
526bef7
08611f8
ef779fd
30ce7e5
ec522ea
38c24bd
d4265c2
1ef207b
67a5150
5fe0624
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package bio.terra.pipelines.service; | ||
|
||
import static bio.terra.pipelines.common.utils.FileUtils.constructDestinationBlobNameForUserInputFile; | ||
import static bio.terra.pipelines.common.utils.FileUtils.getBlobNameFromTerraWorkspaceStorageUrlGcp; | ||
import static java.util.Collections.emptyList; | ||
import static org.springframework.data.domain.PageRequest.ofSize; | ||
|
||
|
@@ -397,8 +398,8 @@ public PipelineRun markPipelineRunSuccessAndWriteOutputs( | |
// methods to interact with and format pipeline run outputs | ||
|
||
/** | ||
* Extract the pipeline outputs from a pipelineRun object and return an ApiPipelineRunOutputs | ||
* object with the outputs. | ||
* Extract the pipeline outputs from a pipelineRun object, create signed GET (read-only) urls for | ||
* each file, and return an ApiPipelineRunOutputs object with the outputs. | ||
* | ||
* @param pipelineRun object from the pipelineRunsRepository | ||
* @return ApiPipelineRunOutputs | ||
|
@@ -408,6 +409,17 @@ public ApiPipelineRunOutputs formatPipelineRunOutputs(PipelineRun pipelineRun) { | |
pipelineRunOutputsAsMap( | ||
pipelineOutputsRepository.findPipelineOutputsByJobId(pipelineRun.getId()).getOutputs()); | ||
|
||
// currently all outputs are paths that will need a signed url | ||
String workspaceStorageContainerName = pipelineRun.getWorkspaceStorageContainerName(); | ||
outputsMap.replaceAll( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is neat |
||
(k, v) -> | ||
gcsService | ||
.generateGetObjectSignedUrl( | ||
pipelineRun.getWorkspaceGoogleProject(), | ||
workspaceStorageContainerName, | ||
getBlobNameFromTerraWorkspaceStorageUrlGcp(v, workspaceStorageContainerName)) | ||
.toString()); | ||
|
||
ApiPipelineRunOutputs apiPipelineRunOutputs = new ApiPipelineRunOutputs(); | ||
apiPipelineRunOutputs.putAll(outputsMap); | ||
return apiPipelineRunOutputs; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,13 +5,16 @@ | |
import bio.terra.pipelines.common.utils.FlightUtils; | ||
import bio.terra.pipelines.common.utils.PipelinesEnum; | ||
import bio.terra.pipelines.dependencies.stairway.JobMapKeys; | ||
import bio.terra.pipelines.stairway.imputation.steps.CompletePipelineRunStep; | ||
import bio.terra.pipelines.stairway.imputation.steps.PrepareImputationInputsStep; | ||
import bio.terra.pipelines.stairway.imputation.steps.gcp.AddDataTableRowStep; | ||
import bio.terra.pipelines.stairway.imputation.steps.gcp.FetchOutputsFromDataTableStep; | ||
import bio.terra.pipelines.stairway.imputation.steps.gcp.PollCromwellSubmissionStatusStep; | ||
import bio.terra.pipelines.stairway.imputation.steps.gcp.SubmitCromwellSubmissionStep; | ||
import bio.terra.stairway.Flight; | ||
import bio.terra.stairway.FlightMap; | ||
import bio.terra.stairway.RetryRule; | ||
import bio.terra.stairway.RetryRuleExponentialBackoff; | ||
import bio.terra.stairway.RetryRuleFixedInterval; | ||
import bio.terra.stairway.Step; | ||
|
||
|
@@ -21,6 +24,16 @@ public class RunImputationGcpJobFlight extends Flight { | |
private final RetryRule dbRetryRule = | ||
new RetryRuleFixedInterval(/*intervalSeconds= */ 1, /* maxCount= */ 5); | ||
|
||
/** | ||
* Use for a short exponential backoff retry, for operations that should be completable within a | ||
* few seconds. | ||
*/ | ||
private final RetryRule externalServiceRetryRule = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thank you! |
||
// maxOperationTimeSeconds must be larger than socket timeout (20s), otherwise a socket | ||
// timeout | ||
// won't be retried. | ||
new RetryRuleExponentialBackoff(1, 8, /* maxOperationTimeSeconds */ 30); | ||
|
||
// addStep is protected in Flight, so make an override that is public | ||
@Override | ||
public void addStep(Step step, RetryRule retryRule) { | ||
|
@@ -56,20 +69,28 @@ public RunImputationGcpJobFlight(FlightMap inputParameters, Object beanBag) { | |
dbRetryRule); | ||
|
||
addStep( | ||
new AddDataTableRowStep(flightBeanBag.getRawlsService(), flightBeanBag.getSamService())); | ||
new AddDataTableRowStep(flightBeanBag.getRawlsService(), flightBeanBag.getSamService()), | ||
externalServiceRetryRule); | ||
|
||
addStep( | ||
new SubmitCromwellSubmissionStep( | ||
flightBeanBag.getRawlsService(), | ||
flightBeanBag.getSamService(), | ||
flightBeanBag.getImputationConfiguration()), | ||
dbRetryRule); | ||
externalServiceRetryRule); | ||
|
||
addStep( | ||
new PollCromwellSubmissionStatusStep( | ||
flightBeanBag.getSamService(), | ||
flightBeanBag.getRawlsService(), | ||
flightBeanBag.getImputationConfiguration()), | ||
dbRetryRule); | ||
externalServiceRetryRule); | ||
|
||
addStep( | ||
new FetchOutputsFromDataTableStep( | ||
flightBeanBag.getRawlsService(), flightBeanBag.getSamService()), | ||
externalServiceRetryRule); | ||
|
||
addStep(new CompletePipelineRunStep(flightBeanBag.getPipelineRunsService()), dbRetryRule); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
package bio.terra.pipelines.stairway.imputation.steps.gcp; | ||
|
||
import bio.terra.pipelines.common.utils.FlightUtils; | ||
import bio.terra.pipelines.common.utils.PipelinesEnum; | ||
import bio.terra.pipelines.db.entities.PipelineOutputDefinition; | ||
import bio.terra.pipelines.dependencies.rawls.RawlsService; | ||
import bio.terra.pipelines.dependencies.rawls.RawlsServiceApiException; | ||
import bio.terra.pipelines.dependencies.sam.SamService; | ||
import bio.terra.pipelines.dependencies.stairway.JobMapKeys; | ||
import bio.terra.pipelines.stairway.imputation.RunImputationJobFlightMapKeys; | ||
import bio.terra.rawls.model.Entity; | ||
import bio.terra.stairway.FlightContext; | ||
import bio.terra.stairway.FlightMap; | ||
import bio.terra.stairway.Step; | ||
import bio.terra.stairway.StepResult; | ||
import bio.terra.stairway.StepStatus; | ||
import com.fasterxml.jackson.core.type.TypeReference; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
/** | ||
* This step calls Rawls to fetch outputs from a data table row for a given job and stores them in | ||
* the flight's working map. These outputs are considered raw in that they are cloud paths and not | ||
* signed urls. | ||
*/ | ||
public class FetchOutputsFromDataTableStep implements Step { | ||
|
||
private final RawlsService rawlsService; | ||
private final SamService samService; | ||
|
||
public FetchOutputsFromDataTableStep(RawlsService rawlsService, SamService samService) { | ||
this.rawlsService = rawlsService; | ||
this.samService = samService; | ||
} | ||
|
||
@Override | ||
@SuppressWarnings( | ||
"java:S2259") // suppress warning for possible NPE when calling pipelineName.getValue(), | ||
// since we do validate that pipelineName is not null in `validateRequiredEntries` | ||
public StepResult doStep(FlightContext flightContext) { | ||
String jobId = flightContext.getFlightId(); | ||
|
||
// validate and extract parameters from input map | ||
var inputParameters = flightContext.getInputParameters(); | ||
FlightUtils.validateRequiredEntries( | ||
inputParameters, | ||
JobMapKeys.PIPELINE_NAME.getKeyName(), | ||
RunImputationJobFlightMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT, | ||
RunImputationJobFlightMapKeys.CONTROL_WORKSPACE_NAME, | ||
RunImputationJobFlightMapKeys.PIPELINE_OUTPUT_DEFINITIONS); | ||
|
||
String controlWorkspaceBillingProject = | ||
inputParameters.get( | ||
RunImputationJobFlightMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT, String.class); | ||
String controlWorkspaceName = | ||
inputParameters.get(RunImputationJobFlightMapKeys.CONTROL_WORKSPACE_NAME, String.class); | ||
PipelinesEnum pipelineName = | ||
inputParameters.get(JobMapKeys.PIPELINE_NAME.getKeyName(), PipelinesEnum.class); | ||
List<PipelineOutputDefinition> outputDefinitions = | ||
inputParameters.get( | ||
RunImputationJobFlightMapKeys.PIPELINE_OUTPUT_DEFINITIONS, new TypeReference<>() {}); | ||
|
||
Entity entity; | ||
try { | ||
entity = | ||
rawlsService.getDataTableEntity( | ||
samService.getTeaspoonsServiceAccountToken(), | ||
controlWorkspaceBillingProject, | ||
controlWorkspaceName, | ||
pipelineName.getValue(), | ||
jobId); | ||
} catch (RawlsServiceApiException e) { | ||
return new StepResult(StepStatus.STEP_RESULT_FAILURE_RETRY, e); | ||
} | ||
|
||
Map<String, String> outputs = new HashMap<>(); | ||
for (PipelineOutputDefinition outputDefinition : outputDefinitions) { | ||
String keyName = outputDefinition.getName(); | ||
String wdlVariableName = outputDefinition.getWdlVariableName(); | ||
outputs.put(keyName, entity.getAttributes().get(wdlVariableName).toString()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as we chatted on slack, it may be worth validating the outputs exist and are in fact not empty |
||
} | ||
|
||
FlightMap workingMap = flightContext.getWorkingMap(); | ||
workingMap.put(RunImputationJobFlightMapKeys.PIPELINE_RUN_OUTPUTS, outputs); | ||
|
||
return StepResult.getStepResultSuccess(); | ||
} | ||
|
||
@Override | ||
public StepResult undoStep(FlightContext flightContext) { | ||
// nothing to undo | ||
return StepResult.getStepResultSuccess(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice this looks great