Skip to content

Commit

Permalink
Additional refactors to AsyncUploadUtils and TransferNIOGroup
Browse files Browse the repository at this point in the history
Signed-off-by: Raghuvansh Raj <raghraaj@amazon.com>
  • Loading branch information
raghuvanshraj committed Jul 7, 2023
1 parent 4d28acf commit 5f63ced
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
Expand Down Expand Up @@ -77,7 +78,6 @@
import org.opensearch.core.common.Strings;
import org.opensearch.repositories.s3.async.UploadRequest;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.utils.CompletableFutureUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -97,7 +97,7 @@
import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
import static org.opensearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;

class S3BlobContainer extends AbstractBlobContainer {
class S3BlobContainer extends AbstractBlobContainer implements VerifyingMultiStreamBlobContainer {

private static final Logger logger = LogManager.getLogger(S3BlobContainer.class);

Expand Down Expand Up @@ -175,17 +175,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
}

@Override
public boolean isMultiStreamUploadSupported() {
return blobStore.isMultipartUploadEnabled();
}

@Override
public boolean isRemoteDataIntegritySupported() {
return true;
}

@Override
public CompletableFuture<Void> writeBlobByStreams(WriteContext writeContext) throws IOException {
public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException {
UploadRequest uploadRequest = new UploadRequest(
blobStore.bucket(),
buildKey(writeContext.getFileName()),
Expand All @@ -196,20 +186,23 @@ public CompletableFuture<Void> writeBlobByStreams(WriteContext writeContext) thr
writeContext.getExpectedChecksum()
);
try {
long partSize = blobStore.getAsyncUploadUtils().calculateOptimalPartSize(writeContext.getFileSize());
long partSize = blobStore.getAsyncTransferManager().calculateOptimalPartSize(writeContext.getFileSize());
StreamContext streamContext = SocketAccess.doPrivileged(() -> writeContext.getStreamProvider(partSize));
try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {

S3AsyncClient s3AsyncClient = writeContext.getWritePriority() == WritePriority.HIGH
? amazonS3Reference.get().priorityClient()
: amazonS3Reference.get().client();
CompletableFuture<Void> returnFuture = new CompletableFuture<>();
CompletableFuture<Void> completableFuture = blobStore.getAsyncUploadUtils()
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
.uploadObject(s3AsyncClient, uploadRequest, streamContext);

CompletableFutureUtils.forwardExceptionTo(returnFuture, completableFuture);
CompletableFutureUtils.forwardResultTo(completableFuture, returnFuture);
return completableFuture;
completableFuture.whenComplete((response, throwable) -> {
if (throwable == null) {
completionListener.onResponse(response);
} else {
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
completionListener.onFailure(ex);
}
});
}
} catch (Exception e) {
logger.info("exception error from blob container for file {}", writeContext.getFileName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.StorageClass;
import org.opensearch.repositories.s3.async.AsyncExecutorBuilder;
import org.opensearch.repositories.s3.async.AsyncUploadUtils;
import org.opensearch.repositories.s3.async.AsyncTransferManager;

import java.io.IOException;
import java.util.Locale;
Expand Down Expand Up @@ -71,7 +71,7 @@ class S3BlobStore implements BlobStore {

private final StatsMetricPublisher statsMetricPublisher = new StatsMetricPublisher();

private final AsyncUploadUtils asyncUploadUtils;
private final AsyncTransferManager asyncTransferManager;
private final AsyncExecutorBuilder priorityExecutorBuilder;
private final AsyncExecutorBuilder normalExecutorBuilder;
private final boolean multipartUploadEnabled;
Expand All @@ -86,7 +86,7 @@ class S3BlobStore implements BlobStore {
String cannedACL,
String storageClass,
RepositoryMetadata repositoryMetadata,
AsyncUploadUtils asyncUploadUtils,
AsyncTransferManager asyncTransferManager,
AsyncExecutorBuilder priorityExecutorBuilder,
AsyncExecutorBuilder normalExecutorBuilder
) {
Expand All @@ -99,7 +99,7 @@ class S3BlobStore implements BlobStore {
this.cannedACL = initCannedACL(cannedACL);
this.storageClass = initStorageClass(storageClass);
this.repositoryMetadata = repositoryMetadata;
this.asyncUploadUtils = asyncUploadUtils;
this.asyncTransferManager = asyncTransferManager;
this.normalExecutorBuilder = normalExecutorBuilder;
this.priorityExecutorBuilder = priorityExecutorBuilder;
}
Expand Down Expand Up @@ -203,7 +203,7 @@ public static ObjectCannedACL initCannedACL(String cannedACL) {
throw new BlobStoreException("cannedACL is not valid: [" + cannedACL + "]");
}

public AsyncUploadUtils getAsyncUploadUtils() {
return asyncUploadUtils;
public AsyncTransferManager getAsyncTransferManager() {
return asyncTransferManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.opensearch.repositories.ShardGenerations;
import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.opensearch.repositories.s3.async.AsyncExecutorBuilder;
import org.opensearch.repositories.s3.async.AsyncUploadUtils;
import org.opensearch.repositories.s3.async.AsyncTransferManager;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.threadpool.Scheduler;
Expand Down Expand Up @@ -172,24 +172,6 @@ class S3Repository extends MeteredBlobStoreRepository {
Setting.Property.NodeScope
);

/**
* Event loop thread count for priority uploads
*/
public static Setting<Integer> PRIORITY_UPLOAD_EVENT_LOOP_THREAD_COUNT_SETTING = Setting.intSetting(
"parallel_multipart_upload.priority.event_loop_thread_count",
4,
Setting.Property.NodeScope
);

/**
* Event loop thread count for normal uploads
*/
public static Setting<Integer> NORMAL_UPLOAD_EVENT_LOOP_THREAD_COUNT_SETTING = Setting.intSetting(
"parallel_multipart_upload.normal.event_loop_thread_count",
1,
Setting.Property.NodeScope
);

/**
* Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g.
*/
Expand Down Expand Up @@ -237,7 +219,7 @@ class S3Repository extends MeteredBlobStoreRepository {

private final RepositoryMetadata repositoryMetadata;

private final AsyncUploadUtils asyncUploadUtils;
private final AsyncTransferManager asyncUploadUtils;
private final S3AsyncService s3AsyncService;
private final boolean multipartUploadEnabled;
private final AsyncExecutorBuilder priorityExecutorBuilder;
Expand All @@ -252,7 +234,7 @@ class S3Repository extends MeteredBlobStoreRepository {
final S3Service service,
final ClusterService clusterService,
final RecoverySettings recoverySettings,
final AsyncUploadUtils asyncUploadUtils,
final AsyncTransferManager asyncUploadUtils,
final AsyncExecutorBuilder priorityExecutorBuilder,
final AsyncExecutorBuilder normalExecutorBuilder,
final S3AsyncService s3AsyncService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
Expand All @@ -49,8 +50,8 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.s3.async.AsyncExecutorBuilder;
import org.opensearch.repositories.s3.async.AsyncUploadUtils;
import org.opensearch.repositories.s3.async.TransferNIOGroup;
import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup;
import org.opensearch.repositories.s3.async.AsyncTransferManager;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
Expand Down Expand Up @@ -92,10 +93,14 @@ public S3RepositoryPlugin(final Settings settings, final Path configPath) {
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
executorBuilders.add(new FixedExecutorBuilder(settings, PRIORITY_FUTURE_COMPLETION, 4, 10_000, PRIORITY_FUTURE_COMPLETION));
executorBuilders.add(new FixedExecutorBuilder(settings, PRIORITY_STREAM_READER, 4, 10_000, PRIORITY_STREAM_READER));
executorBuilders.add(new FixedExecutorBuilder(settings, FUTURE_COMPLETION, 1, 10_000, FUTURE_COMPLETION));
executorBuilders.add(new FixedExecutorBuilder(settings, STREAM_READER, 1, 10_000, STREAM_READER));
executorBuilders.add(
new FixedExecutorBuilder(settings, PRIORITY_FUTURE_COMPLETION, priorityPoolCount(settings), 10_000, PRIORITY_FUTURE_COMPLETION)
);
executorBuilders.add(
new FixedExecutorBuilder(settings, PRIORITY_STREAM_READER, priorityPoolCount(settings), 10_000, PRIORITY_STREAM_READER)
);
executorBuilders.add(new FixedExecutorBuilder(settings, FUTURE_COMPLETION, normalPoolCount(settings), 10_000, FUTURE_COMPLETION));
executorBuilders.add(new FixedExecutorBuilder(settings, STREAM_READER, normalPoolCount(settings), 10_000, STREAM_READER));
return executorBuilders;
}

Expand All @@ -109,6 +114,22 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
this.s3AsyncService.refreshAndClearCache(clientsSettings);
}

/**
 * Clamps {@code value} into the inclusive range [{@code min}, {@code max}].
 * Equivalent to {@code Math.min(max, Math.max(min, value))}: if the bounds
 * ever conflict (min > max), the upper bound wins.
 */
private static int boundedBy(int value, int min, int max) {
    final int atLeastMin = (value < min) ? min : value;
    return Math.min(max, atLeastMin);
}

/**
 * Resolves the processor count available to this node as reported by
 * {@link OpenSearchExecutors#allocatedProcessors}, which honors any
 * node-level processor override in {@code settings}.
 */
private static int allocatedProcessors(Settings settings) {
    final int processors = OpenSearchExecutors.allocatedProcessors(settings);
    return processors;
}

/**
 * Thread count for the priority upload thread pools: half the allocated
 * processors (rounded up), clamped to the range [2, 4].
 */
private static int priorityPoolCount(Settings settings) {
    final int halfProcessorsRoundedUp = (allocatedProcessors(settings) + 1) / 2;
    return boundedBy(halfProcessorsRoundedUp, 2, 4);
}

/**
 * Thread count for the normal-priority upload thread pools: one eighth of
 * the allocated processors (rounded up), clamped to the range [1, 2].
 */
private static int normalPoolCount(Settings settings) {
    final int eighthProcessorsRoundedUp = (allocatedProcessors(settings) + 7) / 8;
    return boundedBy(eighthProcessorsRoundedUp, 1, 2);
}

@Override
public Collection<Object> createComponents(
final Client client,
Expand All @@ -123,15 +144,17 @@ public Collection<Object> createComponents(
final IndexNameExpressionResolver expressionResolver,
final Supplier<RepositoriesService> repositoriesServiceSupplier
) {
int priorityEventLoopThreads = priorityPoolCount(clusterService.getSettings());
int normalEventLoopThreads = normalPoolCount(clusterService.getSettings());
this.priorityExecutorBuilder = new AsyncExecutorBuilder(
threadPool.executor(PRIORITY_FUTURE_COMPLETION),
threadPool.executor(PRIORITY_STREAM_READER),
new TransferNIOGroup(S3Repository.PRIORITY_UPLOAD_EVENT_LOOP_THREAD_COUNT_SETTING.get(clusterService.getSettings()))
new AsyncTransferEventLoopGroup(priorityEventLoopThreads)
);
this.normalExecutorBuilder = new AsyncExecutorBuilder(
threadPool.executor(FUTURE_COMPLETION),
threadPool.executor(STREAM_READER),
new TransferNIOGroup(S3Repository.NORMAL_UPLOAD_EVENT_LOOP_THREAD_COUNT_SETTING.get(clusterService.getSettings()))
new AsyncTransferEventLoopGroup(normalEventLoopThreads)
);
return Collections.emptyList();
}
Expand All @@ -143,7 +166,8 @@ protected S3Repository createRepository(
final ClusterService clusterService,
final RecoverySettings recoverySettings
) {
AsyncUploadUtils asyncUploadUtils = new AsyncUploadUtils(

AsyncTransferManager asyncUploadUtils = new AsyncTransferManager(
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.get(clusterService.getSettings()).getBytes(),
normalExecutorBuilder.getStreamReader(),
priorityExecutorBuilder.getStreamReader()
Expand Down
Loading

0 comments on commit 5f63ced

Please sign in to comment.