Skip to content

Commit

Permalink
Update deploy.py
Browse files Browse the repository at this point in the history
  • Loading branch information
rickyschools committed May 8, 2024
1 parent c71310d commit c51d531
Showing 1 changed file with 44 additions and 35 deletions.
79 changes: 44 additions & 35 deletions dltflow/cli/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import sys
import yaml
import pathlib
sys.path.append('{package_namespace}')
sys.path.append('/dbfs{package_namespace}')
"""

_CODE_IMPORT_AND_LAUNCH_CELLS = """
Expand All @@ -67,12 +67,13 @@


def upload_py_package_to_workspace(
run_id: str,
dbfs_path: pathlib.Path,
package_path: pathlib.Path,
files: list,
profile: str,
ws_client: WorkspaceClient = None,
run_id: str,
dbfs_path: pathlib.Path,
package_path: pathlib.Path,
config_path: pathlib.Path,
files: list,
profile: str,
ws_client: WorkspaceClient = None,
):
"""
Upload a python package to the Databricks workspace. The logic being leveraged by
Expand Down Expand Up @@ -108,13 +109,18 @@ def upload_py_package_to_workspace(
dbx_echo(f"Upload directory: [yellow]`{upload_dbfs_path}`")
ws_client.dbfs.mkdirs(str(upload_dbfs_path))

for file in files:
file = pathlib.Path(file).absolute()
ws_client.dbfs.copy(
f"file://{file}/",
str(upload_dbfs_path),
recursive=True,
)
ws_client.dbfs.copy(f"file://{package_path}/", str(upload_dbfs_path), recursive=True)
ws_client.dbfs.copy(f'file://{config_path}/', str(upload_dbfs_path), recursive=True)

# for file in files:
# file = pathlib.Path(file).absolute()
# parent_dir = file.parent
# ws_client.dbfs.mkdirs(str(parent_dir))
# ws_client.dbfs.copy(
# f"file://{file}/",
# str(upload_dbfs_path),
# recursive=True,
# )
dbx_echo(f"📦📤 [green]Upload complete!")
return "/dbfs" + str(upload_dbfs_path)

Expand Down Expand Up @@ -160,10 +166,10 @@ def get_module_info_from_path_name(project_name, project_path, module_path: str)


def create_notebook_and_upload(
ws_client: WorkspaceClient,
dbfs_path: pathlib.Path,
notebook_name: str,
notebook_src: str,
ws_client: WorkspaceClient,
dbfs_path: pathlib.Path,
notebook_name: str,
notebook_src: str,
):
"""
This function creates a DLT pipeline notebook and uploads it to the Databricks workspace.
Expand Down Expand Up @@ -206,15 +212,16 @@ def create_notebook_and_upload(


def deploy_py_module_as_notebook(
run_id: str,
ws_client: WorkspaceClient,
project_name: str,
workflow_name: str,
dbfs_path: pathlib.Path,
package_path: pathlib.Path,
items: list,
dependencies: list = None,
launch_method: str = "launch",
run_id: str,
ws_client: WorkspaceClient,
project_name: str,
workflow_name: str,
dbfs_path: pathlib.Path,
package_path: pathlib.Path,
config_path: pathlib.Path,
items: list,
dependencies: list = None,
launch_method: str = "launch",
):
"""
This function deploys a python module as a DLT pipeline to the Databricks workspace.
Expand Down Expand Up @@ -253,7 +260,7 @@ def deploy_py_module_as_notebook(
)

for item in items:
cfg = f'config_path = "{str(package_namespace.joinpath(item.get("config_path")))}"'
cfg = f'config_path = "/dbfs{str(package_namespace.joinpath(item.get("config_path")))}"'
class_name, import_path = get_module_info_from_path_name(
project_name, package_path, item.get("python_file")
)
Expand All @@ -268,6 +275,7 @@ def deploy_py_module_as_notebook(
run_id=run_id,
dbfs_path=dbfs_path,
package_path=package_path,
config_path=config_path,
files=item.values(),
profile=ws_client.config.profile,
ws_client=ws_client,
Expand All @@ -287,11 +295,11 @@ def deploy_py_module_as_notebook(


def register_notebook_as_dlt_pipeline(
ws_client: WorkspaceClient,
notebook_path: str,
pipeline_payload: DLTFlowPipeline,
project_conf,
as_individual: bool,
ws_client: WorkspaceClient,
notebook_path: str,
pipeline_payload: DLTFlowPipeline,
project_conf,
as_individual: bool,
): # pragma: no cover
"""
Register a notebook as a DLT pipeline in the Databricks workspace.
Expand Down Expand Up @@ -329,7 +337,7 @@ def register_notebook_as_dlt_pipeline(


def handle_deployment_per_file_config_dependency(
workflow_name, items, dependencies, deployment_config, profile, as_individual: bool
workflow_name, items, dependencies, deployment_config, profile, as_individual: bool
):
"""
A helper function to handle the deployment of a python module as a DLT pipeline.
Expand All @@ -353,7 +361,7 @@ def handle_deployment_per_file_config_dependency(
config: ProjectConfig = get_config()
project_path = pathlib.Path(".")
_code_path = project_path.joinpath(config.include.code.name).resolve()
_cfg_path = project_path.joinpath(config.include.conf.name)
_cfg_path = project_path.joinpath(config.include.conf.name).resolve()
assert profile in config.targets, f"Profile {profile} not found in config file."
_target_config = config.targets.get(profile)
_dbfs_path = pathlib.Path(config.targets.get(profile).workspace.root_path)
Expand All @@ -367,6 +375,7 @@ def handle_deployment_per_file_config_dependency(
workflow_name=workflow_name,
dbfs_path=_dbfs_path,
package_path=_code_path,
config_path=_cfg_path,
items=items,
dependencies=dependencies,
launch_method=config.include.code.dict().get("launch_method", "launch"),
Expand Down

0 comments on commit c51d531

Please sign in to comment.