Skip to content

Commit

Permalink
Introduce experimental searchable snapshot API
Browse files Browse the repository at this point in the history
This commit adds a new parameter to the snapshot restore API to restore
to a new type of "remote snapshot" index where, unlike traditional
snapshot restores, the index data is not all downloaded to disk and
instead is read on-demand at search time. The feature is functional with
this commit, and includes a simple end-to-end integration test, but is
far from complete. See tracking issue opensearch-project#2919 for the rest of the work
planned/underway.

All new capabilities are gated behind a new searchable snapshot feature
flag.
  • Loading branch information
andrross committed Oct 4, 2022
1 parent e403799 commit 29c9ecf
Show file tree
Hide file tree
Showing 22 changed files with 672 additions and 71 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.snapshots;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

import org.hamcrest.MatcherAssert;
import org.junit.BeforeClass;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.monitor.fs.FsInfo;

import com.carrotsearch.randomizedtesting.generators.RandomPicks;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
import static org.opensearch.common.util.CollectionUtils.iterableAsArrayList;

/**
 * End-to-end check of restoring a snapshot with the "remote_snapshot"
 * storage type: two indices are snapshotted, deleted, restored under a
 * "-copy" suffix, and then verified both for document count and for the
 * absence of a local Lucene "index" directory on the node hosting them.
 */
public final class SearchableSnapshotIT extends AbstractSnapshotIntegTestCase {

    /**
     * Skips the suite unless the system property named by
     * {@link FeatureFlags#SEARCHABLE_SNAPSHOTS} parses as {@code true}.
     */
    @BeforeClass
    public static void assumeFeatureFlag() {
        final String flagValue = System.getProperty(FeatureFlags.SEARCHABLE_SNAPSHOTS);
        assumeTrue("Searchable snapshot feature flag is enabled", Boolean.parseBoolean(flagValue));
    }

    // NOTE(review): presumably the mock internal engine is not usable with
    // remote snapshot shards -- confirm against the base test case.
    @Override
    protected boolean addMockInternalEngine() {
        return false;
    }

    /**
     * Snapshots two freshly indexed indices, removes them, restores them as
     * remote snapshot indices and verifies the restored copies.
     */
    public void testCreateSearchableSnapshot() throws Exception {
        final Client client = client();
        createRepository("test-repo", "fs");
        createIndex("test-idx-1", singleShardNoReplicaSettings());
        createIndex("test-idx-2", singleShardNoReplicaSettings());
        ensureGreen();
        indexRandomDocs("test-idx-1", 100);
        indexRandomDocs("test-idx-2", 100);

        logger.info("--> snapshot");
        final CreateSnapshotResponse snapshotResponse = client.admin()
            .cluster()
            .prepareCreateSnapshot("test-repo", "test-snap")
            .setWaitForCompletion(true)
            .setIndices("test-idx-1", "test-idx-2")
            .get();
        final int successfulShards = snapshotResponse.getSnapshotInfo().successfulShards();
        final int totalShards = snapshotResponse.getSnapshotInfo().totalShards();
        // At least one shard must have been snapshotted, and none may have failed
        MatcherAssert.assertThat(successfulShards, greaterThan(0));
        MatcherAssert.assertThat(successfulShards, equalTo(totalShards));

        // Drop the originals so that the restored copies are the only source of the documents
        final boolean deleteAcknowledged = client.admin().indices().prepareDelete("test-idx-1", "test-idx-2").get().isAcknowledged();
        assertTrue(deleteAcknowledged);

        logger.info("--> restore indices as 'remote_snapshot'");
        client.admin()
            .cluster()
            .prepareRestoreSnapshot("test-repo", "test-snap")
            .setRenamePattern("(.+)")
            .setRenameReplacement("$1-copy")
            .setStorageType("remote_snapshot")
            .get();
        ensureGreen();

        final String[] restoredIndices = { "test-idx-1-copy", "test-idx-2-copy" };
        for (String restoredIndex : restoredIndices) {
            assertDocCount(restoredIndex, 100L);
        }
        assertIndexDirectoryDoesNotExist(restoredIndices);
    }

    /**
     * Builds the index settings shared by both test indices: one primary
     * shard and zero replicas.
     */
    private static Settings singleShardNoReplicaSettings() {
        return Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0")
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
            .build();
    }

    /**
     * For every given index, selects one active primary shard at random from
     * the cluster state and asserts that no 'index' directory exists for it
     * under any file system path reported by the node it is assigned to.
     * This deliberately peeks at implementation details: it verifies that the
     * Lucene segment files were not copied from the snapshot repository onto
     * the node's local disk for a remote snapshot index.
     */
    private void assertIndexDirectoryDoesNotExist(String... indexNames) {
        final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
        for (String name : indexNames) {
            final Index index = clusterState.metadata().index(name).getIndex();

            // Collect the active primaries of this index and choose one of them at random
            final GroupShardsIterator<ShardIterator> primaries = clusterState.getRoutingTable()
                .activePrimaryShardsGrouped(new String[] { name }, false);
            final List<ShardIterator> candidates = iterableAsArrayList(primaries);
            final ShardRouting chosenShard = RandomPicks.randomFrom(random(), candidates).nextOrNull();
            assertNotNull(chosenShard);
            assertTrue(chosenShard.primary());
            assertTrue(chosenShard.assignedToNode());

            // Ask the hosting node for its file system stats to learn its data paths
            final NodesStatsResponse fsStats = client().admin()
                .cluster()
                .prepareNodesStats(chosenShard.currentNodeId())
                .addMetric(FS.metricName())
                .get();
            final String shardDirName = Integer.toString(chosenShard.getId());
            for (FsInfo.Path dataPath : fsStats.getNodes().get(0).getFs()) {
                // Location where a "normal" index would keep its Lucene files
                final Path luceneDir = PathUtils.get(dataPath.getPath())
                    .resolve("indices")
                    .resolve(index.getUUID())
                    .resolve(shardDirName)
                    .resolve("index");
                MatcherAssert.assertThat("Expect file not to exist: " + luceneDir, Files.exists(luceneDir), is(false));
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.action.admin.cluster.snapshots.restore;

import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
Expand All @@ -42,6 +43,7 @@
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentType;
Expand Down Expand Up @@ -80,6 +82,7 @@ public class RestoreSnapshotRequest extends ClusterManagerNodeRequest<RestoreSna
private boolean includeAliases = true;
private Settings indexSettings = EMPTY_SETTINGS;
private String[] ignoreIndexSettings = Strings.EMPTY_ARRAY;
private String storageType = "local";

@Nullable // if any snapshot UUID will do
private String snapshotUuid;
Expand Down Expand Up @@ -117,6 +120,9 @@ public RestoreSnapshotRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_10_0)) {
snapshotUuid = in.readOptionalString();
}
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOTS) && in.getVersion().onOrAfter(Version.V_3_0_0)) {
storageType = in.readString();
}
}

@Override
Expand Down Expand Up @@ -144,6 +150,9 @@ public void writeTo(StreamOutput out) throws IOException {
"restricting the snapshot UUID is forbidden in a cluster with version [" + out.getVersion() + "] nodes"
);
}
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOTS) && out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeString(storageType);
}
}

@Override
Expand Down Expand Up @@ -480,6 +489,15 @@ public String snapshotUuid() {
return snapshotUuid;
}

/**
* Sets the storage type for this restore request. The value is stored as
* given; this method performs no validation.
*
* @param storageType the storage type to record on this request
* @return this request, to allow call chaining
*/
public RestoreSnapshotRequest storageType(String storageType) {
this.storageType = storageType;
return this;
}

/**
* Returns the storage type currently set on this restore request.
*
* @return the value of the storage type field
*/
public String storageType() {
return storageType;
}

/**
* Parses restore definition
*
Expand Down Expand Up @@ -537,6 +555,16 @@ public RestoreSnapshotRequest source(Map<String, Object> source) {
} else {
throw new IllegalArgumentException("malformed ignore_index_settings section, should be an array of strings");
}
} else if (name.equals("storage_type")) {
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOTS)) {
if (entry.getValue() instanceof String) {
storageType((String) entry.getValue());
} else {
throw new IllegalArgumentException("malformed storage_type");
}
} else {
throw new IllegalArgumentException("Unknown parameter " + name);
}
} else {
if (IndicesOptions.isIndicesOptions(name) == false) {
throw new IllegalArgumentException("Unknown parameter " + name);
Expand Down Expand Up @@ -579,6 +607,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.value(ignoreIndexSetting);
}
builder.endArray();
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOTS) && storageType != null) {
builder.field("storage_type", storageType);
}
builder.endObject();
return builder;
}
Expand All @@ -605,7 +636,8 @@ public boolean equals(Object o) {
&& Objects.equals(renameReplacement, that.renameReplacement)
&& Objects.equals(indexSettings, that.indexSettings)
&& Arrays.equals(ignoreIndexSettings, that.ignoreIndexSettings)
&& Objects.equals(snapshotUuid, that.snapshotUuid);
&& Objects.equals(snapshotUuid, that.snapshotUuid)
&& Objects.equals(storageType, that.storageType);
}

@Override
Expand All @@ -621,7 +653,8 @@ public int hashCode() {
partial,
includeAliases,
indexSettings,
snapshotUuid
snapshotUuid,
storageType
);
result = 31 * result + Arrays.hashCode(indices);
result = 31 * result + Arrays.hashCode(ignoreIndexSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,4 +248,12 @@ public RestoreSnapshotRequestBuilder setIgnoreIndexSettings(List<String> ignoreI
request.ignoreIndexSettings(ignoreIndexSettings);
return this;
}

/**
* Sets the storage type on the underlying restore request.
*
* @param storageType the storage type, passed through unchanged to the request
* @return this builder
*/
public RestoreSnapshotRequestBuilder setStorageType(String storageType) {
request.storageType(storageType);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;
import org.opensearch.snapshots.Snapshot;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -418,33 +419,36 @@ public Builder initializeAsFromOpenToClose(IndexMetadata indexMetadata) {
/**
* Initializes a new empty index, to be restored from a snapshot
*/
public Builder initializeAsNewRestore(IndexMetadata indexMetadata, SnapshotRecoverySource recoverySource, IntSet ignoreShards) {
public Builder initializeAsNewRestore(IndexMetadata indexMetadata, RecoverySource recoverySource, IntSet ignoreShards) {
final UnassignedInfo unassignedInfo = new UnassignedInfo(
UnassignedInfo.Reason.NEW_INDEX_RESTORED,
"restore_source["
+ recoverySource.snapshot().getRepository()
+ "/"
+ recoverySource.snapshot().getSnapshotId().getName()
+ "]"
createRecoverySourceMessage(recoverySource)
);
return initializeAsRestore(indexMetadata, recoverySource, ignoreShards, true, unassignedInfo);
}

/**
* Initializes an existing index, to be restored from a snapshot
*/
public Builder initializeAsRestore(IndexMetadata indexMetadata, SnapshotRecoverySource recoverySource) {
public Builder initializeAsRestore(IndexMetadata indexMetadata, RecoverySource recoverySource) {
final UnassignedInfo unassignedInfo = new UnassignedInfo(
UnassignedInfo.Reason.EXISTING_INDEX_RESTORED,
"restore_source["
+ recoverySource.snapshot().getRepository()
+ "/"
+ recoverySource.snapshot().getSnapshotId().getName()
+ "]"
createRecoverySourceMessage(recoverySource)
);
return initializeAsRestore(indexMetadata, recoverySource, null, false, unassignedInfo);
}

/**
 * Builds the "restore_source[...]" text describing where a restore comes from.
 * A snapshot recovery source is rendered as "repository/snapshot-name"; any
 * other recovery source is rendered through its own toString().
 */
private static String createRecoverySourceMessage(RecoverySource recoverySource) {
    if (recoverySource instanceof SnapshotRecoverySource == false) {
        // Not a snapshot source: fall back to the source's own description
        return "restore_source[" + recoverySource.toString() + "]";
    }
    final Snapshot origin = ((SnapshotRecoverySource) recoverySource).snapshot();
    return "restore_source[" + origin.getRepository() + "/" + origin.getSnapshotId().getName() + "]";
}

/**
* Initializes an existing index, to be restored from remote store
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.opensearch.cluster.routing.RecoverySource.RemoteStoreRecoverySource;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.ImmutableOpenMap;
Expand Down Expand Up @@ -572,7 +571,7 @@ public Builder addAsRemoteStoreRestore(IndexMetadata indexMetadata, RemoteStoreR
return this;
}

public Builder addAsRestore(IndexMetadata indexMetadata, SnapshotRecoverySource recoverySource) {
public Builder addAsRestore(IndexMetadata indexMetadata, RecoverySource recoverySource) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()).initializeAsRestore(
indexMetadata,
recoverySource
Expand All @@ -581,7 +580,7 @@ public Builder addAsRestore(IndexMetadata indexMetadata, SnapshotRecoverySource
return this;
}

public Builder addAsNewRestore(IndexMetadata indexMetadata, SnapshotRecoverySource recoverySource, IntSet ignoreShards) {
public Builder addAsNewRestore(IndexMetadata indexMetadata, RecoverySource recoverySource, IntSet ignoreShards) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()).initializeAsNewRestore(
indexMetadata,
recoverySource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING,
IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING
),
FeatureFlags.SEARCHABLE_SNAPSHOTS,
List.of(
IndexSettings.SNAPSHOT_REPOSITORY,
IndexSettings.SNAPSHOT_INDEX_ID,
IndexSettings.SNAPSHOT_ID_NAME,
IndexSettings.SNAPSHOT_ID_UUID
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class FeatureFlags {
*/
public static final String REMOTE_STORE = "opensearch.experimental.feature.remote_store.enabled";

/**
* Name of the feature flag that gates the experimental searchable
* snapshots feature.
*/
public static final String SEARCHABLE_SNAPSHOTS = "opensearch.experimental.feature.searchable_snapshots.enabled";

/**
* Used to test feature flags whose values are expected to be booleans.
* This method returns true if the value is "true" (case-insensitive),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) {
nodeEnv.availableShardPaths(request.shardId)
);
if (shardStateMetadata != null) {
if (indicesService.getShardOrNull(shardId) == null) {
if (indicesService.getShardOrNull(shardId) == null
&& shardStateMetadata.indexDataLocation == ShardStateMetadata.IndexDataLocation.LOCAL) {
final String customDataPath;
if (request.getCustomDataPath() != null) {
customDataPath = request.getCustomDataPath();
Expand Down
Loading

0 comments on commit 29c9ecf

Please sign in to comment.