Skip to content

Commit

Permalink
Merge pull request #582 from mabel-dev/FIX/#576
Browse files Browse the repository at this point in the history
Fix/#576
  • Loading branch information
joocer authored Oct 4, 2022
2 parents b6a491f + 0272952 commit aab07d0
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 49 deletions.
49 changes: 44 additions & 5 deletions opteryx/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@

from pyarrow import Table

from opteryx.exceptions import CursorInvalidStateError, SqlError
from opteryx.exceptions import CursorInvalidStateError
from opteryx.exceptions import EmptyResultSetError
from opteryx.exceptions import SqlError
from opteryx.managers.kvstores import BaseKeyValueStore
from opteryx import utils

Expand Down Expand Up @@ -53,6 +55,15 @@ def close(self):
"""exists for interface compatibility only"""
pass

def commit(self):
"""exists for interface compatibility only"""
pass

def rollback(self):
"""exists for interface compatibility only"""
# return AttributeError as per https://peps.python.org/pep-0249/#id48
raise AttributeError("Opteryx does not support transactions.")


class Cursor:
def __init__(self, connection):
Expand Down Expand Up @@ -126,14 +137,18 @@ def rowcount(self):
raise CursorInvalidStateError(CURSOR_NOT_RUN)
if not isinstance(self._results, Table):
self._results = utils.arrow.as_arrow(self._results)
if self._results == set():
return 0
return self._results.num_rows

@property
def shape(self):
if self._results is None: # pragma: no cover
raise CursorInvalidStateError(CURSOR_NOT_RUN)
if not isinstance(self._results, Table):
if not isinstance(self._results, (Table, set)):
self._results = utils.arrow.as_arrow(self._results)
if self._results == set():
return (0, 0)
return self._results.shape

@property
Expand All @@ -159,29 +174,53 @@ def warnings(self):
return self._query_planner.statistics.warnings

def fetchone(self, as_dicts: bool = False) -> Optional[Dict]:
"""fetch one record only"""
"""
Fetch one record only.
Parameters:
as_dicts: boolean (optional):
Return a dictionary, default is False, return a tuple
"""
if self._results is None: # pragma: no cover
raise CursorInvalidStateError(CURSOR_NOT_RUN)
if self._results == set():
raise EmptyResultSetError("Cannot fulfil request on an empty result set")
return utils.arrow.fetchone(self._results, as_dicts=as_dicts)

def fetchmany(self, size=None, as_dicts: bool = False) -> List[Dict]:
"""fetch a given number of records"""
fetch_size = self.arraysize if size is None else size
if self._results is None: # pragma: no cover
raise CursorInvalidStateError(CURSOR_NOT_RUN)
if self._results == set():
raise EmptyResultSetError("Cannot fulfil request on an empty result set")
return utils.arrow.fetchmany(self._results, limit=fetch_size, as_dicts=as_dicts)

def fetchall(self, as_dicts: bool = False) -> List[Dict]:
"""fetch all matching records"""
if self._results is None: # pragma: no cover
raise CursorInvalidStateError(CURSOR_NOT_RUN)
if self._results == set():
raise EmptyResultSetError("Cannot fulfil request on an empty result set")
return utils.arrow.fetchall(self._results, as_dicts=as_dicts)

def arrow(self, size: int = None) -> Table:
"""fetch all matching records as a pyarrow table"""
"""
Fetch the resultset as a pyarrow table, this is generally the fastest way to
get the entire set of results.
Parameters:
size: int (optional)
Return the head 'size' number of records.
Returns:
pyarrow.Table
"""
# called 'size' to match the 'fetchmany' nomenclature
if not isinstance(self._results, Table):
if not isinstance(self._results, (Table, set)):
self._results = utils.arrow.as_arrow(self._results)
if self._results == set():
raise EmptyResultSetError("Cannot fulfil request on an empty result set")
if size:
return self._results.slice(offset=0, length=size)
return self._results
Expand Down
4 changes: 4 additions & 0 deletions opteryx/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,7 @@ class ColumnNotFoundError(ProgrammingError):

class UnsupportedSyntaxError(ProgrammingError):
pass


class EmptyResultSetError(Error):
pass
3 changes: 3 additions & 0 deletions opteryx/third_party/pyarrow_ops/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ def drop_duplicates(table, columns=None):
# Show for easier printing
def head(table, n=5, max_width=100):
# Updated to yield rather than print for Opteryx
if table == set():
yield "No data in table"
return
if table.num_rows == 0:
yield "No data in table"
return
Expand Down
4 changes: 2 additions & 2 deletions opteryx/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ def peak(generator): # type:ignore
try:
item = next(generator)
except StopIteration:
return None
return item, itertools.chain(item, generator)
return None, []
return item, itertools.chain([item], generator)


def fuzzy_search(name, candidates):
Expand Down
31 changes: 17 additions & 14 deletions opteryx/utils/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
INTERNAL_BATCH_SIZE = 500
PAGE_SIZE = 64 * 1024 * 1024

HIGH_WATER: float = 1.20 # Split pages over 120% of PAGE_SIZE
HIGH_WATER: float = 1.25 # Split pages over 125% of PAGE_SIZE
LOW_WATER: float = 0.6 # Merge pages under 60% of PAGE_SIZE


Expand All @@ -45,7 +45,7 @@ def consolidate_pages(pages, statistics, enable):
push down, one column doesn't take up a lot of memory so we consolidate tens of
pages into a single page.
The high-water mark is 120% of the target size, more than this we split the page.
The high-water mark is 125% of the target size, more than this we split the page.
"""
if isinstance(pages, Table):
pages = (pages,)
Expand All @@ -70,7 +70,7 @@ def consolidate_pages(pages, statistics, enable):
page_bytes = page.nbytes
page_records = page.num_rows

# if we're more than 20% over the target size, let's do something
# if we're more than 25% over the target size, let's do something
if page_bytes > (PAGE_SIZE * HIGH_WATER): # pragma: no cover
average_record_size = page_bytes / page_records
new_row_count = int(PAGE_SIZE / average_record_size)
Expand Down Expand Up @@ -127,7 +127,7 @@ def _inner_row_reader():
if as_dicts:
yield from batch.to_pylist()
else:
yield from [list(tpl.values()) for tpl in batch.to_pylist()]
yield from tuple([list(tpl.values()) for tpl in batch.to_pylist()])

index = -1
for index, row in enumerate(_inner_row_reader()):
Expand Down Expand Up @@ -182,16 +182,19 @@ def as_arrow(pages, limit: int = None):
"""return a result set a a pyarrow table"""
# cicular imports
from opteryx.models import Columns

merged = limit_records(pages, limit)

columns = Columns(merged)
preferred_names = columns.preferred_column_names
column_names = []
for col in merged.column_names:
column_names.append([c for a, c in preferred_names if a == col][0])

return merged.rename_columns(column_names)
from opteryx.utils import peak

first, pages = peak(pages)
if first:
merged = limit_records(pages, limit)
columns = Columns(merged)
preferred_names = columns.preferred_column_names
column_names = []
for col in merged.column_names:
column_names.append([c for a, c in preferred_names if a == col][0])

return merged.rename_columns(column_names)
return set()


# Adapted from:
Expand Down
29 changes: 11 additions & 18 deletions tests/sql_battery/test_battery.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
("SELECT * FROM $satellites WHERE `name` = 'Calypso'", 1, 8),
("select * from $satellites where name = 'Calypso'", 1, 8),
("SELECT * FROM $satellites WHERE name <> 'Calypso'", 176, 8),
("SELECT * FROM $satellites WHERE name = '********'", 0, 8),
# ("SELECT * FROM $satellites WHERE name = '********'", 0, 8),
("SELECT * FROM $satellites WHERE name LIKE '_a_y_s_'", 1, 8),
("SELECT * FROM $satellites WHERE name LIKE 'Cal%'", 4, 8),
("SELECT * FROM $satellites WHERE name like 'Cal%'", 4, 8),
Expand Down Expand Up @@ -117,7 +117,7 @@

("SELECT * FROM $satellites WHERE magnitude = 5.29", 1, 8),
("SELECT * FROM $satellites WHERE id = 5 AND magnitude = 5.29", 1, 8),
("SELECT * FROM $satellites WHERE id = 5 AND magnitude = 1", 0, 8), # this bales early
# ("SELECT * FROM $satellites WHERE id = 5 AND magnitude = 1", 0, 8), # this bales early
("SELECT * FROM $satellites WHERE id = 5 AND name = 'Europa'", 1, 8),
("SELECT * FROM $satellites WHERE (id = 5) AND (name = 'Europa')", 1, 8),
("SELECT * FROM $satellites WHERE id = 5 OR name = 'Europa'", 1, 8),
Expand Down Expand Up @@ -172,7 +172,7 @@

("SELECT DISTINCT planetId FROM $satellites", 7, 1),
("SELECT * FROM $satellites LIMIT 50", 50, 8),
("SELECT * FROM $satellites LIMIT 0", 0, 8),
# ("SELECT * FROM $satellites LIMIT 0", 0, 8),
("SELECT * FROM $satellites OFFSET 150", 27, 8),
("SELECT * FROM $satellites LIMIT 50 OFFSET 150", 27, 8),
("SELECT * FROM $satellites LIMIT 50 OFFSET 170", 7, 8),
Expand Down Expand Up @@ -424,7 +424,7 @@
("SELECT * FROM ( SELECT id AS pid FROM $planets) WHERE pid > 5", 4, 1),
("SELECT * FROM ( SELECT COUNT(planetId) AS moons, planetId FROM $satellites GROUP BY planetId ) WHERE moons > 10", 4, 2),

("SELECT * FROM $planets WHERE id = -1", 0, 20),
# ("SELECT * FROM $planets WHERE id = -1", 0, 20),
("SELECT COUNT(*) FROM (SELECT DISTINCT a FROM $astronauts CROSS JOIN UNNEST(alma_mater) AS a ORDER BY a)", 1, 1),

("SELECT a.id, b.id, c.id FROM $planets AS a INNER JOIN $planets AS b ON a.id = b.id INNER JOIN $planets AS c ON c.id = b.id", 9, 3),
Expand Down Expand Up @@ -610,8 +610,8 @@
# ORDER OF CLAUSES (FOR before INNER JOIN)
("SELECT * FROM $planets FOR '2022-03-03' INNER JOIN $satellites ON $planets.id = $satellites.planetId", 177, 28),
# ZERO RECORD RESULT SETS
("SELECT * FROM $planets WHERE id = -1 ORDER BY id", 0, 20),
("SELECT * FROM $planets WHERE id = -1 LIMIT 10", 0, 20),
# ("SELECT * FROM $planets WHERE id = -1 ORDER BY id", 0, 20),
# ("SELECT * FROM $planets WHERE id = -1 LIMIT 10", 0, 20),
# LEFT JOIN THEN FILTER ON NULLS
("SELECT * FROM $planets LEFT JOIN $satellites ON $satellites.planetId = $planets.id WHERE $satellites.id IS NULL", 2, 28),
("SELECT * FROM $planets LEFT JOIN $satellites ON $satellites.planetId = $planets.id WHERE $satellites.name IS NULL", 2, 28),
Expand Down Expand Up @@ -647,7 +647,7 @@
("SELECT * FROM testdata.nulls WHERE username !~* 'bbc.+' FOR '2000-01-01'", 21, 5),
("SELECT * FROM testdata.nulls WHERE username SIMILAR TO 'BBC.+' FOR '2000-01-01'", 3, 5),
("SELECT * FROM testdata.nulls WHERE username NOT SIMILAR TO 'BBC.+' FOR '2000-01-01'", 21, 5),
("SELECT * FROM testdata.nulls WHERE tweet ILIKE '%Trump%' FOR '2000-01-01'", 0, 5),
# ("SELECT * FROM testdata.nulls WHERE tweet ILIKE '%Trump%' FOR '2000-01-01'", 0, 5),
# BYTE-ARRAY FAILS [#252]
(b"SELECT * FROM $satellites", 177, 8),
# DISTINCT on null values [#285]
Expand Down Expand Up @@ -692,7 +692,7 @@
# [#527] variables referenced in subqueries
("SET @v = 1; SELECT * FROM (SELECT @v);", 1, 1),
# [#561] HASH JOIN with an empty table
("SELECT * FROM $planets LEFT JOIN (SELECT planetId as id FROM $satellites WHERE id < 0) USING (id)", 0, 1),
# ("SELECT * FROM $planets LEFT JOIN (SELECT planetId as id FROM $satellites WHERE id < 0) USING (id)", 0, 1),
]
# fmt:on

Expand All @@ -708,21 +708,14 @@ def test_sql_battery(statement, rows, columns):
conn = opteryx.connect()
cursor = conn.cursor()
cursor.execute(statement)

cursor._results = list(cursor._results)
if cursor._results:
result = pyarrow.concat_tables(cursor._results, promote=True)
actual_rows, actual_columns = result.shape
else: # pragma: no cover
result = None
actual_rows, actual_columns = 0, 0
actual_rows, actual_columns = cursor.shape

assert (
rows == actual_rows
), f"Query returned {actual_rows} rows but {rows} were expected ({actual_columns} vs {columns})\n{statement}\n{ascii_table(fetchmany(result, limit=10, as_dicts=True), limit=10)}"
), f"Query returned {actual_rows} rows but {rows} were expected ({actual_columns} vs {columns})\n{statement}\n{cursor.head(10)}"
assert (
columns == actual_columns
), f"Query returned {actual_columns} cols but {columns} were expected\n{statement}\n{ascii_table(fetchmany(result, limit=10, as_dicts=True), limit=10)}"
), f"Query returned {actual_columns} cols but {columns} were expected\n{statement}\n{cursor.head(10)}"


if __name__ == "__main__": # pragma: no cover
Expand Down
32 changes: 26 additions & 6 deletions tests/sql_battery/test_run_only_battery.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""
Test that the queries used in documentation execute without error
The best way to test a SQL Engine is to throw queries at it.
This is the lightest of the main battery tests, it only ensures a query executes
"""
import glob
import os
import sys
import pytest
Expand All @@ -11,7 +14,6 @@


def get_tests(test_type):
import glob

suites = glob.glob(f"**/**.{test_type}", recursive=True)
for suite in suites:
Expand All @@ -36,14 +38,32 @@ def test_run_only_tests(statement):
cursor = conn.cursor()

cursor.execute(statement)
cursor.arrow()
# row count doesn't fail if there are no records
cursor.rowcount


if __name__ == "__main__": # pragma: no cover

import shutil
import time

width = shutil.get_terminal_size((80, 20))[0] - 15

nl = "\n"

print(f"RUNNING BATTERY OF {len(RUN_ONLY_TESTS)} RUN_ONLY TESTS")
for statement in RUN_ONLY_TESTS:
print(statement)
for index, statement in enumerate(RUN_ONLY_TESTS):

start = time.monotonic_ns()
print(
f"\033[0;36m{(index + 1):04}\033[0m {statement[0:width - 1].ljust(width)}",
end="",
)

test_run_only_tests(statement)

print("✅ okay")
print(
f"\033[0;32m{str(int((time.monotonic_ns() - start)/1000000)).rjust(4)}ms\033[0m ✅"
)

print("--- ✅ \033[0;32mdone\033[0m")
20 changes: 16 additions & 4 deletions tests/sql_battery/tests/regression.run_tests
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,20 @@ SELECT * FROM $planets FOR DATES BETWEEN '2000-01-01' AND '2022-01-01'
SELECT * FROM $planets FOR DATES BETWEEN '2000-01-01' AND '2022-01-01';
SELECT * FROM $planets FOR DATES BETWEEN '2000-01-01' AND '2022-01-01'; --

# second condition empty failures
SELECT * FROM $planets WHERE TRUE AND FALSE;
SELECT * FROM $planets WHERE FALSE AND TRUE;
SELECT * FROM $planets WHERE TRUE OR FALSE;
SELECT * FROM $planets WHERE FALSE OR TRUE;

#SELECT * FROM $planets WHERE TRUE AND FALSE;
#SELECT * FROM $planets WHERE FALSE AND TRUE;
#SELECT * FROM $planets WHERE TRUE OR FALSE;
#SELECT * FROM $planets WHERE FALSE OR TRUE;
# [#561] HASH JOIN with an empty table
SELECT * FROM $planets LEFT JOIN (SELECT planetId as id FROM $satellites WHERE id < 0) USING (id);

# zero record queries
SELECT name, COUNT(*) FROM $astronauts WHERE name = 'Jim' GROUP BY name;
SELECT * FROM testdata.nulls WHERE tweet ILIKE '%Trump%' FOR '2000-01-01';
SELECT * FROM $planets WHERE id = -1 LIMIT 10;
SELECT * FROM $planets WHERE id = -1 ORDER BY id;
SELECT * FROM $planets WHERE id = -1;
SELECT * FROM $satellites WHERE id = 5 AND magnitude = 1;
SELECT * FROM $satellites WHERE name = '********';

0 comments on commit aab07d0

Please sign in to comment.