
Commit

joocer committed Sep 30, 2024
1 parent 76d34da commit 132ef67
Showing 10 changed files with 67 additions and 17 deletions.
6 changes: 6 additions & 0 deletions opteryx/__init__.py
@@ -25,6 +25,7 @@

import datetime
import os
import time
import warnings
import platform

@@ -268,3 +269,8 @@ def opteryx(self, line, cell):

# Surface DeprecationWarnings, showing each one only once
warnings.simplefilter("once", DeprecationWarning)

from opteryx.models import QueryStatistics

system_statistics = QueryStatistics("system")
system_statistics.start_time = time.time_ns()
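
Note: because `QueryStatistics` is keyed by `qid`, this makes `system_statistics` a process-wide singleton — any module that asks for the `"system"` key gets the same instance, so counters accumulate across queries. A minimal usage sketch (assuming, as this diff implies, that counters not yet written start at zero):

import opteryx

# the same "system" instance is shared process-wide
opteryx.system_statistics.queries_executed += 1
print(opteryx.system_statistics.queries_executed)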
3 changes: 3 additions & 0 deletions opteryx/connectors/aws_s3_connector.py
@@ -128,6 +128,8 @@ def get_dataset_schema(self) -> RelationSchema:
return self.schema

async def async_read_blob(self, *, blob_name, pool, statistics, **kwargs):
from opteryx import system_statistics

try:
bucket, object_path, name, extension = paths.get_parts(blob_name)
# DEBUG: log ("READ ", name)
@@ -138,6 +140,7 @@ async def async_read_blob(self, *, blob_name, pool, statistics, **kwargs):
while ref is None:
statistics.stalls_writing_to_read_buffer += 1
await asyncio.sleep(0.1)
system_statistics.cpu_wait_seconds += 0.1
ref = await pool.commit(data)
statistics.bytes_read += len(data)
return ref
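The stall-accounting loop above is repeated in the disk and GCS connectors below. In isolation the pattern looks like this (a sketch, with names taken from the surrounding code):

import asyncio

POLL_INTERVAL = 0.1  # matches the 0.1s sleep used by the connectors

async def commit_with_backpressure(pool, data, statistics, system_statistics):
    # retry until the read buffer has room, booking ~0.1s of CPU wait per stall
    ref = await pool.commit(data)
    while ref is None:
        statistics.stalls_writing_to_read_buffer += 1
        await asyncio.sleep(POLL_INTERVAL)
        system_statistics.cpu_wait_seconds += POLL_INTERVAL
        ref = await pool.commit(data)
    statistics.bytes_read += len(data)
    return ref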
40 changes: 28 additions & 12 deletions opteryx/connectors/capabilities/cacheable.py
@@ -21,6 +21,11 @@

__all__ = ("Cacheable", "async_read_thru_cache")

SOURCE_NOT_FOUND = 0
SOURCE_BUFFER_POOL = 1
SOURCE_REMOTE_CACHE = 2
SOURCE_ORIGIN = 3


class Cacheable:
"""
@@ -47,14 +52,15 @@ def async_read_thru_cache(func):
read buffer reference for the bytes, which means we need to populate
the read buffer and return the ref for these items.
"""
# Capture the max_evictions value at decoration time
# Capture the evictions_remaining value at decoration time
from opteryx import get_cache_manager
from opteryx import system_statistics
from opteryx.managers.cache import NullCache
from opteryx.shared import BufferPool
from opteryx.shared import MemoryPool

cache_manager = get_cache_manager()
max_evictions = MAX_CACHE_EVICTIONS_PER_QUERY
evictions_remaining = MAX_CACHE_EVICTIONS_PER_QUERY
remote_cache = cache_manager.cache_backend
if not remote_cache:
# rather than make decisions - just use a dummy
@@ -67,8 +73,9 @@
@wraps(func)
async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
try:
nonlocal max_evictions
nonlocal evictions_remaining

source = SOURCE_NOT_FOUND
key = hex(CityHash64(blob_name)).encode()
read_buffer_ref = None
payload = None
@@ -77,6 +84,7 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
# try the buffer pool first
payload = buffer_pool.get(key, zero_copy=False)
if payload is not None:
source = SOURCE_BUFFER_POOL
remote_cache.touch(key) # help the remote cache track LRU
statistics.bufferpool_hits += 1
read_buffer_ref = await pool.commit(payload) # type: ignore
@@ -85,49 +93,57 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
statistics.stalls_writing_to_read_buffer += 1
read_buffer_ref = await pool.commit(payload) # type: ignore
statistics.bytes_read += len(payload)
system_statistics.cpu_wait_seconds += 0.1
return read_buffer_ref

# try the remote cache next
payload = remote_cache.get(key)
if payload is not None:
source = SOURCE_REMOTE_CACHE
statistics.remote_cache_hits += 1
system_statistics.remote_cache_reads += 1
read_buffer_ref = await pool.commit(payload) # type: ignore
while read_buffer_ref is None:
await asyncio.sleep(0.1)
statistics.stalls_writing_to_read_buffer += 1
read_buffer_ref = await pool.commit(payload) # type: ignore
statistics.bytes_read += len(payload)
system_statistics.cpu_wait_seconds += 0.1
return read_buffer_ref

try:
read_buffer_ref = await func(
blob_name=blob_name, statistics=statistics, pool=pool, **kwargs
)
source = SOURCE_ORIGIN
statistics.cache_misses += 1
system_statistics.origin_reads += 1
return read_buffer_ref
except Exception as e:
print(f"Error in {func.__name__}: {e}")
raise  # re-raise the error after logging it

finally:
# Write the result to caches
if max_evictions:
# If we found the file, see if we need to write it to the caches
if source != SOURCE_NOT_FOUND and evictions_remaining > 0:
# we set a per-query eviction limit
payload = await pool.read(read_buffer_ref) # type: ignore

if len(payload) < buffer_pool.size // 10:
if source != SOURCE_BUFFER_POOL and len(payload) < buffer_pool.size // 10:
# if we didn't get it from the buffer pool (origin or remote cache) we add it
evicted = buffer_pool.set(key, payload)
if evicted:
# if we're evicting items we just put in the cache, stop
if evicted in my_keys:
max_evictions = 0
evictions_remaining = 0
else:
max_evictions -= 1
evictions_remaining -= 1
statistics.cache_evictions += 1

if len(payload) < MAX_CACHEABLE_ITEM_SIZE:
remote_cache.set(key, payload)
else:
statistics.cache_oversize += 1
if source == SOURCE_ORIGIN and len(payload) < MAX_CACHEABLE_ITEM_SIZE:
# If we read from the source, it's not in the remote cache
remote_cache.set(key, payload)
else:
statistics.cache_oversize += 1

return wrapper
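
For context, the decorator wraps a connector's async reader roughly like this (a hypothetical reader, shown for illustration; real connectors gain this via the Cacheable capability):

from opteryx.connectors.capabilities.cacheable import async_read_thru_cache

@async_read_thru_cache
async def read_blob(*, blob_name, statistics, pool, **kwargs):
    # stand-in origin read; the wrapper tries the buffer pool and
    # remote cache first and only calls this on a miss
    with open(blob_name, "rb") as stream:
        data = stream.read()
    ref = await pool.commit(data)
    statistics.bytes_read += len(data)
    return ref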
3 changes: 3 additions & 0 deletions opteryx/connectors/disk_connector.py
@@ -111,6 +111,8 @@ def read_blob(self, *, blob_name, **kwargs) -> bytes:
os.close(file_descriptor)

async def async_read_blob(self, *, blob_name, pool, statistics, **kwargs):
from opteryx import system_statistics

# DEBUG: log ("READ ", blob_name)
with open(blob_name, "rb") as file:
data = file.read()
@@ -120,6 +122,7 @@ async def async_read_blob(self, *, blob_name, pool, statistics, **kwargs):
while ref is None:
statistics.stalls_writing_to_read_buffer += 1
await asyncio.sleep(0.1)
system_statistics.cpu_wait_seconds += 0.1
ref = await pool.commit(data)
statistics.bytes_read += len(data)
return ref
3 changes: 3 additions & 0 deletions opteryx/connectors/gcp_cloudstorage_connector.py
@@ -148,6 +148,8 @@ def read_blob(self, *, blob_name, **kwargs):
return content

async def async_read_blob(self, *, blob_name, pool, session, statistics, **kwargs):
from opteryx import system_statistics

bucket, _, _, _ = paths.get_parts(blob_name)
# DEBUG: log ("READ ", blob_name)

@@ -176,6 +178,7 @@ async def async_read_blob(self, *, blob_name, pool, session, statistics, **kwargs):
ref = await pool.commit(data)
while ref is None:
statistics.stalls_writing_to_read_buffer += 1
system_statistics.cpu_wait_seconds += 0.1
await asyncio.sleep(0.1)
ref = await pool.commit(data)
statistics.bytes_read += len(data)
3 changes: 3 additions & 0 deletions opteryx/cursor.py
@@ -170,6 +170,7 @@ def _inner_execute(
Results of the query execution.
"""

from opteryx import system_statistics
from opteryx.planner import query_planner

if not operation: # pragma: no cover
@@ -206,6 +207,8 @@ def _inner_execute(
results = plan.execute()
start = time.time_ns()

system_statistics.queries_executed += 1

if results is not None:
# we can't update tuples directly
self._connection.context.history[-1] = tuple(
8 changes: 5 additions & 3 deletions opteryx/models/query_statistics.py
@@ -74,7 +74,9 @@ class QueryStatistics(_QueryStatistics):
def __new__(cls, qid=""):
if cls._instances.get(qid) is None:
cls._instances[qid] = _QueryStatistics()
if len(cls._instances.keys()) > 50:
# don't keep collecting these things
cls._instances.pop(next(iter(cls._instances)))
if len(cls._instances.keys()) > 10:
# find the first key that is not "system"
key_to_remove = next((key for key in cls._instances if key != "system"), None)
if key_to_remove:
cls._instances.pop(key_to_remove)
return cls._instances[qid]
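
The revised eviction logic keeps at most ten per-query instances while guaranteeing the long-lived "system" instance is never the one dropped. A sketch of the intended keyed-singleton behaviour:

from opteryx.models import QueryStatistics

a = QueryStatistics("query-1")
b = QueryStatistics("query-1")
assert a is b  # same qid returns the same instance

system = QueryStatistics("system")
for i in range(100):  # churn through many per-query instances
    QueryStatistics(f"query-{i}")
assert QueryStatistics("system") is system  # "system" survives eviction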
3 changes: 3 additions & 0 deletions opteryx/operators/async_read_node.py
@@ -119,6 +119,8 @@ def from_dict(cls, dic: dict) -> "AsyncReaderNode":  # pragma: no cover
raise NotImplementedError()

def execute(self) -> Generator:
"""Perform this step, time how long is spent doing work"""

from opteryx import system_statistics
orso_schema = self.parameters["schema"]
reader = self.parameters["connector"]
@@ -174,6 +176,7 @@ def execute(self) -> Generator:
except queue.Empty:
# Increment stall count if the queue is empty.
self.statistics.stalls_reading_from_read_buffer += 1
system_statistics.io_wait_seconds += 0.1
continue # Skip the rest of the loop and try to get an item again.

if item is None:
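The reader node polls its work queue with a timeout, and each empty poll is now booked as roughly 0.1 seconds of I/O wait. The accounting pattern in isolation (a sketch; the queue name and the 0.1s timeout are assumptions based on the surrounding code):

import queue

def drain(work_queue, statistics, system_statistics):
    while True:
        try:
            item = work_queue.get(timeout=0.1)
        except queue.Empty:
            statistics.stalls_reading_from_read_buffer += 1
            system_statistics.io_wait_seconds += 0.1  # approximate wait per empty poll
            continue
        if item is None:  # sentinel: no more blobs to read
            break
        yield item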
11 changes: 11 additions & 0 deletions opteryx/virtual_datasets/statistics.py
@@ -14,6 +14,8 @@
This is a virtual dataset which is calculated at access time.
"""

import time

from orso.schema import FlatColumn
from orso.schema import RelationSchema
from orso.types import OrsoTypes
@@ -24,6 +26,7 @@
def read(end_date=None, variables={}):
import pyarrow

from opteryx import system_statistics
from opteryx.shared.buffer_pool import BufferPool

bufferpool = BufferPool()
@@ -32,6 +35,7 @@ def read(end_date=None, variables={}):

pool = bufferpool._memory_pool

# fmt:off
buffer = [
{"key": "bufferpool_commits", "value": str(pool.commits)},
{"key": "bufferpool_failed_commits", "value": str(pool.failed_commits)},
@@ -47,7 +51,14 @@
{"key": "lru_misses", "value": str(lru_misses)},
{"key": "lru_evictions", "value": str(lru_evictions)},
{"key": "lru_inserts", "value": str(lru_inserts)},
{"key": "queries_executed", "value": str(system_statistics.queries_executed)},
{"key": "uptime_seconds","value": str((time.time_ns() - system_statistics.start_time) / 1e9)},
{"key": "io_wait_seconds", "value": str(system_statistics.io_wait_seconds)},
{"key": "cpu_wait_seconds", "value": str(system_statistics.cpu_wait_seconds)},
{"key": "origin_reads", "value": str(system_statistics.origin_reads)},
{"key": "remote_cache_reads", "value": str(system_statistics.remote_cache_reads)},
]
# fmt:on

return pyarrow.Table.from_pylist(buffer)

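With these rows added, the new system-level counters are visible straight from SQL; a usage sketch:

import opteryx

# $statistics now returns 20 key/value rows, including the new counters
print(opteryx.query("SELECT * FROM $statistics"))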
4 changes: 2 additions & 2 deletions tests/sql_battery/test_shapes_and_errors_battery.py
@@ -84,7 +84,7 @@
("SELECT * FROM sqlite.planets", 9, 20, None),
("SELECT * FROM $variables", 42, 4, None),
("SELECT * FROM $missions", 4630, 8, None),
("SELECT * FROM $statistics", 14, 2, None),
("SELECT * FROM $statistics", 20, 2, None),
("SELECT * FROM $stop_words", 305, 1, None),
(b"SELECT * FROM $satellites", 177, 8, None),
("SELECT * FROM testdata.missions", 4630, 8, None),
@@ -1592,7 +1592,7 @@
("EXECUTE multiply_two_numbers (one=-9.9, one=0)", 1, 1, ParameterError),

("SELECT HEX FROM HTTP('https://storage.googleapis.com/opteryx/color_srgb.csv') AS colors", 16, 1, None),
("SELECT * FROM HTTP('https://storage.googleapis.com/opteryx/space_missions/space_missions.parquet') as missions", 4630, 8, None),
# ("SELECT * FROM HTTP('https://storage.googleapis.com/opteryx/space_missions/space_missions.parquet') as missions", 4630, 8, None),
("SELECT * FROM HTTP('https://storage.googleapis.com/opteryx/color_srgb.csv') AS colors ORDER BY Name", 16, 3, None),
("SELECT * FROM HTTP('https://storage.googleapis.com/opteryx/color_srgb.csv')", None, None, UnnamedColumnError),

