[Remote Segments] Introduce exponential backoff retry on failures
Signed-off-by: Ashish Singh <ssashish@amazon.com>
ashking94 committed May 4, 2023
1 parent 27c2312 commit 7589820
Showing 2 changed files with 50 additions and 33 deletions.
server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
@@ -53,7 +53,6 @@
import org.opensearch.index.fielddata.IndexFieldDataService;
import org.opensearch.index.mapper.FieldMapper;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.Store;
@@ -205,9 +204,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.SEARCHABLE_SNAPSHOT_ID_NAME,
IndexSettings.SEARCHABLE_SNAPSHOT_ID_UUID,

// Settings for Remote Store
RemoteStoreRefreshListener.INDEX_REMOTE_REFRESH_RETRY_INTERVAL,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Map<String, Settings> groups = s.getAsGroups();
server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -20,26 +20,29 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

@@ -54,22 +57,26 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {

private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class);

public static final int REMOTE_REFRESH_RETRY_INTERVAL_DEFAULT_VALUE = 1;
/**
* The initial retry interval at which the retry job gets scheduled after a failure.
*/
private static final int REMOTE_REFRESH_RETRY_BASE_INTERVAL_SECONDS = 1;

private static final int MAX_CONCURRENT_SCHEDULED_REMOTE_REFRESH_RETRIES = 1;
/**
* In an exponential backoff setup, the maximum value that the retry interval is capped at as it grows exponentially.
*/
private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_SECONDS = 30;

/**
* If the remote refresh segment sync fails, we retry to sync the segments scheduling it after a retry interval
* which is controlled by the below setting.
* Exponential backoff policy capped at the maximum retry interval.
*/
public static final Setting<Integer> INDEX_REMOTE_REFRESH_RETRY_INTERVAL = Setting.intSetting(
"index.remote_store.segment_sync.retry_interval",
REMOTE_REFRESH_RETRY_INTERVAL_DEFAULT_VALUE,
REMOTE_REFRESH_RETRY_INTERVAL_DEFAULT_VALUE,
Setting.Property.Dynamic,
Setting.Property.IndexScope
private static final BackoffPolicy EXPONENTIAL_BACKOFF_POLICY = BackoffPolicy.exponentialEqualJitterBackoff(
REMOTE_REFRESH_RETRY_BASE_INTERVAL_SECONDS,
REMOTE_REFRESH_RETRY_MAX_INTERVAL_SECONDS
);

private static final int MAX_CONCURRENT_SCHEDULED_REMOTE_REFRESH_RETRIES = 1;

// Visible for testing
static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
// Visible for testing
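
Note: the two constants above feed BackoffPolicy.exponentialEqualJitterBackoff(1, 30), which produces the delay sequence consumed via backoffDelayIterator below. As a rough standalone illustration of the equal-jitter scheme (this sketch and its names are illustrative assumptions, not the OpenSearch implementation), each delay doubles from the base, is capped at the max, and then half of it is randomized:

import java.util.Random;

public class EqualJitterBackoffSketch {
    public static void main(String[] args) {
        Random random = new Random();
        long baseMillis = 1_000L;  // REMOTE_REFRESH_RETRY_BASE_INTERVAL_SECONDS = 1
        long maxMillis = 30_000L;  // REMOTE_REFRESH_RETRY_MAX_INTERVAL_SECONDS = 30
        for (int attempt = 0; attempt < 8; attempt++) {
            // Exponential growth from the base, capped at the maximum interval.
            long capped = Math.min(maxMillis, baseMillis << attempt);
            // Equal jitter: half the delay is fixed, half is uniformly random,
            // so each delay lands in [capped / 2, capped).
            long delayMillis = capped / 2 + (long) (random.nextDouble() * (capped / 2));
            System.out.println("retry " + (attempt + 1) + " -> " + delayMillis + " ms");
        }
    }
}

Successive failures therefore back off from about a second up to the 30-second ceiling, while the jitter keeps many shards from retrying in lockstep.
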
@@ -87,7 +94,9 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {
*/
private final Semaphore retrySemaphore = new Semaphore(MAX_CONCURRENT_SCHEDULED_REMOTE_REFRESH_RETRIES);

private volatile int remoteRefreshRetryInterval;
private volatile Iterator<TimeValue> backoffDelayIterator;

private volatile Scheduler.ScheduledCancellable scheduledCancellableRetry;

public RemoteStoreRefreshListener(IndexShard indexShard) {
this.indexShard = indexShard;
@@ -103,13 +112,7 @@ public RemoteStoreRefreshListener(IndexShard indexShard) {
logger.error("Exception while initialising RemoteSegmentStoreDirectory", e);
}
}

// Retry remote refresh on failure - This interval is the time delay after which the sync of segments to remote store
// would be retried. This has been kept as an index scoped setting inline with index.refresh_interval setting.
remoteRefreshRetryInterval = INDEX_REMOTE_REFRESH_RETRY_INTERVAL.get(indexShard.indexSettings().getSettings());
indexShard.indexSettings()
.getScopedSettings()
.addSettingsUpdateConsumer(INDEX_REMOTE_REFRESH_RETRY_INTERVAL, this::setRemoteRefreshRetryInterval);
resetBackOffDelayIterator();
}

@Override
@@ -189,6 +192,7 @@ private synchronized void syncSegments(boolean retry) {
.filter(file -> !localSegmentsPostRefresh.contains(file))
.collect(Collectors.toSet())
.forEach(localSegmentChecksumMap::remove);
onSuccessfulSegmentsSync();
final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
} else {
@@ -226,6 +230,31 @@ private void beforeSegmentsSync(boolean isRetry) {
}
}

private void onSuccessfulSegmentsSync() {
// Reset the backoffDelayIterator for future failures
resetBackOffDelayIterator();
// Cancel the scheduled cancellable retry if possible and set it to null
cancelAndResetScheduledCancellableRetry();
}

/**
* Cancels the scheduled retry if one is scheduled and it has not started yet. Clears the reference afterwards,
* whether the retry was cancelled, was never scheduled, or has already started/run.
*/
private void cancelAndResetScheduledCancellableRetry() {
if (scheduledCancellableRetry != null && scheduledCancellableRetry.getDelay(TimeUnit.NANOSECONDS) > 0) {
scheduledCancellableRetry.cancel();
}
scheduledCancellableRetry = null;
}

/**
* Resets the backoff delay iterator so that the next run of failures starts at the base delay and grows up to the max delay.
*/
private void resetBackOffDelayIterator() {
backoffDelayIterator = EXPONENTIAL_BACKOFF_POLICY.iterator();
}

private void afterSegmentsSync(boolean isRetry, boolean shouldRetry) {
// If this was a retry attempt, then we release the semaphore at the end so that further retries can be scheduled
if (isRetry) {
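
Note: a hypothetical standalone analogue of cancelAndResetScheduledCancellableRetry() above, written with plain java.util.concurrent types instead of OpenSearch's Scheduler (class and field names here are illustrative):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class CancelPendingRetrySketch {
    private final ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
    private volatile ScheduledFuture<?> pendingRetry;

    void scheduleRetry(Runnable retry, long delaySeconds) {
        pendingRetry = pool.schedule(retry, delaySeconds, TimeUnit.SECONDS);
    }

    void onSuccess() {
        ScheduledFuture<?> retry = pendingRetry;
        // A task that has started running reports no remaining delay, so this
        // cancels only a retry that is still waiting - mirroring the
        // getDelay(TimeUnit.NANOSECONDS) > 0 guard above.
        if (retry != null && retry.getDelay(TimeUnit.NANOSECONDS) > 0) {
            retry.cancel(false);
        }
        pendingRetry = null;
    }
}
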
@@ -235,12 +264,8 @@ private void afterSegmentsSync(boolean isRetry, boolean shouldRetry) {
// If there are failures in uploading segments, then we should retry as search idle can lead to
// refresh not occurring until write happens.
if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && retrySemaphore.tryAcquire()) {
indexShard.getThreadPool()
.schedule(
() -> this.syncSegments(true),
TimeValue.timeValueSeconds(remoteRefreshRetryInterval),
ThreadPool.Names.REMOTE_REFRESH
);
scheduledCancellableRetry = indexShard.getThreadPool()
.schedule(() -> this.syncSegments(true), backoffDelayIterator.next(), ThreadPool.Names.REMOTE_REFRESH);
}
}

@@ -310,8 +335,4 @@ private void deleteStaleCommits() {
logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e);
}
}

public void setRemoteRefreshRetryInterval(int remoteRefreshRetryInterval) {
this.remoteRefreshRetryInterval = remoteRefreshRetryInterval;
}
}
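
Net effect of the two files: a failed remote segment sync is now retried on an exponential-backoff schedule (1-second base, 30-second cap, equal jitter) that resets after the first successful sync, replacing the fixed, dynamically configurable index.remote_store.segment_sync.retry_interval setting; a retry that is still pending when a regular refresh succeeds in the meantime is cancelled.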
