MySQL External HMS Support for HMS Federation (#3385)
Added support for SQL-based external Hive Metastores (MySQL) when creating an HMS federated catalog.
FastLee authored Dec 20, 2024
1 parent 938e4c5 commit e768af2
Showing 11 changed files with 284 additions and 42 deletions.
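
For context: the new code path detects a supported external metastore by reading the workspace-level Spark configuration. A hypothetical `spark_conf` that would be recognized (host, database, and credential values are illustrative placeholders, not part of this commit):

    spark_conf = {
        'spark.hadoop.javax.jdo.option.ConnectionURL': 'jdbc:mysql://hms.example.com:3306/metastore',
        'spark.hadoop.javax.jdo.option.ConnectionUserName': 'hms_user',
        'spark.hadoop.javax.jdo.option.ConnectionPassword': '...',  # typically a secret reference
        'spark.sql.hive.metastore.version': '2.3.9',
    }

These four keys are exactly the ones looked up by `_external_hms` in `federation.py` below; only MySQL with metastore versions 2.3 and 0.13 passes the version gate.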
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/aws/access.py
@@ -226,7 +226,7 @@ def get_roles_to_migrate(self) -> list[AWSCredentialCandidate]:
         """
         Identify the roles that need to be migrated to UC from the UC compatible roles list.
         """
-        external_locations = self._locations.external_locations_with_root()
+        external_locations = list(self._locations.external_locations_with_root())
         logger.info(f"Found {len(external_locations)} external locations")
         compatible_roles = self.load_uc_compatible_roles()
         roles: dict[str, AWSCredentialCandidate] = {}
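
The `list(...)` wrapper is needed because `external_locations_with_root()` becomes a generator in this commit (see `locations.py` further down), and `len()` cannot be applied to a generator. A minimal illustration of the failure mode, with a made-up stand-in:

    def locations():  # stand-in for external_locations_with_root()
        yield 's3://bucket/a'
        yield 's3://bucket/b'

    # len(locations())  # TypeError: object of type 'generator' has no len()
    assert len(list(locations())) == 2  # materialize first, then count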
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/aws/locations.py
@@ -21,6 +21,7 @@ def __init__(
         external_locations: ExternalLocations,
         aws_resource_permissions: AWSResourcePermissions,
         principal_acl: PrincipalACL,
+        *,
         enable_hms_federation: bool = False,
     ):
         self._ws = ws
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/azure/locations.py
@@ -20,6 +20,7 @@ def __init__(
         resource_permissions: AzureResourcePermissions,
         azurerm: AzureResources,
         principal_acl: PrincipalACL,
+        *,
         enable_hms_federation: bool = False,
     ):
         self._ws = ws
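
Here, as in the AWS counterpart above, the bare `*` makes `enable_hms_federation` keyword-only, which is why the call sites in `application.py` and `workspace_cli.py` below now pass the flag by name. A sketch of the effect with a hypothetical function:

    def make_migration(principal_acl: str, *, enable_hms_federation: bool = False) -> None:
        ...

    make_migration('acl', enable_hms_federation=True)  # OK
    make_migration('acl', True)  # TypeError: takes 1 positional argument but 2 were given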
4 changes: 2 additions & 2 deletions src/databricks/labs/ucx/cli.py
@@ -870,10 +870,10 @@ def export_assessment(w: WorkspaceClient, prompts: Prompts):
 
 
 @ucx.command
-def create_federated_catalog(w: WorkspaceClient, _: Prompts):
+def create_federated_catalog(w: WorkspaceClient, prompts: Prompts):
     """(Experimental) Create federated catalog from current workspace Hive Metastore."""
     ctx = WorkspaceContext(w)
-    ctx.federation.register_internal_hms_as_federated_catalog()
+    ctx.federation.create_from_cli(prompts)
 
 
 @ucx.command
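
Assuming the standard `databricks labs` launcher and ucx's convention of dashing command names, the reworked command would be driven like this; `create-federated-catalog` now prompts for the connection/catalog name and, when an external HMS is detected, asks whether to reuse it:

    databricks labs ucx enable-hms-federation
    databricks labs ucx create-federated-catalog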
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/contexts/application.py
@@ -415,7 +415,7 @@ def external_locations(self) -> ExternalLocations:
             self.inventory_database,
             self.tables_crawler,
             self.mounts_crawler,
-            self.config.enable_hms_federation,
+            enable_hms_federation=self.config.enable_hms_federation,
         )
 
     @cached_property
7 changes: 4 additions & 3 deletions src/databricks/labs/ucx/contexts/workspace_cli.py
@@ -118,7 +118,7 @@ def external_locations_migration(self) -> AWSExternalLocationsMigration | ExternalLocationsMigration:
             self.external_locations,
             self.aws_resource_permissions,
             self.principal_acl,
-            self.config.enable_hms_federation,
+            enable_hms_federation=self.config.enable_hms_federation,
         )
         if self.is_azure:
             return ExternalLocationsMigration(
@@ -127,7 +127,7 @@
             self.azure_resource_permissions,
             self.azure_resources,
             self.principal_acl,
-            self.config.enable_hms_federation,
+            enable_hms_federation=self.config.enable_hms_federation,
         )
         raise NotImplementedError
 
@@ -200,7 +200,8 @@ def federation(self):
             self.workspace_client,
             self.external_locations,
             self.workspace_info,
-            self.config.enable_hms_federation,
+            self.config,
+            enable_hms_federation=self.config.enable_hms_federation,
         )
181 changes: 163 additions & 18 deletions src/databricks/labs/ucx/hive_metastore/federation.py
@@ -1,7 +1,14 @@
 import collections
 import logging
+import re
+from dataclasses import dataclass, replace
+from functools import cached_property
+from typing import ClassVar
+from packaging.version import Version, InvalidVersion
 
+
 from databricks.labs.blueprint.installation import Installation
+from databricks.labs.blueprint.tui import Prompts
 from databricks.sdk import WorkspaceClient
 from databricks.sdk.errors import AlreadyExists, NotFound, BadRequest
 from databricks.sdk.service.catalog import (
@@ -14,13 +21,38 @@
 )
 
 from databricks.labs.ucx.account.workspaces import WorkspaceInfo
+from databricks.labs.ucx.assessment.secrets import SecretsMixin
 from databricks.labs.ucx.config import WorkspaceConfig
 from databricks.labs.ucx.hive_metastore import ExternalLocations
 
 
 logger = logging.getLogger(__name__)
 
 
+@dataclass
+class ExternalHmsInfo:
+    """
+    A dataclass that represents the external Hive Metastore connection information.
+    It supports non-Glue external metastores.
+    """
+
+    database_type: str
+    host: str
+    port: str
+    database: str
+    user: str | None
+    password: str | None
+    version: str | None
+
+    def as_dict(self) -> dict[str, str]:
+        return {
+            "database": self.database,
+            "db_type": self.database_type,
+            "host": self.host,
+            "port": self.port,
+        }
+
+
 class HiveMetastoreFederationEnabler:
     def __init__(self, installation: Installation):
         self._installation = installation
@@ -31,61 +63,174 @@ def enable(self):
         self._installation.save(config)
 
 
-class HiveMetastoreFederation:
+class HiveMetastoreFederation(SecretsMixin):
     def __init__(
         self,
-        workspace_client: WorkspaceClient,
+        ws: WorkspaceClient,
         external_locations: ExternalLocations,
         workspace_info: WorkspaceInfo,
+        config: WorkspaceConfig,
+        *,
         enable_hms_federation: bool = False,
     ):
-        self._workspace_client = workspace_client
+        self._ws = ws
         self._external_locations = external_locations
         self._workspace_info = workspace_info
         self._enable_hms_federation = enable_hms_federation
+        self._config = config
+
+    # Supported databases and versions for HMS Federation
+    supported_database_versions: ClassVar[dict[str, list[str]]] = {
+        "mysql": ["2.3", "0.13"],
+    }
 
-    def register_internal_hms_as_federated_catalog(self) -> CatalogInfo:
+    def create_from_cli(self, prompts: Prompts) -> None:
         if not self._enable_hms_federation:
             raise RuntimeWarning('Run `databricks labs ucx enable-hms-federation` to enable HMS Federation')
-        name = self._workspace_info.current()
-        connection_info = self._get_or_create_connection(name)
+
+        name = prompts.question(
+            'Enter the name of the Hive Metastore connection and catalog', default=self._workspace_info.current()
+        )
+
+        if self._external_hms and prompts.confirm(
+            f'A supported external Hive Metastore connection was identified: {self._external_hms.database_type}. '
+            f'Use this connection?'
+        ):
+            connection_info = self._get_or_create_ext_connection(name, self._external_hms)
+        else:
+            connection_info = self._get_or_create_int_connection(name)
+
         assert connection_info.name is not None
+        self._register_federated_catalog(connection_info)
+
+    @cached_property
+    def _external_hms(self) -> ExternalHmsInfo | None:
+        if not self._config.spark_conf:
+            logger.info('Spark config not found')
+            return None
+        spark_config = self._config.spark_conf
+        jdbc_url = self._get_value_from_config_key(spark_config, 'spark.hadoop.javax.jdo.option.ConnectionURL')
+        if not jdbc_url:
+            logger.info('JDBC URL not found')
+            return None
+        version_value = self._get_value_from_config_key(spark_config, 'spark.sql.hive.metastore.version')
+        if not version_value:
+            logger.info('Hive Metastore version not found')
+            return None
         try:
-            return self._workspace_client.catalogs.create(
+            version = Version(version_value)
+        except InvalidVersion:
+            logger.info('Hive Metastore version is not valid')
+            return None
+        major_minor_version = f"{version.major}.{version.minor}"
+        external_hms = replace(self._split_jdbc_url(jdbc_url), version=major_minor_version)
+        supported_versions = self.supported_database_versions.get(external_hms.database_type)
+        if not supported_versions:
+            logger.info(f'Unsupported Hive Metastore: {external_hms.database_type}')
+            return None
+        if major_minor_version not in supported_versions:
+            logger.info(f'Unsupported Hive Metastore Version: {external_hms.database_type} - {version}')
+            return None
+
+        if not external_hms.user:
+            external_hms = replace(
+                external_hms,
+                user=self._get_value_from_config_key(spark_config, 'spark.hadoop.javax.jdo.option.ConnectionUserName'),
+            )
+        if not external_hms.password:
+            external_hms = replace(
+                external_hms,
+                password=self._get_value_from_config_key(
+                    spark_config, 'spark.hadoop.javax.jdo.option.ConnectionPassword'
+                ),
+            )
+        return external_hms
+
+    @classmethod
+    def _split_jdbc_url(cls, jdbc_url: str) -> ExternalHmsInfo:
+        # Define the regex pattern to match the JDBC URL components
+        pattern = re.compile(
+            r'jdbc:(?P<db_type>[a-zA-Z0-9]+)://(?P<host>[^:/]+):(?P<port>\d+)/(?P<database>[^?]+)(\?user=(?P<user>[^&]+)&password=(?P<password>[^&]+))?'
+        )
+        match = pattern.match(jdbc_url)
+        if not match:
+            raise ValueError(f'Unsupported JDBC URL: {jdbc_url}')
+
+        db_type = match.group('db_type')
+        host = match.group('host')
+        port = match.group('port')
+        database = match.group('database')
+        user = match.group('user')
+        password = match.group('password')
+
+        return ExternalHmsInfo(db_type, host, port, database, user, password, None)
+
+    def _register_federated_catalog(
+        self,
+        connection_info,
+    ) -> CatalogInfo:
+        try:
+            return self._ws.catalogs.create(
                 name=connection_info.name,
                 connection_name=connection_info.name,
                 options={"authorized_paths": self._get_authorized_paths()},
             )
         except BadRequest as err:
             if err.error_code == 'CATALOG_ALREADY_EXISTS':
                 logger.info(f'Catalog {connection_info.name} already exists')
-                for catalog_info in self._workspace_client.catalogs.list():
+                for catalog_info in self._ws.catalogs.list():
                     if catalog_info.name == connection_info.name:
                         return catalog_info
             raise err
 
-    def _get_or_create_connection(self, name: str) -> ConnectionInfo:
+    def _get_or_create_int_connection(self, name: str) -> ConnectionInfo:
         try:
-            return self._workspace_client.connections.create(
+            return self._ws.connections.create(
                 name=name,
                 connection_type=ConnectionType.HIVE_METASTORE,  # needs SDK change
                 options={"builtin": "true"},
             )
         except AlreadyExists:
-            for connection in self._workspace_client.connections.list():
-                if connection.name == name:
-                    return connection
+            return self._get_existing_connection(name)
+
+    def _get_existing_connection(self, name: str) -> ConnectionInfo:
+        for connection in self._ws.connections.list():
+            if connection.name == name:
+                return connection
         raise NotFound(f'Connection {name} not found')
+
+    def _get_or_create_ext_connection(self, name: str, external_hms: ExternalHmsInfo) -> ConnectionInfo:
+        options = external_hms.as_dict()
+        if external_hms.user:
+            options["user"] = external_hms.user
+        if external_hms.password:
+            options["password"] = external_hms.password
+        if external_hms.version:
+            options["version"] = external_hms.version
+        try:
+            return self._ws.connections.create(
+                name=name,
+                connection_type=ConnectionType.HIVE_METASTORE,  # needs SDK change
+                options=options,
+            )
+        except AlreadyExists:
+            return self._get_existing_connection(name)
 
     def _get_authorized_paths(self) -> str:
         existing = {}
-        for external_location in self._workspace_client.external_locations.list():
+        for external_location in self._ws.external_locations.list():
             existing[external_location.url] = external_location
         authorized_paths = []
-        current_user = self._workspace_client.current_user.me()
+        current_user = self._ws.current_user.me()
         if not current_user.user_name:
             raise NotFound('Current user not found')
-        for external_location_info in self._external_locations.external_locations_with_root():
+        # Get the external locations. If not using external HMS, include the root DBFS location.
+        if self._external_hms is not None:
+            external_locations = self._external_locations.external_locations_with_root()
+        else:
+            external_locations = self._external_locations.snapshot()
+
+        for external_location_info in external_locations:
             location = ExternalLocations.clean_location(external_location_info.location)
             existing_location = existing.get(location)
             if not existing_location:
@@ -103,11 +248,11 @@ def _add_missing_permissions_if_needed(self, location_name: str, current_user: str):
         grants = self._location_grants(location_name)
         if Privilege.CREATE_FOREIGN_SECURABLE not in grants[current_user]:
             change = PermissionsChange(principal=current_user, add=[Privilege.CREATE_FOREIGN_SECURABLE])
-            self._workspace_client.grants.update(SecurableType.EXTERNAL_LOCATION, location_name, changes=[change])
+            self._ws.grants.update(SecurableType.EXTERNAL_LOCATION, location_name, changes=[change])
 
     def _location_grants(self, location_name: str) -> dict[str, set[Privilege]]:
         grants: dict[str, set[Privilege]] = collections.defaultdict(set)
-        result = self._workspace_client.grants.get(SecurableType.EXTERNAL_LOCATION, location_name)
+        result = self._ws.grants.get(SecurableType.EXTERNAL_LOCATION, location_name)
         if not result.privilege_assignments:
             return grants
         for assignment in result.privilege_assignments:
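
A self-contained sketch of the accepted inputs, reusing the exact regex from `_split_jdbc_url` above (the URL, credentials, and version are made up for illustration):

    import re

    JDBC_PATTERN = re.compile(
        r'jdbc:(?P<db_type>[a-zA-Z0-9]+)://(?P<host>[^:/]+):(?P<port>\d+)/'
        r'(?P<database>[^?]+)(\?user=(?P<user>[^&]+)&password=(?P<password>[^&]+))?'
    )
    SUPPORTED = {"mysql": ["2.3", "0.13"]}  # mirrors supported_database_versions

    url = 'jdbc:mysql://hms.example.com:3306/metastore?user=hms_user&password=s3cret'
    m = JDBC_PATTERN.match(url)
    assert m is not None
    major_minor = '2.3'  # derived from spark.sql.hive.metastore.version, e.g. '2.3.9'
    assert major_minor in SUPPORTED[m.group('db_type')]
    # Connection options as assembled from as_dict() plus the optional fields:
    options = {
        'db_type': m.group('db_type'),      # 'mysql'
        'host': m.group('host'),            # 'hms.example.com'
        'port': m.group('port'),            # '3306'
        'database': m.group('database'),    # 'metastore'
        'user': m.group('user'),            # 'hms_user'
        'password': m.group('password'),    # 's3cret'
        'version': major_minor,
    }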
12 changes: 6 additions & 6 deletions src/databricks/labs/ucx/hive_metastore/locations.py
@@ -155,6 +155,7 @@ def __init__(
         schema: str,
         tables_crawler: TablesCrawler,
         mounts_crawler: 'MountsCrawler',
+        *,
         enable_hms_federation: bool = False,
     ):
         super().__init__(sql_backend, "hive_metastore", schema, "external_locations", ExternalLocation)
@@ -174,21 +175,20 @@ def clean_location(location: str) -> str:
         # Having s3a and s3 as separate locations will cause issues when trying to find overlapping locations
         return re.sub(r"^s3a:/", r"s3:/", location).rstrip("/")
 
-    def external_locations_with_root(self) -> list[ExternalLocation]:
+    def external_locations_with_root(self) -> Iterable[ExternalLocation]:
         """
         Produces a list of external locations with the DBFS root location appended to the list.
         Utilizes the snapshot method.
         Used for HMS Federation.
 
-        Returns:
-            List of ExternalLocation objects
+        Yields:
+            Iterable[ExternalLocation]: Combination of all the external locations and the DBFS root location
         """
 
-        external_locations = list(self.snapshot())
+        yield from self.snapshot()
         dbfs_root = self._get_dbfs_root()
         if dbfs_root:
-            external_locations.append(dbfs_root)
-        return external_locations
+            yield dbfs_root
 
     def _get_dbfs_root(self) -> ExternalLocation | None:
         """
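
Since the method now yields instead of returning a list, callers can stream locations; only call sites that need a length or repeated iteration, such as `get_roles_to_migrate` at the top of this commit, wrap it in `list(...)`. A minimal sketch of the same pattern with made-up values:

    from collections.abc import Iterable

    def with_root(snapshot: list[str], dbfs_root: str | None) -> Iterable[str]:
        yield from snapshot  # crawled external locations first
        if dbfs_root:
            yield dbfs_root  # then the DBFS root, when it can be resolved

    assert list(with_root(['s3://bucket/a'], 's3://root')) == ['s3://bucket/a', 's3://root']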
8 changes: 5 additions & 3 deletions tests/integration/hive_metastore/test_federation.py
@@ -1,6 +1,7 @@
 from unittest.mock import create_autospec
 
 import pytest
+from databricks.labs.blueprint.tui import MockPrompts
 from databricks.sdk import WorkspaceClient
 
 from databricks.labs.ucx.account.workspaces import WorkspaceInfo
@@ -14,13 +15,14 @@ def ws():
 
 
 @pytest.mark.skip("needs to be enabled")
-def test_federation(ws, sql_backend):
+def test_federation(ws, ctx, sql_backend):
     schema = 'ucx'
     tables_crawler = TablesCrawler(sql_backend, schema)
     mounts_crawler = MountsCrawler(sql_backend, ws, schema)
     external_locations = ExternalLocations(ws, sql_backend, schema, tables_crawler, mounts_crawler)
     workspace_info = create_autospec(WorkspaceInfo)
     workspace_info.current.return_value = 'some_thing'
-    federation = HiveMetastoreFederation(ws, external_locations, workspace_info)
-    federation.register_internal_hms_as_federated_catalog()
+    federation = HiveMetastoreFederation(ws, external_locations, workspace_info, ctx.config)
+    prompts = MockPrompts({})
+    federation.create_from_cli(prompts)
     workspace_info.current.assert_called_once()