Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust snapshot index resolution behavior to be more intuitive #79670

Merged
merged 26 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
15357ce
Adjust snapshot index resolution behavior to be more intuitive
gwbrown Oct 21, 2021
ee34eaf
Adjust behavior of SnapshotsService
gwbrown Oct 21, 2021
32bf887
Adjust behavior of RestoreService
gwbrown Oct 21, 2021
2a26ce4
Merge branch 'master' into si/snapshots-behavior
gwbrown Oct 22, 2021
5d4d7e4
Spotless
gwbrown Oct 22, 2021
3c2c930
Merge remote-tracking branch 'origin/master' into si/snapshots-behavior
gwbrown Nov 4, 2021
caba655
Revise SystemDataStreamSnapshotIT for new behavior
gwbrown Nov 5, 2021
2487d78
Handle system data streams better
gwbrown Nov 6, 2021
da517aa
Improve errors when explicitly requesting system indices
gwbrown Nov 6, 2021
f195401
Add assertions to integration test
gwbrown Nov 6, 2021
3b93884
Throw earlier when we hit system data streams that were explicitly re…
gwbrown Nov 6, 2021
d11ad0e
Merge branch 'master' into si/snapshots-behavior
gwbrown Nov 6, 2021
fdffae9
Spotless
gwbrown Nov 6, 2021
c0bf630
Throw error if a data stream is in both `feature_states` and `indices`
gwbrown Nov 6, 2021
ebd3cce
Update assertions for new error when requesting system indices
gwbrown Nov 8, 2021
aa2ac16
Adjust test to not try to restore system index by name
gwbrown Nov 8, 2021
46389bc
Adjust more tests to not restore system index by name
gwbrown Nov 9, 2021
39b2de9
Merge remote-tracking branch 'origin/master' into si/snapshots-behavior
gwbrown Nov 9, 2021
4872909
Update comments (& test name) per review
gwbrown Nov 9, 2021
f9fc1c1
Remove duplicate test
gwbrown Nov 9, 2021
1477d19
Remove another duplicate test case
gwbrown Nov 9, 2021
baa5f87
Remove accidentally commited file
gwbrown Nov 9, 2021
ff1ddc6
Fix Security snapshot test
gwbrown Nov 9, 2021
1d552f7
Merge branch 'master' into si/snapshots-behavior
gwbrown Nov 22, 2021
033e0ea
Merge remote-tracking branch 'origin/master' into si/snapshots-behavior
gwbrown Nov 25, 2021
02a8f7b
Merge branch 'master' into si/snapshots-behavior
elasticmachine Nov 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ protected Node(
repositoryService,
transportService,
actionModule.getActionFilters(),
systemIndices.getFeatures()
systemIndices
);
SnapshotShardsService snapshotShardsService = new SnapshotShardsService(
settings,
Expand Down
107 changes: 79 additions & 28 deletions server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.regex.Regex;
Expand Down Expand Up @@ -356,8 +355,8 @@ private void startRestore(
snapshotId,
snapshotInfo,
globalMetadata,
// include system data stream names in argument to this method
Stream.concat(requestIndices.stream(), featureStateDataStreams.stream()).collect(Collectors.toList()),
requestIndices,
featureStateDataStreams,
request.includeAliases()
);
Map<String, DataStream> dataStreamsToRestore = result.v1();
Expand All @@ -367,47 +366,75 @@ private void startRestore(
requestIndices.removeAll(dataStreamsToRestore.keySet());

// And add the backing indices
Set<String> dataStreamIndices = dataStreamsToRestore.values()
.stream()
.flatMap(ds -> ds.getIndices().stream())
.map(Index::getName)
final Set<String> nonSystemDataStreamIndices;
final Set<String> systemDataStreamIndices;
{
Map<Boolean, Set<String>> dataStreamIndices = dataStreamsToRestore.values()
.stream()
.flatMap(ds -> ds.getIndices().stream().map(idx -> new Tuple<>(ds.isSystem(), idx.getName())))
.collect(Collectors.partitioningBy(Tuple::v1, Collectors.mapping(Tuple::v2, Collectors.toSet())));
systemDataStreamIndices = dataStreamIndices.get(true);
nonSystemDataStreamIndices = dataStreamIndices.get(false);
}
requestIndices.addAll(nonSystemDataStreamIndices);
final Set<String> allSystemIndicesToRestore = Stream.of(systemDataStreamIndices, featureStateIndices)
.flatMap(Collection::stream)
.collect(Collectors.toSet());
requestIndices.addAll(dataStreamIndices);

// Strip system indices out of the list of "available" indices - these should only come from feature states.
List<String> availableNonSystemIndices;
{
Set<String> systemIndicesInSnapshot = new HashSet<>();
snapshotInfo.featureStates().stream().flatMap(state -> state.getIndices().stream()).forEach(systemIndicesInSnapshot::add);
// And the system data stream backing indices too
snapshotInfo.indices().stream().filter(systemIndices::isSystemIndexBackingDataStream).forEach(systemIndicesInSnapshot::add);

Set<String> explicitlyRequestedSystemIndices = new HashSet<>(requestIndices);
explicitlyRequestedSystemIndices.retainAll(systemIndicesInSnapshot);

if (explicitlyRequestedSystemIndices.size() > 0) {
throw new IllegalArgumentException(
new ParameterizedMessage(
"requested system indices {}, but system indices can only be restored as part of a feature state",
explicitlyRequestedSystemIndices
).getFormattedMessage()
);
}

availableNonSystemIndices = snapshotInfo.indices()
.stream()
.filter(idxName -> systemIndicesInSnapshot.contains(idxName) == false)
.collect(Collectors.toList());
}

// Resolve the indices that were directly requested
final List<String> requestedIndicesInSnapshot = filterIndices(
snapshotInfo.indices(),
availableNonSystemIndices,
requestIndices.toArray(String[]::new),
request.indicesOptions()
);

// Combine into the final list of indices to be restored
final List<String> requestedIndicesIncludingSystem = Stream.concat(
requestedIndicesInSnapshot.stream(),
featureStateIndices.stream()
).distinct().collect(Collectors.toList());
final List<String> requestedIndicesIncludingSystem = Stream.of(
requestedIndicesInSnapshot,
featureStateIndices,
systemDataStreamIndices
).flatMap(Collection::stream).distinct().collect(Collectors.toList());

final Set<String> explicitlyRequestedSystemIndices = new HashSet<>();
for (IndexId indexId : repositoryData.resolveIndices(requestedIndicesIncludingSystem).values()) {
IndexMetadata snapshotIndexMetaData = repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId);
if (snapshotIndexMetaData.isSystem()) {
if (requestedIndicesInSnapshot.contains(indexId.getName())) {
if (requestIndices.contains(indexId.getName())) {
explicitlyRequestedSystemIndices.add(indexId.getName());
}
}
metadataBuilder.put(snapshotIndexMetaData, false);
}

// log a deprecation warning if the any of the indexes to delete were included in the request and the snapshot
// is from a version that should have feature states
if (snapshotInfo.version().onOrAfter(Version.V_7_12_0) && explicitlyRequestedSystemIndices.isEmpty() == false) {
deprecationLogger.warn(
DeprecationCategory.API,
"restore-system-index-from-snapshot",
"Restoring system indices by name is deprecated. Use feature states instead. System indices: "
+ explicitlyRequestedSystemIndices
);
}
assert explicitlyRequestedSystemIndices.size() == 0
: "it should be impossible to reach this point with explicitly requested system indices, but got: "
+ explicitlyRequestedSystemIndices;

// Now we can start the actual restore process by adding shards to be recovered in the cluster state
// and updating cluster metadata (global and index) as needed
Expand All @@ -419,7 +446,13 @@ private void startRestore(
featureStatesToRestore.keySet(),
// Apply renaming on index names, returning a map of names where
// the key is the renamed index and the value is the original name
renamedIndices(request, requestedIndicesIncludingSystem, dataStreamIndices, featureStateIndices, repositoryData),
renamedIndices(
request,
requestedIndicesIncludingSystem,
nonSystemDataStreamIndices,
allSystemIndicesToRestore,
repositoryData
),
snapshotInfo,
metadataBuilder.dataStreams(dataStreamsToRestore, dataStreamAliasesToRestore).build(),
dataStreamsToRestore.values(),
Expand Down Expand Up @@ -501,13 +534,14 @@ private Tuple<Map<String, DataStream>, Map<String, DataStreamAlias>> getDataStre
SnapshotInfo snapshotInfo,
Metadata globalMetadata,
List<String> requestIndices,
Collection<String> featureStateDataStreams,
boolean includeAliases
) {
Map<String, DataStream> dataStreams;
Map<String, DataStreamAlias> dataStreamAliases;
List<String> requestedDataStreams = filterIndices(
snapshotInfo.dataStreams(),
requestIndices.toArray(String[]::new),
Stream.of(requestIndices, featureStateDataStreams).flatMap(Collection::stream).toArray(String[]::new),
IndicesOptions.fromOptions(true, true, true, true)
);
if (requestedDataStreams.isEmpty()) {
Expand All @@ -522,13 +556,30 @@ private Tuple<Map<String, DataStream>, Map<String, DataStreamAlias>> getDataStre
for (String requestedDataStream : requestedDataStreams) {
final DataStream dataStreamInSnapshot = dataStreamsInSnapshot.get(requestedDataStream);
assert dataStreamInSnapshot != null : "DataStream [" + requestedDataStream + "] not found in snapshot";
dataStreams.put(requestedDataStream, dataStreamInSnapshot);

if (dataStreamInSnapshot.isSystem() == false) {
dataStreams.put(requestedDataStream, dataStreamInSnapshot);
} else if (requestIndices.contains(requestedDataStream)) {
throw new IllegalArgumentException(
new ParameterizedMessage(
"requested system data stream [{}], but system data streams can only be restored as part of a feature state",
requestedDataStream
).getFormattedMessage()
);
} else if (featureStateDataStreams.contains(requestedDataStream)) {
dataStreams.put(requestedDataStream, dataStreamInSnapshot);
} else {
logger.debug(
"omitting system data stream [{}] from snapshot restoration because its feature state was not requested",
requestedDataStream
);
}
}
if (includeAliases) {
dataStreamAliases = new HashMap<>();
final Map<String, DataStreamAlias> dataStreamAliasesInSnapshot = globalMetadata.dataStreamAliases();
for (DataStreamAlias alias : dataStreamAliasesInSnapshot.values()) {
DataStreamAlias copy = alias.intersect(requestedDataStreams::contains);
DataStreamAlias copy = alias.intersect(dataStreams.keySet()::contains);
if (copy.getDataStreams().isEmpty() == false) {
dataStreamAliases.put(alias.getName(), copy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus

private final OngoingRepositoryOperations repositoryOperations = new OngoingRepositoryOperations();

private final Map<String, SystemIndices.Feature> systemIndexDescriptorMap;
private final SystemIndices systemIndices;

/**
* Setting that specifies the maximum number of allowed concurrent snapshot create and delete operations in the
Expand All @@ -194,7 +194,7 @@ public SnapshotsService(
RepositoriesService repositoriesService,
TransportService transportService,
ActionFilters actionFilters,
Map<String, SystemIndices.Feature> systemIndexDescriptorMap
SystemIndices systemIndices
) {
this.clusterService = clusterService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
Expand All @@ -217,7 +217,7 @@ public SnapshotsService(
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, i -> maxConcurrentOperations = i);
}
this.systemIndexDescriptorMap = systemIndexDescriptorMap;
this.systemIndices = systemIndices;
}

/**
Expand Down Expand Up @@ -262,7 +262,7 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList
if (request.includeGlobalState() || requestedStates.isEmpty() == false) {
if (request.includeGlobalState() && requestedStates.isEmpty()) {
// If we're including global state and feature states aren't specified, include all of them
featureStatesSet = systemIndexDescriptorMap.keySet();
featureStatesSet = systemIndices.getFeatures().keySet();
} else if (requestedStates.size() == 1 && NO_FEATURE_STATES_VALUE.equalsIgnoreCase(requestedStates.get(0))) {
// If there's exactly one value and it's "none", include no states
featureStatesSet = Collections.emptySet();
Expand All @@ -281,7 +281,7 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList
return;
}
featureStatesSet = new HashSet<>(requestedStates);
featureStatesSet.retainAll(systemIndexDescriptorMap.keySet());
featureStatesSet.retainAll(systemIndices.getFeatures().keySet());
}
} else {
featureStatesSet = Collections.emptySet();
Expand All @@ -306,15 +306,17 @@ public ClusterState execute(ClusterState currentState) {
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot");
ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress);
// Store newSnapshot here to be processed in clusterStateProcessed
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request));
List<String> indices = Arrays.stream(indexNameExpressionResolver.concreteIndexNames(currentState, request))
.filter(indexName -> systemIndices.isSystemIndex(indexName) == false) // Only resolve system indices via Features
.collect(Collectors.toList());

final Set<SnapshotFeatureInfo> featureStates = new HashSet<>();
final Set<String> systemDataStreamNames = new HashSet<>();
// if we have any feature states in the snapshot, we add their required indices to the snapshot indices if they haven't
// been requested by the request directly
final Set<String> indexNames = new HashSet<>(indices);
for (String featureName : featureStatesSet) {
SystemIndices.Feature feature = systemIndexDescriptorMap.get(featureName);
SystemIndices.Feature feature = systemIndices.getFeatures().get(featureName);

Set<String> featureSystemIndices = feature.getIndexDescriptors()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1725,7 +1725,7 @@ protected void assertSnapshotOrGenericThread() {
repositoriesService,
transportService,
actionFilters,
Collections.emptyMap()
EmptySystemIndices.INSTANCE
);
nodeEnv = new NodeEnvironment(settings, environment);
final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,14 +437,14 @@ public static SnapshotInfo createFullSnapshot(Logger logger, String repoName, St
return snapshotInfo;
}

protected SnapshotInfo createSnapshot(String repositoryName, String snapshot, List<String> indices) {
protected SnapshotInfo createSnapshot(String repositoryName, String snapshot, List<String> indices, List<String> featureStates) {
logger.info("--> creating snapshot [{}] of {} in [{}]", snapshot, indices, repositoryName);
final CreateSnapshotResponse response = client().admin()
.cluster()
.prepareCreateSnapshot(repositoryName, snapshot)
.setIndices(indices.toArray(Strings.EMPTY_ARRAY))
.setWaitForCompletion(true)
.setFeatureStates(NO_FEATURE_STATES_VALUE) // Exclude all feature states to ensure only specified indices are included
.setFeatureStates(featureStates.toArray(Strings.EMPTY_ARRAY))
.get();

final SnapshotInfo snapshotInfo = response.getSnapshotInfo();
Expand All @@ -454,6 +454,10 @@ protected SnapshotInfo createSnapshot(String repositoryName, String snapshot, Li
return snapshotInfo;
}

protected SnapshotInfo createSnapshot(String repositoryName, String snapshot, List<String> indices) {
return createSnapshot(repositoryName, snapshot, indices, Collections.singletonList(NO_FEATURE_STATES_VALUE));
}

protected void createIndexWithRandomDocs(String indexName, int docCount) throws InterruptedException {
createIndex(indexName);
ensureGreen();
Expand Down
Loading