#2032 #2033

Merged · 4 commits · Sep 29, 2024
2 changes: 1 addition & 1 deletion opteryx/__version__.py
@@ -1,4 +1,4 @@
-__build__ = 809
+__build__ = 811
 
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
60 changes: 43 additions & 17 deletions opteryx/compiled/cross_join/cython_cross_join.pyx
@@ -15,43 +15,54 @@ from libc.stdint cimport int32_t
 cpdef tuple build_rows_indices_and_column(cnp.ndarray column_data):
     cdef int32_t i, total_size = 0
     cdef int32_t length
-    cdef list flat_data
     cdef int32_t row_count = len(column_data)
     cdef int32_t *lengths = <int32_t *>malloc(row_count * sizeof(int32_t))
     if lengths is NULL:
         raise MemoryError("Failed to allocate memory for lengths array.")
 
     # Calculate the total size and fill lengths array
     for i in range(row_count):
-        length = len(column_data[i])
+        length = column_data[i].shape[0]
         lengths[i] = length
         total_size += length
 
     # If the total size is zero, return empty arrays
     if total_size == 0:
         free(lengths)
         return (np.array([], dtype=np.int32), np.array([], dtype=object))
 
-    flat_data = [''] * total_size
+    # Determine the dtype of the elements in the arrays, handling the case where the first element is None
+    element_dtype = object
+    for i in range(row_count):
+        if column_data[i] is not None:
+            element_dtype = column_data[i].dtype
+            break
+
+    # Preallocate arrays for indices and flat data
+    flat_data = np.empty(total_size, dtype=element_dtype)  # More efficient than list
     cdef int32_t *indices = <int32_t *>malloc(total_size * sizeof(int32_t))
     if indices is NULL:
-        raise MemoryError("Failed to allocate memory.")
+        free(lengths)
+        raise MemoryError("Failed to allocate memory for indices.")
 
     cdef int32_t start = 0
     cdef int32_t end = 0
     cdef int32_t j = 0
 
     # Flatten the data and fill indices
     for i in range(row_count):
         end = start + lengths[i]
+        flat_data[start:end] = column_data[i]  # NumPy handles the slicing and copying
         for j in range(start, end):
             indices[j] = i
-            flat_data[j] = column_data[i][j - start]
         start = end
-    free(lengths)
 
+    free(lengths)  # Free the lengths array
+
     # Create a NumPy array from indices
     cdef cnp.int32_t[:] mv = <cnp.int32_t[:total_size]>indices
-    # Create a NumPy array that is a copy of the memoryview,
-    # which in turn makes it safe to free the original indices memory.
-    np_array = np.array(mv, copy=True)
-    free(indices)  # Now it's safe to free indices since np_array has its own copy.
+    np_array = np.array(mv, copy=True)  # Copy the memoryview into a NumPy array
+    free(indices)  # Free the indices memory now that we've copied it
 
     return (np_array, flat_data)
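
As a companion to the diff, a minimal pure-Python sketch of what this routine computes: one flat column of values plus, for each value, the index of the source row it came from. The helper name is illustrative and it assumes no None rows; the Cython version above additionally probes the rows for a usable dtype and fills C-allocated buffers.

import numpy as np

def flatten_with_indices(column_data):
    # One output element per nested item, tagged with its source row index.
    lengths = [arr.shape[0] for arr in column_data]
    if sum(lengths) == 0:
        return np.array([], dtype=np.int32), np.array([], dtype=object)
    indices = np.repeat(np.arange(len(column_data), dtype=np.int32), lengths)
    flat_data = np.concatenate(column_data)
    return indices, flat_data

idx, flat = flatten_with_indices([np.array([1, 2]), np.array([], dtype=np.int64), np.array([3])])
print(idx.tolist(), flat.tolist())  # [0, 0, 2] [1, 2, 3]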


@@ -78,20 +89,35 @@ cpdef tuple build_filtered_rows_indices_and_column(cnp.ndarray column_data, set
     """
     cdef int32_t i, index = 0, allocated_size
     cdef int32_t row_count = len(column_data)
-    cdef int32_t initial_alloc_size = 500
+    cdef int32_t initial_alloc_size = row_count * 2
     allocated_size = initial_alloc_size
     cdef int32_t *indices = <int32_t *>malloc(allocated_size * sizeof(int32_t))
-    cdef list flat_data = [None] * allocated_size
+    cdef int32_t *new_indices
 
     if indices is NULL:
         raise MemoryError("Failed to allocate memory for indices.")
 
+    # Determine the dtype of the elements in the arrays, handling the case where the first element is None
+    element_dtype = object
+    for i in range(row_count):
+        if column_data[i] is not None:
+            element_dtype = column_data[i].dtype
+            break
+
+    cdef flat_data = np.empty(allocated_size, dtype=element_dtype)
+
     for i in range(row_count):
         for value in column_data[i]:
             if value in valid_values:
                 if index == allocated_size:  # Check if we need to expand the memory allocation
-                    allocated_size += initial_alloc_size
-                    indices = <int32_t *>realloc(indices, allocated_size * sizeof(int32_t))
-                    flat_data.extend([None] * initial_alloc_size)  # Extend flat_data by the same amount
-                    if indices is NULL:
-                        raise MemoryError("Failed to reallocate memory for indices.")
+                    allocated_size = allocated_size * 2  # Double the allocation size to reduce reallocations
+                    # Handle realloc for indices safely
+                    new_indices = <int32_t *>realloc(indices, allocated_size * sizeof(int32_t))
+                    if new_indices is NULL:
+                        free(indices)  # Free previously allocated memory to avoid memory leak
+                        raise MemoryError("Failed to reallocate memory for indices.")
+                    indices = new_indices
+                    flat_data = np.resize(flat_data, allocated_size)
                 flat_data[index] = value
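
Two fixes land in this hunk: the old code assigned realloc's result directly to indices, so a NULL return leaked the original buffer, and it grew by a fixed 500 slots per resize. A rough Python sketch of the new growth strategy (names are illustrative; the real code grows a C int32 buffer with realloc):

import numpy as np

def flatten_filtered(column_data, valid_values):
    allocated = max(1, len(column_data) * 2)  # start at row_count * 2, as above
    indices = np.empty(allocated, dtype=np.int32)
    flat = np.empty(allocated, dtype=object)
    count = 0
    for i, row in enumerate(column_data):
        for value in row:
            if value in valid_values:
                if count == allocated:
                    allocated *= 2  # doubling keeps appends amortized O(1)
                    indices = np.resize(indices, allocated)
                    flat = np.resize(flat, allocated)
                indices[count] = i
                flat[count] = value
                count += 1
    return indices[:count], flat[:count]

idx, flat = flatten_filtered([["a", "b"], ["c"]], {"a", "c"})
print(idx.tolist(), flat.tolist())  # [0, 1] ['a', 'c']

Doubling the allocation keeps the number of reallocations logarithmic in the output size, and checking realloc's return before overwriting the old pointer avoids leaking the buffer when reallocation fails.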
2 changes: 1 addition & 1 deletion opteryx/functions/other_functions.py
@@ -45,7 +45,7 @@ def list_contains_any(array: numpy.ndarray, items: numpy.ndarray) -> numpy.ndarr
     res = numpy.empty(array.size, dtype=bool)
     for i, test_set in enumerate(array):
         # Using not to correctly capture overlap (isdisjoint is True when no common elements)
-        res[i] = not items_set.isdisjoint(test_set) if test_set is not None else False
+        res[i] = bool(set(test_set) & items_set) if test_set is not None else False
     return res
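
A quick standalone illustration of the new line's semantics (not part of the module): a row is True when it shares at least one element with items, and None rows are False.

import numpy

items_set = {2, 9}
array = numpy.array([numpy.array([1, 2]), numpy.array([3, 4]), None], dtype=object)

res = numpy.empty(array.size, dtype=bool)
for i, test_set in enumerate(array):
    res[i] = bool(set(test_set) & items_set) if test_set is not None else False
print(res)  # [ True False False]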


21 changes: 13 additions & 8 deletions opteryx/operators/cross_join_node.py
@@ -125,12 +125,15 @@ def _cross_join_unnest_column(
                 )
             ]
         )
-        new_block = pyarrow.Table.from_arrays(
-            [single_column_collector], schema=schema
-        )
+        arrow_array = pyarrow.array(single_column_collector)
+        if arrow_array.type != target_column.arrow_field.type:
+            arrow_array = arrow_array.cast(target_column.arrow_field.type)
+        new_block = pyarrow.Table.from_arrays([arrow_array], schema=schema)
         single_column_collector.clear()
+        del arrow_array
         statistics.time_cross_join_unnest += time.monotonic_ns() - start
         yield new_block
+        start = time.monotonic_ns()
         at_least_once = True
     else:
         # Rebuild the block with the new column data if we have any rows to build for
@@ -139,13 +142,13 @@
         block_size = MORSEL_SIZE_BYTES / (left_block.nbytes / left_block.num_rows)
         block_size = int(block_size // 1000) * 1000
 
-        for start in range(0, total_rows, block_size):
+        for start_block in range(0, total_rows, block_size):
             # Compute the end index for the current chunk
-            end = min(start + block_size, total_rows)
+            end_block = min(start_block + block_size, total_rows)
 
             # Slice the current chunk of indices and new_column_data
-            indices_chunk = indices[start:end]
-            new_column_data_chunk = new_column_data[start:end]
+            indices_chunk = indices[start_block:end_block]
+            new_column_data_chunk = new_column_data[start_block:end_block]
 
             # Create a new block using the chunk of indices
             new_block = left_block.take(indices_chunk)
@@ -171,16 +174,18 @@
             if arrow_array.type != target_column.arrow_field.type:
                 arrow_array = arrow_array.cast(target_column.arrow_field.type)
             new_block = pyarrow.Table.from_arrays([arrow_array], schema=schema)
+
+            statistics.time_cross_join_unnest += time.monotonic_ns() - start
             yield new_block
             at_least_once = True
+            start = time.monotonic_ns()
 
     if not at_least_once:
         # Create an empty table with the new schema
         schema = left_morsel.schema
         new_column = pyarrow.field(target_column.identity, pyarrow.string())
         new_schema = pyarrow.schema(list(schema) + [new_column])
         new_block = pyarrow.Table.from_batches([], schema=new_schema)
+        statistics.time_cross_join_unnest += time.monotonic_ns() - start
         yield new_block
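
Two changes run through this file: the chunk loop variables are renamed to start_block/end_block so they no longer clobber the start timestamp that statistics.time_cross_join_unnest is measured against, and the timing counter is now updated before every yield. The chunking arithmetic can be sketched in isolation; the numbers below, including the MORSEL_SIZE_BYTES value, are assumed for illustration only:

# Rows per output morsel: byte budget / estimated bytes per row,
# rounded down to a multiple of 1000.
MORSEL_SIZE_BYTES = 64 * 1024 * 1024  # assumed 64 MiB budget
left_nbytes, left_num_rows = 500_000_000, 2_000_000  # ~250 bytes per row

block_size = MORSEL_SIZE_BYTES / (left_nbytes / left_num_rows)
block_size = int(block_size // 1000) * 1000
print(block_size)  # 268000

total_rows = 1_000_000
chunks = [(s, min(s + block_size, total_rows)) for s in range(0, total_rows, block_size)]
print(chunks[0], chunks[-1])  # (0, 268000) (804000, 1000000)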


9 changes: 4 additions & 5 deletions opteryx/planner/binder/binder_visitor.py
@@ -831,11 +831,10 @@ def visit_project(self, node: Node, context: BindingContext) -> Tuple[Node, Bind
                 (n for n in node.columns if n.schema_column.identity == column.identity),
                 None,
             )
-            if node_column:
-                # update the column reference with any AS aliases
-                if node_column.alias:
-                    node_column.schema_column.aliases.append(node_column.alias)
-                    column.aliases.append(node_column.alias)
+            # update the column reference with any AS aliases
+            if node_column and node_column.alias:
+                node_column.schema_column.aliases.append(node_column.alias)
+                column.aliases.append(node_column.alias)
         # update the schema with columns we have references to, removing redundant columns
         schema.columns = schema_columns
         for column in node.columns:
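
This one is a behavior-preserving refactor: the nested alias check collapses into a single short-circuit guard. A tiny standalone sketch of the pattern, with hypothetical names:

from types import SimpleNamespace

def collect_alias(node_column, aliases):
    # Equivalent to the previous nested form: if node_column: if node_column.alias: ...
    if node_column and node_column.alias:
        aliases.append(node_column.alias)

aliases = []
collect_alias(None, aliases)                           # column not found: no-op
collect_alias(SimpleNamespace(alias=None), aliases)    # no AS alias: no-op
collect_alias(SimpleNamespace(alias="total"), aliases)
print(aliases)  # ['total']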