Skip to content

Commit

Permalink
And for Job Stats
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle committed Aug 10, 2020
1 parent 1f49c57 commit b09b33d
Showing 1 changed file with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -143,7 +144,17 @@ void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAc
}

AtomicInteger counter = new AtomicInteger(closedJobIds.size());
AtomicReference<Exception> searchException = new AtomicReference<>();
AtomicArray<GetJobsStatsAction.Response.JobStats> jobStats = new AtomicArray<>(closedJobIds.size());

Consumer<Exception> errorHandler = e -> {
// take the first error
searchException.compareAndSet(null, e);
if (counter.decrementAndGet() == 0) {
listener.onFailure(e);
}
};

PersistentTasksCustomMetadata tasks = clusterService.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
for (int i = 0; i < closedJobIds.size(); i++) {
int slot = i;
Expand All @@ -159,14 +170,19 @@ void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAc
jobStats.set(slot, new JobStats(jobId, dataCounts, modelSizeStats, forecastStats, jobState,
null, assignmentExplanation, null, timingStats));
if (counter.decrementAndGet() == 0) {
if (searchException.get() != null) {
// there was an error
listener.onFailure(searchException.get());
return;
}
List<JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList());
Collections.sort(results, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId));
listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(),
new QueryPage<>(results, results.size(), Job.RESULTS_FIELD)));
}
}, listener::onFailure);
}, listener::onFailure);
}, errorHandler);
}, errorHandler);
}
}

Expand Down

0 comments on commit b09b33d

Please sign in to comment.