Skip to content

Commit

Permalink
DFA Get Stats can return multiple responses if more than one error oc…
Browse files Browse the repository at this point in the history
…curs (#60900)

If the search for get stats with multiple job IDs fails, the listener is called once for each failure.
This change waits for all responses and then returns the first error, if there was one.
  • Loading branch information
davidkyle authored Aug 11, 2020
1 parent d154091 commit c5cef17
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
Expand Down Expand Up @@ -167,21 +168,32 @@ void gatherStatsForStoppedTasks(List<DataFrameAnalyticsConfig> configs, GetDataF

AtomicInteger counter = new AtomicInteger(stoppedConfigs.size());
AtomicArray<Stats> jobStats = new AtomicArray<>(stoppedConfigs.size());
AtomicReference<Exception> searchException = new AtomicReference<>();
for (int i = 0; i < stoppedConfigs.size(); i++) {
final int slot = i;
DataFrameAnalyticsConfig config = stoppedConfigs.get(i);
searchStats(config, ActionListener.wrap(
stats -> {
jobStats.set(slot, stats);
if (counter.decrementAndGet() == 0) {
if (searchException.get() != null) {
listener.onFailure(searchException.get());
return;
}
List<Stats> allTasksStats = new ArrayList<>(runningTasksResponse.getResponse().results());
allTasksStats.addAll(jobStats.asList());
Collections.sort(allTasksStats, Comparator.comparing(Stats::getId));
listener.onResponse(new GetDataFrameAnalyticsStatsAction.Response(new QueryPage<>(
allTasksStats, allTasksStats.size(), GetDataFrameAnalyticsAction.Response.RESULTS_FIELD)));
}
},
listener::onFailure)
e -> {
// take the first error
searchException.compareAndSet(null, e);
if (counter.decrementAndGet() == 0) {
listener.onFailure(e);
}
})
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -143,7 +144,17 @@ void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAc
}

AtomicInteger counter = new AtomicInteger(closedJobIds.size());
AtomicReference<Exception> searchException = new AtomicReference<>();
AtomicArray<GetJobsStatsAction.Response.JobStats> jobStats = new AtomicArray<>(closedJobIds.size());

// Failure callback shared by the per-job lookups below. Every invocation counts as one
// completed lookup against `counter`; the listener is only notified once the final
// outstanding lookup has finished, so it is never called more than once from here.
Consumer<Exception> errorHandler = e -> {
    // Remember only the first error; later failures leave the stored one untouched.
    searchException.compareAndSet(null, e);
    if (counter.decrementAndGet() == 0) {
        // Report the stored (first) error rather than `e`: when this is the last lookup
        // to finish but not the first to fail, `e` is a later error. This matches the
        // success path, which also reports searchException.get().
        listener.onFailure(searchException.get());
    }
};

PersistentTasksCustomMetadata tasks = clusterService.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
for (int i = 0; i < closedJobIds.size(); i++) {
int slot = i;
Expand All @@ -159,14 +170,19 @@ void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAc
jobStats.set(slot, new JobStats(jobId, dataCounts, modelSizeStats, forecastStats, jobState,
null, assignmentExplanation, null, timingStats));
if (counter.decrementAndGet() == 0) {
if (searchException.get() != null) {
// there was an error
listener.onFailure(searchException.get());
return;
}
List<JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList());
Collections.sort(results, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId));
listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(),
new QueryPage<>(results, results.size(), Job.RESULTS_FIELD)));
}
}, listener::onFailure);
}, listener::onFailure);
}, errorHandler);
}, errorHandler);
}
}

Expand Down

0 comments on commit c5cef17

Please sign in to comment.