Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Improve handling of exception while starting DFA process #61838

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,17 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
// Fetch existing model state (if any)
BytesReference state = getModelState(config);

if (processContext.startProcess(dataExtractorFactory, task, state)) {
boolean isProcessStarted;
try {
isProcessStarted = processContext.startProcess(dataExtractorFactory, task, state);
} catch (Exception e) {
processContext.stop();
task.setFailed(processContext.getFailureReason() == null ?
e : ExceptionsHelper.serverError(processContext.getFailureReason()));
return;
}

if (isProcessStarted) {
executorServiceForProcess.execute(() -> processContext.resultProcessor.get().process(processContext.process.get()));
executorServiceForProcess.execute(() -> processData(task, processContext, state));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co
// The extra 2 are for the checksum and the control field
int numberOfFields = analyticsProcessConfig.cols() + 2;

createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes, executorService);
createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes);

NativeAnalyticsProcess analyticsProcess =
new NativeAnalyticsProcess(
jobId, nativeController, processPipes, numberOfFields, filesToDelete,
onProcessCrash, processConnectTimeout, analyticsProcessConfig, namedXContentRegistry);

try {
startProcess(config, executorService, processPipes, analyticsProcess);
startProcess(config, executorService, analyticsProcess);
return analyticsProcess;
} catch (IOException | EsRejectedExecutionException e) {
String msg = "Failed to connect to data frame analytics process for job " + jobId;
Expand All @@ -102,8 +102,8 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co
}
}

private void startProcess(DataFrameAnalyticsConfig config, ExecutorService executorService, ProcessPipes processPipes,
NativeAnalyticsProcess process) throws IOException {
private void startProcess(DataFrameAnalyticsConfig config, ExecutorService executorService,
NativeAnalyticsProcess process) throws IOException {
if (config.getAnalysis().persistsState()) {
IndexingStateProcessor stateProcessor = new IndexingStateProcessor(config.getId(), resultsPersisterService, auditor);
process.start(executorService, stateProcessor);
Expand All @@ -113,7 +113,7 @@ private void startProcess(DataFrameAnalyticsConfig config, ExecutorService execu
}

private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, List<Path> filesToDelete,
ProcessPipes processPipes, ExecutorService executorService) {
ProcessPipes processPipes) {
AnalyticsBuilder analyticsBuilder =
new AnalyticsBuilder(env::tmpFile, nativeController, processPipes, analyticsProcessConfig, filesToDelete);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -190,6 +191,20 @@ public void testRunJob_Ok() {
verifyNoMoreInteractions(dataExtractor, executorServiceForProcess, process, task);
}

public void testRunJob_ProcessNotAliveAfterStart() {
when(process.isProcessAlive()).thenReturn(false);
when(task.getParams()).thenReturn(
new StartDataFrameAnalyticsAction.TaskParams("data_frame_id", Version.CURRENT, Collections.emptyList(), false));

processManager.runJob(task, dataFrameAnalyticsConfig, dataExtractorFactory);
assertThat(processManager.getProcessContextCount(), equalTo(1));

ArgumentCaptor<Exception> errorCaptor = ArgumentCaptor.forClass(Exception.class);
verify(task).setFailed(errorCaptor.capture());

assertThat(errorCaptor.getValue().getMessage(), equalTo("Failed to start data frame analytics process"));
}

public void testProcessContext_GetSetFailureReason() {
AnalyticsProcessManager.ProcessContext processContext = processManager.new ProcessContext(dataFrameAnalyticsConfig);
assertThat(processContext.getFailureReason(), is(nullValue()));
Expand Down