diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index fcfccf50ad326..9ffdba5eaae3a 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -241,23 +241,27 @@ public void readBlobAsync(String blobName, ActionListener listener) return; } - final List blobPartInputStreamFutures = new ArrayList<>(); - final long blobSize = blobMetadata.objectSize(); - final Integer numberOfParts = blobMetadata.objectParts() == null ? null : blobMetadata.objectParts().totalPartsCount(); - final String blobChecksum = blobMetadata.checksum().checksumCRC32(); - - if (numberOfParts == null) { - blobPartInputStreamFutures.add(() -> getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobKey, null)); - } else { - // S3 multipart files use 1 to n indexing - for (int partNumber = 1; partNumber <= numberOfParts; partNumber++) { - final int innerPartNumber = partNumber; - blobPartInputStreamFutures.add( - () -> getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobKey, innerPartNumber) - ); + try { + final List blobPartInputStreamFutures = new ArrayList<>(); + final long blobSize = blobMetadata.objectSize(); + final Integer numberOfParts = blobMetadata.objectParts() == null ? null : blobMetadata.objectParts().totalPartsCount(); + final String blobChecksum = blobMetadata.checksum() == null ? null : blobMetadata.checksum().checksumCRC32(); + + if (numberOfParts == null) { + blobPartInputStreamFutures.add(() -> getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobKey, null)); + } else { + // S3 multipart files use 1 to n indexing + for (int partNumber = 1; partNumber <= numberOfParts; partNumber++) { + final int innerPartNumber = partNumber; + blobPartInputStreamFutures.add( + () -> getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobKey, innerPartNumber) + ); + } } + listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum)); + } catch (Exception ex) { + listener.onFailure(ex); } - listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum)); }); } catch (Exception ex) { listener.onFailure(SdkException.create("Error occurred while fetching blob parts from the repository", ex)); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java index 2e54705e9cd78..e266bba372d80 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java @@ -1074,6 +1074,51 @@ public void testReadBlobAsyncFailure() throws Exception { assertEquals(1, readContextActionListener.getFailureCount()); } + public void testReadBlobAsyncOnCompleteFailureMissingData() throws Exception { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final String blobName = randomAlphaOfLengthBetween(1, 10); + final String checksum = randomAlphaOfLength(10); + + final long objectSize = 100L; + final int objectPartCount = 10; + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference amazonAsyncS3Reference = new AmazonAsyncS3Reference( + AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null) + ); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + final BlobPath blobPath = new BlobPath(); + + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.serverSideEncryption()).thenReturn(false); + when(blobStore.asyncClientReference()).thenReturn(amazonAsyncS3Reference); + + CompletableFuture getObjectAttributesResponseCompletableFuture = new CompletableFuture<>(); + getObjectAttributesResponseCompletableFuture.complete( + GetObjectAttributesResponse.builder() + .checksum(Checksum.builder().build()) + .objectSize(null) + .objectParts(GetObjectAttributesParts.builder().totalPartsCount(objectPartCount).build()) + .build() + ); + when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenReturn( + getObjectAttributesResponseCompletableFuture + ); + + CountDownLatch countDownLatch = new CountDownLatch(1); + CountingCompletionListener readContextActionListener = new CountingCompletionListener<>(); + LatchedActionListener listener = new LatchedActionListener<>(readContextActionListener, countDownLatch); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + blobContainer.readBlobAsync(blobName, listener); + countDownLatch.await(); + + assertEquals(0, readContextActionListener.getResponseCount()); + assertEquals(1, readContextActionListener.getFailureCount()); + } + public void testGetBlobMetadata() throws Exception { final String checksum = randomAlphaOfLengthBetween(1, 10); final long objectSize = 100L;