Skip to content

Commit

Permalink
Introduce experimental searchable snapshot API
Browse files Browse the repository at this point in the history
This commit adds a new parameter to the snapshot restore API to restore
to a new type of "remote snapshot" index where, unlike traditional
snapshot restores, the index data is not all downloaded to disk and
instead is read on-demand at search time. The feature is functional with
this commit, and includes a simple end-to-end integration test, but is
far from complete. See tracking issue opensearch-project#2919 for the rest of the work
planned/underway.

All new capabilities are gated behind a new searchable snapshot feature
flag.

Signed-off-by: Andrew Ross <andrross@amazon.com>
  • Loading branch information
andrross committed Oct 13, 2022
1 parent fe3994c commit 5fcc653
Show file tree
Hide file tree
Showing 23 changed files with 692 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Added precommit support for MacOS ([#4682](https://github.com/opensearch-project/OpenSearch/pull/4682))
- Recommission API changes for service layer ([#4320](https://github.com/opensearch-project/OpenSearch/pull/4320))
- Update GeoGrid base class access modifier to support extensibility ([#4572](https://github.com/opensearch-project/OpenSearch/pull/4572))
- Introduce experimental searchable snapshot API ([#4680](https://github.com/opensearch-project/OpenSearch/pull/4680))

### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.snapshots;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

import org.hamcrest.MatcherAssert;
import org.junit.BeforeClass;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.monitor.fs.FsInfo;

import com.carrotsearch.randomizedtesting.generators.RandomPicks;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
import static org.opensearch.common.util.CollectionUtils.iterableAsArrayList;

public final class SearchableSnapshotIT extends AbstractSnapshotIntegTestCase {

@BeforeClass
public static void assumeFeatureFlag() {
assumeTrue(
"Searchable snapshot feature flag is enabled",
Boolean.parseBoolean(System.getProperty(FeatureFlags.SEARCHABLE_SNAPSHOT))
);
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

public void testCreateSearchableSnapshot() throws Exception {
final Client client = client();
createRepository("test-repo", "fs");
createIndex(
"test-idx-1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1").build()
);
createIndex(
"test-idx-2",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1").build()
);
ensureGreen();
indexRandomDocs("test-idx-1", 100);
indexRandomDocs("test-idx-2", 100);

logger.info("--> snapshot");
final CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true)
.setIndices("test-idx-1", "test-idx-2")
.get();
MatcherAssert.assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
MatcherAssert.assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);

assertTrue(client.admin().indices().prepareDelete("test-idx-1", "test-idx-2").get().isAcknowledged());

logger.info("--> restore indices as 'remote_snapshot'");
client.admin()
.cluster()
.prepareRestoreSnapshot("test-repo", "test-snap")
.setRenamePattern("(.+)")
.setRenameReplacement("$1-copy")
.setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
.setWaitForCompletion(true)
.execute()
.actionGet();
ensureGreen();

assertDocCount("test-idx-1-copy", 100L);
assertDocCount("test-idx-2-copy", 100L);
assertIndexDirectoryDoesNotExist("test-idx-1-copy", "test-idx-2-copy");
}

/**
* Picks a shard out of the cluster state for each given index and asserts
* that the 'index' directory does not exist in the node's file system.
* This assertion is digging a bit into the implementation details to
* verify that the Lucene segment files are not copied from the snapshot
* repository to the node's local disk for a remote snapshot index.
*/
private void assertIndexDirectoryDoesNotExist(String... indexNames) {
final ClusterState state = client().admin().cluster().prepareState().get().getState();
for (String indexName : indexNames) {
final Index index = state.metadata().index(indexName).getIndex();
// Get the primary shards for the given index
final GroupShardsIterator<ShardIterator> shardIterators = state.getRoutingTable()
.activePrimaryShardsGrouped(new String[] { indexName }, false);
// Randomly pick one of the shards
final List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
final ShardIterator shardIterator = RandomPicks.randomFrom(random(), iterators);
final ShardRouting shardRouting = shardIterator.nextOrNull();
assertNotNull(shardRouting);
assertTrue(shardRouting.primary());
assertTrue(shardRouting.assignedToNode());
// Get the file system stats for the assigned node
final String nodeId = shardRouting.currentNodeId();
final NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats(nodeId).addMetric(FS.metricName()).get();
for (FsInfo.Path info : nodeStats.getNodes().get(0).getFs()) {
// Build the expected path for the index data for a "normal"
// index and assert it does not exist
final String path = info.getPath();
final Path file = PathUtils.get(path)
.resolve("indices")
.resolve(index.getUUID())
.resolve(Integer.toString(shardRouting.getId()))
.resolve("index");
MatcherAssert.assertThat("Expect file not to exist: " + file, Files.exists(file), is(false));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.action.admin.cluster.snapshots.restore;

import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
Expand All @@ -42,6 +43,7 @@
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentType;
Expand All @@ -68,6 +70,33 @@ public class RestoreSnapshotRequest extends ClusterManagerNodeRequest<RestoreSna

private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(RestoreSnapshotRequest.class);

/**
* Enumeration of possible storage types
*/
public enum StorageType {
LOCAL("local"),
REMOTE_SNAPSHOT("remote_snapshot");

private final String text;

StorageType(String text) {
this.text = text;
}

private void toXContent(XContentBuilder builder) throws IOException {
builder.field("storage_type", text);
}

private static StorageType fromString(String string) {
for (StorageType type : values()) {
if (type.text.equals(string)) {
return type;
}
}
throw new IllegalArgumentException("Invalid storage_type: " + string);
}
}

private String snapshot;
private String repository;
private String[] indices = Strings.EMPTY_ARRAY;
Expand All @@ -80,6 +109,7 @@ public class RestoreSnapshotRequest extends ClusterManagerNodeRequest<RestoreSna
private boolean includeAliases = true;
private Settings indexSettings = EMPTY_SETTINGS;
private String[] ignoreIndexSettings = Strings.EMPTY_ARRAY;
private StorageType storageType = StorageType.LOCAL;

@Nullable // if any snapshot UUID will do
private String snapshotUuid;
Expand Down Expand Up @@ -117,6 +147,9 @@ public RestoreSnapshotRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_10_0)) {
snapshotUuid = in.readOptionalString();
}
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && in.getVersion().onOrAfter(Version.V_3_0_0)) {
storageType = in.readEnum(StorageType.class);
}
}

@Override
Expand Down Expand Up @@ -144,6 +177,9 @@ public void writeTo(StreamOutput out) throws IOException {
"restricting the snapshot UUID is forbidden in a cluster with version [" + out.getVersion() + "] nodes"
);
}
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeEnum(storageType);
}
}

@Override
Expand Down Expand Up @@ -480,6 +516,15 @@ public String snapshotUuid() {
return snapshotUuid;
}

public RestoreSnapshotRequest storageType(StorageType storageType) {
this.storageType = storageType;
return this;
}

public String storageType() {
return storageType.text;
}

/**
* Parses restore definition
*
Expand Down Expand Up @@ -537,6 +582,16 @@ public RestoreSnapshotRequest source(Map<String, Object> source) {
} else {
throw new IllegalArgumentException("malformed ignore_index_settings section, should be an array of strings");
}
} else if (name.equals("storage_type")) {
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
if (entry.getValue() instanceof String) {
storageType(StorageType.fromString((String) entry.getValue()));
} else {
throw new IllegalArgumentException("malformed storage_type");
}
} else {
throw new IllegalArgumentException("Unknown parameter " + name);
}
} else {
if (IndicesOptions.isIndicesOptions(name) == false) {
throw new IllegalArgumentException("Unknown parameter " + name);
Expand Down Expand Up @@ -579,6 +634,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.value(ignoreIndexSetting);
}
builder.endArray();
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && storageType != null) {
storageType.toXContent(builder);
}
builder.endObject();
return builder;
}
Expand All @@ -605,7 +663,8 @@ public boolean equals(Object o) {
&& Objects.equals(renameReplacement, that.renameReplacement)
&& Objects.equals(indexSettings, that.indexSettings)
&& Arrays.equals(ignoreIndexSettings, that.ignoreIndexSettings)
&& Objects.equals(snapshotUuid, that.snapshotUuid);
&& Objects.equals(snapshotUuid, that.snapshotUuid)
&& Objects.equals(storageType, that.storageType);
}

@Override
Expand All @@ -621,7 +680,8 @@ public int hashCode() {
partial,
includeAliases,
indexSettings,
snapshotUuid
snapshotUuid,
storageType
);
result = 31 * result + Arrays.hashCode(indices);
result = 31 * result + Arrays.hashCode(ignoreIndexSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,4 +248,12 @@ public RestoreSnapshotRequestBuilder setIgnoreIndexSettings(List<String> ignoreI
request.ignoreIndexSettings(ignoreIndexSettings);
return this;
}

/**
* Sets the storage type
*/
public RestoreSnapshotRequestBuilder setStorageType(RestoreSnapshotRequest.StorageType storageType) {
request.storageType(storageType);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -257,12 +258,24 @@ public static class SnapshotRecoverySource extends RecoverySource {
private final Snapshot snapshot;
private final IndexId index;
private final Version version;
private final boolean isSearchableSnapshot;

public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, IndexId indexId) {
this(restoreUUID, snapshot, version, indexId, false);
}

public SnapshotRecoverySource(
String restoreUUID,
Snapshot snapshot,
Version version,
IndexId indexId,
boolean isSearchableSnapshot
) {
this.restoreUUID = restoreUUID;
this.snapshot = Objects.requireNonNull(snapshot);
this.version = Objects.requireNonNull(version);
this.index = Objects.requireNonNull(indexId);
this.isSearchableSnapshot = isSearchableSnapshot;
}

SnapshotRecoverySource(StreamInput in) throws IOException {
Expand All @@ -274,6 +287,11 @@ public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version ver
} else {
index = new IndexId(in.readString(), IndexMetadata.INDEX_UUID_NA_VALUE);
}
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && in.getVersion().onOrAfter(Version.V_3_0_0)) {
isSearchableSnapshot = in.readBoolean();
} else {
isSearchableSnapshot = false;
}
}

public String restoreUUID() {
Expand All @@ -298,6 +316,10 @@ public Version version() {
return version;
}

public boolean isSearchableSnapshot() {
return isSearchableSnapshot;
}

@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
out.writeString(restoreUUID);
Expand All @@ -308,6 +330,9 @@ protected void writeAdditionalFields(StreamOutput out) throws IOException {
} else {
out.writeString(index.getName());
}
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(isSearchableSnapshot);
}
}

@Override
Expand All @@ -321,7 +346,8 @@ public void addAdditionalFields(XContentBuilder builder, ToXContent.Params param
.field("snapshot", snapshot.getSnapshotId().getName())
.field("version", version.toString())
.field("index", index.getName())
.field("restoreUUID", restoreUUID);
.field("restoreUUID", restoreUUID)
.field("isRemote", isSearchableSnapshot);
}

@Override
Expand All @@ -342,12 +368,13 @@ public boolean equals(Object o) {
return restoreUUID.equals(that.restoreUUID)
&& snapshot.equals(that.snapshot)
&& index.equals(that.index)
&& version.equals(that.version);
&& version.equals(that.version)
&& isSearchableSnapshot == that.isSearchableSnapshot;
}

@Override
public int hashCode() {
return Objects.hash(restoreUUID, snapshot, index, version);
return Objects.hash(restoreUUID, snapshot, index, version, isSearchableSnapshot);
}

}
Expand Down
Loading

0 comments on commit 5fcc653

Please sign in to comment.