fix(s3 dataplane): folder copy cherrypick (#419)
* Fix: S3 directory logic on file upload.

* Fix: S3 directory logic on file upload.

* fix: S3DataSink allows part upload when the part is a 0-byte file

* Improved unit and integration tests.

* Rename function for readability

* Indentation

* Added no such key test

* Fix checkstyle.

* Account for nested folders

* Remove unused argument

* use predicate instead of method

* change predicate to class const

---------

Co-authored-by: bmg13 <gabcmonteiro@gmail.com>
rafaelmag110 and bmg13 authored Aug 21, 2024
1 parent 4e2fa84 commit 2b9378e
Showing 4 changed files with 69 additions and 45 deletions.
@@ -60,7 +60,7 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {
 
             var partNumber = 1;
             byte[] bytesChunk = input.readNBytes(chunkSize);
-            while (bytesChunk.length > 0) {
+            do {
                 completedParts.add(CompletedPart.builder().partNumber(partNumber)
                         .eTag(client.uploadPart(UploadPartRequest.builder()
                                 .bucket(bucketName)
@@ -70,7 +70,7 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {
                         .build(), RequestBody.fromByteBuffer(ByteBuffer.wrap(bytesChunk))).eTag()).build());
                 bytesChunk = input.readNBytes(chunkSize);
                 partNumber++;
-            }
+            } while (bytesChunk.length > 0);
 
             client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
                     .bucket(bucketName)
@@ -92,12 +92,16 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {
     }
 
     private String getDestinationObjectName(String partName, int partsSize) {
-        var name = (partsSize == 1 && !StringUtils.isNullOrEmpty(objectName)) ? objectName : partName;
+        var name = useObjectName(partsSize) ? objectName : partName;
         if (!StringUtils.isNullOrEmpty(folderName)) {
             return folderName.endsWith("/") ? folderName + name : folderName + "/" + name;
-        } else {
-            return name;
         }
+
+        return name;
+    }
+
+    private boolean useObjectName(int partsSize) {
+        return partsSize == 1 && !StringUtils.isNullOrEmpty(objectName);
     }
 
     @NotNull
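The while-to-do-while change above is the 0-byte fix: the loop body now runs at least once, so an empty source still uploads exactly one empty part and the multipart upload can complete. A minimal sketch of the resulting loop, assuming hypothetical names such as key and uploadId for the values the diff elides:

    var completedParts = new ArrayList<CompletedPart>();
    var partNumber = 1;
    byte[] bytesChunk = input.readNBytes(chunkSize);
    do {
        // runs even when bytesChunk.length == 0, so a 0-byte file still produces one part
        var etag = client.uploadPart(UploadPartRequest.builder()
                .bucket(bucketName)
                .key(key)           // destination key (hypothetical name, elided in the diff)
                .uploadId(uploadId) // from createMultipartUpload (hypothetical name, elided in the diff)
                .partNumber(partNumber)
                .build(), RequestBody.fromByteBuffer(ByteBuffer.wrap(bytesChunk))).eTag();
        completedParts.add(CompletedPart.builder().partNumber(partNumber).eTag(etag).build());
        bytesChunk = input.readNBytes(chunkSize);
        partNumber++;
    } while (bytesChunk.length > 0);

A plain while loop exits immediately on an empty stream, and the subsequent completeMultipartUpload then fails, since S3 rejects completing a multipart upload that has no parts.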
@@ -28,6 +28,8 @@
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR;
@@ -47,6 +49,8 @@ class S3DataSource implements DataSource {
     private S3Client client;
     private Monitor monitor;
 
+    private final Predicate<S3Object> isFile = object -> !object.key().endsWith("/");
+
     private S3DataSource() {
     }
 
@@ -107,7 +111,7 @@ private List<S3Object> fetchPrefixedS3Objects() {
 
             var response = client.listObjectsV2(listObjectsRequest);
 
-            s3Objects.addAll(response.contents());
+            s3Objects.addAll(response.contents().stream().filter(isFile).collect(Collectors.toList()));
 
             continuationToken = response.nextContinuationToken();
 
@@ -117,7 +121,7 @@
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {
         client.close();
     }
 
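The new isFile predicate encodes the S3 convention that a folder is not a real directory but a zero-byte object whose key ends with "/". A short illustration, using hypothetical keys, of what the filter now drops from each listObjectsV2 page:

    import java.util.List;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;
    import software.amazon.awssdk.services.s3.model.S3Object;

    Predicate<S3Object> isFile = object -> !object.key().endsWith("/");

    var listed = List.of(
            S3Object.builder().key("prefix/").size(0L).build(),                  // folder marker, dropped
            S3Object.builder().key("prefix/nested/").size(0L).build(),           // nested folder marker, dropped
            S3Object.builder().key("prefix/nested/data.bin").size(42L).build()); // real object, kept

    var files = listed.stream().filter(isFile).collect(Collectors.toList());
    // files contains only "prefix/nested/data.bin", so folder markers never become transfer parts

Filtering inside the pagination loop also leaves the continuation-token handling untouched: every page is filtered the same way before being accumulated.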
@@ -35,9 +35,11 @@
 import software.amazon.awssdk.core.ResponseBytes;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.stream.Stream;
 
@@ -105,6 +107,12 @@ void should_copy_using_destination_object_name_case_single_transfer(List<String>
         var objectNameInDestination = "object-name-in-destination";
         var objectContent = UUID.randomUUID().toString();
 
+        // Put folder 0 byte size file marker. AWS does this when a folder is created via the console.
+        if (!isSingleObject) {
+            sourceClient.putStringOnBucket(sourceBucketName, OBJECT_PREFIX, "");
+            sourceClient.putStringOnBucket(sourceBucketName, OBJECT_PREFIX + "testFolder/", "");
+        }
+
         for (var objectName : objectNames) {
             sourceClient.putStringOnBucket(sourceBucketName, objectName, objectContent);
         }
@@ -152,6 +160,11 @@ void should_copy_using_destination_object_name_case_single_transfer(List<String>
                     .extracting(Long::intValue)
                     .isEqualTo(objectContent.length());
         }
+
+        assertThat(destinationClient.getObject(destinationBucketName,
+                OBJECT_PREFIX)).failsWithin(5, SECONDS)
+                .withThrowableOfType(ExecutionException.class)
+                .withCauseInstanceOf(NoSuchKeyException.class);
     }
 }
 
@@ -164,6 +177,12 @@ void should_copy_to_folder_case_property_is_present(List<String> objectNames) {
         var folderNameInDestination = "folder-name-in-destination/";
         var objectBody = UUID.randomUUID().toString();
 
+        // Put folder 0 byte size file marker. AWS does this when a folder is created via the console.
+        if (!isSingleObject) {
+            sourceClient.putStringOnBucket(sourceBucketName, OBJECT_PREFIX, "");
+            sourceClient.putStringOnBucket(sourceBucketName, OBJECT_PREFIX + "testFolder/", "");
+        }
+
         for (var objectToTransfer : objectNames) {
             sourceClient.putStringOnBucket(sourceBucketName, objectToTransfer, objectBody);
         }
@@ -212,7 +231,13 @@ void should_copy_to_folder_case_property_is_present(List<String> objectNames) {
                     .extracting(Long::intValue)
                     .isEqualTo(objectBody.length());
         }
+        assertThat(destinationClient.getObject(destinationBucketName, folderNameInDestination +
+                OBJECT_PREFIX)).failsWithin(5, SECONDS)
+                .withThrowableOfType(ExecutionException.class)
+                .withCauseInstanceOf(NoSuchKeyException.class);
     }
+
+
 }
 
     private DataAddress createDataAddress(List<String> assetNames, boolean isSingleObject) {
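The empty putStringOnBucket calls seed the zero-byte folder markers that the AWS console creates, so the integration test now mirrors buckets whose folders were created through the UI. putStringOnBucket is a test helper of this repository; with the plain SDK v2, the equivalent would be roughly:

    s3Client.putObject(PutObjectRequest.builder()
                    .bucket(sourceBucketName)
                    .key("my-prefix/testFolder/") // trailing slash marks a folder
                    .build(),
            RequestBody.empty()); // zero-byte body, as the console does

The added failsWithin assertions then verify the complementary behavior: the markers must not show up in the destination bucket, so fetching them has to fail with a NoSuchKeyException.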
@@ -26,7 +26,6 @@
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.ArgumentsProvider;
 import org.junit.jupiter.params.provider.ArgumentsSource;
-import org.junit.platform.commons.util.StringUtils;
 import org.mockito.ArgumentCaptor;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.services.s3.S3Client;
@@ -85,37 +84,23 @@ void setup() {
     }
 
     @ParameterizedTest
-    @ArgumentsSource(SinglePartsInputs.class)
-    void transferParts_singlePart_succeeds(List<DataSource.Part> inputStream) {
+    @ArgumentsSource(PartsInputs.class)
+    void transferParts_succeeds(List<DataSource.Part> inputStream, int expectedPartsPerObject) {
         var isSingleObject = inputStream.size() == 1;
 
-        var result = dataSink.transferParts(inputStream);
-        assertThat(result.succeeded()).isTrue();
-        verify(s3ClientMock, times(inputStream.size())).completeMultipartUpload(completeMultipartUploadRequestCaptor
-                .capture());
-
-        var completeMultipartUploadRequest = completeMultipartUploadRequestCaptor.getValue();
-        assertThat(completeMultipartUploadRequest.bucket()).isEqualTo(BUCKET_NAME);
-        assertThat(completeMultipartUploadRequest.key())
-                .isEqualTo(isSingleObject ? DESTINATION_OBJECT_NAME : SOURCE_OBJECT_NAME);
-        assertThat(completeMultipartUploadRequest.multipartUpload().parts()).hasSize(1);
-    }
-
-    @ParameterizedTest
-    @ArgumentsSource(MultiPartsInputs.class)
-    void transferParts_multiPart_succeeds(List<DataSource.Part> inputStream) {
-        var isSingleObject = inputStream.size() == 1;
-
         var result = dataSink.transferParts(inputStream);
+        var completeMultipartUploadRequests = completeMultipartUploadRequestCaptor.getAllValues();
 
         assertThat(result.succeeded()).isTrue();
         verify(s3ClientMock, times(inputStream.size()))
                 .completeMultipartUpload(completeMultipartUploadRequestCaptor.capture());
-        var completeMultipartUploadRequest = completeMultipartUploadRequestCaptor.getValue();
-        assertThat(completeMultipartUploadRequest.bucket()).isEqualTo(BUCKET_NAME);
-        assertThat(completeMultipartUploadRequest.key())
-                .isEqualTo(isSingleObject ? DESTINATION_OBJECT_NAME : SOURCE_OBJECT_NAME);
-        assertThat(completeMultipartUploadRequest.multipartUpload().parts()).hasSize(2);
+        for (var request : completeMultipartUploadRequests) {
+            assertThat(request.bucket()).isEqualTo(BUCKET_NAME);
+            assertThat(request.key())
+                    .isEqualTo(isSingleObject ? DESTINATION_OBJECT_NAME : SOURCE_OBJECT_NAME);
+            assertThat(request.multipartUpload().parts()).hasSize(expectedPartsPerObject);
+        }
     }
 
     @Test
@@ -134,7 +119,7 @@ void transferParts_failed_to_download() {
     }
 
     @ParameterizedTest
-    @ArgumentsSource(MultiPartsInputs.class)
+    @ArgumentsSource(PartsInputs.class)
     void transferParts_fails_to_upload(List<DataSource.Part> inputStream) {
         var isSingleObject = inputStream.size() == 1;
 
@@ -162,25 +147,31 @@ void transferParts_fails_to_upload(List<DataSource.Part> inputStream) {
         assertThat(result.getFailureDetail()).isEqualTo(expectedMessage);
     }
 
-    private static class SinglePartsInputs implements ArgumentsProvider {
+    private static class PartsInputs implements ArgumentsProvider {
         @Override
        public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
-            var content = "content smaller than a chunk size";
-            return Stream.of(Arguments.of(List.of(createDataSource(content))),
-                    Arguments.of(List.of(createDataSource(content)), List.of(createDataSource(content)))
-            );
+            var emptyContent = "";
+            var smallContent = "content smaller than a chunk size";
+            var bigContent = "content bigger than 50 bytes chunk size so that it gets chunked and uploaded as a multipart upload";
+            return Stream.of(
+                    Arguments.of(
+                            List.of(createDataSource(emptyContent)), 1),
+                    Arguments.of(
+                            List.of(createDataSource(smallContent)), 1),
+                    Arguments.of(
+                            List.of(createDataSource(bigContent)), 2),
+                    Arguments.of(
+                            List.of(createDataSource(emptyContent), createDataSource(smallContent)), 1),
+                    Arguments.of(
+                            List.of(createDataSource(bigContent), createDataSource(bigContent)), 2));
         }
     }
 
-    private static class MultiPartsInputs implements ArgumentsProvider {
-        @Override
-        public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
-            var content = "content bigger than 50 bytes chunk size so that it gets chunked and uploaded as a multipart upload";
-            return Stream.of(Arguments.of(List.of(createDataSource(content))),
-                    Arguments.of(List.of(createDataSource(content)), List.of(createDataSource(content)))
-            );
+    private static InputStreamDataSource createDataSource(String text) {
+        if (text.length() > 0) {
+            return new InputStreamDataSource(SOURCE_OBJECT_NAME, new ByteArrayInputStream(text.getBytes(UTF_8)));
+        }
+
+        return new InputStreamDataSource(SOURCE_OBJECT_NAME, new ByteArrayInputStream(new byte[0]));
+    }
 
-    private static InputStreamDataSource createDataSource(String text) {
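The new emptyContent case in PartsInputs pins the 0-byte behavior down at the unit level. InputStream.readNBytes never returns null, so only the loop guard decides whether a part gets uploaded; a small illustration:

    var empty = new ByteArrayInputStream(new byte[0]);
    byte[] chunk = empty.readNBytes(50); // returns a 0-length array, never null
    // while (chunk.length > 0) { ... } would skip the body and upload no parts;
    // do { ... } while (chunk.length > 0); uploads exactly one empty part,
    // which is what the (emptyContent, 1) argument pair asserts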
