Skip to content

Commit

Permalink
Update recovery target access time during segment sync for remote ind…
Browse files Browse the repository at this point in the history
…ex (#10252) (#10262)

(cherry picked from commit 91f9c5d)

Signed-off-by: Ashish Singh <ssashish@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 6e4a11e commit eaccdf5
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
Expand All @@ -26,6 +27,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.Translog.Durability;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -120,6 +122,16 @@ public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogFlush() throws Excep
testPeerRecovery(randomIntBetween(2, 5), true);
}

public void testPeerRecoveryWithLowActivityTimeout() throws Exception {
ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20kb")
.put(RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.getKey(), "1s")
);
internalCluster().client().admin().cluster().updateSettings(req).get();
testPeerRecovery(randomIntBetween(2, 5), true);
}

public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataRefresh() throws Exception {
testPeerRecovery(1, false);
}
Expand Down
37 changes: 25 additions & 12 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4735,11 +4735,21 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
}

/**
* Downloads segments from remote segment store.
* @param overrideLocal flag to override local segment files with those in remote store
* @throws IOException if exception occurs while reading segments from remote store
* Downloads segments from remote segment store
* @param overrideLocal flag to override local segment files with those in remote store.
* @throws IOException if exception occurs while reading segments from remote store.
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException {
syncSegmentsFromRemoteSegmentStore(overrideLocal, () -> {});
}

/**
* Downloads segments from remote segment store along with updating the access time of the recovery target.
* @param overrideLocal flag to override local segment files with those in remote store.
* @param onFileSync runnable that updates the access time when run.
* @throws IOException if exception occurs while reading segments from remote store.
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.trace("Downloading segments from remote segment store");
RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory();
Expand Down Expand Up @@ -4770,7 +4780,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE
} else {
storeDirectory = store.directory();
}
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal);
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);

if (remoteSegmentMetadata != null) {
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
Expand Down Expand Up @@ -4830,7 +4840,8 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
sourceRemoteDirectory,
remoteDirectory,
uploadedSegments,
overrideLocal
overrideLocal,
() -> {}
);
if (segmentsNFile != null) {
try (
Expand Down Expand Up @@ -4863,7 +4874,8 @@ private String copySegmentFiles(
RemoteSegmentStoreDirectory sourceRemoteDirectory,
RemoteSegmentStoreDirectory targetRemoteDirectory,
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments,
boolean overrideLocal
boolean overrideLocal,
final Runnable onFileSync
) throws IOException {
Set<String> toDownloadSegments = new HashSet<>();
Set<String> skippedSegments = new HashSet<>();
Expand Down Expand Up @@ -4892,9 +4904,7 @@ private String copySegmentFiles(

if (toDownloadSegments.isEmpty() == false) {
try {
final PlainActionFuture<Void> completionListener = PlainActionFuture.newFuture();
downloadSegments(storeDirectory, sourceRemoteDirectory, targetRemoteDirectory, toDownloadSegments, completionListener);
completionListener.actionGet();
downloadSegments(storeDirectory, sourceRemoteDirectory, targetRemoteDirectory, toDownloadSegments, onFileSync);
} catch (Exception e) {
throw new IOException("Error occurred when downloading segments from remote store", e);
}
Expand All @@ -4912,22 +4922,25 @@ private void downloadSegments(
RemoteSegmentStoreDirectory sourceRemoteDirectory,
RemoteSegmentStoreDirectory targetRemoteDirectory,
Set<String> toDownloadSegments,
ActionListener<Void> completionListener
final Runnable onFileSync
) {
final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex();
final PlainActionFuture<Void> completionListener = PlainActionFuture.newFuture();
final GroupedActionListener<Void> batchDownloadListener = new GroupedActionListener<>(
ActionListener.map(completionListener, v -> null),
toDownloadSegments.size()
);

final ActionListener<String> segmentsDownloadListener = ActionListener.map(batchDownloadListener, fileName -> {
onFileSync.run();
if (targetRemoteDirectory != null) {
targetRemoteDirectory.copyFrom(storeDirectory, fileName, fileName, IOContext.DEFAULT);
}
return null;
});

toDownloadSegments.forEach(file -> sourceRemoteDirectory.copyTo(file, storeDirectory, indexPath, segmentsDownloadListener));
final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex();
toDownloadSegments.forEach(file -> { sourceRemoteDirectory.copyTo(file, storeDirectory, indexPath, segmentsDownloadListener); });
completionListener.actionGet();
}

private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,6 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
try {
// Download segments from remote segment store
indexShard.syncSegmentsFromRemoteSegmentStore(true);

indexShard.syncTranslogFilesFromRemoteTranslog();

// On index creation, the only segment file that is created is segments_N. We can safely discard this file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
indexShard.prepareForIndexRecovery();
final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
if (hasRemoteSegmentStore) {
indexShard.syncSegmentsFromRemoteSegmentStore(false);
indexShard.syncSegmentsFromRemoteSegmentStore(false, recoveryTarget::setLastAccessTime);
}
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
Expand Down

0 comments on commit eaccdf5

Please sign in to comment.