Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dataset] New, refresh API endpoint #9363

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions superset/connectors/sqla/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,7 @@ def mutator(df: pd.DataFrame) -> None:
def get_sqla_table_object(self) -> Table:
return self.database.get_table(self.table_name, schema=self.schema)

def fetch_metadata(self) -> None:
def fetch_metadata(self, commit=True) -> None:
"""Fetches the metadata for the table and merges it in"""
try:
table = self.get_sqla_table_object()
Expand All @@ -1086,7 +1086,6 @@ def fetch_metadata(self) -> None:
).format(self.table_name)
)

M = SqlMetric
metrics = []
any_date_col = None
db_engine_spec = self.database.db_engine_spec
Expand Down Expand Up @@ -1123,7 +1122,7 @@ def fetch_metadata(self) -> None:
any_date_col = col.name

metrics.append(
M(
SqlMetric(
metric_name="count",
verbose_name="COUNT(*)",
metric_type="count",
Expand All @@ -1134,7 +1133,8 @@ def fetch_metadata(self) -> None:
self.main_dttm_col = any_date_col
self.add_missing_metrics(metrics)
db.session.merge(self)
db.session.commit()
if commit:
db.session.commit()

@classmethod
def import_obj(cls, i_datasource, import_time=None) -> int:
Expand Down
50 changes: 49 additions & 1 deletion superset/datasets/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
DatasetForbiddenError,
DatasetInvalidError,
DatasetNotFoundError,
DatasetRefreshFailedError,
DatasetUpdateFailedError,
)
from superset.datasets.commands.refresh import RefreshDatasetCommand
from superset.datasets.commands.update import UpdateDatasetCommand
from superset.datasets.schemas import DatasetPostSchema, DatasetPutSchema
from superset.views.base import DatasourceFilter
Expand All @@ -49,7 +51,9 @@ class DatasetRestApi(BaseSupersetModelRestApi):
allow_browser_login = True

class_permission_name = "TableModelView"
include_route_methods = RouteMethod.REST_MODEL_VIEW_CRUD_SET | {RouteMethod.RELATED}
include_route_methods = (
RouteMethod.REST_MODEL_VIEW_CRUD_SET | {RouteMethod.RELATED} | {"refresh"}
)

list_columns = [
"changed_by_name",
Expand Down Expand Up @@ -268,3 +272,47 @@ def delete(self, pk: int) -> Response: # pylint: disable=arguments-differ
except DatasetDeleteFailedError as e:
logger.error(f"Error deleting model {self.__class__.__name__}: {e}")
return self.response_422(message=str(e))

@expose("/<pk>/refresh", methods=["PUT"])
@protect()
@safe
def refresh(self, pk: int) -> Response: # pylint: disable=invalid-name
"""Refresh a Dataset
---
put:
description: >-
Refresh updates columns for a dataset
parameters:
- in: path
schema:
type: integer
name: pk
responses:
200:
description: Dataset refreshed
content:
application/json:
schema:
type: object
properties:
message:
type: string
401:
$ref: '#/components/responses/401'
403:
$ref: '#/components/responses/403'
404:
$ref: '#/components/responses/404'
500:
$ref: '#/components/responses/500'
"""
try:
RefreshDatasetCommand(g.user, pk).run()
return self.response(200, message="OK")
except DatasetNotFoundError:
return self.response_404()
except DatasetForbiddenError:
return self.response_403()
except DatasetRefreshFailedError as e:
logger.error(f"Error refreshing dataset {self.__class__.__name__}: {e}")
return self.response_422(message=str(e))
4 changes: 4 additions & 0 deletions superset/datasets/commands/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,7 @@ class DatasetDeleteFailedError(DeleteFailedError):

class DatasetForbiddenError(ForbiddenError):
message = _("Changing this dataset is forbidden")


class DatasetRefreshFailedError(UpdateFailedError):
message = _("Dataset could not be updated.")
61 changes: 61 additions & 0 deletions superset/datasets/commands/refresh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from typing import Optional

from flask_appbuilder.security.sqla.models import User

from superset.commands.base import BaseCommand
from superset.connectors.sqla.models import SqlaTable
from superset.datasets.commands.exceptions import (
DatasetForbiddenError,
DatasetNotFoundError,
DatasetRefreshFailedError,
)
from superset.datasets.dao import DatasetDAO
from superset.exceptions import SupersetSecurityException
from superset.views.base import check_ownership

logger = logging.getLogger(__name__)


class RefreshDatasetCommand(BaseCommand):
def __init__(self, user: User, model_id: int):
self._actor = user
self._model_id = model_id
self._model: Optional[SqlaTable] = None

def run(self):
self.validate()
try:
# Updates columns and metrics from the dataset
self._model.fetch_metadata()
except Exception as e:
logger.exception(e)
raise DatasetRefreshFailedError()
return self._model

def validate(self) -> None:
# Validate/populate model exists
self._model = DatasetDAO.find_by_id(self._model_id)
if not self._model:
raise DatasetNotFoundError()
# Check ownership
try:
check_ownership(self._model)
except SupersetSecurityException:
raise DatasetForbiddenError()
1 change: 1 addition & 0 deletions superset/views/base_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class BaseSupersetModelRestApi(ModelRestApi):
"bulk_delete": "delete",
"info": "list",
"related": "list",
"refresh": "edit",
}

order_rel_fields: Dict[str, Tuple[str, str]] = {}
Expand Down
55 changes: 54 additions & 1 deletion tests/dataset_api_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
from unittest.mock import patch

import prison
from sqlalchemy.sql import func

from superset import db, security_manager
from superset.connectors.sqla.models import SqlaTable
from superset.connectors.sqla.models import SqlaTable, TableColumn
from superset.dao.exceptions import (
DAOCreateFailedError,
DAODeleteFailedError,
Expand Down Expand Up @@ -452,3 +453,55 @@ def test_delete_dataset_sqlalchemy_error(self, mock_dao_delete):
self.assertEqual(data, {"message": "Dataset could not be deleted."})
db.session.delete(table)
db.session.commit()

def test_dataset_item_refresh(self):
"""
Dataset API: Test item refresh
"""
dataset = self.insert_default_dataset()
# delete a column
id_column = (
db.session.query(TableColumn)
.filter_by(table_id=dataset.id, column_name="id")
.one()
)
db.session.delete(id_column)
db.session.commit()

self.login(username="admin")
uri = f"api/v1/dataset/{dataset.id}/refresh"
rv = self.client.put(uri)
self.assertEqual(rv.status_code, 200)
# Assert the column is restored on refresh
id_column = (
db.session.query(TableColumn)
.filter_by(table_id=dataset.id, column_name="id")
.one()
)
self.assertIsNotNone(id_column)
db.session.delete(dataset)
db.session.commit()

def test_dataset_item_refresh_not_found(self):
"""
Dataset API: Test item refresh not found dataset
"""
max_id = db.session.query(func.max(SqlaTable.id)).scalar()

self.login(username="admin")
uri = f"api/v1/dataset/{max_id + 1}/refresh"
rv = self.client.put(uri)
self.assertEqual(rv.status_code, 404)

def test_dataset_item_refresh_not_owned(self):
"""
Dataset API: Test item refresh not owned dataset
"""
dataset = self.insert_default_dataset()
self.login(username="alpha")
uri = f"api/v1/dataset/{dataset.id}/refresh"
rv = self.client.put(uri)
self.assertEqual(rv.status_code, 403)

db.session.delete(dataset)
db.session.commit()