#1665 #1668

Merged: 6 commits, May 18, 2024
3 changes: 3 additions & 0 deletions opteryx/__init__.py
@@ -36,6 +36,9 @@
dotenv = None # type:ignore

_env_path = Path(".") / ".env"

# we do a separate check for debug mode here so we don't load the config
# module just yet
OPTERYX_DEBUG = os.environ.get("OPTERYX_DEBUG") is not None

# deepcode ignore PythonSameEvalBinaryExpressiontrue: false +ve, values can be different
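The point of the duplicated check is import order: `opteryx/__init__.py` can know the debug state without importing `opteryx.config` (which now pulls in `psutil`). The pattern in isolation, noting it is a presence check, so any value of the variable enables debug:

```python
import os

# mirrors the check in __init__.py: only the *presence* of the variable
# matters, so OPTERYX_DEBUG=0 and OPTERYX_DEBUG=true both enable debug mode
OPTERYX_DEBUG = os.environ.get("OPTERYX_DEBUG") is not None
```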
2 changes: 1 addition & 1 deletion opteryx/__version__.py
@@ -1,4 +1,4 @@
__build__ = 506
__build__ = 509

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
17 changes: 14 additions & 3 deletions opteryx/compiled/structures/memory_pool.pyx
@@ -26,15 +26,25 @@ cdef class MemoryPool:
        public long commits, failed_commits, reads, read_locks, l1_compaction, l2_compaction, releases
        object lock

    def __cinit__(self, int size, str name="Memory Pool"):
    def __cinit__(self, long size, str name="Memory Pool"):
        if size <= 0:
            raise ValueError("MemoryPool size must be a positive integer")

        self.size = size
        self.pool = <unsigned char*>malloc(size * sizeof(unsigned char))
        attempt_size = size

        while attempt_size > 0:
            self.pool = <unsigned char*>malloc(attempt_size * sizeof(unsigned char))
            if self.pool:
                break
            attempt_size >>= 1  # Bit shift to halve the size and try again

        if not self.pool:
            raise MemoryError("Failed to allocate memory pool")

        self.size = attempt_size
        self.name = name
        self.free_segments = [MemorySegment(0, size)]
        self.free_segments = [MemorySegment(0, self.size)]
        self.used_segments = {}
        self.lock = Lock()

@@ -151,6 +161,7 @@ cdef class MemoryPool:

        memcpy(self.pool + segment.start, PyBytes_AsString(data), len_data)
        self.used_segments[ref_id] = MemorySegment(segment.start, len_data)
        self.commits += 1
        return ref_id

    def read(self, long ref_id, int zero_copy = 1):
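The substantive change here is the halve-and-retry allocation: rather than failing outright when `malloc` can't satisfy the requested size, the pool shrinks its request until an allocation succeeds, then records the size it actually got. A pure-Python sketch of the same strategy, with `bytearray` standing in for `malloc`:

```python
def allocate_with_fallback(requested_size: int) -> tuple[bytearray, int]:
    """Allocate up to `requested_size` bytes, halving the request on failure.

    Returns the buffer and the size actually allocated, mirroring how
    MemoryPool.__cinit__ writes `attempt_size` back into `self.size`.
    """
    attempt_size = requested_size
    while attempt_size > 0:
        try:
            return bytearray(attempt_size), attempt_size  # may raise MemoryError
        except MemoryError:
            attempt_size >>= 1  # halve and try again
    raise MemoryError("Failed to allocate memory pool")
```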
55 changes: 43 additions & 12 deletions opteryx/config.py
@@ -15,8 +15,34 @@
from os import environ
from pathlib import Path

import psutil

_config_values: dict = {}
OPTERYX_DEBUG = environ.get("OPTERYX_DEBUG") is not None

# we need a preliminary version of this variable
_OPTERYX_DEBUG = environ.get("OPTERYX_DEBUG") is not None


def memory_allocation_calculation(allocation) -> int:
    """
    Configure the memory allocation for the database based on the input.
    If the allocation is between 0 and 1, it's treated as a fraction of the total system memory.
    If the allocation is 1 or greater, it's treated as an absolute value in bytes.

    Parameters:
        allocation (float|int): Memory allocation value, either a fraction of total memory or an absolute size in bytes.

    Returns:
        int: Memory size in bytes to be allocated.
    """
    total_memory = psutil.virtual_memory().total  # total system memory, in bytes

    if 0 < allocation < 1:  # treat as a fraction of total memory
        return int(total_memory * allocation)
    elif allocation >= 1:  # treat as an absolute value in bytes
        return int(allocation)
    else:
        raise ValueError("Invalid memory allocation value. Must be a positive number.")
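So a value below 1 scales with the host, while anything 1 or above is taken literally. For example, on a machine with 16 GiB of RAM:

```python
memory_allocation_calculation(0.2)                # fraction: int(17179869184 * 0.2) = 3435973836 bytes
memory_allocation_calculation(256 * 1024 * 1024)  # absolute: 268435456 bytes, returned unchanged
```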


def parse_yaml(yaml_str):
@@ -83,10 +109,10 @@ def line_value(value):
    if _config_path.exists():
        with open(_config_path, "r") as _config_file:
            _config_values = parse_yaml(_config_file.read())
        if OPTERYX_DEBUG:
        if _OPTERYX_DEBUG:
            print(f"{datetime.datetime.now()} [LOADER] Loading config from {_config_path}")
except Exception as exception:  # pragma: no cover # it doesn't matter why - just use the defaults
    if OPTERYX_DEBUG:
    if _OPTERYX_DEBUG:
        print(
            f"{datetime.datetime.now()} [LOADER] Config file {_config_path} not used - {exception}"
        )
@@ -104,25 +130,30 @@ def get(key, default=None):
# These are 'protected' properties which cannot be overridden by a single query

# GCP project ID - for Google Cloud Data
GCP_PROJECT_ID: str = get("GCP_PROJECT_ID")
# The maximum number of evictions by a single query
MAX_CACHE_EVICTIONS_PER_QUERY: int = int(get("MAX_CACHE_EVICTIONS_PER_QUERY", 32))
# Maximum size for items saved to the buffer cache
MAX_CACHEABLE_ITEM_SIZE: int = int(get("MAX_CACHEABLE_ITEM_SIZE", 1024 * 1024))
# The local buffer pool size in bytes
MAX_LOCAL_BUFFER_CAPACITY: int = int(get("MAX_LOCAL_BUFFER_CAPACITY", 256 * 1024 * 1024))
MAX_CACHEABLE_ITEM_SIZE: int = int(get("MAX_CACHEABLE_ITEM_SIZE", 2 * 1024 * 1024))
# The local buffer pool size in either bytes or fraction of system memory
MAX_LOCAL_BUFFER_CAPACITY: int = memory_allocation_calculation(float(get("MAX_LOCAL_BUFFER_CAPACITY", 0.2)))
# The read buffer pool size in either bytes or fraction of system memory
MAX_READ_BUFFER_CAPACITY: int = memory_allocation_calculation(float(get("MAX_READ_BUFFER_CAPACITY", 0.1)))
# don't try to raise the priority of the server process
DISABLE_HIGH_PRIORITY: bool = bool(get("DISABLE_HIGH_PRIORITY", False))
# don't output resource (memory) utilization information
ENABLE_RESOURCE_LOGGING: bool = bool(get("ENABLE_RESOURCE_LOGGING", False))
# size of morsels to push between steps
MORSEL_SIZE: int = int(get("MORSEL_SIZE", 64 * 1024 * 1024))

# debug mode
OPTERYX_DEBUG: bool = bool(get("OPTERYX_DEBUG", False))
# number of read loops per data source
CONCURRENT_READS: int = int(get("CONCURRENT_READS", 4))
# query log
QUERY_LOG_LOCATION:str = get("QUERY_LOG_LOCATION", False)
QUERY_LOG_SIZE:int = int(get("QUERY_LOG_SIZE", 100))
# not currently supported
METADATA_SERVER: str = None


# not GA
PROFILE_LOCATION:str = get("PROFILE_LOCATION")
# fmt:on
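All of these constants resolve through `get`, so they can be pinned via `opteryx.yaml` or the environment before Opteryx is first imported. A sketch, assuming environment variables take precedence in `get` (values illustrative):

```python
import os

# pin the local buffer pool to an absolute 1 GiB instead of the 0.2 fraction default;
# this must happen before the first import, since config.py evaluates at import time
os.environ["MAX_LOCAL_BUFFER_CAPACITY"] = str(1024 * 1024 * 1024)

from opteryx import config

print(config.MAX_LOCAL_BUFFER_CAPACITY)  # 1073741824, if the override is picked up
```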
28 changes: 17 additions & 11 deletions opteryx/connectors/capabilities/cacheable.py
@@ -16,6 +16,9 @@

from orso.cityhash import CityHash64

from opteryx.config import MAX_CACHE_EVICTIONS_PER_QUERY
from opteryx.config import MAX_CACHEABLE_ITEM_SIZE

__all__ = ("Cacheable", "read_thru_cache")


@@ -47,7 +50,7 @@ def read_thru_cache(func):
    from opteryx.shared import BufferPool

    cache_manager = get_cache_manager()
    max_evictions = cache_manager.max_evictions_per_query
    max_evictions = MAX_CACHE_EVICTIONS_PER_QUERY
    remote_cache = cache_manager.cache_backend
    if not remote_cache:
        # rather than make decisions - just use a dummy
@@ -81,17 +84,19 @@ def wrapper(blob_name, statistics, **kwargs):

        # Write the result to caches
        if max_evictions:
            # we set a per-query eviction limit
            if len(result) < buffer_pool.max_cacheable_item_size:
            if len(result) < buffer_pool.size // 10:
                evicted = buffer_pool.set(key, result)
                remote_cache.set(key, result)
                if evicted:
                    # if we're evicting items we're putting into the cache
                    # if we're evicting items we just put in the cache, stop
                    if evicted in my_keys:
                        max_evictions = 0
                    else:
                        max_evictions -= 1
                    statistics.cache_evictions += 1

        if len(result) < MAX_CACHEABLE_ITEM_SIZE:
            remote_cache.set(key, result)
        else:
            statistics.cache_oversize += 1

@@ -120,7 +125,7 @@ def async_read_thru_cache(func):
    from opteryx.shared import MemoryPool

    cache_manager = get_cache_manager()
    max_evictions = cache_manager.max_evictions_per_query
    max_evictions = MAX_CACHE_EVICTIONS_PER_QUERY
    remote_cache = cache_manager.cache_backend
    if not remote_cache:
        # rather than make decisions - just use a dummy
@@ -171,18 +176,19 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
        if max_evictions:
            # we set a per-query eviction limit
            buffer = await pool.read(result)  # type: ignore

            if len(buffer) < buffer_pool.max_cacheable_item_size:
            if len(buffer) < buffer_pool.size // 10:
                evicted = buffer_pool.set(key, buffer)
                remote_cache.set(key, buffer)
                if evicted:
                    # if we're evicting items we're putting into the cache
                    # stop putting more stuff into the cache, otherwise we're
                    # just thrashing
                    # if we're evicting items we just put in the cache, stop
                    if evicted in my_keys:
                        max_evictions = 0
                    else:
                        max_evictions -= 1
                    statistics.cache_evictions += 1

            if len(buffer) < MAX_CACHEABLE_ITEM_SIZE:
                remote_cache.set(key, buffer)
            else:
                statistics.cache_oversize += 1

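The reshaped write path separates the two tiers: the buffer pool only admits items smaller than a tenth of its own capacity and spends a per-query eviction budget, while the remote cache applies the config-level `MAX_CACHEABLE_ITEM_SIZE` cap independently. A condensed sketch of that policy (the `buffer_pool`, `remote_cache`, `statistics`, and `my_keys` names stand in for the objects the real decorator wires up):

```python
from opteryx.config import MAX_CACHEABLE_ITEM_SIZE

def write_to_caches(key, result, buffer_pool, remote_cache, statistics, my_keys, max_evictions):
    """Sketch of the two-tier cache write used by both wrappers above."""
    if max_evictions:  # this query's eviction budget is not yet exhausted
        if len(result) < buffer_pool.size // 10:  # local tier: only small items
            evicted = buffer_pool.set(key, result)
            if evicted:
                if evicted in my_keys:
                    max_evictions = 0  # we evicted our own entry: stop, we're thrashing
                else:
                    max_evictions -= 1
                statistics.cache_evictions += 1

    if len(result) < MAX_CACHEABLE_ITEM_SIZE:  # remote tier: its own size cap
        remote_cache.set(key, result)
    else:
        statistics.cache_oversize += 1

    return max_evictions
```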
2 changes: 1 addition & 1 deletion opteryx/connectors/sql_connector.py
@@ -29,7 +29,7 @@
from orso.tools import random_string
from orso.types import PYTHON_TO_ORSO_MAP

from opteryx import OPTERYX_DEBUG
from opteryx.config import OPTERYX_DEBUG
from opteryx.connectors.base.base_connector import DEFAULT_MORSEL_SIZE
from opteryx.connectors.base.base_connector import INITIAL_CHUNK_SIZE
from opteryx.connectors.base.base_connector import MIN_CHUNK_SIZE
6 changes: 4 additions & 2 deletions opteryx/cursor.py
@@ -44,11 +44,13 @@
from opteryx.utils import sql

PROFILE_LOCATION = config.PROFILE_LOCATION
QUERY_LOG_LOCATION = config.QUERY_LOG_LOCATION
QUERY_LOG_SIZE = config.QUERY_LOG_SIZE


ROLLING_LOG = None
if PROFILE_LOCATION:
    ROLLING_LOG = RollingLog(PROFILE_LOCATION + ".log")
if QUERY_LOG_LOCATION:
    ROLLING_LOG = RollingLog(QUERY_LOG_LOCATION, max_entries=QUERY_LOG_SIZE)


class CursorState(Enum):
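The query log now has its own location and size settings (`QUERY_LOG_LOCATION`, `QUERY_LOG_SIZE`) instead of piggybacking on `PROFILE_LOCATION`. A minimal illustration of what a bounded rolling log provides (a sketch, not orso's `RollingLog` implementation):

```python
from collections import deque

class RollingLogSketch:
    """Keep at most `max_entries` entries, silently dropping the oldest."""

    def __init__(self, location: str, max_entries: int = 100):
        self.location = location
        self.entries: deque = deque(maxlen=max_entries)

    def append(self, entry: str) -> None:
        self.entries.append(entry)  # the oldest entry falls off when full

log = RollingLogSketch("queries.log", max_entries=100)
for i in range(150):
    log.append(f"query {i}")
assert len(log.entries) == 100  # bounded at QUERY_LOG_SIZE
```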
59 changes: 13 additions & 46 deletions opteryx/managers/cache/cache_manager.py
@@ -1,13 +1,20 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Union

from opteryx import config
from opteryx.exceptions import InvalidConfigurationError
from opteryx.managers.kvstores import BaseKeyValueStore

MAX_CACHEABLE_ITEM_SIZE = config.MAX_CACHEABLE_ITEM_SIZE
MAX_CACHE_EVICTIONS_PER_QUERY = config.MAX_CACHE_EVICTIONS_PER_QUERY
MAX_LOCAL_BUFFER_CAPACITY = config.MAX_LOCAL_BUFFER_CAPACITY


class CacheManager:
"""
@@ -16,54 +16,23 @@ class CacheManager:
    Parameters:
        cache_backend: Union[BaseKeyValueStore, None]
            The cache storage to use.
        max_cacheable_item_size: int
            The maximum size a single item in the cache can occupy.
        max_evictions_per_query: int
            The number of items to evict from cache per query.
        max_local_buffer_capacity: int
            The maximum number of items to store in the BufferPool.
    """

    def __init__(
        self,
        cache_backend: Union[BaseKeyValueStore, None] = None,
        max_cacheable_item_size: Union[int, None] = MAX_CACHEABLE_ITEM_SIZE,
        max_evictions_per_query: Union[int, None] = MAX_CACHE_EVICTIONS_PER_QUERY,
        max_local_buffer_capacity: int = MAX_LOCAL_BUFFER_CAPACITY,
    ):
    def __init__(self, cache_backend: Union[BaseKeyValueStore, None] = None):
        if cache_backend is not None and not isinstance(cache_backend, BaseKeyValueStore):
            raise InvalidConfigurationError(
                config_item="cache_backend",
                provided_value=str(type(cache_backend)),
                valid_value_description="Instance of BaseKeyValueStore",
            )

        if max_cacheable_item_size is not None and (
            not isinstance(max_cacheable_item_size, int) or max_cacheable_item_size <= 0
        ):
            raise InvalidConfigurationError(
                config_item="max_cacheable_item_size",
                provided_value=str(max_cacheable_item_size),
                valid_value_description="A number greater than zero",
            )

        if max_evictions_per_query is not None and (
            not isinstance(max_evictions_per_query, int) or max_evictions_per_query <= 0
        ):
            raise InvalidConfigurationError(
                config_item="max_evictions_per_query",
                provided_value=str(max_evictions_per_query),
                valid_value_description="A number greater than zero",
            )

        if not isinstance(max_local_buffer_capacity, int) or max_local_buffer_capacity <= 0:
            raise InvalidConfigurationError(
                config_item="max_local_buffer_capacity",
                provided_value=str(max_local_buffer_capacity),
                valid_value_description="A number greater than zero",
            )

        self.cache_backend = cache_backend
        self.max_cacheable_item_size = max_cacheable_item_size
        self.max_evictions_per_query = max_evictions_per_query
        self.max_local_buffer_capacity = max_local_buffer_capacity
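With the limits now read from `opteryx.config` at their points of use, `CacheManager` reduces to a validated holder for the backend. A usage sketch; the `MemcachedCache` class name and the `set_cache_manager` setter are assumptions (the diff confirms only `get_cache_manager`):

```python
import opteryx
from opteryx.managers.cache.cache_manager import CacheManager
from opteryx.managers.cache.memcached import MemcachedCache  # assumed export

# size and eviction limits are no longer constructor arguments;
# consumers pull them from opteryx.config directly
opteryx.set_cache_manager(CacheManager(cache_backend=MemcachedCache()))  # assumed setter
```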
7 changes: 7 additions & 0 deletions opteryx/managers/cache/memcached.py
@@ -90,6 +90,7 @@ def __init__(self, **kwargs):
        self.misses: int = 0
        self.skips: int = 0
        self.errors: int = 0
        self.sets: int = 0

    def get(self, key: bytes) -> Union[bytes, None]:
        if self._consecutive_failures >= MAXIMUM_CONSECUTIVE_FAILURES:
@@ -119,6 +120,12 @@ def set(self, key: bytes, value: bytes) -> None:
        if self._consecutive_failures < MAXIMUM_CONSECUTIVE_FAILURES:
            try:
                self._server.set(key, value)
                self.sets += 1
            except:
                # if we fail to set, stop trying
                self._consecutive_failures = MAXIMUM_CONSECUTIVE_FAILURES
                self.errors += 1

    def __del__(self):
        pass
        # DEBUG: log(f"Memcached <hits={self.hits} misses={self.misses} sets={self.sets} skips={self.skips} errors={self.errors}>")
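`set` now fails the same way `get` does: one failure trips the breaker for the remainder of the process, so a dead memcached server costs one timeout rather than one per write. The pattern in isolation (the threshold value is assumed; the diff only shows the constant's name):

```python
class CircuitBreakerSetter:
    """Minimal sketch of the stop-trying-after-failure pattern above."""

    MAXIMUM_CONSECUTIVE_FAILURES = 10  # assumed value

    def __init__(self, server):
        self._server = server
        self._consecutive_failures = 0
        self.sets = 0
        self.errors = 0

    def set(self, key: bytes, value: bytes) -> None:
        if self._consecutive_failures >= self.MAXIMUM_CONSECUTIVE_FAILURES:
            return  # breaker open: skip the network call entirely
        try:
            self._server.set(key, value)
            self.sets += 1
        except Exception:
            # trip the breaker so later calls don't block on a dead server
            self._consecutive_failures = self.MAXIMUM_CONSECUTIVE_FAILURES
            self.errors += 1
```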
9 changes: 4 additions & 5 deletions opteryx/operators/async_read_node.py
@@ -30,14 +30,15 @@
import pyarrow.parquet
from orso.schema import RelationSchema

from opteryx import config
from opteryx.operators.base_plan_node import BasePlanDataObject
from opteryx.operators.read_node import ReaderNode
from opteryx.shared import AsyncMemoryPool
from opteryx.shared import MemoryPool
from opteryx.utils.file_decoders import get_decoder

CONCURRENT_READS = 4
MAX_BUFFER_SIZE_MB = 512
CONCURRENT_READS = config.CONCURRENT_READS
MAX_READ_BUFFER_CAPACITY = config.MAX_READ_BUFFER_CAPACITY


def normalize_morsel(schema: RelationSchema, morsel: pyarrow.Table) -> pyarrow.Table:
@@ -106,9 +107,7 @@ class AsyncReaderNode(ReaderNode):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.pool = MemoryPool(
            MAX_BUFFER_SIZE_MB * 1024 * 1024, f"ReadBuffer <{self.parameters['alias']}>"
        )
        self.pool = MemoryPool(MAX_READ_BUFFER_CAPACITY, f"ReadBuffer <{self.parameters['alias']}>")

        self.do = AsyncReaderDataObject()
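Replacing the fixed 512 MB constant, each async reader's pool is now sized from `MAX_READ_BUFFER_CAPACITY`, which defaults to a tenth of system memory via `memory_allocation_calculation`. A quick way to sanity-check what that resolves to on a given host:

```python
import psutil

from opteryx import config

# default is the 0.1 fraction, so this should be roughly 10% of total RAM, in bytes
print(config.MAX_READ_BUFFER_CAPACITY)
print(psutil.virtual_memory().total // 10)  # the same figure, computed by hand
```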
3 changes: 2 additions & 1 deletion opteryx/operators/base_plan_node.py
@@ -63,8 +63,9 @@ def __init__(self, properties: QueryProperties, **parameters):
        self.statistics = QueryStatistics(properties.qid)
        self.execution_time = 0
        self.identity = random_string()
        self.do: Optional[BasePlanDataObject] = None

    def to_json(self) -> dict:  # pragma: no cover
    def to_json(self) -> bytes:  # pragma: no cover

        import orjson
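The `to_json` annotation change is a correction rather than a behaviour change: `orjson.dumps` returns `bytes`, not `str` or `dict`. For example:

```python
import orjson

payload = orjson.dumps({"node": "reader", "execution_time": 0})
print(type(payload))  # <class 'bytes'> - hence the corrected annotation
```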