Commit
Merge pull request #1536 from mabel-dev/#1535
joocer authored Mar 21, 2024
2 parents f0cd243 + 6cd428b commit 2bb65a9
Showing 15 changed files with 373 additions and 12 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -172,3 +172,4 @@ version_2/**
planets.duckdb
charts.csv
testdata/tpch/**/**.parquet
**.cpp
2 changes: 1 addition & 1 deletion opteryx/__version__.py
@@ -1,4 +1,4 @@
__build__ = 382
__build__ = 384

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
3 changes: 3 additions & 0 deletions opteryx/compiled/functions/__init__.py
@@ -3,3 +3,6 @@
from hash_table import HashTable
from hash_table import distinct
from ip_address import ip_in_cidr
from varchar_array import array_encode_utf8
from varchar_array import pack_byte_array
from varchar_array import unpack_byte_array
4 changes: 2 additions & 2 deletions opteryx/compiled/functions/hash_table.pyx
@@ -52,10 +52,10 @@ cdef class HashSet:
    def __cinit__(self):
        self.c_set = unordered_set[int64_t]()

    cdef insert(self, int64_t value):
    def insert(self, int64_t value):
        self.c_set.insert(value)

    cdef bint contains(self, int64_t value):
    def contains(self, int64_t value):
        return self.c_set.find(value) != self.c_set.end()


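With `insert` and `contains` now declared `def` rather than `cdef`, both methods are callable from Python code, not only from other Cython code. A minimal sketch, assuming the compiled extension is importable as `hash_table` (as in opteryx/compiled/functions/__init__.py):

from hash_table import HashSet

s = HashSet()
s.insert(42)             # adds the value to the underlying unordered_set; returns None
assert s.contains(42)
assert not s.contains(7)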
120 changes: 120 additions & 0 deletions opteryx/compiled/functions/varchar_array.pyx
@@ -0,0 +1,120 @@
"""
Native accelerators for Parquet encoding and decoding.
"""
# cython: profile=False
# cython: linetrace=False
# cython: binding=False
# cython: language_level=3
# cython: initializedcheck=False
# cython: boundscheck=False
# cython: wraparound=False
# cython: overflowcheck=False
# cython: initializedcheck=False
# cython: cdivision=True
# cython: always_allow_keywords=False

# from: https://github.com/dask/fastparquet/blob/main/fastparquet/speedups.pyx

cdef extern from "string.h":
    void *memcpy(void *dest, const void *src, size_t n)

from cpython cimport (PyUnicode_AsUTF8String, PyUnicode_DecodeUTF8,
                      PyBytes_CheckExact, PyBytes_FromStringAndSize,
                      PyBytes_GET_SIZE, PyBytes_AS_STRING)
from cpython.unicode cimport PyUnicode_DecodeUTF8

import numpy as np
cimport numpy as np
import cython


_obj_dtype = np.dtype('object')


def array_encode_utf8(inp):
    """
    utf-8 encode all elements of a 1d ndarray of "object" dtype.
    A new ndarray of bytes objects is returned.
    """
    # TODO: combine with pack_byte_array as is done for unpack
    cdef:
        Py_ssize_t i, n
        np.ndarray[object, ndim=1] arr
        np.ndarray[object] result

    arr = np.array(inp, copy=False)

    n = arr.shape[0]
    # TODO: why not inplace?
    result = np.empty(n, dtype=object)
    for i in range(n):
        # Fast utf-8 encoding, avoiding method call and codec lookup indirection
        result[i] = PyUnicode_AsUTF8String(arr[i])

    return result


def pack_byte_array(list items):
    """
    Pack a variable length byte array column.
    A bytes object is returned.
    """
    cdef:
        Py_ssize_t i, n, itemlen, total_size
        unsigned char *start
        unsigned char *data
        object val, out

    # Strategy: compute the total output size and allocate it in one go.
    n = len(items)
    total_size = 0
    for i in range(n):
        val = items[i]
        if not PyBytes_CheckExact(val):
            raise TypeError("expected list of bytes")
        total_size += 4 + PyBytes_GET_SIZE(val)

    out = PyBytes_FromStringAndSize(NULL, total_size)
    start = data = <unsigned char *> PyBytes_AS_STRING(out)

    # Copy data to output.
    for i in range(n):
        val = items[i]
        # `itemlen` should be >= 0, so no signed extension issues
        itemlen = PyBytes_GET_SIZE(val)
        (<int*> data)[0] = itemlen
        data += 4
        memcpy(data, PyBytes_AS_STRING(val), itemlen)
        data += itemlen

    assert (data - start) == total_size
    return out


@cython.boundscheck(False)
def unpack_byte_array(const unsigned char[::1] raw_bytes, Py_ssize_t n, const char utf=False):
    """
    Unpack a variable length byte array column.
    An array of bytes objects is returned.
    """
    cdef:
        Py_ssize_t i = 0
        char* ptr = <char*>&raw_bytes[0]
        int itemlen, bytecount
        np.ndarray[object, ndim=1, mode="c"] out = np.empty(n, dtype="object")

    assert out is not None
    bytecount = raw_bytes.shape[0]
    while i < n and bytecount > 0:

        itemlen = (<int*> ptr)[0]
        ptr += 4
        if utf:
            out[i] = PyUnicode_DecodeUTF8(ptr, itemlen, "ignore")
        else:
            out[i] = PyBytes_FromStringAndSize(ptr, itemlen)
        ptr += itemlen
        bytecount -= 4 + itemlen
        i += 1

    return out
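A rough round-trip sketch for the three new functions, assuming the compiled module is importable as `varchar_array` (matching the imports added to opteryx/compiled/functions/__init__.py). Each packed item is a 4-byte native-int length prefix followed by the raw bytes; decoding with utf=True uses the "ignore" error handler.

import numpy as np
from varchar_array import array_encode_utf8, pack_byte_array, unpack_byte_array

# utf-8 encode an object-dtype array of str into an array of bytes objects
encoded = array_encode_utf8(np.array(["mercury", "venus"], dtype=object))

# pack the bytes objects into a single buffer, then recover them
packed = pack_byte_array(list(encoded))
restored = unpack_byte_array(packed, 2)        # ndarray of bytes objects
assert list(restored) == [b"mercury", b"venus"]

# utf=True decodes each item back to str instead of returning bytes
as_text = unpack_byte_array(packed, 2, True)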
28 changes: 27 additions & 1 deletion opteryx/components/logical_planner/logical_planner.py
@@ -90,6 +90,7 @@ class LogicalPlanStepType(int, Enum):
    CTE = auto()
    Subquery = auto()
    FunctionDataset = auto()  # Unnest, GenerateSeries, values + Fake
    MetadataWriter = auto()


class LogicalPlan(Graph):
@@ -726,6 +727,31 @@ def create_node_relation(relation):
    return root_node, sub_plan


def analyze_query(statement) -> LogicalPlan:

    print(statement)

    root_node = "Analyze"
    plan = LogicalPlan()

    from_step = LogicalPlanNode(node_type=LogicalPlanStepType.Scan)
    table = statement[root_node]["table_name"]
    from_step.relation = ".".join(part["value"] for part in table)
    from_step.alias = from_step.relation
    from_step.start_date = table[0].get("start_date")
    from_step.end_date = table[0].get("end_date")
    step_id = random_string()
    plan.add_node(step_id, from_step)

    metadata_step = LogicalPlanNode(node_type=LogicalPlanStepType.MetadataWriter)
    previous_step_id, step_id = step_id, random_string()
    plan.add_node(step_id, metadata_step)
    plan.add_edge(previous_step_id, step_id)

    return plan
    # write manifest


def plan_execute_query(statement) -> LogicalPlan:

    import orjson
@@ -1001,7 +1027,7 @@ def plan_show_variables(statement):


QUERY_BUILDERS = {
# "Analyze": analyze_query,
"Analyze": analyze_query,
"Execute": plan_execute_query,
"Explain": plan_explain,
"Query": plan_query,
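For context, the planner picks a builder out of QUERY_BUILDERS using the root key of the parsed statement (analyze_query itself reads statement["Analyze"]), so parsed ANALYZE statements are now routed to the new builder instead of being rejected. A hedged sketch of that dispatch shape, illustrative only and not the exact opteryx code:

from opteryx.exceptions import UnsupportedSyntaxError

def build_logical_plan(parsed_statement):
    # the parser emits a single-key mapping whose key names the statement type,
    # e.g. "Analyze", "Explain", "Query"; QUERY_BUILDERS is the dict registered above
    statement_type = next(iter(parsed_statement))
    builder = QUERY_BUILDERS.get(statement_type)
    if builder is None:
        raise UnsupportedSyntaxError(f"Cannot plan statement of type {statement_type}")
    return builder(parsed_statement)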
10 changes: 8 additions & 2 deletions opteryx/components/temporary_physical_planner.py
@@ -51,11 +51,16 @@ def create_physical_plan(logical_plan, query_properties):
            node = operators.FunctionDatasetNode(query_properties, **node_config)
        elif node_type == LogicalPlanStepType.Join:
            if node_config.get("type") == "inner":
                # We use our own implementation of INNER JOIN
                node = operators.InnerJoinNode(query_properties, **node_config)
            elif node_config.get("type") == "left outer":
                # We use our own implementation of LEFT JOIN
                node = operators.LeftJoinNode(query_properties, **node_config)
            elif node_config.get("type") == "cross join":
                # CROSS JOINs are quite different
                # Pyarrow doesn't have a CROSS JOIN
                node = operators.CrossJoinNode(query_properties, **node_config)
            else:
                # Use Pyarrow for all other joins
                node = operators.JoinNode(query_properties, **node_config)
        elif node_type == LogicalPlanStepType.Limit:
            node = operators.LimitNode(query_properties, limit=node_config.get("limit"), offset=node_config.get("offset", 0))
@@ -78,7 +83,8 @@ def create_physical_plan(logical_plan, query_properties):
            node = operators.NoOpNode(query_properties, **node_config)
        elif node_type == LogicalPlanStepType.Union:
            node = operators.UnionNode(query_properties, **node_config)

        elif node_type == LogicalPlanStepType.MetadataWriter:
            node = operators.MetadataWriterNode(query_properties, **node_config)
        else:
            raise Exception(f"something unexpected happened - {node_type.name}")
        # fmt: on
4 changes: 2 additions & 2 deletions opteryx/connectors/gcp_firestore_connector.py
@@ -101,8 +101,8 @@ def get_dataset_schema(self) -> RelationSchema:
        if self.schema:
            return self.schema

        # onlt read one record
        record = next(self.read_dataset(chunk_size=1), None)
        # only read one record
        record = next(self.read_dataset(chunk_size=10), None)

        if record is None:
            raise DatasetNotFoundError(dataset=self.dataset)
Expand Down
2 changes: 2 additions & 0 deletions opteryx/functions/__init__.py
@@ -24,6 +24,7 @@
from pyarrow import compute

import opteryx
from opteryx.compiled.functions import array_encode_utf8
from opteryx.exceptions import FunctionNotFoundError
from opteryx.exceptions import IncorrectTypeError
from opteryx.exceptions import UnsupportedSyntaxError
@@ -279,6 +280,7 @@ def select_values(boolean_arrays, value_arrays):
    "STR": cast_varchar,
    "STRUCT": _iterate_single_parameter(lambda x: orjson.loads(str(x))),
    "DATE": lambda x: compute.cast(x, pyarrow.date32()),
    "BLOB": array_encode_utf8,
    "TRY_TIMESTAMP": try_cast("TIMESTAMP"),
    "TRY_BOOLEAN": try_cast("BOOLEAN"),
    "TRY_NUMERIC": try_cast("DOUBLE"),
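The new "BLOB" entry exposes array_encode_utf8 through this function/cast table, so VARCHAR values can be converted to their utf-8 bytes from SQL. A hedged example of exercising it, assuming the function-style cast is accepted by the parser and using the opteryx.query API with the built-in $planets sample dataset:

import opteryx

# each planet name should come back utf-8 encoded as a bytes value
result = opteryx.query("SELECT BLOB(name) AS name_bytes FROM $planets")
print(result)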
2 changes: 2 additions & 0 deletions opteryx/operators/__init__.py
@@ -27,7 +27,9 @@
# from .information_schema_node import InformationSchemaNode # information_schema
from .inner_join_node import InnerJoinNode
from .join_node import JoinNode
from .left_join_node import LeftJoinNode
from .limit_node import LimitNode # select the first N records
from .metadata_writer_node import MetadataWriterNode
from .morsel_defragment_node import MorselDefragmentNode # consolidate small morsels
from .noop_node import NoOpNode # No Operation
from .projection_node import ProjectionNode # remove unwanted columns including renames
4 changes: 2 additions & 2 deletions opteryx/operators/inner_join_node.py
@@ -15,10 +15,10 @@
This is a SQL Query Execution Plan Node.
PyArrow has a good INNER JOIN implementation, but it errors when the
PyArrow has a good LEFT JOIN implementation, but it errors when the
relations being joined contain STRUCT or ARRAY columns, this is true
for all of the JOIN types, however we've only written our own INNER
JOIN.
and LEFT JOINs.
It is comparable performance to the PyArrow INNER JOIN, in benchmarks
sometimes native is faster, sometimes PyArrow is faster. Generally