Skip to content

Commit

Permalink
Merge pull request #28 from bcgov/release/1_8
Browse files Browse the repository at this point in the history
Release/1 8
  • Loading branch information
amlanc1 authored Nov 9, 2023
2 parents b15d3d1 + fd5b977 commit fe7a04e
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 36 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build_push_pr_onopen_proddeploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
environment: prod
env:
DOCKER_IMAGE_TAG: ${{ github.sha}}
DOCKER_IMAGE_NAME: rbe5-images/ride-recon-service
DOCKER_IMAGE_NAME: rbe5-images/ride-recon-service-prod
DOCKER_FILE: Dockerfile
steps:
- name: Checkout
Expand Down Expand Up @@ -111,4 +111,4 @@ jobs:
git push -u origin deployment/recon-svc


2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ RUN pip install --no-cache-dir -r requirements.txt
# USER app
# CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "5000"]
USER 1001
CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:5000", "--error-logfile", "-"]
CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:5000", "--timeout","800","--error-logfile", "-"]
158 changes: 158 additions & 0 deletions misc/reconfunctions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@

import os
from destsqldbfuncs import SqlDBFunctions
from commonutils import map_event_type_destination,map_source_db,split_etk_event_payloads
import json


def recondestination(dbclient,main_staging_collection,main_table_collection,logger):

# DONE: Query records in staging table
results = main_staging_collection.find()
reconstatus=True

# DONE: Query for each record in destination db(based on row type)
for row in results:
# DONE: If found delete from staging if not update recon count column
logger.debug('processing row')
logger.debug(row)
datasrc=''
if row['datasource']:
datasrc=row['datasource']
else:
continue

if datasrc=='df':
try:
if row['eventType']:
bi_table_name=map_event_type_destination(row['eventType'])
bi_db_name=map_source_db(row['datasource'])
# print(bi_db_name)
# DONE: Query SQL DB
bi_sql_db_obj = SqlDBFunctions(bi_db_name, os.getenv('BI_SQL_DB_SERVER'),os.getenv('BI_SQL_DB_USERNAME'), os.getenv('BI_SQL_DB_PASSWORD'))
qrystr = bi_sql_db_obj.prepQuerystr(row['payloaddata'],row['datasource'])
table_name=bi_table_name
reconqrystr = f'SELECT * FROM {table_name} WHERE {qrystr}'
# print(reconqrystr)
found = bi_sql_db_obj.reconQuery(reconqrystr,logger)
if found:
main_staging_collection.delete_one(row)
# DONE: If found save to master table
# DONE: Dedup before saving to master
query_main_table = main_table_collection.find(row)
if len(list(query_main_table)) > 0:
return True
else:
result = main_table_collection.insert_one(row)
else:
recon_count_val = (lambda x: 1 if not ('recon_count' in x.keys()) else x['recon_count'] + 1)(row)
new_column = {"$set": {"recon_count": recon_count_val}}
result = main_staging_collection.update_one(row, new_column)
except Exception as e:
reconstatus=False
recon_count_val = (lambda x: 1 if not ('recon_count' in x.keys()) else x['recon_count'] + 1)(row)
new_column = {"$set": {"recon_count": recon_count_val}}
result = main_staging_collection.update_one(row, new_column)
logger.error('error in recon for this row')
logger.error(row)
logger.error(e)
elif datasrc=='etk':
try:
if row['eventType']:
bi_table_name=map_event_type_destination(row['eventType'])
bi_db_name=map_source_db(row['datasource'])
bi_events_table_name = os.getenv('BI_ETK_EVENTS_TABLE')
bi_violations_table_name = os.getenv('BI_ETK_VIOLATIONS_TABLE')
bi_geo_table=os.getenv('ETK_GEOLOCATION_TABLE_NAME')
# print(bi_db_name)
# DONE: Query SQL DB
bi_sql_db_obj = SqlDBFunctions(bi_db_name, os.getenv('BI_SQL_DB_SERVER'),os.getenv('BI_SQL_DB_USERNAME'), os.getenv('BI_SQL_DB_PASSWORD'))
mainpayload,eventpayload,countspayload,geopayload=split_etk_event_payloads(row['payloaddata'],row['eventType'])
mainqrystr,eventqrystr,countsqrystr=None,None,None
mainfound, eventfound, countsfound,geofound = False, False, False,False
if mainpayload:
mainqrystr = bi_sql_db_obj.prepQuerystr(mainpayload,row['datasource'])
table_name = bi_table_name
reconqrystr = f'SELECT * FROM {table_name} WHERE {mainqrystr}'
mainfound = bi_sql_db_obj.reconQuery(reconqrystr,logger)
if eventpayload:
eventqrystr = bi_sql_db_obj.prepQuerystr(eventpayload,row['datasource'])
table_name = bi_events_table_name
reconqrystr = f'SELECT * FROM {table_name} WHERE {eventqrystr}'
eventfound = bi_sql_db_obj.reconQuery(reconqrystr,logger)
if countspayload:
for countsrw in countspayload:
tmp_countsrw=json.dumps(countsrw)
countsqrystr = bi_sql_db_obj.prepQuerystr(tmp_countsrw,row['datasource'])
print(countsqrystr)
table_name = bi_violations_table_name
reconqrystr = f'SELECT * FROM {table_name} WHERE {countsqrystr}'
print(f'Here is: {reconqrystr}')
countsfound = bi_sql_db_obj.reconQuery(reconqrystr,logger)
if not countsfound:
break
else:
countsfound=True
if geopayload:
eventqrystr = bi_sql_db_obj.prepQuerystr(geopayload, row['datasource'])
table_name = bi_geo_table
reconqrystr = f'SELECT * FROM {table_name} WHERE {eventqrystr}'
geofound = bi_sql_db_obj.reconQuery(reconqrystr, logger)
else:
geofound=True
# qrystr = bi_sql_db_obj.prepQuerystr(row['payloaddata'],row['datasource'])

if mainfound and eventfound and countsfound and geofound:
main_staging_collection.delete_one(row)
# DONE: If found save to master table
# DONE: Dedup before saving to master
query_main_table = main_table_collection.find(row)
if len(list(query_main_table)) > 0:
return True
else:
result = main_table_collection.insert_one(row)
else:
recon_count_val = (lambda x: 1 if not ('recon_count' in x.keys()) else x['recon_count'] + 1)(row)
new_column = {"$set": {"recon_count": recon_count_val}}
result = main_staging_collection.update_one(row, new_column)



# reconqrystr = f'SELECT * FROM {table_name} WHERE {qrystr}'
# # print(reconqrystr)
# found = bi_sql_db_obj.reconQuery(reconqrystr,logger)
# if found:
# main_staging_collection.delete_one(row)
# # DONE: If found save to master table
# # DONE: Dedup before saving to master
# query_main_table = main_table_collection.find(row)
# if len(list(query_main_table)) > 0:
# return True
# else:
# result = main_table_collection.insert_one(row)
# else:
# recon_count_val = (lambda x: 1 if not ('recon_count' in x.keys()) else x['recon_count'] + 1)(row)
# new_column = {"$set": {"recon_count": recon_count_val}}
# result = main_staging_collection.update_one(row, new_column)
except Exception as e:
reconstatus=False
recon_count_val = (lambda x: 1 if not ('recon_count' in x.keys()) else x['recon_count'] + 1)(row)
new_column = {"$set": {"recon_count": recon_count_val}}
result = main_staging_collection.update_one(row, new_column)
logger.error('error in recon for this row')
logger.error(row)
logger.error(e)
return reconstatus





# try:
# if row['event_type'] and row['event_type']== 'event_1':
#
# table_name=os.getenv('BI_SQL_APP1_TABLE1')
# # reconqrystr=f'SELECT * FROM {os.getenv("BI_SQL_APP1_DB")}.{os.getenv("BI_SQL_APP1_SCHEMA")}.{table_name} WHERE {qrystr}'
# # found=bi_sql_db_obj.reconQuery(reconqrystr)
# # print(qrystr)
# # found=False
26 changes: 26 additions & 0 deletions src/commonutils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

import os
import json

def map_event_type_destination(event_type):
if event_type=='app_accepted':
Expand All @@ -24,13 +25,38 @@ def map_event_type_destination(event_type):
return os.getenv('ETK_PAYQUERY_TABLE_NAME')
elif event_type=='etk_dispute':
return os.getenv('ETK_DISPUTE_TABLE_NAME')
elif event_type == 'geolocation':
return os.getenv('ETK_GEOLOCATION_TABLE_NAME')

def map_source_db(source):
if source=='df':
return os.getenv('DF_BI_DB')
elif source=='etk':
return os.getenv('ETK_BI_DB')


def split_etk_event_payloads(payload,eventtype):
payload_dict=json.loads(payload)
eventpayload = None
countspayload = None
geopayload = None
if eventtype=='geolocation':
tmp_event=payload_dict.pop('event')
main_event={}
main_event['business_id']=payload_dict['ticket_number']
payload_dict=main_event
else:
eventpayload=payload_dict.pop('event')
countspayload=None
geopayload=None
if eventtype=='etk_issuance':
countspayload=payload_dict.pop('counts')
geopayload={}
geopayload['business_id']=payload_dict['ticket_number']
geopayload=json.dumps(geopayload)
eventpayload=json.dumps(eventpayload)
return json.dumps(payload_dict),eventpayload,countspayload,geopayload

# def map_source_api_keys(source):
# if source=='df':
# return os.getenv('R')
Expand Down
5 changes: 5 additions & 0 deletions src/errorretryfunctions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ def error_retry_task(dbclient,err_staging_collection,err_table_collection,err_th
if row['eventType'] :
# DONE: Retry sending to producer api
payload_json=row['payloaddata']
if row['eventType']=='geolocation':
payload_arr=[]
payload_arr.append(json.loads(payload_json))
payload_json=payload_arr
payload_json=json.dumps(payload_json)
producer_api_obj=producerAPITasks(os.getenv('PRODUCER_API_HOST'),logger)
headers={'ride-api-key':os.getenv('RIDE_API_KEY'),'Content-Type':'application/json'}
logger.debug(payload_json)
Expand Down
Loading

0 comments on commit fe7a04e

Please sign in to comment.