Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
Signed-off-by: Kaituo Li <kaituo@amazon.com>
  • Loading branch information
kaituo committed Mar 18, 2024
1 parent 7192d84 commit 83118a5
Show file tree
Hide file tree
Showing 603 changed files with 36,470 additions and 18,088 deletions.
9 changes: 6 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ dependencies {
implementation group: 'com.yahoo.datasketches', name: 'memory', version: '0.12.2'
implementation group: 'commons-lang', name: 'commons-lang', version: '2.6'
implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.11.1'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:3.8.0'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.8.0'
implementation 'software.amazon.randomcutforest:randomcutforest-core:3.8.0'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:4.0.0'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:4.0.0'
implementation 'software.amazon.randomcutforest:randomcutforest-core:4.0.0'

// we inherit jackson-core from opensearch core
implementation "com.fasterxml.jackson.core:jackson-databind:2.16.1"
Expand All @@ -149,6 +149,9 @@ dependencies {
exclude group: 'org.ow2.asm', module: 'asm-tree'
}

// used for output encoding of config descriptions
implementation group: 'org.owasp.encoder' , name: 'encoder', version: '1.2.3'

testImplementation group: 'pl.pragmatists', name: 'JUnitParams', version: '1.1.1'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.9.0'
testImplementation group: 'org.objenesis', name: 'objenesis', version: '3.3'
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/org/opensearch/ad/ADEntityProfileRunner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.ad;

import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.settings.ADNumericSetting;
import org.opensearch.ad.transport.ADEntityProfileAction;
import org.opensearch.client.Client;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.EntityProfileRunner;
import org.opensearch.timeseries.util.SecurityClientUtil;

public class ADEntityProfileRunner extends EntityProfileRunner<ADEntityProfileAction> {

public ADEntityProfileRunner(
Client client,
SecurityClientUtil clientUtil,
NamedXContentRegistry xContentRegistry,
long requiredSamples
) {
super(
client,
clientUtil,
xContentRegistry,
requiredSamples,
AnomalyDetector::parse,
ADNumericSetting.maxCategoricalFields(),
AnalysisType.AD,
ADEntityProfileAction.INSTANCE,
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
AnomalyResult.DETECTOR_ID_FIELD
);
}
}
98 changes: 98 additions & 0 deletions src/main/java/org/opensearch/ad/ADJobProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ad;

import java.time.Instant;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.rest.handler.ADIndexJobActionHandler;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskCacheManager;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.ADProfileAction;
import org.opensearch.ad.transport.AnomalyResultAction;
import org.opensearch.ad.transport.AnomalyResultRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.JobProcessor;
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.transport.ResultRequest;

public class ADJobProcessor extends
JobProcessor<ADIndex, ADIndexManagement, ADTaskCacheManager, ADTaskType, ADTask, ADTaskManager, AnomalyResult, ADProfileAction, ExecuteADResultResponseRecorder, ADIndexJobActionHandler> {

private static final Logger log = LogManager.getLogger(ADJobProcessor.class);

private static ADJobProcessor INSTANCE;

public static ADJobProcessor getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
synchronized (ADJobProcessor.class) {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new ADJobProcessor();
return INSTANCE;
}
}

private ADJobProcessor() {
// Singleton class, use getJobRunnerInstance method instead of constructor
super(AnalysisType.AD, TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME, AnomalyResultAction.INSTANCE);
}

public void registerSettings(Settings settings) {
super.registerSettings(settings, AnomalyDetectorSettings.AD_MAX_RETRY_FOR_END_RUN_EXCEPTION);
}

@Override
protected ResultRequest createResultRequest(String configId, long start, long end) {
return new AnomalyResultRequest(configId, start, end);
}

@Override
protected void validateResultIndexAndRunJob(
Job jobParameter,
LockService lockService,
LockModel lock,
Instant executionStartTime,
Instant executionEndTime,
String configId,
String user,
List<String> roles,
ExecuteADResultResponseRecorder recorder,
Config detector
) {
String resultIndex = jobParameter.getCustomResultIndex();
if (resultIndex == null) {
runJob(jobParameter, lockService, lock, executionStartTime, executionEndTime, configId, user, roles, recorder, detector);
return;
}
ActionListener<Boolean> listener = ActionListener.wrap(r -> { log.debug("Custom index is valid"); }, e -> {
Exception exception = new EndRunException(configId, e.getMessage(), false);
handleException(jobParameter, lockService, lock, executionStartTime, executionEndTime, exception, recorder, detector);
});
indexManagement.validateCustomIndexForBackendJob(resultIndex, configId, user, roles, () -> {
listener.onResponse(true);
runJob(jobParameter, lockService, lock, executionStartTime, executionEndTime, configId, user, roles, recorder, detector);
}, listener);
}
}
85 changes: 85 additions & 0 deletions src/main/java/org/opensearch/ad/ADTaskProfileRunner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ad;

import java.util.ArrayList;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.model.ADTaskProfile;
import org.opensearch.ad.transport.ADTaskProfileAction;
import org.opensearch.ad.transport.ADTaskProfileNodeResponse;
import org.opensearch.ad.transport.ADTaskProfileRequest;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;
import org.opensearch.timeseries.TaskProfileRunner;
import org.opensearch.timeseries.cluster.HashRing;
import org.opensearch.timeseries.model.EntityTaskProfile;

public class ADTaskProfileRunner implements TaskProfileRunner<ADTask, ADTaskProfile> {
public final Logger logger = LogManager.getLogger(ADTaskProfileRunner.class);

private final HashRing hashRing;
private final Client client;

public ADTaskProfileRunner(HashRing hashRing, Client client) {
this.hashRing = hashRing;
this.client = client;
}

@Override
public void getTaskProfile(ADTask configLevelTask, ActionListener<ADTaskProfile> listener) {
String detectorId = configLevelTask.getConfigId();

hashRing.getAllEligibleDataNodesWithKnownVersion(dataNodes -> {
ADTaskProfileRequest adTaskProfileRequest = new ADTaskProfileRequest(detectorId, dataNodes);
client.execute(ADTaskProfileAction.INSTANCE, adTaskProfileRequest, ActionListener.wrap(response -> {
if (response.hasFailures()) {
listener.onFailure(response.failures().get(0));
return;
}

List<EntityTaskProfile> adEntityTaskProfiles = new ArrayList<>();
ADTaskProfile detectorTaskProfile = new ADTaskProfile(configLevelTask);
for (ADTaskProfileNodeResponse node : response.getNodes()) {
ADTaskProfile taskProfile = node.getAdTaskProfile();
if (taskProfile != null) {
if (taskProfile.getNodeId() != null) {
// HC detector: task profile from coordinating node
// Single entity detector: task profile from worker node
detectorTaskProfile.setTaskId(taskProfile.getTaskId());
detectorTaskProfile.setRcfTotalUpdates(taskProfile.getRcfTotalUpdates());
detectorTaskProfile.setThresholdModelTrained(taskProfile.getThresholdModelTrained());
detectorTaskProfile.setThresholdModelTrainingDataSize(taskProfile.getThresholdModelTrainingDataSize());
detectorTaskProfile.setModelSizeInBytes(taskProfile.getModelSizeInBytes());
detectorTaskProfile.setNodeId(taskProfile.getNodeId());
detectorTaskProfile.setTotalEntitiesCount(taskProfile.getTotalEntitiesCount());
detectorTaskProfile.setDetectorTaskSlots(taskProfile.getDetectorTaskSlots());
detectorTaskProfile.setPendingEntitiesCount(taskProfile.getPendingEntitiesCount());
detectorTaskProfile.setRunningEntitiesCount(taskProfile.getRunningEntitiesCount());
detectorTaskProfile.setRunningEntities(taskProfile.getRunningEntities());
detectorTaskProfile.setTaskType(taskProfile.getTaskType());
}
if (taskProfile.getEntityTaskProfiles() != null) {
adEntityTaskProfiles.addAll(taskProfile.getEntityTaskProfiles());
}
}
}
if (adEntityTaskProfiles != null && adEntityTaskProfiles.size() > 0) {
detectorTaskProfile.setEntityTaskProfiles(adEntityTaskProfiles);
}
listener.onResponse(detectorTaskProfile);
}, e -> {
logger.error("Failed to get task profile for task " + configLevelTask.getTaskId(), e);
listener.onFailure(e);
}));
}, listener);

}

}
Loading

0 comments on commit 83118a5

Please sign in to comment.