forked from opensearch-project/anomaly-detection
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Kaituo Li <kaituo@amazon.com>
- Loading branch information
Showing
628 changed files
with
38,766 additions
and
19,783 deletions.
There are no files selected for viewing
46 changes: 46 additions & 0 deletions
46
src/main/java/org/opensearch/ad/ADEntityProfileRunner.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
* Modifications Copyright OpenSearch Contributors. See | ||
* GitHub history for details. | ||
*/ | ||
|
||
package org.opensearch.ad; | ||
|
||
import org.opensearch.ad.constant.ADCommonName; | ||
import org.opensearch.ad.model.AnomalyDetector; | ||
import org.opensearch.ad.model.AnomalyResult; | ||
import org.opensearch.ad.settings.ADNumericSetting; | ||
import org.opensearch.ad.transport.ADEntityProfileAction; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.timeseries.AnalysisType; | ||
import org.opensearch.timeseries.EntityProfileRunner; | ||
import org.opensearch.timeseries.util.SecurityClientUtil; | ||
|
||
public class ADEntityProfileRunner extends EntityProfileRunner<ADEntityProfileAction> { | ||
|
||
public ADEntityProfileRunner( | ||
Client client, | ||
SecurityClientUtil clientUtil, | ||
NamedXContentRegistry xContentRegistry, | ||
long requiredSamples | ||
) { | ||
super( | ||
client, | ||
clientUtil, | ||
xContentRegistry, | ||
requiredSamples, | ||
AnomalyDetector::parse, | ||
ADNumericSetting.maxCategoricalFields(), | ||
AnalysisType.AD, | ||
ADEntityProfileAction.INSTANCE, | ||
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS, | ||
AnomalyResult.DETECTOR_ID_FIELD | ||
); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.ad; | ||
|
||
import java.time.Instant; | ||
import java.util.List; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.ad.indices.ADIndex; | ||
import org.opensearch.ad.indices.ADIndexManagement; | ||
import org.opensearch.ad.model.ADTask; | ||
import org.opensearch.ad.model.ADTaskType; | ||
import org.opensearch.ad.model.AnomalyResult; | ||
import org.opensearch.ad.rest.handler.ADIndexJobActionHandler; | ||
import org.opensearch.ad.settings.AnomalyDetectorSettings; | ||
import org.opensearch.ad.task.ADTaskCacheManager; | ||
import org.opensearch.ad.task.ADTaskManager; | ||
import org.opensearch.ad.transport.ADProfileAction; | ||
import org.opensearch.ad.transport.AnomalyResultAction; | ||
import org.opensearch.ad.transport.AnomalyResultRequest; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.jobscheduler.spi.LockModel; | ||
import org.opensearch.jobscheduler.spi.utils.LockService; | ||
import org.opensearch.timeseries.AnalysisType; | ||
import org.opensearch.timeseries.JobProcessor; | ||
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin; | ||
import org.opensearch.timeseries.common.exception.EndRunException; | ||
import org.opensearch.timeseries.model.Config; | ||
import org.opensearch.timeseries.model.Job; | ||
import org.opensearch.timeseries.transport.ResultRequest; | ||
|
||
public class ADJobProcessor extends | ||
JobProcessor<ADIndex, ADIndexManagement, ADTaskCacheManager, ADTaskType, ADTask, ADTaskManager, AnomalyResult, ADProfileAction, ExecuteADResultResponseRecorder, ADIndexJobActionHandler> { | ||
|
||
private static final Logger log = LogManager.getLogger(ADJobProcessor.class); | ||
|
||
private static ADJobProcessor INSTANCE; | ||
|
||
public static ADJobProcessor getInstance() { | ||
if (INSTANCE != null) { | ||
return INSTANCE; | ||
} | ||
synchronized (ADJobProcessor.class) { | ||
if (INSTANCE != null) { | ||
return INSTANCE; | ||
} | ||
INSTANCE = new ADJobProcessor(); | ||
return INSTANCE; | ||
} | ||
} | ||
|
||
private ADJobProcessor() { | ||
// Singleton class, use getJobRunnerInstance method instead of constructor | ||
super(AnalysisType.AD, TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME, AnomalyResultAction.INSTANCE); | ||
} | ||
|
||
public void registerSettings(Settings settings) { | ||
super.registerSettings(settings, AnomalyDetectorSettings.AD_MAX_RETRY_FOR_END_RUN_EXCEPTION); | ||
} | ||
|
||
@Override | ||
protected ResultRequest createResultRequest(String configId, long start, long end) { | ||
return new AnomalyResultRequest(configId, start, end); | ||
} | ||
|
||
@Override | ||
protected void validateResultIndexAndRunJob( | ||
Job jobParameter, | ||
LockService lockService, | ||
LockModel lock, | ||
Instant executionStartTime, | ||
Instant executionEndTime, | ||
String configId, | ||
String user, | ||
List<String> roles, | ||
ExecuteADResultResponseRecorder recorder, | ||
Config detector | ||
) { | ||
String resultIndex = jobParameter.getCustomResultIndex(); | ||
if (resultIndex == null) { | ||
runJob(jobParameter, lockService, lock, executionStartTime, executionEndTime, configId, user, roles, recorder, detector); | ||
return; | ||
} | ||
ActionListener<Boolean> listener = ActionListener.wrap(r -> { log.debug("Custom index is valid"); }, e -> { | ||
Exception exception = new EndRunException(configId, e.getMessage(), false); | ||
handleException(jobParameter, lockService, lock, executionStartTime, executionEndTime, exception, recorder, detector); | ||
}); | ||
indexManagement.validateCustomIndexForBackendJob(resultIndex, configId, user, roles, () -> { | ||
listener.onResponse(true); | ||
runJob(jobParameter, lockService, lock, executionStartTime, executionEndTime, configId, user, roles, recorder, detector); | ||
}, listener); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.ad; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.ad.model.ADTask; | ||
import org.opensearch.ad.model.ADTaskProfile; | ||
import org.opensearch.ad.transport.ADTaskProfileAction; | ||
import org.opensearch.ad.transport.ADTaskProfileNodeResponse; | ||
import org.opensearch.ad.transport.ADTaskProfileRequest; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.timeseries.TaskProfileRunner; | ||
import org.opensearch.timeseries.cluster.HashRing; | ||
import org.opensearch.timeseries.model.EntityTaskProfile; | ||
|
||
public class ADTaskProfileRunner implements TaskProfileRunner<ADTask, ADTaskProfile> { | ||
public final Logger logger = LogManager.getLogger(ADTaskProfileRunner.class); | ||
|
||
private final HashRing hashRing; | ||
private final Client client; | ||
|
||
public ADTaskProfileRunner(HashRing hashRing, Client client) { | ||
this.hashRing = hashRing; | ||
this.client = client; | ||
} | ||
|
||
@Override | ||
public void getTaskProfile(ADTask configLevelTask, ActionListener<ADTaskProfile> listener) { | ||
String detectorId = configLevelTask.getConfigId(); | ||
|
||
hashRing.getAllEligibleDataNodesWithKnownVersion(dataNodes -> { | ||
ADTaskProfileRequest adTaskProfileRequest = new ADTaskProfileRequest(detectorId, dataNodes); | ||
client.execute(ADTaskProfileAction.INSTANCE, adTaskProfileRequest, ActionListener.wrap(response -> { | ||
if (response.hasFailures()) { | ||
listener.onFailure(response.failures().get(0)); | ||
return; | ||
} | ||
|
||
List<EntityTaskProfile> adEntityTaskProfiles = new ArrayList<>(); | ||
ADTaskProfile detectorTaskProfile = new ADTaskProfile(configLevelTask); | ||
for (ADTaskProfileNodeResponse node : response.getNodes()) { | ||
ADTaskProfile taskProfile = node.getAdTaskProfile(); | ||
if (taskProfile != null) { | ||
if (taskProfile.getNodeId() != null) { | ||
// HC detector: task profile from coordinating node | ||
// Single entity detector: task profile from worker node | ||
detectorTaskProfile.setTaskId(taskProfile.getTaskId()); | ||
detectorTaskProfile.setRcfTotalUpdates(taskProfile.getRcfTotalUpdates()); | ||
detectorTaskProfile.setThresholdModelTrained(taskProfile.getThresholdModelTrained()); | ||
detectorTaskProfile.setThresholdModelTrainingDataSize(taskProfile.getThresholdModelTrainingDataSize()); | ||
detectorTaskProfile.setModelSizeInBytes(taskProfile.getModelSizeInBytes()); | ||
detectorTaskProfile.setNodeId(taskProfile.getNodeId()); | ||
detectorTaskProfile.setTotalEntitiesCount(taskProfile.getTotalEntitiesCount()); | ||
detectorTaskProfile.setDetectorTaskSlots(taskProfile.getDetectorTaskSlots()); | ||
detectorTaskProfile.setPendingEntitiesCount(taskProfile.getPendingEntitiesCount()); | ||
detectorTaskProfile.setRunningEntitiesCount(taskProfile.getRunningEntitiesCount()); | ||
detectorTaskProfile.setRunningEntities(taskProfile.getRunningEntities()); | ||
detectorTaskProfile.setTaskType(taskProfile.getTaskType()); | ||
} | ||
if (taskProfile.getEntityTaskProfiles() != null) { | ||
adEntityTaskProfiles.addAll(taskProfile.getEntityTaskProfiles()); | ||
} | ||
} | ||
} | ||
if (adEntityTaskProfiles != null && adEntityTaskProfiles.size() > 0) { | ||
detectorTaskProfile.setEntityTaskProfiles(adEntityTaskProfiles); | ||
} | ||
listener.onResponse(detectorTaskProfile); | ||
}, e -> { | ||
logger.error("Failed to get task profile for task " + configLevelTask.getTaskId(), e); | ||
listener.onFailure(e); | ||
})); | ||
}, listener); | ||
|
||
} | ||
|
||
} |
Oops, something went wrong.