diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java
index 85751a320c585..d7abe9a1f0f03 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java
@@ -105,6 +105,7 @@ import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
 import org.elasticsearch.xpack.core.ml.action.PutJobAction;
+import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
@@ -289,6 +290,7 @@ public List<Action<? extends ActionResponse>> getClientActions() {
                 PostCalendarEventsAction.INSTANCE,
                 PersistJobAction.INSTANCE,
                 FindFileStructureAction.INSTANCE,
+                MlUpgradeAction.INSTANCE,
                 // security
                 ClearRealmCacheAction.INSTANCE,
                 ClearRolesCacheAction.INSTANCE,
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/MlUpgradeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/MlUpgradeAction.java
new file mode 100644
index 0000000000000..404f15d4f6270
--- /dev/null
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/MlUpgradeAction.java
@@ -0,0 +1,160 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.ml.action;
+
+import org.elasticsearch.action.Action;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
+import org.elasticsearch.action.support.master.MasterNodeReadRequest;
+import org.elasticsearch.client.ElasticsearchClient;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+
+public class MlUpgradeAction extends Action<AcknowledgedResponse> {
+    public static final MlUpgradeAction INSTANCE = new MlUpgradeAction();
+    public static final String NAME = "cluster:admin/xpack/ml/upgrade";
+
+    private MlUpgradeAction() {
+        super(NAME);
+    }
+
+    @Override
+    public AcknowledgedResponse newResponse() {
+        return new AcknowledgedResponse();
+    }
+
+    public static class Request extends MasterNodeReadRequest<Request> implements ToXContentObject {
+
+        private static final ParseField REINDEX_BATCH_SIZE = new ParseField("reindex_batch_size");
+
+        public static ObjectParser<Request, Void> PARSER = new ObjectParser<>("ml_upgrade", true, Request::new);
+        static {
+            PARSER.declareInt(Request::setReindexBatchSize, REINDEX_BATCH_SIZE);
+        }
+
+        static final String INDEX = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "*";
+        private int reindexBatchSize = 1000;
+
+        /**
+         * Should this task store its result?
+         */
+        private boolean shouldStoreResult;
+
+        // for serialization
+        public Request() {
+        }
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            reindexBatchSize = in.readInt();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeInt(reindexBatchSize);
+        }
+
+        public String[] indices() {
+            return new String[]{INDEX};
+        }
+
+        public IndicesOptions indicesOptions() {
+            return IndicesOptions.strictExpandOpenAndForbidClosed();
+        }
+
+        /**
+         * Should this task store its result after it has finished?
+         */
+        public Request setShouldStoreResult(boolean shouldStoreResult) {
+            this.shouldStoreResult = shouldStoreResult;
+            return this;
+        }
+
+        @Override
+        public boolean getShouldStoreResult() {
+            return shouldStoreResult;
+        }
+
+        public Request setReindexBatchSize(int reindexBatchSize) {
+            this.reindexBatchSize = reindexBatchSize;
+            return this;
+        }
+
+        public int getReindexBatchSize() {
+            return reindexBatchSize;
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            if (reindexBatchSize <= 0) {
+                ActionRequestValidationException validationException = new ActionRequestValidationException();
+                validationException.addValidationError("[" + REINDEX_BATCH_SIZE.getPreferredName() + "] must be greater than 0.");
+                return validationException;
+            }
+            return null;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            Request request = (Request) o;
+            return Objects.equals(reindexBatchSize, request.reindexBatchSize);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(reindexBatchSize);
+        }
+
+        @Override
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(id, type, action, "ml-upgrade", parentTaskId, headers) {
+                @Override
+                public boolean shouldCancelChildrenOnCancellation() {
+                    return true;
+                }
+            };
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field(REINDEX_BATCH_SIZE.getPreferredName(), reindexBatchSize);
+            builder.endObject();
+            return builder;
+        }
+    }
+
+    public static class RequestBuilder extends MasterNodeReadOperationRequestBuilder<Request, AcknowledgedResponse, RequestBuilder> {
+
+        public RequestBuilder(ElasticsearchClient client) {
+            super(client, INSTANCE, new Request());
+        }
+    }
+
+}
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/MlUpgradeRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/MlUpgradeRequestTests.java
new file mode 100644
index 0000000000000..227fc20ec9688
--- /dev/null
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/MlUpgradeRequestTests.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.ml.action;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+
+
+public class MlUpgradeRequestTests extends AbstractWireSerializingTestCase<MlUpgradeAction.Request> {
+
+    @Override
+    protected MlUpgradeAction.Request createTestInstance() {
+        MlUpgradeAction.Request request = new MlUpgradeAction.Request();
+        if (randomBoolean()) {
+            request.setReindexBatchSize(randomIntBetween(1, 10_000));
+        }
+        return request;
+    }
+
+    @Override
+    protected Writeable.Reader<MlUpgradeAction.Request> instanceReader() {
+        return MlUpgradeAction.Request::new;
+    }
+
+}
diff --git a/x-pack/plugin/ml/qa/ml-with-security/build.gradle b/x-pack/plugin/ml/qa/ml-with-security/build.gradle
index abfed3fd878d0..6e0127f614c9a 100644
--- a/x-pack/plugin/ml/qa/ml-with-security/build.gradle
+++ b/x-pack/plugin/ml/qa/ml-with-security/build.gradle
@@ -93,7 +93,9 @@ integTestRunner {
     'ml/validate/Test job config that is invalid only because of the job ID',
     'ml/validate_detector/Test invalid detector',
     'ml/delete_forecast/Test delete on _all forecasts not allow no forecasts',
-    'ml/delete_forecast/Test delete forecast on missing forecast'
+    'ml/delete_forecast/Test delete forecast on missing forecast',
+    'ml/ml_upgrade/Upgrade results when there is nothing to upgrade',
+    'ml/ml_upgrade/Upgrade results when there is nothing to upgrade not waiting for results'
   ].join(',')
 }
diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java
index c06810bbf2a0e..cd33e1d80769e 100644
--- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java
+++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java
@@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.reindex.ReindexPlugin;
 import org.elasticsearch.persistent.PersistentTaskParams;
 import org.elasticsearch.persistent.PersistentTaskState;
 import org.elasticsearch.plugins.Plugin;
@@ -120,7 +121,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
 
     @Override
     protected Collection<Class<? extends Plugin>> transportClientPlugins() {
-        return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class);
+        return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class, ReindexPlugin.class);
     }
 
     @Override
diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlUpgradeIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlUpgradeIT.java
new file mode 100644
index 0000000000000..a2a05ea1686fa
--- /dev/null
+++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlUpgradeIT.java
@@ -0,0 +1,378 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.integration;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.reindex.ReindexAction;
+import org.elasticsearch.index.reindex.ReindexRequest;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
+import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.config.JobState;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
+import org.elasticsearch.xpack.ml.ResultsIndexUpgradeService;
+import org.junit.After;
+import org.junit.Assert;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
+import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
+import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
+
+public class MlUpgradeIT extends MlNativeAutodetectIntegTestCase {
+
+    @After
+    public void cleanup() throws Exception {
+        cleanUp();
+    }
+
+    public void testMigrationWhenItIsNotNecessary() throws Exception {
+        String jobId1 = "no-migration-test1";
+        String jobId2 = "no-migration-test2";
+        String jobId3 = "no-migration-test3";
+
+        String dataIndex = createDataIndex().v2();
+        List<Job> jobs = createJobsWithData(jobId1, jobId2, jobId3, dataIndex);
+        Job job1 = jobs.get(0);
+        Job job2 = jobs.get(1);
+        Job job3 = jobs.get(2);
+
+        String job1Index = job1.getResultsIndexName();
+        String job2Index = job2.getResultsIndexName();
+        String job3Index = job3.getResultsIndexName();
+
+        assertThat(indexExists(job1Index), is(true));
+        assertThat(indexExists(job2Index), is(true));
+        assertThat(indexExists(job3Index), is(true));
+
+        long job1Total = getTotalDocCount(job1Index);
+        long job2Total = getTotalDocCount(job2Index);
+        long job3Total = getTotalDocCount(job3Index);
+
+        AcknowledgedResponse resp = ESIntegTestCase.client().execute(MlUpgradeAction.INSTANCE,
+            new MlUpgradeAction.Request()).actionGet();
+        assertThat(resp.isAcknowledged(), is(true));
+
+        // Migration should have done nothing
+        assertThat(indexExists(job1Index), is(true));
+        assertThat(indexExists(job2Index), is(true));
+        assertThat(indexExists(job3Index), is(true));
+
+        assertThat(getTotalDocCount(job1Index), equalTo(job1Total));
+        assertThat(getTotalDocCount(job2Index), equalTo(job2Total));
+        assertThat(getTotalDocCount(job3Index), equalTo(job3Total));
+
+        ClusterState state = admin().cluster().state(new ClusterStateRequest()).actionGet().getState();
+        IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
+        String[] indices = indexNameExpressionResolver.concreteIndexNames(state,
+            IndicesOptions.strictExpandOpenAndForbidClosed(),
+            AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*");
+
+        // Our backing index count should be two, as we have a shared index and a custom index
+        assertThat(indices.length, equalTo(2));
+    }
+
+    public void testMigration() throws Exception {
+        String jobId1 = "migration-test1";
+        String jobId2 = "migration-test2";
+        String jobId3 = "migration-test3";
+
+        String dataIndex = createDataIndex().v2();
+        List<Job> jobs = createJobsWithData(jobId1, jobId2, jobId3, dataIndex);
+        Job job1 = jobs.get(0);
+        Job job2 = jobs.get(1);
+        Job job3 = jobs.get(2);
+
+        String job1Index = job1.getResultsIndexName();
+        String job2Index = job2.getResultsIndexName();
+        String job3Index = job3.getResultsIndexName();
+
+        assertThat(indexExists(job1Index), is(true));
+        assertThat(indexExists(job2Index), is(true));
+        assertThat(indexExists(job3Index), is(true));
+
+        long job1Total = getJobResultsCount(job1.getId());
+        long job2Total = getJobResultsCount(job2.getId());
+        long job3Total = getJobResultsCount(job3.getId());
+
+        IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
+
+        ResultsIndexUpgradeService resultsIndexUpgradeService = new ResultsIndexUpgradeService(indexNameExpressionResolver,
+            ThreadPool.Names.SAME,
+            indexMetaData -> true);
+
+        PlainActionFuture<AcknowledgedResponse> future = PlainActionFuture.newFuture();
+
+        resultsIndexUpgradeService.upgrade(ESIntegTestCase.client(),
+            new MlUpgradeAction.Request(),
+            ESIntegTestCase.client().admin().cluster().prepareState().get().getState(),
+            future);
+
+        AcknowledgedResponse response = future.get();
+        assertThat(response.isAcknowledged(), is(true));
+
+        assertThat(indexExists(job1Index), is(false));
+        assertThat(indexExists(job2Index), is(false));
+        assertThat(indexExists(job3Index), is(false));
+
+        ClusterState state = admin().cluster().state(new ClusterStateRequest()).actionGet().getState();
+        String[] indices = indexNameExpressionResolver.concreteIndexNames(state,
+            IndicesOptions.strictExpandOpenAndForbidClosed(),
+            AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*");
+
+        // Our backing index count should be four, as we have a shared index and a custom index, and upgrading doubles the number of indices
+        Assert.assertThat(indices.length, equalTo(4));
+
+        refresh(indices);
+        assertThat(getJobResultsCount(job1.getId()), equalTo(job1Total));
+        assertThat(getJobResultsCount(job2.getId()), equalTo(job2Total));
+        assertThat(getJobResultsCount(job3.getId()), equalTo(job3Total));
+
+
+        // We should still be able to write, and the aliases should allow reading from the appropriate indices
+        postDataToJob(jobId1);
+        postDataToJob(jobId2);
+        postDataToJob(jobId3);
+        // We should also be able to create new jobs and old jobs should be unaffected.
+        String jobId4 = "migration-test4";
+        Job job4 = createAndOpenJobAndStartDataFeedWithData(jobId4, dataIndex, false);
+        waitUntilJobIsClosed(jobId4);
+
+        indices = indexNameExpressionResolver.concreteIndexNames(state,
+            IndicesOptions.strictExpandOpenAndForbidClosed(),
+            AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*");
+        refresh(indices);
+
+        long newJob1Total = getJobResultsCount(job1.getId());
+        assertThat(newJob1Total, greaterThan(job1Total));
+
+        long newJob2Total = getJobResultsCount(job2.getId());
+        assertThat(newJob2Total, greaterThan(job2Total));
+
+        long newJob3Total = getJobResultsCount(job3.getId());
+        assertThat(newJob3Total, greaterThan(job3Total));
+
+        assertThat(getJobResultsCount(jobId4), greaterThan(0L));
+        assertThat(getJobResultsCount(jobId1), equalTo(newJob1Total));
+        assertThat(getJobResultsCount(jobId2), equalTo(newJob2Total));
+        assertThat(getJobResultsCount(jobId3), equalTo(newJob3Total));
+    }
+
+    // I think this test name could be a little bit longer....
+    public void testMigrationWithManuallyCreatedIndexThatNeedsMigrating() throws Exception {
+        String jobId1 = "migration-failure-test1";
+        String jobId2 = "migration-failure-test2";
+        String jobId3 = "migration-failure-test3";
+
+        String dataIndex = createDataIndex().v2();
+        List<Job> jobs = createJobsWithData(jobId1, jobId2, jobId3, dataIndex);
+        Job job1 = jobs.get(0);
+        Job job2 = jobs.get(1);
+        Job job3 = jobs.get(2);
+
+        String job1Index = job1.getResultsIndexName();
+        String job2Index = job2.getResultsIndexName();
+        String job3Index = job3.getResultsIndexName();
+
+        // This index name should match one of the automatically created migration indices
+        String manuallyCreatedIndex = job1Index + "-" + Version.CURRENT.major;
+        client().admin().indices().prepareCreate(manuallyCreatedIndex).execute().actionGet();
+
+        IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
+
+        ResultsIndexUpgradeService resultsIndexUpgradeService = new ResultsIndexUpgradeService(indexNameExpressionResolver,
+            ThreadPool.Names.SAME,
+            indexMetaData -> true); // indicates that this manually created index needs migrating
+
+        resultsIndexUpgradeService.upgrade(ESIntegTestCase.client(),
+            new MlUpgradeAction.Request(),
+            ESIntegTestCase.client().admin().cluster().prepareState().get().getState(),
+            ActionListener.wrap(
+                resp -> fail(),
+                exception -> {
+                    assertThat(exception, instanceOf(IllegalStateException.class));
+                    assertThat(exception.getMessage(),
+                        equalTo("Index [" + manuallyCreatedIndex + "] already exists and is not the current version."));
+                }
+            ));
+    }
+
+    public void testMigrationWithExistingIndexWithData() throws Exception {
+        String jobId1 = "partial-migration-test1";
+        String jobId2 = "partial-migration-test2";
+        String jobId3 = "partial-migration-test3";
+
+        String dataIndex = createDataIndex().v2();
+        List<Job> jobs = createJobsWithData(jobId1, jobId2, jobId3, dataIndex);
+        Job job1 = jobs.get(0);
+        Job job2 = jobs.get(1);
+        Job job3 = jobs.get(2);
+
+        String job1Index = job1.getResultsIndexName();
+        String job2Index = job2.getResultsIndexName();
+        String job3Index = job3.getResultsIndexName();
+
+        assertThat(indexExists(job1Index), is(true));
+        assertThat(indexExists(job2Index), is(true));
+        assertThat(indexExists(job3Index), is(true));
+
+        long job1Total = getJobResultsCount(job1.getId());
+        long job2Total = getJobResultsCount(job2.getId());
+        long job3Total = getJobResultsCount(job3.getId());
+
+        // let's manually create a READ index with reindexed data already
+        // Should still get aliased appropriately without any additional/duplicate data.
+        String alreadyMigratedIndex = job1Index + "-" + Version.CURRENT.major + "r";
+        ReindexRequest reindexRequest = new ReindexRequest();
+        reindexRequest.setSourceIndices(job1Index);
+        reindexRequest.setDestIndex(alreadyMigratedIndex);
+        client().execute(ReindexAction.INSTANCE, reindexRequest).actionGet();
+
+        // New write index as well; it should still get aliased appropriately
+        String alreadyMigratedWriteIndex = job1Index + "-" + Version.CURRENT.major;
+        client().admin().indices().prepareCreate(alreadyMigratedWriteIndex).execute().actionGet();
+
+        IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
+
+        ResultsIndexUpgradeService resultsIndexUpgradeService = new ResultsIndexUpgradeService(indexNameExpressionResolver,
+            ThreadPool.Names.SAME,
+            // indicates that this manually created index is already migrated and should not be included in our migration steps
+            indexMetaData -> !(indexMetaData.getIndex().getName().equals(alreadyMigratedIndex) ||
+                indexMetaData.getIndex().getName().equals(alreadyMigratedWriteIndex)));
+
+        PlainActionFuture<AcknowledgedResponse> future = PlainActionFuture.newFuture();
+
+        resultsIndexUpgradeService.upgrade(ESIntegTestCase.client(),
+            new MlUpgradeAction.Request(),
+            ESIntegTestCase.client().admin().cluster().prepareState().get().getState(),
+            future);
+
+        AcknowledgedResponse response = future.get();
+        assertThat(response.isAcknowledged(), is(true));
+
+        assertThat(indexExists(job1Index), is(false));
+        assertThat(indexExists(job2Index), is(false));
+        assertThat(indexExists(job3Index), is(false));
+
+        ClusterState state = admin().cluster().state(new ClusterStateRequest()).actionGet().getState();
+        String[] indices = indexNameExpressionResolver.concreteIndexNames(state,
+            IndicesOptions.strictExpandOpenAndForbidClosed(),
+            AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*");
+
+        // Our backing index count should be four, as we have a shared index and a custom index, and upgrading doubles the number of indices
+        Assert.assertThat(indices.length, equalTo(4));
+        refresh(indices);
+
+        assertThat(getJobResultsCount(job1.getId()), equalTo(job1Total));
+        assertThat(getJobResultsCount(job2.getId()), equalTo(job2Total));
+        assertThat(getJobResultsCount(job3.getId()), equalTo(job3Total));
+
+        // We should still be able to write, and the aliases should allow reading from the appropriate indices
+        postDataToJob(jobId1);
+        postDataToJob(jobId2);
+        postDataToJob(jobId3);
+
+        refresh(indices);
+
+        long newJob1Total = getJobResultsCount(job1.getId());
+        assertThat(newJob1Total, greaterThan(job1Total));
+
+        long newJob2Total = getJobResultsCount(job2.getId());
+        assertThat(newJob2Total, greaterThan(job2Total));
+
+        long newJob3Total = getJobResultsCount(job3.getId());
+        assertThat(newJob3Total, greaterThan(job3Total));
+    }
+
+    private long getTotalDocCount(String indexName) {
+        SearchResponse searchResponse = ESIntegTestCase.client().prepareSearch(indexName)
+            .setSize(10_000)
+            .setTrackTotalHits(true)
+            .setQuery(QueryBuilders.matchAllQuery())
+            .execute().actionGet();
+        return searchResponse.getHits().getTotalHits().value;
+    }
+
+    private long getJobResultsCount(String jobId) {
+        String index = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + jobId;
+        return getTotalDocCount(index);
+    }
+
+    private void postDataToJob(String jobId) throws Exception {
+        openJob(jobId);
+        ESTestCase.assertBusy(() -> Assert.assertEquals(getJobStats(jobId).get(0).getState(), JobState.OPENED));
+        startDatafeed(jobId + "-datafeed", 0L, System.currentTimeMillis());
+        waitUntilJobIsClosed(jobId);
+    }
+
+    private Job createAndOpenJobAndStartDataFeedWithData(String jobId, String dataIndex, boolean isCustom) throws Exception {
+        Job.Builder jobBuilder = createScheduledJob(jobId);
+        if (isCustom) {
+            jobBuilder.setResultsIndexName(jobId);
+        }
+        registerJob(jobBuilder);
+
+        Job job = putJob(jobBuilder).getResponse();
+
+        openJob(job.getId());
+        ESTestCase.assertBusy(() -> Assert.assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
+
+        DatafeedConfig.Builder builder = createDatafeedBuilder(job.getId() + "-datafeed",
+            job.getId(),
+            Collections.singletonList(dataIndex));
+        builder.setQueryDelay(TimeValue.timeValueSeconds(5));
+        builder.setFrequency(TimeValue.timeValueSeconds(5));
+        DatafeedConfig datafeedConfig = builder.build();
+        registerDatafeed(datafeedConfig);
+        putDatafeed(datafeedConfig);
+        startDatafeed(datafeedConfig.getId(), 0L, System.currentTimeMillis());
+        waitUntilJobIsClosed(jobId);
+        return job;
+    }
+
+    private Tuple<Long, String> createDataIndex() {
+        ESIntegTestCase.client().admin().indices().prepareCreate("data-for-migration-1")
+            .addMapping("type", "time", "type=date")
+            .get();
+        long numDocs = ESTestCase.randomIntBetween(32, 512);
+        long now = System.currentTimeMillis();
+        long oneWeekAgo = now - 604800000;
+        long twoWeeksAgo = oneWeekAgo - 604800000;
+        indexDocs(logger, "data-for-migration-1", numDocs, twoWeeksAgo, oneWeekAgo);
+        return new Tuple<>(numDocs, "data-for-migration-1");
+    }
+
+    private List<Job> createJobsWithData(String sharedJobId1, String sharedJobId2, String customJobId, String dataIndex) throws Exception {
+
+        Job job1 = createAndOpenJobAndStartDataFeedWithData(sharedJobId1, dataIndex, false);
+        Job job2 = createAndOpenJobAndStartDataFeedWithData(sharedJobId2, dataIndex, false);
+        Job job3 = createAndOpenJobAndStartDataFeedWithData(customJobId, dataIndex, true);
+
+        return Arrays.asList(job1, job2, job3);
+    }
+}
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
index 7060e87fac0bb..418add2757fbc 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
@@ -96,6 +97,7 @@ import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
 import org.elasticsearch.xpack.core.ml.action.PutJobAction;
+import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
@@ -150,6 +151,7 @@ import org.elasticsearch.xpack.ml.action.TransportPutDatafeedAction;
 import org.elasticsearch.xpack.ml.action.TransportPutFilterAction;
 import org.elasticsearch.xpack.ml.action.TransportPutJobAction;
+import org.elasticsearch.xpack.ml.action.TransportMlUpgradeAction;
 import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction;
 import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction;
 import org.elasticsearch.xpack.ml.action.TransportStopDatafeedAction;
@@ -229,6 +231,7 @@ import org.elasticsearch.xpack.ml.rest.results.RestGetInfluencersAction;
 import org.elasticsearch.xpack.ml.rest.results.RestGetOverallBucketsAction;
 import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction;
+import org.elasticsearch.xpack.ml.rest.results.RestUpgradeMlAction;
 import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction;
 import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction;
@@ -541,7 +544,8 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
             new RestPutCalendarJobAction(settings, restController),
             new RestGetCalendarEventsAction(settings, restController),
             new RestPostCalendarEventAction(settings, restController),
-            new RestFindFileStructureAction(settings, restController)
+            new RestFindFileStructureAction(settings, restController),
+            new RestUpgradeMlAction(settings, restController)
         );
     }
@@ -599,7 +603,8 @@
             new ActionHandler<>(GetCalendarEventsAction.INSTANCE, TransportGetCalendarEventsAction.class),
             new ActionHandler<>(PostCalendarEventsAction.INSTANCE, TransportPostCalendarEventsAction.class),
             new ActionHandler<>(PersistJobAction.INSTANCE, TransportPersistJobAction.class),
-            new ActionHandler<>(FindFileStructureAction.INSTANCE, TransportFindFileStructureAction.class)
+            new ActionHandler<>(FindFileStructureAction.INSTANCE, TransportFindFileStructureAction.class),
+            new ActionHandler<>(MlUpgradeAction.INSTANCE, TransportMlUpgradeAction.class)
         );
     }
 
     @Override
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java
index 184ee44cf376c..bb3735f8aa3f1 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java
@@ -44,7 +44,7 @@ import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
 import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
-import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor;
+import org.elasticsearch.xpack.ml.utils.VoidChainTaskExecutor;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -179,9 +179,9 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
 
     private void migrateBatches(List<JobsAndDatafeeds> batches, ActionListener<Boolean> listener) {
-        ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(EsExecutors.newDirectExecutorService(), true);
+        VoidChainTaskExecutor voidChainTaskExecutor = new VoidChainTaskExecutor(EsExecutors.newDirectExecutorService(), true);
         for (JobsAndDatafeeds batch : batches) {
-            chainTaskExecutor.add(chainedListener -> writeConfigToIndex(batch.datafeedConfigs, batch.jobs, ActionListener.wrap(
+            voidChainTaskExecutor.add(chainedListener -> writeConfigToIndex(batch.datafeedConfigs, batch.jobs, ActionListener.wrap(
                 failedDocumentIds -> {
                     List<Job> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, batch.jobs);
                     List<DatafeedConfig> successfulDatafeedWrites =
@@ -191,7 +191,7 @@ private void migrateBatches(List<JobsAndDatafeeds> batches, ActionListener<Boolean> listener) {
-        chainTaskExecutor.execute(ActionListener.wrap(aVoid -> listener.onResponse(true), listener::onFailure));
+        voidChainTaskExecutor.execute(ActionListener.wrap(aVoids -> listener.onResponse(true), listener::onFailure));
     }
 
     // Exposed for testing
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/ResultsIndexUpgradeService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/ResultsIndexUpgradeService.java
new file mode 100644
index 0000000000000..ccbaed13feca0
--- /dev/null
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/ResultsIndexUpgradeService.java
@@ -0,0 +1,513 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.ResourceAlreadyExistsException;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.alias.Alias;
+import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.ReindexAction;
+import org.elasticsearch.index.reindex.ReindexRequest;
+import org.elasticsearch.index.reindex.ScrollableHitSource;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
+import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
+import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
+import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
+
+/**
+ * ML Job results index upgrade service
+ */
+public class ResultsIndexUpgradeService {
+
+    private static final Logger logger = LogManager.getLogger(ResultsIndexUpgradeService.class);
+
+    // Adjust the following constants as necessary for various versions and backports.
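+    // For example (hypothetical names, assuming a 7.x build where Version.CURRENT.major == 7): an old
+    // results index [.ml-anomalies-shared] gets a new write index [.ml-anomalies-shared-7] and is
+    // reindexed into a new read index [.ml-anomalies-shared-7r]; see newWriteName/newReadName below.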
+    private static final int INDEX_VERSION = Version.CURRENT.major;
+    private static final Version MIN_REQUIRED_VERSION = Version.CURRENT.minimumCompatibilityVersion();
+
+    private final IndexNameExpressionResolver indexNameExpressionResolver;
+    private final Predicate<IndexMetaData> shouldUpgrade;
+    private final String executor;
+
+    /**
+     * Construct a new upgrade service
+     *
+     * @param indexNameExpressionResolver Index expression resolver for the request
+     * @param executor Where to execute client calls
+     * @param shouldUpgrade Given an index's {@link IndexMetaData}, indicates whether it should be upgraded or not;
+     *                      {@code true} indicates that it SHOULD upgrade
+     */
+    public ResultsIndexUpgradeService(IndexNameExpressionResolver indexNameExpressionResolver,
+                                      String executor,
+                                      Predicate<IndexMetaData> shouldUpgrade) {
+        this.indexNameExpressionResolver = indexNameExpressionResolver;
+        this.shouldUpgrade = shouldUpgrade;
+        this.executor = executor;
+    }
+
+    public static boolean wasIndexCreatedInCurrentMajorVersion(IndexMetaData indexMetaData) {
+        return indexMetaData.getCreationVersion().major == INDEX_VERSION;
+    }
+
+    /**
+     * There are two reasons for these indices to exist:
+     * 1. The upgrade process has run before and either failed for some reason, or the end user is simply running it again.
+     *    Either way, it should be ok to proceed as this action SHOULD be idempotent,
+     *    unless the shouldUpgrade predicate is poorly formed
+     * 2. This index was created manually by the user. If the index was created manually and actually needs upgrading, then
+     *    we consider the "new index" to be invalid as the passed predicate indicates that it still needs upgrading.
+     *
+     * @param metaData Cluster metadata
+     * @param newIndexName The index to check
+     * @param shouldUpgrade Should the index be upgraded
+     * @return {@code true} if the "new index" is valid
+     */
+    private static boolean validNewIndex(MetaData metaData, String newIndexName, Predicate<IndexMetaData> shouldUpgrade) {
+        return (metaData.hasIndex(newIndexName) && shouldUpgrade.test(metaData.index(newIndexName))) == false;
+    }
+
+    private static void validateMinNodeVersion(ClusterState clusterState) {
+        if (clusterState.nodes().getMinNodeVersion().before(MIN_REQUIRED_VERSION)) {
+            throw new IllegalStateException("All nodes should have at least version [" + MIN_REQUIRED_VERSION + "] to upgrade");
+        }
+    }
+
+    // This method copies the behavior of the normal {index}/_upgrade rest response handler
+    private static Tuple<RestStatus, Throwable> getStatusAndCause(BulkByScrollResponse response) {
+        /*
+         * Return the highest numbered rest status under the assumption that higher numbered statuses are "more error"
+         * and thus more interesting to the user.
+         */
+        RestStatus status = RestStatus.OK;
+        Throwable cause = null;
+        if (response.isTimedOut()) {
+            status = RestStatus.REQUEST_TIMEOUT;
+            cause = new ElasticsearchTimeoutException("Reindex request timed out");
+        }
+        for (BulkItemResponse.Failure failure : response.getBulkFailures()) {
+            if (failure.getStatus().getStatus() > status.getStatus()) {
+                status = failure.getStatus();
+                cause = failure.getCause();
+            }
+        }
+        for (ScrollableHitSource.SearchFailure failure : response.getSearchFailures()) {
+            RestStatus failureStatus = ExceptionsHelper.status(failure.getReason());
+            if (failureStatus.getStatus() > status.getStatus()) {
+                status = failureStatus;
+                cause = failure.getReason();
+            }
+        }
+        return new Tuple<>(status, cause);
+    }
+
+    /**
+     * Upgrade the indices given in the request.
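+     * The happy path follows the numbered listeners below (<1> through <7>): create the new write indices
+     * and add them to the read aliases, move the write aliases onto the new write indices, mark the old
+     * indices read-only, reindex the old indices into the new read indices, move the read aliases over,
+     * delete the old indices, and finally alert the caller.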
+     *
+     * @param client   The client to use when making calls
+     * @param request  The upgrade request
+     * @param state    The current cluster state
+     * @param listener The listener to alert when actions have completed
+     */
+    public void upgrade(Client client, MlUpgradeAction.Request request, ClusterState state,
+                        ActionListener<AcknowledgedResponse> listener) {
+        try {
+            validateMinNodeVersion(state);
+            String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(state, request.indicesOptions(), request.indices());
+            MetaData metaData = state.getMetaData();
+
+            List<String> indicesToUpgrade = Arrays.stream(concreteIndices)
+                .filter(indexName -> shouldUpgrade.test(metaData.index(indexName)))
+                .collect(Collectors.toList());
+
+            // All the internal indices are up to date
+            if (indicesToUpgrade.isEmpty()) {
+                listener.onResponse(new AcknowledgedResponse(true));
+                return;
+            }
+
+            IndexNameAndAliasProvider indexNameAndAliasProvider = new IndexNameAndAliasProvider(indicesToUpgrade, metaData);
+            Exception validationException = indexNameAndAliasProvider.validate(metaData, shouldUpgrade);
+            if (validationException != null) {
+                listener.onFailure(validationException);
+                return;
+            }
+
+            // <7> Now that we have deleted the old indices, we are complete, alert the user
+            ActionListener<AcknowledgedResponse> deleteIndicesListener = ActionListener.wrap(
+                listener::onResponse,
+                error -> {
+                    String msg = "Failed to delete old indices: " + Strings.collectionToCommaDelimitedString(indicesToUpgrade);
+                    logger.error(msg, error);
+                    listener.onFailure(new ElasticsearchException(msg, error));
+                }
+            );
+
+            // <6> Now that aliases are moved, need to delete the old indices
+            ActionListener<AcknowledgedResponse> readAliasListener = ActionListener.wrap(
+                resp -> deleteOldIndices(client, indicesToUpgrade, deleteIndicesListener),
+                error -> {
+                    String msg = "Failed adjusting aliases from old indices to new.";
+                    logger.error(msg, error);
+                    listener.onFailure(new ElasticsearchException(msg, error));
+                }
+            );
+
+            // <5> Documents are now reindexed, time to move read aliases
+            ActionListener<Boolean> reindexListener = ActionListener.wrap(
+                resp ->
+                    // Need to make indices writable again so that the aliases can be removed from them
+                    removeReadOnlyBlock(client, indicesToUpgrade,
+                        ActionListener.wrap(
+                            rrob -> adjustAliases(client,
+                                indexNameAndAliasProvider.oldIndicesWithReadAliases(),
+                                indexNameAndAliasProvider.newReadIndicesWithReadAliases(),
+                                readAliasListener),
+                            rrobFailure -> {
+                                String msg = "Failed making old indices writable again so that aliases can be moved.";
+                                logger.error(msg, rrobFailure);
+                                listener.onFailure(new ElasticsearchException(msg, rrobFailure));
+                            })
+                    ),
+                error -> {
+                    logger.error("Failed to reindex old read-only indices", error);
+                    removeReadOnlyBlock(client, indicesToUpgrade, ActionListener.wrap(
+                        empty -> listener.onFailure(error),
+                        removeReadOnlyBlockError -> {
+                            String msg = "Failed making old indices read/write again after failing to reindex: " + error.getMessage();
+                            logger.error(msg, removeReadOnlyBlockError);
+                            listener.onFailure(new ElasticsearchException(msg, removeReadOnlyBlockError));
+                        }
+                    ));
+                }
+            );
+
+            // <4> Old indices are now read-only, time to reindex
+            ActionListener<AcknowledgedResponse> readOnlyListener = ActionListener.wrap(
+                ack -> reindexOldReadIndicesToNewIndices(client, indexNameAndAliasProvider.needsReindex(), request, reindexListener),
+                listener::onFailure
+            );
+
+            // <3> Set old indices to read-only
+            ActionListener<AcknowledgedResponse> writeAliasesMovedListener = ActionListener.wrap(
+                resp -> setReadOnlyBlock(client, indicesToUpgrade, readOnlyListener),
+                listener::onFailure
+            );
+
+            // <2> Move write index alias to new write indices
+            ActionListener<AcknowledgedResponse> createWriteIndicesAndSetReadAliasListener = ActionListener.wrap(
+                resp -> adjustAliases(client,
+                    indexNameAndAliasProvider.oldIndicesWithWriteAliases(),
+                    indexNameAndAliasProvider.newWriteIndicesWithWriteAliases(),
+                    writeAliasesMovedListener),
+                listener::onFailure
+            );
+
+            // <1> Create the new write indices and set the read aliases to include them
+            createNewWriteIndicesIfNecessary(client, metaData, indexNameAndAliasProvider.newWriteIndices(),
+                ActionListener.wrap(
+                    indicesCreated -> adjustAliases(client,
+                        Collections.emptyMap(),
+                        indexNameAndAliasProvider.newWriteIndicesWithReadAliases(),
+                        createWriteIndicesAndSetReadAliasListener),
+                    listener::onFailure
+                ));
+
+        } catch (Exception e) {
+            listener.onFailure(e);
+        }
+
+    }
+
+    private void createNewWriteIndicesIfNecessary(Client client,
+                                                  MetaData metaData,
+                                                  Collection<String> newWriteIndices,
+                                                  ActionListener<Boolean> createIndexListener) {
+        TypedChainTaskExecutor<CreateIndexResponse> chainTaskExecutor =
+            new TypedChainTaskExecutor<>(
+                client.threadPool().executor(executor),
+                (createIndexResponse -> true), // We always want to complete all our tasks
+                (exception ->
+                    // Short circuit execution IF the exception is NOT a ResourceAlreadyExistsException
+                    // This should be rare, as it requires the index to be created between our previous check and this exception
+                    exception instanceof ResourceAlreadyExistsException == false
+                ));
+        newWriteIndices.forEach((index) -> {
+            // If the index already exists, don't try to create it
+            // We have already verified that IF this index exists, that it does not require upgrading
+            // So, if it was created between that check and this one, we can assume it is the correct version as it was JUST created
+            if (metaData.hasIndex(index) == false) {
+                CreateIndexRequest request = new CreateIndexRequest(index);
+                chainTaskExecutor.add(listener ->
+                    executeAsyncWithOrigin(client.threadPool().getThreadContext(),
+                        ML_ORIGIN,
+                        request,
+                        listener,
+                        client.admin().indices()::create));
+            }
+        });
+
+        chainTaskExecutor.execute(ActionListener.wrap(
+            createIndexResponses -> createIndexListener.onResponse(true),
+            createIndexListener::onFailure
+        ));
+    }
+
+    /**
+     * Makes the indices read-only if they are not set as read-only yet
+     */
+    private void setReadOnlyBlock(Client client, List<String> indices, ActionListener<AcknowledgedResponse> listener) {
+        Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build();
+        UpdateSettingsRequest request = new UpdateSettingsRequest(indices.toArray(new String[0]));
+        request.settings(settings);
+        executeAsyncWithOrigin(client.threadPool().getThreadContext(),
+            ML_ORIGIN,
+            request,
+            listener,
+            client.admin().indices()::updateSettings);
+    }
+
+    private void removeReadOnlyBlock(Client client, List<String> indices,
+                                     ActionListener<AcknowledgedResponse> listener) {
+        Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build();
+        UpdateSettingsRequest request = new UpdateSettingsRequest(indices.toArray(new String[0]));
+        request.settings(settings);
+        executeAsyncWithOrigin(client.threadPool().getThreadContext(),
+            ML_ORIGIN,
+            request,
+            listener,
+            client.admin().indices()::updateSettings);
+    }
+
+    private void reindexOldReadIndicesToNewIndices(Client client,
+                                                   Map<String, String> reindexIndices,
+                                                   MlUpgradeAction.Request request,
+                                                   ActionListener<Boolean> listener) {
+        TypedChainTaskExecutor<BulkByScrollResponse> chainTaskExecutor =
+            new TypedChainTaskExecutor<>(
+                client.threadPool().executor(executor),
+                (createIndexResponse) -> {
+                    // If there are errors in the reindex, we should stop
+                    Tuple<RestStatus, Throwable> status = getStatusAndCause(createIndexResponse);
+                    return status.v1().equals(RestStatus.OK);
+                },
+                (exception -> true)); // Short circuit and call onFailure for any exception
+
+        List<String> newIndices = new ArrayList<>(reindexIndices.size());
+        reindexIndices.forEach((oldIndex, newIndex) -> {
+            ReindexRequest reindexRequest = new ReindexRequest();
+            reindexRequest.setSourceBatchSize(request.getReindexBatchSize());
+            reindexRequest.setSourceIndices(oldIndex);
+            reindexRequest.setDestIndex(newIndex);
+            reindexRequest.setSourceDocTypes(ElasticsearchMappings.DOC_TYPE);
+            reindexRequest.setDestDocType(ElasticsearchMappings.DOC_TYPE);
+            // Don't worry if these indices already exist, we validated settings.index.created.version earlier
+            reindexRequest.setAbortOnVersionConflict(false);
+            // If the document exists already in the new index, don't want to update or overwrite as we are pulling from "old data"
+            reindexRequest.setDestOpType(DocWriteRequest.OpType.CREATE.getLowercase());
+            newIndices.add(newIndex);
+            chainTaskExecutor.add(chainedListener ->
+                executeAsyncWithOrigin(client,
+                    ML_ORIGIN,
+                    ReindexAction.INSTANCE,
+                    reindexRequest,
+                    chainedListener));
+        });
+
+        chainTaskExecutor.execute(ActionListener.wrap(
+            bulkScrollingResponses -> {
+                BulkByScrollResponse response = bulkScrollingResponses.get(bulkScrollingResponses.size() - 1);
+                Tuple<RestStatus, Throwable> status = getStatusAndCause(response);
+                if (status.v1().equals(RestStatus.OK)) {
+                    listener.onResponse(true);
+                } else {
+                    logger.error("Failed to reindex old results indices.", status.v2());
+                    listener.onFailure(new ElasticsearchException("Failed to reindex old results indices.", status.v2()));
+                }
+            },
+            failure -> {
+                List<String> createdIndices = newIndices.subList(0, chainTaskExecutor.getCollectedResponses().size());
+                logger.error(
+                    "Failed to reindex all old read indices. Successfully reindexed: [" +
+                        Strings.collectionToCommaDelimitedString(createdIndices) + "]",
+                    failure);
+                listener.onFailure(failure);
+            }
+        ));
+
+    }
+
+    private void deleteOldIndices(Client client,
+                                  List<String> oldIndices,
+                                  ActionListener<AcknowledgedResponse> deleteIndicesListener) {
+        DeleteIndexRequest request = new DeleteIndexRequest(oldIndices.toArray(new String[0]));
+        request.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
+        executeAsyncWithOrigin(client.threadPool().getThreadContext(),
+            ML_ORIGIN,
+            request,
+            deleteIndicesListener,
+            client.admin().indices()::delete);
+    }
+
+    private void adjustAliases(Client client,
+                               Map<String, List<Alias>> oldAliases,
+                               Map<String, List<Alias>> newAliases,
+                               ActionListener<AcknowledgedResponse> indicesAliasListener) {
+        IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
+        oldAliases.forEach((oldIndex, aliases) ->
+            {
+                if (aliases.isEmpty() == false) { // if the aliases are empty, that means there are none to remove
+                    indicesAliasesRequest.addAliasAction(IndicesAliasesRequest
+                        .AliasActions
+                        .remove()
+                        .index(oldIndex)
+                        .aliases(aliases.stream().map(Alias::name).toArray(String[]::new)));
+                }
+            }
+        );
+        newAliases.forEach((newIndex, aliases) ->
+            aliases.forEach(alias -> {
+                IndicesAliasesRequest.AliasActions action = IndicesAliasesRequest.AliasActions.add().index(newIndex);
+                if (alias.filter() != null) {
+                    action.filter(alias.filter());
+                }
+                action.alias(alias.name());
+                indicesAliasesRequest.addAliasAction(action);
+            })
+        );
+        executeAsyncWithOrigin(client.threadPool().getThreadContext(),
+            ML_ORIGIN,
+            indicesAliasesRequest,
+            indicesAliasListener,
+            client.admin().indices()::aliases);
+    }
+
+
+    private static class IndexNameAndAliasProvider {
+
+        private final List<String> oldIndices;
+        private final Map<String, List<Alias>> writeAliases = new HashMap<>();
+        private final Map<String, List<Alias>> readAliases = new HashMap<>();
+
+        private IndexNameAndAliasProvider(List<String> oldIndices, MetaData metaData) {
+            this.oldIndices = oldIndices;
+            oldIndices.forEach(index -> {
+                IndexMetaData indexMetaData = metaData.index(index);
+                List<Alias> writes = new ArrayList<>();
+                List<Alias> reads = new ArrayList<>();
+                indexMetaData.getAliases().forEach(aliasCursor -> {
+                    Alias alias = new Alias(aliasCursor.value.alias());
+                    if (aliasCursor.value.filteringRequired()) {
+                        alias.filter(aliasCursor.value.getFilter().string()); // Set the read alias jobId filter
+                    }
+                    if (alias.name().contains(".write-")) {
+                        writes.add(alias);
+                    } else {
+                        reads.add(alias);
+                    }
+                });
+
+                writeAliases.put(index, writes);
+                readAliases.put(index, reads);
+            });
+        }
+
+        private Exception validate(MetaData metaData, Predicate<IndexMetaData> shouldUpgrade) {
+            for (String index : oldIndices) {
+                String newWriteName = newWriteName(index);
+                // If the "new" indices exist, either they were created from a previous run of the upgrade process or the end user
+                // created them manually
+                if (validNewIndex(metaData, newWriteName, shouldUpgrade) == false) {
+                    return new IllegalStateException("Index [" + newWriteName + "] already exists and is not the current version.");
+                }
+
+                String newReadName = newReadName(index);
+                if (validNewIndex(metaData, newReadName, shouldUpgrade) == false) {
+                    return new IllegalStateException("Index [" + newReadName + "] already exists and is not the current version.");
+                }
+            }
+            return null;
+        }
+
+        private String newReadName(String oldIndexName) {
+            return oldIndexName + "-" + INDEX_VERSION + "r";
+        }
+
+        private String newWriteName(String oldIndexName) {
+            return oldIndexName + "-" + INDEX_VERSION;
+        }
+
+        private List<String> newWriteIndices() {
+            return oldIndices.stream().map(this::newWriteName).collect(Collectors.toList());
+        }
+
+        private List<Alias> readAliases(String oldIndex) {
+            return readAliases.get(oldIndex);
+        }
+
+        private List<Alias> writeAliases(String oldIndex) {
+            return writeAliases.get(oldIndex);
+        }
+
+        private Map<String, List<Alias>> newWriteIndicesWithReadAliases() {
+            return oldIndices.stream().collect(Collectors.toMap(this::newWriteName, this::readAliases));
+        }
+
+        private Map<String, List<Alias>> oldIndicesWithWriteAliases() {
+            return writeAliases;
+        }
+
+        private Map<String, List<Alias>> newWriteIndicesWithWriteAliases() {
+            return oldIndices.stream().collect(Collectors.toMap(this::newWriteName, this::writeAliases));
+        }
+
+        private Map<String, List<Alias>> oldIndicesWithReadAliases() {
+            return readAliases;
+        }
+
+        private Map<String, List<Alias>> newReadIndicesWithReadAliases() {
+            return oldIndices.stream().collect(Collectors.toMap(this::newReadName, this::readAliases));
+        }
+
+        private Map<String, String> needsReindex() {
+            return oldIndices.stream().collect(Collectors.toMap(Function.identity(), this::newReadName));
+        }
+    }
+}
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java
index d6c03d6c93fbf..9d76844121cbb 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java
@@ -26,7 +26,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.ml.MachineLearning;
-import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor;
+import org.elasticsearch.xpack.ml.utils.VoidChainTaskExecutor;
 
 import java.util.Collections;
 import java.util.Date;
@@ -65,7 +65,7 @@ protected void masterOperation(FinalizeJobExecutionAction.Request request, Clust
         String jobIdString = String.join(",", request.getJobIds());
         logger.debug("finalizing jobs [{}]", jobIdString);
-        ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.executor(
+        VoidChainTaskExecutor voidChainTaskExecutor = new VoidChainTaskExecutor(threadPool.executor(
             MachineLearning.UTILITY_THREAD_POOL_NAME), true);
 
         Map<String, Object> update = Collections.singletonMap(Job.FINISHED_TIME.getPreferredName(), new Date());
@@ -77,7 +77,7 @@ protected void masterOperation(FinalizeJobExecutionAction.Request request, Clust
             updateRequest.doc(update);
             updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-            chainTaskExecutor.add(chainedListener -> {
+            voidChainTaskExecutor.add(chainedListener -> {
                 executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, ActionListener.wrap(
                     updateResponse -> chainedListener.onResponse(null),
                     chainedListener::onFailure
@@ -85,8 +85,8 @@ protected void masterOperation(FinalizeJobExecutionAction.Request request, Clust
             });
         }
 
-        chainTaskExecutor.execute(ActionListener.wrap(
-            aVoid -> {
+        voidChainTaskExecutor.execute(ActionListener.wrap(
+            aVoids -> {
                 logger.debug("finalized job [{}]", jobIdString);
                 listener.onResponse(new AcknowledgedResponse(true));
             },
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlUpgradeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlUpgradeAction.java
new file mode 100644
index 0000000000000..2b676277aa690
--- /dev/null
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlUpgradeAction.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.ParentTaskAssigningClient;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
+import org.elasticsearch.xpack.ml.ResultsIndexUpgradeService;
+
+import static org.elasticsearch.xpack.ml.ResultsIndexUpgradeService.wasIndexCreatedInCurrentMajorVersion;
+
+public class TransportMlUpgradeAction
+    extends TransportMasterNodeReadAction<MlUpgradeAction.Request, AcknowledgedResponse> {
+
+    private final Client client;
+    private final ResultsIndexUpgradeService resultsIndexUpgradeService;
+
+    @Inject
+    public TransportMlUpgradeAction(TransportService transportService, ClusterService clusterService,
+                                    ThreadPool threadPool, ActionFilters actionFilters, Client client,
+                                    IndexNameExpressionResolver indexNameExpressionResolver) {
+        super(MlUpgradeAction.NAME, transportService, clusterService, threadPool,
+            actionFilters, MlUpgradeAction.Request::new, indexNameExpressionResolver);
+        this.client = client;
+        this.resultsIndexUpgradeService = new ResultsIndexUpgradeService(indexNameExpressionResolver,
+            executor(),
+            indexMetadata -> wasIndexCreatedInCurrentMajorVersion(indexMetadata) == false);
+    }
+
+    @Override
+    protected void masterOperation(Task task, MlUpgradeAction.Request request, ClusterState state,
+                                   ActionListener<AcknowledgedResponse> listener) {
+        TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
+        ParentTaskAssigningClient parentAwareClient = new ParentTaskAssigningClient(client, taskId);
+        try {
+            resultsIndexUpgradeService.upgrade(parentAwareClient, request, state, listener);
+        } catch (Exception e) {
+            listener.onFailure(e);
+        }
+    }
+
+    @Override
+    protected final void masterOperation(MlUpgradeAction.Request request, ClusterState state,
+                                         ActionListener<AcknowledgedResponse> listener) {
+        throw new UnsupportedOperationException("the task parameter is required");
+    }
+
+    @Override
+    protected String executor() {
+        return ThreadPool.Names.SAME;
+    }
+
+    @Override
+    protected AcknowledgedResponse newResponse() {
+        return new AcknowledgedResponse();
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(MlUpgradeAction.Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
+    }
+}
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 33047c1fca39a..53559aee4701b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -55,7 +55,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams; import org.elasticsearch.xpack.ml.notifications.Auditor; -import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor; +import org.elasticsearch.xpack.ml.utils.VoidChainTaskExecutor; import java.io.IOException; import java.util.ArrayList; @@ -397,16 +397,16 @@ private void postJobUpdate(UpdateJobAction.Request request, Job updatedJob, Acti } private void validate(Job job, JobUpdate jobUpdate, ActionListener<Void> handler) { - ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(client.threadPool().executor( + VoidChainTaskExecutor voidChainTaskExecutor = new VoidChainTaskExecutor(client.threadPool().executor( MachineLearning.UTILITY_THREAD_POOL_NAME), true); - validateModelSnapshotIdUpdate(job, jobUpdate.getModelSnapshotId(), chainTaskExecutor); - validateAnalysisLimitsUpdate(job, jobUpdate.getAnalysisLimits(), chainTaskExecutor); - chainTaskExecutor.execute(handler); + validateModelSnapshotIdUpdate(job, jobUpdate.getModelSnapshotId(), voidChainTaskExecutor); + validateAnalysisLimitsUpdate(job, jobUpdate.getAnalysisLimits(), voidChainTaskExecutor); + voidChainTaskExecutor.execute(ActionListener.wrap(aVoids -> handler.onResponse(null), handler::onFailure)); } - private void validateModelSnapshotIdUpdate(Job job, String modelSnapshotId, ChainTaskExecutor chainTaskExecutor) { + private void validateModelSnapshotIdUpdate(Job job, String modelSnapshotId, VoidChainTaskExecutor voidChainTaskExecutor) { if (modelSnapshotId != null) { - chainTaskExecutor.add(listener -> { + voidChainTaskExecutor.add(listener -> { jobResultsProvider.getModelSnapshot(job.getId(), modelSnapshotId, newModelSnapshot -> { if (newModelSnapshot == null) { String message = Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, modelSnapshotId, @@ -428,12 +428,12 @@ private void validateModelSnapshotIdUpdate(Job job, String modelSnapshotId, Chai } } - private void validateAnalysisLimitsUpdate(Job job, AnalysisLimits newLimits, ChainTaskExecutor chainTaskExecutor) { + private void validateAnalysisLimitsUpdate(Job job, AnalysisLimits newLimits, VoidChainTaskExecutor voidChainTaskExecutor) { if (newLimits == null || newLimits.getModelMemoryLimit() == null) { return; } Long newModelMemoryLimit = newLimits.getModelMemoryLimit(); - chainTaskExecutor.add(listener -> { + voidChainTaskExecutor.add(listener -> { if (isJobOpen(clusterService.state(), job.getId())) { listener.onFailure(ExceptionsHelper.badRequestException("Cannot update " + Job.ANALYSIS_LIMITS.getPreferredName() + " while the job is open")); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestUpgradeMlAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestUpgradeMlAction.java new file mode 100644 index 0000000000000..cad82ce325c27 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestUpgradeMlAction.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.rest.results; + +import org.apache.logging.log4j.LogManager; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.logging.DeprecationLogger; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.tasks.LoggingTaskListener; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction; +import org.elasticsearch.xpack.ml.MachineLearning; + +import java.io.IOException; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class RestUpgradeMlAction extends BaseRestHandler { + + private static final DeprecationLogger deprecationLogger = + new DeprecationLogger(LogManager.getLogger(RestUpgradeMlAction.class)); + + public RestUpgradeMlAction(Settings settings, RestController controller) { + super(settings); + controller.registerWithDeprecatedHandler( + POST, + MachineLearning.BASE_PATH + "_upgrade", + this, + POST, + MachineLearning.PRE_V7_BASE_PATH + "_upgrade", + deprecationLogger); + } + + @Override + public String getName() { + return "xpack_ml_upgrade_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + MlUpgradeAction.Request parsedRequest = new MlUpgradeAction.Request(); + if (restRequest.hasContent()) { + XContentParser parser = restRequest.contentParser(); + parsedRequest = MlUpgradeAction.Request.PARSER.apply(parser, null); + } + final MlUpgradeAction.Request upgradeRequest = parsedRequest; + + if (restRequest.paramAsBoolean("wait_for_completion", false)) { + return channel -> client.execute(MlUpgradeAction.INSTANCE, upgradeRequest, new RestToXContentListener<>(channel)); + } else { + upgradeRequest.setShouldStoreResult(true); + + Task task = client.executeLocally(MlUpgradeAction.INSTANCE, upgradeRequest, LoggingTaskListener.instance()); + // Send task description id instead of waiting for the message + return channel -> { + try (XContentBuilder builder = channel.newBuilder()) { + builder.startObject(); + builder.field("task", client.getLocalNodeId() + ":" + task.getId()); + builder.endObject(); + channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); + } + }; + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/ChainTaskExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/ChainTaskExecutor.java deleted file mode 100644 index 9a0ddb5dd4add..0000000000000 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/ChainTaskExecutor.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. 
- */ -package org.elasticsearch.xpack.ml.utils; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; - -import java.util.LinkedList; -import java.util.Objects; -import java.util.concurrent.ExecutorService; - -/** - * A utility that allows chained (serial) execution of a number of tasks - * in async manner. - */ -public class ChainTaskExecutor { - - public interface ChainTask { - void run(ActionListener<Void> listener); - } - - private final ExecutorService executorService; - private final boolean shortCircuit; - private final LinkedList<ChainTask> tasks = new LinkedList<>(); - - public ChainTaskExecutor(ExecutorService executorService, boolean shortCircuit) { - this.executorService = Objects.requireNonNull(executorService); - this.shortCircuit = shortCircuit; - } - - public synchronized void add(ChainTask task) { - tasks.add(task); - } - - public synchronized void execute(ActionListener<Void> listener) { - if (tasks.isEmpty()) { - listener.onResponse(null); - return; - } - ChainTask task = tasks.pop(); - executorService.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - if (shortCircuit) { - listener.onFailure(e); - } else { - execute(listener); - } - } - - @Override - protected void doRun() { - task.run(ActionListener.wrap(nullValue -> execute(listener), this::onFailure)); - } - }); - } -} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/TypedChainTaskExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/TypedChainTaskExecutor.java new file mode 100644 index 0000000000000..5af9c53649853 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/TypedChainTaskExecutor.java @@ -0,0 +1,125 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.utils; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.function.Predicate; + +/** + * A utility that allows chained (serial) execution of a number of tasks + * in an async manner. + */ +public class TypedChainTaskExecutor<T> { + + public interface ChainTask<T> { + void run(ActionListener<T> listener); + } + + private final ExecutorService executorService; + private final LinkedList<ChainTask<T>> tasks = new LinkedList<>(); + private final Predicate<Exception> failureShortCircuitPredicate; + private final Predicate<T> continuationPredicate; + private final List<T> collectedResponses; + + /** + * Creates a new TypedChainTaskExecutor. + * Each chained task is executed serially, in order, and after each execution the continuationPredicate is tested. + * + * On failures the failureShortCircuitPredicate is tested. + * + * @param executorService The executor service used to run the tasks + * @param continuationPredicate The predicate to test on whether to execute the next task or not. + * {@code true} means continue on to the next task. + * Must be able to handle null values. + * @param failureShortCircuitPredicate The predicate on whether to short circuit execution on a given exception. 
+ * {@code true} means that no more tasks should execute and the listener::onFailure should be + called. + */ + public TypedChainTaskExecutor(ExecutorService executorService, + Predicate<T> continuationPredicate, + Predicate<Exception> failureShortCircuitPredicate) { + this.executorService = Objects.requireNonNull(executorService); + this.continuationPredicate = continuationPredicate; + this.failureShortCircuitPredicate = failureShortCircuitPredicate; + this.collectedResponses = new ArrayList<>(); + } + + public synchronized void add(ChainTask<T> task) { + tasks.add(task); + } + + private synchronized void execute(T previousValue, ActionListener<List<T>> listener) { + collectedResponses.add(previousValue); + if (continuationPredicate.test(previousValue)) { + if (tasks.isEmpty()) { + listener.onResponse(Collections.unmodifiableList(new ArrayList<>(collectedResponses))); + return; + } + ChainTask<T> task = tasks.pop(); + executorService.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + if (failureShortCircuitPredicate.test(e)) { + listener.onFailure(e); + } else { + execute(null, listener); + } + } + + @Override + protected void doRun() { + task.run(ActionListener.wrap(value -> execute(value, listener), this::onFailure)); + } + }); + } else { + listener.onResponse(Collections.unmodifiableList(new ArrayList<>(collectedResponses))); + } + } + + /** + * Execute all the chained tasks serially; notify the listener when completed. + * + * @param listener The ActionListener to notify when all executions have been completed, + * or when no further tasks should be executed. + * The resulting list may contain null values if execution continued past a failed task. + */ + public synchronized void execute(ActionListener<List<T>> listener) { + if (tasks.isEmpty()) { + listener.onResponse(Collections.emptyList()); + return; + } + collectedResponses.clear(); + ChainTask<T> task = tasks.pop(); + executorService.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + if (failureShortCircuitPredicate.test(e)) { + listener.onFailure(e); + } else { + execute(null, listener); + } + } + + @Override + protected void doRun() { + task.run(ActionListener.wrap(value -> execute(value, listener), this::onFailure)); + } + }); + } + + public synchronized List<T> getCollectedResponses() { + return Collections.unmodifiableList(new ArrayList<>(collectedResponses)); + } +}
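Because the executor's behavior is driven entirely by the two predicates, a short usage sketch may help. This is a minimal illustration only; `threadPool`, `client`, `logger`, and the index names are assumed context, not part of this change:

    // Run one search per index, serially; any failure short-circuits the chain.
    TypedChainTaskExecutor<SearchResponse> executor = new TypedChainTaskExecutor<>(
        threadPool.generic(),
        searchResponse -> true,   // continuation predicate: always proceed to the next task
        e -> true);               // failure predicate: always short circuit to onFailure
    for (String index : Arrays.asList("results-index-1", "results-index-2")) {
        executor.add(listener -> client.search(new SearchRequest(index), listener));
    }
    executor.execute(ActionListener.wrap(
        searchResponses -> logger.info("collected [{}] responses", searchResponses.size()),
        e -> logger.error("chained searches failed", e)));

The VoidChainTaskExecutor that follows simply fixes T to Void and maps the old boolean shortCircuit flag onto the failure predicate.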
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/VoidChainTaskExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/VoidChainTaskExecutor.java new file mode 100644 index 0000000000000..8351c0a81aaf6 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/VoidChainTaskExecutor.java @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.utils; + +import java.util.concurrent.ExecutorService; +import java.util.function.Predicate; + +/** + * A utility that allows chained (serial) execution of a number of tasks + * in an async manner. + */ +public class VoidChainTaskExecutor extends TypedChainTaskExecutor<Void> { + + public VoidChainTaskExecutor(ExecutorService executorService, boolean shortCircuit) { + this(executorService, (a) -> true, (e) -> shortCircuit); + } + + VoidChainTaskExecutor(ExecutorService executorService, + Predicate<Void> continuationPredicate, + Predicate<Exception> failureShortCircuitPredicate) { + super(executorService, continuationPredicate, failureShortCircuitPredicate); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/ChainTaskExecutorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/VoidChainTaskExecutorTests.java similarity index 62% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/ChainTaskExecutorTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/VoidChainTaskExecutorTests.java index 87b83852ff56c..44bf4cf75aa13 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/ChainTaskExecutorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/VoidChainTaskExecutorTests.java @@ -19,7 +19,7 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; -public class ChainTaskExecutorTests extends ESTestCase { +public class VoidChainTaskExecutorTests extends ESTestCase { private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); private final CountDownLatch latch = new CountDownLatch(1); @@ -36,18 +36,18 @@ public void tearDown() throws Exception { public void testExecute() throws InterruptedException { final List<String> strings = new ArrayList<>(); - ActionListener<Void> finalListener = createBlockingListener(() -> strings.add("last"), e -> fail()); - ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.generic(), false); - chainTaskExecutor.add(listener -> { + ActionListener<List<Void>> finalListener = createBlockingListener(() -> strings.add("last"), e -> fail()); + VoidChainTaskExecutor voidChainTaskExecutor = new VoidChainTaskExecutor(threadPool.generic(), false); + voidChainTaskExecutor.add(listener -> { strings.add("first"); listener.onResponse(null); }); - chainTaskExecutor.add(listener -> { + voidChainTaskExecutor.add(listener -> { strings.add("second"); listener.onResponse(null); }); - chainTaskExecutor.execute(finalListener); + voidChainTaskExecutor.execute(finalListener); latch.await(); @@ -56,22 +56,22 @@ public void testExecute_GivenSingleFailureAndShortCircuit() throws InterruptedException { final List<String> strings = new ArrayList<>(); - ActionListener<Void> finalListener = createBlockingListener(() -> fail(), + ActionListener<List<Void>> finalListener = createBlockingListener(() -> fail(), e -> assertThat(e.getMessage(), equalTo("some error"))); - ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.generic(), true); - chainTaskExecutor.add(listener -> { + VoidChainTaskExecutor voidChainTaskExecutor = new VoidChainTaskExecutor(threadPool.generic(), true); + voidChainTaskExecutor.add(listener -> { strings.add("before"); listener.onResponse(null); }); - chainTaskExecutor.add(listener -> { + voidChainTaskExecutor.add(listener -> { throw new RuntimeException("some error"); }); - chainTaskExecutor.add(listener -> { + voidChainTaskExecutor.add(listener -> { strings.add("after"); listener.onResponse(null); }); - chainTaskExecutor.execute(finalListener); + voidChainTaskExecutor.execute(finalListener); latch.await(); @@ -80,21 +80,21 @@ public void 
testExecute_GivenSingleFailureAndShortCircuit() throws InterruptedEx public void testExecute_GivenMultipleFailuresAndShortCircuit() throws InterruptedException { final List<String> strings = new ArrayList<>(); - ActionListener<Void> finalListener = createBlockingListener(() -> fail(), + ActionListener<List<Void>> finalListener = createBlockingListener(() -> fail(), e -> assertThat(e.getMessage(), equalTo("some error 1"))); - ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.generic(), true); - chainTaskExecutor.add(listener -> { + VoidChainTaskExecutor voidChainTaskExecutor = new VoidChainTaskExecutor(threadPool.generic(), true); + voidChainTaskExecutor.add(listener -> { strings.add("before"); listener.onResponse(null); }); - chainTaskExecutor.add(listener -> { + voidChainTaskExecutor.add(listener -> { throw new RuntimeException("some error 1"); }); - chainTaskExecutor.add(listener -> { + voidChainTaskExecutor.add(listener -> { throw new RuntimeException("some error 2"); }); - chainTaskExecutor.execute(finalListener); + voidChainTaskExecutor.execute(finalListener); latch.await(); @@ -103,21 +103,21 @@ public void testExecute_GivenMultipleFailuresAndShortCircuit() throws Interrupte public void testExecute_GivenFailureAndNoShortCircuit() throws InterruptedException { final List<String> strings = new ArrayList<>(); - ActionListener<Void> finalListener = createBlockingListener(() -> strings.add("last"), e -> fail()); - ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.generic(), false); - chainTaskExecutor.add(listener -> { + ActionListener<List<Void>> finalListener = createBlockingListener(() -> strings.add("last"), e -> fail()); + VoidChainTaskExecutor voidChainTaskExecutor = new VoidChainTaskExecutor(threadPool.generic(), false); + voidChainTaskExecutor.add(listener -> { strings.add("before"); listener.onResponse(null); }); - chainTaskExecutor.add(listener -> { + voidChainTaskExecutor.add(listener -> { throw new RuntimeException("some error"); }); - chainTaskExecutor.add(listener -> { + voidChainTaskExecutor.add(listener -> { strings.add("after"); listener.onResponse(null); }); - chainTaskExecutor.execute(finalListener); + voidChainTaskExecutor.execute(finalListener); latch.await(); @@ -126,17 +126,17 @@ public void testExecute_GivenNoTasksAdded() throws InterruptedException { final List<String> strings = new ArrayList<>(); - ActionListener<Void> finalListener = createBlockingListener(() -> strings.add("last"), e -> fail()); - ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.generic(), false); + ActionListener<List<Void>> finalListener = createBlockingListener(() -> strings.add("last"), e -> fail()); + VoidChainTaskExecutor voidChainTaskExecutor = new VoidChainTaskExecutor(threadPool.generic(), false); - chainTaskExecutor.execute(finalListener); + voidChainTaskExecutor.execute(finalListener); latch.await(); assertThat(strings, contains("last")); } - private ActionListener<Void> createBlockingListener(Runnable runnable, Consumer<Exception> errorHandler) { + private ActionListener<List<Void>> createBlockingListener(Runnable runnable, Consumer<Exception> errorHandler) { return ActionListener.wrap(nullValue -> { runnable.run(); latch.countDown(); @@ -145,4 +145,4 @@ private ActionListener<Void> createBlockingListener(Runnable runnable, Consumer< latch.countDown(); }); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.upgrade.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.upgrade.json new file mode 100644 
index 0000000000000..b67b125bb692a --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.upgrade.json @@ -0,0 +1,21 @@ +{ + "ml.upgrade": { + "documentation": "TODO", + "methods": [ "POST" ], + "url": { + "path": "/_ml/_upgrade", + "paths": [ "/_ml/_upgrade" ], + "params": { + "wait_for_completion": { + "type": "boolean", + "description": "Should this request wait until the operation has completed before returning", + "default": false + } + } + }, + "body": { + "description" : "Upgrade options", + "required" : false + } + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/ml_upgrade.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/ml_upgrade.yml new file mode 100644 index 0000000000000..ee1f9f77f9325 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/ml_upgrade.yml @@ -0,0 +1,70 @@ +setup: + - skip: + features: headers + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + ml.put_job: + job_id: jobs-upgrade-results + body: > + { + "analysis_config" : { + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "format":"xcontent", + "time_field":"time" + } + } + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + Content-Type: application/json + index: + index: .ml-anomalies-jobs-upgrade-results + type: doc + id: "jobs-upgrade-results_1464739200000_1" + body: + { + "job_id": "jobs-upgrade-results", + "result_type": "bucket", + "timestamp": "2016-06-01T00:00:00Z", + "anomaly_score": 90.0, + "bucket_span":1 + } + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. 
the test setup superuser + indices.refresh: + index: .ml-anomalies-jobs-upgrade-results + +--- +"Upgrade results when there is nothing to upgrade": + - do: + ml.upgrade: + wait_for_completion: true + + - match: { acknowledged: true } + + - do: + indices.exists: + index: .ml-anomalies-shared + + - is_true: '' +--- +"Upgrade results when there is nothing to upgrade not waiting for results": + - do: + ml.upgrade: + wait_for_completion: false + + - match: {task: '/.+:\d+/'} + - set: {task: task} + + - do: + tasks.get: + wait_for_completion: true + task_id: $task + - match: {completed: true} + - match: {response.acknowledged: true} diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_ml_results_upgrade.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_ml_results_upgrade.yml new file mode 100644 index 0000000000000..73478be65597e --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_ml_results_upgrade.yml @@ -0,0 +1,11 @@ +--- +"Verify jobs exist": + - do: + ml.get_jobs: + job_id: old-cluster-job-to-upgrade + - match: { count: 1 } + + - do: + ml.get_jobs: + job_id: old-cluster-job-to-upgrade-custom + - match: { count: 1 } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_ml_results_upgrade.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_ml_results_upgrade.yml new file mode 100644 index 0000000000000..d21b5e6def61d --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_ml_results_upgrade.yml @@ -0,0 +1,120 @@ +--- +"Put job on the old cluster and post some data": + + - do: + ml.put_job: + job_id: old-cluster-job-to-upgrade + body: > + { + "description":"Cluster upgrade", + "analysis_config" : { + "bucket_span": "60s", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "analysis_limits" : { + "model_memory_limit": "50mb" + }, + "data_description" : { + "format":"xcontent", + "time_field":"time", + "time_format":"epoch" + } + } + - match: { job_id: old-cluster-job-to-upgrade } + + - do: + ml.open_job: + job_id: old-cluster-job-to-upgrade + + - do: + ml.post_data: + job_id: old-cluster-job-to-upgrade + body: + - airline: AAL + responsetime: 132.2046 + sourcetype: post-data-job + time: 1403481600 + - airline: JZA + responsetime: 990.4628 + sourcetype: post-data-job + time: 1403481700 + - match: { processed_record_count: 2 } + + - do: + ml.close_job: + job_id: old-cluster-job-to-upgrade + + - do: + ml.get_buckets: + job_id: old-cluster-job-to-upgrade + - match: { count: 1 } + +# Wait for indices to be fully allocated before +# killing the node + - do: + cluster.health: + index: [".ml-state", ".ml-anomalies-shared"] + wait_for_status: green + +--- +"Put job on the old cluster with a custom index": + - do: + ml.put_job: + job_id: old-cluster-job-to-upgrade-custom + body: > + { + "description":"Cluster upgrade", + "analysis_config" : { + "bucket_span": "60s", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "analysis_limits" : { + "model_memory_limit": "50mb" + }, + "data_description" : { + "format":"xcontent", + "time_field":"time", + "time_format":"epoch" + }, + "results_index_name": "old-cluster-job-to-upgrade-custom" + } + - match: { job_id: old-cluster-job-to-upgrade-custom } + + - do: + ml.open_job: + job_id: old-cluster-job-to-upgrade-custom + + - do: + 
ml.post_data: + job_id: old-cluster-job-to-upgrade-custom + body: + - airline: AAL + responsetime: 132.2046 + sourcetype: post-data-job + time: 1403481600 + - airline: JZA + responsetime: 990.4628 + sourcetype: post-data-job + time: 1403481700 + - airline: JZA + responsetime: 423.0000 + sourcetype: post-data-job + time: 1403481800 + - match: { processed_record_count: 3 } + + - do: + ml.close_job: + job_id: old-cluster-job-to-upgrade-custom + + - do: + ml.get_buckets: + job_id: old-cluster-job-to-upgrade-custom + - match: { count: 3 } + +# Wait for indices to be fully allocated before +# killing the node + - do: + cluster.health: + index: [".ml-state", ".ml-anomalies-old-cluster-job-to-upgrade-custom"] + wait_for_status: green + diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_ml_results_upgrade.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_ml_results_upgrade.yml new file mode 100644 index 0000000000000..f049b9c073ad8 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_ml_results_upgrade.yml @@ -0,0 +1,158 @@ +--- +"Migrate results data to latest index binary version": + # Verify that all the results are there and the typical indices exist + - do: + ml.get_buckets: + job_id: old-cluster-job-to-upgrade + - match: { count: 1 } + + - do: + ml.get_buckets: + job_id: old-cluster-job-to-upgrade-custom + - match: { count: 3 } + + - do: + indices.exists: + index: .ml-anomalies-shared + + - is_true: '' + + - do: + indices.get_settings: + index: .ml-anomalies-shared + name: index.version.created + + - match: { \.ml-anomalies-shared.settings.index.version.created: '/6\d+/' } + + - do: + indices.exists: + index: .ml-anomalies-custom-old-cluster-job-to-upgrade-custom + + - is_true: '' + + # Do the upgrade + - do: + ml.upgrade: + wait_for_completion: true + + - match: { acknowledged: true } + + # Verify that old indices are gone + - do: + indices.exists: + index: .ml-anomalies-shared + + - is_false: '' + + - do: + indices.exists: + index: .ml-anomalies-custom-old-cluster-job-to-upgrade-custom + + - is_false: '' + + # Verify that results can still be retrieved + + - do: + indices.refresh: {} + + - do: + ml.get_buckets: + job_id: old-cluster-job-to-upgrade + - match: { count: 1 } + + - do: + ml.get_buckets: + job_id: old-cluster-job-to-upgrade-custom + - match: { count: 3 } + + # Verify the created version is correct + + - do: + indices.get_settings: + index: .ml-anomalies-old-cluster-job-to-upgrade + name: index.version.created + - match: { \.ml-anomalies-shared-7.settings.index.version.created: '/7\d+/' } + - match: { \.ml-anomalies-shared-7r.settings.index.version.created: '/7\d+/' } + + - do: + indices.get_settings: + index: .ml-anomalies-old-cluster-job-to-upgrade-custom + name: index.version.created + - match: { \.ml-anomalies-custom-old-cluster-job-to-upgrade-custom-7.settings.index.version.created: '/7\d+/' } + - match: { \.ml-anomalies-custom-old-cluster-job-to-upgrade-custom-7r.settings.index.version.created: '/7\d+/' } + + # Create a new job to verify that the .ml-anomalies-shared index gets created again without issues + + - do: + ml.put_job: + job_id: upgraded-cluster-job-should-not-upgrade + body: > + { + "description":"Cluster upgrade", + "analysis_config" : { + "bucket_span": "60s", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "analysis_limits" : { + "model_memory_limit": "50mb" + }, + 
"data_description" : { + "format":"xcontent", + "time_field":"time", + "time_format":"epoch" + } + } + - match: { job_id: upgraded-cluster-job-should-not-upgrade } + + - do: + ml.open_job: + job_id: upgraded-cluster-job-should-not-upgrade + + - do: + ml.post_data: + job_id: upgraded-cluster-job-should-not-upgrade + body: + - airline: AAL + responsetime: 132.2046 + sourcetype: post-data-job + time: 1403481600 + - airline: JZA + responsetime: 990.4628 + sourcetype: post-data-job + time: 1403481700 + - match: { processed_record_count: 2 } + + - do: + ml.close_job: + job_id: upgraded-cluster-job-should-not-upgrade + + - do: + ml.get_buckets: + job_id: upgraded-cluster-job-should-not-upgrade + - match: { count: 1 } + + - do: + indices.exists: + index: .ml-anomalies-shared + + - is_true: '' + + - do: + indices.get_settings: + index: .ml-anomalies-shared + name: index.version.created + + - match: { \.ml-anomalies-shared.settings.index.version.created: '/7\d+/' } + + # Do the upgrade Again as nothing needs upgraded now + - do: + ml.upgrade: + wait_for_completion: true + + - match: { acknowledged: true } + + - do: + indices.exists: + index: .ml-anomalies-shared + + - is_true: ''