diff --git a/README.md b/README.md
index c6ee3e7..9f96102 100644
--- a/README.md
+++ b/README.md
@@ -123,11 +123,16 @@ Additionally, users can specify a custom path for the output CSV file using the
 Orion now supports anomaly detection for your data. Use the ```--anomaly-detection``` command to start the anomaly detection process.
+
+To find significant percent differences between workload runs, use the ```--cmr``` command. This compares the most recent run with any previous matching runs or baseline UUIDs. If more than one previous run is found, their values are averaged together and then compared with the most recent run. Use *direction: 0* (set in the config) with the ```-o json``` output format to see the percent differences.
+
+![cmr percent difference](percentdiff.jpg)
+
 
 You can now constrain your look-back period using the ```--lookback``` option. The format for look-back is ```XdYh```, where X represents the number of days and Y represents the number of hours.
 
 You can open the match requirement by using the ```--node-count``` option to find any matching uuid based on the metadata and not have to have the same jobConfig.jobIterations. This variable is a ```True``` or ```False```, defaulted to False.
 
-**_NOTE:_** The ```--hunter-analyze``` and ```--anomaly-detection``` flags are mutually exclusive. They cannot be used together because they represent different algorithms designed for distinct use cases.
+**_NOTE:_** The ```--cmr```, ```--hunter-analyze``` and ```--anomaly-detection``` flags are mutually exclusive. They cannot be used together because they represent different algorithms designed for distinct use cases.
 
 ### Daemon mode
 The core purpose of Daemon mode is to operate Orion as a self-contained server, dedicated to handling incoming requests. By sending a POST request accompanied by a test name of predefined tests, users can trigger change point detection on the provided metadata and metrics. Following the processing, the response is formatted in JSON, providing a structured output for seamless integration and analysis.
 To trigger daemon mode just use the following commands
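Editor's note: a worked example of the ```--cmr``` comparison described in the README hunk above. The numbers are invented; the 20% tolerance mirrors the value hard-coded in pkg/algorithms/cmr/cmr.py later in this diff.

```python
# Hypothetical numbers illustrating the --cmr comparison.
previous_runs = [100.0, 110.0]  # metric values from earlier matching runs
current_run = 120.0             # metric value from the most recent run

baseline = sum(previous_runs) / len(previous_runs)    # mean of previous runs -> 105.0
pct_diff = (current_run - baseline) / baseline * 100  # -> ~14.3 percent

tolerance = 20  # percent; the default used by the CMR algorithm below
print("Fail" if pct_diff > tolerance else "Pass")     # -> Pass
```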
diff --git a/orion.py b/orion.py
index 9626fae..0278cb5 100644
--- a/orion.py
+++ b/orion.py
@@ -69,6 +69,14 @@ def cli(max_content_width=120): # pylint: disable=unused-argument
 # pylint: disable=too-many-locals
 @cli.command(name="cmd")
+@click.option(
+    "--cmr",
+    is_flag=True,
+    help="Generate percent difference in comparison",
+    cls=MutuallyExclusiveOption,
+    mutually_exclusive=["anomaly_detection","hunter_analyze"],
+)
+@click.option("--filter", is_flag=True, help="Generate percent difference in comparison")
 @click.option("--config", default="config.yaml", help="Path to the configuration file")
 @click.option(
     "--save-data-path", default="data.csv", help="Path to save the output file"
 )
@@ -79,7 +87,7 @@ def cli(max_content_width=120): # pylint: disable=unused-argument
     is_flag=True,
     help="run hunter analyze",
     cls=MutuallyExclusiveOption,
-    mutually_exclusive=["anomaly_detection"],
+    mutually_exclusive=["anomaly_detection","cmr"],
 )
 @click.option("--anomaly-window", type=int, callback=validate_anomaly_options, help="set window size for moving average for anomaly-detection")
 @click.option("--min-anomaly-percent", type=int, callback=validate_anomaly_options, help="set minimum percentage difference from moving average for data point to be detected as anomaly")
@@ -88,7 +96,7 @@ def cli(max_content_width=120): # pylint: disable=unused-argument
     is_flag=True,
     help="run anomaly detection algorithm powered by isolation forest",
     cls=MutuallyExclusiveOption,
-    mutually_exclusive=["hunter_analyze"],
+    mutually_exclusive=["hunter_analyze","cmr"],
 )
 @click.option(
     "-o",
diff --git a/percentdiff.jpg b/percentdiff.jpg
new file mode 100644
index 0000000..c66352b
Binary files /dev/null and b/percentdiff.jpg differ
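Editor's note: the options above rely on a custom `MutuallyExclusiveOption` class that is not part of this diff. A common click recipe for it looks roughly like the sketch below; the repository's actual implementation may differ.

```python
import click


class MutuallyExclusiveOption(click.Option):
    """A click.Option that refuses to be combined with named sibling options."""

    def __init__(self, *args, **kwargs):
        # pop the custom kwarg before click.Option sees it
        self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", []))
        super().__init__(*args, **kwargs)

    def handle_parse_result(self, ctx, opts, args):
        # fail the parse when this option and an excluded one are both present
        if self.name in opts and self.mutually_exclusive.intersection(opts):
            raise click.UsageError(
                f"Illegal usage: `{self.name}` is mutually exclusive with "
                f"`{', '.join(sorted(self.mutually_exclusive))}`."
            )
        return super().handle_parse_result(ctx, opts, args)
```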
diff --git a/pkg/algorithms/algorithmFactory.py b/pkg/algorithms/algorithmFactory.py
index a4d47f8..720a35d 100644
--- a/pkg/algorithms/algorithmFactory.py
+++ b/pkg/algorithms/algorithmFactory.py
@@ -6,6 +6,7 @@ import pkg.constants as cnsts
 
 from .edivisive import EDivisive
 from .isolationforest import IsolationForestWeightedMean
+from .cmr import CMR
 
 
 class AlgorithmFactory: # pylint: disable= too-few-public-methods, too-many-arguments, line-too-long
@@ -30,4 +31,6 @@ def instantiate_algorithm(self, algorithm: str, matcher: Matcher, dataframe:pd.D
             return EDivisive(matcher, dataframe, test, options, metrics_config)
         if algorithm == cnsts.ISOLATION_FOREST:
             return IsolationForestWeightedMean(matcher, dataframe, test, options, metrics_config)
+        if algorithm == cnsts.CMR:
+            return CMR(matcher, dataframe, test, options, metrics_config)
         raise ValueError("Invalid algorithm called")
diff --git a/pkg/algorithms/cmr/__init__.py b/pkg/algorithms/cmr/__init__.py
new file mode 100644
index 0000000..f93d771
--- /dev/null
+++ b/pkg/algorithms/cmr/__init__.py
@@ -0,0 +1,4 @@
+"""
+Init for CMR Algorithm
+"""
+from .cmr import CMR
diff --git a/pkg/algorithms/cmr/cmr.py b/pkg/algorithms/cmr/cmr.py
new file mode 100644
index 0000000..7959af6
--- /dev/null
+++ b/pkg/algorithms/cmr/cmr.py
@@ -0,0 +1,111 @@
+"""CMR - Comparing Mean Responses Algorithm"""
+
+# pylint: disable = line-too-long
+import pandas as pd
+import numpy
+
+from fmatch.logrus import SingletonLogger
+from hunter.series import ChangePoint, ComparativeStats
+from pkg.algorithms.algorithm import Algorithm
+
+
+class CMR(Algorithm):
+    """Implementation of the CMR algorithm
+
+    Combines the metrics of all previous runs into a single averaged row and
+    compares it against the most recent run, using a tolerance to set pass/fail.
+
+    Args:
+        Algorithm (Algorithm): Inherits
+    """
+
+    def _analyze(self):
+        """Analyze the dataframe by averaging all previous runs and generating
+        the percent change against the most recent uuid
+
+        Returns:
+            series: data series that contains attributes and full dataframe
+            change_points_by_metric: list of ChangePoints
+        """
+        logger_instance = SingletonLogger.getLogger("Orion")
+        logger_instance.info("Starting analysis using CMR")
+        self.dataframe["timestamp"] = pd.to_datetime(self.dataframe["timestamp"])
+        self.dataframe["timestamp"] = self.dataframe["timestamp"].astype(int) // 10**9
+
+        if len(self.dataframe.index) == 1:
+            # nothing to compare against; return the single run unchanged
+            series = self.setup_series()
+            series.data = self.dataframe
+            return series, {}
+
+        # if there are more than 2 rows, average rows 0..n-2 into a single baseline row
+        self.dataframe = self.combine_and_average_runs(self.dataframe)
+
+        series = self.setup_series()
+        tolerance = 20  # percent difference allowed before a metric is flagged
+
+        df, change_points_by_metric = self.run_cmr(tolerance, self.dataframe)
+        series.data = df
+        return series, change_points_by_metric
+
+    def run_cmr(self, tolerance: int, dataframe_list: pd.DataFrame):
+        """
+        Generate the percent difference in a 2 row dataframe
+
+        Args:
+            tolerance (int): percent difference allowed before a metric fails
+            dataframe_list (pd.DataFrame): data frame of all data to compare on
+
+        Returns:
+            pd.DataFrame, dict[metric_name, list[ChangePoint]]: the data frame and change points
+        """
+        metric_columns = self.metrics_config.keys()
+        change_points_by_metric = {k: [] for k in metric_columns}
+        max_date_time = pd.Timestamp.max.to_pydatetime()
+        max_time = max_date_time.timestamp()
+        for column in metric_columns:
+            pct_change_result = dataframe_list[column].pct_change()
+            single_pct_diff = round(pct_change_result.iloc[-1] * 100)
+            if single_pct_diff > tolerance:
+                # only record a change point when the difference exceeds the tolerance
+                change_point = ChangePoint(metric=column,
+                                           index=1,
+                                           time=max_time,
+                                           stats=ComparativeStats(
+                                               mean_1=dataframe_list[column].iloc[0],
+                                               mean_2=dataframe_list[column].iloc[1],
+                                               std_1=0,
+                                               std_2=0,
+                                               pvalue=1
+                                           ))
+                change_points_by_metric[column].append(change_point)
+        return dataframe_list, change_points_by_metric
+
+    def combine_and_average_runs(self, dataFrame: pd.DataFrame):
+        """
+        If there is more than 1 previous run, mean the data together into 1 single row
+        and combine it with the current run into 1 data frame (current run being the -1 index)
+
+        Args:
+            dataFrame (pd.DataFrame): data to combine into 2 rows
+
+        Returns:
+            pd.DataFrame: data frame of the most recent run and the averaged previous runs
+        """
+        last_row = dataFrame.tail(1)
+        previous_runs = dataFrame[:-1]
+        data2 = {}
+
+        metric_columns = list(dataFrame.columns)
+        for column in metric_columns:
+            if isinstance(previous_runs.loc[0, column], (numpy.float64, numpy.int64)):
+                mean = previous_runs[column].mean()
+                data2[column] = [mean]
+            else:
+                column_list = previous_runs[column].tolist()
+                non_numeric_joined_list = ','.join(column_list)
+                data2[column] = [non_numeric_joined_list]
+        df2 = pd.DataFrame(data2)
+
+        result = pd.concat([df2, last_row], ignore_index=True)
+        return result
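Editor's note: a runnable sketch of what `combine_and_average_runs` and `run_cmr` above do together; the column name and values are invented.

```python
import pandas as pd

# three runs of one metric; the last row is the most recent run
runs = pd.DataFrame({"podReadyLatency_avg": [100.0, 110.0, 120.0]})

# combine_and_average_runs: mean all previous rows into a single baseline row
baseline = runs[:-1].mean().to_frame().T
two_rows = pd.concat([baseline, runs.tail(1)], ignore_index=True)

# run_cmr: percent change of the most recent run against that baseline
pct = round(two_rows["podReadyLatency_avg"].pct_change().iloc[-1] * 100)
print(pct, "Fail" if pct > 20 else "Pass")  # -> 14 Pass
```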
diff --git a/pkg/constants.py b/pkg/constants.py
index 87813a3..ea9a770 100644
--- a/pkg/constants.py
+++ b/pkg/constants.py
@@ -6,3 +6,4 @@
 JSON="json"
 TEXT="text"
 JUNIT="junit"
+CMR="cmr"
diff --git a/pkg/runTest.py b/pkg/runTest.py
index d4cb7e4..2f76f86 100644
--- a/pkg/runTest.py
+++ b/pkg/runTest.py
@@ -50,6 +50,8 @@ def run(**kwargs: dict[str, Any]) -> dict[str, Any]:
         algorithm_name = cnsts.EDIVISIVE
     elif kwargs["anomaly_detection"]:
         algorithm_name = cnsts.ISOLATION_FOREST
+    elif kwargs["cmr"]:
+        algorithm_name = cnsts.CMR
     else:
         return None
 
diff --git a/pkg/utils.py b/pkg/utils.py
index 202ea22..e4a4f68 100644
--- a/pkg/utils.py
+++ b/pkg/utils.py
@@ -258,16 +258,33 @@ def process_test(
     shortener = pyshorteners.Shortener(timeout=10)
     merged_df["buildUrl"] = merged_df["uuid"].apply(
         lambda uuid: (
-            shortener.tinyurl.short(buildUrls[uuid])
+            shorten_url(shortener, buildUrls[uuid])
             if options["convert_tinyurl"]
             else buildUrls[uuid]
         )  # pylint: disable = cell-var-from-loop
     )
     #save the dataframe
     output_file_path = f"{options['save_data_path'].split('.')[0]}-{test['name']}.csv"
     match.save_results(merged_df, csv_file_path=output_file_path)
     return merged_df, metrics_config
 
+
+def shorten_url(shortener: Any, uuids: str) -> str:
+    """Shorten the URLs when given a comma-separated list of buildUrls
+
+    Args:
+        shortener (Any): shortener object on which to call tinyurl.short
+        uuids (str): comma-separated string of build URLs to shorten
+
+    Returns:
+        str: a combined string of shortened urls
+    """
+    short_url_list = []
+    for buildUrl in uuids.split(","):
+        short_url_list.append(shortener.tinyurl.short(buildUrl))
+    short_url = ','.join(short_url_list)
+    return short_url
+
 
 def get_metadata_with_uuid(uuid: str, match: Matcher) -> Dict[Any, Any]:
     """Gets metadata of the run from each test
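Editor's note: `shorten_url` above walks a comma-joined string of build URLs and shortens each one. A self-contained sketch of that behavior, with fake classes standing in for `pyshorteners.Shortener` (the real one needs network access) and invented URLs:

```python
class FakeTinyurl:
    """Deterministic stand-in for pyshorteners' tinyurl endpoint."""

    def short(self, url: str) -> str:
        return "https://tiny.example/" + str(abs(hash(url)) % 10**6)


class FakeShortener:
    tinyurl = FakeTinyurl()


urls = "https://ci.example/build/1,https://ci.example/build/2"
# same split / shorten / join pattern as shorten_url in pkg/utils.py
print(",".join(FakeShortener().tinyurl.short(u) for u in urls.split(",")))
```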
diff --git a/utils/orion_funcs.py b/utils/orion_funcs.py
deleted file mode 100644
index 4fcc9de..0000000
--- a/utils/orion_funcs.py
+++ /dev/null
@@ -1,239 +0,0 @@
-# pylint: disable=cyclic-import
-"""
-module for all utility functions orion uses
-"""
-# pylint: disable = import-error
-
-import logging
-import sys
-
-import yaml
-import pandas as pd
-
-from hunter.report import Report, ReportType
-from hunter.series import Metric, Series
-
-
-def run_hunter_analyze(merged_df,test):
-    """Start hunter analyze function
-
-    Args:
-        merged_df (Dataframe): merged dataframe of all the metrics
-        test (dict): test dictionary with the each test information
-    """
-    merged_df["timestamp"] = pd.to_datetime(merged_df["timestamp"])
-    merged_df["timestamp"] = merged_df["timestamp"].astype(int) // 10**9
-    metrics = {column: Metric(1, 1.0)
-               for column in merged_df.columns
-               if column not in ["uuid","timestamp","buildUrl"]}
-    data = {column: merged_df[column]
-            for column in merged_df.columns
-            if column not in ["uuid","timestamp","buildUrl"]}
-    attributes={column: merged_df[column]
-                for column in merged_df.columns if column in ["uuid","buildUrl"]}
-    series=Series(
-        test_name=test["name"],
-        branch=None,
-        time=list(merged_df["timestamp"]),
-        metrics=metrics,
-        data=data,
-        attributes=attributes
-    )
-    change_points=series.analyze().change_points_by_time
-    report=Report(series,change_points)
-    output = report.produce_report(test_name="test",report_type=ReportType.LOG)
-    print(output)
-    return change_points
-
-def run_cmr(tolerancy, dataframe_list,logger):
-    for i, df in enumerate(dataframe_list):
-        column_name = df.columns.values.tolist()[-1]
-        pct_change_result = df[column_name].pct_change().iloc[[-1]].values[0] * 100
-        pass_fail = "Pass"
-        if pct_change_result > tolerancy:
-            pass_fail = "Fail"
-        dataframe_list[i].loc[len(dataframe_list[i].index)] = ["difference", pct_change_result]
-        dataframe_list[i].loc[len(dataframe_list[i].index)] = ["Pass/Fail", pass_fail]
-
-    logger.info('return_list '+str(dataframe_list))
-    return dataframe_list
-
-
-# pylint: disable=too-many-locals
-def get_metric_data(ids, index, metrics, match, logger):
-    """Gets details metrics basked on metric yaml list
-
-    Args:
-        ids (list): list of all uuids
-        index (dict): index in es of where to find data
-        metrics (dict): metrics to gather data on
-        match (Matcher): current matcher instance
-        logger (logger): log data to one output
-
-    Returns:
-        dataframe_list: dataframe of the all metrics
-    """
-    dataframe_list = []
-    for metric in metrics:
-        metric_name = metric['name']
-        logger.info("Collecting %s", metric_name)
-        metric_of_interest = metric['metric_of_interest']
-
-        if "agg" in metric.keys():
-            try:
-                cpu = match.get_agg_metric_query(
-                    ids, index, metric
-                )
-                agg_value = metric['agg']['value']
-                agg_type = metric['agg']['agg_type']
-                agg_name = agg_value + "_" + agg_type
-                cpu_df = match.convert_to_df(cpu, columns=["uuid","timestamp", agg_name])
-                cpu_df = cpu_df.rename(
-                    columns={agg_name: metric_name+ "_" + agg_name}
-                )
-                dataframe_list.append(cpu_df)
-                logger.debug(cpu_df)
-
-            except Exception as e:  # pylint: disable=broad-exception-caught
-                logger.error(
-                    "Couldn't get agg metrics %s, exception %s",
-                    metric_name,
-                    e,
-                )
-        else:
-            try:
-                podl = match.getResults("", ids, index, metric)
-                podl_df = match.convert_to_df(
-                    podl, columns=["uuid", "timestamp", metric_of_interest]
-                )
-                dataframe_list.append(podl_df)
-                logger.debug(podl_df)
-            except Exception as e:  # pylint: disable=broad-exception-caught
-                logger.error(
-                    "Couldn't get metrics %s, exception %s",
-                    metric_name,
-                    e,
-                )
-    return dataframe_list
-
-
-def get_metadata(test,logger):
-    """Gets metadata of the run from each test
-
-    Args:
-        test (dict): test dictionary
-
-    Returns:
-        dict: dictionary of the metadata
-    """
-    metadata=test['metadata']
-    metadata["ocpVersion"] = str(metadata["ocpVersion"])
-    logger.debug('metadata' + str(metadata))
-    return metadata
-
-
-def get_build_urls(index, uuids,match):
-    """Gets metadata of the run from each test
-    to get the build url
-
-    Args:
-        uuids (list): str list of uuid to find build urls of
-        match: the fmatch instance
-
-
-    Returns:
-        dict: dictionary of the metadata
-    """
-
-    test = match.getResults("",uuids,index,{})
-    buildUrls = {run["uuid"]: run["buildUrl"] for run in test}
-    return buildUrls
-
-
-def filter_metadata(uuid,match,logger):
-    """Gets metadata of the run from each test
-
-    Args:
-        uuid (str): str of uuid ot find metadata of
-        match: the fmatch instance
-
-
-    Returns:
-        dict: dictionary of the metadata
-    """
-
-    test = match.get_metadata_by_uuid(uuid)
-    metadata = {
-        'platform': '',
-        'clusterType': '',
-        'masterNodesCount': 0,
-        'workerNodesCount': 0,
-        'infraNodesCount': 0,
-        'masterNodesType': '',
-        'workerNodesType': '',
-        'infraNodesType': '',
-        'totalNodesCount': 0,
-        'ocpVersion': '',
-        'networkType': '',
-        'ipsec': '',
-        'fips': '',
-        'encrypted': '',
-        'publish': '',
-        'computeArch': '',
-        'controlPlaneArch': ''
-    }
-    for k,v in test.items():
-        if k not in metadata:
-            continue
-        metadata[k] = v
-    metadata['benchmark.keyword'] = test['benchmark']
-    metadata["ocpVersion"] = str(metadata["ocpVersion"])
-
-    #Remove any keys that have blank values
-    no_blank_meta = {k: v for k, v in metadata.items() if v}
-    logger.debug('No blank metadata dict: ' + str(no_blank_meta))
-    return no_blank_meta
-
-
-def set_logging(level, logger):
-    """sets log level and format
-
-    Args:
-        level (_type_): level of the log
-        logger (_type_): logger object
-
-    Returns:
-        logging.Logger: a formatted and level set logger
-    """
-    logger.setLevel(level)
-    handler = logging.StreamHandler(sys.stdout)
-    handler.setLevel(level)
-    formatter = logging.Formatter(
-        "%(asctime)s [%(name)s:%(filename)s:%(lineno)d] %(levelname)s: %(message)s"
-    )
-    handler.setFormatter(formatter)
-    logger.addHandler(handler)
-    return logger
-
-def load_config(config,logger):
-    """Loads config file
-
-    Args:
-        config (str): path to config file
-        logger (Logger): logger
-
-    Returns:
-        dict: dictionary of the config file
-    """
-    try:
-        with open(config, "r", encoding="utf-8") as file:
-            data = yaml.safe_load(file)
-        logger.debug("The %s file has successfully loaded", config)
-    except FileNotFoundError as e:
-        logger.error("Config file not found: %s", e)
-        sys.exit(1)
-    except Exception as e:  # pylint: disable=broad-exception-caught
-        logger.error("An error occurred: %s", e)
-        sys.exit(1)
-    return data