Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DailyMed NDC->Image File - Initial Work #318

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions airflow/dags/dailymed/dag.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import pendulum

from airflow_operator import create_dag
from airflow.utils.helpers import chain

from airflow.decorators import task
import pandas as pd

from airflow.dags.dailymed_images.dailymed_images import *
from sagerx import load_df_to_pg

dag_id = "dailymed_images"

# Shared DAG factory; schedule is cron "0 8 * * 1-5" = 8:00 AM Monday-Friday.
dag = create_dag(
    dag_id=dag_id,
    schedule= "0 8 * * 1-5",  # 8:00 AM on weekdays (Mon-Fri)
    start_date=pendulum.yesterday(),
    catchup=False,            # do not backfill missed runs
    max_active_runs=1,
    concurrency=2,
)

"""
Process

1. get xml data from dailymed_daily
2. read xml data
3. find ndc and image data in xml
4. map ndc and image data
5. map ndc and image ids toghether
5.a. check to see if NDC is in the image name, if so map together
5.b. check to see if NDC is under the same 51945-4 component, if so then map together
5.c. Run image PCR to pull NDC fr
6. upload to postgres


"""


@task
def get_dailymed_data():
    """Read raw SPL rows from sagerx_lake.dailymed_daily, derive NDC/image
    mappings from each row's XML, and load the result into
    sagerx_lake.dailymed_images (replacing the table).

    Relies on helpers star-imported from dailymed_images: parse_xml_content,
    extract_set_id, find_ndc_numbers, find_image_ids, map_ndcs_parent_function.
    """
    from airflow.hooks.postgres_hook import PostgresHook

    query = "SELECT * FROM sagerx_lake.dailymed_daily"
    pg_hook = PostgresHook(postgres_conn_id="postgres_default")
    engine = pg_hook.get_sqlalchemy_engine()
    df = pd.read_sql(query, con=engine)
    print(f"DF length of {len(df)} with columns: {df.columns}")

    # Parse each row's XML once, then derive the per-row fields from it.
    # Single-column derivations use Series.apply instead of row-wise
    # DataFrame.apply(axis=1), which re-passes the whole row needlessly.
    df['raw_xml_content'] = df.apply(parse_xml_content, axis=1)
    df['set_id'] = df['raw_xml_content'].apply(extract_set_id)
    df['ndc_ids'] = df['raw_xml_content'].apply(find_ndc_numbers)
    df['image_ids'] = df['raw_xml_content'].apply(find_image_ids)
    # Mapping needs multiple columns (ndc_ids + image_ids), so it stays row-wise.
    df['ndc_image_mapping'] = df.apply(map_ndcs_parent_function, axis=1)

    load_df_to_pg(df[['spl','file_name','set_id','ndc_ids','image_ids','ndc_image_mapping']],"sagerx_lake","dailymed_images","replace",dtype_name="ndc_image_mapping")


with dag:
    # Single-task DAG: extract, map, and load all happen in one task.
    get_dailymed_data()
170 changes: 170 additions & 0 deletions airflow/dags/dailymed/dailymed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import os
from pathlib import Path
from xml_functions import transform_xml_to_dict, get_xsl_template_path, transform_xml
import pandas as pd
from sagerx import load_df_to_pg
import logging
from airflow import configuration as conf
import re


class DailyMed():
    """Processes a DailyMed prescription-SPL download folder.

    Responsibilities: unzip the per-SPL archives under
    ``<data_folder>/prescription``, map each SPL folder's XML/image files,
    extract metadata (set id, NDCs, image ids) from the SPL XML via XSL
    templates, and load base rows into ``sagerx_lake.dailymed_daily``.
    """

    def __init__(self, data_folder: os.PathLike) -> None:
        """Bind to a download folder; SPL zips are expected under <data_folder>/prescription."""
        self.data_folder = data_folder
        self.rx_folder = Path(data_folder) / "prescription"

        airflow_logging_level = conf.get('logging', 'logging_level')

        if airflow_logging_level == 'DEBUG':
            logging.debug("DailyMed initialized with DEBUG logging (rx_folder=%s)", self.rx_folder)
        else:
            logging.info("DailyMed initialized (rx_folder=%s)", self.rx_folder)

        ###
        # Supplementary Functions
        ###

        # Order of alternatives is important: the regex engine takes the
        # first alternative that matches, so larger patterns come first to
        # avoid a shorter pattern truncating a longer NDC.
        self.ndc_pattern = re.compile(r"""
            \d{11}            | # 11 digit
            \d{10}            | # 10 digit
            \d{5}-\d{5}       | # 5-5
            \d{5}-\d{4}-\d{2} | # 5-4-2
            \d{5}-\d{4}-\d{1} | # 5-4-1
            \d{5}-\d{3}-\d{2} | # 5-3-2
            \d{4}-\d{6}       | # 4-6
            \d{4}-\d{4}-\d{2}   # 4-4-2
        """, re.X)

    def ndc_format(self, text_data):
        """Return all NDC-shaped substrings found in text_data, or None if there are none."""
        matches = re.findall(self.ndc_pattern, text_data)
        if matches:
            return matches
        return None

    def convert_ndc_10_to_11(self, ndc):
        """Convert a dashed 10-digit NDC to the 11-digit 5-4-2 billing format.

        FDA 10-digit NDCs come in 4-4-2, 5-3-2, and 5-4-1 layouts; the
        11-digit form zero-pads whichever segment is short so the result is
        always 5-4-2. Inputs without exactly three dashed segments fall back
        to the previous behavior of padding only a 1-digit final segment.
        """
        parts = ndc.split('-')
        if len(parts) == 3:
            # labeler-product-package -> widths 5-4-2
            parts = [part.zfill(width) for part, width in zip(parts, (5, 4, 2))]
        elif len(parts[-1]) == 1:
            parts[-1] = '0' + parts[-1]
        return '-'.join(parts)

    def convert_ndc_no_dash(self, ndc):
        """Return the NDC with all dashes stripped."""
        return ndc.replace("-", "")

    ###
    # XML Processing
    ###

    def find_xml_image_ids(self, xml_doc) -> list:
        """Return the unique image ids found in the SPL package data."""
        # NOTE(review): uses the same package_data.xsl as find_xml_package_data
        # and reads only its 'Image' key — confirm a dedicated template isn't intended.
        xslt = get_xsl_template_path("package_data.xsl")
        results = transform_xml_to_dict(xml_doc, xslt)
        return list(set(results.get('Image', [])))

    def find_xml_ndc_numbers(self, xml_doc) -> list:
        """Return the unique NDC numbers declared in the SPL document."""
        xslt = get_xsl_template_path("ndcs.xsl")
        results = transform_xml_to_dict(xml_doc, xslt)
        return list(set(results.get('NDCs', {}).get('NDC', [])))

    def find_xml_metadata(self, xml_doc) -> dict:
        """Return document-level metadata extracted via doc_metadata.xsl."""
        xslt = get_xsl_template_path("doc_metadata.xsl")
        return transform_xml_to_dict(xml_doc, xslt)

    def find_xml_package_data(self, xml_doc) -> dict:
        """Return the full package-data dict extracted via package_data.xsl."""
        xslt = get_xsl_template_path("package_data.xsl")
        return transform_xml_to_dict(xml_doc, xslt)

    def metadata_dict_cleanup(self, metadata):
        """Unwrap single-element list values to plain strings; keep longer lists.

        NOTE(review): keys whose values are empty lists or non-lists are
        silently dropped — confirm that is intended.
        """
        new_dict = {}
        for key, value in metadata.items():
            if isinstance(value, list) and len(value) == 1:
                new_dict[key] = str(value[0])
            elif isinstance(value, list) and len(value) > 1:
                new_dict[key] = value
        return new_dict

    def process_xml_doc(self, xml_doc):
        """Extract metadata for one SPL XML doc, augmented with image and NDC ids."""
        image_ids = self.find_xml_image_ids(xml_doc)
        ndc_ids = self.find_xml_ndc_numbers(xml_doc)

        metadata = self.find_xml_metadata(xml_doc)

        metadata['imageIds'] = image_ids
        metadata['ndcIds'] = ndc_ids
        return metadata

    ###
    # File Processing
    ###

    def unzip_data(self) -> None:
        """Extract every .zip under rx_folder into a same-named subfolder, then delete the archive."""
        import zipfile
        for zip_path in self.rx_folder.iterdir():
            if zip_path.is_file() and zip_path.suffix == '.zip':
                logging.debug(zip_path)
                extracted_folder_path = self.rx_folder / zip_path.stem
                extracted_folder_path.mkdir(exist_ok=True)
                with zipfile.ZipFile(zip_path) as archive:
                    archive.extractall(extracted_folder_path)
                zip_path.unlink()

    def map_files(self):
        """Build self.file_mapping: spl id -> file names, folder name, and XML metadata."""
        file_mapping = {}
        for spl_folder in self.rx_folder.iterdir():
            # Skip macOS metadata files and any other stray non-directory
            # entries (previously only '.DS_Store' was handled, so any other
            # loose file crashed the iterdir() walk below).
            if spl_folder.name == '.DS_Store' or not spl_folder.is_dir():
                continue

            image_files = []
            xml_file_name = ""

            for subfile in spl_folder.iterdir():
                if subfile.suffix == '.xml':
                    xml_file_name = subfile.name
                elif subfile.suffix == '.jpg':
                    image_files.append(subfile.name)

            # Folder names look like "<date>_<spl-guid>"; keep the guid part.
            spl = spl_folder.name.split("_")[1]

            xml_path = self.get_file_path(spl_folder, xml_file_name)
            metadata = self.process_xml_doc(xml_path)

            file_dict = {
                "xml_file": xml_file_name,
                "image_files": image_files,
                "spl_folder_name": spl_folder.name
            }
            file_mapping[spl] = dict(file_dict, **metadata)
        self.file_mapping = file_mapping
        logging.debug(file_mapping)

    def get_file_path(self, spl_folder_name, file_name):
        """Return the full path of file_name inside the given SPL folder."""
        return os.path.join(self.rx_folder, spl_folder_name, file_name)

    ###
    # Data Extraction for DailyMed Daily
    ###

    def extract_and_upload_dmd_base_data(self):
        """Transform each mapped SPL's XML and append one row per SPL to sagerx_lake.dailymed_daily.

        Requires map_files() to have been called first (reads self.file_mapping).
        """
        xslt = get_xsl_template_path("dailymed_prescription.xsl")

        for spl, mapping in self.file_mapping.items():
            spl_folder_name = mapping.get("spl_folder_name")
            xml_file = self.get_file_path(spl_folder_name, mapping.get("xml_file"))
            xml_content = transform_xml(xml_file, xslt)

            df = pd.DataFrame(
                columns=["spl", "spl_folder_name", "xml_file_name", "xml_content", "image_files"],
                data=[[spl, spl_folder_name, mapping.get("xml_file"), xml_content, mapping.get("image_files")]],
            )

            # One append per SPL keeps memory flat for large releases.
            load_df_to_pg(df, "sagerx_lake", "dailymed_daily", "append")
Loading