Merge pull request #250 from mabel-dev/FEATURE/#35
Feature/#35
joocer authored Jul 3, 2022
2 parents 69c44fc + 2c9a36a commit 87ab43d
Showing 7 changed files with 175 additions and 128 deletions.
5 changes: 5 additions & 0 deletions docs/Release Notes/Change Log.md
@@ -8,6 +8,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
**Added**
- [[#232](https://github.com/mabel-dev/opteryx/issues/232)] Support `DATEPART` and `EXTRACT` date functions. ([@joocer](https://github.com/joocer))

**Changed**
- [[#35](https://github.com/mabel-dev/opteryx/issues/35)] Table scan planning done during query planning. ([@joocer](https://github.com/joocer))
- [[#173](https://github.com/mabel-dev/opteryx/issues/173)] Data not found raises different errors under different scenarios. ([@joocer](https://github.com/joocer))


### [0.1.0] - 2022-07-02

**Added**
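The `DATEPART` and `EXTRACT` entries in the change log above are SQL-level date functions; a hedged sketch of how they might be exercised is shown below. The `$astronauts` dataset and `birth_date` column come from the test battery later in this diff, while the `opteryx.connect()` cursor flow and the string form of the `DATEPART` unit argument are assumptions, not confirmed API.

```python
import opteryx

# Illustrative only: exercise the date functions added in [#232].
# EXTRACT(<part> FROM <date>) and DATEPART(<part>, <date>) are alternative spellings.
conn = opteryx.connect()
cur = conn.cursor()

cur.execute("SELECT EXTRACT(YEAR FROM birth_date) FROM $astronauts")
print(cur.fetchone())

cur.execute("SELECT DATEPART('year', birth_date) FROM $astronauts")  # unit-as-string is an assumption
print(cur.fetchone())
```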
4 changes: 4 additions & 0 deletions opteryx/engine/functions/date_functions.py
@@ -71,6 +71,10 @@ def date_part(interval, arr):
"year": compute.year,
}

# if we get a date literal
if not hasattr(arr, "__iter__"):
arr = numpy.array([arr])

interval = interval.lower()
if interval in extractors:
return extractors[interval](arr)
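The guard added above wraps a bare date literal so the extractors always receive an array, which is what the new `SELECT EXTRACT(YEAR FROM '2022-02-02')` case in the test battery below relies on. A minimal sketch of the behaviour, assuming `pyarrow.compute.year` accepts a one-element numpy datetime array:

```python
import numpy
from pyarrow import compute

value = numpy.datetime64("2022-02-02")  # a scalar literal, not an array

# a scalar has no __iter__, so wrap it before calling the extractor,
# mirroring the guard added in date_part()
if not hasattr(value, "__iter__"):
    value = numpy.array([value])

print(compute.year(value))  # expected: a one-element Int64 array holding 2022
```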
224 changes: 114 additions & 110 deletions opteryx/engine/planner/operations/dataset_reader_node.py
@@ -28,16 +28,15 @@
- As one blob is read, the next is immediately cached for reading
"""
import datetime
import time

from enum import Enum
from typing import Iterable
from cityhash import CityHash64

import time

from opteryx.engine import QueryDirectives, QueryStatistics
from opteryx.engine.planner.operations import BasePlanNode
from opteryx.exceptions import DatasetNotFoundError
from opteryx.exceptions import DatabaseError
from opteryx.storage import file_decoders
from opteryx.storage.adapters import DiskStorage
from opteryx.storage.schemes import MabelPartitionScheme
@@ -116,11 +115,12 @@ def __init__(
self._start_date = config.get("start_date", today)
self._end_date = config.get("end_date", today)

# pushed down projection
self._projection = config.get("projection")
# pushed down selection
self._selection = config.get("selection")

# scan
self._reading_list = self._scanner()

@property
def config(self): # pragma: no cover
use_cache = ""
@@ -153,74 +153,6 @@ def execute(self) -> Iterable:

return

# datasets from storage
partitions = self._reader.get_partitions(
dataset=self._dataset,
partitioning=self._partition_scheme.partition_format(),
start_date=self._start_date,
end_date=self._end_date,
)

self._statistics.partitions_found += len(partitions)

partition_structure: dict = {}
expected_rows = 0

# Build the list of blobs we're going to read and collect summary statistics
# so we can use them for decisions later.

if len(partitions) == 0:
raise DatasetNotFoundError("Dataset was not found.")

for partition in partitions:

partition_structure[partition] = {}
partition_structure[partition]["blob_list"] = []
self._statistics.partitions_scanned += 1

# Get a list of all of the blobs in the partition.
time_scanning_partitions = time.time_ns()
blob_list = self._reader.get_blob_list(partition)
self._statistics.time_scanning_partitions = (
time.time_ns() - time_scanning_partitions
)

# remove folders, that's items ending with '/'
blob_list = [blob for blob in blob_list if not blob.endswith("/")]

# Track how many blobs we found
count_blobs_found = len(blob_list)
self._statistics.count_blobs_found += count_blobs_found

# Filter the blob list to just the frame we're interested in
if self._partition_scheme is not None:
blob_list = self._partition_scheme.filter_blobs(
blob_list, self._statistics
)
self._statistics.count_blobs_ignored_frames += count_blobs_found - len(
blob_list
)

for blob_name in blob_list:

# the blob filename extension
extension = blob_name.split(".")[-1]

# find out how to read this blob
decoder, file_type = KNOWN_EXTENSIONS.get(extension, (None, None))

if file_type == ExtentionType.DATA:
partition_structure[partition]["blob_list"].append(
(
blob_name,
decoder,
)
)
elif file_type == ExtentionType.CONTROL:
self._statistics.count_control_blobs_found += 1
else:
self._statistics.count_unknown_blob_type_found += 1

metadata = None
schema = None

@@ -236,53 +168,21 @@ def execute(self) -> Iterable:
if not metadata:
plasma_channel = None

for partition in partitions:
for partition in self._reading_list.values():

# we're reading this partition now
if len(blob_list) > 0:
self._statistics.partitions_read += 1

def _read_and_parse(config):
path, reader, parser, cache = config

# print(f"start {path}")

start_read = time.time_ns()

# if we have a cache set
if cache:
# hash the blob name for the look up
blob_hash = format(CityHash64(path), "X")
# try to read the cache
blob_bytes = cache.get(blob_hash)

# if the item was a miss, get it from storage and add it to the cache
if blob_bytes is None:
self._statistics.cache_misses += 1
blob_bytes = reader(path)
cache.set(blob_hash, blob_bytes)
else:
self._statistics.cache_hits += 1
else:
blob_bytes = reader(path)

table = parser(blob_bytes, None)

# print(f"read {path} - {(time.time_ns() - start_read) / 1e9}")

time_to_read = time.time_ns() - start_read
return time_to_read, blob_bytes.getbuffer().nbytes, table, path
self._statistics.partitions_read += 1

for (
time_to_read,
blob_bytes,
pyarrow_blob,
path,
) in multiprocessor.processed_reader(
_read_and_parse,
self._read_and_parse,
[
(path, self._reader.read_blob, parser, self._cache)
for path, parser in partition_structure[partition]["blob_list"]
for path, parser in partition["blob_list"]
],
plasma_channel,
):
@@ -301,7 +201,7 @@ def _read_and_parse(config):
if metadata is None:
pyarrow_blob = Columns.create_table_metadata(
table=pyarrow_blob,
expected_rows=expected_rows,
expected_rows=0,
name=self._dataset.replace("/", ".")[:-1],
table_aliases=[self._alias],
)
@@ -326,3 +226,107 @@ def _read_and_parse(config):

# yield this blob
yield pyarrow_blob

def _read_and_parse(self, config):
path, reader, parser, cache = config
start_read = time.time_ns()

# if we have a cache set
if cache:
# hash the blob name for the look up
blob_hash = format(CityHash64(path), "X")
# try to read the cache
blob_bytes = cache.get(blob_hash)

# if the item was a miss, get it from storage and add it to the cache
if blob_bytes is None:
self._statistics.cache_misses += 1
blob_bytes = reader(path)
cache.set(blob_hash, blob_bytes)
else:
self._statistics.cache_hits += 1
else:
blob_bytes = reader(path)

table = parser(blob_bytes, None)

# print(f"read {path} - {(time.time_ns() - start_read) / 1e9}")

time_to_read = time.time_ns() - start_read
return time_to_read, blob_bytes.getbuffer().nbytes, table, path

def _scanner(self):
"""
The scanner works out what blobs/files should be read
"""
# datasets from storage
partitions = self._reader.get_partitions(
dataset=self._dataset,
partitioning=self._partition_scheme.partition_format(),
start_date=self._start_date,
end_date=self._end_date,
)

self._statistics.partitions_found += len(partitions)

partition_structure: dict = {}

# Build the list of blobs we're going to read and collect summary statistics
# so we can use them for decisions later.

for partition in partitions:

partition_structure[partition] = {}
partition_structure[partition]["blob_list"] = []
self._statistics.partitions_scanned += 1

# Get a list of all of the blobs in the partition.
time_scanning_partitions = time.time_ns()
blob_list = self._reader.get_blob_list(partition)
self._statistics.time_scanning_partitions = (
time.time_ns() - time_scanning_partitions
)

# remove folders, that's items ending with '/'
blob_list = [blob for blob in blob_list if not blob.endswith("/")]

# Track how many blobs we found
count_blobs_found = len(blob_list)
self._statistics.count_blobs_found += count_blobs_found

# Filter the blob list to just the frame we're interested in
if self._partition_scheme is not None:
blob_list = self._partition_scheme.filter_blobs(
blob_list, self._statistics
)
self._statistics.count_blobs_ignored_frames += count_blobs_found - len(
blob_list
)

for blob_name in blob_list:

# the blob filename extension
extension = blob_name.split(".")[-1]

# find out how to read this blob
decoder, file_type = KNOWN_EXTENSIONS.get(extension, (None, None))

if file_type == ExtentionType.DATA:
partition_structure[partition]["blob_list"].append(
(
blob_name,
decoder,
)
)
elif file_type == ExtentionType.CONTROL:
self._statistics.count_control_blobs_found += 1
else:
self._statistics.count_unknown_blob_type_found += 1

if len(partition_structure[partition]["blob_list"]) == 0:
partition_structure.pop(partition)

if len(partition_structure) == 0:
raise DatabaseError("No blobs found that match the requested dataset.")

return partition_structure
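The `_read_and_parse` method above keys its blob cache on a CityHash of the blob path. A standalone sketch of that look-up pattern follows; the `reader` callable and the dict-backed cache are stand-ins for the node's storage adapter and configured cache, introduced here purely for illustration.

```python
from cityhash import CityHash64

class DictCache:
    """Minimal in-memory cache with the get/set shape assumed by the node."""
    def __init__(self):
        self._store = {}
    def get(self, key):
        return self._store.get(key)
    def set(self, key, value):
        self._store[key] = value

def cached_read(path, reader, cache):
    """Sketch of the blob-cache pattern in _read_and_parse (illustrative, not the node's API)."""
    key = format(CityHash64(path), "X")  # hex-encode the 64-bit hash of the blob path
    payload = cache.get(key)             # a miss returns None
    if payload is None:
        payload = reader(path)           # fall back to storage on a miss
        cache.set(key, payload)
    return payload

# usage: cached_read("partition/blob.parquet", my_storage_reader, DictCache())
```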
2 changes: 1 addition & 1 deletion opteryx/engine/planner/operations/selection_node.py
@@ -112,7 +112,7 @@ def _evaluate(predicate: Union[tuple, list], table: Table):
arg_list.append(arg[0])

if len(arg_list) == 0:
arg_list = (table.num_rows,)
arg_list = (table.num_rows,) # type: ignore

calculated_values = FUNCTIONS[function["function"]](*arg_list)
if isinstance(calculated_values, (pyarrow.lib.StringScalar)):
5 changes: 2 additions & 3 deletions opteryx/utils/arrow.py
@@ -15,13 +15,12 @@
"""

from typing import Iterable, List
from pyarrow import Table


def fetchmany(pages, limit: int = 1000):
"""fetch records from a Table as Python Dicts"""

from pyarrow import Table
from opteryx.utils.columns import Columns
from opteryx.utils.columns import Columns  # circular imports

if pages is None:
return []
15 changes: 1 addition & 14 deletions tests/sql_battery/test_battery_shape.py
@@ -259,22 +259,9 @@
("SELECT * FROM tests.data.dated WITH (NOCACHE) FOR '2020-02-03'", 25, 8),
("SELECT * FROM tests.data.dated FOR '2020-02-03'", 25, 8),
("SELECT * FROM tests.data.dated FOR '2020-02-04'", 25, 8),
("SELECT * FROM tests.data.dated FOR '2020-02-05'", 0, 0),
("SELECT * FROM tests.data.dated FOR DATES BETWEEN '2020-02-01' AND '2020-02-28'", 50, 8),
("SELECT * FROM tests.data.dated FOR DATES BETWEEN YESTERDAY AND TODAY", 0, 0),
("SELECT * FROM tests.data.dated FOR TODAY", 0, 0),
("SELECT * FROM tests.data.dated FOR Today", 0, 0),
("SELECT * FROM tests.data.dated FOR today", 0, 0),
("SELECT * FROM tests.data.dated FOR TODAY", 0, 0),
("SELECT * FROM tests.data.dated FOR\nTODAY", 0, 0),
("SELECT * FROM tests.data.dated FOR\tTODAY", 0, 0),
("SELECT * FROM tests.data.dated FOR YESTERDAY", 0, 0),
("SELECT * FROM tests.data.dated FOR '2020-02-03' OFFSET 1", 24, 8),
("SELECT * FROM tests.data.dated FOR DATES BETWEEN '2020-02-01' AND '2020-02-28' OFFSET 1", 49, 8),
("SELECT * FROM tests.data.dated FOR DATES IN LAST_MONTH", 0, 0),
("SELECT * FROM tests.data.dated FOR DATES IN THIS_MONTH", 0, 0),
("SELECT * FROM tests.data.dated FOR DATES IN PREVIOUS_MONTH", 0, 0),
("SELECT * FROM tests.data.dated FOR YESTERDAY OFFSET 1", 0, 0),
("SELECT * FROM $satellites FOR YESTERDAY ORDER BY planetId OFFSET 10", 167, 8),

("SELECT * FROM tests.data.segmented FOR '2020-02-03'", 25, 8),
@@ -374,6 +361,7 @@
("SELECT EXTRACT(DOY FROM birth_date) FROM $astronauts", 357, 1),
("SELECT EXTRACT(dow FROM birth_date) FROM $astronauts", 357, 1),
("SELECT EXTRACT(DOW FROM birth_date) FROM $astronauts", 357, 1),
("SELECT EXTRACT(YEAR FROM '2022-02-02')", 1, 1),

# These are queries which have been found to return the wrong result or not run correctly
# FILTERING ON FUNCTIONS
@@ -399,7 +387,6 @@
# FRAME HANDLING
("SELECT * FROM tests.data.framed FOR '2021-03-28'", 100000, 1),
("SELECT * FROM tests.data.framed FOR '2021-03-29'", 100000, 1),
("SELECT * FROM tests.data.framed FOR '2021-03-30'", 0, 0),
("SELECT * FROM tests.data.framed FOR DATES BETWEEN '2021-03-28' AND '2021-03-29", 200000, 1),
("SELECT * FROM tests.data.framed FOR DATES BETWEEN '2021-03-29' AND '2021-03-30", 100000, 1),
("SELECT * FROM tests.data.framed FOR DATES BETWEEN '2021-03-28' AND '2021-03-30", 200000, 1),