Skip to content

Commit

Permalink
Restore snapshot changes for shallow snapshot V2 (#15462)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Sachin Kale <kalsac@amazon.com>
Co-authored-by: Sachin Kale <kalsac@amazon.com>
Co-authored-by: Gaurav Bafna <gbbafna@amazon.com>
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
3 people committed Sep 3, 2024
1 parent 41e1075 commit 0018907
Show file tree
Hide file tree
Showing 14 changed files with 1,222 additions and 87 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ private static StorageType fromString(String string) {
private StorageType storageType = StorageType.LOCAL;
@Nullable
private String sourceRemoteStoreRepository = null;
@Nullable
private String sourceRemoteTranslogRepository = null;

@Nullable // if any snapshot UUID will do
private String snapshotUuid;
Expand Down Expand Up @@ -165,6 +167,9 @@ public RestoreSnapshotRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_10_0)) {
sourceRemoteStoreRepository = in.readOptionalString();
}
if (in.getVersion().onOrAfter(Version.CURRENT)) {
sourceRemoteTranslogRepository = in.readOptionalString();
}
}

@Override
Expand Down Expand Up @@ -198,6 +203,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
out.writeOptionalString(sourceRemoteStoreRepository);
}
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalString(sourceRemoteTranslogRepository);
}
}

@Override
Expand Down Expand Up @@ -560,6 +568,16 @@ public RestoreSnapshotRequest setSourceRemoteStoreRepository(String sourceRemote
return this;
}

/**
* Sets the source remote translog repository for all the restored indices.
*
* @param sourceRemoteTranslogRepository name of the remote translog repository that should be used for all restored indices.
* @return this request
*/
public RestoreSnapshotRequest setSourceRemoteTranslogRepository(String sourceRemoteTranslogRepository) {
this.sourceRemoteTranslogRepository = sourceRemoteTranslogRepository;
return this;
}

/**
* Returns Source Remote Store Repository for all the restored indices
*
Expand All @@ -569,6 +587,15 @@ public String getSourceRemoteStoreRepository() {
return sourceRemoteStoreRepository;
}

/**
* Returns the source remote translog repository for all the restored indices.
*
* @return the source remote translog repository name, or {@code null} if none has been set
*/
public String getSourceRemoteTranslogRepository() {
return sourceRemoteTranslogRepository;
}

/**
* Parses restore definition
*
Expand Down Expand Up @@ -688,6 +715,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (sourceRemoteStoreRepository != null) {
builder.field("source_remote_store_repository", sourceRemoteStoreRepository);
}
if (sourceRemoteTranslogRepository != null) {
builder.field("source_remote_translog_repository", sourceRemoteTranslogRepository);
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -716,7 +746,8 @@ public boolean equals(Object o) {
&& Arrays.equals(ignoreIndexSettings, that.ignoreIndexSettings)
&& Objects.equals(snapshotUuid, that.snapshotUuid)
&& Objects.equals(storageType, that.storageType)
&& Objects.equals(sourceRemoteStoreRepository, that.sourceRemoteStoreRepository);
&& Objects.equals(sourceRemoteStoreRepository, that.sourceRemoteStoreRepository)
&& Objects.equals(sourceRemoteTranslogRepository, that.sourceRemoteTranslogRepository);
return equals;
}

Expand All @@ -736,7 +767,8 @@ public int hashCode() {
indexSettings,
snapshotUuid,
storageType,
sourceRemoteStoreRepository
sourceRemoteStoreRepository,
sourceRemoteTranslogRepository
);
result = 31 * result + Arrays.hashCode(indices);
result = 31 * result + Arrays.hashCode(ignoreIndexSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import java.io.IOException;
import java.util.Objects;

import static org.opensearch.Version.CURRENT;

/**
* Represents the recovery source of a shard. Available recovery types are:
* <p>
Expand Down Expand Up @@ -265,6 +267,9 @@ public static class SnapshotRecoverySource extends RecoverySource {
private final boolean isSearchableSnapshot;
private final boolean remoteStoreIndexShallowCopy;
private final String sourceRemoteStoreRepository;
private final String sourceRemoteTranslogRepository;

private final long pinnedTimestamp;

public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, IndexId indexId) {
this(restoreUUID, snapshot, version, indexId, false, false, null);
Expand All @@ -278,6 +283,30 @@ public SnapshotRecoverySource(
boolean isSearchableSnapshot,
boolean remoteStoreIndexShallowCopy,
@Nullable String sourceRemoteStoreRepository
) {
this(
restoreUUID,
snapshot,
version,
indexId,
isSearchableSnapshot,
remoteStoreIndexShallowCopy,
sourceRemoteStoreRepository,
null,
0L
);
}

public SnapshotRecoverySource(
String restoreUUID,
Snapshot snapshot,
Version version,
IndexId indexId,
boolean isSearchableSnapshot,
boolean remoteStoreIndexShallowCopy,
@Nullable String sourceRemoteStoreRepository,
@Nullable String sourceRemoteTranslogRepository,
long pinnedTimestamp
) {
this.restoreUUID = restoreUUID;
this.snapshot = Objects.requireNonNull(snapshot);
Expand All @@ -286,6 +315,8 @@ public SnapshotRecoverySource(
this.isSearchableSnapshot = isSearchableSnapshot;
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
this.sourceRemoteStoreRepository = sourceRemoteStoreRepository;
this.sourceRemoteTranslogRepository = sourceRemoteTranslogRepository;
this.pinnedTimestamp = pinnedTimestamp;
}

SnapshotRecoverySource(StreamInput in) throws IOException {
Expand All @@ -309,6 +340,13 @@ public SnapshotRecoverySource(
remoteStoreIndexShallowCopy = false;
sourceRemoteStoreRepository = null;
}
if (in.getVersion().onOrAfter(CURRENT)) {
sourceRemoteTranslogRepository = in.readOptionalString();
pinnedTimestamp = in.readLong();
} else {
sourceRemoteTranslogRepository = null;
pinnedTimestamp = 0L;
}
}

public String restoreUUID() {
Expand Down Expand Up @@ -341,10 +379,18 @@ public String sourceRemoteStoreRepository() {
return sourceRemoteStoreRepository;
}

/**
* Returns the source remote translog repository name held by this recovery source.
*
* @return the name supplied at construction or read from the stream; may be {@code null}
*/
public String sourceRemoteTranslogRepository() {
return sourceRemoteTranslogRepository;
}

/**
* Returns the {@code remoteStoreIndexShallowCopy} flag held by this recovery source.
*
* @return the flag value supplied at construction or read from the stream
*/
public boolean remoteStoreIndexShallowCopy() {
return remoteStoreIndexShallowCopy;
}

/**
* Returns the pinned timestamp held by this recovery source.
*
* @return the pinned timestamp; {@code 0} when none was supplied (the value used by the shorter
* constructor and when the source is read from a stream older than the current version)
*/
public long pinnedTimestamp() {
return pinnedTimestamp;
}

@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
out.writeString(restoreUUID);
Expand All @@ -362,6 +408,10 @@ protected void writeAdditionalFields(StreamOutput out) throws IOException {
out.writeBoolean(remoteStoreIndexShallowCopy);
out.writeOptionalString(sourceRemoteStoreRepository);
}
if (out.getVersion().onOrAfter(CURRENT)) {
out.writeOptionalString(sourceRemoteTranslogRepository);
out.writeLong(pinnedTimestamp);
}
}

@Override
Expand All @@ -378,7 +428,8 @@ public void addAdditionalFields(XContentBuilder builder, ToXContent.Params param
.field("restoreUUID", restoreUUID)
.field("isSearchableSnapshot", isSearchableSnapshot)
.field("remoteStoreIndexShallowCopy", remoteStoreIndexShallowCopy)
.field("sourceRemoteStoreRepository", sourceRemoteStoreRepository);
.field("sourceRemoteStoreRepository", sourceRemoteStoreRepository)
.field("sourceRemoteTranslogRepository", sourceRemoteTranslogRepository);
}

@Override
Expand All @@ -403,8 +454,11 @@ public boolean equals(Object o) {
&& isSearchableSnapshot == that.isSearchableSnapshot
&& remoteStoreIndexShallowCopy == that.remoteStoreIndexShallowCopy
&& sourceRemoteStoreRepository != null
? sourceRemoteStoreRepository.equals(that.sourceRemoteStoreRepository)
: that.sourceRemoteStoreRepository == null;
? sourceRemoteStoreRepository.equals(that.sourceRemoteStoreRepository)
: that.sourceRemoteStoreRepository == null && sourceRemoteTranslogRepository != null
? sourceRemoteTranslogRepository.equals(that.sourceRemoteTranslogRepository)
: that.sourceRemoteTranslogRepository == null && pinnedTimestamp == that.pinnedTimestamp;

}

@Override
Expand All @@ -416,10 +470,11 @@ public int hashCode() {
version,
isSearchableSnapshot,
remoteStoreIndexShallowCopy,
sourceRemoteStoreRepository
sourceRemoteStoreRepository,
sourceRemoteTranslogRepository,
pinnedTimestamp
);
}

}

/**
Expand Down
76 changes: 66 additions & 10 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
import org.opensearch.index.recovery.RecoveryStats;
import org.opensearch.index.refresh.RefreshStats;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.index.search.stats.ShardSearchStats;
Expand Down Expand Up @@ -2487,6 +2488,10 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException {
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
// Default variant: delegates to the boolean overload with syncFromRemote = true.
openEngineAndRecoverFromTranslog(true);
}

public void openEngineAndRecoverFromTranslog(boolean syncFromRemote) throws IOException {
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
maybeCheckIndex();
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
Expand All @@ -2507,7 +2512,16 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
loadGlobalCheckpointToReplicationTracker();
}

innerOpenEngineAndTranslog(replicationTracker);
if (isSnapshotV2Restore()) {
translogConfig.setDownloadRemoteTranslogOnInit(false);
}

innerOpenEngineAndTranslog(replicationTracker, syncFromRemote);

if (isSnapshotV2Restore()) {
translogConfig.setDownloadRemoteTranslogOnInit(true);
}

getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
}

Expand Down Expand Up @@ -2568,7 +2582,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
if (shardRouting.primary()) {
if (syncFromRemote) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
} else {
} else if (isSnapshotV2Restore() == false) {
// we will enter this block when we do not want to recover from remote translog.
// currently only during snapshot restore, we are coming into this block.
// here, as while initializing remote translog we cannot skip downloading translog files,
Expand Down Expand Up @@ -2614,6 +2628,11 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
}

/**
 * Tells whether this shard is being restored from a snapshot that carries a pinned timestamp.
 *
 * @return {@code true} when the routing entry's recovery source is of type
 *         {@link RecoverySource.Type#SNAPSHOT} and its pinned timestamp is greater than zero
 */
private boolean isSnapshotV2Restore() {
    // Anything other than a snapshot recovery can never be a V2 restore.
    if (routingEntry().recoverySource().getType() != RecoverySource.Type.SNAPSHOT) {
        return false;
    }
    final SnapshotRecoverySource snapshotSource = (SnapshotRecoverySource) routingEntry().recoverySource();
    return snapshotSource.pinnedTimestamp() > 0;
}

private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = fetchUserData();
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
Expand Down Expand Up @@ -2899,7 +2918,12 @@ public void restoreFromSnapshotAndRemoteStore(
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: "
+ recoveryState.getRecoverySource();
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool);
SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState().getRecoverySource();
if (recoverySource.pinnedTimestamp() != 0) {
storeRecovery.recoverShallowSnapshotV2(this, repository, repositoriesService, listener, threadPool);
} else {
storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool);
}
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down Expand Up @@ -5013,16 +5037,33 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
syncTranslogFilesFromGivenRemoteTranslog(
repository,
shardId,
indexSettings.getRemoteStorePathStrategy(),
indexSettings().isTranslogMetadataEnabled(),
0
);
}

public void syncTranslogFilesFromGivenRemoteTranslog(
Repository repository,
ShardId shardId,
RemoteStorePathStrategy remoteStorePathStrategy,
boolean isTranslogMetadataEnabled,
long timestamp
) throws IOException {
RemoteFsTranslog.download(
repository,
shardId,
getThreadPool(),
shardPath().resolveTranslog(),
indexSettings.getRemoteStorePathStrategy(),
remoteStorePathStrategy,
remoteStoreSettings,
logger,
shouldSeedRemoteStore(),
indexSettings().isTranslogMetadataEnabled()
isTranslogMetadataEnabled,
timestamp
);
}

Expand Down Expand Up @@ -5111,15 +5152,13 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
* Downloads segments from given remote segment store for a specific commit.
* @param overrideLocal flag to override local segment files with those in remote store
* @param sourceRemoteDirectory RemoteSegmentDirectory Instance from which we need to sync segments
* @param primaryTerm Primary Term for shard at the time of commit operation for which we are syncing segments
* @param commitGeneration commit generation at the time of commit operation for which we are syncing segments
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromGivenRemoteSegmentStore(
boolean overrideLocal,
RemoteSegmentStoreDirectory sourceRemoteDirectory,
long primaryTerm,
long commitGeneration
RemoteSegmentMetadata remoteSegmentMetadata,
boolean pinnedTimestamp
) throws IOException {
logger.trace("Downloading segments from given remote segment store");
RemoteSegmentStoreDirectory remoteDirectory = null;
Expand Down Expand Up @@ -5155,12 +5194,29 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
overrideLocal,
() -> {}
);
if (segmentsNFile != null) {
if (pinnedTimestamp) {
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
remoteSegmentMetadata.getSegmentInfosBytes(),
remoteSegmentMetadata.getGeneration()
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
// Extra segments will be wiped on engine open.
for (String file : List.of(store.directory().listAll())) {
if (file.startsWith(IndexFileNames.SEGMENTS)) {
store.deleteQuiet(file);
}
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
} else if (segmentsNFile != null) {
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
storeDirectory.openInput(segmentsNFile, IOContext.DEFAULT)
)
) {
long commitGeneration = SegmentInfos.generationFromSegmentsFileName(segmentsNFile);
SegmentInfos infosSnapshot = SegmentInfos.readCommit(store.directory(), indexInput, commitGeneration);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
if (remoteStore != null) {
Expand Down
Loading

0 comments on commit 0018907

Please sign in to comment.