From 4adf24feaf7e94701e5cc058144c200e7476bf46 Mon Sep 17 00:00:00 2001 From: Maxime Beauchemin Date: Wed, 30 Oct 2024 19:30:10 -0700 Subject: [PATCH 1/6] feat(sqllab): add latest partition support for BigQuery Adding db_engine_spec-related features that enables SQL Lab to show the latest partition when using time partitioning in BigQuery as well as applying a WHERE clause on the latest partition by default when fetching the sample dataset. Turns out that `SELECT * FROM {{...}} LIMIT {n}` can be costly against large tables in BigQuery as it results in a full table scan. --- docker/pythonpath_dev/superset_config.py | 1 + superset/db_engine_specs/base.py | 15 --- superset/db_engine_specs/bigquery.py | 123 ++++++++++++++--------- 3 files changed, 77 insertions(+), 62 deletions(-) diff --git a/docker/pythonpath_dev/superset_config.py b/docker/pythonpath_dev/superset_config.py index e8223e53584bc..03b20cc5be868 100644 --- a/docker/pythonpath_dev/superset_config.py +++ b/docker/pythonpath_dev/superset_config.py @@ -40,6 +40,7 @@ EXAMPLES_HOST = os.getenv("EXAMPLES_HOST") EXAMPLES_PORT = os.getenv("EXAMPLES_PORT") EXAMPLES_DB = os.getenv("EXAMPLES_DB") +SHOW_STACKTRACE = True # The SQLAlchemy connection string. SQLALCHEMY_DATABASE_URI = ( diff --git a/superset/db_engine_specs/base.py b/superset/db_engine_specs/base.py index 8cabb1e5893ea..37f40d7c1cf3b 100644 --- a/superset/db_engine_specs/base.py +++ b/superset/db_engine_specs/base.py @@ -41,7 +41,6 @@ import sqlparse from apispec import APISpec from apispec.ext.marshmallow import MarshmallowPlugin -from deprecation import deprecated from flask import current_app, g, url_for from flask_appbuilder.security.sqla.models import User from flask_babel import gettext as __, lazy_gettext as _ @@ -1058,19 +1057,6 @@ def get_datatype(cls, type_code: Any) -> str | None: return type_code.upper() return None - @classmethod - @deprecated(deprecated_in="3.0") - def normalize_indexes(cls, indexes: list[dict[str, Any]]) -> list[dict[str, Any]]: - """ - Normalizes indexes for more consistency across db engines - - noop by default - - :param indexes: Raw indexes as returned by SQLAlchemy - :return: cleaner, more aligned index definition - """ - return indexes - @classmethod def get_table_metadata( cls, @@ -1637,7 +1623,6 @@ def where_latest_partition( # pylint: disable=unused-argument :return: SqlAlchemy query with additional where clause referencing the latest partition """ - # TODO: Fix circular import caused by importing Database, TableColumn return None @classmethod diff --git a/superset/db_engine_specs/bigquery.py b/superset/db_engine_specs/bigquery.py index 70bc4bc845390..58f2f1cdb27a6 100644 --- a/superset/db_engine_specs/bigquery.py +++ b/superset/db_engine_specs/bigquery.py @@ -17,20 +17,21 @@ from __future__ import annotations +import logging import re import urllib from datetime import datetime from re import Pattern +from textwrap import dedent from typing import Any, TYPE_CHECKING, TypedDict import pandas as pd from apispec import APISpec from apispec.ext.marshmallow import MarshmallowPlugin -from deprecation import deprecated from flask_babel import gettext as __ from marshmallow import fields, Schema from marshmallow.exceptions import ValidationError -from sqlalchemy import column, types +from sqlalchemy import column, func, types from sqlalchemy.engine.base import Engine from sqlalchemy.engine.reflection import Inspector from sqlalchemy.engine.url import URL @@ -49,6 +50,11 @@ from superset.utils import core as utils, json from 
superset.utils.hashing import md5_sha_from_str +if TYPE_CHECKING: + from sqlalchemy.sql.expression import Select + +logger = logging.getLogger(__name__) + try: from google.cloud import bigquery from google.oauth2 import service_account @@ -284,42 +290,51 @@ def _truncate_label(cls, label: str) -> str: return "_" + md5_sha_from_str(label) @classmethod - @deprecated(deprecated_in="3.0") - def normalize_indexes(cls, indexes: list[dict[str, Any]]) -> list[dict[str, Any]]: - """ - Normalizes indexes for more consistency across db engines + def where_latest_partition( + cls, + database: Database, + table: Table, + query: Select, + columns: list[ResultSetColumnType] | None = None, + ) -> Select | None: + if partition_column := cls.get_time_partition_column(database, table): + max_partition_id = cls.get_max_partition_id(database, table) + query = query.where( + column(partition_column) == func.PARSE_DATE("%Y%m%d", max_partition_id) + ) - :param indexes: Raw indexes as returned by SQLAlchemy - :return: cleaner, more aligned index definition - """ - normalized_idxs = [] - # Fixing a bug/behavior observed in pybigquery==0.4.15 where - # the index's `column_names` == [None] - # Here we're returning only non-None indexes - for ix in indexes: - column_names = ix.get("column_names") or [] - ix["column_names"] = [col for col in column_names if col is not None] - if ix["column_names"]: - normalized_idxs.append(ix) - return normalized_idxs + return query @classmethod - def get_indexes( + def get_max_partition_id( cls, database: Database, - inspector: Inspector, table: Table, - ) -> list[dict[str, Any]]: - """ - Get the indexes associated with the specified schema/table. + ) -> Select | None: + sql = dedent(f"""\ + SELECT + MAX(partition_id) AS max_partition_id + FROM `{table.schema}.INFORMATION_SCHEMA.PARTITIONS` + WHERE table_name = '{table.table}' + """) + df = database.get_df(sql) + return df.iat[0, 0] - :param database: The database to inspect - :param inspector: The SQLAlchemy inspector - :param table: The table instance to inspect - :returns: The indexes - """ + @classmethod + def get_time_partition_column( + cls, + database: Database, + table: Table, + ) -> str | None: + with cls.get_engine( + database, catalog=table.catalog, schema=table.schema + ) as engine: + client = cls._get_client(engine, database) + bq_table = client.get_table(f"{table.schema}.{table.table}") - return cls.normalize_indexes(inspector.get_indexes(table.table, table.schema)) + if bq_table.time_partitioning is not None: + return bq_table.time_partitioning.field + return None @classmethod def get_extra_table_metadata( @@ -327,23 +342,37 @@ def get_extra_table_metadata( database: Database, table: Table, ) -> dict[str, Any]: - indexes = database.get_indexes(table) - if not indexes: - return {} - partitions_columns = [ - index.get("column_names", []) - for index in indexes - if index.get("name") == "partition" - ] - cluster_columns = [ - index.get("column_names", []) - for index in indexes - if index.get("name") == "clustering" - ] - return { - "partitions": {"cols": partitions_columns}, - "clustering": {"cols": cluster_columns}, - } + payload = {} + partition_column = cls.get_time_partition_column(database, table) + with cls.get_engine( + database, catalog=table.catalog, schema=table.schema + ) as engine: + if partition_column: + max_partition_id = cls.get_max_partition_id(database, table) + payload.update( + { + "partitions": { + "cols": [partition_column], + "latest": {"ds": max_partition_id}, + }, + "partitionQuery": 
cls.select_star( + database, + table, + engine, + indent=False, + show_cols=False, + latest_partition=True, + ), + "indexes": [ + { + "name": "partitioned", + "cols": [partition_column], + "type": "partitioned", + } + ], + } + ) + return payload @classmethod def epoch_to_dttm(cls) -> str: From 46ce4f22140225592a3d116df57e669afdc64eb4 Mon Sep 17 00:00:00 2001 From: Maxime Beauchemin Date: Wed, 30 Oct 2024 19:40:50 -0700 Subject: [PATCH 2/6] formating --- superset/db_engine_specs/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/superset/db_engine_specs/bigquery.py b/superset/db_engine_specs/bigquery.py index 58f2f1cdb27a6..5571d017b7ed8 100644 --- a/superset/db_engine_specs/bigquery.py +++ b/superset/db_engine_specs/bigquery.py @@ -332,7 +332,7 @@ def get_time_partition_column( client = cls._get_client(engine, database) bq_table = client.get_table(f"{table.schema}.{table.table}") - if bq_table.time_partitioning is not None: + if bq_table.time_partitioning: return bq_table.time_partitioning.field return None From b62e740f8d2ba147f9987ab92b591555e3b9eeec Mon Sep 17 00:00:00 2001 From: Maxime Beauchemin Date: Wed, 30 Oct 2024 21:47:50 -0700 Subject: [PATCH 3/6] fix tests --- superset/db_engine_specs/bigquery.py | 17 +-- .../db_engine_specs/bigquery_tests.py | 133 ++++++------------ 2 files changed, 49 insertions(+), 101 deletions(-) diff --git a/superset/db_engine_specs/bigquery.py b/superset/db_engine_specs/bigquery.py index 5571d017b7ed8..f897fc2af46f2 100644 --- a/superset/db_engine_specs/bigquery.py +++ b/superset/db_engine_specs/bigquery.py @@ -349,20 +349,21 @@ def get_extra_table_metadata( ) as engine: if partition_column: max_partition_id = cls.get_max_partition_id(database, table) + sql = cls.select_star( + database, + table, + engine, + indent=False, + show_cols=False, + latest_partition=True, + ) payload.update( { "partitions": { "cols": [partition_column], "latest": {"ds": max_partition_id}, + "partitionQuery": sql, }, - "partitionQuery": cls.select_star( - database, - table, - engine, - indent=False, - show_cols=False, - latest_partition=True, - ), "indexes": [ { "name": "partitioned", diff --git a/tests/integration_tests/db_engine_specs/bigquery_tests.py b/tests/integration_tests/db_engine_specs/bigquery_tests.py index fa10bd2ce14bd..76ae57d29b885 100644 --- a/tests/integration_tests/db_engine_specs/bigquery_tests.py +++ b/tests/integration_tests/db_engine_specs/bigquery_tests.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. 
import unittest.mock as mock +from contextlib import contextmanager import pytest from pandas import DataFrame @@ -32,6 +33,15 @@ ) +@contextmanager +def mock_engine_with_credentials(*args, **kwargs): + engine_mock = mock.Mock() + engine_mock.dialect.credentials_info = { + "key": "value" + } # Add the credentials_info attribute + yield engine_mock + + class TestBigQueryDbEngineSpec(TestDbEngineSpec): def test_bigquery_sqla_column_label(self): """ @@ -111,108 +121,45 @@ def values(self): result = BigQueryEngineSpec.fetch_data(None, 0) assert result == [1, 2] - def test_get_extra_table_metadata(self): + @mock.patch.object( + BigQueryEngineSpec, "get_engine", side_effect=mock_engine_with_credentials + ) + @mock.patch.object(BigQueryEngineSpec, "get_time_partition_column") + @mock.patch.object(BigQueryEngineSpec, "get_max_partition_id") + @mock.patch.object(BigQueryEngineSpec, "quote_table", return_value="`table_name`") + def test_get_extra_table_metadata( + self, + mock_quote_table, + mock_get_max_partition_id, + mock_get_time_partition_column, + mock_get_engine, + ): """ DB Eng Specs (bigquery): Test extra table metadata """ database = mock.Mock() + sql = "SELECT * FROM `table_name`" + database.compile_sqla_query.return_value = sql + tbl = Table("some_table", "some_schema") + # Test no indexes - database.get_indexes = mock.MagicMock(return_value=None) - result = BigQueryEngineSpec.get_extra_table_metadata( - database, - Table("some_table", "some_schema"), - ) + mock_get_time_partition_column.return_value = None + mock_get_max_partition_id.return_value = None + result = BigQueryEngineSpec.get_extra_table_metadata(database, tbl) assert result == {} - index_metadata = [ - { - "name": "clustering", - "column_names": ["c_col1", "c_col2", "c_col3"], - }, - { - "name": "partition", - "column_names": ["p_col1", "p_col2", "p_col3"], + mock_get_time_partition_column.return_value = "ds" + mock_get_max_partition_id.return_value = "19690101" + result = BigQueryEngineSpec.get_extra_table_metadata(database, tbl) + print(result) + assert result == { + "indexes": [{"cols": ["ds"], "name": "partitioned", "type": "partitioned"}], + "partitions": { + "cols": ["ds"], + "latest": {"ds": "19690101"}, + "partitionQuery": sql, }, - ] - expected_result = { - "partitions": {"cols": [["p_col1", "p_col2", "p_col3"]]}, - "clustering": {"cols": [["c_col1", "c_col2", "c_col3"]]}, } - database.get_indexes = mock.MagicMock(return_value=index_metadata) - result = BigQueryEngineSpec.get_extra_table_metadata( - database, - Table("some_table", "some_schema"), - ) - assert result == expected_result - - def test_get_indexes(self): - database = mock.Mock() - inspector = mock.Mock() - schema = "foo" - table_name = "bar" - - inspector.get_indexes = mock.Mock( - return_value=[ - { - "name": "partition", - "column_names": [None], - "unique": False, - } - ] - ) - - assert ( - BigQueryEngineSpec.get_indexes( - database, - inspector, - Table(table_name, schema), - ) - == [] - ) - - inspector.get_indexes = mock.Mock( - return_value=[ - { - "name": "partition", - "column_names": ["dttm"], - "unique": False, - } - ] - ) - - assert BigQueryEngineSpec.get_indexes( - database, - inspector, - Table(table_name, schema), - ) == [ - { - "name": "partition", - "column_names": ["dttm"], - "unique": False, - } - ] - - inspector.get_indexes = mock.Mock( - return_value=[ - { - "name": "partition", - "column_names": ["dttm", None], - "unique": False, - } - ] - ) - - assert BigQueryEngineSpec.get_indexes( - database, - inspector, - Table(table_name, 
schema), - ) == [ - { - "name": "partition", - "column_names": ["dttm"], - "unique": False, - } - ] @mock.patch("superset.db_engine_specs.bigquery.BigQueryEngineSpec.get_engine") @mock.patch("superset.db_engine_specs.bigquery.pandas_gbq") From 22f40f346a16d3ddf28f280c432f9d7e5cdfba26 Mon Sep 17 00:00:00 2001 From: Maxime Beauchemin Date: Wed, 30 Oct 2024 23:35:40 -0700 Subject: [PATCH 4/6] minor fix --- superset/db_engine_specs/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/superset/db_engine_specs/bigquery.py b/superset/db_engine_specs/bigquery.py index f897fc2af46f2..093f021f04709 100644 --- a/superset/db_engine_specs/bigquery.py +++ b/superset/db_engine_specs/bigquery.py @@ -361,7 +361,7 @@ def get_extra_table_metadata( { "partitions": { "cols": [partition_column], - "latest": {"ds": max_partition_id}, + "latest": {partition_column: max_partition_id}, "partitionQuery": sql, }, "indexes": [ From 97f0bfa6603972b9d45b6154c7805318f26f61c5 Mon Sep 17 00:00:00 2001 From: Maxime Beauchemin Date: Tue, 12 Nov 2024 10:03:49 -0800 Subject: [PATCH 5/6] rm SHOW_STACKTRACES --- docker/pythonpath_dev/superset_config.py | 1 - 1 file changed, 1 deletion(-) diff --git a/docker/pythonpath_dev/superset_config.py b/docker/pythonpath_dev/superset_config.py index 03b20cc5be868..e8223e53584bc 100644 --- a/docker/pythonpath_dev/superset_config.py +++ b/docker/pythonpath_dev/superset_config.py @@ -40,7 +40,6 @@ EXAMPLES_HOST = os.getenv("EXAMPLES_HOST") EXAMPLES_PORT = os.getenv("EXAMPLES_PORT") EXAMPLES_DB = os.getenv("EXAMPLES_DB") -SHOW_STACKTRACE = True # The SQLAlchemy connection string. SQLALCHEMY_DATABASE_URI = ( From 2fdcb9e7fd3f0355f43bef0287ff01d1c2c81b12 Mon Sep 17 00:00:00 2001 From: Maxime Beauchemin Date: Tue, 12 Nov 2024 10:06:50 -0800 Subject: [PATCH 6/6] keep normalize_indexes in base --- superset/db_engine_specs/base.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/superset/db_engine_specs/base.py b/superset/db_engine_specs/base.py index 37f40d7c1cf3b..8cabb1e5893ea 100644 --- a/superset/db_engine_specs/base.py +++ b/superset/db_engine_specs/base.py @@ -41,6 +41,7 @@ import sqlparse from apispec import APISpec from apispec.ext.marshmallow import MarshmallowPlugin +from deprecation import deprecated from flask import current_app, g, url_for from flask_appbuilder.security.sqla.models import User from flask_babel import gettext as __, lazy_gettext as _ @@ -1057,6 +1058,19 @@ def get_datatype(cls, type_code: Any) -> str | None: return type_code.upper() return None + @classmethod + @deprecated(deprecated_in="3.0") + def normalize_indexes(cls, indexes: list[dict[str, Any]]) -> list[dict[str, Any]]: + """ + Normalizes indexes for more consistency across db engines + + noop by default + + :param indexes: Raw indexes as returned by SQLAlchemy + :return: cleaner, more aligned index definition + """ + return indexes + @classmethod def get_table_metadata( cls, @@ -1623,6 +1637,7 @@ def where_latest_partition( # pylint: disable=unused-argument :return: SqlAlchemy query with additional where clause referencing the latest partition """ + # TODO: Fix circular import caused by importing Database, TableColumn return None @classmethod
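
To summarize the behavior the series adds: `BigQueryEngineSpec` now looks up the table's time-partitioning column through the BigQuery client, fetches the newest `partition_id` from `INFORMATION_SCHEMA.PARTITIONS`, and constrains the sample/preview query to that partition, so `SELECT * FROM ... LIMIT n` in SQL Lab no longer forces a full table scan. The sketch below is illustrative only and mirrors the filter construction in `where_latest_partition` using plain SQLAlchemy (no Superset imports); the dataset, table, and column names and the partition id value are hypothetical placeholders, not values from the patch.

```python
# Illustrative sketch only: it mimics the WHERE clause that
# BigQueryEngineSpec.where_latest_partition builds. All names and the
# partition id below are hypothetical placeholders.
from sqlalchemy import column, func, select, table

# Lightweight stand-in for a date-partitioned BigQuery table.
events = table("events", column("user_id"), column("ds"), schema="my_dataset")

# get_max_partition_id asks INFORMATION_SCHEMA.PARTITIONS for the newest
# partition_id of the table; assume that query returned this value.
max_partition_id = "20241030"

# get_time_partition_column reads the partitioning field from the BigQuery
# client's table metadata; assume the table is partitioned on "ds".
partition_column = "ds"

# where_latest_partition then constrains the sample query to that single
# partition instead of letting SELECT * ... LIMIT n scan the whole table.
query = (
    select(events.c.user_id, events.c.ds)
    .where(column(partition_column) == func.PARSE_DATE("%Y%m%d", max_partition_id))
    .limit(100)
)

# Roughly the statement BigQuery would receive (default dialect quoting).
print(query.compile(compile_kwargs={"literal_binds": True}))
```

With BigQuery's own SQLAlchemy dialect the compiled statement would use backtick quoting; the default dialect is used here only to show the shape of the partition filter that replaces the unbounded scan.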