From ebc2301f6d20fb02f67eea969179ab80cd918e6e Mon Sep 17 00:00:00 2001
From: vishnuchalla
Date: Tue, 4 Jun 2024 21:37:27 -0400
Subject: [PATCH] Adding multiprocessing to scan results

Signed-off-by: vishnuchalla
---
 backend/app/api/v1/endpoints/cpt/cptJobs.py   | 30 ++++++++-----
 backend/app/api/v1/endpoints/cpt/maps/hce.py  |  2 +
 backend/app/api/v1/endpoints/cpt/maps/ocp.py  |  2 +
 backend/app/api/v1/endpoints/cpt/maps/quay.py |  2 +
 .../app/api/v1/endpoints/cpt/maps/telco.py    |  2 +
 backend/app/services/splunk.py                | 44 +++++++++----------
 .../HomeLayout/DisplayTableDataLayout.js      |  2 -
 .../templates/HomeLayout/SidebarLayout.js     |  1 -
 8 files changed, 49 insertions(+), 36 deletions(-)

diff --git a/backend/app/api/v1/endpoints/cpt/cptJobs.py b/backend/app/api/v1/endpoints/cpt/cptJobs.py
index 6c4f3766..fd8c424f 100644
--- a/backend/app/api/v1/endpoints/cpt/cptJobs.py
+++ b/backend/app/api/v1/endpoints/cpt/cptJobs.py
@@ -1,4 +1,6 @@
 import json
+import asyncio
+import multiprocessing
 from fastapi import Response
 import pandas as pd
 from datetime import datetime, timedelta, date
@@ -42,20 +44,15 @@ async def jobs(start_date: date = Query(None, description="Start date for search
     if start_date > end_date:
         return Response(content=json.dumps({'error': "invalid date format, start_date must be less than end_date"}), status_code=422)
 
-    results = pd.DataFrame()
-    for product in products:
-        try:
-            df = await products[product](start_date, end_date)
-            results = pd.concat([results, df.loc[:, ["ciSystem", "uuid", "releaseStream", "jobStatus", "buildUrl", "startDate", "endDate", "product", "version", "testName"]]])
-        except ConnectionError:
-            print("Connection Error in mapper for product " + product)
-        except:
-            print("Date range returned no values or Unknown error in mapper for product " + product)
+    results_df = pd.DataFrame()
+    with multiprocessing.Pool() as pool:
+        results = [pool.apply(fetch_product, args=(product, start_date, end_date)) for product in products]
+        results_df = pd.concat(results)
 
     response = {
         'startDate': start_date.__str__(),
         'endDate': end_date.__str__(),
-        'results': results.to_dict('records')
+        'results': results_df.to_dict('records')
     }
 
     if pretty:
@@ -64,3 +61,16 @@ async def jobs(start_date: date = Query(None, description="Start date for search
 
         jsonstring = json.dumps(response)
         return jsonstring
+
+async def fetch_product_async(product, start_date, end_date):
+    try:
+        df = await products[product](start_date, end_date)
+        return df.loc[:, ["ciSystem", "uuid", "releaseStream", "jobStatus", "buildUrl", "startDate", "endDate", "product", "version", "testName"]] if len(df) != 0 else df
+    except ConnectionError:
+        print("Connection Error in mapper for product " + product)
+    except Exception as e:
+        print(f"Error in mapper for product {product}: {e}")
+    return pd.DataFrame()
+
+def fetch_product(product, start_date, end_date):
+    return asyncio.run(fetch_product_async(product, start_date, end_date))
\ No newline at end of file
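Note: multiprocessing.Pool.apply blocks until each call returns, so the list comprehension above still runs the product mappers one at a time. A minimal sketch of a variant that actually overlaps the workers, reusing the patch's fetch_product helper (the function name scan_products_parallel is illustrative only, not part of the patch):

    import multiprocessing
    import pandas as pd

    def scan_products_parallel(products, start_date, end_date):
        with multiprocessing.Pool() as pool:
            # apply_async submits every product before collecting any result,
            # so the worker processes run concurrently instead of serially.
            handles = [
                pool.apply_async(fetch_product, args=(product, start_date, end_date))
                for product in products
            ]
            frames = [h.get() for h in handles]
        # pd.concat raises on an empty list, so guard the degenerate case.
        return pd.concat(frames) if frames else pd.DataFrame()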
df["test"] diff --git a/backend/app/api/v1/endpoints/cpt/maps/ocp.py b/backend/app/api/v1/endpoints/cpt/maps/ocp.py index 63a6961e..69a3649d 100644 --- a/backend/app/api/v1/endpoints/cpt/maps/ocp.py +++ b/backend/app/api/v1/endpoints/cpt/maps/ocp.py @@ -8,6 +8,8 @@ ################################################################ async def ocpMapper(start_datetime: date, end_datetime: date): df = await getData(start_datetime, end_datetime, f'ocp.elasticsearch') + if len(df) == 0: + return df df.insert(len(df.columns), "product", "ocp") df["releaseStream"] = df.apply(getReleaseStream, axis=1) df["version"] = df["shortVersion"] diff --git a/backend/app/api/v1/endpoints/cpt/maps/quay.py b/backend/app/api/v1/endpoints/cpt/maps/quay.py index 2bd5fab6..9eea25b1 100644 --- a/backend/app/api/v1/endpoints/cpt/maps/quay.py +++ b/backend/app/api/v1/endpoints/cpt/maps/quay.py @@ -7,6 +7,8 @@ ##################################################################### async def quayMapper(start_datetime: date, end_datetime: date): df = await getData(start_datetime, end_datetime, f'quay.elasticsearch') + if len(df) == 0: + return df df.insert(len(df.columns), "product", "quay") df["version"] = df["releaseStream"] df["testName"] = df["benchmark"] diff --git a/backend/app/api/v1/endpoints/cpt/maps/telco.py b/backend/app/api/v1/endpoints/cpt/maps/telco.py index c924f0f8..51bb2d41 100644 --- a/backend/app/api/v1/endpoints/cpt/maps/telco.py +++ b/backend/app/api/v1/endpoints/cpt/maps/telco.py @@ -8,6 +8,8 @@ ##################################################################### async def telcoMapper(start_datetime: date, end_datetime: date): df = await getData(start_datetime, end_datetime, f'telco.splunk') + if len(df) == 0: + return df df.insert(len(df.columns), "product", "telco") df["releaseStream"] = df.apply(getReleaseStream, axis=1) df["version"] = df["shortVersion"] diff --git a/backend/app/services/splunk.py b/backend/app/services/splunk.py index 2cadb461..0d25c25b 100644 --- a/backend/app/services/splunk.py +++ b/backend/app/services/splunk.py @@ -1,5 +1,7 @@ -import json +import orjson from app import config +from multiprocessing import Pool, cpu_count +from concurrent.futures import ProcessPoolExecutor, as_completed from splunklib import client, results @@ -31,7 +33,7 @@ def __init__(self,configpath="", index=""): except Exception as e: print(f"Error connecting to splunk: {e}") return None - + async def query(self, query, searchList='', max_results=10000): """ Query data from splunk server using splunk lib sdk @@ -41,41 +43,37 @@ async def query(self, query, searchList='', max_results=10000): OPTIONAL: searchList (string): additional query parameters for index """ query["count"] = max_results + # If additional search parameters are provided, include those in searchindex - if(searchList != ''): - # Run a one-shot search and display the results using the results reader: - try: - searchindex = "search index={} {}".format(self.indice, searchList) - oneshotsearch_results = self.service.jobs.oneshot(searchindex, **query) - except Exception as e: - print('Error querying splunk: {}'.format(e)) - return None - else: - try: - searchindex = "search index={}".format(self.indice) - oneshotsearch_results = self.service.jobs.oneshot(searchindex, **query) - except Exception as e: - print('Error querying splunk: {}'.format(e)) - return None - + searchindex = "search index={} {}".format(self.indice, searchList) if searchList else "search index={}".format(self.indice) + try: + oneshotsearch_results = 
diff --git a/backend/app/services/splunk.py b/backend/app/services/splunk.py
index 2cadb461..0d25c25b 100644
--- a/backend/app/services/splunk.py
+++ b/backend/app/services/splunk.py
@@ -1,5 +1,7 @@
-import json
+import orjson
 from app import config
+from multiprocessing import Pool, cpu_count
+from concurrent.futures import ProcessPoolExecutor, as_completed
 from splunklib import client, results
 
 
@@ -31,7 +33,7 @@ def __init__(self,configpath="", index=""):
         except Exception as e:
             print(f"Error connecting to splunk: {e}")
             return None
-    
+
     async def query(self, query, searchList='', max_results=10000):
         """
         Query data from splunk server using splunk lib sdk
@@ -41,41 +43,37 @@ async def query(self, query, searchList='', max_results=10000):
             OPTIONAL: searchList (string): additional query parameters for index
         """
         query["count"] = max_results
+
         # If additional search parameters are provided, include those in searchindex
-        if(searchList != ''):
-            # Run a one-shot search and display the results using the results reader:
-            try:
-                searchindex = "search index={} {}".format(self.indice, searchList)
-                oneshotsearch_results = self.service.jobs.oneshot(searchindex, **query)
-            except Exception as e:
-                print('Error querying splunk: {}'.format(e))
-                return None
-        else:
-            try:
-                searchindex = "search index={}".format(self.indice)
-                oneshotsearch_results = self.service.jobs.oneshot(searchindex, **query)
-            except Exception as e:
-                print('Error querying splunk: {}'.format(e))
-                return None
-
+        searchindex = "search index={} {}".format(self.indice, searchList) if searchList else "search index={}".format(self.indice)
+        try:
+            oneshotsearch_results = self.service.jobs.oneshot(searchindex, **query)
+        except Exception as e:
+            print('Error querying splunk: {}'.format(e))
+            return None
+
         # Get the results and display them using the JSONResultsReader
-        records_reader = results.JSONResultsReader(oneshotsearch_results)
         res_array = []
-        for record in records_reader:
+        async for record in self._stream_results(oneshotsearch_results):
             try:
                 res_array.append({
-                    'data': json.loads(record['_raw']),
+                    'data': orjson.loads(record['_raw']),
                     'host': record['host'],
                     'source': record['source'],
                     'sourcetype': record['sourcetype'],
                     'bucket': record['_bkt'],
                     'serial': record['_serial'],
                     'timestamp': record['_indextime']
-                    })
+                })
             except Exception as e:
-                print('Error on including Splunk record query in results array: {}'.format(e))
+                print(f'Error on including Splunk record query in results array: {e}')
+
         return res_array
 
+    async def _stream_results(self, oneshotsearch_results):
+        for record in results.JSONResultsReader(oneshotsearch_results):
+            yield record
+
     async def close(self):
         """Closes splunk client connections"""
         await self.service.logout()
diff --git a/frontend/src/components/templates/HomeLayout/DisplayTableDataLayout.js b/frontend/src/components/templates/HomeLayout/DisplayTableDataLayout.js
index e78eff51..eb8dd90d 100644
--- a/frontend/src/components/templates/HomeLayout/DisplayTableDataLayout.js
+++ b/frontend/src/components/templates/HomeLayout/DisplayTableDataLayout.js
@@ -10,10 +10,8 @@ export const DisplayTableDataLayout = ({initialState, tableMetaData, tableData,
                                         expandableComponent}) => {
 
     const getRows = () => {
-        console.log(tableData)
         return tableData && tableData.map( items => {
             const tableRows = tableMetaData.map( metadata => {
-                console.log(items[metadata.value])
                 if(metadata.name === 'Status')
                     if(items[metadata.value].toLowerCase() === "success")
                         return
diff --git a/frontend/src/components/templates/HomeLayout/SidebarLayout.js b/frontend/src/components/templates/HomeLayout/SidebarLayout.js
index 8a6d6913..42ee2bab 100644
--- a/frontend/src/components/templates/HomeLayout/SidebarLayout.js
+++ b/frontend/src/components/templates/HomeLayout/SidebarLayout.js
@@ -9,7 +9,6 @@ import PropTypes from "prop-types";
 export const SidebarLayout = ({sidebarComponents}) => {
 
     const DisplayDate = ({startDate, endDate, setStartDate, setEndDate}) => {
-        console.log(startDate)
         const dateView = (name, dateValue, onChange) => {
             return }/>
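Two details worth noting in the splunk.py hunk: orjson.loads is a drop-in replacement for json.loads here (it accepts the str payload in record['_raw'] and returns the same Python objects), and _stream_results, although an async generator, still iterates the underlying JSONResultsReader synchronously on the event-loop thread. A minimal sketch (the helper name is assumed, not part of the patch) that drains the blocking reader on a worker thread via asyncio.to_thread instead:

    import asyncio

    async def stream_results_offloaded(reader):
        # list(reader) performs all of the blocking reads in a worker thread;
        # this trades record-by-record streaming for an event loop that stays
        # responsive while Splunk results are being pulled.
        records = await asyncio.to_thread(list, reader)
        for record in records:
            yield record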