Skip to content

Commit

Permalink
chore: Migrate /superset/stop_query/ to API v1
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomedina248 committed Jan 6, 2023
1 parent af34e45 commit 601562f
Show file tree
Hide file tree
Showing 11 changed files with 1,091 additions and 433 deletions.
1,258 changes: 833 additions & 425 deletions docs/static/resources/openapi.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions superset-frontend/src/SqlLab/actions/sqlLab.js
Original file line number Diff line number Diff line change
Expand Up @@ -450,9 +450,9 @@ export function validateQuery(queryEditor, sql) {
export function postStopQuery(query) {
return function (dispatch) {
return SupersetClient.post({
endpoint: '/superset/stop_query/',
postPayload: { client_id: query.id },
stringify: false,
endpoint: '/api/v1/query/stop',
body: JSON.stringify({ client_id: query.id }),
headers: { 'Content-Type': 'application/json' },
})
.then(() => dispatch(stopQuery(query)))
.then(() => dispatch(addSuccessToast(t('Query was stopped.'))))
Expand Down
2 changes: 1 addition & 1 deletion superset-frontend/src/SqlLab/actions/sqlLab.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ describe('async actions', () => {
});

describe('postStopQuery', () => {
const stopQueryEndpoint = 'glob:*/superset/stop_query/*';
const stopQueryEndpoint = 'glob:*/api/v1/query/stop';
fetchMock.post(stopQueryEndpoint, {});

const makeRequest = () => {
Expand Down
1 change: 1 addition & 0 deletions superset/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class RouteMethod: # pylint: disable=too-few-public-methods
"get_data": "read",
"samples": "read",
"delete_ssh_tunnel": "write",
"stop_query": "read",
}

EXTRA_FORM_DATA_APPEND_KEYS = {
Expand Down
2 changes: 1 addition & 1 deletion superset/models/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,7 @@ def get_sqla_col(self, col: Dict[str, Any]) -> Column:
col = sa.column(label, type_=col_type)
return self.make_sqla_column_compatible(col, label)

def get_sqla_query( # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements,unused-argument
def get_sqla_query( # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements
self,
apply_fetch_values_predicate: bool = False,
columns: Optional[List[Column]] = None,
Expand Down
74 changes: 72 additions & 2 deletions superset/queries/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,28 @@
# under the License.
import logging

import backoff
from flask_appbuilder.api import expose, protect, request, safe
from flask_appbuilder.models.sqla.interface import SQLAInterface

from superset import db, event_logger
from superset.constants import MODEL_API_RW_METHOD_PERMISSION_MAP, RouteMethod
from superset.databases.filters import DatabaseFilter
from superset.models.sql_lab import Query
from superset.queries.dao import QueryDAO
from superset.queries.filters import QueryFilter
from superset.queries.schemas import openapi_spec_methods_override, QuerySchema
from superset.views.base_api import BaseSupersetModelRestApi, RelatedFieldFilter
from superset.queries.schemas import (
openapi_spec_methods_override,
QuerySchema,
StopQuerySchema,
)
from superset.superset_typing import FlaskResponse
from superset.views.base_api import (
BaseSupersetModelRestApi,
RelatedFieldFilter,
requires_json,
statsd_metrics,
)
from superset.views.filters import BaseFilterRelatedUsers, FilterRelatedOwners

logger = logging.getLogger(__name__)
Expand All @@ -43,6 +57,7 @@ class QueryRestApi(BaseSupersetModelRestApi):
RouteMethod.GET_LIST,
RouteMethod.RELATED,
RouteMethod.DISTINCT,
"stop_query",
}

list_columns = [
Expand Down Expand Up @@ -95,9 +110,11 @@ class QueryRestApi(BaseSupersetModelRestApi):
base_filters = [["id", QueryFilter, lambda: []]]
base_order = ("changed_on", "desc")
list_model_schema = QuerySchema()
stop_query_schema = StopQuerySchema()

openapi_spec_tag = "Queries"
openapi_spec_methods = openapi_spec_methods_override
openapi_spec_component_schemas = (StopQuerySchema,)

order_columns = [
"changed_on",
Expand All @@ -123,3 +140,56 @@ class QueryRestApi(BaseSupersetModelRestApi):
base_related_field_filters = {"database": [["id", DatabaseFilter, lambda: []]]}
allowed_rel_fields = {"database", "user"}
allowed_distinct_fields = {"status"}

@expose("/stop", methods=["POST"])
@protect()
@safe
@statsd_metrics
@event_logger.log_this_with_context(
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}"
f".stop_query",
log_to_statsd=False,
)
@backoff.on_exception(
backoff.constant,
Exception,
interval=1,
on_backoff=lambda details: db.session.rollback(),
on_giveup=lambda details: db.session.rollback(),
max_tries=5,
)
@requires_json
def stop_query(self) -> FlaskResponse:
"""Manually stop a query with client_id
---
post:
summary: Manually stop a query with client_id
requestBody:
description: Stop query schema
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/StopQuerySchema'
responses:
200:
description: Query stopped
content:
application/json:
schema:
type: object
properties:
result:
type: string
400:
$ref: '#/components/responses/400'
401:
$ref: '#/components/responses/401'
404:
$ref: '#/components/responses/404'
500:
$ref: '#/components/responses/500'
"""
body = self.stop_query_schema.load(request.json)
QueryDAO.stop_query(body["client_id"])
return self.response(200, result="OK")
24 changes: 24 additions & 0 deletions superset/queries/dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
from datetime import datetime
from typing import Any, Dict

from superset import sql_lab
from superset.common.db_query_status import QueryStatus
from superset.dao.base import BaseDAO
from superset.exceptions import SupersetCancelQueryException
from superset.extensions import db
from superset.models.sql_lab import Query, SavedQuery
from superset.queries.filters import QueryFilter
from superset.utils.dates import now_as_float

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -56,3 +60,23 @@ def save_metadata(query: Query, payload: Dict[str, Any]) -> None:
columns = payload.get("columns", {})
db.session.add(query)
query.set_extra_json_key("columns", columns)

@staticmethod
def stop_query(client_id: str) -> None:
query = db.session.query(Query).filter_by(client_id=client_id).one()
if query.status in [
QueryStatus.FAILED,
QueryStatus.SUCCESS,
QueryStatus.TIMED_OUT,
]:
logger.warning(
"Query with client_id could not be stopped: query already complete",
)
return

if not sql_lab.cancel_query(query):
raise SupersetCancelQueryException("Could not cancel query")

query.status = QueryStatus.STOPPED
query.end_time = now_as_float()
db.session.commit()
8 changes: 8 additions & 0 deletions superset/queries/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,11 @@ class Meta: # pylint: disable=too-few-public-methods
# pylint: disable=no-self-use
def get_sql_tables(self, obj: Query) -> List[Table]:
return obj.sql_tables


class StopQuerySchema(Schema):
"""
Schema for the stop_query API call.
"""

client_id = fields.String()
1 change: 1 addition & 0 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2296,6 +2296,7 @@ def results_exec(key: str) -> FlaskResponse:
on_giveup=lambda details: db.session.rollback(),
max_tries=5,
)
@deprecated()
def stop_query(self) -> FlaskResponse:
client_id = request.form.get("client_id")
query = db.session.query(Query).filter_by(client_id=client_id).one()
Expand Down
25 changes: 25 additions & 0 deletions tests/integration_tests/queries/api_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# isort:skip_file
"""Unit tests for Superset"""
from datetime import datetime, timedelta
from unittest import mock
import json
import random
import string
Expand Down Expand Up @@ -392,3 +393,27 @@ def test_get_list_query_no_data_access(self):
# rollback changes
db.session.delete(query)
db.session.commit()

@mock.patch("superset.sql_lab.cancel_query")
@mock.patch("superset.views.core.db.session")
def test_stop_query(self, mock_superset_db_session, mock_sql_lab_cancel_query):
"""
Handles stop query when the DB engine spec does not
have a cancel query method.
"""
form_data = {"client_id": "foo"}
query_mock = mock.Mock()
query_mock.client_id = "foo"
query_mock.status = QueryStatus.RUNNING
self.login(username="admin")
mock_superset_db_session.query().filter_by().one().return_value = query_mock
mock_sql_lab_cancel_query.return_value = True
rv = self.client.post(
"/api/v1/query/stop",
data=json.dumps(form_data),
content_type="application/json",
)

assert rv.status_code == 200
data = json.loads(rv.data.decode("utf-8"))
assert data["result"] == "OK"
123 changes: 122 additions & 1 deletion tests/unit_tests/dao/queries_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
# specific language governing permissions and limitations
# under the License.
import json
from typing import Iterator
from typing import Any, Iterator

import pytest
from pytest_mock import MockFixture
from sqlalchemy.orm.session import Session

from superset.exceptions import SupersetCancelQueryException


def test_query_dao_save_metadata(session: Session) -> None:
from superset.models.core import Database
Expand Down Expand Up @@ -53,3 +56,121 @@ def test_query_dao_save_metadata(session: Session) -> None:
query = session.query(Query).one()
QueryDAO.save_metadata(query=query, payload={"columns": []})
assert query.extra.get("columns", None) == []


def test_query_dao_stop_query_not_running(
mocker: MockFixture, app: Any, session: Session
) -> None:
from superset.common.db_query_status import QueryStatus
from superset.models.core import Database
from superset.models.sql_lab import Query

engine = session.get_bind()
Query.metadata.create_all(engine) # pylint: disable=no-member

db = Database(database_name="my_database", sqlalchemy_uri="sqlite://")

query_obj = Query(
client_id="foo",
database=db,
tab_name="test_tab",
sql_editor_id="test_editor_id",
sql="select * from bar",
select_sql="select * from bar",
executed_sql="select * from bar",
limit=100,
select_as_cta=False,
rows=100,
error_message="none",
results_key="abc",
status=QueryStatus.FAILED,
)

session.add(db)
session.add(query_obj)

from superset.queries.dao import QueryDAO

QueryDAO.stop_query(query_obj.client_id)
query = session.query(Query).one()
assert query.status == QueryStatus.FAILED


def test_query_dao_stop_query_failed(
mocker: MockFixture, app: Any, session: Session
) -> None:
from superset.common.db_query_status import QueryStatus
from superset.models.core import Database
from superset.models.sql_lab import Query

engine = session.get_bind()
Query.metadata.create_all(engine) # pylint: disable=no-member

db = Database(database_name="my_database", sqlalchemy_uri="sqlite://")

query_obj = Query(
client_id="foo",
database=db,
tab_name="test_tab",
sql_editor_id="test_editor_id",
sql="select * from bar",
select_sql="select * from bar",
executed_sql="select * from bar",
limit=100,
select_as_cta=False,
rows=100,
error_message="none",
results_key="abc",
status=QueryStatus.RUNNING,
)

session.add(db)
session.add(query_obj)

mocker.patch("superset.sql_lab.cancel_query", return_value=False)

from superset.queries.dao import QueryDAO

with pytest.raises(SupersetCancelQueryException):
QueryDAO.stop_query(query_obj.client_id)

query = session.query(Query).one()
assert query.status == QueryStatus.RUNNING


def test_query_dao_stop_query(mocker: MockFixture, app: Any, session: Session) -> None:
from superset.common.db_query_status import QueryStatus
from superset.models.core import Database
from superset.models.sql_lab import Query

engine = session.get_bind()
Query.metadata.create_all(engine) # pylint: disable=no-member

db = Database(database_name="my_database", sqlalchemy_uri="sqlite://")

query_obj = Query(
client_id="foo",
database=db,
tab_name="test_tab",
sql_editor_id="test_editor_id",
sql="select * from bar",
select_sql="select * from bar",
executed_sql="select * from bar",
limit=100,
select_as_cta=False,
rows=100,
error_message="none",
results_key="abc",
status=QueryStatus.RUNNING,
)

session.add(db)
session.add(query_obj)

mocker.patch("superset.sql_lab.cancel_query", return_value=True)

from superset.queries.dao import QueryDAO

QueryDAO.stop_query(query_obj.client_id)
query = session.query(Query).one()
assert query.status == QueryStatus.STOPPED

0 comments on commit 601562f

Please sign in to comment.