From 64cc39d1136cc103f7ff7848bcc86a07624f2d8d Mon Sep 17 00:00:00 2001 From: joocer Date: Mon, 22 Apr 2024 07:51:47 +0100 Subject: [PATCH 1/4] #1595 --- opteryx/connectors/base/base_connector.py | 2 +- opteryx/models/__init__.py | 2 + .../{managers => models}/connection_state.py | 0 .../{shared => models}/query_statistics.py | 2 +- opteryx/operators/base_plan_node.py | 2 +- opteryx/planner/binder/binding_context.py | 2 +- opteryx/planner/executor/__init__.py | 0 opteryx/shared/__init__.py | 3 +- opteryx/shared/buffer_pool.py | 5 ++- opteryx/shared/memory_pool.py | 43 ++++++++++++++----- tests/misc/test_expressions.py | 2 +- 11 files changed, 45 insertions(+), 18 deletions(-) rename opteryx/{managers => models}/connection_state.py (100%) rename opteryx/{shared => models}/query_statistics.py (97%) create mode 100644 opteryx/planner/executor/__init__.py diff --git a/opteryx/connectors/base/base_connector.py b/opteryx/connectors/base/base_connector.py index b26ce419c..9489c56e1 100644 --- a/opteryx/connectors/base/base_connector.py +++ b/opteryx/connectors/base/base_connector.py @@ -21,7 +21,7 @@ import pyarrow from orso.schema import RelationSchema -from opteryx.shared import QueryStatistics +from opteryx.models import QueryStatistics MIN_CHUNK_SIZE: int = 500 INITIAL_CHUNK_SIZE: int = 500 diff --git a/opteryx/models/__init__.py b/opteryx/models/__init__.py index 1eac86716..b4f317e00 100644 --- a/opteryx/models/__init__.py +++ b/opteryx/models/__init__.py @@ -16,6 +16,7 @@ from opteryx.models.node import Node from opteryx.models.non_tabular_result import NonTabularResult from opteryx.models.query_properties import QueryProperties +from opteryx.models.query_statistics import QueryStatistics __all__ = ( "ConnectionContext", @@ -24,4 +25,5 @@ "Node", "NonTabularResult", "QueryProperties", + "QueryStatistics", ) diff --git a/opteryx/managers/connection_state.py b/opteryx/models/connection_state.py similarity index 100% rename from opteryx/managers/connection_state.py rename to opteryx/models/connection_state.py diff --git a/opteryx/shared/query_statistics.py b/opteryx/models/query_statistics.py similarity index 97% rename from opteryx/shared/query_statistics.py rename to opteryx/models/query_statistics.py index 6272fff7d..26bdc1c77 100644 --- a/opteryx/shared/query_statistics.py +++ b/opteryx/models/query_statistics.py @@ -19,7 +19,7 @@ def __init__(self): self._stats: dict = defaultdict(int) self._stats["messages"] = [] - def _ns_to_s(self, nano_seconds): + def _ns_to_s(self, nano_seconds: int) -> float: """convert elapsed ns to s""" if nano_seconds == 0: return 0 diff --git a/opteryx/operators/base_plan_node.py b/opteryx/operators/base_plan_node.py index 626e648b4..f826bc32f 100644 --- a/opteryx/operators/base_plan_node.py +++ b/opteryx/operators/base_plan_node.py @@ -17,7 +17,7 @@ from orso.tools import random_string from opteryx.models import QueryProperties -from opteryx.shared import QueryStatistics +from opteryx.models import QueryStatistics class BasePlanNode: diff --git a/opteryx/planner/binder/binding_context.py b/opteryx/planner/binder/binding_context.py index c18aa75dc..9806f609d 100644 --- a/opteryx/planner/binder/binding_context.py +++ b/opteryx/planner/binder/binding_context.py @@ -17,7 +17,7 @@ from typing import Set from opteryx.models import ConnectionContext -from opteryx.shared import QueryStatistics +from opteryx.models import QueryStatistics from opteryx.virtual_datasets import derived diff --git a/opteryx/planner/executor/__init__.py b/opteryx/planner/executor/__init__.py 
new file mode 100644 index 000000000..e69de29bb diff --git a/opteryx/shared/__init__.py b/opteryx/shared/__init__.py index e05aee1da..e4a9f1f34 100644 --- a/opteryx/shared/__init__.py +++ b/opteryx/shared/__init__.py @@ -13,7 +13,6 @@ from opteryx.shared.buffer_pool import BufferPool from opteryx.shared.materialized_datasets import MaterializedDatasets from opteryx.shared.memory_pool import MemoryPool -from opteryx.shared.query_statistics import QueryStatistics from opteryx.shared.rolling_log import RollingLog -__all__ = ("BufferPool", "MaterializedDatasets", "MemoryPool", "QueryStatistics", "RollingLog") +__all__ = ("BufferPool", "MaterializedDatasets", "MemoryPool", "RollingLog") diff --git a/opteryx/shared/buffer_pool.py b/opteryx/shared/buffer_pool.py index c0434f2ca..13afd3734 100644 --- a/opteryx/shared/buffer_pool.py +++ b/opteryx/shared/buffer_pool.py @@ -19,6 +19,8 @@ The buffer pool has no slot limit; it is a given volume of memory, and the pool will try to evict when full. This is different to a classic Buffer Pool which is slot-based. + +The Buffer Pool is a global resource and is used across all Connections and Cursors. """ from typing import Optional @@ -29,7 +31,8 @@ class _BufferPool: """ - Buffer Pool is a class implementing a Least Recently Used (LRU) policy. + Buffer Pool is a class implementing a Least Recently Used (LRU) policy for + eviction. """ slots = "_lru", "_cache_backend", "_max_cacheable_item_size", "_memory_pool" diff --git a/opteryx/shared/memory_pool.py b/opteryx/shared/memory_pool.py index 96e6580ab..73b7005a9 100644 --- a/opteryx/shared/memory_pool.py +++ b/opteryx/shared/memory_pool.py @@ -10,11 +10,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from multiprocessing import Lock -from typing import Dict - -from orso.tools import random_int - """ Memory Pool is used to manage access to arbitrary blocks of memory. @@ -22,6 +17,12 @@ """ +from multiprocessing import Lock +from typing import Dict + +from orso.tools import random_int + + class MemorySegment: slots = ("start", "length") @@ -60,7 +61,11 @@ def _find_free_segment(self, size: int) -> int: return -1 def _level1_compaction(self): - """Merges adjacent free segments (Level 1 compaction).""" + """ + Merges adjacent free segments (Level 1 compaction). + + This is intended to be a fast way to get larger contiguous blocks. + """ self.l1_compaction += 1 if not self.free_segments: return @@ -81,7 +86,11 @@ self.free_segments = new_free_segments def _level2_compaction(self): - """Aggressively compacts by pushing all free memory to the end (Level 2 compaction).""" + """ + Aggressively compacts by pushing all free memory to the end (Level 2 compaction). + + This is slower, but ensures we get the maximum free space. + """ self.l2_compaction += 1 total_free_space = sum(segment.length for segment in self.free_segments) @@ -101,6 +110,13 @@ offset += segment.length def commit(self, data: bytes) -> int: + """ + Add an item to the pool and return its reference. + + If we can't find a free block large enough we perform compaction, + first we combine adjacent free blocks into larger blocks. If that's + not enough, we consolidate all of the free blocks together.
+ """ self.commits += 1 len_data = len(data) # always acquire a lock to write @@ -147,10 +163,11 @@ def read(self, ref_id: int) -> bytes: """ We're using an optimistic locking strategy where we do not acquire a lock, perform the read and then check that the metadata hasn't changed - and if it's the same, we assume no writes have updated it. + and if it's the same, we assume no writes have updated it. If it has + changed, we acquire a lock and try again. - If it has changed, we acquire a lock and try again. The buffer pool is - read heavy, so optimized reads are preferred. + We use this approach because locks are expensive and memory pools are + likely to be read heavy. """ if ref_id not in self.used_segments: raise ValueError("Invalid reference ID.") @@ -167,6 +184,9 @@ def read(self, ref_id: int) -> bytes: return view def release(self, ref_id: int): + """ + Remove an item from the pool + """ self.releases += 1 with self.lock: if ref_id not in self.used_segments: @@ -175,5 +195,8 @@ def release(self, ref_id: int): self.free_segments.append(segment) def __del__(self): + """ + This function exists just to wrap the debug logging + """ pass # DEBUG: log (f"Memory Pool ({self.name}) ") diff --git a/tests/misc/test_expressions.py b/tests/misc/test_expressions.py index 2fdb555c5..0cf46b71b 100644 --- a/tests/misc/test_expressions.py +++ b/tests/misc/test_expressions.py @@ -14,7 +14,7 @@ import opteryx.virtual_datasets from opteryx.managers.expression import ORSO_TO_NUMPY_MAP, NodeType, evaluate from opteryx.models import Node -from opteryx.shared import QueryStatistics +from opteryx.models import QueryStatistics stats = QueryStatistics() From 0e3b42886ac9c2ebdfc08719d60b3f19128886c2 Mon Sep 17 00:00:00 2001 From: XB500 Date: Mon, 22 Apr 2024 06:52:15 +0000 Subject: [PATCH 2/4] Opteryx Version 0.14.2-alpha.435 --- opteryx/__version__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opteryx/__version__.py b/opteryx/__version__.py index 5e9edfc41..92a8dfb45 100644 --- a/opteryx/__version__.py +++ b/opteryx/__version__.py @@ -1,4 +1,4 @@ -__build__ = 434 +__build__ = 435 # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From 1d27bd0f6743c5dd47fc4a75d80324c5fb71aae2 Mon Sep 17 00:00:00 2001 From: joocer Date: Mon, 22 Apr 2024 18:37:24 +0100 Subject: [PATCH 3/4] #1598 --- opteryx/__init__.py | 11 +- opteryx/connectors/capabilities/cacheable.py | 27 +++- opteryx/connectors/cql_connector.py | 39 ++--- opteryx/connectors/sql_connector.py | 6 + opteryx/cursor.py | 2 +- opteryx/shared/buffer_pool.py | 61 ++++---- opteryx/shared/memory_pool.py | 21 ++- opteryx/utils/resource_monitor.py | 41 ++++-- tests/storage/test_cache_redis.py | 2 +- tests/storage/test_cql_datastax.py | 146 ++++++++++++++----- tests/storage/test_sql_sqlite.py | 22 +++ 11 files changed, 263 insertions(+), 115 deletions(-) diff --git a/opteryx/__init__.py b/opteryx/__init__.py index 29f5ba349..35092d8c3 100644 --- a/opteryx/__init__.py +++ b/opteryx/__init__.py @@ -83,7 +83,7 @@ # For more details, see: https://www.python.org/dev/peps/pep-0249/ apilevel: str = "1.0" # Compliance level with DB API 2.0 threadsafety: int = 0 # Thread safety level, 0 means not thread-safe -paramstyle: str = "qmark" # Parameter placeholder style, qmark means '?' 
for placeholders +paramstyle: str = "named" # Parameter placeholder style, named means :name for placeholders def connect(*args, **kwargs): @@ -171,7 +171,6 @@ query_to_arrow( nice_value = os.nice(0) try: os.nice(-20 + nice_value) - print(f"{datetime.datetime.now()} [LOADER] Process priority set to {os.nice(0)}.") except PermissionError: display_nice = str(nice_value) if nice_value == 0: @@ -180,10 +179,6 @@ f"{datetime.datetime.now()} [LOADER] Cannot update process priority. Currently set to {display_nice}." ) -# Log resource usage -if config.ENABLE_RESOURCE_LOGGING: # pragma: no cover - from opteryx.utils.resource_monitor import ResourceMonitor - _cache_manager = CacheManager(cache_backend=None) @@ -205,3 +200,7 @@ set_cache_manager(new_cache_manager) cache_manager = get_cache_manager() + +# Log resource usage +if config.ENABLE_RESOURCE_LOGGING: # pragma: no cover + from opteryx.utils.resource_monitor import ResourceMonitor diff --git a/opteryx/connectors/capabilities/cacheable.py b/opteryx/connectors/capabilities/cacheable.py index 243da17d3..0706eafc5 100644 --- a/opteryx/connectors/capabilities/cacheable.py +++ b/opteryx/connectors/capabilities/cacheable.py @@ -42,36 +42,55 @@ def read_thru_cache(func): # Capture the max_evictions value at decoration time from opteryx import get_cache_manager + from opteryx.managers.cache import NullCache from opteryx.shared import BufferPool cache_manager = get_cache_manager() max_evictions = cache_manager.max_evictions_per_query + remote_cache = cache_manager.cache_backend + if not remote_cache: + # rather than make decisions - just use a dummy + remote_cache = NullCache() buffer_pool = BufferPool() + my_keys = set() + @wraps(func) def wrapper(blob_name, statistics, **kwargs): nonlocal max_evictions key = hex(CityHash64(blob_name)).encode() + my_keys.add(key) - # Try to get the result from cache + # try the buffer pool first result = buffer_pool.get(key) + if result is not None: + statistics.bufferpool_hits += 1 + return result + # try the remote cache next + result = remote_cache.get(key) if result is not None: - statistics.cache_hits += 1 + statistics.remote_cache_hits += 1 return result # Key is not in cache, execute the function and store the result in cache result = func(blob_name=blob_name, **kwargs) - # Write the result to cache + # Write the result to caches if max_evictions: + # we set a per-query eviction limit if len(result) < buffer_pool.max_cacheable_item_size: evicted = buffer_pool.set(key, result) + remote_cache.set(key, result) if evicted: + # if we're evicting an item we only just cached in this query, stop evicting + if evicted in my_keys: + max_evictions = 0 + else: + max_evictions -= 1 statistics.cache_evictions += 1 - max_evictions -= 1 else: statistics.cache_oversize += 1 diff --git a/opteryx/connectors/cql_connector.py b/opteryx/connectors/cql_connector.py index 833c8cf74..eed574aa8 100644 --- a/opteryx/connectors/cql_connector.py +++ b/opteryx/connectors/cql_connector.py @@ -61,25 +61,17 @@ class CqlConnector(BaseConnector, PredicatePushable): "GtEq": True, "Lt": True, "LtEq": True, - "Like": True, - "NotLike": True, } OPS_XLAT: Dict[str, str] = { - "Eq": "=", - "NotEq": "!=", - "Gt": ">", - "GtEq": ">=", - "Lt": "<", - "LtEq": "<=", - "Like": "LIKE", - "NotLike": "NOT LIKE", - "IsTrue": "IS TRUE", - "IsNotTrue": "IS NOT TRUE", - "IsFalse": "IS FALSE", - "IsNotFalse": "IS NOT FALSE", - "IsNull": "IS NULL", - "IsNotNull": "IS NOT NULL", + "Eq": ":left = :right", + "NotEq": ":left != :right", + "Gt": ":left > :right", +
"GtEq": ":left >= :right", + "Lt": ":left < :right", + "LtEq": ":left <= :right", + "Like": ":left LIKE :right", + "NotLike": "NOT (:left LIKE :right)", } def __init__( @@ -113,11 +105,6 @@ def __init__( self.single_column = None - def can_push(self, operator: Node, types: set = None) -> bool: - if super().can_push(operator, types): - return True - return operator.condition.node_type == NodeType.UNARY_OPERATOR - def read_dataset( # type:ignore self, *, @@ -147,10 +134,11 @@ def read_dataset( # type:ignore parameters: list = [] for predicate in predicates: if predicate.node_type == NodeType.UNARY_OPERATOR: - operand = predicate.centre.current_name operator = self.OPS_XLAT[predicate.value] + operand, parameters = _handle_operand(predicate.centre, parameters) + operator = operator.replace(":operand", operand) - query_builder.WHERE(f"{operand} {operator}") + query_builder.WHERE(operator) else: left_operand = predicate.left right_operand = predicate.right @@ -159,7 +147,10 @@ def read_dataset( # type:ignore left_value, parameters = _handle_operand(left_operand, parameters) right_value, parameters = _handle_operand(right_operand, parameters) - query_builder.WHERE(f"{left_value} {operator} {right_value}") + operator = operator.replace(":left", left_value) + operator = operator.replace(":right", right_value) + + query_builder.WHERE(operator) session = self.cluster.connect() # DEBUG: log ("READ DATASET\n", str(query_builder)) diff --git a/opteryx/connectors/sql_connector.py b/opteryx/connectors/sql_connector.py index 65d77bd27..d91c6df49 100644 --- a/opteryx/connectors/sql_connector.py +++ b/opteryx/connectors/sql_connector.py @@ -69,6 +69,12 @@ class SqlConnector(BaseConnector, PredicatePushable): "LtEq": True, "Like": True, "NotLike": True, + "IsTrue": True, + "IsNotTrue": True, + "IsFalse": True, + "IsNotFalse": True, + "IsNull": True, + "IsNotNull": True, } OPS_XLAT: Dict[str, str] = { diff --git a/opteryx/cursor.py b/opteryx/cursor.py index 1514b7b6f..b1dfaef80 100644 --- a/opteryx/cursor.py +++ b/opteryx/cursor.py @@ -39,7 +39,7 @@ from opteryx.exceptions import InvalidCursorStateError from opteryx.exceptions import MissingSqlStatement from opteryx.exceptions import UnsupportedSyntaxError -from opteryx.shared import QueryStatistics +from opteryx.models import QueryStatistics from opteryx.shared.rolling_log import RollingLog from opteryx.utils import sql diff --git a/opteryx/shared/buffer_pool.py b/opteryx/shared/buffer_pool.py index 13afd3734..4e7f9a3d5 100644 --- a/opteryx/shared/buffer_pool.py +++ b/opteryx/shared/buffer_pool.py @@ -24,7 +24,6 @@ """ from typing import Optional -from opteryx.managers.cache import NullCache from opteryx.shared.memory_pool import MemoryPool from opteryx.utils.lru_2 import LRU2 @@ -37,10 +36,12 @@ class _BufferPool: slots = "_lru", "_cache_backend", "_max_cacheable_item_size", "_memory_pool" - def __init__(self, cache_manager): - self._cache_backend = cache_manager.cache_backend - if not self._cache_backend: - self._cache_backend = NullCache() # rather than make decisions - just use a dummy + def __init__(self): + # Import here to avoid circular imports + from opteryx import get_cache_manager + + cache_manager = get_cache_manager() + self.max_cacheable_item_size = cache_manager.max_cacheable_item_size self._lru = LRU2() self._memory_pool = MemoryPool( @@ -55,30 +56,37 @@ def get(self, key: bytes) -> bytes: mp_key = self._lru.get(key) if mp_key is not None: return self._memory_pool.read(mp_key) - return self._cache_backend.get(key) def set(self, key: bytes, 
value) -> Optional[str]: """ - Put an item into the pool, evict an item if the pool is full. - If a cache is provided, also set the value in the cache. + Attempt to save a value to the buffer pool. Check first if there is space to commit the value. + If not, evict the least recently used item and try again. + + Args: + key: The key associated with the value to commit. + value: The value to commit to the buffer pool. + + Returns: + The key of the evicted item if eviction occurred, otherwise None. """ - # always try to save to the cache backend - self._cache_backend.set(key, value) - - # try to save in the buffer pool, if we fail, release - # an item from the pool (LRUK2) and try again - evicted_key = None - mp_key = self._memory_pool.commit(value) - if mp_key is None: - evicted_key, evicted_value = self._lru.evict(True) + # First check if we can commit the value to the memory pool + if not self._memory_pool.can_commit(value): + evicted_key, evicted_value = self._lru.evict(details=True) if evicted_key: self._memory_pool.release(evicted_value) - mp_key = self._memory_pool.commit(value) - if mp_key is None: - return None - else: - self._lru.set(key, mp_key) - return evicted_key + else: + return None # Return None if no item could be evicted + + # Try to commit the value to the memory pool + memory_pool_key = self._memory_pool.commit(value) + if memory_pool_key is None: + return None # Return None if commit still fails after eviction + + # Update LRU cache with the new key and memory pool key if commit succeeds + self._lru.set(key, memory_pool_key) + + # Return the evicted key if an eviction occurred, otherwise return None + return evicted_key if "evicted_key" in locals() else None @property def stats(self) -> tuple: @@ -109,12 +117,7 @@ def __new__(cls): @classmethod def _create_instance(cls): - # Import here to avoid circular imports - from opteryx import get_cache_manager - - cache_manager = get_cache_manager() - - return _BufferPool(cache_manager) + return _BufferPool() @classmethod def reset(cls): diff --git a/opteryx/shared/memory_pool.py b/opteryx/shared/memory_pool.py index 73b7005a9..a8afb0320 100644 --- a/opteryx/shared/memory_pool.py +++ b/opteryx/shared/memory_pool.py @@ -109,6 +109,9 @@ def _level2_compaction(self): segment.start = new_start offset += segment.length + def can_commit(self, data: bytes) -> bool: + return sum(segment.length for segment in self.free_segments) > len(data) + def commit(self, data: bytes) -> int: """ Add an item to the pool and return its reference. @@ -117,7 +120,6 @@ def commit(self, data: bytes) -> int: first we combine adjacent free blocks into larger blocks. If that's not enough, we consolidate all of the free blocks together. 
""" - self.commits += 1 len_data = len(data) # always acquire a lock to write with self.lock: @@ -126,9 +128,11 @@ def commit(self, data: bytes) -> int: # avoid trying to compact if it won't release enough space anyway total_free_space = sum(segment.length for segment in self.free_segments) if total_free_space < len_data: + self.failed_commits += 1 return None # avoid trying to compact, if we're already compacted if len(self.free_segments) <= 1: + self.failed_commits += 1 return None # combine adjacent free space (should be pretty quick) self._level1_compaction() @@ -157,6 +161,7 @@ def commit(self, data: bytes) -> int: ref_id = random_int() self.used_segments[ref_id] = new_segment + self.commits += 1 return ref_id def read(self, ref_id: int) -> bytes: @@ -194,6 +199,20 @@ def release(self, ref_id: int): segment = self.used_segments.pop(ref_id) self.free_segments.append(segment) + @property + def stats(self) -> dict: + return { + "free_segments": len(self.free_segments), + "used_segments": len(self.used_segments), + "commits": self.commits, + "failed_commits": self.failed_commits, + "reads": self.reads, + "read_locks": self.read_locks, + "l1_compaction": self.l1_compaction, + "l2_compaction": self.l2_compaction, + "releases": self.releases, + } + def __del__(self): """ This function exists just to wrap the debug logging diff --git a/opteryx/utils/resource_monitor.py b/opteryx/utils/resource_monitor.py index 7c0bfd762..c29c6de20 100644 --- a/opteryx/utils/resource_monitor.py +++ b/opteryx/utils/resource_monitor.py @@ -4,31 +4,46 @@ import orjson -RESOURCE_LIB = True +RESOURCE_LIB = "resource" try: import resource except ImportError: # pragma: no cover - RESOURCE_LIB = False - import psutil # type:ignore + RESOURCE_LIB = "psutil" + try: + import psutil # type:ignore + except ImportError: + RESOURCE_LIB = "none" class ResourceMonitor: # pragma: no cover - slots = "frequency" - def __init__(self, frequency=0.01): # pragma: no cover + def __init__(self, frequency=1.0): # pragma: no cover + + from opteryx.shared import BufferPool + self.frequency = frequency - if not RESOURCE_LIB: + if RESOURCE_LIB == "psutil": self.process = psutil.Process(os.getpid()) - else: - print(resource.getrlimit(resource.RUSAGE_SELF)) + self.bufferpool = BufferPool() def resource_usage(self): # pragma: no cover while True: - if RESOURCE_LIB: - memory_usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * 1024 - else: - memory_usage = self.process.memory_info()[0] - print(orjson.dumps({"memory": memory_usage}), RESOURCE_LIB) + print(".") + import opteryx + + report = {} + if RESOURCE_LIB == "resource": + report["memory"] = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * 1024 + elif RESOURCE_LIB == "psutil": + report["memory"] = self.process.memory_info()[0] + + if self.bufferpool: + report["buffer_pool_items"] = len(self.bufferpool._lru) + report.update( + {f"buffer_pool_{k}": v for k, v in self.bufferpool._memory_pool.stats.items()} + ) + + print(orjson.dumps(report).decode()) sleep(self.frequency) diff --git a/tests/storage/test_cache_redis.py b/tests/storage/test_cache_redis.py index 8e03e86ff..a35e8c8c9 100644 --- a/tests/storage/test_cache_redis.py +++ b/tests/storage/test_cache_redis.py @@ -36,7 +36,7 @@ def test_redis_cache(): assert cache.errors == 0 stats = cur.stats - assert stats["cache_hits"] >= stats["blobs_read"] + assert stats["remote_cache_hits"] >= stats["blobs_read"], stats assert stats.get("cache_misses", 0) == 0, stats diff --git a/tests/storage/test_cql_datastax.py 
b/tests/storage/test_cql_datastax.py index 49aa85899..b3b7710c3 100644 --- a/tests/storage/test_cql_datastax.py +++ b/tests/storage/test_cql_datastax.py @@ -1,24 +1,54 @@ """ Test we can read from Cassandra (DataStax) -This is our only Cassandra Test +This is our only Cassandra Test. """ import os import sys +import pytest -os.environ["OPTERYX_DEBUG"] = "1" +# os.environ["OPTERYX_DEBUG"] = "1" sys.path.insert(1, os.path.join(sys.path[0], "../..")) import opteryx from opteryx.connectors import CqlConnector from tests.tools import is_arm, is_mac, is_windows, skip_if +from opteryx.utils.formatter import format_sql + +# fmt:off +test_cases = [ + ("SELECT * FROM datastax.opteryx.planets", 9, 20, None), + ("SELECT COUNT(*) FROM datastax.opteryx.planets", 1, 1, None), + ("SELECT name FROM datastax.opteryx.planets", 9, 1, None), + ("SELECT * FROM datastax.opteryx.planets AS P INNER JOIN $planets ON P.gravity = $planets.gravity", 11, 40, None), + ("SELECT name FROM datastax.opteryx.planets WHERE name LIKE 'Earth'", 1, 1, {"rows_read": 1, "columns_read": 1}), + ("SELECT * FROM datastax.opteryx.planets WHERE distanceFromSun < lengthOfDay", 2, None, {"rows_read": 9}), + ("SELECT * FROM datastax.opteryx.planets WHERE mass > 0.5", 7, 20, None), + ("SELECT * FROM datastax.opteryx.planets WHERE name = 'Earth'", 1, 20, None), + ("SELECT * FROM datastax.opteryx.planets WHERE name NOT LIKE 'Mars'", 8, 20, None), + ("SELECT AVG(mass) FROM datastax.opteryx.planets", 1, 1, None), + ("SELECT MIN(distanceFromSun) FROM datastax.opteryx.planets", 1, 1, None), + ("SELECT MAX(lengthOfDay) FROM datastax.opteryx.planets", 1, 1, None), + ("SELECT UPPER(name), ROUND(mass, 2) FROM datastax.opteryx.planets", 9, 2, None), + ("SELECT surfacePressure, COUNT(*) FROM datastax.opteryx.planets GROUP BY surfacePressure HAVING COUNT(*) > 1", 1, 2, None), + ("SELECT * FROM datastax.opteryx.planets WHERE mass > 0.1 AND distanceFromSun < 500", 4, 20, None), + ("SELECT name, SIGNUM(mass) AS sin_mass FROM datastax.opteryx.planets", 9, 2, None), + ("SELECT name, CASE WHEN mass > 1 THEN 'heavy' ELSE 'light' END FROM datastax.opteryx.planets", 9, 2, None), + ("SELECT name FROM datastax.opteryx.planets WHERE surfacePressure IS NULL", 4, 1, None), + ("SELECT name FROM datastax.opteryx.planets WHERE surfacePressure IS NOT NULL", 5, 1, None), + ("SELECT name FROM datastax.opteryx.planets WHERE numberOfMoons IS NOT TRUE", 8, 1, None), +] +# fmt:on # skip to reduce billing @skip_if(is_arm() or is_windows() or is_mac()) -def test_datastax_storage(): +@pytest.mark.parametrize( + "query, expected_rowcount, expected_columncount, expected_stats", test_cases +) +def test_datastax_storage(query, expected_rowcount, expected_columncount, expected_stats): from cassandra.cluster import Cluster from cassandra.auth import PlainTextAuthProvider @@ -39,42 +69,86 @@ def test_datastax_storage(): cluster=cluster, ) - results = opteryx.query("SELECT * FROM datastax.opteryx.planets") - assert results.rowcount == 9, results.rowcount - assert results.columncount == 20 + results = opteryx.query(query) - # PROCESS THE DATA IN SOME WAY - results = opteryx.query("SELECT COUNT(*) FROM datastax.opteryx.planets;") - assert results.rowcount == 1, results.rowcount - assert results.columncount == 1 + assert ( + results.rowcount == expected_rowcount + ), f"Expected row count {expected_rowcount}, got {results.rowcount}" - # PUSH A PROJECTION - results = opteryx.query("SELECT name FROM datastax.opteryx.planets;") - assert results.rowcount == 9, results.rowcount - assert 
results.columncount == 1 + if expected_columncount is not None: + assert ( + results.columncount == expected_columncount + ), f"Expected column count {expected_columncount}, got {results.columncount}" - # JOIN ON A NON SQL TABLE - results = opteryx.query( - "SELECT * FROM datastax.opteryx.planets AS P INNER JOIN $planets ON P.gravity = $planets.gravity;" - ) - assert results.rowcount == 11, results.rowcount - assert results.columncount == 40, results.columncount - - # PUSH - CHECK STATS THE PUSHES WORKED - results = opteryx.query("SELECT name FROM datastax.opteryx.planets WHERE name LIKE 'Earth';") - assert results.rowcount == 1, results.rowcount - assert results.columncount == 1 - assert results.stats["rows_read"] == 1 - assert results.stats["columns_read"] == 1 - - results = opteryx.query( - "SELECT * FROM datastax.opteryx.planets WHERE distanceFromSun < lengthOfDay" - ) - assert results.rowcount == 2, results.rowcount - assert results.stats.get("rows_read", 0) == 9, results.stats + if expected_stats: + for key, expected_value in expected_stats.items(): + actual_value = results.stats.get(key, None) + assert ( + actual_value == expected_value + ), f"Expected {key} {expected_value}, got {actual_value}" if __name__ == "__main__": # pragma: no cover - from tests.tools import run_tests - - run_tests() + """ + Running in the IDE we do some formatting - it's not functional but helps + when reading the outputs. + """ + + import shutil + import time + + from tests.tools import trunc_printable + + start_suite = time.monotonic_ns() + + width = shutil.get_terminal_size((80, 20))[0] - 15 + + passed = 0 + failed = 0 + + nl = "\n" + + failures = [] + + print(f"RUNNING BATTERY OF {len(test_cases)} DATASTAX TESTS") + for index, (statement, rows, cols, expected_stats) in enumerate(test_cases): + + printable = statement + if hasattr(printable, "decode"): + printable = printable.decode() + print( + f"\033[38;2;255;184;108m{(index + 1):04}\033[0m" + f" {trunc_printable(format_sql(printable), width - 1)}", + end="", + flush=True, + ) + try: + start = time.monotonic_ns() + test_datastax_storage(statement, rows, cols, expected_stats) + print( + f"\033[38;2;26;185;67m{str(int((time.monotonic_ns() - start)/1e6)).rjust(4)}ms\033[0m ✅", + end="", + ) + passed += 1 + if failed > 0: + print(" \033[0;31m*\033[0m") + else: + print() + except Exception as err: + print(f"\033[0;31m{str(int((time.monotonic_ns() - start)/1e6)).rjust(4)}ms ❌ *\033[0m") + print(">", err) + failed += 1 + failures.append((statement, err)) + + print("--- ✅ \033[0;32mdone\033[0m") + + if failed > 0: + print("\n\033[38;2;139;233;253m\033[3mFAILURES\033[0m") + for statement, err in failures: + print(err) + + print( + f"\n\033[38;2;139;233;253m\033[3mCOMPLETE\033[0m ({((time.monotonic_ns() - start_suite) / 1e9):.2f} seconds)\n" + f" \033[38;2;26;185;67m{passed} passed ({(passed * 100) // (passed + failed)}%)\033[0m\n" + f" \033[38;2;255;121;198m{failed} failed\033[0m" + ) diff --git a/tests/storage/test_sql_sqlite.py b/tests/storage/test_sql_sqlite.py index 1edc32752..9b7d45efc 100644 --- a/tests/storage/test_sql_sqlite.py +++ b/tests/storage/test_sql_sqlite.py @@ -49,6 +49,28 @@ ("SELECT * FROM sqlite.planets, sqlite.satellites WHERE sqlite.planets.id - sqlite.satellites.planetId = 0;", 177, 28, None), ("SELECT * FROM sqlite.planets, sqlite.satellites WHERE sqlite.planets.id - sqlite.satellites.planetId != 0;", 1416, 28, None), ("SELECT * FROM sqlite.planets WHERE sqlite.planets.id - sqlite.planets.numberOfMoons < 0;", 4, 20, None), + ("SELECT 
avg(num_moons) FROM (SELECT numberOfMoons as num_moons FROM sqlite.planets) AS subquery;", 1, 1, None), + ("SELECT p.name, s.name FROM sqlite.planets p LEFT OUTER JOIN sqlite.satellites s ON p.id = s.planetId;", 179, 2, None), + ("SELECT A.name, B.name FROM sqlite.planets A, sqlite.planets B WHERE A.gravity = B.gravity AND A.id != B.id;", 2, 2, None), +# ("SELECT * FROM sqlite.planets p JOIN sqlite.satellites s ON p.id = s.planetId AND p.gravity > 1;", 6, 28, None), + ("SELECT planetId, COUNT(*) AS num_satellites FROM sqlite.satellites GROUP BY planetId HAVING COUNT(*) > 1;", 6, 2, None), + ("SELECT * FROM sqlite.planets ORDER BY name;", 9, 20, None), + ("SELECT DISTINCT name FROM sqlite.planets;", 9, 1, None), + ("SELECT MAX(gravity) FROM sqlite.planets;", 1, 1, None), + ("SELECT MIN(gravity) FROM sqlite.planets;", 1, 1, None), + ("SELECT COUNT(*) FROM sqlite.planets WHERE surfacePressure > 0;", 1, 1, None), + ("SELECT AVG(mass) FROM sqlite.planets", 1, 1, None), + ("SELECT MIN(distanceFromSun) FROM sqlite.planets", 1, 1, None), + ("SELECT MAX(lengthOfDay) FROM sqlite.planets", 1, 1, None), + ("SELECT UPPER(name), ROUND(mass, 2) FROM sqlite.planets", 9, 2, None), + ("SELECT surfacePressure, COUNT(*) FROM sqlite.planets GROUP BY surfacePressure HAVING COUNT(*) > 1", 1, 2, None), + ("SELECT * FROM sqlite.planets WHERE mass > 0.1 AND distanceFromSun < 500", 4, 20, None), + ("SELECT name, SIGNUM(mass) AS sin_mass FROM sqlite.planets", 9, 2, None), + ("SELECT name, CASE WHEN mass > 1 THEN 'heavy' ELSE 'light' END FROM sqlite.planets", 9, 2, None), + ("SELECT name FROM sqlite.planets WHERE surfacePressure IS NULL", 4, 1, None), + ("SELECT name FROM sqlite.planets WHERE surfacePressure IS NOT NULL", 5, 1, None), + ("SELECT name FROM sqlite.planets WHERE numberOfMoons IS NOT TRUE", 2, 1, None), + ("SELECT name FROM sqlite.planets WHERE numberOfMoons IS TRUE", 7, 1, None), ] # fmt: on From 2bbeb0d3d79ccb532bb9d692e7741a565e87278a Mon Sep 17 00:00:00 2001 From: XB500 Date: Mon, 22 Apr 2024 17:37:46 +0000 Subject: [PATCH 4/4] Opteryx Version 0.14.2-alpha.438 --- opteryx/__version__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opteryx/__version__.py b/opteryx/__version__.py index 92a8dfb45..d2892f851 100644 --- a/opteryx/__version__.py +++ b/opteryx/__version__.py @@ -1,4 +1,4 @@ -__build__ = 435 +__build__ = 438 # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License.
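
A short usage sketch of the MemoryPool API as it stands at the end of this series. The can_commit(), commit(), read() and release() calls and the stats property are taken from the diffs above; the constructor arguments (size, name) are assumptions, as the initialiser is not shown in any of these patches.

    from opteryx.shared import MemoryPool

    # size and name are assumed constructor parameters - the __init__ is not
    # part of this patch series
    pool = MemoryPool(size=1024, name="example")

    payload = b"some bytes to cache"
    if pool.can_commit(payload):  # cheap pre-check added in PATCH 3/4
        ref = pool.commit(payload)  # returns a reference id, or None on failure
        if ref is not None:
            assert pool.read(ref) == payload  # optimistic, mostly lock-free read
            pool.release(ref)  # hand the segment back to the free list

    print(pool.stats)  # commits, failed_commits, reads, compaction counts, ...

Note that can_commit() compares the data length against total free space, so it is a fast pre-check rather than a guarantee of a contiguous fit; commit() itself falls back to the two compaction passes described in PATCH 1/4 before giving up, which is why callers such as _BufferPool.set() still handle a None result.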