Skip to content

Commit

Permalink
Push fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
rickyschools committed May 14, 2024
1 parent df59de7 commit 95eba9d
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 32 deletions.
2 changes: 2 additions & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ coverage
pytest-cov
pytest-xdist
interrogate
pandas~=2.2.2
pyarrow>=4.0.0
10 changes: 10 additions & 0 deletions dltflow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# dltflow project manifest — presumably consumed by the dltflow CLI when
# packaging/deploying the project; verify key layout against the config loader.
# NOTE(review): indentation reconstructed from a flattened scrape — confirm
# nesting against the original file.
dltflow:
  name: my_project      # project name
  include:
    code:
      name: src         # directory containing the package source code
    conf:
      name: conf        # directory containing pipeline configuration files
    workflows:
      name: workflows   # directory containing workflow definitions
  targets: null         # no deployment targets configured yet
11 changes: 6 additions & 5 deletions dltflow/quality/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ class PydanticV2BaseModelLazy(pyd.BaseModel):
A class to represent the base model for the Pydantic models. This class is a slim wrapper around the Pydantic
"""

def __init__(self, *args, **kwargs):  # Pydantic v1-style post-init hook point.
    """Delegate straight to the Pydantic base initializer.

    Exists so subclasses can override and run logic after field
    validation, mirroring the Pydantic v1 ``__init__`` pattern.
    """
    super().__init__(*args, **kwargs)

def model_dump(self, *args, **kwargs):
    """Serialize the model to a dictionary.

    Provides the Pydantic v2 ``model_dump`` API on top of a v1 model by
    forwarding every argument to the v1 ``dict()`` serializer.
    """
    serialize = self.dict  # v1 serializer; accepts the same filtering kwargs
    return serialize(*args, **kwargs)
Expand Down Expand Up @@ -118,10 +121,8 @@ def __init__(self, *args, **kwargs): # pydantic v1 style post init.
"""
super().__init__(*args, **kwargs)

# Do things after Pydantic validation
if self.name is None and all(
[self.tablename is not None, self.database is not None]
):
# Set name if not provided
if self.name is None and self.tablename and self.database:
self.name = f"{self.database}.{self.tablename}"


Expand Down Expand Up @@ -160,7 +161,7 @@ class TableWriteOpts(CommonWriteOpts):

__pydantic_model_config__ = pyd.ConfigDict(extra="allow")
table_properties: t.Optional[dict] = pyd.Field(
default_factory=lambda: TableProperties(),
default_factory=TableProperties,
description="The table properties that will be added to the table.",
)
path: t.Optional[str] = pyd.Field(
Expand Down
33 changes: 9 additions & 24 deletions dltflow/quality/dlt_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ def _set_child_func_attributes(self):
"""
self._logger.info('Setting child function attributes based on execution plans.')
for conf in self._execution_conf:
if conf.dlt_config.is_streaming_table:
self._logger.info('Provided `dlt` config is for a streaming table.')
print(conf.dlt_config.write_opts.model_dump(exclude_none=True))
dlt.create_streaming_table(
**conf.dlt_config.write_opts.model_dump(exclude_none=True),
)
self._logger.debug(f'Setting child function attributes for {conf.child_func_name}')
setattr(
self, conf.child_func_name, self.apply_dlt(conf.child_func_obj, conf)
Expand Down Expand Up @@ -413,11 +419,6 @@ def streaming_table_expectation_wrapper(child_function, execution_config):
execution_config.dlt_config.expectation_function.__name__: execution_config.dlt_config.dlt_expectations
}

dlt.create_streaming_table(
**execution_config.dlt_config.write_opts.model_dump(exclude_none=True),
**extra,
)

if execution_config.dlt_config.apply_chg_config:
dlt.apply_changes(
**execution_config.dlt_config.apply_chg_config.model_dump(
Expand Down Expand Up @@ -483,25 +484,9 @@ def wrapper(*args, **kwargs):
self._logger.info(f"Entering wrapped method or {child_function}")

if not execution_config.dlt_config.is_streaming_table:
if execution_config.dlt_config.dlt_expectations:
self._logger.debug(
f'Expectations provided. Applying DLT expectations to {child_function.__name__}.')
return execution_config.dlt_config.expectation_function(
execution_config.dlt_config.dlt_expectations
)(
execution_config.table_or_view_func(
child_function,
**execution_config.dlt_config.write_opts.model_dump(exclude_none=True),
)

)
else:
self._logger.debug(
f'Expectations not provided. Applying DLT expectations to {child_function.__name__}.')
return execution_config.table_or_view_func(
child_function,
**execution_config.dlt_config.write_opts.model_dump(exclude_none=True),
)
return self.table_view_expectation_wrapper(
child_function, execution_config
)
elif execution_config.dlt_config.is_streaming_table:
return self.streaming_table_expectation_wrapper(
child_function, execution_config
Expand Down
4 changes: 4 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from setuptools import setup, find_packages

# Read runtime dependencies from requirements.txt so pip installs pick them up.
# Explicit encoding: without it, open() uses the platform default and a
# requirements file with non-ASCII content could fail to decode on e.g. Windows.
with open("requirements.txt", encoding="utf-8") as f:
    requirements = f.read().splitlines()

setup(
    name="dltflow",
    version="0.0.1",
    packages=find_packages(),
    install_requires=requirements,
)
2 changes: 2 additions & 0 deletions tests/unit/cli/test_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def test_upload_py_package_to_workspace(tmp_dir):
ws_client=mock_workspace,
dbfs_path=pathlib.Path("/dbfs_path"),
package_path=tmp_dir.absolute(),
config_path=tmp_dir.joinpath('conf').absolute(),
profile="test",
files=["file1.xyz", "file2.xyz"],
)
Expand Down Expand Up @@ -119,6 +120,7 @@ def test_deploy_py_module_as_notebook():
workflow_name="workflow_name",
dbfs_path=pathlib.Path("/dbfs_path"),
package_path=pathlib.Path("data_platform/python/dltflow/"),
config_path=pathlib.Path("data_platform/python/config/"),
items=[
{"python_file": tmp_file_one, "config_path": "item1"},
{"python_file": tmp_file_two, "config_path": "item2"},
Expand Down
31 changes: 28 additions & 3 deletions tests/unit/quality/test_dlt_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from dltflow.quality.config import (
AppendFlowConfig,
ApplyChangesConfig,
TableWriteOpts,
DLTConfig,
DLTConfigs,
)
Expand All @@ -49,6 +50,26 @@ def pipeline_config():
]
}

@pytest.fixture
def streaming_pipeline_config():
    """Pipeline configuration fixture for a streaming-table DLT flow.

    Returns a config dict with one ``dlt`` entry targeting the
    ``orchestrate`` function as a streaming table, with a single
    allow-action expectation and minimal write options.
    """
    expectations = [{"name": "check_addition", "constraint": "result < 10"}]
    entry = {
        "func_name": "orchestrate",
        "kind": "table",
        "expectations": expectations,
        "expectation_action": "allow",
        "is_streaming_table": True,
        "write_opts": {"name": "test"},
    }
    return {"dlt": [entry]}


class MyPipeline(DLTMetaMixin): # pragma: no cover
"""A dummy pipeline class."""
Expand Down Expand Up @@ -170,6 +191,7 @@ def test_dlt_calls_streaming_table_append_flow(
pipeline_config["dlt"][0]["append_config"] = AppendFlowConfig(
target="dummy"
).model_dump()
pipeline_config['dlt'][0]['write_opts'] = TableWriteOpts(name='test').model_dump()
pipeline_instance = MyPipeline(init_conf=pipeline_config)

with patch("dltflow.quality.dlt_meta.dlt.create_streaming_table") as mock_expect:
Expand All @@ -196,13 +218,16 @@ def test_dlt_calls_streaming_table_apply_changes(
pipeline_config["dlt"][0]["apply_chg_config"] = ApplyChangesConfig(
target="dummy", source="source", keys=["x", "y"]
).model_dump()
pipeline_instance = MyPipeline(init_conf=pipeline_config)
pipeline_config['dlt'][0]['write_opts'] = TableWriteOpts(name='test').model_dump()

with patch("dltflow.quality.dlt_meta.dlt.create_streaming_table") as mock_streaming_table:
pipeline_instance = MyPipeline(init_conf=pipeline_config)
assert mock_streaming_table.call_count == 1

with patch("dltflow.quality.dlt_meta.dlt.create_streaming_table") as mock_expect:
with patch("dltflow.quality.dlt_meta.dlt.apply_changes") as mock_apply_changes:
out_df = pipeline_instance.orchestrate()
mock_apply_changes.assert_called()
mock_expect.assert_called()



def test_dlt_calls_streaming_fails(
Expand Down

0 comments on commit 95eba9d

Please sign in to comment.