Skip to content

Commit

Permalink
Adding multiprocessing to scan results (#100)
Browse files Browse the repository at this point in the history
Signed-off-by: vishnuchalla <vishnuchalla47@gmail.com>
  • Loading branch information
vishnuchalla authored Jun 11, 2024
1 parent 592a572 commit d50f48b
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 36 deletions.
30 changes: 20 additions & 10 deletions backend/app/api/v1/endpoints/cpt/cptJobs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
import asyncio
import multiprocessing
from fastapi import Response
import pandas as pd
from datetime import datetime, timedelta, date
Expand Down Expand Up @@ -42,20 +44,15 @@ async def jobs(start_date: date = Query(None, description="Start date for search
if start_date > end_date:
return Response(content=json.dumps({'error': "invalid date format, start_date must be less than end_date"}), status_code=422)

results = pd.DataFrame()
for product in products:
try:
df = await products[product](start_date, end_date)
results = pd.concat([results, df.loc[:, ["ciSystem", "uuid", "releaseStream", "jobStatus", "buildUrl", "startDate", "endDate", "product", "version", "testName"]]])
except ConnectionError:
print("Connection Error in mapper for product " + product)
except:
print("Date range returned no values or Unknown error in mapper for product " + product)
results_df = pd.DataFrame()
with multiprocessing.Pool() as pool:
results = [pool.apply(fetch_product, args=(product, start_date, end_date)) for product in products]
results_df = pd.concat(results)

response = {
'startDate': start_date.__str__(),
'endDate': end_date.__str__(),
'results': results.to_dict('records')
'results': results_df.to_dict('records')
}

if pretty:
Expand All @@ -64,3 +61,16 @@ async def jobs(start_date: date = Query(None, description="Start date for search

jsonstring = json.dumps(response)
return jsonstring

async def fetch_product_async(product, start_date, end_date):
try:
df = await products[product](start_date, end_date)
return df.loc[:, ["ciSystem", "uuid", "releaseStream", "jobStatus", "buildUrl", "startDate", "endDate", "product", "version", "testName"]] if len(df) != 0 else df
except ConnectionError:
print("Connection Error in mapper for product " + product)
except Exception as e:
print(f"Error in mapper for product {product}: {e}")
return pd.DataFrame()

def fetch_product(product, start_date, end_date):
return asyncio.run(fetch_product_async(product, start_date, end_date))
2 changes: 2 additions & 0 deletions backend/app/api/v1/endpoints/cpt/maps/hce.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
################################################################
async def hceMapper(start_datetime: date, end_datetime: date):
df = await getData(start_datetime, end_datetime, f'hce.elasticsearch')
if len(df) == 0:
return df
df["releaseStream"] = "Nightly"
df["ciSystem"] = "Jenkins"
df["testName"] = df["product"] + ":" + df["test"]
Expand Down
2 changes: 2 additions & 0 deletions backend/app/api/v1/endpoints/cpt/maps/ocp.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
################################################################
async def ocpMapper(start_datetime: date, end_datetime: date):
df = await getData(start_datetime, end_datetime, f'ocp.elasticsearch')
if len(df) == 0:
return df
df.insert(len(df.columns), "product", "ocp")
df["releaseStream"] = df.apply(getReleaseStream, axis=1)
df["version"] = df["shortVersion"]
Expand Down
2 changes: 2 additions & 0 deletions backend/app/api/v1/endpoints/cpt/maps/quay.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#####################################################################
async def quayMapper(start_datetime: date, end_datetime: date):
df = await getData(start_datetime, end_datetime, f'quay.elasticsearch')
if len(df) == 0:
return df
df.insert(len(df.columns), "product", "quay")
df["version"] = df["releaseStream"]
df["testName"] = df["benchmark"]
Expand Down
2 changes: 2 additions & 0 deletions backend/app/api/v1/endpoints/cpt/maps/telco.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#####################################################################
async def telcoMapper(start_datetime: date, end_datetime: date):
df = await getData(start_datetime, end_datetime, f'telco.splunk')
if len(df) == 0:
return df
df.insert(len(df.columns), "product", "telco")
df["releaseStream"] = df.apply(getReleaseStream, axis=1)
df["version"] = df["shortVersion"]
Expand Down
44 changes: 21 additions & 23 deletions backend/app/services/splunk.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json
import orjson
from app import config
from multiprocessing import Pool, cpu_count
from concurrent.futures import ProcessPoolExecutor, as_completed
from splunklib import client, results


Expand Down Expand Up @@ -31,7 +33,7 @@ def __init__(self,configpath="", index=""):
except Exception as e:
print(f"Error connecting to splunk: {e}")
return None

async def query(self, query, searchList='', max_results=10000):
"""
Query data from splunk server using splunk lib sdk
Expand All @@ -41,41 +43,37 @@ async def query(self, query, searchList='', max_results=10000):
OPTIONAL: searchList (string): additional query parameters for index
"""
query["count"] = max_results

# If additional search parameters are provided, include those in searchindex
if(searchList != ''):
# Run a one-shot search and display the results using the results reader:
try:
searchindex = "search index={} {}".format(self.indice, searchList)
oneshotsearch_results = self.service.jobs.oneshot(searchindex, **query)
except Exception as e:
print('Error querying splunk: {}'.format(e))
return None
else:
try:
searchindex = "search index={}".format(self.indice)
oneshotsearch_results = self.service.jobs.oneshot(searchindex, **query)
except Exception as e:
print('Error querying splunk: {}'.format(e))
return None

searchindex = "search index={} {}".format(self.indice, searchList) if searchList else "search index={}".format(self.indice)
try:
oneshotsearch_results = self.service.jobs.oneshot(searchindex, **query)
except Exception as e:
print('Error querying splunk: {}'.format(e))
return None

# Get the results and display them using the JSONResultsReader
records_reader = results.JSONResultsReader(oneshotsearch_results)
res_array = []
for record in records_reader:
async for record in self._stream_results(oneshotsearch_results):
try:
res_array.append({
'data': json.loads(record['_raw']),
'data': orjson.loads(record['_raw']),
'host': record['host'],
'source': record['source'],
'sourcetype': record['sourcetype'],
'bucket': record['_bkt'],
'serial': record['_serial'],
'timestamp': record['_indextime']
})
})
except Exception as e:
print('Error on including Splunk record query in results array: {}'.format(e))
print(f'Error on including Splunk record query in results array: {e}')

return res_array

async def _stream_results(self, oneshotsearch_results):
for record in results.JSONResultsReader(oneshotsearch_results):
yield record

async def close(self):
"""Closes splunk client connections"""
await self.service.logout()
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ export const DisplayTableDataLayout = ({initialState, tableMetaData, tableData,
expandableComponent}) => {

const getRows = () => {
console.log(tableData)
return tableData && tableData.map( items => {
const tableRows = tableMetaData.map( metadata => {
console.log(items[metadata.value])
if(metadata.name === 'Status')
if(items[metadata.value].toLowerCase() === "success")
return <Badge style={{backgroundColor: '#008000'}} children={items[metadata.value]} />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import PropTypes from "prop-types";
export const SidebarLayout = ({sidebarComponents}) => {

const DisplayDate = ({startDate, endDate, setStartDate, setEndDate}) => {
console.log(startDate)
const dateView = (name, dateValue, onChange) => {
return <Stack>
<StackItem children={<Text4 value={name}/>}/>
Expand Down

0 comments on commit d50f48b

Please sign in to comment.