diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index ddf04a92e819c..cbbae041a131c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -629,15 +629,14 @@ public void onFailure(String source, Exception e) { } @Override - public void clusterStateProcessed(String source, ClusterState oldState, - ClusterState newState) { + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { listener.onResponse(new AcknowledgedResponse(true)); } }); } else { JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build(); - jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap( + jobConfigProvider.updateJob(jobId, update, null, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap( job -> listener.onResponse(new AcknowledgedResponse(true)), e -> { logger.error("[" + jobId + "] Failed to clear finished_time", e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java index a69436cd37460..8d753ee271625 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java @@ -103,7 +103,7 @@ private void updateDatafeedInIndex(UpdateDatafeedAction.Request request, Cluster CheckedConsumer updateConsumer = ok -> { datafeedConfigProvider.updateDatefeedConfig(request.getUpdate().getId(), request.getUpdate(), headers, - jobConfigProvider::validateDatafeedJob, + jobConfigProvider::validateDatafeedJob, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap( updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)), listener::onFailure diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java index 0a4ca1d680995..ab17b916aca6e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.get.GetRequest; @@ -18,6 +19,7 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -53,15 +55,17 @@ public class TransportUpdateFilterAction extends HandledTransportAction listener) { IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filter.documentId()); - indexRequest.version(version); + if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)) { + indexRequest.setIfSeqNo(seqNo); + indexRequest.setIfPrimaryTerm(primaryTerm); + } else { + indexRequest.version(version); + } indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { @@ -148,7 +159,7 @@ public void onResponse(GetResponse getDocResponse) { XContentParser parser = XContentFactory.xContent(XContentType.JSON) .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { MlFilter filter = MlFilter.LENIENT_PARSER.apply(parser, null).build(); - listener.onResponse(new FilterWithVersion(filter, getDocResponse.getVersion())); + listener.onResponse(new FilterWithVersion(filter, getDocResponse)); } } else { this.onFailure(new ResourceNotFoundException(Messages.getMessage(Messages.FILTER_NOT_FOUND, filterId))); @@ -169,10 +180,14 @@ private static class FilterWithVersion { private final MlFilter filter; private final long version; + private final long seqNo; + private final long primaryTerm; - private FilterWithVersion(MlFilter filter, long version) { + private FilterWithVersion(MlFilter filter, GetResponse getDocResponse) { this.filter = filter; - this.version = version; + this.version = getDocResponse.getVersion(); + this.seqNo = getDocResponse.getSeqNo(); + this.primaryTerm = getDocResponse.getPrimaryTerm(); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index e68321c621a75..44063bf69783f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; @@ -19,6 +20,7 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -268,10 +270,12 @@ public void onFailure(Exception e) { * @param headers Datafeed headers applied with the update * @param validator BiConsumer that accepts the updated config and can perform * extra validations. {@code validator} must call the passed listener + * @param minClusterNodeVersion minimum version of nodes in cluster * @param updatedConfigListener Updated datafeed config listener */ public void updateDatefeedConfig(String datafeedId, DatafeedUpdate update, Map headers, BiConsumer> validator, + Version minClusterNodeVersion, ActionListener updatedConfigListener) { GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(datafeedId)); @@ -283,7 +287,9 @@ public void onResponse(GetResponse getResponse) { updatedConfigListener.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId)); return; } - long version = getResponse.getVersion(); + final long version = getResponse.getVersion(); + final long seqNo = getResponse.getSeqNo(); + final long primaryTerm = getResponse.getPrimaryTerm(); BytesReference source = getResponse.getSourceAsBytesRef(); DatafeedConfig.Builder configBuilder; try { @@ -304,7 +310,7 @@ public void onResponse(GetResponse getResponse) { ActionListener validatedListener = ActionListener.wrap( ok -> { - indexUpdatedConfig(updatedConfig, version, ActionListener.wrap( + indexUpdatedConfig(updatedConfig, version, seqNo, primaryTerm, minClusterNodeVersion, ActionListener.wrap( indexResponse -> { assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED; updatedConfigListener.onResponse(updatedConfig); @@ -328,17 +334,23 @@ public void onFailure(Exception e) { }); } - private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, ActionListener listener) { + private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, long seqNo, long primaryTerm, + Version minClusterNodeVersion, ActionListener listener) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder updatedSource = updatedConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); - IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(), + IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(updatedConfig.getId())) .setSource(updatedSource) - .setVersion(version) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .request(); + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) { + indexRequest.setIfSeqNo(seqNo); + indexRequest.setIfPrimaryTerm(primaryTerm); + } else { + indexRequest.setVersion(version); + } - executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, listener); + executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), listener); } catch (IOException e) { listener.onFailure( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 1d7dd1cdd871f..c1918a4f0674b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -522,7 +522,7 @@ private void postJobUpdate(ClusterState clusterState, UpdateJobAction.Request re private void updateJobIndex(UpdateJobAction.Request request, ActionListener updatedJobListener) { jobConfigProvider.updateJobWithValidation(request.getJobId(), request.getJobUpdate(), maxModelMemoryLimit, - this::validate, updatedJobListener); + this::validate, clusterService.state().nodes().getMinNodeVersion(), updatedJobListener); } private void updateJobClusterState(UpdateJobAction.Request request, ActionListener actionListener) { @@ -846,7 +846,8 @@ public ClusterState execute(ClusterState currentState) { .setEstablishedModelMemory(response) .build(); - jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap( + jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, + clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap( job -> { auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 8741179515aed..dd31083aa15aa 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -10,6 +10,7 @@ import org.apache.lucene.search.join.ScoreMode; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; @@ -24,6 +25,7 @@ import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -282,9 +284,12 @@ public void onFailure(Exception e) { * @param maxModelMemoryLimit The maximum model memory allowed. This can be {@code null} * if the job's {@link org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits} * are not changed. + * @param minClusterNodeVersion the minimum version of nodes in the cluster * @param updatedJobListener Updated job listener */ - public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, ActionListener updatedJobListener) { + public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, + Version minClusterNodeVersion, + ActionListener updatedJobListener) { GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); @@ -296,7 +301,9 @@ public void onResponse(GetResponse getResponse) { return; } - long version = getResponse.getVersion(); + final long version = getResponse.getVersion(); + final long seqNo = getResponse.getSeqNo(); + final long primaryTerm = getResponse.getPrimaryTerm(); BytesReference source = getResponse.getSourceAsBytesRef(); Job.Builder jobBuilder; try { @@ -316,7 +323,7 @@ public void onResponse(GetResponse getResponse) { return; } - indexUpdatedJob(updatedJob, version, updatedJobListener); + indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener); } @Override @@ -341,17 +348,18 @@ public interface UpdateValidator { } /** - * Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, ActionListener)} but + * Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, Version, ActionListener)} but * with an extra validation step which is called before the updated is applied. * * @param jobId The Id of the job to update * @param update The job update * @param maxModelMemoryLimit The maximum model memory allowed * @param validator The job update validator + * @param minClusterNodeVersion the minimum version of a node ifn the cluster * @param updatedJobListener Updated job listener */ public void updateJobWithValidation(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, - UpdateValidator validator, ActionListener updatedJobListener) { + UpdateValidator validator, Version minClusterNodeVersion, ActionListener updatedJobListener) { GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); @@ -363,7 +371,9 @@ public void onResponse(GetResponse getResponse) { return; } - long version = getResponse.getVersion(); + final long version = getResponse.getVersion(); + final long seqNo = getResponse.getSeqNo(); + final long primaryTerm = getResponse.getPrimaryTerm(); BytesReference source = getResponse.getSourceAsBytesRef(); Job originalJob; try { @@ -385,7 +395,7 @@ public void onResponse(GetResponse getResponse) { return; } - indexUpdatedJob(updatedJob, version, updatedJobListener); + indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener); }, updatedJobListener::onFailure )); @@ -402,17 +412,22 @@ public void onFailure(Exception e) { }); } - private void indexUpdatedJob(Job updatedJob, long version, ActionListener updatedJobListener) { + private void indexUpdatedJob(Job updatedJob, long version, long seqNo, long primaryTerm, Version minClusterNodeVersion, + ActionListener updatedJobListener) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder updatedSource = updatedJob.toXContent(builder, ToXContent.EMPTY_PARAMS); - IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(), + IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, Job.documentId(updatedJob.getId())) .setSource(updatedSource) - .setVersion(version) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .request(); + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) { + indexRequest.setIfSeqNo(seqNo); + indexRequest.setIfPrimaryTerm(primaryTerm); + } else { + indexRequest.setVersion(version); + } - executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap( + executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), ActionListener.wrap( indexResponse -> { assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED; updatedJobListener.onResponse(updatedJob); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java index 19f772709ead8..10b54084c0875 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java @@ -7,6 +7,7 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; @@ -86,7 +87,7 @@ public void testCrud() throws InterruptedException { AtomicReference configHolder = new AtomicReference<>(); blockingCall(actionListener -> datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), updateHeaders, - (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), actionListener), + (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), Version.CURRENT, actionListener), configHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertThat(configHolder.get().getIndices(), equalTo(updateIndices)); @@ -167,7 +168,7 @@ public void testUpdateWhenApplyingTheUpdateThrows() throws Exception { AtomicReference configHolder = new AtomicReference<>(); blockingCall(actionListener -> datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), Collections.emptyMap(), - (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), actionListener), + (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), Version.CURRENT, actionListener), configHolder, exceptionHolder); assertNull(configHolder.get()); assertNotNull(exceptionHolder.get()); @@ -193,7 +194,7 @@ public void testUpdateWithValidatorFunctionThatErrors() throws Exception { AtomicReference exceptionHolder = new AtomicReference<>(); blockingCall(actionListener -> datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), Collections.emptyMap(), - validateErrorFunction, actionListener), + validateErrorFunction, Version.CURRENT, actionListener), configHolder, exceptionHolder); assertNull(configHolder.get()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index bc11f9a5c0628..670eed831a519 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -8,6 +8,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; @@ -147,8 +148,8 @@ public void testCrud() throws InterruptedException { JobUpdate jobUpdate = new JobUpdate.Builder(jobId).setDescription("This job has been updated").build(); AtomicReference updateJobResponseHolder = new AtomicReference<>(); - blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, jobUpdate, new ByteSizeValue(32), actionListener), - updateJobResponseHolder, exceptionHolder); + blockingCall(actionListener -> jobConfigProvider.updateJob + (jobId, jobUpdate, new ByteSizeValue(32), Version.CURRENT, actionListener), updateJobResponseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertEquals("This job has been updated", updateJobResponseHolder.get().getDescription()); @@ -222,8 +223,8 @@ public void testUpdateWithAValidationError() throws Exception { .build(); AtomicReference updateJobResponseHolder = new AtomicReference<>(); - blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, invalidUpdate, new ByteSizeValue(32), actionListener), - updateJobResponseHolder, exceptionHolder); + blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, invalidUpdate, new ByteSizeValue(32), Version.CURRENT, + actionListener), updateJobResponseHolder, exceptionHolder); assertNull(updateJobResponseHolder.get()); assertNotNull(exceptionHolder.get()); assertThat(exceptionHolder.get(), instanceOf(ElasticsearchStatusException.class)); @@ -246,9 +247,8 @@ public void testUpdateWithValidator() throws Exception { AtomicReference exceptionHolder = new AtomicReference<>(); AtomicReference updateJobResponseHolder = new AtomicReference<>(); // update with the no-op validator - blockingCall(actionListener -> - jobConfigProvider.updateJobWithValidation(jobId, jobUpdate, new ByteSizeValue(32), validator, actionListener), - updateJobResponseHolder, exceptionHolder); + blockingCall(actionListener -> jobConfigProvider.updateJobWithValidation( + jobId, jobUpdate, new ByteSizeValue(32), validator, Version.CURRENT, actionListener), updateJobResponseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertNotNull(updateJobResponseHolder.get()); @@ -261,7 +261,7 @@ public void testUpdateWithValidator() throws Exception { updateJobResponseHolder.set(null); // Update with a validator that errors blockingCall(actionListener -> jobConfigProvider.updateJobWithValidation(jobId, jobUpdate, new ByteSizeValue(32), - validatorWithAnError, actionListener), + validatorWithAnError, Version.CURRENT, actionListener), updateJobResponseHolder, exceptionHolder); assertNull(updateJobResponseHolder.get()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index a2962faf945e9..d88ef1b2cd41f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -989,7 +989,7 @@ public void testRevertSnapshot_GivenJobInClusterState() { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[3]; listener.onFailure(new ResourceNotFoundException("missing job")); return null; - }).when(jobConfigProvider).updateJob(anyString(), any(), any(), any(ActionListener.class)); + }).when(jobConfigProvider).updateJob(anyString(), any(), any(), any(Version.class), any(ActionListener.class)); JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService, auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider); @@ -1009,7 +1009,7 @@ public void testRevertSnapshot_GivenJobInClusterState() { jobManager.revertSnapshot(request, mock(ActionListener.class), modelSnapshot); verify(clusterService, times(1)).submitStateUpdateTask(eq("revert-snapshot-cs-revert"), any(AckedClusterStateUpdateTask.class)); - verify(jobConfigProvider, never()).updateJob(any(), any(), any(), any()); + verify(jobConfigProvider, never()).updateJob(any(), any(), any(), any(Version.class), any()); } private Job.Builder createJobFoo() {