Improve README.py and logging messages (#433)
Fixes #425
larsgeorge-db authored and FastLee committed Oct 25, 2023
1 parent 78e573f commit 8aec83b
Showing 6 changed files with 37 additions and 71 deletions.
3 changes: 3 additions & 0 deletions src/databricks/labs/ucx/config.py
@@ -231,3 +231,6 @@ def from_dict(cls, raw: dict):

    def to_workspace_client(self) -> WorkspaceClient:
        return WorkspaceClient(config=self.to_databricks_config())

    def replace_inventory_variable(self, text: str) -> str:
        return text.replace("$inventory", f"hive_metastore.{self.inventory_database}")
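
The new `replace_inventory_variable` helper centralizes the `$inventory` substitution that query and README generation rely on. A minimal sketch of the behaviour, using a stand-in object instead of the real `WorkspaceConfig` (the stand-in class and the sample query are illustrative, not part of the commit):

# Illustrative stand-in for WorkspaceConfig; only the pieces needed for the
# placeholder substitution are reproduced here.
class _StubConfig:
    inventory_database = "ucx"

    def replace_inventory_variable(self, text: str) -> str:
        return text.replace("$inventory", f"hive_metastore.{self.inventory_database}")


query = "SELECT * FROM $inventory.tables WHERE object_type = 'MANAGED'"
print(_StubConfig().replace_inventory_variable(query))
# SELECT * FROM hive_metastore.ucx.tables WHERE object_type = 'MANAGED'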
70 changes: 20 additions & 50 deletions src/databricks/labs/ucx/framework/tasks.py
@@ -23,6 +23,18 @@ class Task:
dashboard: str = None


@staticmethod
def _remove_extra_indentation(doc: str) -> str:
    lines = doc.splitlines()
    stripped = []
    for line in lines:
        if line.startswith(" " * 4):
            stripped.append(line[4:])
        else:
            stripped.append(line)
    return "\n".join(stripped)


def task(workflow, *, depends_on=None, job_cluster="main", notebook: str | None = None, dashboard: str | None = None):
def decorator(func):
@wraps(func)
@@ -59,7 +71,7 @@ def wrapper(*args, **kwargs):
task_id=len(_TASKS),
workflow=workflow,
name=func.__name__,
doc=func.__doc__,
doc=_remove_extra_indentation(func.__doc__),
fn=func,
depends_on=deps,
job_cluster=job_cluster,
@@ -77,60 +89,18 @@ def trigger(*argv):
if "config" not in args:
msg = "no --config specified"
raise KeyError(msg)

task_name = args.get("task", "not specified")
# `{{parent_run_id}}` is the run of the entire workflow, whereas `{{run_id}}` is the run of a task
workflow_run_id = args.get("parent_run_id", "unknown_run_id")
job_id = args.get("job_id")
if task_name not in _TASKS:
msg = f'task "{task_name}" not found. Valid tasks are: {", ".join(_TASKS.keys())}'
raise KeyError(msg)

current_task = _TASKS[task_name]
print(current_task.doc)

config_path = Path(args["config"])
cfg = WorkspaceConfig.from_file(config_path)

# see https://docs.python.org/3/howto/logging-cookbook.html
databricks_logger = logging.getLogger("databricks")
databricks_logger.setLevel(logging.DEBUG)

ucx_logger = logging.getLogger("databricks.labs.ucx")
ucx_logger.setLevel(logging.DEBUG)

log_path = config_path.parent / "logs" / current_task.workflow / f"run-{workflow_run_id}"
log_path.mkdir(parents=True, exist_ok=True)

log_file = log_path / f"{task_name}.log"
file_handler = logging.FileHandler(log_file.as_posix())
log_format = "%(asctime)s %(levelname)s [%(name)s] {%(threadName)s} %(message)s"
log_formatter = logging.Formatter(fmt=log_format, datefmt="%H:%M:%S")
file_handler.setFormatter(log_formatter)
file_handler.setLevel(logging.DEBUG)

console_handler = _install(cfg.log_level)
databricks_logger.removeHandler(console_handler)
databricks_logger.addHandler(file_handler)

ucx_logger.info(f"See debug logs at {log_file}")

log_readme = log_path.joinpath("README.md")
if not log_readme.exists():
# this may race when run from multiple tasks, but let's accept the risk for now.
with log_readme.open(mode="w") as f:
f.write(f"# Logs for the UCX {current_task.workflow} workflow\n")
f.write("This folder contains UCX log files.\n\n")
f.write(f"See the [{current_task.workflow} job](/#job/{job_id}) and ")
f.write(f"[run #{workflow_run_id}](/#job/{job_id}/run/{workflow_run_id})\n")

try:
current_task.fn(cfg)
except BaseException as error:
log_file_for_cli = str(log_file).lstrip("/Workspace")
cli_command = f"databricks workspace export /{log_file_for_cli}"
ucx_logger.error(f"Task crashed. Execute `{cli_command}` locally to troubleshoot with more details. {error}")
databricks_logger.debug("Task crash details", exc_info=error)
file_handler.flush()
raise
finally:
file_handler.close()
_install()

cfg = WorkspaceConfig.from_file(Path(args["config"]))
logging.getLogger("databricks").setLevel(cfg.log_level)

current_task.fn(cfg)
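
Putting the pieces of this file together: `@task` registers every decorated function in the module-level `_TASKS` mapping, and `trigger` resolves the requested task by name before invoking it with the parsed `WorkspaceConfig`. The following is a deliberately simplified, hypothetical sketch of that flow, not the exact module contents:

from dataclasses import dataclass
from functools import wraps
from typing import Callable

_TASKS = {}


@dataclass
class Task:
    workflow: str
    name: str
    doc: str
    fn: Callable


def task(workflow: str):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        # register the undecorated function so trigger() can dispatch to it by name
        _TASKS[func.__name__] = Task(workflow=workflow, name=func.__name__, doc=func.__doc__ or "", fn=func)
        return wrapper

    return decorator


@task("assessment")
def crawl_tables(cfg):
    """Collects table metadata into the inventory database."""
    print(f"crawling with config: {cfg}")


# trigger-style dispatch: look the task up by name, print its doc, run it
current_task = _TASKS["crawl_tables"]
print(current_task.doc)
current_task.fn({"inventory_database": "ucx"})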
16 changes: 2 additions & 14 deletions src/databricks/labs/ucx/install.py
@@ -154,7 +154,7 @@ def _create_dashboards(self):
remote_folder=f"{self._install_folder}/queries",
name=self._name("UCX Assessment"),
warehouse_id=self._warehouse_id,
query_text_callback=self._replace_inventory_variable,
query_text_callback=self._current_config.replace_inventory_variable,
)
self._dashboards["assessment"] = dash.create_dashboard()

@@ -376,17 +376,6 @@ def _step_list(cls) -> list[str]:
step_list.append(task.workflow)
return step_list

    @staticmethod
    def _remove_extra_indentation(doc: str) -> str:
        lines = doc.splitlines()
        stripped = []
        for line in lines:
            if line.startswith(" " * 4):
                stripped.append(line[4:])
            else:
                stripped.append(line)
        return "\n".join(stripped)

def _create_readme(self):
md = [
"# UCX - The Unity Catalog Migration Assistant",
@@ -411,8 +400,7 @@ def _create_readme(self):
for t in self._sorted_tasks():
if t.workflow != step_name:
continue
doc = self._remove_extra_indentation(t.doc)
doc = self._replace_inventory_variable(doc)
doc = self._current_config.replace_inventory_variable(t.doc)
md.append(f"### `{t.name}`\n\n")
md.append(f"{doc}\n")
md.append("\n\n")
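
The net effect of the install.py changes is that `_create_readme` no longer dedents docstrings itself: it takes the already-dedented `t.doc` and only pipes it through the config-level placeholder replacement. A rough, hypothetical sketch of that assembly loop, with plain tuples and a local helper standing in for the installer's real internals:

# Hypothetical sketch of the README assembly; field names and the helper below
# are illustrative, not the installer's actual internals.
def replace_inventory_variable(text: str, inventory_database: str = "ucx") -> str:
    return text.replace("$inventory", f"hive_metastore.{inventory_database}")


tasks = [
    ("assessment", "crawl_tables", "Persists table metadata in `$inventory.tables`."),
    ("assessment", "crawl_grants", "Persists grants in `$inventory.grants`."),
]

md = ["# UCX - The Unity Catalog Migration Assistant\n\n", "## assessment\n\n"]
for workflow, name, doc in tasks:
    if workflow != "assessment":
        continue
    md.append(f"### `{name}`\n\n")
    md.append(f"{replace_inventory_variable(doc)}\n\n")

print("".join(md))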
11 changes: 8 additions & 3 deletions src/databricks/labs/ucx/runtime.py
@@ -35,7 +35,7 @@ def setup_schema(cfg: WorkspaceConfig):
def crawl_tables(_: WorkspaceConfig):
"""Iterates over all tables in the Hive Metastore of the current workspace and persists their metadata, such
as _database name_, _table name_, _table type_, _table location_, etc., in the Delta table named
`${inventory_database}.tables`. The `inventory_database` placeholder is set in the configuration file. The metadata
`$inventory_database.tables`. Note that the `inventory_database` is set in the configuration file. The metadata
stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that
cannot easily be migrated to Unity Catalog."""

@@ -47,10 +47,10 @@ def setup_tacl(_: WorkspaceConfig):

@task("assessment", depends_on=[crawl_tables, setup_tacl], job_cluster="tacl")
def crawl_grants(cfg: WorkspaceConfig):
"""Scans the previously created Delta table named `${inventory_database}.tables` and issues a `SHOW GRANTS`
"""Scans the previously created Delta table named `$inventory_database.tables` and issues a `SHOW GRANTS`
statement for every object to retrieve the permissions it has assigned to it. The permissions include information
such as the _principal_, _action type_, and the _table_ it applies to. This is persisted in the Delta table
`${inventory_database}.grants`. Other, migration related jobs use this inventory table to convert the legacy Table
`$inventory_database.grants`. Other, migration related jobs use this inventory table to convert the legacy Table
ACLs to Unity Catalog permissions.
Note: This job runs on a separate cluster (named `tacl`) as it requires the proper configuration to have the Table
@@ -126,8 +126,10 @@ def assess_pipelines(cfg: WorkspaceConfig):
"""This module scans through all the Pipelines and identifies those pipelines which has Azure Service Principals
embedded (who has been given access to the Azure storage accounts via spark configurations) in the pipeline
configurations.
It looks for:
- all the pipelines that have an Azure Service Principal embedded in the pipeline configuration
Subsequently, a list of all the pipelines with matching configurations is stored in the
`$inventory.pipelines` table."""
ws = WorkspaceClient(config=cfg.to_databricks_config())
@@ -140,8 +142,10 @@ def assess_azure_service_principals(cfg: WorkspaceConfig):
"""This module scans through all the clusters configurations, cluster policies, job cluster configurations,
Pipeline configurations, Warehouse configuration and identifies all the Azure Service Principals who has been
given access to the Azure storage accounts via spark configurations referred in those entities.
It looks in:
- all those entities and prepares a list of the Azure Service Principals embedded in their configurations
Subsequently, the list of all the Azure Service Principals referenced in those configurations is saved
in the `$inventory.azure_service_principals` table."""
ws = WorkspaceClient(config=cfg.to_databricks_config())
@@ -153,6 +157,7 @@ def assess_global_init_scripts(cfg: WorkspaceConfig):
def assess_global_init_scripts(cfg: WorkspaceConfig):
"""This module scans through all the global init scripts and identifies if there is an Azure Service Principal
who has been given access to the Azure storage accounts via spark configurations referred in those scripts.
It looks in:
- the list of all the global init scripts is saved in the `$inventory.azure_service_principals` table."""
ws = WorkspaceClient(config=cfg.to_databricks_config())
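
All of the assessment docstrings above follow the same pattern: a crawler scans a family of workspace objects and persists its findings in an inventory table addressed through the `$inventory` placeholder. A hedged sketch of inspecting those findings after a run, assuming a PySpark session and the default `hive_metastore.ucx` inventory database (both assumptions, as are the unqualified `SELECT *` queries):

from pyspark.sql import SparkSession

# Assumed inventory database; the real name comes from the installer's config file.
INVENTORY = "hive_metastore.ucx"

spark = SparkSession.builder.getOrCreate()
pipelines = spark.sql(f"SELECT * FROM {INVENTORY}.pipelines")
spns = spark.sql(f"SELECT * FROM {INVENTORY}.azure_service_principals")

print(pipelines.count(), "pipelines with embedded Azure Service Principals")
print(spns.count(), "Azure Service Principal references found")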
2 changes: 1 addition & 1 deletion tests/unit/assessment/test_dashboard.py
@@ -37,7 +37,7 @@ def test_dashboard(mocker):
remote_folder="/users/not_a_real_user/queries",
name="Assessment",
warehouse_id="000000",
query_text_callback=installer._replace_inventory_variable,
query_text_callback=installer._current_config.replace_inventory_variable,
)
dashboard = dash.create_dashboard()
assert dashboard is not None
6 changes: 3 additions & 3 deletions tests/unit/test_install.py
@@ -359,9 +359,9 @@ def test_create_readme(mocker):


def test_replace_pydoc(mocker):
ws = mocker.Mock()
install = WorkspaceInstaller(ws)
doc = install._remove_extra_indentation(
from databricks.labs.ucx.framework.tasks import _remove_extra_indentation

doc = _remove_extra_indentation(
"""Test1
Test2
Test3"""
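
A complementary test sketch for the relocated placeholder logic, in the same style as the tests above. This is hypothetical and not part of the commit; in particular, the `WorkspaceConfig`/`GroupsConfig` constructor arguments are illustrative and the real dataclasses may require different fields:

# Hypothetical test sketch; the constructor arguments below are assumptions.
from databricks.labs.ucx.config import GroupsConfig, WorkspaceConfig


def test_replace_inventory_variable():
    cfg = WorkspaceConfig(inventory_database="ucx", groups=GroupsConfig(auto=True))
    assert cfg.replace_inventory_variable("$inventory.tables") == "hive_metastore.ucx.tables"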
