Skip to content

Commit

Permalink
Rate Limiter integration for remote transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
Bukhtawar committed Aug 19, 2023
1 parent bf10ff7 commit dc6f5aa
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 39 deletions.
51 changes: 51 additions & 0 deletions server/src/main/java/org/opensearch/common/StreamLimiter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common;

import org.apache.lucene.store.RateLimiter;

import java.io.IOException;
import java.util.function.Supplier;

public class StreamLimiter {

private final Supplier<RateLimiter> rateLimiterSupplier;

private final StreamLimiter.Listener listener;

private int bytesSinceLastRateLimit;

public StreamLimiter(Supplier<RateLimiter> rateLimiterSupplier, Listener listener) {
this.rateLimiterSupplier = rateLimiterSupplier;
this.listener = listener;
}

public void maybePause(int bytes) throws IOException {
bytesSinceLastRateLimit += bytes;
final RateLimiter rateLimiter = rateLimiterSupplier.get();
if (rateLimiter != null) {
if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) {
long pause = rateLimiter.pause(bytesSinceLastRateLimit);
bytesSinceLastRateLimit = 0;
if (pause > 0) {
listener.onPause(pause);
}
}
}
}

/**
* Internal listener
*
* @opensearch.internal
*/
public interface Listener {
void onPause(long nanos);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.transfer.stream;

import org.apache.lucene.store.RateLimiter;
import org.opensearch.common.StreamLimiter;

import java.io.IOException;
import java.util.function.Supplier;

public class RateLimitingOffsetRangeInputStream extends OffsetRangeInputStream {

private StreamLimiter streamLimiter;

private OffsetRangeInputStream delegate;

public RateLimitingOffsetRangeInputStream(
OffsetRangeInputStream delegate,
Supplier<RateLimiter> rateLimiterSupplier,
StreamLimiter.Listener listener
) {
this.streamLimiter = new StreamLimiter(rateLimiterSupplier, listener);
this.delegate = delegate;
}

@Override
public int read() throws IOException {
int b = delegate.read();
streamLimiter.maybePause(1);
return b;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int n = delegate.read(b, off, len);
if (n > 0) {
streamLimiter.maybePause(n);
}
return n;
}

@Override
public synchronized void mark(int readlimit) {
delegate.mark(readlimit);
}

@Override
public boolean markSupported() {
return delegate.markSupported();
}

@Override
public long getFilePointer() throws IOException {
return delegate.getFilePointer();
}

@Override
public synchronized void reset() throws IOException {
delegate.reset();
}

@Override
public void close() throws IOException {
delegate.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.index.snapshots.blobstore;

import org.apache.lucene.store.RateLimiter;
import org.opensearch.common.StreamLimiter;

import java.io.FilterInputStream;
import java.io.IOException;
Expand All @@ -46,53 +47,25 @@
*/
public class RateLimitingInputStream extends FilterInputStream {

private final Supplier<RateLimiter> rateLimiterSupplier;
private StreamLimiter streamLimiter;

private final Listener listener;

private long bytesSinceLastRateLimit;

/**
* Internal listener
*
* @opensearch.internal
*/
public interface Listener {
void onPause(long nanos);
}

public RateLimitingInputStream(InputStream delegate, Supplier<RateLimiter> rateLimiterSupplier, Listener listener) {
public RateLimitingInputStream(InputStream delegate, Supplier<RateLimiter> rateLimiterSupplier, StreamLimiter.Listener listener) {
super(delegate);
this.rateLimiterSupplier = rateLimiterSupplier;
this.listener = listener;
}

private void maybePause(int bytes) throws IOException {
bytesSinceLastRateLimit += bytes;
final RateLimiter rateLimiter = rateLimiterSupplier.get();
if (rateLimiter != null) {
if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) {
long pause = rateLimiter.pause(bytesSinceLastRateLimit);
bytesSinceLastRateLimit = 0;
if (pause > 0) {
listener.onPause(pause);
}
}
}
this.streamLimiter = new StreamLimiter(rateLimiterSupplier, listener);
}

@Override
public int read() throws IOException {
int b = super.read();
maybePause(1);
streamLimiter.maybePause(1);
return b;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int n = super.read(b, off, len);
if (n > 0) {
maybePause(n);
streamLimiter.maybePause(n);
}
return n;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.util.ByteUtils;
Expand Down Expand Up @@ -59,6 +60,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.zip.CRC32;

Expand Down Expand Up @@ -101,6 +103,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement

private final ThreadPool threadPool;

private final BiFunction<OffsetRangeInputStream, Boolean, OffsetRangeInputStream> rateLimitedTransfer;

/**
* Keeps track of local segment filename to uploaded filename along with other attributes like checksum.
* This map acts as a cache layer for uploaded segment filenames which helps avoid calling listAll() each time.
Expand Down Expand Up @@ -128,13 +132,15 @@ public RemoteSegmentStoreDirectory(
RemoteDirectory remoteDataDirectory,
RemoteDirectory remoteMetadataDirectory,
RemoteStoreLockManager mdLockManager,
ThreadPool threadPool
ThreadPool threadPool,
BiFunction<OffsetRangeInputStream, Boolean, OffsetRangeInputStream> rateLimitedTransfer
) throws IOException {
super(remoteDataDirectory);
this.remoteDataDirectory = remoteDataDirectory;
this.remoteMetadataDirectory = remoteMetadataDirectory;
this.mdLockManager = mdLockManager;
this.threadPool = threadPool;
this.rateLimitedTransfer = rateLimitedTransfer;
init();
}

Expand Down Expand Up @@ -464,7 +470,10 @@ private void uploadBlob(Directory from, String src, String remoteFileName, IOCon
contentLength,
true,
WritePriority.NORMAL,
(size, position) -> new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position),
(size, position) -> rateLimitedTransfer.apply(
new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position),
true
),
expectedChecksum,
remoteDataDirectory.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh
shardId
);

return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool);
return new RemoteSegmentStoreDirectory(
dataDirectory,
metadataDirectory,
mdLockManager,
threadPool,
((BlobStoreRepository) repository)::maybeRateLimitRemoteTransfer
);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
import org.opensearch.common.blobstore.transfer.stream.RateLimitingOffsetRangeInputStream;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.compress.DeflateCompressor;
import org.opensearch.common.io.Streams;
Expand All @@ -89,9 +94,6 @@
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.compress.Compressor;
Expand Down Expand Up @@ -295,10 +297,18 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private final RateLimiter restoreRateLimiter;

private final RateLimiter remoteUploadRateLimiter;

private final RateLimiter remoteDownloadRateLimiter;

private final CounterMetric snapshotRateLimitingTimeInNanos = new CounterMetric();

private final CounterMetric restoreRateLimitingTimeInNanos = new CounterMetric();

private final CounterMetric remoteDownloadRateLimitingTimeInNanos = new CounterMetric();

private final CounterMetric remoteUploadRateLimitingTimeInNanos = new CounterMetric();

public static final ChecksumBlobStoreFormat<Metadata> GLOBAL_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"metadata",
METADATA_NAME_FORMAT,
Expand Down Expand Up @@ -398,6 +408,8 @@ protected BlobStoreRepository(
this.supportURLRepo = SUPPORT_URL_REPO.get(metadata.settings());
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO);
remoteUploadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_upload_bytes_per_sec", ByteSizeValue.ZERO);
remoteDownloadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_download_bytes_per_sec", ByteSizeValue.ZERO);
readOnly = READONLY_SETTING.get(metadata.settings());
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes());
Expand Down Expand Up @@ -3009,6 +3021,14 @@ private static InputStream maybeRateLimit(InputStream stream, Supplier<RateLimit
return new RateLimitingInputStream(stream, rateLimiterSupplier, metric::inc);
}

private static OffsetRangeInputStream maybeRateLimitRemoteTransfers(
OffsetRangeInputStream offsetRangeInputStream,
Supplier<RateLimiter> rateLimiterSupplier,
CounterMetric metric
) {
return new RateLimitingOffsetRangeInputStream(offsetRangeInputStream, rateLimiterSupplier, metric::inc);
}

public InputStream maybeRateLimitRestores(InputStream stream) {
return maybeRateLimit(
maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos),
Expand All @@ -3017,6 +3037,20 @@ public InputStream maybeRateLimitRestores(InputStream stream) {
);
}

public OffsetRangeInputStream maybeRateLimitRemoteTransfer(OffsetRangeInputStream offsetRangeInputStream, boolean isUpload) {
return isUpload
? maybeRateLimitRemoteTransfers(offsetRangeInputStream, () -> remoteUploadRateLimiter, remoteUploadRateLimitingTimeInNanos)
: maybeRateLimitRemoteTransfers(
maybeRateLimitRemoteTransfers(
offsetRangeInputStream,
() -> remoteDownloadRateLimiter,
remoteDownloadRateLimitingTimeInNanos
),
recoverySettings::rateLimiter,
remoteDownloadRateLimitingTimeInNanos
);
}

public InputStream maybeRateLimitSnapshots(InputStream stream) {
return maybeRateLimit(stream, () -> snapshotRateLimiter, snapshotRateLimitingTimeInNanos);
}
Expand Down

0 comments on commit dc6f5aa

Please sign in to comment.