Merge pull request #1633 from mabel-dev/HOUSEKEEPING/3
Housekeeping/3
joocer authored May 4, 2024
2 parents 64f4c17 + bd129a1 commit 125b8d0
Showing 36 changed files with 161 additions and 276 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
@@ -1,4 +1,4 @@
-__build__ = 466
+__build__ = 471

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
10 changes: 5 additions & 5 deletions opteryx/connectors/capabilities/cacheable.py
@@ -142,23 +142,23 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
 if result is not None:
     statistics.bufferpool_hits += 1

-    ref = await pool.commit(result)
+    ref = await pool.commit(result)  # type: ignore
     while ref is None:
         await asyncio.sleep(0.1)
         statistics.stalls_writing_to_read_buffer += 1
-        ref = await pool.commit(result)
+        ref = await pool.commit(result)  # type: ignore
     statistics.bytes_read += len(result)
     return ref

 # try the remote cache next
 result = remote_cache.get(key)
 if result is not None:
     statistics.remote_cache_hits += 1
-    ref = await pool.commit(result)
+    ref = await pool.commit(result)  # type: ignore
     while ref is None:
         await asyncio.sleep(0.1)
         statistics.stalls_writing_to_read_buffer += 1
-        ref = await pool.commit(result)
+        ref = await pool.commit(result)  # type: ignore
     statistics.bytes_read += len(result)
     return ref

@@ -171,7 +171,7 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
 # Write the result to caches
 if max_evictions:
     # we set a per-query eviction limit
-    buffer = await pool.read(result)
+    buffer = await pool.read(result)  # type: ignore
     if len(buffer) < buffer_pool.max_cacheable_item_size:
         evicted = buffer_pool.set(key, buffer)
         remote_cache.set(key, buffer)
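
The wrapper above polls pool.commit until the memory pool accepts the blob; commit is assumed to return None while the pool is full. A minimal, self-contained sketch of that retry pattern, purely illustrative and not part of the commit (TinyMemoryPool and commit_with_backoff are invented stand-ins, not opteryx classes):

import asyncio


class TinyMemoryPool:
    # Illustrative stand-in for the MemoryPool used above, not the opteryx class.
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.used = 0
        self.segments = {}
        self.next_ref = 0

    async def commit(self, data: bytes):
        # Mirrors the behaviour the wrapper polls on: None means "no room yet".
        if self.used + len(data) > self.capacity:
            return None
        ref, self.next_ref = self.next_ref, self.next_ref + 1
        self.segments[ref] = data
        self.used += len(data)
        return ref


async def commit_with_backoff(pool: TinyMemoryPool, data: bytes) -> int:
    ref = await pool.commit(data)
    while ref is None:
        await asyncio.sleep(0.1)  # yield so another task can free space
        ref = await pool.commit(data)
    return ref


print(asyncio.run(commit_with_backoff(TinyMemoryPool(1024), b"blob bytes")))  # 0
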
2 changes: 1 addition & 1 deletion opteryx/connectors/capabilities/predicate_pushable.py
@@ -72,7 +72,7 @@ def to_dnf(root):

 def _predicate_to_dnf(root):
     # Reduce look-ahead effort by using Exceptions to control flow
-    if root.node_type == NodeType.AND:
+    if root.node_type == NodeType.AND:  # pragma: no cover
         left = _predicate_to_dnf(root.left)
         right = _predicate_to_dnf(root.right)
         if not isinstance(left, list):
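
The change only marks the AND branch as excluded from coverage; for context, a simplified sketch of what that branch does - flattening a conjunction into one list of predicates. This is illustrative only: the Node class is a stand-in for opteryx's internal node type and the non-AND branch is reduced to returning the node unchanged.

class Node:
    # Minimal stand-in for the planner's node objects.
    def __init__(self, node_type, left=None, right=None, value=None):
        self.node_type = node_type
        self.left = left
        self.right = right
        self.value = value


def predicate_to_dnf(root):
    if root.node_type == "AND":
        left = predicate_to_dnf(root.left)
        right = predicate_to_dnf(root.right)
        if not isinstance(left, list):
            left = [left]
        if not isinstance(right, list):
            right = [right]
        return left + right  # a conjunction flattens into one list of predicates
    return root  # leaf predicates pass through unchanged in this sketch


conjunction = Node("AND", left=Node("Eq", value="a = 1"), right=Node("Gt", value="b > 2"))
print([n.value for n in predicate_to_dnf(conjunction)])  # ['a = 1', 'b > 2']
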
22 changes: 8 additions & 14 deletions opteryx/connectors/cql_connector.py
@@ -133,24 +133,18 @@ def read_dataset( # type:ignore
 # Update SQL if we've pushed predicates
 parameters: list = []
 for predicate in predicates:
-    if predicate.node_type == NodeType.UNARY_OPERATOR:
-        operator = self.OPS_XLAT[predicate.value]
-        operand, parameters = _handle_operand(predicate.centre, parameters)
-        operator = operator.replace(":operand", operand)

-        query_builder.WHERE(operator)
-    else:
-        left_operand = predicate.left
-        right_operand = predicate.right
-        operator = self.OPS_XLAT[predicate.value]
+    left_operand = predicate.left
+    right_operand = predicate.right
+    operator = self.OPS_XLAT[predicate.value]

-        left_value, parameters = _handle_operand(left_operand, parameters)
-        right_value, parameters = _handle_operand(right_operand, parameters)
+    left_value, parameters = _handle_operand(left_operand, parameters)
+    right_value, parameters = _handle_operand(right_operand, parameters)

-        operator = operator.replace(":left", left_value)
-        operator = operator.replace(":right", right_value)
+    operator = operator.replace(":left", left_value)
+    operator = operator.replace(":right", right_value)

-        query_builder.WHERE(operator)
+    query_builder.WHERE(operator)

 session = self.cluster.connect()
 # DEBUG: log ("READ DATASET\n", str(query_builder))
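
The rewritten loop drops the unary-operator branch, so only binary predicates are translated: each operator template carries :left and :right placeholders that are substituted with a column name or a '%s' parameter marker. A rough, illustrative sketch of that substitution (the OPS_XLAT entries and the operand handler here are simplified stand-ins, not the connector's real tables):

OPS_XLAT = {"Eq": ":left = :right", "Gt": ":left > :right"}  # illustrative subset


def handle_operand(operand, parameters):
    # Literals become '%s' markers and are appended to the parameter list;
    # column identifiers are substituted straight into the template.
    if "literal" in operand:
        parameters.append(operand["literal"])
        return "%s", parameters
    return operand["name"], parameters


def predicate_to_where(predicate, parameters):
    operator = OPS_XLAT[predicate["op"]]
    left_value, parameters = handle_operand(predicate["left"], parameters)
    right_value, parameters = handle_operand(predicate["right"], parameters)
    operator = operator.replace(":left", left_value)
    operator = operator.replace(":right", right_value)
    return operator, parameters


clause, params = predicate_to_where(
    {"op": "Eq", "left": {"name": "username"}, "right": {"literal": "joocer"}}, []
)
print(clause, params)  # username = %s ['joocer']
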
4 changes: 2 additions & 2 deletions opteryx/connectors/gcp_cloudstorage_connector.py
@@ -201,7 +201,7 @@ def get_list_of_blob_names(self, *, prefix: str) -> List[str]:
 headers = {"Authorization": f"Bearer {self.access_token}"}

 params = None
-blob_names = []
+blob_names: List[str] = []
 while True:
     response = self.session.get(url, headers=headers, timeout=30, params=params)

@@ -218,7 +218,7 @@ def get_list_of_blob_names(self, *, prefix: str) -> List[str]:
     page_token = blob_data.get("nextPageToken")
     if not page_token:
         break
-    params["pageToken"] = page_token
+    params = {"pageToken": page_token}

 self.blob_list[prefix] = blob_names
 return blob_names
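
The second hunk fixes a latent bug: params starts as None, so assigning params["pageToken"] would raise a TypeError as soon as a listing spanned more than one page; building a fresh dict avoids that. A sketch of the corrected pagination loop, not the connector's exact code (endpoint URL and response handling are simplified; the items/nextPageToken shape matches what the hunk reads):

from typing import List

import requests


def list_blob_names(session: requests.Session, url: str, access_token: str) -> List[str]:
    headers = {"Authorization": f"Bearer {access_token}"}
    blob_names: List[str] = []
    params = None  # the first request carries no pageToken
    while True:
        response = session.get(url, headers=headers, timeout=30, params=params)
        blob_data = response.json()
        blob_names.extend(item["name"] for item in blob_data.get("items", []))
        page_token = blob_data.get("nextPageToken")
        if not page_token:
            break
        # a new dict each time - the old code tried to index into None here
        params = {"pageToken": page_token}
    return blob_names
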
4 changes: 2 additions & 2 deletions opteryx/cursor.py
@@ -326,8 +326,8 @@ def execute_to_arrow(
 if limit is not None:
     result_data = utils.arrow.limit_records(result_data, limit)  # type: ignore
 try:
-    return pyarrow.concat_tables(result_data, promote_options="none")
-except pyarrow.ArrowInvalid as err:  # pragma: no cover
+    return pyarrow.concat_tables(result_data, promote_options="permissive")
+except (pyarrow.ArrowInvalid, pyarrow.ArrowTypeError) as err:  # pragma: no cover
     # DEBUG: log (err)
     if "struct" in str(err):
         raise InconsistentSchemaError(
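
Switching promote_options from "none" to "permissive" lets pyarrow unify morsels whose column types differ instead of failing outright, and the widened except clause catches the ArrowTypeError that promotion can still raise. A small illustration, not part of the commit (the tables are invented; assumes a pyarrow version recent enough to support promote_options):

import pyarrow

# the same column arrives as int64 in one morsel and double in another
morsels = [
    pyarrow.table({"id": [1, 2]}),
    pyarrow.table({"id": [3.5]}),
]

try:
    combined = pyarrow.concat_tables(morsels, promote_options="permissive")
except (pyarrow.ArrowInvalid, pyarrow.ArrowTypeError) as err:
    raise SystemExit(f"schemas could not be unified: {err}")

print(combined.schema)  # id promoted to double; "none" would have rejected this
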
2 changes: 1 addition & 1 deletion opteryx/functions/__init__.py
@@ -282,7 +282,7 @@ def select_values(boolean_arrays, value_arrays):
"VARCHAR": cast_varchar,
"STRING": cast_varchar,
"STR": cast_varchar,
"STRUCT": _iterate_single_parameter(lambda x: orjson.loads(str(x))),
"STRUCT": _iterate_single_parameter(lambda x: orjson.loads(str(x)) if x is not None else None),
"DATE": lambda x: compute.cast(x, pyarrow.date32()),
"BLOB": array_encode_utf8,
"TRY_TIMESTAMP": try_cast("TIMESTAMP"),
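
The added guard keeps None values out of orjson.loads; without it, str(None) would hand the text "None" to the JSON parser and fail. The guarded callable in isolation (illustrative only; _iterate_single_parameter, which maps the callable over an array, is stood in for by a plain comprehension):

import orjson


def cast_struct(value):
    # str(None) would be parsed as the literal text "None" and raise,
    # so None is passed through untouched.
    return orjson.loads(str(value)) if value is not None else None


print([cast_struct(v) for v in ['{"a": 1}', None, '{"b": [2, 3]}']])
# [{'a': 1}, None, {'b': [2, 3]}]
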
6 changes: 3 additions & 3 deletions opteryx/managers/schemes/mabel_partitions.py
@@ -133,12 +133,12 @@ def _inner(*, timestamp):
         if as_at is None:
             continue
         if is_complete_and_not_invalid(control_blobs, as_at):
-            blob_names = (blob for blob in blob_names if as_at in blob)
+            data_blobs = (blob for blob in data_blobs if as_at in blob)  # type: ignore
             break
-        blob_names = [blob for blob in blob_names if as_at not in blob]
+        data_blobs = [blob for blob in data_blobs if as_at not in blob]
         as_at = None

-    return blob_names
+    return data_blobs

 start_date = start_date or midnight
 end_date = end_date or midnight.replace(hour=23, minute=59)
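
The rename keeps the as_at selection reading consistently against data_blobs. The logic it implements: walk the candidate as_at frames, keep the first one that is complete and not invalidated, and drop any newer, unusable frames along the way. A simplified sketch under those assumptions (the validity check is passed in as a callable and the naming is illustrative, not the scheme's real API):

from typing import Callable, List


def select_frame(
    data_blobs: List[str],
    as_ats: List[str],
    frame_is_valid: Callable[[str], bool],
) -> List[str]:
    for as_at in sorted(as_ats, reverse=True):  # newest candidate first
        if frame_is_valid(as_at):
            # keep only the blobs belonging to the chosen frame
            return [blob for blob in data_blobs if as_at in blob]
        # the frame is incomplete or invalidated - discard it and keep looking
        data_blobs = [blob for blob in data_blobs if as_at not in blob]
    return data_blobs


blobs = ["t/as_at_0002/part0", "t/as_at_0001/part0"]
print(select_frame(blobs, ["as_at_0001", "as_at_0002"], lambda a: a == "as_at_0001"))
# ['t/as_at_0001/part0']
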
21 changes: 21 additions & 0 deletions opteryx/managers/serde/physical_plan.py
@@ -0,0 +1,21 @@
"""
{
    "fields":
        [
            {
                "name"
                "type"
            }
        ],
    "steps": {
        "id": <uuid>,
        "operator": "name",
        "columns": [],
        "config": {<variable>},
        "requires": [<uuid>]
    }
}
"""

from orso.types import OrsoTypes
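
The docstring sketches the intended serialisation shape, but the module is otherwise only a stub at this point. A hypothetical instance of that shape, purely for illustration: the uuids, operator names and config keys are invented, and steps are shown as a list here so that requires can point at an earlier step.

example_plan = {
    "fields": [
        {"name": "user", "type": "VARCHAR"},
        {"name": "age", "type": "INTEGER"},
    ],
    "steps": [
        {
            "id": "2f0c9a54-read",
            "operator": "read",
            "columns": ["user", "age"],
            "config": {"dataset": "users"},
            "requires": [],
        },
        {
            "id": "7d1e3b88-filter",
            "operator": "filter",
            "columns": ["user", "age"],
            "config": {"predicate": "age > 18"},
            "requires": ["2f0c9a54-read"],
        },
    ],
}
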
150 changes: 0 additions & 150 deletions opteryx/models/node.py

This file was deleted.

11 changes: 6 additions & 5 deletions opteryx/operators/__init__.py
@@ -15,13 +15,14 @@
 from .aggregate_and_group_node import AggregateAndGroupNode # Group is always followed by aggregate
 from .aggregate_node import AGGREGATORS
 from .aggregate_node import AggregateNode # aggregate data
-from .async_scanner_node import AsyncScannerNode
+from .async_read_node import AsyncReaderNode

 # from .build_statistics_node import BuildStatisticsNode # Analyze Tables
 from .cross_join_node import CrossJoinNode # CROSS JOIN
 from .distinct_node import DistinctNode # remove duplicate records
 from .exit_node import ExitNode
 from .explain_node import ExplainNode # EXPLAIN queries
+from .filter_node import FilterNode # filter unwanted rows
 from .function_dataset_node import FunctionDatasetNode # Dataset Constructors

 # from .heap_sort_node import HeapSortNode # Heap
@@ -30,12 +31,12 @@
 from .join_node import JoinNode
 from .left_join_node import LeftJoinNode
 from .limit_node import LimitNode # select the first N records
-from .metadata_writer_node import MetadataWriterNode
-from .morsel_defragment_node import MorselDefragmentNode # consolidate small morsels

+# from .metadata_writer_node import MetadataWriterNode
+# from .morsel_defragment_node import MorselDefragmentNode # consolidate small morsels
 from .noop_node import NoOpNode # No Operation
 from .projection_node import ProjectionNode # remove unwanted columns including renames
-from .scanner_node import ScannerNode
-from .selection_node import SelectionNode # filter unwanted rows
+from .read_node import ReaderNode
 from .set_variable_node import SetVariableNode
 from .show_columns_node import ShowColumnsNode # column details

2 changes: 1 addition & 1 deletion opteryx/operators/aggregate_and_group_node.py
@@ -91,7 +91,7 @@ def execute(self) -> Generator[pyarrow.Table, None, None]:
 # we're pretty sure we're going to use - this will fail for datasets
 # larger than memory
 table = pyarrow.concat_tables(
-    project(morsels.execute(), self.all_identifiers), promote_options="none"
+    project(morsels.execute(), self.all_identifiers), promote_options="permissive"
 )

 # Allow grouping by functions by evaluating them first
@@ -29,7 +29,7 @@
 import pyarrow.parquet
 from orso.schema import RelationSchema

-from opteryx.operators.scanner_node import ScannerNode
+from opteryx.operators.read_node import ReaderNode
 from opteryx.shared import AsyncMemoryPool
 from opteryx.shared import MemoryPool
 from opteryx.utils.file_decoders import get_decoder
@@ -95,7 +95,7 @@ async def fetch_and_process(blob_name):
     await session.close()


-class AsyncScannerNode(ScannerNode):
+class AsyncReaderNode(ReaderNode):

     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -122,7 +122,7 @@ def execute(self) -> Generator:
     prefix=reader.dataset,
 )

-data_queue = queue.Queue()
+data_queue: queue.Queue = queue.Queue()

 loop = asyncio.new_event_loop()
 threading.Thread(
@@ -176,6 +176,7 @@ def execute(self) -> Generator:
     self.statistics.add_message(f"failed to read {blob_name}")
     self.statistics.failed_reads += 1
     print(f"[READER] Cannot read blob {blob_name} due to {err}")
+    raise err

 if morsel:
     self.statistics.columns_read += morsel.num_columns
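
This hunk annotates data_queue, and the added raise err makes a failed blob read fatal rather than merely logged. The surrounding pattern - an asyncio producer running on its own event loop in a background thread, feeding a synchronous queue that execute() drains - can be sketched roughly as follows (the blob fetch is faked and all names are illustrative, not the operator's real code):

import asyncio
import queue
import threading

_END_OF_STREAM = object()


async def _produce(blob_names, data_queue: queue.Queue):
    for blob_name in blob_names:
        await asyncio.sleep(0)  # stand-in for an async blob fetch and decode
        data_queue.put((blob_name, b"payload"))
    data_queue.put(_END_OF_STREAM)


def read_blobs(blob_names):
    data_queue: queue.Queue = queue.Queue()
    loop = asyncio.new_event_loop()
    threading.Thread(
        target=loop.run_until_complete,
        args=(_produce(blob_names, data_queue),),
        daemon=True,
    ).start()
    while True:
        item = data_queue.get()
        if item is _END_OF_STREAM:
            break
        yield item


print(list(read_blobs(["a.parquet", "b.parquet"])))
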
