Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HOUSEKEEPING #1650

Merged
merged 6 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 489
__build__ = 492

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
2 changes: 2 additions & 0 deletions opteryx/connectors/disk_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ async def async_read_blob(self, *, blob_name, pool, statistics, **kwargs):
# DEBUG: log ("READ ", blob_name)
with open(blob_name, "rb") as file:
data = file.read()
if len(data) > pool.size():
raise ValueError(f"File {blob_name} is too large for read buffer")
ref = await pool.commit(data)
while ref is None:
statistics.stalls_writing_to_read_buffer += 1
Expand Down
14 changes: 6 additions & 8 deletions opteryx/managers/schemes/mabel_partitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
class UnsupportedSegementationError(DataError):
"""Exception raised for unsupported segmentations."""

def __init__(self, dataset: str, segments: set = None):
def __init__(self, dataset: str, segment: str):
self.dataset = dataset
self.segments = segments
message = f"'{dataset}' contains unsupported segmentation (`{'`, `'.join(segments)}`), only 'by_hour' segments are supported."
self.segment = segment
message = f"'{dataset}' contains unsupported segmentation (found `{segment}`), only 'by_hour' segments are supported."
super().__init__(message)


Expand Down Expand Up @@ -124,14 +124,12 @@ def _inner(*, date, start, end):
if by_label in blob:
segment = extract_prefix(blob, "by_")
if segment != "by_hour":
raise UnsupportedSegementationError(dataset=prefix, segments=[segment])
raise UnsupportedSegementationError(dataset=prefix, segment=segment)

if any(f"{OS_SEP}by_hour{OS_SEP}" in blob_name for blob_name in data_blobs):

if start > date:
start = date
if end < date:
end = date
start = min(start, date)
end = max(end, date)

selected_blobs = []

Expand Down
2 changes: 1 addition & 1 deletion opteryx/operators/async_read_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from opteryx.utils.file_decoders import get_decoder

CONCURRENT_READS = 4
MAX_BUFFER_SIZE_MB = 256
MAX_BUFFER_SIZE_MB = 512


def normalize_morsel(schema: RelationSchema, morsel: pyarrow.Table) -> pyarrow.Table:
Expand Down
2 changes: 1 addition & 1 deletion opteryx/shared/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
# limitations under the License.

from opteryx.compiled.structures import MemoryPool
from opteryx.shared.async_memory_pool import AsyncMemoryPool
from opteryx.shared.buffer_pool import BufferPool
from opteryx.shared.materialized_datasets import MaterializedDatasets
from opteryx.shared.memory_pool import AsyncMemoryPool
from opteryx.shared.rolling_log import RollingLog

__all__ = ("AsyncMemoryPool", "BufferPool", "MaterializedDatasets", "MemoryPool", "RollingLog")
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ async def read(self, ref_id: int) -> bytes:
async def release(self, ref_id: int):
async with self.lock:
self.pool.release(ref_id)

def size(self):
return self.pool.size
2 changes: 1 addition & 1 deletion opteryx/shared/buffer_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"""
from typing import Optional

from opteryx.shared.memory_pool import MemoryPool
from opteryx.shared import MemoryPool
from opteryx.utils.lru_2 import LRU2


Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ skips = ["B101", "B105", "B324", "B608"]
# B105 - Hardcoded passwords
# B324 - Use of weak crypto
# B608 - Hardcoded SQL

[build-system]
requires = ["setuptools>=42", "wheel", "Cython", "numpy"]
build-backend = "setuptools.build_meta"
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ orjson
orso>=0.0.151
pyarrow>=12.0.1
typer==0.11.*

aiohttp
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ def rust_build(setup_kwargs: Dict[str, Any]) -> None:
"entry_points": {
"console_scripts": ["opteryx=opteryx.command:main"],
},
"package_data": {
"": ["*.pyx", "*.pxd"],
},
}

rust_build(setup_config)
Expand Down
3 changes: 2 additions & 1 deletion tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ cassandra-driver

setuptools_rust

aiohttp
aiohttp
psutil
Loading