From 283b0ad21e044e80553316e970c1895741e66b8e Mon Sep 17 00:00:00 2001 From: joocer Date: Tue, 30 Apr 2024 12:39:51 +0100 Subject: [PATCH 1/4] #1624 --- opteryx/operators/async_scanner_node.py | 4 +++- opteryx/shared/memory_pool.py | 13 +++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/opteryx/operators/async_scanner_node.py b/opteryx/operators/async_scanner_node.py index 588496059..c058c948c 100644 --- a/opteryx/operators/async_scanner_node.py +++ b/opteryx/operators/async_scanner_node.py @@ -145,7 +145,9 @@ def execute(self) -> Generator: blob_name, reference = item decoder = get_decoder(blob_name) - blob_bytes = self.pool.read_and_release(reference) + # This pool is being used by async processes in another thread, using + # zero copy versions occassionally results in data getting corrupted + blob_bytes = self.pool.read_and_release(reference, zero_copy=False) try: decoded = decoder(blob_bytes, projection=self.columns, selection=self.predicates) num_rows, _, morsel = decoded diff --git a/opteryx/shared/memory_pool.py b/opteryx/shared/memory_pool.py index 41e28c35d..dd3ce5e5f 100644 --- a/opteryx/shared/memory_pool.py +++ b/opteryx/shared/memory_pool.py @@ -21,6 +21,7 @@ import asyncio from multiprocessing import Lock from typing import Dict +from typing import Union from orso.tools import random_int @@ -177,7 +178,7 @@ def commit(self, data: bytes) -> int: self.commits += 1 return ref_id - def read(self, ref_id: int) -> bytes: + def read(self, ref_id: int, zero_copy: bool = True) -> Union[bytes | memoryview]: """ We're using an optimistic locking strategy where we do not acquire a lock, perform the read and then check that the metadata hasn't changed @@ -199,7 +200,9 @@ def read(self, ref_id: int) -> bytes: self.read_locks += 1 segment = self.used_segments[ref_id] view = memoryview(self.pool)[segment.start : segment.start + segment.length] - return view + if zero_copy: + return view + return bytes(view) def release(self, ref_id: int): """ @@ -212,7 +215,7 @@ def release(self, ref_id: int): segment = self.used_segments.pop(ref_id) self.free_segments.append(segment) - def read_and_release(self, ref_id: int): + def read_and_release(self, ref_id: int, zero_copy: bool = True) -> Union[bytes | memoryview]: """ Combine two steps together, we lock everytime here """ @@ -225,7 +228,9 @@ def read_and_release(self, ref_id: int): segment = self.used_segments.pop(ref_id) view = memoryview(self.pool)[segment.start : segment.start + segment.length] self.free_segments.append(segment) - return view + if zero_copy: + return view + return bytes(view) @property def stats(self) -> dict: From d9552def0324a04423f185609b73501780a9531d Mon Sep 17 00:00:00 2001 From: XB500 Date: Tue, 30 Apr 2024 11:43:30 +0000 Subject: [PATCH 2/4] Opteryx Version 0.14.2-beta.463 --- opteryx/__version__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opteryx/__version__.py b/opteryx/__version__.py index d172a3c37..0b261d12f 100644 --- a/opteryx/__version__.py +++ b/opteryx/__version__.py @@ -1,4 +1,4 @@ -__build__ = 461 +__build__ = 463 # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From b9639b32f411f1253e24d386613c73e6a4a82d5c Mon Sep 17 00:00:00 2001 From: joocer Date: Tue, 30 Apr 2024 12:49:04 +0100 Subject: [PATCH 3/4] #1624 --- opteryx/shared/memory_pool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opteryx/shared/memory_pool.py b/opteryx/shared/memory_pool.py index dd3ce5e5f..6503719ac 100644 --- a/opteryx/shared/memory_pool.py +++ b/opteryx/shared/memory_pool.py @@ -178,7 +178,7 @@ def commit(self, data: bytes) -> int: self.commits += 1 return ref_id - def read(self, ref_id: int, zero_copy: bool = True) -> Union[bytes | memoryview]: + def read(self, ref_id: int, zero_copy: bool = True) -> Union[bytes, memoryview]: """ We're using an optimistic locking strategy where we do not acquire a lock, perform the read and then check that the metadata hasn't changed @@ -215,7 +215,7 @@ def release(self, ref_id: int): segment = self.used_segments.pop(ref_id) self.free_segments.append(segment) - def read_and_release(self, ref_id: int, zero_copy: bool = True) -> Union[bytes | memoryview]: + def read_and_release(self, ref_id: int, zero_copy: bool = True) -> Union[bytes, memoryview]: """ Combine two steps together, we lock everytime here """ From 0ed5f1d1bdab1c8280462bf99d34c0628891f5f5 Mon Sep 17 00:00:00 2001 From: XB500 Date: Tue, 30 Apr 2024 11:51:09 +0000 Subject: [PATCH 4/4] Opteryx Version 0.14.2-beta.464 --- opteryx/__version__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opteryx/__version__.py b/opteryx/__version__.py index 0b261d12f..b3a22c7e9 100644 --- a/opteryx/__version__.py +++ b/opteryx/__version__.py @@ -1,4 +1,4 @@ -__build__ = 463 +__build__ = 464 # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License.