Skip to content

Commit

Permalink
Restore snapshot changes for V2
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <kalsac@amazon.com>
Co-authored-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
Sachin Kale and gbbafna committed Sep 2, 2024
1 parent ef47252 commit 58cd123
Show file tree
Hide file tree
Showing 15 changed files with 457 additions and 89 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.common.settings.Settings;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RestoreShallowSnapshotV2IT extends RemoteRestoreSnapshotIT {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.build();
}

@Override
protected void createRepository(String repoName, String type, Settings.Builder settings) {
logger.info("--> creating repository [{}] [{}]", repoName, type);
settings.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true);
assertAcked(clusterAdmin().preparePutRepository(repoName).setType(type).setSettings(settings));
}

@Override
protected void createRepository(String repoName, String type, Path location) {
Settings.Builder settings = Settings.builder()
.put("location", location)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true);

createRepository(repoName, type, settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ private static StorageType fromString(String string) {
private StorageType storageType = StorageType.LOCAL;
@Nullable
private String sourceRemoteStoreRepository = null;
@Nullable
private String sourceRemoteTranslogRepository = null;

@Nullable // if any snapshot UUID will do
private String snapshotUuid;
Expand Down Expand Up @@ -159,6 +161,9 @@ public RestoreSnapshotRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_10_0)) {
sourceRemoteStoreRepository = in.readOptionalString();
}
if (in.getVersion().onOrAfter(Version.CURRENT)) {
sourceRemoteTranslogRepository = in.readOptionalString();
}
}

@Override
Expand All @@ -183,6 +188,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
out.writeOptionalString(sourceRemoteStoreRepository);
}
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalString(sourceRemoteTranslogRepository);
}
}

@Override
Expand Down Expand Up @@ -545,6 +553,16 @@ public RestoreSnapshotRequest setSourceRemoteStoreRepository(String sourceRemote
return this;
}

/**
* Sets Source Remote Translog Repository for all the restored indices
*
* @param sourceRemoteTranslogRepository name of the remote translog repository that should be used for all restored indices.
*/
public RestoreSnapshotRequest setSourceRemoteTranslogRepository(String sourceRemoteTranslogRepository) {
this.sourceRemoteTranslogRepository = sourceRemoteTranslogRepository;
return this;
}

/**
* Returns Source Remote Store Repository for all the restored indices
*
Expand All @@ -554,6 +572,15 @@ public String getSourceRemoteStoreRepository() {
return sourceRemoteStoreRepository;
}

/**
* Returns Source Remote Translog Repository for all the restored indices
*
* @return source Remote Translog Repository
*/
public String getSourceRemoteTranslogRepository() {
return sourceRemoteTranslogRepository;
}

/**
* Parses restore definition
*
Expand Down Expand Up @@ -673,6 +700,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (sourceRemoteStoreRepository != null) {
builder.field("source_remote_store_repository", sourceRemoteStoreRepository);
}
if (sourceRemoteTranslogRepository != null) {
builder.field("source_remote_translog_repository", sourceRemoteTranslogRepository);
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -701,7 +731,8 @@ public boolean equals(Object o) {
&& Arrays.equals(ignoreIndexSettings, that.ignoreIndexSettings)
&& Objects.equals(snapshotUuid, that.snapshotUuid)
&& Objects.equals(storageType, that.storageType)
&& Objects.equals(sourceRemoteStoreRepository, that.sourceRemoteStoreRepository);
&& Objects.equals(sourceRemoteStoreRepository, that.sourceRemoteStoreRepository)
&& Objects.equals(sourceRemoteTranslogRepository, that.sourceRemoteTranslogRepository);
return equals;
}

Expand All @@ -721,7 +752,8 @@ public int hashCode() {
indexSettings,
snapshotUuid,
storageType,
sourceRemoteStoreRepository
sourceRemoteStoreRepository,
sourceRemoteTranslogRepository
);
result = 31 * result + Arrays.hashCode(indices);
result = 31 * result + Arrays.hashCode(ignoreIndexSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ private Map<ShardId, IndexShardSnapshotStatus> snapshotShards(
// could not be taken due to partial being set to false.
shardSnapshotStatus = IndexShardSnapshotStatus.newFailed("skipped");
} else {
shardSnapshotStatus = repository.getShardSnapshotStatus(snapshotInfo.snapshotId(), indexId, shardId);
shardSnapshotStatus = repository.getShardSnapshotStatus(snapshotInfo, indexId, shardId);
}
shardStatus.put(shardId, shardSnapshotStatus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import java.io.IOException;
import java.util.Objects;

import static org.opensearch.Version.CURRENT;

/**
* Represents the recovery source of a shard. Available recovery types are:
* <p>
Expand Down Expand Up @@ -264,6 +266,9 @@ public static class SnapshotRecoverySource extends RecoverySource {
private final boolean isSearchableSnapshot;
private final boolean remoteStoreIndexShallowCopy;
private final String sourceRemoteStoreRepository;
private final String sourceRemoteTranslogRepository;

private final long pinnedTimestamp;

public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, IndexId indexId) {
this(restoreUUID, snapshot, version, indexId, false, false, null);
Expand All @@ -277,6 +282,30 @@ public SnapshotRecoverySource(
boolean isSearchableSnapshot,
boolean remoteStoreIndexShallowCopy,
@Nullable String sourceRemoteStoreRepository
) {
this(
restoreUUID,
snapshot,
version,
indexId,
isSearchableSnapshot,
remoteStoreIndexShallowCopy,
sourceRemoteStoreRepository,
null,
0L
);
}

public SnapshotRecoverySource(
String restoreUUID,
Snapshot snapshot,
Version version,
IndexId indexId,
boolean isSearchableSnapshot,
boolean remoteStoreIndexShallowCopy,
@Nullable String sourceRemoteStoreRepository,
@Nullable String sourceRemoteTranslogRepository,
long pinnedTimestamp
) {
this.restoreUUID = restoreUUID;
this.snapshot = Objects.requireNonNull(snapshot);
Expand All @@ -285,6 +314,8 @@ public SnapshotRecoverySource(
this.isSearchableSnapshot = isSearchableSnapshot;
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
this.sourceRemoteStoreRepository = sourceRemoteStoreRepository;
this.sourceRemoteTranslogRepository = sourceRemoteTranslogRepository;
this.pinnedTimestamp = pinnedTimestamp;
}

SnapshotRecoverySource(StreamInput in) throws IOException {
Expand All @@ -304,6 +335,13 @@ public SnapshotRecoverySource(
remoteStoreIndexShallowCopy = false;
sourceRemoteStoreRepository = null;
}
if (in.getVersion().onOrAfter(CURRENT)) {
sourceRemoteTranslogRepository = in.readOptionalString();
pinnedTimestamp = in.readLong();
} else {
sourceRemoteTranslogRepository = null;
pinnedTimestamp = 0L;
}
}

public String restoreUUID() {
Expand Down Expand Up @@ -336,10 +374,18 @@ public String sourceRemoteStoreRepository() {
return sourceRemoteStoreRepository;
}

public String sourceRemoteTranslogRepository() {
return sourceRemoteTranslogRepository;
}

public boolean remoteStoreIndexShallowCopy() {
return remoteStoreIndexShallowCopy;
}

public long pinnedTimestamp() {
return pinnedTimestamp;
}

@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
out.writeString(restoreUUID);
Expand All @@ -353,6 +399,10 @@ protected void writeAdditionalFields(StreamOutput out) throws IOException {
out.writeBoolean(remoteStoreIndexShallowCopy);
out.writeOptionalString(sourceRemoteStoreRepository);
}
if (out.getVersion().onOrAfter(CURRENT)) {
out.writeOptionalString(sourceRemoteTranslogRepository);
out.writeLong(pinnedTimestamp);
}
}

@Override
Expand All @@ -369,7 +419,8 @@ public void addAdditionalFields(XContentBuilder builder, ToXContent.Params param
.field("restoreUUID", restoreUUID)
.field("isSearchableSnapshot", isSearchableSnapshot)
.field("remoteStoreIndexShallowCopy", remoteStoreIndexShallowCopy)
.field("sourceRemoteStoreRepository", sourceRemoteStoreRepository);
.field("sourceRemoteStoreRepository", sourceRemoteStoreRepository)
.field("sourceRemoteTranslogRepository", sourceRemoteTranslogRepository);
}

@Override
Expand All @@ -394,8 +445,11 @@ public boolean equals(Object o) {
&& isSearchableSnapshot == that.isSearchableSnapshot
&& remoteStoreIndexShallowCopy == that.remoteStoreIndexShallowCopy
&& sourceRemoteStoreRepository != null
? sourceRemoteStoreRepository.equals(that.sourceRemoteStoreRepository)
: that.sourceRemoteStoreRepository == null;
? sourceRemoteStoreRepository.equals(that.sourceRemoteStoreRepository)
: that.sourceRemoteStoreRepository == null && sourceRemoteTranslogRepository != null
? sourceRemoteTranslogRepository.equals(that.sourceRemoteTranslogRepository)
: that.sourceRemoteTranslogRepository == null && pinnedTimestamp == that.pinnedTimestamp;

}

@Override
Expand All @@ -407,10 +461,11 @@ public int hashCode() {
version,
isSearchableSnapshot,
remoteStoreIndexShallowCopy,
sourceRemoteStoreRepository
sourceRemoteStoreRepository,
sourceRemoteTranslogRepository,
pinnedTimestamp
);
}

}

/**
Expand Down
Loading

0 comments on commit 58cd123

Please sign in to comment.