Skip to content

Commit

Permalink
chore: Upgrade GCP dependencies (#2945)
Browse files Browse the repository at this point in the history
* Upgrade GCP dependencies.

Signed-off-by: Abhin Chhabra <abhin.chhabra@shopify.com>

* `entity_df` in `get_historical_features` should be tzaware

Signed-off-by: Abhin Chhabra <abhin.chhabra@shopify.com>

* Linting fixes

Signed-off-by: Abhin Chhabra <abhin.chhabra@shopify.com>

* Update 3.9 reqs

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Remove unnecessary code coverage

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

Co-authored-by: Kevin Zhang <kzhang@tecton.ai>
  • Loading branch information
chhabrakadabra and kevjumba authored Jul 19, 2022
1 parent d593351 commit d3868c5
Show file tree
Hide file tree
Showing 11 changed files with 203 additions and 159 deletions.
10 changes: 5 additions & 5 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1057,17 +1057,17 @@ def get_historical_features(

# Check that the right request data is present in the entity_df
if type(entity_df) == pd.DataFrame:
entity_pd_df = cast(pd.DataFrame, entity_df)
entity_df = utils.make_df_tzaware(cast(pd.DataFrame, entity_df))
for fv in request_feature_views:
for feature in fv.features:
if feature.name not in entity_pd_df.columns:
if feature.name not in entity_df.columns:
raise RequestDataNotFoundInEntityDfException(
feature_name=feature.name, feature_view_name=fv.name
)
for odfv in on_demand_feature_views:
odfv_request_data_schema = odfv.get_request_data_schema()
for feature_name in odfv_request_data_schema.keys():
if feature_name not in entity_pd_df.columns:
if feature_name not in entity_df.columns:
raise RequestDataNotFoundInEntityDfException(
feature_name=feature_name, feature_view_name=odfv.name,
)
Expand Down Expand Up @@ -2273,7 +2273,7 @@ def _teardown_go_server(self):

@log_exceptions_and_usage
def write_logged_features(
self, logs: Union[pa.Table, Path], source: Union[FeatureService]
self, logs: Union[pa.Table, Path], source: FeatureService
):
"""
Write logs produced by a source (currently only feature service is supported as a source)
Expand Down Expand Up @@ -2302,7 +2302,7 @@ def write_logged_features(
@log_exceptions_and_usage
def validate_logged_features(
self,
source: Union[FeatureService],
source: FeatureService,
start: datetime,
end: datetime,
reference: ValidationReference,
Expand Down
5 changes: 3 additions & 2 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ def to_bigquery(
job_config: bigquery.QueryJobConfig = None,
timeout: int = 1800,
retry_cadence: int = 10,
) -> Optional[str]:
) -> str:
"""
Triggers the execution of a historical feature retrieval query and exports the results to a BigQuery table.
Runs for a maximum amount of time specified by the timeout parameter (defaulting to 30 minutes).
Expand Down Expand Up @@ -567,7 +567,7 @@ def _wait_until_done(bq_job):

finally:
if client.get_job(bq_job).state in ["PENDING", "RUNNING"]:
client.cancel_job(bq_job)
client.cancel_job(bq_job.job_id)
raise BigQueryJobCancelled(job_id=bq_job.job_id)

if bq_job.exception():
Expand Down Expand Up @@ -601,6 +601,7 @@ def _upload_entity_df(
client: Client, table_name: str, entity_df: Union[pd.DataFrame, str],
) -> Table:
"""Uploads a Pandas entity dataframe into a BigQuery table and returns the resulting table"""
job: Union[bigquery.job.query.QueryJob, bigquery.job.load.LoadJob]

if isinstance(entity_df, str):
job = client.query(f"CREATE TABLE {table_name} AS ({entity_df})")
Expand Down
9 changes: 9 additions & 0 deletions sdk/python/feast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ def make_tzaware(t: datetime) -> datetime:
return t


def make_df_tzaware(t: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *t* with every datetime64 column converted to UTC-aware.

    Non-datetime columns are passed through untouched, and the incoming
    dataframe is never modified in place.
    """
    result = t.copy()  # work on a copy so the caller's dataframe is untouched
    datetime_columns = [
        col
        for col in result.columns
        if pd.api.types.is_datetime64_any_dtype(result[col])
    ]
    for col in datetime_columns:
        result[col] = pd.to_datetime(result[col], utc=True)
    return result


def to_naive_utc(ts: datetime) -> datetime:
if ts.tzinfo is None:
return ts
Expand Down
Loading

0 comments on commit d3868c5

Please sign in to comment.