Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure proper sequencing of view migrations #1157

Merged
merged 24 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
3f78d98
test scaffolding
ericvergnaud Mar 27, 2024
6435961
test scaffolding
ericvergnaud Mar 27, 2024
1cbb04f
sequences 0 and 1 views
ericvergnaud Mar 28, 2024
28e7453
Merge branch 'view-migrations-sequencing' of https://github.com/ericv…
ericvergnaud Mar 28, 2024
f576c87
Merge branch 'view-migrations-sequencing' of https://github.com/ericv…
ericvergnaud Mar 28, 2024
e2fd873
fix formatting
ericvergnaud Mar 28, 2024
2ec7215
drop extraneous data
ericvergnaud Mar 28, 2024
dcc50c9
full functional tests
ericvergnaud Mar 28, 2024
cf474ed
manage error cases
ericvergnaud Mar 28, 2024
e1290a2
revert unwanted changes
ericvergnaud Mar 28, 2024
db888a1
revert unwanted changes
ericvergnaud Mar 28, 2024
67e5c78
prevent infinite loop
ericvergnaud Mar 28, 2024
4c755ba
Merge branch 'main' into view-migrations-sequencing
ericvergnaud Mar 28, 2024
5356814
document sequencing algorithm
ericvergnaud Mar 28, 2024
b33c3e7
Merge branch 'view-migrations-sequencing' of https://github.com/ericv…
ericvergnaud Mar 28, 2024
be076a4
rename fields
ericvergnaud Mar 29, 2024
671453d
align code style
ericvergnaud Mar 29, 2024
17f5419
fix pylint issues
ericvergnaud Mar 29, 2024
6bf9af5
Merge branch 'databrickslabs:main' into view-migrations-sequencing
ericvergnaud Mar 29, 2024
f456f0b
Merge branch 'view-migrations-sequencing' of https://github.com/ericv…
ericvergnaud Mar 29, 2024
2bd33f0
Update src/databricks/labs/ucx/hive_metastore/views_migrator.py
ericvergnaud Mar 29, 2024
6a33c1c
fix indentation broken by GH
ericvergnaud Mar 29, 2024
83ef5fe
fix pylint warning
ericvergnaud Mar 29, 2024
679d028
fix typo
ericvergnaud Mar 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ def is_delta(self) -> bool:
def key(self) -> str:
    # Fully-qualified, lower-cased identifier "<catalog>.<database>.<name>";
    # used as a dict key and as the basis for __hash__ below.
    return f"{self.catalog}.{self.database}.{self.name}".lower()

def __hash__(self):
    # Hash on the fully-qualified key so Table instances can be stored in
    # sets/dicts (required by the view-migration sequencing algorithm).
    return hash(self.key)

@property
def kind(self) -> str:
    # A row represents a view exactly when it carries SQL text.
    return "VIEW" if self.view_text is not None else "TABLE"
Expand Down
123 changes: 123 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/views_migrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import sqlglot
from sqlglot import ParseError
from sqlglot.expressions import Expression as SqlExpression
from sqlglot.expressions import Table as SqlTable

from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.hive_metastore.tables import Table


class ViewToMigrate:
    """Wraps a hive_metastore view (a Table carrying view_text) and lazily
    extracts the tables and views referenced by its SQL definition."""

    _view: Table
    _table_dependencies: list[Table] | None
    _view_dependencies: list[Table] | None

    def __init__(self, table: Table):
        # Only views carry SQL text; anything else is a caller bug.
        if table.view_text is None:
            raise RuntimeError("Should never get there! A view must have 'view_text'!")
        self._view = table
        self._table_dependencies = None
        self._view_dependencies = None

    @property
    def view(self):
        return self._view

    def view_dependencies(self, all_tables: dict[str, Table]) -> list[Table]:
        """Return the views referenced by this view's SQL.

        Dependencies are computed on first call and cached afterwards.
        Raises ValueError when the SQL cannot be parsed or references an
        object missing from *all_tables*.
        """
        if self._table_dependencies is None or self._view_dependencies is None:
            self._compute_dependencies(all_tables)
        assert self._view_dependencies is not None
        return self._view_dependencies

    def _compute_dependencies(self, all_tables: dict[str, Table]):
        # Partition every referenced hive_metastore object into plain tables
        # and views; objects in other catalogs are out of migration scope.
        referenced_tables: set[Table] = set()
        referenced_views: set[Table] = set()
        statement = self._parse_view_text()
        for sql_table in statement.find_all(SqlTable):
            catalog = self._catalog(sql_table)
            if catalog != 'hive_metastore':
                continue
            table_with_key = Table(catalog, sql_table.db, sql_table.name, "type", "")
            resolved = all_tables.get(table_with_key.key)
            if resolved is None:
                raise ValueError(
                    f"Unknown schema object: {table_with_key.key} in view SQL: {self._view.view_text} of table {self._view.key}"
                )
            if resolved.view_text is None:
                referenced_tables.add(resolved)
            else:
                referenced_views.add(resolved)
        self._table_dependencies = list(referenced_tables)
        self._view_dependencies = list(referenced_views)

    def _parse_view_text(self) -> SqlExpression:
        """Parse view_text into a single sqlglot statement, or raise ValueError."""
        try:
            # below can never happen but avoids a pylint error
            assert self._view.view_text is not None
            statements = sqlglot.parse(self._view.view_text)
            if len(statements) != 1 or statements[0] is None:
                raise ValueError(f"Could not analyze view SQL: {self._view.view_text} of table {self._view.key}")
            return statements[0]
        except ParseError as e:
            raise ValueError(f"Could not analyze view SQL: {self._view.view_text} of table {self._view.key}") from e

    # duplicated from FromTable._catalog, not sure if it's worth factorizing
    @staticmethod
    def _catalog(table):
        # An unqualified reference defaults to the hive_metastore catalog.
        return table.catalog or 'hive_metastore'

    def __hash__(self):
        return hash(self._view)


class ViewsMigrator:
    """Orders hive_metastore views so each view is sequenced only after
    every view it references."""

    def __init__(self, crawler: TablesCrawler):
        self._crawler = crawler
        self._result_view_list: list[ViewToMigrate] = []
        self._result_tables_set: set[Table] = set()

    def sequence(self) -> list[Table]:
        # Sequencing is achieved using a very simple algorithm:
        # - register each view's dependencies (extracted from view_text)
        # - then, repeatedly sweep the remaining views, batching those whose
        #   view dependencies have all been processed already
        # For v views and a dependency depth d the complexity looks like O(v^d);
        # this seems enormous, but in practice d remains small and v decreases rapidly.
        all_tables: dict[str, Table] = {}
        views: set[ViewToMigrate] = set()
        for table in self._crawler.snapshot():
            all_tables[table.key] = table
            if table.view_text is not None:
                views.add(ViewToMigrate(table))
        while views:
            batch = self._next_batch(views, all_tables)
            self._result_view_list.extend(batch)
            self._result_tables_set.update(v.view for v in batch)
            views.difference_update(batch)
        return [v.view for v in self._result_view_list]

    def _next_batch(self, views: set[ViewToMigrate], all_tables: dict[str, Table]) -> set[ViewToMigrate]:
        # We can't (slightly) optimize by checking len(views) == 0 or 1,
        # because we'd lose the opportunity to check the SQL.
        batch: set[ViewToMigrate] = set()
        for view in views:
            dependencies = view.view_dependencies(all_tables)
            # a view is ready once every view it references was already processed
            # (vacuously true when it references no views at all)
            if all(dep in self._result_tables_set for dep in dependencies):
                batch.add(view)
        # prevent infinite loop: an empty batch means nothing can ever become ready
        if not batch and views:
            raise ValueError(f"Circular view references are preventing migration: {views}")
        return batch
79 changes: 79 additions & 0 deletions tests/unit/hive_metastore/tables/tables_and_views.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
[
{
"db": "db1",
"table": "t1"
},
{
"db": "db1",
"table": "t2"
},
{
"db": "db1",
"table": "t3"
},
{
"db": "db2",
"table": "t1"
},
{
"db": "db2",
"table": "t3"
},
{
"db": "db1",
"table": "v1",
"view_text": "select * from db1.t1"
},
{
"db": "db1",
"table": "v2",
"view_text": "select * from db1.t2 where db1.t2.c1 = 32"
},
{
"db": "db1",
"table": "v3",
"view_text": "select * from db1.t1, db1.t2 where db1.t1.c1 = db1.t2.c1"
},
{
"db": "db1",
"table": "v4",
"view_text": "select * from db1.v1"
},
{
"db": "db1",
"table": "v5",
"view_text": "select * from db1.v7, db1.v6"
},
{
"db": "db1",
"table": "v6",
"view_text": "select * from db1.v7"
},
{
"db": "db1",
"table": "v7",
"view_text": "select * from db1.v4"
},
{
"db": "db1",
"table": "v8",
"view_text": "123 invalid SQL"
},
{
"db": "db1",
"table": "v9",
"view_text": "select * from db15.t32"
},
{
"db": "db1",
"table": "v10",
"view_text": "select * from db1.v11"
},
{
"db": "db1",
"table": "v11",
"view_text": "select * from db1.v10"
}


]
152 changes: 152 additions & 0 deletions tests/unit/hive_metastore/test_views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import json
from pathlib import Path

import pytest
from databricks.labs.lsql.backends import MockBackend, SqlBackend

from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.hive_metastore.views_migrator import ViewsMigrator

SCHEMA_NAME = "schema"


def test_migrate_no_view_returns_empty_sequence():
    """Tables without any views must produce an empty migration sequence."""
    samples = Samples.load("db1.t1", "db2.t1")
    crawler = TablesCrawler(mock_backend(samples, "db1", "db2"), SCHEMA_NAME, ["db1", "db2"])
    migrator = ViewsMigrator(crawler)
    assert not migrator.sequence()


def test_migrate_direct_view_returns_singleton_sequence() -> None:
    """A single view over a plain table migrates as a one-element sequence."""
    samples = Samples.load("db1.t1", "db1.v1")
    crawler = TablesCrawler(mock_backend(samples, "db1"), SCHEMA_NAME, ["db1"])
    sequence = ViewsMigrator(crawler).sequence()
    assert [t.key for t in sequence] == ["hive_metastore.db1.v1"]


def test_migrate_direct_views_returns_sequence() -> None:
    """Independent views may be batched in any order; compare as sets."""
    samples = Samples.load("db1.t1", "db1.v1", "db1.t2", "db1.v2")
    crawler = TablesCrawler(mock_backend(samples, "db1"), SCHEMA_NAME, ["db1"])
    sequence = ViewsMigrator(crawler).sequence()
    assert len(sequence) == 2
    assert {t.key for t in sequence} == {"hive_metastore.db1.v1", "hive_metastore.db1.v2"}


def test_migrate_indirect_views_returns_correct_sequence() -> None:
    """v4 selects from v1, so v1 must be sequenced first."""
    samples = Samples.load("db1.t1", "db1.v1", "db1.v4")
    crawler = TablesCrawler(mock_backend(samples, "db1"), SCHEMA_NAME, ["db1"])
    sequence = ViewsMigrator(crawler).sequence()
    assert [t.key for t in sequence] == ["hive_metastore.db1.v1", "hive_metastore.db1.v4"]


def test_migrate_deep_indirect_views_returns_correct_sequence() -> None:
    """Chained dependencies (v5 -> v6 -> v7 -> v4 -> v1) must sequence bottom-up."""
    samples = Samples.load("db1.t1", "db1.v1", "db1.v4", "db1.v5", "db1.v6", "db1.v7")
    crawler = TablesCrawler(mock_backend(samples, "db1"), SCHEMA_NAME, ["db1"])
    sequence = ViewsMigrator(crawler).sequence()
    assert [t.key for t in sequence] == [
        "hive_metastore.db1.v1",
        "hive_metastore.db1.v4",
        "hive_metastore.db1.v7",
        "hive_metastore.db1.v6",
        "hive_metastore.db1.v5",
    ]


def test_migrate_invalid_sql_raises_value_error() -> None:
    """A view whose SQL cannot be parsed must abort sequencing with ValueError."""
    # setup stays outside pytest.raises so an unexpected error during fixture
    # loading cannot accidentally satisfy the expectation
    samples = Samples.load("db1.v8")
    sql_backend = mock_backend(samples, "db1")
    crawler = TablesCrawler(sql_backend, SCHEMA_NAME, ["db1"])
    migrator = ViewsMigrator(crawler)
    with pytest.raises(ValueError) as error:
        migrator.sequence()
    # assert on the exception itself, not the ExceptionInfo wrapper's repr
    assert "Could not analyze view SQL:" in str(error.value)


def test_migrate_invalid_sql_tables_raises_value_error() -> None:
    """A view referencing an unknown schema object must raise ValueError."""
    # setup stays outside pytest.raises so an unexpected error during fixture
    # loading cannot accidentally satisfy the expectation
    samples = Samples.load("db1.v9")
    sql_backend = mock_backend(samples, "db1")
    crawler = TablesCrawler(sql_backend, SCHEMA_NAME, ["db1"])
    migrator = ViewsMigrator(crawler)
    with pytest.raises(ValueError) as error:
        migrator.sequence()
    # assert on the exception itself, not the ExceptionInfo wrapper's repr
    assert "Unknown schema object:" in str(error.value)


def test_migrate_circular_vues_raises_value_error() -> None:
    """Mutually-referencing views (v10 <-> v11) must raise ValueError."""
    # NOTE(review): "vues" in the name looks like a typo for "views"; kept
    # as-is to avoid churning test identifiers.
    # setup stays outside pytest.raises so an unexpected error during fixture
    # loading cannot accidentally satisfy the expectation
    samples = Samples.load("db1.v10", "db1.v11")
    sql_backend = mock_backend(samples, "db1")
    crawler = TablesCrawler(sql_backend, SCHEMA_NAME, ["db1"])
    migrator = ViewsMigrator(crawler)
    with pytest.raises(ValueError) as error:
        migrator.sequence()
    # assert on the exception itself, not the ExceptionInfo wrapper's repr
    assert "Circular view references are preventing migration:" in str(error.value)


def mock_backend(samples: list[dict], *dbnames: str) -> SqlBackend:
    """Build a MockBackend serving one SHOW TABLES result per database and a
    single SELECT over hive_metastore.schema.tables covering ALL databases.

    Fix: the SELECT rows were previously reassigned inside the per-database
    loop, so only the last database's rows were ever served to the crawler;
    rows are now accumulated across all databases.
    """
    db_rows: dict[str, list[tuple]] = {}
    select_query = 'SELECT \\* FROM hive_metastore.schema.tables'
    select_tuples: list[tuple] = []
    for dbname in dbnames:
        # list comprehension avoids the lambda-in-loop pylint W0640 workaround
        valid_samples = [s for s in samples if s["db"] == dbname]
        show_tuples = [(s["db"], s["table"], "true") for s in valid_samples]
        db_rows[f'SHOW TABLES FROM hive_metastore.{dbname}'] = show_tuples
        # catalog, database, table, object_type, table_format, location, view_text
        select_tuples.extend(
            (
                "hive_metastore",
                s["db"],
                s["table"],
                "type",
                "DELTA" if s.get("view_text", None) is None else "VIEW",
                None,
                s.get("view_text", None),
            )
            for s in valid_samples
        )
    db_rows[select_query] = select_tuples
    return MockBackend(rows=db_rows)


class Samples:
    """Lazily-loaded table/view fixtures from tables_and_views.json, keyed by 'db.table'."""

    # class-level cache, populated once by _preload_all()
    samples: dict = {}

    @classmethod
    def load(cls, *names: str):
        """Return the cached samples whose 'db.table' key is in *names*."""
        cls._preload_all()
        wanted = set(names)
        return [sample for key, sample in cls.samples.items() if key in wanted]

    @classmethod
    def _preload_all(cls):
        # populate the cache on first use; subsequent calls are no-ops
        if len(cls.samples) == 0:
            path = Path(Path(__file__).parent, "tables", "tables_and_views.json")
            with open(path, encoding="utf-8") as file:
                records = json.load(file)
            cls.samples = {record["db"] + "." + record["table"]: record for record in records}
Loading