-
Notifications
You must be signed in to change notification settings - Fork 80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ensure proper sequencing of view migrations #1157
Changes from 15 commits
3f78d98
6435961
1cbb04f
28e7453
f576c87
e2fd873
2ec7215
dcc50c9
cf474ed
e1290a2
db888a1
67e5c78
4c755ba
5356814
b33c3e7
be076a4
671453d
17f5419
6bf9af5
f456f0b
2bd33f0
6a33c1c
83ef5fe
679d028
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,105 @@ | ||||||||||||||||||
import sqlglot | ||||||||||||||||||
from sqlglot import ParseError | ||||||||||||||||||
from sqlglot.expressions import Expression as SqlExpression | ||||||||||||||||||
from sqlglot.expressions import Table as SqlTable | ||||||||||||||||||
|
||||||||||||||||||
from databricks.labs.ucx.hive_metastore import TablesCrawler | ||||||||||||||||||
from databricks.labs.ucx.hive_metastore.tables import Table | ||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
class ViewToMigrate:
    """A Hive metastore view to migrate, together with the objects its SQL references.

    Dependencies are extracted from the view's ``view_text`` on the first call to
    :meth:`compute_dependencies`, so that batches of views can be sequenced such
    that a view is only migrated after every view it depends on.
    """

    table: Table
    # hive_metastore *tables* referenced by the view SQL (kept mostly for debugging)
    table_dependencies: set[Table]
    # hive_metastore *views* referenced by the view SQL (drives the sequencing)
    view_dependencies: set[Table]

    def __init__(self, table: Table):
        if table.view_text is None:
            raise RuntimeError("Should never get there! A view must have 'view_text'!")
        self.table = table
        self.table_dependencies = set()
        self.view_dependencies = set()

    def compute_dependencies(self, all_tables: dict[str, Table]):
        """Populate the dependency sets from the view's SQL text.

        Idempotent: the SQL is parsed only on the first call. Raises ValueError when
        the SQL cannot be parsed or references an object missing from *all_tables*.
        """
        # both sets are filled together below, so one combined check detects a prior run
        if len(self.table_dependencies) + len(self.view_dependencies) > 0:
            return
        statement = self._parse_view_text()
        for sql_table in statement.find_all(SqlTable):
            catalog = self._catalog(sql_table)
            # only hive_metastore objects participate in migration sequencing
            if catalog != 'hive_metastore':
                continue
            table_with_key = Table(catalog, sql_table.db, sql_table.name, "type", "")
            table = all_tables.get(table_with_key.key)
            if table is None:
                raise ValueError(
                    f"Unknown schema object: {table_with_key.key} in view SQL: {self.table.view_text} of table {self.table.key}"
                )
            if table.view_text is None:
                self.table_dependencies.add(table)
            else:
                self.view_dependencies.add(table)

    def _parse_view_text(self) -> SqlExpression:
        """Parse ``view_text`` into a single sqlglot expression, or raise ValueError."""
        try:
            # below can never happen but avoids a pylint error
            assert self.table.view_text is not None
            statements = sqlglot.parse(self.table.view_text)
            if len(statements) != 1 or statements[0] is None:
                raise ValueError(f"Could not analyze view SQL: {self.table.view_text} of table {self.table.key}")
            return statements[0]
        except ParseError as e:
            raise ValueError(f"Could not analyze view SQL: {self.table.view_text} of table {self.table.key}") from e

    # duplicated from FromTable._catalog, not sure if it's worth factorizing
    @staticmethod
    def _catalog(table):
        # sqlglot leaves catalog falsy for unqualified references
        if table.catalog:
            return table.catalog
        return 'hive_metastore'

    def __hash__(self):
        return hash(self.table)

    def __eq__(self, other):
        # keep equality consistent with __hash__: both delegate to the wrapped table
        return isinstance(other, ViewToMigrate) and self.table == other.table
|
||||||||||||||||||
|
||||||||||||||||||
class ViewsMigrator:
    """Sequences Hive metastore views for migration.

    Views may reference other views, so they must be migrated in dependency order:
    a view only appears in the result after every view it references.
    """

    def __init__(self, crawler: TablesCrawler):
        self.crawler = crawler
        # views already sequenced, in migration order
        self.result_view_list: list[ViewToMigrate] = []
        # same views as a set of Table for O(1) membership tests during batching
        self.result_tables_set: set[Table] = set()

    def sequence(self) -> list[Table]:
        """Return all views of the crawler snapshot in a safe migration order.

        Raises ValueError on circular view references, unknown schema objects,
        or un-parseable view SQL.
        """
        # sequencing is achieved using a very simple algorithm:
        # for each view, we register dependencies (extracted from view_text)
        # then given the remaining set of views to process,
        # and the growing set of views already processed
        # we check if each remaining view refers to not yet processed views
        # if none, then it's safe to add that view to the next batch of views
        # the complexity for a given set of views v and a dependency depth d looks like Ov^d
        # this seems enormous but in practice d remains small and v decreases rapidly
        #
        # reset accumulated state so that calling sequence() twice does not duplicate results
        self.result_view_list = []
        self.result_tables_set = set()
        table_values = self.crawler.snapshot()
        all_tables = {table.key: table for table in table_values}
        views = {ViewToMigrate(table) for table in table_values if table.view_text is not None}
        while len(views) > 0:
            next_batch = self._next_batch(views, all_tables)
            self.result_view_list.extend(next_batch)
            self.result_tables_set.update(view.table for view in next_batch)
            views.difference_update(next_batch)
        return [view.table for view in self.result_view_list]

    def _next_batch(self, views: set[ViewToMigrate], all_tables: dict[str, Table]) -> set[ViewToMigrate]:
        """Return the subset of *views* whose view dependencies are all sequenced already."""
        # we can't (slightly) optimize by checking len(views) == 0 or 1,
        # because we'd lose the opportunity to check the SQL
        result: set[ViewToMigrate] = set()
        for view in views:
            view.compute_dependencies(all_tables)
            if all(table in self.result_tables_set for table in view.view_dependencies):
                result.add(view)
        # an empty batch with views remaining means they only reference each other
        if len(result) == 0 and len(views) > 0:
            raise ValueError(f"Circular view references are preventing migration: {views}")
        return result
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
[ | ||
{ | ||
"db": "db1", | ||
"table": "t1" | ||
}, | ||
{ | ||
"db": "db1", | ||
"table": "t2" | ||
}, | ||
{ | ||
"db": "db1", | ||
"table": "t3" | ||
}, | ||
{ | ||
"db": "db2", | ||
"table": "t1" | ||
}, | ||
{ | ||
"db": "db2", | ||
"table": "t3" | ||
}, | ||
{ | ||
"db": "db1", | ||
"table": "v1", | ||
"view_text": "select * from db1.t1" | ||
}, | ||
{ | ||
"db": "db1", | ||
"table": "v2", | ||
"view_text": "select * from db1.t2 where db1.t2.c1 = 32" | ||
}, | ||
{ | ||
"db": "db1", | ||
"table": "v3", | ||
"view_text": "select * from db1.t1, db1.t2 where db1.t1.c1 = db1.t2.c1" | ||
}, | ||
{ | ||
"db": "db1", | ||
"table": "v4", | ||
"view_text": "select * from db1.v1" | ||
}, | ||
{ | ||
"db": "db1", | ||
"table": "v5", | ||
"view_text": "select * from db1.v7, db1.v6" | ||
}, | ||
{ | ||
"db": "db1", | ||
"table": "v6", | ||
"view_text": "select * from db1.v7" | ||
}, | ||
{ | ||
"db": "db1", | ||
"table": "v7", | ||
"view_text": "select * from db1.v4" | ||
}, | ||
{ | ||
"db": "db1", | ||
"table": "v8", | ||
"view_text": "123 invalid SQL" | ||
}, | ||
{ | ||
"db": "db1", | ||
"table": "v9", | ||
"view_text": "select * from db15.t32" | ||
}, | ||
{ | ||
"db": "db1", | ||
"table": "v10", | ||
"view_text": "select * from db1.v11" | ||
}, | ||
{ | ||
"db": "db1", | ||
"table": "v11", | ||
"view_text": "select * from db1.v10" | ||
} | ||
|
||
|
||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
import json | ||
from pathlib import Path | ||
|
||
import pytest | ||
from databricks.labs.lsql.backends import MockBackend, SqlBackend | ||
|
||
from databricks.labs.ucx.hive_metastore import TablesCrawler | ||
from databricks.labs.ucx.hive_metastore.views_migrator import ViewsMigrator | ||
|
||
SCHEMA_NAME = "schema" | ||
|
||
|
||
def test_migrate_no_view_returns_empty_sequence():
    """Plain tables (no view_text) must not appear in the migration sequence."""
    samples = Samples.load("db1.t1", "db2.t1")
    backend = mock_backend(samples, "db1", "db2")
    crawler = TablesCrawler(backend, SCHEMA_NAME, ["db1", "db2"])
    sequence = ViewsMigrator(crawler).sequence()
    assert not sequence
|
||
|
||
def test_migrate_direct_view_returns_singleton_sequence() -> None:
    """A single view over a plain table yields exactly that view."""
    samples = Samples.load("db1.t1", "db1.v1")
    backend = mock_backend(samples, "db1")
    crawler = TablesCrawler(backend, SCHEMA_NAME, ["db1"])
    sequence = ViewsMigrator(crawler).sequence()
    assert [table.key for table in sequence] == ["hive_metastore.db1.v1"]
|
||
|
||
def test_migrate_direct_views_returns_sequence() -> None:
    """Independent views are all sequenced; their relative order is unconstrained."""
    samples = Samples.load("db1.t1", "db1.v1", "db1.t2", "db1.v2")
    backend = mock_backend(samples, "db1")
    crawler = TablesCrawler(backend, SCHEMA_NAME, ["db1"])
    sequence = ViewsMigrator(crawler).sequence()
    assert len(sequence) == 2
    # order is unspecified for independent views, so compare as sets
    assert {table.key for table in sequence} == {"hive_metastore.db1.v1", "hive_metastore.db1.v2"}
|
||
|
||
def test_migrate_indirect_views_returns_correct_sequence() -> None:
    """A view over a view (v4 -> v1) must be sequenced after its dependency."""
    samples = Samples.load("db1.t1", "db1.v1", "db1.v4")
    backend = mock_backend(samples, "db1")
    crawler = TablesCrawler(backend, SCHEMA_NAME, ["db1"])
    sequence = ViewsMigrator(crawler).sequence()
    keys = [table.key for table in sequence]
    assert keys == ["hive_metastore.db1.v1", "hive_metastore.db1.v4"]
|
||
|
||
def test_migrate_deep_indirect_views_returns_correct_sequence() -> None:
    """A multi-level chain (v5 -> v6/v7 -> v4 -> v1) comes out in dependency order."""
    samples = Samples.load("db1.t1", "db1.v1", "db1.v4", "db1.v5", "db1.v6", "db1.v7")
    backend = mock_backend(samples, "db1")
    crawler = TablesCrawler(backend, SCHEMA_NAME, ["db1"])
    sequence = ViewsMigrator(crawler).sequence()
    keys = [table.key for table in sequence]
    assert keys == [
        "hive_metastore.db1.v1",
        "hive_metastore.db1.v4",
        "hive_metastore.db1.v7",
        "hive_metastore.db1.v6",
        "hive_metastore.db1.v5",
    ]
|
||
|
||
def test_migrate_invalid_sql_raises_value_error() -> None:
    """Un-parseable view SQL (db1.v8) must abort sequencing with a clear error."""
    # set-up lives outside the raises block so only sequence() is allowed to raise;
    # match= checks the exception message itself (str(error) was the ExceptionInfo,
    # not the exception - the original assertion did not test what it meant to)
    samples = Samples.load("db1.v8")
    sql_backend = mock_backend(samples, "db1")
    crawler = TablesCrawler(sql_backend, SCHEMA_NAME, ["db1"])
    migrator = ViewsMigrator(crawler)
    with pytest.raises(ValueError, match="Could not analyze view SQL:"):
        migrator.sequence()
|
||
|
||
def test_migrate_invalid_sql_tables_raises_value_error() -> None:
    """A view referencing an unknown object (db1.v9 -> db15.t32) must fail sequencing."""
    # set-up outside the raises block; match= asserts on the exception message
    # (the original checked str(ExceptionInfo), not str(error.value))
    samples = Samples.load("db1.v9")
    sql_backend = mock_backend(samples, "db1")
    crawler = TablesCrawler(sql_backend, SCHEMA_NAME, ["db1"])
    migrator = ViewsMigrator(crawler)
    with pytest.raises(ValueError, match="Unknown schema object:"):
        migrator.sequence()
|
||
|
||
def test_migrate_circular_views_raises_value_error() -> None:
    """Mutually-referencing views (db1.v10 <-> db1.v11) must fail sequencing."""
    # renamed from test_migrate_circular_vues_... (typo); set-up outside the raises
    # block and match= on the message replace the ineffective str(ExceptionInfo) check
    samples = Samples.load("db1.v10", "db1.v11")
    sql_backend = mock_backend(samples, "db1")
    crawler = TablesCrawler(sql_backend, SCHEMA_NAME, ["db1"])
    migrator = ViewsMigrator(crawler)
    with pytest.raises(ValueError, match="Circular view references are preventing migration:"):
        migrator.sequence()
|
||
|
||
def mock_backend(samples: list[dict], *dbnames: str) -> SqlBackend:
    """Build a MockBackend answering SHOW TABLES per database and the crawler's SELECT.

    The SELECT rows are accumulated across *dbnames*; the original assignment inside
    the loop clobbered earlier databases' rows, leaving only the last database visible.
    """
    db_rows: dict[str, list[tuple]] = {}
    select_query = 'SELECT \\* FROM hive_metastore.schema.tables'
    # collect the SELECT rows for all databases, not just the last one iterated
    all_select_tuples: list[tuple] = []
    for dbname in dbnames:
        # comprehension instead of filter+lambda also sidesteps pylint W0640 (late-binding)
        valid_samples = [s for s in samples if s["db"] == dbname]
        show_tuples = [(s["db"], s["table"], "true") for s in valid_samples]
        db_rows[f'SHOW TABLES FROM hive_metastore.{dbname}'] = show_tuples
        # catalog, database, table, object_type, table_format, location, view_text
        select_tuples = [
            (
                "hive_metastore",
                s["db"],
                s["table"],
                "type",
                "DELTA" if s.get("view_text") is None else "VIEW",
                None,
                s.get("view_text"),
            )
            for s in valid_samples
        ]
        all_select_tuples.extend(select_tuples)
    db_rows[select_query] = all_select_tuples
    return MockBackend(rows=db_rows)
|
||
|
||
class Samples:
    """Lazily loads table/view fixtures from tables_and_views.json, keyed by 'db.table'."""

    # class-level cache: 'db.table' -> fixture dict, filled on first load()
    samples: dict = {}

    @classmethod
    def load(cls, *names: str):
        """Return the cached samples whose 'db.table' key appears in *names* (fixture order)."""
        cls._preload_all()
        wanted = set(names)
        return [sample for key, sample in cls.samples.items() if key in wanted]

    @classmethod
    def _preload_all(cls):
        # parse the fixture file only once and cache the result on the class
        if len(cls.samples) == 0:
            path = Path(Path(__file__).parent, "tables", "tables_and_views.json")
            with open(path, encoding="utf-8") as file:
                raw_samples = json.load(file)
            cls.samples = {sample["db"] + "." + sample["table"]: sample for sample in raw_samples}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done