Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1694 #1695

Merged
merged 2 commits into from
May 28, 2024
Merged

#1694 #1695

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 527
__build__ = 532

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
27 changes: 15 additions & 12 deletions opteryx/compiled/structures/memory_pool.pyx
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# cython: language_level=3
# cython: nonecheck=False
# cython: cdivision=True
# cython: initializedcheck=False
# cython: infer_types=True
# cython: wraparound=True
# cython: boundscheck=False

from libc.stdlib cimport malloc, free
from libc.string cimport memcpy
Expand Down Expand Up @@ -81,24 +85,26 @@ cdef class MemoryPool:
cdef vector[MemorySegment] sorted_segments

self.l1_compaction += 1
i = 1
n = len(self.free_segments)
if n <= 1:
return

sorted_segments = sorted(self.free_segments, key=lambda x: x["start"])
new_free_segments = [sorted_segments[0]]
# Sort the free segments by start attribute
self.free_segments = sorted(self.free_segments, key=lambda x: x["start"])
new_free_segments = [self.free_segments[0]]

for segment in sorted_segments[1:]:
for segment in self.free_segments[1:]:
last_segment = new_free_segments[-1]
if last_segment.start + last_segment.length == segment.start:
# If adjacent, merge by extending the last segment
last_segment.length += segment.length
new_free_segments[-1] = last_segment
new_free_segments[-1] = MemorySegment(last_segment.start, last_segment.length + segment.length)
else:
# If not adjacent, just add the segment to the new list
new_free_segments.append(segment)

self.free_segments = new_free_segments


def _level2_compaction(self):
"""
Aggressively compacts by pushing all free memory to the end (Level 2 compaction).
Expand Down Expand Up @@ -134,7 +140,6 @@ cdef class MemoryPool:
# special case for 0 byte segments
if len_data == 0:
new_segment = MemorySegment(0, 0)
ref_id = random_int()
self.used_segments[ref_id] = new_segment
self.commits += 1
return ref_id
Expand Down Expand Up @@ -179,7 +184,7 @@ cdef class MemoryPool:
segment = self.used_segments[ref_id]

if zero_copy != 0:
raw_data = <char[:segment.length]> char_ptr
raw_data = <char[:segment.length]> (char_ptr + segment.start)
data = memoryview(raw_data) # Create a memoryview from the raw data
else:
data = PyBytes_FromStringAndSize(char_ptr + segment.start, segment.length)
Expand All @@ -188,7 +193,6 @@ cdef class MemoryPool:
raise ValueError("Invalid reference ID.")
post_read_segment = self.used_segments[ref_id]
if post_read_segment.start != segment.start or post_read_segment.length != segment.length:

with self.lock:
self.read_locks += 1

Expand All @@ -197,11 +201,10 @@ cdef class MemoryPool:
segment = self.used_segments[ref_id]

if zero_copy != 0:
raw_data = <char[:segment.length]> char_ptr
raw_data = <char[:segment.length]> (char_ptr + segment.start)
data = memoryview(raw_data) # Create a memoryview from the raw data
else:
return PyBytes_FromStringAndSize(char_ptr + segment.start, segment.length)

return data

def read_and_release(self, long ref_id, int zero_copy = 1):
Expand All @@ -219,7 +222,7 @@ cdef class MemoryPool:
self.free_segments.push_back(segment)

if zero_copy != 0:
raw_data = <char[:segment.length]> char_ptr
raw_data = <char[:segment.length]> (char_ptr + segment.start)
return memoryview(raw_data) # Create a memoryview from the raw data
else:
return PyBytes_FromStringAndSize(char_ptr + segment.start, segment.length)
Expand Down
118 changes: 116 additions & 2 deletions tests/misc/test_memory_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ def test_compaction():
mp.release(ref1)
ref3 = mp.commit(b"Third")
# Ensure that the third commit succeeds after compaction, despite the first segment being released
assert mp.read(ref3, False) == b"Third"
data = mp.read(ref3, False)
assert data == b"Third"


def test_multiple_commits_and_reads():
Expand All @@ -94,6 +95,119 @@ def test_overlapping_writes():
assert mp.read(ref2, False) == b"abcde"
assert mp.read(ref3, False) == b"XYZ"

def test_overlapping_writes_memcopy():
    """
    Check that both read modes agree after a released slot is reused.

    Two payloads are committed, the first is released, and a shorter third
    payload is committed. The two live payloads are then read with the
    second argument of ``read`` set to True and to False, and both results
    must equal the bytes originally committed.
    """
    pool = MemoryPool(size=20)
    first = pool.commit(b"12345")
    second = pool.commit(b"abcde")
    pool.release(first)
    third = pool.commit(b"XYZ")

    # The second argument to read() is presumably the zero-copy flag; the
    # flag-on result is wrapped in bytes() so it compares directly against
    # the flag-off result.
    second_flag_on = bytes(pool.read(second, True))
    second_flag_off = pool.read(second, False)
    third_flag_on = bytes(pool.read(third, True))
    third_flag_off = pool.read(third, False)

    # Neither read mode may return data belonging to another segment.
    assert second_flag_on == second_flag_off == b"abcde", f"{second_flag_on} / {second_flag_off} / abcde"
    assert third_flag_on == third_flag_off == b"XYZ", f"{third_flag_on} / {third_flag_off} / XYZ"

def test_zero_copy_vs_copy_reads():
    """
    Check that both read modes agree for every live segment after churn.

    The pool goes through a fixed sequence of commits and releases, then
    each surviving reference is read twice (second argument of ``read`` set
    to True, then False). Both results must equal the committed payload.
    """
    pool = MemoryPool(size=30)

    # Initial commits
    ref1 = pool.commit(b"12345")
    ref2 = pool.commit(b"abcde")
    ref3 = pool.commit(b"ABCDE")

    # Release one segment to create free space
    pool.release(ref1)

    # Commit more data to fill the pool
    ref4 = pool.commit(b"XYZ")
    ref5 = pool.commit(b"7890")

    # Additional activity
    ref6 = pool.commit(b"LMNOP")
    pool.release(ref3)
    ref7 = pool.commit(b"qrst")
    pool.release(ref2)
    ref8 = pool.commit(b"uvwxyz")

    # Every reference still held, paired with the payload it was committed with.
    live_segments = (
        (ref4, b"XYZ"),
        (ref5, b"7890"),
        (ref6, b"LMNOP"),
        (ref7, b"qrst"),
        (ref8, b"uvwxyz"),
    )

    # Perform all reads before asserting anything, flag-on first and then
    # flag-off for each reference in turn.
    observed = []
    for ref, _ in live_segments:
        flag_on = bytes(pool.read(ref, True))
        flag_off = pool.read(ref, False)
        observed.append((flag_on, flag_off))

    for (flag_on, flag_off), (_, expected) in zip(observed, live_segments):
        assert flag_on == flag_off == expected, f"{flag_on} / {flag_off} / {expected.decode()}"


def test_zero_copy_vs_copy_reads_and_release():
    """
    Check that ``read`` and ``read_and_release`` agree across flag combinations.

    After a fixed sequence of commits and releases, each surviving reference
    is read once with ``read`` and once with ``read_and_release``, covering
    the combinations of the second (zero-copy) argument listed in the plan
    below. A final commit and read confirms the pool is still usable.
    """
    pool = MemoryPool(size=30)

    # Initial commits
    ref1 = pool.commit(b"12345")
    ref2 = pool.commit(b"abcde")
    ref3 = pool.commit(b"ABCDE")

    # Release one segment to create free space
    pool.release(ref1)

    # Commit more data to fill the pool
    ref4 = pool.commit(b"XYZ")
    ref5 = pool.commit(b"7890")

    # Additional activity
    ref6 = pool.commit(b"LMNOP")
    pool.release(ref3)
    ref7 = pool.commit(b"qrst")
    pool.release(ref2)
    ref8 = pool.commit(b"uvwxyz")

    # (reference, flag passed to read, flag passed to read_and_release, payload)
    plan = (
        (ref4, False, True, b"XYZ"),
        (ref5, True, False, b"7890"),
        (ref6, True, True, b"LMNOP"),
        (ref7, False, False, b"qrst"),
        (ref8, True, True, b"uvwxyz"),
    )

    # Run every read / read_and_release pair before asserting anything.
    # Both results are wrapped in bytes() regardless of the flag used.
    observed = []
    for ref, read_flag, release_flag, _ in plan:
        from_read = bytes(pool.read(ref, read_flag))
        from_release = bytes(pool.read_and_release(ref, release_flag))
        observed.append((from_read, from_release))

    for (from_read, from_release), (_, _, _, expected) in zip(observed, plan):
        assert from_read == from_release == expected, f"{from_read} / {from_release} / {expected.decode()}"

    # Ensure that the segments are released and available for new commits
    ref9 = pool.commit(b"newdata")
    ref9_flag_on = bytes(pool.read(ref9, True))
    ref9_flag_off = pool.read(ref9, False)

    assert ref9_flag_on == ref9_flag_off == b"newdata", f"{ref9_flag_on} / {ref9_flag_off} / newdata"



def test_pool_exhaustion_and_compaction():
mp = MemoryPool(size=20)
Expand Down Expand Up @@ -375,5 +489,5 @@ def test_return_types():

if __name__ == "__main__": # pragma: no cover
from tests.tools import run_tests

test_compaction_effectiveness()
run_tests()
2 changes: 1 addition & 1 deletion tests/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def run_tests():
for index, method in enumerate(test_methods):
start_time = time.monotonic_ns()
test_name = f"\033[38;2;255;184;108m{(index + 1):04}\033[0m \033[38;2;189;147;249m{str(method.__name__)}\033[0m"
print(test_name.ljust(display_width - 20), end="")
print(test_name.ljust(display_width - 20), end="", flush=True)
error = None
output = ""
try:
Expand Down