Skip to content

Commit

Permalink
Excluding system indices from max shard limit validator (#2894) (#2911)
Browse files Browse the repository at this point in the history
* Excluding system indices from max shard limit validator

Signed-off-by: Ankit Jain <jain.ankitk@gmail.com>

* Fixing spotless check violations

Signed-off-by: Ankit Jain <jain.ankitk@gmail.com>

* Fixing NPE due to null isHidden

Signed-off-by: Ankit Jain <jain.ankitk@gmail.com>

* Adding unit tests for shard opening scenario

Signed-off-by: Ankit Jain <jain.ankitk@gmail.com>

* Addressing review comments

Signed-off-by: Ankit Jain <jain.ankitk@gmail.com>
(cherry picked from commit d39c18f)

Co-authored-by: Ankit Jain <jain.ankitk@gmail.com>
  • Loading branch information
opensearch-trigger-bot[bot] and jainankitk authored Apr 15, 2022
1 parent 2069aa3 commit 35ae7fe
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.InvalidIndexNameException;
import org.opensearch.indices.ShardLimitValidator;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.indices.SystemIndices;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -214,17 +213,9 @@ public void validateIndexName(String index, ClusterState state) {
* @param isHidden Whether or not this is a hidden index
*/
public boolean validateDotIndex(String index, @Nullable Boolean isHidden) {
boolean isSystem = false;
if (index.charAt(0) == '.') {
SystemIndexDescriptor matchingDescriptor = systemIndices.findMatchingDescriptor(index);
if (matchingDescriptor != null) {
logger.trace(
"index [{}] is a system index because it matches index pattern [{}] with description [{}]",
index,
matchingDescriptor.getIndexPattern(),
matchingDescriptor.getDescription()
);
isSystem = true;
if (systemIndices.validateSystemIndex(index)) {
return true;
} else if (isHidden) {
logger.trace("index [{}] is a hidden index", index);
} else {
Expand All @@ -237,7 +228,7 @@ public boolean validateDotIndex(String index, @Nullable Boolean isHidden) {
}
}

return isSystem;
return false;
}

/**
Expand Down Expand Up @@ -890,7 +881,7 @@ static Settings aggregateIndexSettings(
* We can not validate settings until we have applied templates, otherwise we do not know the actual settings
* that will be used to create this index.
*/
shardLimitValidator.validateShardLimit(indexSettings, currentState);
shardLimitValidator.validateShardLimit(request.index(), indexSettings, currentState);
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(indexSettings) == false
&& IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(indexSettings).onOrAfter(Version.V_2_0_0)) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ public class ShardLimitValidator {
Setting.Property.NodeScope
);
protected final AtomicInteger shardLimitPerNode = new AtomicInteger();
private final SystemIndices systemIndices;

public ShardLimitValidator(final Settings settings, ClusterService clusterService) {
public ShardLimitValidator(final Settings settings, ClusterService clusterService, SystemIndices systemIndices) {
this.shardLimitPerNode.set(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(settings));
clusterService.getClusterSettings().addSettingsUpdateConsumer(SETTING_CLUSTER_MAX_SHARDS_PER_NODE, this::setShardLimitPerNode);
this.systemIndices = systemIndices;
}

private void setShardLimitPerNode(int newValue) {
Expand All @@ -84,11 +86,17 @@ public int getShardLimitPerNode() {
/**
* Checks whether an index can be created without going over the cluster shard limit.
*
* @param indexName the name of the index being created
* @param settings the settings of the index to be created
* @param state the current cluster state
* @throws ValidationException if creating this index would put the cluster over the cluster shard limit
*/
public void validateShardLimit(final Settings settings, final ClusterState state) {
public void validateShardLimit(final String indexName, final Settings settings, final ClusterState state) {
// Validate shard limit only for non system indices as it is not hard limit anyways
if (systemIndices.validateSystemIndex(indexName)) {
return;
}

final int numberOfShards = INDEX_NUMBER_OF_SHARDS_SETTING.get(settings);
final int numberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings);
final int shardsToCreate = numberOfShards * (1 + numberOfReplicas);
Expand All @@ -111,6 +119,8 @@ public void validateShardLimit(final Settings settings, final ClusterState state
*/
public void validateShardLimit(ClusterState currentState, Index[] indicesToOpen) {
int shardsToOpen = Arrays.stream(indicesToOpen)
// Validate shard limit only for non system indices as it is not hard limit anyways
.filter(index -> !systemIndices.validateSystemIndex(index.getName()))
.filter(index -> currentState.metadata().index(index).getState().equals(IndexMetadata.State.CLOSE))
.mapToInt(index -> getTotalShardCount(currentState, index))
.sum();
Expand Down
25 changes: 25 additions & 0 deletions server/src/main/java/org/opensearch/indices/SystemIndices.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.indices;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.automaton.Automata;
import org.apache.lucene.util.automaton.Automaton;
import org.apache.lucene.util.automaton.CharacterRunAutomaton;
Expand Down Expand Up @@ -63,6 +65,8 @@
* to reduce the locations within the code that need to deal with {@link SystemIndexDescriptor}s.
*/
public class SystemIndices {
private static final Logger logger = LogManager.getLogger(SystemIndices.class);

private static final Map<String, Collection<SystemIndexDescriptor>> SERVER_SYSTEM_INDEX_DESCRIPTORS = singletonMap(
TaskResultsService.class.getName(),
singletonList(new SystemIndexDescriptor(TASK_INDEX + "*", "Task Result Index"))
Expand Down Expand Up @@ -135,6 +139,27 @@ public boolean isSystemIndex(String indexName) {
}
}

/**
* Validates (if this index has a dot-prefixed name) and it is system index.
* @param index The name of the index in question
*/
public boolean validateSystemIndex(String index) {
if (index.charAt(0) == '.') {
SystemIndexDescriptor matchingDescriptor = findMatchingDescriptor(index);
if (matchingDescriptor != null) {
logger.trace(
"index [{}] is a system index because it matches index pattern [{}] with description [{}]",
index,
matchingDescriptor.getIndexPattern(),
matchingDescriptor.getDescription()
);
return true;
}
}

return false;
}

private static CharacterRunAutomaton buildCharacterRunAutomaton(Collection<SystemIndexDescriptor> descriptors) {
Optional<Automaton> automaton = descriptors.stream()
.map(descriptor -> Regex.simpleMatchToAutomaton(descriptor.getIndexPattern()))
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ protected Node(

final AliasValidator aliasValidator = new AliasValidator();

final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService);
final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(
settings,
clusterService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,11 @@ public ClusterState execute(ClusterState currentState) {
.put(snapshotIndexMetadata.getSettings())
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
);
shardLimitValidator.validateShardLimit(snapshotIndexMetadata.getSettings(), currentState);
shardLimitValidator.validateShardLimit(
renamedIndexName,
snapshotIndexMetadata.getSettings(),
currentState
);
if (!request.includeAliases() && !snapshotIndexMetadata.getAliases().isEmpty()) {
// Remove all aliases - they shouldn't be restored
indexMdBuilder.removeAllAliases();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,8 @@ public void testRolloverClusterState() throws Exception {
IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
when(mockIndexNameExpressionResolver.resolveDateMathExpression(any())).then(returnsFirstArg());

ShardLimitValidator shardLimitValidator = new ShardLimitValidator(Settings.EMPTY, clusterService);
final SystemIndices systemIndices = new SystemIndices(emptyMap());
ShardLimitValidator shardLimitValidator = new ShardLimitValidator(Settings.EMPTY, clusterService, systemIndices);
MetadataCreateIndexService createIndexService = new MetadataCreateIndexService(
Settings.EMPTY,
clusterService,
Expand All @@ -615,7 +616,7 @@ public void testRolloverClusterState() throws Exception {
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
testThreadPool,
null,
new SystemIndices(emptyMap()),
systemIndices,
false
);
MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(
Expand Down Expand Up @@ -739,7 +740,8 @@ public void testRolloverClusterStateForDataStream() throws Exception {
IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
when(mockIndexNameExpressionResolver.resolveDateMathExpression(any())).then(returnsFirstArg());

ShardLimitValidator shardLimitValidator = new ShardLimitValidator(Settings.EMPTY, clusterService);
final SystemIndices systemIndices = new SystemIndices(emptyMap());
ShardLimitValidator shardLimitValidator = new ShardLimitValidator(Settings.EMPTY, clusterService, systemIndices);
MetadataCreateIndexService createIndexService = new MetadataCreateIndexService(
Settings.EMPTY,
clusterService,
Expand All @@ -751,7 +753,7 @@ public void testRolloverClusterStateForDataStream() throws Exception {
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
testThreadPool,
null,
new SystemIndices(emptyMap()),
systemIndices,
false
);
MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static org.opensearch.cluster.metadata.IndexMetadata.*;
import static org.opensearch.cluster.metadata.MetadataIndexStateServiceTests.addClosedIndex;
import static org.opensearch.cluster.metadata.MetadataIndexStateServiceTests.addOpenedIndex;
import static org.opensearch.cluster.shards.ShardCounts.forDataNodeCount;
Expand Down Expand Up @@ -104,7 +106,54 @@ public void testUnderShardLimit() {
assertFalse(errorMessage.isPresent());
}

public void testValidateShardLimit() {
/**
* This test validates that system index creation succeeds
* even though it exceeds the cluster max shard limit
*/
public void testSystemIndexCreationSucceeds() {
final ShardLimitValidator shardLimitValidator = createTestShardLimitService(1);
final Settings settings = Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.build();
final ClusterState state = createClusterForShardLimitTest(1, 1, 0);
shardLimitValidator.validateShardLimit(".tasks", settings, state);
}

/**
* This test validates that non-system index creation
* fails when it exceeds the cluster max shard limit
*/
public void testNonSystemIndexCreationFails() {
final ShardLimitValidator shardLimitValidator = createTestShardLimitService(1);
final Settings settings = Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.build();
final ClusterState state = createClusterForShardLimitTest(1, 1, 0);
final ValidationException exception = expectThrows(
ValidationException.class,
() -> shardLimitValidator.validateShardLimit("abc", settings, state)
);
assertEquals(
"Validation Failed: 1: this action would add ["
+ 2
+ "] total shards, but this cluster currently has ["
+ 1
+ "]/["
+ 1
+ "] maximum shards open;",
exception.getMessage()
);
}

/**
* This test validates that non-system index opening
* fails when it exceeds the cluster max shard limit
*/
public void testNonSystemIndexOpeningFails() {
int nodesInCluster = randomIntBetween(2, 90);
ShardCounts counts = forDataNodeCount(nodesInCluster);
ClusterState state = createClusterForShardLimitTest(
Expand Down Expand Up @@ -140,6 +189,33 @@ public void testValidateShardLimit() {
);
}

/**
* This test validates that system index opening succeeds
* even when it exceeds the cluster max shard limit
*/
public void testSystemIndexOpeningSucceeds() {
int nodesInCluster = randomIntBetween(2, 90);
ShardCounts counts = forDataNodeCount(nodesInCluster);
ClusterState state = createClusterForShardLimitTest(
nodesInCluster,
randomAlphaOfLengthBetween(5, 15),
counts.getFirstIndexShards(),
counts.getFirstIndexReplicas(),
".tasks", // Adding closed system index to cluster state
counts.getFailingIndexShards(),
counts.getFailingIndexReplicas()
);

Index[] indices = Arrays.stream(state.metadata().indices().values().toArray(IndexMetadata.class))
.map(IndexMetadata::getIndex)
.collect(Collectors.toList())
.toArray(new Index[2]);

// Shard limit validation succeeds without any issues as system index is being opened
ShardLimitValidator shardLimitValidator = createTestShardLimitService(counts.getShardsPerNode());
shardLimitValidator.validateShardLimit(state, indices);
}

public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int shardsInIndex, int replicas) {
ImmutableOpenMap.Builder<String, DiscoveryNode> dataNodes = ImmutableOpenMap.builder();
for (int i = 0; i < nodesInCluster; i++) {
Expand All @@ -165,8 +241,10 @@ public static ClusterState createClusterForShardLimitTest(int nodesInCluster, in

public static ClusterState createClusterForShardLimitTest(
int nodesInCluster,
String openIndexName,
int openIndexShards,
int openIndexReplicas,
String closeIndexName,
int closedIndexShards,
int closedIndexReplicas
) {
Expand All @@ -178,8 +256,8 @@ public static ClusterState createClusterForShardLimitTest(
when(nodes.getDataNodes()).thenReturn(dataNodes.build());

ClusterState state = ClusterState.builder(ClusterName.DEFAULT).build();
state = addOpenedIndex(randomAlphaOfLengthBetween(5, 15), openIndexShards, openIndexReplicas, state);
state = addClosedIndex(randomAlphaOfLengthBetween(5, 15), closedIndexShards, closedIndexReplicas, state);
state = addOpenedIndex(openIndexName, openIndexShards, openIndexReplicas, state);
state = addClosedIndex(closeIndexName, closedIndexShards, closedIndexReplicas, state);

final Metadata.Builder metadata = Metadata.builder(state.metadata());
if (randomBoolean()) {
Expand All @@ -190,6 +268,24 @@ public static ClusterState createClusterForShardLimitTest(
return ClusterState.builder(state).metadata(metadata).nodes(nodes).build();
}

public static ClusterState createClusterForShardLimitTest(
int nodesInCluster,
int openIndexShards,
int openIndexReplicas,
int closedIndexShards,
int closedIndexReplicas
) {
return createClusterForShardLimitTest(
nodesInCluster,
randomAlphaOfLengthBetween(5, 15),
openIndexShards,
openIndexReplicas,
randomAlphaOfLengthBetween(5, 15),
closedIndexShards,
closedIndexReplicas
);
}

/**
* Creates a {@link ShardLimitValidator} for testing with the given setting and a mocked cluster service.
*
Expand All @@ -204,7 +300,7 @@ public static ShardLimitValidator createTestShardLimitService(int maxShardsPerNo
new ClusterSettings(limitOnlySettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);

return new ShardLimitValidator(limitOnlySettings, clusterService);
return new ShardLimitValidator(limitOnlySettings, clusterService, new SystemIndices(emptyMap()));
}

/**
Expand All @@ -217,6 +313,6 @@ public static ShardLimitValidator createTestShardLimitService(int maxShardsPerNo
public static ShardLimitValidator createTestShardLimitService(int maxShardsPerNode, ClusterService clusterService) {
Settings limitOnlySettings = Settings.builder().put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), maxShardsPerNode).build();

return new ShardLimitValidator(limitOnlySettings, clusterService);
return new ShardLimitValidator(limitOnlySettings, clusterService, new SystemIndices(emptyMap()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ public IndexMetadata upgradeIndexMetadata(IndexMetadata indexMetadata, Version m
null,
actionFilters
);
ShardLimitValidator shardLimitValidator = new ShardLimitValidator(SETTINGS, clusterService);
final SystemIndices systemIndices = new SystemIndices(emptyMap());
ShardLimitValidator shardLimitValidator = new ShardLimitValidator(SETTINGS, clusterService, systemIndices);
MetadataIndexStateService indexStateService = new MetadataIndexStateService(
clusterService,
allocationService,
Expand Down Expand Up @@ -290,7 +291,7 @@ public IndexMetadata upgradeIndexMetadata(IndexMetadata indexMetadata, Version m
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
threadPool,
xContentRegistry,
new SystemIndices(emptyMap()),
systemIndices,
true
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1863,7 +1863,8 @@ public void onFailure(final Exception e) {
RetentionLeaseSyncer.EMPTY
);
Map<ActionType, TransportAction> actions = new HashMap<>();
final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService);
final SystemIndices systemIndices = new SystemIndices(emptyMap());
final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(
settings,
clusterService,
Expand All @@ -1875,7 +1876,7 @@ public void onFailure(final Exception e) {
indexScopedSettings,
threadPool,
namedXContentRegistry,
new SystemIndices(emptyMap()),
systemIndices,
false
);
actions.put(
Expand Down

0 comments on commit 35ae7fe

Please sign in to comment.