[Remote Store] Integrate S3 async upload with S3BlobContainer (#7218)
Signed-off-by: Raghuvansh Raj <raghraaj@amazon.com>
raghuvanshraj authored Jul 13, 2023
1 parent 6a35ffa commit 2f4545a
Showing 10 changed files with 948 additions and 20 deletions.
@@ -172,7 +172,7 @@ protected S3Repository createRepository(
ClusterService clusterService,
RecoverySettings recoverySettings
) {
- return new S3Repository(metadata, registry, service, clusterService, recoverySettings) {
+ return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, false) {

@Override
public BlobStore blobStore() {
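Note: the five new constructor arguments correspond, in order, to asyncUploadUtils, priorityExecutorBuilder, normalExecutorBuilder, s3AsyncService, and multipartUploadEnabled (see the updated S3Repository constructor below); the test stubs the first four out with nulls and disables parallel multipart upload.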
S3BlobContainer.java
@@ -39,11 +39,15 @@
import org.opensearch.action.ActionListener;
import org.opensearch.common.Nullable;
import org.opensearch.common.SetOnce;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
import org.opensearch.common.blobstore.support.PlainBlobMetadata;
import org.opensearch.common.collect.Tuple;
Expand Down Expand Up @@ -72,6 +76,8 @@
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
import org.opensearch.core.common.Strings;
import org.opensearch.repositories.s3.async.UploadRequest;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -82,6 +88,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -90,12 +97,13 @@
import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
import static org.opensearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;

- class S3BlobContainer extends AbstractBlobContainer {
+ class S3BlobContainer extends AbstractBlobContainer implements VerifyingMultiStreamBlobContainer {
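Implementing VerifyingMultiStreamBlobContainer is what introduces the asyncBlobUpload entry point below; judging from the WriteContext fields used there, the interface covers multi-stream uploads with an optional remote data integrity check.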

private static final Logger logger = LogManager.getLogger(S3BlobContainer.class);

/**
* Maximum number of deletes in a {@link DeleteObjectsRequest}.
*
* @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html">S3 Documentation</a>.
*/
private static final int MAX_BULK_DELETES = 1000;
@@ -166,6 +174,42 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
});
}

@Override
public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException {
UploadRequest uploadRequest = new UploadRequest(
blobStore.bucket(),
buildKey(writeContext.getFileName()),
writeContext.getFileSize(),
writeContext.getWritePriority(),
writeContext.getUploadFinalizer(),
writeContext.doRemoteDataIntegrityCheck(),
writeContext.getExpectedChecksum()
);
try {
long partSize = blobStore.getAsyncTransferManager().calculateOptimalPartSize(writeContext.getFileSize());
StreamContext streamContext = SocketAccess.doPrivileged(() -> writeContext.getStreamProvider(partSize));
try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {

S3AsyncClient s3AsyncClient = writeContext.getWritePriority() == WritePriority.HIGH
? amazonS3Reference.get().priorityClient()
: amazonS3Reference.get().client();
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
.uploadObject(s3AsyncClient, uploadRequest, streamContext);
completableFuture.whenComplete((response, throwable) -> {
if (throwable == null) {
completionListener.onResponse(response);
} else {
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
completionListener.onFailure(ex);
}
});
}
} catch (Exception e) {
logger.error("Failed to perform async upload for file {}", writeContext.getFileName());
throw new IOException(e);
}
}
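Two details worth noting in the method above. First, the whenComplete block bridges the SDK's CompletableFuture back to OpenSearch's ActionListener, wrapping a Throwable that is an Error in a plain Exception, since onFailure accepts only Exceptions. Second, the part size comes from AsyncTransferManager#calculateOptimalPartSize, whose body is not part of the hunks shown here. A plausible sketch of that calculation, assuming the usual strategy of staying under S3's 10,000-part limit while honoring the configured minimum (names below are illustrative):

```java
// Illustrative sketch only; the real logic lives in
// org.opensearch.repositories.s3.async.AsyncTransferManager, which this diff does not show.
final class PartSizeExample {
    private static final int MAX_UPLOAD_PARTS = 10_000; // S3's cap on parts per multipart upload
    private static final long MINIMUM_PART_SIZE = 16L * 1024 * 1024; // assumed 16 MB floor, matching DEFAULT_MULTIPART_UPLOAD_MINIMUM_PART_SIZE in S3Repository below

    static long calculateOptimalPartSize(long contentLength) {
        // Smallest part size that keeps the part count at or below MAX_UPLOAD_PARTS,
        // never dropping below the configured minimum.
        long partSize = (long) Math.ceil((double) contentLength / MAX_UPLOAD_PARTS);
        return Math.max(partSize, MINIMUM_PART_SIZE);
    }
}
```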

// package private for testing
long getLargeBlobThresholdInBytes() {
return blobStore.bufferSizeInBytes();
S3BlobStore.java
@@ -42,6 +42,8 @@
import org.opensearch.common.unit.ByteSizeValue;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.StorageClass;
import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
import org.opensearch.repositories.s3.async.AsyncTransferManager;

import java.io.IOException;
import java.util.Locale;
@@ -53,6 +55,8 @@ class S3BlobStore implements BlobStore {

private final S3Service service;

private final S3AsyncService s3AsyncService;

private final String bucket;

private final ByteSizeValue bufferSize;
@@ -67,22 +71,41 @@ class S3BlobStore implements BlobStore {

private final StatsMetricPublisher statsMetricPublisher = new StatsMetricPublisher();

private final AsyncTransferManager asyncTransferManager;
private final AsyncExecutorContainer priorityExecutorBuilder;
private final AsyncExecutorContainer normalExecutorBuilder;
private final boolean multipartUploadEnabled;

S3BlobStore(
S3Service service,
S3AsyncService s3AsyncService,
boolean multipartUploadEnabled,
String bucket,
boolean serverSideEncryption,
ByteSizeValue bufferSize,
String cannedACL,
String storageClass,
- RepositoryMetadata repositoryMetadata
+ RepositoryMetadata repositoryMetadata,
AsyncTransferManager asyncTransferManager,
AsyncExecutorContainer priorityExecutorBuilder,
AsyncExecutorContainer normalExecutorBuilder
) {
this.service = service;
this.s3AsyncService = s3AsyncService;
this.multipartUploadEnabled = multipartUploadEnabled;
this.bucket = bucket;
this.serverSideEncryption = serverSideEncryption;
this.bufferSize = bufferSize;
this.cannedACL = initCannedACL(cannedACL);
this.storageClass = initStorageClass(storageClass);
this.repositoryMetadata = repositoryMetadata;
this.asyncTransferManager = asyncTransferManager;
this.normalExecutorBuilder = normalExecutorBuilder;
this.priorityExecutorBuilder = priorityExecutorBuilder;
}

public boolean isMultipartUploadEnabled() {
return multipartUploadEnabled;
}

@Override
@@ -94,6 +117,10 @@ public AmazonS3Reference clientReference() {
return service.client(repositoryMetadata);
}

public AmazonAsyncS3Reference asyncClientReference() {
return s3AsyncService.client(repositoryMetadata, priorityExecutorBuilder, normalExecutorBuilder);
}
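asyncClientReference() hands back a reference wrapping both async clients; asyncBlobUpload above then selects priorityClient() for WritePriority.HIGH writes and client() otherwise, each backed by the corresponding executor container passed in here.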

int getMaxRetries() {
return service.settings(repositoryMetadata).maxRetries;
}
@@ -117,7 +144,12 @@ public BlobContainer blobContainer(BlobPath path) {

@Override
public void close() throws IOException {
- this.service.close();
+ if (service != null) {
+ this.service.close();
+ }
+ if (s3AsyncService != null) {
+ this.s3AsyncService.close();
+ }
}
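Both close() guards matter because either service may be left unset; the test at the top of this diff, for instance, builds the repository with a null S3AsyncService.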

@Override
@@ -170,4 +202,8 @@ public static ObjectCannedACL initCannedACL(String cannedACL) {

throw new BlobStoreException("cannedACL is not valid: [" + cannedACL + "]");
}

public AsyncTransferManager getAsyncTransferManager() {
return asyncTransferManager;
}
}
S3Repository.java
@@ -34,7 +34,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.ClusterState;
@@ -57,6 +56,8 @@
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.ShardGenerations;
import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
import org.opensearch.repositories.s3.async.AsyncTransferManager;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.threadpool.Scheduler;
@@ -103,6 +104,11 @@ class S3Repository extends MeteredBlobStoreRepository {
ByteSizeUnit.BYTES
);

private static final ByteSizeValue DEFAULT_MULTIPART_UPLOAD_MINIMUM_PART_SIZE = new ByteSizeValue(
ByteSizeUnit.MB.toBytes(16),
ByteSizeUnit.BYTES
);
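That is, ByteSizeUnit.MB.toBytes(16) = 16 * 1024 * 1024 = 16,777,216 bytes, a 16 MB default minimum part size for parallel uploads.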

static final Setting<String> BUCKET_SETTING = Setting.simpleString("bucket");

/**
@@ -146,6 +152,26 @@ class S3Repository extends MeteredBlobStoreRepository {
MAX_PART_SIZE_USING_MULTIPART
);

/**
* Minimum part size for parallel multipart uploads
*/
static final Setting<ByteSizeValue> PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING = Setting.byteSizeSetting(
"parallel_multipart_upload.minimum_part_size",
DEFAULT_MULTIPART_UPLOAD_MINIMUM_PART_SIZE,
MIN_PART_SIZE_USING_MULTIPART,
MAX_PART_SIZE_USING_MULTIPART,
Setting.Property.NodeScope
);

/**
* Controls whether parallel multipart uploads are used when writing to S3
*/
public static Setting<Boolean> PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING = Setting.boolSetting(
"parallel_multipart_upload.enabled",
true,
Setting.Property.NodeScope
);
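Both settings are node-scoped, so they are read from node settings rather than per-repository settings; the registration site (presumably S3RepositoryPlugin#getSettings, one of the changed files not shown here) is outside these hunks. A minimal sketch of how they would be read, assuming same-package access since the part-size setting is package-private:

```java
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeValue;

// Hypothetical usage sketch, not part of the commit.
class ParallelUploadSettingsExample {
    static void read() {
        Settings nodeSettings = Settings.builder()
            .put("parallel_multipart_upload.enabled", true)
            .put("parallel_multipart_upload.minimum_part_size", "16mb")
            .build();
        boolean enabled = S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(nodeSettings);
        ByteSizeValue minPartSize = S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.get(nodeSettings);
    }
}
```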

/**
* Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g.
*/
@@ -193,6 +219,12 @@ class S3Repository extends MeteredBlobStoreRepository {

private final RepositoryMetadata repositoryMetadata;

private final AsyncTransferManager asyncUploadUtils;
private final S3AsyncService s3AsyncService;
private final boolean multipartUploadEnabled;
private final AsyncExecutorContainer priorityExecutorBuilder;
private final AsyncExecutorContainer normalExecutorBuilder;

/**
* Constructs an S3-backed repository
*/
@@ -201,7 +233,12 @@ class S3Repository extends MeteredBlobStoreRepository {
final NamedXContentRegistry namedXContentRegistry,
final S3Service service,
final ClusterService clusterService,
- final RecoverySettings recoverySettings
+ final RecoverySettings recoverySettings,
final AsyncTransferManager asyncUploadUtils,
final AsyncExecutorContainer priorityExecutorBuilder,
final AsyncExecutorContainer normalExecutorBuilder,
final S3AsyncService s3AsyncService,
final boolean multipartUploadEnabled
) {
super(
metadata,
@@ -212,8 +249,13 @@ class S3Repository extends MeteredBlobStoreRepository {
buildLocation(metadata)
);
this.service = service;
this.s3AsyncService = s3AsyncService;
this.multipartUploadEnabled = multipartUploadEnabled;

this.repositoryMetadata = metadata;
this.asyncUploadUtils = asyncUploadUtils;
this.priorityExecutorBuilder = priorityExecutorBuilder;
this.normalExecutorBuilder = normalExecutorBuilder;

// Parse and validate the user's S3 Storage Class setting
this.bucket = BUCKET_SETTING.get(metadata.settings());
Expand Down Expand Up @@ -314,7 +356,20 @@ public void deleteSnapshots(

@Override
protected S3BlobStore createBlobStore() {
- return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, repositoryMetadata);
+ return new S3BlobStore(
+ service,
+ s3AsyncService,
+ multipartUploadEnabled,
+ bucket,
+ serverSideEncryption,
+ bufferSize,
+ cannedACL,
+ storageClass,
+ repositoryMetadata,
+ asyncUploadUtils,
+ priorityExecutorBuilder,
+ normalExecutorBuilder
+ );
}
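With this wiring in place, the blob store carries the transfer manager, the executor containers, and the async service needed by S3BlobContainer#asyncBlobUpload shown earlier.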

// only used for testing
(Diffs for the remaining six changed files are not shown; they include the org.opensearch.repositories.s3.async classes referenced above, such as AsyncTransferManager, AsyncExecutorContainer, and UploadRequest.)