chore(dao): Add explicit ON DELETE CASCADE when deleting datasets #24488
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
|
||
from sqlalchemy.exc import SQLAlchemyError | ||
|
||
from superset import security_manager | ||
from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn | ||
from superset.daos.base import BaseDAO | ||
from superset.extensions import db | ||
|
@@ -361,25 +362,24 @@ def create_metric( | |
""" | ||
return DatasetMetricDAO.create(properties, commit=commit) | ||
|
||
@staticmethod | ||
def bulk_delete(models: Optional[list[SqlaTable]], commit: bool = True) -> None: | ||
@classmethod | ||
def bulk_delete( | ||
cls, models: Optional[list[SqlaTable]], commit: bool = True | ||
) -> None: | ||
item_ids = [model.id for model in models] if models else [] | ||
# bulk delete, first delete related data | ||
if models: | ||
for model in models: | ||
model.owners = [] | ||
db.session.merge(model) | ||
db.session.query(SqlMetric).filter(SqlMetric.table_id.in_(item_ids)).delete( | ||
synchronize_session="fetch" | ||
) | ||
db.session.query(TableColumn).filter( | ||
TableColumn.table_id.in_(item_ids) | ||
).delete(synchronize_session="fetch") | ||
# bulk delete itself | ||
try: | ||
db.session.query(SqlaTable).filter(SqlaTable.id.in_(item_ids)).delete( | ||
synchronize_session="fetch" | ||
) | ||
|
||
if models: | ||
connection = db.session.connection() | ||
mapper = next(iter(cls.model_cls.registry.mappers)) # type: ignore | ||
|
||
for model in models: | ||
security_manager.dataset_after_delete(mapper, connection, model) | ||
Review comment: Ideally, we wouldn't reference the SM from the DAO layer as it's more of a mid-tier concern. This belongs in the CMD.
Review comment: @craig-rueda could you help educate me as to why this is? Granted that the DAO is only an abstraction over the database (and not the security manager, which in turn interfaces with the database), but isn't this the interface we expect commands, APIs, etc. to interface with? Note this logic would be handled by the
Review comment: Ideally, DAOs should be super light, only concerned with interfacing with the DB. They shouldn't have dependencies upwards (to the mid-tier), which can lead to circular dependencies as basically all mid-tier logic will at some point need a DAO. This can obviously be worked around using some late-binding technique. From a semantic POV, the command is just asking to "please delete these records". Calling the SM to do additional work here is a side effect, which ultimately makes this code less reusable for lower level operations.
Review comment: @craig-rueda I generally agree, but how does this differ from the DAO asking "please delete these records", which triggers the SQLAlchemy ORM to handle post-deletion events (per the after_delete event handler), which in turn invokes the security manager? Note we typically use these SQLAlchemy events because of shortcomings with either our data model and/or constructs which can't be modeled by our database schema. I guess what I'm saying is I would expect the DAO to behave in the same way whether it was deleting a single entity or bulk deleting multiple entities and explicitly calling the
Review comment: Yea, either way. It's more of a stylistic thing IMO. Keeping the DAO layer as decoupled as possible has worked well for me in the past, hence my comments :). |
||
|
||
if commit: | ||
db.session.commit() | ||
except SQLAlchemyError as ex: | ||
|
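The review thread above notes that a DAO's upward dependency on the security manager could be worked around with late binding. The following is a minimal sketch of that idea under loud assumptions: `InMemoryDatasetDAO`, `ToySecurityManager`, and the `after_delete` hook parameter are hypothetical names invented for illustration, the storage is a plain dictionary, and none of this is Superset's actual API.

```python
from __future__ import annotations

from typing import Callable, Optional

# Hypothetical hook type: called once per deleted dataset id.
DeleteHook = Callable[[int], None]


class InMemoryDatasetDAO:
    """Toy DAO: knows only about storage and an optional injected hook."""

    def __init__(self, after_delete: Optional[DeleteHook] = None) -> None:
        self._rows: dict[int, str] = {}
        self._after_delete = after_delete

    def add(self, dataset_id: int, name: str) -> None:
        self._rows[dataset_id] = name

    def bulk_delete(self, dataset_ids: list[int]) -> None:
        for dataset_id in dataset_ids:
            # Only notify the hook about rows that actually existed.
            existed = self._rows.pop(dataset_id, None) is not None
            if existed and self._after_delete is not None:
                self._after_delete(dataset_id)

    def ids(self) -> list[int]:
        return sorted(self._rows)


class ToySecurityManager:
    """Toy mid-tier component that tracks one permission per dataset."""

    def __init__(self) -> None:
        self.permissions: set[int] = set()

    def grant(self, dataset_id: int) -> None:
        self.permissions.add(dataset_id)

    def dataset_after_delete(self, dataset_id: int) -> None:
        self.permissions.discard(dataset_id)


# The caller (for example a command) wires the two layers together, so the
# DAO module never imports the security manager.
security = ToySecurityManager()
dao = InMemoryDatasetDAO(after_delete=security.dataset_after_delete)
for dataset_id in (1, 2, 3):
    dao.add(dataset_id, f"dataset_{dataset_id}")
    security.grant(dataset_id)
dao.bulk_delete([1, 3])
```

The trade-off discussed in the thread still applies: the hook keeps the DAO free of upward imports, but the permission cleanup remains a side effect of the delete call.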
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,7 +28,6 @@ | |
DatasetNotFoundError, | ||
) | ||
from superset.exceptions import SupersetSecurityException | ||
from superset.extensions import db | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
@@ -40,35 +39,10 @@ def __init__(self, model_ids: list[int]): | |
|
||
def run(self) -> None: | ||
self.validate() | ||
if not self._models: | ||
return None | ||
assert self._models | ||
|
||
try: | ||
DatasetDAO.bulk_delete(self._models) | ||
for model in self._models: | ||
Review comment: This logic is taken from #24466. Previously a test was failing because the relationship no longer existed in Python after the commit. See here for details. The issue was that: DatasetDAO.bulk_delete(self._models) includes a |
||
view_menu = ( | ||
security_manager.find_view_menu(model.get_perm()) if model else None | ||
) | ||
|
||
if view_menu: | ||
permission_views = ( | ||
db.session.query(security_manager.permissionview_model) | ||
.filter_by(view_menu=view_menu) | ||
.all() | ||
) | ||
|
||
for permission_view in permission_views: | ||
db.session.delete(permission_view) | ||
if view_menu: | ||
db.session.delete(view_menu) | ||
else: | ||
if not view_menu: | ||
logger.error( | ||
"Could not find the data access permission for the dataset", | ||
exc_info=True, | ||
) | ||
db.session.commit() | ||
|
||
return None | ||
except DeleteFailedError as ex: | ||
logger.exception(ex.exception) | ||
raise DatasetBulkDeleteFailedError() from ex | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,10 +15,9 @@ | |
# specific language governing permissions and limitations | ||
# under the License. | ||
import logging | ||
from typing import cast, Optional | ||
from typing import Optional | ||
|
||
from flask_appbuilder.models.sqla import Model | ||
from sqlalchemy.exc import SQLAlchemyError | ||
|
||
from superset import security_manager | ||
from superset.commands.base import BaseCommand | ||
|
@@ -31,7 +30,6 @@ | |
DatasetNotFoundError, | ||
) | ||
from superset.exceptions import SupersetSecurityException | ||
from superset.extensions import db | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
@@ -43,19 +41,13 @@ def __init__(self, model_id: int): | |
|
||
def run(self) -> Model: | ||
self.validate() | ||
self._model = cast(SqlaTable, self._model) | ||
assert self._model | ||
|
||
try: | ||
# Even though SQLAlchemy should in theory delete rows from the association | ||
Review comment: @betodealmeida this should address the problem you were trying to circumvent in #23414. |
||
# table, sporadically Superset will error because the rows are not deleted. | ||
# Let's do it manually here to prevent the error. | ||
self._model.owners = [] | ||
dataset = DatasetDAO.delete(self._model, commit=False) | ||
db.session.commit() | ||
except (SQLAlchemyError, DAODeleteFailedError) as ex: | ||
return DatasetDAO.delete(self._model) | ||
except DAODeleteFailedError as ex: | ||
logger.exception(ex) | ||
db.session.rollback() | ||
raise DatasetDeleteFailedError() from ex | ||
return dataset | ||
|
||
def validate(self) -> None: | ||
# Validate/populate model exists | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
"""add on delete cascade for tables references | ||
|
||
Revision ID: 6fbe660cac39 | ||
Revises: 83e1abbe777f | ||
Create Date: 2023-06-22 13:39:47.989373 | ||
|
||
""" | ||
from __future__ import annotations | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "6fbe660cac39" | ||
down_revision = "83e1abbe777f" | ||
|
||
import sqlalchemy as sa | ||
from alembic import op | ||
|
||
from superset.utils.core import generic_find_fk_constraint_name | ||
|
||
|
||
def migrate(ondelete: str | None) -> None: | ||
""" | ||
Redefine the foreign key constraints, via a successive DROP and ADD, for all tables | ||
related to the `tables` table to include the ON DELETE construct for cascading | ||
purposes. | ||
|
||
:param ondelete: If set, emit ON DELETE <value> when issuing DDL for this constraint | ||
""" | ||
|
||
bind = op.get_bind() | ||
insp = sa.engine.reflection.Inspector.from_engine(bind) | ||
|
||
conv = { | ||
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", | ||
} | ||
|
||
for table in ("sql_metrics", "table_columns"): | ||
Review comment: 👍 |
||
with op.batch_alter_table(table, naming_convention=conv) as batch_op: | ||
if constraint := generic_find_fk_constraint_name( | ||
table=table, | ||
columns={"id"}, | ||
referenced="tables", | ||
insp=insp, | ||
): | ||
batch_op.drop_constraint(constraint, type_="foreignkey") | ||
|
||
batch_op.create_foreign_key( | ||
constraint_name=f"fk_{table}_table_id_tables", | ||
referent_table="tables", | ||
local_cols=["table_id"], | ||
remote_cols=["id"], | ||
ondelete=ondelete, | ||
) | ||
|
||
with op.batch_alter_table("sqlatable_user", naming_convention=conv) as batch_op: | ||
for table, column in zip(("ab_user", "tables"), ("user_id", "table_id")): | ||
if constraint := generic_find_fk_constraint_name( | ||
table="sqlatable_user", | ||
columns={"id"}, | ||
referenced=table, | ||
insp=insp, | ||
): | ||
batch_op.drop_constraint(constraint, type_="foreignkey") | ||
|
||
batch_op.create_foreign_key( | ||
constraint_name=f"fk_sqlatable_user_{column}_{table}", | ||
referent_table=table, | ||
local_cols=[column], | ||
remote_cols=["id"], | ||
ondelete=ondelete, | ||
) | ||
|
||
|
||
def upgrade(): | ||
migrate(ondelete="CASCADE") | ||
|
||
|
||
def downgrade(): | ||
migrate(ondelete=None) |
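To illustrate what the redefined constraints buy, here is a small sketch using Python's standard `sqlite3` module. It assumes a heavily simplified schema (only the key columns, with constraint names following the migration's naming pattern) and is not the real Superset schema; `delete_dataset_and_count` is a hypothetical helper name.

```python
import sqlite3
from contextlib import closing

# Simplified, assumed schema: only primary and foreign key columns.
SCHEMA = """
CREATE TABLE tables (id INTEGER PRIMARY KEY);
CREATE TABLE ab_user (id INTEGER PRIMARY KEY);
CREATE TABLE sql_metrics (
    id INTEGER PRIMARY KEY,
    table_id INTEGER,
    CONSTRAINT fk_sql_metrics_table_id_tables
        FOREIGN KEY (table_id) REFERENCES tables (id) ON DELETE CASCADE
);
CREATE TABLE table_columns (
    id INTEGER PRIMARY KEY,
    table_id INTEGER,
    CONSTRAINT fk_table_columns_table_id_tables
        FOREIGN KEY (table_id) REFERENCES tables (id) ON DELETE CASCADE
);
CREATE TABLE sqlatable_user (
    id INTEGER PRIMARY KEY,
    user_id INTEGER,
    table_id INTEGER,
    CONSTRAINT fk_sqlatable_user_user_id_ab_user
        FOREIGN KEY (user_id) REFERENCES ab_user (id) ON DELETE CASCADE,
    CONSTRAINT fk_sqlatable_user_table_id_tables
        FOREIGN KEY (table_id) REFERENCES tables (id) ON DELETE CASCADE
);
"""


def delete_dataset_and_count() -> dict:
    """Delete dataset 1 and return the remaining row count per table."""
    with closing(sqlite3.connect(":memory:")) as conn:
        # SQLite only enforces foreign keys when this pragma is on.
        conn.execute("PRAGMA foreign_keys=ON")
        conn.executescript(SCHEMA)
        conn.executemany("INSERT INTO tables (id) VALUES (?)", [(1,), (2,)])
        conn.execute("INSERT INTO ab_user (id) VALUES (1)")
        conn.executemany(
            "INSERT INTO sql_metrics (id, table_id) VALUES (?, ?)",
            [(1, 1), (2, 1), (3, 2)],
        )
        conn.executemany(
            "INSERT INTO table_columns (id, table_id) VALUES (?, ?)",
            [(1, 1), (2, 2)],
        )
        conn.executemany(
            "INSERT INTO sqlatable_user (id, user_id, table_id) VALUES (?, ?, ?)",
            [(1, 1, 1), (2, 1, 2)],
        )
        # A single DELETE on the parent; the database removes the children.
        conn.execute("DELETE FROM tables WHERE id = 1")
        names = ("sql_metrics", "table_columns", "sqlatable_user", "ab_user")
        return {
            name: conn.execute(f"SELECT COUNT(*) FROM {name}").fetchone()[0]
            for name in names
        }
```

Only the rows that referenced dataset 1 disappear; the user row is untouched because the cascade runs from parent to child, not the other way around.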
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,14 +29,15 @@ | |
import re | ||
import signal | ||
import smtplib | ||
import sqlite3 | ||
import ssl | ||
import tempfile | ||
import threading | ||
import traceback | ||
import uuid | ||
import zlib | ||
from collections.abc import Iterable, Iterator, Sequence | ||
from contextlib import contextmanager | ||
from contextlib import closing, contextmanager | ||
from dataclasses import dataclass | ||
from datetime import date, datetime, time, timedelta | ||
from email.mime.application import MIMEApplication | ||
|
@@ -849,6 +850,24 @@ def ping_connection(connection: Connection, branch: bool) -> None: | |
# restore 'close with result' | ||
connection.should_close_with_result = save_should_close_with_result | ||
|
||
if some_engine.dialect.name == "sqlite": | ||
Review comment: I wonder if this is why a number of tests currently have,
if backend() == "sqlite":
    return |
||
|
||
@event.listens_for(some_engine, "connect") | ||
def set_sqlite_pragma( # pylint: disable=unused-argument | ||
Review comment: Hopefully (if approved) this will remove the need for this logic in a number of tests. |
||
connection: sqlite3.Connection, | ||
*args: Any, | ||
) -> None: | ||
r""" | ||
Enable foreign key support for SQLite. | ||
|
||
:param connection: The SQLite connection | ||
:param \*args: Additional positional arguments | ||
:see: https://docs.sqlalchemy.org/en/latest/dialects/sqlite.html | ||
""" | ||
|
||
with closing(connection.cursor()) as cursor: | ||
cursor.execute("PRAGMA foreign_keys=ON") | ||
|
||
|
||
def send_email_smtp( # pylint: disable=invalid-name,too-many-arguments,too-many-locals | ||
to: str, | ||
|
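The `set_sqlite_pragma` listener above matters because SQLite ignores foreign key actions unless the pragma is enabled on each connection. A minimal sketch of the difference, using only the standard `sqlite3` module; the two-table schema and the `orphaned_metrics` helper are assumptions made for illustration, not code from this PR.

```python
import sqlite3
from contextlib import closing


def orphaned_metrics(enable_foreign_keys: bool) -> int:
    """Return how many metric rows survive deleting their parent dataset."""
    state = "ON" if enable_foreign_keys else "OFF"
    with closing(sqlite3.connect(":memory:")) as connection:
        with closing(connection.cursor()) as cursor:
            # Set explicitly in both cases so the result does not depend on
            # how the local SQLite library was compiled.
            cursor.execute(f"PRAGMA foreign_keys={state}")
            cursor.execute("CREATE TABLE tables (id INTEGER PRIMARY KEY)")
            cursor.execute(
                "CREATE TABLE sql_metrics ("
                "id INTEGER PRIMARY KEY, "
                "table_id INTEGER REFERENCES tables (id) ON DELETE CASCADE)"
            )
            cursor.execute("INSERT INTO tables (id) VALUES (1)")
            cursor.execute("INSERT INTO sql_metrics (id, table_id) VALUES (1, 1)")
            cursor.execute("DELETE FROM tables WHERE id = 1")
            cursor.execute("SELECT COUNT(*) FROM sql_metrics")
            return cursor.fetchone()[0]
```

With the pragma off, the child row is left orphaned; with it on, the `ON DELETE CASCADE` clause takes effect.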
Review comment: This is no longer needed as the database will handle said logic.