Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Jul 1, 2023
1 parent 2643def commit b9cff67
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 162 deletions.
42 changes: 24 additions & 18 deletions opteryx/components/binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,13 @@
constraints should be added for contextual access)
"""

"""
TODO:
- to bind the columns we need to pass them upwards, morsels have these columns with these types
with these aliases, this is the form of the data at this point. This includes when functions
and aggregates are run, this column has this type
- we then need to prune columns, once we're at the head of the execution tree, work our way
back down pruning columns (no point reading a column if it's not used)
- we need to do function type validation
- we need to do evaluation (e.g. a + b) type validation
"""

import copy
import re

from orso.logging import get_logger
from orso.schema import ConstantColumn
from orso.schema import FlatColumn

from opteryx.exceptions import AmbiguousIdentifierError
from opteryx.exceptions import ColumnNotFoundError
Expand All @@ -89,6 +81,7 @@
from opteryx.operators.aggregate_node import AGGREGATORS
from opteryx.samples import calculated

COMBINED_FUNCTIONS = {**FUNCTIONS, **AGGREGATORS}
CAMEL_TO_SNAKE = re.compile(r"(?<!^)(?=[A-Z])")
logger = get_logger()

Expand Down Expand Up @@ -150,22 +143,35 @@ def inner_binder(node, relations):

# add values to the node to indicate the source of this data
node.schema_column = column
if node_type == NodeType.FUNCTION:
if node_type in (NodeType.FUNCTION, NodeType.AGGREGATOR):
# we're just going to bind the function into the node
func = FUNCTIONS.get(node.value)
func = COMBINED_FUNCTIONS.get(node.value)
if not func:
# v1:
from opteryx.utils import fuzzy_search

suggest = fuzzy_search(node.value, FUNCTIONS.keys() + AGGREGATORS.keys())
suggest = fuzzy_search(node.value, COMBINED_FUNCTIONS.keys())
# v2: suggest = FUNCTIONS.suggest(node.value)
raise FunctionNotFoundError(function=node.value, suggestion=suggest)

# we need to add this new column to the schema
from opteryx.managers.expression import format_expression

schema_column = FlatColumn(format_expression(node), type=0)
relations["$calculated"].columns.append(schema_column)
node.function = func
if node_type == NodeType.AGGREGATOR:
# We don't suggest because if it wasn't found as an aggregation, it's
# assumed to be a function
agg = AGGREGATORS.get(node.value)
node.function = agg
node.derived_from = []
node.schema_column = schema_column

if node_type == NodeType.LITERAL:
unnamed_columns = len(
[c for c in relations["$calculated"].columns if isinstance(c, ConstantColumn)]
)
unnamed_name = f"unnamed_{unnamed_columns + 1}"
schema_column = ConstantColumn(unnamed_name, type=0, value=node.value)
relations["$calculated"].columns.append(schema_column)
node.schema_column = schema_column
node.query_column = unnamed_name

# Now recurse and do this again for all the sub parts of the evaluation plan
if node.left:
Expand Down
20 changes: 17 additions & 3 deletions opteryx/components/logical_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,22 @@ def inner_query_planner(ast_branch):
for relation in _relations:
inner_plan.add_edge(relation["step_id"], step_id)

# If there's no relations, use $no_table
if len(_relations) == 0:
step_id, sub_plan = create_node_relation(
{
"relation": {
"Table": {
"name": [{"value": "$no_table"}],
"args": None,
"alias": None,
"with_hints": [],
}
}
}
)
inner_plan += sub_plan

# selection
_selection = logical_planner_builders.build(ast_branch["Select"].get("selection"))
if _selection:
Expand All @@ -233,9 +249,7 @@ def inner_query_planner(ast_branch):

# aggregates
_projection = logical_planner_builders.build(ast_branch["Select"].get("projection")) or []
_aggregates = get_all_nodes_of_type(
_projection, select_nodes=(NodeType.AGGREGATOR, NodeType.COMPLEX_AGGREGATOR)
)
_aggregates = get_all_nodes_of_type(_projection, select_nodes=(NodeType.AGGREGATOR,))
if len(_aggregates) > 0:
# TODO: aggregates need: functions
aggregate_step = LogicalPlanNode(node_type=LogicalPlanStepType.Aggregate)
Expand Down
55 changes: 37 additions & 18 deletions opteryx/components/logical_planner_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import numpy
import pyarrow
from orso.types import OrsoTypes

from opteryx import functions
from opteryx import operators
Expand Down Expand Up @@ -48,23 +49,41 @@ def literal_number(branch, alias: list = None, key=None):
try:
# Try converting to int first
value = int(value)
return Node(
NodeType.LITERAL,
type=OrsoTypes.INTEGER,
value=numpy.int64(branch[0]), # value
alias=alias,
)
except ValueError:
# If int conversion fails, try converting to float
value = float(value)

return Node(
NodeType.LITERAL_NUMERIC,
value=numpy.float64(branch[0]), # value
alias=alias,
)
return Node(
NodeType.LITERAL,
type=OrsoTypes.DOUBLE,
value=numpy.float64(branch[0]), # value
alias=alias,
)


def literal_string(branch, alias: list = None, key=None):
"""create node for a string branch, this is either a data or a string"""
"""create node for a string branch, this is either a date or a string"""
dte_value = dates.parse_iso(branch)
if dte_value:
return Node(NodeType.LITERAL_TIMESTAMP, value=numpy.datetime64(dte_value), alias=alias)
return Node(NodeType.LITERAL_VARCHAR, value=branch, alias=alias)
if len(branch) <= 10:
return Node(
NodeType.LITERAL,
type=OrsoTypes.DATE,
value=numpy.datetime64(dte_value, "D"),
alias=alias,
)
return Node(
NodeType.LITERAL,
type=OrsoTypes.TIMESTAMP,
value=numpy.datetime64(dte_value, "us"),
alias=alias,
)
return Node(NodeType.LITERAL, type=OrsoTypes.VARCHAR, value=branch, alias=alias)


def literal_interval(branch, alias: list = None, key=None):
Expand Down Expand Up @@ -328,8 +347,8 @@ def map_access(branch, alias=None, key=None):
key = key_dict["SingleQuotedString"]
key_node = Node(NodeType.LITERAL_VARCHAR, value=key)
if "Number" in key_dict:
key = key_dict["Number"][0]
key_node = Node(NodeType.LITERAL_NUMERIC, value=key)
key = int(key_dict["Number"][0])
key_node = Node(NodeType.LITERAL_INTEGER, value=key)
alias.append(f"{identifier}[{key}]")

identifier_node = Node(NodeType.IDENTIFIER, value=field)
Expand All @@ -345,14 +364,14 @@ def unary_op(branch, alias=None, key=None):
if not isinstance(alias, list):
alias = [] if alias is None else [alias]
if branch["op"] == "Not":
right = build(branch["expr"])
return Node(node_type=NodeType.NOT, centre=right)
centre = build(branch["expr"])
return Node(node_type=NodeType.NOT, centre=centre)
if branch["op"] == "Minus":
number = 0 - numpy.float64(branch["expr"]["Value"]["Number"][0])
return Node(NodeType.LITERAL_NUMERIC, value=number, alias=alias)
node = literal_number(branch["expr"]["Value"]["Number"], alias=alias)
node.value = 0 - node.value
return node
if branch["op"] == "Plus":
number = numpy.float64(branch["expr"]["Value"]["Number"][0])
return Node(NodeType.LITERAL_NUMERIC, value=number, alias=alias)
return literal_number(branch["expr"]["Value"]["Number"], alias=alias)


def between(branch, alias=None, key=None):
Expand Down Expand Up @@ -564,7 +583,7 @@ def array_agg(branch, alias=None, key=None):
limit = int(build(branch["limit"]).value)

return Node(
node_type=NodeType.COMPLEX_AGGREGATOR,
node_type=NodeType.AGGREGATOR,
value="ARRAY_AGG",
expression=expression,
distinct=distinct,
Expand Down
3 changes: 2 additions & 1 deletion opteryx/connectors/capabilities/predicate_pushable.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ def _predicate_to_dnf(root):
if root.left.node_type != NodeType.IDENTIFIER:
raise NotSupportedError()
if root.left.node_type in (
NodeType.LITERAL_NUMERIC,
NodeType.LITERAL_FLOAT,
NodeType.LITERAL_INTEGER,
NodeType.LITERAL_VARCHAR,
):
# not all operands are universally supported
Expand Down
5 changes: 4 additions & 1 deletion opteryx/connectors/sample_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@

"""
The 'sample' connector provides readers for the internal sample datasets,
($planets, $astronauts, $satellites, and $no_table).
$planets, $astronauts, and $satellites.
- $no_table is used in queries where there is no relation specified 'SELECT 1'
- $calculated is used as a schema to align virtual columns to
"""

import typing
Expand Down
Loading

0 comments on commit b9cff67

Please sign in to comment.