From 2c1473b1ac11531d580b149824af3b33c9f80443 Mon Sep 17 00:00:00 2001 From: Luka Peschke Date: Thu, 5 Dec 2024 10:52:26 +0100 Subject: [PATCH] fix(server/pypika): reverse OFFSET and LIMIT for Athena translator [TCTC-9905] Signed-off-by: Luka Peschke --- server/CHANGELOG.md | 5 ++ .../pypika_translator/translators/athena.py | 29 ++++++-- .../pypika_translator/translators/base.py | 4 ++ .../test_offset_limit_behaviour.py | 68 +++++++++++++++++++ 4 files changed, 102 insertions(+), 4 deletions(-) create mode 100644 server/tests/backends/sql_translator_unit_tests/test_offset_limit_behaviour.py diff --git a/server/CHANGELOG.md b/server/CHANGELOG.md index adb77efe5..97c658a0e 100644 --- a/server/CHANGELOG.md +++ b/server/CHANGELOG.md @@ -2,6 +2,11 @@ ## Unreleased +### Fixed + +- Pypika: source_rows_subset now takes precedence over limit in case it is specified and smaller than the limit +- Pypika: The Athena translator now puts OFFSET and LIMIT in the correct order + ## [0.48.4] - 2024-11-28 ### Fixed diff --git a/server/src/weaverbird/backends/pypika_translator/translators/athena.py b/server/src/weaverbird/backends/pypika_translator/translators/athena.py index 369891667..5cae52b28 100644 --- a/server/src/weaverbird/backends/pypika_translator/translators/athena.py +++ b/server/src/weaverbird/backends/pypika_translator/translators/athena.py @@ -1,9 +1,9 @@ from datetime import date, datetime +from typing import Any -from pypika import functions -from pypika.dialects import Query +from pypika import Query, functions from pypika.enums import Dialects -from pypika.queries import Selectable +from pypika.queries import QueryBuilder, Selectable from pypika.terms import Case, CustomFunction, Field, Term from weaverbird.backends.pypika_translator.dialects import SQLDialect @@ -20,9 +20,30 @@ def __init__(self, field: Field): super().__init__("TO_MILLISECONDS", field) +class AthenaQueryBuilder(QueryBuilder): + def __init__(self, **kwargs: Any) -> None: + super().__init__(dialect=SQLDialect.ATHENA, **kwargs) + + # Does the same as the parent class, but with OFFSET before LIMIT + def _apply_pagination(self, querystring: str) -> str: + if self._offset: + querystring += self._offset_sql() + + if self._limit is not None: + querystring += self._limit_sql() + + return querystring + + +class AthenaQuery(Query): + @classmethod + def _builder(cls, **kwargs: Any) -> AthenaQueryBuilder: + return AthenaQueryBuilder(**kwargs) + + class AthenaTranslator(SQLTranslator): DIALECT = SQLDialect.ATHENA - QUERY_CLS = Query + QUERY_CLS = AthenaQuery SUPPORT_ROW_NUMBER = True SUPPORT_SPLIT_PART = True SUPPORT_UNPIVOT = False diff --git a/server/src/weaverbird/backends/pypika_translator/translators/base.py b/server/src/weaverbird/backends/pypika_translator/translators/base.py index 11802fb91..798ec0c6f 100644 --- a/server/src/weaverbird/backends/pypika_translator/translators/base.py +++ b/server/src/weaverbird/backends/pypika_translator/translators/base.py @@ -436,6 +436,10 @@ def get_query_str( if limit > steps[-1].limit: limit = steps[-1].limit + # If we have both a source rows subset and a limit, we want to apply the smallest limit of both + if self._source_rows_subset and limit: + limit = min(limit, self._source_rows_subset) + try: # This method is used by translate_pipeline. We are at the top level here, not in a nested # builder, so we want to unwrap the last step diff --git a/server/tests/backends/sql_translator_unit_tests/test_offset_limit_behaviour.py b/server/tests/backends/sql_translator_unit_tests/test_offset_limit_behaviour.py new file mode 100644 index 000000000..23bd510dc --- /dev/null +++ b/server/tests/backends/sql_translator_unit_tests/test_offset_limit_behaviour.py @@ -0,0 +1,68 @@ +import pytest + +from weaverbird.backends.pypika_translator.dialects import SQLDialect +from weaverbird.backends.pypika_translator.translate import translate_pipeline +from weaverbird.pipeline.pipeline import Pipeline +from weaverbird.pipeline.steps.domain import DomainStep + +_TABLES_COLUMNS = {"my_table": ["a", "b", "c"]} + + +@pytest.mark.parametrize( + "offset,limit,source_rows_subset,expected", + [ + pytest.param(0, None, None, 'SELECT "a","b","c" FROM "my_table"', id="defaults"), + pytest.param(0, 50, None, 'SELECT "a","b","c" FROM "my_table" LIMIT 50', id="limit_only"), + pytest.param(50, 50, None, 'SELECT "a","b","c" FROM "my_table" LIMIT 50 OFFSET 50', id="offset_and_limit"), + pytest.param( + 50, + 50, + 10, + 'SELECT "a","b","c" FROM "my_table" LIMIT 10 OFFSET 50', + id="offset_and_limit_and_source_rows_subset_smaller_than_limit", + ), + ], +) +def test_offset_limit_with_source_rows_subset_single_domain_step_postgres( + offset: int, limit: int | None, source_rows_subset: int | None, expected: str +) -> None: + pipe = Pipeline(steps=[DomainStep(domain="my_table")]) + translated = translate_pipeline( + sql_dialect=SQLDialect.POSTGRES, + pipeline=pipe, + tables_columns=_TABLES_COLUMNS, + offset=offset, + limit=limit, + source_rows_subset=source_rows_subset, + ) + assert translated == expected + + +@pytest.mark.parametrize( + "offset,limit,source_rows_subset,expected", + [ + pytest.param(0, None, None, 'SELECT "a","b","c" FROM "my_table"', id="defaults"), + pytest.param(0, 50, None, 'SELECT "a","b","c" FROM "my_table" LIMIT 50', id="limit_only"), + pytest.param(50, 50, None, 'SELECT "a","b","c" FROM "my_table" OFFSET 50 LIMIT 50', id="offset_and_limit"), + pytest.param( + 50, + 50, + 10, + 'SELECT "a","b","c" FROM "my_table" OFFSET 50 LIMIT 10', + id="offset_and_limit_and_source_rows_subset_smaller_than_limit", + ), + ], +) +def test_offset_limit_with_source_rows_subset_single_domain_step_athena( + offset: int, limit: int | None, source_rows_subset: int | None, expected: str +) -> None: + pipe = Pipeline(steps=[DomainStep(domain="my_table")]) + translated = translate_pipeline( + sql_dialect=SQLDialect.ATHENA, + pipeline=pipe, + tables_columns=_TABLES_COLUMNS, + offset=offset, + limit=limit, + source_rows_subset=source_rows_subset, + ) + assert translated == expected