Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Jun 29, 2023
1 parent 0e313ea commit 04e8157
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 10 deletions.
17 changes: 14 additions & 3 deletions opteryx/components/binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import re

from orso.logging import get_logger
from orso.tools import random_string

from opteryx.exceptions import AmbiguousIdentifierError
from opteryx.exceptions import ColumnNotFoundError
Expand Down Expand Up @@ -107,10 +106,10 @@ def inner_binder(node, relations):
# The relation hasn't been loaded in a FROM or JOIN statement
raise UnexpectedDatasetReferenceError(dataset=node.source)

column = found_source_relation.find_column(node.source_column)
column = found_source_relation.find_column(node.source_column.name)
if column is None:
# The column wasn't in the relation
candidates = found_source_relation.get_all_columns()
candidates = found_source_relation.all_column_names()
from opteryx.utils import fuzzy_search

suggestion = fuzzy_search(node.value, candidates)
Expand Down Expand Up @@ -181,6 +180,18 @@ def visit_unsupported(self, node, context):
logger.warning(f"No visit method implemented for node type {node.node_type.name}")
return node, context

def visit_exit(self, node, context):
# For each of the columns in the projection, identify the relation it
# will be taken from
if node is None:
pass

columns = []
for column in node.columns:
columns.append(inner_binder(column, context.get("schemas", {})))
node.columns = columns
return node, context

def visit_project(self, node, context):
# For each of the columns in the projection, identify the relation it
# will be taken from
Expand Down
9 changes: 9 additions & 0 deletions opteryx/components/logical_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class LogicalPlanStepType(int, Enum):
Limit = auto() # limit and offset
Order = auto() # order by
Distinct = auto()
Exit = auto()

CTE = auto()
Subquery = auto()
Expand Down Expand Up @@ -303,6 +304,14 @@ def inner_query_planner(ast_branch):
if previous_step_id is not None:
inner_plan.add_edge(previous_step_id, step_id)

# add the exit node
exit_node = LogicalPlanNode(node_type=LogicalPlanStepType.Exit)
exit_node.columns = _projection
previous_step_id, step_id = step_id, random_string()
inner_plan.add_node(step_id, exit_node)
if previous_step_id is not None:
inner_plan.add_edge(previous_step_id, step_id)

return inner_plan


Expand Down
5 changes: 5 additions & 0 deletions opteryx/components/temporary_physical_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
Gen 2 execution engine (a later piece of work)
"""

from orso.tools import random_string

from opteryx import operators
from opteryx.components.logical_planner import LogicalPlanStepType
from opteryx.models import ExecutionTree
Expand All @@ -38,6 +40,8 @@ def create_physical_plan(logical_plan):
node = operators.NoOpNode(query_properties, **node_config)
elif node_type == LogicalPlanStepType.Distinct:
node = operators.NoOpNode(query_properties, **node_config)
elif node_type == LogicalPlanStepType.Exit:
node = operators.ExitNode(query_properties, **node_config)
elif node_type == LogicalPlanStepType.Explain:
node = operators.NoOpNode(query_properties, **node_config)
elif node_type == LogicalPlanStepType.Fake:
Expand Down Expand Up @@ -68,6 +72,7 @@ def create_physical_plan(logical_plan):
node = operators.NoOpNode(query_properties, **node_config)
elif node_type == LogicalPlanStepType.Values:
node = operators.NoOpNode(query_properties, **node_config)

else:
raise Exception(f"something unexpected happed - {node_type.name}")

Expand Down
12 changes: 5 additions & 7 deletions opteryx/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
https://www.python.org/dev/peps/pep-0249/
"""
import datetime
import os
import time
import typing
from dataclasses import dataclass
Expand All @@ -33,7 +32,6 @@
from opteryx.exceptions import CursorInvalidStateError
from opteryx.exceptions import MissingSqlStatement
from opteryx.exceptions import PermissionsError
from opteryx.exceptions import ProgrammingError
from opteryx.managers.kvstores import BaseKeyValueStore
from opteryx.shared import QueryStatistics
from opteryx.shared.variables import SystemVariables
Expand Down Expand Up @@ -89,9 +87,9 @@ def __init__(
permissions = PERMISSIONS
permissions = set(permissions)
if permissions.intersection(PERMISSIONS) == set():
raise ProgrammingError("No valid permissions presented.")
raise PermissionsError("No valid permissions presented.")
if not permissions.issubset(PERMISSIONS):
raise ProgrammingError(
raise PermissionsError(
f"Invalid permissions presented - {PERMISSIONS.difference(permissions)}"
)
self.permissions = permissions
Expand Down Expand Up @@ -134,6 +132,8 @@ def id(self):
return self._qid

def _inner_execute(self, operation, params=None):
from opteryx.components import query_planner

if not operation:
raise MissingSqlStatement("SQL statement not found")

Expand All @@ -142,8 +142,6 @@ def _inner_execute(self, operation, params=None):

self._connection.context.history.append((operation, True, datetime.datetime.utcnow()))

from opteryx.components import query_planner

plans = query_planner(operation=operation, parameters=params, connection=self._connection)

results = None
Expand All @@ -156,7 +154,7 @@ def _inner_execute(self, operation, params=None):
True if i == 1 else value
for i, value in enumerate(self._connection.context.history[-1])
)
return utils.arrow.rename_columns(results)
return results

def execute(self, operation, params=None):
results = self._inner_execute(operation, params)
Expand Down
1 change: 1 addition & 0 deletions opteryx/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .column_filter_node import ColumnFilterNode # filter for SHOW COLUMNS
from .cross_join_node import CrossJoinNode # CROSS JOIN
from .distinct_node import DistinctNode # remove duplicate records
from .exit_node import ExitNode
from .explain_node import ExplainNode # EXPLAIN queries
from .file_reader_node import FileReaderNode # usually on the CLI
from .function_dataset_node import FunctionDatasetNode # Dataset Constructors
Expand Down
62 changes: 62 additions & 0 deletions opteryx/operators/exit_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Exit Node
This is a SQL Query Execution Plan Node.
This does the final preparation before returning results to users.
This does two things that the projection node doesn't do:
- renames columns from the internal names
- removes all columns not being returned to the user
This node doesn't do any calculations, it is a pure Projection.
"""
from typing import Iterable

from opteryx.exceptions import SqlError
from opteryx.models import QueryProperties
from opteryx.operators import BasePlanNode


class ExitNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **config):
super().__init__(properties=properties)
self.columns = config["columns"]

@property
def config(self): # pragma: no cover
return None

@property
def name(self): # pragma: no cover
return "Exit"

def execute(self) -> Iterable:
if len(self._producers) != 1: # pragma: no cover
raise SqlError(f"{self.name} expects a single producer")

morsels = self._producers[0] # type:ignore

final_columns = []
final_names = []
for column in self.columns:
final_columns.append(str(column.source_column))
final_names.append(column.query_column)

for morsel in morsels.execute():
morsel = morsel.select(final_columns)
morsel = morsel.rename_columns(final_names)

yield morsel

0 comments on commit 04e8157

Please sign in to comment.