diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 8a1f627df9e58..2911a018df337 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -32,6 +32,8 @@ package org.opensearch.repositories.s3; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -44,11 +46,15 @@ import software.amazon.awssdk.services.s3.model.Delete; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest; import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.ObjectAttributes; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.S3Error; @@ -82,8 +88,8 @@ import org.opensearch.core.common.Strings; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; -import org.opensearch.repositories.s3.async.AsyncTransferManager; import org.opensearch.repositories.s3.async.UploadRequest; +import org.opensearch.repositories.s3.utils.HttpRangeUtils; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -222,9 +228,8 @@ public void readBlobAsync(String blobName, ActionListener listener) try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) { final S3AsyncClient s3AsyncClient = amazonS3Reference.get().client(); final String bucketName = blobStore.bucket(); - final AsyncTransferManager transferManager = blobStore.getAsyncTransferManager(); - final GetObjectAttributesResponse blobMetadata = transferManager.getBlobMetadata(s3AsyncClient, bucketName, blobName).get(); + final GetObjectAttributesResponse blobMetadata = getBlobMetadata(s3AsyncClient, bucketName, blobName).get(); final long blobSize = blobMetadata.objectSize(); final int numberOfParts = blobMetadata.objectParts().totalPartsCount(); @@ -234,26 +239,22 @@ public void readBlobAsync(String blobName, ActionListener listener) final List> blobPartInputStreamFutures = new ArrayList<>(); // S3 multipart files use 1 to n indexing for (int partNumber = 1; partNumber <= numberOfParts; partNumber++) { - int finalPartNumber = partNumber - 1; - CompletableFuture partInputStreamFuture = transferManager.getBlobPartInputStreamContainer( - s3AsyncClient, - bucketName, - blobName, - partNumber - ).whenComplete((inputStreamContainer, error) -> { - if (error == null) { - blobPartStreams.add(finalPartNumber, inputStreamContainer); - } - }); - - blobPartInputStreamFutures.add(partInputStreamFuture); + blobPartInputStreamFutures.add(getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobName, partNumber)); } CompletableFuture.allOf(blobPartInputStreamFutures.toArray(CompletableFuture[]::new)).whenComplete((unused, throwable) -> { if (throwable == null) { - listener.onResponse(new ReadContext(blobSize, blobPartStreams, blobChecksum)); + listener.onResponse( + new ReadContext( + blobSize, + blobPartInputStreamFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()), + blobChecksum + ) + ); } else { - Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable; + Exception ex = throwable.getCause() instanceof Exception + ? (Exception) throwable.getCause() + : new Exception(throwable.getCause()); listener.onFailure(ex); } }); @@ -678,4 +679,65 @@ static Tuple numberOfMultiparts(final long totalSize, final long par return Tuple.tuple(parts + 1, remaining); } } + + /** + * Fetches a part of the blob from the S3 bucket and transforms it to an {@link InputStreamContainer}, which holds + * the stream and its related metadata. + * @param s3AsyncClient Async client to be utilized to fetch the object part + * @param bucketName Name of the S3 bucket + * @param blobName Identifier of the blob for which the parts will be fetched + * @param partNumber Part number for the blob to be retrieved + * @return A future of {@link InputStreamContainer} containing the stream and stream metadata. + */ + CompletableFuture getBlobPartInputStreamContainer( + S3AsyncClient s3AsyncClient, + String bucketName, + String blobName, + int partNumber + ) { + final GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder() + .bucket(bucketName) + .key(blobName) + .partNumber(partNumber); + + return SocketAccess.doPrivileged( + () -> s3AsyncClient.getObject(getObjectRequestBuilder.build(), AsyncResponseTransformer.toBlockingInputStream()) + .thenApply(S3BlobContainer::transformResponseToInputStreamContainer) + ); + } + + /** + * Transforms the stream response object from S3 into an {@link InputStreamContainer} + * @param streamResponse Response stream object from S3 + * @return {@link InputStreamContainer} containing the stream and stream metadata + */ + // Package-Private for testing. + static InputStreamContainer transformResponseToInputStreamContainer(ResponseInputStream streamResponse) { + final GetObjectResponse getObjectResponse = streamResponse.response(); + final String contentRange = getObjectResponse.contentRange(); + final Long contentLength = getObjectResponse.contentLength(); + if (contentRange == null || contentLength == null) { + throw SdkException.builder().message("Failed to fetch required metadata for blob part").build(); + } + final Long offset = HttpRangeUtils.getStartOffsetFromRangeHeader(getObjectResponse.contentRange()); + return new InputStreamContainer(streamResponse, getObjectResponse.contentLength(), offset); + } + + /** + * Retrieves the metadata like checksum, object size and parts for the provided blob within the S3 bucket. + * @param s3AsyncClient Async client to be utilized to fetch the metadata + * @param bucketName Name of the S3 bucket + * @param blobName Identifier of the blob for which the metadata will be fetched + * @return A future containing the metadata within {@link GetObjectAttributesResponse} + */ + CompletableFuture getBlobMetadata(S3AsyncClient s3AsyncClient, String bucketName, String blobName) { + // Fetch blob metadata - part info, size, checksum + final GetObjectAttributesRequest getObjectAttributesRequest = GetObjectAttributesRequest.builder() + .bucket(bucketName) + .key(blobName) + .objectAttributes(ObjectAttributes.CHECKSUM, ObjectAttributes.OBJECT_SIZE, ObjectAttributes.OBJECT_PARTS) + .build(); + + return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest)); + } } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java index c1168600f718d..8d45c2167a3d1 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java @@ -8,11 +8,8 @@ package org.opensearch.repositories.s3.async; -import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.exception.SdkClientException; -import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.http.HttpStatusCode; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; @@ -23,11 +20,6 @@ import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest; -import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; -import software.amazon.awssdk.services.s3.model.ObjectAttributes; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.utils.CompletableFutureUtils; @@ -37,16 +29,13 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.ExceptionsHelper; import org.opensearch.common.StreamContext; -import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.blobstore.exception.CorruptFileException; import org.opensearch.common.blobstore.stream.write.WritePriority; -import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.InputStreamContainer; import org.opensearch.common.util.ByteUtils; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.repositories.s3.SocketAccess; import org.opensearch.repositories.s3.io.CheckedContainer; -import org.opensearch.repositories.s3.utils.HttpRangeUtils; import java.io.IOException; import java.util.Arrays; @@ -364,67 +353,4 @@ private void deleteUploadedObject(S3AsyncClient s3AsyncClient, UploadRequest upl return null; }); } - - /** - * Fetches a part of the blob from the S3 bucket and transforms it to an {@link InputStreamContainer}, which holds - * the stream and its related metadata. - * @param s3AsyncClient Async client to be utilized to fetch the object part - * @param bucketName Name of the S3 bucket - * @param blobName Identifier of the blob for which the parts will be fetched - * @param partNumber Part number for the blob to be retrieved - * @return A future of {@link InputStreamContainer} containing the stream and stream metadata. - */ - @ExperimentalApi - public CompletableFuture getBlobPartInputStreamContainer( - S3AsyncClient s3AsyncClient, - String bucketName, - String blobName, - int partNumber - ) { - final GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder() - .bucket(bucketName) - .key(blobName) - .partNumber(partNumber); - - return SocketAccess.doPrivileged( - () -> s3AsyncClient.getObject(getObjectRequestBuilder.build(), AsyncResponseTransformer.toBlockingInputStream()) - .thenApply(this::transformResponseToInputStreamContainer) - ); - } - - /** - * Transforms the stream response object from S3 into an {@link InputStreamContainer} - * @param streamResponse Response stream object from S3 - * @return {@link InputStreamContainer} containing the stream and stream metadata - */ - // Package-Private for testing. - InputStreamContainer transformResponseToInputStreamContainer(ResponseInputStream streamResponse) { - final GetObjectResponse getObjectResponse = streamResponse.response(); - final String contentRange = getObjectResponse.contentRange(); - final Long contentLength = getObjectResponse.contentLength(); - if (contentRange == null || contentLength == null) { - throw SdkException.builder().message("Failed to fetch required metadata for blob part").build(); - } - final Tuple s3ResponseRange = HttpRangeUtils.fromHttpRangeHeader(getObjectResponse.contentRange()); - return new InputStreamContainer(streamResponse, getObjectResponse.contentLength(), s3ResponseRange.v1()); - } - - /** - * Retrieves the metadata like checksum, object size and parts for the provided blob within the S3 bucket. - * @param s3AsyncClient Async client to be utilized to fetch the metadata - * @param bucketName Name of the S3 bucket - * @param blobName Identifier of the blob for which the metadata will be fetched - * @return A future containing the metadata within {@link GetObjectAttributesResponse} - */ - @ExperimentalApi - public CompletableFuture getBlobMetadata(S3AsyncClient s3AsyncClient, String bucketName, String blobName) { - // Fetch blob metadata - part info, size, checksum - final GetObjectAttributesRequest getObjectAttributesRequest = GetObjectAttributesRequest.builder() - .bucket(bucketName) - .key(blobName) - .objectAttributes(ObjectAttributes.CHECKSUM, ObjectAttributes.OBJECT_SIZE, ObjectAttributes.OBJECT_PARTS) - .build(); - - return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest)); - } } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/HttpRangeUtils.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/HttpRangeUtils.java index fb0c88364d981..2e2fc9b86a45b 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/HttpRangeUtils.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/HttpRangeUtils.java @@ -10,28 +10,26 @@ import software.amazon.awssdk.core.exception.SdkException; -import org.opensearch.common.collect.Tuple; - import java.util.regex.Matcher; import java.util.regex.Pattern; public final class HttpRangeUtils { - private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes\\s+(\\d+)-(\\d+)/(\\d+|.*)$"); + private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes\\s+(\\d+)-\\d+[/\\d*]+$"); /** - * Parses the content range header string value to calculate the start and end of the HTTP response. + * Parses the content range header string value to calculate the start (offset) of the HTTP response. * Tests against the RFC9110 specification of content range string. * Sample values: "bytes 0-10/200", "bytes 0-10/*" * Details here * @param headerValue Header content range string value from the HTTP response - * @return Pair of values where v1 represents the lower and v2 represents the upper bound of the stream + * @return Start (Offset) value of the HTTP response */ - public static Tuple fromHttpRangeHeader(String headerValue) { + public static Long getStartOffsetFromRangeHeader(String headerValue) { Matcher matcher = RANGE_PATTERN.matcher(headerValue); if (!matcher.find()) { throw SdkException.create("Regex match for Content-Range header {" + headerValue + "} failed", new RuntimeException()); } - return new Tuple<>(Long.parseLong(matcher.group(1)), Long.parseLong(matcher.group(2))); + return Long.parseLong(matcher.group(1)); } /** diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java index 2fe28c458ea8b..a87c060dcc60a 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java @@ -899,7 +899,27 @@ public void onFailure(Exception e) {} } } - public void testAsyncBlobDownload() throws Exception { + public void testListBlobsByPrefixInLexicographicOrderWithNegativeLimit() throws IOException { + testListBlobsByPrefixInLexicographicOrder(-5, 0, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithZeroLimit() throws IOException { + testListBlobsByPrefixInLexicographicOrder(0, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithLimitLessThanPageSize() throws IOException { + testListBlobsByPrefixInLexicographicOrder(2, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithLimitGreaterThanPageSize() throws IOException { + testListBlobsByPrefixInLexicographicOrder(8, 2, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithLimitGreaterThanNumberOfRecords() throws IOException { + testListBlobsByPrefixInLexicographicOrder(12, 2, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testReadBlobAsync() throws Exception { final String bucketName = randomAlphaOfLengthBetween(1, 10); final String blobName = randomAlphaOfLengthBetween(1, 10); final String checksum = randomAlphaOfLength(10); @@ -964,7 +984,7 @@ public void testAsyncBlobDownload() throws Exception { } } - public void testAsyncBlobDownloadFailure() throws Exception { + public void testReadBlobAsyncFailure() throws Exception { final String bucketName = randomAlphaOfLengthBetween(1, 10); final String blobName = randomAlphaOfLengthBetween(1, 10); final String checksum = randomAlphaOfLength(10); @@ -1012,24 +1032,102 @@ public void testAsyncBlobDownloadFailure() throws Exception { assertEquals(1, readContextActionListener.getFailureCount()); } - public void testListBlobsByPrefixInLexicographicOrderWithNegativeLimit() throws IOException { - testListBlobsByPrefixInLexicographicOrder(-5, 0, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); - } + public void testGetBlobMetadata() throws Exception { + final String checksum = randomAlphaOfLengthBetween(1, 10); + final long objectSize = 100L; + final int objectPartCount = 10; + final String blobName = randomAlphaOfLengthBetween(1, 10); + final String bucketName = randomAlphaOfLengthBetween(1, 10); - public void testListBlobsByPrefixInLexicographicOrderWithZeroLimit() throws IOException { - testListBlobsByPrefixInLexicographicOrder(0, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); - } + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final S3BlobStore blobStore = mock(S3BlobStore.class); + final BlobPath blobPath = new BlobPath(); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.serverSideEncryption()).thenReturn(false); + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - public void testListBlobsByPrefixInLexicographicOrderWithLimitLessThanPageSize() throws IOException { - testListBlobsByPrefixInLexicographicOrder(2, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + CompletableFuture getObjectAttributesResponseCompletableFuture = new CompletableFuture<>(); + getObjectAttributesResponseCompletableFuture.complete( + GetObjectAttributesResponse.builder() + .checksum(Checksum.builder().checksumCRC32(checksum).build()) + .objectSize(objectSize) + .objectParts(GetObjectAttributesParts.builder().totalPartsCount(objectPartCount).build()) + .build() + ); + when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenReturn( + getObjectAttributesResponseCompletableFuture + ); + + CompletableFuture responseFuture = blobContainer.getBlobMetadata(s3AsyncClient, bucketName, blobName); + GetObjectAttributesResponse objectAttributesResponse = responseFuture.get(); + + assertEquals(checksum, objectAttributesResponse.checksum().checksumCRC32()); + assertEquals(Long.valueOf(objectSize), objectAttributesResponse.objectSize()); + assertEquals(Integer.valueOf(objectPartCount), objectAttributesResponse.objectParts().totalPartsCount()); } - public void testListBlobsByPrefixInLexicographicOrderWithLimitGreaterThanPageSize() throws IOException { - testListBlobsByPrefixInLexicographicOrder(8, 2, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + public void testGetBlobPartInputStream() throws Exception { + final String blobName = randomAlphaOfLengthBetween(1, 10); + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final long contentLength = 10L; + final String contentRange = "bytes 0-10/100"; + final InputStream inputStream = ResponseInputStream.nullInputStream(); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final S3BlobStore blobStore = mock(S3BlobStore.class); + final BlobPath blobPath = new BlobPath(); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.serverSideEncryption()).thenReturn(false); + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + GetObjectResponse getObjectResponse = GetObjectResponse.builder().contentLength(contentLength).contentRange(contentRange).build(); + + CompletableFuture> getObjectPartResponse = new CompletableFuture<>(); + ResponseInputStream responseInputStream = new ResponseInputStream<>(getObjectResponse, inputStream); + getObjectPartResponse.complete(responseInputStream); + + when( + s3AsyncClient.getObject( + any(GetObjectRequest.class), + ArgumentMatchers.>>any() + ) + ).thenReturn(getObjectPartResponse); + + InputStreamContainer inputStreamContainer = blobContainer.getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobName, 0) + .get(); + + assertEquals(0, inputStreamContainer.getOffset()); + assertEquals(contentLength, inputStreamContainer.getContentLength()); + assertEquals(inputStream.available(), inputStreamContainer.getInputStream().available()); } - public void testListBlobsByPrefixInLexicographicOrderWithLimitGreaterThanNumberOfRecords() throws IOException { - testListBlobsByPrefixInLexicographicOrder(12, 2, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + public void testTransformResponseToInputStreamContainer() throws Exception { + final String contentRange = "bytes 0-10/100"; + final long contentLength = 10L; + final InputStream inputStream = ResponseInputStream.nullInputStream(); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + + GetObjectResponse getObjectResponse = GetObjectResponse.builder().contentLength(contentLength).build(); + + ResponseInputStream responseInputStreamNoRange = new ResponseInputStream<>(getObjectResponse, inputStream); + assertThrows(SdkException.class, () -> S3BlobContainer.transformResponseToInputStreamContainer(responseInputStreamNoRange)); + + getObjectResponse = GetObjectResponse.builder().contentRange(contentRange).build(); + ResponseInputStream responseInputStreamNoContentLength = new ResponseInputStream<>( + getObjectResponse, + inputStream + ); + assertThrows(SdkException.class, () -> S3BlobContainer.transformResponseToInputStreamContainer(responseInputStreamNoContentLength)); + + getObjectResponse = GetObjectResponse.builder().contentRange(contentRange).contentLength(contentLength).build(); + ResponseInputStream responseInputStream = new ResponseInputStream<>(getObjectResponse, inputStream); + InputStreamContainer inputStreamContainer = S3BlobContainer.transformResponseToInputStreamContainer(responseInputStream); + assertEquals(contentLength, inputStreamContainer.getContentLength()); + assertEquals(0, inputStreamContainer.getOffset()); + assertEquals(inputStream.available(), inputStreamContainer.getInputStream().available()); } private void mockObjectPartResponse( diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java index aa187adcace9c..9c07b929052bc 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java @@ -9,26 +9,17 @@ package org.opensearch.repositories.s3.async; import software.amazon.awssdk.awscore.exception.AwsErrorDetails; -import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.core.async.AsyncResponseTransformer; -import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.http.HttpStatusCode; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse; -import software.amazon.awssdk.services.s3.model.Checksum; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; -import software.amazon.awssdk.services.s3.model.GetObjectAttributesParts; -import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest; -import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.S3Exception; @@ -45,13 +36,10 @@ import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; -import java.io.InputStream; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import org.mockito.ArgumentMatchers; - import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -248,95 +236,4 @@ public void testMultipartUploadCorruption() { verify(s3AsyncClient, times(0)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class)); verify(s3AsyncClient, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class)); } - - public void testGetBlobMetadata() throws Exception { - final String checksum = randomAlphaOfLengthBetween(1, 10); - final long objectSize = 100L; - final int objectPartCount = 10; - final String blobName = randomAlphaOfLengthBetween(1, 10); - final String bucketName = randomAlphaOfLengthBetween(1, 10); - - CompletableFuture getObjectAttributesResponseCompletableFuture = new CompletableFuture<>(); - getObjectAttributesResponseCompletableFuture.complete( - GetObjectAttributesResponse.builder() - .checksum(Checksum.builder().checksumCRC32(checksum).build()) - .objectSize(objectSize) - .objectParts(GetObjectAttributesParts.builder().totalPartsCount(objectPartCount).build()) - .build() - ); - when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenReturn( - getObjectAttributesResponseCompletableFuture - ); - - CompletableFuture responseFuture = asyncTransferManager.getBlobMetadata( - s3AsyncClient, - bucketName, - blobName - ); - GetObjectAttributesResponse objectAttributesResponse = responseFuture.get(); - - assertEquals(checksum, objectAttributesResponse.checksum().checksumCRC32()); - assertEquals(Long.valueOf(objectSize), objectAttributesResponse.objectSize()); - assertEquals(Integer.valueOf(objectPartCount), objectAttributesResponse.objectParts().totalPartsCount()); - } - - public void testGetBlobPartInputStream() throws Exception { - final String blobName = randomAlphaOfLengthBetween(1, 10); - final String bucketName = randomAlphaOfLengthBetween(1, 10); - final long contentLength = 10L; - final String contentRange = "bytes 0-10/100"; - final InputStream inputStream = ResponseInputStream.nullInputStream(); - - GetObjectResponse getObjectResponse = GetObjectResponse.builder().contentLength(contentLength).contentRange(contentRange).build(); - - CompletableFuture> getObjectPartResponse = new CompletableFuture<>(); - ResponseInputStream responseInputStream = new ResponseInputStream<>(getObjectResponse, inputStream); - getObjectPartResponse.complete(responseInputStream); - - when( - s3AsyncClient.getObject( - any(GetObjectRequest.class), - ArgumentMatchers.>>any() - ) - ).thenReturn(getObjectPartResponse); - - InputStreamContainer inputStreamContainer = asyncTransferManager.getBlobPartInputStreamContainer( - s3AsyncClient, - bucketName, - blobName, - 0 - ).get(); - - assertEquals(0, inputStreamContainer.getOffset()); - assertEquals(contentLength, inputStreamContainer.getContentLength()); - assertEquals(inputStream.available(), inputStreamContainer.getInputStream().available()); - } - - public void testTransformResponseToInputStreamContainer() throws Exception { - final String contentRange = "bytes 0-10/100"; - final long contentLength = 10L; - final InputStream inputStream = ResponseInputStream.nullInputStream(); - - GetObjectResponse getObjectResponse = GetObjectResponse.builder().contentLength(contentLength).build(); - - ResponseInputStream responseInputStreamNoRange = new ResponseInputStream<>(getObjectResponse, inputStream); - assertThrows(SdkException.class, () -> asyncTransferManager.transformResponseToInputStreamContainer(responseInputStreamNoRange)); - - getObjectResponse = GetObjectResponse.builder().contentRange(contentRange).build(); - ResponseInputStream responseInputStreamNoContentLength = new ResponseInputStream<>( - getObjectResponse, - inputStream - ); - assertThrows( - SdkException.class, - () -> asyncTransferManager.transformResponseToInputStreamContainer(responseInputStreamNoContentLength) - ); - - getObjectResponse = GetObjectResponse.builder().contentRange(contentRange).contentLength(contentLength).build(); - ResponseInputStream responseInputStream = new ResponseInputStream<>(getObjectResponse, inputStream); - InputStreamContainer inputStreamContainer = asyncTransferManager.transformResponseToInputStreamContainer(responseInputStream); - assertEquals(contentLength, inputStreamContainer.getContentLength()); - assertEquals(0, inputStreamContainer.getOffset()); - assertEquals(inputStream.available(), inputStreamContainer.getInputStream().available()); - } } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/utils/HttpRangeUtilsTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/utils/HttpRangeUtilsTests.java index 3420dc6df87e8..9a4267c5266e5 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/utils/HttpRangeUtilsTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/utils/HttpRangeUtilsTests.java @@ -10,23 +10,20 @@ import software.amazon.awssdk.core.exception.SdkException; -import org.opensearch.common.collect.Tuple; import org.opensearch.test.OpenSearchTestCase; public final class HttpRangeUtilsTests extends OpenSearchTestCase { public void testFromHttpRangeHeader() { String headerValue = "bytes 0-10/200"; - Tuple range = HttpRangeUtils.fromHttpRangeHeader(headerValue); - assertEquals(0L, range.v1().longValue()); - assertEquals(10L, range.v2().longValue()); + Long offset = HttpRangeUtils.getStartOffsetFromRangeHeader(headerValue); + assertEquals(0L, offset.longValue()); headerValue = "bytes 0-10/*"; - range = HttpRangeUtils.fromHttpRangeHeader(headerValue); - assertEquals(0L, range.v1().longValue()); - assertEquals(10L, range.v2().longValue()); + offset = HttpRangeUtils.getStartOffsetFromRangeHeader(headerValue); + assertEquals(0L, offset.longValue()); final String invalidHeaderValue = "bytes */*"; - assertThrows(SdkException.class, () -> HttpRangeUtils.fromHttpRangeHeader(invalidHeaderValue)); + assertThrows(SdkException.class, () -> HttpRangeUtils.getStartOffsetFromRangeHeader(invalidHeaderValue)); } }