Optimizations in s3 async upload flow and guards against S3 async SDK… #11327

Merged
@@ -374,7 +374,7 @@ private static IrsaCredentials buildFromEnvironment(IrsaCredentials defaults) {
return new IrsaCredentials(webIdentityTokenFile, roleArn, roleSessionName);
}

- private synchronized void releaseCachedClients() {
+ public synchronized void releaseCachedClients() {
// the clients will shutdown when they will not be used anymore
for (final AmazonAsyncS3Reference clientReference : clientsCache.values()) {
clientReference.decRef();
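Note: releaseCachedClients() is promoted from private to public so that S3Repository can flush cached async clients when repository settings are reloaded (see the reload hunk further down). Each cached client is reference-counted, so dropping the cache's reference never interrupts an in-flight upload. A minimal sketch of that idea, with a hypothetical ClientReference standing in for AmazonAsyncS3Reference:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for AmazonAsyncS3Reference: the wrapped client is
// closed only after every borrower has released its reference, so releasing
// the cache's reference never interrupts an in-flight upload.
class ClientReference implements AutoCloseable {
    private final AtomicInteger refCount = new AtomicInteger(1); // the cache holds one ref
    private final AutoCloseable client;

    ClientReference(AutoCloseable client) {
        this.client = client;
    }

    boolean tryIncRef() {
        while (true) {
            int n = refCount.get();
            if (n == 0) {
                return false; // already released; caller must build a fresh client
            }
            if (refCount.compareAndSet(n, n + 1)) {
                return true;
            }
        }
    }

    void decRef() throws Exception {
        if (refCount.decrementAndGet() == 0) {
            client.close(); // the last user performs the actual shutdown
        }
    }

    @Override
    public void close() throws Exception {
        decRef();
    }
}
```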
@@ -91,6 +91,7 @@
import org.opensearch.repositories.s3.async.UploadRequest;
import org.opensearch.repositories.s3.utils.HttpRangeUtils;

+ import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -188,10 +189,38 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) {
writeContext.getWritePriority(),
writeContext.getUploadFinalizer(),
writeContext.doRemoteDataIntegrityCheck(),
- writeContext.getExpectedChecksum()
+ writeContext.getExpectedChecksum(),
+ blobStore.isUploadRetryEnabled()
);
try {
- long partSize = blobStore.getAsyncTransferManager().calculateOptimalPartSize(writeContext.getFileSize());
+ if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) {
+     StreamContext streamContext = SocketAccess.doPrivileged(
+         () -> writeContext.getStreamProvider(uploadRequest.getContentLength())
+     );
+     InputStreamContainer inputStream = streamContext.provideStream(0);
+     try {
+         executeMultipartUpload(
+             blobStore,
+             uploadRequest.getKey(),
+             inputStream.getInputStream(),
+             uploadRequest.getContentLength()
+         );
+         completionListener.onResponse(null);
+     } catch (Exception ex) {
+         logger.error(
+             () -> new ParameterizedMessage(
+                 "Failed to upload large file {} of size {} ",
+                 uploadRequest.getKey(),
+                 uploadRequest.getContentLength()
+             ),
+             ex
+         );
+         completionListener.onFailure(ex);
+     }
+     return;
+ }
+ long partSize = blobStore.getAsyncTransferManager()
+     .calculateOptimalPartSize(writeContext.getFileSize(), writeContext.getWritePriority(), blobStore.isUploadRetryEnabled());
StreamContext streamContext = SocketAccess.doPrivileged(() -> writeContext.getStreamProvider(partSize));
try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {

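Note: the block above is the new guard. When redirect_large_s3_upload is enabled, any object larger than 10 GiB skips the async SDK entirely and goes through the blocking executeMultipartUpload path, which is where failures in the async SDK are costliest. A self-contained sketch of just the routing decision (names here are illustrative, not from the plugin):

```java
// Sketch of the routing guard used above: anything over 10 GiB is redirected
// to the synchronous multipart path when redirection is enabled.
final class LargeUploadGuard {
    private static final long TEN_GIB = 10L * 1024 * 1024 * 1024; // ByteSizeUnit.GB.toBytes(10)

    static boolean shouldRedirect(long contentLength, boolean redirectLargeUploads) {
        return redirectLargeUploads && contentLength > TEN_GIB;
    }

    public static void main(String[] args) {
        System.out.println(shouldRedirect(5L << 30, true));   // false: 5 GiB stays on the async path
        System.out.println(shouldRedirect(11L << 30, true));  // true: 11 GiB is redirected
        System.out.println(shouldRedirect(11L << 30, false)); // false: guard disabled by setting
    }
}
```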
@@ -537,8 +566,14 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize) {

PutObjectRequest putObjectRequest = putObjectRequestBuilder.build();
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
+ final InputStream requestInputStream;
+ if (blobStore.isUploadRetryEnabled()) {
+     requestInputStream = new BufferedInputStream(input, (int) (blobSize + 1));
+ } else {
+     requestInputStream = input;
+ }
SocketAccess.doPrivilegedVoid(
- () -> clientReference.get().putObject(putObjectRequest, RequestBody.fromInputStream(input, blobSize))
+ () -> clientReference.get().putObject(putObjectRequest, RequestBody.fromInputStream(requestInputStream, blobSize))
);
} catch (final SdkException e) {
throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e);
@@ -578,6 +613,13 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize) {
createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}

+ final InputStream requestInputStream;
+ if (blobStore.isUploadRetryEnabled()) {
+     requestInputStream = new BufferedInputStream(input, (int) (partSize + 1));
+ } else {
+     requestInputStream = input;
+ }
+
CreateMultipartUploadRequest createMultipartUploadRequest = createMultipartUploadRequestBuilder.build();
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
uploadId.set(
@@ -601,10 +643,9 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize) {
.build();

bytesCount += uploadPartRequest.contentLength();
-
final UploadPartResponse uploadResponse = SocketAccess.doPrivileged(
() -> clientReference.get()
- .uploadPart(uploadPartRequest, RequestBody.fromInputStream(input, uploadPartRequest.contentLength()))
+ .uploadPart(uploadPartRequest, RequestBody.fromInputStream(requestInputStream, uploadPartRequest.contentLength()))
);
parts.add(CompletedPart.builder().partNumber(uploadPartRequest.partNumber()).eTag(uploadResponse.eTag()).build());
}
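Note: both upload paths above now wrap their stream in a BufferedInputStream sized one byte larger than the payload when retries are enabled. The SDK can only retry a failed request if the request body's stream supports mark/reset; a plain network or file stream typically does not, so without buffering a transient failure would force the application to restart the whole upload. (The +1 is presumably so the final end-of-stream read still falls inside the mark limit.) A small demonstration of the replay behavior the wrapper provides:

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Demonstrates why the upload path wraps its stream: a BufferedInputStream
// sized at partSize + 1 can be rewound after a failed attempt, so only the
// buffered part is re-sent rather than restarting the whole file.
public class ReplayablePart {
    public static void main(String[] args) throws IOException {
        byte[] part = "part-payload".getBytes(StandardCharsets.UTF_8);
        InputStream in = new BufferedInputStream(new ByteArrayInputStream(part), part.length + 1);

        in.mark(part.length + 1);          // what the SDK does before a send attempt
        in.readAllBytes();                 // first attempt consumes the stream
        in.reset();                        // a simulated retry rewinds to the mark

        System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8)); // part-payload
    }
}
```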
plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java
@@ -56,8 +56,10 @@
import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING;
import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING;
+ import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING;
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
+ import static org.opensearch.repositories.s3.S3Repository.UPLOAD_RETRY_ENABLED;

class S3BlobStore implements BlobStore {

@@ -71,6 +73,10 @@

private volatile ByteSizeValue bufferSize;

+ private volatile boolean redirectLargeUploads;
+
+ private volatile boolean uploadRetryEnabled;
+
private volatile boolean serverSideEncryption;

private volatile ObjectCannedACL cannedACL;
@@ -119,6 +125,9 @@
this.normalExecutorBuilder = normalExecutorBuilder;
this.priorityExecutorBuilder = priorityExecutorBuilder;
this.urgentExecutorBuilder = urgentExecutorBuilder;
+ // Settings to initialize blobstore with.
+ this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
+ this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
}

@Override
@@ -130,6 +139,8 @@
this.cannedACL = initCannedACL(CANNED_ACL_SETTING.get(repositoryMetadata.settings()));
this.storageClass = initStorageClass(STORAGE_CLASS_SETTING.get(repositoryMetadata.settings()));
this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
+ this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
+ this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());

}

@Override
@@ -149,6 +160,14 @@
return service.settings(repositoryMetadata).maxRetries;
}

+ public boolean isRedirectLargeUploads() {
+     return redirectLargeUploads;
+ }
+
+ public boolean isUploadRetryEnabled() {
+     return uploadRetryEnabled;
+ }

public String bucket() {
return bucket;
}
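Note: S3BlobStore reads both flags at construction and again in reload(), holding them in volatile fields, so a repository settings update becomes visible to concurrent upload threads without any locking. A stripped-down sketch of that pattern:

```java
// Sketch of the hot-reload pattern used by S3BlobStore: settings land in
// volatile fields, so the writer (reload) and readers (upload paths) need
// no lock, only the visibility guarantee volatile provides.
class ReloadableFlags {
    private volatile boolean redirectLargeUploads = true;
    private volatile boolean uploadRetryEnabled = true;

    void reload(boolean redirect, boolean retry) { // called on repository settings update
        this.redirectLargeUploads = redirect;
        this.uploadRetryEnabled = retry;
    }

    boolean isRedirectLargeUploads() { return redirectLargeUploads; } // always sees the latest write
    boolean isUploadRetryEnabled() { return uploadRetryEnabled; }
}
```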
plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java
@@ -147,6 +147,20 @@
*/
static final ByteSizeValue MAX_FILE_SIZE_USING_MULTIPART = new ByteSizeValue(5, ByteSizeUnit.TB);

+ /**
+  * Whether large uploads need to be redirected to the slower sync S3 client.
+  */
+ static final Setting<Boolean> REDIRECT_LARGE_S3_UPLOAD = Setting.boolSetting(
+     "redirect_large_s3_upload",
+     true,
+     Setting.Property.NodeScope
+ );
+
+ /**
+  * Whether retries on upload are enabled. This setting wraps the input stream in a buffered stream to enable retries.
+  */
+ static final Setting<Boolean> UPLOAD_RETRY_ENABLED = Setting.boolSetting("s3_upload_retry_enabled", true, Setting.Property.NodeScope);
+
/**
* Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold,
* the S3 repository will use the AWS Multipart Upload API to split the chunk into several parts, each of buffer_size length, and
@@ -391,7 +405,9 @@

// Reload configs for S3RepositoryPlugin
service.settings(metadata);
+ service.releaseCachedClients();
  s3AsyncService.settings(metadata);
+ s3AsyncService.releaseCachedClients();

// Reload configs for S3BlobStore
BlobStore blobStore = getBlobStore();
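Note: both new settings default to true and are read from the repository's settings, so they only need to be supplied when opting out. An illustrative sketch of building such settings (the bucket name and builder usage are assumptions for the example, not taken from this diff):

```java
import org.opensearch.common.settings.Settings;

// Illustrative sketch: supplying the two new flags as repository settings.
// S3BlobStore reads both from RepositoryMetadata#settings(), and both
// default to true, so this only matters when opting out.
public final class RepositorySettingsExample {
    public static Settings s3RepositorySettings() {
        return Settings.builder()
            .put("bucket", "my-snapshots")            // hypothetical bucket name
            .put("redirect_large_s3_upload", true)    // route > 10 GiB uploads to the sync client
            .put("s3_upload_retry_enabled", true)     // wrap streams so failed parts can be replayed
            .build();
    }
}
```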
@@ -261,7 +261,9 @@ public List<Setting<?>> getSettings() {
S3ClientSettings.IDENTITY_TOKEN_FILE_SETTING,
S3ClientSettings.ROLE_SESSION_NAME_SETTING,
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING,
- S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING
+ S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING,
+ S3Repository.REDIRECT_LARGE_S3_UPLOAD,
+ S3Repository.UPLOAD_RETRY_ENABLED
);
}

@@ -438,7 +438,7 @@ private static IrsaCredentials buildFromEnviroment(IrsaCredentials defaults) {
return new IrsaCredentials(webIdentityTokenFile, roleArn, roleSessionName);
}

- private synchronized void releaseCachedClients() {
+ public synchronized void releaseCachedClients() {
// the clients will shutdown when they will not be used anymore
for (final AmazonS3Reference clientReference : clientsCache.values()) {
clientReference.decRef();
@@ -23,7 +23,6 @@
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.io.InputStreamContainer;
- import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.StatsMetricPublisher;
import org.opensearch.repositories.s3.io.CheckedContainer;
@@ -55,8 +54,8 @@ public class AsyncPartsHandler {
* @param uploadId Upload Id against which multi-part is being performed
* @param completedParts Reference of completed parts
* @param inputStreamContainers Checksum containers
- * @return list of completable futures
  * @param statsMetricPublisher sdk metric publisher
+ * @return list of completable futures
* @throws IOException thrown in case of an IO error
*/
public static List<CompletableFuture<CompletedPart>> uploadParts(
@@ -69,7 +68,8 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
String uploadId,
AtomicReferenceArray<CompletedPart> completedParts,
AtomicReferenceArray<CheckedContainer> inputStreamContainers,
- StatsMetricPublisher statsMetricPublisher
+ StatsMetricPublisher statsMetricPublisher,
+ boolean uploadRetryEnabled
) throws IOException {
List<CompletableFuture<CompletedPart>> futures = new ArrayList<>();
for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
@@ -95,7 +95,8 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
futures,
uploadPartRequestBuilder.build(),
inputStreamContainer,
- uploadRequest
+ uploadRequest,
+ uploadRetryEnabled
);
}

@@ -132,6 +133,18 @@ public static void cleanUpParts(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, …
}));
}

+ public static InputStream maybeRetryInputStream(
+     InputStream inputStream,
+     WritePriority writePriority,
+     boolean uploadRetryEnabled,
+     long contentLength
+ ) {
+     if (uploadRetryEnabled == true && (writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) {
+         return new BufferedInputStream(inputStream, (int) (contentLength + 1));
+     }
+     return inputStream;
+ }
+
private static void uploadPart(
S3AsyncClient s3AsyncClient,
ExecutorService executorService,
@@ -142,7 +155,8 @@ private static void uploadPart(
List<CompletableFuture<CompletedPart>> futures,
UploadPartRequest uploadPartRequest,
InputStreamContainer inputStreamContainer,
- UploadRequest uploadRequest
+ UploadRequest uploadRequest,
+ boolean uploadRetryEnabled
) {
Integer partNumber = uploadPartRequest.partNumber();

@@ -154,9 +168,13 @@
} else {
streamReadExecutor = executorService;
}
- // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
- // data can be retried instead of retrying whole file by the application.
- InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));
+
+ InputStream inputStream = maybeRetryInputStream(
+     inputStreamContainer.getInputStream(),
+     uploadRequest.getWritePriority(),
+     uploadRetryEnabled,
+     uploadPartRequest.contentLength()
+ );
CompletableFuture<UploadPartResponse> uploadPartResponseFuture = SocketAccess.doPrivileged(
() -> s3AsyncClient.uploadPart(
uploadPartRequest,
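Note: maybeRetryInputStream replaces the previous unconditional 1 MB buffering. The buffer now covers the entire part (contentLength + 1 bytes) but is allocated only for HIGH and URGENT priority uploads, since buffering every low-priority part would multiply heap usage. A self-contained sketch of the gate (the WritePriority enum here mirrors the plugin's values):

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.InputStream;

// Sketch of the gating logic in maybeRetryInputStream: buffering costs up to
// contentLength + 1 bytes of heap per in-flight part, so it is reserved for
// HIGH and URGENT priority uploads.
enum WritePriority { LOW, NORMAL, HIGH, URGENT }

class RetryGate {
    static InputStream maybeBuffer(InputStream in, WritePriority p, boolean retryEnabled, long contentLength) {
        if (retryEnabled && (p == WritePriority.HIGH || p == WritePriority.URGENT)) {
            return new BufferedInputStream(in, (int) (contentLength + 1)); // replayable on retry
        }
        return in; // low-priority traffic streams straight through, no extra heap
    }

    public static void main(String[] args) {
        InputStream raw = new ByteArrayInputStream(new byte[16]);
        System.out.println(maybeBuffer(raw, WritePriority.URGENT, true, 16).getClass().getSimpleName());
        // BufferedInputStream
        System.out.println(maybeBuffer(raw, WritePriority.NORMAL, true, 16).getClass().getSimpleName());
        // ByteArrayInputStream
    }
}
```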
plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java
@@ -34,11 +34,11 @@
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.util.ByteUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
+ import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.StatsMetricPublisher;
import org.opensearch.repositories.s3.io.CheckedContainer;

- import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
@@ -183,7 +183,8 @@
uploadId,
completedParts,
inputStreamContainers,
- statsMetricPublisher
+ statsMetricPublisher,
+ uploadRequest.isUploadRetryEnabled()
);
} catch (Exception ex) {
try {
@@ -302,10 +303,13 @@
/**
* Calculates the optimal part size of each part request if the upload operation is carried out as multipart upload.
*/
- public long calculateOptimalPartSize(long contentLengthOfSource) {
+ public long calculateOptimalPartSize(long contentLengthOfSource, WritePriority writePriority, boolean uploadRetryEnabled) {
if (contentLengthOfSource < ByteSizeUnit.MB.toBytes(5)) {
return contentLengthOfSource;
}
+ if (uploadRetryEnabled && (writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) {
+     return new ByteSizeValue(5, ByteSizeUnit.MB).getBytes();
+ }
double optimalPartSize = contentLengthOfSource / (double) MAX_UPLOAD_PARTS;
optimalPartSize = Math.ceil(optimalPartSize);
return (long) Math.max(optimalPartSize, minimumPartSize);
@@ -335,9 +339,13 @@
} else {
streamReadExecutor = executorService;
}
- // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
- // data can be retried instead of retrying whole file by the application.
- InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));
+
+ InputStream inputStream = AsyncPartsHandler.maybeRetryInputStream(
+     inputStreamContainer.getInputStream(),
+     uploadRequest.getWritePriority(),
+     uploadRequest.isUploadRetryEnabled(),
+     uploadRequest.getContentLength()
+ );
CompletableFuture<Void> putObjectFuture = SocketAccess.doPrivileged(
() -> s3AsyncClient.putObject(
putObjectRequestBuilder.build(),
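Note: the new calculateOptimalPartSize overload caps parts at 5 MB whenever buffered retries apply, keeping each per-part retry buffer small; otherwise it divides the content length across the upload-parts ceiling and floors the result at the configured minimum part size. A worked sketch, assuming MAX_UPLOAD_PARTS is S3's 10,000-part limit and a 16 MB minimum (the real minimum comes from transfer manager settings):

```java
// Worked sketch of the part-size calculation above. The 5 MB cap matches the
// diff; the 16 MB minimum here is an assumption for illustration.
public final class PartSizeMath {
    static final int MAX_UPLOAD_PARTS = 10_000;
    static final long FIVE_MB = 5L * 1024 * 1024;
    static final long MIN_PART_SIZE = 16L * 1024 * 1024;

    static long optimalPartSize(long contentLength, boolean retryOnHighPriority) {
        if (contentLength < FIVE_MB) {
            return contentLength;                                 // small object: single part
        }
        if (retryOnHighPriority) {
            return FIVE_MB;                                       // keep per-part retry buffers small
        }
        long ideal = (long) Math.ceil(contentLength / (double) MAX_UPLOAD_PARTS);
        return Math.max(ideal, MIN_PART_SIZE);                    // floor at the configured minimum
    }

    public static void main(String[] args) {
        long oneTiB = 1L << 40;
        System.out.println(optimalPartSize(oneTiB, false));       // ~110 MB parts: 1 TiB / 10,000, rounded up
        System.out.println(optimalPartSize(oneTiB, true));        // 5 MB parts when buffered retries apply
    }
}
```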
@@ -25,6 +25,8 @@ public class UploadRequest {
private final boolean doRemoteDataIntegrityCheck;
private final Long expectedChecksum;

+ private boolean uploadRetryEnabled;
+
/**
* Construct a new UploadRequest object
*
@@ -43,7 +45,8 @@ public UploadRequest(
WritePriority writePriority,
CheckedConsumer<Boolean, IOException> uploadFinalizer,
boolean doRemoteDataIntegrityCheck,
- Long expectedChecksum
+ Long expectedChecksum,
+ boolean uploadRetryEnabled
) {
this.bucket = bucket;
this.key = key;
@@ -52,6 +55,7 @@
this.uploadFinalizer = uploadFinalizer;
this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
this.expectedChecksum = expectedChecksum;
+ this.uploadRetryEnabled = uploadRetryEnabled;
}

public String getBucket() {
@@ -81,4 +85,8 @@ public boolean doRemoteDataIntegrityCheck() {
public Long getExpectedChecksum() {
return expectedChecksum;
}

+ public boolean isUploadRetryEnabled() {
+     return uploadRetryEnabled;
+ }
}
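Note: the flag travels on UploadRequest so the transfer manager and AsyncPartsHandler act on one consistent decision per upload. A hypothetical construction site, matching the widened constructor above (import paths and argument values are assumptions for illustration):

```java
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.repositories.s3.async.UploadRequest;

import java.io.IOException;

// Illustrative construction of an UploadRequest with the new flag; the
// bucket, key, and sizes here are made up for the example.
public class UploadRequestExample {
    public static UploadRequest example() {
        CheckedConsumer<Boolean, IOException> finalizer = success -> { /* no-op finalizer */ };
        return new UploadRequest(
            "my-bucket",             // bucket
            "indices/0/segment_1",   // key
            1024L * 1024L,           // content length: 1 MiB
            WritePriority.HIGH,      // buffered retries apply to HIGH/URGENT
            finalizer,               // upload finalizer callback
            false,                   // doRemoteDataIntegrityCheck
            null,                    // expectedChecksum (none without the integrity check)
            true                     // uploadRetryEnabled
        );
    }
}
```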