diff --git a/opteryx/__version__.py b/opteryx/__version__.py index 15ef83946..b3a22c7e9 100644 --- a/opteryx/__version__.py +++ b/opteryx/__version__.py @@ -1,4 +1,4 @@ -__build__ = 462 +__build__ = 464 # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/opteryx/operators/async_scanner_node.py b/opteryx/operators/async_scanner_node.py index 588496059..c058c948c 100644 --- a/opteryx/operators/async_scanner_node.py +++ b/opteryx/operators/async_scanner_node.py @@ -145,7 +145,9 @@ def execute(self) -> Generator: blob_name, reference = item decoder = get_decoder(blob_name) - blob_bytes = self.pool.read_and_release(reference) + # This pool is being used by async processes in another thread, using + # zero copy versions occassionally results in data getting corrupted + blob_bytes = self.pool.read_and_release(reference, zero_copy=False) try: decoded = decoder(blob_bytes, projection=self.columns, selection=self.predicates) num_rows, _, morsel = decoded diff --git a/opteryx/shared/memory_pool.py b/opteryx/shared/memory_pool.py index 41e28c35d..6503719ac 100644 --- a/opteryx/shared/memory_pool.py +++ b/opteryx/shared/memory_pool.py @@ -21,6 +21,7 @@ import asyncio from multiprocessing import Lock from typing import Dict +from typing import Union from orso.tools import random_int @@ -177,7 +178,7 @@ def commit(self, data: bytes) -> int: self.commits += 1 return ref_id - def read(self, ref_id: int) -> bytes: + def read(self, ref_id: int, zero_copy: bool = True) -> Union[bytes, memoryview]: """ We're using an optimistic locking strategy where we do not acquire a lock, perform the read and then check that the metadata hasn't changed @@ -199,7 +200,9 @@ def read(self, ref_id: int) -> bytes: self.read_locks += 1 segment = self.used_segments[ref_id] view = memoryview(self.pool)[segment.start : segment.start + segment.length] - return view + if zero_copy: + return view + return bytes(view) def release(self, ref_id: int): """ @@ -212,7 +215,7 @@ def release(self, ref_id: int): segment = self.used_segments.pop(ref_id) self.free_segments.append(segment) - def read_and_release(self, ref_id: int): + def read_and_release(self, ref_id: int, zero_copy: bool = True) -> Union[bytes, memoryview]: """ Combine two steps together, we lock everytime here """ @@ -225,7 +228,9 @@ def read_and_release(self, ref_id: int): segment = self.used_segments.pop(ref_id) view = memoryview(self.pool)[segment.start : segment.start + segment.length] self.free_segments.append(segment) - return view + if zero_copy: + return view + return bytes(view) @property def stats(self) -> dict: