Merge pull request #1617 from mabel-dev/#1613/3
joocer authored Apr 28, 2024
2 parents 3116e23 + d65a2cb commit 9e43621
Showing 7 changed files with 124 additions and 7 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
@@ -1,4 +1,4 @@
-__build__ = 455
+__build__ = 457

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
3 changes: 2 additions & 1 deletion opteryx/connectors/capabilities/__init__.py
@@ -10,8 +10,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

+from opteryx.connectors.capabilities.asynchronous import Asynchronous
from opteryx.connectors.capabilities.cacheable import Cacheable
from opteryx.connectors.capabilities.partitionable import Partitionable
from opteryx.connectors.capabilities.predicate_pushable import PredicatePushable

__all__ = ("Cacheable", "Partitionable", "PredicatePushable")
__all__ = ("Asynchronous", "Cacheable", "Partitionable", "PredicatePushable")
20 changes: 20 additions & 0 deletions opteryx/connectors/capabilities/asynchronous.py
@@ -0,0 +1,20 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class Asynchronous:

    def __init__(self, **kwargs):
        pass

    async def async_read_blob(self, *, blob_name, pool, **kwargs):
        pass
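
The capability is deliberately a no-op shell: it marks a connector as supporting asynchronous reads and fixes the signature that implementations override. A minimal sketch of a conforming connector follows; the DummyAsyncReader, its in-memory blob store, and the explicit statistics parameter are illustrative assumptions, not part of this commit:

from opteryx.connectors.capabilities import Asynchronous


class DummyAsyncReader(Asynchronous):
    """Hypothetical reader showing the async_read_blob contract."""

    def __init__(self, blobs: dict, **kwargs):
        Asynchronous.__init__(self, **kwargs)
        self._blobs = blobs  # in-memory stand-in for remote storage

    async def async_read_blob(self, *, blob_name, pool, statistics, **kwargs):
        data = self._blobs[blob_name]
        ref = await pool.commit(data)  # assumed: returns None while the pool is full
        statistics.bytes_read += len(data)
        return ref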
83 changes: 83 additions & 0 deletions opteryx/connectors/capabilities/cacheable.py
@@ -11,6 +11,7 @@
# limitations under the License.


+import asyncio
from functools import wraps

from orso.cityhash import CityHash64
@@ -99,3 +100,85 @@ def wrapper(blob_name, statistics, **kwargs):
        return result

    return wrapper


def async_read_thru_cache(func):

    # Capture the max_evictions value at decoration time
    from opteryx import get_cache_manager
    from opteryx.managers.cache import NullCache
    from opteryx.shared import BufferPool
    from opteryx.shared import MemoryPool

    cache_manager = get_cache_manager()
    max_evictions = cache_manager.max_evictions_per_query
    remote_cache = cache_manager.cache_backend
    if not remote_cache:
        # rather than make decisions - just use a dummy
        remote_cache = NullCache()

    buffer_pool = BufferPool()

    my_keys = set()

    @wraps(func)
    async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
        nonlocal max_evictions

        key = hex(CityHash64(blob_name)).encode()
        my_keys.add(key)

        # try the buffer pool first
        result = buffer_pool.get(key)
        if result is not None:
            statistics.bufferpool_hits += 1

            ref = await pool.commit(result)
            while ref is None:
                print("*", end="", flush=True)
                await asyncio.sleep(1)
                ref = await pool.commit(result)
            statistics.bytes_read += len(result)
            return ref

        # try the remote cache next
        result = remote_cache.get(key)
        if result is not None:
            statistics.remote_cache_hits += 1
            ref = await pool.commit(result)
            while ref is None:
                print("@", end="", flush=True)
                await asyncio.sleep(1)
                ref = await pool.commit(result)
            statistics.bytes_read += len(result)
            return ref

        try:
            result = await func(blob_name=blob_name, statistics=statistics, pool=pool, **kwargs)
        except Exception as e:
            print(f"Error in {func.__name__}: {e}")
            raise  # re-raise the error after logging it

        # Write the result to caches
        if max_evictions:
            # we set a per-query eviction limit
            buffer = await pool.read(result)
            buffer = buffer.tobytes()
            if len(buffer) < buffer_pool.max_cacheable_item_size:
                evicted = buffer_pool.set(key, buffer)
                remote_cache.set(key, buffer)
                if evicted:
                    # if we're evicting items we've just put into the cache,
                    # stop caching for the rest of this query
                    if evicted in my_keys:
                        max_evictions = 0
                    else:
                        max_evictions -= 1
                    statistics.cache_evictions += 1
            else:
                statistics.cache_oversize += 1

        statistics.cache_misses += 1

        return result

    return wrapper
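
Both cache-hit paths above, and the GCS reader below, repeat the same commit-and-wait loop against the memory pool. Here is that pattern in isolation, as a hedged sketch: the helper name is ours, and pool.commit returning None on a full pool is inferred from the wrapper above rather than documented API.

import asyncio


async def commit_with_backpressure(pool, data: bytes):
    # keep offering the buffer to the pool until space frees up;
    # commit() is assumed to return None while the pool is full
    ref = await pool.commit(data)
    while ref is None:
        await asyncio.sleep(1)  # let readers release pool space
        ref = await pool.commit(data)
    return ref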
10 changes: 7 additions & 3 deletions opteryx/connectors/gcp_cloudstorage_connector.py
@@ -23,6 +23,7 @@
from orso.types import OrsoTypes

from opteryx.connectors.base.base_connector import BaseConnector
+from opteryx.connectors.capabilities import Asynchronous
from opteryx.connectors.capabilities import Cacheable
from opteryx.connectors.capabilities import Partitionable
from opteryx.connectors.capabilities import PredicatePushable
@@ -37,7 +38,9 @@
OS_SEP = os.sep


-class GcpCloudStorageConnector(BaseConnector, Cacheable, Partitionable, PredicatePushable):
+class GcpCloudStorageConnector(
+    BaseConnector, Cacheable, Partitionable, PredicatePushable, Asynchronous
+):
    __mode__ = "Blob"

    PUSHABLE_OPS: Dict[str, bool] = {
@@ -62,6 +65,7 @@ def __init__(self, credentials=None, **kwargs):
        Partitionable.__init__(self, **kwargs)
        Cacheable.__init__(self, **kwargs)
        PredicatePushable.__init__(self, **kwargs)
+        Asynchronous.__init__(self, **kwargs)

        self.dataset = self.dataset.replace(".", OS_SEP)
        self.credentials = credentials
@@ -126,7 +130,7 @@ def read_blob(self, *, blob_name, **kwargs):
        self.statistics.bytes_read += len(content)
        return content

-    async def async_read_blob(self, *, blob_name, pool, session, **kwargs):
+    async def async_read_blob(self, *, blob_name, pool, session, statistics, **kwargs):
        # For performance we use the GCS API directly; this is roughly 10%
        # faster than using the SDK. As one of the slowest parts of the system,
        # 10% can be measured in seconds.
@@ -160,7 +164,7 @@ async def async_read_blob(self, *, blob_name, pool, session, **kwargs):
print(".", end="", flush=True)
await asyncio.sleep(1)
ref = await pool.commit(data)
self.statistics.bytes_read += len(data)
statistics.bytes_read += len(data)
return ref

@single_item_cache
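With statistics now an explicit keyword argument, the reader is handed the query's shared statistics accumulator rather than reading connector state. Roughly, a call looks like the sketch below; the blob name, wrapper function, and variable names are illustrative, not from the commit:

async def read_one(connector, memory_pool, http_session, statistics):
    # hedged sketch: pool receives the bytes, session is an aiohttp-style
    # HTTP session reused across reads, statistics accumulates counters
    return await connector.async_read_blob(
        blob_name="bucket/dataset/part-0000.parquet",  # hypothetical blob
        pool=memory_pool,
        session=http_session,
        statistics=statistics,
    )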
4 changes: 3 additions & 1 deletion opteryx/operators/async_scanner_node.py
@@ -85,7 +85,9 @@ async def fetch_data(blob_names, pool, reader, queue, statistics):
    async def fetch_and_process(blob_name):
        async with semaphore:
            start_clock = time.monotonic_ns()
-            reference = await reader(blob_name=blob_name, pool=pool, session=session)
+            reference = await reader(
+                blob_name=blob_name, pool=pool, session=session, statistics=statistics
+            )
            statistics.time_reading_blobs += time.monotonic_ns() - start_clock
            queue.put((blob_name, reference))  # Put data onto the queue

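The hunk above sits inside a fan-out: every blob is fetched concurrently, with a semaphore bounding how many reads are in flight at once. A self-contained sketch of that pattern follows; the helper names and the concurrency limit are illustrative, not the module's actual values:

import asyncio
import time


async def fetch_all(blob_names, reader, queue, statistics, max_in_flight=8):
    semaphore = asyncio.Semaphore(max_in_flight)  # bound concurrent reads

    async def fetch_one(blob_name):
        async with semaphore:
            start_clock = time.monotonic_ns()
            reference = await reader(blob_name=blob_name)
            statistics.time_reading_blobs += time.monotonic_ns() - start_clock
            queue.put((blob_name, reference))  # hand off to the consumer

    await asyncio.gather(*(fetch_one(name) for name in blob_names))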
9 changes: 8 additions & 1 deletion opteryx/planner/binder/binder_visitor.py
@@ -770,8 +770,10 @@ def visit_project(self, node: Node, context: BindingContext) -> Tuple[Node, BindingContext]:

    def visit_scan(self, node: Node, context: BindingContext) -> Tuple[Node, BindingContext]:
        from opteryx.connectors import connector_factory
+        from opteryx.connectors.capabilities import Asynchronous
        from opteryx.connectors.capabilities import Cacheable
        from opteryx.connectors.capabilities import Partitionable
+        from opteryx.connectors.capabilities.cacheable import async_read_thru_cache
        from opteryx.connectors.capabilities.cacheable import read_thru_cache

        if node.alias in context.relations:
@@ -788,7 +790,12 @@
        if Cacheable in connector_capabilities:
            # We add the caching mechanism here if the connector is Cacheable and
            # we've not disabled caching
-            if not "NO_CACHE" in (node.hints or []):
-                original_read_blob = node.connector.read_blob
-                node.connector.read_blob = read_thru_cache(original_read_blob)
+            if "NO_CACHE" in (node.hints or []):
+                pass
+            if Asynchronous in connector_capabilities:
+                original_read_blob = node.connector.async_read_blob
+                node.connector.async_read_blob = async_read_thru_cache(original_read_blob)
+            else:
+                original_read_blob = node.connector.read_blob
+                node.connector.read_blob = read_thru_cache(original_read_blob)
            # get them to tell us the schema of the dataset
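The effect of this branch is bind-time monkey-patching: for each query, the connector instance's read method is rebound to a cache-wrapped version, preferring the asynchronous path when the capability is present. A hedged sketch of the equivalent logic; the attach_cache helper is ours, and isinstance stands in for however the binder derives connector_capabilities:

from opteryx.connectors.capabilities import Asynchronous
from opteryx.connectors.capabilities.cacheable import async_read_thru_cache
from opteryx.connectors.capabilities.cacheable import read_thru_cache


def attach_cache(conn):
    # per-query rebinding: wrap whichever read path the connector exposes
    if isinstance(conn, Asynchronous):
        # async connectors get the awaitable read-through wrapper
        conn.async_read_blob = async_read_thru_cache(conn.async_read_blob)
    else:
        # everything else keeps the synchronous wrapper
        conn.read_blob = read_thru_cache(conn.read_blob)
    return conn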
