diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java index 5e947d4fe8c28..37fbf3dab14d8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java @@ -489,7 +489,7 @@ && updatesDetectors(job) == false && (modelSnapshotMinVersion == null || Objects.equals(modelSnapshotMinVersion, job.getModelSnapshotMinVersion())) && (establishedModelMemory == null || Objects.equals(establishedModelMemory, job.getEstablishedModelMemory())) && (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion())) - && (clearJobFinishTime == false || job.getFinishedTime() == null); + && ((clearJobFinishTime == null || clearJobFinishTime == false) || job.getFinishedTime() == null); } boolean updatesDetectors(Job job) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index d316ad67b0692..a13b705f12408 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -161,6 +161,7 @@ import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction; import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.UpdateJobProcessNotifier; import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizer; @@ -367,6 +368,7 @@ public Collection createComponents(Client client, ClusterService cluster Auditor auditor = new Auditor(client, clusterService.getNodeName()); JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings); JobConfigProvider jobConfigProvider = new JobConfigProvider(client, settings); + DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry); UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool); JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, threadPool, client, notifier); @@ -423,6 +425,7 @@ public Collection createComponents(Client client, ClusterService cluster mlLifeCycleService, jobResultsProvider, jobConfigProvider, + datafeedConfigProvider, jobManager, autodetectProcessManager, new MlInitializationService(settings, threadPool, clusterService, client), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 229abc3843eb1..a31be5f3d7d5b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.FailedNodeException; @@ -23,33 +22,31 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; -import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -63,11 +60,14 @@ public class TransportCloseJobAction extends TransportTasksAction listener) { + final ClusterState state = clusterService.state(); + final DiscoveryNodes nodes = state.nodes(); + if (request.isLocal() == false && nodes.isLocalNodeElectedMaster() == false) { + // Delegates close job to elected master node, so it becomes the coordinating node. + // See comment in OpenJobAction.Transport class for more information. + if (nodes.getMasterNode() == null) { + listener.onFailure(new MasterNotDiscoveredException("no known master node")); + } else { + transportService.sendRequest(nodes.getMasterNode(), actionName, request, + new ActionListenerResponseHandler<>(listener, CloseJobAction.Response::new)); + } + } else { + /* + * Closing of multiple jobs: + * + * 1. Resolve and validate jobs first: if any job does not meet the + * criteria (e.g. open datafeed), fail immediately, do not close any + * job + * + * 2. Internally a task request is created for every open job, so there + * are n inner tasks for 1 user request + * + * 3. No task is created for closing jobs but those will be waited on + * + * 4. Collect n inner task results or failures and send 1 outer + * result/failure + */ + + PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), ActionListener.wrap( + expandedJobIds -> { + validate(expandedJobIds, request.isForce(), tasksMetaData, ActionListener.wrap( + response -> { + request.setOpenJobIds(response.openJobIds.toArray(new String[0])); + if (response.openJobIds.isEmpty() && response.closingJobIds.isEmpty()) { + listener.onResponse(new CloseJobAction.Response(true)); + return; + } + + if (request.isForce() == false) { + Set executorNodes = new HashSet<>(); + PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); + for (String resolvedJobId : request.getOpenJobIds()) { + PersistentTasksCustomMetaData.PersistentTask jobTask = + MlTasks.getJobTask(resolvedJobId, tasks); + + if (jobTask == null || jobTask.isAssigned() == false) { + String message = "Cannot close job [" + resolvedJobId + "] because the job does not have " + + "an assigned node. Use force close to close the job"; + listener.onFailure(ExceptionsHelper.conflictStatusException(message)); + return; + } else { + executorNodes.add(jobTask.getExecutorNode()); + } + } + request.setNodes(executorNodes.toArray(new String[executorNodes.size()])); + } + + if (request.isForce()) { + List jobIdsToForceClose = new ArrayList<>(response.openJobIds); + jobIdsToForceClose.addAll(response.closingJobIds); + forceCloseJob(state, request, jobIdsToForceClose, listener); + } else { + normalCloseJob(state, task, request, response.openJobIds, response.closingJobIds, listener); + } + }, + listener::onFailure + )); + }, + listener::onFailure + )); + + } + } + + class OpenAndClosingIds { + OpenAndClosingIds() { + openJobIds = new ArrayList<>(); + closingJobIds = new ArrayList<>(); + } + List openJobIds; + List closingJobIds; } /** - * Resolve the requested jobs and add their IDs to one of the list arguments - * depending on job state. + * Separate the job Ids into open and closing job Ids and validate. + * If a job is failed it is will not be closed unless the force parameter + * in request is true. + * It is an error if the datafeed the job uses is not stopped * - * Opened jobs are added to {@code openJobIds} and closing jobs added to {@code closingJobIds}. Failed jobs are added - * to {@code openJobIds} if allowFailed is set otherwise an exception is thrown. - * @param request The close job request - * @param state Cluster state - * @param openJobIds Opened or failed jobs are added to this list - * @param closingJobIds Closing jobs are added to this list + * @param expandedJobIds The job ids + * @param forceClose Force close the job(s) + * @param tasksMetaData Persistent tasks + * @param listener Resolved job Ids listener */ - static void resolveAndValidateJobId(CloseJobAction.Request request, ClusterState state, List openJobIds, - List closingJobIds) { - PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - final MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); - - List failedJobs = new ArrayList<>(); - - Consumer jobIdProcessor = id -> { - validateJobAndTaskState(id, mlMetadata, tasksMetaData); - Job job = mlMetadata.getJobs().get(id); - if (job.isDeleting()) { - return; - } - addJobAccordingToState(id, tasksMetaData, openJobIds, closingJobIds, failedJobs); - }; - - Set expandedJobIds = mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs()); - expandedJobIds.forEach(jobIdProcessor::accept); - if (request.isForce() == false && failedJobs.size() > 0) { - if (expandedJobIds.size() == 1) { - throw ExceptionsHelper.conflictStatusException("cannot close job [{}] because it failed, use force close", - expandedJobIds.iterator().next()); - } - throw ExceptionsHelper.conflictStatusException("one or more jobs have state failed, use force close"); - } + void validate(Collection expandedJobIds, boolean forceClose, PersistentTasksCustomMetaData tasksMetaData, + ActionListener listener) { + + checkDatafeedsHaveStopped(expandedJobIds, tasksMetaData, ActionListener.wrap( + response -> { + OpenAndClosingIds ids = new OpenAndClosingIds(); + List failedJobs = new ArrayList<>(); - // allowFailed == true - openJobIds.addAll(failedJobs); + for (String jobId : expandedJobIds) { + addJobAccordingToState(jobId, tasksMetaData, ids.openJobIds, ids.closingJobIds, failedJobs); + } + + if (forceClose == false && failedJobs.size() > 0) { + if (expandedJobIds.size() == 1) { + listener.onFailure( + ExceptionsHelper.conflictStatusException("cannot close job [{}] because it failed, use force close", + expandedJobIds.iterator().next())); + return; + } + listener.onFailure( + ExceptionsHelper.conflictStatusException("one or more jobs have state failed, use force close")); + return; + } + + // If there are failed jobs force close is true + ids.openJobIds.addAll(failedJobs); + listener.onResponse(ids); + }, + listener::onFailure + )); } - private static void addJobAccordingToState(String jobId, PersistentTasksCustomMetaData tasksMetaData, + void checkDatafeedsHaveStopped(Collection jobIds, PersistentTasksCustomMetaData tasksMetaData, + ActionListener listener) { + datafeedConfigProvider.findDatafeedsForJobIds(jobIds, ActionListener.wrap( + datafeedIds -> { + for (String datafeedId : datafeedIds) { + DatafeedState datafeedState = MlTasks.getDatafeedState(datafeedId, tasksMetaData); + if (datafeedState != DatafeedState.STOPPED) { + listener.onFailure(ExceptionsHelper.conflictStatusException( + "cannot close job datafeed [{}] hasn't been stopped", datafeedId)); + return; + } + } + listener.onResponse(Boolean.TRUE); + }, + listener::onFailure + )); + } + + static void addJobAccordingToState(String jobId, PersistentTasksCustomMetaData tasksMetaData, List openJobs, List closingJobs, List failedJobs) { JobState jobState = MlTasks.getJobState(jobId, tasksMetaData); @@ -161,98 +269,6 @@ static TransportCloseJobAction.WaitForCloseRequest buildWaitForCloseRequest(List return waitForCloseRequest; } - /** - * Validate the close request. Throws an exception on any of these conditions: - *
    - *
  • If the job does not exist
  • - *
  • If the job has a data feed the feed must be closed first
  • - *
  • If the job is opening
  • - *
- * - * @param jobId Job Id - * @param mlMetadata ML MetaData - * @param tasks Persistent tasks - */ - static void validateJobAndTaskState(String jobId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) { - Job job = mlMetadata.getJobs().get(jobId); - if (job == null) { - throw new ResourceNotFoundException("cannot close job, because job [" + jobId + "] does not exist"); - } - - Optional datafeed = mlMetadata.getDatafeedByJobId(jobId); - if (datafeed.isPresent()) { - DatafeedState datafeedState = MlTasks.getDatafeedState(datafeed.get().getId(), tasks); - if (datafeedState != DatafeedState.STOPPED) { - throw ExceptionsHelper.conflictStatusException("cannot close job [{}], datafeed hasn't been stopped", jobId); - } - } - } - - @Override - protected void doExecute(Task task, CloseJobAction.Request request, ActionListener listener) { - final ClusterState state = clusterService.state(); - final DiscoveryNodes nodes = state.nodes(); - if (request.isLocal() == false && nodes.isLocalNodeElectedMaster() == false) { - // Delegates close job to elected master node, so it becomes the coordinating node. - // See comment in OpenJobAction.Transport class for more information. - if (nodes.getMasterNode() == null) { - listener.onFailure(new MasterNotDiscoveredException("no known master node")); - } else { - transportService.sendRequest(nodes.getMasterNode(), actionName, request, - new ActionListenerResponseHandler<>(listener, CloseJobAction.Response::new)); - } - } else { - /* - * Closing of multiple jobs: - * - * 1. Resolve and validate jobs first: if any job does not meet the - * criteria (e.g. open datafeed), fail immediately, do not close any - * job - * - * 2. Internally a task request is created for every open job, so there - * are n inner tasks for 1 user request - * - * 3. No task is created for closing jobs but those will be waited on - * - * 4. Collect n inner task results or failures and send 1 outer - * result/failure - */ - - List openJobIds = new ArrayList<>(); - List closingJobIds = new ArrayList<>(); - resolveAndValidateJobId(request, state, openJobIds, closingJobIds); - request.setOpenJobIds(openJobIds.toArray(new String[0])); - if (openJobIds.isEmpty() && closingJobIds.isEmpty()) { - listener.onResponse(new CloseJobAction.Response(true)); - return; - } - - if (request.isForce() == false) { - Set executorNodes = new HashSet<>(); - PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); - for (String resolvedJobId : request.getOpenJobIds()) { - PersistentTasksCustomMetaData.PersistentTask jobTask = MlTasks.getJobTask(resolvedJobId, tasks); - if (jobTask == null || jobTask.isAssigned() == false) { - String message = "Cannot close job [" + resolvedJobId + "] because the job does not have an assigned node." + - " Use force close to close the job"; - listener.onFailure(ExceptionsHelper.conflictStatusException(message)); - return; - } else { - executorNodes.add(jobTask.getExecutorNode()); - } - } - request.setNodes(executorNodes.toArray(new String[executorNodes.size()])); - } - - if (request.isForce()) { - List jobIdsToForceClose = new ArrayList<>(openJobIds); - jobIdsToForceClose.addAll(closingJobIds); - forceCloseJob(state, request, jobIdsToForceClose, listener); - } else { - normalCloseJob(state, task, request, openJobIds, closingJobIds, listener); - } - } - } @Override protected void taskOperation(CloseJobAction.Request request, TransportOpenJobAction.JobTask jobTask, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java index dbde3d61d42b0..874ee8f71f5ad 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java @@ -44,6 +44,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import java.io.IOException; +import java.util.Collections; import java.util.Map; public class TransportPutDatafeedAction extends TransportMasterNodeAction { @@ -177,7 +178,7 @@ private ElasticsearchException checkConfigsAreNotDefinedInClusterState(String da } private void checkJobDoesNotHaveADatafeed(String jobId, ActionListener listener) { - datafeedConfigProvider.findDatafeedForJobId(jobId, ActionListener.wrap( + datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap( datafeedIds -> { if (datafeedIds.isEmpty()) { listener.onResponse(Boolean.TRUE); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java index 6b17721b20d68..3fd90b5d21c19 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import java.util.Collections; import java.util.Map; public class TransportUpdateDatafeedAction extends TransportMasterNodeAction { @@ -100,7 +101,7 @@ protected void masterOperation(UpdateDatafeedAction.Request request, ClusterStat * if it does have a datafeed it must be the one we are updating */ private void checkJobDoesNotHaveADifferentDatafeed(String jobId, String datafeedId, ActionListener listener) { - datafeedConfigProvider.findDatafeedForJobId(jobId, ActionListener.wrap( + datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap( datafeedIds -> { if (datafeedIds.isEmpty()) { // Ok the job does not have a datafeed diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index 22912e9afc795..758c190feef1b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -56,6 +56,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -168,17 +169,17 @@ public void onFailure(Exception e) { } /** - * Find any datafeeds that are used by job {@code jobid} i.e. the - * datafeed that references job {@code jobid}. + * Find any datafeeds that are used by jobs {@code jobIds} i.e. the + * datafeeds that references any of the jobs in {@code jobIds}. * * In theory there should never be more than one datafeed referencing a * particular job. * - * @param jobId The job to find + * @param jobIds The jobs to find the datafeeds of * @param listener Datafeed Id listener */ - public void findDatafeedForJobId(String jobId, ActionListener> listener) { - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedJobIdQuery(jobId)); + public void findDatafeedsForJobIds(Collection jobIds, ActionListener> listener) { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedJobIdsQuery(jobIds)); sourceBuilder.fetchSource(false); sourceBuilder.docValueField(DatafeedConfig.ID.getPreferredName()); @@ -191,8 +192,8 @@ public void findDatafeedForJobId(String jobId, ActionListener> liste response -> { Set datafeedIds = new HashSet<>(); SearchHit[] hits = response.getHits().getHits(); - // There should be 0 or 1 datafeeds referencing the same job - assert hits.length <= 1; + // There cannot be more than one datafeed per job + assert hits.length <= jobIds.size(); for (SearchHit hit : hits) { datafeedIds.add(hit.field(DatafeedConfig.ID.getPreferredName()).getValue()); @@ -463,10 +464,10 @@ private QueryBuilder buildDatafeedIdQuery(String [] tokens) { return boolQueryBuilder; } - private QueryBuilder buildDatafeedJobIdQuery(String jobId) { + private QueryBuilder buildDatafeedJobIdsQuery(Collection jobIds) { BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); boolQueryBuilder.filter(new TermQueryBuilder(DatafeedConfig.CONFIG_TYPE.getPreferredName(), DatafeedConfig.TYPE)); - boolQueryBuilder.filter(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)); + boolQueryBuilder.filter(new TermsQueryBuilder(Job.ID.getPreferredName(), jobIds)); return boolQueryBuilder; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java index 879e4c3fa18af..81dfa38148fad 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java @@ -6,15 +6,18 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; +import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; @@ -27,225 +30,179 @@ import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; -import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; +import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TransportCloseJobActionTests extends ESTestCase { - public void testValidate_datafeedIsStarted() { - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); - mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(new Date()), false); - mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id", - Collections.singletonList("*")), Collections.emptyMap()); - final PersistentTasksCustomMetaData.Builder startDataFeedTaskBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask("job_id", null, JobState.OPENED, startDataFeedTaskBuilder); - addTask("datafeed_id", 0L, null, DatafeedState.STARTED, startDataFeedTaskBuilder); - - ElasticsearchStatusException e = - expectThrows(ElasticsearchStatusException.class, - () -> TransportCloseJobAction.validateJobAndTaskState("job_id", mlBuilder.build(), - startDataFeedTaskBuilder.build())); - assertEquals(RestStatus.CONFLICT, e.status()); - assertEquals("cannot close job [job_id], datafeed hasn't been stopped", e.getMessage()); + private ClusterService clusterService; + private JobConfigProvider jobConfigProvider; + private DatafeedConfigProvider datafeedConfigProvider; - final PersistentTasksCustomMetaData.Builder dataFeedNotStartedTaskBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask("job_id", null, JobState.OPENED, dataFeedNotStartedTaskBuilder); - if (randomBoolean()) { - addTask("datafeed_id", 0L, null, DatafeedState.STOPPED, dataFeedNotStartedTaskBuilder); - } - - TransportCloseJobAction.validateJobAndTaskState("job_id", mlBuilder.build(), dataFeedNotStartedTaskBuilder.build()); + @Before + private void setupMocks() { + clusterService = mock(ClusterService.class); + jobConfigProvider = mock(JobConfigProvider.class); + datafeedConfigProvider = mock(DatafeedConfigProvider.class); } - public void testValidate_jobIsOpening() { - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); - mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("opening-job").build(new Date()), false); - - // An opening job has a null status field - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask("opening-job", null, null, tasksBuilder); + public void testAddJobAccordingToState() { + List openJobIds = new ArrayList<>(); + List closingJobIds = new ArrayList<>(); + List failedJobIds = new ArrayList<>(); - TransportCloseJobAction.validateJobAndTaskState("opening-job", mlBuilder.build(), tasksBuilder.build()); - } + PersistentTasksCustomMetaData.Builder taskBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("open-job", null, JobState.OPENED, taskBuilder); + addJobTask("failed-job", null, JobState.FAILED, taskBuilder); + addJobTask("closing-job", null, JobState.CLOSING, taskBuilder); + addJobTask("opening-job", null, JobState.OPENING, taskBuilder); + PersistentTasksCustomMetaData tasks = taskBuilder.build(); - public void testValidate_jobIsMissing() { - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask("missing-job", null, null, tasksBuilder); - - expectThrows(ResourceNotFoundException.class, () -> - TransportCloseJobAction.validateJobAndTaskState("missing-job", mlBuilder.build(), tasksBuilder.build())); - } - - public void testResolve_givenAll() { - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); - mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()), false); - mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()), false); - mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()), false); - mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_4").build(new Date()), false); - mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_5").build(new Date()), false); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask("job_id_1", null, JobState.OPENED, tasksBuilder); - addJobTask("job_id_2", null, JobState.OPENED, tasksBuilder); - addJobTask("job_id_3", null, JobState.FAILED, tasksBuilder); - addJobTask("job_id_4", null, JobState.CLOSING, tasksBuilder); - - ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) - .build(); - - List openJobs = new ArrayList<>(); - List closingJobs = new ArrayList<>(); - - CloseJobAction.Request request = new CloseJobAction.Request("_all"); - request.setForce(true); - TransportCloseJobAction.resolveAndValidateJobId(request, cs1, openJobs, closingJobs); - assertEquals(Arrays.asList("job_id_1", "job_id_2", "job_id_3"), openJobs); - assertEquals(Collections.singletonList("job_id_4"), closingJobs); - - request.setForce(false); - expectThrows(ElasticsearchStatusException.class, - () -> TransportCloseJobAction.resolveAndValidateJobId(request, cs1, openJobs, closingJobs)); + for (String id : new String [] {"open-job", "closing-job", "opening-job", "failed-job"}) { + TransportCloseJobAction.addJobAccordingToState(id, tasks, openJobIds, closingJobIds, failedJobIds); + } + assertThat(openJobIds, containsInAnyOrder("open-job", "opening-job")); + assertThat(failedJobIds, contains("failed-job")); + assertThat(closingJobIds, contains("closing-job")); } - public void testResolve_givenJobId() { - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); - mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_1").build(new Date()), false); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask("job_id_1", null, JobState.OPENED, tasksBuilder); + public void testValidate_datafeedState() { + final PersistentTasksCustomMetaData.Builder startDataFeedTaskBuilder = PersistentTasksCustomMetaData.builder(); + String jobId = "job-with-started-df"; + String datafeedId = "df1"; + addJobTask(jobId, null, JobState.OPENED, startDataFeedTaskBuilder); + addTask(datafeedId, 0L, null, DatafeedState.STARTED, startDataFeedTaskBuilder); - ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) - .build(); + mockDatafeedConfigFindDatafeeds(Collections.singleton(datafeedId)); - List openJobs = new ArrayList<>(); - List closingJobs = new ArrayList<>(); + TransportCloseJobAction closeJobAction = createAction(); - CloseJobAction.Request request = new CloseJobAction.Request("job_id_1"); - TransportCloseJobAction.resolveAndValidateJobId(request, cs1, openJobs, closingJobs); - assertEquals(Collections.singletonList("job_id_1"), openJobs); - assertEquals(Collections.emptyList(), closingJobs); + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference responseHolder = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap( + responseHolder::set, + exceptionHolder::set + ); - // Job without task is closed - cs1 = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())) - .build(); + closeJobAction.validate(Arrays.asList(jobId), false, startDataFeedTaskBuilder.build(), listener); - openJobs.clear(); - closingJobs.clear(); - TransportCloseJobAction.resolveAndValidateJobId(request, cs1, openJobs, closingJobs); - assertEquals(Collections.emptyList(), openJobs); - assertEquals(Collections.emptyList(), closingJobs); - } - - public void testResolve_throwsWithUnknownJobId() { - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); - mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_1").build(new Date()), false); - - ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())) - .build(); + assertNull(responseHolder.get()); + assertNotNull(exceptionHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(ElasticsearchStatusException.class)); + ElasticsearchStatusException esException = (ElasticsearchStatusException) exceptionHolder.get(); + assertEquals(RestStatus.CONFLICT, esException.status()); + assertEquals("cannot close job datafeed [df1] hasn't been stopped", esException.getMessage()); - List openJobs = new ArrayList<>(); - List closingJobs = new ArrayList<>(); + final PersistentTasksCustomMetaData.Builder dataFeedNotStartedTaskBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask(jobId, null, JobState.OPENED, dataFeedNotStartedTaskBuilder); + if (randomBoolean()) { + addTask(datafeedId, 0L, null, DatafeedState.STOPPED, dataFeedNotStartedTaskBuilder); + } - CloseJobAction.Request request = new CloseJobAction.Request("missing-job"); - expectThrows(ResourceNotFoundException.class, - () -> TransportCloseJobAction.resolveAndValidateJobId(request, cs1, openJobs, closingJobs)); + exceptionHolder.set(null); + closeJobAction.validate(Arrays.asList(jobId), false, dataFeedNotStartedTaskBuilder.build(), listener); + assertNull(exceptionHolder.get()); + assertNotNull(responseHolder.get()); + assertThat(responseHolder.get().openJobIds, contains(jobId)); + assertThat(responseHolder.get().closingJobIds, empty()); } - public void testResolve_givenJobIdFailed() { - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); - mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_failed").build(new Date()), false); - + public void testValidate_givenFailedJob() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id_failed", null, JobState.FAILED, tasksBuilder); - ClusterState cs1 = ClusterState.builder(new ClusterName("_name")).metaData(new MetaData.Builder() - .putCustom(MlMetadata.TYPE, mlBuilder.build()).putCustom(PersistentTasksCustomMetaData.TYPE, - tasksBuilder.build())).build(); - - List openJobs = new ArrayList<>(); - List closingJobs = new ArrayList<>(); - - CloseJobAction.Request request = new CloseJobAction.Request("job_id_failed"); - request.setForce(true); - - TransportCloseJobAction.resolveAndValidateJobId(request, cs1, openJobs, closingJobs); - assertEquals(Collections.singletonList("job_id_failed"), openJobs); - assertEquals(Collections.emptyList(), closingJobs); - - openJobs.clear(); - closingJobs.clear(); - - request.setForce(false); - expectThrows(ElasticsearchStatusException.class, () -> TransportCloseJobAction.resolveAndValidateJobId(request, cs1, - openJobs, closingJobs)); + mockDatafeedConfigFindDatafeeds(Collections.emptySet()); + + TransportCloseJobAction closeJobAction = createAction(); + + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference responseHolder = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap( + responseHolder::set, + exceptionHolder::set + ); + + // force close so not an error for the failed job + closeJobAction.validate(Arrays.asList("job_id_failed"), true, tasksBuilder.build(), listener); + assertNull(exceptionHolder.get()); + assertNotNull(responseHolder.get()); + assertThat(responseHolder.get().openJobIds, contains("job_id_failed")); + assertThat(responseHolder.get().closingJobIds, empty()); + + // not a force close so is an error + responseHolder.set(null); + closeJobAction.validate(Arrays.asList("job_id_failed"), false, tasksBuilder.build(), listener); + assertNull(responseHolder.get()); + assertNotNull(exceptionHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(ElasticsearchStatusException.class)); + ElasticsearchStatusException esException = (ElasticsearchStatusException) exceptionHolder.get(); + assertEquals(RestStatus.CONFLICT, esException.status()); + assertEquals("cannot close job [job_id_failed] because it failed, use force close", esException.getMessage()); } - public void testResolve_withSpecificJobIds() { - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); - mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_closing").build(new Date()), false); - mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_open-1").build(new Date()), false); - mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_open-2").build(new Date()), false); - mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("job_id_closed").build(new Date()), false); - + public void testValidate_withSpecificJobIds() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id_closing", null, JobState.CLOSING, tasksBuilder); addJobTask("job_id_open-1", null, JobState.OPENED, tasksBuilder); addJobTask("job_id_open-2", null, JobState.OPENED, tasksBuilder); - // closed job has no task - - ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) - .build(); - - List openJobs = new ArrayList<>(); - List closingJobs = new ArrayList<>(); - - TransportCloseJobAction.resolveAndValidateJobId(new CloseJobAction.Request("_all"), cs1, openJobs, closingJobs); - assertEquals(Arrays.asList("job_id_open-1", "job_id_open-2"), openJobs); - assertEquals(Collections.singletonList("job_id_closing"), closingJobs); - openJobs.clear(); - closingJobs.clear(); - - TransportCloseJobAction.resolveAndValidateJobId(new CloseJobAction.Request("*open*"), cs1, openJobs, closingJobs); - assertEquals(Arrays.asList("job_id_open-1", "job_id_open-2"), openJobs); - assertEquals(Collections.emptyList(), closingJobs); - openJobs.clear(); - closingJobs.clear(); - - TransportCloseJobAction.resolveAndValidateJobId(new CloseJobAction.Request("job_id_closing"), cs1, openJobs, closingJobs); - assertEquals(Collections.emptyList(), openJobs); - assertEquals(Collections.singletonList("job_id_closing"), closingJobs); - openJobs.clear(); - closingJobs.clear(); - - TransportCloseJobAction.resolveAndValidateJobId(new CloseJobAction.Request("job_id_open-1"), cs1, openJobs, closingJobs); - assertEquals(Collections.singletonList("job_id_open-1"), openJobs); - assertEquals(Collections.emptyList(), closingJobs); - openJobs.clear(); - closingJobs.clear(); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); + + mockDatafeedConfigFindDatafeeds(Collections.emptySet()); + + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference responseHolder = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap( + responseHolder::set, + exceptionHolder::set + ); + + TransportCloseJobAction closeJobAction = createAction(); + closeJobAction.validate(Arrays.asList("job_id_closing", "job_id_open-1", "job_id_open-2"), false, tasks, listener); + assertNull(exceptionHolder.get()); + assertNotNull(responseHolder.get()); + assertEquals(Arrays.asList("job_id_open-1", "job_id_open-2"), responseHolder.get().openJobIds); + assertEquals(Collections.singletonList("job_id_closing"), responseHolder.get().closingJobIds); + + closeJobAction.validate(Arrays.asList("job_id_open-1", "job_id_open-2"), false, tasks, listener); + assertNull(exceptionHolder.get()); + assertNotNull(responseHolder.get()); + assertEquals(Arrays.asList("job_id_open-1", "job_id_open-2"), responseHolder.get().openJobIds); + assertEquals(Collections.emptyList(), responseHolder.get().closingJobIds); + + closeJobAction.validate(Arrays.asList("job_id_closing"), false, tasks, listener); + assertNull(exceptionHolder.get()); + assertNotNull(responseHolder.get()); + assertEquals(Collections.emptyList(), responseHolder.get().openJobIds); + assertEquals(Arrays.asList("job_id_closing"), responseHolder.get().closingJobIds); + + closeJobAction.validate(Arrays.asList("job_id_open-1"), false, tasks, listener); + assertNull(exceptionHolder.get()); + assertNotNull(responseHolder.get()); + assertEquals(Arrays.asList("job_id_open-1"), responseHolder.get().openJobIds); + assertEquals(Collections.emptyList(), responseHolder.get().closingJobIds); } public void testDoExecute_whenNothingToClose() { @@ -256,16 +213,13 @@ public void testDoExecute_whenNothingToClose() { addJobTask("foo", null, JobState.CLOSED, tasksBuilder); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) .build(); - ClusterService clusterService = mock(ClusterService.class); + TransportCloseJobAction transportAction = createAction(); when(clusterService.state()).thenReturn(clusterState); - - TransportCloseJobAction transportAction = new TransportCloseJobAction(Settings.EMPTY, - mock(TransportService.class), mock(ThreadPool.class), mock(ActionFilters.class), - clusterService, mock(Client.class), mock(Auditor.class), mock(PersistentTasksService.class)); + mockJobConfigProviderExpandIds(Collections.singleton("foo")); + mockDatafeedConfigFindDatafeeds(Collections.emptySet()); AtomicBoolean gotResponse = new AtomicBoolean(false); CloseJobAction.Request request = new Request("foo"); @@ -316,4 +270,29 @@ public static void addTask(String datafeedId, long startTime, String nodeId, Dat tasks.updateTaskState(MlTasks.datafeedTaskId(datafeedId), state); } + private TransportCloseJobAction createAction() { + return new TransportCloseJobAction(Settings.EMPTY, + mock(TransportService.class), mock(ThreadPool.class), mock(ActionFilters.class), + clusterService, mock(Client.class), mock(Auditor.class), mock(PersistentTasksService.class), + jobConfigProvider, datafeedConfigProvider); + } + + private void mockDatafeedConfigFindDatafeeds(Set datafeedIds) { + doAnswer(invocation -> { + ActionListener> listener = (ActionListener>) invocation.getArguments()[1]; + listener.onResponse(datafeedIds); + + return null; + }).when(datafeedConfigProvider).findDatafeedsForJobIds(any(), any(ActionListener.class)); + } + + private void mockJobConfigProviderExpandIds(Set expandedIds) { + doAnswer(invocation -> { + ActionListener> listener = (ActionListener>) invocation.getArguments()[2]; + listener.onResponse(expandedIds); + + return null; + }).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), any(ActionListener.class)); + } + } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java index c4dd8c19ea611..8acee83e0b0b6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java @@ -289,7 +289,7 @@ public void testExpandDatafeeds() throws Exception { assertThat(expandedDatafeeds, containsInAnyOrder(bar1, foo1, foo2)); } - public void testFindDatafeedForJobId() throws Exception { + public void testFindDatafeedsForJobIds() throws Exception { putDatafeedConfig(createDatafeedConfig("foo-1", "j1"), Collections.emptyMap()); putDatafeedConfig(createDatafeedConfig("foo-2", "j2"), Collections.emptyMap()); putDatafeedConfig(createDatafeedConfig("bar-1", "j3"), Collections.emptyMap()); @@ -299,17 +299,17 @@ public void testFindDatafeedForJobId() throws Exception { AtomicReference> datafeedIdsHolder = new AtomicReference<>(); AtomicReference exceptionHolder = new AtomicReference<>(); - blockingCall(actionListener -> datafeedConfigProvider.findDatafeedForJobId("new-job", actionListener), + blockingCall(actionListener -> datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList("new-job"), actionListener), datafeedIdsHolder, exceptionHolder); assertThat(datafeedIdsHolder.get(), empty()); - blockingCall(actionListener -> datafeedConfigProvider.findDatafeedForJobId("j2", actionListener), + blockingCall(actionListener -> datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList("j2"), actionListener), datafeedIdsHolder, exceptionHolder); assertThat(datafeedIdsHolder.get(), contains("foo-2")); - blockingCall(actionListener -> datafeedConfigProvider.findDatafeedForJobId("j3", actionListener), + blockingCall(actionListener -> datafeedConfigProvider.findDatafeedsForJobIds(Arrays.asList("j3", "j1"), actionListener), datafeedIdsHolder, exceptionHolder); - assertThat(datafeedIdsHolder.get(), contains("bar-1")); + assertThat(datafeedIdsHolder.get(), containsInAnyOrder("bar-1", "foo-1")); } public void testHeadersAreOverwritten() throws Exception {