Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Update recovery target access time during segment sync for remote index #10262

Merged
merged 1 commit into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
Expand All @@ -26,6 +27,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.Translog.Durability;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -120,6 +122,16 @@ public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogFlush() throws Excep
testPeerRecovery(randomIntBetween(2, 5), true);
}

public void testPeerRecoveryWithLowActivityTimeout() throws Exception {
ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20kb")
.put(RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.getKey(), "1s")
);
internalCluster().client().admin().cluster().updateSettings(req).get();
testPeerRecovery(randomIntBetween(2, 5), true);
}

public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataRefresh() throws Exception {
testPeerRecovery(1, false);
}
Expand Down
37 changes: 25 additions & 12 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4735,11 +4735,21 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
}

/**
* Downloads segments from remote segment store.
* @param overrideLocal flag to override local segment files with those in remote store
* @throws IOException if exception occurs while reading segments from remote store
* Downloads segments from remote segment store
* @param overrideLocal flag to override local segment files with those in remote store.
* @throws IOException if exception occurs while reading segments from remote store.
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException {
syncSegmentsFromRemoteSegmentStore(overrideLocal, () -> {});
}

/**
* Downloads segments from remote segment store along with updating the access time of the recovery target.
* @param overrideLocal flag to override local segment files with those in remote store.
* @param onFileSync runnable that updates the access time when run.
* @throws IOException if exception occurs while reading segments from remote store.
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.trace("Downloading segments from remote segment store");
RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory();
Expand Down Expand Up @@ -4770,7 +4780,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE
} else {
storeDirectory = store.directory();
}
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal);
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);

if (remoteSegmentMetadata != null) {
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
Expand Down Expand Up @@ -4830,7 +4840,8 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
sourceRemoteDirectory,
remoteDirectory,
uploadedSegments,
overrideLocal
overrideLocal,
() -> {}
);
if (segmentsNFile != null) {
try (
Expand Down Expand Up @@ -4863,7 +4874,8 @@ private String copySegmentFiles(
RemoteSegmentStoreDirectory sourceRemoteDirectory,
RemoteSegmentStoreDirectory targetRemoteDirectory,
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments,
boolean overrideLocal
boolean overrideLocal,
final Runnable onFileSync
) throws IOException {
Set<String> toDownloadSegments = new HashSet<>();
Set<String> skippedSegments = new HashSet<>();
Expand Down Expand Up @@ -4892,9 +4904,7 @@ private String copySegmentFiles(

if (toDownloadSegments.isEmpty() == false) {
try {
final PlainActionFuture<Void> completionListener = PlainActionFuture.newFuture();
downloadSegments(storeDirectory, sourceRemoteDirectory, targetRemoteDirectory, toDownloadSegments, completionListener);
completionListener.actionGet();
downloadSegments(storeDirectory, sourceRemoteDirectory, targetRemoteDirectory, toDownloadSegments, onFileSync);
} catch (Exception e) {
throw new IOException("Error occurred when downloading segments from remote store", e);
}
Expand All @@ -4912,22 +4922,25 @@ private void downloadSegments(
RemoteSegmentStoreDirectory sourceRemoteDirectory,
RemoteSegmentStoreDirectory targetRemoteDirectory,
Set<String> toDownloadSegments,
ActionListener<Void> completionListener
final Runnable onFileSync
) {
final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex();
final PlainActionFuture<Void> completionListener = PlainActionFuture.newFuture();
final GroupedActionListener<Void> batchDownloadListener = new GroupedActionListener<>(
ActionListener.map(completionListener, v -> null),
toDownloadSegments.size()
);

final ActionListener<String> segmentsDownloadListener = ActionListener.map(batchDownloadListener, fileName -> {
onFileSync.run();
if (targetRemoteDirectory != null) {
targetRemoteDirectory.copyFrom(storeDirectory, fileName, fileName, IOContext.DEFAULT);
}
return null;
});

toDownloadSegments.forEach(file -> sourceRemoteDirectory.copyTo(file, storeDirectory, indexPath, segmentsDownloadListener));
final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex();
toDownloadSegments.forEach(file -> { sourceRemoteDirectory.copyTo(file, storeDirectory, indexPath, segmentsDownloadListener); });
completionListener.actionGet();
}

private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,6 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
try {
// Download segments from remote segment store
indexShard.syncSegmentsFromRemoteSegmentStore(true);

indexShard.syncTranslogFilesFromRemoteTranslog();

// On index creation, the only segment file that is created is segments_N. We can safely discard this file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
indexShard.prepareForIndexRecovery();
final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
if (hasRemoteSegmentStore) {
indexShard.syncSegmentsFromRemoteSegmentStore(false);
indexShard.syncSegmentsFromRemoteSegmentStore(false, recoveryTarget::setLastAccessTime);
}
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
Expand Down
Loading