Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid to close client on data source closure #470

Merged
merged 2 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR;
Expand Down Expand Up @@ -111,7 +110,7 @@ private List<S3Object> fetchPrefixedS3Objects() {

var response = client.listObjectsV2(listObjectsRequest);

s3Objects.addAll(response.contents().stream().filter(isFile).collect(Collectors.toList()));
s3Objects.addAll(response.contents().stream().filter(isFile).toList());

continuationToken = response.nextContinuationToken();

Expand All @@ -122,7 +121,7 @@ private List<S3Object> fetchPrefixedS3Objects() {

@Override
public void close() {
client.close();

}

private static class S3Part implements Part {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.eclipse.edc.connector.dataplane.aws.s3.exceptions.S3DataSourceException;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
Expand All @@ -38,7 +39,7 @@ public class S3DataSourceTest {
private static final String OBJECT_NAME = "object-1";
private static final String OBJECT_PREFIX = "my-prefix/";
private static final String ERROR_MESSAGE = "Error message";
private final S3Client s3ClientMock = mock(S3Client.class);
private final S3Client s3Client = mock();

@Test
void should_select_prefixed_objects_case_key_prefix_is_present() {
Expand All @@ -52,15 +53,15 @@ void should_select_prefixed_objects_case_key_prefix_is_present() {
.bucketName(BUCKET_NAME)
.objectName(OBJECT_NAME)
.objectPrefix(OBJECT_PREFIX)
.client(s3ClientMock)
.client(s3Client)
.build();

when(s3ClientMock.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(mockResponse);
when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(mockResponse);

var result = s3Datasource.openPartStream();

assertThat(result.succeeded()).isTrue();
verify(s3ClientMock, atLeastOnce()).listObjectsV2(any(ListObjectsV2Request.class));
verify(s3Client, atLeastOnce()).listObjectsV2(any(ListObjectsV2Request.class));
assertThat(result.getContent()).hasSize(2);
}

Expand All @@ -73,10 +74,10 @@ void should_fail_case_no_object_is_found() {
.bucketName(BUCKET_NAME)
.objectName(OBJECT_NAME)
.objectPrefix(OBJECT_PREFIX)
.client(s3ClientMock)
.client(s3Client)
.build();

when(s3ClientMock.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(mockResponse);
when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(mockResponse);

var result = s3Datasource.openPartStream();

Expand All @@ -91,13 +92,13 @@ void should_select_single_object_case_key_prefix_is_not_present() {
.bucketName(BUCKET_NAME)
.objectName(OBJECT_NAME)
.objectPrefix(null)
.client(s3ClientMock)
.client(s3Client)
.build();

var result = s3Datasource.openPartStream();

assertThat(result.succeeded()).isTrue();
verify(s3ClientMock, never()).listObjectsV2(any(ListObjectsV2Request.class));
verify(s3Client, never()).listObjectsV2(any(ListObjectsV2Request.class));
assertThat(result.getContent()).hasSize(1);
}

Expand All @@ -115,16 +116,34 @@ void should_throw_datasource_exception_case_object_fetching_fails() {
.bucketName(BUCKET_NAME)
.objectName(OBJECT_NAME)
.objectPrefix(OBJECT_PREFIX)
.client(s3ClientMock)
.client(s3Client)
.build();

when(s3ClientMock.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(mockResponse);
when(s3ClientMock.getObject(any(GetObjectRequest.class))).thenThrow(mockThrowable);
when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(mockResponse);
when(s3Client.getObject(any(GetObjectRequest.class))).thenThrow(mockThrowable);

var s3DataSourceException = assertThrows(S3DataSourceException.class, () ->
s3Datasource.openPartStream().getContent().map(DataSource.Part::openStream).toList());
assertThat(s3DataSourceException).hasCause(mockThrowable);
assertThat(s3DataSourceException.getMessage()).isEqualTo(ERROR_MESSAGE);
}

@Nested
class Close {

@Test
void shouldNotCloseClient_becauseItCouldBeReused() {
var s3Datasource = S3DataSource.Builder.newInstance()
.bucketName(BUCKET_NAME)
.objectName(OBJECT_NAME)
.objectPrefix(OBJECT_PREFIX)
.client(s3Client)
.build();

s3Datasource.close();

verify(s3Client, never()).close();
}

}
}
Loading