diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 247fb1522a..e6f81cb97c 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -18,10 +18,12 @@ ClusterDetails, CreateInstancePoolResponse, CreatePolicyResponse, - DataSecurityMode, - RuntimeEngine, ) -from databricks.sdk.service.iam import AccessControlRequest, PermissionLevel +from databricks.sdk.service.iam import ( + AccessControlRequest, + ComplexValue, + PermissionLevel, +) from databricks.sdk.service.jobs import CreateResponse from databricks.sdk.service.ml import CreateExperimentResponse, ModelDatabricks from databricks.sdk.service.ml import PermissionLevel as ModelPermissionLevel @@ -45,6 +47,7 @@ from databricks.labs.ucx.inventory.types import RequestObjectType from databricks.labs.ucx.providers.client import ImprovedWorkspaceClient from databricks.labs.ucx.providers.logger import logger +from databricks.labs.ucx.providers.mixins.sql import StatementExecutionExt from databricks.labs.ucx.utils import ThreadedExecution from .utils import ( @@ -144,32 +147,156 @@ def account_host(cfg: Config) -> str: return AccountClient(host=account_host(ws.config)) -@pytest.fixture(scope="session") -def dbconnect_cluster_id(ws: ImprovedWorkspaceClient) -> str: - # TODO: will use predeclared DATABRICKS_CLUSTER_ID env variable - dbc_cluster = next(filter(lambda c: c.cluster_name == DB_CONNECT_CLUSTER_NAME, ws.clusters.list()), None) - - if dbc_cluster: - logger.debug(f"Integration testing cluster {DB_CONNECT_CLUSTER_NAME} already exists, skipping it's creation") - return dbc_cluster.cluster_id - - logger.debug("Creating a cluster for integration testing") - dbc_cluster = ws.clusters.create( - spark_version=ws.clusters.select_spark_version(latest=True), - cluster_name=DB_CONNECT_CLUSTER_NAME, - instance_pool_id=os.environ["TEST_INSTANCE_POOL_ID"], - driver_node_type_id=os.environ["TEST_INSTANCE_POOL_ID"], - num_workers=0, - spark_conf={"spark.master": "local[*, 4]", "spark.databricks.cluster.profile": "singleNode"}, - custom_tags={"ResourceClass": "SingleNode"}, - data_security_mode=DataSecurityMode.SINGLE_USER, - autotermination_minutes=60, - runtime_engine=RuntimeEngine.PHOTON, - ) - logger.debug(f"Cluster {dbc_cluster.cluster_id} created") +@pytest.fixture +def sql_exec(ws: ImprovedWorkspaceClient): + warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"] + statement_execution = StatementExecutionExt(ws.api_client) + return partial(statement_execution.execute, warehouse_id) + + +@pytest.fixture +def sql_fetch_all(ws: ImprovedWorkspaceClient): + warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"] + statement_execution = StatementExecutionExt(ws.api_client) + return partial(statement_execution.execute_fetch_all, warehouse_id) + + +@pytest.fixture +def make_group(ws: ImprovedWorkspaceClient, make_random): + cleanup = [] + + def inner( + entitlements: list[ComplexValue] | None = None, + external_id: str | None = None, + members: list[ComplexValue] | None = None, + roles: list[ComplexValue] | None = None, + ): + group = ws.groups.create( + display_name=f"ucx_G{make_random(4)}", + entitlements=entitlements, + external_id=external_id, + members=members, + roles=roles, + ) + logger.debug(f"created group fixture: {group.display_name} ({group.id})") + cleanup.append(group) + return group + + yield inner + + logger.debug(f"clearing {len(cleanup)} group fixtures") + for group in cleanup: + logger.debug(f"removing group fixture: {group.display_name} ({group.id})") + ws.groups.delete(group.id) + + +def 
test_group_fixture(make_group):
+    logger.info(f"Created new group: {make_group()}")
+    logger.info(f"Created new group: {make_group()}")
+
 
-    # TODO: pre-create the cluster in the test infra
-    return dbc_cluster.cluster_id
+@pytest.fixture
+def make_catalog(sql_exec, make_random):
+    cleanup = []
+
+    def inner():
+        name = f"ucx_C{make_random(4)}".lower()
+        sql_exec(f"CREATE CATALOG {name}")
+        cleanup.append(name)
+        return name
+
+    yield inner
+    logger.debug(f"clearing {len(cleanup)} catalogs")
+    for name in cleanup:
+        logger.debug(f"removing {name} catalog")
+        sql_exec(f"DROP CATALOG IF EXISTS {name} CASCADE")
+    logger.debug(f"removed {len(cleanup)} catalogs")
+
+
+def test_catalog_fixture(make_catalog):
+    logger.info(f"Created new catalog: {make_catalog()}")
+    logger.info(f"Created new catalog: {make_catalog()}")
+
+
+@pytest.fixture
+def make_schema(sql_exec, make_random):
+    cleanup = []
+
+    def inner(catalog="hive_metastore"):
+        name = f"{catalog}.ucx_S{make_random(4)}".lower()
+        sql_exec(f"CREATE SCHEMA {name}")
+        cleanup.append(name)
+        return name
+
+    yield inner
+    logger.debug(f"clearing {len(cleanup)} schemas")
+    for name in cleanup:
+        logger.debug(f"removing {name} schema")
+        sql_exec(f"DROP SCHEMA IF EXISTS {name} CASCADE")
+    logger.debug(f"removed {len(cleanup)} schemas")
+
+
+def test_schema_fixture(make_schema):
+    logger.info(f"Created new schema: {make_schema()}")
+    logger.info(f"Created new schema: {make_schema()}")
+
+
+@pytest.fixture
+def make_table(sql_exec, make_schema, make_random):
+    cleanup = []
+
+    def inner(
+        *,
+        catalog="hive_metastore",
+        schema: str | None = None,
+        ctas: str | None = None,
+        non_delta: bool = False,
+        external: bool = False,
+        view: bool = False,
+    ):
+        if schema is None:
+            schema = make_schema(catalog=catalog)
+        name = f"{schema}.ucx_T{make_random(4)}".lower()
+        ddl = f'CREATE {"VIEW" if view else "TABLE"} {name}'
+        if ctas is not None:
+            # table (or view, when view=True) defined by the CTAS query
+            ddl = f"{ddl} AS {ctas}"
+        elif non_delta:
+            location = "dbfs:/databricks-datasets/iot-stream/data-device"
+            ddl = f"{ddl} USING json LOCATION '{location}'"
+        elif external:
+            # external table
+            location = "dbfs:/databricks-datasets/nyctaxi-with-zipcodes/subsampled"
+            ddl = f"{ddl} USING delta LOCATION '{location}'"
+        else:
+            # managed table
+            ddl = f"{ddl} (id INT, value STRING)"
+        sql_exec(ddl)
+        cleanup.append(name)
+        return name
+
+    yield inner
+
+    logger.debug(f"clearing {len(cleanup)} tables")
+    for name in cleanup:
+        logger.debug(f"removing {name} table")
+        try:
+            sql_exec(f"DROP TABLE IF EXISTS {name}")
+        except RuntimeError as e:
+            if "Cannot drop a view" in str(e):
+                sql_exec(f"DROP VIEW IF EXISTS {name}")
+            else:
+                raise
+    logger.debug(f"removed {len(cleanup)} tables")
+
+
+def test_table_fixture(make_table):
+    logger.info(f"Created new managed table in new schema: {make_table()}")
+    logger.info(f'Created new managed table in default schema: {make_table(schema="default")}')
+    logger.info(f"Created new external table in new schema: {make_table(external=True)}")
+    logger.info(f"Created new non-Delta JSON table in new schema: {make_table(non_delta=True)}")
+    logger.info(f'Created new CTAS table in new schema: {make_table(ctas="SELECT 2+2 AS four")}')
+    logger.info(f'Created new view in new schema: {make_table(view=True, ctas="SELECT 2+2 AS four")}')
 
 
 @pytest.fixture(scope="session")
@@ -625,11 +752,11 @@ def verifiable_objects(
 
 
 @pytest.fixture()
-def inventory_table(env: EnvironmentInfo, ws: ImprovedWorkspaceClient, dbconnect_cluster_id: str) -> InventoryTable:
-    
ws.config.cluster_id = dbconnect_cluster_id +def inventory_table(env: EnvironmentInfo, ws: ImprovedWorkspaceClient, make_catalog, make_schema) -> InventoryTable: + catalog, schema = make_schema(make_catalog()).split(".") table = InventoryTable( - catalog="main", - database="default", + catalog=catalog, + database=schema, name=f"test_inventory_{env.test_uid}", ) diff --git a/tests/integration/test_tacls.py b/tests/integration/test_tacls.py index e5dbd040c3..6c8c0d938e 100644 --- a/tests/integration/test_tacls.py +++ b/tests/integration/test_tacls.py @@ -1,168 +1,9 @@ import os -from functools import partial - -import pytest -from databricks.sdk.service.iam import ComplexValue from databricks.labs.ucx.providers.client import ImprovedWorkspaceClient from databricks.labs.ucx.providers.logger import logger -from databricks.labs.ucx.providers.mixins.sql import StatementExecutionExt from databricks.labs.ucx.toolkits.table_acls import TaclToolkit -# _LOG.setLevel("DEBUG") - - -@pytest.fixture -def sql_exec(ws: ImprovedWorkspaceClient): - warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"] - statement_execution = StatementExecutionExt(ws.api_client) - return partial(statement_execution.execute, warehouse_id) - - -@pytest.fixture -def sql_fetch_all(ws: ImprovedWorkspaceClient): - warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"] - statement_execution = StatementExecutionExt(ws.api_client) - return partial(statement_execution.execute_fetch_all, warehouse_id) - - -@pytest.fixture -def make_group(ws: ImprovedWorkspaceClient, make_random): - cleanup = [] - - def inner( - entitlements: list[ComplexValue] | None = None, - external_id: str | None = None, - members: list[ComplexValue] | None = None, - roles: list[ComplexValue] | None = None, - ): - group = ws.groups.create( - display_name=f"ucx_G{make_random(4)}", - entitlements=entitlements, - external_id=external_id, - members=members, - roles=roles, - ) - logger.debug(f"created group fixture: {group.display_name} ({group.id})") - cleanup.append(group) - return group - - yield inner - - logger.debug(f"clearing {len(cleanup)} group fixtures") - for group in cleanup: - logger.debug(f"removing group fixture: {group.display_name} ({group.id})") - ws.groups.delete(group.id) - - -def test_group_fixture(make_group): - logger.info(f"Created new group: {make_group()}") - logger.info(f"Created new group: {make_group()}") - - -@pytest.fixture -def make_catalog(sql_exec, make_random): - cleanup = [] - - def inner(): - name = f"ucx_C{make_random(4)}".lower() - sql_exec(f"CREATE CATALOG {name}") - cleanup.append(name) - return name - - yield inner - logger.debug(f"clearing {len(cleanup)} catalogs") - for name in cleanup: - logger.debug(f"removing {name} catalog") - sql_exec(f"DROP CATALOG IF EXISTS {name} CASCADE") - logger.debug(f"removed {len(cleanup)} catalogs") - - -def test_catalog_fixture(make_catalog): - logger.info(f"Created new catalog: {make_catalog()}") - logger.info(f"Created new catalog: {make_catalog()}") - - -@pytest.fixture -def make_schema(sql_exec, make_random): - cleanup = [] - - def inner(catalog="hive_metastore"): - name = f"{catalog}.ucx_S{make_random(4)}".lower() - sql_exec(f"CREATE SCHEMA {name}") - cleanup.append(name) - return name - - yield inner - logger.debug(f"clearing {len(cleanup)} schemas") - for name in cleanup: - logger.debug(f"removing {name} schema") - sql_exec(f"DROP SCHEMA IF EXISTS {name} CASCADE") - logger.debug(f"removed {len(cleanup)} schemas") - - -def test_schema_fixture(make_schema): - logger.info(f"Created new 
schema: {make_schema()}") - logger.info(f"Created new schema: {make_schema()}") - - -@pytest.fixture -def make_table(sql_exec, make_schema, make_random): - cleanup = [] - - def inner( - *, - catalog="hive_metastore", - schema: str | None = None, - ctas: str | None = None, - non_detla: bool = False, - external: bool = False, - view: bool = False, - ): - if schema is None: - schema = make_schema(catalog=catalog) - name = f"{schema}.ucx_T{make_random(4)}".lower() - ddl = f'CREATE {"VIEW" if view else "TABLE"} {name}' - if ctas is not None: - # temporary (if not view) - ddl = f"{ddl} AS {ctas}" - elif non_detla: - location = "dbfs:/databricks-datasets/iot-stream/data-device" - ddl = f"{ddl} USING json LOCATION '{location}'" - elif external: - # external table - location = "dbfs:/databricks-datasets/nyctaxi-with-zipcodes/subsampled" - ddl = f"{ddl} USING delta LOCATION '{location}'" - else: - # managed table - ddl = f"{ddl} (id INT, value STRING)" - sql_exec(ddl) - cleanup.append(name) - return name - - yield inner - - logger.debug(f"clearing {len(cleanup)} tables") - for name in cleanup: - logger.debug(f"removing {name} table") - try: - sql_exec(f"DROP TABLE IF EXISTS {name}") - except RuntimeError as e: - if "Cannot drop a view" in str(e): - sql_exec(f"DROP VIEW IF EXISTS {name}") - else: - raise e - logger.debug(f"removed {len(cleanup)} tables") - - -def test_table_fixture(make_table): - logger.info(f"Created new managed table in new schema: {make_table()}") - logger.info(f'Created new managed table in default schema: {make_table(schema="default")}') - logger.info(f"Created new external table in new schema: {make_table(external=True)}") - logger.info(f"Created new external JSON table in new schema: {make_table(non_detla=True)}") - logger.info(f'Created new tmp table in new schema: {make_table(ctas="SELECT 2+2 AS four")}') - logger.info(f'Created new view in new schema: {make_table(view=True, ctas="SELECT 2+2 AS four")}') - def test_describe_all_tables(ws: ImprovedWorkspaceClient, make_catalog, make_schema, make_table): warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"]
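
Note: pytest exposes fixtures defined in conftest.py to every test module in the same directory tree, so moving sql_exec, sql_fetch_all, make_group, make_catalog, make_schema, and make_table out of test_tacls.py makes them reusable across all of tests/integration. Below is a minimal sketch of a test composing the relocated fixtures; the test name, the GRANT/SHOW GRANTS statements, and the assertion are illustrative assumptions, not part of this change:

    def test_group_can_read_table(make_group, make_table, sql_exec, sql_fetch_all):
        # both fixtures clean up after themselves on teardown
        group = make_group()
        table = make_table(ctas="SELECT 2+2 AS four")  # CTAS table in a fresh schema
        # sql_exec/sql_fetch_all run against the TEST_DEFAULT_WAREHOUSE_ID warehouse
        sql_exec(f"GRANT SELECT ON TABLE {table} TO `{group.display_name}`")
        rows = list(sql_fetch_all(f"SHOW GRANTS ON TABLE {table}"))
        assert rows, "expected at least one grant row"

Because the make_* fixtures are function-scoped, each test gets fresh objects that are dropped again on teardown, so tests stay independent of one another.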