From 24c2567b0cb9ef3a2cf3148904c24df680ae5b09 Mon Sep 17 00:00:00 2001 From: Dmytro Nikolayev Date: Mon, 16 Oct 2023 18:10:39 +0200 Subject: [PATCH] Add plan prices. Extend Issuer Name list --- api/endpoint/plan.py | 38 +++++++++++- api/for_human.py | 6 +- db/models.py | 45 +++++++++++++- process/__init__.py | 8 +-- process/attributes.py | 140 ++++++++++++++++++++++++++++++++++++++++-- process/initial.py | 59 +++++++++++++++--- 6 files changed, 274 insertions(+), 22 deletions(-) diff --git a/api/endpoint/plan.py b/api/endpoint/plan.py index 73473ee..da10dbf 100644 --- a/api/endpoint/plan.py +++ b/api/endpoint/plan.py @@ -9,7 +9,8 @@ from sanic import response from sanic import Blueprint -from db.models import db, PlanNetworkTierRaw, PlanNPIRaw, Plan, PlanFormulary, Issuer, ImportLog, PlanAttributes +from db.models import db, PlanPrices, PlanNetworkTierRaw, PlanNPIRaw, Plan, PlanFormulary, Issuer, ImportLog, \ + PlanAttributes blueprint = Blueprint('plan', url_prefix='/plan', version=1) @@ -113,6 +114,41 @@ async def get_plans(text, limit=10): return response.json({'plans': list(await get_plans(text))}) +@blueprint.get('/price/', name="get_price_plan_by_plan_id") +@blueprint.get('/price//year', name="get_price_plan_by_plan_id_and_year") +async def get_price_plan(request, plan_id, year=None, variant=None): + age = request.args.get("age") + rating_area = request.args.get("rating_area") + + q = PlanPrices.query.where(PlanPrices.plan_id == plan_id) + if year: + try: + year = int(year) + except: + raise sanic.exceptions.BadRequest + q = q.where(PlanPrices.year == year) + + if age: + try: + age = int(age) + except: + raise sanic.exceptions.BadRequest + q = q.where(PlanPrices.min_age <= age).where(PlanPrices.max_age >= age) + + if rating_area: + q = q.where(PlanPrices.rating_area_id == rating_area) + + q = q.order_by(PlanPrices.year, PlanPrices.rating_area_id, PlanPrices.min_age) + + res = [] + async with db.acquire() as conn: + async with conn.transaction(): + async for x in q.gino.iterate(): + res.append(x.to_json_dict()) + + return response.json(res) + + @blueprint.get('/id/', name="get_plan_by_plan_id") @blueprint.get('/id//', name="get_plan_by_plan_id_and_year") @blueprint.get('/id///', name="get_plan_variant_by_plan_id_and_year") diff --git a/api/for_human.py b/api/for_human.py index ac3d6f1..c384d66 100644 --- a/api/for_human.py +++ b/api/for_human.py @@ -338,9 +338,9 @@ 'TEHBDedInnTier2FamilyPerGroup': 'Combined Medical and Drug EHB Deductible, In Network (Tier 2), Family Per Group', 'TEHBDedInnTier2Coinsurance': 'Combined Medical and Drug EHB Deductible, In Network (Tier 2), Default Coinsurance', 'TEHBDedOutOfNetIndividual': 'Combined Medical and Drug EHB Deductible, Out of Network, Individual', - 'TEHBDedOutofNetFamily': 'Combined Medical and Drug EHB Deductible, Out of Network, Family', - 'TEHBDedOutofNetFamilyPerPerson': 'Combined Medical and Drug EHB Deductible, Out of Network, Family Per Person', - 'TEHBDedOutofNetFamilyPerGroup': 'Combined Medical and Drug EHB Deductible, Out of Network, Family Per Group', + 'TEHBDedOutOfNetFamily': 'Combined Medical and Drug EHB Deductible, Out of Network, Family', + 'TEHBDedOutOfNetFamilyPerPerson': 'Combined Medical and Drug EHB Deductible, Out of Network, Family Per Person', + 'TEHBDedOutOfNetFamilyPerGroup': 'Combined Medical and Drug EHB Deductible, Out of Network, Family Per Group', 'TEHBDedCombInnOonIndividual': 'Combined Medical and Drug EHB Deductible, Combined In/Out of Network, Individual', 'TEHBDedCombInnOonFamily': 'Combined Medical and Drug EHB Deductible, Combined In/Out of Network, Family', 'TEHBDedCombInnOonFamilyPerPerson': 'Combined Medical and Drug EHB Deductible, Combined In/Out of Network, ' diff --git a/db/models.py b/db/models.py index ddfe2b3..644b357 100644 --- a/db/models.py +++ b/db/models.py @@ -1,6 +1,8 @@ import os from sqlalchemy import DateTime, Numeric, DATE, Column,\ - String, Integer, Float, BigInteger, Boolean, ARRAY, JSON, TIMESTAMP, TEXT + String, Integer, Float, BigInteger, Boolean, ARRAY, JSON, TIMESTAMP, TEXT, SMALLINT + +from sqlalchemy.dialects.postgresql import MONEY from db.connection import db from db.json_mixin import JSONOutputMixin @@ -129,12 +131,53 @@ class PlanAttributes(db.Model, JSONOutputMixin): {'schema': os.getenv('DB_SCHEMA') or 'mrf', 'extend_existing': True}, ) __my_index_elements__ = ['full_plan_id', 'year', 'attr_name'] + __my_additional_indexes__ = [ + {'index_elements': ('full_plan_id gin_trgm_ops', 'year'), + 'using': 'gin', + 'name': 'find_all_variants'}] full_plan_id = Column(db.String(17), nullable=False) year = Column(Integer) attr_name = Column(String) attr_value = Column(String) +class PlanPrices(db.Model, JSONOutputMixin): + __tablename__ = 'plan_prices' + __main_table__ = __tablename__ + __table_args__ = ( + {'schema': os.getenv('DB_SCHEMA') or 'mrf', 'extend_existing': True}, + ) + __my_index_elements__ = ['plan_id', 'year', 'checksum', ] + __my_additional_indexes__ = [ + {'index_elements': ('state', 'year', 'min_age', 'max_age', 'rating_area_id', 'couple'), + 'using': 'gin', + 'name': 'find_plan'}] + + plan_id = Column(db.String(14), nullable=False) + year = Column(Integer) + state = Column(String(2)) + checksum = Column(Integer) + rate_effective_date = Column(DATE) + rate_expiration_date = Column(DATE) + rating_area_id = Column(String) + tobacco = Column(String) + min_age = Column(SMALLINT) + max_age = Column(SMALLINT) + individual_rate = Column(Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None)) + individual_tobacco_rate = Column(Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None)) + couple = Column(Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None)) + primary_subscriber_and_one_dependent = Column( + Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None)) + primary_subscriber_and_two_dependents = Column( + Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None)) + primary_subscriber_and_three_or_more_dependents = Column( + Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None)) + couple_and_one_dependent = Column(Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None)) + couple_and_two_dependents = Column(Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None)) + couple_and_three_or_more_dependents = Column( + Numeric(scale=2, precision=8, asdecimal=False, decimal_return_scale=None)) + + class PlanTransparency(db.Model, JSONOutputMixin): __tablename__ = 'plan_transparency' __main_table__ = __tablename__ diff --git a/process/__init__.py b/process/__init__.py index ee5931b..7cc0ab5 100644 --- a/process/__init__.py +++ b/process/__init__.py @@ -9,7 +9,7 @@ from process.initial import main as initiate_mrf, init_file, startup as initial_startup, shutdown as initial_shutdown, \ process_plan, process_json_index, process_provider from process.attributes import main as initiate_plan_attributes, save_attributes, process_state_attributes, \ - process_attributes, startup as attr_startup, shutdown as attr_shutdown + process_attributes, process_prices, startup as attr_startup, shutdown as attr_shutdown from process.npi import main as initiate_npi, process_npi_chunk, save_npi_data, startup as npi_startup, \ shutdown as npi_shutdown, process_data as process_npi_data from process.nucc import main as initiate_nucc, startup as nucc_startup, shutdown as nucc_shutdown, \ @@ -32,14 +32,14 @@ class MRF: class Attributes: - functions = [process_attributes, process_state_attributes, save_attributes] + functions = [process_attributes, process_state_attributes, process_prices, save_attributes] on_startup = attr_startup on_shutdown = attr_shutdown max_jobs=20 queue_read_limit = 5 redis_settings = RedisSettings.from_dsn(os.environ.get('HLTHPRT_REDIS_ADDRESS')) - job_serializer = msgpack.packb - job_deserializer = lambda b: msgpack.unpackb(b, raw=False) + job_serializer = lambda b: msgpack.packb(b, datetime=True) + job_deserializer = lambda b: msgpack.unpackb(b, timestamp=3, raw=False) class NPI: diff --git a/process/attributes.py b/process/attributes.py index 4a42b91..2e76932 100644 --- a/process/attributes.py +++ b/process/attributes.py @@ -2,6 +2,7 @@ import msgpack import asyncio import datetime +import pytz import tempfile import json import glob @@ -18,7 +19,7 @@ from process.ext.utils import download_it_and_save, make_class, push_objects, log_error, print_time_info, \ flush_error_log, return_checksum from dateutil.parser import parse as parse_date -from db.models import Issuer, Plan, PlanAttributes, db +from db.models import Issuer, Plan, PlanAttributes, PlanPrices, db from db.connection import init_db from api.for_human import plan_attributes_labels_to_key @@ -38,7 +39,7 @@ async def startup(ctx): tables = {} # for the future complex usage - for cls in (PlanAttributes,): + for cls in (PlanAttributes, PlanPrices, ): tables[cls.__main_table__] = make_class(cls, import_date) obj = tables[cls.__main_table__] await db.status(f"DROP TABLE IF EXISTS {db_schema}.{obj.__main_table__}_{import_date};") @@ -54,10 +55,35 @@ async def shutdown(ctx): import_date = ctx['import_date'] db_schema = os.getenv('DB_SCHEMA') if os.getenv('DB_SCHEMA') else 'mrf' tables = {} - async with db.transaction(): - for cls in (PlanAttributes,): + + processing_classes_array = (PlanAttributes, PlanPrices, ) + + for cls in processing_classes_array: + tables[cls.__main_table__] = make_class(cls, import_date) + obj = tables[cls.__main_table__] + + if hasattr(cls, '__my_additional_indexes__') and cls.__my_additional_indexes__: + for index in cls.__my_additional_indexes__: + index_name = index.get('name', '_'.join(index.get('index_elements'))) + using = "" + if t := index.get('using'): + using = f"USING {t} " + create_index_sql = f"CREATE INDEX IF NOT EXISTS {obj.__tablename__}_idx_{index_name} " \ + f"ON {db_schema}.{obj.__tablename__} {using}" \ + f"({', '.join(index.get('index_elements'))});" + print(create_index_sql) + x = await db.status(create_index_sql) + + print(f"Post-Index VACUUM FULL ANALYZE {db_schema}.{obj.__tablename__};"); + await db.status(f"VACUUM FULL ANALYZE {db_schema}.{obj.__tablename__};") + + + + async with db.transaction() as tx: + for cls in processing_classes_array: tables[cls.__main_table__] = make_class(cls, import_date) obj = tables[cls.__main_table__] + table = obj.__main_table__ await db.status(f"DROP TABLE IF EXISTS {db_schema}.{table}_old;") await db.status(f"ALTER TABLE IF EXISTS {db_schema}.{table} RENAME TO {table}_old;") @@ -71,12 +97,25 @@ async def shutdown(ctx): f"{db_schema}.{obj.__tablename__}_idx_primary RENAME TO " f"{table}_idx_primary;") + if hasattr(cls, '__my_additional_indexes__') and obj.__my_additional_indexes__: + for index in obj.__my_additional_indexes__: + index_name = index.get('name', '_'.join(index.get('index_elements'))) + await db.status(f"ALTER INDEX IF EXISTS " + f"{db_schema}.{table}_idx_{index_name} RENAME TO " + f"{table}_idx_{index_name}_old;") + await db.status(f"ALTER INDEX IF EXISTS " + f"{db_schema}.{obj.__tablename__}_idx_{index_name} RENAME TO " + f"{table}_idx_{index_name};") + print_time_info(ctx['context']['start']) async def save_attributes(ctx, task): import_date = ctx['import_date'] - myplanattributes = make_class(PlanAttributes, import_date) + if ('type' in task) and task['type'] == 'PlanPrices': + myplanattributes = make_class(PlanPrices, import_date) + else: + myplanattributes = make_class(PlanAttributes, import_date) await push_objects(task['attr_obj_list'], myplanattributes) @@ -135,7 +174,88 @@ async def process_attributes(ctx, task): if attr_obj_list: await push_objects(attr_obj_list, myplanattributes) +async def process_prices(ctx, task): + redis = ctx['redis'] + print('Downloading data from: ', task['url']) + + import_date = ctx['import_date'] + myissuer = make_class(Issuer, import_date) + myplanprices = make_class(PlanPrices, import_date) + + with tempfile.TemporaryDirectory() as tmpdirname: + p = 'rate.csv' + tmp_filename = str(PurePath(str(tmpdirname), p + '.zip')) + await download_it_and_save(task['url'], tmp_filename) + await unzip(tmp_filename, tmpdirname) + + tmp_filename = glob.glob(f"{tmpdirname}/*.csv")[0] + total_count = 0 + attr_obj_list = [] + + count = 0 + + range_regex = re.compile(r'^(\d+)-(\d+)$') + int_more_regex = re.compile(r'^(\d+) and over$') + clean_int = re.compile(r'^(\d+)$') + async with async_open(tmp_filename, 'r') as afp: + async for row in AsyncDictReader(afp, delimiter=","): + if not row['PlanId']: + continue + count += 1 + + obj = { + 'plan_id': row['PlanId'], + 'state': row['StateCode'].upper(), + 'year': int(task['year']), + 'rate_effective_date': pytz.utc.localize( + parse_date(row['RateEffectiveDate'], fuzzy=True)) if row['RateEffectiveDate'] else None, + 'rate_expiration_date': pytz.utc.localize( + parse_date(row['RateExpirationDate'], fuzzy=True)) if row[ + 'RateExpirationDate'] else None, + 'rating_area_id': row['RatingAreaId'], + 'tobacco': row['Tobacco'], + 'min_age': 0, + 'max_age': 125, + 'individual_rate': float(row['IndividualRate']) if row['IndividualRate'] else None, + 'individual_tobacco_rate': float(row['IndividualTobaccoRate']) if row['IndividualTobaccoRate'] else None, + 'couple': float(row['Couple']) if row['Couple'] else None, + 'primary_subscriber_and_one_dependent': float(row['PrimarySubscriberAndOneDependent']) if row['PrimarySubscriberAndOneDependent'] else None, + 'primary_subscriber_and_two_dependents': float(row['PrimarySubscriberAndTwoDependents']) if row['PrimarySubscriberAndTwoDependents'] else None, + 'primary_subscriber_and_three_or_more_dependents': float(row[ + 'PrimarySubscriberAndThreeOrMoreDependents']) if row[ + 'PrimarySubscriberAndThreeOrMoreDependents'] else None, + 'couple_and_one_dependent': float(row['CoupleAndOneDependent']) if row['CoupleAndOneDependent'] else None, + 'couple_and_two_dependents': float(row['CoupleAndTwoDependents']) if row['CoupleAndTwoDependents'] else None, + 'couple_and_three_or_more_dependents': float(row['CoupleAndThreeOrMoreDependents']) if row['CoupleAndThreeOrMoreDependents'] else None, + } + + match row['Age'].strip(): + case x if t := clean_int.search(x): + obj['min_age'] = int(t.group(1)) + obj['max_age'] = obj['min_age'] + case x if t := range_regex.search(x): + obj['min_age'] = int(t.group(1)) + obj['max_age'] = int(t.group(2)) + case x if t := int_more_regex.search(x): + obj['min_age'] = int(t.group(1)) + + obj['checksum'] = return_checksum( + [obj['plan_id'], obj['year'], obj['rate_effective_date'], obj['rate_expiration_date'], + obj['rating_area_id'], obj['min_age'], obj['max_age']]) + + attr_obj_list.append(obj) + + if count > 1000000: + total_count += count + await redis.enqueue_job('save_attributes', {'type': 'PlanPrices', 'attr_obj_list': attr_obj_list}) + attr_obj_list.clear() + count = 0 + else: + count += 1 + + if attr_obj_list: + await push_objects(attr_obj_list, myplanprices) @@ -358,13 +478,21 @@ async def main(): attribute_files = json.loads(os.environ['HLTHPRT_CMSGOV_PLAN_ATTRIBUTES_URL_PUF']) state_attribute_files = json.loads(os.environ['HLTHPRT_CMSGOV_STATE_PLAN_ATTRIBUTES_URL_PUF']) + price_files = json.loads(os.environ['HLTHPRT_CMSGOV_PRICE_PLAN_URL_PUF']) + print("Starting to process STATE Plan Attribute files..") for file in state_attribute_files: print("Adding: ", file) x = await redis.enqueue_job('process_state_attributes', {'url': file['url'], 'year': file['year']}) + print("Starting to process Plan Attribute files..") for file in attribute_files: print("Adding: ", file) - x = await redis.enqueue_job('process_attributes', {'url': file['url'], 'year': file['year']}) \ No newline at end of file + x = await redis.enqueue_job('process_attributes', {'url': file['url'], 'year': file['year']}) + + print("Starting to process Plan Prices files..") + for file in price_files: + print("Adding: ", file) + x = await redis.enqueue_job('process_prices', {'url': file['url'], 'year': file['year']}) \ No newline at end of file diff --git a/process/initial.py b/process/initial.py index f8468a4..507d8f0 100644 --- a/process/initial.py +++ b/process/initial.py @@ -15,6 +15,7 @@ import pylightxl as xl from sqlalchemy.dialects.postgresql import insert from aiocsv import AsyncDictReader +import zipfile from process.ext.utils import download_it_and_save, make_class, push_objects, log_error, print_time_info, \ flush_error_log, return_checksum @@ -488,6 +489,7 @@ async def import_unknown_state_issuers_data(): 'state': str(row['StateCode']).upper(), 'issuer_id': int(row['IssuerId']), 'mrf_url': '', + 'data_contact_email': '', 'issuer_name': row['IssuerMarketPlaceMarketingName'].strip() if row[ 'IssuerMarketPlaceMarketingName'].strip() else row['IssuerId'] } @@ -539,6 +541,7 @@ async def import_unknown_state_issuers_data(): 'state': str(row['STATE CODE']).upper(), 'issuer_id': int(row['ISSUER ID']), 'mrf_url': '', + 'data_contact_email': '', 'issuer_name': row['ISSUER NAME'].strip() if row['ISSUER NAME'].strip() else row[ 'ISSUER ID'] } @@ -546,6 +549,46 @@ async def import_unknown_state_issuers_data(): return (issuer_list, plan_list) +async def update_issuer_names_data(): + issuer_list = {} + my_files = json.loads(os.environ['HLTHPRT_CMSGOV_RATE_REVIEW_URL_PUF']) + for file in my_files: + with tempfile.TemporaryDirectory() as tmpdirname: + p = 'some_file' + tmp_filename = str(PurePath(str(tmpdirname), p + '.zip')) + await download_it_and_save(file['url'], tmp_filename) + print(f"Trying to unpack1: {tmp_filename}") + + #temp solution + with zipfile.ZipFile(tmp_filename, 'r') as zip_ref: + zip_ref.extractall(tmpdirname) + + tmp_filename = glob.glob(f"{tmpdirname}/*PUF*.zip")[0] + print(f"Trying to unpack: {tmp_filename}") + tmpdirname = str(PurePath(str(tmpdirname), 'PUF_FILES')) + # temp solution + with zipfile.ZipFile(tmp_filename, 'r') as zip_ref: + zip_ref.extractall(tmpdirname) + print(glob.glob(f"{tmpdirname}/*PUF*.csv")) + + count = 0 + # return 1 + csv_files = glob.glob(f"{tmpdirname}/*PUF*.csv") + for tmp_filename in csv_files: + async with async_open(tmp_filename, 'r') as afp: + async for row in AsyncDictReader(afp, delimiter=","): + issuer_list[int(row['ISSUER_ID'])] = { + 'state': str(row['STATE']).upper(), + 'issuer_id': int(row['ISSUER_ID']), + 'mrf_url': '', + 'data_contact_email': '', + 'issuer_name': row['COMPANY'].strip() if row['COMPANY'].strip() else row[ + 'ISSUER_ID'] + } + + return issuer_list + + async def init_file(ctx): """ The init_file function is the first function called in this file. @@ -630,6 +673,11 @@ async def init_file(ctx): await push_objects(obj_list, myplantransparency) + (issuer_list, plan_list) = await import_unknown_state_issuers_data() + issuer_list.update(await update_issuer_names_data()) + await push_objects(list(plan_list.values()), myplan) + del plan_list + p = 'mrf_puf.xlsx' tmp_filename = str(PurePath(str(tmpdirname), p + '.zip')) await download_it_and_save(os.environ['HLTHPRT_CMSGOV_MRF_URL_PUF'], tmp_filename) @@ -662,16 +710,13 @@ async def init_file(ctx): else: url2issuer[obj['mrf_url']] = [obj['issuer_id'], ] count += 1 - if not (count % 100): - await push_objects(obj_list, myissuer) - obj_list.clear() url_list = list(set(url_list)) - await push_objects(obj_list, myissuer) - (issuer_list, plan_list) = await import_unknown_state_issuers_data() - await asyncio.gather(push_objects(list(issuer_list.values()), myissuer), - push_objects(list(plan_list.values()), myplan)) + for x in obj_list: + issuer_list.update({x['issuer_id']: x}) + + await push_objects(list(issuer_list.values()), myissuer) for url in url_list: await redis.enqueue_job('process_json_index', {'url': url, 'issuer_array': url2issuer[url]})