diff --git a/.github/workflows/build_push_pr_onopen_proddeploy.yml b/.github/workflows/build_push_pr_onopen_proddeploy.yml index b939b33..429356c 100644 --- a/.github/workflows/build_push_pr_onopen_proddeploy.yml +++ b/.github/workflows/build_push_pr_onopen_proddeploy.yml @@ -13,7 +13,7 @@ jobs: environment: prod env: DOCKER_IMAGE_TAG: ${{ github.sha}} - DOCKER_IMAGE_NAME: rbe5-images/ride-recon-service + DOCKER_IMAGE_NAME: rbe5-images/ride-recon-service-prod DOCKER_FILE: Dockerfile steps: - name: Checkout @@ -111,4 +111,4 @@ jobs: git push -u origin deployment/recon-svc - \ No newline at end of file + diff --git a/Dockerfile b/Dockerfile index 44bdbf8..def4f02 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,4 +17,4 @@ RUN pip install --no-cache-dir -r requirements.txt # USER app # CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "5000"] USER 1001 -CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:5000", "--error-logfile", "-"] \ No newline at end of file +CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:5000", "--timeout","800","--error-logfile", "-"] diff --git a/misc/reconfunctions.py b/misc/reconfunctions.py new file mode 100644 index 0000000..c21e8c0 --- /dev/null +++ b/misc/reconfunctions.py @@ -0,0 +1,158 @@ + +import os +from destsqldbfuncs import SqlDBFunctions +from commonutils import map_event_type_destination,map_source_db,split_etk_event_payloads +import json + + +def recondestination(dbclient,main_staging_collection,main_table_collection,logger): + + # DONE: Query records in staging table + results = main_staging_collection.find() + reconstatus=True + + # DONE: Query for each record in destination db(based on row type) + for row in results: + # DONE: If found delete from staging if not update recon count column + logger.debug('processing row') + logger.debug(row) + datasrc='' + if row['datasource']: + datasrc=row['datasource'] + else: + continue + + if datasrc=='df': + try: + if row['eventType']: + bi_table_name=map_event_type_destination(row['eventType']) + bi_db_name=map_source_db(row['datasource']) + # print(bi_db_name) + # DONE: Query SQL DB + bi_sql_db_obj = SqlDBFunctions(bi_db_name, os.getenv('BI_SQL_DB_SERVER'),os.getenv('BI_SQL_DB_USERNAME'), os.getenv('BI_SQL_DB_PASSWORD')) + qrystr = bi_sql_db_obj.prepQuerystr(row['payloaddata'],row['datasource']) + table_name=bi_table_name + reconqrystr = f'SELECT * FROM {table_name} WHERE {qrystr}' + # print(reconqrystr) + found = bi_sql_db_obj.reconQuery(reconqrystr,logger) + if found: + main_staging_collection.delete_one(row) + # DONE: If found save to master table + # DONE: Dedup before saving to master + query_main_table = main_table_collection.find(row) + if len(list(query_main_table)) > 0: + return True + else: + result = main_table_collection.insert_one(row) + else: + recon_count_val = (lambda x: 1 if not ('recon_count' in x.keys()) else x['recon_count'] + 1)(row) + new_column = {"$set": {"recon_count": recon_count_val}} + result = main_staging_collection.update_one(row, new_column) + except Exception as e: + reconstatus=False + recon_count_val = (lambda x: 1 if not ('recon_count' in x.keys()) else x['recon_count'] + 1)(row) + new_column = {"$set": {"recon_count": recon_count_val}} + result = main_staging_collection.update_one(row, new_column) + logger.error('error in recon for this row') + logger.error(row) + logger.error(e) + elif datasrc=='etk': + try: + if row['eventType']: + bi_table_name=map_event_type_destination(row['eventType']) + bi_db_name=map_source_db(row['datasource']) + bi_events_table_name = os.getenv('BI_ETK_EVENTS_TABLE') + bi_violations_table_name = os.getenv('BI_ETK_VIOLATIONS_TABLE') + bi_geo_table=os.getenv('ETK_GEOLOCATION_TABLE_NAME') + # print(bi_db_name) + # DONE: Query SQL DB + bi_sql_db_obj = SqlDBFunctions(bi_db_name, os.getenv('BI_SQL_DB_SERVER'),os.getenv('BI_SQL_DB_USERNAME'), os.getenv('BI_SQL_DB_PASSWORD')) + mainpayload,eventpayload,countspayload,geopayload=split_etk_event_payloads(row['payloaddata'],row['eventType']) + mainqrystr,eventqrystr,countsqrystr=None,None,None + mainfound, eventfound, countsfound,geofound = False, False, False,False + if mainpayload: + mainqrystr = bi_sql_db_obj.prepQuerystr(mainpayload,row['datasource']) + table_name = bi_table_name + reconqrystr = f'SELECT * FROM {table_name} WHERE {mainqrystr}' + mainfound = bi_sql_db_obj.reconQuery(reconqrystr,logger) + if eventpayload: + eventqrystr = bi_sql_db_obj.prepQuerystr(eventpayload,row['datasource']) + table_name = bi_events_table_name + reconqrystr = f'SELECT * FROM {table_name} WHERE {eventqrystr}' + eventfound = bi_sql_db_obj.reconQuery(reconqrystr,logger) + if countspayload: + for countsrw in countspayload: + tmp_countsrw=json.dumps(countsrw) + countsqrystr = bi_sql_db_obj.prepQuerystr(tmp_countsrw,row['datasource']) + print(countsqrystr) + table_name = bi_violations_table_name + reconqrystr = f'SELECT * FROM {table_name} WHERE {countsqrystr}' + print(f'Here is: {reconqrystr}') + countsfound = bi_sql_db_obj.reconQuery(reconqrystr,logger) + if not countsfound: + break + else: + countsfound=True + if geopayload: + eventqrystr = bi_sql_db_obj.prepQuerystr(geopayload, row['datasource']) + table_name = bi_geo_table + reconqrystr = f'SELECT * FROM {table_name} WHERE {eventqrystr}' + geofound = bi_sql_db_obj.reconQuery(reconqrystr, logger) + else: + geofound=True + # qrystr = bi_sql_db_obj.prepQuerystr(row['payloaddata'],row['datasource']) + + if mainfound and eventfound and countsfound and geofound: + main_staging_collection.delete_one(row) + # DONE: If found save to master table + # DONE: Dedup before saving to master + query_main_table = main_table_collection.find(row) + if len(list(query_main_table)) > 0: + return True + else: + result = main_table_collection.insert_one(row) + else: + recon_count_val = (lambda x: 1 if not ('recon_count' in x.keys()) else x['recon_count'] + 1)(row) + new_column = {"$set": {"recon_count": recon_count_val}} + result = main_staging_collection.update_one(row, new_column) + + + + # reconqrystr = f'SELECT * FROM {table_name} WHERE {qrystr}' + # # print(reconqrystr) + # found = bi_sql_db_obj.reconQuery(reconqrystr,logger) + # if found: + # main_staging_collection.delete_one(row) + # # DONE: If found save to master table + # # DONE: Dedup before saving to master + # query_main_table = main_table_collection.find(row) + # if len(list(query_main_table)) > 0: + # return True + # else: + # result = main_table_collection.insert_one(row) + # else: + # recon_count_val = (lambda x: 1 if not ('recon_count' in x.keys()) else x['recon_count'] + 1)(row) + # new_column = {"$set": {"recon_count": recon_count_val}} + # result = main_staging_collection.update_one(row, new_column) + except Exception as e: + reconstatus=False + recon_count_val = (lambda x: 1 if not ('recon_count' in x.keys()) else x['recon_count'] + 1)(row) + new_column = {"$set": {"recon_count": recon_count_val}} + result = main_staging_collection.update_one(row, new_column) + logger.error('error in recon for this row') + logger.error(row) + logger.error(e) + return reconstatus + + + + + + # try: + # if row['event_type'] and row['event_type']== 'event_1': + # + # table_name=os.getenv('BI_SQL_APP1_TABLE1') + # # reconqrystr=f'SELECT * FROM {os.getenv("BI_SQL_APP1_DB")}.{os.getenv("BI_SQL_APP1_SCHEMA")}.{table_name} WHERE {qrystr}' + # # found=bi_sql_db_obj.reconQuery(reconqrystr) + # # print(qrystr) + # # found=False \ No newline at end of file diff --git a/src/commonutils.py b/src/commonutils.py index 36cffd7..66581a3 100644 --- a/src/commonutils.py +++ b/src/commonutils.py @@ -1,5 +1,6 @@ import os +import json def map_event_type_destination(event_type): if event_type=='app_accepted': @@ -24,6 +25,8 @@ def map_event_type_destination(event_type): return os.getenv('ETK_PAYQUERY_TABLE_NAME') elif event_type=='etk_dispute': return os.getenv('ETK_DISPUTE_TABLE_NAME') + elif event_type == 'geolocation': + return os.getenv('ETK_GEOLOCATION_TABLE_NAME') def map_source_db(source): if source=='df': @@ -31,6 +34,29 @@ def map_source_db(source): elif source=='etk': return os.getenv('ETK_BI_DB') + +def split_etk_event_payloads(payload,eventtype): + payload_dict=json.loads(payload) + eventpayload = None + countspayload = None + geopayload = None + if eventtype=='geolocation': + tmp_event=payload_dict.pop('event') + main_event={} + main_event['business_id']=payload_dict['ticket_number'] + payload_dict=main_event + else: + eventpayload=payload_dict.pop('event') + countspayload=None + geopayload=None + if eventtype=='etk_issuance': + countspayload=payload_dict.pop('counts') + geopayload={} + geopayload['business_id']=payload_dict['ticket_number'] + geopayload=json.dumps(geopayload) + eventpayload=json.dumps(eventpayload) + return json.dumps(payload_dict),eventpayload,countspayload,geopayload + # def map_source_api_keys(source): # if source=='df': # return os.getenv('R') diff --git a/src/errorretryfunctions.py b/src/errorretryfunctions.py index 32658f2..f25db4e 100644 --- a/src/errorretryfunctions.py +++ b/src/errorretryfunctions.py @@ -20,6 +20,11 @@ def error_retry_task(dbclient,err_staging_collection,err_table_collection,err_th if row['eventType'] : # DONE: Retry sending to producer api payload_json=row['payloaddata'] + if row['eventType']=='geolocation': + payload_arr=[] + payload_arr.append(json.loads(payload_json)) + payload_json=payload_arr + payload_json=json.dumps(payload_json) producer_api_obj=producerAPITasks(os.getenv('PRODUCER_API_HOST'),logger) headers={'ride-api-key':os.getenv('RIDE_API_KEY'),'Content-Type':'application/json'} logger.debug(payload_json) diff --git a/src/reconfunctions.py b/src/reconfunctions.py index 29529c9..8b17e34 100644 --- a/src/reconfunctions.py +++ b/src/reconfunctions.py @@ -1,7 +1,8 @@ import os from destsqldbfuncs import SqlDBFunctions -from commonutils import map_event_type_destination,map_source_db +from commonutils import map_event_type_destination,map_source_db,split_etk_event_payloads +import json def recondestination(dbclient,main_staging_collection,main_table_collection,logger): @@ -15,39 +16,113 @@ def recondestination(dbclient,main_staging_collection,main_table_collection,logg # DONE: If found delete from staging if not update recon count column logger.debug('processing row') logger.debug(row) - try: - if row['eventType']: - bi_table_name=map_event_type_destination(row['eventType']) - bi_db_name=map_source_db(row['datasource']) - # print(bi_db_name) - # DONE: Query SQL DB - bi_sql_db_obj = SqlDBFunctions(bi_db_name, os.getenv('BI_SQL_DB_SERVER'),os.getenv('BI_SQL_DB_USERNAME'), os.getenv('BI_SQL_DB_PASSWORD')) - qrystr = bi_sql_db_obj.prepQuerystr(row['payloaddata'],row['datasource']) - table_name=bi_table_name - reconqrystr = f'SELECT * FROM {table_name} WHERE {qrystr}' - # print(reconqrystr) - found = bi_sql_db_obj.reconQuery(reconqrystr,logger) - if found: - main_staging_collection.delete_one(row) - # DONE: If found save to master table - # DONE: Dedup before saving to master - query_main_table = main_table_collection.find(row) - if len(list(query_main_table)) > 0: - return True + datasrc='' + if row['datasource']: + datasrc=row['datasource'] + else: + continue + if datasrc=='df': + try: + if row['eventType']: + bi_table_name=map_event_type_destination(row['eventType']) + bi_db_name=map_source_db(row['datasource']) + # print(bi_db_name) + # DONE: Query SQL DB + bi_sql_db_obj = SqlDBFunctions(bi_db_name, os.getenv('BI_SQL_DB_SERVER'),os.getenv('BI_SQL_DB_USERNAME'), os.getenv('BI_SQL_DB_PASSWORD')) + qrystr = bi_sql_db_obj.prepQuerystr(row['payloaddata'],row['datasource']) + table_name=bi_table_name + reconqrystr = f'SELECT * FROM {table_name} WHERE {qrystr}' + # print(reconqrystr) + found = bi_sql_db_obj.reconQuery(reconqrystr,logger) + if found: + main_staging_collection.delete_one(row) + # DONE: If found save to master table + # DONE: Dedup before saving to master + query_main_table = main_table_collection.find(row) + if len(list(query_main_table)) > 0: + return True + else: + result = main_table_collection.insert_one(row) else: - result = main_table_collection.insert_one(row) - else: - recon_count_val = (lambda x: 1 if not ('recon_count' in x.keys()) else x['recon_count'] + 1)(row) - new_column = {"$set": {"recon_count": recon_count_val}} - result = main_staging_collection.update_one(row, new_column) - except Exception as e: - reconstatus=False - recon_count_val = (lambda x: 1 if not ('recon_count' in x.keys()) else x['recon_count'] + 1)(row) - new_column = {"$set": {"recon_count": recon_count_val}} - result = main_staging_collection.update_one(row, new_column) - logger.error('error in recon for this row') - logger.error(row) - logger.error(e) + recon_count_val = (lambda x: 1 if not ('recon_count' in x.keys()) else x['recon_count'] + 1)(row) + new_column = {"$set": {"recon_count": recon_count_val}} + result = main_staging_collection.update_one(row, new_column) + except Exception as e: + reconstatus=False + recon_count_val = (lambda x: 1 if not ('recon_count' in x.keys()) else x['recon_count'] + 1)(row) + new_column = {"$set": {"recon_count": recon_count_val}} + result = main_staging_collection.update_one(row, new_column) + logger.error('error in recon for this row') + logger.error(row) + logger.error(e) + elif datasrc=='etk': + try: + if row['eventType']: + bi_table_name=map_event_type_destination(row['eventType']) + bi_db_name=map_source_db(row['datasource']) + bi_events_table_name = os.getenv('BI_ETK_EVENTS_TABLE') + bi_violations_table_name = os.getenv('ETK_VIOLATIONS_TABLE_NAME') + bi_geo_table=os.getenv('ETK_GEOLOCATION_TABLE_NAME') + # print(bi_db_name) + # DONE: Query SQL DB + bi_sql_db_obj = SqlDBFunctions(bi_db_name, os.getenv('BI_SQL_DB_SERVER'),os.getenv('BI_SQL_DB_USERNAME'), os.getenv('BI_SQL_DB_PASSWORD')) + mainpayload,eventpayload,countspayload,geopayload=split_etk_event_payloads(row['payloaddata'],row['eventType']) + mainqrystr,eventqrystr,countsqrystr=None,None,None + mainfound, eventfound, countsfound,geofound = False, False, False,False + if mainpayload: + mainqrystr = bi_sql_db_obj.prepQuerystr(mainpayload,row['datasource']) + table_name=(lambda x: bi_geo_table if x['eventType']=='geolocation' else bi_table_name)(row) + # table_name = bi_table_name + reconqrystr = f'SELECT * FROM {table_name} WHERE {mainqrystr}' + mainfound = bi_sql_db_obj.reconQuery(reconqrystr,logger) + if eventpayload: + eventqrystr = bi_sql_db_obj.prepQuerystr(eventpayload,row['datasource']) + table_name = bi_events_table_name + reconqrystr = f'SELECT * FROM {table_name} WHERE {eventqrystr}' + eventfound = bi_sql_db_obj.reconQuery(reconqrystr,logger) + if countspayload: + for countsrw in countspayload: + tmp_countsrw=json.dumps(countsrw) + countsqrystr = bi_sql_db_obj.prepQuerystr(tmp_countsrw,row['datasource']) + print(countsqrystr) + table_name = bi_violations_table_name + reconqrystr = f'SELECT * FROM {table_name} WHERE {countsqrystr}' + print(f'Here is: {reconqrystr}') + countsfound = bi_sql_db_obj.reconQuery(reconqrystr,logger) + if not countsfound: + break + else: + countsfound=True + if geopayload: + eventqrystr = bi_sql_db_obj.prepQuerystr(geopayload, row['datasource']) + table_name = bi_geo_table + reconqrystr = f'SELECT * FROM {table_name} WHERE {eventqrystr}' + geofound = bi_sql_db_obj.reconQuery(reconqrystr, logger) + else: + geofound=True + # qrystr = bi_sql_db_obj.prepQuerystr(row['payloaddata'],row['datasource']) + + if mainfound and eventfound and countsfound and geofound: + main_staging_collection.delete_one(row) + # DONE: If found save to master table + # DONE: Dedup before saving to master + query_main_table = main_table_collection.find(row) + if len(list(query_main_table)) > 0: + return True + else: + result = main_table_collection.insert_one(row) + else: + recon_count_val = (lambda x: 1 if not ('recon_count' in x.keys()) else x['recon_count'] + 1)(row) + new_column = {"$set": {"recon_count": recon_count_val}} + result = main_staging_collection.update_one(row, new_column) + except Exception as e: + reconstatus=False + recon_count_val = (lambda x: 1 if not ('recon_count' in x.keys()) else x['recon_count'] + 1)(row) + new_column = {"$set": {"recon_count": recon_count_val}} + result = main_staging_collection.update_one(row, new_column) + logger.error('error in recon for this row') + logger.error(row) + logger.error(e) return reconstatus