Skip to content

Commit

Permalink
Fix for the new testing environment (#115)
Browse files Browse the repository at this point in the history
  • Loading branch information
nfx committed Aug 25, 2023
1 parent c7d4e8d commit 32dd2dd
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 191 deletions.
191 changes: 159 additions & 32 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
ClusterDetails,
CreateInstancePoolResponse,
CreatePolicyResponse,
DataSecurityMode,
RuntimeEngine,
)
from databricks.sdk.service.iam import AccessControlRequest, PermissionLevel
from databricks.sdk.service.iam import (
AccessControlRequest,
ComplexValue,
PermissionLevel,
)
from databricks.sdk.service.jobs import CreateResponse
from databricks.sdk.service.ml import CreateExperimentResponse, ModelDatabricks
from databricks.sdk.service.ml import PermissionLevel as ModelPermissionLevel
Expand All @@ -45,6 +47,7 @@
from databricks.labs.ucx.inventory.types import RequestObjectType
from databricks.labs.ucx.providers.client import ImprovedWorkspaceClient
from databricks.labs.ucx.providers.logger import logger
from databricks.labs.ucx.providers.mixins.sql import StatementExecutionExt
from databricks.labs.ucx.utils import ThreadedExecution

from .utils import (
Expand Down Expand Up @@ -144,32 +147,156 @@ def account_host(cfg: Config) -> str:
return AccountClient(host=account_host(ws.config))


@pytest.fixture(scope="session")
def dbconnect_cluster_id(ws: ImprovedWorkspaceClient) -> str:
# TODO: will use predeclared DATABRICKS_CLUSTER_ID env variable
dbc_cluster = next(filter(lambda c: c.cluster_name == DB_CONNECT_CLUSTER_NAME, ws.clusters.list()), None)

if dbc_cluster:
logger.debug(f"Integration testing cluster {DB_CONNECT_CLUSTER_NAME} already exists, skipping it's creation")
return dbc_cluster.cluster_id

logger.debug("Creating a cluster for integration testing")
dbc_cluster = ws.clusters.create(
spark_version=ws.clusters.select_spark_version(latest=True),
cluster_name=DB_CONNECT_CLUSTER_NAME,
instance_pool_id=os.environ["TEST_INSTANCE_POOL_ID"],
driver_node_type_id=os.environ["TEST_INSTANCE_POOL_ID"],
num_workers=0,
spark_conf={"spark.master": "local[*, 4]", "spark.databricks.cluster.profile": "singleNode"},
custom_tags={"ResourceClass": "SingleNode"},
data_security_mode=DataSecurityMode.SINGLE_USER,
autotermination_minutes=60,
runtime_engine=RuntimeEngine.PHOTON,
)
logger.debug(f"Cluster {dbc_cluster.cluster_id} created")
@pytest.fixture
def sql_exec(ws: ImprovedWorkspaceClient):
warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"]
statement_execution = StatementExecutionExt(ws.api_client)
return partial(statement_execution.execute, warehouse_id)


@pytest.fixture
def sql_fetch_all(ws: ImprovedWorkspaceClient):
warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"]
statement_execution = StatementExecutionExt(ws.api_client)
return partial(statement_execution.execute_fetch_all, warehouse_id)


@pytest.fixture
def make_group(ws: ImprovedWorkspaceClient, make_random):
cleanup = []

def inner(
entitlements: list[ComplexValue] | None = None,
external_id: str | None = None,
members: list[ComplexValue] | None = None,
roles: list[ComplexValue] | None = None,
):
group = ws.groups.create(
display_name=f"ucx_G{make_random(4)}",
entitlements=entitlements,
external_id=external_id,
members=members,
roles=roles,
)
logger.debug(f"created group fixture: {group.display_name} ({group.id})")
cleanup.append(group)
return group

yield inner

logger.debug(f"clearing {len(cleanup)} group fixtures")
for group in cleanup:
logger.debug(f"removing group fixture: {group.display_name} ({group.id})")
ws.groups.delete(group.id)


def test_group_fixture(make_group):
logger.info(f"Created new group: {make_group()}")
logger.info(f"Created new group: {make_group()}")


# TODO: pre-create the cluster in the test infra
return dbc_cluster.cluster_id
@pytest.fixture
def make_catalog(sql_exec, make_random):
cleanup = []

def inner():
name = f"ucx_C{make_random(4)}".lower()
sql_exec(f"CREATE CATALOG {name}")
cleanup.append(name)
return name

yield inner
logger.debug(f"clearing {len(cleanup)} catalogs")
for name in cleanup:
logger.debug(f"removing {name} catalog")
sql_exec(f"DROP CATALOG IF EXISTS {name} CASCADE")
logger.debug(f"removed {len(cleanup)} catalogs")


def test_catalog_fixture(make_catalog):
logger.info(f"Created new catalog: {make_catalog()}")
logger.info(f"Created new catalog: {make_catalog()}")


@pytest.fixture
def make_schema(sql_exec, make_random):
cleanup = []

def inner(catalog="hive_metastore"):
name = f"{catalog}.ucx_S{make_random(4)}".lower()
sql_exec(f"CREATE SCHEMA {name}")
cleanup.append(name)
return name

yield inner
logger.debug(f"clearing {len(cleanup)} schemas")
for name in cleanup:
logger.debug(f"removing {name} schema")
sql_exec(f"DROP SCHEMA IF EXISTS {name} CASCADE")
logger.debug(f"removed {len(cleanup)} schemas")


def test_schema_fixture(make_schema):
logger.info(f"Created new schema: {make_schema()}")
logger.info(f"Created new schema: {make_schema()}")


@pytest.fixture
def make_table(sql_exec, make_schema, make_random):
cleanup = []

def inner(
*,
catalog="hive_metastore",
schema: str | None = None,
ctas: str | None = None,
non_detla: bool = False,
external: bool = False,
view: bool = False,
):
if schema is None:
schema = make_schema(catalog=catalog)
name = f"{schema}.ucx_T{make_random(4)}".lower()
ddl = f'CREATE {"VIEW" if view else "TABLE"} {name}'
if ctas is not None:
# temporary (if not view)
ddl = f"{ddl} AS {ctas}"
elif non_detla:
location = "dbfs:/databricks-datasets/iot-stream/data-device"
ddl = f"{ddl} USING json LOCATION '{location}'"
elif external:
# external table
location = "dbfs:/databricks-datasets/nyctaxi-with-zipcodes/subsampled"
ddl = f"{ddl} USING delta LOCATION '{location}'"
else:
# managed table
ddl = f"{ddl} (id INT, value STRING)"
sql_exec(ddl)
cleanup.append(name)
return name

yield inner

logger.debug(f"clearing {len(cleanup)} tables")
for name in cleanup:
logger.debug(f"removing {name} table")
try:
sql_exec(f"DROP TABLE IF EXISTS {name}")
except RuntimeError as e:
if "Cannot drop a view" in str(e):
sql_exec(f"DROP VIEW IF EXISTS {name}")
else:
raise e
logger.debug(f"removed {len(cleanup)} tables")


def test_table_fixture(make_table):
logger.info(f"Created new managed table in new schema: {make_table()}")
logger.info(f'Created new managed table in default schema: {make_table(schema="default")}')
logger.info(f"Created new external table in new schema: {make_table(external=True)}")
logger.info(f"Created new external JSON table in new schema: {make_table(non_detla=True)}")
logger.info(f'Created new tmp table in new schema: {make_table(ctas="SELECT 2+2 AS four")}')
logger.info(f'Created new view in new schema: {make_table(view=True, ctas="SELECT 2+2 AS four")}')


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -625,11 +752,11 @@ def verifiable_objects(


@pytest.fixture()
def inventory_table(env: EnvironmentInfo, ws: ImprovedWorkspaceClient, dbconnect_cluster_id: str) -> InventoryTable:
ws.config.cluster_id = dbconnect_cluster_id
def inventory_table(env: EnvironmentInfo, ws: ImprovedWorkspaceClient, make_catalog, make_schema) -> InventoryTable:
catalog, schema = make_schema(make_catalog()).split(".")
table = InventoryTable(
catalog="main",
database="default",
catalog=catalog,
database=schema,
name=f"test_inventory_{env.test_uid}",
)

Expand Down
159 changes: 0 additions & 159 deletions tests/integration/test_tacls.py
Original file line number Diff line number Diff line change
@@ -1,168 +1,9 @@
import os
from functools import partial

import pytest
from databricks.sdk.service.iam import ComplexValue

from databricks.labs.ucx.providers.client import ImprovedWorkspaceClient
from databricks.labs.ucx.providers.logger import logger
from databricks.labs.ucx.providers.mixins.sql import StatementExecutionExt
from databricks.labs.ucx.toolkits.table_acls import TaclToolkit

# _LOG.setLevel("DEBUG")


@pytest.fixture
def sql_exec(ws: ImprovedWorkspaceClient):
warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"]
statement_execution = StatementExecutionExt(ws.api_client)
return partial(statement_execution.execute, warehouse_id)


@pytest.fixture
def sql_fetch_all(ws: ImprovedWorkspaceClient):
warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"]
statement_execution = StatementExecutionExt(ws.api_client)
return partial(statement_execution.execute_fetch_all, warehouse_id)


@pytest.fixture
def make_group(ws: ImprovedWorkspaceClient, make_random):
cleanup = []

def inner(
entitlements: list[ComplexValue] | None = None,
external_id: str | None = None,
members: list[ComplexValue] | None = None,
roles: list[ComplexValue] | None = None,
):
group = ws.groups.create(
display_name=f"ucx_G{make_random(4)}",
entitlements=entitlements,
external_id=external_id,
members=members,
roles=roles,
)
logger.debug(f"created group fixture: {group.display_name} ({group.id})")
cleanup.append(group)
return group

yield inner

logger.debug(f"clearing {len(cleanup)} group fixtures")
for group in cleanup:
logger.debug(f"removing group fixture: {group.display_name} ({group.id})")
ws.groups.delete(group.id)


def test_group_fixture(make_group):
logger.info(f"Created new group: {make_group()}")
logger.info(f"Created new group: {make_group()}")


@pytest.fixture
def make_catalog(sql_exec, make_random):
cleanup = []

def inner():
name = f"ucx_C{make_random(4)}".lower()
sql_exec(f"CREATE CATALOG {name}")
cleanup.append(name)
return name

yield inner
logger.debug(f"clearing {len(cleanup)} catalogs")
for name in cleanup:
logger.debug(f"removing {name} catalog")
sql_exec(f"DROP CATALOG IF EXISTS {name} CASCADE")
logger.debug(f"removed {len(cleanup)} catalogs")


def test_catalog_fixture(make_catalog):
logger.info(f"Created new catalog: {make_catalog()}")
logger.info(f"Created new catalog: {make_catalog()}")


@pytest.fixture
def make_schema(sql_exec, make_random):
cleanup = []

def inner(catalog="hive_metastore"):
name = f"{catalog}.ucx_S{make_random(4)}".lower()
sql_exec(f"CREATE SCHEMA {name}")
cleanup.append(name)
return name

yield inner
logger.debug(f"clearing {len(cleanup)} schemas")
for name in cleanup:
logger.debug(f"removing {name} schema")
sql_exec(f"DROP SCHEMA IF EXISTS {name} CASCADE")
logger.debug(f"removed {len(cleanup)} schemas")


def test_schema_fixture(make_schema):
logger.info(f"Created new schema: {make_schema()}")
logger.info(f"Created new schema: {make_schema()}")


@pytest.fixture
def make_table(sql_exec, make_schema, make_random):
cleanup = []

def inner(
*,
catalog="hive_metastore",
schema: str | None = None,
ctas: str | None = None,
non_detla: bool = False,
external: bool = False,
view: bool = False,
):
if schema is None:
schema = make_schema(catalog=catalog)
name = f"{schema}.ucx_T{make_random(4)}".lower()
ddl = f'CREATE {"VIEW" if view else "TABLE"} {name}'
if ctas is not None:
# temporary (if not view)
ddl = f"{ddl} AS {ctas}"
elif non_detla:
location = "dbfs:/databricks-datasets/iot-stream/data-device"
ddl = f"{ddl} USING json LOCATION '{location}'"
elif external:
# external table
location = "dbfs:/databricks-datasets/nyctaxi-with-zipcodes/subsampled"
ddl = f"{ddl} USING delta LOCATION '{location}'"
else:
# managed table
ddl = f"{ddl} (id INT, value STRING)"
sql_exec(ddl)
cleanup.append(name)
return name

yield inner

logger.debug(f"clearing {len(cleanup)} tables")
for name in cleanup:
logger.debug(f"removing {name} table")
try:
sql_exec(f"DROP TABLE IF EXISTS {name}")
except RuntimeError as e:
if "Cannot drop a view" in str(e):
sql_exec(f"DROP VIEW IF EXISTS {name}")
else:
raise e
logger.debug(f"removed {len(cleanup)} tables")


def test_table_fixture(make_table):
logger.info(f"Created new managed table in new schema: {make_table()}")
logger.info(f'Created new managed table in default schema: {make_table(schema="default")}')
logger.info(f"Created new external table in new schema: {make_table(external=True)}")
logger.info(f"Created new external JSON table in new schema: {make_table(non_detla=True)}")
logger.info(f'Created new tmp table in new schema: {make_table(ctas="SELECT 2+2 AS four")}')
logger.info(f'Created new view in new schema: {make_table(view=True, ctas="SELECT 2+2 AS four")}')


def test_describe_all_tables(ws: ImprovedWorkspaceClient, make_catalog, make_schema, make_table):
warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"]
Expand Down

0 comments on commit 32dd2dd

Please sign in to comment.