Skip to content

Commit

Permalink
Iteration: automated update with saving GEO location: postgis, mapbo…
Browse files Browse the repository at this point in the history
…x and google maps support
  • Loading branch information
dnikolayev committed Feb 6, 2023
1 parent 4083c75 commit ad248fd
Show file tree
Hide file tree
Showing 17 changed files with 867 additions and 366 deletions.
3 changes: 3 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/venv/*
/venv*
.env
8 changes: 4 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
FROM python:3.9-bullseye
FROM python:3.11-bullseye

WORKDIR /wheels
ADD ./requirements.txt /wheels
ADD ./requirements-dev.txt /wheels

WORKDIR /opt
RUN apt update && apt install -y nginx && python3.9 -m venv venv && . venv/bin/activate && pip install --no-compile --upgrade pip && \
pip install --no-compile -r /wheels/requirements.txt -f /wheels \
RUN apt update && apt install -y nginx && python3 -m venv venv && . venv/bin/activate && pip install --no-compile --upgrade pip && \
pip install --no-compile -r /wheels/requirements-dev.txt -f /wheels \
&& rm -rf /wheels \
&& rm -rf /root/.cache/pip/* && \
find . -name *.pyc -execdir rm -f {} \;
Expand Down
8 changes: 6 additions & 2 deletions api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
from api.endpoint.plan import blueprint as v1_plan
from api.endpoint.importer import blueprint as v1_import
from api.endpoint.issuer import blueprint as v1_issuer
from api.endpoint.npi import blueprint as v1_npi
from api.endpoint.nucc import blueprint as v1_nucc



def init_api(api):
    """Attach the database and all versioned API blueprints to the Sanic app.

    Args:
        api: the Sanic application instance to configure.
    """
    db.init_app(api)
    # Group every endpoint blueprint under a common /api/v<N> URL prefix.
    # (The diff residue registered an older four-blueprint group first and
    # then the full group again; only one registration is needed.)
    api_blueprint = Blueprint.group([v1_healthcheck, v1_plan, v1_import, v1_issuer, v1_npi, v1_nucc],
                                    version_prefix="/api/v")
    api.blueprint(api_blueprint)
8 changes: 2 additions & 6 deletions api/endpoint/importer.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import asyncio
from datetime import datetime
import urllib.parse

import sanic.exceptions
from sanic import response
from sanic import Blueprint
from sqlalchemy import literal

from db.models import db, Issuer, Plan, PlanFormulary, PlanTransparency, ImportLog, ImportHistory

from db.models import db, Issuer, Plan, ImportLog, ImportHistory

blueprint = Blueprint('import', url_prefix='/import', version=1)

Expand Down
6 changes: 1 addition & 5 deletions api/endpoint/issuer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import asyncio
from datetime import datetime
import urllib.parse

import sanic.exceptions
from sanic import response
from sanic import Blueprint

from db.models import db, Plan, PlanFormulary, Issuer, ImportLog
from db.models import db, Plan, Issuer, ImportLog

blueprint = Blueprint('issuer', url_prefix='/issuer', version=1)

Expand Down
370 changes: 370 additions & 0 deletions api/endpoint/npi.py

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions api/endpoint/nucc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import asyncio
from datetime import datetime
from api.for_human import attributes_labels
import urllib.parse
from sqlalchemy.sql import func

import sanic.exceptions
from sanic import response
from sanic import Blueprint

from db.models import db, NUCCTaxonomy

blueprint = Blueprint('nucc', url_prefix='/nucc', version=1)


@blueprint.get('/')
async def index_status_nucc(request):
    """Status probe for the NUCC endpoint; always replies with an empty JSON object."""
    empty_payload = {}
    return response.json(empty_payload)

@blueprint.get('/all')
async def all_of_nucc(request):
    """Return every NUCC taxonomy row from the database as a JSON array."""
    # Iterate inside a transaction so the server-side cursor stays valid
    # for the duration of the streamed query.
    async with db.transaction():
        rows = [record.to_json_dict()
                async for record in NUCCTaxonomy.query.gino.iterate()]
    return response.json(rows)
2 changes: 1 addition & 1 deletion api/endpoint/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def all_plans(request):
return response.json(data)

@blueprint.get('/all/variants')
async def all_plans(request):
async def all_plans_variants(request):
plan_data = await db.select(
[Plan.marketing_name, Plan.plan_id, PlanAttributes.full_plan_id, Plan.year]).select_from(Plan.join(PlanAttributes, ((Plan.plan_id == func.substr(PlanAttributes.full_plan_id, 1, 14)) & (Plan.year == PlanAttributes.year)))).\
group_by(PlanAttributes.full_plan_id, Plan.plan_id, Plan.marketing_name, Plan.year).gino.all()
Expand Down
90 changes: 72 additions & 18 deletions db/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from sqlalchemy import DateTime, DATE, Column, String, Integer, Float, BigInteger, Boolean, ARRAY, JSON, TIMESTAMP, TEXT
from sqlalchemy import DateTime, Numeric, DATE, Column, String, Integer, Float, BigInteger, Boolean, ARRAY, JSON, TIMESTAMP, TEXT

from db.connection import db
from db.json_mixin import JSONOutputMixin
Expand Down Expand Up @@ -95,7 +95,6 @@ class PlanAttributes(db.Model, JSONOutputMixin):
attr_value = Column(String)



class PlanTransparency(db.Model, JSONOutputMixin):
__tablename__ = 'plan_transparency'
__main_table__ = __tablename__
Expand All @@ -115,6 +114,7 @@ class PlanTransparency(db.Model, JSONOutputMixin):
metal = Column(String)
claims_payment_policies_url = Column(String)


class NPIData(db.Model, JSONOutputMixin):
__tablename__ = 'npi'
__main_table__ = __tablename__
Expand Down Expand Up @@ -142,22 +142,6 @@ class NPIData(db.Model, JSONOutputMixin):
provider_other_name_suffix_text = Column(String)
provider_other_credential_text = Column(String)
provider_other_last_name_type_code = Column(String)
provider_first_line_business_mailing_address = Column(String)
provider_second_line_business_mailing_address = Column(String)
provider_business_mailing_address_city_name = Column(String)
provider_business_mailing_address_state_name = Column(String)
provider_business_mailing_address_postal_code = Column(String)
provider_business_mailing_address_country_code = Column(String)
provider_business_mailing_address_telephone_number = Column(String)
provider_business_mailing_address_fax_number = Column(String)
provider_first_line_business_practice_location_address = Column(String)
provider_second_line_business_practice_location_address = Column(String)
provider_business_practice_location_address_city_name = Column(String)
provider_business_practice_location_address_state_name = Column(String)
provider_business_practice_location_address_postal_code = Column(String)
provider_business_practice_location_address_country_code = Column(String)
provider_business_practice_location_address_telephone_number = Column(String)
provider_business_practice_location_address_fax_number = Column(String)
provider_enumeration_date = Column(DATE)
last_update_date = Column(DATE)
npi_deactivation_reason_code = Column(String)
Expand Down Expand Up @@ -185,6 +169,7 @@ class NPIDataTaxonomy(db.Model, JSONOutputMixin):
{'schema': os.getenv('DB_SCHEMA') or 'mrf', 'extend_existing': True},
)
__my_index_elements__ = ['npi', 'checksum']
__my_additional_indexes__ = [{'index_elements': ('healthcare_provider_taxonomy_code', 'npi',)}, ]

npi = Column(BigInteger)
checksum = Column(BigInteger)
Expand Down Expand Up @@ -219,3 +204,72 @@ class NPIDataTaxonomyGroup(db.Model, JSONOutputMixin):
npi = Column(BigInteger)
checksum = Column(BigInteger)
healthcare_provider_taxonomy_group = Column(String)


class NUCCTaxonomy(db.Model, JSONOutputMixin):
    """NUCC healthcare provider taxonomy code set."""
    __tablename__ = 'nucc_taxonomy'
    __main_table__ = __tablename__
    __table_args__ = (
        {'schema': os.getenv('DB_SCHEMA') or 'mrf', 'extend_existing': True},
    )
    # Primary lookup key is the official taxonomy code.
    __my_index_elements__ = ['code']
    # Extra single-column indexes for integer-id and display-name lookups.
    __my_additional_indexes__ = [{'index_elements': ('int_code',)}, {'index_elements': ('display_name',)}]

    int_code = Column(Integer)  # compact integer id — presumably referenced by taxonomy_array elsewhere; confirm
    code = Column(String)  # official NUCC taxonomy code
    grouping = Column(String)
    classification = Column(String)
    specialization = Column(String)
    definition = Column(TEXT)
    notes = Column(TEXT)
    display_name = Column(String)
    section = Column(String)



class AddressPrototype(db.Model, JSONOutputMixin):
    """Shared column set for address tables; concrete tables subclass this
    (no __tablename__ of its own)."""
    __table_args__ = (
        {'schema': os.getenv('DB_SCHEMA') or 'mrf', 'extend_existing': True},
    )

    checksum = Column(BigInteger)  # checksum of the address fields — presumably for dedup; confirm
    first_line = Column(String)
    second_line = Column(String)
    city_name = Column(String)
    state_name = Column(String)
    postal_code = Column(String)
    country_code = Column(String)
    telephone_number = Column(String)
    fax_number = Column(String)
    # Geocoding output; formatted_address/place_id presumably come from a
    # geocoding provider such as Google Maps — confirm against the importer.
    formatted_address = Column(String)
    lat = Column(Numeric(scale=8, precision=11, asdecimal=False, decimal_return_scale=None))  # asdecimal=False: returned as float
    long = Column(Numeric(scale=8, precision=11, asdecimal=False, decimal_return_scale=None))  # asdecimal=False: returned as float
    date_added = Column(DATE)
    place_id = Column(String)


class AddressArchive(AddressPrototype):
    """Address table keyed by checksum; all columns inherited from AddressPrototype."""
    __tablename__ = 'address_archive'
    __main_table__ = __tablename__
    __my_index_elements__ = ['checksum']
    # __my_additional_indexes__ = [{'index_elements': ('healthcare_provider_taxonomy_code', 'npi',)}, ]


class NPIAddress(AddressPrototype):
    """Addresses attached to an NPI record; address columns inherited from
    AddressPrototype."""
    __tablename__ = 'npi_address'
    __main_table__ = __tablename__
    __my_index_elements__ = ['npi', 'checksum', 'type']
    __my_additional_indexes__ = [{'index_elements': ('postal_code',)},
                                 {'index_elements': ('checksum',), 'using': 'gin'},
                                 {'index_elements': ('city_name', 'state_name', 'country_code'), 'using': 'gin'},
                                 # Raw SQL index expressions: a PostGIS geography point plus the two
                                 # int arrays (gist__intbig_ops is from the intarray extension) in one
                                 # composite GiST index for geo+filter queries.
                                 {'index_elements': (
                                     'Geography(ST_MakePoint(long, lat))', 'taxonomy_array gist__intbig_ops', 'plans_array gist__intbig_ops'),
                                     'using': 'gist',
                                     'name': 'geo_index_with_taxonomy_and_plans'}]

    npi = Column(BigInteger)  # NPI of the provider this address belongs to
    type = Column(String)  # address kind — presumably mailing vs. practice location; confirm
    taxonomy_array = Column(ARRAY(Integer))  # presumably NUCCTaxonomy.int_code values — confirm
    plans_array = Column(ARRAY(Integer))  # plan identifiers linked to this address — TODO confirm semantics

# NPI Provider Secondary Practice Location Address- Address Line 1 Provider Secondary Practice Location Address- Address Line 2 Provider Secondary Practice Location Address - City Name Provider Secondary Practice Location Address - State Name Provider Secondary Practice Location Address - Postal Code Provider Secondary Practice Location Address - Country Code (If outside U.S.) Provider Secondary Practice Location Address - Telephone Number Provider Secondary Practice Location Address - Telephone Extension Provider Practice Location Address - Fax Number
14 changes: 7 additions & 7 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,25 @@
from asyncpg import connection
from asyncpg.connection import ServerCapabilities
from sanic import Sanic
from api import init_api

env_path = Path(__file__).absolute().parent / '.env'
from dotenv import load_dotenv
load_dotenv(dotenv_path=env_path)
from api import init_api


import arq.cli

from db.migrator import db_group


env_path = Path(__file__).absolute().parent / '.env'
load_dotenv(dotenv_path=env_path)
with open(os.environ['HLTHPRT_LOG_CFG'], encoding="utf-8") as fobj:
logging.config.dictConfig(yaml.safe_load(fobj))

from process import process_group



api = Sanic('mrf-api', env_prefix="HLTHPRT_")
init_api(api)

@click.command(help="Run sanic server")
@click.option('--host', help='Setup host ip to listen up, default to 0.0.0.0', default='0.0.0.0')
Expand All @@ -39,10 +41,8 @@ def start(host, port, workers, debug, accesslog):
sql_reset=False,
sql_close_all=False
)
api = Sanic(__name__, env_prefix="HLTHPRT_")
if debug:
os.environ['HLTHPRT_DB_ECHO'] = 'True'
init_api(api)
with open(api.config['LOG_CFG']) as fobj:
logging.config.dictConfig(yaml.safe_load(fobj))
api.run(
Expand Down
20 changes: 20 additions & 0 deletions process/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from process.initial import main as initiate_mrf, init_file, startup as initial_startup, shutdown as initial_shutdown, process_plan, process_json_index
from process.attributes import main as initiate_plan_attributes, save_attributes, process_attributes, startup as attr_startup, shutdown as attr_shutdown
from process.npi import main as initiate_npi, process_npi_chunk, save_npi_data, startup as npi_startup, shutdown as npi_shutdown, process_data as process_npi_data
from process.nucc import main as initiate_nucc, startup as nucc_startup, shutdown as nucc_shutdown, process_data as process_nucc_data
uvloop.install()


Expand Down Expand Up @@ -45,6 +46,18 @@ class NPI:
job_serializer = lambda b: msgpack.packb(b, datetime=True)
job_deserializer = lambda b: msgpack.unpackb(b, timestamp=3, raw=False)

class NUCC:
    """arq worker settings for the NUCC taxonomy import queue."""
    functions = [process_nucc_data]
    on_startup = nucc_startup
    on_shutdown = nucc_shutdown
    max_jobs=20  # max concurrently running jobs in this worker
    queue_read_limit = 5  # jobs fetched from Redis per poll
    job_timeout=86400  # 24 hours — imports can run long
    redis_settings = RedisSettings.from_dsn(os.environ.get('HLTHPRT_REDIS_ADDRESS'))
    # msgpack (de)serialization so datetime values survive the queue round-trip
    job_serializer = lambda b: msgpack.packb(b, datetime=True)
    job_deserializer = lambda b: msgpack.unpackb(b, timestamp=3, raw=False)


@click.group()
def process_group():
"""
Expand All @@ -65,7 +78,14 @@ def npi():
asyncio.run(initiate_npi())


@click.command(help="Run NUCC Taxonomy Import")
def nucc():
    # CLI entry point: drives the async NUCC taxonomy import to completion.
    asyncio.run(initiate_nucc())


process_group.add_command(mrf)
process_group.add_command(plan_attributes)
process_group.add_command(npi)
process_group.add_command(nucc)


14 changes: 11 additions & 3 deletions process/ext/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@
from asyncpg.exceptions import UniqueViolationError, InterfaceError
from aiofile import async_open
from arq import Retry
from fastcrc import crc32
from fastcrc import crc32, crc16
import humanize
from sqlalchemy.dialects.postgresql import insert
from db.connection import db
from random import choice
import json
from db.json_mixin import JSONOutputMixin

# Shared httpx transport with retries. When HLTHPRT_SOCKS_PROXY is set it must
# contain a JSON array of proxy URLs; one is chosen at random per process.
# (Removed leftover debug print() calls and a discarded first transport
# assignment left over from the diff.)
if os.environ.get('HLTHPRT_SOCKS_PROXY'):
    proxy_url = choice(json.loads(os.environ['HLTHPRT_SOCKS_PROXY']))
    transport = httpx.AsyncHTTPTransport(retries=3, proxy=httpx.Proxy(proxy_url))
else:
    transport = httpx.AsyncHTTPTransport(retries=3)

Expand Down Expand Up @@ -104,13 +109,16 @@ class MyClass(Base):
err_obj_list = []
err_obj_key = {}

def return_checksum(arr: list):
def return_checksum(arr: list, crc=32):
    """Compute a CRC checksum over a list of values.

    Values are stringified and joined with '|' before hashing, so the result
    is order-sensitive. Unlike the previous version, the caller's list is not
    mutated (the old code overwrote each element with str(element) in place).

    Args:
        arr: values to fold into the checksum.
        crc: checksum width — 16 selects CRC-16/XMODEM, any other value CRC-32.

    Returns:
        The integer checksum.
    """
    payload = '|'.join(str(item) for item in arr).encode('utf-8')
    if crc == 16:
        return crc16.xmodem(payload)
    return crc32.cksum(payload)


async def log_error(type, error, issuer_array, url, source, level, cls):
for issuer_id in issuer_array:
checksum = return_checksum([type, str(error), str(issuer_id), str(url), source, level])
Expand Down
10 changes: 9 additions & 1 deletion process/initial.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ async def init_file(ctx):

tmp_filename = glob.glob(f"{tmpdirname}/*.xlsx")[0]
xls_file = xl.readxl(tmp_filename)
ws_name = xls_file.ws_names[1]
ws_name = xls_file.ws_names[-1]
os.unlink(tmp_filename)

count = 0
Expand Down Expand Up @@ -364,6 +364,14 @@ async def shutdown(ctx):
await db.status("CREATE EXTENSION IF NOT EXISTS pg_trgm;")
await db.status("CREATE EXTENSION IF NOT EXISTS btree_gin;")

test = make_class(Plan, import_date)
plans_count = await db.func.count(test.plan_id).gino.scalar()
if (not plans_count) or (plans_count < 500):
print(f"Failed Import: Plans number:{plans_count}")
exit(1)



tables = {}
async with db.transaction():
for cls in (Issuer, Plan, PlanFormulary, PlanTransparency, ImportLog):
Expand Down
Loading

0 comments on commit ad248fd

Please sign in to comment.