From ab38e6a2732c1780c2d9446dd09d014544324d75 Mon Sep 17 00:00:00 2001 From: joocer Date: Sun, 28 Apr 2024 10:48:53 +0100 Subject: [PATCH 1/2] #1613 --- opteryx/compiled/structures/__init__.py | 1 + opteryx/compiled/structures/memory_pool.pyx | 159 +++++++++++++++++ .../connectors/gcp_cloudstorage_connector.py | 39 ++++ opteryx/operators/__init__.py | 1 + opteryx/operators/async_scanner_node.py | 168 ++++++++++++++++++ .../bench/predicate_compaction_strategy.py | 117 ++++++++++++ opteryx/planner/temporary_physical_planner.py | 6 +- opteryx/shared/__init__.py | 3 +- opteryx/shared/memory_pool.py | 42 ++++- tests/misc/test_memory_pool.py | 22 +-- 10 files changed, 542 insertions(+), 16 deletions(-) create mode 100644 opteryx/compiled/structures/memory_pool.pyx create mode 100644 opteryx/operators/async_scanner_node.py create mode 100644 opteryx/planner/cost_based_optimizer/bench/predicate_compaction_strategy.py diff --git a/opteryx/compiled/structures/__init__.py b/opteryx/compiled/structures/__init__.py index 73525a004..d8c1e398e 100644 --- a/opteryx/compiled/structures/__init__.py +++ b/opteryx/compiled/structures/__init__.py @@ -1,4 +1,5 @@ from .hash_table import HashSet from .hash_table import HashTable from .hash_table import distinct +from .memory_pool import MemoryPool from .node import Node diff --git a/opteryx/compiled/structures/memory_pool.pyx b/opteryx/compiled/structures/memory_pool.pyx new file mode 100644 index 000000000..9127ab604 --- /dev/null +++ b/opteryx/compiled/structures/memory_pool.pyx @@ -0,0 +1,159 @@ +# memory_pool.pyx +from libc.stdlib cimport malloc, free +from libc.string cimport memcpy +from cpython.bytes cimport PyBytes_AsString, PyBytes_FromStringAndSize +from threading import Lock +from orso.tools import random_int +from libcpp.vector cimport vector + +cdef struct MemorySegment: + int start + int length + +cdef class MemoryPool: + cdef: + unsigned char* pool + public int size + public vector[MemorySegment] free_segments + public dict used_segments + public str name + public int commits, failed_commits, reads, read_locks, l1_compaction, l2_compaction, releases + object lock + + def __cinit__(self, int size, str name="Memory Pool"): + if size <= 0: + raise ValueError("MemoryPool size must be a positive integer") + self.size = size + self.pool = malloc(size * sizeof(unsigned char)) + if not self.pool: + raise MemoryError("Failed to allocate memory pool") + self.name = name + self.free_segments = [MemorySegment(0, size)] + self.used_segments = {} + self.lock = Lock() + # Initialize statistics + self.commits = 0 + self.failed_commits = 0 + self.reads = 0 + self.read_locks = 0 + self.l1_compaction = 0 + self.l2_compaction = 0 + self.releases = 0 + + def __dealloc__(self): + if self.pool is not NULL: + free(self.pool) + + def _find_free_segment(self, int size) -> int: + cdef int i + cdef MemorySegment segment + for i in range(len(self.free_segments)): + segment = self.free_segments[i] + if segment.length >= size: + return i + return -1 + + def _level1_compaction(self): + cdef int i, n + cdef MemorySegment last_segment, current_segment, segment + cdef vector[MemorySegment] sorted_segments + + self.l1_compaction += 1 + i = 1 + n = len(self.free_segments) + + sorted_segments = sorted(self.free_segments, key=lambda x: x["start"]) + new_free_segments = [sorted_segments[0]] + + for segment in self.free_segments[1:]: + last_segment = new_free_segments[-1] + if last_segment.start + last_segment.length == segment.start: + # If adjacent, merge by extending the last segment 
+ last_segment.length += segment.length + else: + # If not adjacent, just add the segment to the new list + new_free_segments.append(segment) + + self.free_segments = new_free_segments + + def _level2_compaction(self): + """ + Aggressively compacts by pushing all free memory to the end (Level 2 compaction). + + This is slower, but ensures we get the maximum free space. + """ + cdef MemorySegment segment + cdef long segment_id + cdef int offset = 0 + + self.l2_compaction += 1 + + total_free_space = sum(segment.length for segment in self.free_segments) + compacted_start = self.size - total_free_space + self.free_segments = [MemorySegment(compacted_start, total_free_space)] + + for segment_id, segment in sorted(self.used_segments.items(), key=lambda x: x[1]["start"]): + memcpy(self.pool + offset, self.pool + segment.start, segment.length) + segment.start = offset + self.used_segments[segment_id] = segment + offset += segment.length + + def commit(self, bytes data) -> long: + cdef int len_data = len(data) + cdef int segment_index + cdef MemorySegment segment + cdef long ref_id = random_int() + + # special case for 0 byte segments + if len_data == 0: + new_segment = MemorySegment(0, 0) + ref_id = random_int() + self.used_segments[ref_id] = new_segment + self.commits += 1 + return ref_id + + total_free_space = sum(segment.length for segment in self.free_segments) + if total_free_space < len_data: + self.failed_commits += 1 + return None + + with self.lock: + segment_index = self._find_free_segment(len_data) + if segment_index == -1: + self._level1_compaction() + segment_index = self._find_free_segment(len_data) + if segment_index == -1: + self._level2_compaction() + segment_index = self._find_free_segment(len_data) + if segment_index == -1: + return None # No space available + + segment = self.free_segments[segment_index] + self.free_segments.erase(self.free_segments.begin() + segment_index) + if segment.length > len_data: + self.free_segments.push_back(MemorySegment(segment.start + len_data, segment.length - len_data)) + + memcpy(self.pool + segment.start, PyBytes_AsString(data), len_data) + self.used_segments[ref_id] = MemorySegment(segment.start, len_data) + return ref_id + + def read(self, long ref_id) -> bytes: + cdef MemorySegment segment + cdef char* char_ptr = self.pool + with self.lock: + if ref_id not in self.used_segments: + raise ValueError("Invalid reference ID.") + segment = self.used_segments[ref_id] + + data = PyBytes_FromStringAndSize(char_ptr + segment.start, segment.length) + return data + + def release(self, long ref_id): + with self.lock: + if ref_id not in self.used_segments: + raise ValueError(f"Invalid reference ID - {ref_id}.") + segment = self.used_segments.pop(ref_id) + self.free_segments.push_back(segment) + + def available_space(self) -> int: + return sum(segment.length for segment in self.free_segments) \ No newline at end of file diff --git a/opteryx/connectors/gcp_cloudstorage_connector.py b/opteryx/connectors/gcp_cloudstorage_connector.py index 9a6087776..87dbdf618 100644 --- a/opteryx/connectors/gcp_cloudstorage_connector.py +++ b/opteryx/connectors/gcp_cloudstorage_connector.py @@ -10,11 +10,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import asyncio import os import urllib.request from typing import Dict from typing import List +import aiohttp import pyarrow from orso.schema import RelationSchema from orso.tools import single_item_cache @@ -124,6 +126,43 @@ def read_blob(self, *, blob_name, **kwargs): self.statistics.bytes_read += len(content) return content + async def async_read_blob(self, *, blob_name, pool, session, **kwargs): + # For performance we use the GCS API directly, this is roughly 10% + # faster than using the SDK. As one of the slowest parts of the system + # 10% can be measured in seconds. + + bucket, _, _, _ = paths.get_parts(blob_name) + print("READ ", blob_name) + + # Ensure the credentials are valid, refreshing them if necessary + if not self.client_credentials.valid: # pragma: no cover + from google.auth.transport.requests import Request + + request = Request() + self.client_credentials.refresh(request) + self.access_token = self.client_credentials.token + + if "kh" not in bucket: + bucket = bucket.replace("va_data", "va-data") + bucket = bucket.replace("data_", "data-") + object_full_path = urllib.parse.quote(blob_name[(len(bucket) + 1) :], safe="") + + url = f"https://storage.googleapis.com/storage/v1/b/{bucket}/o/{object_full_path}?alt=media" + + async with session.get( + url, headers={"Authorization": f"Bearer {self.access_token}"}, timeout=30 + ) as response: + if response.status != 200: + raise Exception(f"Unable to read '{blob_name}' - {response.status_code}") + data = await response.read() + ref = await pool.commit(data) + while ref is None: + print(".", end="", flush=True) + await asyncio.sleep(1) + ref = await pool.commit(data) + self.statistics.bytes_read += len(data) + return ref + @single_item_cache def get_list_of_blob_names(self, *, prefix: str) -> List[str]: bucket, object_path, _, _ = paths.get_parts(prefix) diff --git a/opteryx/operators/__init__.py b/opteryx/operators/__init__.py index 3916a147c..f87e08128 100644 --- a/opteryx/operators/__init__.py +++ b/opteryx/operators/__init__.py @@ -15,6 +15,7 @@ from .aggregate_and_group_node import AggregateAndGroupNode # Group is always followed by aggregate from .aggregate_node import AGGREGATORS from .aggregate_node import AggregateNode # aggregate data +from .async_scanner_node import AsyncScannerNode # from .build_statistics_node import BuildStatisticsNode # Analyze Tables from .cross_join_node import CrossJoinNode # CROSS JOIN diff --git a/opteryx/operators/async_scanner_node.py b/opteryx/operators/async_scanner_node.py new file mode 100644 index 000000000..24556cf05 --- /dev/null +++ b/opteryx/operators/async_scanner_node.py @@ -0,0 +1,168 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Async Scanner Node + +This is the SQL Query Execution Plan Node responsible for the reading of data. + +It wraps different internal readers (e.g. GCP Blob reader, SQL Reader), +normalizes the data into the format for internal processing. 
+""" +import asyncio +import queue +import threading +import time +from typing import Generator + +import aiohttp +import pyarrow +import pyarrow.parquet +from orso.schema import RelationSchema + +from opteryx.operators.scanner_node import ScannerNode +from opteryx.shared import AsyncMemoryPool +from opteryx.shared import MemoryPool +from opteryx.utils.file_decoders import get_decoder + +CONCURRENT_READS = 4 +MAX_BUFFER_SIZE_MB = 256 + + +def normalize_morsel(schema: RelationSchema, morsel: pyarrow.Table) -> pyarrow.Table: + if len(schema.columns) == 0: + one_column = pyarrow.array([1] * morsel.num_rows, type=pyarrow.int8()) + morsel = morsel.append_column("*", one_column) + return morsel.select(["*"]) + + # rename columns for internal use + target_column_names = [] + # columns in the data but not in the schema, droppable + droppable_columns = [] + + # find which columns to drop and which columns we already have + for i, column in enumerate(morsel.column_names): + column_name = schema.find_column(column) + if column_name is None: + droppable_columns.append(i) + else: + target_column_names.append(str(column_name)) + + # remove from the end otherwise we'll remove the wrong columns after the first one + droppable_columns.reverse() + for droppable in droppable_columns: + morsel = morsel.remove_column(droppable) + + # remane columns to the internal names (identities) + morsel = morsel.rename_columns(target_column_names) + + # add columns we don't have + for column in schema.columns: + if column.identity not in target_column_names: + null_column = pyarrow.array([None] * morsel.num_rows) + morsel = morsel.append_column(column.identity, null_column) + + # ensure the columns are in the right order + return morsel.select([col.identity for col in schema.columns]) + + +async def fetch_data(blob_names, pool, reader, queue, statistics): + semaphore = asyncio.Semaphore( + CONCURRENT_READS + ) # Adjust based on memory and expected data sizes + + session = aiohttp.ClientSession() + + async def fetch_and_process(blob_name): + async with semaphore: + start_clock = time.monotonic_ns() + reference = await reader(blob_name=blob_name, pool=pool, session=session) + statistics.time_reading_blobs += time.monotonic_ns() - start_clock + queue.put((blob_name, reference)) # Put data onto the queue + + tasks = (fetch_and_process(blob) for blob in blob_names) + + await asyncio.gather(*tasks) + queue.put(None) + await session.close() + + +class AsyncScannerNode(ScannerNode): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.pool = MemoryPool(MAX_BUFFER_SIZE_MB * 1024 * 1024, "read_buffer") + + def execute(self) -> Generator: + """Perform this step, time how long is spent doing work""" + morsel = None + orso_schema = self.parameters["schema"] + reader = self.parameters["connector"] + + orso_schema_cols = [] + for col in orso_schema.columns: + if col.identity in [c.identity for c in self.columns]: + orso_schema_cols.append(col) + orso_schema.columns = orso_schema_cols + arrow_schema = None + start_clock = time.monotonic_ns() + + morsel = None + + blob_names = reader.partition_scheme.get_blobs_in_partition( + start_date=reader.start_date, + end_date=reader.end_date, + blob_list_getter=reader.get_list_of_blob_names, + prefix=reader.dataset, + ) + + data_queue = queue.Queue() + + t = time.monotonic() + loop = asyncio.new_event_loop() + threading.Thread( + target=lambda: loop.run_until_complete( + fetch_data( + blob_names, + AsyncMemoryPool(self.pool), + reader.async_read_blob, + data_queue, + 
self.statistics, + ) + ), + daemon=True, + ).start() + + while True: + item = data_queue.get() + if item is None: + break + blob_name, reference = item + + decoder = get_decoder(blob_name) + blob_bytes = self.pool.read_and_release(reference) + decoded = decoder(blob_bytes, projection=self.columns, selection=self.predicates) + num_rows, num_columns, morsel = decoded + self.statistics.rows_seen += num_rows + + morsel = normalize_morsel(orso_schema, morsel) + + self.statistics.blobs_read += 1 + self.statistics.rows_read += morsel.num_rows + self.statistics.bytes_processed += morsel.nbytes + + yield morsel + + if morsel: + self.statistics.columns_read += morsel.num_columns + + print(time.monotonic() - t) diff --git a/opteryx/planner/cost_based_optimizer/bench/predicate_compaction_strategy.py b/opteryx/planner/cost_based_optimizer/bench/predicate_compaction_strategy.py new file mode 100644 index 000000000..b8103def8 --- /dev/null +++ b/opteryx/planner/cost_based_optimizer/bench/predicate_compaction_strategy.py @@ -0,0 +1,117 @@ +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class Limit: + value: Optional[int] # None indicates unbounded + inclusive: bool + + +@dataclass +class ValueRange: + lower: Limit = None # Lower limit of the range + upper: Limit = None # Upper limit of the range + untrackable: bool = False + + def update_with_predicate(self, operator: str, value: int): + if self.untrackable or not operator in ("=", ">=", "<=", ">", "<"): + self.untrackable = True + return + + new_limit = Limit(value, inclusive=operator in ("=", ">=", "<=")) + + if operator in ("=", ">=", ">"): + if self.lower is None: + self.lower = new_limit + # Update lower limit if the new one is more restrictive + elif new_limit.value > self.lower.value or ( + new_limit.value == self.lower.value and new_limit.inclusive + ): + self.lower = new_limit + if operator in ("=", "<=", "<"): + if self.upper is None: + self.upper = new_limit + # Update upper limit if the new one is more restrictive + elif new_limit.value < self.upper.value or ( + new_limit.value == self.upper.value and new_limit.inclusive + ): + self.upper = new_limit + + def __bool__(self): + if self.upper is None or self.lower is None: + return True + return self.upper.value >= self.lower.value + + def __str__(self) -> str: + _range = "" + if self.untrackable: + return "Unsupported Conditions" + if not (self): + return "Invalid Range" + if self.lower and self.upper and self.lower.value == self.upper.value: + return f" = {self.lower.value}" + if self.lower is not None: + _range += f" >{'=' if self.lower.inclusive else ''} {self.lower.value}" + if self.upper is not None: + _range += f" <{'=' if self.upper.inclusive else ''} {self.upper.value}" + return _range + + +# Example usage +initial_range = ValueRange() +initial_range.update_with_predicate(">", 3) +initial_range.update_with_predicate("<", 10) +initial_range.update_with_predicate("=", 7) +print("Range after applying predicates:", initial_range) + + +def test_initialization(): + vr = ValueRange() + assert vr.lower is None + assert vr.upper is None + assert not vr.untrackable + + +def test_updates_with_various_predicates(): + vr = ValueRange() + vr.update_with_predicate(">", 3) + assert vr.lower == Limit(3, False) + vr.update_with_predicate("<", 10) + assert vr.upper == Limit(10, False) + + +def test_equality_predicate(): + vr = ValueRange() + vr.update_with_predicate("=", 5) + assert vr.lower == vr.upper == Limit(5, True) + + +def test_untrackable_condition(): + vr = 
ValueRange() + vr.update_with_predicate("=", 5) + vr.update_with_predicate("LIKE", "%name") # This should make the range untrackable + assert vr.untrackable + + +def test_predicate_compaction(): + vr = ValueRange() + # Apply a series of less-than predicates + vr.update_with_predicate("<", 10) + vr.update_with_predicate("<", 7) # Most restrictive + vr.update_with_predicate("<", 8) + vr.update_with_predicate("<", 9) + + # After compaction, the upper limit should be '< 7' + expected_upper_limit = Limit(7, False) # Assuming exclusive bounds for '<' + assert ( + vr.upper == expected_upper_limit + ), f"Expected upper limit to be {expected_upper_limit}, got {vr.upper}" + + +test_initialization() +test_updates_with_various_predicates() +test_equality_predicate() +test_untrackable_condition() +test_predicate_compaction() +print("okay") diff --git a/opteryx/planner/temporary_physical_planner.py b/opteryx/planner/temporary_physical_planner.py index e7f434f3d..f0523674c 100644 --- a/opteryx/planner/temporary_physical_planner.py +++ b/opteryx/planner/temporary_physical_planner.py @@ -69,7 +69,11 @@ def create_physical_plan(logical_plan, query_properties): elif node_type == LogicalPlanStepType.Project: node = operators.ProjectionNode(query_properties, projection=logical_node.columns) elif node_type == LogicalPlanStepType.Scan: - node = operators.ScannerNode(query_properties, **node_config) + connector = node_config.get("connector") + if connector and hasattr(connector, "async_read_blob"): + node = operators.AsyncScannerNode(query_properties, **node_config) + else: + node = operators.ScannerNode(query_properties, **node_config) elif node_type == LogicalPlanStepType.Set: node = operators.SetVariableNode(query_properties, **node_config) elif node_type == LogicalPlanStepType.Show: diff --git a/opteryx/shared/__init__.py b/opteryx/shared/__init__.py index e4a9f1f34..6f2c9d428 100644 --- a/opteryx/shared/__init__.py +++ b/opteryx/shared/__init__.py @@ -12,7 +12,8 @@ from opteryx.shared.buffer_pool import BufferPool from opteryx.shared.materialized_datasets import MaterializedDatasets +from opteryx.shared.memory_pool import AsyncMemoryPool from opteryx.shared.memory_pool import MemoryPool from opteryx.shared.rolling_log import RollingLog -__all__ = ("BufferPool", "MaterializedDatasets", "MemoryPool", "RollingLog") +__all__ = ("AsyncMemoryPool", "BufferPool", "MaterializedDatasets", "MemoryPool", "RollingLog") diff --git a/opteryx/shared/memory_pool.py b/opteryx/shared/memory_pool.py index 0c17bcb92..affbf073a 100644 --- a/opteryx/shared/memory_pool.py +++ b/opteryx/shared/memory_pool.py @@ -14,8 +14,9 @@ Memory Pool is used to manage access to arbitrary blocks of memory. This is designed to be thread-safe with non-blocking reads. -""" +This module includes an async wrapper around the memory pool +""" from multiprocessing import Lock from typing import Dict @@ -112,6 +113,9 @@ def _level2_compaction(self): def can_commit(self, data: bytes) -> bool: return sum(segment.length for segment in self.free_segments) > len(data) + def available_space(self) -> int: + return sum(segment.length for segment in self.free_segments) + def commit(self, data: bytes) -> int: """ Add an item to the pool and return its reference. 
@@ -207,6 +211,21 @@ def release(self, ref_id: int): segment = self.used_segments.pop(ref_id) self.free_segments.append(segment) + def read_and_release(self, ref_id: int): + """ + Combine two steps together, we lock everytime here + """ + with self.lock: + self.reads += 1 + self.releases += 1 + if ref_id not in self.used_segments: + raise ValueError("Invalid reference ID.") + self.read_locks += 1 + segment = self.used_segments.pop(ref_id) + view = memoryview(self.pool)[segment.start : segment.start + segment.length] + self.free_segments.append(segment) + return view + @property def stats(self) -> dict: return { @@ -227,3 +246,24 @@ def __del__(self): """ pass # DEBUG: log (f"Memory Pool ({self.name}) ") + + +class AsyncMemoryPool: + def __init__(self, pool: MemoryPool): + self.pool: MemoryPool = pool + + async def commit(self, data: bytes) -> int: + return self.pool.commit(data) + + async def read(self, ref_id: int) -> bytes: + return self.pool.read(ref_id) + + async def release(self, ref_id: int): + self.pool.release(ref_id) + + async def can_commit(self, data: bytes) -> bool: + self.pool.can_commit(data=data) + + @property + def stats(self): + return self.pool.stats diff --git a/tests/misc/test_memory_pool.py b/tests/misc/test_memory_pool.py index 552db18ca..d10d333d8 100644 --- a/tests/misc/test_memory_pool.py +++ b/tests/misc/test_memory_pool.py @@ -8,6 +8,8 @@ sys.path.insert(1, os.path.join(sys.path[0], "../..")) from opteryx.shared import MemoryPool + +# from opteryx.compiled.structures import MemoryPool from orso.tools import random_string @@ -123,6 +125,7 @@ def test_repeated_commits_and_releases(): mp.release(ref) # Optional: Check internal state to ensure all resources are available again mp._level1_compaction() + assert ( mp.free_segments[0].length == mp.size ), "Memory leak detected after repeated commits and releases." @@ -142,8 +145,7 @@ def test_stress_with_random_sized_data(): for ref, _ in refs: mp.release(ref) # Ensure that the pool is not fragmented or leaking - mp_available_space = sum(segment.length for segment in mp.free_segments) - assert mp_available_space >= mp.size - sum( + assert mp.available_space() >= mp.size - sum( size for _, size in refs if size < mp.size ), "Memory fragmentation or leak detected." @@ -183,8 +185,7 @@ def test_repeated_zero_length_commits(): mp.release(ref) # Optional: Verify if the memory pool is back to its initial state. - mp_available_space = sum(segment.length for segment in mp.free_segments) - assert mp_available_space == mp.size, "Memory pool did not recover fully after releases." + assert mp.available_space() == mp.size, "Memory pool did not recover fully after releases." def test_rapid_commit_release(): @@ -194,8 +195,7 @@ def test_rapid_commit_release(): assert ref is not None, "Failed to commit data." mp.release(ref) # Verify memory integrity and state post rapid operations - mp_available_space = sum(segment.length for segment in mp.free_segments) - assert mp_available_space == mp.size, "Memory pool did not return to full availability." + assert mp.available_space() == mp.size, "Memory pool did not return to full availability." def test_commit_max_capacity(): @@ -204,8 +204,7 @@ def test_commit_max_capacity(): ref = mp.commit(data) assert ref is not None, "Failed to commit data that exactly matches the pool capacity." mp.release(ref) - mp_available_space = sum(segment.length for segment in mp.free_segments) - assert mp_available_space == mp.size, "Memory pool did not correctly free up space." 
+ assert mp.available_space() == mp.size, "Memory pool did not correctly free up space." def test_sequential_commits_without_space(): @@ -230,8 +229,7 @@ def test_stress_with_variable_data_sizes(): for ref in refs: mp.release(ref) # Ensure all space is reclaimed - mp_available_space = sum(segment.length for segment in mp.free_segments) - assert mp_available_space == mp.size, "Memory leakage detected with variable data sizes." + assert mp.available_space() == mp.size, "Memory leakage detected with variable data sizes." def test_zero_byte_commit_on_full_pool(): @@ -249,9 +247,8 @@ def test_random_release_order(): random.shuffle(refs) # Randomize the order of releases for ref in refs: mp.release(ref) - mp_available_space = sum(segment.length for segment in mp.free_segments) assert ( - mp_available_space == mp.size + mp.available_space() == mp.size ), "Memory pool failed to reclaim space correctly after random releases." @@ -294,5 +291,4 @@ def thread_task(): if __name__ == "__main__": # pragma: no cover from tests.tools import run_tests - test_concurrent_access() run_tests() From b195f063197409f7a4930d19e8a4371ffb78b8aa Mon Sep 17 00:00:00 2001 From: XB500 Date: Sun, 28 Apr 2024 09:53:13 +0000 Subject: [PATCH 2/2] Opteryx Version 0.14.2-alpha.452 --- opteryx/__version__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opteryx/__version__.py b/opteryx/__version__.py index 93f75906b..7df3f718e 100644 --- a/opteryx/__version__.py +++ b/opteryx/__version__.py @@ -1,4 +1,4 @@ -__build__ = 450 +__build__ = 452 # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License.
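
The patch above introduces a Cython MemoryPool, adds an available_space() accessor and a combined read_and_release() to the shared Python pool, and wraps it all in an AsyncMemoryPool. As a rough illustration of the intended lifecycle (commit a buffer, later read it back and free its segment in a single locked step), the following sketch drives the Python pool directly; the pool size, name and payload are illustrative only and the (size, name) constructor arguments are assumed to match the positional call used in AsyncScannerNode:

# Minimal sketch of the commit / read_and_release cycle, assuming the
# MemoryPool API shown in this patch (commit, available_space,
# read_and_release). Pool size, name and payload are illustrative only.
from opteryx.shared import MemoryPool

pool = MemoryPool(1024, "example_pool")  # (size, name) as used by AsyncScannerNode

payload = b"some serialized blob bytes"

# commit returns a reference id, or None when the pool cannot fit the data
ref = pool.commit(payload)
if ref is None:
    raise RuntimeError("pool is full - caller must wait or release segments")

print(pool.available_space())  # free bytes remaining after the commit

# read_and_release returns the payload (a memoryview over the Python pool)
# and returns the segment to the free list under a single lock
data = bytes(pool.read_and_release(ref))
assert data == payload

# the segment is free again, so the full capacity is available
assert pool.available_space() == 1024

The AsyncMemoryPool wrapper exposes the same commit/read/release calls as coroutines, which is what lets async_read_blob await pool.commit(data) from inside its aiohttp session.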
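
AsyncScannerNode.execute pairs an asyncio producer (fetch_data, capped at CONCURRENT_READS in-flight requests by a semaphore) running on a daemon thread with a synchronous consumer that drains a queue.Queue until a None sentinel arrives. Below is a stripped-down sketch of that hand-off pattern with the GCS reader replaced by a placeholder coroutine; fake_fetch and the blob names are hypothetical and not part of the patch:

# Stripped-down sketch of the producer/consumer pattern used by
# AsyncScannerNode: an asyncio loop on a worker thread fills a queue,
# the caller drains it synchronously until the None sentinel arrives.
# fake_fetch and the item names are illustrative placeholders.
import asyncio
import queue
import threading

CONCURRENT_READS = 4  # cap on in-flight fetches, mirroring the node's semaphore


async def fake_fetch(name: str) -> bytes:
    await asyncio.sleep(0.01)  # stand-in for an aiohttp GET of the blob
    return name.encode()


async def produce(names, out_queue: queue.Queue):
    semaphore = asyncio.Semaphore(CONCURRENT_READS)

    async def fetch_one(name):
        async with semaphore:
            payload = await fake_fetch(name)
            out_queue.put((name, payload))  # hand off to the sync consumer

    await asyncio.gather(*(fetch_one(n) for n in names))
    out_queue.put(None)  # sentinel: tells the consumer the producer is done


def consume(names):
    data_queue: queue.Queue = queue.Queue()
    loop = asyncio.new_event_loop()
    threading.Thread(
        target=lambda: loop.run_until_complete(produce(names, data_queue)),
        daemon=True,
    ).start()

    while True:
        item = data_queue.get()
        if item is None:
            break
        name, payload = item
        yield name, payload


if __name__ == "__main__":
    for name, payload in consume(["blob-a", "blob-b", "blob-c"]):
        print(name, len(payload))

The bounded semaphore keeps memory pressure predictable (only CONCURRENT_READS payloads are in flight), while the queue decouples the async fetch loop from the synchronous decode-and-yield loop in execute.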