Skip to content

Commit

Permalink
Merge pull request #249 from mabel-dev/QUALITY
Browse files Browse the repository at this point in the history
Quality
  • Loading branch information
joocer authored Jul 3, 2022
2 parents 9bba32c + 5badaf8 commit 69c44fc
Show file tree
Hide file tree
Showing 32 changed files with 167 additions and 113 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/static_analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,4 @@ jobs:
pip install -r $GITHUB_WORKSPACE/tests/requirements.txt
- name: Execute Test
run: pylint --fail-under=0 --load-plugins perflint opteryx
run: pylint --fail-under=7.5 --load-plugins perflint opteryx
13 changes: 13 additions & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[MASTER]

# Add files or directories to the blacklist. They should be base names, not
# paths.
ignore=third_party

[MESSAGES CONTROL]

# Disable the message, report, category or checker with the given id(s). You
# can either give multiple identifiers separated by comma (,) or put this
# option multiple times (only on the command line, not in the configuration
# file where it should appear only once).
disable=wrong-import-position, no-name-in-module, line-too-long, no-member
3 changes: 3 additions & 0 deletions docs/Release Notes/Change Log.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### [Unreleased]

**Added**
- [[#232](https://github.com/mabel-dev/opteryx/issues/232)] Support `DATEPART` and `EXTRACT` date functions. ([@joocer](https://github.com/joocer))

### [0.1.0] - 2022-07-02

**Added**
Expand Down
2 changes: 1 addition & 1 deletion docs/SQL Reference/06 Functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Function | Description | Example

Recognized date parts and periods and support across various functions:

Part | `DATE_TRUNC` | `EXTRACT` | Notes
Part | DATE_TRUNC | EXTRACT | Notes
-------- | ------------------------- | ------------------------- | -------------
second | :fontawesome-solid-check: | :fontawesome-solid-check: |
minute | :fontawesome-solid-check: | :fontawesome-solid-check: |
Expand Down
33 changes: 22 additions & 11 deletions opteryx/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,20 @@
stores, it is not compliant with the standard:
https://www.python.org/dev/peps/pep-0249/
"""
import datetime
import time
from typing import Dict, List, Optional, Tuple, Union

from opteryx.engine.planner import QueryPlanner
from opteryx.engine import QueryStatistics
from opteryx.exceptions import CursorInvalidStateError, ProgrammingError, SqlError
from opteryx.storage import BaseBufferCache, BasePartitionScheme, BaseStorageAdapter
from opteryx.storage.adapters import DiskStorage
from opteryx.storage.schemes import DefaultPartitionScheme, MabelPartitionScheme
from opteryx.utils import arrow

CURSOR_NOT_RUN = "Cursor must be in an executed state"


class Connection:
"""
Expand Down Expand Up @@ -58,9 +62,11 @@ def __init__(
self._kwargs = kwargs

def cursor(self):
"""return a cursor object"""
return Cursor(self)

def close(self):
"""exists for interface compatibility only"""
pass


Expand All @@ -77,7 +83,6 @@ def _format_prepared_param(self, param):
"""
Formats parameters to be passed to a Query.
"""
import datetime
from decimal import Decimal

if param is None:
Expand Down Expand Up @@ -105,32 +110,32 @@ def _format_prepared_param(self, param):
if isinstance(param, (list, tuple, set)):
return f"({','.join(map(self._format_prepared_param, param))})"

raise Exception(f"Query parameter of type '{type(param)}' is not supported.")
raise SqlError(f"Query parameter of type '{type(param)}' is not supported.")

def execute(self, operation, params=None):
if self._query is not None:
raise Exception("Cursor can only be executed once")
raise CursorInvalidStateError("Cursor can only be executed once")

self._stats.start_time = time.time_ns()

if params:
if not isinstance(params, (list, tuple)):
raise Exception(
raise ProgrammingError(
"params must be a list or tuple containing the query parameter values"
)

for param in params:
if operation.find("%s") == -1:
# we have too few placeholders
raise Exception(
raise ProgrammingError(
"Number of placeholders and number of parameters must match."
)
operation = operation.replace(
"%s", self._format_prepared_param(param), 1
)
if operation.find("%s") != -1:
# we have too many placeholders
raise Exception(
raise ProgrammingError(
"Number of placeholders and number of parameters must match."
)

Expand All @@ -152,40 +157,46 @@ def execute(self, operation, params=None):
@property
def rowcount(self):
if self._results is None:
raise Exception("Cursor must be executed first")
raise CursorInvalidStateError(CURSOR_NOT_RUN)
return self._results.count()

@property
def shape(self):
if self._results is None:
raise Exception("Cursor must be executed first")
raise CursorInvalidStateError(CURSOR_NOT_RUN)
return self._results.shape

@property
def stats(self):
"""execution statistics"""
self._stats.end_time = time.time_ns()
return self._stats.as_dict()

def warnings(self):
"""list of run-time warnings"""
return self._stats._warnings

def fetchone(self) -> Optional[Dict]:
"""fetch one record only"""
if self._results is None:
raise Exception("Cursor must be executed first")
raise CursorInvalidStateError(CURSOR_NOT_RUN)
return arrow.fetchone(self._results)

def fetchmany(self, size=None) -> List[Dict]:
"""fetch a given number of records"""
fetch_size = self.arraysize if size is None else size
if self._results is None:
raise Exception("Cursor must be executed first")
raise CursorInvalidStateError(CURSOR_NOT_RUN)
return arrow.fetchmany(self._results, fetch_size)

def fetchall(self) -> List[Dict]:
"""fetch all matching records"""
if self._results is None:
raise Exception("Cursor must be executed first")
raise CursorInvalidStateError(CURSOR_NOT_RUN)
return arrow.fetchall(self._results)

def close(self):
"""close the connection"""
self._connection.close()

def __repr__(self): # pragma: no cover
Expand Down
2 changes: 1 addition & 1 deletion opteryx/engine/functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def get_len(obj):
"DATE_TRUNC": _iterate_double_parameter_field_second(date_trunc),
"DATEPART": date_functions.date_part,
"NOW": _iterate_no_parameters(datetime.datetime.utcnow),
"TODAY": _iterate_no_parameters(datetime.date.today),
"TODAY": _iterate_no_parameters(datetime.datetime.utcnow().date),
"TIME": _iterate_no_parameters(date_functions.get_time),
"YESTERDAY": _iterate_no_parameters(date_functions.get_yesterday),
"DATE": _iterate_single_parameter(date_functions.get_date),
Expand Down
45 changes: 26 additions & 19 deletions opteryx/engine/functions/date_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,48 +9,55 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import numpy

from pyarrow import compute

import datetime
import numpy
from opteryx.exceptions import SqlError

from opteryx.utils.dates import parse_iso


def get_time():
# doing this inline means the utcnow() function is called once and time is then
# called for each row - meaning it's not 'now', it's 'when did you start'
"""
Get the current time
"""
return datetime.datetime.utcnow().time()


def get_yesterday():
return datetime.date.today() - datetime.timedelta(days=1)
"""
calculate yesterday
"""
return datetime.datetime.utcnow().date() - datetime.timedelta(days=1)


def get_date(input):
def get_date(timestamp):
"""
Convert input to a datetime object and extract the Date part
"""
# if it's a string, parse it (to a datetime)
if isinstance(input, str):
input = parse_iso(input)
if isinstance(timestamp, str):
timestamp = parse_iso(timestamp)
# if it's a numpy datetime, convert it to a date
if isinstance(input, (numpy.datetime64)):
input = input.astype("M8[D]").astype("O")
if isinstance(timestamp, (numpy.datetime64)):
timestamp = timestamp.astype("M8[D]").astype("O")
# if it's a datetime, convert it to a date
if isinstance(input, datetime.datetime):
input = input.date()
if isinstance(timestamp, datetime.datetime):
timestamp = timestamp.date()
# set it to midnight that day to make it a datetime
if isinstance(input, datetime.date):
return datetime.datetime.combine(input, datetime.datetime.min.time())
# even though we're getting the date, the supported column type is datetime
if isinstance(timestamp, datetime.date):
return datetime.datetime.combine(timestamp, datetime.datetime.min.time())
return None


def date_part(interval, arr):
"""
Also the EXTRACT function - we extract a given part from an array of dates
"""

EXTRACTOR = {
extractors = {
"microsecond": compute.microsecond,
"second": compute.second,
"minute": compute.minute,
Expand All @@ -65,7 +72,7 @@ def date_part(interval, arr):
}

interval = interval.lower()
if interval in EXTRACTOR:
return EXTRACTOR[interval](arr)
if interval in extractors:
return extractors[interval](arr)

raise SqlError(f"Unsupported date part `{interval}`")
raise SqlError(f"Unsupported date part `{interval}`") # pragma: no cover
16 changes: 8 additions & 8 deletions opteryx/engine/planner/operations/aggregate_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
But, on high cardinality data (nearly unique columns), the performance is much faster,
on a 10m record set, timings are 1:400 (50s:1220s where 20s is the read time).
"""
from asyncio import create_subprocess_shell
from typing import Iterable, List

import numpy as np
Expand All @@ -39,6 +38,8 @@
from opteryx.exceptions import SqlError
from opteryx.utils.columns import Columns

COUNT_STAR: str = "COUNT(*)"

# these functions can be applied to each group
INCREMENTAL_AGGREGATES = {
"MIN": min,
Expand Down Expand Up @@ -98,8 +99,7 @@ class AggregateNode(BasePlanNode):
def __init__(
self, directives: QueryDirectives, statistics: QueryStatistics, **config
):

from opteryx.engine.attribute_types import TOKEN_TYPES
super().__init__(directives=directives, statistics=statistics)

self._positions = []
self._aggregates = []
Expand Down Expand Up @@ -168,7 +168,7 @@ def _count_star(self, data_pages):
count = 0
for page in data_pages.execute():
count += page.num_rows
table = pyarrow.Table.from_pylist([{"COUNT(*)": count}])
table = pyarrow.Table.from_pylist([{COUNT_STAR: count}])
table = Columns.create_table_metadata(
table=table,
expected_rows=1,
Expand Down Expand Up @@ -247,10 +247,10 @@ def execute(self) -> Iterable:

# Add the responses to the collector if it's COUNT(*)
if column_name == "COUNT(Wildcard)":
if "COUNT(*)" in group_collector:
group_collector["COUNT(*)"] += 1
if COUNT_STAR in group_collector:
group_collector[COUNT_STAR] += 1
else:
group_collector["COUNT(*)"] = 1
group_collector[COUNT_STAR] = 1
elif function == "COUNT":
if value:
if column_name in group_collector:
Expand Down Expand Up @@ -280,7 +280,7 @@ def execute(self) -> Iterable:
# count should return 0 rather than nothing
if len(collector) == 0 and len(self._aggregates) == 1:
if self._aggregates[0]["aggregate"] == "COUNT":
collector = {(): {"COUNT(*)": 0}}
collector = {(): {COUNT_STAR: 0}}

buffer: List = []
metadata = None
Expand Down
1 change: 1 addition & 0 deletions opteryx/engine/planner/operations/cross_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ class CrossJoinNode(BasePlanNode):
def __init__(
self, directives: QueryDirectives, statistics: QueryStatistics, **config
):
super().__init__(directives=directives, statistics=statistics)
self._right_table = config.get("right_table")
self._join_type = config.get("join_type", "CrossJoin")

Expand Down
2 changes: 1 addition & 1 deletion opteryx/engine/planner/operations/dataset_reader_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def __init__(
The Dataset Reader Node is responsible for reading the relevant blobs
and returning a Table/Relation.
"""
super().__init__(directives=directives, statistics=statistics, **config)
super().__init__(directives=directives, statistics=statistics)

from opteryx.engine.planner.planner import QueryPlanner

Expand Down
1 change: 1 addition & 0 deletions opteryx/engine/planner/operations/distinct_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class DistinctNode(BasePlanNode):
def __init__(
self, directives: QueryDirectives, statistics: QueryStatistics, **config
):
super().__init__(directives=directives, statistics=statistics)
self._distinct = config.get("distinct", True)

@property
Expand Down
1 change: 1 addition & 0 deletions opteryx/engine/planner/operations/evaluation_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class EvaluationNode(BasePlanNode):
def __init__(
self, directives: QueryDirectives, statistics: QueryStatistics, **config
):
super().__init__(directives=directives, statistics=statistics)
projection = config.get("projection", [])
self.functions = [c for c in projection if "function" in c]
self.aliases: list = []
Expand Down
1 change: 1 addition & 0 deletions opteryx/engine/planner/operations/explain_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class ExplainNode(BasePlanNode):
def __init__(
self, directives: QueryDirectives, statistics: QueryStatistics, **config
):
super().__init__(directives=directives, statistics=statistics)
self._query_plan = config.get("query_plan")

@property
Expand Down
1 change: 1 addition & 0 deletions opteryx/engine/planner/operations/function_dataset_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def __init__(
The Blob Reader Node is responsible for reading the relevant blobs
and returning a Table/Relation.
"""
super().__init__(directives=directives, statistics=statistics)
self._statistics = statistics
self._alias = config["alias"]
self._function = config["dataset"]["function"]
Expand Down
1 change: 1 addition & 0 deletions opteryx/engine/planner/operations/inner_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class InnerJoinNode(BasePlanNode):
def __init__(
self, directives: QueryDirectives, statistics: QueryStatistics, **config
):
super().__init__(directives=directives, statistics=statistics)
self._right_table = config.get("right_table")
self._join_type = config.get("join_type", "CrossJoin")
self._on = config.get("join_on")
Expand Down
2 changes: 1 addition & 1 deletion opteryx/engine/planner/operations/internal_dataset_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def __init__(
The Blob Reader Node is responsible for reading the relevant blobs
and returning a Table/Relation.
"""
super().__init__(directives=directives, statistics=statistics, **config)
super().__init__(directives=directives, statistics=statistics)

self._statistics = statistics
self._alias = config["alias"]
Expand Down
1 change: 1 addition & 0 deletions opteryx/engine/planner/operations/limit_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class LimitNode(BasePlanNode):
def __init__(
self, directives: QueryDirectives, statistics: QueryStatistics, **config
):
super().__init__(directives=directives, statistics=statistics)
self._limit = config.get("limit")

@property
Expand Down
Loading

0 comments on commit 69c44fc

Please sign in to comment.