Add plan prices. Extend Issuer Name list
dnikolayev committed Oct 16, 2023
1 parent 4d42978 commit 24c2567
Showing 6 changed files with 274 additions and 22 deletions.
38 changes: 37 additions & 1 deletion api/endpoint/plan.py
@@ -9,7 +9,8 @@
from sanic import response
from sanic import Blueprint

from db.models import db, PlanNetworkTierRaw, PlanNPIRaw, Plan, PlanFormulary, Issuer, ImportLog, PlanAttributes
from db.models import db, PlanPrices, PlanNetworkTierRaw, PlanNPIRaw, Plan, PlanFormulary, Issuer, ImportLog, \
PlanAttributes

blueprint = Blueprint('plan', url_prefix='/plan', version=1)

@@ -113,6 +114,41 @@ async def get_plans(text, limit=10):
return response.json({'plans': list(await get_plans(text))})


@blueprint.get('/price/<plan_id>', name="get_price_plan_by_plan_id")
@blueprint.get('/price/<plan_id>/<year>', name="get_price_plan_by_plan_id_and_year")
async def get_price_plan(request, plan_id, year=None, variant=None):
    age = request.args.get("age")
    rating_area = request.args.get("rating_area")

    q = PlanPrices.query.where(PlanPrices.plan_id == plan_id)
    if year:
        try:
            year = int(year)
        except ValueError:
            raise sanic.exceptions.BadRequest
        q = q.where(PlanPrices.year == year)

    if age:
        try:
            age = int(age)
        except ValueError:
            raise sanic.exceptions.BadRequest
        q = q.where(PlanPrices.min_age <= age).where(PlanPrices.max_age >= age)

    if rating_area:
        q = q.where(PlanPrices.rating_area_id == rating_area)

    q = q.order_by(PlanPrices.year, PlanPrices.rating_area_id, PlanPrices.min_age)

    res = []
    async with db.acquire() as conn:
        async with conn.transaction():
            async for x in q.gino.iterate():
                res.append(x.to_json_dict())

    return response.json(res)


@blueprint.get('/id/<plan_id>', name="get_plan_by_plan_id")
@blueprint.get('/id/<plan_id>/<year>', name="get_plan_by_plan_id_and_year")
@blueprint.get('/id/<plan_id>/<year>/<variant>', name="get_plan_variant_by_plan_id_and_year")
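A quick way to exercise the new price endpoint: with the blueprint registered under url_prefix='/plan' and version=1, the routes above resolve to /v1/plan/price/<plan_id> and /v1/plan/price/<plan_id>/<year>. A minimal client sketch — httpx, the host/port, the 14-character plan ID, and the rating-area value are illustrative assumptions, not part of this commit:

import httpx

BASE = "http://localhost:8000/v1/plan"  # assumed deployment address

# Fetch 2023 rates for a made-up plan ID, narrowed to a 30-year-old
# in one rating area via the optional age/rating_area query args.
resp = httpx.get(f"{BASE}/price/12345AB1234567/2023",
                 params={"age": 30, "rating_area": "Rating Area 1"})
resp.raise_for_status()
for price in resp.json():  # keys assumed to mirror PlanPrices columns via to_json_dict()
    print(price["year"], price["rating_area_id"], price["individual_rate"])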
6 changes: 3 additions & 3 deletions api/for_human.py
@@ -338,9 +338,9 @@
'TEHBDedInnTier2FamilyPerGroup': 'Combined Medical and Drug EHB Deductible, In Network (Tier 2), Family Per Group',
'TEHBDedInnTier2Coinsurance': 'Combined Medical and Drug EHB Deductible, In Network (Tier 2), Default Coinsurance',
'TEHBDedOutOfNetIndividual': 'Combined Medical and Drug EHB Deductible, Out of Network, Individual',
'TEHBDedOutofNetFamily': 'Combined Medical and Drug EHB Deductible, Out of Network, Family',
'TEHBDedOutofNetFamilyPerPerson': 'Combined Medical and Drug EHB Deductible, Out of Network, Family Per Person',
'TEHBDedOutofNetFamilyPerGroup': 'Combined Medical and Drug EHB Deductible, Out of Network, Family Per Group',
'TEHBDedOutOfNetFamily': 'Combined Medical and Drug EHB Deductible, Out of Network, Family',
'TEHBDedOutOfNetFamilyPerPerson': 'Combined Medical and Drug EHB Deductible, Out of Network, Family Per Person',
'TEHBDedOutOfNetFamilyPerGroup': 'Combined Medical and Drug EHB Deductible, Out of Network, Family Per Group',
'TEHBDedCombInnOonIndividual': 'Combined Medical and Drug EHB Deductible, Combined In/Out of Network, Individual',
'TEHBDedCombInnOonFamily': 'Combined Medical and Drug EHB Deductible, Combined In/Out of Network, Family',
'TEHBDedCombInnOonFamilyPerPerson': 'Combined Medical and Drug EHB Deductible, Combined In/Out of Network, '
45 changes: 44 additions & 1 deletion db/models.py
@@ -1,6 +1,8 @@
import os
from sqlalchemy import DateTime, Numeric, DATE, Column,\
String, Integer, Float, BigInteger, Boolean, ARRAY, JSON, TIMESTAMP, TEXT
String, Integer, Float, BigInteger, Boolean, ARRAY, JSON, TIMESTAMP, TEXT, SMALLINT

from sqlalchemy.dialects.postgresql import MONEY

from db.connection import db
from db.json_mixin import JSONOutputMixin
@@ -129,12 +131,53 @@ class PlanAttributes(db.Model, JSONOutputMixin):
{'schema': os.getenv('DB_SCHEMA') or 'mrf', 'extend_existing': True},
)
    __my_index_elements__ = ['full_plan_id', 'year', 'attr_name']
    __my_additional_indexes__ = [
        {'index_elements': ('full_plan_id gin_trgm_ops', 'year'),
         'using': 'gin',
         'name': 'find_all_variants'}]
    full_plan_id = Column(db.String(17), nullable=False)
    year = Column(Integer)
    attr_name = Column(String)
    attr_value = Column(String)


class PlanPrices(db.Model, JSONOutputMixin):
    __tablename__ = 'plan_prices'
    __main_table__ = __tablename__
    __table_args__ = (
        {'schema': os.getenv('DB_SCHEMA') or 'mrf', 'extend_existing': True},
    )
    __my_index_elements__ = ['plan_id', 'year', 'checksum']
    __my_additional_indexes__ = [
        {'index_elements': ('state', 'year', 'min_age', 'max_age', 'rating_area_id', 'couple'),
         'using': 'gin',
         'name': 'find_plan'}]

    plan_id = Column(db.String(14), nullable=False)
    year = Column(Integer)
    state = Column(String(2))
    checksum = Column(Integer)
    rate_effective_date = Column(DATE)
    rate_expiration_date = Column(DATE)
    rating_area_id = Column(String)
    tobacco = Column(String)
    min_age = Column(SMALLINT)
    max_age = Column(SMALLINT)
    individual_rate = Column(Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None))
    individual_tobacco_rate = Column(Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None))
    couple = Column(Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None))
    primary_subscriber_and_one_dependent = Column(
        Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None))
    primary_subscriber_and_two_dependents = Column(
        Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None))
    primary_subscriber_and_three_or_more_dependents = Column(
        Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None))
    couple_and_one_dependent = Column(Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None))
    couple_and_two_dependents = Column(Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None))
    couple_and_three_or_more_dependents = Column(
        Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None))


class PlanTransparency(db.Model, JSONOutputMixin):
__tablename__ = 'plan_transparency'
__main_table__ = __tablename__
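For orientation, the __my_additional_indexes__ entries above are not created by the ORM; the shutdown() step in process/attributes.py (later in this diff) renders them into CREATE INDEX statements. A sketch of what they expand to, assuming the default mrf schema and ignoring the per-import date suffix that make_class appends to table names:

# Illustration only; mirrors the f-string assembly in shutdown(), not an exact log.
# PlanAttributes -- trigram search over full_plan_id, scoped by year:
#   CREATE INDEX IF NOT EXISTS plan_attributes_idx_find_all_variants
#       ON mrf.plan_attributes USING gin (full_plan_id gin_trgm_ops, year);
# PlanPrices -- composite lookup for the price queries in api/endpoint/plan.py:
#   CREATE INDEX IF NOT EXISTS plan_prices_idx_find_plan
#       ON mrf.plan_prices USING gin (state, year, min_age, max_age, rating_area_id, couple);
# Note: mixing gin_trgm_ops with plain scalar columns in a single GIN index
# relies on the pg_trgm and btree_gin extensions being installed.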
8 changes: 4 additions & 4 deletions process/__init__.py
@@ -9,7 +9,7 @@
from process.initial import main as initiate_mrf, init_file, startup as initial_startup, shutdown as initial_shutdown, \
process_plan, process_json_index, process_provider
from process.attributes import main as initiate_plan_attributes, save_attributes, process_state_attributes, \
process_attributes, startup as attr_startup, shutdown as attr_shutdown
process_attributes, process_prices, startup as attr_startup, shutdown as attr_shutdown
from process.npi import main as initiate_npi, process_npi_chunk, save_npi_data, startup as npi_startup, \
shutdown as npi_shutdown, process_data as process_npi_data
from process.nucc import main as initiate_nucc, startup as nucc_startup, shutdown as nucc_shutdown, \
@@ -32,14 +32,14 @@ class MRF:


class Attributes:
functions = [process_attributes, process_state_attributes, save_attributes]
functions = [process_attributes, process_state_attributes, process_prices, save_attributes]
on_startup = attr_startup
on_shutdown = attr_shutdown
max_jobs=20
queue_read_limit = 5
redis_settings = RedisSettings.from_dsn(os.environ.get('HLTHPRT_REDIS_ADDRESS'))
job_serializer = msgpack.packb
job_deserializer = lambda b: msgpack.unpackb(b, raw=False)
job_serializer = lambda b: msgpack.packb(b, datetime=True)
job_deserializer = lambda b: msgpack.unpackb(b, timestamp=3, raw=False)


class NPI:
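The serializer change above is what lets price rows carry real datetimes through the Redis queue: packb(..., datetime=True) encodes timezone-aware datetime objects as msgpack Timestamps, and unpackb(..., timestamp=3) decodes them back. A minimal round trip with msgpack-python:

import datetime
import msgpack

job = {'rate_effective_date': datetime.datetime(2023, 1, 1, tzinfo=datetime.timezone.utc)}
packed = msgpack.packb(job, datetime=True)                  # datetime -> msgpack Timestamp
restored = msgpack.unpackb(packed, timestamp=3, raw=False)  # Timestamp -> datetime
assert restored['rate_effective_date'] == job['rate_effective_date']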
140 changes: 134 additions & 6 deletions process/attributes.py
@@ -2,6 +2,7 @@
import msgpack
import asyncio
import datetime
import pytz
import tempfile
import json
import glob
@@ -18,7 +19,7 @@
from process.ext.utils import download_it_and_save, make_class, push_objects, log_error, print_time_info, \
flush_error_log, return_checksum
from dateutil.parser import parse as parse_date
from db.models import Issuer, Plan, PlanAttributes, db
from db.models import Issuer, Plan, PlanAttributes, PlanPrices, db
from db.connection import init_db

from api.for_human import plan_attributes_labels_to_key
@@ -38,7 +39,7 @@ async def startup(ctx):

tables = {} # for the future complex usage

for cls in (PlanAttributes,):
for cls in (PlanAttributes, PlanPrices, ):
tables[cls.__main_table__] = make_class(cls, import_date)
obj = tables[cls.__main_table__]
await db.status(f"DROP TABLE IF EXISTS {db_schema}.{obj.__main_table__}_{import_date};")
@@ -54,10 +55,35 @@ async def shutdown(ctx):
import_date = ctx['import_date']
db_schema = os.getenv('DB_SCHEMA') if os.getenv('DB_SCHEMA') else 'mrf'
    tables = {}
    async with db.transaction():
        for cls in (PlanAttributes,):

    processing_classes_array = (PlanAttributes, PlanPrices)

    for cls in processing_classes_array:
        tables[cls.__main_table__] = make_class(cls, import_date)
        obj = tables[cls.__main_table__]

        if hasattr(cls, '__my_additional_indexes__') and cls.__my_additional_indexes__:
            for index in cls.__my_additional_indexes__:
                index_name = index.get('name', '_'.join(index.get('index_elements')))
                using = ""
                if t := index.get('using'):
                    using = f"USING {t} "
                create_index_sql = f"CREATE INDEX IF NOT EXISTS {obj.__tablename__}_idx_{index_name} " \
                                   f"ON {db_schema}.{obj.__tablename__} {using}" \
                                   f"({', '.join(index.get('index_elements'))});"
                print(create_index_sql)
                await db.status(create_index_sql)

        print(f"Post-Index VACUUM FULL ANALYZE {db_schema}.{obj.__tablename__};")
        await db.status(f"VACUUM FULL ANALYZE {db_schema}.{obj.__tablename__};")



    async with db.transaction():
        for cls in processing_classes_array:
            tables[cls.__main_table__] = make_class(cls, import_date)
            obj = tables[cls.__main_table__]

            table = obj.__main_table__
            await db.status(f"DROP TABLE IF EXISTS {db_schema}.{table}_old;")
            await db.status(f"ALTER TABLE IF EXISTS {db_schema}.{table} RENAME TO {table}_old;")
@@ -71,12 +97,25 @@ async def shutdown(ctx):
f"{db_schema}.{obj.__tablename__}_idx_primary RENAME TO "
f"{table}_idx_primary;")

            if hasattr(cls, '__my_additional_indexes__') and obj.__my_additional_indexes__:
                for index in obj.__my_additional_indexes__:
                    index_name = index.get('name', '_'.join(index.get('index_elements')))
                    await db.status(f"ALTER INDEX IF EXISTS "
                                    f"{db_schema}.{table}_idx_{index_name} RENAME TO "
                                    f"{table}_idx_{index_name}_old;")
                    await db.status(f"ALTER INDEX IF EXISTS "
                                    f"{db_schema}.{obj.__tablename__}_idx_{index_name} RENAME TO "
                                    f"{table}_idx_{index_name};")

    print_time_info(ctx['context']['start'])


async def save_attributes(ctx, task):
import_date = ctx['import_date']
myplanattributes = make_class(PlanAttributes, import_date)
if ('type' in task) and task['type'] == 'PlanPrices':
myplanattributes = make_class(PlanPrices, import_date)
else:
myplanattributes = make_class(PlanAttributes, import_date)
await push_objects(task['attr_obj_list'], myplanattributes)


@@ -135,7 +174,88 @@ async def process_attributes(ctx, task):
if attr_obj_list:
await push_objects(attr_obj_list, myplanattributes)

async def process_prices(ctx, task):
    redis = ctx['redis']

    print('Downloading data from: ', task['url'])

    import_date = ctx['import_date']
    myissuer = make_class(Issuer, import_date)
    myplanprices = make_class(PlanPrices, import_date)

    with tempfile.TemporaryDirectory() as tmpdirname:
        p = 'rate.csv'
        tmp_filename = str(PurePath(str(tmpdirname), p + '.zip'))
        await download_it_and_save(task['url'], tmp_filename)
        await unzip(tmp_filename, tmpdirname)

        tmp_filename = glob.glob(f"{tmpdirname}/*.csv")[0]
        total_count = 0
        attr_obj_list = []

        count = 0

        range_regex = re.compile(r'^(\d+)-(\d+)$')
        int_more_regex = re.compile(r'^(\d+) and over$')
        clean_int = re.compile(r'^(\d+)$')
        async with async_open(tmp_filename, 'r') as afp:
            async for row in AsyncDictReader(afp, delimiter=","):
                if not row['PlanId']:
                    continue
                count += 1

                obj = {
                    'plan_id': row['PlanId'],
                    'state': row['StateCode'].upper(),
                    'year': int(task['year']),
                    'rate_effective_date': pytz.utc.localize(
                        parse_date(row['RateEffectiveDate'], fuzzy=True)) if row['RateEffectiveDate'] else None,
                    'rate_expiration_date': pytz.utc.localize(
                        parse_date(row['RateExpirationDate'], fuzzy=True)) if row['RateExpirationDate'] else None,
                    'rating_area_id': row['RatingAreaId'],
                    'tobacco': row['Tobacco'],
                    'min_age': 0,
                    'max_age': 125,
                    'individual_rate': float(row['IndividualRate']) if row['IndividualRate'] else None,
                    'individual_tobacco_rate':
                        float(row['IndividualTobaccoRate']) if row['IndividualTobaccoRate'] else None,
                    'couple': float(row['Couple']) if row['Couple'] else None,
                    'primary_subscriber_and_one_dependent':
                        float(row['PrimarySubscriberAndOneDependent'])
                        if row['PrimarySubscriberAndOneDependent'] else None,
                    'primary_subscriber_and_two_dependents':
                        float(row['PrimarySubscriberAndTwoDependents'])
                        if row['PrimarySubscriberAndTwoDependents'] else None,
                    'primary_subscriber_and_three_or_more_dependents':
                        float(row['PrimarySubscriberAndThreeOrMoreDependents'])
                        if row['PrimarySubscriberAndThreeOrMoreDependents'] else None,
                    'couple_and_one_dependent':
                        float(row['CoupleAndOneDependent']) if row['CoupleAndOneDependent'] else None,
                    'couple_and_two_dependents':
                        float(row['CoupleAndTwoDependents']) if row['CoupleAndTwoDependents'] else None,
                    'couple_and_three_or_more_dependents':
                        float(row['CoupleAndThreeOrMoreDependents']) if row['CoupleAndThreeOrMoreDependents'] else None,
                }

                match row['Age'].strip():
                    case x if t := clean_int.search(x):
                        obj['min_age'] = int(t.group(1))
                        obj['max_age'] = obj['min_age']
                    case x if t := range_regex.search(x):
                        obj['min_age'] = int(t.group(1))
                        obj['max_age'] = int(t.group(2))
                    case x if t := int_more_regex.search(x):
                        obj['min_age'] = int(t.group(1))

                obj['checksum'] = return_checksum(
                    [obj['plan_id'], obj['year'], obj['rate_effective_date'], obj['rate_expiration_date'],
                     obj['rating_area_id'], obj['min_age'], obj['max_age']])

                attr_obj_list.append(obj)

                if count > 1000000:
                    total_count += count
                    await redis.enqueue_job('save_attributes',
                                            {'type': 'PlanPrices', 'attr_obj_list': attr_obj_list})
                    attr_obj_list.clear()
                    count = 0

        if attr_obj_list:
            await push_objects(attr_obj_list, myplanprices)
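The Age column in the rate PUF mixes plain ages ('25'), ranges ('0-20'), and open-ended buckets ('64 and over'); the match block above normalizes all three into min_age/max_age, leaving the 0-125 default for anything else (e.g. family-tier rows). A standalone illustration of the same rules:

import re

range_regex = re.compile(r'^(\d+)-(\d+)$')
int_more_regex = re.compile(r'^(\d+) and over$')
clean_int = re.compile(r'^(\d+)$')

def age_bounds(raw: str) -> tuple[int, int]:
    min_age, max_age = 0, 125  # default: rate applies to all ages
    match raw.strip():
        case x if t := clean_int.search(x):       # '25'
            min_age = max_age = int(t.group(1))
        case x if t := range_regex.search(x):     # '0-20'
            min_age, max_age = int(t.group(1)), int(t.group(2))
        case x if t := int_more_regex.search(x):  # '64 and over'
            min_age = int(t.group(1))
    return min_age, max_age

assert age_bounds('25') == (25, 25)
assert age_bounds('0-20') == (0, 20)
assert age_bounds('64 and over') == (64, 125)
assert age_bounds('Family Option') == (0, 125)  # unmatched labels keep the default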



@@ -358,13 +478,21 @@ async def main():
    attribute_files = json.loads(os.environ['HLTHPRT_CMSGOV_PLAN_ATTRIBUTES_URL_PUF'])
    state_attribute_files = json.loads(os.environ['HLTHPRT_CMSGOV_STATE_PLAN_ATTRIBUTES_URL_PUF'])

    price_files = json.loads(os.environ['HLTHPRT_CMSGOV_PRICE_PLAN_URL_PUF'])

    print("Starting to process STATE Plan Attribute files..")
    for file in state_attribute_files:
        print("Adding: ", file)
        x = await redis.enqueue_job('process_state_attributes', {'url': file['url'], 'year': file['year']})

    print("Starting to process Plan Attribute files..")
    for file in attribute_files:
        print("Adding: ", file)
        x = await redis.enqueue_job('process_attributes', {'url': file['url'], 'year': file['year']})

    print("Starting to process Plan Prices files..")
    for file in price_files:
        print("Adding: ", file)
        x = await redis.enqueue_job('process_prices', {'url': file['url'], 'year': file['year']})
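main() reads HLTHPRT_CMSGOV_PRICE_PLAN_URL_PUF as a JSON array of {url, year} objects, mirroring the existing attribute-file variables. A hypothetical value for local testing (the URL is a placeholder, not a real CMS link):

import json
import os

# Point the URL at the CMS Rate PUF zip for the given plan year.
os.environ['HLTHPRT_CMSGOV_PRICE_PLAN_URL_PUF'] = json.dumps(
    [{'url': 'https://example.com/rate-puf-2023.zip', 'year': 2023}]
)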
