From 1504936ea16d8be786def0cb099e0272a5d52b1f Mon Sep 17 00:00:00 2001 From: Lukasz Przychodzien Date: Mon, 1 Jul 2024 10:04:07 -0400 Subject: [PATCH 1/9] init images --- airflow/dags/dailymed_images/dag.py | 93 +++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 airflow/dags/dailymed_images/dag.py diff --git a/airflow/dags/dailymed_images/dag.py b/airflow/dags/dailymed_images/dag.py new file mode 100644 index 0000000..82c9c2e --- /dev/null +++ b/airflow/dags/dailymed_images/dag.py @@ -0,0 +1,93 @@ +import pendulum + +from airflow_operator import create_dag +from airflow.utils.helpers import chain + +from airflow.decorators import task +from airflow.providers.postgres.operators.postgres import PostgresOperator +import pandas as pd +import xmltodict +import re + +dag_id = "dailymed_images" + +dag = create_dag( + dag_id=dag_id, + schedule= "0 8 * * 1-5", # at 7:45 am once daily) + start_date=pendulum.yesterday(), + catchup=False, + max_active_runs=1, + concurrency=2, +) + +@task +def get_dailymed_data(): + from airflow.hooks.postgres_hook import PostgresHook + query = "SELECT * FROM sagerx_lake.dailymed_daily limit 1" + pg_hook = PostgresHook(postgres_conn_id="postgres_default") + engine = pg_hook.get_sqlalchemy_engine() + df = pd.read_sql(query, con=engine) + + return df + +def isolate_ndc(input_string): + pattern = r"\b\d{5}-\d{3}-\d{2}\b" + match = re.search(pattern, input_string) + + if match: + return match.group(0) + else: + return None + +def clean_string(input_string): + cleaned_string = input_string.strip() + cleaned_string = ' '.join(cleaned_string.split()) + return cleaned_string + +def extract_data(xml_string): + xml_data = xmltodict.parse(xml_string) + + med_dict = {} + set_id = xml_data['document']['setId']['@root'] + + for component in xml_data['document']['component']['structuredBody']['component']: + if component['section']['code']['@code'] == '51945-4': + component_dict = {} + + additional_info = [] + for para in component['section']['text']['paragraph']: + if isinstance(para, str): + if 'NDC' in para: + ndc = isolate_ndc(para) + else: + additional_info.append(clean_string(para)) + elif isinstance(para,dict): + if 'content' in para.keys(): + additional_info.append(clean_string(para['content']['#text'])) + elif '#text' in para.keys(): + if 'NDC' in para['#text']: + ndc = isolate_ndc(clean_string(para['#text'])) + else: + additional_info.append(clean_string(para['#text'])) + + component_dict['set_id'] = set_id + component_dict['effective_time'] = component['section']['effectiveTime']['@value'] + + if component['section']['component']['observationMedia']['value']['@mediaType'] == 'image/jpeg': + image_id = component['section']['component']['observationMedia']['value']['reference']['@value'] + component_dict['image_id'] = image_id + + component_dict['image_url'] = f"https://dailymed.nlm.nih.gov/dailymed/image.cfm?name={image_id}&setid={set_id}&type=img" + component_dict['additional_info'] = additional_info + med_dict[ndc] = component_dict + + df = pd.DataFrame.from_dict(med_dict,orient='index') + df.index.name = 'ndc' + df = df.reset_index() + + return df + +with dag: + dm_data = get_dailymed_data() + + print(dm_data['xml_content']) \ No newline at end of file From 659f397373a80c2ec51a1037b15b8b3332454d6a Mon Sep 17 00:00:00 2001 From: Lukasz Przychodzien Date: Mon, 8 Jul 2024 12:27:30 -0400 Subject: [PATCH 2/9] dailymed images to ndc init --- .../dags/dailymed_daily/dailymed_daily_dag.py | 7 +- airflow/dags/dailymed_images/dag.py | 109 +++++------ airflow/dags/dailymed_images/dag_tasks.py | 175 ++++++++++++++++++ airflow/requirements.txt | 3 +- 4 files changed, 226 insertions(+), 68 deletions(-) create mode 100644 airflow/dags/dailymed_images/dag_tasks.py diff --git a/airflow/dags/dailymed_daily/dailymed_daily_dag.py b/airflow/dags/dailymed_daily/dailymed_daily_dag.py index 3c649f3..8a1b309 100644 --- a/airflow/dags/dailymed_daily/dailymed_daily_dag.py +++ b/airflow/dags/dailymed_daily/dailymed_daily_dag.py @@ -53,12 +53,13 @@ def process_dailymed(data_folder, xslt, ti): for subfile in unzipped_folder.infolist(): if re.match(".*.xml", subfile.filename): new_file = unzipped_folder.extract(subfile, data_folder) + raw_xml_data = etree.tostring(etree.parse(new_file) , encoding='unicode') # xslt transform xml_content = transform_xml(new_file, xslt) - os.remove(new_file) + #os.remove(new_file) df = pd.DataFrame( - columns=["spl", "file_name", "xml_content"], - data=[[folder_name, subfile.filename, xml_content]], + columns=["spl", "file_name", "xml_content", "raw_xml_content"], + data=[[folder_name, subfile.filename, xml_content,raw_xml_data]], ) df.to_sql( "dailymed_daily", diff --git a/airflow/dags/dailymed_images/dag.py b/airflow/dags/dailymed_images/dag.py index 82c9c2e..97a10e8 100644 --- a/airflow/dags/dailymed_images/dag.py +++ b/airflow/dags/dailymed_images/dag.py @@ -4,10 +4,10 @@ from airflow.utils.helpers import chain from airflow.decorators import task -from airflow.providers.postgres.operators.postgres import PostgresOperator import pandas as pd -import xmltodict -import re + +from dailymed_images.dag_tasks import * +from sagerx import load_df_to_pg dag_id = "dailymed_images" @@ -20,74 +20,55 @@ concurrency=2, ) +""" +Process + +1. get xml data from dailymed_daily +2. read xml data +3. find ndc and image data in xml +4. map ndc and image data +5. map ndc and image ids toghether +5.a. check to see if NDC is in the image name, if so map together +5.b. check to see if NDC is under the same 51945-4 component, if so then map together +5.c. Run image PCR to pull NDC fr +6. upload to postgres + + +""" + + @task def get_dailymed_data(): from airflow.hooks.postgres_hook import PostgresHook - query = "SELECT * FROM sagerx_lake.dailymed_daily limit 1" + + query = "SELECT * FROM sagerx_lake.dailymed_daily" pg_hook = PostgresHook(postgres_conn_id="postgres_default") engine = pg_hook.get_sqlalchemy_engine() df = pd.read_sql(query, con=engine) + print(f"DF length of {len(df)} with columns: {df.columns}") - return df + df['raw_xml_content'] = df.apply(parse_xml_content, axis=1) + df['set_id'] = df.apply(lambda x: extract_set_id(x['raw_xml_content']), axis=1) + df['ndc_ids'] = df.apply(lambda x: find_ndc_numbers(x['raw_xml_content']), axis=1) + df['image_ids'] = df.apply(lambda x: find_image_ids(x['raw_xml_content']), axis=1) + df['ndc_image_mapping'] = df.apply(map_ndcs_parent_function, axis=1) -def isolate_ndc(input_string): - pattern = r"\b\d{5}-\d{3}-\d{2}\b" - match = re.search(pattern, input_string) - - if match: - return match.group(0) - else: - return None - -def clean_string(input_string): - cleaned_string = input_string.strip() - cleaned_string = ' '.join(cleaned_string.split()) - return cleaned_string - -def extract_data(xml_string): - xml_data = xmltodict.parse(xml_string) - - med_dict = {} - set_id = xml_data['document']['setId']['@root'] - - for component in xml_data['document']['component']['structuredBody']['component']: - if component['section']['code']['@code'] == '51945-4': - component_dict = {} - - additional_info = [] - for para in component['section']['text']['paragraph']: - if isinstance(para, str): - if 'NDC' in para: - ndc = isolate_ndc(para) - else: - additional_info.append(clean_string(para)) - elif isinstance(para,dict): - if 'content' in para.keys(): - additional_info.append(clean_string(para['content']['#text'])) - elif '#text' in para.keys(): - if 'NDC' in para['#text']: - ndc = isolate_ndc(clean_string(para['#text'])) - else: - additional_info.append(clean_string(para['#text'])) - - component_dict['set_id'] = set_id - component_dict['effective_time'] = component['section']['effectiveTime']['@value'] - - if component['section']['component']['observationMedia']['value']['@mediaType'] == 'image/jpeg': - image_id = component['section']['component']['observationMedia']['value']['reference']['@value'] - component_dict['image_id'] = image_id - - component_dict['image_url'] = f"https://dailymed.nlm.nih.gov/dailymed/image.cfm?name={image_id}&setid={set_id}&type=img" - component_dict['additional_info'] = additional_info - med_dict[ndc] = component_dict - - df = pd.DataFrame.from_dict(med_dict,orient='index') - df.index.name = 'ndc' - df = df.reset_index() - - return df + load_df_to_pg(df[['spl','file_name','set_id','ndc_ids','image_ids','ndc_image_mapping']],"sagerx_lake","dailymed_images","replace",dtype_name="ndc_image_mapping") -with dag: - dm_data = get_dailymed_data() + #df['med_dict'] = df.apply(extract_data, axis=1) + + # dfs = [] + # for md in df['med_dict']: + # df_temp = pd.DataFrame.from_dict(md,orient='index') + # df_temp.index.name = 'ndc' + # dfs.append(df_temp) - print(dm_data['xml_content']) \ No newline at end of file + # df_final = pd.concat(dfs) + # df_final = df_final.reset_index() + + #print(df_final) + # return df + + +with dag: + get_dailymed_data() \ No newline at end of file diff --git a/airflow/dags/dailymed_images/dag_tasks.py b/airflow/dags/dailymed_images/dag_tasks.py new file mode 100644 index 0000000..413cf89 --- /dev/null +++ b/airflow/dags/dailymed_images/dag_tasks.py @@ -0,0 +1,175 @@ +import re +import xmltodict +import pandas as pd + +def parse_xml_content(row:pd.Series): + from airflow.exceptions import AirflowException + + xml_string = row['raw_xml_content'] + xml_data = xmltodict.parse(xml_string) + if 'document' not in xml_data.keys(): + raise AirflowException("Unexpected XML Data, expected DailyMed Document") + + return xml_data + +def clean_string(input_string): + cleaned_string = input_string.strip() + cleaned_string = ' '.join(cleaned_string.split()) + return cleaned_string + +def validate_ndc(ndc): + raw_ndc = ndc.replace("-","") + if len(raw_ndc) > 9: + return ndc + +def find_image_ids(xml_doc, results=None): + if results is None: + results = [] + + if isinstance(xml_doc, dict): + if '@mediaType' in xml_doc and xml_doc['@mediaType'] == 'image/jpeg': + results.append(xml_doc.get('reference', {}).get('@value')) + for _, value in xml_doc.items(): + if isinstance(value, (dict, list)): + find_image_ids(value, results) + elif isinstance(xml_doc, list): + for item in xml_doc: + find_image_ids(item, results) + return list(set(results)) + +def find_ndc_numbers(xml_doc, results=None) -> list: + target_code_system = '2.16.840.1.113883.6.69' + if results is None: + results = [] + + if isinstance(xml_doc, dict): + if '@codeSystem' in xml_doc and xml_doc['@codeSystem'] == target_code_system: + ndc = xml_doc.get('@code') + if ndc and validate_ndc(ndc): + results.append(ndc) + for _, value in xml_doc.items(): + if isinstance(value, (dict, list)): + find_ndc_numbers(value, results) + elif isinstance(xml_doc, list): + for item in xml_doc: + find_ndc_numbers(item,results) + return list(set(results)) + + +def extract_set_id(xml_doc): + return xml_doc['document']['setId']['@root'] + + +def ndc_format(text_data): + # order of patterns is important, largest to smalles + patterns = [ + (r'\d{11}', '11 Digit'), + (r'\d{10}', '10 Digit'), + (r'\d{5}-\d{5}', '5-5'), + (r'\d{5}-\d{4}-\d{2}', '5-4-2'), + (r'\d{5}-\d{4}-\d{1}', '5-4-1'), + (r'\d{5}-\d{3}-\d{2}', '5-3-2'), + (r'\d{4}-\d{6}', '4-6'), + (r'\d{4}-\d{4}-\d{2}', '4-4-2') + ] + + for pattern, _ in patterns: + match = re.search(pattern, text_data) + if match: + return match.group(0) + return None + +def find_image_components(xml_doc): + components = [] + for component in xml_doc['document']['component']['structuredBody']['component']: + if component['section']['code']['@code'] == '51945-4': + components.append(component) + return components + +def find_ndc_in_image_component(component, results=None): + if results is None: + results = [] + + if isinstance(component, str) and ndc_format(component): + results.append(ndc_format(component)) + elif isinstance(component, dict): + if '#text' in component and ndc_format(component['#text']): + results.append(ndc_format(component['#text'])) + for _, value in component.items(): + if isinstance(value, (dict, list)): + find_ndc_in_image_component(value, results) + elif isinstance(component, list): + for item in component: + find_ndc_in_image_component(item,results) + return list(set(results)) + +def create_dailymed_image_url(image_id, set_id): + return f"https://dailymed.nlm.nih.gov/dailymed/image.cfm?name={image_id}&setid={set_id}&type=img" + +def convert_ndc_10_to_11(ndc): + parts = ndc.split('-') + if len(parts[-1]) == 1: + parts[-1] = '0' + parts[-1] + return '-'.join(parts) + +def map_ndcs_from_image_ids(row:pd.Series): + set_id = row['set_id'] + ndc_ids = row['ndc_ids'] + image_ids = row['image_ids'] + med_dict = {} + + converted_ndc_ids = [convert_ndc_10_to_11(ndc) for ndc in ndc_ids] + + for id in image_ids: + image_ndc = ndc_format(id) + if image_ndc and (image_ndc in converted_ndc_ids): + med_dict[image_ndc] = id + # else: + # print(f"NDC {image_ndc} from {id} not found in NDC list of: {converted_ndc_ids}") + return med_dict + +def map_ndcs_from_image_components(row:pd.Series): + xml_doc = row['raw_xml_content'] + set_id = row['set_id'] + ndc_ids = row['ndc_ids'] + image_ids = row['image_ids'] + med_dict = {} + + image_components = find_image_components(xml_doc) + + if image_components == []: + return None + + for component in image_components: + ndcs = find_ndc_in_image_component(component) + images = find_image_ids(component) + + if not ndcs or not images: + continue + + elif len(ndcs) == 1 and len(images) == 1: + ndc = ndcs[0] + image = images[0] + + if ndc in ndc_ids and image in image_ids: + med_dict[ndc] = image + # else: + # print(f"Found unknown ndc or image in set ID: {set_id}") + # print(f"NDCs expected: {ndc_ids}") + # print(f"NDC got: {ndc}") + # print(f"Images expected: {image_ids}") + # print(f"Image got: {image}") + # else: + # print(f"Multiple NDC - Image mapping found for set ID: {set_id}") + # print(f"NDCs got: {ndcs}") + # print(f"Images got: {images}") + + return med_dict + +def map_ndcs_parent_function(row:pd.Series): + med_dict = map_ndcs_from_image_ids(row) + + if not med_dict: + med_dict = map_ndcs_from_image_components(row) + + return med_dict \ No newline at end of file diff --git a/airflow/requirements.txt b/airflow/requirements.txt index 308b594..e02cd71 100644 --- a/airflow/requirements.txt +++ b/airflow/requirements.txt @@ -3,4 +3,5 @@ dbt-core dbt-postgres -apache-airflow[google] \ No newline at end of file +apache-airflow[google] +xmltodict==0.13.0 \ No newline at end of file From 250481920f6791aae8f0eaed02043002ea656648 Mon Sep 17 00:00:00 2001 From: Lukasz Przychodzien Date: Tue, 13 Aug 2024 09:44:42 -0400 Subject: [PATCH 3/9] add ocr --- .../dag.py => dailymed/dag.txt} | 2 +- airflow/dags/dailymed/dailymed.py | 155 +++++++++ airflow/dags/dailymed/dailymed_images.py | 319 ++++++++++++++++++ airflow/dags/dailymed_daily/dag.py | 44 +++ airflow/dags/dailymed_daily/dag_tasks.py | 25 ++ .../dags/dailymed_daily/dailymed_daily_dag.py | 153 --------- ...tion.sql => load_dailymed_interaction.sql} | 0 ...ilymed_main.sql => load_dailymed_main.sql} | 0 ...dailymed_ndc.sql => load_dailymed_ndc.sql} | 0 ...ion.sql => load_dailymed_organization.sql} | 0 ...> load_dailymed_organization_activity.sql} | 0 ...ql => load_dailymed_organization_item.sql} | 0 ...ql => load_dailymed_organization_text.sql} | 0 airflow/dags/dailymed_images/dag_tasks.py | 175 ---------- airflow/dags/sagerx.py | 20 +- airflow/dags/xml_functions.py | 52 +++ airflow/dags/xsl_templates/__init__.py | 0 .../dailymed_prescription.xsl | 0 airflow/dags/xsl_templates/doc_metadata.xsl | 17 + airflow/dags/xsl_templates/ndcs.xsl | 18 + airflow/dags/xsl_templates/package_data.xsl | 25 ++ airflow/requirements.txt | 4 +- .../_dailymed_daily__models.yml | 0 .../_dailymed_daily__sources.yml | 0 24 files changed, 672 insertions(+), 337 deletions(-) rename airflow/dags/{dailymed_images/dag.py => dailymed/dag.txt} (97%) create mode 100644 airflow/dags/dailymed/dailymed.py create mode 100644 airflow/dags/dailymed/dailymed_images.py create mode 100644 airflow/dags/dailymed_daily/dag.py create mode 100644 airflow/dags/dailymed_daily/dag_tasks.py delete mode 100644 airflow/dags/dailymed_daily/dailymed_daily_dag.py rename airflow/dags/dailymed_daily/{staging-dailymed_interaction.sql => load_dailymed_interaction.sql} (100%) rename airflow/dags/dailymed_daily/{staging-dailymed_main.sql => load_dailymed_main.sql} (100%) rename airflow/dags/dailymed_daily/{staging-dailymed_ndc.sql => load_dailymed_ndc.sql} (100%) rename airflow/dags/dailymed_daily/{staging-dailymed_organization.sql => load_dailymed_organization.sql} (100%) rename airflow/dags/dailymed_daily/{staging-dailymed_organization_activity.sql => load_dailymed_organization_activity.sql} (100%) rename airflow/dags/dailymed_daily/{staging-dailymed_organization_item.sql => load_dailymed_organization_item.sql} (100%) rename airflow/dags/dailymed_daily/{staging-dailymed_organization_text.sql => load_dailymed_organization_text.sql} (100%) delete mode 100644 airflow/dags/dailymed_images/dag_tasks.py create mode 100644 airflow/dags/xml_functions.py create mode 100644 airflow/dags/xsl_templates/__init__.py rename airflow/dags/{dailymed_daily => xsl_templates}/dailymed_prescription.xsl (100%) create mode 100644 airflow/dags/xsl_templates/doc_metadata.xsl create mode 100644 airflow/dags/xsl_templates/ndcs.xsl create mode 100644 airflow/dags/xsl_templates/package_data.xsl create mode 100644 dbt/sagerx/models/staging/dailymed_daily/_dailymed_daily__models.yml create mode 100644 dbt/sagerx/models/staging/dailymed_daily/_dailymed_daily__sources.yml diff --git a/airflow/dags/dailymed_images/dag.py b/airflow/dags/dailymed/dag.txt similarity index 97% rename from airflow/dags/dailymed_images/dag.py rename to airflow/dags/dailymed/dag.txt index 97a10e8..317c97c 100644 --- a/airflow/dags/dailymed_images/dag.py +++ b/airflow/dags/dailymed/dag.txt @@ -6,7 +6,7 @@ from airflow.decorators import task import pandas as pd -from dailymed_images.dag_tasks import * +from airflow.dags.dailymed_images.dailymed_images import * from sagerx import load_df_to_pg dag_id = "dailymed_images" diff --git a/airflow/dags/dailymed/dailymed.py b/airflow/dags/dailymed/dailymed.py new file mode 100644 index 0000000..b1d28f3 --- /dev/null +++ b/airflow/dags/dailymed/dailymed.py @@ -0,0 +1,155 @@ +import os +from pathlib import Path +from xml_functions import transform_xml_to_dict, get_xsl_template_path, transform_xml +import pandas as pd +from sagerx import load_df_to_pg + +class DailyMed(): + def __init__(self, data_folder: os.PathLike) -> None: + self.data_folder = data_folder + self.rx_folder = Path(data_folder) / "prescription" + + ### + # Supplementary Functions + ### + + def create_dailymed_url(self,set_id): + return f"https://dailymed.nlm.nih.gov/dailymed/drugInfo.cfm?setid={set_id}" + + def ndc_format(self,text_data): + import re + # order of patterns is important, largest to smalles + patterns = [ + (r'\d{11}', '11 Digit'), + (r'\d{10}', '10 Digit'), + (r'\d{5}-\d{5}', '5-5'), + (r'\d{5}-\d{4}-\d{2}', '5-4-2'), + (r'\d{5}-\d{4}-\d{1}', '5-4-1'), + (r'\d{5}-\d{3}-\d{2}', '5-3-2'), + (r'\d{4}-\d{6}', '4-6'), + (r'\d{4}-\d{4}-\d{2}', '4-4-2') + ] + + for pattern, _ in patterns: + match = re.search(pattern, text_data) + if match: + return match.group(0) + return None + + def convert_ndc_10_to_11(self,ndc): + parts = ndc.split('-') + if len(parts[-1]) == 1: + parts[-1] = '0' + parts[-1] + return '-'.join(parts) + + def convert_ndc_no_dash(self,ndc): + return ndc.replace("-","") + + ### + # XML Processing + ### + + def find_xml_image_ids(self, xml_doc) -> list: + xslt = get_xsl_template_path("package_data.xsl") + results = transform_xml_to_dict(xml_doc,xslt) + return list(set(results.get('Image',[]))) + + def find_xml_ndc_numbers(self, xml_doc) -> list: + xslt = get_xsl_template_path("ndcs.xsl") + results = transform_xml_to_dict(xml_doc,xslt) + return list(set(results.get('NDC',[]))) + + def find_xml_metadata(self, xml_doc) -> dict: + xslt = get_xsl_template_path("doc_metadata.xsl") + results = transform_xml_to_dict(xml_doc,xslt) + return results + + def metadata_dict_cleanup(self, metadata): + new_dict = {} + for key, value in metadata.items(): + if isinstance(value, list) and len(value) == 1: + new_dict[key] = str(value[0]) + elif isinstance(value, list) and len(value) > 1: + new_dict[key] = value + return new_dict + + + def process_xml_doc(self, xml_doc): + image_ids = self.find_xml_image_ids(xml_doc) + ndc_ids = self.find_xml_ndc_numbers(xml_doc) + + metadata = self.find_xml_metadata(xml_doc) + metadata['imageIds'] = image_ids + metadata['ndcIds'] = ndc_ids + return self.metadata_dict_cleanup(metadata) + + ### + # File Processing + ### + + def unzip_data(self) -> None: + import zipfile + for zip_folder in self.rx_folder.iterdir(): + if zip_folder.is_file() and zip_folder.suffix == '.zip': + #print(zip_folder) + with zipfile.ZipFile(zip_folder) as unzipped_folder: + folder_name = zip_folder.stem + extracted_folder_path = self.rx_folder / folder_name + extracted_folder_path.mkdir(exist_ok=True) + + for subfile in unzipped_folder.infolist(): + unzipped_folder.extract(subfile, extracted_folder_path) + + os.remove(zip_folder) + + def map_files(self): + file_mapping ={} + for spl_folder in self.rx_folder.iterdir(): + if spl_folder.name == '.DS_Store': + continue + + image_files = [] + xml_file_name = "" + + for subfile in spl_folder.iterdir(): + if subfile.suffix == '.xml': + xml_file_name = subfile.name + elif subfile.suffix == '.jpg': + image_files.append(subfile.name) + + xml_path = self.get_file_path(spl_folder, xml_file_name) + metadata = self.process_xml_doc(xml_path) + + spl = spl_folder.name.split("_")[1] + file_dict = { + "xml_file":xml_file_name, + "image_files": image_files, + "spl_folder_name": spl_folder.name + } + file_mapping[spl] = dict(file_dict, **metadata) + self.file_mapping = file_mapping + print(file_mapping) + + + def get_file_path(self, spl_folder_name, file_name): + return os.path.join(self.rx_folder,spl_folder_name,file_name) + + ### + # Data Extraction for DailyMed Daily + ### + + def extract_and_upload_dmd_base_data(self): + xslt = get_xsl_template_path("dailymed_prescription.xsl") + + + for spl, mapping in self.file_mapping.items(): + spl_folder_name = mapping.get("spl_folder_name") + xml_file = self.get_file_path(spl_folder_name, mapping.get("xml_file")) + xml_content = transform_xml(xml_file, xslt) + + df = pd.DataFrame( + columns=["spl","spl_folder_name", "xml_file_name", "xml_content","image_files"], + data=[[spl, spl_folder_name, mapping.get("xml_file"), xml_content, mapping.get("image_files")]], + ) + + load_df_to_pg(df,"sagerx_lake","dailymed_daily","append") \ No newline at end of file diff --git a/airflow/dags/dailymed/dailymed_images.py b/airflow/dags/dailymed/dailymed_images.py new file mode 100644 index 0000000..b668f0b --- /dev/null +++ b/airflow/dags/dailymed/dailymed_images.py @@ -0,0 +1,319 @@ +import os +import pandas as pd +from dailymed.dailymed import DailyMed +from xml_functions import parse_dm_xml_to_dict +from sagerx import load_df_to_pg + +class DailyMedImages(DailyMed): + def __init__(self, data_folder: os.PathLike) -> None: + super().__init__(data_folder) + + def create_dailymed_image_url(self, image_id, spl): + return f"https://dailymed.nlm.nih.gov/dailymed/image.cfm?name={image_id}&setid={spl}&type=img" + + + def has_none_values(self,d): + for value in d.values(): + if value is {}: + return True + return False + + def validate_ndc(self,ndc): + """" + Validates that the NDC is greater than 9 characters + """ + raw_ndc = ndc.replace("-","") + if len(raw_ndc) > 9: + return True + return False + + """ + {'1d9f4044-a333-ecd3-e063-6294a90ab1fe': + {'xml_file': '1e004cc6-580a-1e62-e063-6294a90aa220.xml', + 'image_files': ['Xiclofen Box.jpg', 'Xiclofen Tube.jpg'], + 'spl_folder_name': '20240725_1d9f4044-a333-ecd3-e063-6294a90ab1fe', + 'documentId': '1e004cc6-580a-1e62-e063-6294a90aa220', + 'SetId': '1d9f4044-a333-ecd3-e063-6294a90ab1fe', + 'VersionNumber': '3', + 'EffectiveDate': '20240724', + 'MarketStatus': 'unapproved drug other', + 'imageIds': ['Xiclofen Box.jpg', 'Xiclofen Tube.jpg'], + 'ndcIds': '83295-5000-1'}} + """ + + def get_full_ndc_varients(self, ndcs): + ndcs_11 = [self.convert_ndc_10_to_11(ndc) for ndc in ndcs] + ndcs.extend(ndcs_11) + ndcs_nd = [self.convert_ndc_no_dash(ndc) for ndc in ndcs] + ndcs.extend(ndcs_nd) + ndcs.sort(key=lambda s: len(s), reverse=True) + return ndcs + + def get_ndc_from_image_filename(self, ndcs, image_id): + ndcs= self.get_full_ndc_varients(ndcs) + + image_ndc = self.ndc_format(image_id) + + if image_ndc: + for ndc in ndcs: + if ndc == image_ndc: + return ndc + else: + #print(f"NDC {image_ndc} from {id} not found in NDC list of: {converted_ndc_ids}") + return None + + def find_image_components(self,xml_doc): + components = [] + for component in xml_doc['document']['component']['structuredBody']['component']: + if component['section']['code']['@code'] == '51945-4': + components.append(component) + return components + + def find_ndcs_in_component(self, component, results=None): + if results is None: + results = [] + + if isinstance(component, str) and self.ndc_format(component): + results.append(self.ndc_format(component)) + elif isinstance(component, dict): + if '#text' in component and self.ndc_format(component['#text']): + results.append(self.ndc_format(component['#text'])) + for _, value in component.items(): + if isinstance(value, (dict, list)): + self.find_ndcs_in_component(value, results) + elif isinstance(component, list): + for item in component: + self.find_ndcs_in_component(item,results) + return list(set(results)) + + def find_images_in_component(self, xml_doc, results=None): + if results is None: + results = [] + + if isinstance(xml_doc, dict): + if '@mediaType' in xml_doc and xml_doc['@mediaType'] == 'image/jpeg': + results.append(xml_doc.get('reference', {}).get('@value')) + for _, value in xml_doc.items(): + if isinstance(value, (dict, list)): + self.find_images_in_component(value, results) + elif isinstance(xml_doc, list): + for item in xml_doc: + self.find_images_in_component(item, results) + return list(set(results)) + + def get_ndcs_from_image_components(self,xml_doc, ndc_ids, image_ids): + mapped_dict = {} + image_components = self.find_image_components(xml_doc) + + if image_components == []: + return None + + for component in image_components: + ndcs = self.find_ndcs_in_component(component) + images = self.find_images_in_component(component) + + if not ndcs or not images: + continue + + elif len(ndcs) == 1 and len(images) == 1: + ndc = ndcs[0] + image = images[0] + + if ndc in ndc_ids and image in image_ids: + mapped_dict[ndc] = image + else: + print(f"Found unknown ndc or image") + print(f"NDC {ndc}, vs expected {ndc_ids}") + print(f"Image {image}, vs expected {image_ids}") + return mapped_dict + + + def extract_and_upload_mapped_ndcs_from_image_files(self): + mapping_dict = self.file_mapping + image_ndc_mapping = {} + + for spl, mapping in mapping_dict.items(): + #print(spl, mapping) + ndcs = mapping.get('ndcIds') + image_files = mapping.get('image_files') + + # Get NDCs when found in the image filenames + for image_file in image_files: + matched_ndc = self.get_ndc_from_image_filename(ndcs, image_file) + if matched_ndc: + image_ndc_mapping[matched_ndc] = { + 'image_file':image_file, + 'spl':spl, + 'image_url':self.create_dailymed_image_url(image_file, spl), + 'methodology':'image_filename', + 'confidence_level':1, + 'matched':1} + + for ndc in ndcs: + if ndc not in image_ndc_mapping.keys(): + image_ndc_mapping[ndc] = { + 'image_file':'', + 'spl':spl, + 'image_url':'', + 'methodology':'image_filename', + 'confidence_level':1, + 'matched':0} + + + df = pd.DataFrame.from_dict(image_ndc_mapping, orient='index') + df = df.reset_index().rename(columns={'index':'ndc'}) + load_df_to_pg(df,"sagerx_lake","dailymed_image_ndc","append") + + + def extract_and_upload_mapped_ndcs_from_image_components(self): + mapping_dict = self.file_mapping + image_ndc_mapping = {} + + for spl, mapping in mapping_dict.items(): + #print(spl, mapping) + ndcs = mapping.get('ndcIds') + image_files = mapping.get('image_files') + + # Get NDCs from XML components + spl_folder_name = mapping.get("spl_folder_name") + xml_file_path = self.get_file_path(spl_folder_name, mapping.get("xml_file")) + xml_doc = parse_dm_xml_to_dict(xml_file_path) + print(xml_file_path) + + matched_components = self.get_ndcs_from_image_components(xml_doc, ndcs, image_files) + + for ndc,image_file in matched_components.items(): + image_ndc_mapping[ndc] = { + 'image_file':image_file, + 'spl':spl, + 'image_url':self.create_dailymed_image_url(image_file, spl), + 'methodology':'image_component', + 'confidence_level':0.75, + 'matched':1} + + for ndc in ndcs: + if ndc not in image_ndc_mapping.keys(): + image_ndc_mapping[ndc] = { + 'image_file':'', + 'spl':spl, + 'image_url':'', + 'methodology':'image_filename', + 'confidence_level':1, + 'matched':0} + + df = pd.DataFrame.from_dict(image_ndc_mapping, orient='index') + df = df.reset_index().rename(columns={'index':'ndc'}) + load_df_to_pg(df,"sagerx_lake","dailymed_image_ndc","append") + + + def barcode_to_ndc(self,data): + if len(data) > 11: + data = data[:-1] + data = data[2:] + + if len(data) == 10: + data = data[:-1] + '0' + data[-1] + + return data + + def extract_and_upload_mapped_ndcs_from_image_barcode(self): + from PIL import Image, ImageOps + from pyzbar.pyzbar import decode + + mapping_dict = self.file_mapping + image_ndc_mapping = {} + + for spl, mapping in mapping_dict.items(): + #print(spl, mapping) + ndcs = mapping.get('ndcIds') + ndcs = self.get_full_ndc_varients(ndcs) + image_files = mapping.get('image_files') + + spl_folder_name = mapping.get("spl_folder_name") + + for image_file in image_files: + image_file_path = self.get_file_path(spl_folder_name, image_file) + + img = Image.open(image_file_path) + img = img.convert('L') + img = ImageOps.autocontrast(img) + barcodes = decode(img) + + if not barcodes: + print("No barcode found in the image.") + return + + for barcode in barcodes: + barcode_ndc = self.barcode_to_ndc(barcode) + if barcode_ndc in ndcs: + image_ndc_mapping[barcode_ndc] = { + 'image_file':image_file, + 'spl':spl, + 'image_url':self.create_dailymed_image_url(image_file, spl), + 'methodology':'image_barcode', + 'confidence_level':0.5, + 'matched':1} + + for ndc in ndcs: + if ndc not in image_ndc_mapping.keys(): + image_ndc_mapping[ndc] = { + 'image_file':'', + 'spl':spl, + 'image_url':'', + 'methodology':'image_barcode', + 'confidence_level':0.5, + 'matched':0} + + df = pd.DataFrame.from_dict(image_ndc_mapping, orient='index') + df = df.reset_index().rename(columns={'index':'ndc'}) + load_df_to_pg(df,"sagerx_lake","dailymed_image_ndc","append") + + + + def extract_and_upload_mapped_ndcs_from_image_ocr(self): + import pytesseract + from PIL import Image + + mapping_dict = self.file_mapping + image_ndc_mapping = {} + + for spl, mapping in mapping_dict.items(): + #print(spl, mapping) + ndcs = mapping.get('ndcIds') + ndcs = self.get_full_ndc_varients(ndcs) + image_files = mapping.get('image_files') + + spl_folder_name = mapping.get("spl_folder_name") + + for image_file in image_files: + image_file_path = self.get_file_path(spl_folder_name, image_file) + + img = Image.open(image_file_path) + ocr_text = pytesseract.image_to_string(img) + lines = ocr_text.splitlines() + + for line in lines: + matched_ndc = self.ndc_format(line) + + if matched_ndc: + image_ndc_mapping[matched_ndc] = { + 'image_file':image_file, + 'spl':spl, + 'image_url':self.create_dailymed_image_url(image_file, spl), + 'methodology':'image_ocr', + 'confidence_level':0.25, + 'matched':1} + + for ndc in ndcs: + if ndc not in image_ndc_mapping.keys(): + image_ndc_mapping[ndc] = { + 'image_file':'', + 'spl':spl, + 'image_url':'', + 'methodology':'image_ocr', + 'confidence_level':0.25, + 'matched':0} + + df = pd.DataFrame.from_dict(image_ndc_mapping, orient='index') + df = df.reset_index().rename(columns={'index':'ndc'}) + load_df_to_pg(df,"sagerx_lake","dailymed_image_ndc","append") diff --git a/airflow/dags/dailymed_daily/dag.py b/airflow/dags/dailymed_daily/dag.py new file mode 100644 index 0000000..8170425 --- /dev/null +++ b/airflow/dags/dailymed_daily/dag.py @@ -0,0 +1,44 @@ +from airflow_operator import create_dag +from common_dag_tasks import extract, generate_sql_list, get_ds_folder, transform, read_sql_file +from dailymed_daily.dag_tasks import process_dailymed, unzip_data, process_dailymed_images +from airflow.providers.postgres.operators.postgres import PostgresOperator +import logging +logging.getLogger("airflow.task.operators.postgres_operator").setLevel(logging.WARNING) + +dag_id = "dailymed_daily" + +dag = create_dag( + dag_id=dag_id, + schedule= "45 7 * * 1-5", # at 7:45 am once daily + max_active_runs=1, + concurrency=2, +) + + +with dag: + file_name = "{{ macros.ds_format(macros.ds_add(ds,-1), '%Y-%m-%d', '%m%d%Y') }}" + url = f"https://dailymed-data.nlm.nih.gov/public-release-files/dm_spl_daily_update_{file_name}.zip" + + ds_folder = get_ds_folder(dag_id) + + extract_task = extract(dag_id,url) + + unzip = unzip_data(extract_task) + process = process_dailymed(extract_task) + process_images = process_dailymed_images(extract_task) + # transform_task = transform(dag_id) + + # sql_tasks = [] + # for sql in generate_sql_list(dag_id): + # sql_path = ds_folder / sql + # task_id = sql[:-4] #remove .sql + # sql_task = PostgresOperator( + # task_id=task_id, + # postgres_conn_id="postgres_default", + # sql=read_sql_file(sql_path).format(data_path=extract_task), + # dag=dag + # ) + # sql_tasks.append(sql_task) + + extract_task >> unzip >> process >> process_images + #>> sql_tasks >> transform_task \ No newline at end of file diff --git a/airflow/dags/dailymed_daily/dag_tasks.py b/airflow/dags/dailymed_daily/dag_tasks.py new file mode 100644 index 0000000..0c93c8c --- /dev/null +++ b/airflow/dags/dailymed_daily/dag_tasks.py @@ -0,0 +1,25 @@ +from airflow.decorators import task +from dailymed.dailymed import DailyMed +from dailymed.dailymed_images import DailyMedImages + +@task +def unzip_data(data_folder): + dm = DailyMed(data_folder) + dm.unzip_data() + + +@task +def process_dailymed(data_folder): + dm = DailyMed(data_folder) + dm.map_files() + dm.extract_and_upload_dmd_base_data() + + +@task +def process_dailymed_images(data_folder): + dmi = DailyMedImages(data_folder) + dmi.map_files() + #dmi.extract_and_upload_mapped_ndcs_from_image_files() + #dmi.extract_and_upload_mapped_ndcs_from_image_components() + dmi.extract_and_upload_mapped_ndcs_from_image_barcode() + #dmi.extract_and_upload_mapped_ndcs_from_image_ocr() \ No newline at end of file diff --git a/airflow/dags/dailymed_daily/dailymed_daily_dag.py b/airflow/dags/dailymed_daily/dailymed_daily_dag.py deleted file mode 100644 index 8a1b309..0000000 --- a/airflow/dags/dailymed_daily/dailymed_daily_dag.py +++ /dev/null @@ -1,153 +0,0 @@ -from datetime import date, timedelta -from textwrap import dedent -from pathlib import Path -from lxml import etree - -from sagerx import ( - read_sql_file, - get_sql_list, - alert_slack_channel, - get_dataset, -) - - -ds = { - "dag_id": "dailymed_daily", - "schedule_interval": "45 7 * * 1-5", # at 7:45 am once daily) - "url": "https://dailymed-data.nlm.nih.gov/public-release-files/dm_spl_daily_update_{{ macros.ds_format(macros.ds_add(ds,-1), '%Y-%m-%d', '%m%d%Y') }}.zip", -} - - -def transform_xml(input_xml, xslt): - # load xml input - dom = etree.parse(input_xml) - # load XSLT - xslt_doc = etree.parse(xslt) - xslt_transformer = etree.XSLT(xslt_doc) - # apply XSLT on loaded dom - new_xml = xslt_transformer(dom) - return etree.tostring(new_xml, pretty_print=True).decode("utf-8") - - -def process_dailymed(data_folder, xslt, ti): - import zipfile - import re - import os - import logging - import pandas as pd - import sqlalchemy - - db_conn_string = os.environ["AIRFLOW_CONN_POSTGRES_DEFAULT"] - db_conn = sqlalchemy.create_engine(db_conn_string) - - data_folder = ( - data_folder - / ti.xcom_pull(key="file_path", task_ids="get_dailymed_daily") - / "prescription" - ) - - for zip_folder in data_folder.iterdir(): - logging.info(zip_folder) - with zipfile.ZipFile(zip_folder) as unzipped_folder: - folder_name = zip_folder.stem - for subfile in unzipped_folder.infolist(): - if re.match(".*.xml", subfile.filename): - new_file = unzipped_folder.extract(subfile, data_folder) - raw_xml_data = etree.tostring(etree.parse(new_file) , encoding='unicode') - # xslt transform - xml_content = transform_xml(new_file, xslt) - #os.remove(new_file) - df = pd.DataFrame( - columns=["spl", "file_name", "xml_content", "raw_xml_content"], - data=[[folder_name, subfile.filename, xml_content,raw_xml_data]], - ) - df.to_sql( - "dailymed_daily", - schema="sagerx_lake", - con=db_conn, - if_exists="append", - index=False, - ) - - -########################### DYNAMIC DAG DO NOT TOUCH BELOW HERE ################################# - -# The DAG object; we'll need this to instantiate a DAG -from airflow import DAG - -# Operators; we need this to operate! -from airflow.operators.python_operator import PythonOperator -from airflow.providers.postgres.operators.postgres import PostgresOperator -from airflow.utils.dates import days_ago - - -# builds a dag for each data set in data_set_list -default_args = { - "owner": "airflow", - "start_date": days_ago(0), - "depends_on_past": False, - "email": ["admin@sagerx.io"], - "email_on_failure": False, - "email_on_retry": False, - "retries": 1, - "retry_delay": timedelta(minutes=5), - # none airflow common dag elements - "on_failure_callback": alert_slack_channel, -} - -dag_args = {**default_args, **ds} - - -dag_id = dag_args["dag_id"] -url = dag_args["url"] - -dag = DAG( - dag_id, - schedule_interval=dag_args["schedule_interval"], - default_args=dag_args, - description=f"Processes {dag_id} source", - user_defined_macros=dag_args.get("user_defined_macros"), -) - -ds_folder = Path("/opt/airflow/dags") / dag_id -data_folder = Path("/opt/airflow/data") / dag_id - - -with dag: - # Task to download data from web location - - tl = [] - # Task to download data from web location - tl.append( - PythonOperator( - task_id=f"get_{dag_id}", - python_callable=get_dataset, - op_kwargs={"ds_url": url, "data_folder": data_folder}, - ) - ) - - # Task to load data into source db schema - tl.append( - PythonOperator( - task_id=f"load_{dag_id}", - python_callable=process_dailymed, - op_kwargs={ - "data_folder": data_folder, - "xslt": ds_folder / "dailymed_prescription.xsl", - }, - ) - ) - - for sql in get_sql_list("staging-", ds_folder): - sql_path = ds_folder / sql - tl.append( - PostgresOperator( - task_id=sql, - postgres_conn_id="postgres_default", - sql=read_sql_file(sql_path), - ) - ) - - for i in range(len(tl)): - if i not in [0]: - tl[i - 1] >> tl[i] diff --git a/airflow/dags/dailymed_daily/staging-dailymed_interaction.sql b/airflow/dags/dailymed_daily/load_dailymed_interaction.sql similarity index 100% rename from airflow/dags/dailymed_daily/staging-dailymed_interaction.sql rename to airflow/dags/dailymed_daily/load_dailymed_interaction.sql diff --git a/airflow/dags/dailymed_daily/staging-dailymed_main.sql b/airflow/dags/dailymed_daily/load_dailymed_main.sql similarity index 100% rename from airflow/dags/dailymed_daily/staging-dailymed_main.sql rename to airflow/dags/dailymed_daily/load_dailymed_main.sql diff --git a/airflow/dags/dailymed_daily/staging-dailymed_ndc.sql b/airflow/dags/dailymed_daily/load_dailymed_ndc.sql similarity index 100% rename from airflow/dags/dailymed_daily/staging-dailymed_ndc.sql rename to airflow/dags/dailymed_daily/load_dailymed_ndc.sql diff --git a/airflow/dags/dailymed_daily/staging-dailymed_organization.sql b/airflow/dags/dailymed_daily/load_dailymed_organization.sql similarity index 100% rename from airflow/dags/dailymed_daily/staging-dailymed_organization.sql rename to airflow/dags/dailymed_daily/load_dailymed_organization.sql diff --git a/airflow/dags/dailymed_daily/staging-dailymed_organization_activity.sql b/airflow/dags/dailymed_daily/load_dailymed_organization_activity.sql similarity index 100% rename from airflow/dags/dailymed_daily/staging-dailymed_organization_activity.sql rename to airflow/dags/dailymed_daily/load_dailymed_organization_activity.sql diff --git a/airflow/dags/dailymed_daily/staging-dailymed_organization_item.sql b/airflow/dags/dailymed_daily/load_dailymed_organization_item.sql similarity index 100% rename from airflow/dags/dailymed_daily/staging-dailymed_organization_item.sql rename to airflow/dags/dailymed_daily/load_dailymed_organization_item.sql diff --git a/airflow/dags/dailymed_daily/staging-dailymed_organization_text.sql b/airflow/dags/dailymed_daily/load_dailymed_organization_text.sql similarity index 100% rename from airflow/dags/dailymed_daily/staging-dailymed_organization_text.sql rename to airflow/dags/dailymed_daily/load_dailymed_organization_text.sql diff --git a/airflow/dags/dailymed_images/dag_tasks.py b/airflow/dags/dailymed_images/dag_tasks.py deleted file mode 100644 index 413cf89..0000000 --- a/airflow/dags/dailymed_images/dag_tasks.py +++ /dev/null @@ -1,175 +0,0 @@ -import re -import xmltodict -import pandas as pd - -def parse_xml_content(row:pd.Series): - from airflow.exceptions import AirflowException - - xml_string = row['raw_xml_content'] - xml_data = xmltodict.parse(xml_string) - if 'document' not in xml_data.keys(): - raise AirflowException("Unexpected XML Data, expected DailyMed Document") - - return xml_data - -def clean_string(input_string): - cleaned_string = input_string.strip() - cleaned_string = ' '.join(cleaned_string.split()) - return cleaned_string - -def validate_ndc(ndc): - raw_ndc = ndc.replace("-","") - if len(raw_ndc) > 9: - return ndc - -def find_image_ids(xml_doc, results=None): - if results is None: - results = [] - - if isinstance(xml_doc, dict): - if '@mediaType' in xml_doc and xml_doc['@mediaType'] == 'image/jpeg': - results.append(xml_doc.get('reference', {}).get('@value')) - for _, value in xml_doc.items(): - if isinstance(value, (dict, list)): - find_image_ids(value, results) - elif isinstance(xml_doc, list): - for item in xml_doc: - find_image_ids(item, results) - return list(set(results)) - -def find_ndc_numbers(xml_doc, results=None) -> list: - target_code_system = '2.16.840.1.113883.6.69' - if results is None: - results = [] - - if isinstance(xml_doc, dict): - if '@codeSystem' in xml_doc and xml_doc['@codeSystem'] == target_code_system: - ndc = xml_doc.get('@code') - if ndc and validate_ndc(ndc): - results.append(ndc) - for _, value in xml_doc.items(): - if isinstance(value, (dict, list)): - find_ndc_numbers(value, results) - elif isinstance(xml_doc, list): - for item in xml_doc: - find_ndc_numbers(item,results) - return list(set(results)) - - -def extract_set_id(xml_doc): - return xml_doc['document']['setId']['@root'] - - -def ndc_format(text_data): - # order of patterns is important, largest to smalles - patterns = [ - (r'\d{11}', '11 Digit'), - (r'\d{10}', '10 Digit'), - (r'\d{5}-\d{5}', '5-5'), - (r'\d{5}-\d{4}-\d{2}', '5-4-2'), - (r'\d{5}-\d{4}-\d{1}', '5-4-1'), - (r'\d{5}-\d{3}-\d{2}', '5-3-2'), - (r'\d{4}-\d{6}', '4-6'), - (r'\d{4}-\d{4}-\d{2}', '4-4-2') - ] - - for pattern, _ in patterns: - match = re.search(pattern, text_data) - if match: - return match.group(0) - return None - -def find_image_components(xml_doc): - components = [] - for component in xml_doc['document']['component']['structuredBody']['component']: - if component['section']['code']['@code'] == '51945-4': - components.append(component) - return components - -def find_ndc_in_image_component(component, results=None): - if results is None: - results = [] - - if isinstance(component, str) and ndc_format(component): - results.append(ndc_format(component)) - elif isinstance(component, dict): - if '#text' in component and ndc_format(component['#text']): - results.append(ndc_format(component['#text'])) - for _, value in component.items(): - if isinstance(value, (dict, list)): - find_ndc_in_image_component(value, results) - elif isinstance(component, list): - for item in component: - find_ndc_in_image_component(item,results) - return list(set(results)) - -def create_dailymed_image_url(image_id, set_id): - return f"https://dailymed.nlm.nih.gov/dailymed/image.cfm?name={image_id}&setid={set_id}&type=img" - -def convert_ndc_10_to_11(ndc): - parts = ndc.split('-') - if len(parts[-1]) == 1: - parts[-1] = '0' + parts[-1] - return '-'.join(parts) - -def map_ndcs_from_image_ids(row:pd.Series): - set_id = row['set_id'] - ndc_ids = row['ndc_ids'] - image_ids = row['image_ids'] - med_dict = {} - - converted_ndc_ids = [convert_ndc_10_to_11(ndc) for ndc in ndc_ids] - - for id in image_ids: - image_ndc = ndc_format(id) - if image_ndc and (image_ndc in converted_ndc_ids): - med_dict[image_ndc] = id - # else: - # print(f"NDC {image_ndc} from {id} not found in NDC list of: {converted_ndc_ids}") - return med_dict - -def map_ndcs_from_image_components(row:pd.Series): - xml_doc = row['raw_xml_content'] - set_id = row['set_id'] - ndc_ids = row['ndc_ids'] - image_ids = row['image_ids'] - med_dict = {} - - image_components = find_image_components(xml_doc) - - if image_components == []: - return None - - for component in image_components: - ndcs = find_ndc_in_image_component(component) - images = find_image_ids(component) - - if not ndcs or not images: - continue - - elif len(ndcs) == 1 and len(images) == 1: - ndc = ndcs[0] - image = images[0] - - if ndc in ndc_ids and image in image_ids: - med_dict[ndc] = image - # else: - # print(f"Found unknown ndc or image in set ID: {set_id}") - # print(f"NDCs expected: {ndc_ids}") - # print(f"NDC got: {ndc}") - # print(f"Images expected: {image_ids}") - # print(f"Image got: {image}") - # else: - # print(f"Multiple NDC - Image mapping found for set ID: {set_id}") - # print(f"NDCs got: {ndcs}") - # print(f"Images got: {images}") - - return med_dict - -def map_ndcs_parent_function(row:pd.Series): - med_dict = map_ndcs_from_image_ids(row) - - if not med_dict: - med_dict = map_ndcs_from_image_components(row) - - return med_dict \ No newline at end of file diff --git a/airflow/dags/sagerx.py b/airflow/dags/sagerx.py index 49ba7b1..5c469b0 100644 --- a/airflow/dags/sagerx.py +++ b/airflow/dags/sagerx.py @@ -31,6 +31,14 @@ def read_json_file(json_path:str): json_object = json.load(f) return json_object +def unzip_folder(file_path): + import zipfile + with zipfile.ZipFile(file_path, "r") as zip_ref: + zip_ref.extractall(file_path.with_suffix("")) + Path.unlink(file_path) + file_path = file_path.with_suffix("") + return file_path + # Web functions def download_dataset(url: str, dest: Path = Path.cwd(), file_name: str = None): """Downloads a data set file from provided Url via a requests steam @@ -93,16 +101,12 @@ def get_dataset(ds_url, data_folder, ti=None, file_name=None): ds_url = url to download dataset file from data_folder = path to save dataset to ti = airflow parameter to store task instance for xcoms""" - import zipfile import logging file_path = download_dataset(url=ds_url, dest=data_folder) logging.info(f"requested url: {ds_url}") if file_path.suffix == ".zip": - with zipfile.ZipFile(file_path, "r") as zip_ref: - zip_ref.extractall(file_path.with_suffix("")) - Path.unlink(file_path) - file_path = file_path.with_suffix("") + file_path = unzip_folder(file_path) # change name of file if one is provided if file_name != None: @@ -216,6 +220,8 @@ def parallel_api_calls(api_calls:list) -> list: response = future.result() if not len(response) == 0: output.append({"url":url,"response":response}) - else: - print(f"Empty response for url: {url}") + #else: + #print(f"Empty response for url: {url}") + if len(output) % 1000 == 0: + print(f"Milestone: {len(output)}") return output \ No newline at end of file diff --git a/airflow/dags/xml_functions.py b/airflow/dags/xml_functions.py new file mode 100644 index 0000000..f29bfe0 --- /dev/null +++ b/airflow/dags/xml_functions.py @@ -0,0 +1,52 @@ +from collections import defaultdict +import lxml.etree as ET +import os +import xmltodict + +def get_xsl_template_path(file_name): + xsl_templates_path = os.path.join(os.path.dirname(__file__), 'xsl_templates') + file_path = os.path.join(xsl_templates_path, file_name) + return file_path + +def parse_xml(input_xml): + return ET.parse(input_xml) + +def parse_dm_xml_to_dict(input_xml): + from airflow.exceptions import AirflowException + + with open(input_xml, 'r') as xml_file: + xml_doc = xmltodict.parse(xml_file.read()) + + if 'document' not in xml_doc.keys(): + raise AirflowException("Unexpected XML Data, expected DailyMed Document") + + return xml_doc + +def transform_xml_to_dict(input_xml, xslt): + + dom = ET.parse(input_xml) + xslt_doc = ET.parse(xslt) + xslt_transformer = ET.XSLT(xslt_doc) + new_xml = xslt_transformer(dom) + root = ET.fromstring(ET.tostring(new_xml)) + + result_dict = defaultdict(list) + + for elem in root.iter(): + if elem.tag not in result_dict: + result_dict[elem.tag] = [] + if elem.text is not None: + result_dict[elem.tag].append(elem.text) + + return dict(result_dict) + + +def transform_xml(input_xml, xslt): + # load xml input + dom = ET.parse(input_xml) + # load XSLT + xslt_doc = ET.parse(xslt) + xslt_transformer = ET.XSLT(xslt_doc) + # apply XSLT on loaded dom + new_xml = xslt_transformer(dom) + return ET.tostring(new_xml, pretty_print=True).decode("utf-8") \ No newline at end of file diff --git a/airflow/dags/xsl_templates/__init__.py b/airflow/dags/xsl_templates/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/airflow/dags/dailymed_daily/dailymed_prescription.xsl b/airflow/dags/xsl_templates/dailymed_prescription.xsl similarity index 100% rename from airflow/dags/dailymed_daily/dailymed_prescription.xsl rename to airflow/dags/xsl_templates/dailymed_prescription.xsl diff --git a/airflow/dags/xsl_templates/doc_metadata.xsl b/airflow/dags/xsl_templates/doc_metadata.xsl new file mode 100644 index 0000000..85a2b60 --- /dev/null +++ b/airflow/dags/xsl_templates/doc_metadata.xsl @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/airflow/dags/xsl_templates/ndcs.xsl b/airflow/dags/xsl_templates/ndcs.xsl new file mode 100644 index 0000000..9d051e9 --- /dev/null +++ b/airflow/dags/xsl_templates/ndcs.xsl @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/airflow/dags/xsl_templates/package_data.xsl b/airflow/dags/xsl_templates/package_data.xsl new file mode 100644 index 0000000..1b47af8 --- /dev/null +++ b/airflow/dags/xsl_templates/package_data.xsl @@ -0,0 +1,25 @@ + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/airflow/requirements.txt b/airflow/requirements.txt index e02cd71..0343746 100644 --- a/airflow/requirements.txt +++ b/airflow/requirements.txt @@ -1,7 +1,9 @@ # Any change made here should accompany an increment # to the image version on line 5 of docker-compose.yml +apache-airflow[google] dbt-core dbt-postgres -apache-airflow[google] +pillow==9.5.0 +pyzbar==0.1.9 xmltodict==0.13.0 \ No newline at end of file diff --git a/dbt/sagerx/models/staging/dailymed_daily/_dailymed_daily__models.yml b/dbt/sagerx/models/staging/dailymed_daily/_dailymed_daily__models.yml new file mode 100644 index 0000000..e69de29 diff --git a/dbt/sagerx/models/staging/dailymed_daily/_dailymed_daily__sources.yml b/dbt/sagerx/models/staging/dailymed_daily/_dailymed_daily__sources.yml new file mode 100644 index 0000000..e69de29 From 43ee906c2f59f4a5e8edf429a7aa0dc83a24b1d1 Mon Sep 17 00:00:00 2001 From: Lukasz Przychodzien Date: Mon, 19 Aug 2024 23:36:07 -0400 Subject: [PATCH 4/9] add logging and filename fix --- airflow/dags/dailymed/dailymed.py | 14 ++++++- airflow/dags/dailymed/dailymed_images.py | 52 +++++++++++++++++------- airflow/dags/dailymed_daily/dag_tasks.py | 10 +++-- 3 files changed, 56 insertions(+), 20 deletions(-) diff --git a/airflow/dags/dailymed/dailymed.py b/airflow/dags/dailymed/dailymed.py index b1d28f3..b01f2b4 100644 --- a/airflow/dags/dailymed/dailymed.py +++ b/airflow/dags/dailymed/dailymed.py @@ -3,12 +3,22 @@ from xml_functions import transform_xml_to_dict, get_xsl_template_path, transform_xml import pandas as pd from sagerx import load_df_to_pg +import logging +from airflow import configuration as conf class DailyMed(): def __init__(self, data_folder: os.PathLike) -> None: self.data_folder = data_folder self.rx_folder = Path(data_folder) / "prescription" + airflow_logging_level = conf.get('logging', 'logging_level') + + if airflow_logging_level == 'DEBUG': + logging.debug("This is a debug message that will only be logging.debuged when logging_level is set to DEBUG.") + else: + logging.info("This is an info message, but DEBUG level messages will not be logging.debuged.") + + ### # Supplementary Functions ### @@ -91,7 +101,7 @@ def unzip_data(self) -> None: import zipfile for zip_folder in self.rx_folder.iterdir(): if zip_folder.is_file() and zip_folder.suffix == '.zip': - #print(zip_folder) + logging.debug(zip_folder) with zipfile.ZipFile(zip_folder) as unzipped_folder: folder_name = zip_folder.stem extracted_folder_path = self.rx_folder / folder_name @@ -128,7 +138,7 @@ def map_files(self): } file_mapping[spl] = dict(file_dict, **metadata) self.file_mapping = file_mapping - print(file_mapping) + logging.debug(file_mapping) def get_file_path(self, spl_folder_name, file_name): diff --git a/airflow/dags/dailymed/dailymed_images.py b/airflow/dags/dailymed/dailymed_images.py index b668f0b..feafb65 100644 --- a/airflow/dags/dailymed/dailymed_images.py +++ b/airflow/dags/dailymed/dailymed_images.py @@ -3,6 +3,8 @@ from dailymed.dailymed import DailyMed from xml_functions import parse_dm_xml_to_dict from sagerx import load_df_to_pg +import logging + class DailyMedImages(DailyMed): def __init__(self, data_folder: os.PathLike) -> None: @@ -41,7 +43,7 @@ def validate_ndc(self,ndc): 'ndcIds': '83295-5000-1'}} """ - def get_full_ndc_varients(self, ndcs): + def get_full_ndc_variants(self, ndcs): ndcs_11 = [self.convert_ndc_10_to_11(ndc) for ndc in ndcs] ndcs.extend(ndcs_11) ndcs_nd = [self.convert_ndc_no_dash(ndc) for ndc in ndcs] @@ -50,16 +52,19 @@ def get_full_ndc_varients(self, ndcs): return ndcs def get_ndc_from_image_filename(self, ndcs, image_id): - ndcs= self.get_full_ndc_varients(ndcs) - image_ndc = self.ndc_format(image_id) + logging.debug(f"Got NDC varients, total {len(ndcs)}") + logging.debug(f"image ndc: {image_ndc} from {image_id}") + if image_ndc: for ndc in ndcs: if ndc == image_ndc: return ndc + return None + else: - #print(f"NDC {image_ndc} from {id} not found in NDC list of: {converted_ndc_ids}") + #logging.debug(f"NDC {image_ndc} from {id} not found in NDC list of: {ndcs}") return None def find_image_components(self,xml_doc): @@ -122,9 +127,9 @@ def get_ndcs_from_image_components(self,xml_doc, ndc_ids, image_ids): if ndc in ndc_ids and image in image_ids: mapped_dict[ndc] = image else: - print(f"Found unknown ndc or image") - print(f"NDC {ndc}, vs expected {ndc_ids}") - print(f"Image {image}, vs expected {image_ids}") + logging.debug(f"Found unknown ndc or image") + logging.debug(f"NDC {ndc}, vs expected {ndc_ids}") + logging.debug(f"Image {image}, vs expected {image_ids}") return mapped_dict @@ -133,13 +138,23 @@ def extract_and_upload_mapped_ndcs_from_image_files(self): image_ndc_mapping = {} for spl, mapping in mapping_dict.items(): - #print(spl, mapping) + ndcs = mapping.get('ndcIds') + ndc_variants = self.get_full_ndc_variants(ndcs) image_files = mapping.get('image_files') + logging.debug(f"image file check for {spl}") + logging.debug(f"NDCs found from mapping: {len(ndcs)}") + logging.debug(f"NDC varients: {len(ndc_variants)}") + # Get NDCs when found in the image filenames + logging.debug(f"Found {len(image_files)} image files") + for image_file in image_files: - matched_ndc = self.get_ndc_from_image_filename(ndcs, image_file) + matched_ndc = self.get_ndc_from_image_filename(ndc_variants, image_file) + + logging.debug(f"Mapping dict length of: {len(image_ndc_mapping)}") + if matched_ndc: image_ndc_mapping[matched_ndc] = { 'image_file':image_file, @@ -149,7 +164,9 @@ def extract_and_upload_mapped_ndcs_from_image_files(self): 'confidence_level':1, 'matched':1} - for ndc in ndcs: + logging.debug(f"NDCs found from mapping post: {len(mapping.get('ndcIds'))}") + + for ndc in mapping.get('ndcIds'): if ndc not in image_ndc_mapping.keys(): image_ndc_mapping[ndc] = { 'image_file':'', @@ -158,6 +175,8 @@ def extract_and_upload_mapped_ndcs_from_image_files(self): 'methodology':'image_filename', 'confidence_level':1, 'matched':0} + + logging.debug(f"Mapping keys: {image_ndc_mapping.keys()}") df = pd.DataFrame.from_dict(image_ndc_mapping, orient='index') @@ -170,7 +189,8 @@ def extract_and_upload_mapped_ndcs_from_image_components(self): image_ndc_mapping = {} for spl, mapping in mapping_dict.items(): - #print(spl, mapping) + logging.debug(f"image component check for {spl}") + ndcs = mapping.get('ndcIds') image_files = mapping.get('image_files') @@ -178,7 +198,7 @@ def extract_and_upload_mapped_ndcs_from_image_components(self): spl_folder_name = mapping.get("spl_folder_name") xml_file_path = self.get_file_path(spl_folder_name, mapping.get("xml_file")) xml_doc = parse_dm_xml_to_dict(xml_file_path) - print(xml_file_path) + logging.debug(xml_file_path) matched_components = self.get_ndcs_from_image_components(xml_doc, ndcs, image_files) @@ -224,7 +244,8 @@ def extract_and_upload_mapped_ndcs_from_image_barcode(self): image_ndc_mapping = {} for spl, mapping in mapping_dict.items(): - #print(spl, mapping) + logging.debug(f"image barcode check for {spl}") + ndcs = mapping.get('ndcIds') ndcs = self.get_full_ndc_varients(ndcs) image_files = mapping.get('image_files') @@ -240,7 +261,7 @@ def extract_and_upload_mapped_ndcs_from_image_barcode(self): barcodes = decode(img) if not barcodes: - print("No barcode found in the image.") + logging.debug("No barcode found in the image.") return for barcode in barcodes: @@ -278,7 +299,8 @@ def extract_and_upload_mapped_ndcs_from_image_ocr(self): image_ndc_mapping = {} for spl, mapping in mapping_dict.items(): - #print(spl, mapping) + logging.debug(f"image OCR check for {spl}") + ndcs = mapping.get('ndcIds') ndcs = self.get_full_ndc_varients(ndcs) image_files = mapping.get('image_files') diff --git a/airflow/dags/dailymed_daily/dag_tasks.py b/airflow/dags/dailymed_daily/dag_tasks.py index 0c93c8c..d11c23c 100644 --- a/airflow/dags/dailymed_daily/dag_tasks.py +++ b/airflow/dags/dailymed_daily/dag_tasks.py @@ -19,7 +19,11 @@ def process_dailymed(data_folder): def process_dailymed_images(data_folder): dmi = DailyMedImages(data_folder) dmi.map_files() - #dmi.extract_and_upload_mapped_ndcs_from_image_files() - #dmi.extract_and_upload_mapped_ndcs_from_image_components() - dmi.extract_and_upload_mapped_ndcs_from_image_barcode() + print("Completed Mapping") + dmi.extract_and_upload_mapped_ndcs_from_image_files() + print("Image Files Complete") + # dmi.extract_and_upload_mapped_ndcs_from_image_components() + # print("Image Components Complete") + # dmi.extract_and_upload_mapped_ndcs_from_image_barcode() + # print("Barcode Reading complete") #dmi.extract_and_upload_mapped_ndcs_from_image_ocr() \ No newline at end of file From 0dad57bb54e9c1994f90c57bd4cf6952af039282 Mon Sep 17 00:00:00 2001 From: Joey LeGrand Date: Wed, 21 Aug 2024 21:25:34 -0500 Subject: [PATCH 5/9] Initial changes in progress --- airflow/dags/dailymed/dailymed.py | 11 ++++++-- airflow/dags/dailymed_daily/dag.py | 3 ++- airflow/dags/dailymed_daily/dag_tasks.py | 4 +-- airflow/dags/sagerx.py | 14 ++++++----- airflow/dags/xml_functions.py | 28 +++++++++++---------- airflow/dags/xsl_templates/package_data.xsl | 10 +++++--- docker-compose.yml | 4 +-- 7 files changed, 45 insertions(+), 29 deletions(-) diff --git a/airflow/dags/dailymed/dailymed.py b/airflow/dags/dailymed/dailymed.py index b01f2b4..ae9522b 100644 --- a/airflow/dags/dailymed/dailymed.py +++ b/airflow/dags/dailymed/dailymed.py @@ -85,10 +85,15 @@ def metadata_dict_cleanup(self, metadata): def process_xml_doc(self, xml_doc): + print('process_xml_doc') image_ids = self.find_xml_image_ids(xml_doc) + print('found_xml_image_ids') ndc_ids = self.find_xml_ndc_numbers(xml_doc) - + print('found_ndc_ids') + metadata = self.find_xml_metadata(xml_doc) + print('found_xml_metadata') + metadata['imageIds'] = image_ids metadata['ndcIds'] = ndc_ids return self.metadata_dict_cleanup(metadata) @@ -127,10 +132,12 @@ def map_files(self): elif subfile.suffix == '.jpg': image_files.append(subfile.name) + spl = spl_folder.name.split("_")[1] + print(spl) + xml_path = self.get_file_path(spl_folder, xml_file_name) metadata = self.process_xml_doc(xml_path) - spl = spl_folder.name.split("_")[1] file_dict = { "xml_file":xml_file_name, "image_files": image_files, diff --git a/airflow/dags/dailymed_daily/dag.py b/airflow/dags/dailymed_daily/dag.py index 8170425..5716c0e 100644 --- a/airflow/dags/dailymed_daily/dag.py +++ b/airflow/dags/dailymed_daily/dag.py @@ -17,7 +17,8 @@ with dag: file_name = "{{ macros.ds_format(macros.ds_add(ds,-1), '%Y-%m-%d', '%m%d%Y') }}" - url = f"https://dailymed-data.nlm.nih.gov/public-release-files/dm_spl_daily_update_{file_name}.zip" + #url = f"https://dailymed-data.nlm.nih.gov/public-release-files/dm_spl_daily_update_{file_name}.zip" + url = "https://dailymed-data.nlm.nih.gov/public-release-files/dm_spl_release_human_rx_part1.zip" ds_folder = get_ds_folder(dag_id) diff --git a/airflow/dags/dailymed_daily/dag_tasks.py b/airflow/dags/dailymed_daily/dag_tasks.py index d11c23c..f71bc94 100644 --- a/airflow/dags/dailymed_daily/dag_tasks.py +++ b/airflow/dags/dailymed_daily/dag_tasks.py @@ -22,8 +22,8 @@ def process_dailymed_images(data_folder): print("Completed Mapping") dmi.extract_and_upload_mapped_ndcs_from_image_files() print("Image Files Complete") - # dmi.extract_and_upload_mapped_ndcs_from_image_components() - # print("Image Components Complete") + dmi.extract_and_upload_mapped_ndcs_from_image_components() + print("Image Components Complete") # dmi.extract_and_upload_mapped_ndcs_from_image_barcode() # print("Barcode Reading complete") #dmi.extract_and_upload_mapped_ndcs_from_image_ocr() \ No newline at end of file diff --git a/airflow/dags/sagerx.py b/airflow/dags/sagerx.py index 5c469b0..126e641 100644 --- a/airflow/dags/sagerx.py +++ b/airflow/dags/sagerx.py @@ -154,13 +154,15 @@ def alert_slack_channel(context): message=msg, ).execute(context=None) -def load_df_to_pg(df,schema_name:str,table_name:str,if_exists:str,dtype_name:str="",index:bool=True, - create_index: bool = False, index_columns: list = None) -> None: - from airflow.hooks.postgres_hook import PostgresHook - import sqlalchemy +from airflow.hooks.postgres_hook import PostgresHook +import sqlalchemy - pg_hook = PostgresHook(postgres_conn_id="postgres_default") - engine = pg_hook.get_sqlalchemy_engine() +pg_hook = PostgresHook(postgres_conn_id="postgres_default") +engine = pg_hook.get_sqlalchemy_engine() + + +def load_df_to_pg(df,schema_name:str,table_name:str,if_exists:str,dtype_name:str="",index:bool=True, + create_index: bool = False, index_columns: list = None, engine=engine) -> None: if dtype_name: dtype = {dtype_name:sqlalchemy.types.JSON} diff --git a/airflow/dags/xml_functions.py b/airflow/dags/xml_functions.py index f29bfe0..08df244 100644 --- a/airflow/dags/xml_functions.py +++ b/airflow/dags/xml_functions.py @@ -23,20 +23,22 @@ def parse_dm_xml_to_dict(input_xml): return xml_doc def transform_xml_to_dict(input_xml, xslt): - - dom = ET.parse(input_xml) - xslt_doc = ET.parse(xslt) - xslt_transformer = ET.XSLT(xslt_doc) - new_xml = xslt_transformer(dom) - root = ET.fromstring(ET.tostring(new_xml)) - result_dict = defaultdict(list) - - for elem in root.iter(): - if elem.tag not in result_dict: - result_dict[elem.tag] = [] - if elem.text is not None: - result_dict[elem.tag].append(elem.text) + try: + dom = ET.parse(input_xml) + xslt_doc = ET.parse(xslt) + xslt_transformer = ET.XSLT(xslt_doc) + new_xml = xslt_transformer(dom) + root = ET.fromstring(ET.tostring(new_xml)) + + + for elem in root.iter(): + if elem.tag not in result_dict: + result_dict[elem.tag] = [] + if elem.text is not None: + result_dict[elem.tag].append(elem.text) + except: + print('ERROR in transform_xml_to_dict') return dict(result_dict) diff --git a/airflow/dags/xsl_templates/package_data.xsl b/airflow/dags/xsl_templates/package_data.xsl index 1b47af8..91bb447 100644 --- a/airflow/dags/xsl_templates/package_data.xsl +++ b/airflow/dags/xsl_templates/package_data.xsl @@ -1,10 +1,15 @@ - + - + + + + + + @@ -19,7 +24,6 @@ - \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 7b00e23..ec6e6e9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ version: "3.8" x-airflow-common: &airflow-common build: context: ./airflow - image: sagerx_airflow:v0.0.5 # versioning allows a rebuild of docker image where necessary + image: sagerx_airflow:v0.0.77 # versioning allows a rebuild of docker image where necessary networks: - airflow-dbt-network env_file: @@ -73,7 +73,7 @@ services: - .env environment: AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: google-cloud-platform://?key_path=/opt/gcp.json - image: dbt:v0.0.5 + image: dbt:v0.0.77 networks: - airflow-dbt-network container_name: dbt From 746f136b129604de7f2c0541f8e6a57cfcd713b5 Mon Sep 17 00:00:00 2001 From: Joey LeGrand Date: Thu, 22 Aug 2024 15:45:03 -0500 Subject: [PATCH 6/9] Got working --- airflow/dags/dailymed/dailymed.py | 18 ++++-- airflow/dags/dailymed/dailymed_images.py | 71 ++++++++------------- airflow/dags/dailymed_daily/dag.py | 7 +- airflow/dags/xml_functions.py | 38 ++++++----- airflow/dags/xsl_templates/doc_metadata.xsl | 8 ++- airflow/dags/xsl_templates/ndcs.xsl | 22 +++---- airflow/dags/xsl_templates/package_data.xsl | 2 +- 7 files changed, 83 insertions(+), 83 deletions(-) diff --git a/airflow/dags/dailymed/dailymed.py b/airflow/dags/dailymed/dailymed.py index ae9522b..9348157 100644 --- a/airflow/dags/dailymed/dailymed.py +++ b/airflow/dags/dailymed/dailymed.py @@ -67,6 +67,7 @@ def find_xml_image_ids(self, xml_doc) -> list: def find_xml_ndc_numbers(self, xml_doc) -> list: xslt = get_xsl_template_path("ndcs.xsl") results = transform_xml_to_dict(xml_doc,xslt) + #print(results) return list(set(results.get('NDC',[]))) def find_xml_metadata(self, xml_doc) -> dict: @@ -74,6 +75,11 @@ def find_xml_metadata(self, xml_doc) -> dict: results = transform_xml_to_dict(xml_doc,xslt) return results + def find_xml_package_data(self, xml_doc) -> dict: + xslt = get_xsl_template_path("package_data.xsl") + results = transform_xml_to_dict(xml_doc,xslt) + return results + def metadata_dict_cleanup(self, metadata): new_dict = {} for key, value in metadata.items(): @@ -85,18 +91,18 @@ def metadata_dict_cleanup(self, metadata): def process_xml_doc(self, xml_doc): - print('process_xml_doc') + #print('process_xml_doc') image_ids = self.find_xml_image_ids(xml_doc) - print('found_xml_image_ids') + #print('found_xml_image_ids') ndc_ids = self.find_xml_ndc_numbers(xml_doc) - print('found_ndc_ids') + #print('found_ndc_ids') metadata = self.find_xml_metadata(xml_doc) - print('found_xml_metadata') + #print('found_xml_metadata') metadata['imageIds'] = image_ids metadata['ndcIds'] = ndc_ids - return self.metadata_dict_cleanup(metadata) + return metadata ### # File Processing @@ -133,7 +139,7 @@ def map_files(self): image_files.append(subfile.name) spl = spl_folder.name.split("_")[1] - print(spl) + #print(spl) xml_path = self.get_file_path(spl_folder, xml_file_name) metadata = self.process_xml_doc(xml_path) diff --git a/airflow/dags/dailymed/dailymed_images.py b/airflow/dags/dailymed/dailymed_images.py index feafb65..a835d6c 100644 --- a/airflow/dags/dailymed/dailymed_images.py +++ b/airflow/dags/dailymed/dailymed_images.py @@ -40,7 +40,7 @@ def validate_ndc(self,ndc): 'EffectiveDate': '20240724', 'MarketStatus': 'unapproved drug other', 'imageIds': ['Xiclofen Box.jpg', 'Xiclofen Tube.jpg'], - 'ndcIds': '83295-5000-1'}} + 'ndcIds': ['83295-5000-1']}} """ def get_full_ndc_variants(self, ndcs): @@ -74,49 +74,23 @@ def find_image_components(self,xml_doc): components.append(component) return components - def find_ndcs_in_component(self, component, results=None): - if results is None: - results = [] - - if isinstance(component, str) and self.ndc_format(component): - results.append(self.ndc_format(component)) - elif isinstance(component, dict): - if '#text' in component and self.ndc_format(component['#text']): - results.append(self.ndc_format(component['#text'])) - for _, value in component.items(): - if isinstance(value, (dict, list)): - self.find_ndcs_in_component(value, results) - elif isinstance(component, list): - for item in component: - self.find_ndcs_in_component(item,results) - return list(set(results)) + def find_ndcs_in_component(self, component): + ndcs = self.ndc_format(component['Text'][0]) + # NOTE: may need to modify ndc_format function to return multiple matches + # for now, just returning a list containing the single response + return [ndcs] - def find_images_in_component(self, xml_doc, results=None): - if results is None: - results = [] - - if isinstance(xml_doc, dict): - if '@mediaType' in xml_doc and xml_doc['@mediaType'] == 'image/jpeg': - results.append(xml_doc.get('reference', {}).get('@value')) - for _, value in xml_doc.items(): - if isinstance(value, (dict, list)): - self.find_images_in_component(value, results) - elif isinstance(xml_doc, list): - for item in xml_doc: - self.find_images_in_component(item, results) - return list(set(results)) - - def get_ndcs_from_image_components(self,xml_doc, ndc_ids, image_ids): + def get_ndcs_from_image_components(self, xml_doc, ndc_ids, image_ids): mapped_dict = {} - image_components = self.find_image_components(xml_doc) + + image_components = [xml_doc] # leaving as a list because we need to modify the XSL to find multiple PRIMARY DISPLAY PANELS if image_components == []: return None for component in image_components: ndcs = self.find_ndcs_in_component(component) - images = self.find_images_in_component(component) - + images = component['Image'] if not ndcs or not images: continue @@ -140,6 +114,7 @@ def extract_and_upload_mapped_ndcs_from_image_files(self): for spl, mapping in mapping_dict.items(): ndcs = mapping.get('ndcIds') + #print(mapping) ndc_variants = self.get_full_ndc_variants(ndcs) image_files = mapping.get('image_files') @@ -194,22 +169,26 @@ def extract_and_upload_mapped_ndcs_from_image_components(self): ndcs = mapping.get('ndcIds') image_files = mapping.get('image_files') + # Get NDCs from XML components spl_folder_name = mapping.get("spl_folder_name") xml_file_path = self.get_file_path(spl_folder_name, mapping.get("xml_file")) - xml_doc = parse_dm_xml_to_dict(xml_file_path) + xml_doc = self.find_xml_package_data(xml_file_path) + #xml_doc = parse_dm_xml_to_dict(xml_file_path) + print(xml_doc) logging.debug(xml_file_path) - matched_components = self.get_ndcs_from_image_components(xml_doc, ndcs, image_files) + if ('Image' in xml_doc): + matched_components = self.get_ndcs_from_image_components(xml_doc, ndcs, image_files) - for ndc,image_file in matched_components.items(): - image_ndc_mapping[ndc] = { - 'image_file':image_file, - 'spl':spl, - 'image_url':self.create_dailymed_image_url(image_file, spl), - 'methodology':'image_component', - 'confidence_level':0.75, - 'matched':1} + for ndc,image_file in matched_components.items(): + image_ndc_mapping[ndc] = { + 'image_file':image_file, + 'spl':spl, + 'image_url':self.create_dailymed_image_url(image_file, spl), + 'methodology':'image_component', + 'confidence_level':0.75, + 'matched':1} for ndc in ndcs: if ndc not in image_ndc_mapping.keys(): diff --git a/airflow/dags/dailymed_daily/dag.py b/airflow/dags/dailymed_daily/dag.py index 5716c0e..593f5b7 100644 --- a/airflow/dags/dailymed_daily/dag.py +++ b/airflow/dags/dailymed_daily/dag.py @@ -18,14 +18,14 @@ with dag: file_name = "{{ macros.ds_format(macros.ds_add(ds,-1), '%Y-%m-%d', '%m%d%Y') }}" #url = f"https://dailymed-data.nlm.nih.gov/public-release-files/dm_spl_daily_update_{file_name}.zip" - url = "https://dailymed-data.nlm.nih.gov/public-release-files/dm_spl_release_human_rx_part1.zip" + url = "https://dailymed-data.nlm.nih.gov/public-release-files/dm_spl_release_human_rx_part3.zip" ds_folder = get_ds_folder(dag_id) extract_task = extract(dag_id,url) unzip = unzip_data(extract_task) - process = process_dailymed(extract_task) + #process = process_dailymed(extract_task) process_images = process_dailymed_images(extract_task) # transform_task = transform(dag_id) @@ -41,5 +41,6 @@ # ) # sql_tasks.append(sql_task) - extract_task >> unzip >> process >> process_images + #extract_task >> unzip >> process >> process_images + extract_task >> unzip >> process_images #>> sql_tasks >> transform_task \ No newline at end of file diff --git a/airflow/dags/xml_functions.py b/airflow/dags/xml_functions.py index 08df244..d33d0b5 100644 --- a/airflow/dags/xml_functions.py +++ b/airflow/dags/xml_functions.py @@ -22,33 +22,41 @@ def parse_dm_xml_to_dict(input_xml): return xml_doc +from lxml.etree import XMLParser +p = XMLParser(huge_tree=True) + def transform_xml_to_dict(input_xml, xslt): result_dict = defaultdict(list) try: dom = ET.parse(input_xml) - xslt_doc = ET.parse(xslt) - xslt_transformer = ET.XSLT(xslt_doc) - new_xml = xslt_transformer(dom) - root = ET.fromstring(ET.tostring(new_xml)) - - - for elem in root.iter(): - if elem.tag not in result_dict: - result_dict[elem.tag] = [] - if elem.text is not None: - result_dict[elem.tag].append(elem.text) except: - print('ERROR in transform_xml_to_dict') + dom = ET.parse(input_xml, parser=p) + + xslt_doc = ET.parse(xslt) + xslt_transformer = ET.XSLT(xslt_doc) + new_xml = xslt_transformer(dom) + root = ET.fromstring(ET.tostring(new_xml)) + + + for elem in root.iter(): + if elem.tag not in result_dict: + result_dict[elem.tag] = [] + if elem.text is not None: + result_dict[elem.tag].append(elem.text) return dict(result_dict) def transform_xml(input_xml, xslt): - # load xml input - dom = ET.parse(input_xml) + try: + # load xml input + dom = ET.parse(input_xml) + except: + dom = ET.parse(input_xml, parser=p) + # load XSLT xslt_doc = ET.parse(xslt) xslt_transformer = ET.XSLT(xslt_doc) # apply XSLT on loaded dom new_xml = xslt_transformer(dom) - return ET.tostring(new_xml, pretty_print=True).decode("utf-8") \ No newline at end of file + return ET.tostring(new_xml, pretty_print=True).decode("utf-8") diff --git a/airflow/dags/xsl_templates/doc_metadata.xsl b/airflow/dags/xsl_templates/doc_metadata.xsl index 85a2b60..d381cf1 100644 --- a/airflow/dags/xsl_templates/doc_metadata.xsl +++ b/airflow/dags/xsl_templates/doc_metadata.xsl @@ -1,8 +1,14 @@ - + + + + + + + diff --git a/airflow/dags/xsl_templates/ndcs.xsl b/airflow/dags/xsl_templates/ndcs.xsl index 9d051e9..b28be30 100644 --- a/airflow/dags/xsl_templates/ndcs.xsl +++ b/airflow/dags/xsl_templates/ndcs.xsl @@ -1,18 +1,18 @@ - + - - - - - + + + + + - - - - - + + + + + \ No newline at end of file diff --git a/airflow/dags/xsl_templates/package_data.xsl b/airflow/dags/xsl_templates/package_data.xsl index 91bb447..7615cb4 100644 --- a/airflow/dags/xsl_templates/package_data.xsl +++ b/airflow/dags/xsl_templates/package_data.xsl @@ -1,6 +1,6 @@ - + From 182c26c4aa5369262d79622f772e5c56d8d0f711 Mon Sep 17 00:00:00 2001 From: Joey LeGrand Date: Fri, 23 Aug 2024 22:42:14 -0500 Subject: [PATCH 7/9] Refactor image file NDCs --- airflow/dags/dailymed/dailymed.py | 28 +++--- airflow/dags/dailymed/dailymed_images.py | 102 +++++++------------- airflow/dags/dailymed_daily/dag.py | 2 +- airflow/dags/dailymed_daily/dag_tasks.py | 4 +- airflow/dags/xml_functions.py | 7 +- airflow/dags/xsl_templates/images.xsl | 18 ++++ airflow/dags/xsl_templates/ndcs.xsl | 4 +- airflow/dags/xsl_templates/package_data.xsl | 54 ++++++----- 8 files changed, 106 insertions(+), 113 deletions(-) create mode 100644 airflow/dags/xsl_templates/images.xsl diff --git a/airflow/dags/dailymed/dailymed.py b/airflow/dags/dailymed/dailymed.py index 9348157..d37bfc7 100644 --- a/airflow/dags/dailymed/dailymed.py +++ b/airflow/dags/dailymed/dailymed.py @@ -23,13 +23,12 @@ def __init__(self, data_folder: os.PathLike) -> None: # Supplementary Functions ### - def create_dailymed_url(self,set_id): - return f"https://dailymed.nlm.nih.gov/dailymed/drugInfo.cfm?setid={set_id}" - def ndc_format(self,text_data): - import re - # order of patterns is important, largest to smalles - patterns = [ + import re + + # order of patterns is important + # largest to smallest + patterns = [ (r'\d{11}', '11 Digit'), (r'\d{10}', '10 Digit'), (r'\d{5}-\d{5}', '5-5'), @@ -38,13 +37,14 @@ def ndc_format(self,text_data): (r'\d{5}-\d{3}-\d{2}', '5-3-2'), (r'\d{4}-\d{6}', '4-6'), (r'\d{4}-\d{4}-\d{2}', '4-4-2') - ] - - for pattern, _ in patterns: - match = re.search(pattern, text_data) - if match: - return match.group(0) - return None + ] + + for pattern, _ in patterns: + match = re.search(pattern, text_data) + if match: + return match.group(0) + + return None def convert_ndc_10_to_11(self,ndc): parts = ndc.split('-') @@ -68,7 +68,7 @@ def find_xml_ndc_numbers(self, xml_doc) -> list: xslt = get_xsl_template_path("ndcs.xsl") results = transform_xml_to_dict(xml_doc,xslt) #print(results) - return list(set(results.get('NDC',[]))) + return list(set(results.get('NDCs', {}).get('NDC', []))) def find_xml_metadata(self, xml_doc) -> dict: xslt = get_xsl_template_path("doc_metadata.xsl") diff --git a/airflow/dags/dailymed/dailymed_images.py b/airflow/dags/dailymed/dailymed_images.py index a835d6c..f142325 100644 --- a/airflow/dags/dailymed/dailymed_images.py +++ b/airflow/dags/dailymed/dailymed_images.py @@ -10,39 +10,6 @@ class DailyMedImages(DailyMed): def __init__(self, data_folder: os.PathLike) -> None: super().__init__(data_folder) - def create_dailymed_image_url(self, image_id, spl): - return f"https://dailymed.nlm.nih.gov/dailymed/image.cfm?name={image_id}&setid={spl}&type=img" - - - def has_none_values(self,d): - for value in d.values(): - if value is {}: - return True - return False - - def validate_ndc(self,ndc): - """" - Validates that the NDC is greater than 9 characters - """ - raw_ndc = ndc.replace("-","") - if len(raw_ndc) > 9: - return True - return False - - """ - {'1d9f4044-a333-ecd3-e063-6294a90ab1fe': - {'xml_file': '1e004cc6-580a-1e62-e063-6294a90aa220.xml', - 'image_files': ['Xiclofen Box.jpg', 'Xiclofen Tube.jpg'], - 'spl_folder_name': '20240725_1d9f4044-a333-ecd3-e063-6294a90ab1fe', - 'documentId': '1e004cc6-580a-1e62-e063-6294a90aa220', - 'SetId': '1d9f4044-a333-ecd3-e063-6294a90ab1fe', - 'VersionNumber': '3', - 'EffectiveDate': '20240724', - 'MarketStatus': 'unapproved drug other', - 'imageIds': ['Xiclofen Box.jpg', 'Xiclofen Tube.jpg'], - 'ndcIds': ['83295-5000-1']}} - """ - def get_full_ndc_variants(self, ndcs): ndcs_11 = [self.convert_ndc_10_to_11(ndc) for ndc in ndcs] ndcs.extend(ndcs_11) @@ -51,20 +18,24 @@ def get_full_ndc_variants(self, ndcs): ndcs.sort(key=lambda s: len(s), reverse=True) return ndcs - def get_ndc_from_image_filename(self, ndcs, image_id): + def get_ndc_from_image_filename(self, ndc_variants, image_id): + # attempt to regex an NDC from image file name image_ndc = self.ndc_format(image_id) - logging.debug(f"Got NDC varients, total {len(ndcs)}") - logging.debug(f"image ndc: {image_ndc} from {image_id}") - - if image_ndc: - for ndc in ndcs: + # if NDC match found + if image_ndc: + # compare it against all valid NDC variants in SPL + # TODO: convert ndc_variants to a dict and iterate + # through items so that it compares to the list of variants, + # but returns the original NDC that is represnted by those variants + for ndc in ndc_variants: if ndc == image_ndc: return ndc + # if no valid NDC variant match, assume it is a + # random NDC-length number and disregard match return None - + # if no NDC match found in image file name, return None else: - #logging.debug(f"NDC {image_ndc} from {id} not found in NDC list of: {ndcs}") return None def find_image_components(self,xml_doc): @@ -109,50 +80,49 @@ def get_ndcs_from_image_components(self, xml_doc, ndc_ids, image_ids): def extract_and_upload_mapped_ndcs_from_image_files(self): mapping_dict = self.file_mapping + image_ndc_mapping = {} for spl, mapping in mapping_dict.items(): - - ndcs = mapping.get('ndcIds') - #print(mapping) - ndc_variants = self.get_full_ndc_variants(ndcs) + # get all image file names associated with the SPL image_files = mapping.get('image_files') - - logging.debug(f"image file check for {spl}") - logging.debug(f"NDCs found from mapping: {len(ndcs)}") - logging.debug(f"NDC varients: {len(ndc_variants)}") - - # Get NDCs when found in the image filenames - logging.debug(f"Found {len(image_files)} image files") + print(image_files) + # get all NDCs associated with the SPL + ndcs = mapping.get('ndcIds') + print(ndcs) + # get all variants of each NDC to check against potential + # different formatting in the image name + # TODO: reconfigure this as a dict so that the original NDC + # points to a list of all variations of itself, including itself + ndc_variants = self.get_full_ndc_variants(ndcs) for image_file in image_files: + # attempt to regex an NDC out of each image + # and also ensure that the NDC matches an NDC + # from the SPL - not a random NDC-length number matched_ndc = self.get_ndc_from_image_filename(ndc_variants, image_file) - - logging.debug(f"Mapping dict length of: {len(image_ndc_mapping)}") - + + # if a match is found, add it to a mapping dict if matched_ndc: image_ndc_mapping[matched_ndc] = { 'image_file':image_file, 'spl':spl, - 'image_url':self.create_dailymed_image_url(image_file, spl), 'methodology':'image_filename', 'confidence_level':1, - 'matched':1} + 'matched':1 + } - logging.debug(f"NDCs found from mapping post: {len(mapping.get('ndcIds'))}") - - for ndc in mapping.get('ndcIds'): + # add un-matched NDCs to the list + # NOTE: maybe instead, we add un-matched images to the list? + for ndc in ndcs: if ndc not in image_ndc_mapping.keys(): image_ndc_mapping[ndc] = { 'image_file':'', 'spl':spl, - 'image_url':'', 'methodology':'image_filename', 'confidence_level':1, - 'matched':0} - - logging.debug(f"Mapping keys: {image_ndc_mapping.keys()}") - + 'matched':0 + } df = pd.DataFrame.from_dict(image_ndc_mapping, orient='index') df = df.reset_index().rename(columns={'index':'ndc'}) @@ -175,7 +145,7 @@ def extract_and_upload_mapped_ndcs_from_image_components(self): xml_file_path = self.get_file_path(spl_folder_name, mapping.get("xml_file")) xml_doc = self.find_xml_package_data(xml_file_path) #xml_doc = parse_dm_xml_to_dict(xml_file_path) - print(xml_doc) + #print(xml_doc) logging.debug(xml_file_path) if ('Image' in xml_doc): diff --git a/airflow/dags/dailymed_daily/dag.py b/airflow/dags/dailymed_daily/dag.py index 593f5b7..380b92c 100644 --- a/airflow/dags/dailymed_daily/dag.py +++ b/airflow/dags/dailymed_daily/dag.py @@ -18,7 +18,7 @@ with dag: file_name = "{{ macros.ds_format(macros.ds_add(ds,-1), '%Y-%m-%d', '%m%d%Y') }}" #url = f"https://dailymed-data.nlm.nih.gov/public-release-files/dm_spl_daily_update_{file_name}.zip" - url = "https://dailymed-data.nlm.nih.gov/public-release-files/dm_spl_release_human_rx_part3.zip" + url = "https://dailymed-data.nlm.nih.gov/public-release-files/dm_spl_release_human_rx_part4.zip" ds_folder = get_ds_folder(dag_id) diff --git a/airflow/dags/dailymed_daily/dag_tasks.py b/airflow/dags/dailymed_daily/dag_tasks.py index f71bc94..efac038 100644 --- a/airflow/dags/dailymed_daily/dag_tasks.py +++ b/airflow/dags/dailymed_daily/dag_tasks.py @@ -22,8 +22,8 @@ def process_dailymed_images(data_folder): print("Completed Mapping") dmi.extract_and_upload_mapped_ndcs_from_image_files() print("Image Files Complete") - dmi.extract_and_upload_mapped_ndcs_from_image_components() - print("Image Components Complete") + #dmi.extract_and_upload_mapped_ndcs_from_image_components() + #print("Image Components Complete") # dmi.extract_and_upload_mapped_ndcs_from_image_barcode() # print("Barcode Reading complete") #dmi.extract_and_upload_mapped_ndcs_from_image_ocr() \ No newline at end of file diff --git a/airflow/dags/xml_functions.py b/airflow/dags/xml_functions.py index d33d0b5..4e51add 100644 --- a/airflow/dags/xml_functions.py +++ b/airflow/dags/xml_functions.py @@ -35,6 +35,7 @@ def transform_xml_to_dict(input_xml, xslt): xslt_doc = ET.parse(xslt) xslt_transformer = ET.XSLT(xslt_doc) new_xml = xslt_transformer(dom) + ''' root = ET.fromstring(ET.tostring(new_xml)) @@ -43,8 +44,10 @@ def transform_xml_to_dict(input_xml, xslt): result_dict[elem.tag] = [] if elem.text is not None: result_dict[elem.tag].append(elem.text) - - return dict(result_dict) + ''' + xml_dict = xmltodict.parse(new_xml) + print(xml_dict) + return xml_dict def transform_xml(input_xml, xslt): diff --git a/airflow/dags/xsl_templates/images.xsl b/airflow/dags/xsl_templates/images.xsl new file mode 100644 index 0000000..b28be30 --- /dev/null +++ b/airflow/dags/xsl_templates/images.xsl @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/airflow/dags/xsl_templates/ndcs.xsl b/airflow/dags/xsl_templates/ndcs.xsl index b28be30..e7ed34c 100644 --- a/airflow/dags/xsl_templates/ndcs.xsl +++ b/airflow/dags/xsl_templates/ndcs.xsl @@ -5,11 +5,11 @@ - + - + diff --git a/airflow/dags/xsl_templates/package_data.xsl b/airflow/dags/xsl_templates/package_data.xsl index 7615cb4..0263699 100644 --- a/airflow/dags/xsl_templates/package_data.xsl +++ b/airflow/dags/xsl_templates/package_data.xsl @@ -1,29 +1,31 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file From ef1e92f5242a457303bf537c6d1236f2e0b85530 Mon Sep 17 00:00:00 2001 From: Joey LeGrand Date: Sat, 24 Aug 2024 01:16:47 -0500 Subject: [PATCH 8/9] Got most things working --- airflow/dags/dailymed/dailymed.py | 48 ++++------ airflow/dags/dailymed/dailymed_images.py | 112 +++++++++++++---------- airflow/dags/dailymed_daily/dag_tasks.py | 4 +- airflow/dags/xml_functions.py | 15 +-- 4 files changed, 87 insertions(+), 92 deletions(-) diff --git a/airflow/dags/dailymed/dailymed.py b/airflow/dags/dailymed/dailymed.py index d37bfc7..b2257e3 100644 --- a/airflow/dags/dailymed/dailymed.py +++ b/airflow/dags/dailymed/dailymed.py @@ -5,6 +5,8 @@ from sagerx import load_df_to_pg import logging from airflow import configuration as conf +import re + class DailyMed(): def __init__(self, data_folder: os.PathLike) -> None: @@ -19,31 +21,27 @@ def __init__(self, data_folder: os.PathLike) -> None: logging.info("This is an info message, but DEBUG level messages will not be logging.debuged.") - ### - # Supplementary Functions - ### - - def ndc_format(self,text_data): - import re + ### + # Supplementary Functions + ### # order of patterns is important # largest to smallest - patterns = [ - (r'\d{11}', '11 Digit'), - (r'\d{10}', '10 Digit'), - (r'\d{5}-\d{5}', '5-5'), - (r'\d{5}-\d{4}-\d{2}', '5-4-2'), - (r'\d{5}-\d{4}-\d{1}', '5-4-1'), - (r'\d{5}-\d{3}-\d{2}', '5-3-2'), - (r'\d{4}-\d{6}', '4-6'), - (r'\d{4}-\d{4}-\d{2}', '4-4-2') - ] - - for pattern, _ in patterns: - match = re.search(pattern, text_data) - if match: - return match.group(0) - + self.ndc_pattern = re.compile(r""" + \d{11} | # 11 digit + \d{10} | # 10 digit + \d{5}-\d{5} | # 5-5 + \d{5}-\d{4}-\d{2} | # 5-4-2 + \d{5}-\d{4}-\d{1} | # 5-4-1 + \d{5}-\d{3}-\d{2} | # 5-3-2 + \d{4}-\d{6} | # 4-6 + \d{4}-\d{4}-\d{2} # 4-4-2 + """, re.X) + + def ndc_format(self,text_data): + matches = re.findall(self.ndc_pattern, text_data) + if matches: + return matches return None def convert_ndc_10_to_11(self,ndc): @@ -67,7 +65,6 @@ def find_xml_image_ids(self, xml_doc) -> list: def find_xml_ndc_numbers(self, xml_doc) -> list: xslt = get_xsl_template_path("ndcs.xsl") results = transform_xml_to_dict(xml_doc,xslt) - #print(results) return list(set(results.get('NDCs', {}).get('NDC', []))) def find_xml_metadata(self, xml_doc) -> dict: @@ -91,14 +88,10 @@ def metadata_dict_cleanup(self, metadata): def process_xml_doc(self, xml_doc): - #print('process_xml_doc') image_ids = self.find_xml_image_ids(xml_doc) - #print('found_xml_image_ids') ndc_ids = self.find_xml_ndc_numbers(xml_doc) - #print('found_ndc_ids') metadata = self.find_xml_metadata(xml_doc) - #print('found_xml_metadata') metadata['imageIds'] = image_ids metadata['ndcIds'] = ndc_ids @@ -139,7 +132,6 @@ def map_files(self): image_files.append(subfile.name) spl = spl_folder.name.split("_")[1] - #print(spl) xml_path = self.get_file_path(spl_folder, xml_file_name) metadata = self.process_xml_doc(xml_path) diff --git a/airflow/dags/dailymed/dailymed_images.py b/airflow/dags/dailymed/dailymed_images.py index f142325..4cc0ec1 100644 --- a/airflow/dags/dailymed/dailymed_images.py +++ b/airflow/dags/dailymed/dailymed_images.py @@ -20,10 +20,11 @@ def get_full_ndc_variants(self, ndcs): def get_ndc_from_image_filename(self, ndc_variants, image_id): # attempt to regex an NDC from image file name - image_ndc = self.ndc_format(image_id) + ndc_matches = self.ndc_format(image_id) # if NDC match found - if image_ndc: + if ndc_matches: + image_ndc = ndc_matches[0] # compare it against all valid NDC variants in SPL # TODO: convert ndc_variants to a dict and iterate # through items so that it compares to the list of variants, @@ -44,40 +45,59 @@ def find_image_components(self,xml_doc): if component['section']['code']['@code'] == '51945-4': components.append(component) return components - - def find_ndcs_in_component(self, component): - ndcs = self.ndc_format(component['Text'][0]) - # NOTE: may need to modify ndc_format function to return multiple matches - # for now, just returning a list containing the single response - return [ndcs] def get_ndcs_from_image_components(self, xml_doc, ndc_ids, image_ids): mapped_dict = {} + #print(xml_doc) + packages = xml_doc.get('PackageData', {}).get('Package', []) + if not isinstance(packages, list): + packages = [packages] + + # loop through the packages and apply the regex + for package in packages: + text = package.get('Text', '') + + # there can be multiple Media in a Package + # for some reason the xmltodict and/or XML + # stores as a non-list if only one element + media_list = package.get('MediaList', {}) + if media_list: + medias = media_list.get('Media', []) + if not isinstance(medias, list): + medias = [medias] + + # add all valid images + images = [] + for media in medias: + image = media.get('Image', '') + # TODO: not sure we need the below check + # since we are starting with a subset of components + # we know/believe to be package label info + if image in image_ids: + images.append(image) + + # check if the text matches the regex pattern + ndc_matches = self.ndc_format(text) + # get distinct ndc_matches because + # sometimes the NDC is repeated multiples + # times in a component + if ndc_matches: + ndc_matches = list(set(ndc_matches)) + # if the number of NDC maches equals + # the number of images + if len(ndc_matches) == len(images): + for idx, ndc_match in enumerate(ndc_matches): + # if ndc is valid compared to + # all known NDCs in SPL + if ndc_match in ndc_ids: + # map the NDC to the image in the + # same list position + # NOTE: this is an assumption and needs + # to be validated / verified + mapped_dict[ndc_match] = images[idx] - image_components = [xml_doc] # leaving as a list because we need to modify the XSL to find multiple PRIMARY DISPLAY PANELS - - if image_components == []: - return None - - for component in image_components: - ndcs = self.find_ndcs_in_component(component) - images = component['Image'] - if not ndcs or not images: - continue - - elif len(ndcs) == 1 and len(images) == 1: - ndc = ndcs[0] - image = images[0] - - if ndc in ndc_ids and image in image_ids: - mapped_dict[ndc] = image - else: - logging.debug(f"Found unknown ndc or image") - logging.debug(f"NDC {ndc}, vs expected {ndc_ids}") - logging.debug(f"Image {image}, vs expected {image_ids}") return mapped_dict - def extract_and_upload_mapped_ndcs_from_image_files(self): mapping_dict = self.file_mapping @@ -86,10 +106,8 @@ def extract_and_upload_mapped_ndcs_from_image_files(self): for spl, mapping in mapping_dict.items(): # get all image file names associated with the SPL image_files = mapping.get('image_files') - print(image_files) # get all NDCs associated with the SPL ndcs = mapping.get('ndcIds') - print(ndcs) # get all variants of each NDC to check against potential # different formatting in the image name # TODO: reconfigure this as a dict so that the original NDC @@ -131,34 +149,29 @@ def extract_and_upload_mapped_ndcs_from_image_files(self): def extract_and_upload_mapped_ndcs_from_image_components(self): mapping_dict = self.file_mapping + image_ndc_mapping = {} for spl, mapping in mapping_dict.items(): - logging.debug(f"image component check for {spl}") - - ndcs = mapping.get('ndcIds') + # get all image file names associated with the SPL image_files = mapping.get('image_files') - + # get all NDCs associated with the SPL + ndcs = mapping.get('ndcIds') # Get NDCs from XML components spl_folder_name = mapping.get("spl_folder_name") xml_file_path = self.get_file_path(spl_folder_name, mapping.get("xml_file")) xml_doc = self.find_xml_package_data(xml_file_path) - #xml_doc = parse_dm_xml_to_dict(xml_file_path) - #print(xml_doc) - logging.debug(xml_file_path) - if ('Image' in xml_doc): - matched_components = self.get_ndcs_from_image_components(xml_doc, ndcs, image_files) + matched_components = self.get_ndcs_from_image_components(xml_doc, ndcs, image_files) - for ndc,image_file in matched_components.items(): - image_ndc_mapping[ndc] = { - 'image_file':image_file, - 'spl':spl, - 'image_url':self.create_dailymed_image_url(image_file, spl), - 'methodology':'image_component', - 'confidence_level':0.75, - 'matched':1} + for ndc,image_file in matched_components.items(): + image_ndc_mapping[ndc] = { + 'image_file':image_file, + 'spl':spl, + 'methodology':'image_component', + 'confidence_level':0.75, + 'matched':1} for ndc in ndcs: if ndc not in image_ndc_mapping.keys(): @@ -166,7 +179,6 @@ def extract_and_upload_mapped_ndcs_from_image_components(self): 'image_file':'', 'spl':spl, 'image_url':'', - 'methodology':'image_filename', 'confidence_level':1, 'matched':0} diff --git a/airflow/dags/dailymed_daily/dag_tasks.py b/airflow/dags/dailymed_daily/dag_tasks.py index efac038..f71bc94 100644 --- a/airflow/dags/dailymed_daily/dag_tasks.py +++ b/airflow/dags/dailymed_daily/dag_tasks.py @@ -22,8 +22,8 @@ def process_dailymed_images(data_folder): print("Completed Mapping") dmi.extract_and_upload_mapped_ndcs_from_image_files() print("Image Files Complete") - #dmi.extract_and_upload_mapped_ndcs_from_image_components() - #print("Image Components Complete") + dmi.extract_and_upload_mapped_ndcs_from_image_components() + print("Image Components Complete") # dmi.extract_and_upload_mapped_ndcs_from_image_barcode() # print("Barcode Reading complete") #dmi.extract_and_upload_mapped_ndcs_from_image_ocr() \ No newline at end of file diff --git a/airflow/dags/xml_functions.py b/airflow/dags/xml_functions.py index 4e51add..f7b772f 100644 --- a/airflow/dags/xml_functions.py +++ b/airflow/dags/xml_functions.py @@ -26,27 +26,18 @@ def parse_dm_xml_to_dict(input_xml): p = XMLParser(huge_tree=True) def transform_xml_to_dict(input_xml, xslt): - result_dict = defaultdict(list) try: dom = ET.parse(input_xml) except: + # for extra large XML files dom = ET.parse(input_xml, parser=p) xslt_doc = ET.parse(xslt) xslt_transformer = ET.XSLT(xslt_doc) new_xml = xslt_transformer(dom) - ''' - root = ET.fromstring(ET.tostring(new_xml)) - - - for elem in root.iter(): - if elem.tag not in result_dict: - result_dict[elem.tag] = [] - if elem.text is not None: - result_dict[elem.tag].append(elem.text) - ''' + xml_dict = xmltodict.parse(new_xml) - print(xml_dict) + #print(xml_dict) return xml_dict From f424c503bf8ec92e1b2d79eff9cee687277d7fb5 Mon Sep 17 00:00:00 2001 From: Joey LeGrand Date: Mon, 26 Aug 2024 20:08:54 -0500 Subject: [PATCH 9/9] Fix component NDC bug --- airflow/dags/dailymed/dailymed_images.py | 40 ++++++++++++++---------- airflow/dags/dailymed_daily/dag.py | 2 +- 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/airflow/dags/dailymed/dailymed_images.py b/airflow/dags/dailymed/dailymed_images.py index 4cc0ec1..5bfa15f 100644 --- a/airflow/dags/dailymed/dailymed_images.py +++ b/airflow/dags/dailymed/dailymed_images.py @@ -75,26 +75,38 @@ def get_ndcs_from_image_components(self, xml_doc, ndc_ids, image_ids): # we know/believe to be package label info if image in image_ids: images.append(image) + + print(images) # check if the text matches the regex pattern ndc_matches = self.ndc_format(text) + print(f'initial ndc_matches: {ndc_matches}') # get distinct ndc_matches because # sometimes the NDC is repeated multiples # times in a component if ndc_matches: - ndc_matches = list(set(ndc_matches)) + # keep the list of ndc_matches in the same order it was found + # but filter out NDCs that are not in the SPL + final_ndc_matches = [] + found_ndc_matches = set() + + # compare ndc_matches to list of SPL NDCs + # maintain the order of NDCs found in the free text + # remove duplicate NDCs found in the free text + for ndc in ndc_matches: + if ndc in ndc_ids and ndc not in found_ndc_matches: + final_ndc_matches.append(ndc) + found_ndc_matches.add(ndc) + print(f'de-dup ndc_matches: {final_ndc_matches}') # if the number of NDC maches equals # the number of images - if len(ndc_matches) == len(images): - for idx, ndc_match in enumerate(ndc_matches): - # if ndc is valid compared to - # all known NDCs in SPL - if ndc_match in ndc_ids: - # map the NDC to the image in the - # same list position - # NOTE: this is an assumption and needs - # to be validated / verified - mapped_dict[ndc_match] = images[idx] + if len(final_ndc_matches) == len(images): + for idx, ndc_match in enumerate(final_ndc_matches): + # map the NDC to the image in the + # same list position + # NOTE: this is an assumption and needs + # to be validated / verified + mapped_dict[ndc_match] = images[idx] return mapped_dict @@ -178,7 +190,7 @@ def extract_and_upload_mapped_ndcs_from_image_components(self): image_ndc_mapping[ndc] = { 'image_file':'', 'spl':spl, - 'image_url':'', + 'methodology':'image_component', 'confidence_level':1, 'matched':0} @@ -231,7 +243,6 @@ def extract_and_upload_mapped_ndcs_from_image_barcode(self): image_ndc_mapping[barcode_ndc] = { 'image_file':image_file, 'spl':spl, - 'image_url':self.create_dailymed_image_url(image_file, spl), 'methodology':'image_barcode', 'confidence_level':0.5, 'matched':1} @@ -241,7 +252,6 @@ def extract_and_upload_mapped_ndcs_from_image_barcode(self): image_ndc_mapping[ndc] = { 'image_file':'', 'spl':spl, - 'image_url':'', 'methodology':'image_barcode', 'confidence_level':0.5, 'matched':0} @@ -282,7 +292,6 @@ def extract_and_upload_mapped_ndcs_from_image_ocr(self): image_ndc_mapping[matched_ndc] = { 'image_file':image_file, 'spl':spl, - 'image_url':self.create_dailymed_image_url(image_file, spl), 'methodology':'image_ocr', 'confidence_level':0.25, 'matched':1} @@ -292,7 +301,6 @@ def extract_and_upload_mapped_ndcs_from_image_ocr(self): image_ndc_mapping[ndc] = { 'image_file':'', 'spl':spl, - 'image_url':'', 'methodology':'image_ocr', 'confidence_level':0.25, 'matched':0} diff --git a/airflow/dags/dailymed_daily/dag.py b/airflow/dags/dailymed_daily/dag.py index 380b92c..57f4a6b 100644 --- a/airflow/dags/dailymed_daily/dag.py +++ b/airflow/dags/dailymed_daily/dag.py @@ -18,7 +18,7 @@ with dag: file_name = "{{ macros.ds_format(macros.ds_add(ds,-1), '%Y-%m-%d', '%m%d%Y') }}" #url = f"https://dailymed-data.nlm.nih.gov/public-release-files/dm_spl_daily_update_{file_name}.zip" - url = "https://dailymed-data.nlm.nih.gov/public-release-files/dm_spl_release_human_rx_part4.zip" + url = "https://dailymed-data.nlm.nih.gov/public-release-files/dm_spl_release_human_rx_part2.zip" ds_folder = get_ds_folder(dag_id)