Skip to content

Commit

Permalink
[Issue #1659] Write a script that copies csvs from s3 to the db + scr…
Browse files Browse the repository at this point in the history
…ipt to generate them (#1773)

## Summary
Fixes #1659

### Time to review: __5 mins__

## Changes proposed
**this is temporary code and will be deleted after we run it - these are
onetime scripts and are deliberately a bit hacky**

Adds two scripts:
* One that takes csvs extracts of the Oracle database tables and does
transformations to make csvs we can import into our database
* One that takes the output of the first script and uploads them into
our database

## Context for reviewers
As the transformation work is just starting, we want to unblock testing
of the front-end UI by getting data into our dev environment. This
approach is basically a hacky one-time shortcut to get data uploaded so
that we have time to build the actual transformation process.
Additionally, we are using prod data so that it's realistic for testing
purposes.

The unit test I set up for the load script just exists to verify the
tools work whether the input dataset is on disk (as I tested locally) or
in S3 (as it will actually be run).

## Additional information
Testing this requires downloading data from the real production
database, which I think only I have access to at the moment. I did
actually build this out / test it using that data locally. You can also
use the test files I uploaded for the unit test to verify the behavior
of the second part of the script.


To run it:
```
poetry run python tests/util/convert_oracle_csvs_to_postgres.py --directory <whatever directory your csvs are in>

poetry run flask task import-opportunity-csvs --input-folder <same directory as before>
```

Note this will not work at the moment due to needing a change from
#1743
  • Loading branch information
chouinar authored Apr 19, 2024
1 parent f60e8ea commit 1ef6e97
Show file tree
Hide file tree
Showing 12 changed files with 669 additions and 1 deletion.
1 change: 1 addition & 0 deletions api/src/task/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

# import any of the other files so they get initialized and attached to the blueprint
import src.task.opportunities.set_current_opportunities_task # noqa: F401 E402 isort:skip
import src.task.opportunities.import_opportunity_csvs # noqa: F401 E402 isort:skip

__all__ = ["task_blueprint"]
71 changes: 71 additions & 0 deletions api/src/task/opportunities/import_opportunity_csvs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import csv
import logging
from typing import cast

import click

import src.adapters.db as db
import src.adapters.db.flask_db as flask_db
import src.util.file_util as file_util
from src.constants.schema import Schemas
from src.task.task_blueprint import task_blueprint

logger = logging.getLogger(__name__)

FILES_TO_LOAD = [
"opportunity.csv",
"opportunity_summary.csv",
"opportunity_assistance_listing.csv",
"link_opportunity_summary_applicant_type.csv",
"link_opportunity_summary_funding_instrument.csv",
"link_opportunity_summary_funding_category.csv",
]


@task_blueprint.cli.command(
    "import-opportunity-csvs",
    help="Load several csv files to the opportunity tables",
)
@click.option("--input-folder", required=True, help="The directory to fetch the input files from")
@flask_db.with_db_session()
def import_opportunity_csvs(db_session: db.Session, input_folder: str) -> None:
    """CLI entrypoint: load the opportunity CSV extracts from input_folder into the API schema.

    The entire load runs inside a single transaction, so a failure while
    processing any file rolls back everything loaded so far.
    """
    with db_session.begin():
        process(db_session, input_folder, Schemas.API)


def process(db_session: db.Session, input_folder: str, schema: str) -> None:
    """Load each CSV listed in FILES_TO_LOAD from input_folder into its table.

    The destination table name is derived from the file name (minus the
    ".csv" suffix); tables live in the given schema.
    """
    for filename in FILES_TO_LOAD:
        logger.info("Processing %s", filename)

        destination_table = filename.removesuffix(".csv")
        filepath = file_util.join(input_folder, filename)

        load_csv_stream_to_table(db_session, destination_table, filepath, schema)


def load_csv_stream_to_table(
    db_session: db.Session, table_name: str, csv_filepath: str, schema: str
) -> None:
    """Stream a CSV file (local path or s3://) into schema.table_name via Postgres COPY.

    The CSV header row must match the table's column names. Every column is
    passed to FORCE_NULL, so empty quoted values are loaded as NULL.

    Raises:
        ValueError: if the CSV file has no header row.
    """
    # This is a bit hacky - we need all of the field names of the csv
    # to write the COPY command, so open the file, read just the header
    # so that we have the fieldnames, and then close the file
    with file_util.open_stream(csv_filepath) as csvfile:
        reader = csv.DictReader(csvfile)
        fieldnames = reader.fieldnames

    # DictReader.fieldnames is None for an empty file; fail fast with a clear
    # error instead of building a malformed COPY command.
    if not fieldnames:
        raise ValueError(f"CSV file {csv_filepath} has no header row")
    field_names: list[str] = list(fieldnames)

    with file_util.open_stream(csv_filepath) as csvfile:
        # FORCE_NULL(col1, col2..)
        # makes it so empty quotes are treated as nulls
        # this isn't technically right as actual empty-string
        # values will be changed to nulls, but working around that
        # problem requires us to generate the CSVs differently
        # and this is deliberately a pretty quick hacky approach
        columns = ",".join(field_names)
        cmd = f"COPY {schema}.{table_name}({columns}) from STDIN with (DELIMITER ',', FORMAT CSV, HEADER TRUE, FORCE_NULL({columns}))"

        # Use the raw driver (psycopg) cursor for COPY support, and close it
        # when done rather than leaking it as the original code did.
        with db_session.connection().connection.cursor() as cursor:
            with cursor.copy(cmd) as copy:
                # Stream in fixed-size chunks so the whole file is never held in memory.
                while data := csvfile.read(10000):
                    copy.write(data)

            logger.info("Loaded %s rows into %s.%s", cursor.rowcount, schema, table_name)
28 changes: 27 additions & 1 deletion api/src/util/file_util.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import os
from pathlib import PosixPath
from typing import Optional, Tuple
from typing import Any, Optional, Tuple
from urllib.parse import urlparse

import boto3
import botocore
import smart_open
from botocore.config import Config

##################################
# Path parsing utils
Expand Down Expand Up @@ -34,6 +36,10 @@ def get_file_name(path: str) -> str:
return os.path.basename(path)


def join(*parts: str) -> str:
    """Join path components; thin wrapper around os.path.join."""
    joined: str = os.path.join(*parts)
    return joined


##################################
# S3 Utilities
##################################
Expand All @@ -45,3 +51,23 @@ def get_s3_client(boto_session: Optional[boto3.Session] = None) -> botocore.clie
return boto_session.client("s3")

return boto3.client("s3")


##################################
# File operations
##################################


def open_stream(path: str, mode: str = "r", encoding: str | None = None) -> Any:
    """Open a local path or an s3:// URI as a file-like stream via smart_open."""
    # Local files need no special transport configuration.
    if not is_s3_path(path):
        return smart_open.open(path, mode, encoding=encoding)

    # For S3, hand smart_open a botocore Config with connection pooling,
    # timeouts, and retries configured.
    transport_params = {
        "client_kwargs": {
            "config": Config(
                max_pool_connections=10,
                connect_timeout=60,
                read_timeout=60,
                retries={"max_attempts": 10},
            )
        }
    }
    return smart_open.open(path, mode, transport_params=transport_params, encoding=encoding)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"opportunity_summary_id","applicant_type_id","legacy_applicant_type_id","updated_by","created_by"
"1","14","1564","",""
"1","15","1565","",""
"1","16","1566","",""
"2","1","1567","",""
"2","2","1568","",""
"3","3","1569","",""
"3","4","1570","",""
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"opportunity_summary_id","funding_category_id","legacy_funding_category_id","updated_by","created_by"
"1","18","1832","",""
"2","18","1841","",""
"3","13","1842","",""
"3","18","1843","",""
"3","16","1843","",""
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"opportunity_summary_id","funding_instrument_id","legacy_funding_instrument_id","updated_by","created_by"
"1","1","1942","",""
"2","1","1945","",""
"3","2","1947","",""
"3","1","1948","",""
3 changes: 3 additions & 0 deletions api/tests/src/task/opportunities/test_files/opportunity.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"opportunity_id","opportunity_number","opportunity_title","agency","opportunity_category_id","category_explanation","is_draft","revision_number","modified_comments","publisher_user_id","publisher_profile_id"
"1","US-ABC-123","Example title 1","US-ABC","1","","False","0","","",""
"2","US-XYZ-456","Example title 2","DO-XYZ","1","","False","0","","",""
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"opportunity_assistance_listing_id","opportunity_id","assistance_listing_number","program_title","updated_by","created_by"
"58825","1","15.224","Cultural Resource Management","","EGRANTSADMIN"
"59476","1","45.161","Promotion of the Humanities_Research","","EGRANTSADMIN"
"59478","1","12.345","Example title","","EGRANTSADMIN"
"56923","2","00.000","Not Elsewhere Classified","","EGRANTSADMIN"
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"opportunity_summary_id","opportunity_id","summary_description","is_cost_sharing","is_forecast","post_date","close_date","close_date_description","archive_date","unarchive_date","expected_number_of_awards","estimated_total_program_funding","award_floor","award_ceiling","additional_info_url","additional_info_url_description","forecasted_post_date","forecasted_close_date","forecasted_close_date_description","forecasted_award_date","forecasted_project_start_date","fiscal_year","revision_number","modification_comments","funding_category_description","agency_code","agency_name","agency_phone_number","agency_contact_description","agency_email_address","agency_email_address_description","is_deleted","can_send_mail","publisher_profile_id","publisher_user_id","updated_by","created_by"
"1","1","The overarching goal of these training and technical assistance projects is to improves to improve providers adherence to prevention and treatment guidelines, and strengthen management practices and quality improvement efforts in family planning centers.","False","True","18-MAY-16","","","18-AUG-16","","4","4000000","500000","4000000","","","19-MAY-16","18-JUL-16","Please refer to the full announcement once posted on Grants.gov","01-SEP-16","01-SEP-16","2016","","","","","John Smith","","","john.smith@mail.com","john.smith@mail.com","False","","0","System","",""
"2","1","The graduates, staff and trainees are important assets and help build sustainable public health capacity in their countries.","False",False,"30-MAR-16","","","29-MAY-16","","1","20000000","0","4000000","","","01-APR-16","31-MAY-16","Electronically submitted applications must be submitted no later than 5:00 p.m., ET, on the listed application due date.","30-SEP-16","30-SEP-16","2016","","Grammatical Changes","","","Bob Smith","","","example@mail.com","example@mail.com","False","True","0","abc","EGRANTSADMIN@ProdAPP01",""
"3","2","The successful applicant to this FOA will identify effective service delivery models","False","True","25-MAR-16","","","04-AUG-16","","2","0","0","2000000","","","09-AUG-16","08-OCT-16","Electronically submitted applications must be submitted no later than 11:59 p.m., ET, on the listed application due date.","01-APR-17","01-APR-17","2017","","Updated Archive Date","","","Jane Doe","","","GHK5@cdc.gov","example@mail.com","False","False","0","System","EGRANTSADMIN@ProdAPP03",""
57 changes: 57 additions & 0 deletions api/tests/src/task/opportunities/test_import_opportunity_csvs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import os
from pathlib import Path

import boto3
import pytest

from src.db.models.opportunity_models import (
LinkOpportunitySummaryApplicantType,
LinkOpportunitySummaryFundingCategory,
LinkOpportunitySummaryFundingInstrument,
Opportunity,
OpportunityAssistanceListing,
OpportunitySummary,
)
from src.task.opportunities.import_opportunity_csvs import process
from src.util import file_util
from tests.conftest import BaseTestClass


@pytest.fixture()
def test_file_path():
    """Directory containing the CSV fixture files used by these tests."""
    return Path(__file__).parent.joinpath("test_files")


def upload_file_to_s3(file_path, s3_bucket, key):
    """Upload a single local file to the given bucket under the given key."""
    client = boto3.client("s3")
    client.upload_file(str(file_path), s3_bucket, key)


def setup_s3_files(directory, s3_bucket, s3_path):
    """Upload every file in directory to s3_bucket, keyed under the s3_path prefix."""
    for filename in os.listdir(directory):
        upload_file_to_s3(file_util.join(directory, filename), s3_bucket, s3_path + filename)


class TestImportOpportunityCsvs(BaseTestClass):
    def test_process(
        self, db_session, test_file_path, test_api_schema, truncate_opportunities, mock_s3_bucket
    ):
        """End-to-end check: upload fixture CSVs to (mock) S3, run process, verify row counts."""
        prefix = "path/to/"
        setup_s3_files(test_file_path, mock_s3_bucket, prefix)

        # sanity check that we did in fact upload files to (mock) s3
        listing = boto3.client("s3").list_objects_v2(Bucket=mock_s3_bucket)
        assert len(listing["Contents"]) == 6

        process(db_session, f"s3://{mock_s3_bucket}/{prefix}", test_api_schema)

        # This is just a very hacky validation that we did in fact load the files to the tables
        expected_row_counts = [
            (Opportunity, 2),
            (OpportunityAssistanceListing, 4),
            (OpportunitySummary, 3),
            (LinkOpportunitySummaryFundingInstrument, 4),
            (LinkOpportunitySummaryFundingCategory, 5),
            (LinkOpportunitySummaryApplicantType, 7),
        ]
        for model, expected_count in expected_row_counts:
            assert len(db_session.query(model).all()) == expected_count
Empty file added api/tests/util/__init__.py
Empty file.
Loading

0 comments on commit 1ef6e97

Please sign in to comment.