Skip to content

Commit

Permalink
Merge pull request #1534 from mabel-dev/#1533
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer authored Mar 20, 2024
2 parents c1ecb6b + fabb5f6 commit f0cd243
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 27 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 380
__build__ = 382

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
14 changes: 7 additions & 7 deletions opteryx/compiled/cross_join/cython_cross_join.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ from libc.stdint cimport int32_t
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef build_rows_indices_and_column(cnp.ndarray column_data):
cdef Py_ssize_t i, total_size = 0
cdef Py_ssize_t length
cdef int32_t i, total_size = 0
cdef int32_t length
cdef list flat_data
cdef Py_ssize_t row_count = len(column_data)
cdef Py_ssize_t *lengths = <Py_ssize_t *>malloc(row_count * sizeof(Py_ssize_t))
cdef int32_t row_count = len(column_data)
cdef int32_t *lengths = <int32_t *>malloc(row_count * sizeof(int32_t))
if lengths is NULL:
raise MemoryError("Failed to allocate memory for lengths array.")

Expand All @@ -29,9 +29,9 @@ cpdef build_rows_indices_and_column(cnp.ndarray column_data):
if indices is NULL:
raise MemoryError("Failed to allocate memory.")

cdef Py_ssize_t start = 0
cdef Py_ssize_t end = 0
cdef Py_ssize_t j = 0
cdef int32_t start = 0
cdef int32_t end = 0
cdef int32_t j = 0

for i in range(row_count):
end = start + lengths[i]
Expand Down
2 changes: 1 addition & 1 deletion opteryx/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def get(key, default=None):
# The maximum number of evictions by a single query
MAX_CACHE_EVICTIONS_PER_QUERY: int = int(get("MAX_CACHE_EVICTIONS_PER_QUERY", 32))
# Maximum size for items saved to the buffer cache
MAX_CACHEABLE_ITEM_SIZE: int = get("MAX_CACHEABLE_ITEM_SIZE", 1024 * 1024)
MAX_CACHEABLE_ITEM_SIZE: int = int(get("MAX_CACHEABLE_ITEM_SIZE", 1024 * 1024))
# The local buffer pool size
MAX_LOCAL_BUFFER_CAPACITY: int = int(get("MAX_LOCAL_BUFFER_CAPACITY", 256))
# don't try to raise the priority of the server process
Expand Down
33 changes: 15 additions & 18 deletions opteryx/operators/cross_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from opteryx.models import QueryProperties
from opteryx.operators import BasePlanNode

INTERNAL_BATCH_SIZE: int = 5000 # config
INTERNAL_BATCH_SIZE: int = 2500 # config
MAX_JOIN_SIZE: int = 1000 # config


Expand Down Expand Up @@ -93,12 +93,13 @@ def _cross_join_unnest_column(


def _cross_join_unnest_literal(
morsels: BasePlanNode, source: Tuple, target_column: FlatColumn
morsels: BasePlanNode, source: Tuple, target_column: FlatColumn, statistics
) -> Generator[pyarrow.Table, None, None]:
joined_list_size = len(source)

# Loop through each morsel from the morsels execution
for left_morsel in morsels.execute():
start = time.monotonic_ns()
# Break the morsel into batches to avoid memory issues
for left_block in left_morsel.to_batches(max_chunksize=INTERNAL_BATCH_SIZE):
left_block = pyarrow.Table.from_batches([left_block], schema=left_morsel.schema)
Expand All @@ -115,7 +116,9 @@ def _cross_join_unnest_literal(
array_column = pyarrow.array(tiled_array)
appended_table = appended_table.append_column(target_column.identity, array_column)

statistics.time_cross_join_unnest += time.monotonic_ns() - start
yield appended_table
start = time.monotonic_ns()


def _cartesian_product(*arrays):
Expand All @@ -129,7 +132,7 @@ def _cartesian_product(*arrays):
return numpy.hsplit(arr.reshape(-1, array_count), array_count)


def _cross_join(left, right):
def _cross_join(left, right, statistics):
"""
A cross join is the cartesian product of two tables - this usually isn't very
useful, but it does allow you to the theta joins (non-equi joins)
Expand All @@ -148,6 +151,7 @@ def _chunker(seq_1, seq_2, size):
from opteryx.third_party.pyarrow_ops import align_tables

for left_morsel in left.execute():
start = time.monotonic_ns()
# Iterate through left table in chunks of size INTERNAL_BATCH_SIZE
for left_block in left_morsel.to_batches(max_chunksize=INTERNAL_BATCH_SIZE):
# Convert the chunk to a table to retain column names
Expand All @@ -166,7 +170,9 @@ def _chunker(seq_1, seq_2, size):
table = align_tables(left_block, right, left_chunk.flatten(), right_chunk.flatten())

# Yield the resulting table to the caller
statistics.time_cross_join_unnest += time.monotonic_ns() - start
yield table
start = time.monotonic_ns()


class CrossJoinNode(BasePlanNode):
Expand Down Expand Up @@ -209,28 +215,19 @@ def execute(self) -> Generator:
) # type:ignore

if self._unnest_column is None:
start = time.monotonic_ns()
for morsel in _cross_join(left_node, right_table):
self.statistics.time_cross_join += time.monotonic_ns() - start
yield morsel
start = time.monotonic_ns()
yield from _cross_join(left_node, right_table, self.statistics)

elif isinstance(self._unnest_column.value, tuple):
start = time.monotonic_ns()
for morsel in _cross_join_unnest_literal(
yield from _cross_join_unnest_literal(
morsels=left_node,
source=self._unnest_column.value,
target_column=self._unnest_target,
):
self.statistics.time_cross_join_literal += time.monotonic_ns() - start
yield morsel
start = time.monotonic_ns()
statistics=self.statistics,
)
else:
start = time.monotonic_ns()
for morsel in _cross_join_unnest_column(
yield from _cross_join_unnest_column(
morsels=left_node,
source=self._unnest_column,
target_column=self._unnest_target,
statistics=self.statistics,
):
yield morsel
)

0 comments on commit f0cd243

Please sign in to comment.