#2039 #2041

Merged · 2 commits · Sep 30, 2024
6 changes: 6 additions & 0 deletions opteryx/__init__.py
@@ -25,6 +25,7 @@

import datetime
import os
import time
import warnings
import platform

@@ -268,3 +269,8 @@ def opteryx(self, line, cell):

# Surface DeprecationWarnings, showing each only once
warnings.simplefilter("once", DeprecationWarning)

from opteryx.models import QueryStatistics

system_statistics = QueryStatistics("system")
system_statistics.start_time = time.time_ns()
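
The new module-level singleton gives any importer a process-wide view of engine activity. A minimal sketch of reading it, assuming the statistics object defaults unset counters to zero (the bare `+=` updates elsewhere in this diff suggest it does):

import time

import opteryx  # importing opteryx starts the uptime clock

# `system_statistics` is the QueryStatistics("system") instance created above
stats = opteryx.system_statistics
uptime_seconds = (time.time_ns() - stats.start_time) / 1e9
print(f"up {uptime_seconds:.1f}s, {stats.queries_executed} queries executed")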
2 changes: 1 addition & 1 deletion opteryx/__version__.py
@@ -1,4 +1,4 @@
__build__ = 811
__build__ = 814

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
3 changes: 3 additions & 0 deletions opteryx/connectors/aws_s3_connector.py
@@ -128,6 +128,8 @@ def get_dataset_schema(self) -> RelationSchema:
return self.schema

async def async_read_blob(self, *, blob_name, pool, statistics, **kwargs):
from opteryx import system_statistics

try:
bucket, object_path, name, extension = paths.get_parts(blob_name)
# DEBUG: log ("READ ", name)
@@ -138,6 +140,7 @@ async def async_read_blob(self, *, blob_name, pool, statistics, **kwargs):
while ref is None:
statistics.stalls_writing_to_read_buffer += 1
await asyncio.sleep(0.1)
system_statistics.cpu_wait_seconds += 0.1
ref = await pool.commit(data)
statistics.bytes_read += len(data)
return ref
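
All three connectors touched in this PR (S3, disk, and GCS) instrument the same retry loop. A condensed sketch of the shared pattern, using the names from the diffs; the 0.1-second sleep is the fixed backoff the connectors already use:

import asyncio

from opteryx import system_statistics

async def commit_with_backoff(pool, data, statistics):
    # a None ref means the read buffer is full: wait, book the wait as
    # CPU wait in the process-wide statistics, then retry the commit
    ref = await pool.commit(data)
    while ref is None:
        statistics.stalls_writing_to_read_buffer += 1
        await asyncio.sleep(0.1)
        system_statistics.cpu_wait_seconds += 0.1
        ref = await pool.commit(data)
    statistics.bytes_read += len(data)
    return ref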
40 changes: 28 additions & 12 deletions opteryx/connectors/capabilities/cacheable.py
@@ -21,6 +21,11 @@

__all__ = ("Cacheable", "async_read_thru_cache")

SOURCE_NOT_FOUND = 0
SOURCE_BUFFER_POOL = 1
SOURCE_REMOTE_CACHE = 2
SOURCE_ORIGIN = 3


class Cacheable:
"""
@@ -47,14 +52,15 @@ def async_read_thru_cache(func):
read buffer reference for the bytes, which means we need to populate
the read buffer and return the ref for these items.
"""
# Capture the max_evictions value at decoration time
# Capture the evictions_remaining value at decoration time
from opteryx import get_cache_manager
from opteryx import system_statistics
from opteryx.managers.cache import NullCache
from opteryx.shared import BufferPool
from opteryx.shared import MemoryPool

cache_manager = get_cache_manager()
max_evictions = MAX_CACHE_EVICTIONS_PER_QUERY
evictions_remaining = MAX_CACHE_EVICTIONS_PER_QUERY
remote_cache = cache_manager.cache_backend
if not remote_cache:
# rather than make decisions - just use a dummy
@@ -67,8 +73,9 @@ def async_read_thru_cache(func):
@wraps(func)
async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
try:
nonlocal max_evictions
nonlocal evictions_remaining

source = SOURCE_NOT_FOUND
key = hex(CityHash64(blob_name)).encode()
read_buffer_ref = None
payload = None
@@ -77,6 +84,7 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
# try the buffer pool first
payload = buffer_pool.get(key, zero_copy=False)
if payload is not None:
source = SOURCE_BUFFER_POOL
remote_cache.touch(key) # help the remote cache track LRU
statistics.bufferpool_hits += 1
read_buffer_ref = await pool.commit(payload) # type: ignore
@@ -85,49 +93,57 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
statistics.stalls_writing_to_read_buffer += 1
read_buffer_ref = await pool.commit(payload) # type: ignore
statistics.bytes_read += len(payload)
system_statistics.cpu_wait_seconds += 0.1
return read_buffer_ref

# try the remote cache next
payload = remote_cache.get(key)
if payload is not None:
source = SOURCE_REMOTE_CACHE
statistics.remote_cache_hits += 1
system_statistics.remote_cache_reads += 1
read_buffer_ref = await pool.commit(payload) # type: ignore
while read_buffer_ref is None:
await asyncio.sleep(0.1)
statistics.stalls_writing_to_read_buffer += 1
read_buffer_ref = await pool.commit(payload) # type: ignore
statistics.bytes_read += len(payload)
system_statistics.cpu_wait_seconds += 0.1
return read_buffer_ref

try:
read_buffer_ref = await func(
blob_name=blob_name, statistics=statistics, pool=pool, **kwargs
)
source = SOURCE_ORIGIN
statistics.cache_misses += 1
system_statistics.origin_reads += 1
return read_buffer_ref
except Exception as e:
print(f"Error in {func.__name__}: {e}")
raise  # re-raise the error after logging it

finally:
# Write the result to caches
if max_evictions:
# If we found the file, see if we need to write it to the caches
if source != SOURCE_NOT_FOUND and evictions_remaining > 0:
# we set a per-query eviction limit
payload = await pool.read(read_buffer_ref) # type: ignore

if len(payload) < buffer_pool.size // 10:
if source != SOURCE_BUFFER_POOL and len(payload) < buffer_pool.size // 10:
# if we didn't get it from the buffer pool (origin or remote cache) we add it
evicted = buffer_pool.set(key, payload)
if evicted:
# if we're evicting items we just put in the cache, stop
if evicted in my_keys:
max_evictions = 0
evictions_remaining = 0
else:
max_evictions -= 1
evictions_remaining -= 1
statistics.cache_evictions += 1

if len(payload) < MAX_CACHEABLE_ITEM_SIZE:
remote_cache.set(key, payload)
else:
statistics.cache_oversize += 1
if source == SOURCE_ORIGIN and len(payload) < MAX_CACHEABLE_ITEM_SIZE:
# If we read from the source, it's not in the remote cache
remote_cache.set(key, payload)
else:
statistics.cache_oversize += 1

return wrapper
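
Condensed, the decorator's read path now checks three tiers in order and tags each result with its source; the source then drives the write-back rules in the finally block (a buffer-pool hit is never re-written to the buffer pool, and only an origin read is written to the remote cache). A simplified restatement, eliding the stall and eviction accounting; `tiered_read` and `read_from_origin` are hypothetical stand-ins for the wrapper and the wrapped reader:

SOURCE_BUFFER_POOL, SOURCE_REMOTE_CACHE, SOURCE_ORIGIN = 1, 2, 3  # as in the diff

async def tiered_read(key, buffer_pool, remote_cache, read_from_origin, pool):
    payload = buffer_pool.get(key, zero_copy=False)  # tier 1: local buffer pool
    if payload is not None:
        return SOURCE_BUFFER_POOL, await pool.commit(payload)
    payload = remote_cache.get(key)  # tier 2: remote cache
    if payload is not None:
        return SOURCE_REMOTE_CACHE, await pool.commit(payload)
    return SOURCE_ORIGIN, await read_from_origin()  # tier 3: disk, S3, GCS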
3 changes: 3 additions & 0 deletions opteryx/connectors/disk_connector.py
@@ -111,6 +111,8 @@ def read_blob(self, *, blob_name, **kwargs) -> bytes:
os.close(file_descriptor)

async def async_read_blob(self, *, blob_name, pool, statistics, **kwargs):
from opteryx import system_statistics

# DEBUG: log ("READ ", blob_name)
with open(blob_name, "rb") as file:
data = file.read()
@@ -120,6 +122,7 @@ async def async_read_blob(self, *, blob_name, pool, statistics, **kwargs):
while ref is None:
statistics.stalls_writing_to_read_buffer += 1
await asyncio.sleep(0.1)
system_statistics.cpu_wait_seconds += 0.1
ref = await pool.commit(data)
statistics.bytes_read += len(data)
return ref
3 changes: 3 additions & 0 deletions opteryx/connectors/gcp_cloudstorage_connector.py
@@ -148,6 +148,8 @@ def read_blob(self, *, blob_name, **kwargs):
return content

async def async_read_blob(self, *, blob_name, pool, session, statistics, **kwargs):
from opteryx import system_statistics

bucket, _, _, _ = paths.get_parts(blob_name)
# DEBUG: log ("READ ", blob_name)

@@ -176,6 +178,7 @@ async def async_read_blob(self, *, blob_name, pool, session, statistics, **kwargs):
ref = await pool.commit(data)
while ref is None:
statistics.stalls_writing_to_read_buffer += 1
system_statistics.cpu_wait_seconds += 0.1
await asyncio.sleep(0.1)
ref = await pool.commit(data)
statistics.bytes_read += len(data)
3 changes: 3 additions & 0 deletions opteryx/cursor.py
@@ -170,6 +170,7 @@ def _inner_execute(
Results of the query execution.
"""

from opteryx import system_statistics
from opteryx.planner import query_planner

if not operation: # pragma: no cover
@@ -206,6 +207,8 @@ def _inner_execute(
results = plan.execute()
start = time.time_ns()

system_statistics.queries_executed += 1

if results is not None:
# we can't update tuples directly
self._connection.context.history[-1] = tuple(
8 changes: 5 additions & 3 deletions opteryx/models/query_statistics.py
@@ -74,7 +74,9 @@ class QueryStatistics(_QueryStatistics):
def __new__(cls, qid=""):
if cls._instances.get(qid) is None:
cls._instances[qid] = _QueryStatistics()
if len(cls._instances.keys()) > 50:
# don't keep collecting these things
cls._instances.pop(next(iter(cls._instances)))
if len(cls._instances.keys()) > 10:
# find the first key that is not "system"
key_to_remove = next((key for key in cls._instances if key != "system"), None)
if key_to_remove:
cls._instances.pop(key_to_remove)
return cls._instances[qid]
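
The practical effect of the amended rule: per-query instances are recycled once more than ten accumulate, but the long-lived "system" instance is never the key chosen for removal. A small illustration, assuming nothing else evicts entries:

from opteryx.models import QueryStatistics

system = QueryStatistics("system")

# churn through enough per-query instances to force evictions
for i in range(50):
    QueryStatistics(f"query-{i}")

assert QueryStatistics("system") is system  # "system" survives the churn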
3 changes: 3 additions & 0 deletions opteryx/operators/async_read_node.py
@@ -119,6 +119,8 @@ def from_dict(cls, dic: dict) -> "AsyncReaderNode": # pragma: no cover
raise NotImplementedError()

def execute(self) -> Generator:
"""Perform this step, time how long is spent doing work"""
from opteryx import system_statistics

orso_schema = self.parameters["schema"]
reader = self.parameters["connector"]
@@ -174,6 +176,7 @@ def execute(self) -> Generator:
except queue.Empty:
# Increment stall count if the queue is empty.
self.statistics.stalls_reading_from_read_buffer += 1
system_statistics.io_wait_seconds += 0.1
continue # Skip the rest of the loop and try to get an item again.

if item is None:
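
Note the asymmetry with the connector loops above: a producer stalled writing into the read buffer books cpu_wait_seconds, while this consumer stalled on an empty queue books io_wait_seconds (it is starved of data, not of buffer space). A condensed sketch of the consumer side; the `drain` helper and the queue timeout are assumptions, not shown in this hunk:

import queue

def drain(work_queue, statistics, system_statistics):
    while True:
        try:
            item = work_queue.get(timeout=0.1)  # timeout value is an assumption
        except queue.Empty:
            statistics.stalls_reading_from_read_buffer += 1
            system_statistics.io_wait_seconds += 0.1
            continue
        if item is None:  # sentinel: the producers are finished
            return
        yield item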
11 changes: 11 additions & 0 deletions opteryx/virtual_datasets/statistics.py
@@ -14,6 +14,8 @@
This is a virtual dataset which is calculated at access time.
"""

import time

from orso.schema import FlatColumn
from orso.schema import RelationSchema
from orso.types import OrsoTypes
@@ -24,6 +26,7 @@
def read(end_date=None, variables={}):
import pyarrow

from opteryx import system_statistics
from opteryx.shared.buffer_pool import BufferPool

bufferpool = BufferPool()
@@ -32,6 +35,7 @@ def read(end_date=None, variables={}):

pool = bufferpool._memory_pool

# fmt:off
buffer = [
{"key": "bufferpool_commits", "value": str(pool.commits)},
{"key": "bufferpool_failed_commits", "value": str(pool.failed_commits)},
@@ -47,7 +51,14 @@
{"key": "lru_misses", "value": str(lru_misses)},
{"key": "lru_evictions", "value": str(lru_evictions)},
{"key": "lru_inserts", "value": str(lru_inserts)},
{"key": "queries_executed", "value": str(system_statistics.queries_executed)},
{"key": "uptime_seconds","value": str((time.time_ns() - system_statistics.start_time) / 1e9)},
{"key": "io_wait_seconds", "value": str(system_statistics.io_wait_seconds)},
{"key": "cpu_wait_seconds", "value": str(system_statistics.cpu_wait_seconds)},
{"key": "origin_reads", "value": str(system_statistics.origin_reads)},
{"key": "remote_cache_reads", "value": str(system_statistics.remote_cache_reads)},
]
# fmt:on

return pyarrow.Table.from_pylist(buffer)

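
With the six new keys the virtual dataset now returns 20 rows, which is what the updated test below asserts. A quick way to inspect them, via the standard opteryx.query entry point:

import opteryx

result = opteryx.query("SELECT * FROM $statistics")
print(result)  # 20 key/value rows, including uptime_seconds and io_wait_seconds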
4 changes: 2 additions & 2 deletions tests/sql_battery/test_shapes_and_errors_battery.py
@@ -84,7 +84,7 @@
("SELECT * FROM sqlite.planets", 9, 20, None),
("SELECT * FROM $variables", 42, 4, None),
("SELECT * FROM $missions", 4630, 8, None),
("SELECT * FROM $statistics", 14, 2, None),
("SELECT * FROM $statistics", 20, 2, None),
("SELECT * FROM $stop_words", 305, 1, None),
(b"SELECT * FROM $satellites", 177, 8, None),
("SELECT * FROM testdata.missions", 4630, 8, None),
@@ -1592,7 +1592,7 @@
("EXECUTE multiply_two_numbers (one=-9.9, one=0)", 1, 1, ParameterError),

("SELECT HEX FROM HTTP('https://storage.googleapis.com/opteryx/color_srgb.csv') AS colors", 16, 1, None),
("SELECT * FROM HTTP('https://storage.googleapis.com/opteryx/space_missions/space_missions.parquet') as missions", 4630, 8, None),
# ("SELECT * FROM HTTP('https://storage.googleapis.com/opteryx/space_missions/space_missions.parquet') as missions", 4630, 8, None),
("SELECT * FROM HTTP('https://storage.googleapis.com/opteryx/color_srgb.csv') AS colors ORDER BY Name", 16, 3, None),
("SELECT * FROM HTTP('https://storage.googleapis.com/opteryx/color_srgb.csv')", None, None, UnnamedColumnError),
