diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..ac7a6fd --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +/venv/* +/venv* +.env \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 827e1a7..548a890 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,11 +1,11 @@ -FROM python:3.9-bullseye +FROM python:3.11-bullseye WORKDIR /wheels -ADD ./requirements.txt /wheels +ADD ./requirements-dev.txt /wheels WORKDIR /opt -RUN apt update && apt install -y nginx && python3.9 -m venv venv && . venv/bin/activate && pip install --no-compile --upgrade pip && \ - pip install --no-compile -r /wheels/requirements.txt -f /wheels \ +RUN apt update && apt install -y nginx && python3 -m venv venv && . venv/bin/activate && pip install --no-compile --upgrade pip && \ + pip install --no-compile -r /wheels/requirements-dev.txt -f /wheels \ && rm -rf /wheels \ && rm -rf /root/.cache/pip/* && \ find . -name *.pyc -execdir rm -f {} \; diff --git a/api/__init__.py b/api/__init__.py index 3e2d286..cb2e33d 100644 --- a/api/__init__.py +++ b/api/__init__.py @@ -4,9 +4,13 @@ from api.endpoint.plan import blueprint as v1_plan from api.endpoint.importer import blueprint as v1_import from api.endpoint.issuer import blueprint as v1_issuer +from api.endpoint.npi import blueprint as v1_npi +from api.endpoint.nucc import blueprint as v1_nucc + def init_api(api): db.init_app(api) - api_bluenprint = Blueprint.group([v1_healthcheck, v1_plan, v1_import, v1_issuer], version_prefix="/api/v") - api.blueprint(api_bluenprint) \ No newline at end of file + api_bluenprint = Blueprint.group([v1_healthcheck, v1_plan, v1_import, v1_issuer, v1_npi, v1_nucc], + version_prefix="/api/v") + api.blueprint(api_bluenprint) diff --git a/api/endpoint/importer.py b/api/endpoint/importer.py index 1da8f44..f7d343e 100644 --- a/api/endpoint/importer.py +++ b/api/endpoint/importer.py @@ -1,13 +1,9 @@ -import asyncio -from datetime import datetime -import urllib.parse - import sanic.exceptions from sanic import response from sanic import Blueprint -from sqlalchemy import literal -from db.models import db, Issuer, Plan, PlanFormulary, PlanTransparency, ImportLog, ImportHistory + +from db.models import db, Issuer, Plan, ImportLog, ImportHistory blueprint = Blueprint('import', url_prefix='/import', version=1) diff --git a/api/endpoint/issuer.py b/api/endpoint/issuer.py index 68d4acd..bcaf29a 100644 --- a/api/endpoint/issuer.py +++ b/api/endpoint/issuer.py @@ -1,12 +1,8 @@ -import asyncio -from datetime import datetime -import urllib.parse - import sanic.exceptions from sanic import response from sanic import Blueprint -from db.models import db, Plan, PlanFormulary, Issuer, ImportLog +from db.models import db, Plan, Issuer, ImportLog blueprint = Blueprint('issuer', url_prefix='/issuer', version=1) diff --git a/api/endpoint/npi.py b/api/endpoint/npi.py new file mode 100644 index 0000000..10c7a4c --- /dev/null +++ b/api/endpoint/npi.py @@ -0,0 +1,370 @@ +import os +import asyncio +import json +from datetime import datetime +from process.ext.utils import download_it +import urllib.parse +from sqlalchemy import inspect +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.sql import func, tuple_, text + +import sanic.exceptions +from sanic import response +from sanic import Blueprint + +from db.models import db, NPIData, NPIAddress, AddressArchive, NPIDataTaxonomy, NPIDataTaxonomyGroup, NUCCTaxonomy + +blueprint = Blueprint('npi', url_prefix='/npi', version=1) + + +@blueprint.get('/') +async def 
npi_index_status(request): + async def get_npi_count(): + async with db.acquire(): + return await db.func.count(NPIData.npi).gino.scalar() + + async def get_npi_address_count(): + async with db.acquire(): + return await db.func.count(tuple_(NPIAddress.npi, NPIAddress.checksum, NPIAddress.type)).gino.scalar() + + + npi_count, npi_address_count = await asyncio.gather(get_npi_count(), get_npi_address_count()) + data = { + 'date': datetime.utcnow().isoformat(), + 'release': request.app.config.get('RELEASE'), + 'environment': request.app.config.get('ENVIRONMENT'), + 'product_count': npi_count, + 'import_log_errors': npi_address_count, + } + + return response.json(data) + +@blueprint.get('/all') +async def get_all(request): + start = float(request.args.get("start", 0)) + limit = float(request.args.get("limit", 0)) + classification = request.args.get("classification") + section = request.args.get("section") + display_name = request.args.get("display_name") + async def get_count(classification, section, display_name): + where = [] + if classification: + where.append('classification = :classification') + if section: + where.append('section = :section') + if display_name: + where.append('display_name = :display_name') + q = text(f"""select count(distinct (npi)) +from mrf.npi_taxonomy, + (select ARRAY_AGG(code) as codes from mrf.nucc_taxonomy where {' and '.join(where)}) as q +where healthcare_provider_taxonomy_code = ANY (codes);""") + async with db.acquire() as conn: + return (await conn.all(q, classification=classification, section=section, display_name=display_name))[0][0] + + async def get_results(start, limit, classification, section, display_name): + where = [] + if classification: + where.append('classification = :classification') + if section: + where.append('section = :section') + if display_name: + where.append('display_name = :display_name') + q = text(f"""select b.*, g.* from mrf.npi as b, (select c.* + from + mrf.npi_address as c, + (select ARRAY_AGG(int_code) as codes from mrf.nucc_taxonomy where {' and '.join(where)}) as q + where c.taxonomy_array && q.codes + and c.type = 'primary' + ORDER BY c.npi + limit :limit offset :start) as g WHERE b.npi=g.npi;""") + res = [] + async with db.acquire() as conn: + for r in await conn.all(q, start=start, limit=limit, classification=classification, section=section, + display_name=display_name): + obj = {} + count = -1 + for c in NPIData.__table__.columns: + count += 1 + obj[c.key] = r[count] + temp = NPIAddress.__table__.columns + # temp = ['npi', + # 'type', + # 'checksum', + # 'first_line', + # 'second_line', + # 'city_name', + # 'state_name', + # 'postal_code', + # 'country_code', + # 'telephone_number', + # 'fax_number', + # 'formatted_address', + # 'lat', + # 'long', + # 'date_added', 'taxonomy_array', 'place_id'] + for c in temp: + count += 1 + obj[c.key] = r[count] + res.append(obj) + return res + + total, rows = await asyncio.gather( + get_count(classification, section, display_name), + get_results(start, limit, classification, section, display_name) + ) + return response.json({'total': total, 'rows': rows}, default=str) + +# @blueprint.get('/all/variants') +# async def all_plans(request): +# plan_data = await db.select( +# [Plan.marketing_name, Plan.plan_id, PlanAttributes.full_plan_id, Plan.year]).select_from(Plan.join(PlanAttributes, ((Plan.plan_id == func.substr(PlanAttributes.full_plan_id, 1, 14)) & (Plan.year == PlanAttributes.year)))).\ +# group_by(PlanAttributes.full_plan_id, Plan.plan_id, Plan.marketing_name, Plan.year).gino.all() 
+# data = []
+# for p in plan_data:
+#     data.append({'marketing_name': p[0], 'plan_id': p[1], 'full_plan_id': p[2], 'year': p[3]})
+# return response.json(data)
+
+@blueprint.get('/near/')
+async def get_near_npi(request):
+    in_long = float(request.args.get("long"))
+    in_lat = float(request.args.get("lat"))
+    codes = request.args.get("codes")
+    if codes:
+        codes = [x.strip() for x in codes.split(',')]
+    classification = request.args.get("classification")
+    section = request.args.get('section')
+    display_name = request.args.get('display_name')
+    name_like = request.args.get('name_like')
+    exclude_npi = int(request.args.get("exclude_npi", 0))
+    limit = int(request.args.get('limit', 5))
+    radius = int(request.args.get('radius', 30))
+    async with db.acquire() as conn:
+        res = []
+        exclude_npi_sql = ""
+        ilike_name = ""
+        if exclude_npi:
+            exclude_npi_sql = "and a.npi <> :exclude_npi"
+
+        where = []
+        if classification:
+            where.append('classification = :classification')
+        if section:
+            where.append('section = :section')
+        if display_name:
+            where.append('display_name = :display_name')
+        if codes:
+            where.append('code = ANY(:codes)')
+        if name_like:
+            name_like = f'%{name_like}%'
+            ilike_name = " and (d.provider_last_name ilike :name_like OR d.provider_other_organization_name ilike :name_like OR d.provider_organization_name ilike :name_like)"
+        q = f"""select round(cast(st_distance(Geography(ST_MakePoint(q.long, q.lat)),
+            Geography(ST_MakePoint(:in_long, :in_lat))) / 1609.34 as numeric), 2) as distance,
+        q.*, d.*
+from mrf.npi as d,
+(select a.* from mrf.npi_address as a,
+    (select ARRAY_AGG(int_code) as codes from mrf.nucc_taxonomy where {' and '.join(where)}) as g
+where ST_DWithin(Geography(ST_MakePoint(long, lat)),
+            Geography(ST_MakePoint(:in_long, :in_lat)),
+            :radius * 1609.34)
+    and a.taxonomy_array && g.codes
+    and a.type = 'primary'
+    {exclude_npi_sql}
+ORDER by round(cast(st_distance(Geography(ST_MakePoint(a.long, a.lat)),
+            Geography(ST_MakePoint(:in_long, :in_lat))) / 1609.34 as numeric), 2) asc LIMIT :limit) as q WHERE q.npi=d.npi{ilike_name};
+"""
+        for r in await conn.all(text(q), in_long=in_long, in_lat=in_lat, classification=classification, limit=limit, radius=radius,
+                                exclude_npi=exclude_npi, section=section, display_name=display_name, name_like=name_like,
+                                codes=codes,):
+            obj = {}
+            count = 0
+            obj['distance'] = r[count]
+            temp = NPIAddress.__table__.columns
+            # temp = ['npi',
+            #         'type',
+            #         'checksum',
+            #         'first_line',
+            #         'second_line',
+            #         'city_name',
+            #         'state_name',
+            #         'postal_code',
+            #         'country_code',
+            #         'telephone_number',
+            #         'fax_number',
+            #         'formatted_address',
+            #         'lat',
+            #         'long',
+            #         'date_added', 'taxonomy_array', 'place_id']
+            for c in temp:
+                count += 1
+                obj[c.key] = r[count]
+            for c in NPIData.__table__.columns:
+                count += 1
+                if c.key in obj:
+                    continue
+                obj[c.key] = r[count]
+            res.append(obj)
+    return response.json(res, default=str)
+
+@blueprint.get('/id/<npi>/full_taxonomy')
+async def get_full_taxonomy_list(request, npi):
+    t = []
+    npi = int(npi)
+    # plan_data = await db.select(
+    #     [Plan.marketing_name, Plan.plan_id, PlanAttributes.full_plan_id, Plan.year]).select_from(
+    #     Plan.join(PlanAttributes, ((Plan.plan_id == func.substr(PlanAttributes.full_plan_id, 1, 14)) & (
+    #         Plan.year == PlanAttributes.year)))). \
+    #     group_by(PlanAttributes.full_plan_id, Plan.plan_id, Plan.marketing_name, Plan.year).gino.all()
+    data = []
+    async with db.acquire() as conn:
+        for x in await db.select([NPIDataTaxonomy.__table__.columns,NUCCTaxonomy.__table__.columns]).where(NPIDataTaxonomy.npi == npi).where(NUCCTaxonomy.code == NPIDataTaxonomy.healthcare_provider_taxonomy_code).gino.all():
+            t.append(x.to_json_dict())
+    return response.json(t)
+
+
+@blueprint.get('/id/<npi>')
+async def get_npi(request, npi):
+    async def update_addr_coordinates(checksum, long, lat, formatted_address, place_id):
+        async with db.acquire() as conn:
+            async with conn.transaction() as tx:
+                await NPIAddress.update.values(long=long,
+                                               lat=lat,
+                                               formatted_address=formatted_address,
+                                               place_id=place_id)\
+                    .where(NPIAddress.checksum == checksum).gino.status()
+                temp = AddressArchive.__table__.columns
+                async for x in NPIAddress.query.where(NPIAddress.checksum == checksum).gino.iterate():
+                    obj = {}
+                    t = x.to_dict()
+                    for c in temp:
+                        obj[c.key] = t[c.key]
+
+                    await insert(AddressArchive).values([obj]).on_conflict_do_update(
+                        index_elements=AddressArchive.__my_index_elements__,
+                        set_=obj
+                    ).gino.model(AddressArchive).status()
+
+    async def get_npi_data(npi):
+        async with db.acquire():
+            t = await NPIData.query.where(NPIData.npi == npi).gino.first()
+            return t.to_json_dict()
+
+    async def get_address_list(npi):
+        t = []
+        async with db.acquire() as conn:
+            g = await NPIAddress.query.where((NPIAddress.npi == npi) & (NPIAddress.type == 'primary')).gino.all()
+            for x in g:
+                postal_code = x.postal_code
+                if postal_code and len(postal_code)>5:
+                    postal_code = f"{postal_code[0:5]}-{postal_code[5:]}"
+                t_addr = ', '.join(
+                    [x.first_line, x.second_line, x.city_name, f"{x.state_name} {postal_code}"])
+
+                t_addr = t_addr.replace(' , ', ' ')
+                if not (x.long and x.lat):
+                    raw_sql = text(f"""SELECT
+    g.rating,
+    ST_X(g.geomout) As lon,
+    ST_Y(g.geomout) As lat,
+    pprint_addy(g.addy) as formatted_address
+    from mrf.npi,
+    standardize_address('us_lex',
+                'us_gaz', 'us_rules', :addr) as addr,
+    geocode((
+            (addr).house_num, --address
+            null, --predirabbrev
+            (addr).name, --streetname
+            (addr).suftype, --streettypeabbrev
+            null, --postdirabbrev
+            (addr).unit, --internal
+            (addr).city, --location
+            (addr).state, --stateabbrev
+            (addr).postcode, --zip
+            true, --parsed
+            null, -- zip4
+            (addr).house_num -- address_alphanumeric
+            )::norm_addy) as g
+    where npi = :npi""")
+                    addr = await conn.status(raw_sql, addr=t_addr, npi=npi)
+                    d = x.to_json_dict()
+                    if addr and len(addr[-1]) and addr[-1][0] and addr[-1][0][0] < 15:
+                        d['long'] = addr[-1][0][1]
+                        d['lat'] = addr[-1][0][2]
+                        d['formatted_address'] = addr[-1][0][3]
+                        d['place_id'] = None
+
+                    if not d['lat']:
+                        try:
+                            params = {
+                                request.app.config.get('GEOCODE_MAPBOX_STYLE_KEY_PARAM'): request.app.config.get(
+                                    'GEOCODE_MAPBOX_STYLE_KEY')}
+                            encoded_params = '.json?'.join(
+                                (urllib.parse.quote_plus(t_addr), urllib.parse.urlencode(params, doseq=True),))
+                            if qp:=request.app.config.get('GEOCODE_MAPBOX_STYLE_ADDITIONAL_QUERY_PARAMS'):
+                                encoded_params = '&'.join((encoded_params,qp,))
+                            url = request.app.config.get('GEOCODE_MAPBOX_STYLE_URL')+encoded_params
+                            resp = await download_it(url)
+                            geo_data = json.loads(resp.content)
+                            if geo_data.get('features', []):
+                                d['long'] = geo_data['features'][0]['geometry']['coordinates'][0]
+                                d['lat'] = geo_data['features'][0]['geometry']['coordinates'][1]
+                                d['formatted_address'] = geo_data['features'][0]['place_name']
+                                d['place_id'] = None
+                        except:
+                            pass
+
+                    if not 
d['lat']: + try: + params = {request.app.config.get('GEOCODE_GOOGLE_STYLE_ADDRESS_PARAM'): t_addr, + request.app.config.get('GEOCODE_GOOGLE_STYLE_KEY_PARAM'): request.app.config.get( + 'GEOCODE_GOOGLE_STYLE_KEY')} + encoded_params = urllib.parse.urlencode(params, doseq=True) + if qp:=request.app.config.get('GEOCODE_GOOGLE_STYLE_ADDITIONAL_QUERY_PARAMS'): + encoded_params = '&'.join((encoded_params,qp,)) + url = '?'.join((request.app.config.get('GEOCODE_GOOGLE_STYLE_URL'), encoded_params,)) + resp = await download_it(url) + geo_data = json.loads(resp.content) + if geo_data.get('results', []): + d['long'] = geo_data['results'][0]['geometry']['location']['lng'] + d['lat'] = geo_data['results'][0]['geometry']['location']['lat'] + d['formatted_address'] = geo_data['results'][0]['formatted_address'] + d['place_id'] = geo_data['results'][0]['place_id'] + except: + pass + if os.getenv('HLTHPRT_NPI_API_UPDATE_GEOCODE') and d.get('lat'): + request.app.add_task(update_addr_coordinates(x.checksum, d['long'], d['lat'], d['formatted_address'], d['place_id'])) + t.append(d) + else: + t.append(x.to_json_dict()) + + return t + + async def get_taxonomy_list(npi): + t = [] + async with db.acquire(): + for x in await NPIDataTaxonomy.query.where(NPIDataTaxonomy.npi == npi).gino.all(): + t.append(x.to_json_dict()) + return t + + async def get_taxonomy_group_list(npi): + t = [] + async with db.acquire(): + for x in await NPIDataTaxonomyGroup.query.where(NPIDataTaxonomyGroup.npi == npi).gino.all(): + t.append(x.to_json_dict()) + return t + + npi = int(npi) + data, address_list, taxonomy_list, taxonomy_group_list = await asyncio.gather( + get_npi_data(npi), + get_address_list(npi), + get_taxonomy_list(npi), + get_taxonomy_group_list(npi) + ) + if not data: + raise sanic.exceptions.NotFound + + + data['address_list'] = address_list + data['taxonomy_list'] = taxonomy_list + data['taxonomy_group_list'] = taxonomy_group_list + + return response.json(data) diff --git a/api/endpoint/nucc.py b/api/endpoint/nucc.py new file mode 100644 index 0000000..e7c73b5 --- /dev/null +++ b/api/endpoint/nucc.py @@ -0,0 +1,28 @@ +import asyncio +from datetime import datetime +from api.for_human import attributes_labels +import urllib.parse +from sqlalchemy.sql import func + +import sanic.exceptions +from sanic import response +from sanic import Blueprint + +from db.models import db, NUCCTaxonomy + +blueprint = Blueprint('nucc', url_prefix='/nucc', version=1) + + +@blueprint.get('/') +async def index_status_nucc(request): + + return response.json({}) + +@blueprint.get('/all') +async def all_of_nucc(request): + # plan_data = await NUCCTaxonomy.query.gino.all() + data = [] + async with db.transaction(): + async for p in NUCCTaxonomy.query.gino.iterate(): + data.append(p.to_json_dict()) + return response.json(data) diff --git a/api/endpoint/plan.py b/api/endpoint/plan.py index ee24708..8356e04 100644 --- a/api/endpoint/plan.py +++ b/api/endpoint/plan.py @@ -44,7 +44,7 @@ async def all_plans(request): return response.json(data) @blueprint.get('/all/variants') -async def all_plans(request): +async def all_plans_variants(request): plan_data = await db.select( [Plan.marketing_name, Plan.plan_id, PlanAttributes.full_plan_id, Plan.year]).select_from(Plan.join(PlanAttributes, ((Plan.plan_id == func.substr(PlanAttributes.full_plan_id, 1, 14)) & (Plan.year == PlanAttributes.year)))).\ group_by(PlanAttributes.full_plan_id, Plan.plan_id, Plan.marketing_name, Plan.year).gino.all() diff --git a/db/models.py b/db/models.py index 0ddf6d0..7100231 100644 
--- a/db/models.py +++ b/db/models.py @@ -1,5 +1,5 @@ import os -from sqlalchemy import DateTime, DATE, Column, String, Integer, Float, BigInteger, Boolean, ARRAY, JSON, TIMESTAMP, TEXT +from sqlalchemy import DateTime, Numeric, DATE, Column, String, Integer, Float, BigInteger, Boolean, ARRAY, JSON, TIMESTAMP, TEXT from db.connection import db from db.json_mixin import JSONOutputMixin @@ -95,7 +95,6 @@ class PlanAttributes(db.Model, JSONOutputMixin): attr_value = Column(String) - class PlanTransparency(db.Model, JSONOutputMixin): __tablename__ = 'plan_transparency' __main_table__ = __tablename__ @@ -115,6 +114,7 @@ class PlanTransparency(db.Model, JSONOutputMixin): metal = Column(String) claims_payment_policies_url = Column(String) + class NPIData(db.Model, JSONOutputMixin): __tablename__ = 'npi' __main_table__ = __tablename__ @@ -142,22 +142,6 @@ class NPIData(db.Model, JSONOutputMixin): provider_other_name_suffix_text = Column(String) provider_other_credential_text = Column(String) provider_other_last_name_type_code = Column(String) - provider_first_line_business_mailing_address = Column(String) - provider_second_line_business_mailing_address = Column(String) - provider_business_mailing_address_city_name = Column(String) - provider_business_mailing_address_state_name = Column(String) - provider_business_mailing_address_postal_code = Column(String) - provider_business_mailing_address_country_code = Column(String) - provider_business_mailing_address_telephone_number = Column(String) - provider_business_mailing_address_fax_number = Column(String) - provider_first_line_business_practice_location_address = Column(String) - provider_second_line_business_practice_location_address = Column(String) - provider_business_practice_location_address_city_name = Column(String) - provider_business_practice_location_address_state_name = Column(String) - provider_business_practice_location_address_postal_code = Column(String) - provider_business_practice_location_address_country_code = Column(String) - provider_business_practice_location_address_telephone_number = Column(String) - provider_business_practice_location_address_fax_number = Column(String) provider_enumeration_date = Column(DATE) last_update_date = Column(DATE) npi_deactivation_reason_code = Column(String) @@ -185,6 +169,7 @@ class NPIDataTaxonomy(db.Model, JSONOutputMixin): {'schema': os.getenv('DB_SCHEMA') or 'mrf', 'extend_existing': True}, ) __my_index_elements__ = ['npi', 'checksum'] + __my_additional_indexes__ = [{'index_elements': ('healthcare_provider_taxonomy_code', 'npi',)}, ] npi = Column(BigInteger) checksum = Column(BigInteger) @@ -219,3 +204,72 @@ class NPIDataTaxonomyGroup(db.Model, JSONOutputMixin): npi = Column(BigInteger) checksum = Column(BigInteger) healthcare_provider_taxonomy_group = Column(String) + + +class NUCCTaxonomy(db.Model, JSONOutputMixin): + __tablename__ = 'nucc_taxonomy' + __main_table__ = __tablename__ + __table_args__ = ( + {'schema': os.getenv('DB_SCHEMA') or 'mrf', 'extend_existing': True}, + ) + __my_index_elements__ = ['code'] + __my_additional_indexes__ = [{'index_elements': ('int_code',)}, {'index_elements': ('display_name',)}] + + int_code = Column(Integer) + code = Column(String) + grouping = Column(String) + classification = Column(String) + specialization = Column(String) + definition = Column(TEXT) + notes = Column(TEXT) + display_name = Column(String) + section = Column(String) + + + +class AddressPrototype(db.Model, JSONOutputMixin): + __table_args__ = ( + {'schema': os.getenv('DB_SCHEMA') or 
'mrf', 'extend_existing': True}, + ) + + checksum = Column(BigInteger) + first_line = Column(String) + second_line = Column(String) + city_name = Column(String) + state_name = Column(String) + postal_code = Column(String) + country_code = Column(String) + telephone_number = Column(String) + fax_number = Column(String) + formatted_address = Column(String) + lat = Column(Numeric(scale=8, precision=11, asdecimal=False, decimal_return_scale=None)) + long = Column(Numeric(scale=8, precision=11, asdecimal=False, decimal_return_scale=None)) + date_added = Column(DATE) + place_id = Column(String) + + +class AddressArchive(AddressPrototype): + __tablename__ = 'address_archive' + __main_table__ = __tablename__ + __my_index_elements__ = ['checksum'] + # __my_additional_indexes__ = [{'index_elements': ('healthcare_provider_taxonomy_code', 'npi',)}, ] + + +class NPIAddress(AddressPrototype): + __tablename__ = 'npi_address' + __main_table__ = __tablename__ + __my_index_elements__ = ['npi', 'checksum', 'type'] + __my_additional_indexes__ = [{'index_elements': ('postal_code',)}, + {'index_elements': ('checksum',), 'using': 'gin'}, + {'index_elements': ('city_name', 'state_name', 'country_code'), 'using': 'gin'}, + {'index_elements': ( + 'Geography(ST_MakePoint(long, lat))', 'taxonomy_array gist__intbig_ops', 'plans_array gist__intbig_ops'), + 'using': 'gist', + 'name': 'geo_index_with_taxonomy_and_plans'}] + + npi = Column(BigInteger) + type = Column(String) + taxonomy_array = Column(ARRAY(Integer)) + plans_array = Column(ARRAY(Integer)) + + # NPI Provider Secondary Practice Location Address- Address Line 1 Provider Secondary Practice Location Address- Address Line 2 Provider Secondary Practice Location Address - City Name Provider Secondary Practice Location Address - State Name Provider Secondary Practice Location Address - Postal Code Provider Secondary Practice Location Address - Country Code (If outside U.S.) 
Provider Secondary Practice Location Address - Telephone Number Provider Secondary Practice Location Address - Telephone Extension Provider Practice Location Address - Fax Number diff --git a/main.py b/main.py index c647195..e9f4cc8 100755 --- a/main.py +++ b/main.py @@ -7,23 +7,25 @@ from asyncpg import connection from asyncpg.connection import ServerCapabilities from sanic import Sanic -from api import init_api +env_path = Path(__file__).absolute().parent / '.env' from dotenv import load_dotenv +load_dotenv(dotenv_path=env_path) +from api import init_api + + import arq.cli from db.migrator import db_group - -env_path = Path(__file__).absolute().parent / '.env' -load_dotenv(dotenv_path=env_path) with open(os.environ['HLTHPRT_LOG_CFG'], encoding="utf-8") as fobj: logging.config.dictConfig(yaml.safe_load(fobj)) from process import process_group - +api = Sanic('mrf-api', env_prefix="HLTHPRT_") +init_api(api) @click.command(help="Run sanic server") @click.option('--host', help='Setup host ip to listen up, default to 0.0.0.0', default='0.0.0.0') @@ -39,10 +41,8 @@ def start(host, port, workers, debug, accesslog): sql_reset=False, sql_close_all=False ) - api = Sanic(__name__, env_prefix="HLTHPRT_") if debug: os.environ['HLTHPRT_DB_ECHO'] = 'True' - init_api(api) with open(api.config['LOG_CFG']) as fobj: logging.config.dictConfig(yaml.safe_load(fobj)) api.run( diff --git a/process/__init__.py b/process/__init__.py index 3f15160..af452d2 100644 --- a/process/__init__.py +++ b/process/__init__.py @@ -9,6 +9,7 @@ from process.initial import main as initiate_mrf, init_file, startup as initial_startup, shutdown as initial_shutdown, process_plan, process_json_index from process.attributes import main as initiate_plan_attributes, save_attributes, process_attributes, startup as attr_startup, shutdown as attr_shutdown from process.npi import main as initiate_npi, process_npi_chunk, save_npi_data, startup as npi_startup, shutdown as npi_shutdown, process_data as process_npi_data +from process.nucc import main as initiate_nucc, startup as nucc_startup, shutdown as nucc_shutdown, process_data as process_nucc_data uvloop.install() @@ -45,6 +46,18 @@ class NPI: job_serializer = lambda b: msgpack.packb(b, datetime=True) job_deserializer = lambda b: msgpack.unpackb(b, timestamp=3, raw=False) +class NUCC: + functions = [process_nucc_data] + on_startup = nucc_startup + on_shutdown = nucc_shutdown + max_jobs=20 + queue_read_limit = 5 + job_timeout=86400 + redis_settings = RedisSettings.from_dsn(os.environ.get('HLTHPRT_REDIS_ADDRESS')) + job_serializer = lambda b: msgpack.packb(b, datetime=True) + job_deserializer = lambda b: msgpack.unpackb(b, timestamp=3, raw=False) + + @click.group() def process_group(): """ @@ -65,7 +78,14 @@ def npi(): asyncio.run(initiate_npi()) +@click.command(help="Run NUCC Taxonomy Import") +def nucc(): + asyncio.run(initiate_nucc()) + + process_group.add_command(mrf) process_group.add_command(plan_attributes) process_group.add_command(npi) +process_group.add_command(nucc) + diff --git a/process/ext/utils.py b/process/ext/utils.py index 4f96cf2..14142c4 100644 --- a/process/ext/utils.py +++ b/process/ext/utils.py @@ -9,14 +9,19 @@ from asyncpg.exceptions import UniqueViolationError, InterfaceError from aiofile import async_open from arq import Retry -from fastcrc import crc32 +from fastcrc import crc32, crc16 import humanize from sqlalchemy.dialects.postgresql import insert from db.connection import db +from random import choice +import json from db.json_mixin import JSONOutputMixin if 
os.environ.get('HLTHPRT_SOCKS_PROXY'): - transport = httpx.AsyncHTTPTransport(retries=3, proxy=httpx.Proxy(os.environ['HLTHPRT_SOCKS_PROXY'])) + print(os.environ.get('HLTHPRT_SOCKS_PROXY')) + print(json.loads(os.environ.get('HLTHPRT_SOCKS_PROXY'))) + transport = httpx.AsyncHTTPTransport(retries=3, + proxy=httpx.Proxy(choice(json.loads(os.environ['HLTHPRT_SOCKS_PROXY'])))) else: transport = httpx.AsyncHTTPTransport(retries=3) @@ -104,13 +109,16 @@ class MyClass(Base): err_obj_list = [] err_obj_key = {} -def return_checksum(arr: list): +def return_checksum(arr: list, crc=32): for i in range(0, len(arr)): arr[i] = str(arr[i]) checksum = '|'.join(arr) checksum = bytes(checksum, 'utf-8') + if crc == 16: + return crc16.xmodem(checksum) return crc32.cksum(checksum) + async def log_error(type, error, issuer_array, url, source, level, cls): for issuer_id in issuer_array: checksum = return_checksum([type, str(error), str(issuer_id), str(url), source, level]) diff --git a/process/initial.py b/process/initial.py index fb5c475..025459a 100644 --- a/process/initial.py +++ b/process/initial.py @@ -283,7 +283,7 @@ async def init_file(ctx): tmp_filename = glob.glob(f"{tmpdirname}/*.xlsx")[0] xls_file = xl.readxl(tmp_filename) - ws_name = xls_file.ws_names[1] + ws_name = xls_file.ws_names[-1] os.unlink(tmp_filename) count = 0 @@ -364,6 +364,14 @@ async def shutdown(ctx): await db.status("CREATE EXTENSION IF NOT EXISTS pg_trgm;") await db.status("CREATE EXTENSION IF NOT EXISTS btree_gin;") + test = make_class(Plan, import_date) + plans_count = await db.func.count(test.plan_id).gino.scalar() + if (not plans_count) or (plans_count < 500): + print(f"Failed Import: Plans number:{plans_count}") + exit(1) + + + tables = {} async with db.transaction(): for cls in (Issuer, Plan, PlanFormulary, PlanTransparency, ImportLog): diff --git a/process/npi.py b/process/npi.py index 0894754..1b75971 100644 --- a/process/npi.py +++ b/process/npi.py @@ -12,6 +12,7 @@ from arq.connections import RedisSettings from pathlib import Path, PurePath from aiocsv import AsyncDictReader, AsyncReader +from asyncpg import DuplicateTableError import csv @@ -23,7 +24,8 @@ make_class, push_objects, log_error, print_time_info, \ flush_error_log -from db.models import Issuer, NPIData, NPIDataTaxonomyGroup, NPIDataOtherIdentifier, NPIDataTaxonomy, db +from db.models import AddressArchive, NPIAddress, NPIData, NPIDataTaxonomyGroup, NPIDataOtherIdentifier, \ + NPIDataTaxonomy, db from db.connection import init_db latin_pattern= re.compile(r'[^\x00-\x7f]') @@ -37,6 +39,8 @@ async def process_npi_chunk(ctx, task): npi_taxonomy_list_dict = {} npi_other_id_list_dict = {} npi_taxonomy_group_list_dict = {} + npi_taxonomy_group_list_dict = {} + npi_address_list_dict = {} npi_csv_map = task['npi_csv_map'] npi_csv_map_reverse = task['npi_csv_map_reverse'] @@ -56,78 +60,51 @@ async def process_npi_chunk(ctx, task): t = pytz.utc.localize(parse_date(t, fuzzy=True)) obj[npi_csv_map[key]] = t - # obj = {'npi': int(row['NPI']), - # 'entity_type_code': int(row['Entity Type Code']) if row['Entity Type Code'] else None, - # 'replacement_npi': int(row['Replacement NPI']) if row['Replacement NPI'] else None, - # 'employer_identification_number': row['Employer Identification Number (EIN)'], - # 'provider_organization_name': row['Provider Organization Name (Legal Business Name)'], - # 'provider_last_name': row['Provider Last Name (Legal Name)'], - # 'provider_first_name': row['Provider First Name'], - # 'provider_middle_name': row['Provider Middle Name'], - # 
'provider_name_prefix_text': row['Provider Name Prefix Text'], - # 'provider_name_suffix_text': row['Provider Name Suffix Text'], - # 'provider_credential_text': row['Provider Credential Text'], - # 'provider_other_organization_name': row['Provider Other Organization Name'], - # 'provider_other_organization_name_type_code': row['Provider Other Organization Name Type Code'], - # 'provider_other_last_name': row['Provider Other Last Name'], - # 'provider_other_first_name': row['Provider Other First Name'], - # 'provider_other_middle_name': row['Provider Other Middle Name'], - # 'provider_other_name_prefix_text': row['Provider Other Name Prefix Text'], - # 'provider_other_name_suffix_text': row['Provider Other Name Suffix Text'], - # 'provider_other_credential_text': row['Provider Other Credential Text'], - # 'provider_other_last_name_type_code': row['Provider Other Last Name Type Code'], - # 'provider_first_line_business_mailing_address': row['Provider First Line Business Mailing Address'], - # 'provider_second_line_business_mailing_address': row['Provider Second Line Business Mailing Address'], - # 'provider_business_mailing_address_city_name': row['Provider Business Mailing Address City Name'], - # 'provider_business_mailing_address_state_name': row['Provider Business Mailing Address State Name'], - # 'provider_business_mailing_address_postal_code': row['Provider Business Mailing Address Postal Code'], - # 'provider_business_mailing_address_country_code': row[ - # 'Provider Business Mailing Address Country Code (If outside U.S.)'], - # 'provider_business_mailing_address_telephone_number': row[ - # 'Provider Business Mailing Address Telephone Number'], - # 'provider_business_mailing_address_fax_number': row['Provider Business Mailing Address Fax Number'], - # 'provider_first_line_business_practice_location_address': row[ - # 'Provider First Line Business Practice Location Address'], - # 'provider_second_line_business_practice_location_address': row[ - # 'Provider Second Line Business Practice Location Address'], - # 'provider_business_practice_location_address_city_name': row[ - # 'Provider Business Practice Location Address City Name'], - # 'provider_business_practice_location_address_state_name': row[ - # 'Provider Business Practice Location Address State Name'], - # 'provider_business_practice_location_address_postal_code': row[ - # 'Provider Business Practice Location Address Postal Code'], - # 'provider_business_practice_location_address_country_code': row[ - # 'Provider Business Practice Location Address Country Code (If outside U.S.)'], - # 'provider_business_practice_location_address_telephone_number': row[ - # 'Provider Business Practice Location Address Telephone Number'], - # 'provider_business_practice_location_address_fax_number': row[ - # 'Provider Business Practice Location Address Fax Number'], - # 'provider_enumeration_date': pytz.utc.localize(parse_date(row['Provider Enumeration Date'], fuzzy=True)) if - # row['Provider Enumeration Date'] else None, - # 'last_update_date': pytz.utc.localize(parse_date(row['Last Update Date'], fuzzy=True)) if row[ - # 'Last Update Date'] else None, - # 'npi_deactivation_reason_code': row['NPI Deactivation Reason Code'], - # 'npi_deactivation_date': pytz.utc.localize(parse_date(row['NPI Deactivation Date'], fuzzy=True)) if row[ - # 'NPI Deactivation Date'] else None, - # 'npi_reactivation_date': pytz.utc.localize(parse_date(row['NPI Reactivation Date'], fuzzy=True)) if row[ - # 'NPI Reactivation Date'] else None, - # 'provider_gender_code': 
row['Provider Gender Code'], - # 'authorized_official_last_name': row['Authorized Official Last Name'], - # 'authorized_official_first_name': row['Authorized Official First Name'], - # 'authorized_official_middle_name': row['Authorized Official Middle Name'], - # 'authorized_official_title_or_position': row['Authorized Official Title or Position'], - # 'authorized_official_telephone_number': row['Authorized Official Telephone Number'], - # 'is_sole_proprietor': row['Is Sole Proprietor'], - # 'is_organization_subpart': row['Is Organization Subpart'], - # 'parent_organization_lbn': row['Parent Organization LBN'], - # 'parent_organization_tin': row['Parent Organization TIN'], - # 'authorized_official_name_prefix_text': row['Authorized Official Name Prefix Text'], - # 'authorized_official_name_suffix_text': row['Authorized Official Name Suffix Text'], - # 'authorized_official_credential_text': row['Authorized Official Credential Text'], - # 'certification_date': pytz.utc.localize(parse_date(row['Certification Date'], fuzzy=True)) if row[ - # 'Certification Date'] else None} + npi_obj_list.append(obj) + if (row['Provider First Line Business Practice Location Address']): + obj = { + 'first_line': row['Provider First Line Business Practice Location Address'], + 'second_line': row['Provider Second Line Business Practice Location Address'], + 'city_name': row['Provider Business Practice Location Address City Name'], + 'state_name': row['Provider Business Practice Location Address State Name'], + 'postal_code': row['Provider Business Practice Location Address Postal Code'], + 'country_code': row['Provider Business Practice Location Address Country Code (If outside U.S.)'], + } + + obj.update({ + 'checksum': return_checksum(list(obj.values())), #addresses have blank symbols + 'npi': int(row['NPI']), + 'type': 'primary', + 'telephone_number': row['Provider Business Practice Location Address Telephone Number'], + 'fax_number': row['Provider Business Practice Location Address Fax Number'], + 'date_added': pytz.utc.localize(parse_date(row['Last Update Date'], fuzzy=True)) if row[ + 'Last Update Date'] else None + }) + npi_address_list_dict['_'.join([str(obj['npi']), str(obj['checksum']), obj['type'],])] = obj + + if (row['Provider First Line Business Mailing Address']): + obj = { + 'first_line': row['Provider First Line Business Mailing Address'], + 'second_line': row['Provider Second Line Business Mailing Address'], + 'city_name': row['Provider Business Mailing Address City Name'], + 'state_name': row['Provider Business Mailing Address State Name'], + 'postal_code': row['Provider Business Mailing Address Postal Code'], + 'country_code': row['Provider Business Mailing Address Country Code (If outside U.S.)'], + } + + obj.update({ + 'checksum': return_checksum(list(obj.values())), # addresses have blank symbols + 'npi': int(row['NPI']), + 'type': 'mail', + 'telephone_number': row['Provider Business Mailing Address Telephone Number'], + 'fax_number': row['Provider Business Mailing Address Fax Number'], + 'date_added': pytz.utc.localize(parse_date(row['Last Update Date'], fuzzy=True)) if row[ + 'Last Update Date'] else None + }) + npi_address_list_dict['_'.join([str(obj['npi']), str(obj['checksum']), obj['type'],])] = obj + for i in range(1, 16): if row[f'Healthcare Provider Taxonomy Code_{i}']: t = { @@ -171,37 +148,12 @@ async def process_npi_chunk(ctx, task): else: break - # task = {} - # insert_limit = 10000 - # if len(npi_obj_list) > insert_limit: - # # int(os.environ.get('HLTHPRT_SAVE_PER_PACK', 
100)): - # task['npi_obj_list'] = npi_obj_list - # if len(npi_taxonomy_list) > insert_limit: - # task['npi_taxonomy_list'] = npi_taxonomy_list - # if len(npi_other_id_list) > insert_limit: - # task['npi_other_id_list'] = npi_other_id_list - # if len(npi_taxonomy_group_list) > insert_limit: - # task['npi_taxonomy_group_list'] = npi_taxonomy_group_list - # - # if task: - # await redis.enqueue_job('save_npi_data', task) - # for key in task: - # if key == 'npi_obj_list': - # npi_obj_list.clear() - # elif key == 'npi_taxonomy_list': - # npi_taxonomy_list.clear() - # elif key == 'npi_other_id_list': - # npi_other_id_list.clear() - # elif key == 'npi_taxonomy_group_list': - # npi_taxonomy_group_list.clear() - # else: - # print('Some wrong key passed') - await redis.enqueue_job('save_npi_data', { 'npi_obj_list': npi_obj_list, 'npi_taxonomy_list': list(npi_taxonomy_list_dict.values()), 'npi_other_id_list': list(npi_other_id_list_dict.values()), - 'npi_taxonomy_group_list': list(npi_taxonomy_group_list_dict.values()) + 'npi_taxonomy_group_list': list(npi_taxonomy_group_list_dict.values()), + 'npi_address_list': list(npi_address_list_dict.values()), }) @@ -241,31 +193,17 @@ async def process_data(ctx): npi_csv_map_reverse = {} int_key_re = re.compile(r'.*_\d+$') - # count = 0 - # now = datetime.datetime.now() - # async with async_open(npi_file, 'r') as afp: - # async for row in AsyncDictReader(afp, delimiter=","): - # count += 1 - # if not count % 100_000: - # print(f"Processed: {count}") - # pass - # now2 = datetime.datetime.now() - # print(f"Processed: {count}") - # print('Time Delta: ', now2-now) - # exit(1) + async with async_open(npi_file, 'r') as afp: async for row in AsyncDictReader(afp, delimiter=","): for key in row: - if int_key_re.match(key): + if int_key_re.match(key) or ' Address' in key: continue t = re.sub(r"\(.*\)", r"", key.lower()).strip().replace(' ', '_') npi_csv_map[key] = t npi_csv_map_reverse[t] = key break - # for key in npi_csv_map_reverse: - # print(f"'{key}': row['{npi_csv_map_reverse[key]}'],") - # exit(1) count = 0 total_count = 0 @@ -283,14 +221,50 @@ async def process_data(ctx): await process_npi_chunk(ctx, {'row_list': row_list, 'npi_csv_map': npi_csv_map, 'npi_csv_map_reverse': npi_csv_map_reverse}) - # await redis.enqueue_job('process_npi_chunk', {'row_list': row_list, - # 'npi_csv_map': npi_csv_map, - # 'npi_csv_map_reverse': npi_csv_map_reverse}) row_list.clear() count = 0 else: count += 1 + npi_address_list_dict = {} + async with async_open(pl_file, 'r') as afp: + async for row in AsyncDictReader(afp, delimiter=","): + if not (row['NPI'] or row['Provider Secondary Practice Location Address- Address Line 1']): + continue + count += 1 + if not count % 100_000: + print(f"Processed: {count}") + obj = { + 'first_line': row['Provider Secondary Practice Location Address- Address Line 1'], + 'second_line': row['Provider Secondary Practice Location Address- Address Line 2'], + 'city_name': row['Provider Secondary Practice Location Address - City Name'], + 'state_name': row['Provider Secondary Practice Location Address - State Name'], + 'postal_code': row['Provider Secondary Practice Location Address - Postal Code'], + 'country_code': row['Provider Secondary Practice Location Address - Country Code (If outside U.S.)'], + } + + obj.update({ + 'checksum': return_checksum(list(obj.values())), # addresses have blank symbols + 'npi': int(row['NPI']), + 'type': 'secondary', + 'telephone_number': row['Provider Secondary Practice Location Address - Telephone Number'], + 
'fax_number': row['Provider Practice Location Address - Fax Number'], + 'date_added': pytz.utc.localize(datetime.datetime.now()) + }) + npi_address_list_dict['_'.join([str(obj['npi']), str(obj['checksum']), obj['type'], ])] = obj + + if count > 9999: + await redis.enqueue_job('save_npi_data', { + 'npi_address_list': list(npi_address_list_dict.values()), + }) + npi_address_list_dict = {} + count = 0 + else: + count += 1 + await redis.enqueue_job('save_npi_data', { + 'npi_address_list': list(npi_address_list_dict.values()), + }) + print(f"Processed: {count}") @@ -306,7 +280,17 @@ async def startup(ctx): tables = {} # for the future complex usage - for cls in (NPIData, NPIDataTaxonomyGroup, NPIDataOtherIdentifier, NPIDataTaxonomy,): + try: + obj = AddressArchive + await AddressArchive.__table__.gino.create() + if hasattr(AddressArchive, "__my_index_elements__"): + await db.status( + f"CREATE UNIQUE INDEX {obj.__tablename__}_idx_primary ON " + f"{db_schema}.{obj.__tablename__} ({', '.join(obj.__my_index_elements__)});") + except DuplicateTableError: + pass + + for cls in (NPIData, NPIDataTaxonomyGroup, NPIDataOtherIdentifier, NPIDataTaxonomy, NPIAddress): tables[cls.__main_table__] = make_class(cls, import_date) obj = tables[cls.__main_table__] await db.status(f"DROP TABLE IF EXISTS {db_schema}.{obj.__main_table__}_{import_date};") @@ -322,9 +306,55 @@ async def shutdown(ctx): import_date = ctx['import_date'] db_schema = os.getenv('DB_SCHEMA') if os.getenv('DB_SCHEMA') else 'mrf' tables = {} - async with db.transaction(): - for cls in (NPIData, NPIDataTaxonomyGroup, NPIDataOtherIdentifier, NPIDataTaxonomy, ): - tables[cls.__main_table__] = make_class(cls, import_date) + + test = make_class(NPIAddress, import_date) + npi_address_count = await db.func.count(test.npi).gino.scalar() + if (not npi_address_count) or (npi_address_count < 5000000): + print(f"Failed Import: Address number:{npi_address_count}") + exit(1) + + processing_classes_array = (NPIData, NPIDataTaxonomyGroup, NPIDataOtherIdentifier, NPIDataTaxonomy, NPIAddress,) + + + for cls in processing_classes_array: + tables[cls.__main_table__] = make_class(cls, import_date) + obj = tables[cls.__main_table__] + if cls is NPIAddress: + print("Updating NUCC Taxonomy for NPI Addresses...") + await db.status(f"""WITH x AS ( +SELECT + int_code, code as target_code +FROM + {db_schema}.nucc_taxonomy +) +UPDATE {db_schema}.{obj.__tablename__} as addr SET taxonomy_array=b.res FROM ( +select npi, ARRAY_AGG(x.int_code) as res from mrf.npi_taxonomy_{import_date} +INNER JOIN x ON healthcare_provider_taxonomy_code = x.target_code +GROUP BY npi) as b WHERE addr.npi = b.npi;""") + print("Updating NPI Addresses Geo from Archive...") + await db.status( + f"UPDATE {db_schema}.{obj.__tablename__} as a SET formatted_address = b.formatted_address, lat = b.lat, " + f"long = b.long, " + f"place_id = b.place_id FROM {db_schema}.address_archive as b WHERE a.checksum = b.checksum") + + + if hasattr(cls, '__my_additional_indexes__') and cls.__my_additional_indexes__: + for index in cls.__my_additional_indexes__: + index_name = index.get('name', '_'.join(index.get('index_elements'))) + using = "" + if t:=index.get('using'): + using = f"USING {t} " + create_index_sql = f"CREATE INDEX IF NOT EXISTS {obj.__tablename__}_idx_{index_name} " \ + f"ON {db_schema}.{obj.__tablename__} {using}" \ + f"({', '.join(index.get('index_elements'))});" + print(create_index_sql) + x = await db.status(create_index_sql) + + print(f"VACUUM FULL ANALYZE {db_schema}.{obj.__tablename__};"); + 
await db.status(f"VACUUM FULL ANALYZE {db_schema}.{obj.__tablename__};") + + async with db.transaction() as tx: + for cls in processing_classes_array: obj = tables[cls.__main_table__] table = obj.__main_table__ await db.status(f"DROP TABLE IF EXISTS {db_schema}.{table}_old;") @@ -339,6 +369,16 @@ async def shutdown(ctx): f"{db_schema}.{obj.__tablename__}_idx_primary RENAME TO " f"{table}_idx_primary;") + if hasattr(cls, '__my_additional_indexes__') and obj.__my_additional_indexes__: + for index in obj.__my_additional_indexes__: + index_name = index.get('name', '_'.join(index.get('index_elements'))) + await db.status(f"ALTER INDEX IF EXISTS " + f"{db_schema}.{table}_idx_{index_name} RENAME TO " + f"{table}_idx_{index_name}_old;") + await db.status(f"ALTER INDEX IF EXISTS " + f"{db_schema}.{obj.__tablename__}_idx_{index_name} RENAME TO " + f"{table}_idx_{index_name};") + print_time_info(ctx['context']['start']) @@ -346,174 +386,27 @@ async def save_npi_data(ctx, task): import_date = ctx['import_date'] x = [] for key in task: - if key == 'npi_obj_list': - mynpidata = make_class(NPIData, import_date) - x.append(push_objects(task['npi_obj_list'], mynpidata, rewrite=True)) - elif key == 'npi_taxonomy_list': - mynpidatataxonomy = make_class(NPIDataTaxonomy, import_date) - x.append(push_objects(task['npi_taxonomy_list'], mynpidatataxonomy, rewrite=True)) - elif key == 'npi_other_id_list': - mynpidataotheridentifier = make_class(NPIDataOtherIdentifier, import_date) - x.append(push_objects(task['npi_other_id_list'], mynpidataotheridentifier, rewrite=True)) - elif key == 'npi_taxonomy_group_list': - mynpidatataxonomygroup = make_class(NPIDataTaxonomyGroup, import_date) - x.append(push_objects(task['npi_taxonomy_group_list'], mynpidatataxonomygroup, rewrite=True)) - else: - print('Some wrong key passed') + match key: + case 'npi_obj_list': + mynpidata = make_class(NPIData, import_date) + x.append(push_objects(task['npi_obj_list'], mynpidata, rewrite=True)) + case 'npi_taxonomy_list': + mynpidatataxonomy = make_class(NPIDataTaxonomy, import_date) + x.append(push_objects(task['npi_taxonomy_list'], mynpidatataxonomy, rewrite=True)) + case 'npi_other_id_list': + mynpidataotheridentifier = make_class(NPIDataOtherIdentifier, import_date) + x.append(push_objects(task['npi_other_id_list'], mynpidataotheridentifier, rewrite=True)) + case 'npi_taxonomy_group_list': + mynpidatataxonomygroup = make_class(NPIDataTaxonomyGroup, import_date) + x.append(push_objects(task['npi_taxonomy_group_list'], mynpidatataxonomygroup, rewrite=True)) + case 'npi_address_list': + mynpiaddress = make_class(NPIAddress, import_date) + x.append(push_objects(task['npi_address_list'], mynpiaddress, rewrite=True)) + case _: + print('Some wrong key passed') await asyncio.gather(*x) -async def process_attributes(ctx, task): - redis = ctx['redis'] - - print('Downloading data from: ', task['url']) - - import_date = ctx['import_date'] - myissuer = make_class(Issuer, import_date) - myplanattributes = make_class(PlanAttributes, import_date) - - - with tempfile.TemporaryDirectory() as tmpdirname: - p = 'attr.csv' - tmp_filename = str(PurePath(str(tmpdirname), p + '.zip')) - await download_it_and_save(task['url'], tmp_filename) - await unzip(tmp_filename, tmpdirname) - - tmp_filename = glob.glob(f"{tmpdirname}/*.csv")[0] - total_count = 0 - attr_obj_list = [] - - count = 0 - #return 1 - async with async_open(tmp_filename, 'r') as afp: - async for row in AsyncDictReader(afp, delimiter=","): - if not (row['StandardComponentId'] and row['PlanId']): - 
continue - count += 1 - for key in row: - if not ((key in ('StandardComponentId',)) and (row[key] is None)) and (t := str(row[key]).strip()): - obj = { - 'full_plan_id': row['PlanId'], - 'year': int(task['year']), # int(row['\ufeffBusinessYear']) - 'attr_name': re.sub(latin_pattern,r'', key), - 'attr_value': t - } - - attr_obj_list.append(obj) - - if count > 100: - #int(os.environ.get('HLTHPRT_SAVE_PER_PACK', 100)): - total_count += count - await redis.enqueue_job('save_attributes', {'attr_obj_list': attr_obj_list}) - # await push_objects(attr_obj_list, myplanattributes) - # test = {} - # for x in attr_obj_list: - # test[x['full_plan_id']] = 1 - # print(f"{task['year']}: processed {total_count} + rows {len(attr_obj_list)} -- {row['StandardComponentId']} -- {len(test.keys())}") - attr_obj_list.clear() - count = 0 - else: - count += 1 - - if attr_obj_list: - await push_objects(attr_obj_list, myplanattributes) - - # obj_list = [] - # for ws_name in xls_file.ws_names: - # print(ws_name) - # if not ws_name.startswith('Transparency'): - # continue - # count = 0 - # template = {} - # convert = { - # 'State': 'state', - # 'Issuer_Name': 'issuer_name', - # 'Issuer_ID': 'issuer_id', - # 'Is_Issuer_New_to_Exchange? (Yes_or_No)': 'new_issuer_to_exchange', - # 'SADP_Only?': 'sadp_only', - # 'Plan_ID': 'plan_id', - # 'QHP/SADP': 'qhp_sadp', - # 'Plan_Type': 'plan_type', - # 'Metal_Level': 'metal', - # 'URL_Claims_Payment_Policies': 'claims_payment_policies_url' - # } - # for k, v in convert.items(): - # template[v] = -1 - # - # for row in xls_file.ws(ws=ws_name).rows: - # if count > 2: - # obj = {} - # obj['state'] = row[template['state']].upper() - # obj['issuer_name'] = row[template['issuer_name']] - # obj['issuer_id'] = int(row[template['issuer_id']]) - # obj['new_issuer_to_exchange'] = True if row[template['new_issuer_to_exchange']] in ( - # 'Yes', 'yes', 'y') else False - # obj['sadp_only'] = True if row[template['sadp_only']] in ('Yes', 'yes', 'y') else False - # obj['plan_id'] = row[template['plan_id']] - # obj['year'] = int(file['year']) - # obj['qhp_sadp'] = row[template['qhp_sadp']] - # obj['plan_type'] = row[template['plan_type']] - # obj['metal'] = row[template['metal']] - # obj['claims_payment_policies_url'] = row[template['claims_payment_policies_url']] - # - # obj_list.append(obj) - # if count > int(os.environ.get('HLTHPRT_SAVE_PER_PACK', 50)): - # count = 3 - # await push_objects(obj_list, myplantransparency) - # obj_list = [] - # elif count == 2: - # i = 0 - # for name in row: - # if name in convert: - # template[convert[name]] = i - # i += 1 - # count += 1 - # - # await push_objects(obj_list, myplantransparency) - # - # p = 'mrf_puf.xlsx' - # tmp_filename = str(PurePath(str(tmpdirname), p + '.zip')) - # await download_it_and_save(os.environ['HLTHPRT_CMSGOV_MRF_URL_PUF'], tmp_filename) - # await unzip(tmp_filename, tmpdirname) - # - # tmp_filename = glob.glob(f"{tmpdirname}/*.xlsx")[0] - # xls_file = xl.readxl(tmp_filename) - # ws_name = xls_file.ws_names[1] - # os.unlink(tmp_filename) - # - # count = 0 - # url_list = [] - # obj_list = [] - # url2issuer = {} - # - # for row in xls_file.ws(ws=ws_name).rows: - # if count != 0: - # url_list.append(row[2]) - # obj = {} - # obj['state'] = row[0].upper() - # obj['issuer_id'] = int(row[1]) - # obj['mrf_url'] = row[2] - # issuer_name = await myplantransparency.select('issuer_name').where( - # myplantransparency.issuer_id == obj['issuer_id']).gino.scalar() - # obj['issuer_name'] = issuer_name if issuer_name else 'N/A' - # 
obj['data_contact_email'] = row[3] - # obj_list.append(obj) - # if obj['mrf_url'] in url2issuer: - # url2issuer[obj['mrf_url']].append(obj['issuer_id']) - # else: - # url2issuer[obj['mrf_url']] = [obj['issuer_id'], ] - # count += 1 - # if not (count % 100): - # await push_objects(obj_list, myissuer) - # obj_list.clear() - # - # url_list = list(set(url_list)) - # await push_objects(obj_list, myissuer) - # - # for url in url_list: - # await redis.enqueue_job('process_json_index', {'url': url, 'issuer_array': url2issuer[url]}) - # # break - async def main(): redis = await create_pool(RedisSettings.from_dsn(os.environ.get('HLTHPRT_REDIS_ADDRESS')), job_serializer=msgpack.packb, diff --git a/process/nucc.py b/process/nucc.py new file mode 100644 index 0000000..662cf4a --- /dev/null +++ b/process/nucc.py @@ -0,0 +1,130 @@ +import os +import msgpack +import asyncio +import datetime +import tempfile +import re +from arq import create_pool +from arq.connections import RedisSettings +from pathlib import Path, PurePath +from aiocsv import AsyncDictReader +from aiofile import async_open + +from process.ext.utils import download_it, download_it_and_save, \ + make_class, push_objects, print_time_info, return_checksum + +from db.models import NUCCTaxonomy, db +from db.connection import init_db + +latin_pattern= re.compile(r'[^\x00-\x7f]') + + +async def process_data(ctx): + import_date = ctx['import_date'] + html_source = await download_it( + os.environ['HLTHPRT_NUCC_DOWNLOAD_URL_DIR'] + os.environ['HLTHPRT_NUCC_DOWNLOAD_URL_FILE']) + + for p in re.findall(r'\"(.*?nucc_taxonomy.*?\.csv)\"', html_source.text): + with tempfile.TemporaryDirectory() as tmpdirname: + print(f"Found: {p}") + file_name = p.split('/')[-1] + tmp_filename = str(PurePath(str(tmpdirname), file_name)) + await download_it_and_save(os.environ['HLTHPRT_NUCC_DOWNLOAD_URL_DIR'] + p, tmp_filename, + chunk_size=10 * 1024 * 1024, cache_dir='/tmp') + print(f"Downloaded: {p}") + csv_map, csv_map_reverse = ({}, {}) + async with async_open(tmp_filename, 'r') as afp: + async for row in AsyncDictReader(afp, delimiter=","): + for key in row: + t = re.sub(r"\(.*\)", r"", key.lower()).strip().replace(' ', '_') + csv_map[key] = t + csv_map_reverse[t] = key + break + + count = 0 + + + row_list = [] + myNUCCTaxonomy = make_class(NUCCTaxonomy, import_date) + async with async_open(tmp_filename, 'r') as afp: + async for row in AsyncDictReader(afp, delimiter=","): + if not (row['Code']): + continue + count += 1 + if not count % 100_000: + print(f"Processed: {count}") + obj = {} + for key in csv_map: + t = row[key] + if not t: + obj[csv_map[key]] = None + continue + obj[csv_map[key]] = t + obj['int_code'] = return_checksum([obj['code'],], crc=16) + row_list.append(obj) + if count > 9999: + await push_objects(row_list, myNUCCTaxonomy) + row_list.clear() + count = 0 + else: + count += 1 + + await push_objects(row_list, myNUCCTaxonomy) + print(f"Processed: {count}") + return 1 + + +async def startup(ctx): + loop = asyncio.get_event_loop() + ctx['context'] = {} + ctx['context']['start'] = datetime.datetime.now() + ctx['context']['run'] = 0 + ctx['import_date'] = datetime.datetime.now().strftime("%Y%m%d") + await init_db(db, loop) + import_date = ctx['import_date'] + db_schema = os.getenv('HLTHPRT_DB_SCHEMA') if os.getenv('HLTHPRT_DB_SCHEMA') else 'mrf' + + tables = {} # for the future complex usage + + for cls in (NUCCTaxonomy,): + tables[cls.__main_table__] = make_class(cls, import_date) + obj = tables[cls.__main_table__] + await db.status(f"DROP TABLE IF EXISTS 
{db_schema}.{obj.__main_table__}_{import_date};") + await obj.__table__.gino.create() + if hasattr(obj, "__my_index_elements__"): + await db.status( + f"CREATE UNIQUE INDEX {obj.__tablename__}_idx_primary ON " + f"{db_schema}.{obj.__tablename__} ({', '.join(obj.__my_index_elements__)});") + + print("Preparing done") + + +async def shutdown(ctx): + import_date = ctx['import_date'] + db_schema = os.getenv('DB_SCHEMA') if os.getenv('DB_SCHEMA') else 'mrf' + tables = {} + async with db.transaction(): + for cls in (NUCCTaxonomy, ): + tables[cls.__main_table__] = make_class(cls, import_date) + obj = tables[cls.__main_table__] + table = obj.__main_table__ + await db.status(f"DROP TABLE IF EXISTS {db_schema}.{table}_old;") + await db.status(f"ALTER TABLE IF EXISTS {db_schema}.{table} RENAME TO {table}_old;") + await db.status(f"ALTER TABLE IF EXISTS {db_schema}.{obj.__tablename__} RENAME TO {table};") + + await db.status(f"ALTER INDEX IF EXISTS " + f"{db_schema}.{table}_idx_primary RENAME TO " + f"{table}_idx_primary_old;") + + await db.status(f"ALTER INDEX IF EXISTS " + f"{db_schema}.{obj.__tablename__}_idx_primary RENAME TO " + f"{table}_idx_primary;") + + print_time_info(ctx['context']['start']) + + +async def main(): + redis = await create_pool(RedisSettings.from_dsn(os.environ.get('HLTHPRT_REDIS_ADDRESS')), + job_serializer=msgpack.packb, + job_deserializer=lambda b: msgpack.unpackb(b, raw=False)) + x = await redis.enqueue_job('process_data') \ No newline at end of file diff --git a/requirements-dev.txt b/requirements-dev.txt index a0884bc..93c7473 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,6 +1,6 @@ aiocsv sanic -git+https://github.com/samuelcolvin/arq +arq gino python-dotenv alembic diff --git a/requirements.txt b/requirements.txt index 812aacd..cb75970 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,54 +1,45 @@ aiocsv==1.2.3 aiofile==3.8.1 aiofiles==22.1.0 -alembic==1.8.1 -anyio==3.6.1 -appdirs==1.4.4 -arq @ git+https://github.com/samuelcolvin/arq@1495be6c234509df1dfdc0db620388d909c8cdb8 +aioshutil==1.2 +alembic==1.9.1 +anyio==3.6.2 +arq==0.25 async-timeout==4.0.2 -async-unzip==0.2.1 -asyncpg==0.26.0 -caio==0.9.8 -certifi==2022.9.24 -charset-normalizer==2.1.1 +async-unzip==0.3.1 +asyncpg==0.27.0 +caio==0.9.11 +certifi==2022.12.7 click==8.1.3 -Deprecated==1.2.13 fastcrc==0.2.1 gino==1.0.1 -h11==0.12.0 -hiredis==2.0.0 -httpcore==0.15.0 +h11==0.14.0 +hiredis==2.1.0 +httpcore==0.16.3 httptools==0.5.0 -httpx==0.23.0 +httpx==0.23.3 humanize==4.4.0 idna==3.4 -ijson==3.1.4 -latest-user-agents==0.0.3 -Mako==1.2.3 +ijson==3.2.0.post0 +Mako==1.2.4 MarkupSafe==2.1.1 msgpack==1.0.4 -multidict==6.0.2 -packaging==21.3 +multidict==6.0.4 pyaml==21.10.1 -pylightxl==1.60 -pyparsing==3.0.9 +pylightxl==1.61 python-dateutil==2.8.2 python-dotenv==0.21.0 -python-socks==2.0.3 -pytz==2022.2.1 +pytz==2022.7 PyYAML==6.0 -redis==4.3.4 -requests==2.28.1 +redis==4.4.0 rfc3986==1.5.0 -sanic==22.6.2 -sanic-routing==22.3.0 +sanic==22.12.0 +sanic-routing==22.8.0 six==1.16.0 sniffio==1.3.0 socksio==1.0.0 SQLAlchemy==1.3.24 -typing_extensions==4.3.0 -ujson==5.5.0 -urllib3==1.26.12 +typing_extensions==4.4.0 +ujson==5.6.0 uvloop==0.17.0 -websockets==10.3 -wrapt==1.14.1 +websockets==10.4