Housekeeping/3 #1633

Merged 7 commits on May 4, 2024
2 changes: 1 addition & 1 deletion opteryx/__version__.py
@@ -1,4 +1,4 @@
__build__ = 466
__build__ = 471

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
10 changes: 5 additions & 5 deletions opteryx/connectors/capabilities/cacheable.py
@@ -142,23 +142,23 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
if result is not None:
statistics.bufferpool_hits += 1

ref = await pool.commit(result)
ref = await pool.commit(result) # type: ignore
while ref is None:
await asyncio.sleep(0.1)
statistics.stalls_writing_to_read_buffer += 1
ref = await pool.commit(result)
ref = await pool.commit(result) # type: ignore
statistics.bytes_read += len(result)
return ref

# try the remote cache next
result = remote_cache.get(key)
if result is not None:
statistics.remote_cache_hits += 1
ref = await pool.commit(result)
ref = await pool.commit(result) # type: ignore
while ref is None:
await asyncio.sleep(0.1)
statistics.stalls_writing_to_read_buffer += 1
ref = await pool.commit(result)
ref = await pool.commit(result) # type: ignore
statistics.bytes_read += len(result)
return ref

@@ -171,7 +171,7 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
# Write the result to caches
if max_evictions:
# we set a per-query eviction limit
buffer = await pool.read(result)
buffer = await pool.read(result) # type: ignore
if len(buffer) < buffer_pool.max_cacheable_item_size:
evicted = buffer_pool.set(key, buffer)
remote_cache.set(key, buffer)
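The hunks above all wrap `pool.commit` in the same back-off loop, because the read buffer can refuse a write while it is full. A minimal sketch of that pattern follows, using a stand-in pool class; the class, its capacity logic, and the statistics dict are assumptions for illustration, not the Opteryx `MemoryPool` API.

```python
import asyncio


class TinyPool:
    """Stand-in for a bounded read buffer: commit() returns None when full."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.items: dict = {}

    async def commit(self, payload: bytes):
        if len(self.items) >= self.capacity:
            return None  # no space right now, the caller should retry
        ref = len(self.items)
        self.items[ref] = payload
        return ref


async def commit_with_stall_tracking(pool: TinyPool, payload: bytes, statistics: dict):
    # Same shape as the loops in cacheable.py: poll until the pool accepts
    # the buffer, counting each stall so it surfaces in query statistics.
    ref = await pool.commit(payload)
    while ref is None:
        await asyncio.sleep(0.1)  # yield so readers can drain the pool
        statistics["stalls_writing_to_read_buffer"] = statistics.get("stalls_writing_to_read_buffer", 0) + 1
        ref = await pool.commit(payload)
    return ref


async def demo():
    pool = TinyPool(capacity=4)
    stats: dict = {}
    ref = await commit_with_stall_tracking(pool, b"some blob bytes", stats)
    print(ref, stats)


asyncio.run(demo())
```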
2 changes: 1 addition & 1 deletion opteryx/connectors/capabilities/predicate_pushable.py
@@ -72,7 +72,7 @@ def to_dnf(root):

def _predicate_to_dnf(root):
# Reduce look-ahead effort by using Exceptions to control flow
if root.node_type == NodeType.AND:
if root.node_type == NodeType.AND: # pragma: no cover
left = _predicate_to_dnf(root.left)
right = _predicate_to_dnf(root.right)
if not isinstance(left, list):
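For context on the comment about using exceptions for control flow: the converter recurses into AND nodes and simply raises as soon as it meets a shape it cannot express, rather than pre-validating the whole tree. A rough sketch of that idea with a simplified node type; the attribute names, node-type strings, and exception class are illustrative stand-ins, not the module's real ones.

```python
from collections import namedtuple

Node = namedtuple("Node", "node_type left right value")


class _NotPushable(Exception):
    """Raised to abandon a predicate tree that cannot be converted."""


def _to_dnf(node):
    if node.node_type == "AND":
        left = _to_dnf(node.left)
        right = _to_dnf(node.right)
        left = left if isinstance(left, list) else [left]
        right = right if isinstance(right, list) else [right]
        return left + right
    if node.node_type != "COMPARISON":
        raise _NotPushable()  # unwind immediately, no look-ahead pass needed
    return (node.left, node.value, node.right)


def to_dnf(root):
    try:
        return _to_dnf(root)
    except _NotPushable:
        return None  # predicate stays in the engine rather than being pushed down


expr = Node("AND",
            Node("COMPARISON", "followers", 100, "Gt"),
            Node("COMPARISON", "user_name", "alice", "Eq"),
            None)
print(to_dnf(expr))  # [('followers', 'Gt', 100), ('user_name', 'Eq', 'alice')]
```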
22 changes: 8 additions & 14 deletions opteryx/connectors/cql_connector.py
@@ -133,24 +133,18 @@ def read_dataset( # type:ignore
# Update SQL if we've pushed predicates
parameters: list = []
for predicate in predicates:
if predicate.node_type == NodeType.UNARY_OPERATOR:
operator = self.OPS_XLAT[predicate.value]
operand, parameters = _handle_operand(predicate.centre, parameters)
operator = operator.replace(":operand", operand)

query_builder.WHERE(operator)
else:
left_operand = predicate.left
right_operand = predicate.right
operator = self.OPS_XLAT[predicate.value]
left_operand = predicate.left
right_operand = predicate.right
operator = self.OPS_XLAT[predicate.value]

left_value, parameters = _handle_operand(left_operand, parameters)
right_value, parameters = _handle_operand(right_operand, parameters)
left_value, parameters = _handle_operand(left_operand, parameters)
right_value, parameters = _handle_operand(right_operand, parameters)

operator = operator.replace(":left", left_value)
operator = operator.replace(":right", right_value)
operator = operator.replace(":left", left_value)
operator = operator.replace(":right", right_value)

query_builder.WHERE(operator)
query_builder.WHERE(operator)

session = self.cluster.connect()
# DEBUG: log ("READ DATASET\n", str(query_builder))
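The rewritten block drops the unary-operator branch, so every pushed predicate is now treated as binary and rendered from a placeholder template. A small sketch of that substitution step; the template strings and operand handling are assumptions in the spirit of `OPS_XLAT` and `_handle_operand`, not the connector's actual mapping.

```python
# Hypothetical operator templates; the real mapping is the connector's OPS_XLAT.
OPS = {
    "Eq": ":left = :right",
    "Gt": ":left > :right",
    "GtEq": ":left >= :right",
}


def handle_operand(operand, parameters: list):
    # Columns are inlined into the CQL text; literal values become bound parameters.
    if isinstance(operand, str):  # pretend columns arrive as plain names
        return operand, parameters
    parameters.append(operand)
    return "%s", parameters


def predicate_to_where(op: str, left, right, parameters: list):
    template = OPS[op]
    left_value, parameters = handle_operand(left, parameters)
    right_value, parameters = handle_operand(right, parameters)
    clause = template.replace(":left", left_value).replace(":right", right_value)
    return clause, parameters


params: list = []
clause, params = predicate_to_where("Gt", "followers", 100, params)
print(clause, params)  # followers > %s [100]
```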
4 changes: 2 additions & 2 deletions opteryx/connectors/gcp_cloudstorage_connector.py
@@ -201,7 +201,7 @@ def get_list_of_blob_names(self, *, prefix: str) -> List[str]:
headers = {"Authorization": f"Bearer {self.access_token}"}

params = None
blob_names = []
blob_names: List[str] = []
while True:
response = self.session.get(url, headers=headers, timeout=30, params=params)

@@ -218,7 +218,7 @@ def get_list_of_blob_names(self, *, prefix: str) -> List[str]:
page_token = blob_data.get("nextPageToken")
if not page_token:
break
params["pageToken"] = page_token
params = {"pageToken": page_token}

self.blob_list[prefix] = blob_names
return blob_names
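The one-line fix here matters because `params` starts as `None` for the first request; indexing into it on the second page (`params["pageToken"] = ...`) would raise a `TypeError`, so the dict has to be rebuilt instead. A sketch of the corrected pagination loop, assuming a requests-style session and the GCS JSON list response shape.

```python
from typing import List

import requests


def list_blob_names(session: requests.Session, url: str, access_token: str) -> List[str]:
    headers = {"Authorization": f"Bearer {access_token}"}
    params = None                      # no pageToken on the first request
    blob_names: List[str] = []
    while True:
        response = session.get(url, headers=headers, timeout=30, params=params)
        payload = response.json()
        blob_names.extend(item["name"] for item in payload.get("items", []))
        page_token = payload.get("nextPageToken")
        if not page_token:
            break
        # Rebuild the dict rather than assigning into None.
        params = {"pageToken": page_token}
    return blob_names
```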
4 changes: 2 additions & 2 deletions opteryx/cursor.py
@@ -326,8 +326,8 @@ def execute_to_arrow(
if limit is not None:
result_data = utils.arrow.limit_records(result_data, limit) # type: ignore
try:
return pyarrow.concat_tables(result_data, promote_options="none")
except pyarrow.ArrowInvalid as err: # pragma: no cover
return pyarrow.concat_tables(result_data, promote_options="permissive")
except (pyarrow.ArrowInvalid, pyarrow.ArrowTypeError) as err: # pragma: no cover
# DEBUG: log (err)
if "struct" in str(err):
raise InconsistentSchemaError(
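Switching `promote_options` from `"none"` to `"permissive"` lets `concat_tables` unify morsels whose schemas have drifted (missing columns become nulls, narrower types are promoted) instead of raising `ArrowInvalid`; the same switch appears in `aggregate_and_group_node.py` further down. A quick illustration, assuming pyarrow 14+ where `promote_options` is available.

```python
import pyarrow

# Two morsels of the same relation whose physical types drifted.
a = pyarrow.table({"id": pyarrow.array([1, 2], type=pyarrow.int32())})
b = pyarrow.table({"id": pyarrow.array([3, 4], type=pyarrow.int64())})

# promote_options="none" raises ArrowInvalid because the schemas differ;
# "permissive" promotes int32 to int64 and concatenates cleanly.
combined = pyarrow.concat_tables([a, b], promote_options="permissive")
print(combined.schema)  # id: int64
```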
2 changes: 1 addition & 1 deletion opteryx/functions/__init__.py
@@ -282,7 +282,7 @@ def select_values(boolean_arrays, value_arrays):
"VARCHAR": cast_varchar,
"STRING": cast_varchar,
"STR": cast_varchar,
"STRUCT": _iterate_single_parameter(lambda x: orjson.loads(str(x))),
"STRUCT": _iterate_single_parameter(lambda x: orjson.loads(str(x)) if x is not None else None),
"DATE": lambda x: compute.cast(x, pyarrow.date32()),
"BLOB": array_encode_utf8,
"TRY_TIMESTAMP": try_cast("TIMESTAMP"),
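The guard on the `STRUCT` cast exists because `str(None)` is the literal text `"None"`, which `orjson.loads` rejects; with the change, NULLs pass through untouched. A tiny standalone version of the guarded cast:

```python
import orjson


def cast_struct(value):
    # NULLs stay NULL instead of being stringified and failing to parse as JSON.
    return orjson.loads(str(value)) if value is not None else None


print(cast_struct('{"a": 1}'))  # {'a': 1}
print(cast_struct(None))        # None
```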
6 changes: 3 additions & 3 deletions opteryx/managers/schemes/mabel_partitions.py
@@ -133,12 +133,12 @@ def _inner(*, timestamp):
if as_at is None:
continue
if is_complete_and_not_invalid(control_blobs, as_at):
blob_names = (blob for blob in blob_names if as_at in blob)
data_blobs = (blob for blob in data_blobs if as_at in blob) # type: ignore
break
blob_names = [blob for blob in blob_names if as_at not in blob]
data_blobs = [blob for blob in data_blobs if as_at not in blob]
as_at = None

return blob_names
return data_blobs

start_date = start_date or midnight
end_date = end_date or midnight.replace(hour=23, minute=59)
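The rename from `blob_names` to `data_blobs` makes the as-at walk easier to follow: the first complete and valid frame keeps only its blobs, and blobs from incomplete frames are stripped out before trying the next candidate. A rough sketch of that selection as read from this hunk; the candidate ordering and the completeness check are simplified assumptions, not the Mabel scheme's real logic.

```python
from typing import Callable, List


def select_as_at(data_blobs: List[str], as_ats: List[str], is_complete: Callable[[str], bool]) -> List[str]:
    # Try the newest as_at first (assumption); keep only its blobs if the
    # frame is complete, otherwise drop that frame's blobs and fall back.
    for as_at in sorted(as_ats, reverse=True):
        if is_complete(as_at):
            return [blob for blob in data_blobs if as_at in blob]
        data_blobs = [blob for blob in data_blobs if as_at not in blob]
    return data_blobs


blobs = [
    "bucket/data/as_at_20240503/part-0000.parquet",
    "bucket/data/as_at_20240504/part-0000.parquet",
]
# 20240504 is incomplete here, so the walk falls back to the 20240503 frame.
print(select_as_at(blobs, ["as_at_20240504", "as_at_20240503"], lambda a: a.endswith("0503")))
```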
21 changes: 21 additions & 0 deletions opteryx/managers/serde/physical_plan.py
@@ -0,0 +1,21 @@
"""
{
"fields":
[
{
"name"
"type"
}
],
"steps": {
"id": <uuid>,
"operator": "name",
"columns": [],
"config": {<variable>},
"requires": [<uuid>]
}
}

"""

from orso.types import OrsoTypes
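The module docstring sketches the serialized plan layout; a hypothetical document following it might look like the dict below. The operator names, columns, and config values are invented for illustration, and where the docstring shows a single step object, a real plan would presumably carry one entry per operator.

```python
import uuid

read_step = str(uuid.uuid4())
plan_doc = {
    "fields": [
        {"name": "user_name", "type": "VARCHAR"},
        {"name": "followers", "type": "INTEGER"},
    ],
    "steps": [
        {
            "id": read_step,
            "operator": "ReaderNode",
            "columns": ["user_name", "followers"],
            "config": {"dataset": "example_dataset"},
            "requires": [],
        },
        {
            "id": str(uuid.uuid4()),
            "operator": "ProjectionNode",
            "columns": ["user_name"],
            "config": {},
            "requires": [read_step],
        },
    ],
}
```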
150 changes: 0 additions & 150 deletions opteryx/models/node.py

This file was deleted.

11 changes: 6 additions & 5 deletions opteryx/operators/__init__.py
@@ -15,13 +15,14 @@
from .aggregate_and_group_node import AggregateAndGroupNode # Group is always followed by aggregate
from .aggregate_node import AGGREGATORS
from .aggregate_node import AggregateNode # aggregate data
from .async_scanner_node import AsyncScannerNode
from .async_read_node import AsyncReaderNode

# from .build_statistics_node import BuildStatisticsNode # Analyze Tables
from .cross_join_node import CrossJoinNode # CROSS JOIN
from .distinct_node import DistinctNode # remove duplicate records
from .exit_node import ExitNode
from .explain_node import ExplainNode # EXPLAIN queries
from .filter_node import FilterNode # filter unwanted rows
from .function_dataset_node import FunctionDatasetNode # Dataset Constructors

# from .heap_sort_node import HeapSortNode # Heap
@@ -30,12 +31,12 @@
from .join_node import JoinNode
from .left_join_node import LeftJoinNode
from .limit_node import LimitNode # select the first N records
from .metadata_writer_node import MetadataWriterNode
from .morsel_defragment_node import MorselDefragmentNode # consolidate small morsels

# from .metadata_writer_node import MetadataWriterNode
# from .morsel_defragment_node import MorselDefragmentNode # consolidate small morsels
from .noop_node import NoOpNode # No Operation
from .projection_node import ProjectionNode # remove unwanted columns including renames
from .scanner_node import ScannerNode
from .selection_node import SelectionNode # filter unwanted rows
from .read_node import ReaderNode
from .set_variable_node import SetVariableNode
from .show_columns_node import ShowColumnsNode # column details

2 changes: 1 addition & 1 deletion opteryx/operators/aggregate_and_group_node.py
@@ -91,7 +91,7 @@ def execute(self) -> Generator[pyarrow.Table, None, None]:
# we're pretty sure we're going to use - this will fail for datasets
# larger than memory
table = pyarrow.concat_tables(
project(morsels.execute(), self.all_identifiers), promote_options="none"
project(morsels.execute(), self.all_identifiers), promote_options="permissive"
)

# Allow grouping by functions by evaluating them first
opteryx/operators/async_read_node.py
@@ -29,7 +29,7 @@
import pyarrow.parquet
from orso.schema import RelationSchema

from opteryx.operators.scanner_node import ScannerNode
from opteryx.operators.read_node import ReaderNode
from opteryx.shared import AsyncMemoryPool
from opteryx.shared import MemoryPool
from opteryx.utils.file_decoders import get_decoder
@@ -95,7 +95,7 @@ async def fetch_and_process(blob_name):
await session.close()


class AsyncScannerNode(ScannerNode):
class AsyncReaderNode(ReaderNode):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -122,7 +122,7 @@ def execute(self) -> Generator:
prefix=reader.dataset,
)

data_queue = queue.Queue()
data_queue: queue.Queue = queue.Queue()

loop = asyncio.new_event_loop()
threading.Thread(
@@ -176,6 +176,7 @@ def execute(self) -> Generator:
self.statistics.add_message(f"failed to read {blob_name}")
self.statistics.failed_reads += 1
print(f"[READER] Cannot read blob {blob_name} due to {err}")
raise err

if morsel:
self.statistics.columns_read += morsel.num_columns
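The execute path in `AsyncReaderNode` runs an asyncio event loop on a worker thread and hands completed reads back through a thread-safe `queue.Queue`; the added `raise err` means a failed read now aborts the query instead of only being logged and counted. A stripped-down sketch of that producer/consumer shape, with the fetch stubbed out; the names and sentinel protocol here are illustrative only.

```python
import asyncio
import queue
import threading


def start_async_producer(blob_names, work: "queue.Queue") -> None:
    async def fetch(name: str) -> bytes:
        await asyncio.sleep(0.01)      # stand-in for an aiohttp / object-store read
        return f"payload for {name}".encode()

    async def producer() -> None:
        for name in blob_names:
            work.put((name, await fetch(name)))
        work.put(None)                 # sentinel: nothing left to read

    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_until_complete, args=(producer(),), daemon=True).start()


data_queue: queue.Queue = queue.Queue()
start_async_producer(["blob-1", "blob-2"], data_queue)
while (item := data_queue.get()) is not None:
    blob_name, payload = item
    print(blob_name, len(payload))     # decode/process the morsel here
```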