From cbd072ff56dc7ba86cc5976a10ed993dc42c23ed Mon Sep 17 00:00:00 2001 From: Stephen Privitera Date: Tue, 19 Nov 2024 16:22:36 +0100 Subject: [PATCH] Remove ContinuousCoveragePhenotype and fix SexPhenotype tests --- .../continuous_coverage_phenotype.py | 107 ------------- phenex/phenotypes/sex_phenotype.py | 9 +- phenex/test/phenotype_test_generator.py | 2 +- .../test_continuous_coverage_phenotype.py | 141 ------------------ phenex/test/phenotypes/test_sex_phenotype.py | 2 +- 5 files changed, 7 insertions(+), 254 deletions(-) delete mode 100644 phenex/phenotypes/continuous_coverage_phenotype.py delete mode 100644 phenex/test/phenotypes/test_continuous_coverage_phenotype.py diff --git a/phenex/phenotypes/continuous_coverage_phenotype.py b/phenex/phenotypes/continuous_coverage_phenotype.py deleted file mode 100644 index b832ebf..0000000 --- a/phenex/phenotypes/continuous_coverage_phenotype.py +++ /dev/null @@ -1,107 +0,0 @@ -from typing import Union, List, Dict, Optional -from phenex.phenotypes.phenotype import Phenotype -from phenex.filters.value import Value -from phenex.filters.codelist_filter import CodelistFilter -from phenex.filters.relative_time_range_filter import RelativeTimeRangeFilter -from phenex.filters.date_range_filter import DateRangeFilter -from phenex.filters.aggregator import First, Last -from phenex.codelists import Codelist -from phenex.tables import is_phenex_code_table, PHENOTYPE_TABLE_COLUMNS, PhenotypeTable -from phenex.phenotypes.functions import select_phenotype_columns -from ibis import _ -from ibis.expr.types.relations import Table -import ibis - - -class ContinuousCoveragePhenotype(Phenotype): - """ - A phenotype based on continuous coverage within an observation period. - - This class helps generate SQL queries to filter a population based on - continuous coverage criteria within the observation period. - - :param domain: The domain of the phenotype, default is 'observation_period'. The domain - key is used at runtime to determine which table to run on. 
- :param coverage_period_min: The minimum coverage period for the phenotype with a default - of 0 days. The operator must be '>=' or '>'. - :param return_date: An optional return date for the phenotype result. Possible values are - "first" and "last", where "first" is the beginning of the coverage period containing - the index date and "last" in the end of the coverage period containing the index date. - - Example usage: Find all patients with at least 90 days of continuous coverage - -------------- - >>> coverage_min_filter = ValueFilter(">=", 90) - >>> phenotype = ContinuousCoveragePhenotype(coverage_period_min=coverage_min_filter) - """ - - def __init__(self, - name:Optional[str] = 'continuous_coverage', - domain:Optional[str] = 'OBSERVATION_PERIOD', - relative_time_range:Optional[RelativeTimeRangeFilter] = None, - min_days : Optional[Value] = None, - anchor_phenotype:Optional[Phenotype] = None, - ): - super().__init__() - self.name = name - self.domain = domain - self.relative_time_range = relative_time_range - self.min_days = min_days - - def _execute(self, tables: Dict[str, Table]) -> PhenotypeTable: - coverage_table = tables[self.domain] - # first perform time range filter on observation period start date - coverage_table = coverage_table.mutate(EVENT_DATE = coverage_table.OBSERVATION_PERIOD_START_DATE) - coverage_table = self._perform_time_filtering(coverage_table) - # ensure that coverage end extends past the anchor date - coverage_table = self._filter_observation_period_end(coverage_table) - coverage_table = self._filter_coverage_period(coverage_table) - - coverage_table = coverage_table.mutate(EVENT_DATE = ibis.null()) - return coverage_table - - def _perform_time_filtering(self, coverage_table): - ''' - Filter the observation period start - ''' - if self.relative_time_range is not None: - coverage_table = self.relative_time_range.filter(coverage_table) - return coverage_table - - def _filter_observation_period_end(self, coverage_table): - ''' - Get only 
rows where the observation period end date is after the anchor date - ''' - if self.relative_time_range is not None: - if self.relative_time_range.anchor_phenotype is not None: - reference_column = self.relative_time_range.anchor_phenotype.table.EVENT_DATE - else: - reference_column = coverage_table.INDEX_DATE - - coverage_table = coverage_table.filter( - coverage_table.OBSERVATION_PERIOD_END_DATE >= reference_column - ) - return coverage_table - - - def _filter_coverage_period(self, coverage_table: Table) -> Table: - if self.min_days.operator == '>': - coverage_table = coverage_table.filter( - (coverage_table['DAYS_FROM_ANCHOR'] > self.min_days.value) - ) - elif self.min_days.operator == '>=': - coverage_table = coverage_table.filter( - (coverage_table['DAYS_FROM_ANCHOR'] >= self.min_days.value) - ) - elif self.min_days.operator == '<': - coverage_table = coverage_table.filter( - (coverage_table['DAYS_FROM_ANCHOR'] < self.min_days.value) - ) - elif self.min_days.operator == '<=': - coverage_table = coverage_table.filter( - (coverage_table['DAYS_FROM_ANCHOR'] <= self.min_days.value) - ) - return coverage_table - - - def get_codelists(self): - return [] diff --git a/phenex/phenotypes/sex_phenotype.py b/phenex/phenotypes/sex_phenotype.py index 77b1204..15d0873 100644 --- a/phenex/phenotypes/sex_phenotype.py +++ b/phenex/phenotypes/sex_phenotype.py @@ -24,7 +24,7 @@ class SexPhenotype(Phenotype): def __init__( self, name: str = "sex", - allowed_values: List[str] = ["male", "female"], + allowed_values: Optional[List[str]] = None, domain: str = "PERSON", ): self.name = name @@ -37,7 +37,8 @@ def _execute(self, tables: Dict[str, Table]) -> PhenotypeTable: person_table = tables[self.domain] assert is_phenex_person_table(person_table) - sex_filter = CategoricalFilter(column_name="SEX", allowed_values=self.allowed_values) - filtered_table = sex_filter._filter(person_table) + if self.allowed_values is not None: + sex_filter = CategoricalFilter(column_name="SEX", 
allowed_values=self.allowed_values) + person_table = sex_filter._filter(person_table) - return filtered_table.mutate(VALUE=filtered_table.SEX, EVENT_DATE= ibis.null()) + return person_table.mutate(VALUE=person_table.SEX, EVENT_DATE= ibis.null()) diff --git a/phenex/test/phenotype_test_generator.py b/phenex/test/phenotype_test_generator.py index d846f53..989dc20 100644 --- a/phenex/test/phenotype_test_generator.py +++ b/phenex/test/phenotype_test_generator.py @@ -139,7 +139,7 @@ def df_from_test_info(test_info): if "date" in col.lower(): schema[col] = datetime.date elif "value" in col.lower(): - schema[col] = float + schema[col] = str if isinstance(df[col].iloc[0], str) else float elif "boolean" in col.lower(): schema[col] = bool else: diff --git a/phenex/test/phenotypes/test_continuous_coverage_phenotype.py b/phenex/test/phenotypes/test_continuous_coverage_phenotype.py deleted file mode 100644 index df7b62a..0000000 --- a/phenex/test/phenotypes/test_continuous_coverage_phenotype.py +++ /dev/null @@ -1,141 +0,0 @@ -import datetime, os -import pandas as pd - -from phenex.phenotypes.continuous_coverage_phenotype import ContinuousCoveragePhenotype -from phenex.codelists import LocalCSVCodelistFactory -from phenex.filters.date_range_filter import DateRangeFilter -from phenex.filters.relative_time_range_filter import RelativeTimeRangeFilter - -from phenex.test.phenotype_test_generator import PhenotypeTestGenerator -from phenex.filters.value import * - - - -class ContinuousCoveragePhenotypeTestGenerator(PhenotypeTestGenerator): - name_space = "continuouscoverage" - - def define_input_tables(self): - oneday = datetime.timedelta(days=1) - index_date = datetime.datetime.strptime("01-01-2022", "%m-%d-%Y") - - observation_period_min = 90 * oneday - possible_start_dates = [ - index_date - 4 * observation_period_min, - index_date - 2 * observation_period_min, - index_date - observation_period_min - oneday, - index_date - observation_period_min, - index_date - 
observation_period_min + oneday, - index_date, - index_date + oneday, - ] - - intervals = [ - observation_period_min, - observation_period_min - oneday, - observation_period_min + oneday, - 2 * observation_period_min, - ] - - start_dates = [] - end_dates = [] - for s in possible_start_dates: - for i in intervals: - start_dates.append(s) - end_dates.append(s + i) - - N = len(end_dates) - df_observation_period = pd.DataFrame() - df_observation_period["PERSON_ID"] = [ - f"P{x}" for x in list(range(N)) - ] - df_observation_period["INDEX_DATE"] = index_date - df_observation_period["observation_period_start_date"] = start_dates - df_observation_period["observation_period_end_date"] = end_dates - - - self.df_input = df_observation_period - input_info_observation_period = { - "name": "observation_period", - "df": df_observation_period, - } - - return [input_info_observation_period] - - def define_phenotype_tests(self): - t1 = { - "name": "coverage_min_geq_90", - "coverage_period_min": Value(value=90, operator=">="), - "persons": ["P7", "P10", "P11", "P12", "P14", "P15"], - } - t2 = { - "name": "coverage_min_gt_90", - "coverage_period_min": Value(value=90, operator=">"), - "persons": ["P7", "P10", "P11"], - } - test_infos = [t1, t2] - - for test_info in test_infos: - test_info["phenotype"] = ContinuousCoveragePhenotype( - name=test_info["name"], - domain="observation_period", - coverage_period_min=test_info.get("coverage_period_min"), - ) - test_info["refactor"] = True # TODO remove once refactored - - return test_infos - - -class ContinuousCoverageReturnLastPhenotypeTestGenerator( - ContinuousCoveragePhenotypeTestGenerator -): - name_space = "ccpt_returnlast" - - def define_phenotype_tests(self): - persons = ["P7", "P10", "P11", "P12", "P14", "P15"] - - t1 = { - "name": "coverage_min_geq_90", - "coverage_period_min": Value(value=90, operator=">="), - "persons": persons, - "dates": list( - self.df_input[self.df_input["PERSON_ID"].isin(persons)][ - 
"observation_period_end_date" - ].values - ), - } - - persons = ["P7", "P10", "P11"] - t2 = { - "name": "coverage_min_gt_90", - "coverage_period_min": Value(value=90, operator=">"), - "persons": ["P7", "P10", "P11"], - "dates": list( - self.df_input[self.df_input["PERSON_ID"].isin(persons)][ - "observation_period_end_date" - ].values - ), - } - test_infos = [t1, t2] - - for test_info in test_infos: - test_info["phenotype"] = ContinuousCoveragePhenotype( - name=test_info["name"], - domain="observation_period", - return_date="last", - coverage_period_min=test_info.get("coverage_period_min"), - ) - test_info["column_types"] = {f"{test_info['name']}_date": "date"} - - return test_infos - - -def test_continuous_coverage_phenotypes(): - spg = ContinuousCoveragePhenotypeTestGenerator() - spg.run_tests() - - spg = ContinuousCoverageReturnLastPhenotypeTestGenerator() - spg.run_tests() - - -if __name__ == "__main__": - test_continuous_coverage_phenotypes() diff --git a/phenex/test/phenotypes/test_sex_phenotype.py b/phenex/test/phenotypes/test_sex_phenotype.py index 385d089..a265ea9 100644 --- a/phenex/test/phenotypes/test_sex_phenotype.py +++ b/phenex/test/phenotypes/test_sex_phenotype.py @@ -29,7 +29,7 @@ def define_input_tables(self): column_types_person = {} input_info_person = { - "name": "person", + "name": "PERSON", "df": df_person, "column_types": column_types_person, }