Skip to content

Commit

Permalink
Upgrade rcf to 4.0
Browse files Browse the repository at this point in the history
This PR upgrades RCF (Random Cut Forest) to 4.0, which includes bug fixes and adds support for streaming imputation mode.

Testing done:
1. gradle build

Signed-off-by: Kaituo Li <kaituo@amazon.com>
  • Loading branch information
kaituo committed Mar 26, 2024
1 parent 8ca1d0d commit ff68a06
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 72 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/backport.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ on:

jobs:
backport:
if: github.event.pull_request.merged == true
runs-on: ubuntu-latest
permissions:
contents: write
Expand All @@ -25,4 +26,5 @@ jobs:
uses: VachaShah/backport@v2.2.0
with:
github_token: ${{ steps.github_app_token.outputs.token }}
branch_name: backport/backport-${{ github.event.number }}
head_template: backport/backport-<%= number %>-to-<%= base %>
failure_labels: backport-failed
2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:

# anomaly-detection
- name: Checkout AD
uses: actions/checkout@v4
uses: actions/checkout@v3

- name: Build and Run Tests
run: |
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/test_build_multi_platform.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:

- name: Build and Run Tests
run: |
./gradlew build
./gradlew build -x spotlessJava
- name: Publish to Maven Local
run: |
./gradlew publishToMavenLocal
Expand Down Expand Up @@ -85,13 +85,13 @@ jobs:
java-version: ${{ matrix.java }}

- name: Checkout AD
uses: actions/checkout@v4
uses: actions/checkout@v3

- name: Assemble / build / mavenlocal / integTest
run: |
chown -R 1000:1000 `pwd`
su `id -un 1000` -c "./gradlew assemble &&
./gradlew build &&
./gradlew build -x spotlessJava &&
./gradlew publishToMavenLocal &&
./gradlew integTest -PnumNodes=3"
- name: Upload Coverage Report
Expand Down Expand Up @@ -127,7 +127,7 @@ jobs:
./gradlew assemble
- name: Build and Run Tests
run: |
./gradlew build
./gradlew build -x spotlessJava
- name: Publish to Maven Local
run: |
./gradlew publishToMavenLocal
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_bwc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:

# anomaly-detection
- name: Checkout AD
uses: actions/checkout@v4
uses: actions/checkout@v3

- name: Assemble anomaly-detection
run: |
Expand Down
23 changes: 13 additions & 10 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ buildscript {
js_resource_folder = "src/test/resources/job-scheduler"
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build)
bwcVersionShort = "2.10.0"
bwcVersionShort = "2.14.0"
bwcVersion = bwcVersionShort + ".0"
bwcOpenSearchADDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' +
'opensearch/plugins/opensearch-anomaly-detection-' + bwcVersion + '.zip'
Expand Down Expand Up @@ -126,9 +126,9 @@ dependencies {
implementation group: 'com.yahoo.datasketches', name: 'memory', version: '0.12.2'
implementation group: 'commons-lang', name: 'commons-lang', version: '2.6'
implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.12.0'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:3.8.0'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.8.0'
implementation 'software.amazon.randomcutforest:randomcutforest-core:3.8.0'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:4.0.0'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:4.0.0'
implementation 'software.amazon.randomcutforest:randomcutforest-core:4.0.0'

// we inherit jackson-core from opensearch core
implementation "com.fasterxml.jackson.core:jackson-databind:2.16.1"
Expand All @@ -149,6 +149,9 @@ dependencies {
exclude group: 'org.ow2.asm', module: 'asm-tree'
}

// used for output encoding of config descriptions
implementation group: 'org.owasp.encoder' , name: 'encoder', version: '1.2.3'

testImplementation group: 'pl.pragmatists', name: 'JUnitParams', version: '1.1.1'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.9.0'
testImplementation group: 'org.objenesis', name: 'objenesis', version: '3.3'
Expand Down Expand Up @@ -538,7 +541,7 @@ List<Provider<RegularFile>> plugins = [

// Creates 2 test clusters with 3 nodes of the old version.
2.times {i ->
task "${baseName}#oldVersionClusterTask$i"(type: StandaloneRestIntegTestTask) {
task "${baseName}#oldVersionClusterTask$i"(type: RestIntegTestTask) {
useCluster testClusters."${baseName}$i"
filter {
includeTestsMatching "org.opensearch.ad.bwc.*IT"
Expand All @@ -554,7 +557,7 @@ List<Provider<RegularFile>> plugins = [
// Upgrades one node of the old cluster to new OpenSearch version with upgraded plugin version
// This results in a mixed cluster with 2 nodes on the old version and 1 upgraded node.
// This is also used as a one third upgraded cluster for a rolling upgrade.
task "${baseName}#mixedClusterTask"(type: StandaloneRestIntegTestTask) {
task "${baseName}#mixedClusterTask"(type: RestIntegTestTask) {
useCluster testClusters."${baseName}0"
dependsOn "${baseName}#oldVersionClusterTask0"
doFirst {
Expand All @@ -573,7 +576,7 @@ task "${baseName}#mixedClusterTask"(type: StandaloneRestIntegTestTask) {
// Upgrades the second node to new OpenSearch version with upgraded plugin version after the first node is upgraded.
// This results in a mixed cluster with 1 node on the old version and 2 upgraded nodes.
// This is used for rolling upgrade.
task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTask) {
task "${baseName}#twoThirdsUpgradedClusterTask"(type: RestIntegTestTask) {
dependsOn "${baseName}#mixedClusterTask"
useCluster testClusters."${baseName}0"
doFirst {
Expand All @@ -592,7 +595,7 @@ task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTas
// Upgrades the third node to new OpenSearch version with upgraded plugin version after the second node is upgraded.
// This results in a fully upgraded cluster.
// This is used for rolling upgrade.
task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask) {
task "${baseName}#rollingUpgradeClusterTask"(type: RestIntegTestTask) {
dependsOn "${baseName}#twoThirdsUpgradedClusterTask"
useCluster testClusters."${baseName}0"
doFirst {
Expand All @@ -611,7 +614,7 @@ task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask)

// Upgrades all the nodes of the old cluster to new OpenSearch version with upgraded plugin version
// at the same time resulting in a fully upgraded cluster.
task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) {
task "${baseName}#fullRestartClusterTask"(type: RestIntegTestTask) {
dependsOn "${baseName}#oldVersionClusterTask1"
useCluster testClusters."${baseName}1"
doFirst {
Expand All @@ -627,7 +630,7 @@ task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) {
}

// A bwc test suite which runs all the bwc tasks combined.
task bwcTestSuite(type: StandaloneRestIntegTestTask) {
task bwcTestSuite(type: RestIntegTestTask) {
exclude '**/*Test*'
exclude '**/*IT*'
dependsOn tasks.named("${baseName}#mixedClusterTask")
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/opensearch/ad/ml/CheckpointDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,8 @@ private Optional<ThresholdedRandomCutForest> convertToTRCF(Optional<RandomCutFor
if (kllThreshold.isPresent()) {
scores = kllThreshold.get().extractScores();
}
return Optional.of(new ThresholdedRandomCutForest(rcf.get(), anomalyRate, scores));
// The last constructor argument is lastShingledInput. It is unknown here, so pass an all-zero double array.
return Optional.of(new ThresholdedRandomCutForest(rcf.get(), anomalyRate, scores, new double[rcf.get().getDimensions()]));
}

/**
Expand Down
26 changes: 13 additions & 13 deletions src/main/java/org/opensearch/ad/task/ADTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,7 @@
import static org.opensearch.ad.constant.ADCommonMessages.NO_ELIGIBLE_NODE_TO_RUN_DETECTOR;
import static org.opensearch.ad.constant.ADCommonName.DETECTION_STATE_INDEX;
import static org.opensearch.ad.indices.ADIndexManagement.ALL_AD_RESULTS_INDEX_PATTERN;
import static org.opensearch.ad.model.ADTask.COORDINATING_NODE_FIELD;
import static org.opensearch.ad.model.ADTask.DETECTOR_ID_FIELD;
import static org.opensearch.ad.model.ADTask.ERROR_FIELD;
import static org.opensearch.ad.model.ADTask.ESTIMATED_MINUTES_LEFT_FIELD;
import static org.opensearch.ad.model.ADTask.EXECUTION_END_TIME_FIELD;
import static org.opensearch.ad.model.ADTask.EXECUTION_START_TIME_FIELD;
import static org.opensearch.ad.model.ADTask.INIT_PROGRESS_FIELD;
import static org.opensearch.ad.model.ADTask.IS_LATEST_FIELD;
import static org.opensearch.ad.model.ADTask.LAST_UPDATE_TIME_FIELD;
import static org.opensearch.ad.model.ADTask.PARENT_TASK_ID_FIELD;
import static org.opensearch.ad.model.ADTask.STATE_FIELD;
import static org.opensearch.ad.model.ADTask.STOPPED_BY_FIELD;
import static org.opensearch.ad.model.ADTask.TASK_PROGRESS_FIELD;
import static org.opensearch.ad.model.ADTask.TASK_TYPE_FIELD;
import static org.opensearch.ad.model.ADTaskType.ALL_HISTORICAL_TASK_TYPES;
import static org.opensearch.ad.model.ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES;
import static org.opensearch.ad.model.ADTaskType.REALTIME_TASK_TYPES;
Expand All @@ -52,6 +39,19 @@
import static org.opensearch.timeseries.constant.CommonName.TASK_ID_FIELD;
import static org.opensearch.timeseries.model.TaskState.NOT_ENDED_STATES;
import static org.opensearch.timeseries.model.TaskType.taskTypeToString;
import static org.opensearch.timeseries.model.TimeSeriesTask.COORDINATING_NODE_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.ERROR_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.ESTIMATED_MINUTES_LEFT_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.EXECUTION_END_TIME_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.EXECUTION_START_TIME_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.INIT_PROGRESS_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.IS_LATEST_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.LAST_UPDATE_TIME_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.PARENT_TASK_ID_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.STATE_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.STOPPED_BY_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.TASK_PROGRESS_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.TASK_TYPE_FIELD;
import static org.opensearch.timeseries.settings.TimeSeriesSettings.NUM_MIN_SAMPLES;
import static org.opensearch.timeseries.util.ExceptionUtil.getErrorMessage;
import static org.opensearch.timeseries.util.ExceptionUtil.getShardsFailure;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void testBackwardsCompatibility() throws Exception {
case MIXED:
// TODO: There is currently no way to specify whether a request is sent to an old node or a new node.
// Add more tests once it is possible to choose the node that receives the request.
Assert.assertTrue(pluginNames.contains("opensearch-anomaly-detection"));
Assert.assertTrue(pluginNames.contains("opensearch-time-series-analytics"));
Assert.assertTrue(pluginNames.contains("opensearch-job-scheduler"));

// Create single entity detector and start realtime job
Expand Down
59 changes: 28 additions & 31 deletions src/test/java/org/opensearch/ad/ml/CheckpointDaoTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1067,27 +1067,22 @@ public void testDeserializeTRCFModel() throws Exception {
coldStartData.add(sample4);
coldStartData.add(sample5);

// This scores were generated with the sample data but on RCF3.0-rc1 and we are comparing them
// to the scores generated by the imported RCF3.0-rc2.1
// These scores were generated with the sample data on RCF 4.0. RCF 4.0 changed its implementation,
// so the RCF scores differ between 4.0 and 3.8. This was verified by switching the RCF version
// between 3.8 and 4.0 while leaving the rest of the AD code unchanged: the scores still differed.
List<Double> scores = new ArrayList<>();
scores.add(4.814651669367903);
scores.add(5.566968073093689);
scores.add(5.919907610660049);
scores.add(5.770278090352401);
scores.add(5.319779117320102);

List<Double> grade = new ArrayList<>();
grade.add(1.0);
grade.add(0.0);
grade.add(0.0);
grade.add(0.0);
grade.add(0.0);
scores.add(5.052069275347555);
scores.add(6.117465704461799);
scores.add(6.6401649744661055);
scores.add(6.918514609476484);
scores.add(6.928318158276434);

// RCF 3.8 has a number of improvements to the thresholder and predictor corrector.
// We don't expect the results to have the same anomaly grade.
for (int i = 0; i < coldStartData.size(); i++) {
forest.process(coldStartData.get(i), 0);
AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0);
assertEquals(descriptor.getRCFScore(), scores.get(i), 1e-9);
assertEquals(scores.get(i), descriptor.getRCFScore(), 1e-9);
}
}

Expand Down Expand Up @@ -1133,21 +1128,22 @@ public void testDeserialize_rcf3_rc3_single_stream_model() throws Exception {
coldStartData.add(sample4);
coldStartData.add(sample5);

// This scores were generated with the sample data but on RCF3.0-rc1 and we are comparing them
// to the scores generated by the imported RCF3.0-rc2.1
// These scores were generated with the sample data on RCF 4.0. RCF 4.0 changed its implementation,
// so the RCF scores differ between 4.0 and 3.8. This was verified by switching the RCF version
// between 3.8 and 4.0 while leaving the rest of the AD code unchanged: the scores still differed.
List<Double> scores = new ArrayList<>();
scores.add(3.3830441158587066);
scores.add(2.825961659490065);
scores.add(2.4685871670647384);
scores.add(2.3123460886413647);
scores.add(2.1401987653477135);
scores.add(3.678754481587072);
scores.add(3.6809634269790252);
scores.add(3.683659822587799);
scores.add(3.6852688612219646);
scores.add(3.6859330728661064);

// RCF 3.8 has a number of improvements to the thresholder and predictor corrector.
// We don't expect the results to have the same anomaly grade.
for (int i = 0; i < coldStartData.size(); i++) {
forest.process(coldStartData.get(i), 0);
AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0);
assertEquals(descriptor.getRCFScore(), scores.get(i), 1e-9);
assertEquals(scores.get(i), descriptor.getRCFScore(), 1e-9);
}
}

Expand Down Expand Up @@ -1190,21 +1186,22 @@ public void testDeserialize_rcf3_rc3_hc_model() throws Exception {
coldStartData.add(sample4);
coldStartData.add(sample5);

// This scores were generated with the sample data but on RCF3.0-rc1 and we are comparing them
// to the scores generated by the imported RCF3.0-rc2.1
// These scores were generated with the sample data on RCF 4.0. RCF 4.0 changed its implementation,
// so the RCF scores differ between 4.0 and 3.8. This was verified by switching the RCF version
// between 3.8 and 4.0 while leaving the rest of the AD code unchanged: the scores still differed.
List<Double> scores = new ArrayList<>();
scores.add(1.86645896573027);
scores.add(1.8760247712797833);
scores.add(1.6809181763279901);
scores.add(1.7126716645678555);
scores.add(1.323776514074674);
scores.add(2.119532552959117);
scores.add(2.7347456872746325);
scores.add(3.066704948143919);
scores.add(3.2965580521876725);
scores.add(3.1888920146607047);

// RCF 3.8 has a number of improvements to the thresholder and predictor corrector.
// We don't expect the results to have the same anomaly grade.
for (int i = 0; i < coldStartData.size(); i++) {
forest.process(coldStartData.get(i), 0);
AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0);
assertEquals(descriptor.getRCFScore(), scores.get(i), 1e-9);
assertEquals(scores.get(i), descriptor.getRCFScore(), 1e-9);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ public void testAccuracyOneMinuteIntervalNoInterpolation() throws Exception {
clusterService
);

accuracyTemplate(1, 0.6f, 0.6f);
accuracyTemplate(1, 0.5f, 0.5f);
}

private ModelState<EntityModel> createStateForCacheRelease() {
Expand Down
25 changes: 19 additions & 6 deletions src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.ad.ADUnitTestCase;
import org.opensearch.ad.cluster.HashRing;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.mock.model.MockSimpleLog;
Expand All @@ -89,6 +88,7 @@
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.stats.InternalStatNames;
import org.opensearch.ad.transport.ADStatsNodeResponse;
import org.opensearch.ad.transport.ADStatsNodesResponse;
Expand Down Expand Up @@ -120,6 +120,7 @@
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AbstractTimeSeriesTest;
import org.opensearch.timeseries.TestHelpers;
import org.opensearch.timeseries.common.exception.DuplicateTaskException;
import org.opensearch.timeseries.constant.CommonName;
Expand All @@ -139,7 +140,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

public class ADTaskManagerTests extends ADUnitTestCase {
public class ADTaskManagerTests extends AbstractTimeSeriesTest {

private Settings settings;
private Client client;
Expand Down Expand Up @@ -1447,10 +1448,22 @@ public void testForwardRequestToLeadNodeWithNotExistingNode() throws IOException
@SuppressWarnings("unchecked")
public void testScaleTaskLaneOnCoordinatingNode() {
ADTask adTask = mock(ADTask.class);
when(adTask.getCoordinatingNode()).thenReturn(node1.getId());
when(nodeFilter.getEligibleDataNodes()).thenReturn(new DiscoveryNode[] { node1, node2 });
ActionListener<JobResponse> listener = mock(ActionListener.class);
adTaskManager.scaleTaskLaneOnCoordinatingNode(adTask, 2, transportService, listener);
try {
// Bring up a real transport service because Mockito cannot mock final methods
// and transportService.sendRequest is called here. Using a mocked transport service
// would throw many NullPointerExceptions.
setUpThreadPool(ADTaskManagerTests.class.getSimpleName());
setupTestNodes(AnomalyDetectorSettings.AD_MAX_ENTITIES_PER_QUERY, AnomalyDetectorSettings.AD_PAGE_SIZE);
when(adTask.getCoordinatingNode()).thenReturn(testNodes[1].getNodeId());
when(nodeFilter.getEligibleDataNodes())
.thenReturn(new DiscoveryNode[] { testNodes[0].discoveryNode(), testNodes[1].discoveryNode() });
ActionListener<JobResponse> listener = mock(ActionListener.class);

adTaskManager.scaleTaskLaneOnCoordinatingNode(adTask, 2, testNodes[1].transportService, listener);
} finally {
tearDownTestNodes();
tearDownThreadPool();
}
}

@SuppressWarnings("unchecked")
Expand Down
Loading

0 comments on commit ff68a06

Please sign in to comment.