Skip to content

Commit

Permalink
Merge pull request #1615 from mabel-dev/#1613/1
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer authored Apr 28, 2024
2 parents 8f47655 + b195f06 commit 777ff33
Show file tree
Hide file tree
Showing 11 changed files with 543 additions and 17 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 450
__build__ = 452

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
1 change: 1 addition & 0 deletions opteryx/compiled/structures/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .hash_table import HashSet
from .hash_table import HashTable
from .hash_table import distinct
from .memory_pool import MemoryPool
from .node import Node
159 changes: 159 additions & 0 deletions opteryx/compiled/structures/memory_pool.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# memory_pool.pyx
from libc.stdlib cimport malloc, free
from libc.string cimport memcpy, memmove
from cpython.bytes cimport PyBytes_AsString, PyBytes_FromStringAndSize
from libcpp.vector cimport vector

from threading import Lock

from orso.tools import random_int

cdef struct MemorySegment:
    # One contiguous region of the pool: `start` is the byte offset from
    # the beginning of the pool buffer, `length` the size in bytes.
    int start
    int length

cdef class MemoryPool:
    """
    Fixed-size arena allocator backed by a single malloc'd buffer.

    Callers `commit` bytes in exchange for an opaque reference id, `read`
    them back, and `release` them when done. Free space is tracked as a
    list of segments; two compaction levels (merge-adjacent and
    slide-everything-left) fight fragmentation.
    """
    cdef:
        unsigned char* pool
        public int size
        public vector[MemorySegment] free_segments
        public dict used_segments
        public str name
        public int commits, failed_commits, reads, read_locks, l1_compaction, l2_compaction, releases
        object lock

    def __cinit__(self, int size, str name="Memory Pool"):
        if size <= 0:
            raise ValueError("MemoryPool size must be a positive integer")
        self.size = size
        self.pool = <unsigned char*>malloc(size * sizeof(unsigned char))
        if not self.pool:
            raise MemoryError("Failed to allocate memory pool")
        self.name = name
        # the whole pool starts as one free segment
        self.free_segments = [MemorySegment(0, size)]
        self.used_segments = {}
        self.lock = Lock()
        # Initialize statistics
        self.commits = 0
        self.failed_commits = 0
        self.reads = 0
        self.read_locks = 0
        self.l1_compaction = 0
        self.l2_compaction = 0
        self.releases = 0

    def __dealloc__(self):
        if self.pool is not NULL:
            free(self.pool)

    def _find_free_segment(self, int size) -> int:
        """Return the index of the first free segment that fits `size`, or -1."""
        cdef int i
        cdef MemorySegment segment
        for i in range(len(self.free_segments)):
            segment = self.free_segments[i]
            if segment.length >= size:
                return i
        return -1

    def _level1_compaction(self):
        """
        Cheap compaction: merge free segments that are physically adjacent.
        Caller must hold `self.lock`.
        """
        cdef MemorySegment last_segment, segment

        self.l1_compaction += 1

        if len(self.free_segments) <= 1:
            return

        # sort by start offset so adjacency can be detected in one pass
        # (BUGFIX: previously iterated the *unsorted* free list after sorting)
        sorted_segments = sorted(self.free_segments, key=lambda x: x["start"])
        new_free_segments = [sorted_segments[0]]

        for segment in sorted_segments[1:]:
            last_segment = new_free_segments[-1]
            if last_segment.start + last_segment.length == segment.start:
                # Adjacent: extend the previous segment. Indexing yields a
                # struct *copy*, so the merged value must be written back.
                last_segment.length += segment.length
                new_free_segments[-1] = last_segment
            else:
                # Not adjacent, keep as a separate free segment
                new_free_segments.append(segment)

        self.free_segments = new_free_segments

    def _level2_compaction(self):
        """
        Aggressively compacts by pushing all used memory to the start and all
        free memory to the end (Level 2 compaction). Slower, but yields the
        maximum contiguous free space. Caller must hold `self.lock`.
        """
        cdef MemorySegment segment
        cdef long segment_id
        cdef int offset = 0

        self.l2_compaction += 1

        total_free_space = sum(segment.length for segment in self.free_segments)
        compacted_start = self.size - total_free_space
        self.free_segments = [MemorySegment(compacted_start, total_free_space)]

        # Move in ascending start order so a destination never overwrites
        # not-yet-moved data; memmove (not memcpy) because source and
        # destination regions can overlap when sliding left.
        for segment_id, segment in sorted(self.used_segments.items(), key=lambda x: x[1]["start"]):
            memmove(self.pool + offset, self.pool + segment.start, segment.length)
            segment.start = offset
            self.used_segments[segment_id] = segment
            offset += segment.length

    def commit(self, bytes data) -> long:
        """
        Store `data` in the pool.

        Returns an opaque reference id, or None when the pool cannot hold
        the data even after compaction.
        """
        cdef int len_data = len(data)
        cdef int segment_index
        cdef MemorySegment segment
        cdef long ref_id = random_int()

        # special case for 0 byte segments — no pool space is consumed
        if len_data == 0:
            self.used_segments[ref_id] = MemorySegment(0, 0)
            self.commits += 1
            return ref_id

        with self.lock:
            # Capacity check under the lock so two concurrent commits can't
            # both pass the check and over-allocate (was racy before).
            total_free_space = sum(segment.length for segment in self.free_segments)
            if total_free_space < len_data:
                self.failed_commits += 1
                return None

            segment_index = self._find_free_segment(len_data)
            if segment_index == -1:
                # merge adjacent free blocks first — it's cheap
                self._level1_compaction()
                segment_index = self._find_free_segment(len_data)
            if segment_index == -1:
                # still fragmented: slide everything left
                self._level2_compaction()
                segment_index = self._find_free_segment(len_data)
            if segment_index == -1:
                self.failed_commits += 1
                return None  # No space available

            segment = self.free_segments[segment_index]
            self.free_segments.erase(self.free_segments.begin() + segment_index)
            if segment.length > len_data:
                # return the unused tail of the segment to the free list
                self.free_segments.push_back(
                    MemorySegment(segment.start + len_data, segment.length - len_data)
                )

            memcpy(self.pool + segment.start, PyBytes_AsString(data), len_data)
            self.used_segments[ref_id] = MemorySegment(segment.start, len_data)
            self.commits += 1
            return ref_id

    def read(self, long ref_id) -> bytes:
        """
        Return a copy of the bytes stored under `ref_id`.

        Raises ValueError for an unknown reference id.
        """
        cdef MemorySegment segment
        cdef char* char_ptr = <char*> self.pool
        with self.lock:
            self.read_locks += 1
            if ref_id not in self.used_segments:
                raise ValueError("Invalid reference ID.")
            segment = self.used_segments[ref_id]
            # Copy out while still holding the lock so a concurrent release
            # or compaction cannot move/reuse the bytes mid-read.
            data = PyBytes_FromStringAndSize(char_ptr + segment.start, segment.length)
        self.reads += 1
        return data

    def release(self, long ref_id):
        """
        Return the segment held by `ref_id` to the free list.

        Raises ValueError for an unknown reference id.
        """
        with self.lock:
            if ref_id not in self.used_segments:
                raise ValueError(f"Invalid reference ID - {ref_id}.")
            segment = self.used_segments.pop(ref_id)
            self.free_segments.push_back(segment)
            self.releases += 1

    def available_space(self) -> int:
        """Total free bytes (may be fragmented across segments)."""
        return sum(segment.length for segment in self.free_segments)
39 changes: 39 additions & 0 deletions opteryx/connectors/gcp_cloudstorage_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import os
import urllib.request
from typing import Dict
from typing import List

import aiohttp
import pyarrow
from orso.schema import RelationSchema
from orso.tools import single_item_cache
Expand Down Expand Up @@ -124,6 +126,43 @@ def read_blob(self, *, blob_name, **kwargs):
self.statistics.bytes_read += len(content)
return content

async def async_read_blob(self, *, blob_name, pool, session, **kwargs):
# For performance we use the GCS API directly, this is roughly 10%
# faster than using the SDK. As one of the slowest parts of the system
# 10% can be measured in seconds.

bucket, _, _, _ = paths.get_parts(blob_name)
print("READ ", blob_name)

# Ensure the credentials are valid, refreshing them if necessary
if not self.client_credentials.valid: # pragma: no cover
from google.auth.transport.requests import Request

request = Request()
self.client_credentials.refresh(request)
self.access_token = self.client_credentials.token

if "kh" not in bucket:
bucket = bucket.replace("va_data", "va-data")
bucket = bucket.replace("data_", "data-")
object_full_path = urllib.parse.quote(blob_name[(len(bucket) + 1) :], safe="")

url = f"https://storage.googleapis.com/storage/v1/b/{bucket}/o/{object_full_path}?alt=media"

async with session.get(
url, headers={"Authorization": f"Bearer {self.access_token}"}, timeout=30
) as response:
if response.status != 200:
raise Exception(f"Unable to read '{blob_name}' - {response.status_code}")
data = await response.read()
ref = await pool.commit(data)
while ref is None:
print(".", end="", flush=True)
await asyncio.sleep(1)
ref = await pool.commit(data)
self.statistics.bytes_read += len(data)
return ref

@single_item_cache
def get_list_of_blob_names(self, *, prefix: str) -> List[str]:
bucket, object_path, _, _ = paths.get_parts(prefix)
Expand Down
1 change: 1 addition & 0 deletions opteryx/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .aggregate_and_group_node import AggregateAndGroupNode # Group is always followed by aggregate
from .aggregate_node import AGGREGATORS
from .aggregate_node import AggregateNode # aggregate data
from .async_scanner_node import AsyncScannerNode

# from .build_statistics_node import BuildStatisticsNode # Analyze Tables
from .cross_join_node import CrossJoinNode # CROSS JOIN
Expand Down
Loading

0 comments on commit 777ff33

Please sign in to comment.