Skip to content

Commit

Permalink
Update dlt_meta.py
Browse files Browse the repository at this point in the history
  • Loading branch information
rickyschools committed May 9, 2024
1 parent 62ddcf5 commit f730cc8
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions dltflow/quality/dlt_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ def _enforce_delta_limitations_and_requirements(self, func_name: str) -> t.Calla

return user_func

def table_view_expectation_wrapper(self, child_function, execution_config):
def table_view_expectation_wrapper(self, child_function, execution_config, *args, *kwargs):
"""
This method is the "magic" that dynamically and automatically wraps the user function with DLT expectations.
Expand Down Expand Up @@ -483,9 +483,25 @@ def wrapper(*args, **kwargs):
self._logger.info(f"Entering wrapped method or {child_function}")

if not execution_config.dlt_config.is_streaming_table:
return self.table_view_expectation_wrapper(
child_function, execution_config
)
if execution_config.dlt_config.dlt_expectations:
self._logger.debug(
f'Expectations provided. Applying DLT expectations to {child_function.__name__}.')
return execution_config.dlt_config.expectation_function(
execution_config.dlt_config.dlt_expectations
)(
execution_config.table_or_view_func(
child_function,
**execution_config.dlt_config.write_opts.model_dump(exclude_none=True),
)

)
else:
self._logger.debug(
f'Expectations not provided. Applying DLT expectations to {child_function.__name__}.')
return execution_config.table_or_view_func(
child_function,
**execution_config.dlt_config.write_opts.model_dump(exclude_none=True),
)
elif execution_config.dlt_config.is_streaming_table:
return self.streaming_table_expectation_wrapper(
child_function, execution_config
Expand Down

0 comments on commit f730cc8

Please sign in to comment.