fix(s3 dataplane): folder copy cherrypick #419

Merged
@@ -60,7 +60,7 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {

var partNumber = 1;
byte[] bytesChunk = input.readNBytes(chunkSize);
while (bytesChunk.length > 0) {
do {
completedParts.add(CompletedPart.builder().partNumber(partNumber)
.eTag(client.uploadPart(UploadPartRequest.builder()
.bucket(bucketName)
@@ -70,7 +70,7 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {
.build(), RequestBody.fromByteBuffer(ByteBuffer.wrap(bytesChunk))).eTag()).build());
bytesChunk = input.readNBytes(chunkSize);
partNumber++;
}
} while (bytesChunk.length > 0);

client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
.bucket(bucketName)
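The while-to-do/while change above is what makes zero-byte objects copyable: S3 rejects a CompleteMultipartUpload request that contains no parts, so even an empty source must upload at least one (empty) part. A minimal sketch of that loop in isolation, using the same AWS SDK v2 types (UploadPartRequest, CompletedPart, RequestBody) but hypothetical bucket/key/uploadId parameters rather than the sink's actual fields:

    static List<CompletedPart> uploadChunks(S3Client client, String bucket, String key,
                                            String uploadId, InputStream input, int chunkSize) throws IOException {
        var completedParts = new ArrayList<CompletedPart>();
        var partNumber = 1;
        var bytesChunk = input.readNBytes(chunkSize);
        do {
            // an empty stream still reaches this body once, producing exactly one zero-byte part
            var eTag = client.uploadPart(UploadPartRequest.builder()
                            .bucket(bucket)
                            .key(key)
                            .uploadId(uploadId)
                            .partNumber(partNumber)
                            .build(),
                    RequestBody.fromByteBuffer(ByteBuffer.wrap(bytesChunk))).eTag();
            completedParts.add(CompletedPart.builder().partNumber(partNumber).eTag(eTag).build());
            bytesChunk = input.readNBytes(chunkSize);
            partNumber++;
        } while (bytesChunk.length > 0);
        return completedParts;
    }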
@@ -92,12 +92,16 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {
}

private String getDestinationObjectName(String partName, int partsSize) {
var name = (partsSize == 1 && !StringUtils.isNullOrEmpty(objectName)) ? objectName : partName;
var name = useObjectName(partsSize) ? objectName : partName;
if (!StringUtils.isNullOrEmpty(folderName)) {
return folderName.endsWith("/") ? folderName + name : folderName + "/" + name;
} else {
return name;
}

return name;
}

private boolean useObjectName(int partsSize) {
return partsSize == 1 && !StringUtils.isNullOrEmpty(objectName);
}

@NotNull
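The refactor above keeps the behaviour but separates two rules: the configured objectName only overrides the source name when exactly one object is transferred, and a configured folderName is joined to the key with a single "/" separator. A stand-alone sketch of that derivation (a hypothetical helper, not the exact EDC method, using plain null/empty checks instead of the SDK's StringUtils):

    static String destinationKey(String partName, String objectName, String folderName, int partsSize) {
        // objectName only applies to single-object transfers
        var name = (partsSize == 1 && objectName != null && !objectName.isEmpty()) ? objectName : partName;
        if (folderName == null || folderName.isEmpty()) {
            return name;
        }
        // avoid a double slash when the configured folder already ends with "/"
        return folderName.endsWith("/") ? folderName + name : folderName + "/" + name;
    }

For example, destinationKey("file.txt", null, "backup", 3) yields "backup/file.txt", while a single-object transfer with objectName set ignores the source part name entirely.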
@@ -28,6 +28,8 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR;
@@ -47,6 +49,8 @@ class S3DataSource implements DataSource {
private S3Client client;
private Monitor monitor;

private final Predicate<S3Object> isFile = object -> !object.key().endsWith("/");

private S3DataSource() {
}

@@ -107,7 +111,7 @@ private List<S3Object> fetchPrefixedS3Objects() {

var response = client.listObjectsV2(listObjectsRequest);

s3Objects.addAll(response.contents());
s3Objects.addAll(response.contents().stream().filter(isFile).collect(Collectors.toList()));

continuationToken = response.nextContinuationToken();

@@ -117,7 +121,7 @@
}

@Override
public void close() throws Exception {
public void close() {
client.close();
}

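The new isFile predicate above exists because S3 represents console-created folders as zero-byte objects whose keys end in "/"; copying those markers would recreate empty placeholder objects at the destination. A sketch of the prefixed listing with pagination and that filter, assuming a configured S3Client and hypothetical bucketName/keyPrefix values:

    static List<S3Object> listPrefixedFiles(S3Client client, String bucketName, String keyPrefix) {
        Predicate<S3Object> isFile = object -> !object.key().endsWith("/"); // drop folder markers
        var objects = new ArrayList<S3Object>();
        String continuationToken = null;
        do {
            var response = client.listObjectsV2(ListObjectsV2Request.builder()
                    .bucket(bucketName)
                    .prefix(keyPrefix)
                    .continuationToken(continuationToken)
                    .build());
            response.contents().stream().filter(isFile).forEach(objects::add);
            continuationToken = response.nextContinuationToken(); // null once the listing is exhausted
        } while (continuationToken != null);
        return objects;
    }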
@@ -35,9 +35,11 @@
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

@@ -105,6 +107,12 @@ void should_copy_using_destination_object_name_case_single_transfer(List<String>
var objectNameInDestination = "object-name-in-destination";
var objectContent = UUID.randomUUID().toString();

//Put zero-byte folder marker objects. AWS creates these when a folder is created via the console.
if (!isSingleObject) {
sourceClient.putStringOnBucket(sourceBucketName, OBJECT_PREFIX, "");
sourceClient.putStringOnBucket(sourceBucketName, OBJECT_PREFIX + "testFolder/", "");
}

for (var objectName : objectNames) {
sourceClient.putStringOnBucket(sourceBucketName, objectName, objectContent);
}
@@ -152,6 +160,11 @@ void should_copy_using_destination_object_name_case_single_transfer(List<String>
.extracting(Long::intValue)
.isEqualTo(objectContent.length());
}

assertThat(destinationClient.getObject(destinationBucketName,
OBJECT_PREFIX)).failsWithin(5, SECONDS)
.withThrowableOfType(ExecutionException.class)
.withCauseInstanceOf(NoSuchKeyException.class);
}
}
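The putStringOnBucket calls with keys ending in "/" above simulate the zero-byte folder markers the AWS console creates, and the closing assertion checks that those markers are never copied across. Sketched here with plain SDK v2 calls and AssertJ instead of the repository's test helpers, with a hypothetical s3Client and hypothetical bucket/key names:

    // create a console-style folder marker: a zero-byte object whose key ends with "/"
    s3Client.putObject(PutObjectRequest.builder()
            .bucket("source-bucket")
            .key("my-prefix/testFolder/")
            .build(), RequestBody.empty());

    // after the transfer, the marker key must be absent from the destination bucket
    assertThatThrownBy(() -> s3Client.headObject(HeadObjectRequest.builder()
            .bucket("destination-bucket")
            .key("my-prefix/testFolder/")
            .build()))
            .isInstanceOf(NoSuchKeyException.class);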

@@ -164,6 +177,12 @@ void should_copy_to_folder_case_property_is_present(List<String> objectNames) {
var folderNameInDestination = "folder-name-in-destination/";
var objectBody = UUID.randomUUID().toString();

//Put zero-byte folder marker objects. AWS creates these when a folder is created via the console.
if (!isSingleObject) {
sourceClient.putStringOnBucket(sourceBucketName, OBJECT_PREFIX, "");
sourceClient.putStringOnBucket(sourceBucketName, OBJECT_PREFIX + "testFolder/", "");
}

for (var objectToTransfer : objectNames) {
sourceClient.putStringOnBucket(sourceBucketName, objectToTransfer, objectBody);
}
@@ -212,7 +231,13 @@ void should_copy_to_folder_case_property_is_present(List<String> objectNames) {
.extracting(Long::intValue)
.isEqualTo(objectBody.length());
}
assertThat(destinationClient.getObject(destinationBucketName, folderNameInDestination +
OBJECT_PREFIX)).failsWithin(5, SECONDS)
.withThrowableOfType(ExecutionException.class)
.withCauseInstanceOf(NoSuchKeyException.class);
}


}

private DataAddress createDataAddress(List<String> assetNames, boolean isSingleObject) {
@@ -26,7 +26,6 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.platform.commons.util.StringUtils;
import org.mockito.ArgumentCaptor;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
@@ -85,37 +84,23 @@ void setup() {
}

@ParameterizedTest
@ArgumentsSource(SinglePartsInputs.class)
void transferParts_singlePart_succeeds(List<DataSource.Part> inputStream) {
@ArgumentsSource(PartsInputs.class)
void transferParts_succeeds(List<DataSource.Part> inputStream, int expectedPartsPerObject) {
var isSingleObject = inputStream.size() == 1;

var result = dataSink.transferParts(inputStream);
assertThat(result.succeeded()).isTrue();
verify(s3ClientMock, times(inputStream.size())).completeMultipartUpload(completeMultipartUploadRequestCaptor
.capture());

var completeMultipartUploadRequest = completeMultipartUploadRequestCaptor.getValue();
assertThat(completeMultipartUploadRequest.bucket()).isEqualTo(BUCKET_NAME);
assertThat(completeMultipartUploadRequest.key())
.isEqualTo(isSingleObject ? DESTINATION_OBJECT_NAME : SOURCE_OBJECT_NAME);
assertThat(completeMultipartUploadRequest.multipartUpload().parts()).hasSize(1);
}

@ParameterizedTest
@ArgumentsSource(MultiPartsInputs.class)
void transferParts_multiPart_succeeds(List<DataSource.Part> inputStream) {
var isSingleObject = inputStream.size() == 1;

var result = dataSink.transferParts(inputStream);
var completeMultipartUploadRequests = completeMultipartUploadRequestCaptor.getAllValues();

assertThat(result.succeeded()).isTrue();
verify(s3ClientMock, times(inputStream.size()))
.completeMultipartUpload(completeMultipartUploadRequestCaptor.capture());
var completeMultipartUploadRequest = completeMultipartUploadRequestCaptor.getValue();
assertThat(completeMultipartUploadRequest.bucket()).isEqualTo(BUCKET_NAME);
assertThat(completeMultipartUploadRequest.key())
.isEqualTo(isSingleObject ? DESTINATION_OBJECT_NAME : SOURCE_OBJECT_NAME);
assertThat(completeMultipartUploadRequest.multipartUpload().parts()).hasSize(2);
for (var request : completeMultipartUploadRequests) {
assertThat(request.bucket()).isEqualTo(BUCKET_NAME);
assertThat(request.key())
.isEqualTo(isSingleObject ? DESTINATION_OBJECT_NAME : SOURCE_OBJECT_NAME);
assertThat(request.multipartUpload().parts()).hasSize(expectedPartsPerObject);
}
}

@Test
@@ -134,7 +119,7 @@ void transferParts_failed_to_download() {
}

@ParameterizedTest
@ArgumentsSource(MultiPartsInputs.class)
@ArgumentsSource(PartsInputs.class)
void transferParts_fails_to_upload(List<DataSource.Part> inputStream) {
var isSingleObject = inputStream.size() == 1;

@@ -162,25 +147,31 @@ void transferParts_fails_to_upload(List<DataSource.Part> inputStream) {
assertThat(result.getFailureDetail()).isEqualTo(expectedMessage);
}

private static class SinglePartsInputs implements ArgumentsProvider {
private static class PartsInputs implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
var content = "content smaller than a chunk size";
return Stream.of(Arguments.of(List.of(createDataSource(content))),
Arguments.of(List.of(createDataSource(content)), List.of(createDataSource(content)))
);
var emptyContent = "";
var smallContent = "content smaller than a chunk size";
var bigContent = "content bigger than 50 bytes chunk size so that it gets chunked and uploaded as a multipart upload";
return Stream.of(
Arguments.of(
List.of(createDataSource(emptyContent)), 1),
Arguments.of(
List.of(createDataSource(smallContent)), 1),
Arguments.of(
List.of(createDataSource(bigContent)), 2),
Arguments.of(
List.of(createDataSource(emptyContent), createDataSource(smallContent)), 1),
Arguments.of(
List.of(createDataSource(bigContent), createDataSource(bigContent)), 2));
}
}
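The integer in each argument set above is the number of parts the sink is expected to upload per object: with the test's 50-byte chunk size this is roughly ceil(length / chunkSize), except that an empty stream still yields one part because of the do/while loop. A hypothetical helper expressing that expectation:

    static int expectedParts(int contentLength, int chunkSize) {
        if (contentLength == 0) {
            return 1; // the do/while loop always uploads at least one (empty) part
        }
        return (contentLength + chunkSize - 1) / chunkSize; // ceil(contentLength / chunkSize)
    }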

private static class MultiPartsInputs implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
var content = "content bigger than 50 bytes chunk size so that it gets chunked and uploaded as a multipart upload";
return Stream.of(Arguments.of(List.of(createDataSource(content))),
Arguments.of(List.of(createDataSource(content)), List.of(createDataSource(content)))
);
private static InputStreamDataSource createDataSource(String text) {
if (text.length() > 0) {
return new InputStreamDataSource(SOURCE_OBJECT_NAME, new ByteArrayInputStream(text.getBytes(UTF_8)));
}

return new InputStreamDataSource(SOURCE_OBJECT_NAME, new ByteArrayInputStream(new byte[0]));
}

private static InputStreamDataSource createDataSource(String text) {