#1598 #1601

Merged: 4 commits, Apr 22, 2024
11 changes: 5 additions & 6 deletions opteryx/__init__.py
@@ -83,7 +83,7 @@
# For more details, see: https://www.python.org/dev/peps/pep-0249/
apilevel: str = "1.0" # Compliance level with DB API 2.0
threadsafety: int = 0 # Thread safety level, 0 means not thread-safe
paramstyle: str = "qmark" # Parameter placeholder style, qmark means '?' for placeholders
paramstyle: str = "named" # Parameter placeholder style, named means :name for placeholders


def connect(*args, **kwargs):
@@ -171,7 +171,6 @@ def query_to_arrow(
nice_value = os.nice(0)
try:
os.nice(-20 + nice_value)
print(f"{datetime.datetime.now()} [LOADER] Process priority set to {os.nice(0)}.")
except PermissionError:
display_nice = str(nice_value)
if nice_value == 0:
@@ -180,10 +179,6 @@
f"{datetime.datetime.now()} [LOADER] Cannot update process priority. Currently set to {display_nice}."
)

-# Log resource usage
-if config.ENABLE_RESOURCE_LOGGING: # pragma: no cover
-    from opteryx.utils.resource_monitor import ResourceMonitor


_cache_manager = CacheManager(cache_backend=None)

@@ -205,3 +200,7 @@ def set_cache_manager(new_cache_manager):


cache_manager = get_cache_manager()

+# Log resource usage
+if config.ENABLE_RESOURCE_LOGGING: # pragma: no cover
+    from opteryx.utils.resource_monitor import ResourceMonitor
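For reference, the paramstyle change above switches callers from positional '?' placeholders to named ones. A minimal sketch of the new calling convention, assuming the DB-API style cursor.execute(sql, parameters) interface and the built-in $planets sample dataset; the exact query is illustrative only:

```python
import opteryx

conn = opteryx.connect()
cur = conn.cursor()

# paramstyle = "named": placeholders are written as :name and bound from a
# mapping, rather than positionally with '?' (the old qmark style).
cur.execute(
    "SELECT name FROM $planets WHERE name = :planet",
    {"planet": "Earth"},
)
print(cur.fetchall())
```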
2 changes: 1 addition & 1 deletion opteryx/__version__.py
@@ -1,4 +1,4 @@
-__build__ = 434
+__build__ = 438

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
2 changes: 1 addition & 1 deletion opteryx/connectors/base/base_connector.py
@@ -21,7 +21,7 @@
import pyarrow
from orso.schema import RelationSchema

-from opteryx.shared import QueryStatistics
+from opteryx.models import QueryStatistics

MIN_CHUNK_SIZE: int = 500
INITIAL_CHUNK_SIZE: int = 500
27 changes: 23 additions & 4 deletions opteryx/connectors/capabilities/cacheable.py
@@ -42,36 +42,55 @@ def read_thru_cache(func):

# Capture the max_evictions value at decoration time
from opteryx import get_cache_manager
from opteryx.managers.cache import NullCache
from opteryx.shared import BufferPool

cache_manager = get_cache_manager()
max_evictions = cache_manager.max_evictions_per_query
remote_cache = cache_manager.cache_backend
if not remote_cache:
# rather than make decisions - just use a dummy
remote_cache = NullCache()

buffer_pool = BufferPool()

my_keys = set()

@wraps(func)
def wrapper(blob_name, statistics, **kwargs):
nonlocal max_evictions

key = hex(CityHash64(blob_name)).encode()
my_keys.add(key)

# Try to get the result from cache
# try the buffer pool first
result = buffer_pool.get(key)
if result is not None:
statistics.bufferpool_hits += 1
return result

# try the remote cache next
result = remote_cache.get(key)
if result is not None:
statistics.cache_hits += 1
statistics.remote_cache_hits += 1
return result

# Key is not in cache, execute the function and store the result in cache
result = func(blob_name=blob_name, **kwargs)

# Write the result to cache
# Write the result to caches
if max_evictions:
# we set a per-query eviction limit
if len(result) < buffer_pool.max_cacheable_item_size:
evicted = buffer_pool.set(key, result)
remote_cache.set(key, result)
if evicted:
# if we're evicting items we're putting into the cache
if evicted in my_keys:
max_evictions = 0
else:
max_evictions -= 1
statistics.cache_evictions += 1
max_evictions -= 1
else:
statistics.cache_oversize += 1
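The rewritten wrapper above now reads through two tiers: the in-process BufferPool first, then the remote cache, and only then the underlying blob read, writing back to both tiers on a miss. A simplified, self-contained sketch of that pattern (the dict-backed tiers here stand in for Opteryx's BufferPool and remote cache backend; the names are illustrative):

```python
from functools import wraps

# Stand-ins for the buffer pool and the remote cache backend (illustrative only).
local_tier: dict = {}
remote_tier: dict = {}


def read_thru(func):
    @wraps(func)
    def wrapper(blob_name: str) -> bytes:
        key = blob_name.encode()

        # 1. cheapest first: the in-process tier
        if key in local_tier:
            return local_tier[key]

        # 2. then the remote cache
        if key in remote_tier:
            return remote_tier[key]

        # 3. miss: perform the real read and populate both tiers
        payload = func(blob_name)
        local_tier[key] = payload
        remote_tier[key] = payload
        return payload

    return wrapper


@read_thru
def read_blob(blob_name: str) -> bytes:
    print(f"reading {blob_name} from storage")
    return b"..."


read_blob("data/part-0000.parquet")  # hits storage, fills both tiers
read_blob("data/part-0000.parquet")  # served from the local tier
```

The real decorator additionally hashes the blob name with CityHash64, skips caching oversized results, and enforces the per-query eviction budget, zeroing it out once it starts evicting keys it wrote itself.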

39 changes: 15 additions & 24 deletions opteryx/connectors/cql_connector.py
@@ -61,25 +61,17 @@ class CqlConnector(BaseConnector, PredicatePushable):
"GtEq": True,
"Lt": True,
"LtEq": True,
"Like": True,
"NotLike": True,
}

OPS_XLAT: Dict[str, str] = {
"Eq": "=",
"NotEq": "!=",
"Gt": ">",
"GtEq": ">=",
"Lt": "<",
"LtEq": "<=",
"Like": "LIKE",
"NotLike": "NOT LIKE",
"IsTrue": "IS TRUE",
"IsNotTrue": "IS NOT TRUE",
"IsFalse": "IS FALSE",
"IsNotFalse": "IS NOT FALSE",
"IsNull": "IS NULL",
"IsNotNull": "IS NOT NULL",
"Eq": ":left = :right",
"NotEq": ":left != :right",
"Gt": ":left > :right",
"GtEq": ":left >= :right",
"Lt": ":left < :right",
"LtEq": ":left <= :right",
"Like": ":left LIKE :right",
"NotLike": "NOT (:left LIKE :right)",
}

def __init__(
@@ -113,11 +105,6 @@ def __init__(

self.single_column = None

def can_push(self, operator: Node, types: set = None) -> bool:
if super().can_push(operator, types):
return True
return operator.condition.node_type == NodeType.UNARY_OPERATOR

def read_dataset( # type:ignore
self,
*,
@@ -147,10 +134,11 @@ def read_dataset( # type:ignore
parameters: list = []
for predicate in predicates:
if predicate.node_type == NodeType.UNARY_OPERATOR:
operand = predicate.centre.current_name
operator = self.OPS_XLAT[predicate.value]
operand, parameters = _handle_operand(predicate.centre, parameters)
operator = operator.replace(":operand", operand)

query_builder.WHERE(f"{operand} {operator}")
query_builder.WHERE(operator)
else:
left_operand = predicate.left
right_operand = predicate.right
@@ -159,7 +147,10 @@
left_value, parameters = _handle_operand(left_operand, parameters)
right_value, parameters = _handle_operand(right_operand, parameters)

query_builder.WHERE(f"{left_value} {operator} {right_value}")
operator = operator.replace(":left", left_value)
operator = operator.replace(":right", right_value)

query_builder.WHERE(operator)

session = self.cluster.connect()
# DEBUG: log ("READ DATASET\n", str(query_builder))
6 changes: 6 additions & 0 deletions opteryx/connectors/sql_connector.py
@@ -69,6 +69,12 @@ class SqlConnector(BaseConnector, PredicatePushable):
"LtEq": True,
"Like": True,
"NotLike": True,
"IsTrue": True,
"IsNotTrue": True,
"IsFalse": True,
"IsNotFalse": True,
"IsNull": True,
"IsNotNull": True,
}
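Marking the unary operators as pushable means predicates such as IS NULL can now be translated into the SQL sent to the source engine rather than being filtered by Opteryx after the rows are read. A toy illustration of the kind of check this table feeds (not the actual PredicatePushable API; the operator names marked below are illustrative):

```python
# Illustrative only: the lookup a connector performs before pushing a
# predicate down to the remote database.
PUSHABLE_OPS = {
    "Eq": True,
    "IsNull": True,        # newly pushable in this change
    "SomeOtherOp": False,  # hypothetical non-pushable operator
}


def is_pushable(operator: str) -> bool:
    return PUSHABLE_OPS.get(operator, False)


assert is_pushable("IsNull")           # e.g. a WHERE ... IS NULL filter runs remotely
assert not is_pushable("SomeOtherOp")  # evaluated locally after the read
```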

OPS_XLAT: Dict[str, str] = {
2 changes: 1 addition & 1 deletion opteryx/cursor.py
@@ -39,7 +39,7 @@
from opteryx.exceptions import InvalidCursorStateError
from opteryx.exceptions import MissingSqlStatement
from opteryx.exceptions import UnsupportedSyntaxError
-from opteryx.shared import QueryStatistics
+from opteryx.models import QueryStatistics
from opteryx.shared.rolling_log import RollingLog
from opteryx.utils import sql

2 changes: 2 additions & 0 deletions opteryx/models/__init__.py
@@ -16,6 +16,7 @@
from opteryx.models.node import Node
from opteryx.models.non_tabular_result import NonTabularResult
from opteryx.models.query_properties import QueryProperties
+from opteryx.models.query_statistics import QueryStatistics

__all__ = (
"ConnectionContext",
@@ -24,4 +25,5 @@
"Node",
"NonTabularResult",
"QueryProperties",
"QueryStatistics",
)
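Since QueryStatistics is now re-exported from opteryx.models (and removed from opteryx.shared further down), downstream code imports it from the new location. A minimal sketch, assuming the attribute-style counters seen in cacheable.py above default to zero via the underlying defaultdict:

```python
# New import location (previously: from opteryx.shared import QueryStatistics)
from opteryx.models import QueryStatistics

stats = QueryStatistics()
stats.bufferpool_hits += 1   # counters are attribute-style, defaulting to 0
stats.remote_cache_hits += 1
```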
opteryx/models/query_statistics.py
@@ -19,7 +19,7 @@ def __init__(self):
self._stats: dict = defaultdict(int)
self._stats["messages"] = []

-def _ns_to_s(self, nano_seconds):
+def _ns_to_s(self, nano_seconds: int) -> float:
"""convert elapsed ns to s"""
if nano_seconds == 0:
return 0
2 changes: 1 addition & 1 deletion opteryx/operators/base_plan_node.py
@@ -17,7 +17,7 @@
from orso.tools import random_string

from opteryx.models import QueryProperties
-from opteryx.shared import QueryStatistics
+from opteryx.models import QueryStatistics


class BasePlanNode:
2 changes: 1 addition & 1 deletion opteryx/planner/binder/binding_context.py
@@ -17,7 +17,7 @@
from typing import Set

from opteryx.models import ConnectionContext
-from opteryx.shared import QueryStatistics
+from opteryx.models import QueryStatistics
from opteryx.virtual_datasets import derived


Empty file.
3 changes: 1 addition & 2 deletions opteryx/shared/__init__.py
@@ -13,7 +13,6 @@
from opteryx.shared.buffer_pool import BufferPool
from opteryx.shared.materialized_datasets import MaterializedDatasets
from opteryx.shared.memory_pool import MemoryPool
-from opteryx.shared.query_statistics import QueryStatistics
from opteryx.shared.rolling_log import RollingLog

__all__ = ("BufferPool", "MaterializedDatasets", "MemoryPool", "QueryStatistics", "RollingLog")
__all__ = ("BufferPool", "MaterializedDatasets", "MemoryPool", "RollingLog")
66 changes: 36 additions & 30 deletions opteryx/shared/buffer_pool.py
@@ -19,25 +19,29 @@
The buffer pool has no slot limit; it is a given volume of memory, and the pool
will try to evict when full. This is different to a classic Buffer Pool, which
is slot-based.

The Buffer Pool is a global resource and used across all Connections and Cursors.
"""
from typing import Optional

from opteryx.managers.cache import NullCache
from opteryx.shared.memory_pool import MemoryPool
from opteryx.utils.lru_2 import LRU2


class _BufferPool:
"""
Buffer Pool is a class implementing a Least Recently Used (LRU) policy.
Buffer Pool is a class implementing a Least Recently Used (LRU) policy for
eviction.
"""

slots = "_lru", "_cache_backend", "_max_cacheable_item_size", "_memory_pool"

def __init__(self, cache_manager):
self._cache_backend = cache_manager.cache_backend
if not self._cache_backend:
self._cache_backend = NullCache() # rather than make decisions - just use a dummy
def __init__(self):
# Import here to avoid circular imports
from opteryx import get_cache_manager

cache_manager = get_cache_manager()

self.max_cacheable_item_size = cache_manager.max_cacheable_item_size
self._lru = LRU2()
self._memory_pool = MemoryPool(
@@ -52,30 +56,37 @@ def get(self, key: bytes) -> bytes:
mp_key = self._lru.get(key)
if mp_key is not None:
return self._memory_pool.read(mp_key)
return self._cache_backend.get(key)

def set(self, key: bytes, value) -> Optional[str]:
"""
Put an item into the pool, evict an item if the pool is full.
If a cache is provided, also set the value in the cache.
Attempt to save a value to the buffer pool. Check first if there is space to commit the value.
If not, evict the least recently used item and try again.

Args:
key: The key associated with the value to commit.
value: The value to commit to the buffer pool.

Returns:
The key of the evicted item if eviction occurred, otherwise None.
"""
# always try to save to the cache backend
self._cache_backend.set(key, value)

# try to save in the buffer pool, if we fail, release
# an item from the pool (LRUK2) and try again
evicted_key = None
mp_key = self._memory_pool.commit(value)
if mp_key is None:
evicted_key, evicted_value = self._lru.evict(True)
# First check if we can commit the value to the memory pool
if not self._memory_pool.can_commit(value):
evicted_key, evicted_value = self._lru.evict(details=True)
if evicted_key:
self._memory_pool.release(evicted_value)
mp_key = self._memory_pool.commit(value)
if mp_key is None:
return None
else:
self._lru.set(key, mp_key)
return evicted_key
else:
return None # Return None if no item could be evicted

# Try to commit the value to the memory pool
memory_pool_key = self._memory_pool.commit(value)
if memory_pool_key is None:
return None # Return None if commit still fails after eviction

# Update LRU cache with the new key and memory pool key if commit succeeds
self._lru.set(key, memory_pool_key)

# Return the evicted key if an eviction occurred, otherwise return None
return evicted_key if "evicted_key" in locals() else None
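A short usage sketch of the contract the rewritten set() above exposes: it returns the key of anything it had to evict to make room, or None. BufferPool() is the singleton accessor defined further down; the keys, payload sizes, and miss behaviour shown are illustrative assumptions:

```python
from opteryx.shared import BufferPool

pool = BufferPool()

# set() commits the bytes to the memory pool, evicting the least recently
# used entry if there is no room; the evicted key (if any) is returned.
evicted = pool.set(b"blob:part-0000", b"x" * 1024)
if evicted is not None:
    print(f"evicted {evicted!r} to make room")

# get() returns the cached bytes, or None on a miss.
payload = pool.get(b"blob:part-0000")
```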

@property
def stats(self) -> tuple:
@@ -106,12 +117,7 @@ def __new__(cls):

@classmethod
def _create_instance(cls):
# Import here to avoid circular imports
from opteryx import get_cache_manager

cache_manager = get_cache_manager()

return _BufferPool(cache_manager)
return _BufferPool()

@classmethod
def reset(cls):