diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index e7610074bc0e8..7c809543d1198 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -107,7 +107,7 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen */ PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), ActionListener.wrap( + jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap( expandedJobIds -> { validate(expandedJobIds, request.isForce(), tasksMetaData, ActionListener.wrap( response -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java index 4d11e857c4c8a..ae1b42d2e3083 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java @@ -16,30 +16,33 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import java.util.List; -import java.util.Set; import java.util.stream.Collectors; public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAction { + private final DatafeedConfigProvider datafeedConfigProvider; + @Inject public TransportGetDatafeedsStatsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { + IndexNameExpressionResolver indexNameExpressionResolver, + DatafeedConfigProvider datafeedConfigProvider) { super(settings, GetDatafeedsStatsAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, GetDatafeedsStatsAction.Request::new); + this.datafeedConfigProvider = datafeedConfigProvider; } @Override @@ -57,16 +60,18 @@ protected void masterOperation(GetDatafeedsStatsAction.Request request, ClusterS ActionListener listener) throws Exception { logger.debug("Get stats for datafeed '{}'", request.getDatafeedId()); - MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); - Set expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds()); - - PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - List results = expandedDatafeedIds.stream() - .map(datafeedId -> getDatafeedStats(datafeedId, state, tasksInProgress)) - .collect(Collectors.toList()); - QueryPage statsPage = new QueryPage<>(results, results.size(), - DatafeedConfig.RESULTS_FIELD); - listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage)); + datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap( + expandedDatafeedIds -> { + PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + List results = expandedDatafeedIds.stream() + .map(datafeedId -> getDatafeedStats(datafeedId, state, tasksInProgress)) + .collect(Collectors.toList()); + QueryPage statsPage = new QueryPage<>(results, results.size(), + DatafeedConfig.RESULTS_FIELD); + listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage)); + }, + listener::onFailure + )); } private static GetDatafeedsStatsAction.Response.DatafeedStats getDatafeedStats( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index a119634e5ff0f..57b5dbd8b7f23 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -20,10 +20,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; @@ -32,7 +32,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.stats.ForecastStats; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -54,28 +54,37 @@ public class TransportGetJobsStatsAction extends TransportTasksAction listener) { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); - request.setExpandedJobsIds(new ArrayList<>(mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs()))); - ActionListener finalListener = listener; - listener = ActionListener.wrap(response -> gatherStatsForClosedJobs(mlMetadata, - request, response, finalListener), listener::onFailure); - super.doExecute(task, request, listener); + protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener finalListener) { + + jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap( + expandedIds -> { + request.setExpandedJobsIds(new ArrayList<>(expandedIds)); + ActionListener jobStatsListener = ActionListener.wrap( + response -> gatherStatsForClosedJobs(request, response, finalListener), + finalListener::onFailure + ); + super.doExecute(task, request, jobStatsListener); + }, + finalListener::onFailure + )); } @Override @@ -123,21 +132,20 @@ protected void taskOperation(GetJobsStatsAction.Request request, TransportOpenJo // Up until now we gathered the stats for jobs that were open, // This method will fetch the stats for missing jobs, that was stored in the jobs index - void gatherStatsForClosedJobs(MlMetadata mlMetadata, GetJobsStatsAction.Request request, GetJobsStatsAction.Response response, + void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAction.Response response, ActionListener listener) { - List jobIds = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, - request.getExpandedJobsIds(), response.getResponse().results()); - if (jobIds.isEmpty()) { + List closedJobIds = determineJobIdsWithoutLiveStats(request.getExpandedJobsIds(), response.getResponse().results()); + if (closedJobIds.isEmpty()) { listener.onResponse(response); return; } - AtomicInteger counter = new AtomicInteger(jobIds.size()); - AtomicArray jobStats = new AtomicArray<>(jobIds.size()); + AtomicInteger counter = new AtomicInteger(closedJobIds.size()); + AtomicArray jobStats = new AtomicArray<>(closedJobIds.size()); PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - for (int i = 0; i < jobIds.size(); i++) { + for (int i = 0; i < closedJobIds.size(); i++) { int slot = i; - String jobId = jobIds.get(i); + String jobId = closedJobIds.get(i); gatherForecastStats(jobId, forecastStats -> { gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> { JobState jobState = MlTasks.getJobState(jobId, tasks); @@ -180,11 +188,9 @@ static TimeValue durationToTimeValue(Optional duration) { } } - static List determineNonDeletedJobIdsWithoutLiveStats(MlMetadata mlMetadata, - List requestedJobIds, - List stats) { + static List determineJobIdsWithoutLiveStats(List requestedJobIds, + List stats) { Set excludeJobIds = stats.stream().map(GetJobsStatsAction.Response.JobStats::getJobId).collect(Collectors.toSet()); - return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId) && - !mlMetadata.isJobDeleting(jobId)).collect(Collectors.toList()); + return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId)).collect(Collectors.toList()); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 78658e5330047..58ceda1244c80 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -172,7 +172,7 @@ private void getJobFromClusterState(String jobId, ActionListener jobListene public void expandJobs(String expression, boolean allowNoJobs, ActionListener> jobsListener) { Map clusterStateJobs = expandJobsFromClusterState(expression, allowNoJobs, clusterService.state()); - jobConfigProvider.expandJobs(expression, allowNoJobs, ActionListener.wrap( + jobConfigProvider.expandJobs(expression, allowNoJobs, false, ActionListener.wrap( jobBuilders -> { // Check for duplicate jobs for (Job.Builder jb : jobBuilders) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 2c19f081956f7..8ca0c5ff531a0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -47,6 +47,7 @@ import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.ExistsQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.TermQueryBuilder; @@ -489,11 +490,12 @@ public void markJobAsDeleting(String jobId, ActionListener listener) { * @param allowNoJobs if {@code false}, an error is thrown when no name matches the {@code expression}. * This only applies to wild card expressions, if {@code expression} is not a * wildcard then setting this true will not suppress the exception + * @param excludeDeleting If true exclude jobs marked as deleting * @param listener The expanded job Ids listener */ - public void expandJobsIds(String expression, boolean allowNoJobs, ActionListener> listener) { + public void expandJobsIds(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener> listener) { String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens)); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting)); sourceBuilder.sort(Job.ID.getPreferredName()); sourceBuilder.fetchSource(false); sourceBuilder.docValueField(Job.ID.getPreferredName()); @@ -535,21 +537,22 @@ public void expandJobsIds(String expression, boolean allowNoJobs, ActionListener } /** - * The same logic as {@link #expandJobsIds(String, boolean, ActionListener)} but + * The same logic as {@link #expandJobsIds(String, boolean, boolean, ActionListener)} but * the full anomaly detector job configuration is returned. * - * See {@link #expandJobsIds(String, boolean, ActionListener)} + * See {@link #expandJobsIds(String, boolean, boolean, ActionListener)} * * @param expression the expression to resolve * @param allowNoJobs if {@code false}, an error is thrown when no name matches the {@code expression}. * This only applies to wild card expressions, if {@code expression} is not a * wildcard then setting this true will not suppress the exception + * @param excludeDeleting If true exclude jobs marked as deleting * @param listener The expanded jobs listener */ // NORELEASE jobs should be paged or have a mechanism to return all jobs if there are many of them - public void expandJobs(String expression, boolean allowNoJobs, ActionListener> listener) { + public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener> listener) { String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens)); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting)); sourceBuilder.sort(Job.ID.getPreferredName()); SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) @@ -594,7 +597,7 @@ public void expandJobs(String expression, boolean allowNoJobs, ActionListener
  • terms = new ArrayList<>(); for (String token : tokens) { if (Regex.isSimpleMatchPattern(token)) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java index 28d9d091c3714..a5920afd8dd31 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java @@ -288,11 +288,11 @@ private void mockDatafeedConfigFindDatafeeds(Set datafeedIds) { private void mockJobConfigProviderExpandIds(Set expandedIds) { doAnswer(invocation -> { - ActionListener> listener = (ActionListener>) invocation.getArguments()[2]; + ActionListener> listener = (ActionListener>) invocation.getArguments()[3]; listener.onResponse(expandedIds); return null; - }).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), any(ActionListener.class)); + }).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), anyBoolean(), any(ActionListener.class)); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java index 6d4b008570c72..2ee184ec877ed 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java @@ -8,7 +8,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; @@ -18,37 +17,27 @@ import java.util.List; import java.util.Optional; -import static org.elasticsearch.xpack.ml.action.TransportGetJobsStatsAction.determineNonDeletedJobIdsWithoutLiveStats; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.elasticsearch.xpack.ml.action.TransportGetJobsStatsAction.determineJobIdsWithoutLiveStats; public class TransportGetJobsStatsActionTests extends ESTestCase { public void testDetermineJobIds() { - MlMetadata mlMetadata = mock(MlMetadata.class); - when(mlMetadata.isJobDeleting(eq("id4"))).thenReturn(true); - - List result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, - Collections.singletonList("id1"), Collections.emptyList()); + List result = determineJobIdsWithoutLiveStats(Collections.singletonList("id1"), Collections.emptyList()); assertEquals(1, result.size()); assertEquals("id1", result.get(0)); - result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, - Collections.singletonList("id1"), Collections.singletonList( + result = determineJobIdsWithoutLiveStats(Collections.singletonList("id1"), Collections.singletonList( new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null))); assertEquals(0, result.size()); - result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, - Arrays.asList("id1", "id2", "id3"), Collections.emptyList()); + result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Collections.emptyList()); assertEquals(3, result.size()); assertEquals("id1", result.get(0)); assertEquals("id2", result.get(1)); assertEquals("id3", result.get(2)); - result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, - Arrays.asList("id1", "id2", "id3"), + result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.CLOSED, null, null, null)) ); @@ -56,27 +45,18 @@ public void testDetermineJobIds() { assertEquals("id2", result.get(0)); assertEquals("id3", result.get(1)); - result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, - Arrays.asList("id1", "id2", "id3"), Arrays.asList( + result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Arrays.asList( new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null), new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null) )); assertEquals(1, result.size()); assertEquals("id2", result.get(0)); - result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Arrays.asList("id1", "id2", "id3"), Arrays.asList( + result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Arrays.asList( new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null), new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, null, JobState.OPENED, null, null, null), new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null))); assertEquals(0, result.size()); - - // No jobs running, but job 4 is being deleted - result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, - Arrays.asList("id1", "id2", "id3", "id4"), Collections.emptyList()); - assertEquals(3, result.size()); - assertEquals("id1", result.get(0)); - assertEquals("id2", result.get(1)); - assertEquals("id3", result.get(2)); } public void testDurationToTimeValue() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index cb4284c874f77..75198a3350dc5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -273,7 +273,7 @@ public void testAllowNoJobs() throws InterruptedException { AtomicReference> jobIdsHolder = new AtomicReference<>(); AtomicReference exceptionHolder = new AtomicReference<>(); - blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", false, actionListener), + blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", false, true, actionListener), jobIdsHolder, exceptionHolder); assertNull(jobIdsHolder.get()); @@ -282,13 +282,13 @@ public void testAllowNoJobs() throws InterruptedException { assertThat(exceptionHolder.get().getMessage(), containsString("No known job with id")); exceptionHolder.set(null); - blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", true, actionListener), + blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", true, false, actionListener), jobIdsHolder, exceptionHolder); assertNotNull(jobIdsHolder.get()); assertNull(exceptionHolder.get()); AtomicReference> jobsHolder = new AtomicReference<>(); - blockingCall(actionListener -> jobConfigProvider.expandJobs("*", false, actionListener), + blockingCall(actionListener -> jobConfigProvider.expandJobs("*", false, true, actionListener), jobsHolder, exceptionHolder); assertNull(jobsHolder.get()); @@ -297,7 +297,7 @@ public void testAllowNoJobs() throws InterruptedException { assertThat(exceptionHolder.get().getMessage(), containsString("No known job with id")); exceptionHolder.set(null); - blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, actionListener), + blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener), jobsHolder, exceptionHolder); assertNotNull(jobsHolder.get()); assertNull(exceptionHolder.get()); @@ -312,21 +312,21 @@ public void testExpandJobs_GroupsAndJobIds() throws Exception { client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); // Job Ids - Set expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", true, actionListener)); + Set expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("_all", true, false, actionListener)); assertEquals(new TreeSet<>(Arrays.asList("tom", "dick", "harry", "harry-jnr")), expandedIds); - expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, actionListener)); + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, true, actionListener)); assertEquals(new TreeSet<>(Arrays.asList("tom", "dick", "harry", "harry-jnr")), expandedIds); - expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("tom,harry", true, actionListener)); + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("tom,harry", true, false, actionListener)); assertEquals(new TreeSet<>(Arrays.asList("tom", "harry")), expandedIds); - expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("harry-group,tom", true, actionListener)); + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("harry-group,tom", true, false, actionListener)); assertEquals(new TreeSet<>(Arrays.asList("harry", "harry-jnr", "tom")), expandedIds); AtomicReference exceptionHolder = new AtomicReference<>(); AtomicReference> jobIdsHolder = new AtomicReference<>(); - blockingCall(actionListener -> jobConfigProvider.expandJobsIds("tom,missing1,missing2", true, actionListener), + blockingCall(actionListener -> jobConfigProvider.expandJobsIds("tom,missing1,missing2", true, false, actionListener), jobIdsHolder, exceptionHolder); assertNull(jobIdsHolder.get()); assertNotNull(exceptionHolder.get()); @@ -335,27 +335,27 @@ public void testExpandJobs_GroupsAndJobIds() throws Exception { // Job builders List expandedJobsBuilders = blockingCall(actionListener -> - jobConfigProvider.expandJobs("harry-group,tom", false, actionListener)); + jobConfigProvider.expandJobs("harry-group,tom", false, true, actionListener)); List expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); assertThat(expandedJobs, containsInAnyOrder(harry, harryJnr, tom)); expandedJobsBuilders = blockingCall(actionListener -> - jobConfigProvider.expandJobs("_all", false, actionListener)); + jobConfigProvider.expandJobs("_all", false, true, actionListener)); expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); assertThat(expandedJobs, containsInAnyOrder(tom, dick, harry, harryJnr)); expandedJobsBuilders = blockingCall(actionListener -> - jobConfigProvider.expandJobs("tom,harry", false, actionListener)); + jobConfigProvider.expandJobs("tom,harry", false, false, actionListener)); expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); assertThat(expandedJobs, containsInAnyOrder(tom, harry)); expandedJobsBuilders = blockingCall(actionListener -> - jobConfigProvider.expandJobs("", false, actionListener)); + jobConfigProvider.expandJobs("", false, false, actionListener)); expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); assertThat(expandedJobs, containsInAnyOrder(tom, dick, harry, harryJnr)); AtomicReference> jobsHolder = new AtomicReference<>(); - blockingCall(actionListener -> jobConfigProvider.expandJobs("tom,missing1,missing2", false, actionListener), + blockingCall(actionListener -> jobConfigProvider.expandJobs("tom,missing1,missing2", false, true, actionListener), jobsHolder, exceptionHolder); assertNull(jobsHolder.get()); assertNotNull(exceptionHolder.get()); @@ -373,36 +373,68 @@ public void testExpandJobs_WildCardExpansion() throws Exception { client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); // Test job IDs only - Set expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*", true, actionListener)); + Set expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*", true, true, actionListener)); assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds); - expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*-1", true, actionListener)); + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*-1", true, true,actionListener)); assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1")), expandedIds); - expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("bar*", true, actionListener)); + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("bar*", true, true, actionListener)); assertEquals(new TreeSet<>(Arrays.asList("bar-1", "bar-2", "nbar")), expandedIds); - expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("b*r-1", true, actionListener)); + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("b*r-1", true, true, actionListener)); assertEquals(new TreeSet<>(Collections.singletonList("bar-1")), expandedIds); // Test full job config - List expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("foo*", true, actionListener)); + List expandedJobsBuilders = + blockingCall(actionListener -> jobConfigProvider.expandJobs("foo*", true, true, actionListener)); List expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); assertThat(expandedJobs, containsInAnyOrder(foo1, foo2)); - expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("*-1", true, actionListener)); + expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("*-1", true, true, actionListener)); expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); assertThat(expandedJobs, containsInAnyOrder(foo1, bar1)); - expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("bar*", true, actionListener)); + expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("bar*", true, true, actionListener)); expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); assertThat(expandedJobs, containsInAnyOrder(bar1, bar2, nbar)); - expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("b*r-1", true, actionListener)); + expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("b*r-1", true, true, actionListener)); expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); assertThat(expandedJobs, containsInAnyOrder(bar1)); } + public void testExpandJobIds_excludeDeleting() throws Exception { + putJob(createJob("foo-1", null)); + putJob(createJob("foo-2", null)); + putJob(createJob("foo-deleting", null)); + putJob(createJob("bar", null)); + + Boolean marked = blockingCall(actionListener -> jobConfigProvider.markJobAsDeleting("foo-deleting", actionListener)); + assertTrue(marked); + + client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); + + Set expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*", true, true, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds); + + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*", true, false, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2", "foo-deleting")), expandedIds); + + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, true, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2", "bar")), expandedIds); + + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, false, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2", "foo-deleting", "bar")), expandedIds); + + List expandedJobsBuilders = + blockingCall(actionListener -> jobConfigProvider.expandJobs("foo*", true, true, actionListener)); + assertThat(expandedJobsBuilders, hasSize(2)); + + expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("foo*", true, false, actionListener)); + assertThat(expandedJobsBuilders, hasSize(3)); + } + public void testExpandGroups() throws Exception { putJob(createJob("apples", Collections.singletonList("fruit"))); putJob(createJob("pears", Collections.singletonList("fruit")));