From da337cb3b3813b0e89dab4da25e14905bbbe0476 Mon Sep 17 00:00:00 2001 From: joocer Date: Sun, 28 Apr 2024 14:05:01 +0100 Subject: [PATCH 1/3] #1613 --- opteryx/connectors/capabilities/__init__.py | 3 +- .../connectors/capabilities/asynchronous.py | 20 +++++ opteryx/connectors/capabilities/cacheable.py | 83 +++++++++++++++++++ .../connectors/gcp_cloudstorage_connector.py | 10 ++- opteryx/operators/async_scanner_node.py | 4 +- opteryx/planner/binder/binder_visitor.py | 9 +- 6 files changed, 123 insertions(+), 6 deletions(-) create mode 100644 opteryx/connectors/capabilities/asynchronous.py diff --git a/opteryx/connectors/capabilities/__init__.py b/opteryx/connectors/capabilities/__init__.py index a31bc4046..854acaf85 100644 --- a/opteryx/connectors/capabilities/__init__.py +++ b/opteryx/connectors/capabilities/__init__.py @@ -10,8 +10,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from opteryx.connectors.capabilities.asynchronous import Asynchronous from opteryx.connectors.capabilities.cacheable import Cacheable from opteryx.connectors.capabilities.partitionable import Partitionable from opteryx.connectors.capabilities.predicate_pushable import PredicatePushable -__all__ = ("Cacheable", "Partitionable", "PredicatePushable") +__all__ = ("Asynchronous", "Cacheable", "Partitionable", "PredicatePushable") diff --git a/opteryx/connectors/capabilities/asynchronous.py b/opteryx/connectors/capabilities/asynchronous.py new file mode 100644 index 000000000..9468ea5e6 --- /dev/null +++ b/opteryx/connectors/capabilities/asynchronous.py @@ -0,0 +1,20 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class Asynchronous: + + def __init__(self, **kwargs): + pass + + async def async_read_blob(self, *, blob_name, pool, **kwargs): + pass diff --git a/opteryx/connectors/capabilities/cacheable.py b/opteryx/connectors/capabilities/cacheable.py index 0706eafc5..306bc9fa9 100644 --- a/opteryx/connectors/capabilities/cacheable.py +++ b/opteryx/connectors/capabilities/cacheable.py @@ -11,6 +11,7 @@ # limitations under the License. +import asyncio from functools import wraps from orso.cityhash import CityHash64 @@ -99,3 +100,85 @@ def wrapper(blob_name, statistics, **kwargs): return result return wrapper + + +def async_read_thru_cache(func): + + # Capture the max_evictions value at decoration time + from opteryx import get_cache_manager + from opteryx.managers.cache import NullCache + from opteryx.shared import BufferPool + from opteryx.shared import MemoryPool + + cache_manager = get_cache_manager() + max_evictions = cache_manager.max_evictions_per_query + remote_cache = cache_manager.cache_backend + if not remote_cache: + # rather than make decisions - just use a dummy + remote_cache = NullCache() + + buffer_pool = BufferPool() + + my_keys = set() + + @wraps(func) + async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs): + nonlocal max_evictions + + key = hex(CityHash64(blob_name)).encode() + my_keys.add(key) + + # try the buffer pool first + result = buffer_pool.get(key) + if result is not None: + statistics.bufferpool_hits += 1 + + ref = await pool.commit(result) + while ref is None: + print("*", end="", flush=True) + await asyncio.sleep(1) + ref = await pool.commit(result) + statistics.bytes_read += len(result) + return ref + + # try the remote cache next + result = remote_cache.get(key) + if result is not None: + statistics.remote_cache_hits += 1 + ref = await pool.commit(result) + while ref is None: + print("@", end="", flush=True) + await asyncio.sleep(1) + ref = await pool.commit(result) + statistics.bytes_read += len(result) + return ref + + try: + result = await func(blob_name=blob_name, statistics=statistics, pool=pool, **kwargs) + except Exception as e: + print(f"Error in {func.__name__}: {e}") + raise # Optionally re-raise the error after logging it + + # Write the result to caches + if max_evictions: + # we set a per-query eviction limit + buffer = await pool.read(result) + buffer = buffer.tobytes() + if len(buffer) < buffer_pool.max_cacheable_item_size: + evicted = buffer_pool.set(key, buffer) + remote_cache.set(key, buffer) + if evicted: + # if we're evicting items we're putting into the cache + if evicted in my_keys: + max_evictions = 0 + else: + max_evictions -= 1 + statistics.cache_evictions += 1 + else: + statistics.cache_oversize += 1 + + statistics.cache_misses += 1 + + return result + + return wrapper diff --git a/opteryx/connectors/gcp_cloudstorage_connector.py b/opteryx/connectors/gcp_cloudstorage_connector.py index 87dbdf618..a139823e1 100644 --- a/opteryx/connectors/gcp_cloudstorage_connector.py +++ b/opteryx/connectors/gcp_cloudstorage_connector.py @@ -23,6 +23,7 @@ from orso.types import OrsoTypes from opteryx.connectors.base.base_connector import BaseConnector +from opteryx.connectors.capabilities import Asynchronous from opteryx.connectors.capabilities import Cacheable from opteryx.connectors.capabilities import Partitionable from opteryx.connectors.capabilities import PredicatePushable @@ -37,7 +38,9 @@ OS_SEP = os.sep -class GcpCloudStorageConnector(BaseConnector, Cacheable, Partitionable, PredicatePushable): +class GcpCloudStorageConnector( + BaseConnector, Cacheable, Partitionable, PredicatePushable, Asynchronous +): __mode__ = "Blob" PUSHABLE_OPS: Dict[str, bool] = { @@ -62,6 +65,7 @@ def __init__(self, credentials=None, **kwargs): Partitionable.__init__(self, **kwargs) Cacheable.__init__(self, **kwargs) PredicatePushable.__init__(self, **kwargs) + Asynchronous.__init__(self, **kwargs) self.dataset = self.dataset.replace(".", OS_SEP) self.credentials = credentials @@ -126,7 +130,7 @@ def read_blob(self, *, blob_name, **kwargs): self.statistics.bytes_read += len(content) return content - async def async_read_blob(self, *, blob_name, pool, session, **kwargs): + async def async_read_blob(self, *, blob_name, pool, session, statistics, **kwargs): # For performance we use the GCS API directly, this is roughly 10% # faster than using the SDK. As one of the slowest parts of the system # 10% can be measured in seconds. @@ -160,7 +164,7 @@ async def async_read_blob(self, *, blob_name, pool, session, **kwargs): print(".", end="", flush=True) await asyncio.sleep(1) ref = await pool.commit(data) - self.statistics.bytes_read += len(data) + statistics.bytes_read += len(data) return ref @single_item_cache diff --git a/opteryx/operators/async_scanner_node.py b/opteryx/operators/async_scanner_node.py index 24556cf05..298fd013a 100644 --- a/opteryx/operators/async_scanner_node.py +++ b/opteryx/operators/async_scanner_node.py @@ -85,7 +85,9 @@ async def fetch_data(blob_names, pool, reader, queue, statistics): async def fetch_and_process(blob_name): async with semaphore: start_clock = time.monotonic_ns() - reference = await reader(blob_name=blob_name, pool=pool, session=session) + reference = await reader( + blob_name=blob_name, pool=pool, session=session, statistics=statistics + ) statistics.time_reading_blobs += time.monotonic_ns() - start_clock queue.put((blob_name, reference)) # Put data onto the queue diff --git a/opteryx/planner/binder/binder_visitor.py b/opteryx/planner/binder/binder_visitor.py index 2ca42c4ae..c304b1160 100644 --- a/opteryx/planner/binder/binder_visitor.py +++ b/opteryx/planner/binder/binder_visitor.py @@ -770,8 +770,10 @@ def visit_project(self, node: Node, context: BindingContext) -> Tuple[Node, Bind def visit_scan(self, node: Node, context: BindingContext) -> Tuple[Node, BindingContext]: from opteryx.connectors import connector_factory + from opteryx.connectors.capabilities import Asynchronous from opteryx.connectors.capabilities import Cacheable from opteryx.connectors.capabilities import Partitionable + from opteryx.connectors.capabilities.cacheable import async_read_thru_cache from opteryx.connectors.capabilities.cacheable import read_thru_cache if node.alias in context.relations: @@ -788,7 +790,12 @@ def visit_scan(self, node: Node, context: BindingContext) -> Tuple[Node, Binding if Cacheable in connector_capabilities: # We add the caching mechanism here if the connector is Cacheable and # we've not disable caching - if not "NO_CACHE" in (node.hints or []): + if "NO_CACHE" in (node.hints or []): + pass + if Asynchronous in connector_capabilities: + original_read_blob = node.connector.async_read_blob + node.connector.async_read_blob = async_read_thru_cache(original_read_blob) + else: original_read_blob = node.connector.read_blob node.connector.read_blob = read_thru_cache(original_read_blob) # get them to tell is the schema of the dataset From f4d030b1f47f3e3c372a129321efc90dbc14d836 Mon Sep 17 00:00:00 2001 From: XB500 Date: Sun, 28 Apr 2024 13:05:28 +0000 Subject: [PATCH 2/3] Opteryx Version 0.14.2-alpha.456 --- opteryx/__version__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opteryx/__version__.py b/opteryx/__version__.py index 16083db55..ad243f3d3 100644 --- a/opteryx/__version__.py +++ b/opteryx/__version__.py @@ -1,4 +1,4 @@ -__build__ = 453 +__build__ = 456 # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From d65a2cb75888de77fe84063ad28e453d07794d28 Mon Sep 17 00:00:00 2001 From: XB500 Date: Sun, 28 Apr 2024 13:07:35 +0000 Subject: [PATCH 3/3] Opteryx Version 0.14.2-alpha.457 --- opteryx/__version__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opteryx/__version__.py b/opteryx/__version__.py index ad243f3d3..05f47846e 100644 --- a/opteryx/__version__.py +++ b/opteryx/__version__.py @@ -1,4 +1,4 @@ -__build__ = 456 +__build__ = 457 # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License.