Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/extensions] Fish out Create Detector and remove everything else for extensions. #623

Merged
merged 11 commits into from
Aug 15, 2022
Next Next commit
Initial commit for creating detectors
Signed-off-by: Sarat Vemulapalli <vemulapallisarat@gmail.com>
  • Loading branch information
saratvemulapalli committed Aug 1, 2022
commit 8e236c14e3a669daac5867cab56bced53679befa
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ buildscript {
ext {
opensearch_group = "org.opensearch"
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
opensearch_version = System.getProperty("opensearch.version", "2.1.0-SNAPSHOT")
opensearch_version = System.getProperty("opensearch.version", "2.1.1-SNAPSHOT")
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
// 2.1.0-SNAPSHOT -> 2.1.0.0-SNAPSHOT
version_tokens = opensearch_version.tokenize('-')
93 changes: 80 additions & 13 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
@@ -257,6 +257,7 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
/* @anomaly-detection.create-detector
AnomalyIndexHandler<AnomalyResult> anomalyResultHandler = new AnomalyIndexHandler<AnomalyResult>(
client,
settings,
@@ -278,7 +279,9 @@ public List<RestHandler> getRestHandlers(
jobRunner.setAdTaskManager(adTaskManager);

RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction();
*/
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(settings, clusterService);
/* @anomaly-detection.create-detector
RestSearchAnomalyDetectorAction searchAnomalyDetectorAction = new RestSearchAnomalyDetectorAction();
RestSearchAnomalyResultAction searchAnomalyResultAction = new RestSearchAnomalyResultAction();
RestSearchADTasksAction searchADTasksAction = new RestSearchADTasksAction();
@@ -291,11 +294,12 @@ public List<RestHandler> getRestHandlers(
RestDeleteAnomalyResultsAction deleteAnomalyResultsAction = new RestDeleteAnomalyResultsAction();
RestSearchTopAnomalyResultAction searchTopAnomalyResultAction = new RestSearchTopAnomalyResultAction();
RestValidateAnomalyDetectorAction validateAnomalyDetectorAction = new RestValidateAnomalyDetectorAction(settings, clusterService);

*/
return ImmutableList
.of(
restGetAnomalyDetectorAction,
restIndexAnomalyDetectorAction,
// restGetAnomalyDetectorAction,
restIndexAnomalyDetectorAction
/* @anomaly-detection.create-detector
searchAnomalyDetectorAction,
searchAnomalyResultAction,
searchADTasksAction,
@@ -308,6 +312,7 @@ public List<RestHandler> getRestHandlers(
deleteAnomalyResultsAction,
searchTopAnomalyResultAction,
validateAnomalyDetectorAction
*/
);
}

@@ -330,15 +335,20 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
/* @anomaly-detection.create-detector
EnabledSetting.getInstance().init(clusterService);
NumericSetting.getInstance().init(clusterService);
this.client = client;
this.threadPool = threadPool;
*/
Settings settings = environment.settings();
/* @anomaly-detection.create-detector
Throttler throttler = new Throttler(getClock());
this.clientUtil = new ClientUtil(settings, client, throttler, threadPool);
this.indexUtils = new IndexUtils(client, clientUtil, clusterService, indexNameExpressionResolver);
this.nodeFilter = new DiscoveryNodeFilterer(clusterService);
*/
// AnomalyDetectionIndices is Injected for IndexAnomalyDetectorTrasnportAction constructor
this.anomalyDetectionIndices = new AnomalyDetectionIndices(
client,
clusterService,
@@ -352,6 +362,7 @@ public Collection<Object> createComponents(
SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator =
new IntegerSensitiveSingleFeatureLinearUniformInterpolator();
Interpolator interpolator = new LinearUniformInterpolator(singleFeatureLinearUniformInterpolator);
// SearchFeatureDao is Injected for IndexAnomalyDetectorTrasnportAction constructor
SearchFeatureDao searchFeatureDao = new SearchFeatureDao(
client,
xContentRegistry,
@@ -407,9 +418,8 @@ public Collection<Object> createComponents(
threadPool,
AD_THREAD_POOL_NAME
);

long heapSizeBytes = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();

/* @anomaly-detection.create-detector
serializeRCFBufferPool = AccessController.doPrivileged(new PrivilegedAction<GenericObjectPool<LinkedBuffer>>() {
@Override
public GenericObjectPool<LinkedBuffer> run() {
@@ -431,7 +441,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
serializeRCFBufferPool.setMinIdle(0);
serializeRCFBufferPool.setBlockWhenExhausted(false);
serializeRCFBufferPool.setTimeBetweenEvictionRuns(AnomalyDetectorSettings.HOURLY_MAINTENANCE);

*/
CheckpointDao checkpoint = new CheckpointDao(
client,
clientUtil,
@@ -476,7 +486,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
stateManager,
AnomalyDetectorSettings.HOURLY_MAINTENANCE
);

/* @anomaly-detection.create-detector
EntityCache cache = new PriorityCache(
checkpoint,
AnomalyDetectorSettings.DEDICATED_CACHE_SIZE.get(settings),
@@ -493,7 +503,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
);

CacheProvider cacheProvider = new CacheProvider(cache);

*/
EntityColdStarter entityColdStarter = new EntityColdStarter(
getClock(),
threadPool,
@@ -513,7 +523,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
checkpointWriteQueue,
AnomalyDetectorSettings.MAX_COLD_START_ROUNDS
);

/* @anomaly-detection.create-detector
EntityColdStartWorker coldstartQueue = new EntityColdStartWorker(
heapSizeBytes,
AnomalyDetectorSettings.ENTITY_REQUEST_SIZE_IN_BYTES,
@@ -533,6 +543,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
stateManager
);
*/

ModelManager modelManager = new ModelManager(
checkpoint,
@@ -549,7 +560,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
featureManager,
memoryTracker
);

/* @anomaly-detection.create-detector
MultiEntityResultHandler multiEntityResultHandler = new MultiEntityResultHandler(
client,
settings,
@@ -625,10 +636,10 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
stateManager
);

*/
ADDataMigrator dataMigrator = new ADDataMigrator(client, clusterService, xContentRegistry, anomalyDetectionIndices);
HashRing hashRing = new HashRing(nodeFilter, getClock(), settings, client, clusterService, dataMigrator, modelManager);

/* @anomaly-detection.create-detector
anomalyDetectorRunner = new AnomalyDetectorRunner(modelManager, featureManager, AnomalyDetectorSettings.MAX_PREVIEW_RESULTS);

Map<String, ADStat<?>> stats = ImmutableMap
@@ -674,6 +685,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
adStats = new ADStats(stats);

adTaskCacheManager = new ADTaskCacheManager(settings, clusterService, memoryTracker);
*/
adTaskManager = new ADTaskManager(
settings,
clusterService,
@@ -685,6 +697,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
adTaskCacheManager,
threadPool
);
/* @anomaly-detection.create-detector
AnomalyResultBulkIndexHandler anomalyResultBulkIndexHandler = new AnomalyResultBulkIndexHandler(
client,
settings,
@@ -747,6 +760,8 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
entityColdStarter,
adTaskCacheManager
);
*/
return ImmutableList.of(searchFeatureDao, anomalyDetectionIndices, adTaskManager);
}

/**
@@ -760,6 +775,7 @@ protected Clock getClock() {

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
/* @anomaly-detection.create-detector
return ImmutableList
.of(
new ScalingExecutorBuilder(
@@ -779,10 +795,13 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
AD_THREAD_POOL_PREFIX + AD_BATCH_TASK_THREAD_POOL_NAME
)
);
*/
return ImmutableList.of();
}

@Override
public List<Setting<?>> getSettings() {
/* @anomaly-detection.create-detector
List<Setting<?>> enabledSetting = EnabledSetting.getInstance().getSettings();
List<Setting<?>> numericSetting = NumericSetting.getInstance().getSettings();

@@ -878,17 +897,61 @@ public List<Setting<?>> getSettings() {
.orElseGet(Stream::empty)
.collect(Collectors.toList())
);
*/
/*
// MAX_ENTITIES_FOR_PREVIEW, PAGE_SIZE is needed for SearchFeatureDao
// AD_RESULT_HISTORY_ROLLOVER_PERIOD, AD_RESULT_HISTORY_MAX_DOCS_PER_SHARD, AD_RESULT_HISTORY_RETENTION_PERIOD, MAX_PRIMARY_SHARDS is needed for AnomalyDetectionIndices
// MODEL_MAX_SIZE_PERCENTAGE is needed for MemoryTracker
// MAX_RETRY_FOR_UNRESPONSIVE_NODE, BACKOFF_MINUTES is needed for NodeStateManager
// AnomalyDetectorSettings.CHECKPOINT_WRITE_QUEUE_MAX_HEAP_PERCENT is needed for RateLimitedRequestWorker
// CHECKPOINT_WRITE_QUEUE_CONCURRENCY is needed for ConcurrentWorker
// CHECKPOINT_WRITE_QUEUE_BATCH_SIZE is needed for BatchWorker
// COOLDOWN_MINUTES is needed for HashRing
// MAX_OLD_AD_TASK_DOCS_PER_DETECTOR, BATCH_TASK_PIECE_INTERVAL_SECONDS,
// DELETE_AD_RESULT_WHEN_DELETE_DETECTOR, MAX_BATCH_TASK_PER_NODE, MAX_RUNNING_ENTITIES_PER_DETECTOR_FOR_HISTORICAL_ANALYSIS, REQUEST_TIMEOUT is needed for ADTaskManager
// FILTER_BY_BACKEND_ROLES is needed by IndexAnomalyDetectorTransportAction
// DETECTION_INTERVAL, DETECTION_WINDOW_DELAY, MAX_SINGLE_ENTITY_ANOMALY_DETECTORS, MAX_MULTI_ENTITY_ANOMALY_DETECTORS, MAX_ANOMALY_FEATURES is needed for AbstractAnomalyDetectorAction
*/
// TODO: evaluate if these settings are needed for create detector
return ImmutableList.of(AnomalyDetectorSettings.MAX_ENTITIES_FOR_PREVIEW,
AnomalyDetectorSettings.PAGE_SIZE,
AnomalyDetectorSettings.AD_RESULT_HISTORY_MAX_DOCS_PER_SHARD,
AnomalyDetectorSettings.AD_RESULT_HISTORY_ROLLOVER_PERIOD,
AnomalyDetectorSettings.AD_RESULT_HISTORY_RETENTION_PERIOD,
AnomalyDetectorSettings.MAX_PRIMARY_SHARDS,
AnomalyDetectorSettings.MODEL_MAX_SIZE_PERCENTAGE,
AnomalyDetectorSettings.MAX_RETRY_FOR_UNRESPONSIVE_NODE,
AnomalyDetectorSettings.BACKOFF_MINUTES,
AnomalyDetectorSettings.CHECKPOINT_WRITE_QUEUE_MAX_HEAP_PERCENT,
AnomalyDetectorSettings.CHECKPOINT_WRITE_QUEUE_CONCURRENCY,
AnomalyDetectorSettings.CHECKPOINT_WRITE_QUEUE_BATCH_SIZE,
AnomalyDetectorSettings.COOLDOWN_MINUTES,
AnomalyDetectorSettings.MAX_OLD_AD_TASK_DOCS_PER_DETECTOR,
AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS,
AnomalyDetectorSettings.DELETE_AD_RESULT_WHEN_DELETE_DETECTOR,
AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE,
AnomalyDetectorSettings.MAX_RUNNING_ENTITIES_PER_DETECTOR_FOR_HISTORICAL_ANALYSIS,
AnomalyDetectorSettings.REQUEST_TIMEOUT,
AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES,
AnomalyDetectorSettings.DETECTION_INTERVAL,
AnomalyDetectorSettings.DETECTION_WINDOW_DELAY,
AnomalyDetectorSettings.MAX_SINGLE_ENTITY_ANOMALY_DETECTORS,
AnomalyDetectorSettings.MAX_MULTI_ENTITY_ANOMALY_DETECTORS,
AnomalyDetectorSettings.MAX_ANOMALY_FEATURES);
}

@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
/* @anomaly-detection.create-detector
return ImmutableList
.of(
AnomalyDetector.XCONTENT_REGISTRY,
AnomalyResult.XCONTENT_REGISTRY,
DetectorInternalState.XCONTENT_REGISTRY,
AnomalyDetectorJob.XCONTENT_REGISTRY
);
*/
return ImmutableList.of();
}

/*
@@ -898,6 +961,7 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays
.asList(
/* @anomaly-detection.create-detector
new ActionHandler<>(DeleteModelAction.INSTANCE, DeleteModelTransportAction.class),
new ActionHandler<>(StopDetectorAction.INSTANCE, StopDetectorTransportAction.class),
new ActionHandler<>(RCFResultAction.INSTANCE, RCFResultTransportAction.class),
@@ -913,7 +977,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class),
new ActionHandler<>(DeleteAnomalyDetectorAction.INSTANCE, DeleteAnomalyDetectorTransportAction.class),
new ActionHandler<>(GetAnomalyDetectorAction.INSTANCE, GetAnomalyDetectorTransportAction.class),
new ActionHandler<>(IndexAnomalyDetectorAction.INSTANCE, IndexAnomalyDetectorTransportAction.class),
*/
new ActionHandler<>(IndexAnomalyDetectorAction.INSTANCE, IndexAnomalyDetectorTransportAction.class)
/* @anomaly-detection.create-detector
new ActionHandler<>(AnomalyDetectorJobAction.INSTANCE, AnomalyDetectorJobTransportAction.class),
new ActionHandler<>(ADResultBulkAction.INSTANCE, ADResultBulkTransportAction.class),
new ActionHandler<>(EntityResultAction.INSTANCE, EntityResultTransportAction.class),
@@ -928,6 +994,7 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(DeleteAnomalyResultsAction.INSTANCE, DeleteAnomalyResultsTransportAction.class),
new ActionHandler<>(SearchTopAnomalyResultAction.INSTANCE, SearchTopAnomalyResultTransportAction.class),
new ActionHandler<>(ValidateAnomalyDetectorAction.INSTANCE, ValidateAnomalyDetectorTransportAction.class)
*/
);
}