
[ML] Merge the Jindex master feature branch #36702

Merged 71 commits from feature-jindex-master into master on Dec 18, 2018.

Commits:
6add7df
[ML] Job and datafeed mappings with index template (#32719)
davidkyle Aug 9, 2018
27a81f7
[ML] Job config document CRUD operations (#32738)
davidkyle Aug 13, 2018
59a1205
[ML] Datafeed config CRUD operations (#32854)
davidkyle Aug 16, 2018
7b56999
[ML] Change JobManager to work with Job config in index (#33064)
davidkyle Aug 29, 2018
92979f4
[ML] Change Datafeed actions to read config from the config index (#3…
davidkyle Sep 18, 2018
b34eef1
[ML] Allocate jobs based on JobParams rather than cluster state confi…
davidkyle Sep 27, 2018
c75a7d4
[ML] Return missing job error when .ml-config does not exist (#34177)
davidkyle Oct 1, 2018
e3821be
[ML] Close job in index (#34217)
davidkyle Oct 18, 2018
e6e78fe
[ML] Adjust finalize job action to work with documents (#34226)
davidkyle Oct 16, 2018
f8614a1
[ML] Job in index: Datafeed node selector (#34218)
davidkyle Oct 17, 2018
4e3d565
[ML] Job in Index: Stop and preview datafeed (#34605)
davidkyle Oct 19, 2018
cef9f30
[ML] Delete job document (#34595)
davidkyle Oct 19, 2018
040da13
[ML] Convert job data remover to work with index configs (#34532)
davidkyle Oct 18, 2018
e8c3951
[ML] Job in index: Get datafeed and job stats from index (#34645)
davidkyle Oct 22, 2018
7fd4b9d
[ML] Job in Index: Convert get calendar events to index docs (#34710)
davidkyle Oct 23, 2018
cb7b0e7
[ML] Job in index: delete filter action (#34642)
dimitris-athanasiou Oct 19, 2018
2194128
[ML] Job in Index: Enable integ tests (#34851)
davidkyle Oct 29, 2018
c3afb65
Merge branch 'master' into feature-jindex-master
davidkyle Oct 31, 2018
ce64484
Merge branch 'master' into feature-jindex-master
davidkyle Nov 7, 2018
0c343ca
[ML] Reimplement established model memory (#35500)
droberts195 Nov 14, 2018
7b5da88
[ML] Need to wait for shards to replicate in distributed test (#35541)
droberts195 Nov 14, 2018
0ef50cb
Merge branch 'master' into feature-jindex-master
droberts195 Nov 15, 2018
7ba1abd
Merge branch 'master' into feature-jindex-master
davidkyle Nov 16, 2018
a6f1be2
[ML] DelayedDataCheckConfig index mappings (#35646)
davidkyle Nov 19, 2018
cfad7b6
Merge branch 'master' into feature-jindex-master
davidkyle Nov 19, 2018
da3a6fd
Merge branch 'master' into feature-jindex-master
davidkyle Nov 23, 2018
6eddb15
Merge branch 'master' into feature-jindex-master
davidkyle Nov 27, 2018
d72ad3b
Merge branch 'master' into feature-jindex-master
davidkyle Nov 28, 2018
f048c52
[ML] JIndex: Restore finalize job action (#35939)
davidkyle Nov 30, 2018
fd1e6d4
[ML] Replace Version.CURRENT in streaming functions (#36118)
davidkyle Dec 3, 2018
29fc10d
[ML] Use 'anomaly-detector' in job config doc name (#36254)
davidkyle Dec 5, 2018
81549e6
[ML] Job In Index: Migrate config from the clusterstate (#35834)
davidkyle Dec 5, 2018
1802a92
Merge branch 'master' into feature-jindex-master
davidkyle Dec 6, 2018
95042c0
[ML] Check groups against job Ids on update (#36317)
davidkyle Dec 7, 2018
518f8b9
Merge branch 'master' into feature-jindex-master
davidkyle Dec 13, 2018
d3ead91
Merge branch 'master' into feature-jindex-master
davidkyle Dec 14, 2018
d4b4817
[ML] Adapt to periodic persistent task refresh (#36633)
droberts195 Dec 14, 2018
9bb881f
[ML] Default search size for configs
davidkyle Dec 12, 2018
cee07b1
Fix TooManyJobsIT.testMultipleNodes
droberts195 Dec 14, 2018
95df751
Use execute() instead of submit() in MlMemoryTracker
droberts195 Dec 14, 2018
cbe9099
[ML][TEST] Fix NPE in JobManagerTests
davidkyle Dec 12, 2018
a842453
[ML] JIndex: Limit the size of bulk migrations (#36481)
davidkyle Dec 16, 2018
6eae400
Merge branch 'master' into feature-jindex-master
davidkyle Dec 17, 2018
5eb20d2
[ML] Prevent updates and upgrade tests (#36649)
davidkyle Dec 17, 2018
7fd250e
[FEATURE][ML] Add cluster setting that enables/disables config migra…
dimitris-athanasiou Dec 17, 2018
3449283
Merge branch 'master' into feature-jindex-master
davidkyle Dec 17, 2018
4808c65
[ML] Snapshot ml configs before migrating (#36645)
davidkyle Dec 17, 2018
b112743
[FEATURE][ML] Split in batches and migrate all jobs and datafeeds (#3…
dimitris-athanasiou Dec 17, 2018
2049651
SQL: Fix translation of LIKE/RLIKE keywords (#36672)
costin Dec 17, 2018
f71b77d
Fixing line length for EnvironmentTests and RecoveryTests (#36657)
ebadyano Dec 17, 2018
e9bd724
Add back one line removed by mistake regarding java version check and
astefan Dec 17, 2018
86e6d18
Do not resolve addresses in remote connection info (#36671)
jasontedor Dec 17, 2018
46f86b7
[Painless] Add boxed type to boxed type casts for method/return (#36571)
jdconrad Dec 17, 2018
c522341
SNAPSHOTS: Adjust BwC Versions in Restore Logic (#36718)
original-brownbear Dec 17, 2018
6b4c8c8
ingest: fix on_failure with Drop processor (#36686)
jakelandis Dec 17, 2018
992b531
Initialize startup `CcrRepositories` (#36730)
Tim-Brooks Dec 17, 2018
e2489a7
[TEST] fix float comparison in RandomObjects#getExpectedParsedValue
javanna Dec 17, 2018
6903043
[Geo] Integrate Lucene's LatLonShape (BKD Backed GeoShapes) as defaul…
nknize Dec 17, 2018
36ef191
TESTS:Debug Log. IndexStatsIT#testFilterCacheStats
original-brownbear Dec 17, 2018
47c923d
ingest: support default pipelines + bulk upserts (#36618)
jakelandis Dec 17, 2018
2ae0fa2
Fix duplicate phrase in shrink/split error message (#36734)
jasontedor Dec 17, 2018
0aebd20
Deprecate types in get_source and exist_source (#36426)
Dec 17, 2018
b8f9238
Revert "[Geo] Integrate Lucene's LatLonShape (BKD Backed GeoShapes) a…
nknize Dec 18, 2018
16ba5bf
Enhance Invalidate Token API (#35388)
jkakavas Dec 18, 2018
fe0c22a
Add raw sort values to SearchSortValues transport serialization (#36617)
javanna Dec 18, 2018
997703f
Watcher: Ensure all internal search requests count hits (#36697)
spinscale Dec 18, 2018
17f27cc
Ensure MapperService#getAllMetaFields elements order is deterministic…
javanna Dec 18, 2018
557a5be
[TEST] Ensure shard follow tasks have really stopped.
martijnvg Dec 18, 2018
7872365
Expose Sequence Number based Optimistic Concurrency Control in the re…
bleskes Dec 18, 2018
28998f4
[ML] Mute MlDistributedFailureIT
davidkyle Dec 18, 2018
c0a1f1e
Merge branch 'master' into feature-jindex-master
droberts195 Dec 18, 2018
@@ -59,7 +59,6 @@ public class Job implements ToXContentObject {
public static final ParseField DATA_DESCRIPTION = new ParseField("data_description");
public static final ParseField DESCRIPTION = new ParseField("description");
public static final ParseField FINISHED_TIME = new ParseField("finished_time");
public static final ParseField ESTABLISHED_MODEL_MEMORY = new ParseField("established_model_memory");
public static final ParseField MODEL_PLOT_CONFIG = new ParseField("model_plot_config");
public static final ParseField RENORMALIZATION_WINDOW_DAYS = new ParseField("renormalization_window_days");
public static final ParseField BACKGROUND_PERSIST_INTERVAL = new ParseField("background_persist_interval");
@@ -84,7 +83,6 @@ public class Job implements ToXContentObject {
(p) -> TimeUtil.parseTimeField(p, FINISHED_TIME.getPreferredName()),
FINISHED_TIME,
ValueType.VALUE);
PARSER.declareLong(Builder::setEstablishedModelMemory, ESTABLISHED_MODEL_MEMORY);
PARSER.declareObject(Builder::setAnalysisConfig, AnalysisConfig.PARSER, ANALYSIS_CONFIG);
PARSER.declareObject(Builder::setAnalysisLimits, AnalysisLimits.PARSER, ANALYSIS_LIMITS);
PARSER.declareObject(Builder::setDataDescription, DataDescription.PARSER, DATA_DESCRIPTION);
@@ -107,7 +105,6 @@ public class Job implements ToXContentObject {
private final String description;
private final Date createTime;
private final Date finishedTime;
private final Long establishedModelMemory;
private final AnalysisConfig analysisConfig;
private final AnalysisLimits analysisLimits;
private final DataDescription dataDescription;
@@ -122,7 +119,7 @@ public class Job implements ToXContentObject {
private final Boolean deleting;

private Job(String jobId, String jobType, List<String> groups, String description,
Date createTime, Date finishedTime, Long establishedModelMemory,
Date createTime, Date finishedTime,
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map<String, Object> customSettings,
@@ -134,7 +131,6 @@ private Job(String jobId, String jobType, List<String> groups, String descriptio
this.description = description;
this.createTime = createTime;
this.finishedTime = finishedTime;
this.establishedModelMemory = establishedModelMemory;
this.analysisConfig = analysisConfig;
this.analysisLimits = analysisLimits;
this.dataDescription = dataDescription;
@@ -204,16 +200,6 @@ public Date getFinishedTime() {
return finishedTime;
}

/**
* The established model memory of the job, or <code>null</code> if model
* memory has not reached equilibrium yet.
*
* @return The established model memory of the job
*/
public Long getEstablishedModelMemory() {
return establishedModelMemory;
}

/**
* The analysis configuration object
*
@@ -306,9 +292,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.timeField(FINISHED_TIME.getPreferredName(), FINISHED_TIME.getPreferredName() + humanReadableSuffix,
finishedTime.getTime());
}
if (establishedModelMemory != null) {
builder.field(ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory);
}
builder.field(ANALYSIS_CONFIG.getPreferredName(), analysisConfig, params);
if (analysisLimits != null) {
builder.field(ANALYSIS_LIMITS.getPreferredName(), analysisLimits, params);
@@ -364,7 +347,6 @@ public boolean equals(Object other) {
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.createTime, that.createTime)
&& Objects.equals(this.finishedTime, that.finishedTime)
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory)
&& Objects.equals(this.analysisConfig, that.analysisConfig)
&& Objects.equals(this.analysisLimits, that.analysisLimits)
&& Objects.equals(this.dataDescription, that.dataDescription)
@@ -381,7 +363,7 @@

@Override
public int hashCode() {
return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime, establishedModelMemory,
return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName, deleting);
@@ -407,7 +389,6 @@ public static class Builder {
private DataDescription dataDescription;
private Date createTime;
private Date finishedTime;
private Long establishedModelMemory;
private ModelPlotConfig modelPlotConfig;
private Long renormalizationWindowDays;
private TimeValue backgroundPersistInterval;
@@ -435,7 +416,6 @@ public Builder(Job job) {
this.dataDescription = job.getDataDescription();
this.createTime = job.getCreateTime();
this.finishedTime = job.getFinishedTime();
this.establishedModelMemory = job.getEstablishedModelMemory();
this.modelPlotConfig = job.getModelPlotConfig();
this.renormalizationWindowDays = job.getRenormalizationWindowDays();
this.backgroundPersistInterval = job.getBackgroundPersistInterval();
@@ -496,11 +476,6 @@ Builder setFinishedTime(Date finishedTime) {
return this;
}

public Builder setEstablishedModelMemory(Long establishedModelMemory) {
this.establishedModelMemory = establishedModelMemory;
return this;
}

public Builder setDataDescription(DataDescription.Builder description) {
dataDescription = Objects.requireNonNull(description, DATA_DESCRIPTION.getPreferredName()).build();
return this;
@@ -555,7 +530,7 @@ public Job build() {
Objects.requireNonNull(id, "[" + ID.getPreferredName() + "] must not be null");
Objects.requireNonNull(jobType, "[" + JOB_TYPE.getPreferredName() + "] must not be null");
return new Job(
id, jobType, groups, description, createTime, finishedTime, establishedModelMemory,
id, jobType, groups, description, createTime, finishedTime,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName, deleting);
@@ -125,9 +125,6 @@ public static Job.Builder createRandomizedJobBuilder() {
if (randomBoolean()) {
builder.setFinishedTime(new Date(randomNonNegativeLong()));
}
if (randomBoolean()) {
builder.setEstablishedModelMemory(randomNonNegativeLong());
}
builder.setAnalysisConfig(AnalysisConfigTests.createRandomized());
builder.setAnalysisLimits(AnalysisLimitsTests.createRandomized());

5 changes: 0 additions & 5 deletions docs/reference/ml/apis/jobresource.asciidoc
@@ -42,11 +42,6 @@ so do not set the `background_persist_interval` value too low.
`description`::
(string) An optional description of the job.

`established_model_memory`::
(long) The approximate amount of memory resources that have been used for
analytical processing. This field is present only when the analytics have used
a stable amount of memory for several consecutive buckets.

`finished_time`::
(string) If the job closed or failed, this is the time the job finished,
otherwise it is `null`. This property is informational; you cannot change its
@@ -65,6 +65,7 @@
import org.elasticsearch.xpack.core.logstash.LogstashFeatureSetUsage;
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction;
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction;
@@ -363,9 +364,9 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, "ml", MlMetadata.MlMetadataDiff::new),
// ML - Persistent action requests
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.TASK_NAME,
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
StartDatafeedAction.DatafeedParams::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME,
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
OpenJobAction.JobParams::new),
// ML - Task states
new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new),
@@ -433,9 +434,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField("ml"),
parser -> MlMetadata.LENIENT_PARSER.parse(parser, null).build()),
// ML - Persistent action requests
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(StartDatafeedAction.TASK_NAME),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(MlTasks.DATAFEED_TASK_NAME),
StartDatafeedAction.DatafeedParams::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(OpenJobAction.TASK_NAME),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(MlTasks.JOB_TASK_NAME),
OpenJobAction.JobParams::fromXContent),
// ML - Task states
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent),
@@ -21,8 +21,6 @@ public final class MlMetaIndex {
*/
public static final String INDEX_NAME = ".ml-meta";

public static final String INCLUDE_TYPE_KEY = "include_type";

public static final String TYPE = "doc";

private MlMetaIndex() {}
@@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.core.ml;

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
@@ -146,7 +145,6 @@ public MlMetadata(StreamInput in) throws IOException {
datafeeds.put(in.readString(), new DatafeedConfig(in));
}
this.datafeeds = datafeeds;

this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
}

@@ -167,7 +165,7 @@ private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOut
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
DelegatingMapParams extendedParams =
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_CLUSTER_STATE, "true"), params);
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params);
mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams);
mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams);
return builder;
@@ -196,9 +194,14 @@ public MlMetadataDiff(StreamInput in) throws IOException {
this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new,
MlMetadataDiff::readJobDiffFrom);
this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new,
MlMetadataDiff::readSchedulerDiffFrom);
MlMetadataDiff::readDatafeedDiffFrom);
}

/**
* Merge the diff with the ML metadata.
* @param part The current ML metadata.
* @return The new ML metadata.
*/
@Override
public MetaData.Custom apply(MetaData.Custom part) {
TreeMap<String, Job> newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs));
@@ -221,7 +224,7 @@ static Diff<Job> readJobDiffFrom(StreamInput in) throws IOException {
return AbstractDiffable.readDiffFrom(Job::new, in);
}

static Diff<DatafeedConfig> readSchedulerDiffFrom(StreamInput in) throws IOException {
static Diff<DatafeedConfig> readDatafeedDiffFrom(StreamInput in) throws IOException {
return AbstractDiffable.readDiffFrom(DatafeedConfig::new, in);
}
}
@@ -295,7 +298,7 @@ public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) {

public Builder putDatafeed(DatafeedConfig datafeedConfig, Map<String, String> headers) {
if (datafeeds.containsKey(datafeedConfig.getId())) {
throw new ResourceAlreadyExistsException("A datafeed with id [" + datafeedConfig.getId() + "] already exists");
throw ExceptionsHelper.datafeedAlreadyExists(datafeedConfig.getId());
}
String jobId = datafeedConfig.getJobId();
checkJobIsAvailableForDatafeed(jobId);
@@ -369,14 +372,14 @@ private void checkDatafeedIsStopped(Supplier<String> msg, String datafeedId, Per
}
}

private Builder putJobs(Collection<Job> jobs) {
public Builder putJobs(Collection<Job> jobs) {
for (Job job : jobs) {
putJob(job, true);
}
return this;
}

private Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
public Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
for (DatafeedConfig datafeed : datafeeds) {
this.datafeeds.put(datafeed.getId(), datafeed);
}
@@ -421,8 +424,6 @@ void checkJobHasNoDatafeed(String jobId) {
}
}



public static MlMetadata getMlMetadata(ClusterState state) {
MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(TYPE);
if (mlMetadata == null) {
@@ -12,8 +12,19 @@
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class MlTasks {

public static final String JOB_TASK_NAME = "xpack/ml/job";
public static final String DATAFEED_TASK_NAME = "xpack/ml/datafeed";

private static final String JOB_TASK_ID_PREFIX = "job-";
private static final String DATAFEED_TASK_ID_PREFIX = "datafeed-";

private MlTasks() {
}

@@ -22,15 +33,15 @@ private MlTasks() {
* A datafeed id can be used as a job id, because they are stored separately in cluster state.
*/
public static String jobTaskId(String jobId) {
return "job-" + jobId;
return JOB_TASK_ID_PREFIX + jobId;
}

/**
* Namespaces the task ids for datafeeds.
* A job id can be used as a datafeed id, because they are stored separately in cluster state.
*/
public static String datafeedTaskId(String datafeedId) {
return "datafeed-" + datafeedId;
return DATAFEED_TASK_ID_PREFIX + datafeedId;
}

@Nullable
@@ -67,4 +78,64 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis
return DatafeedState.STOPPED;
}
}

/**
* The job Ids of anomaly detector job tasks.
* All anomaly detector jobs are returned regardless of the status of the
* task (OPEN, CLOSED, FAILED etc).
*
* @param tasks Persistent tasks. If null an empty set is returned.
* @return The job Ids of anomaly detector job tasks
*/
public static Set<String> openJobIds(@Nullable PersistentTasksCustomMetaData tasks) {
if (tasks == null) {
return Collections.emptySet();
}

return tasks.findTasks(JOB_TASK_NAME, task -> true)
.stream()
.map(t -> t.getId().substring(JOB_TASK_ID_PREFIX.length()))
.collect(Collectors.toSet());
}

/**
* The datafeed Ids of started datafeed tasks
*
* @param tasks Persistent tasks. If null an empty set is returned.
* @return The Ids of running datafeed tasks
*/
public static Set<String> startedDatafeedIds(@Nullable PersistentTasksCustomMetaData tasks) {
if (tasks == null) {
return Collections.emptySet();
}

return tasks.findTasks(DATAFEED_TASK_NAME, task -> true)
.stream()
.map(t -> t.getId().substring(DATAFEED_TASK_ID_PREFIX.length()))
.collect(Collectors.toSet());
}

/**
* Is there an ml anomaly detector job task for the job {@code jobId}?
* @param jobId The job id
* @param tasks Persistent tasks
* @return True if the job has a task
*/
public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) {
return openJobIds(tasks).contains(jobId);
}

/**
* Read the active anomaly detector job tasks.
* Active tasks are not {@code JobState.CLOSED} or {@code JobState.FAILED}.
*
* @param tasks Persistent tasks
* @return The job tasks excluding closed and failed jobs
*/
public static List<PersistentTasksCustomMetaData.PersistentTask<?>> activeJobTasks(PersistentTasksCustomMetaData tasks) {
return tasks.findTasks(JOB_TASK_NAME, task -> true)
.stream()
.filter(task -> ((JobTaskState) task.getState()).getState().isAnyOf(JobState.CLOSED, JobState.FAILED) == false)
.collect(Collectors.toList());
}
}
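The MlTasks changes above extract the `job-` and `datafeed-` prefixes into constants so that persistent task ids stay namespaced (a job and a datafeed may share the same id) and so the prefix can be stripped to recover the original id, as `openJobIds` and `startedDatafeedIds` do. A minimal standalone sketch of that scheme, with `TaskIdSketch` and `jobIdsFromTaskIds` as illustrative names not taken from the PR:

```java
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical standalone sketch of the MlTasks task-id namespacing:
// prefixes keep job and datafeed task ids from colliding, and stripping
// the prefix recovers the original configuration id.
public class TaskIdSketch {
    static final String JOB_TASK_ID_PREFIX = "job-";
    static final String DATAFEED_TASK_ID_PREFIX = "datafeed-";

    static String jobTaskId(String jobId) {
        return JOB_TASK_ID_PREFIX + jobId;
    }

    static String datafeedTaskId(String datafeedId) {
        return DATAFEED_TASK_ID_PREFIX + datafeedId;
    }

    // Mirrors the prefix-stripping in openJobIds: map namespaced task ids
    // back to the job ids they were derived from.
    static Set<String> jobIdsFromTaskIds(Stream<String> taskIds) {
        return taskIds.map(id -> id.substring(JOB_TASK_ID_PREFIX.length()))
                      .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // "farequote" can be both a job id and a datafeed id without clashing.
        System.out.println(jobTaskId("farequote"));       // job-farequote
        System.out.println(datafeedTaskId("farequote"));  // datafeed-farequote
    }
}
```

This is why the javadoc notes that a datafeed id can be reused as a job id: the two id spaces only meet inside the persistent-task registry, where the prefixes keep them distinct.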