From 95eba9d4fb8a5796a992ad177ecee290cab57335 Mon Sep 17 00:00:00 2001 From: Ricky Schools Date: Tue, 14 May 2024 13:15:38 -0400 Subject: [PATCH] Push fixes --- dev-requirements.txt | 2 ++ dltflow.yml | 10 +++++++++ dltflow/quality/config.py | 11 +++++----- dltflow/quality/dlt_meta.py | 33 ++++++++--------------------- setup.py | 4 ++++ tests/unit/cli/test_deploy.py | 2 ++ tests/unit/quality/test_dlt_meta.py | 31 ++++++++++++++++++++++++--- 7 files changed, 61 insertions(+), 32 deletions(-) create mode 100644 dltflow.yml diff --git a/dev-requirements.txt b/dev-requirements.txt index 66c316e..38fab83 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -5,3 +5,5 @@ coverage pytest-cov pytest-xdist interrogate +pandas~=2.2.2 +pyarrow>=4.0.0 diff --git a/dltflow.yml b/dltflow.yml new file mode 100644 index 0000000..f915ce3 --- /dev/null +++ b/dltflow.yml @@ -0,0 +1,10 @@ +dltflow: + name: my_project +include: + code: + name: src + conf: + name: conf + workflows: + name: workflows +targets: null diff --git a/dltflow/quality/config.py b/dltflow/quality/config.py index ce9edcf..55b2a59 100644 --- a/dltflow/quality/config.py +++ b/dltflow/quality/config.py @@ -77,6 +77,9 @@ class PydanticV2BaseModelLazy(pyd.BaseModel): A class to represent the base model for the Pydantic models. This class is a slim wrapper around the Pydantic """ + def __init__(self, *args, **kwargs): # pydantic v1 style post init. + super().__init__(*args, **kwargs) + def model_dump(self, *args, **kwargs): """Method to dump the model as a dictionary.""" return self.dict(*args, **kwargs) @@ -118,10 +121,8 @@ def __init__(self, *args, **kwargs): # pydantic v1 style post init. """ super().__init__(*args, **kwargs) - # Do things after Pydantic validation - if self.name is None and all( - [self.tablename is not None, self.database is not None] - ): + # Set name if not provided + if self.name is None and self.tablename and self.database: self.name = f"{self.database}.{self.tablename}" @@ -160,7 +161,7 @@ class TableWriteOpts(CommonWriteOpts): __pydantic_model_config__ = pyd.ConfigDict(extra="allow") table_properties: t.Optional[dict] = pyd.Field( - default_factory=lambda: TableProperties(), + default_factory=TableProperties, description="The table properties that will be added to the table.", ) path: t.Optional[str] = pyd.Field( diff --git a/dltflow/quality/dlt_meta.py b/dltflow/quality/dlt_meta.py index f060b33..5ecc36e 100644 --- a/dltflow/quality/dlt_meta.py +++ b/dltflow/quality/dlt_meta.py @@ -160,6 +160,12 @@ def _set_child_func_attributes(self): """ self._logger.info('Setting child function attributes based on execution plans.') for conf in self._execution_conf: + if conf.dlt_config.is_streaming_table: + self._logger.info('Provided `dlt` config is for a streaming table.') + print(conf.dlt_config.write_opts.model_dump(exclude_none=True)) + dlt.create_streaming_table( + **conf.dlt_config.write_opts.model_dump(exclude_none=True), + ) self._logger.debug(f'Setting child function attributes for {conf.child_func_name}') setattr( self, conf.child_func_name, self.apply_dlt(conf.child_func_obj, conf) @@ -413,11 +419,6 @@ def streaming_table_expectation_wrapper(child_function, execution_config): execution_config.dlt_config.expectation_function.__name__: execution_config.dlt_config.dlt_expectations } - dlt.create_streaming_table( - **execution_config.dlt_config.write_opts.model_dump(exclude_none=True), - **extra, - ) - if execution_config.dlt_config.apply_chg_config: dlt.apply_changes( **execution_config.dlt_config.apply_chg_config.model_dump( @@ -483,25 +484,9 @@ def wrapper(*args, **kwargs): self._logger.info(f"Entering wrapped method or {child_function}") if not execution_config.dlt_config.is_streaming_table: - if execution_config.dlt_config.dlt_expectations: - self._logger.debug( - f'Expectations provided. Applying DLT expectations to {child_function.__name__}.') - return execution_config.dlt_config.expectation_function( - execution_config.dlt_config.dlt_expectations - )( - execution_config.table_or_view_func( - child_function, - **execution_config.dlt_config.write_opts.model_dump(exclude_none=True), - ) - - ) - else: - self._logger.debug( - f'Expectations not provided. Applying DLT expectations to {child_function.__name__}.') - return execution_config.table_or_view_func( - child_function, - **execution_config.dlt_config.write_opts.model_dump(exclude_none=True), - ) + return self.table_view_expectation_wrapper( + child_function, execution_config + ) elif execution_config.dlt_config.is_streaming_table: return self.streaming_table_expectation_wrapper( child_function, execution_config diff --git a/setup.py b/setup.py index 447590b..66b4380 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,11 @@ from setuptools import setup, find_packages +with open("requirements.txt") as f: + requirements = f.read().splitlines() + setup( name="dltflow", version="0.0.1", packages=find_packages(), + install_requires=requirements, ) diff --git a/tests/unit/cli/test_deploy.py b/tests/unit/cli/test_deploy.py index b0f44a7..7aa2b8d 100644 --- a/tests/unit/cli/test_deploy.py +++ b/tests/unit/cli/test_deploy.py @@ -61,6 +61,7 @@ def test_upload_py_package_to_workspace(tmp_dir): ws_client=mock_workspace, dbfs_path=pathlib.Path("/dbfs_path"), package_path=tmp_dir.absolute(), + config_path=tmp_dir.joinpath('conf').absolute(), profile="test", files=["file1.xyz", "file2.xyz"], ) @@ -119,6 +120,7 @@ def test_deploy_py_module_as_notebook(): workflow_name="workflow_name", dbfs_path=pathlib.Path("/dbfs_path"), package_path=pathlib.Path("data_platform/python/dltflow/"), + config_path=pathlib.Path("data_platform/python/config/"), items=[ {"python_file": tmp_file_one, "config_path": "item1"}, {"python_file": tmp_file_two, "config_path": "item2"}, diff --git a/tests/unit/quality/test_dlt_meta.py b/tests/unit/quality/test_dlt_meta.py index 692a695..b47a17c 100644 --- a/tests/unit/quality/test_dlt_meta.py +++ b/tests/unit/quality/test_dlt_meta.py @@ -26,6 +26,7 @@ from dltflow.quality.config import ( AppendFlowConfig, ApplyChangesConfig, + TableWriteOpts, DLTConfig, DLTConfigs, ) @@ -49,6 +50,26 @@ def pipeline_config(): ] } +@pytest.fixture +def streaming_pipeline_config(): + """A fixture for the streaming pipeline configuration.""" + return { + "dlt": [ + { + "func_name": "orchestrate", + "kind": "table", + "expectations": [ + {"name": "check_addition", "constraint": "result < 10"} + ], + "expectation_action": "allow", + "is_streaming_table": True, + "write_opts": { + "name": "test", + } + } + ] + } + class MyPipeline(DLTMetaMixin): # pragma: no cover """A dummy pipeline class.""" @@ -170,6 +191,7 @@ def test_dlt_calls_streaming_table_append_flow( pipeline_config["dlt"][0]["append_config"] = AppendFlowConfig( target="dummy" ).model_dump() + pipeline_config['dlt'][0]['write_opts'] = TableWriteOpts(name='test').model_dump() pipeline_instance = MyPipeline(init_conf=pipeline_config) with patch("dltflow.quality.dlt_meta.dlt.create_streaming_table") as mock_expect: @@ -196,13 +218,16 @@ def test_dlt_calls_streaming_table_apply_changes( pipeline_config["dlt"][0]["apply_chg_config"] = ApplyChangesConfig( target="dummy", source="source", keys=["x", "y"] ).model_dump() - pipeline_instance = MyPipeline(init_conf=pipeline_config) + pipeline_config['dlt'][0]['write_opts'] = TableWriteOpts(name='test').model_dump() + + with patch("dltflow.quality.dlt_meta.dlt.create_streaming_table") as mock_streaming_table: + pipeline_instance = MyPipeline(init_conf=pipeline_config) + assert mock_streaming_table.call_count == 1 - with patch("dltflow.quality.dlt_meta.dlt.create_streaming_table") as mock_expect: with patch("dltflow.quality.dlt_meta.dlt.apply_changes") as mock_apply_changes: out_df = pipeline_instance.orchestrate() mock_apply_changes.assert_called() - mock_expect.assert_called() + def test_dlt_calls_streaming_fails(