Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Job in index: Get datafeed and job stats from index #34645

Merged
merged 2 commits into from
Oct 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen
*/

PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), ActionListener.wrap(
jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap(
expandedJobIds -> {
validate(expandedJobIds, request.isForce(), tasksMetaData, ActionListener.wrap(
response -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,33 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAction<GetDatafeedsStatsAction.Request,
GetDatafeedsStatsAction.Response> {

private final DatafeedConfigProvider datafeedConfigProvider;

@Inject
public TransportGetDatafeedsStatsAction(Settings settings, TransportService transportService,
ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
IndexNameExpressionResolver indexNameExpressionResolver,
DatafeedConfigProvider datafeedConfigProvider) {
super(settings, GetDatafeedsStatsAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, GetDatafeedsStatsAction.Request::new);
this.datafeedConfigProvider = datafeedConfigProvider;
}

@Override
Expand All @@ -57,16 +60,18 @@ protected void masterOperation(GetDatafeedsStatsAction.Request request, ClusterS
ActionListener<GetDatafeedsStatsAction.Response> listener) throws Exception {
logger.debug("Get stats for datafeed '{}'", request.getDatafeedId());

MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());

PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
List<GetDatafeedsStatsAction.Response.DatafeedStats> results = expandedDatafeedIds.stream()
.map(datafeedId -> getDatafeedStats(datafeedId, state, tasksInProgress))
.collect(Collectors.toList());
QueryPage<GetDatafeedsStatsAction.Response.DatafeedStats> statsPage = new QueryPage<>(results, results.size(),
DatafeedConfig.RESULTS_FIELD);
listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage));
datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap(
expandedDatafeedIds -> {
PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
List<GetDatafeedsStatsAction.Response.DatafeedStats> results = expandedDatafeedIds.stream()
.map(datafeedId -> getDatafeedStats(datafeedId, state, tasksInProgress))
.collect(Collectors.toList());
QueryPage<GetDatafeedsStatsAction.Response.DatafeedStats> statsPage = new QueryPage<>(results, results.size(),
DatafeedConfig.RESULTS_FIELD);
listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage));
},
listener::onFailure
));
}

private static GetDatafeedsStatsAction.Response.DatafeedStats getDatafeedStats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
Expand All @@ -32,7 +32,7 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;

Expand All @@ -54,28 +54,37 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
private final ClusterService clusterService;
private final AutodetectProcessManager processManager;
private final JobResultsProvider jobResultsProvider;
private final JobConfigProvider jobConfigProvider;

@Inject
public TransportGetJobsStatsAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver,
AutodetectProcessManager processManager, JobResultsProvider jobResultsProvider) {
AutodetectProcessManager processManager, JobResultsProvider jobResultsProvider,
JobConfigProvider jobConfigProvider) {
super(settings, GetJobsStatsAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, GetJobsStatsAction.Request::new, GetJobsStatsAction.Response::new,
ThreadPool.Names.MANAGEMENT);
this.clusterService = clusterService;
this.processManager = processManager;
this.jobResultsProvider = jobResultsProvider;
this.jobConfigProvider = jobConfigProvider;
}

@Override
protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> listener) {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state());
request.setExpandedJobsIds(new ArrayList<>(mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs())));
ActionListener<GetJobsStatsAction.Response> finalListener = listener;
listener = ActionListener.wrap(response -> gatherStatsForClosedJobs(mlMetadata,
request, response, finalListener), listener::onFailure);
super.doExecute(task, request, listener);
protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> finalListener) {

jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap(
expandedIds -> {
request.setExpandedJobsIds(new ArrayList<>(expandedIds));
ActionListener<GetJobsStatsAction.Response> jobStatsListener = ActionListener.wrap(
response -> gatherStatsForClosedJobs(request, response, finalListener),
finalListener::onFailure
);
super.doExecute(task, request, jobStatsListener);
},
finalListener::onFailure
));
}

@Override
Expand Down Expand Up @@ -123,21 +132,20 @@ protected void taskOperation(GetJobsStatsAction.Request request, TransportOpenJo

// Up until now we have gathered the stats for the jobs that were open.
// This method will fetch the stats for the missing jobs, which were stored in the jobs index
void gatherStatsForClosedJobs(MlMetadata mlMetadata, GetJobsStatsAction.Request request, GetJobsStatsAction.Response response,
void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAction.Response response,
ActionListener<GetJobsStatsAction.Response> listener) {
List<String> jobIds = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata,
request.getExpandedJobsIds(), response.getResponse().results());
if (jobIds.isEmpty()) {
List<String> closedJobIds = determineJobIdsWithoutLiveStats(request.getExpandedJobsIds(), response.getResponse().results());
if (closedJobIds.isEmpty()) {
listener.onResponse(response);
return;
}

AtomicInteger counter = new AtomicInteger(jobIds.size());
AtomicArray<GetJobsStatsAction.Response.JobStats> jobStats = new AtomicArray<>(jobIds.size());
AtomicInteger counter = new AtomicInteger(closedJobIds.size());
AtomicArray<GetJobsStatsAction.Response.JobStats> jobStats = new AtomicArray<>(closedJobIds.size());
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
for (int i = 0; i < jobIds.size(); i++) {
for (int i = 0; i < closedJobIds.size(); i++) {
int slot = i;
String jobId = jobIds.get(i);
String jobId = closedJobIds.get(i);
gatherForecastStats(jobId, forecastStats -> {
gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> {
JobState jobState = MlTasks.getJobState(jobId, tasks);
Expand Down Expand Up @@ -180,11 +188,9 @@ static TimeValue durationToTimeValue(Optional<Duration> duration) {
}
}

static List<String> determineNonDeletedJobIdsWithoutLiveStats(MlMetadata mlMetadata,
List<String> requestedJobIds,
List<GetJobsStatsAction.Response.JobStats> stats) {
static List<String> determineJobIdsWithoutLiveStats(List<String> requestedJobIds,
List<GetJobsStatsAction.Response.JobStats> stats) {
Set<String> excludeJobIds = stats.stream().map(GetJobsStatsAction.Response.JobStats::getJobId).collect(Collectors.toSet());
return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId) &&
!mlMetadata.isJobDeleting(jobId)).collect(Collectors.toList());
return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId)).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private void getJobFromClusterState(String jobId, ActionListener<Job> jobListene
public void expandJobs(String expression, boolean allowNoJobs, ActionListener<QueryPage<Job>> jobsListener) {
Map<String, Job> clusterStateJobs = expandJobsFromClusterState(expression, allowNoJobs, clusterService.state());

jobConfigProvider.expandJobs(expression, allowNoJobs, ActionListener.wrap(
jobConfigProvider.expandJobs(expression, allowNoJobs, false, ActionListener.wrap(
jobBuilders -> {
// Check for duplicate jobs
for (Job.Builder jb : jobBuilders) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.ExistsQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
Expand Down Expand Up @@ -489,11 +490,12 @@ public void markJobAsDeleting(String jobId, ActionListener<Boolean> listener) {
* @param allowNoJobs if {@code false}, an error is thrown when no name matches the {@code expression}.
* This only applies to wildcard expressions; if {@code expression} is not a
* wildcard then setting this to {@code true} will not suppress the exception
* @param excludeDeleting If {@code true}, exclude jobs marked as deleting
* @param listener The expanded job Ids listener
*/
public void expandJobsIds(String expression, boolean allowNoJobs, ActionListener<Set<String>> listener) {
public void expandJobsIds(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener<Set<String>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
sourceBuilder.sort(Job.ID.getPreferredName());
sourceBuilder.fetchSource(false);
sourceBuilder.docValueField(Job.ID.getPreferredName());
Expand Down Expand Up @@ -535,21 +537,22 @@ public void expandJobsIds(String expression, boolean allowNoJobs, ActionListener
}

/**
* The same logic as {@link #expandJobsIds(String, boolean, ActionListener)} but
* The same logic as {@link #expandJobsIds(String, boolean, boolean, ActionListener)} but
* the full anomaly detector job configuration is returned.
*
* See {@link #expandJobsIds(String, boolean, ActionListener)}
* See {@link #expandJobsIds(String, boolean, boolean, ActionListener)}
*
* @param expression the expression to resolve
* @param allowNoJobs if {@code false}, an error is thrown when no name matches the {@code expression}.
* This only applies to wildcard expressions; if {@code expression} is not a
* wildcard then setting this to {@code true} will not suppress the exception
* @param excludeDeleting If {@code true}, exclude jobs marked as deleting
* @param listener The expanded jobs listener
*/
// NORELEASE jobs should be paged or have a mechanism to return all jobs if there are many of them
public void expandJobs(String expression, boolean allowNoJobs, ActionListener<List<Job.Builder>> listener) {
public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener<List<Job.Builder>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
sourceBuilder.sort(Job.ID.getPreferredName());

SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
Expand Down Expand Up @@ -594,7 +597,7 @@ public void expandJobs(String expression, boolean allowNoJobs, ActionListener<Li

/**
* Expands the list of job group Ids to the set of jobs which are members of the groups.
* Unlike {@link #expandJobsIds(String, boolean, ActionListener)} it is not an error
* Unlike {@link #expandJobsIds(String, boolean, boolean, ActionListener)} it is not an error
* if a group Id does not exist.
* Wildcard expansion of group Ids is not supported.
*
Expand Down Expand Up @@ -698,9 +701,9 @@ private Job.Builder parseJobLenientlyFromSource(BytesReference source) throws IO
}
}

private QueryBuilder buildQuery(String [] tokens) {
private QueryBuilder buildQuery(String [] tokens, boolean excludeDeleting) {
QueryBuilder jobQuery = new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE);
if (Strings.isAllOrWildcard(tokens)) {
if (Strings.isAllOrWildcard(tokens) && excludeDeleting == false) {
// match all
return jobQuery;
}
Expand All @@ -709,6 +712,16 @@ private QueryBuilder buildQuery(String [] tokens) {
boolQueryBuilder.filter(jobQuery);
BoolQueryBuilder shouldQueries = new BoolQueryBuilder();

if (excludeDeleting) {
// field exists only when the job is marked as deleting
shouldQueries.mustNot(new ExistsQueryBuilder(Job.DELETING.getPreferredName()));

if (Strings.isAllOrWildcard(tokens)) {
boolQueryBuilder.filter(shouldQueries);
return boolQueryBuilder;
}
}

List<String> terms = new ArrayList<>();
for (String token : tokens) {
if (Regex.isSimpleMatchPattern(token)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,11 @@ private void mockDatafeedConfigFindDatafeeds(Set<String> datafeedIds) {

private void mockJobConfigProviderExpandIds(Set<String> expandedIds) {
doAnswer(invocation -> {
ActionListener<Set<String>> listener = (ActionListener<Set<String>>) invocation.getArguments()[2];
ActionListener<Set<String>> listener = (ActionListener<Set<String>>) invocation.getArguments()[3];
listener.onResponse(expandedIds);

return null;
}).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), any(ActionListener.class));
}).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), anyBoolean(), any(ActionListener.class));
}

}
Loading