Skip to content

Commit

Permalink
Fixed databricks labs ucx repair-run command to execute correctly (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
prajin-29 authored Jan 19, 2024
1 parent b32d885 commit b45fa41
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 4 deletions.
30 changes: 26 additions & 4 deletions src/databricks/labs/ucx/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import time
import webbrowser
from dataclasses import replace
from datetime import datetime
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any

Expand Down Expand Up @@ -42,6 +42,7 @@
Unauthenticated,
Unknown,
)
from databricks.sdk.retries import retried
from databricks.sdk.service import compute, jobs
from databricks.sdk.service.jobs import RunLifeCycleState, RunResultState
from databricks.sdk.service.sql import EndpointInfoWarehouseType, SpotInstancePolicy
Expand Down Expand Up @@ -171,6 +172,7 @@ def __init__(
promtps: Prompts | None = None,
wheels: Wheels | None = None,
sql_backend: SqlBackend | None = None,
verify_timeout: timedelta | None = None,
):
if "DATABRICKS_RUNTIME_VERSION" in os.environ:
msg = "WorkspaceInstaller is not supposed to be executed in Databricks Runtime"
Expand All @@ -189,6 +191,9 @@ def __init__(
self._this_file = Path(__file__)
self._dashboards: dict[str, str] = {}
self._install_override_clusters = None
if verify_timeout is None:
verify_timeout = timedelta(minutes=2)
self._verify_timeout = verify_timeout

def run(self):
logger.info(f"Installing UCX v{self._product_info.version()}")
Expand Down Expand Up @@ -906,6 +911,14 @@ def latest_job_status(self) -> list[dict]:
continue
return latest_status

def _get_result_state(self, job_id):
job_runs = list(self._ws.jobs.list_runs(job_id=job_id, limit=1))
latest_job_run = job_runs[0]
if not latest_job_run.state.result_state:
raise AttributeError("no result state in job run")
job_state = latest_job_run.state.result_state.value
return job_state

def repair_run(self, workflow):
try:
job_id = self._state.jobs.get(workflow)
Expand All @@ -917,17 +930,26 @@ def repair_run(self, workflow):
logger.warning(f"{workflow} job is not initialized yet. Can't trigger repair run now")
return
latest_job_run = job_runs[0]
state = latest_job_run.state
if state.result_state.value != "FAILED":
retry_on_attribute_error = retried(on=[AttributeError], timeout=self._verify_timeout)
retried_check = retry_on_attribute_error(self._get_result_state)
state_value = retried_check(job_id)

logger.info(f"The status for the latest run is {state_value}")

if state_value != "FAILED":
logger.warning(f"{workflow} job is not in FAILED state hence skipping Repair Run")
return
run_id = latest_job_run.run_id
run_details = self._ws.jobs.get_run(run_id=run_id, include_history=True)
latest_repair_run_id = run_details.repair_history[-1].id
job_url = f"{self._ws.config.host}#job/{job_id}/run/{run_id}"
logger.debug(f"Repair Running {workflow} job: {job_url}")
self._ws.jobs.repair_run(run_id=run_id, rerun_all_failed_tasks=True)
self._ws.jobs.repair_run(run_id=run_id, rerun_all_failed_tasks=True, latest_repair_id=latest_repair_run_id)
webbrowser.open(job_url)
except InvalidParameterValue as e:
logger.warning(f"skipping {workflow}: {e}")
except TimeoutError:
logger.warning(f"Skipping the {workflow} due to time out. Please try after sometime")

def uninstall(self):
if self._prompts and not self._prompts.confirm(
Expand Down
21 changes: 21 additions & 0 deletions tests/unit/test_install.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import io
from datetime import timedelta
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, create_autospec, patch
Expand Down Expand Up @@ -1179,3 +1180,23 @@ def test_repair_run_exception(ws):
install._state.jobs = {"assessment": "123"}
ws.jobs.list_runs.side_effect = InvalidParameterValue("Workflow does not exists")
install.repair_run("assessment")


def test_repair_run_result_state(ws, caplog):
base = [
BaseRun(
job_clusters=None,
job_id=677268692725050,
job_parameters=None,
number_in_job=725118654200173,
run_id=725118654200173,
run_name="[UCX] assessment",
state=RunState(result_state=None),
)
]
install = WorkspaceInstaller(ws, verify_timeout=timedelta(seconds=5))
install._state.jobs = {"assessment": "123"}
ws.jobs.list_runs.return_value = base
ws.jobs.list_runs.repair_run = None
install.repair_run("assessment")
assert "Please try after sometime" in caplog.text

0 comments on commit b45fa41

Please sign in to comment.