Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1595 #1596

Merged
merged 2 commits into from
Apr 22, 2024
Merged

#1595 #1596

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 434
__build__ = 435

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion opteryx/connectors/base/base_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import pyarrow
from orso.schema import RelationSchema

from opteryx.shared import QueryStatistics
from opteryx.models import QueryStatistics

MIN_CHUNK_SIZE: int = 500
INITIAL_CHUNK_SIZE: int = 500
Expand Down
2 changes: 2 additions & 0 deletions opteryx/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from opteryx.models.node import Node
from opteryx.models.non_tabular_result import NonTabularResult
from opteryx.models.query_properties import QueryProperties
from opteryx.models.query_statistics import QueryStatistics

__all__ = (
"ConnectionContext",
Expand All @@ -24,4 +25,5 @@
"Node",
"NonTabularResult",
"QueryProperties",
"QueryStatistics",
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self):
self._stats: dict = defaultdict(int)
self._stats["messages"] = []

def _ns_to_s(self, nano_seconds):
def _ns_to_s(self, nano_seconds: int) -> float:
"""convert elapsed ns to s"""
if nano_seconds == 0:
return 0
Expand Down
2 changes: 1 addition & 1 deletion opteryx/operators/base_plan_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from orso.tools import random_string

from opteryx.models import QueryProperties
from opteryx.shared import QueryStatistics
from opteryx.models import QueryStatistics


class BasePlanNode:
Expand Down
2 changes: 1 addition & 1 deletion opteryx/planner/binder/binding_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from typing import Set

from opteryx.models import ConnectionContext
from opteryx.shared import QueryStatistics
from opteryx.models import QueryStatistics
from opteryx.virtual_datasets import derived


Expand Down
Empty file.
3 changes: 1 addition & 2 deletions opteryx/shared/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from opteryx.shared.buffer_pool import BufferPool
from opteryx.shared.materialized_datasets import MaterializedDatasets
from opteryx.shared.memory_pool import MemoryPool
from opteryx.shared.query_statistics import QueryStatistics
from opteryx.shared.rolling_log import RollingLog

__all__ = ("BufferPool", "MaterializedDatasets", "MemoryPool", "QueryStatistics", "RollingLog")
__all__ = ("BufferPool", "MaterializedDatasets", "MemoryPool", "RollingLog")
5 changes: 4 additions & 1 deletion opteryx/shared/buffer_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
The buffer pool has no slot limit; it is a given volume of memory, and the pool
will try to evict when full. This is different to a classic Buffer Pool, which
is slot-based.

The Buffer Pool is a global resource and used across all Connections and Cursors.
"""
from typing import Optional

Expand All @@ -29,7 +31,8 @@

class _BufferPool:
"""
Buffer Pool is a class implementing a Least Recently Used (LRU) policy.
Buffer Pool is a class implementing a Least Recently Used (LRU) policy for
eviction.
"""

slots = "_lru", "_cache_backend", "_max_cacheable_item_size", "_memory_pool"
Expand Down
43 changes: 33 additions & 10 deletions opteryx/shared/memory_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from multiprocessing import Lock
from typing import Dict

from orso.tools import random_int

"""
Memory Pool is used to manage access to arbitrary blocks of memory.

This is designed to be thread-safe with non-blocking reads.
"""


from multiprocessing import Lock
from typing import Dict

from orso.tools import random_int


class MemorySegment:
slots = ("start", "length")

Expand Down Expand Up @@ -60,7 +61,11 @@ def _find_free_segment(self, size: int) -> int:
return -1

def _level1_compaction(self):
"""Merges adjacent free segments (Level 1 compaction)."""
"""
Merges adjacent free segments (Level 1 compaction).

This is intended to be a fast way to get larger contiguous blocks.
"""
self.l1_compaction += 1
if not self.free_segments:
return
Expand All @@ -81,7 +86,11 @@ def _level1_compaction(self):
self.free_segments = new_free_segments

def _level2_compaction(self):
"""Aggressively compacts by pushing all free memory to the end (Level 2 compaction)."""
"""
Aggressively compacts by pushing all free memory to the end (Level 2 compaction).

This is slower, but ensures we get the maximum free space.
"""
self.l2_compaction += 1

total_free_space = sum(segment.length for segment in self.free_segments)
Expand All @@ -101,6 +110,13 @@ def _level2_compaction(self):
offset += segment.length

def commit(self, data: bytes) -> int:
"""
Add an item to the pool and return its reference.

If we can't find a free block large enough we perform compaction,
first we combine adjacent free blocks into larger blocks. If that's
not enough, we consolidate all of the free blocks together.
"""
self.commits += 1
len_data = len(data)
# always acquire a lock to write
Expand Down Expand Up @@ -147,10 +163,11 @@ def read(self, ref_id: int) -> bytes:
"""
We're using an optimistic locking strategy where we do not acquire
a lock, perform the read and then check that the metadata hasn't changed
and if it's the same, we assume no writes have updated it.
and if it's the same, we assume no writes have updated it. If it has
changed, we acquire a lock and try again.

If it has changed, we acquire a lock and try again. The buffer pool is
read heavy, so optimized reads are preferred.
We use this approach because locks are expensive and memory pools are
likely to be read heavy.
"""
if ref_id not in self.used_segments:
raise ValueError("Invalid reference ID.")
Expand All @@ -167,6 +184,9 @@ def read(self, ref_id: int) -> bytes:
return view

def release(self, ref_id: int):
"""
Remove an item from the pool
"""
self.releases += 1
with self.lock:
if ref_id not in self.used_segments:
Expand All @@ -175,5 +195,8 @@ def release(self, ref_id: int):
self.free_segments.append(segment)

def __del__(self):
"""
This function exists just to wrap the debug logging
"""
pass
# DEBUG: log (f"Memory Pool ({self.name}) <size={self.size}, commits={self.commits}, reads={self.reads}, releases={self.releases}, L1={self.l1_compaction}, L2={self.l2_compaction}>")
2 changes: 1 addition & 1 deletion tests/misc/test_expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import opteryx.virtual_datasets
from opteryx.managers.expression import ORSO_TO_NUMPY_MAP, NodeType, evaluate
from opteryx.models import Node
from opteryx.shared import QueryStatistics
from opteryx.models import QueryStatistics

stats = QueryStatistics()

Expand Down
Loading