Skip to content

Commit

Permalink
[Issue #1977] Adjust transformation logic to handle orphaned history …
Browse files Browse the repository at this point in the history
…records (#1982)

## Summary
Fixes #1977 

### Time to review: __10 mins__

## Changes proposed
Handle orphaned history records when the opportunity (and, consequently, the
opportunity summary) object does not exist

Add a `transformation_notes` column to record information about the
transformation — in this case, marking that we skipped
a record for a particular reason.

## Context for reviewers
A record in the `tsynopsis_hist` or `tforecast_hist` table connects with
an opportunity; however, that opportunity may not be in the
`topportunity` table if it was deleted, and its record only remains in
the `topportunity_hist` table — a table we aren't yet importing. Because
of this, we can't actually import these records. While we may want to
import the historical opportunities, we're not going to do that at this
time; instead, we'll mark these records as processed.

For the three sets of one-to-many lookup tables, we also need to add a
check because if we couldn't import the historical synopsis/forecast
into our `opportunity_summary` table - it'll also cause those to fail.

Important detail - historical data isn't technically required right now,
and as far as I can tell this _should_ only apply to deleted
opportunities (something that wouldn't ever be visible in our next few
features)

## Additional information
Testing this with a snapshot of prod data, I was able to run through the
full dataset locally and get the following metrics

```
total_records_processed=1484400
total_records_deleted=0 
total_records_inserted=1338467
total_records_updated=0
total_records_orphaned=2305
total_duplicate_records_skipped=977
total_historical_orphans_skipped=142650 
total_error_count=1
task_duration_sec=615.295
app.name=src.app
```
Note that the error is unrelated to this work and will be addressed
separately (boolean with an invalid value)

---------

Co-authored-by: nava-platform-bot <platform-admins@navapbc.com>
  • Loading branch information
chouinar and nava-platform-bot authored May 13, 2024
1 parent 9e843a5 commit aa4d15f
Show file tree
Hide file tree
Showing 10 changed files with 434 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@

logger = logging.getLogger(__name__)

# Constants
ORPHANED_CFDA = "orphaned_cfda"
ORPHANED_HISTORICAL_RECORD = "orphaned_historical_record"


class TransformOracleDataTask(Task):
class Metrics(StrEnum):
Expand All @@ -58,6 +62,7 @@ class Metrics(StrEnum):
TOTAL_RECORDS_UPDATED = "total_records_updated"
TOTAL_RECORDS_ORPHANED = "total_records_orphaned"
TOTAL_DUPLICATE_RECORDS_SKIPPED = "total_duplicate_records_skipped"
TOTAL_HISTORICAL_ORPHANS_SKIPPED = "total_historical_orphans_skipped"

TOTAL_ERROR_COUNT = "total_error_count"

Expand Down Expand Up @@ -266,6 +271,7 @@ def process_assistance_listing(
"Assistance listing is orphaned and does not connect to any opportunity",
extra=extra,
)
source_assistance_listing.transformation_notes = ORPHANED_CFDA

elif source_assistance_listing.is_deleted:
logger.info("Deleting assistance listing", extra=extra)
Expand Down Expand Up @@ -369,14 +375,27 @@ def process_opportunity_summary(
extra = transform_util.get_log_extra_summary(source_summary)
logger.info("Processing opportunity summary", extra=extra)

if opportunity is None:
# Historical records are linked to other historical records, however
# we don't import historical opportunity records, so if the opportunity
# was deleted, we don't have anything to link these to. Whenever we do
# support historical opportunities, we'll have these all marked with a
# flag that we can use to reprocess these.
if opportunity is None and source_summary.is_historical_table:
logger.warning(
"Historical opportunity summary does not have a corresponding opportunity - cannot import, but will mark as processed",
extra=extra,
)
self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED)
source_summary.transformation_notes = ORPHANED_HISTORICAL_RECORD

elif opportunity is None:
# This shouldn't be possible as the incoming data has foreign keys, but as a safety net
# we'll make sure the opportunity actually exists
raise ValueError(
"Opportunity summary cannot be processed as the opportunity for it does not exist"
)

if source_summary.is_deleted:
elif source_summary.is_deleted:
logger.info("Deleting opportunity summary", extra=extra)

if target_summary is None:
Expand Down Expand Up @@ -501,14 +520,27 @@ def process_link_applicant_type(
extra = transform_util.get_log_extra_applicant_type(source_applicant_type)
logger.info("Processing applicant type", extra=extra)

if opportunity_summary is None:
# Historical records are linked to other historical records, however
# we don't import historical opportunity records, so if the opportunity
# was deleted, we won't have created the opportunity summary. Whenever we do
# support historical opportunities, we'll have these all marked with a
# flag that we can use to reprocess these.
if opportunity_summary is None and source_applicant_type.is_historical_table:
logger.warning(
"Historical applicant type does not have a corresponding opportunity summary - cannot import, but will mark as processed",
extra=extra,
)
self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED)
source_applicant_type.transformation_notes = ORPHANED_HISTORICAL_RECORD

elif opportunity_summary is None:
# This shouldn't be possible as the incoming data has foreign keys, but as a safety net
# we'll make sure the opportunity actually exists
raise ValueError(
"Applicant type record cannot be processed as the opportunity summary for it does not exist"
)

if source_applicant_type.is_deleted:
elif source_applicant_type.is_deleted:
logger.info("Deleting applicant type", extra=extra)

if target_applicant_type is None:
Expand Down Expand Up @@ -647,14 +679,27 @@ def process_link_funding_category(
extra = transform_util.get_log_extra_funding_category(source_funding_category)
logger.info("Processing funding category", extra=extra)

if opportunity_summary is None:
# Historical records are linked to other historical records, however
# we don't import historical opportunity records, so if the opportunity
# was deleted, we won't have created the opportunity summary. Whenever we do
# support historical opportunities, we'll have these all marked with a
# flag that we can use to reprocess these.
if opportunity_summary is None and source_funding_category.is_historical_table:
logger.warning(
"Historical funding category does not have a corresponding opportunity summary - cannot import, but will mark as processed",
extra=extra,
)
self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED)
source_funding_category.transformation_notes = ORPHANED_HISTORICAL_RECORD

elif opportunity_summary is None:
# This shouldn't be possible as the incoming data has foreign keys, but as a safety net
# we'll make sure the opportunity actually exists
raise ValueError(
"Funding category record cannot be processed as the opportunity summary for it does not exist"
)

if source_funding_category.is_deleted:
elif source_funding_category.is_deleted:
logger.info("Deleting funding category", extra=extra)

if target_funding_category is None:
Expand Down Expand Up @@ -799,14 +844,27 @@ def process_link_funding_instrument(
extra = transform_util.get_log_extra_funding_instrument(source_funding_instrument)
logger.info("Processing funding instrument", extra=extra)

if opportunity_summary is None:
# Historical records are linked to other historical records, however
# we don't import historical opportunity records, so if the opportunity
# was deleted, we won't have created the opportunity summary. Whenever we do
# support historical opportunities, we'll have these all marked with a
# flag that we can use to reprocess these.
if opportunity_summary is None and source_funding_instrument.is_historical_table:
logger.warning(
"Historical funding instrument does not have a corresponding opportunity summary - cannot import, but will mark as processed",
extra=extra,
)
self.increment(self.Metrics.TOTAL_HISTORICAL_ORPHANS_SKIPPED)
source_funding_instrument.transformation_notes = ORPHANED_HISTORICAL_RECORD

elif opportunity_summary is None:
# This shouldn't be possible as the incoming data has foreign keys, but as a safety net
# we'll make sure the opportunity actually exists
raise ValueError(
"Funding instrument record cannot be processed as the opportunity summary for it does not exist"
)

if source_funding_instrument.is_deleted:
elif source_funding_instrument.is_deleted:
logger.info("Deleting funding instrument", extra=extra)

if target_funding_instrument is None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""add transformation notes to staging tables
Revision ID: 61c58638e56b
Revises: f97987d087b5
Create Date: 2024-05-09 15:06:48.010975
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "61c58638e56b"
down_revision = "f97987d087b5"
branch_labels = None
depends_on = None


def upgrade():
    """Add a nullable free-text ``transformation_notes`` column to every staging table.

    The transform task writes a short note here when it handles a record in a
    non-default way (e.g. marking an orphaned historical record as skipped).
    """
    # ### commands auto generated by Alembic - adjusted by hand ###
    # Every table receives the identical column definition, so iterate over the
    # table names instead of repeating 18 copy-pasted op.add_column() calls.
    tables = (
        "tapplicanttypes_forecast",
        "tapplicanttypes_forecast_hist",
        "tapplicanttypes_synopsis",
        "tapplicanttypes_synopsis_hist",
        "tforecast",
        "tforecast_hist",
        "tfundactcat_forecast",
        "tfundactcat_forecast_hist",
        "tfundactcat_synopsis",
        "tfundactcat_synopsis_hist",
        "tfundinstr_forecast",
        "tfundinstr_forecast_hist",
        "tfundinstr_synopsis",
        "tfundinstr_synopsis_hist",
        "topportunity",
        "topportunity_cfda",
        "tsynopsis",
        "tsynopsis_hist",
    )
    for table_name in tables:
        op.add_column(
            table_name,
            sa.Column("transformation_notes", sa.Text(), nullable=True),
            schema="staging",
        )
    # ### end Alembic commands ###


def downgrade():
    """Remove the ``transformation_notes`` column from every staging table."""
    # ### commands auto generated by Alembic - adjusted by hand ###
    # Tables are listed in the reverse order of upgrade() for symmetry; iterate
    # instead of repeating 18 copy-pasted op.drop_column() calls.
    tables = (
        "tsynopsis_hist",
        "tsynopsis",
        "topportunity_cfda",
        "topportunity",
        "tfundinstr_synopsis_hist",
        "tfundinstr_synopsis",
        "tfundinstr_forecast_hist",
        "tfundinstr_forecast",
        "tfundactcat_synopsis_hist",
        "tfundactcat_synopsis",
        "tfundactcat_forecast_hist",
        "tfundactcat_forecast",
        "tforecast_hist",
        "tforecast",
        "tapplicanttypes_synopsis_hist",
        "tapplicanttypes_synopsis",
        "tapplicanttypes_forecast_hist",
        "tapplicanttypes_forecast",
    )
    for table_name in tables:
        op.drop_column(table_name, "transformation_notes", schema="staging")
    # ### end Alembic commands ###
32 changes: 32 additions & 0 deletions api/src/db/models/staging/forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ class Tforecast(StagingBase, forecast_mixin.TforecastMixin, StagingParamMixin):
def is_forecast(self) -> bool:
return True

@property
def is_historical_table(self) -> bool:
    """Whether this staging table mirrors a legacy ``*_hist`` history table."""
    # Tforecast maps the current tforecast table, not tforecast_hist.
    return False


class TforecastHist(StagingBase, forecast_mixin.TforecastHistMixin, StagingParamMixin):
__tablename__ = "tforecast_hist"
Expand All @@ -35,6 +39,10 @@ class TforecastHist(StagingBase, forecast_mixin.TforecastHistMixin, StagingParam
def is_forecast(self) -> bool:
return True

@property
def is_historical_table(self) -> bool:
    """Whether this staging table mirrors a legacy ``*_hist`` history table."""
    # TforecastHist maps tforecast_hist; the transform task uses this flag to
    # skip (rather than error on) orphaned historical records.
    return True


class TapplicanttypesForecast(
StagingBase, forecast_mixin.TapplicanttypesForecastMixin, StagingParamMixin
Expand All @@ -60,6 +68,10 @@ def is_forecast(self) -> bool:
def revision_number(self) -> None:
return None

@property
def is_historical_table(self) -> bool:
    """Whether this staging table mirrors a legacy ``*_hist`` history table."""
    # TapplicanttypesForecast maps the current (non-hist) applicant-type table.
    return False


class TapplicanttypesForecastHist(
StagingBase, forecast_mixin.TapplicanttypesForecastHistMixin, StagingParamMixin
Expand All @@ -81,6 +93,10 @@ def legacy_applicant_type_id(self) -> int:
def is_forecast(self) -> bool:
return True

@property
def is_historical_table(self) -> bool:
    """Whether this staging table mirrors a legacy ``*_hist`` history table."""
    # TapplicanttypesForecastHist maps tapplicanttypes_forecast_hist; orphans
    # from this table are skipped by the transform task instead of raising.
    return True


class TfundactcatForecast(StagingBase, forecast_mixin.TfundactcatForecastMixin, StagingParamMixin):
__tablename__ = "tfundactcat_forecast"
Expand All @@ -104,6 +120,10 @@ def is_forecast(self) -> bool:
def revision_number(self) -> None:
return None

@property
def is_historical_table(self) -> bool:
    """Whether this staging table mirrors a legacy ``*_hist`` history table."""
    # TfundactcatForecast maps the current (non-hist) funding-category table.
    return False


class TfundactcatForecastHist(
StagingBase, forecast_mixin.TfundactcatForecastHistMixin, StagingParamMixin
Expand All @@ -125,6 +145,10 @@ def legacy_funding_category_id(self) -> int:
def is_forecast(self) -> bool:
return True

@property
def is_historical_table(self) -> bool:
    """Whether this staging table mirrors a legacy ``*_hist`` history table."""
    # TfundactcatForecastHist maps tfundactcat_forecast_hist; orphans from
    # this table are skipped by the transform task instead of raising.
    return True


class TfundinstrForecast(StagingBase, forecast_mixin.TfundinstrForecastMixin, StagingParamMixin):
__tablename__ = "tfundinstr_forecast"
Expand All @@ -148,6 +172,10 @@ def is_forecast(self) -> bool:
def revision_number(self) -> None:
return None

@property
def is_historical_table(self) -> bool:
    """Whether this staging table mirrors a legacy ``*_hist`` history table."""
    # TfundinstrForecast maps the current (non-hist) funding-instrument table.
    return False


class TfundinstrForecastHist(
StagingBase, forecast_mixin.TfundinstrForecastHistMixin, StagingParamMixin
Expand All @@ -168,3 +196,7 @@ def legacy_funding_instrument_id(self) -> int:
@property
def is_forecast(self) -> bool:
    """Whether this record belongs to the forecast (vs. synopsis) family of tables."""
    return True

@property
def is_historical_table(self) -> bool:
    """Whether this staging table mirrors a legacy ``*_hist`` history table."""
    # TfundinstrForecastHist maps tfundinstr_forecast_hist; orphans from this
    # table are skipped by the transform task instead of raising.
    return True
2 changes: 2 additions & 0 deletions api/src/db/models/staging/staging_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,5 @@ class StagingParamMixin:
default=None,
server_default=None,
)

transformation_notes: Mapped[str | None]
Loading

0 comments on commit aa4d15f

Please sign in to comment.