Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed databricks labs ucx repair-run command to execute correctly #801

Merged
merged 11 commits into from
Jan 19, 2024
19 changes: 18 additions & 1 deletion src/databricks/labs/ucx/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,8 @@ def latest_job_status(self) -> list[dict]:

def repair_run(self, workflow):
try:
start_time = time.time()
timeout = 20
job_id = self._state.jobs.get(workflow)
if not job_id:
logger.warning(f"{workflow} job does not exists hence skipping Repair Run")
Expand All @@ -893,13 +895,28 @@ def repair_run(self, workflow):
return
latest_job_run = job_runs[0]
state = latest_job_run.state

while not state.result_state and (time.time() - start_time < timeout):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor this into a private method and decorate it with `retried`.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nfx Refactored this to use the `retried` logic.

logger.info("Waiting for the result_state to update the state")
time.sleep(10)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not unit-testable; see how we use the `retried()` decorator in the workspace access package (DBSQL permissions, secret ACLs, etc.).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nfx Updated the code to use the `retried` logic.

job_runs = list(self._ws.jobs.list_runs(job_id=job_id, limit=1))
latest_job_run = job_runs[0]
state = latest_job_run.state

if not state.result_state:
logger.warning(f"{workflow} job result state is not updated.Please try after some time")
return
logger.info(f"The status for the latest run is {state.result_state.value}")

if state.result_state.value != "FAILED":
logger.warning(f"{workflow} job is not in FAILED state hence skipping Repair Run")
return
run_id = latest_job_run.run_id
run_details = self._ws.jobs.get_run(run_id=run_id, include_history=True)
latest_repair_run_id = run_details.repair_history[-1].id
job_url = f"{self._ws.config.host}#job/{job_id}/run/{run_id}"
logger.debug(f"Repair Running {workflow} job: {job_url}")
self._ws.jobs.repair_run(run_id=run_id, rerun_all_failed_tasks=True)
self._ws.jobs.repair_run(run_id=run_id, rerun_all_failed_tasks=True, latest_repair_id=latest_repair_run_id)
webbrowser.open(job_url)
except InvalidParameterValue as e:
logger.warning(f"skipping {workflow}: {e}")
Expand Down
19 changes: 19 additions & 0 deletions tests/unit/test_install.py
Original file line number Diff line number Diff line change
Expand Up @@ -1179,3 +1179,22 @@ def test_repair_run_exception(ws):
install._state.jobs = {"assessment": "123"}
ws.jobs.list_runs.side_effect = InvalidParameterValue("Workflow does not exists")
install.repair_run("assessment")


def test_repair_run_result_state(ws, caplog):
    """Verify that ``repair_run`` backs off when the latest run has no result state.

    The latest run returned by ``ws.jobs.list_runs`` carries
    ``RunState(result_state=None)``, i.e. the run has not reported a result
    yet. In that case ``repair_run`` is expected to log a warning and return
    without asking the Jobs API to repair anything.

    Args:
        ws: workspace client test double (fixture); assumed to be a mock so
            that call assertions are available -- TODO confirm against the
            fixture definition.
        caplog: pytest fixture capturing log records emitted during the test.
    """
    pending_run = BaseRun(
        job_clusters=None,
        job_id=677268692725050,
        job_parameters=None,
        number_in_job=725118654200173,
        run_id=725118654200173,
        run_name="[UCX] assessment",
        state=RunState(result_state=None),
    )
    install = WorkspaceInstaller(ws)
    install._state.jobs = {"assessment": "123"}
    # Every poll of the job's runs yields the same still-unfinished run.
    ws.jobs.list_runs.return_value = [pending_run]

    install.repair_run("assessment")

    # The user must be told why nothing happened ...
    assert "job result state is not updated" in caplog.text
    # ... and no repair may be triggered for a run without a result state.
    ws.jobs.repair_run.assert_not_called()
Loading