diff --git a/.github/workflows/regression_suite.yaml b/.github/workflows/regression_suite.yaml index dac54768e..7141e0d9f 100644 --- a/.github/workflows/regression_suite.yaml +++ b/.github/workflows/regression_suite.yaml @@ -50,6 +50,11 @@ jobs: run: | echo "${{ secrets.ASTRA_SECURE_BUNDLE_BASE64 }}" | base64 -d > secure-connect.zip + - name: run memcached test + run: python tests/storage/test_cache_memcached.py + env: + MEMCACHED_SERVER: 'localhost:11211' + - name: Run Regression Tests run: python -m coverage run -m pytest --color=yes env: @@ -68,9 +73,12 @@ jobs: MEMCACHED_SERVER: 'localhost:11211' DATASTAX_CLIENT_ID: '${{ secrets.DATASTAX_CLIENT_ID }}' DATASTAX_CLIENT_SECRET: '${{ secrets.DATASTAX_CLIENT_SECRET }}' + OPTERYX_DEBUG: 1 + MAX_LOCAL_BUFFER_CAPACITY: 100 + MAX_CACHE_EVICTIONS_PER_QUERY: 4 - name: Check Coverage - run: python -m coverage report --include=opteryx/** --fail-under=80 -m + run: python -m coverage report --include=opteryx/** --fail-under=90 -m - name: "Upload coverage to Codecov" if: matrix.os == 'ubuntu-latest' && matrix.python-version == '3.10' diff --git a/opteryx.yaml b/opteryx.yaml deleted file mode 100644 index 0e0697cf9..000000000 --- a/opteryx.yaml +++ /dev/null @@ -1,4 +0,0 @@ - -PARTITION_SCHEME: mabel -PROFILE_LOCATION: execution_plan.txt -QUERY_LOG_LOCATION: query.log \ No newline at end of file diff --git a/opteryx/__version__.py b/opteryx/__version__.py index be55fbb5c..516842e4b 100644 --- a/opteryx/__version__.py +++ b/opteryx/__version__.py @@ -1,4 +1,4 @@ -__build__ = 509 +__build__ = 512 # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
diff --git a/opteryx/config.py b/opteryx/config.py index d8ca22041..002bccc36 100644 --- a/opteryx/config.py +++ b/opteryx/config.py @@ -132,7 +132,7 @@ def get(key, default=None): # GCP project ID - for Google Cloud Data GCP_PROJECT_ID: str = get("GCP_PROJECT_ID") # The maximum number of evictions by a single query -MAX_CACHE_EVICTIONS_PER_QUERY: int = int(get("MAX_CACHE_EVICTIONS_PER_QUERY", 32)) +MAX_CACHE_EVICTIONS_PER_QUERY: int = int(get("MAX_CACHE_EVICTIONS_PER_QUERY", 64)) # Maximum size for items saved to the buffer cache MAX_CACHEABLE_ITEM_SIZE: int = int(get("MAX_CACHEABLE_ITEM_SIZE", 2 * 1024 * 1024)) # The local buffer pool size in either bytes or fraction of system memory diff --git a/opteryx/connectors/disk_connector.py b/opteryx/connectors/disk_connector.py index 7240b0be2..16da7e065 100644 --- a/opteryx/connectors/disk_connector.py +++ b/opteryx/connectors/disk_connector.py @@ -21,7 +21,6 @@ import pyarrow from orso.schema import RelationSchema -from orso.tools import single_item_cache from orso.types import OrsoTypes from opteryx.connectors.base.base_connector import BaseConnector diff --git a/opteryx/managers/cache/memcached.py b/opteryx/managers/cache/memcached.py index 46f6f60fc..446e2f27b 100644 --- a/opteryx/managers/cache/memcached.py +++ b/opteryx/managers/cache/memcached.py @@ -108,7 +108,7 @@ def get(self, key: bytes) -> Union[bytes, None]: import datetime print( - f"{datetime.datetime.now()} [CACHE] Disabling remote Memcached cache due to persistent errors ({err})." + f"{datetime.datetime.now()} [CACHE] Disabling remote Memcached cache due to persistent errors ({err}) [GET]." 
) self.errors += 1 return None @@ -121,10 +121,17 @@ def set(self, key: bytes, value: bytes) -> None: try: self._server.set(key, value) self.sets += 1 - except: + except Exception as err: # if we fail to set, stop trying self._consecutive_failures = MAXIMUM_CONSECUTIVE_FAILURES self.errors += 1 + import datetime + + print( + f"{datetime.datetime.now()} [CACHE] Disabling remote Memcached cache due to persistent errors ({err}) [SET]." + ) + else: + self.skips += 1 def __del__(self): pass diff --git a/requirements.txt b/requirements.txt index 0cde14600..e8eefb88c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ cython numpy==1.* orjson orso>=0.0.158 +psutil pyarrow>=12.0.1 typer==0.11.* aiohttp diff --git a/tests/storage/test_cache_memcached.py b/tests/storage/test_cache_memcached.py index fbaec086a..f2f3cbc1f 100644 --- a/tests/storage/test_cache_memcached.py +++ b/tests/storage/test_cache_memcached.py @@ -27,23 +27,27 @@ def test_memcached_cache(): cache._server.flush_all() opteryx.set_cache_manager(CacheManager(cache_backend=cache)) - # read the data five times, this should populate the cache if it hasn't already + conn = opteryx.Connection() + + # read the data ten times, this should populate the cache if it hasn't already for i in range(10): - cur = opteryx.query("SELECT * FROM testdata.flat.ten_files;") + cur = conn.cursor() + cur.execute("SELECT * FROM testdata.flat.ten_files;") # read the data again time, this should hit the cache - cur = opteryx.query("SELECT * FROM testdata.flat.ten_files;") + cur = conn.cursor() + cur.execute("SELECT * FROM testdata.flat.ten_files;") stats = cur.stats assert ( cache.hits >= 11 - ), f"hits: {cache.hits}, misses: {cache.misses}, skips: {cache.skips}, errors: {cache.errors}" + ), f"hits: {cache.hits}, misses: {cache.misses}, skips: {cache.skips}, errors: {cache.errors}, sets: {cache.sets}" assert ( cache.skips == 0 - ), f"hits: {cache.hits}, misses: {cache.misses}, skips: {cache.skips}, errors: 
{cache.errors}" + ), f"hits: {cache.hits}, misses: {cache.misses}, skips: {cache.skips}, errors: {cache.errors}, sets: {cache.sets}" assert ( cache.errors == 0 - ), f"hits: {cache.hits}, misses: {cache.misses}, skips: {cache.skips}, errors: {cache.errors}" + ), f"hits: {cache.hits}, misses: {cache.misses}, skips: {cache.skips}, errors: {cache.errors}, sets: {cache.sets}" assert stats["remote_cache_hits"] >= stats["blobs_read"], stats assert stats.get("cache_misses", 0) == 0, stats