Commit a1c88fd
Merge pull request #1601 from mabel-dev/#1598
joocer authored Apr 22, 2024
2 parents: b331c86 + 2bbeb0d
Showing 12 changed files with 264 additions and 116 deletions.
opteryx/__init__.py (11 changes: 5 additions & 6 deletions)
@@ -83,7 +83,7 @@
# For more details, see: https://www.python.org/dev/peps/pep-0249/
apilevel: str = "1.0" # Compliance level with DB API 2.0
threadsafety: int = 0 # Thread safety level, 0 means not thread-safe
paramstyle: str = "qmark" # Parameter placeholder style, qmark means '?' for placeholders
paramstyle: str = "named" # Parameter placeholder style, named means :name for placeholders


def connect(*args, **kwargs):
@@ -171,7 +171,6 @@ def query_to_arrow(
nice_value = os.nice(0)
try:
os.nice(-20 + nice_value)
print(f"{datetime.datetime.now()} [LOADER] Process priority set to {os.nice(0)}.")
except PermissionError:
display_nice = str(nice_value)
if nice_value == 0:
@@ -180,10 +179,6 @@
f"{datetime.datetime.now()} [LOADER] Cannot update process priority. Currently set to {display_nice}."
)

# Log resource usage
if config.ENABLE_RESOURCE_LOGGING: # pragma: no cover
from opteryx.utils.resource_monitor import ResourceMonitor


_cache_manager = CacheManager(cache_backend=None)

@@ -205,3 +200,7 @@ def set_cache_manager(new_cache_manager):


cache_manager = get_cache_manager()

# Log resource usage
if config.ENABLE_RESOURCE_LOGGING: # pragma: no cover
from opteryx.utils.resource_monitor import ResourceMonitor
opteryx/__version__.py (2 changes: 1 addition & 1 deletion)
@@ -1,4 +1,4 @@
__build__ = 435
__build__ = 438

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
opteryx/connectors/capabilities/cacheable.py (27 changes: 23 additions & 4 deletions)
@@ -42,36 +42,55 @@ def read_thru_cache(func):

# Capture the max_evictions value at decoration time
from opteryx import get_cache_manager
from opteryx.managers.cache import NullCache
from opteryx.shared import BufferPool

cache_manager = get_cache_manager()
max_evictions = cache_manager.max_evictions_per_query
remote_cache = cache_manager.cache_backend
if not remote_cache:
# rather than make decisions - just use a dummy
remote_cache = NullCache()

buffer_pool = BufferPool()

my_keys = set()

@wraps(func)
def wrapper(blob_name, statistics, **kwargs):
nonlocal max_evictions

key = hex(CityHash64(blob_name)).encode()
my_keys.add(key)

# Try to get the result from cache
# try the buffer pool first
result = buffer_pool.get(key)
if result is not None:
statistics.bufferpool_hits += 1
return result

# try the remote cache next
result = remote_cache.get(key)
if result is not None:
statistics.cache_hits += 1
statistics.remote_cache_hits += 1
return result

# Key is not in cache, execute the function and store the result in cache
result = func(blob_name=blob_name, **kwargs)

# Write the result to cache
# Write the result to caches
if max_evictions:
# we set a per-query eviction limit
if len(result) < buffer_pool.max_cacheable_item_size:
evicted = buffer_pool.set(key, result)
remote_cache.set(key, result)
if evicted:
# if we're evicting items we're putting into the cache
if evicted in my_keys:
max_evictions = 0
else:
max_evictions -= 1
statistics.cache_evictions += 1
max_evictions -= 1
else:
statistics.cache_oversize += 1
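
The decorated reader now tries two tiers before doing any real work: the in-process buffer pool, then the remote cache; only on a double miss does it call through, and the result is then written back subject to the per-query eviction budget. A stripped-down sketch of the same read-through pattern, using plain dicts in place of Opteryx's BufferPool and remote cache backend:

    from functools import wraps

    def read_thru(local_cache: dict, remote_cache: dict):
        """Build a decorator that checks a fast local tier, then a remote tier."""
        def decorator(func):
            @wraps(func)
            def wrapper(blob_name, **kwargs):
                # fast in-process tier first
                if blob_name in local_cache:
                    return local_cache[blob_name]
                # slower remote tier next
                if blob_name in remote_cache:
                    return remote_cache[blob_name]
                # double miss: do the real read, then populate both tiers
                result = func(blob_name=blob_name, **kwargs)
                local_cache[blob_name] = result
                remote_cache[blob_name] = result
                return result
            return wrapper
        return decorator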

opteryx/connectors/cql_connector.py (39 changes: 15 additions & 24 deletions)
@@ -61,25 +61,17 @@ class CqlConnector(BaseConnector, PredicatePushable):
"GtEq": True,
"Lt": True,
"LtEq": True,
"Like": True,
"NotLike": True,
}

OPS_XLAT: Dict[str, str] = {
"Eq": "=",
"NotEq": "!=",
"Gt": ">",
"GtEq": ">=",
"Lt": "<",
"LtEq": "<=",
"Like": "LIKE",
"NotLike": "NOT LIKE",
"IsTrue": "IS TRUE",
"IsNotTrue": "IS NOT TRUE",
"IsFalse": "IS FALSE",
"IsNotFalse": "IS NOT FALSE",
"IsNull": "IS NULL",
"IsNotNull": "IS NOT NULL",
"Eq": ":left = :right",
"NotEq": ":left != :right",
"Gt": ":left > :right",
"GtEq": ":left >= :right",
"Lt": ":left < :right",
"LtEq": ":left <= :right",
"Like": ":left LIKE :right",
"NotLike": "NOT (:left LIKE :right)",
}
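
Expressing the translations as templates with :left and :right placeholders, rather than bare operator symbols, lets a single substitution step produce composite forms such as the NOT (... LIKE ...) rendering for NotLike. An illustrative substitution (the column and pattern here are invented):

    template = "NOT (:left LIKE :right)"  # the NotLike entry above
    condition = template.replace(":left", "name").replace(":right", "'%x%'")
    print(condition)  # NOT (name LIKE '%x%')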

def __init__(
@@ -113,11 +105,6 @@ def __init__(

self.single_column = None

def can_push(self, operator: Node, types: set = None) -> bool:
if super().can_push(operator, types):
return True
return operator.condition.node_type == NodeType.UNARY_OPERATOR

def read_dataset( # type:ignore
self,
*,
@@ -147,10 +134,11 @@ def read_dataset( # type:ignore
parameters: list = []
for predicate in predicates:
if predicate.node_type == NodeType.UNARY_OPERATOR:
operand = predicate.centre.current_name
operator = self.OPS_XLAT[predicate.value]
operand, parameters = _handle_operand(predicate.centre, parameters)
operator = operator.replace(":operand", operand)

query_builder.WHERE(f"{operand} {operator}")
query_builder.WHERE(operator)
else:
left_operand = predicate.left
right_operand = predicate.right
@@ -159,7 +147,10 @@
left_value, parameters = _handle_operand(left_operand, parameters)
right_value, parameters = _handle_operand(right_operand, parameters)

query_builder.WHERE(f"{left_value} {operator} {right_value}")
operator = operator.replace(":left", left_value)
operator = operator.replace(":right", right_value)

query_builder.WHERE(operator)

session = self.cluster.connect()
# DEBUG: log ("READ DATASET\n", str(query_builder))
opteryx/connectors/sql_connector.py (6 changes: 6 additions & 0 deletions)
@@ -69,6 +69,12 @@ class SqlConnector(BaseConnector, PredicatePushable):
"LtEq": True,
"Like": True,
"NotLike": True,
"IsTrue": True,
"IsNotTrue": True,
"IsFalse": True,
"IsNotFalse": True,
"IsNull": True,
"IsNotNull": True,
}
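
Marking the unary tests as pushable means a predicate such as column IS NULL can be rendered into the query sent to the remote engine instead of filtering rows after they arrive. A hedged sketch of how such a unary template is typically rendered; the :operand entries here are illustrative, not the connector's actual OPS_XLAT contents:

    ops_xlat = {
        "IsNull": ":operand IS NULL",
        "IsNotNull": ":operand IS NOT NULL",
    }

    def render_unary(op_name: str, operand: str) -> str:
        # substitute the rendered column name into the template
        return ops_xlat[op_name].replace(":operand", operand)

    print(render_unary("IsNull", "alias"))  # alias IS NULL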

OPS_XLAT: Dict[str, str] = {
opteryx/cursor.py (2 changes: 1 addition & 1 deletion)
@@ -39,7 +39,7 @@
from opteryx.exceptions import InvalidCursorStateError
from opteryx.exceptions import MissingSqlStatement
from opteryx.exceptions import UnsupportedSyntaxError
from opteryx.shared import QueryStatistics
from opteryx.models import QueryStatistics
from opteryx.shared.rolling_log import RollingLog
from opteryx.utils import sql

opteryx/shared/buffer_pool.py (61 changes: 32 additions & 29 deletions)
@@ -24,7 +24,6 @@
"""
from typing import Optional

from opteryx.managers.cache import NullCache
from opteryx.shared.memory_pool import MemoryPool
from opteryx.utils.lru_2 import LRU2

@@ -37,10 +36,12 @@ class _BufferPool:

slots = "_lru", "_cache_backend", "_max_cacheable_item_size", "_memory_pool"

def __init__(self, cache_manager):
self._cache_backend = cache_manager.cache_backend
if not self._cache_backend:
self._cache_backend = NullCache() # rather than make decisions - just use a dummy
def __init__(self):
# Import here to avoid circular imports
from opteryx import get_cache_manager

cache_manager = get_cache_manager()

self.max_cacheable_item_size = cache_manager.max_cacheable_item_size
self._lru = LRU2()
self._memory_pool = MemoryPool(
@@ -55,30 +56,37 @@ def get(self, key: bytes) -> bytes:
mp_key = self._lru.get(key)
if mp_key is not None:
return self._memory_pool.read(mp_key)
return self._cache_backend.get(key)

def set(self, key: bytes, value) -> Optional[str]:
"""
Put an item into the pool, evict an item if the pool is full.
If a cache is provided, also set the value in the cache.
Attempt to save a value to the buffer pool. Check first if there is space to commit the value.
If not, evict the least recently used item and try again.
Args:
key: The key associated with the value to commit.
value: The value to commit to the buffer pool.
Returns:
The key of the evicted item if eviction occurred, otherwise None.
"""
# always try to save to the cache backend
self._cache_backend.set(key, value)

# try to save in the buffer pool, if we fail, release
# an item from the pool (LRUK2) and try again
evicted_key = None
mp_key = self._memory_pool.commit(value)
if mp_key is None:
evicted_key, evicted_value = self._lru.evict(True)
# First check if we can commit the value to the memory pool
if not self._memory_pool.can_commit(value):
evicted_key, evicted_value = self._lru.evict(details=True)
if evicted_key:
self._memory_pool.release(evicted_value)
mp_key = self._memory_pool.commit(value)
if mp_key is None:
return None
else:
self._lru.set(key, mp_key)
return evicted_key
else:
return None # Return None if no item could be evicted

# Try to commit the value to the memory pool
memory_pool_key = self._memory_pool.commit(value)
if memory_pool_key is None:
return None # Return None if commit still fails after eviction

# Update LRU cache with the new key and memory pool key if commit succeeds
self._lru.set(key, memory_pool_key)

# Return the evicted key if an eviction occurred, otherwise return None
return evicted_key if "evicted_key" in locals() else None
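
Because set() returns the key of any entry it displaced (or None), a caller can tell a clean insert from one that forced an eviction; the cacheable decorator above uses exactly this to meter its per-query eviction budget. A brief usage sketch, assuming the singleton accessor shown in this diff:

    from opteryx.shared import BufferPool

    pool = BufferPool()
    evicted = pool.set(b"key-1", b"some-bytes")
    if evicted is not None:
        print(f"made room by evicting {evicted!r}")
    value = pool.get(b"key-1")  # served from the memory pool via the LRU index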

@property
def stats(self) -> tuple:
@@ -109,12 +117,7 @@ def __new__(cls):

@classmethod
def _create_instance(cls):
# Import here to avoid circular imports
from opteryx import get_cache_manager

cache_manager = get_cache_manager()

return _BufferPool(cache_manager)
return _BufferPool()

@classmethod
def reset(cls):
opteryx/shared/memory_pool.py (21 changes: 20 additions & 1 deletion)
@@ -109,6 +109,9 @@ def _level2_compaction(self):
segment.start = new_start
offset += segment.length

def can_commit(self, data: bytes) -> bool:
return sum(segment.length for segment in self.free_segments) > len(data)

def commit(self, data: bytes) -> int:
"""
Add an item to the pool and return its reference.
@@ -117,7 +120,6 @@ def commit(self, data: bytes) -> int:
first we combine adjacent free blocks into larger blocks. If that's
not enough, we consolidate all of the free blocks together.
"""
self.commits += 1
len_data = len(data)
# always acquire a lock to write
with self.lock:
@@ -126,9 +128,11 @@
# avoid trying to compact if it won't release enough space anyway
total_free_space = sum(segment.length for segment in self.free_segments)
if total_free_space < len_data:
self.failed_commits += 1
return None
# avoid trying to compact, if we're already compacted
if len(self.free_segments) <= 1:
self.failed_commits += 1
return None
# combine adjacent free space (should be pretty quick)
self._level1_compaction()
@@ -157,6 +161,7 @@

ref_id = random_int()
self.used_segments[ref_id] = new_segment
self.commits += 1
return ref_id

def read(self, ref_id: int) -> bytes:
Expand Down Expand Up @@ -194,6 +199,20 @@ def release(self, ref_id: int):
segment = self.used_segments.pop(ref_id)
self.free_segments.append(segment)

@property
def stats(self) -> dict:
return {
"free_segments": len(self.free_segments),
"used_segments": len(self.used_segments),
"commits": self.commits,
"failed_commits": self.failed_commits,
"reads": self.reads,
"read_locks": self.read_locks,
"l1_compaction": self.l1_compaction,
"l2_compaction": self.l2_compaction,
"releases": self.releases,
}
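
Collecting the counters incremented throughout commit/read/release behind a single stats property makes lightweight instrumentation possible without touching the pool's internals. A sketch; the size constructor argument is an assumption, since the pool's signature isn't shown in this diff:

    from opteryx.shared.memory_pool import MemoryPool

    pool = MemoryPool(size=1024)  # assumed constructor parameter
    ref = pool.commit(b"hello world")
    assert pool.read(ref) == b"hello world"
    pool.release(ref)
    print(pool.stats)  # {'free_segments': 1, 'used_segments': 0, 'commits': 1, ...}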

def __del__(self):
"""
This function exists just to wrap the debug logging
(Diffs for the remaining 4 changed files were not loaded.)