Skip to content

Commit

Permalink
TSPS-305 Add GCS service, sign PUT urls for data uploads (#116)
Browse files Browse the repository at this point in the history
  • Loading branch information
mmorgantaylor authored Sep 3, 2024
1 parent d7406e2 commit 494990a
Show file tree
Hide file tree
Showing 14 changed files with 399 additions and 20 deletions.
6 changes: 4 additions & 2 deletions common/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -761,15 +761,17 @@ components:
$ref: "#/components/schemas/PipelineUserProvidedInputs"

PreparePipelineRunResponse:
description: Result of the preparePipelineRun request, containing input file paths
description: Result of the preparePipelineRun request, containing signed URLs to upload input files
type: object
properties:
jobId:
$ref: '#/components/schemas/Id'
fileInputUploadUrls:
type: object
additionalProperties:
type: string
type: object
additionalProperties:
type: string

UpdatePipelineRequestBody:
description: |
Expand Down
4 changes: 4 additions & 0 deletions service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ dependencies {
implementation "com.google.cloud.opentelemetry:exporter-trace:${gcpOpenTelemetryExporterVersion}"
implementation "com.google.cloud.opentelemetry:exporter-metrics:${gcpOpenTelemetryExporterVersion}"

// gcs
implementation platform('com.google.cloud:libraries-bom:26.44.0')
implementation 'com.google.cloud:google-cloud-storage'

liquibaseRuntime 'info.picocli:picocli:4.6.1'
liquibaseRuntime 'org.postgresql:postgresql:42.6.1'

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package bio.terra.pipelines.app.configuration.external;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Signed-url settings for Google Cloud Storage, bound from properties under the {@code gcs}
 * prefix (see the {@code gcs:} block in application.yml).
 *
 * @param signedUrlPutDurationHours how many hours a generated PUT (upload) signed url stays valid
 * @param signedUrlGetDurationHours how many hours a GET (download) signed url stays valid —
 *     presumably for future download support; not used by the code shown here
 */
@ConfigurationProperties(prefix = "gcs")
public record GcsConfiguration(long signedUrlPutDurationHours, long signedUrlGetDurationHours) {}
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,11 @@ public ResponseEntity<ApiPreparePipelineRunResponse> preparePipelineRun(
userId,
userProvidedInputs);

pipelineRunsService.preparePipelineRun(pipeline, jobId, userId, userProvidedInputs);

// for now convert userProvidedInputs to Map<String, String> for the response
Map<String, String> userProvidedInputsStringMap = new HashMap<>();
userProvidedInputs.forEach(
(key, value) -> userProvidedInputsStringMap.put(key, value.toString()));
Map<String, Map<String, String>> fileInputUploadUrls =
pipelineRunsService.preparePipelineRun(pipeline, jobId, userId, userProvidedInputs);

ApiPreparePipelineRunResponse prepareResponse =
new ApiPreparePipelineRunResponse()
.jobId(jobId)
.fileInputUploadUrls(userProvidedInputsStringMap);
new ApiPreparePipelineRunResponse().jobId(jobId).fileInputUploadUrls(fileInputUploadUrls);

return new ResponseEntity<>(prepareResponse, HttpStatus.OK);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package bio.terra.pipelines.dependencies.gcs;

import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.springframework.stereotype.Component;

/**
 * Thin wrapper around the Google Cloud Storage client library that produces {@link Storage}
 * handles scoped to a given Google project.
 */
@Component
public class GcsClient {
  /**
   * Builds a {@link Storage} service handle for the given project, authenticated with
   * application default credentials. Note that a new handle is constructed on every call.
   *
   * @param projectId Google project id the storage client should operate in
   * @return a Storage client for the project
   */
  public Storage getStorageService(String projectId) {
    // this will use application default credentials, which is what is used by
    // SamService.getTeaspoonsServiceAccountToken()
    return StorageOptions.newBuilder().setProjectId(projectId).build().getService();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package bio.terra.pipelines.dependencies.gcs;

import bio.terra.pipelines.app.configuration.external.GcsConfiguration;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.HttpMethod;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Service;

/** Class to encapsulate interacting with the GCS client. */
@Service
public class GcsService {

  private final GcsClient gcsClient;
  private final GcsConfiguration gcsConfiguration;
  private final RetryTemplate listenerResetRetryTemplate;

  private static final Logger logger = LoggerFactory.getLogger(GcsService.class);

  public GcsService(
      GcsClient gcsClient,
      GcsConfiguration gcsConfiguration,
      RetryTemplate listenerResetRetryTemplate) {
    this.gcsClient = gcsClient;
    this.gcsConfiguration = gcsConfiguration;
    this.listenerResetRetryTemplate = listenerResetRetryTemplate;
  }

  /**
   * Generates and returns a PUT (write-only) signed url for a specific object in a bucket. See
   * documentation on signed urls <a
   * href="https://cloud.google.com/storage/docs/access-control/signed-urls">here</a>.
   *
   * <p>The output URL can be used with a curl command to upload an object to the destination: `curl
   * -X PUT -H 'Content-Type: application/octet-stream' --upload-file my-file '{url}'`
   *
   * <p>The url is valid for {@code gcsConfiguration.signedUrlPutDurationHours()} hours and only
   * accepts uploads whose Content-Type header is exactly {@code application/octet-stream}.
   *
   * @param projectId Google project id
   * @param bucketName without a prefix
   * @param objectName should include the full path of the object (subdirectories + file name)
   * @return url that can be used to write an object to GCS
   * @throws GcsServiceException if the underlying GCS call fails (wrapped by the retry helper)
   */
  public URL generatePutObjectSignedUrl(String projectId, String bucketName, String objectName)
      throws StorageException {
    // define target blob object resource
    BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, objectName)).build();

    // restrict the signed url to uploads that declare this exact Content-Type
    Map<String, String> extensionHeaders = new HashMap<>();
    extensionHeaders.put("Content-Type", "application/octet-stream");

    URL url =
        executionWithRetryTemplate(
            listenerResetRetryTemplate,
            () ->
                gcsClient
                    .getStorageService(projectId)
                    .signUrl(
                        blobInfo,
                        gcsConfiguration.signedUrlPutDurationHours(),
                        TimeUnit.HOURS,
                        Storage.SignUrlOption.httpMethod(HttpMethod.PUT),
                        Storage.SignUrlOption.withExtHeaders(extensionHeaders),
                        Storage.SignUrlOption.withV4Signature()));

    // do NOT log the url itself: a signed url is a bearer credential, and anyone who obtains
    // it from the logs can write to this object until the url expires
    logger.info("Generated PUT signed URL for gs://{}/{}", bucketName, objectName);

    return url;
  }

  /** A GCS operation to be executed inside a retry template. */
  interface GcsAction<T> {
    T execute();
  }

  /**
   * Runs the given action inside the supplied retry template, wrapping any {@link
   * StorageException} in a {@link GcsServiceException}.
   *
   * <p>NOTE(review): StorageException distinguishes retryable failures (isRetryable()); whether
   * those are actually retried here depends entirely on how the injected RetryTemplate treats
   * GcsServiceException — confirm the template's retry policy covers the intended cases.
   */
  static <T> T executionWithRetryTemplate(RetryTemplate retryTemplate, GcsAction<T> action) {
    return retryTemplate.execute(
        context -> {
          try {
            return action.execute();
          } catch (StorageException e) {
            throw new GcsServiceException("Error executing GCS action", e);
          }
        });
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package bio.terra.pipelines.dependencies.gcs;

import bio.terra.common.exception.ErrorReportException;
import java.util.ArrayList;
import org.springframework.http.HttpStatus;

/**
 * Exception raised when an interaction with Google Cloud Storage fails; reported to API callers
 * as an internal server error (500).
 */
public class GcsServiceException extends ErrorReportException {

  public GcsServiceException(String message) {
    super(message, new ArrayList<>(), HttpStatus.INTERNAL_SERVER_ERROR);
  }

  public GcsServiceException(String message, Throwable cause) {
    super(message, cause, new ArrayList<>(), HttpStatus.INTERNAL_SERVER_ERROR);
  }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package bio.terra.pipelines.service;

import static bio.terra.pipelines.common.utils.FileUtils.constructDestinationBlobNameForUserInputFile;
import static java.util.Collections.emptyList;
import static org.springframework.data.domain.PageRequest.ofSize;

Expand All @@ -8,19 +9,22 @@
import bio.terra.common.exception.InternalServerErrorException;
import bio.terra.pipelines.app.common.MetricsUtils;
import bio.terra.pipelines.common.utils.CommonPipelineRunStatusEnum;
import bio.terra.pipelines.common.utils.PipelineVariableTypesEnum;
import bio.terra.pipelines.common.utils.PipelinesEnum;
import bio.terra.pipelines.common.utils.pagination.CursorBasedPageable;
import bio.terra.pipelines.common.utils.pagination.FieldEqualsSpecification;
import bio.terra.pipelines.common.utils.pagination.PageResponse;
import bio.terra.pipelines.common.utils.pagination.PageSpecification;
import bio.terra.pipelines.db.entities.Pipeline;
import bio.terra.pipelines.db.entities.PipelineInput;
import bio.terra.pipelines.db.entities.PipelineInputDefinition;
import bio.terra.pipelines.db.entities.PipelineOutput;
import bio.terra.pipelines.db.entities.PipelineRun;
import bio.terra.pipelines.db.exception.DuplicateObjectException;
import bio.terra.pipelines.db.repositories.PipelineInputsRepository;
import bio.terra.pipelines.db.repositories.PipelineOutputsRepository;
import bio.terra.pipelines.db.repositories.PipelineRunsRepository;
import bio.terra.pipelines.dependencies.gcs.GcsService;
import bio.terra.pipelines.dependencies.stairway.JobBuilder;
import bio.terra.pipelines.dependencies.stairway.JobMapKeys;
import bio.terra.pipelines.dependencies.stairway.JobService;
Expand All @@ -31,6 +35,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand All @@ -47,6 +52,7 @@ public class PipelineRunsService {
private static final Logger logger = LoggerFactory.getLogger(PipelineRunsService.class);

private final JobService jobService;
private final GcsService gcsService;
private final PipelineRunsRepository pipelineRunsRepository;
private final PipelineInputsRepository pipelineInputsRepository;
private final PipelineOutputsRepository pipelineOutputsRepository;
Expand All @@ -56,10 +62,12 @@ public class PipelineRunsService {
@Autowired
public PipelineRunsService(
JobService jobService,
GcsService gcsService,
PipelineRunsRepository pipelineRunsRepository,
PipelineInputsRepository pipelineInputsRepository,
PipelineOutputsRepository pipelineOutputsRepository) {
this.jobService = jobService;
this.gcsService = gcsService;
this.pipelineRunsRepository = pipelineRunsRepository;
this.pipelineInputsRepository = pipelineInputsRepository;
this.pipelineOutputsRepository = pipelineOutputsRepository;
Expand All @@ -80,7 +88,7 @@ public PipelineRunsService(
* @param userProvidedInputs the user-provided inputs
*/
@WriteTransaction
public void preparePipelineRun(
public Map<String, Map<String, String>> preparePipelineRun(
Pipeline pipeline, UUID jobId, String userId, Map<String, Object> userProvidedInputs) {

PipelinesEnum pipelineName = pipeline.getName();
Expand All @@ -97,6 +105,10 @@ public void preparePipelineRun(
.formatted(jobId));
}

// return a map of signed PUT urls and curl commands for the user to upload their input files
Map<String, Map<String, String>> pipelineFileInputs =
prepareFileInputs(pipeline, jobId, userProvidedInputs);

// save the pipeline run to the database
writeNewPipelineRunToDb(
jobId,
Expand All @@ -109,6 +121,55 @@ public void preparePipelineRun(

// increment the prepare metric for this pipeline
MetricsUtils.incrementPipelinePrepareRun(pipelineName);

return pipelineFileInputs;
}

/**
 * Generate signed PUT urls and curl commands for each user-provided file input in the pipeline.
 *
 * <p>Each user-provided file input (assumed to be a path to a local file) is translated into a
 * write-only (PUT) signed url in a location in the pipeline workspace storage container, in a
 * directory defined by the jobId.
 *
 * <p>This signed url along with the source file path provided by the user are used to generate a
 * curl command that the user can run to upload the file to the location in the pipeline workspace
 * storage container.
 */
private Map<String, Map<String, String>> prepareFileInputs(
    Pipeline pipeline, UUID jobId, Map<String, Object> userProvidedInputs) {
  // names of every user-provided FILE-type input; these are the files the user must upload
  List<String> userFileInputNames =
      pipeline.getPipelineInputDefinitions().stream()
          .filter(PipelineInputDefinition::getUserProvided)
          .filter(definition -> definition.getType().equals(PipelineVariableTypesEnum.FILE))
          .map(PipelineInputDefinition::getName)
          .toList();

  // this will no longer be hardcoded when TSPS-307 is implemented
  String projectId = "terra-dev-244bacba";
  // strip the gs:// scheme to get the bare bucket name
  String bucketName = pipeline.getWorkspaceStorageContainerUrl().replace("gs://", "");

  // key: input name; value: a map holding the write-only PUT signed url for the file and the
  // full curl command the user can run to upload it
  Map<String, Map<String, String>> fileInputsMap = new HashMap<>();
  for (String inputName : userFileInputNames) {
    String localFilePath = (String) userProvidedInputs.get(inputName);
    String destinationBlobName =
        constructDestinationBlobNameForUserInputFile(jobId, localFilePath);
    String signedUrl =
        gcsService
            .generatePutObjectSignedUrl(projectId, bucketName, destinationBlobName)
            .toString();
    String curlCommand =
        "curl -X PUT -H 'Content-Type: application/octet-stream' --upload-file %s '%s'"
            .formatted(localFilePath, signedUrl);

    fileInputsMap.put(inputName, Map.of("signedUrl", signedUrl, "curlCommand", curlCommand));
  }

  return fileInputsMap;
}

/**
Expand Down
4 changes: 4 additions & 0 deletions service/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ management:
# For more information: https://docs.micrometer.io/micrometer/reference/concepts/histogram-quantiles.html
percentiles-histogram[http.server.requests]: true

gcs:
signedUrlPutDurationHours: 24
signedUrlGetDurationHours: 24

imputation:
cromwellSubmissionPollingIntervalInSeconds: 600 # poll every 10 minutes for a cromwell submission
inputKeysToPrependWithStorageUrl: ["refDict", "referencePanelPathPrefix", "geneticMapsPath"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package bio.terra.pipelines.configuration.external;

import static org.junit.jupiter.api.Assertions.assertEquals;

import bio.terra.pipelines.app.configuration.external.GcsConfiguration;
import bio.terra.pipelines.testutils.BaseEmbeddedDbTest;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

/** Verifies that the {@code gcs} block of application.yml binds into {@link GcsConfiguration}. */
class GcsConfigurationTest extends BaseEmbeddedDbTest {
  @Autowired GcsConfiguration gcsConfiguration;

  @Test
  void verifyGcsConfig() {
    long expectedDurationHours = 24L;
    assertEquals(expectedDurationHours, gcsConfiguration.signedUrlGetDurationHours());
    assertEquals(expectedDurationHours, gcsConfiguration.signedUrlPutDurationHours());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,15 @@ void prepareRunImputationPipeline() throws Exception {
UUID jobId = newJobId;
String postBodyAsJson = testPreparePipelineRunPostBody(jobId.toString());

Map<String, String> pipelineInputsString = new HashMap<>();
testPipelineInputs.forEach((key, value) -> pipelineInputsString.put(key, value.toString()));
Map<String, Map<String, String>> pipelineInputsWithSasUrls = new HashMap<>();
// the contents of this map don't matter for this test
testPipelineInputs.forEach(
(key, value) -> pipelineInputsWithSasUrls.put(key, Map.of("sasUrl", value.toString())));

// the mocks
doNothing().when(pipelinesServiceMock).validateUserProvidedInputs(any(), any());
when(pipelineRunsServiceMock.preparePipelineRun(any(), any(), any(), any()))
.thenReturn(pipelineInputsWithSasUrls);

// make the call
MvcResult result =
Expand All @@ -125,7 +129,7 @@ void prepareRunImputationPipeline() throws Exception {
.readValue(
result.getResponse().getContentAsString(), ApiPreparePipelineRunResponse.class);
assertEquals(jobId, response.getJobId());
assertEquals(pipelineInputsString, response.getFileInputUploadUrls());
assertEquals(pipelineInputsWithSasUrls, response.getFileInputUploadUrls());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package bio.terra.pipelines.dependencies.gcs;

import static org.junit.jupiter.api.Assertions.assertEquals;

import bio.terra.pipelines.testutils.BaseEmbeddedDbTest;
import com.google.cloud.storage.Storage;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

/** Tests for {@link GcsClient}. */
class GcsClientTest extends BaseEmbeddedDbTest {
  @Autowired GcsClient gcsClient;

  @Test
  void getGcsStorageService() {
    String expectedProjectId = "test-project-id";

    Storage storage = gcsClient.getStorageService(expectedProjectId);

    // the returned handle should be scoped to the requested project
    assertEquals(expectedProjectId, storage.getOptions().getProjectId());
  }
}
Loading

0 comments on commit 494990a

Please sign in to comment.