Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed redundant pyspark, databricks-connect, delta-spark, and pandas dependencies #193

Merged
merged 3 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,11 @@ dependencies = [

# TODO: remove later
"typer[all]>=0.9.0,<0.10.0",
"pandas>=2.0.3,<3.0.0",
"ratelimit>=2.2.1,<3.0.0",
"tenacity>=8.2.2,<9.0.0",
]

[project.optional-dependencies]
dbconnect = [
"databricks-connect>=13.2.0,<=14.0.0"
]
test = [
"coverage[toml]>=6.5",
"pytest",
Expand All @@ -62,9 +58,7 @@ path = "src/databricks/labs/ucx/__about__.py"

[tool.hatch.envs.unit]
dependencies = [
"databricks-labs-ucx[test]",
"pyspark>=3.4.0,<=3.5.0",
"delta-spark>=2.4.0,<3.0.0"
"databricks-labs-ucx[test]"
]

[tool.hatch.envs.unit.scripts]
Expand All @@ -74,8 +68,6 @@ test-cov-report = "pytest --cov src tests/unit --cov-report=html"
[tool.hatch.envs.integration]
dependencies = [
"databricks-labs-ucx[test]",
"databricks-labs-ucx[dbconnect]",
"delta-spark>=2.4.0,<3.0.0"
]

[tool.hatch.envs.integration.scripts]
Expand Down Expand Up @@ -108,10 +100,6 @@ profile = "black"

[tool.pytest.ini_options]
addopts = "-s -p no:warnings -vv --cache-clear"
filterwarnings = [
"ignore:::.*pyspark.broadcast*",
"ignore:::.*pyspark.sql.pandas.utils*"
]

[tool.black]
target-version = ["py310"]
Expand Down
7 changes: 2 additions & 5 deletions src/databricks/labs/ucx/inventory/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from databricks.labs.ucx.inventory.permissions_inventory import (
PermissionsInventoryTable,
)
from databricks.labs.ucx.inventory.types import PermissionsInventoryItem
from databricks.labs.ucx.providers.groups_info import GroupMigrationState
from databricks.labs.ucx.support.impl import SupportsProvider
from databricks.labs.ucx.utils import ThreadedExecution
Expand All @@ -28,8 +27,7 @@ def inventorize_permissions(self):
crawler_tasks = list(self._supports_provider.get_crawler_tasks())
logger.info(f"Total crawler tasks: {len(crawler_tasks)}")
logger.info("Starting the permissions inventorization")
execution = ThreadedExecution[PermissionsInventoryItem | None](crawler_tasks)
results = execution.run()
results = ThreadedExecution.gather("crawl permissions", crawler_tasks)
items = [item for item in results if item is not None]
logger.info(f"Total inventorized items: {len(items)}")
self._permissions_inventory.save(items)
Expand Down Expand Up @@ -62,6 +60,5 @@ def apply_group_permissions(self, migration_state: GroupMigrationState, destinat

logger.info(f"Total applier tasks: {len(applier_tasks)}")
logger.info("Starting the permissions application")
execution = ThreadedExecution(applier_tasks)
execution.run()
ThreadedExecution.gather("apply permissions", applier_tasks)
logger.info("Permissions were applied")
49 changes: 15 additions & 34 deletions src/databricks/labs/ucx/inventory/permissions_inventory.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,31 @@
import logging

from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.inventory.types import PermissionsInventoryItem
from databricks.labs.ucx.providers.spark import SparkMixin
from databricks.labs.ucx.tacl._internal import CrawlerBase, SqlBackend

logger = logging.getLogger(__name__)


class PermissionsInventoryTable(SparkMixin):
def __init__(self, inventory_database: str, ws: WorkspaceClient):
super().__init__(ws)
self._table = f"hive_metastore.{inventory_database}.permissions"

@property
def _table_schema(self):
from pyspark.sql.types import StringType, StructField, StructType

return StructType(
[
StructField("object_id", StringType(), True),
StructField("support", StringType(), True),
StructField("raw_object_permissions", StringType(), True),
]
)

@property
def _df(self):
return self.spark.table(self._table)
class PermissionsInventoryTable(CrawlerBase):
def __init__(self, backend: SqlBackend, inventory_database: str):
super().__init__(backend, "hive_metastore", inventory_database, "permissions")

def cleanup(self):
logger.info(f"Cleaning up inventory table {self._table}")
self.spark.sql(f"DROP TABLE IF EXISTS {self._table}")
logger.info(f"Cleaning up inventory table {self._full_name}")
self._exec(f"DROP TABLE IF EXISTS {self._full_name}")
logger.info("Inventory table cleanup complete")

def save(self, items: list[PermissionsInventoryItem]):
# TODO: update instead of append
logger.info(f"Saving {len(items)} items to inventory table {self._table}")
serialized_items = [item.as_dict() for item in items]
df = self.spark.createDataFrame(serialized_items, schema=self._table_schema)
df.write.mode("append").format("delta").saveAsTable(self._table)
logger.info(f"Saving {len(items)} items to inventory table {self._full_name}")
self._append_records(PermissionsInventoryItem, items)
logger.info("Successfully saved the items to inventory table")

def load_all(self) -> list[PermissionsInventoryItem]:
logger.info(f"Loading inventory table {self._table}")
df = self._df.toPandas()

logger.info("Successfully loaded the inventory table")
return PermissionsInventoryItem.from_pandas(df)
logger.info(f"Loading inventory table {self._full_name}")
return [
PermissionsInventoryItem(object_id, support, raw_object_permissions)
for object_id, support, raw_object_permissions in self._fetch(
f"SELECT object_id, support, raw_object_permissions FROM {self._full_name}"
)
]
20 changes: 1 addition & 19 deletions src/databricks/labs/ucx/inventory/types.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from dataclasses import asdict, dataclass
from dataclasses import dataclass
from typing import Literal

import pandas as pd

from databricks.labs.ucx.generic import StrEnum

Destination = Literal["backup", "account"]
Expand Down Expand Up @@ -32,19 +30,3 @@ class PermissionsInventoryItem:
object_id: str
support: str # shall be taken from CRAWLERS dict
raw_object_permissions: str

@staticmethod
def from_pandas(source: pd.DataFrame) -> list["PermissionsInventoryItem"]:
    """Convert each row of *source* into a PermissionsInventoryItem.

    Column names must match the dataclass field names
    (object_id, support, raw_object_permissions) — TODO confirm against writer.
    """
    return [
        PermissionsInventoryItem.from_dict(record)
        for record in source.to_dict(orient="records")
    ]

def as_dict(self) -> dict:
    # Serialize the dataclass to a plain dict (field name -> value);
    # inverse of from_dict.
    return asdict(self)

@classmethod
def from_dict(cls, raw: dict) -> "PermissionsInventoryItem":
    """Build an item from a mapping; raises KeyError when a required
    key (object_id, support, raw_object_permissions) is absent."""
    return cls(
        object_id=raw["object_id"],
        support=raw["support"],
        raw_object_permissions=raw["raw_object_permissions"],
    )
39 changes: 0 additions & 39 deletions src/databricks/labs/ucx/providers/spark.py

This file was deleted.

16 changes: 14 additions & 2 deletions src/databricks/labs/ucx/toolkits/group_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@
from databricks.labs.ucx.inventory.verification import VerificationManager
from databricks.labs.ucx.managers.group import GroupManager
from databricks.labs.ucx.support.impl import SupportsProvider
from databricks.labs.ucx.tacl._internal import (
RuntimeBackend,
SqlBackend,
StatementExecutionBackend,
)


class GroupMigrationToolkit:
def __init__(self, config: MigrationConfig):
def __init__(self, config: MigrationConfig, *, warehouse_id=None):
self._num_threads = config.num_threads
self._workspace_start_path = config.workspace_start_path

Expand All @@ -27,13 +32,20 @@ def __init__(self, config: MigrationConfig):
self._verify_ws_client(self._ws)

self._group_manager = GroupManager(self._ws, config.groups)
self._permissions_inventory = PermissionsInventoryTable(config.inventory_database, self._ws)
sql_backend = self._backend(self._ws, warehouse_id)
self._permissions_inventory = PermissionsInventoryTable(sql_backend, config.inventory_database)
self._supports_provider = SupportsProvider(self._ws, self._num_threads, self._workspace_start_path)
self._permissions_manager = PermissionManager(
self._ws, self._permissions_inventory, supports_provider=self._supports_provider
)
self._verification_manager = VerificationManager(self._ws, self._supports_provider.supports["secrets"])

@staticmethod
def _backend(ws: WorkspaceClient, warehouse_id: str | None = None) -> SqlBackend:
    """Choose the SQL backend for the toolkit.

    When a warehouse id is supplied, statements are executed remotely
    through the Statement Execution API; otherwise the in-cluster
    runtime backend is used.
    """
    if warehouse_id is not None:
        return StatementExecutionBackend(ws, warehouse_id)
    return RuntimeBackend()

@staticmethod
def _verify_ws_client(w: WorkspaceClient):
_me = w.current_user.me()
Expand Down
4 changes: 1 addition & 3 deletions src/databricks/labs/ucx/toolkits/table_acls.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ def __init__(
self._tc = TablesCrawler(self._backend(ws, warehouse_id), inventory_catalog, inventory_schema)
self._gc = GrantsCrawler(self._tc)

self._databases = (
databases if databases else [database["databaseName"] for database in self._tc._all_databases()]
)
self._databases = databases if databases else [database for (database,) in self._tc._all_databases()]

def database_snapshot(self):
tables = []
Expand Down
4 changes: 3 additions & 1 deletion tests/integration/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ def test_e2e(
tacl=TaclConfig(auto=True),
log_level="DEBUG",
)
toolkit = GroupMigrationToolkit(config)

warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"]
toolkit = GroupMigrationToolkit(config, warehouse_id=warehouse_id)
toolkit.prepare_environment()

group_migration_state = toolkit._group_manager.migration_groups_provider
Expand Down
23 changes: 23 additions & 0 deletions tests/integration/test_permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import os

from databricks.labs.ucx.inventory.permissions_inventory import (
PermissionsInventoryTable,
)
from databricks.labs.ucx.inventory.types import PermissionsInventoryItem
from databricks.labs.ucx.tacl._internal import StatementExecutionBackend


def test_permissions_save_and_load(ws, make_schema):
    """Round-trip check: items written via save() come back from load_all()."""
    # make_schema yields a fully qualified name; keep only the schema part.
    inventory_schema = make_schema().split(".")[-1]
    sql_backend = StatementExecutionBackend(ws, os.environ["TEST_DEFAULT_WAREHOUSE_ID"])
    inventory = PermissionsInventoryTable(sql_backend, inventory_schema)

    items = [
        PermissionsInventoryItem(object_id="abc", support="bcd", raw_object_permissions="def"),
        PermissionsInventoryItem(object_id="efg", support="fgh", raw_object_permissions="ghi"),
    ]

    inventory.save(items)

    assert items == inventory.load_all()
37 changes: 0 additions & 37 deletions tests/unit/conftest.py

This file was deleted.

Loading