Added ability to install UCX on workspaces without Public Internet connectivity #1566

Merged: 10 commits, May 27, 2024
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -46,7 +46,7 @@ classifiers = [

dependencies = ["databricks-sdk>=0.27,<0.29",
"databricks-labs-lsql~=0.4.0",
"databricks-labs-blueprint>=0.4.3,<0.7.0",
"databricks-labs-blueprint>=0.6.0",
"PyYAML>=6.0.0,<7.0.0",
"sqlglot>=23.9,<24.1"]

3 changes: 3 additions & 0 deletions src/databricks/labs/ucx/config.py
@@ -61,6 +61,9 @@ class WorkspaceConfig: # pylint: disable=too-many-instance-attributes
# Threshold for row count comparison during data reconciliation, in percentage
recon_tolerance_percent: int = 5

# Whether to upload dependent libraries to the workspace
upload_dependencies: bool = False

# [INTERNAL ONLY] Whether the assessment should capture only specific object permissions.
include_object_permissions: list[str] | None = None
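
The flag defaults to False, so existing installations keep resolving dependencies from PyPI. Below is a minimal sketch (not part of the diff) of setting it programmatically, assuming inventory_database is still the only required WorkspaceConfig field:

from databricks.labs.ucx.config import WorkspaceConfig

# Hypothetical example: opt in to uploading dependent wheels for a workspace
# that blocks outbound Internet access. `upload_dependencies` is the field
# added by this PR and defaults to False.
config = WorkspaceConfig(
    inventory_database="ucx",  # assumed to be the only required field
    upload_dependencies=True,
)

During an interactive install, the same field is populated from the new confirmation prompt added to install.py below and persisted with the rest of the installation's configuration.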

4 changes: 4 additions & 0 deletions src/databricks/labs/ucx/install.py
@@ -216,6 +216,9 @@ def _prompt_for_new_installation(self) -> WorkspaceConfig:
configure_groups = ConfigureGroups(self.prompts)
configure_groups.run()
include_databases = self._select_databases()
upload_dependencies = self.prompts.confirm(
f"Does given workspace {self.workspace_client.get_workspace_id()} " f"block Internet access?"
)
trigger_job = self.prompts.confirm("Do you want to trigger assessment job after installation?")
recon_tolerance_percent = int(
self.prompts.question("Reconciliation threshold, in percentage", default="5", valid_number=True)
@@ -233,6 +236,7 @@ def _prompt_for_new_installation(self) -> WorkspaceConfig:
include_databases=include_databases,
trigger_job=trigger_job,
recon_tolerance_percent=recon_tolerance_percent,
upload_dependencies=upload_dependencies,
)

def _compare_remote_local_versions(self):
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/installer/policy.py
@@ -98,7 +98,7 @@ def _get_instance_pool_id(self) -> str | None:
return None

def _definition(self, conf: dict, instance_profile: str | None, instance_pool_id: str | None) -> str:
latest_lts_dbr = self._ws.clusters.select_spark_version(latest=True, long_term_support=True)
latest_lts_dbr = self._ws.clusters.select_spark_version(latest=True)
node_type_id = self._ws.clusters.select_node_type(local_disk=True, min_memory_gb=16)
policy_definition = {
"spark_version": self._policy_config(latest_lts_dbr),
62 changes: 40 additions & 22 deletions src/databricks/labs/ucx/installer/workflows.py
@@ -69,7 +69,7 @@

# COMMAND ----------

# MAGIC %pip install /Workspace{remote_wheel}
# MAGIC %pip install {remote_wheel}
dbutils.library.restartPython()

# COMMAND ----------
@@ -92,7 +92,7 @@
"""

TEST_RUNNER_NOTEBOOK = """# Databricks notebook source
# MAGIC %pip install /Workspace{remote_wheel}
# MAGIC %pip install {remote_wheel}
dbutils.library.restartPython()

# COMMAND ----------
@@ -403,14 +403,14 @@ def __init__( # pylint: disable=too-many-arguments
super().__init__(config, installation, ws)

def create_jobs(self):
remote_wheel = self._upload_wheel()
remote_wheels = self._upload_wheel()
desired_workflows = {t.workflow for t in self._tasks if t.cloud_compatible(self._ws.config)}
wheel_runner = None

if self._config.override_clusters:
wheel_runner = self._upload_wheel_runner(remote_wheel)
wheel_runner = self._upload_wheel_runner(remote_wheels)
for workflow_name in desired_workflows:
settings = self._job_settings(workflow_name, remote_wheel)
settings = self._job_settings(workflow_name, remote_wheels)
if self._config.override_clusters:
settings = self._apply_cluster_overrides(
workflow_name,
@@ -430,7 +430,7 @@ def create_jobs(self):
continue

self._install_state.save()
self._create_debug(remote_wheel)
self._create_debug(remote_wheels)
return self._create_readme()

@property
@@ -529,13 +529,30 @@ def _deploy_workflow(self, step_name: str, settings):
self._install_state.jobs[step_name] = str(new_job.job_id)
return None

@staticmethod
def _library_dep_order(library: str):
match library:
case library if 'sdk' in library:
return 0
case library if 'blueprint' in library:
return 1
case _:
return 2

def _upload_wheel(self):
wheel_paths = []
with self._wheels:
return self._wheels.upload_to_wsfs()

def _upload_wheel_runner(self, remote_wheel: str):
if self._config.upload_dependencies:
wheel_paths = self._wheels.upload_wheel_dependencies(["databricks", "sqlglot"])
wheel_paths.sort(key=WorkflowsDeployment._library_dep_order)
wheel_paths.append(self._wheels.upload_to_wsfs())
wheel_paths = [f"/Workspace{wheel}" for wheel in wheel_paths]
return wheel_paths

def _upload_wheel_runner(self, remote_wheels: list[str]):
# TODO: we have to be doing this workaround until ES-897453 is solved in the platform
code = TEST_RUNNER_NOTEBOOK.format(remote_wheel=remote_wheel, config_file=self._config_file).encode("utf8")
remote_wheels_str = " ".join(remote_wheels)
code = TEST_RUNNER_NOTEBOOK.format(remote_wheel=remote_wheels_str, config_file=self._config_file).encode("utf8")
return self._installation.upload(f"wheels/wheel-test-runner-{self._product_info.version()}.py", code)

@staticmethod
@@ -559,8 +576,7 @@ def _apply_cluster_overrides(
job_task.notebook_task = jobs.NotebookTask(notebook_path=wheel_runner, base_parameters=widget_values)
return settings

def _job_settings(self, step_name: str, remote_wheel: str):

def _job_settings(self, step_name: str, remote_wheels: list[str]) -> dict[str, Any]:
email_notifications = None
if not self._config.override_clusters and "@" in self._my_username:
# set email notifications only if we're running the real
@@ -577,8 +593,8 @@ def _job_settings(self, step_name: str, remote_wheel: str):
if self._skip_dashboards and task.dashboard:
continue
job_clusters.add(task.job_cluster)
job_tasks.append(self._job_task(task, remote_wheel))
job_tasks.append(self._job_parse_logs_task(job_tasks, step_name, remote_wheel))
job_tasks.append(self._job_task(task, remote_wheels))
job_tasks.append(self._job_parse_logs_task(job_tasks, step_name, remote_wheels))
version = self._product_info.version()
version = version if not self._ws.config.is_gcp else version.replace("+", "-")
tags = {"version": f"v{version}"}
@@ -594,7 +610,7 @@ def _job_settings(self, step_name: str, remote_wheel: str):
"tasks": job_tasks,
}

def _job_task(self, task: Task, remote_wheel: str) -> jobs.Task:
def _job_task(self, task: Task, remote_wheels: list[str]) -> jobs.Task:
jobs_task = jobs.Task(
task_key=task.name,
job_cluster_key=task.job_cluster,
@@ -607,7 +623,7 @@ def _job_task(self, task: Task, remote_wheel: str) -> jobs.Task:
return retried_job_dashboard_task(jobs_task, task)
if task.notebook:
return self._job_notebook_task(jobs_task, task)
return self._job_wheel_task(jobs_task, task.workflow, remote_wheel)
return self._job_wheel_task(jobs_task, task.workflow, remote_wheels)

def _job_dashboard_task(self, jobs_task: jobs.Task, task: Task) -> jobs.Task:
assert task.dashboard is not None
@@ -639,8 +655,10 @@ def _job_notebook_task(self, jobs_task: jobs.Task, task: Task) -> jobs.Task:
),
)

def _job_wheel_task(self, jobs_task: jobs.Task, workflow: str, remote_wheel: str) -> jobs.Task:
libraries = [compute.Library(whl=f"/Workspace{remote_wheel}")]
def _job_wheel_task(self, jobs_task: jobs.Task, workflow: str, remote_wheels: list[str]) -> jobs.Task:
libraries = []
for wheel in remote_wheels:
libraries.append(compute.Library(whl=wheel))
named_parameters = {
"config": f"/Workspace{self._config_file}",
"workflow": workflow,
@@ -701,24 +719,24 @@ def _job_clusters(self, names: set[str]):
)
return clusters

def _job_parse_logs_task(self, job_tasks: list[jobs.Task], workflow: str, remote_wheel: str) -> jobs.Task:
def _job_parse_logs_task(self, job_tasks: list[jobs.Task], workflow: str, remote_wheels: list[str]) -> jobs.Task:
jobs_task = jobs.Task(
task_key="parse_logs",
job_cluster_key=Task.job_cluster,
# The task depends on all previous tasks.
depends_on=[jobs.TaskDependency(task_key=task.task_key) for task in job_tasks],
run_if=jobs.RunIf.ALL_DONE,
)
return self._job_wheel_task(jobs_task, workflow, remote_wheel)
return self._job_wheel_task(jobs_task, workflow, remote_wheels)

def _create_debug(self, remote_wheel: str):
def _create_debug(self, remote_wheels: list[str]):
readme_link = self._installation.workspace_link('README')
job_links = ", ".join(
f"[{self._name(step_name)}]({self._ws.config.host}#job/{job_id})"
for step_name, job_id in self._install_state.jobs.items()
)
content = DEBUG_NOTEBOOK.format(
remote_wheel=remote_wheel, readme_link=readme_link, job_links=job_links, config_file=self._config_file
remote_wheel=remote_wheels, readme_link=readme_link, job_links=job_links, config_file=self._config_file
).encode("utf8")
self._installation.upload('DEBUG.py', content)

14 changes: 13 additions & 1 deletion tests/integration/test_installation.py
@@ -155,7 +155,7 @@ def test_job_cluster_policy(ws, installation_ctx):

assert cluster_policy.name == f"Unity Catalog Migration ({installation_ctx.inventory_database}) ({user_name})"

spark_version = ws.clusters.select_spark_version(latest=True, long_term_support=True)
spark_version = ws.clusters.select_spark_version(latest=True)
assert policy_definition["spark_version"]["value"] == spark_version
assert policy_definition["node_type_id"]["value"] == ws.clusters.select_node_type(local_disk=True, min_memory_gb=16)
if ws.config.is_azure:
@@ -471,3 +471,15 @@ def test_new_collection(ws, sql_backend, installation_ctx, env_or_skip):
config = installation_ctx.installation.load(WorkspaceConfig)
workspace_id = installation_ctx.workspace_installer.workspace_client.get_workspace_id()
assert config.installed_workspace_ids == [workspace_id]


def test_installation_with_dependency_upload(ws, installation_ctx, mocker):
config = dataclasses.replace(installation_ctx.config, upload_dependencies=True)
installation_ctx = installation_ctx.replace(config=config)
mocker.patch("webbrowser.open")
installation_ctx.workspace_installation.run()
with pytest.raises(ManyError):
installation_ctx.deployed_workflows.run_workflow("failing")

installation_ctx.deployed_workflows.repair_run("failing")
assert installation_ctx.deployed_workflows.validate_step("failing")
27 changes: 27 additions & 0 deletions tests/unit/test_install.py
@@ -1898,3 +1898,30 @@ def test_save_config_ext_hms(ws, mock_installation):
'recon_tolerance_percent': 5,
},
)


def test_upload_dependencies(ws, mock_installation):
prompts = MockPrompts(
{
r".*": "",
r"Choose how to map the workspace groups.*": "0",
r".*PRO or SERVERLESS SQL warehouse.*": "1",
r".*Does given workspace.* block Internet access.*": "Yes",
}
)
wheels = create_autospec(WheelsV2)
wheels.upload_wheel_dependencies.return_value = [
'databricks_labs_blueprint-0.6.2-py3-none-any.whl',
'databricks_sdk-0.28.0-py3-none-any.whl',
'databricks_labs_ucx-0.23.2+4920240527095658-py3-none-any.whl',
]
workspace_installation = WorkspaceInstaller(ws).replace(
prompts=prompts,
installation=mock_installation,
product_info=PRODUCT_INFO,
sql_backend=MockBackend(),
wheels=wheels,
)
workspace_installation.run()
wheels.upload_wheel_dependencies.assert_called_once()
wheels.upload_to_wsfs.assert_called_once()