Anomaly detection using Isolation Forest #41

Merged
Changes from 6 commits
9 changes: 7 additions & 2 deletions README.md
@@ -97,6 +97,10 @@ Activate Orion's regression detection tool for performance-scale CPT runs effort

Additionally, users can specify a custom path for the output CSV file with the ```--output``` flag, controlling where the generated CSV is stored.

Orion now supports anomaly detection for your data. Use the ```--anomaly-detection``` flag to run the anomaly detection process.

**_NOTE:_** The ```--hunter-analyze``` and ```--anomaly-detection``` flags are mutually exclusive. They cannot be used together because they represent different algorithms designed for distinct use cases.
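The PR enforces this exclusivity at the click layer; stripped of click specifics, the guard reduces to a set-intersection check over the options that were actually passed. A minimal sketch (function and argument names are illustrative, not the PR's exact API):

```python
def check_mutually_exclusive(passed_opts, name, mutually_exclusive):
    """Raise if option `name` was passed together with any option it excludes.

    Mirrors the spirit of the click-based guard in this PR; a real CLI would
    hook this into option parsing rather than call it directly.
    """
    clash = set(mutually_exclusive) & set(passed_opts)
    if name in passed_opts and clash:
        raise ValueError(
            f"Illegal usage: `{name}` is mutually exclusive with "
            f"`{', '.join(sorted(clash))}`."
        )


# Passing only one of the two flags is accepted silently:
check_mutually_exclusive({"hunter_analyze"}, "hunter_analyze", ["anomaly_detection"])
```

Passing both flags raises, which is what surfaces as the click `UsageError` in the actual implementation further down.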

### Daemon mode
The core purpose of Daemon mode is to operate Orion as a self-contained server, dedicated to handling incoming requests. By sending a POST request accompanied by a test name of predefined tests, users can trigger change point detection on the provided metadata and metrics. Following the processing, the response is formatted in JSON, providing a structured output for seamless integration and analysis. To trigger daemon mode just use the following commands

@@ -110,7 +114,7 @@ To interact with the Daemon Service, you can send a POST request using `curl` wi
*Request URL*

```diff
-POST http://127.0.0.1:8000/daemon
+POST http://127.0.0.1:8080/daemon/changepoint
```

*Parameters*
@@ -124,7 +128,7 @@ POST http://127.0.0.1:8000/daemon

Example
```diff
-curl -L -X POST 'http://127.0.0.1:8000/daemon?filter_changepoints=true&version=4.14&test_name=small-scale-node-density-cni'
+curl -L -X POST 'http://127.0.0.1:8080/daemon/changepoint?filter_changepoints=true&version=4.14&test_name=small-scale-node-density-cni'
```


@@ -160,6 +164,7 @@ Below is a sample output structure: the top level of the JSON contains the test
}
```

Similarly, one can use the ```/daemon/anomaly``` endpoint to run anomaly detection on the preset metadata.
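Assuming the daemon is running locally on the port shown above, a request URL for the anomaly endpoint can be assembled like this (the parameter values are only placeholders):

```python
from urllib.parse import urlencode

# Build the query string for the /daemon/anomaly endpoint described above.
# Host, port, and parameter names follow the examples in this README.
base = "http://127.0.0.1:8080/daemon/anomaly"
params = {
    "test_name": "small-scale-cluster-density",
    "version": "4.17",
    "anomaly_window": 5,
    "min_anomaly_percent": 10,
    "filter_points": "true",
}
url = f"{base}?{urlencode(params)}"
print(url)
```

The resulting URL can then be passed to `curl` exactly like the changepoint example earlier.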

**Querying List of Tests Available to the Daemon Service**

71 changes: 66 additions & 5 deletions orion.py
@@ -2,7 +2,7 @@
This is the cli file for orion, tool to detect regressions using hunter
"""

# pylint: disable = import-error
# pylint: disable = import-error, line-too-long
import logging
import sys
import warnings
@@ -11,8 +11,53 @@
from pkg.logrus import SingletonLogger
from pkg.runTest import run
from pkg.utils import load_config
from pkg.types import OptionMap
import pkg.constants as cnsts

warnings.filterwarnings("ignore", message="Unverified HTTPS request.*")
warnings.filterwarnings(
"ignore", category=UserWarning, message=".*Connecting to.*verify_certs=False.*"
)


class MutuallyExclusiveOption(click.Option):
    """click.Option subclass that enforces mutual exclusivity between options.

    Pass `mutually_exclusive=[...]` with the names of the options that cannot
    be combined with this one.
    """

    def __init__(self, *args, **kwargs):
        self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", []))
        help = kwargs.get("help", "")  # pylint: disable=redefined-builtin
        if self.mutually_exclusive:
            ex_str = ", ".join(self.mutually_exclusive)
            kwargs["help"] = help + (
                " NOTE: This argument is mutually exclusive with"
                " arguments: [" + ex_str + "]."
            )
        super().__init__(*args, **kwargs)

    def handle_parse_result(self, ctx, opts, args):
        if self.mutually_exclusive.intersection(opts) and self.name in opts:
            raise click.UsageError(
                f"Illegal usage: `{self.name}` is mutually exclusive with "
                f"arguments `{', '.join(self.mutually_exclusive)}`."
            )
        return super().handle_parse_result(ctx, opts, args)


def validate_anomaly_options(ctx, param, value):  # pylint: disable = W0613
    """Validate that anomaly options are only used with `--anomaly-detection`."""
    if value or (
        ctx.params.get("anomaly_window") or ctx.params.get("min_anomaly_percent")
    ):
        if not ctx.params.get("anomaly_detection"):
            raise click.UsageError(
                "`--anomaly-window` and `--min-anomaly-percent` can only be used when `--anomaly-detection` is enabled."
            )
    return value


@click.group()
@@ -29,12 +74,27 @@ def cli(max_content_width=120):  # pylint: disable=unused-argument
    "--output-path", default="output.csv", help="Path to save the output csv file"
)
@click.option("--debug", default=False, is_flag=True, help="log level")
@click.option(
    "--hunter-analyze",
    is_flag=True,
    help="run hunter analyze",
    cls=MutuallyExclusiveOption,
    mutually_exclusive=["anomaly_detection"],
)
@click.option("--anomaly-window", type=int, callback=validate_anomaly_options, help="set window size for moving average for anomaly-detection")
@click.option("--min-anomaly-percent", type=int, callback=validate_anomaly_options, help="set minimum percentage difference from moving average for a data point to be detected as an anomaly")
@click.option(
    "--anomaly-detection",
    is_flag=True,
    help="run anomaly detection algorithm powered by isolation forest",
    cls=MutuallyExclusiveOption,
    mutually_exclusive=["hunter_analyze"],
)
@click.option(
    "-o",
    "--output-format",
    type=click.Choice(["json", "text"]),
    default="text",
    type=click.Choice([cnsts.JSON, cnsts.TEXT]),
    default=cnsts.TEXT,
    help="Choose output format (json or text)",
)
@click.option("--uuid", default="", help="UUID to use as base for comparisons")
@@ -49,7 +109,8 @@ def cmd_analysis(**kwargs):
    logger_instance = SingletonLogger(debug=level).logger
    logger_instance.info("🏹 Starting Orion in command-line mode")
    kwargs["configMap"] = load_config(kwargs["config"])
    output = run(**kwargs)
    OptionMap.set_map(kwargs)
    output = run()
    if output is None:
        logger_instance.error("Terminating test")
        sys.exit(0)
39 changes: 39 additions & 0 deletions pkg/algorithm.py
@@ -0,0 +1,39 @@
"""Module for Generic Algorithm class"""
from abc import ABC, abstractmethod
import pkg.constants as cnsts

class Algorithm(ABC):
    """Generic Algorithm class for the algorithm factory"""

    def __init__(self, matcher, dataframe, test):
        self.matcher = matcher
        self.dataframe = dataframe
        self.test = test

    @abstractmethod
    def output_json(self):
        """Outputs the data in json format"""

    @abstractmethod
    def output_text(self):
        """Outputs the data in text/tabular format"""

    def output(self, output_format):
        """Method to select the output method

        Args:
            output_format (str): format of the output

        Raises:
            ValueError: in case of an unsupported output format

        Returns:
            method: the output method to be used
        """
        if output_format == cnsts.JSON:
            return self.output_json()
        if output_format == cnsts.TEXT:
            return self.output_text()
        raise ValueError(f"Unsupported output format {output_format} selected")
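The `output()` dispatch above is a small template-method pattern: the base class picks the formatter, subclasses supply it. A self-contained sketch of the same shape (constants inlined, class names illustrative):

```python
from abc import ABC, abstractmethod

JSON, TEXT = "json", "text"  # stand-ins for pkg.constants


class Reporter(ABC):
    """Minimal stand-in for the Algorithm base class above."""

    @abstractmethod
    def output_json(self):
        ...

    @abstractmethod
    def output_text(self):
        ...

    def output(self, output_format):
        # Dispatch to the subclass implementation for the requested format.
        if output_format == JSON:
            return self.output_json()
        if output_format == TEXT:
            return self.output_text()
        raise ValueError(f"Unsupported output format {output_format} selected")


class DummyReporter(Reporter):
    def output_json(self):
        return {"ok": True}

    def output_text(self):
        return "ok"


print(DummyReporter().output(JSON))  # prints {'ok': True}
```

Any new algorithm only has to implement the two abstract formatters; the selection logic stays in one place.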
30 changes: 30 additions & 0 deletions pkg/algorithmFactory.py
@@ -0,0 +1,30 @@
"""
Algorithm Factory to choose available algorithms
"""
from pkg.edivisive import EDivisive
from pkg.isolationForest import IsolationForestWeightedMean
import pkg.constants as cnsts

class AlgorithmFactory:  # pylint: disable= too-few-public-methods
    """Algorithm Factory to choose an algorithm"""

    def instantiate_algorithm(self, algorithm, matcher, dataframe, test):
        """Algorithm instantiation method

        Args:
            algorithm (str): name of the algorithm
            matcher (Matcher): Matcher class
            dataframe (pd.DataFrame): dataframe with data
            test (dict): test information dictionary

        Raises:
            ValueError: when an invalid algorithm is chosen

        Returns:
            Algorithm: the instantiated algorithm
        """
        if algorithm == cnsts.EDIVISIVE:
            return EDivisive(matcher, dataframe, test)
        if algorithm == cnsts.ISOLATION_FOREST:
            return IsolationForestWeightedMean(matcher, dataframe, test)
        raise ValueError("Invalid algorithm called")
7 changes: 7 additions & 0 deletions pkg/constants.py
@@ -0,0 +1,7 @@
"""
module for storing constants across orion
"""
EDIVISIVE="EDivisive"
ISOLATION_FOREST="IsolationForest"
JSON="json"
TEXT="text"
60 changes: 54 additions & 6 deletions pkg/daemon.py
@@ -10,16 +10,18 @@
import pkg_resources
import yaml
from pkg.logrus import SingletonLogger
from pkg.types import OptionMap
import pkg.constants as cnsts

from . import runTest

app = FastAPI()
logger_instance = SingletonLogger(debug=logging.INFO).logger


@app.post("/daemon")
async def daemon(
    version: str = "4.15",
@app.get("/daemon/changepoint")
async def daemon_changepoint(
    version: str = "4.17",
    uuid: str = "",
    baseline: str = "",
    filter_changepoints="",
@@ -35,19 +37,21 @@ def daemon(
    """
    parameters = {"version": version}
    config_file_name = test_name + ".yml"
    argDict = {
    option_arguments = {
        "config": config_file_name,
        "output_path": "output.csv",
        "hunter_analyze": True,
        "output_format": "json",
        "anomaly_detection": False,

> **Collaborator:** While we already have hunter_analyze to indicate that this is related to change point detection, do we need anomaly_detection as an argument here? Vice versa applies to the endpoint /daemon/anomaly too.

        "output_format": cnsts.JSON,
        "uuid": uuid,
        "baseline": baseline,
        "configMap": render_template(config_file_name, parameters),
    }
    filter_changepoints = (
        True if filter_changepoints == "true" else False  # pylint: disable = R1719
    )
    result = runTest.run(**argDict)
    OptionMap.set_map(option_arguments)
    result = runTest.run()
if result is None:
return {"Error":"No UUID with given metadata"}
if filter_changepoints:
@@ -82,6 +86,50 @@ async def get_options():
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

@app.get("/daemon/anomaly")
async def daemon_anomaly(  # pylint: disable = R0913
    version: str = "4.17",
    uuid: str = "",
    baseline: str = "",
    filter_points="",
    test_name="small-scale-cluster-density",
    anomaly_window=5,
    min_anomaly_percent=10
):
    """Runs anomaly detection for a predefined test (GET /daemon/anomaly).

    Returns:
        json: json object of the detected anomalies and metrics
    """
    parameters = {"version": version}
    config_file_name = test_name + ".yml"
    option_arguments = {
        "config": config_file_name,
        "output_path": "output.csv",
        "hunter_analyze": False,
        "anomaly_detection": True,
        "output_format": cnsts.JSON,
        "uuid": uuid,
        "baseline": baseline,
        "configMap": render_template(config_file_name, parameters),
        "anomaly_window": int(anomaly_window),
        "min_anomaly_percent": int(min_anomaly_percent),
    }
    filter_points = (
        True if filter_points == "true" else False  # pylint: disable = R1719
    )
    OptionMap.set_map(option_arguments)
    result = runTest.run()
    if result is None:
        return {"Error": "No UUID with given metadata"}
    if filter_points:
        for key, value in result.items():
            result[key] = list(filter(lambda x: x.get("is_changepoint", False), value))
    return result


def render_template(test_name, parameters):
    """replace parameters in the config file
78 changes: 78 additions & 0 deletions pkg/edivisive.py
@@ -0,0 +1,78 @@
"""EDivisive Algorithm from hunter"""

import json
import pandas as pd
from hunter.report import Report, ReportType
from hunter.series import Metric, Series
from pkg.algorithm import Algorithm


class EDivisive(Algorithm):
    """Implementation of the EDivisive algorithm using hunter

    Args:
        Algorithm (Algorithm): Inherits
    """

    def output_json(self):
        _, series = self._analyze()
        change_points_by_metric = series.analyze().change_points
        dataframe_json = self.dataframe.to_json(orient="records")
        dataframe_json = json.loads(dataframe_json)

        for index, entry in enumerate(dataframe_json):

> **Collaborator:** I was just thinking whether we can abstract out these structures so that they give us only the list of changepoints and the full JSON list, as they can be reused by any model integration. @shashank-boyapally any thoughts?
>
> **Contributor (author):** Will be adding this in a follow-on PR.
>
> **Member:** Can we convert this comment into an RFE issue? So we don't lose track.

            entry["metrics"] = {
                key: {"value": entry.pop(key), "percentage_change": 0}
                for key in entry.keys() - {"uuid", "timestamp", "buildUrl"}
            }
            entry["is_changepoint"] = False

        for key, value in change_points_by_metric.items():
            for change_point in value:
                index = change_point.index
                percentage_change = (
                    (change_point.stats.mean_2 - change_point.stats.mean_1)
                    / change_point.stats.mean_1
                ) * 100
                dataframe_json[index]["metrics"][key][
                    "percentage_change"
                ] = percentage_change
                dataframe_json[index]["is_changepoint"] = True

        return self.test["name"], dataframe_json

    def output_text(self):
        report, _ = self._analyze()
        output_table = report.produce_report(
            test_name=self.test["name"], report_type=ReportType.LOG
        )
        return self.test["name"], output_table

    def _analyze(self):
        self.dataframe["timestamp"] = pd.to_datetime(self.dataframe["timestamp"])
        self.dataframe["timestamp"] = self.dataframe["timestamp"].astype(int) // 10**9
        metrics = {
            column: Metric(1, 1.0)
            for column in self.dataframe.columns
            if column not in ["uuid", "timestamp", "buildUrl"]
        }
        data = {
            column: self.dataframe[column]
            for column in self.dataframe.columns
            if column not in ["uuid", "timestamp", "buildUrl"]
        }
        attributes = {
            column: self.dataframe[column]
            for column in self.dataframe.columns
            if column in ["uuid", "buildUrl"]
        }
        series = Series(
            test_name=self.test["name"],
            branch=None,
            time=list(self.dataframe["timestamp"]),
            metrics=metrics,
            data=data,
            attributes=attributes,
        )
        change_points = series.analyze().change_points_by_time
        report = Report(series, change_points)
        return report, series
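The `percentage_change` assigned to each change point above is simply the relative difference between the means of the segments on either side of the change point; numerically:

```python
def percentage_change(mean_before, mean_after):
    # Relative difference between the segment means on either side of a
    # change point, matching the formula in output_json above.
    return (mean_after - mean_before) / mean_before * 100


# A metric whose mean moves from 100 to 125 is a +25% change point:
print(percentage_change(100.0, 125.0))  # prints 25.0
```

A regression in a "lower is better" metric shows up as a positive change, and an improvement as a negative one (or vice versa for throughput-style metrics).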