Skip to content

Commit

Permalink
[ML] Wait for all shards to be active when creating the ML stats index (
Browse files Browse the repository at this point in the history
elastic#108202)

* Wait for all shards to be active when creating the ML stats index

* Unmute tests

* Wait for the stats index in cleanup

* more waiting for the stats index

* Add adminclient to ensureHealth

Co-authored-by: Pat Whelan <pat.whelan@elastic.co>

* fix errors causing build failures

---------

Co-authored-by: Max Hniebergall <137079448+maxhniebergall@users.noreply.github.com>
Co-authored-by: Pat Whelan <pat.whelan@elastic.co>
Co-authored-by: Max Hniebergall <max.hniebergall@elastic.co>
Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
5 people committed Oct 30, 2024
1 parent 9f8cf46 commit 3eabf1a
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1744,7 +1744,7 @@ public static void ensureHealth(RestClient restClient, Consumer<Request> request
ensureHealth(restClient, "", requestConsumer);
}

protected static void ensureHealth(RestClient restClient, String index, Consumer<Request> requestConsumer) throws IOException {
public static void ensureHealth(RestClient restClient, String index, Consumer<Request> requestConsumer) throws IOException {
Request request = new Request("GET", "/_cluster/health" + (index.isBlank() ? "" : "/" + index));
requestConsumer.accept(request);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.core.ml;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -66,6 +67,15 @@ public static void createStatsIndexAndAliasIfNecessary(
TimeValue masterNodeTimeout,
ActionListener<Boolean> listener
) {
MlIndexAndAlias.createIndexAndAliasIfNecessary(client, state, resolver, TEMPLATE_NAME, writeAlias(), masterNodeTimeout, listener);
MlIndexAndAlias.createIndexAndAliasIfNecessary(
client,
state,
resolver,
TEMPLATE_NAME,
writeAlias(),
masterNodeTimeout,
ActiveShardCount.ALL,
listener
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -91,6 +92,10 @@ public static void createStateIndexAndAliasIfNecessary(
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
AnomalyDetectorsIndex.jobStateIndexWriteAlias(),
masterNodeTimeout,
// TODO: shard count default preserves the existing behaviour when the
// parameter was added but it may be that ActiveShardCount.ALL is a
// better option
ActiveShardCount.DEFAULT,
finalListener
);
}
Expand Down Expand Up @@ -123,6 +128,10 @@ public static void createStateIndexAndAliasIfNecessaryAndWaitForYellow(
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
AnomalyDetectorsIndex.jobStateIndexWriteAlias(),
masterNodeTimeout,
// TODO: shard count default preserves the existing behaviour when the
// parameter was added but it may be that ActiveShardCount.ALL is a
// better option
ActiveShardCount.DEFAULT,
stateIndexAndAliasCreated
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
Expand Down Expand Up @@ -105,6 +106,7 @@ public static void createIndexAndAliasIfNecessary(
String indexPatternPrefix,
String alias,
TimeValue masterNodeTimeout,
ActiveShardCount waitForShardCount,
ActionListener<Boolean> finalListener
) {

Expand Down Expand Up @@ -133,7 +135,7 @@ public static void createIndexAndAliasIfNecessary(

if (concreteIndexNames.length == 0) {
if (indexPointedByCurrentWriteAlias.isEmpty()) {
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, indexCreatedListener);
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, waitForShardCount, indexCreatedListener);
return;
}
logger.error(
Expand All @@ -144,7 +146,7 @@ public static void createIndexAndAliasIfNecessary(
);
} else if (concreteIndexNames.length == 1 && concreteIndexNames[0].equals(legacyIndexWithoutSuffix)) {
if (indexPointedByCurrentWriteAlias.isEmpty()) {
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, indexCreatedListener);
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, waitForShardCount, indexCreatedListener);
return;
}
if (indexPointedByCurrentWriteAlias.get().equals(legacyIndexWithoutSuffix)) {
Expand All @@ -153,6 +155,7 @@ public static void createIndexAndAliasIfNecessary(
firstConcreteIndex,
alias,
false,
waitForShardCount,
indexCreatedListener.delegateFailureAndWrap(
(l, unused) -> updateWriteAlias(client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, l)
)
Expand Down Expand Up @@ -241,13 +244,15 @@ private static void createFirstConcreteIndex(
String index,
String alias,
boolean addAlias,
ActiveShardCount waitForShardCount,
ActionListener<Boolean> listener
) {
logger.info("About to create first concrete index [{}] with alias [{}]", index, alias);
CreateIndexRequestBuilder requestBuilder = client.admin().indices().prepareCreate(index);
if (addAlias) {
requestBuilder.addAlias(new Alias(alias).isHidden(true));
}
requestBuilder.setWaitForActiveShards(waitForShardCount);
CreateIndexRequest request = requestBuilder.request();

executeAsyncWithOrigin(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public MlRestTestStateCleaner(Logger logger, RestClient adminClient) {
}

public void resetFeatures() throws IOException {
waitForMlStatsIndexToInitialize();
deleteAllTrainedModelIngestPipelines();
// This resets all features, not just ML, but they should have been getting reset between tests anyway so it shouldn't matter
adminClient.performRequest(new Request("POST", "/_features/_reset"));
Expand All @@ -54,4 +55,12 @@ private void deleteAllTrainedModelIngestPipelines() throws IOException {
}
}
}

private void waitForMlStatsIndexToInitialize() throws IOException {
ESRestTestCase.ensureHealth(adminClient, ".ml-stats-*", (request) -> {
request.addParameter("wait_for_no_initializing_shards", "true");
request.addParameter("level", "shards");
request.addParameter("timeout", "30s");
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.AdminClient;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.ClusterAdminClient;
Expand Down Expand Up @@ -370,7 +371,8 @@ private void createIndexAndAliasIfNecessary(ClusterState clusterState) {
TestIndexNameExpressionResolver.newInstance(),
TEST_INDEX_PREFIX,
TEST_INDEX_ALIAS,
TEST_REQUEST_TIMEOUT,
TimeValue.timeValueSeconds(30),
ActiveShardCount.DEFAULT,
listener
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ private void putModelAlias(String modelAlias, String newModel) throws IOExceptio
}

@SuppressWarnings("unchecked")
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/107777")
public void testCreateAndDeletePipelineWithInferenceProcessor() throws Exception {
putRegressionModel(MODEL_ID);
String pipelineId = "regression-model-pipeline";
createdPipelines.add(pipelineId);
putPipeline(MODEL_ID, pipelineId);

waitForStats();
Map<String, Object> statsAsMap = getStats(MODEL_ID);
List<Integer> pipelineCount = (List<Integer>) XContentMapValues.extractValue("trained_model_stats.pipeline_count", statsAsMap);
assertThat(pipelineCount.get(0), equalTo(1));
Expand Down Expand Up @@ -107,6 +107,7 @@ public void testCreateAndDeletePipelineWithInferenceProcessorByName() throws Exc
createdPipelines.add("second_pipeline");
putPipeline("regression_second", "second_pipeline");

waitForStats();
Map<String, Object> statsAsMap = getStats(MODEL_ID);
List<Integer> pipelineCount = (List<Integer>) XContentMapValues.extractValue("trained_model_stats.pipeline_count", statsAsMap);
assertThat(pipelineCount.get(0), equalTo(2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,9 +563,6 @@ setup:

---
"Test delete given model referenced by pipeline":
- skip:
awaits_fix: "https://github.com/elastic/elasticsearch/issues/80703"

- do:
ingest.put_pipeline:
id: "pipeline-using-a-classification-model"
Expand All @@ -592,9 +589,6 @@ setup:

---
"Test force delete given model referenced by pipeline":
- skip:
awaits_fix: "https://github.com/elastic/elasticsearch/issues/80703"

- do:
ingest.put_pipeline:
id: "pipeline-using-a-classification-model"
Expand Down Expand Up @@ -622,9 +616,6 @@ setup:

---
"Test delete given model with alias referenced by pipeline":
- skip:
awaits_fix: "https://github.com/elastic/elasticsearch/issues/80703"

- do:
ml.put_trained_model_alias:
model_alias: "alias-to-a-classification-model"
Expand Down Expand Up @@ -655,8 +646,6 @@ setup:

---
"Test force delete given model with alias referenced by pipeline":
- skip:
awaits_fix: "https://github.com/elastic/elasticsearch/issues/106652"
- do:
ml.put_trained_model_alias:
model_alias: "alias-to-a-classification-model"
Expand Down

0 comments on commit 3eabf1a

Please sign in to comment.