[Issue #1747] Add transformations for the opportunity summary table (#1917)

## Summary
Fixes #1747

### Time to review: __10 mins__

## Changes proposed
Added a transformation for the opportunity summary table, which is a merger
of 4 separate tables in the existing system (synopsis, synopsis history,
forecast, forecast history).

Also did a little bit of cleanup, reorganizing the utility methods into
their own file.

## Context for reviewers
The primary complexity here is that the join we need to do for each of
these tables is a bit more complex than in the prior tickets. In the Oracle
tables, the primary key of the synopsis/forecast tables is just the
opportunity ID, while the historical tables use opportunity ID + revision
number. In order to uniquely identify a record between systems, we need to
join on 3 values:

* `opportunity_id` - this narrows it down to the right opportunity
* `is_forecast` - which effectively lets us know whether it connects to
the forecast or synopsis tables
* `revision_number` - which lets us determine whether it was a
historical record or not (will be null in non-historical tables)

With these 3 values, we have a key that lets us handle updates/deletes
by properly linking records across the tables.
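
For example, the synopsis history case joins on all three values (pulled
from the diff below). This is also why the `fetch` helpers now wrap the
join clauses in `and_()`: SQLAlchemy's `join()` expects a single ON clause,
so multiple conditions have to be combined.

```python
# Join a staging synopsis history record to its transformed summary record.
# opportunity_id + revision_number + is_forecast together form the unique key.
synopsis_hist_records = self.fetch_with_opportunity(
    TsynopsisHist,
    OpportunitySummary,
    [
        TsynopsisHist.opportunity_id == OpportunitySummary.opportunity_id,
        TsynopsisHist.revision_number == OpportunitySummary.revision_number,
        OpportunitySummary.is_forecast.is_(False),
    ],
)
```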

---
Besides the fetching of data, the other primary implementation detail is
the transformation itself. Since these tables largely overlap, I wrote
one transformation method for all of them and used some `isinstance`
checks to handle the differences. MyPy does a good job of validating
that I'm not making any mistakes with the fields.
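
As a rough sketch of that pattern (illustrative only - the real method is
`transform_util.transform_opportunity_summary`, which isn't expanded in this
view, and the copying of shared fields is elided):

```python
from src.data_migration.transformation import SourceSummary
from src.db.models.opportunity_models import OpportunitySummary
from src.db.models.staging.forecast import Tforecast, TforecastHist
from src.db.models.staging.synopsis import TsynopsisHist


def transform_opportunity_summary(
    source_summary: SourceSummary, existing_summary: OpportunitySummary | None
) -> OpportunitySummary:
    # As with the other transforms, always build a fresh record and merge it
    # in the caller, so a failed transformation never modifies the existing one
    target_summary = OpportunitySummary(opportunity_id=source_summary.opportunity_id)

    # ... copying of the fields shared by all four staging tables elided ...

    # Forecast vs. synopsis is determined by the type of the source table
    target_summary.is_forecast = isinstance(source_summary, (Tforecast, TforecastHist))

    # Only the historical tables carry a revision number
    if isinstance(source_summary, (TsynopsisHist, TforecastHist)):
        target_summary.revision_number = source_summary.revision_number

    return target_summary
```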
chouinar committed May 6, 2024
1 parent 58dd9ec commit c1ddebf
Showing 10 changed files with 1,125 additions and 201 deletions.
6 changes: 6 additions & 0 deletions api/src/data_migration/transformation/__init__.py
@@ -0,0 +1,6 @@
+from typing import TypeAlias
+
+from src.db.models.staging.forecast import Tforecast, TforecastHist
+from src.db.models.staging.synopsis import Tsynopsis, TsynopsisHist
+
+SourceSummary: TypeAlias = Tforecast | Tsynopsis | TforecastHist | TsynopsisHist
244 changes: 115 additions & 129 deletions api/src/data_migration/transformation/transform_oracle_data_task.py
@@ -3,21 +3,28 @@
 from enum import StrEnum
 from typing import Sequence, Tuple, Type, TypeVar, cast
 
-from sqlalchemy import select
+from sqlalchemy import and_, select
 
 from src.adapters import db
-from src.constants.lookup_constants import OpportunityCategory
-from src.db.models.base import ApiSchemaTable, TimestampMixin
-from src.db.models.opportunity_models import Opportunity, OpportunityAssistanceListing
+from src.data_migration.transformation import transform_util
+from src.db.models.base import ApiSchemaTable
+from src.db.models.opportunity_models import (
+    Opportunity,
+    OpportunityAssistanceListing,
+    OpportunitySummary,
+)
+from src.db.models.staging.forecast import Tforecast, TforecastHist
 from src.db.models.staging.opportunity import Topportunity, TopportunityCfda
-from src.db.models.staging.staging_base import StagingBase, StagingParamMixin
+from src.db.models.staging.staging_base import StagingParamMixin
+from src.db.models.staging.synopsis import Tsynopsis, TsynopsisHist
 from src.task.task import Task
 from src.util import datetime_util
 
+from . import SourceSummary
+
 S = TypeVar("S", bound=StagingParamMixin)
 D = TypeVar("D", bound=ApiSchemaTable)
 
-
 logger = logging.getLogger(__name__)
 
 
@@ -63,7 +70,7 @@ def fetch(
             list[Tuple[S, D | None]],
             self.db_session.execute(
                 select(source_model, destination_model)
-                .join(destination_model, *join_clause, isouter=True)
+                .join(destination_model, and_(*join_clause), isouter=True)
                 .where(source_model.transformed_at.is_(None))
                 .execution_options(yield_per=5000)
             ),
@@ -79,7 +86,7 @@ def fetch_with_opportunity(
             list[Tuple[S, D | None, Opportunity | None]],
             self.db_session.execute(
                 select(source_model, destination_model, Opportunity)
-                .join(destination_model, *join_clause, isouter=True)
+                .join(destination_model, and_(*join_clause), isouter=True)
                 .join(
                     Opportunity,
                     source_model.opportunity_id == Opportunity.opportunity_id,  # type: ignore[attr-defined]
@@ -131,7 +138,9 @@ def process_opportunity(
         is_insert = target_opportunity is None
 
         logger.info("Transforming and upserting opportunity", extra=extra)
-        transformed_opportunity = transform_opportunity(source_opportunity, target_opportunity)
+        transformed_opportunity = transform_util.transform_opportunity(
+            source_opportunity, target_opportunity
+        )
         self.db_session.merge(transformed_opportunity)
 
         if is_insert:
@@ -212,7 +221,7 @@ def process_assistance_listing(
         is_insert = target_assistance_listing is None
 
         logger.info("Transforming and upserting assistance listing", extra=extra)
-        transformed_assistance_listing = transform_assistance_listing(
+        transformed_assistance_listing = transform_util.transform_assistance_listing(
             source_assistance_listing, target_assistance_listing
         )
         self.db_session.merge(transformed_assistance_listing)
@@ -226,136 +235,113 @@ def process_assistance_listing(
         source_assistance_listing.transformed_at = self.transform_time
 
     def process_opportunity_summaries(self) -> None:
-        # TODO - https://github.com/HHS/simpler-grants-gov/issues/1747
-        pass
-
-    def process_one_to_many_lookup_tables(self) -> None:
-        # TODO - https://github.com/HHS/simpler-grants-gov/issues/1749
-        pass
-
-
-###############################
-# Transformations
-###############################
-
-
-def transform_opportunity(
-    source_opportunity: Topportunity, existing_opportunity: Opportunity | None
-) -> Opportunity:
-    log_extra = {"opportunity_id": source_opportunity.opportunity_id}
-
-    if existing_opportunity is None:
-        logger.info("Creating new opportunity record", extra=log_extra)
-
-    # We always create a new opportunity record here and merge it in the calling function
-    # this way if there is any error doing the transformation, we don't modify the existing one.
-    target_opportunity = Opportunity(opportunity_id=source_opportunity.opportunity_id)
-
-    target_opportunity.opportunity_number = source_opportunity.oppnumber
-    target_opportunity.opportunity_title = source_opportunity.opptitle
-    target_opportunity.agency = source_opportunity.owningagency
-    target_opportunity.category = transform_opportunity_category(source_opportunity.oppcategory)
-    target_opportunity.category_explanation = source_opportunity.category_explanation
-    target_opportunity.revision_number = source_opportunity.revision_number
-    target_opportunity.modified_comments = source_opportunity.modified_comments
-    target_opportunity.publisher_user_id = source_opportunity.publisheruid
-    target_opportunity.publisher_profile_id = source_opportunity.publisher_profile_id
-
-    # The legacy system doesn't actually have this value as a boolean. There are several
-    # different letter codes. However, their API implementation also does this for their draft flag.
-    target_opportunity.is_draft = source_opportunity.is_draft != "N"
-
-    transform_update_create_timestamp(source_opportunity, target_opportunity, log_extra=log_extra)
-
-    return target_opportunity
-
-
-OPPORTUNITY_CATEGORY_MAP = {
-    "D": OpportunityCategory.DISCRETIONARY,
-    "M": OpportunityCategory.MANDATORY,
-    "C": OpportunityCategory.CONTINUATION,
-    "E": OpportunityCategory.EARMARK,
-    "O": OpportunityCategory.OTHER,
-}
-
-
-def transform_opportunity_category(value: str | None) -> OpportunityCategory | None:
-    if value is None or value == "":
-        return None
-
-    if value not in OPPORTUNITY_CATEGORY_MAP:
-        raise ValueError("Unrecognized opportunity category: %s" % value)
-
-    return OPPORTUNITY_CATEGORY_MAP[value]
-
-
-def transform_assistance_listing(
-    source_assistance_listing: TopportunityCfda,
-    existing_assistance_listing: OpportunityAssistanceListing | None,
-) -> OpportunityAssistanceListing:
-    log_extra = {"opportunity_assistance_listing_id": source_assistance_listing.opp_cfda_id}
-
-    if existing_assistance_listing is None:
-        logger.info("Creating new assistance listing record", extra=log_extra)
-
-    # We always create a new assistance listing record here and merge it in the calling function
-    # this way if there is any error doing the transformation, we don't modify the existing one.
-    target_assistance_listing = OpportunityAssistanceListing(
-        opportunity_assistance_listing_id=source_assistance_listing.opp_cfda_id,
-        opportunity_id=source_assistance_listing.opportunity_id,
-    )
-
-    target_assistance_listing.assistance_listing_number = source_assistance_listing.cfdanumber
-    target_assistance_listing.program_title = source_assistance_listing.programtitle
-
-    transform_update_create_timestamp(
-        source_assistance_listing, target_assistance_listing, log_extra=log_extra
-    )
-
-    return target_assistance_listing
-
-
-def convert_est_timestamp_to_utc(timestamp: datetime | None) -> datetime | None:
-    if timestamp is None:
-        return None
-
-    # The timestamps we get from the legacy system have no timezone info
-    # but we know the database uses US Eastern timezone by default
-    #
-    # First add the America/New_York timezone without any other modification
-    aware_timestamp = datetime_util.make_timezone_aware(timestamp, "US/Eastern")
-    # Then adjust the timezone to UTC this will handle any DST or other conversion complexities
-    return datetime_util.adjust_timezone(aware_timestamp, "UTC")
-
-
-def transform_update_create_timestamp(
-    source: StagingBase, target: TimestampMixin, log_extra: dict | None = None
-) -> None:
-    # Convert the source timestamps to UTC
-    # Note: the type ignores are because created_date/last_upd_date are added
-    # on the individual class definitions, not the base class - due to how
-    # we need to maintain the column order of the legacy system.
-    # Every legacy table does have these columns.
-    created_timestamp = convert_est_timestamp_to_utc(source.created_date)  # type: ignore[attr-defined]
-    updated_timestamp = convert_est_timestamp_to_utc(source.last_upd_date)  # type: ignore[attr-defined]
-
-    if created_timestamp is not None:
-        target.created_at = created_timestamp
-    else:
-        # This is incredibly rare, but possible - because our system requires
-        # we set something, we'll default to the current time and log a warning.
-        if log_extra is None:
-            log_extra = {}
-
-        logger.warning(
-            f"{source.__class__} does not have a created_date timestamp set, setting value to now.",
-            extra=log_extra,
-        )
-        target.created_at = datetime_util.utcnow()
-
-    if updated_timestamp is not None:
-        target.updated_at = updated_timestamp
-    else:
-        # In the legacy system, they don't set whether something was updated
-        # until it receives an update. We always set the value, and on initial insert
-        # want it to be the same as the created_at.
-        target.updated_at = target.created_at
+        logger.info("Processing opportunity summaries")
+        logger.info("Processing synopsis records")
+        synopsis_records = self.fetch_with_opportunity(
+            Tsynopsis,
+            OpportunitySummary,
+            [
+                Tsynopsis.opportunity_id == OpportunitySummary.opportunity_id,
+                OpportunitySummary.is_forecast.is_(False),
+                OpportunitySummary.revision_number.is_(None),
+            ],
+        )
+        self.process_opportunity_summary_group(synopsis_records)
+
+        logger.info("Processing synopsis hist records")
+        synopsis_hist_records = self.fetch_with_opportunity(
+            TsynopsisHist,
+            OpportunitySummary,
+            [
+                TsynopsisHist.opportunity_id == OpportunitySummary.opportunity_id,
+                TsynopsisHist.revision_number == OpportunitySummary.revision_number,
+                OpportunitySummary.is_forecast.is_(False),
+            ],
+        )
+        self.process_opportunity_summary_group(synopsis_hist_records)
+
+        logger.info("Processing forecast records")
+        forecast_records = self.fetch_with_opportunity(
+            Tforecast,
+            OpportunitySummary,
+            [
+                Tforecast.opportunity_id == OpportunitySummary.opportunity_id,
+                OpportunitySummary.is_forecast.is_(True),
+                OpportunitySummary.revision_number.is_(None),
+            ],
+        )
+        self.process_opportunity_summary_group(forecast_records)
+
+        logger.info("Processing forecast hist records")
+        forecast_hist_records = self.fetch_with_opportunity(
+            TforecastHist,
+            OpportunitySummary,
+            [
+                TforecastHist.opportunity_id == OpportunitySummary.opportunity_id,
+                TforecastHist.revision_number == OpportunitySummary.revision_number,
+                OpportunitySummary.is_forecast.is_(True),
+            ],
+        )
+        self.process_opportunity_summary_group(forecast_hist_records)
+
+    def process_opportunity_summary_group(
+        self, records: Sequence[Tuple[SourceSummary, OpportunitySummary | None, Opportunity | None]]
+    ) -> None:
+        for source_summary, target_summary, opportunity in records:
+            try:
+                self.process_opportunity_summary(source_summary, target_summary, opportunity)
+            except ValueError:
+                self.increment(self.Metrics.TOTAL_ERROR_COUNT)
+                logger.exception(
+                    "Failed to process opportunity summary",
+                    extra=transform_util.get_log_extra_summary(source_summary),
+                )
+
+    def process_opportunity_summary(
+        self,
+        source_summary: SourceSummary,
+        target_summary: OpportunitySummary | None,
+        opportunity: Opportunity | None,
+    ) -> None:
+        self.increment(self.Metrics.TOTAL_RECORDS_PROCESSED)
+        extra = transform_util.get_log_extra_summary(source_summary)
+        logger.info("Processing opportunity summary", extra=extra)
+
+        if opportunity is None:
+            # This shouldn't be possible as the incoming data has foreign keys, but as a safety net
+            # we'll make sure the opportunity actually exists
+            raise ValueError(
+                "Opportunity summary cannot be processed as the opportunity for it does not exist"
+            )
+
+        if source_summary.is_deleted:
+            logger.info("Deleting opportunity summary", extra=extra)
+
+            if target_summary is None:
+                raise ValueError("Cannot delete opportunity summary as it does not exist")
+
+            self.increment(self.Metrics.TOTAL_RECORDS_DELETED)
+            self.db_session.delete(target_summary)
+
+        else:
+            # To avoid incrementing metrics for records we fail to transform, record
+            # here whether it's an insert/update and we'll increment after transforming
+            is_insert = target_summary is None
+
+            logger.info("Transforming and upserting opportunity summary", extra=extra)
+            transformed_opportunity_summary = transform_util.transform_opportunity_summary(
+                source_summary, target_summary
+            )
+            self.db_session.merge(transformed_opportunity_summary)
+
+            if is_insert:
+                self.increment(self.Metrics.TOTAL_RECORDS_INSERTED)
+            else:
+                self.increment(self.Metrics.TOTAL_RECORDS_UPDATED)
+
+        logger.info("Processed opportunity summary", extra=extra)
+        source_summary.transformed_at = self.transform_time
+
+    def process_one_to_many_lookup_tables(self) -> None:
+        # TODO - https://github.com/HHS/simpler-grants-gov/issues/1749
+        pass
