Merge branch 'main' into breaking/default_random_score_function_seq_no
benwtrent authored Dec 17, 2024
2 parents ff4dc9a + b8f4677 commit 8bd668e
Showing 99 changed files with 3,662 additions and 205 deletions.
1 change: 1 addition & 0 deletions .buildkite/pipelines/periodic-platform-support.yml
Original file line number Diff line number Diff line change
@@ -63,6 +63,7 @@ steps:
image:
- almalinux-8-aarch64
- ubuntu-2004-aarch64
- ubuntu-2404-aarch64
GRADLE_TASK:
- checkPart1
- checkPart2
@@ -155,10 +155,8 @@ org.elasticsearch.cluster.ClusterState#compatibilityVersions()

@defaultMessage ClusterFeatures#nodeFeatures is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster.
org.elasticsearch.cluster.ClusterFeatures#nodeFeatures()
@defaultMessage ClusterFeatures#allNodeFeatures is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster.
org.elasticsearch.cluster.ClusterFeatures#allNodeFeatures()
@defaultMessage ClusterFeatures#clusterHasFeature is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster.
org.elasticsearch.cluster.ClusterFeatures#clusterHasFeature(org.elasticsearch.features.NodeFeature)
org.elasticsearch.cluster.ClusterFeatures#clusterHasFeature(org.elasticsearch.cluster.node.DiscoveryNodes, org.elasticsearch.features.NodeFeature)

@defaultMessage Do not construct these records outside the source files they are declared in
org.elasticsearch.cluster.SnapshotsInProgress$ShardSnapshotStatus#<init>(java.lang.String, org.elasticsearch.cluster.SnapshotsInProgress$ShardState, org.elasticsearch.repositories.ShardGeneration, java.lang.String, org.elasticsearch.repositories.ShardSnapshotResult)
5 changes: 5 additions & 0 deletions docs/changelog/116388.yaml
@@ -0,0 +1,5 @@
pr: 116388
summary: Add support for partial shard results
area: EQL
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/118143.yaml
@@ -0,0 +1,5 @@
pr: 118143
summary: Infrastructure for assuming cluster features in the next major version
area: "Infra/Core"
type: feature
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/118674.yaml
@@ -0,0 +1,5 @@
pr: 118674
summary: Ignore failures from renormalizing buckets in read-only index
area: Machine Learning
type: enhancement
issues: []
21 changes: 21 additions & 0 deletions docs/reference/alias.asciidoc
@@ -407,3 +407,24 @@ POST _aliases
}
----
// TEST[s/^/PUT my-index-2099.05.06-000001\n/]

[discrete]
[[remove-index]]
=== Remove an index

To remove an index, use the aliases API's `remove_index` action.

[source,console]
----
POST _aliases
{
"actions": [
{
"remove_index": {
"index": "my-index-2099.05.06-000001"
}
}
]
}
----
// TEST[s/^/PUT my-index-2099.05.06-000001\n/]
47 changes: 47 additions & 0 deletions docs/reference/eql/eql-search-api.asciidoc
@@ -88,6 +88,53 @@ request that targets only `bar*` still returns an error.
+
Defaults to `true`.

`allow_partial_search_results`::
(Optional, Boolean)

If `false`, the request returns an error if one or more shards involved in the query are unavailable.
+
If `true`, the query is executed only on the available shards, ignoring shard request timeouts and
<<shard-failures,shard failures>>.
+
Defaults to `false`.
+
To override the default for this field, set the
`xpack.eql.default_allow_partial_results` cluster setting to `true`.


[IMPORTANT]
====
You can also specify this value using the `allow_partial_search_results` request body parameter.
If both parameters are specified, only the query parameter is used.
====
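The precedence rule in the note above (query parameter beats body parameter, with the cluster setting supplying the default) can be sketched in plain Java. The helper name and shape are hypothetical illustrations, not Elasticsearch source:

```java
public class PartialResultsPrecedence {

    /**
     * Resolves the effective allow_partial_search_results value.
     * queryParam / bodyParam are null when the caller did not specify them;
     * clusterDefault mirrors the xpack.eql.default_allow_partial_results setting.
     */
    public static boolean effectiveAllowPartialSearchResults(Boolean queryParam, Boolean bodyParam, boolean clusterDefault) {
        if (queryParam != null) {
            return queryParam; // the query parameter wins when both are specified
        }
        if (bodyParam != null) {
            return bodyParam;
        }
        return clusterDefault; // fall back to the cluster-wide default (false if unset)
    }
}
```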


`allow_partial_sequence_results`::
(Optional, Boolean)

Used together with `allow_partial_search_results=true`, this parameter controls the behavior of sequence queries specifically
(if `allow_partial_search_results=false`, this parameter has no effect).
If `true` and if some shards are unavailable, the sequences are calculated on available shards only.
+
If `false` and if some shards are unavailable, the query only returns information about the shard failures,
but no further results.
+
Defaults to `false`.
+
Note that sequences calculated with `allow_partial_search_results=true` can return incorrect results
(e.g. if a <<eql-missing-events, missing event>> clause matches records in unavailable shards).
+
To override the default for this field, set the
`xpack.eql.default_allow_partial_sequence_results` cluster setting to `true`.


[IMPORTANT]
====
You can also specify this value using the `allow_partial_sequence_results` request body parameter.
If both parameters are specified, only the query parameter is used.
====
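The interaction of the two flags when shards are unavailable reduces to a small decision table. The following is a hypothetical sketch of the documented behavior, not EQL source:

```java
public class SequencePartialResults {

    public enum Outcome { ERROR, SHARD_FAILURES_ONLY, PARTIAL_SEQUENCES }

    /** Behavior of a sequence query when one or more shards are unavailable. */
    public static Outcome outcomeWithUnavailableShards(boolean allowPartialSearchResults, boolean allowPartialSequenceResults) {
        if (allowPartialSearchResults == false) {
            return Outcome.ERROR; // the whole request fails
        }
        // even with partial search results allowed, sequences need the second flag too
        return allowPartialSequenceResults ? Outcome.PARTIAL_SEQUENCES : Outcome.SHARD_FAILURES_ONLY;
    }
}
```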

`ccs_minimize_roundtrips`::
(Optional, Boolean) If `true`, network round-trips between the local and the
remote cluster are minimized when running cross-cluster search (CCS) requests.
5 changes: 3 additions & 2 deletions muted-tests.yml
@@ -302,8 +302,9 @@ tests:
- class: org.elasticsearch.xpack.security.QueryableReservedRolesIT
method: testDeletingAndCreatingSecurityIndexTriggersSynchronization
issue: https://github.com/elastic/elasticsearch/issues/118806
- class: org.elasticsearch.xpack.esql.session.IndexResolverFieldNamesTests
issue: https://github.com/elastic/elasticsearch/issues/118814
- class: org.elasticsearch.index.engine.RecoverySourcePruneMergePolicyTests
method: testPruneSome
issue: https://github.com/elastic/elasticsearch/issues/118728

# Examples:
#
10 changes: 10 additions & 0 deletions rest-api-spec/src/main/resources/rest-api-spec/api/eql.search.json
@@ -41,6 +41,16 @@
"type": "time",
"description": "Update the time interval in which the results (partial or final) for this search will be available",
"default": "5d"
},
"allow_partial_search_results": {
"type":"boolean",
"description":"Control whether the query should keep running in case of shard failures, and return partial results",
"default":false
},
"allow_partial_sequence_results": {
"type":"boolean",
"description":"Control whether a sequence query should return partial results or no results at all in case of shard failures. This option has effect only if [allow_partial_search_results] is true.",
"default":false
}
},
"body":{
@@ -138,6 +138,7 @@ static TransportVersion def(int id) {
public static final TransportVersion KNN_QUERY_RESCORE_OVERSAMPLE = def(8_806_00_0);
public static final TransportVersion SEMANTIC_QUERY_LENIENT = def(8_807_00_0);
public static final TransportVersion ESQL_QUERY_BUILDER_IN_SEARCH_FUNCTIONS = def(8_808_00_0);
public static final TransportVersion EQL_ALLOW_PARTIAL_SEARCH_RESULTS = def(8_809_00_0);

/*
* STOP! READ THIS FIRST! No, really,
56 changes: 45 additions & 11 deletions server/src/main/java/org/elasticsearch/cluster/ClusterFeatures.java
@@ -9,11 +9,12 @@

package org.elasticsearch.cluster;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.xcontent.ToXContent;

@@ -79,28 +80,61 @@ public Map<String, Set<String>> nodeFeatures() {
return nodeFeatures;
}

/**
* The features in all nodes in the cluster.
* <p>
* NOTE: This should not be used directly.
* Please use {@link org.elasticsearch.features.FeatureService#clusterHasFeature} instead.
*/
public Set<String> allNodeFeatures() {
private Set<String> allNodeFeatures() {
if (allNodeFeatures == null) {
allNodeFeatures = Set.copyOf(calculateAllNodeFeatures(nodeFeatures.values()));
}
return allNodeFeatures;
}

/**
* Returns {@code true} if {@code node} can have assumed features.
* @see org.elasticsearch.env.BuildVersion#canRemoveAssumedFeatures
*/
public static boolean featuresCanBeAssumedForNode(DiscoveryNode node) {
return node.getBuildVersion().canRemoveAssumedFeatures();
}

/**
* Returns {@code true} if one or more nodes in {@code nodes} can have assumed features.
* @see org.elasticsearch.env.BuildVersion#canRemoveAssumedFeatures
*/
public static boolean featuresCanBeAssumedForNodes(DiscoveryNodes nodes) {
return nodes.getAllNodes().stream().anyMatch(n -> n.getBuildVersion().canRemoveAssumedFeatures());
}

/**
* {@code true} if {@code feature} is present on all nodes in the cluster.
* <p>
* NOTE: This should not be used directly.
* Please use {@link org.elasticsearch.features.FeatureService#clusterHasFeature} instead.
*/
@SuppressForbidden(reason = "directly reading cluster features")
public boolean clusterHasFeature(NodeFeature feature) {
return allNodeFeatures().contains(feature.id());
public boolean clusterHasFeature(DiscoveryNodes nodes, NodeFeature feature) {
assert nodes.getNodes().keySet().equals(nodeFeatures.keySet())
: "Cluster features nodes " + nodeFeatures.keySet() + " is different to discovery nodes " + nodes.getNodes().keySet();

// basic case
boolean allNodesHaveFeature = allNodeFeatures().contains(feature.id());
if (allNodesHaveFeature) {
return true;
}

// if the feature is assumed, check the versions more closely
// it's actually ok if the feature is assumed, and all nodes missing the feature can assume it
// TODO: do we need some kind of transient cache of this calculation?
if (feature.assumedAfterNextCompatibilityBoundary()) {
for (var nf : nodeFeatures.entrySet()) {
if (nf.getValue().contains(feature.id()) == false
&& featuresCanBeAssumedForNode(nodes.getNodes().get(nf.getKey())) == false) {
return false;
}
}

// all nodes missing the feature can assume it - so that's alright then
return true;
}

return false;
}
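The assumed-features check above can be condensed into a standalone sketch, with hypothetical plain-Java types standing in for `DiscoveryNodes` and `NodeFeature`. This is an illustration of the logic, not the actual class:

```java
import java.util.Map;
import java.util.Set;

public class AssumedFeatureCheck {

    /**
     * A feature holds cluster-wide if every node either reports it, or is
     * allowed to assume it (the feature is assumed after the next
     * compatibility boundary and the node's build version permits assuming).
     */
    public static boolean clusterHasFeature(
        Map<String, Set<String>> nodeFeatures,  // node id -> features the node reports
        Set<String> nodesThatCanAssume,         // node ids whose build version may assume features
        String featureId,
        boolean assumedAfterNextBoundary
    ) {
        for (Map.Entry<String, Set<String>> entry : nodeFeatures.entrySet()) {
            if (entry.getValue().contains(featureId)) {
                continue; // node reports the feature explicitly
            }
            if (assumedAfterNextBoundary && nodesThatCanAssume.contains(entry.getKey())) {
                continue; // node is missing the feature but may assume it
            }
            return false; // missing and cannot be assumed
        }
        return true;
    }
}
```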

/**
@@ -29,6 +29,7 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
@@ -39,6 +40,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -137,8 +139,8 @@ public ClusterState execute(BatchExecutionContext<JoinTask> batchExecutionContex

DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());
Map<String, CompatibilityVersions> compatibilityVersionsMap = new HashMap<>(newState.compatibilityVersions());
Map<String, Set<String>> nodeFeatures = new HashMap<>(newState.nodeFeatures());
Set<String> allNodesFeatures = ClusterFeatures.calculateAllNodeFeatures(nodeFeatures.values());
Map<String, Set<String>> nodeFeatures = new HashMap<>(newState.nodeFeatures()); // as present in cluster state
Set<String> effectiveClusterFeatures = calculateEffectiveClusterFeatures(newState.nodes(), nodeFeatures);

assert nodesBuilder.isLocalNodeElectedMaster();

@@ -174,14 +176,17 @@
}
blockForbiddenVersions(compatibilityVersions.transportVersion());
ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
enforceNodeFeatureBarrier(node.getId(), allNodesFeatures, features);
Set<String> newNodeEffectiveFeatures = enforceNodeFeatureBarrier(node, effectiveClusterFeatures, features);
// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
// we have to reject nodes that don't support all indices we have in this cluster
ensureIndexCompatibility(node.getMinIndexVersion(), node.getMaxIndexVersion(), initialState.getMetadata());

nodesBuilder.add(node);
compatibilityVersionsMap.put(node.getId(), compatibilityVersions);
// store the actual node features here, not including assumed features, as this is persisted in cluster state
nodeFeatures.put(node.getId(), features);
allNodesFeatures.retainAll(features);
effectiveClusterFeatures.retainAll(newNodeEffectiveFeatures);

nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
@@ -355,6 +360,35 @@ private static void blockForbiddenVersions(TransportVersion joiningTransportVers
}
}

/**
* Calculate the cluster's effective features. This includes all features that are assumed on any nodes in the cluster,
* that are also present across the whole cluster as a result.
*/
private Set<String> calculateEffectiveClusterFeatures(DiscoveryNodes nodes, Map<String, Set<String>> nodeFeatures) {
if (featureService.featuresCanBeAssumedForNodes(nodes)) {
Set<String> assumedFeatures = featureService.getNodeFeatures()
.values()
.stream()
.filter(NodeFeature::assumedAfterNextCompatibilityBoundary)
.map(NodeFeature::id)
.collect(Collectors.toSet());

// add all assumed features to the featureset of all nodes of the next major version
nodeFeatures = new HashMap<>(nodeFeatures);
for (var node : nodes.getNodes().entrySet()) {
if (featureService.featuresCanBeAssumedForNode(node.getValue())) {
assert nodeFeatures.containsKey(node.getKey()) : "Node " + node.getKey() + " does not have any features";
nodeFeatures.computeIfPresent(node.getKey(), (k, v) -> {
var newFeatures = new HashSet<>(v);
return newFeatures.addAll(assumedFeatures) ? newFeatures : v;
});
}
}
}

return ClusterFeatures.calculateAllNodeFeatures(nodeFeatures.values());
}

/**
* Ensures that all indices are compatible with the given index version. This will ensure that all indices in the given metadata
* will not be created with a newer version of elasticsearch as well as that all indices are newer or equal to the minimum index
@@ -461,13 +495,44 @@ public static void ensureVersionBarrier(Version joiningNodeVersion, Version minC
}
}

private void enforceNodeFeatureBarrier(String nodeId, Set<String> existingNodesFeatures, Set<String> newNodeFeatures) {
/**
* Enforces the feature join barrier - a joining node should have all features already present in all existing nodes in the cluster
*
* @return The set of features that this node has (including assumed features)
*/
private Set<String> enforceNodeFeatureBarrier(DiscoveryNode node, Set<String> effectiveClusterFeatures, Set<String> newNodeFeatures) {
// prevent join if it does not have one or more features that all other nodes have
Set<String> missingFeatures = new HashSet<>(existingNodesFeatures);
Set<String> missingFeatures = new HashSet<>(effectiveClusterFeatures);
missingFeatures.removeAll(newNodeFeatures);

if (missingFeatures.isEmpty() == false) {
throw new IllegalStateException("Node " + nodeId + " is missing required features " + missingFeatures);
if (missingFeatures.isEmpty()) {
// nothing missing - all ok
return newNodeFeatures;
}

if (featureService.featuresCanBeAssumedForNode(node)) {
// it might still be ok for this node to join if this node can have assumed features,
// and all the missing features are assumed
// we can get the NodeFeature object direct from this node's registered features
// as all existing nodes in the cluster have the features present in existingNodesFeatures, including this one
newNodeFeatures = new HashSet<>(newNodeFeatures);
for (Iterator<String> it = missingFeatures.iterator(); it.hasNext();) {
String feature = it.next();
NodeFeature nf = featureService.getNodeFeatures().get(feature);
if (nf.assumedAfterNextCompatibilityBoundary()) {
// it's ok for this feature to be missing from this node
it.remove();
// and it should be assumed to still be in the cluster
newNodeFeatures.add(feature);
}
// even if we don't remove it, still continue, so the exception message below is accurate
}
}

if (missingFeatures.isEmpty()) {
return newNodeFeatures;
} else {
throw new IllegalStateException("Node " + node.getId() + " is missing required features " + missingFeatures);
}
}
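The barrier logic can also be sketched standalone (hypothetical signature, not the actual `NodeJoinExecutor` code): the joining node must carry every effective cluster feature, except assumed features it is allowed to assume, which are then treated as still present.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public class FeatureJoinBarrier {

    /** Returns the joining node's effective features, or throws if the barrier fails. */
    public static Set<String> enforceBarrier(
        Set<String> effectiveClusterFeatures,
        Set<String> newNodeFeatures,
        Set<String> assumedFeatures,   // features assumed after the next compatibility boundary
        boolean nodeCanAssumeFeatures  // whether the joining node's build version may assume them
    ) {
        Set<String> missing = new HashSet<>(effectiveClusterFeatures);
        missing.removeAll(newNodeFeatures);

        Set<String> result = new HashSet<>(newNodeFeatures);
        if (nodeCanAssumeFeatures) {
            for (Iterator<String> it = missing.iterator(); it.hasNext(); ) {
                String feature = it.next();
                if (assumedFeatures.contains(feature)) {
                    it.remove();             // ok for this node to lack it
                    result.add(feature);     // treated as still present in the cluster
                }
            }
        }
        if (missing.isEmpty() == false) {
            throw new IllegalStateException("Node is missing required features " + missing);
        }
        return result;
    }
}
```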

6 changes: 6 additions & 0 deletions server/src/main/java/org/elasticsearch/env/BuildVersion.java
@@ -37,6 +37,12 @@
*/
public abstract class BuildVersion implements ToXContentFragment, Writeable {

/**
* Checks if this version can operate properly in a cluster without features
* that are assumed in the currently running Elasticsearch.
*/
public abstract boolean canRemoveAssumedFeatures();

/**
* Check whether this version is on or after a minimum threshold.
*