Skip to content

Commit

Permalink
Merge pull request #1619 from mabel-dev/#1613/4
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer authored Apr 28, 2024
2 parents 9e43621 + 7207e45 commit 43c223f
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 64 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 457
__build__ = 459

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
7 changes: 2 additions & 5 deletions opteryx/connectors/capabilities/cacheable.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):

ref = await pool.commit(result)
while ref is None:
print("*", end="", flush=True)
await asyncio.sleep(1)
await asyncio.sleep(0.1)
ref = await pool.commit(result)
statistics.bytes_read += len(result)
return ref
Expand All @@ -147,8 +146,7 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
statistics.remote_cache_hits += 1
ref = await pool.commit(result)
while ref is None:
print("@", end="", flush=True)
await asyncio.sleep(1)
await asyncio.sleep(0.1)
ref = await pool.commit(result)
statistics.bytes_read += len(result)
return ref
Expand All @@ -163,7 +161,6 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
if max_evictions:
# we set a per-query eviction limit
buffer = await pool.read(result)
buffer = buffer.tobytes()
if len(buffer) < buffer_pool.max_cacheable_item_size:
evicted = buffer_pool.set(key, buffer)
remote_cache.set(key, buffer)
Expand Down
29 changes: 17 additions & 12 deletions opteryx/connectors/gcp_cloudstorage_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
from typing import Dict
from typing import List

import aiohttp
import pyarrow
from orso.schema import RelationSchema
from orso.tools import single_item_cache
from orso.types import OrsoTypes

from opteryx.connectors.base.base_connector import BaseConnector
Expand Down Expand Up @@ -82,10 +80,13 @@ def __init__(self, credentials=None, **kwargs):
self.client_credentials.refresh(request)
self.access_token = self.client_credentials.token

# Create a HTTP connection session to reduce effort for
# each fetch
# Create a HTTP connection session to reduce effort for each fetch
# synchronous only
self.session = requests.Session()

# cache so we only fetch this once
self.blob_list = {}

def _get_storage_client(self):
try:
from google.cloud import storage
Expand Down Expand Up @@ -131,12 +132,9 @@ def read_blob(self, *, blob_name, **kwargs):
return content

async def async_read_blob(self, *, blob_name, pool, session, statistics, **kwargs):
# For performance we use the GCS API directly, this is roughly 10%
# faster than using the SDK. As one of the slowest parts of the system
# 10% can be measured in seconds.

bucket, _, _, _ = paths.get_parts(blob_name)
print("READ ", blob_name)
# DEBUG: log ("READ ", blob_name)

# Ensure the credentials are valid, refreshing them if necessary
if not self.client_credentials.valid: # pragma: no cover
Expand All @@ -149,6 +147,7 @@ async def async_read_blob(self, *, blob_name, pool, session, statistics, **kwarg
if "kh" not in bucket:
bucket = bucket.replace("va_data", "va-data")
bucket = bucket.replace("data_", "data-")

object_full_path = urllib.parse.quote(blob_name[(len(bucket) + 1) :], safe="")

url = f"https://storage.googleapis.com/storage/v1/b/{bucket}/o/{object_full_path}?alt=media"
Expand All @@ -161,14 +160,19 @@ async def async_read_blob(self, *, blob_name, pool, session, statistics, **kwarg
data = await response.read()
ref = await pool.commit(data)
while ref is None:
print(".", end="", flush=True)
await asyncio.sleep(1)
# DEBUG: log (".", end="", flush=True)
statistics.write_to_buffer_stalls += 1
await asyncio.sleep(0.5)
ref = await pool.commit(data)
statistics.bytes_read += len(data)
statistics.bytes_read += len(data)
return ref

@single_item_cache
def get_list_of_blob_names(self, *, prefix: str) -> List[str]:

# only fetch once per prefix (partition)
if prefix in self.blob_list:
return self.blob_list[prefix]

bucket, object_path, _, _ = paths.get_parts(prefix)
if "kh" not in bucket:
bucket = bucket.replace("va_data", "va-data")
Expand Down Expand Up @@ -201,6 +205,7 @@ def get_list_of_blob_names(self, *, prefix: str) -> List[str]:
if name.endswith(TUPLE_OF_VALID_EXTENSIONS)
)

self.blob_list[prefix] = blob_names
return blob_names

def read_dataset(
Expand Down
22 changes: 7 additions & 15 deletions opteryx/operators/async_scanner_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,26 +75,23 @@ def normalize_morsel(schema: RelationSchema, morsel: pyarrow.Table) -> pyarrow.T
return morsel.select([col.identity for col in schema.columns])


async def fetch_data(blob_names, pool, reader, queue, statistics):
semaphore = asyncio.Semaphore(
CONCURRENT_READS
) # Adjust based on memory and expected data sizes

async def fetch_data(blob_names, pool, reader, reply_queue, statistics):
semaphore = asyncio.Semaphore(CONCURRENT_READS)
session = aiohttp.ClientSession()

async def fetch_and_process(blob_name):
async with semaphore:
start_clock = time.monotonic_ns()
start_per_blob = time.monotonic_ns()
reference = await reader(
blob_name=blob_name, pool=pool, session=session, statistics=statistics
)
statistics.time_reading_blobs += time.monotonic_ns() - start_clock
queue.put((blob_name, reference)) # Put data onto the queue
reply_queue.put((blob_name, reference)) # Put data onto the queue
statistics.time_reading_blobs += time.monotonic_ns() - start_per_blob

tasks = (fetch_and_process(blob) for blob in blob_names)

await asyncio.gather(*tasks)
queue.put(None)
reply_queue.put(None)
await session.close()


Expand All @@ -115,8 +112,6 @@ def execute(self) -> Generator:
if col.identity in [c.identity for c in self.columns]:
orso_schema_cols.append(col)
orso_schema.columns = orso_schema_cols
arrow_schema = None
start_clock = time.monotonic_ns()

morsel = None

Expand All @@ -129,7 +124,6 @@ def execute(self) -> Generator:

data_queue = queue.Queue()

t = time.monotonic()
loop = asyncio.new_event_loop()
threading.Thread(
target=lambda: loop.run_until_complete(
Expand All @@ -153,7 +147,7 @@ def execute(self) -> Generator:
decoder = get_decoder(blob_name)
blob_bytes = self.pool.read_and_release(reference)
decoded = decoder(blob_bytes, projection=self.columns, selection=self.predicates)
num_rows, num_columns, morsel = decoded
num_rows, _, morsel = decoded
self.statistics.rows_seen += num_rows

morsel = normalize_morsel(orso_schema, morsel)
Expand All @@ -166,5 +160,3 @@ def execute(self) -> Generator:

if morsel:
self.statistics.columns_read += morsel.num_columns

print(time.monotonic() - t)
25 changes: 18 additions & 7 deletions opteryx/shared/memory_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
This module includes an async wrapper around the memory pool
"""

import asyncio
from multiprocessing import Lock
from typing import Dict

Expand Down Expand Up @@ -245,25 +246,35 @@ def __del__(self):
This function exists just to wrap the debug logging
"""
pass
# DEBUG: log (f"Memory Pool ({self.name}) <size={self.size}, commits={self.commits}, reads={self.reads}, releases={self.releases}, L1={self.l1_compaction}, L2={self.l2_compaction}>")
# DEBUG: log (f"Memory Pool ({self.name}) <size={self.size}, commits={self.commits} ({self.failed_commits}), reads={self.reads}, releases={self.releases}, L1={self.l1_compaction}, L2={self.l2_compaction}>")


class AsyncMemoryPool:
def __init__(self, pool: MemoryPool):
self.pool: MemoryPool = pool
self.lock = asyncio.Lock()

async def commit(self, data: bytes) -> int:
return self.pool.commit(data)
async with self.lock:
return self.pool.commit(data)

async def read(self, ref_id: int) -> bytes:
return self.pool.read(ref_id)
"""
In an async environment, we much more certain the bytes will be overwritten
if we don't materialize them
"""
async with self.lock:
return bytes(self.pool.read(ref_id))

async def release(self, ref_id: int):
self.pool.release(ref_id)
async with self.lock:
self.pool.release(ref_id)

async def can_commit(self, data: bytes) -> bool:
self.pool.can_commit(data=data)
async with self.lock:
return self.pool.can_commit(data)

@property
def stats(self):
return self.pool.stats
async def stats(self):
async with self.lock:
return self.pool.stats
73 changes: 73 additions & 0 deletions tests/misc/test_memory_pool_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
Stress test the memory pool in an asynchronous execution environment.
We add and remove items from the pool in quick succession, random sizes
and random items removed, to see how it responds to random concurrent
access. This is targetting the async wrapper, but that is a relatively
thin layer over the memory pool.
"""

import os
import sys
import random

sys.path.insert(1, os.path.join(sys.path[0], "../.."))

os.environ["OPTERYX_DEBUG"] = "1"

from opteryx.shared import MemoryPool, AsyncMemoryPool


import asyncio
import random


async def stress_with_random_sized_data():
bmp = MemoryPool(size=100000)
mp = AsyncMemoryPool(bmp)
refs = {}

async def add_random_data():
for _ in range(500):
size = random.randint(10, 100)
data = bytes([random.randint(0, 255) for _ in range(size)])
ref = await mp.commit(data)
if ref is not None:
refs[ref] = data
else:
# Memory pool is likely full, start removing
await remove_random_data()

# Simulate asynchronous behavior
await asyncio.sleep(random.random() * 0.005)

async def remove_random_data():
for _ in range(random.randint(10, 25)):
if refs:
ref = random.choice(list(refs.keys()))
correct_data = refs.pop(ref)
data_removed = await mp.read(ref)
await mp.release(ref)
assert data_removed == correct_data, "Data integrity check failed"

# Start tasks for adding and randomly removing data
tasks = [add_random_data() for _ in range(50)] # Simulating concurrent access
await asyncio.gather(*tasks)

# Final cleanup: remove remaining items
for ref in list(refs):
await mp.read(ref)
await mp.release(ref)

# Ensure all memory is accounted for
assert bmp.available_space() == bmp.size, "Memory leak or fragmentation detected."


def test_async_memorypool():
asyncio.run(stress_with_random_sized_data())


if __name__ == "__main__": # pragma: no cover
from tests.tools import run_tests

run_tests()
Loading

0 comments on commit 43c223f

Please sign in to comment.