Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed May 28, 2024
1 parent bf16405 commit d19defa
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 15 deletions.
27 changes: 15 additions & 12 deletions opteryx/compiled/structures/memory_pool.pyx
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# cython: language_level=3
# cython: nonecheck=False
# cython: cdivision=True
# cython: initializedcheck=False
# cython: infer_types=True
# cython: wraparound=True
# cython: boundscheck=False

from libc.stdlib cimport malloc, free
from libc.string cimport memcpy
Expand Down Expand Up @@ -81,24 +85,26 @@ cdef class MemoryPool:
cdef vector[MemorySegment] sorted_segments

self.l1_compaction += 1
i = 1
n = len(self.free_segments)
if n <= 1:
return

sorted_segments = sorted(self.free_segments, key=lambda x: x["start"])
new_free_segments = [sorted_segments[0]]
# Sort the free segments by start attribute
self.free_segments = sorted(self.free_segments, key=lambda x: x["start"])
new_free_segments = [self.free_segments[0]]

for segment in sorted_segments[1:]:
for segment in self.free_segments[1:]:
last_segment = new_free_segments[-1]
if last_segment.start + last_segment.length == segment.start:
# If adjacent, merge by extending the last segment
last_segment.length += segment.length
new_free_segments[-1] = last_segment
new_free_segments[-1] = MemorySegment(last_segment.start, last_segment.length + segment.length)
else:
# If not adjacent, just add the segment to the new list
new_free_segments.append(segment)

self.free_segments = new_free_segments


def _level2_compaction(self):
"""
Aggressively compacts by pushing all free memory to the end (Level 2 compaction).
Expand Down Expand Up @@ -134,7 +140,6 @@ cdef class MemoryPool:
# special case for 0 byte segments
if len_data == 0:
new_segment = MemorySegment(0, 0)
ref_id = random_int()
self.used_segments[ref_id] = new_segment
self.commits += 1
return ref_id
Expand Down Expand Up @@ -179,7 +184,7 @@ cdef class MemoryPool:
segment = self.used_segments[ref_id]

if zero_copy != 0:
raw_data = <char[:segment.length]> char_ptr
raw_data = <char[:segment.length]> (char_ptr + segment.start)
data = memoryview(raw_data) # Create a memoryview from the raw data
else:
data = PyBytes_FromStringAndSize(char_ptr + segment.start, segment.length)
Expand All @@ -188,7 +193,6 @@ cdef class MemoryPool:
raise ValueError("Invalid reference ID.")
post_read_segment = self.used_segments[ref_id]
if post_read_segment.start != segment.start or post_read_segment.length != segment.length:

with self.lock:
self.read_locks += 1

Expand All @@ -197,11 +201,10 @@ cdef class MemoryPool:
segment = self.used_segments[ref_id]

if zero_copy != 0:
raw_data = <char[:segment.length]> char_ptr
raw_data = <char[:segment.length]> (char_ptr + segment.start)
data = memoryview(raw_data) # Create a memoryview from the raw data
else:
return PyBytes_FromStringAndSize(char_ptr + segment.start, segment.length)

return data

def read_and_release(self, long ref_id, int zero_copy = 1):
Expand All @@ -219,7 +222,7 @@ cdef class MemoryPool:
self.free_segments.push_back(segment)

if zero_copy != 0:
raw_data = <char[:segment.length]> char_ptr
raw_data = <char[:segment.length]> (char_ptr + segment.start)
return memoryview(raw_data) # Create a memoryview from the raw data
else:
return PyBytes_FromStringAndSize(char_ptr + segment.start, segment.length)
Expand Down
118 changes: 116 additions & 2 deletions tests/misc/test_memory_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ def test_compaction():
mp.release(ref1)
ref3 = mp.commit(b"Third")
# Ensure that the third commit succeeds after compaction, despite the first segment being released
assert mp.read(ref3, False) == b"Third"
data = mp.read(ref3, False)
assert data == b"Third"


def test_multiple_commits_and_reads():
Expand All @@ -94,6 +95,119 @@ def test_overlapping_writes():
assert mp.read(ref2, False) == b"abcde"
assert mp.read(ref3, False) == b"XYZ"

def test_overlapping_writes_memcopy():
mp = MemoryPool(size=20)
ref1 = mp.commit(b"12345")
ref2 = mp.commit(b"abcde")
mp.release(ref1)
ref3 = mp.commit(b"XYZ")
# Test if the new write overlaps correctly and does not corrupt other data
r2_memcopy = bytes(mp.read(ref2, True))
r2_no_memcopy = mp.read(ref2, False)
r3_memcopy = bytes(mp.read(ref3, True))
r3_no_memcopy = mp.read(ref3, False)

assert r2_memcopy == r2_no_memcopy == b"abcde", f"{r2_memcopy} / {r2_no_memcopy} / abcde"
assert r3_memcopy == r3_no_memcopy == b"XYZ", f"{r3_memcopy} / {r3_no_memcopy} / XYZ"

def test_zero_copy_vs_copy_reads():
mp = MemoryPool(size=30)

# Initial commits
ref1 = mp.commit(b"12345")
ref2 = mp.commit(b"abcde")
ref3 = mp.commit(b"ABCDE")

# Release one segment to create free space
mp.release(ref1)

# Commit more data to fill the pool
ref4 = mp.commit(b"XYZ")
ref5 = mp.commit(b"7890")

# Additional activity
ref6 = mp.commit(b"LMNOP")
mp.release(ref3)
ref7 = mp.commit(b"qrst")
mp.release(ref2)
ref8 = mp.commit(b"uvwxyz")

# Reading segments with and without zero-copy
r4_memcopy = bytes(mp.read(ref4, True))
r4_no_memcopy = mp.read(ref4, False)
r5_memcopy = bytes(mp.read(ref5, True))
r5_no_memcopy = mp.read(ref5, False)
r6_memcopy = bytes(mp.read(ref6, True))
r6_no_memcopy = mp.read(ref6, False)
r7_memcopy = bytes(mp.read(ref7, True))
r7_no_memcopy = mp.read(ref7, False)
r8_memcopy = bytes(mp.read(ref8, True))
r8_no_memcopy = mp.read(ref8, False)

assert r4_memcopy == r4_no_memcopy == b"XYZ", f"{r4_memcopy} / {r4_no_memcopy} / XYZ"
assert r5_memcopy == r5_no_memcopy == b"7890", f"{r5_memcopy} / {r5_no_memcopy} / 7890"
assert r6_memcopy == r6_no_memcopy == b"LMNOP", f"{r6_memcopy} / {r6_no_memcopy} / LMNOP"
assert r7_memcopy == r7_no_memcopy == b"qrst", f"{r7_memcopy} / {r7_no_memcopy} / qrst"
assert r8_memcopy == r8_no_memcopy == b"uvwxyz", f"{r8_memcopy} / {r8_no_memcopy} / uvwxyz"


def test_zero_copy_vs_copy_reads_and_release():
mp = MemoryPool(size=30)

# Initial commits
ref1 = mp.commit(b"12345")
ref2 = mp.commit(b"abcde")
ref3 = mp.commit(b"ABCDE")

# Release one segment to create free space
mp.release(ref1)

# Commit more data to fill the pool
ref4 = mp.commit(b"XYZ")
ref5 = mp.commit(b"7890")

# Additional activity
ref6 = mp.commit(b"LMNOP")
mp.release(ref3)
ref7 = mp.commit(b"qrst")
mp.release(ref2)
ref8 = mp.commit(b"uvwxyz")

# Reading segments with and without zero-copy, alternating read and read_and_release
# read no zero copy, release zero copy
r4_read_no_memcopy = bytes(mp.read(ref4, False))
r4_release_memcopy = bytes(mp.read_and_release(ref4, True))

# read zero copy, release no zero copy
r5_read_memcopy = bytes(mp.read(ref5, True))
r5_release_no_memcopy = bytes(mp.read_and_release(ref5, False))

# read zero copy, release zero copy
r6_read_memcopy = bytes(mp.read(ref6, True))
r6_release_memcopy = bytes(mp.read_and_release(ref6, True))

# read no zero copy, release no zero copy
r7_read_no_memcopy = bytes(mp.read(ref7, False))
r7_release_no_memcopy = bytes(mp.read_and_release(ref7, False))

# read zero copy, release zero copy
r8_read_memcopy = bytes(mp.read(ref8, True))
r8_release_memcopy = bytes(mp.read_and_release(ref8, True))

assert r4_read_no_memcopy == r4_release_memcopy == b"XYZ", f"{r4_read_no_memcopy} / {r4_release_memcopy} / XYZ"
assert r5_read_memcopy == r5_release_no_memcopy == b"7890", f"{r5_read_memcopy} / {r5_release_no_memcopy} / 7890"
assert r6_read_memcopy == r6_release_memcopy == b"LMNOP", f"{r6_read_memcopy} / {r6_release_memcopy} / LMNOP"
assert r7_read_no_memcopy == r7_release_no_memcopy == b"qrst", f"{r7_read_no_memcopy} / {r7_release_no_memcopy} / qrst"
assert r8_read_memcopy == r8_release_memcopy == b"uvwxyz", f"{r8_read_memcopy} / {r8_release_memcopy} / uvwxyz"

# Ensure that the segments are released and available for new commits
ref9 = mp.commit(b"newdata")
r9_memcopy = bytes(mp.read(ref9, True))
r9_no_memcopy = mp.read(ref9, False)

assert r9_memcopy == r9_no_memcopy == b"newdata", f"{r9_memcopy} / {r9_no_memcopy} / newdata"



def test_pool_exhaustion_and_compaction():
mp = MemoryPool(size=20)
Expand Down Expand Up @@ -375,5 +489,5 @@ def test_return_types():

if __name__ == "__main__": # pragma: no cover
from tests.tools import run_tests

test_compaction_effectiveness()
run_tests()
2 changes: 1 addition & 1 deletion tests/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def run_tests():
for index, method in enumerate(test_methods):
start_time = time.monotonic_ns()
test_name = f"\033[38;2;255;184;108m{(index + 1):04}\033[0m \033[38;2;189;147;249m{str(method.__name__)}\033[0m"
print(test_name.ljust(display_width - 20), end="")
print(test_name.ljust(display_width - 20), end="", flush=True)
error = None
output = ""
try:
Expand Down

0 comments on commit d19defa

Please sign in to comment.