Skip to content

Commit

Permalink
Merge pull request #1625 from mabel-dev/#1624
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer authored Apr 30, 2024
2 parents 38de13c + 2fcee79 commit 3ce06b2
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 6 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 462
__build__ = 464

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
4 changes: 3 additions & 1 deletion opteryx/operators/async_scanner_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ def execute(self) -> Generator:
blob_name, reference = item

decoder = get_decoder(blob_name)
blob_bytes = self.pool.read_and_release(reference)
# This pool is being used by async processes in another thread; using
# zero-copy versions occasionally results in data getting corrupted
blob_bytes = self.pool.read_and_release(reference, zero_copy=False)
try:
decoded = decoder(blob_bytes, projection=self.columns, selection=self.predicates)
num_rows, _, morsel = decoded
Expand Down
13 changes: 9 additions & 4 deletions opteryx/shared/memory_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import asyncio
from multiprocessing import Lock
from typing import Dict
from typing import Union

from orso.tools import random_int

Expand Down Expand Up @@ -177,7 +178,7 @@ def commit(self, data: bytes) -> int:
self.commits += 1
return ref_id

def read(self, ref_id: int) -> bytes:
def read(self, ref_id: int, zero_copy: bool = True) -> Union[bytes, memoryview]:
"""
We're using an optimistic locking strategy where we do not acquire
a lock, perform the read and then check that the metadata hasn't changed
Expand All @@ -199,7 +200,9 @@ def read(self, ref_id: int) -> bytes:
self.read_locks += 1
segment = self.used_segments[ref_id]
view = memoryview(self.pool)[segment.start : segment.start + segment.length]
return view
if zero_copy:
return view
return bytes(view)

def release(self, ref_id: int):
"""
Expand All @@ -212,7 +215,7 @@ def release(self, ref_id: int):
segment = self.used_segments.pop(ref_id)
self.free_segments.append(segment)

def read_and_release(self, ref_id: int):
def read_and_release(self, ref_id: int, zero_copy: bool = True) -> Union[bytes, memoryview]:
"""
Combine two steps together; we lock every time here
"""
Expand All @@ -225,7 +228,9 @@ def read_and_release(self, ref_id: int):
segment = self.used_segments.pop(ref_id)
view = memoryview(self.pool)[segment.start : segment.start + segment.length]
self.free_segments.append(segment)
return view
if zero_copy:
return view
return bytes(view)

@property
def stats(self) -> dict:
Expand Down

0 comments on commit 3ce06b2

Please sign in to comment.