Skip to content

Commit

Permalink
[ML] Wait for autodetect to be ready in the datafeed
Browse files Browse the repository at this point in the history
This is a reinforcement of elastic#37227.  It turns out that
persistent tasks are not made stale if the node they
were running on is restarted and the master node does
not notice this.  The main scenario where this happens
is when minimum master nodes is the same as the number
of nodes in the cluster, so the cluster cannot elect a
master node when any node is restarted.

When an ML node restarts we need the datafeeds for any
jobs that were running on that node to not just wait
until the jobs are allocated, but to wait for the
autodetect process of the job to start up.  In the case
of reassignment of the job persistent task this was
dealt with by the stale status test.  But in the case
where a node restarts but its persistent tasks are not
reassigned we need a deeper test.

Fixes elastic#36810
  • Loading branch information
droberts195 committed Jan 11, 2019
1 parent ed2b5e8 commit de295f7
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry,
auditor, System::currentTimeMillis);
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
System::currentTimeMillis, auditor);
System::currentTimeMillis, auditor, autodetectProcessManager);
this.datafeedManager.set(datafeedManager);
MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager,
autodetectProcessManager);
Expand Down Expand Up @@ -473,7 +473,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
return Arrays.asList(
new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get(),
memoryTracker.get(), client),
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor( datafeedManager.get())
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedManager.get())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.notifications.Auditor;

import java.util.ArrayList;
Expand Down Expand Up @@ -62,16 +64,18 @@ public class DatafeedManager {
private final ConcurrentMap<Long, Holder> runningDatafeedsOnThisNode = new ConcurrentHashMap<>();
private final DatafeedJobBuilder datafeedJobBuilder;
private final TaskRunner taskRunner = new TaskRunner();
private final AutodetectProcessManager autodetectProcessManager;
private volatile boolean isolated;

public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
Supplier<Long> currentTimeSupplier, Auditor auditor) {
Supplier<Long> currentTimeSupplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) {
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.threadPool = threadPool;
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
this.auditor = Objects.requireNonNull(auditor);
this.datafeedJobBuilder = Objects.requireNonNull(datafeedJobBuilder);
this.autodetectProcessManager = autodetectProcessManager;
clusterService.addListener(taskRunner);
}

Expand Down Expand Up @@ -256,6 +260,21 @@ private JobState getJobState(PersistentTasksCustomMetaData tasks, TransportStart
return MlTasks.getJobStateModifiedForReassignments(getJobId(datafeedTask), tasks);
}

private boolean jobHasOpenAutodetectCommunicator(PersistentTasksCustomMetaData tasks,
TransportStartDatafeedAction.DatafeedTask datafeedTask) {
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(getJobId(datafeedTask), tasks);
if (jobTask == null) {
return false;
}

JobTaskState state = (JobTaskState) jobTask.getState();
if (state == null || state.isStatusStale(jobTask)) {
return false;
}

return autodetectProcessManager.hasOpenAutodetectCommunicator(jobTask.getAllocationId());
}

private TimeValue computeNextDelay(long next) {
return new TimeValue(Math.max(1, next - currentTimeSupplier.get()));
}
Expand Down Expand Up @@ -446,7 +465,7 @@ private class TaskRunner implements ClusterStateListener {
private void runWhenJobIsOpened(TransportStartDatafeedAction.DatafeedTask datafeedTask) {
ClusterState clusterState = clusterService.state();
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (getJobState(tasks, datafeedTask) == JobState.OPENED) {
if (getJobState(tasks, datafeedTask) == JobState.OPENED && jobHasOpenAutodetectCommunicator(tasks, datafeedTask)) {
runTask(datafeedTask);
} else {
logger.info("Datafeed [{}] is waiting for job [{}] to be opened",
Expand Down Expand Up @@ -485,10 +504,10 @@ public void clusterChanged(ClusterChangedEvent event) {
continue;
}
JobState jobState = getJobState(currentTasks, datafeedTask);
if (jobState == JobState.OPENED) {
runTask(datafeedTask);
} else if (jobState == JobState.OPENING) {
if (jobState == JobState.OPENING || jobHasOpenAutodetectCommunicator(currentTasks, datafeedTask) == false) {
remainingTasks.add(datafeedTask);
} else if (jobState == JobState.OPENED) {
runTask(datafeedTask);
} else {
logger.warn("Datafeed [{}] is stopping because job [{}] state is [{}]",
datafeedTask.getDatafeedId(), getJobId(datafeedTask), jobState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,13 @@ public void killAllProcessesOnThisNode() {
*/
public void persistJob(JobTask jobTask, Consumer<Exception> handler) {
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
String message = String.format(Locale.ROOT, "Cannot persist because job [%s] does not have a corresponding autodetect process",
jobTask.getJobId());
logger.debug(message);
handler.accept(ExceptionsHelper.conflictStatusException(message));
return;
}
communicator.persistJob((aVoid, e) -> handler.accept(e));
}

Expand Down Expand Up @@ -239,7 +246,8 @@ public void processData(JobTask jobTask, AnalysisRegistry analysisRegistry, Inpu
XContentType xContentType, DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() + "] is not open");
throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() +
"] does not have a corresponding autodetect process");
}
communicator.writeToJob(input, analysisRegistry, xContentType, params, handler);
}
Expand All @@ -257,7 +265,8 @@ public void flushJob(JobTask jobTask, FlushJobParams params, ActionListener<Flus
logger.debug("Flushing job {}", jobTask.getJobId());
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId());
String message = String.format(Locale.ROOT, "Cannot flush because job [%s] does not have a corresponding autodetect process",
jobTask.getJobId());
logger.debug(message);
handler.onFailure(ExceptionsHelper.conflictStatusException(message));
return;
Expand Down Expand Up @@ -307,7 +316,8 @@ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer<Excepti
logger.debug("Forecasting job {}", jobId);
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobId);
String message = String.format(Locale.ROOT,
"Cannot forecast because job [%s] does not have a corresponding autodetect process", jobId);
logger.debug(message);
handler.accept(ExceptionsHelper.conflictStatusException(message));
return;
Expand All @@ -327,7 +337,8 @@ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer<Excepti
public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams, Consumer<Exception> handler) {
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open";
String message = "Cannot process update model debug config because job [" + jobTask.getJobId() +
"] does not have a corresponding autodetect process";
logger.debug(message);
handler.accept(ExceptionsHelper.conflictStatusException(message));
return;
Expand Down Expand Up @@ -663,6 +674,14 @@ private AutodetectCommunicator getOpenAutodetectCommunicator(JobTask jobTask) {
return null;
}

public boolean hasOpenAutodetectCommunicator(long jobAllocationId) {
ProcessContext processContext = processByAllocation.get(jobAllocationId);
if (processContext != null && processContext.getState() == ProcessContext.ProcessStateName.RUNNING) {
return processContext.getAutodetectCommunicator() != null;
}
return false;
}

public Optional<Duration> jobOpenTime(JobTask jobTask) {
AutodetectCommunicator communicator = getAutodetectCommunicator(jobTask);
if (communicator == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction.DatafeedTask;
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedActionTests;
import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
Expand All @@ -48,6 +49,7 @@
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask;
Expand All @@ -74,13 +76,14 @@ public class DatafeedManagerTests extends ESTestCase {
private long currentTime = 120000;
private Auditor auditor;
private ArgumentCaptor<ClusterStateListener> capturedClusterStateListener = ArgumentCaptor.forClass(ClusterStateListener.class);
private AtomicBoolean hasOpenAutodetectCommunicator;

@Before
@SuppressWarnings("unchecked")
public void setUpTests() {
Job.Builder job = createDatafeedJob().setCreateTime(new Date());

PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
DiscoveryNodes nodes = DiscoveryNodes.builder()
Expand Down Expand Up @@ -128,7 +131,12 @@ public void setUpTests() {
return null;
}).when(datafeedJobBuilder).build(any(), any());

datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor);
hasOpenAutodetectCommunicator = new AtomicBoolean(true);
AutodetectProcessManager autodetectProcessManager = mock(AutodetectProcessManager.class);
doAnswer(invocation -> hasOpenAutodetectCommunicator.get()).when(autodetectProcessManager).hasOpenAutodetectCommunicator(anyLong());

datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor,
autodetectProcessManager);

verify(clusterService).addListener(capturedClusterStateListener.capture());
}
Expand Down Expand Up @@ -259,7 +267,7 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() {
// Verify datafeed has not started running yet as job is still opening
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);

tasksBuilder = PersistentTasksCustomMetaData.builder();
tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder);
addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder);
ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state())
Expand All @@ -270,15 +278,52 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() {
// Still no run
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);

tasksBuilder = PersistentTasksCustomMetaData.builder();
tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));

capturedClusterStateListener.getValue().clusterChanged(
new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build()));

// Now it should run as the job state chanded to OPENED
// Now it should run as the job state changed to OPENED
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
}

public void testDatafeedTaskWaitsUntilAutodetectCommunicatorIsOpen() {

hasOpenAutodetectCommunicator.set(false);

PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
ClusterState.Builder cs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
when(clusterService.state()).thenReturn(cs.build());

Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
datafeedManager.run(task, handler);

// Verify datafeed has not started running yet as job doesn't have an open autodetect communicator
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);

tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder);
ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));

capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build()));

// Still no run
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);

hasOpenAutodetectCommunicator.set(true);

capturedClusterStateListener.getValue().clusterChanged(
new ClusterChangedEvent("_source", cs.build(), anotherJobCs.build()));

// Now it should run as the autodetect communicator is open
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
}

Expand Down

0 comments on commit de295f7

Please sign in to comment.