#1547 #1548

Merged: 2 commits, Mar 30, 2024

2 changes: 1 addition & 1 deletion opteryx/__version__.py
@@ -1,4 +1,4 @@
__build__ = 391
__build__ = 392

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
10 changes: 2 additions & 8 deletions opteryx/config.py
@@ -100,22 +100,16 @@ def get(key, default=None):

# GCP project ID - for Google Cloud Data
GCP_PROJECT_ID: str = get("GCP_PROJECT_ID")
# Mapping prefixes to readers - the default is to use disk
DATASET_PREFIX_MAPPING: dict = get("DATASET_PREFIX_MAPPING", {"_":"disk"})
# Data Partitioning
PARTITION_SCHEME: str = get("PARTITION_SCHEME", None)
# The maximum number of evictions by a single query
MAX_CACHE_EVICTIONS_PER_QUERY: int = int(get("MAX_CACHE_EVICTIONS_PER_QUERY", 32))
# Maximum size for items saved to the buffer cache
MAX_CACHEABLE_ITEM_SIZE: int = int(get("MAX_CACHEABLE_ITEM_SIZE", 1024 * 1024))
# The local buffer pool size
MAX_LOCAL_BUFFER_CAPACITY: int = int(get("MAX_LOCAL_BUFFER_CAPACITY", 256))
# The local buffer pool size in bytes
MAX_LOCAL_BUFFER_CAPACITY: int = int(get("MAX_LOCAL_BUFFER_CAPACITY", 256 * 1024 * 1024))
# don't try to raise the priority of the server process
DISABLE_HIGH_PRIORITY: bool = bool(get("DISABLE_HIGH_PRIORITY", False))
# don't output resource (memory) utilization information
ENABLE_RESOURCE_LOGGING: bool = bool(get("ENABLE_RESOURCE_LOGGING", False))
# only push equals predicates
ONLY_PUSH_EQUALS_PREDICATES: bool = bool(get("ONLY_PUSH_EQUALS_PREDICATES", False))
# size of morsels to push between steps
MORSEL_SIZE: int = int(get("MORSEL_SIZE", 64 * 1024 * 1024))

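The notable change in this hunk is the unit of MAX_LOCAL_BUFFER_CAPACITY: the old default of 256 was an item count, while the new default of 256 * 1024 * 1024 is a size in bytes (256 MiB). A minimal sketch of the get-then-cast pattern this module uses, with a hypothetical environment-variable lookup standing in for the real `get` helper, whose implementation is not shown in this diff:

```python
import os


def get(key: str, default=None):
    # hypothetical stand-in for opteryx.config.get; the real helper may also
    # consult a configuration file, which this sketch ignores
    return os.environ.get(key, default)


# the buffer pool capacity is now a size in bytes; the default works out to 256 MiB
MAX_LOCAL_BUFFER_CAPACITY: int = int(get("MAX_LOCAL_BUFFER_CAPACITY", 256 * 1024 * 1024))
print(MAX_LOCAL_BUFFER_CAPACITY)  # 268435456 unless overridden in the environment
```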
4 changes: 2 additions & 2 deletions opteryx/managers/cache/__init__.py
@@ -1,6 +1,6 @@
from .cache_manager import CacheManager
from .memcached import MemcachedCache
from .memory import MemoryCache
from .null_cache import NullCache
from .redis import RedisCache

__all__ = ("CacheManager", "MemcachedCache", "MemoryCache", "RedisCache")
__all__ = ("CacheManager", "MemcachedCache", "NullCache", "RedisCache")
50 changes: 0 additions & 50 deletions opteryx/managers/cache/memory.py

This file was deleted.

10 changes: 10 additions & 0 deletions opteryx/managers/cache/null_cache.py
@@ -0,0 +1,10 @@
from typing import Any


class NullCache:

    def get(self, key: bytes) -> None:
        return None

    def set(self, key: bytes, value: Any) -> None:
        return None
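NullCache is a null-object backend: every get() misses and every set() is silently discarded, so callers keep a single code path whether or not a remote cache is configured. A small hedged usage sketch (the caller code below is illustrative and not part of this PR; only the import path is confirmed by the __init__.py change above):

```python
from opteryx.managers.cache import NullCache

backend = NullCache()                # swap in MemcachedCache / RedisCache when configured
value = backend.get(b"some-key")     # always None: every lookup is a miss
if value is None:
    value = b"recomputed value"
    backend.set(b"some-key", value)  # accepted and silently discarded
```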
55 changes: 30 additions & 25 deletions opteryx/shared/buffer_pool.py
@@ -13,61 +13,66 @@
"""
Global Buffer Pool.

This is little more than a wrapper around the LRU-K(2) cache.
This uses an LRU-K2 policy to determine what to keep and what to evict and is
backed by a MemoryPool.
"""
from array import array
from typing import Any
from typing import Optional

from opteryx.managers.cache import NullCache
from opteryx.shared.memory_pool import MemoryPool
from opteryx.utils.lru_2 import LRU2


class NullCacheBackEnd:
"""
We can remove a check in each operation by just having a null service.
"""

def get(self, key: bytes) -> None:
return None

def set(self, key: bytes, value: Any) -> None:
return None


class _BufferPool:
"""
Buffer Pool is a class implementing a Least Recently Used (LRU) cache of buffers.
Buffer Pool is a class implementing a Least Recently Used (LRU) policy.
"""

slots = "_lru", "_cache_backend", "_max_cacheable_item_size"
slots = "_lru", "_cache_backend", "_max_cacheable_item_size", "_memory_pool"

def __init__(self, cache_manager):
self._cache_backend = cache_manager.cache_backend
if not self._cache_backend:
self._cache_backend = (
NullCacheBackEnd()
) # rather than make decisions - just use a dummy
self._cache_backend = NullCache() # rather than make decisions - just use a dummy
self.max_cacheable_item_size = cache_manager.max_cacheable_item_size
self._lru = LRU2(size=cache_manager.max_local_buffer_capacity)
self._lru = LRU2()
self._memory_pool = MemoryPool(
name="BufferPool", size=cache_manager.max_local_buffer_capacity
)

def get(self, key: bytes) -> Optional[array]:
"""
Retrieve an item from the pool, return None if the item isn't found.
If cache is provided and item is not in pool, attempt to get it from cache.
"""
value = self._lru.get(key)
if value is not None:
return value
mp_key = self._lru.get(key)
if mp_key is not None:
return self._memory_pool.read(mp_key)
return self._cache_backend.get(key)

def set(self, key: bytes, value) -> Optional[str]:
"""
Put an item into the pool, evict an item if the pool is full.
If a cache is provided, also set the value in the cache.
"""
evicted = self._lru.set(key, value)
# always try to save to the cache backend
self._cache_backend.set(key, value)
return evicted

# try to save in the buffer pool, if we fail, release
# an item from the pool (LRUK2) and try again
evicted_key = None
mp_key = self._memory_pool.commit(value)
if mp_key is None:
evicted_key, evicted_value = self._lru.evict(True)
if evicted_key:
self._memory_pool.release(evicted_value)
mp_key = self._memory_pool.commit(value)
if mp_key is None:
return None
else:
self._lru.set(key, mp_key)
return evicted_key

@property
def stats(self) -> tuple:
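Taken together, the new flow is: set() commits the value into the MemoryPool, evicts the LRU-K2 victim and retries once if the pool is full, then records the returned ref_id against the key; get() looks the ref_id up and reads the bytes back, falling through to the remote cache backend on a miss. A simplified, hedged sketch of that pattern using only the MemoryPool API from this PR (a plain dict stands in for the LRU-K2 index and the eviction-and-retry step is omitted, so this is the happy path only):

```python
from opteryx.shared.memory_pool import MemoryPool

pool = MemoryPool(size=1024, name="example")
index = {}  # stand-in for the LRU-K2 index: key -> memory pool ref_id


def put(key: bytes, value: bytes):
    ref_id = pool.commit(value)  # returns a ref_id, or None if the pool is full
    if ref_id is not None:
        index[key] = ref_id
    return ref_id


def fetch(key: bytes):
    ref_id = index.get(key)
    if ref_id is None:
        return None  # the real BufferPool falls back to the cache backend here
    return bytes(pool.read(ref_id))  # read() returns a memoryview over the pool


put(b"planets", b"mercury venus earth")
print(fetch(b"planets"))  # b'mercury venus earth'
```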
166 changes: 166 additions & 0 deletions opteryx/shared/memory_pool.py
@@ -0,0 +1,166 @@
from multiprocessing import Lock

from orso.tools import random_int

"""
Memory Pool is used to manage access to arbitrary blocks of memory.

This is designed to be thread-safe.
"""


class MemorySegment:
slots = ("start", "length")

def __init__(self, start, length):
self.start = start
self.length = length


class MemoryPool:
    def __init__(
        self,
        size: int,
        name: str = "Memory Pool",
    ):
        if size <= 0:
            raise ValueError("MemoryPool size must be a positive integer")
        self.lock = Lock()
        self.pool = bytearray(size)
        self.size = size
        self.free_segments = [MemorySegment(0, size)] # the whole pool is free
        self.used_segments = {}
        self.name = name
        # statistics
        self.commits = 0
        self.failed_commits = 0
        self.reads = 0
        self.l1_compaction = 0
        self.l2_compaction = 0
        self.releases = 0

    def _find_free_segment(self, size: int) -> int:
        for index, segment in enumerate(self.free_segments):
            if segment.length >= size:
                return index
        return -1

    def _level1_compaction(self):
        """Merges adjacent free segments (Level 1 compaction)."""
        self.l1_compaction += 1
        if not self.free_segments:
            return
        # Ensure the list is sorted
        self.free_segments.sort(key=lambda segment: segment.start)

        # Use a new list to store merged segments
        new_free_segments = [self.free_segments[0]]
        for segment in self.free_segments[1:]:
            last_segment = new_free_segments[-1]
            if last_segment.start + last_segment.length == segment.start:
                # If adjacent, merge by extending the last segment
                last_segment.length += segment.length
            else:
                # If not adjacent, just add the segment to the new list
                new_free_segments.append(segment)

        self.free_segments = new_free_segments

    def _level2_compaction(self):
        """Aggressively compacts by pushing all free memory to the end (Level 2 compaction)."""
        self.l2_compaction += 1

        total_free_space = sum(segment.length for segment in self.free_segments)
        compacted_start = self.size - total_free_space
        self.free_segments = [MemorySegment(compacted_start, total_free_space)]

        offset = 0
        for segment_id, segment in sorted(self.used_segments.items(), key=lambda x: x[1].start):
            new_start = offset

            # Apply memory views for zero-copy slice assignment
            source_view = memoryview(self.pool)[segment.start : segment.start + segment.length]
            dest_view = memoryview(self.pool)[new_start : new_start + segment.length]
            dest_view[:] = source_view

            segment.start = new_start
            offset += segment.length

    def commit(self, data: bytes) -> int:
        self.commits += 1
        len_data = len(data)
        # always acquire a lock to write
        # with self.lock:
        if True:
            segment_index = self._find_free_segment(len_data)
            if segment_index == -1:
                # avoid trying to compact if it won't release enough space anyway
                total_free_space = sum(segment.length for segment in self.free_segments)
                if total_free_space < len_data:
                    return None
                # avoid trying to compact, if we're already compacted
                if len(self.free_segments) <= 1:
                    return None
                # combine adjacent free space (should be pretty quick)
                self._level1_compaction()
                segment_index = self._find_free_segment(len_data)
                if segment_index == -1:
                    # move free space to the end (is slower)
                    self._level2_compaction()
                    segment_index = self._find_free_segment(len_data)
                if segment_index == -1:
                    self.failed_commits += 1
                    # we're full, even after compaction
                    return None

            free_segment = self.free_segments[segment_index]
            start, length = free_segment.start, free_segment.length
            new_segment = MemorySegment(start, len_data)

            pool_view = memoryview(self.pool)[start : start + len_data]
            pool_view[:] = data

            if length > len_data:
                free_segment.start += len_data
                free_segment.length -= len_data
            else:
                self.free_segments.pop(segment_index)

            ref_id = random_int()
            self.used_segments[ref_id] = new_segment
            return ref_id

    def read(self, ref_id: int) -> bytes:
        """
        We're using an optimistic locking strategy where we do not acquire
        a lock, perform the read and then check that the metadata hasn't changed
        and if it's the same, we assume no writes have updated it.

        If it has changed, we acquire a lock and try again. The buffer pool is
        read heavy, so optimized reads are preferred.
        """
        self.reads += 1

        if ref_id not in self.used_segments:
            raise ValueError("Invalid reference id.")
        segment = self.used_segments[ref_id]
        view = memoryview(self.pool)[segment.start : segment.start + segment.length]
        if segment != self.used_segments[ref_id]:
            with self.lock:
                if ref_id not in self.used_segments:
                    raise ValueError("Invalid reference id.")
                segment = self.used_segments[ref_id]
                view = memoryview(self.pool)[segment.start : segment.start + segment.length]
        return view

    def release(self, ref_id: int):
        self.releases += 1
        with self.lock:
            if ref_id not in self.used_segments:
                raise ValueError(f"Invalid reference ID - {ref_id}.")
            segment = self.used_segments.pop(ref_id)
            self.free_segments.append(segment)

    def __del__(self):
        pass
        # DEBUG: log (f"Memory Pool ({self.name}) <size={self.size}, commits={self.commits}, reads={self.reads}, releases={self.releases}, L1={self.l1_compaction}, L2={self.l2_compaction}>")
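As an end-to-end illustration of the API introduced here (commit() returns an opaque ref_id or None when nothing fits, read() returns a memoryview over the pool, release() returns the segment to the free list, and compaction only runs when a commit cannot be satisfied), the following is a small hedged sketch; the pool size and payloads are arbitrary, chosen only to force level-1 compaction:

```python
from opteryx.shared.memory_pool import MemoryPool

pool = MemoryPool(size=32, name="demo")

# fill the pool with four 8-byte blocks
refs = [pool.commit(bytes([i]) * 8) for i in range(4)]
assert all(ref is not None for ref in refs)
assert pool.commit(b"x" * 8) is None          # full: no free segment is large enough

# free two adjacent blocks; the free list now holds two 8-byte fragments
pool.release(refs[0])
pool.release(refs[1])

# a 16-byte commit only fits after level-1 compaction merges those fragments
ref = pool.commit(b"y" * 16)
assert ref is not None and pool.l1_compaction >= 1

print(bytes(pool.read(refs[2])))              # copy the memoryview out with bytes()
```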