Skip to content

Commit

Permalink
Browse files — browse the repository at this point in the history
  • Loading branch information
joocer committed Sep 29, 2024
1 parent 0e39d9b commit 1a6b444
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 31 deletions.
60 changes: 43 additions & 17 deletions opteryx/compiled/cross_join/cython_cross_join.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,54 @@ from libc.stdint cimport int32_t
cpdef tuple build_rows_indices_and_column(cnp.ndarray column_data):
cdef int32_t i, total_size = 0
cdef int32_t length
cdef list flat_data
cdef int32_t row_count = len(column_data)
cdef int32_t *lengths = <int32_t *>malloc(row_count * sizeof(int32_t))
if lengths is NULL:
raise MemoryError("Failed to allocate memory for lengths array.")

# Calculate the total size and fill lengths array
for i in range(row_count):
length = len(column_data[i])
length = column_data[i].shape[0]
lengths[i] = length
total_size += length

# If the total size is zero, return empty arrays
if total_size == 0:
free(lengths)
return (np.array([], dtype=np.int32), np.array([], dtype=object))

flat_data = [''] * total_size
# Determine the dtype of the elements in the arrays, handling the case where the first element is None
element_dtype = object
for i in range(row_count):
if column_data[i] is not None:
element_dtype = column_data[i].dtype
break

# Preallocate arrays for indices and flat data
flat_data = np.empty(total_size, dtype=element_dtype) # More efficient than list
cdef int32_t *indices = <int32_t *>malloc(total_size * sizeof(int32_t))
if indices is NULL:
raise MemoryError("Failed to allocate memory.")
free(lengths)
raise MemoryError("Failed to allocate memory for indices.")

cdef int32_t start = 0
cdef int32_t end = 0
cdef int32_t j = 0

# Flatten the data and fill indices
for i in range(row_count):
end = start + lengths[i]
flat_data[start:end] = column_data[i] # NumPy handles the slicing and copying
for j in range(start, end):
indices[j] = i
flat_data[j] = column_data[i][j - start]
indices[j] = i
start = end
free(lengths)

free(lengths) # Free the lengths array

# Create a NumPy array from indices
cdef cnp.int32_t[:] mv = <cnp.int32_t[:total_size]>indices
# Create a NumPy array that is a copy of the memoryview,
# which in turn makes it safe to free the original indices memory.
np_array = np.array(mv, copy=True)
free(indices) # Now it's safe to free indices since np_array has its own copy.
np_array = np.array(mv, copy=True) # Copy the memoryview into a NumPy array
free(indices) # Free the indices memory now that we've copied it

return (np_array, flat_data)


Expand All @@ -78,20 +89,35 @@ cpdef tuple build_filtered_rows_indices_and_column(cnp.ndarray column_data, set
"""
cdef int32_t i, index = 0, allocated_size
cdef int32_t row_count = len(column_data)
cdef int32_t initial_alloc_size = 500
cdef int32_t initial_alloc_size = row_count * 2
allocated_size = initial_alloc_size
cdef int32_t *indices = <int32_t *>malloc(allocated_size * sizeof(int32_t))
cdef list flat_data = [None] * allocated_size
cdef int32_t *new_indices

if indices is NULL:
raise MemoryError("Failed to allocate memory for indices.")

# Determine the dtype of the elements in the arrays, handling the case where the first element is None
element_dtype = object
for i in range(row_count):
if column_data[i] is not None:
element_dtype = column_data[i].dtype
break

cdef flat_data = np.empty(allocated_size, dtype=element_dtype)

for i in range(row_count):
for value in column_data[i]:
if value in valid_values:
if index == allocated_size: # Check if we need to expand the memory allocation
allocated_size += initial_alloc_size
indices = <int32_t *>realloc(indices, allocated_size * sizeof(int32_t))
flat_data.extend([None] * initial_alloc_size) # Extend flat_data by the same amount
allocated_size = allocated_size * 2 # Double the allocation size to reduce reallocations
# Handle realloc for indices safely
new_indices = <int32_t *>realloc(indices, allocated_size * sizeof(int32_t))
if new_indices is NULL:
free(indices) # Free previously allocated memory to avoid memory leak
raise MemoryError("Failed to reallocate memory for indices.")
indices = new_indices
flat_data = np.resize(flat_data, allocated_size)
if indices is NULL:
raise MemoryError("Failed to reallocate memory for indices.")
flat_data[index] = value
Expand Down
2 changes: 1 addition & 1 deletion opteryx/functions/other_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def list_contains_any(array: numpy.ndarray, items: numpy.ndarray) -> numpy.ndarr
res = numpy.empty(array.size, dtype=bool)
for i, test_set in enumerate(array):
# Using not to correctly capture overlap (isdisjoint is True when no common elements)
res[i] = not items_set.isdisjoint(test_set) if test_set is not None else False
res[i] = bool(set(test_set) & items_set) if test_set is not None else False
return res


Expand Down
21 changes: 13 additions & 8 deletions opteryx/operators/cross_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,15 @@ def _cross_join_unnest_column(
)
]
)
new_block = pyarrow.Table.from_arrays(
[single_column_collector], schema=schema
)
arrow_array = pyarrow.array(single_column_collector)
if arrow_array.type != target_column.arrow_field.type:
arrow_array = arrow_array.cast(target_column.arrow_field.type)
new_block = pyarrow.Table.from_arrays([arrow_array], schema=schema)
single_column_collector.clear()
del arrow_array
statistics.time_cross_join_unnest += time.monotonic_ns() - start
yield new_block
start = time.monotonic_ns()
at_least_once = True
else:
# Rebuild the block with the new column data if we have any rows to build for
Expand All @@ -139,13 +142,13 @@ def _cross_join_unnest_column(
block_size = MORSEL_SIZE_BYTES / (left_block.nbytes / left_block.num_rows)
block_size = int(block_size // 1000) * 1000

for start in range(0, total_rows, block_size):
for start_block in range(0, total_rows, block_size):
# Compute the end index for the current chunk
end = min(start + block_size, total_rows)
end_block = min(start_block + block_size, total_rows)

# Slice the current chunk of indices and new_column_data
indices_chunk = indices[start:end]
new_column_data_chunk = new_column_data[start:end]
indices_chunk = indices[start_block:end_block]
new_column_data_chunk = new_column_data[start_block:end_block]

# Create a new block using the chunk of indices
new_block = left_block.take(indices_chunk)
Expand All @@ -171,16 +174,18 @@ def _cross_join_unnest_column(
if arrow_array.type != target_column.arrow_field.type:
arrow_array = arrow_array.cast(target_column.arrow_field.type)
new_block = pyarrow.Table.from_arrays([arrow_array], schema=schema)

statistics.time_cross_join_unnest += time.monotonic_ns() - start
yield new_block
at_least_once = True
start = time.monotonic_ns()

if not at_least_once:
# Create an empty table with the new schema
schema = left_morsel.schema
new_column = pyarrow.field(target_column.identity, pyarrow.string())
new_schema = pyarrow.schema(list(schema) + [new_column])
new_block = pyarrow.Table.from_batches([], schema=new_schema)
statistics.time_cross_join_unnest += time.monotonic_ns() - start
yield new_block


Expand Down
9 changes: 4 additions & 5 deletions opteryx/planner/binder/binder_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,11 +831,10 @@ def visit_project(self, node: Node, context: BindingContext) -> Tuple[Node, Bind
(n for n in node.columns if n.schema_column.identity == column.identity),
None,
)
if node_column:
# update the column reference with any AS aliases
if node_column.alias:
node_column.schema_column.aliases.append(node_column.alias)
column.aliases.append(node_column.alias)
# update the column reference with any AS aliases
if node_column and node_column.alias:
node_column.schema_column.aliases.append(node_column.alias)
column.aliases.append(node_column.alias)
# update the schema with columns we have references to, removing redundant columns
schema.columns = schema_columns
for column in node.columns:
Expand Down

0 comments on commit 1a6b444

Please sign in to comment.