Skip to content

Commit

Permalink
Merge pull request #1976 from mabel-dev/#1917-3
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer authored Sep 3, 2024
2 parents a3c2919 + 583510e commit 93ea9e7
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 13 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 762
__build__ = 764

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
33 changes: 24 additions & 9 deletions opteryx/operators/cross_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@ def _cross_join_unnest_column(
column_type = None
batch_size: int = INTERNAL_BATCH_SIZE
at_least_once = False
single_column_collector = []

# Loop through each morsel from the morsels execution
for left_morsel in morsels.execute():
start = time.monotonic_ns()
# Break the morsel into batches to avoid memory issues
for left_block in left_morsel.to_batches(max_chunksize=batch_size):
new_block = None
# Fetch the data of the column to be unnested
column_data = left_block[source.schema_column.identity]

Expand Down Expand Up @@ -113,26 +115,39 @@ def _cross_join_unnest_column(

if len(indices) > 0:
if single_column:
schema = pyarrow.schema(
[
pyarrow.field(
name=target_column.identity, type=target_column.arrow_field.type
)
]
)
new_block = pyarrow.Table.from_arrays([new_column_data], schema=schema)
single_column_collector.extend(new_column_data)
if len(single_column_collector) > INTERNAL_BATCH_SIZE:
schema = pyarrow.schema(
[
pyarrow.field(
name=target_column.identity, type=target_column.arrow_field.type
)
]
)
new_block = pyarrow.Table.from_arrays(
[single_column_collector], schema=schema
)
single_column_collector = []
else:
# Rebuild the block with the new column data if we have any rows to build for
new_block = left_block.take(indices)
new_block = pyarrow.Table.from_batches([new_block], schema=left_morsel.schema)
new_block = new_block.append_column(target_column.identity, [new_column_data])

statistics.time_cross_join_unnest += time.monotonic_ns() - start
if new_block.num_rows > 0:
if new_block and new_block.num_rows > 0:
yield new_block
at_least_once = True
start = time.monotonic_ns()

if single_column_collector:
schema = pyarrow.schema(
[pyarrow.field(name=target_column.identity, type=target_column.arrow_field.type)]
)
new_block = pyarrow.Table.from_arrays([single_column_collector], schema=schema)
yield new_block
at_least_once = True

if not at_least_once:
# Create an empty table with the new schema
schema = left_morsel.schema
Expand Down
2 changes: 1 addition & 1 deletion opteryx/planner/cost_based_optimizer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ def __init__(self):
"""
self.strategies = [
ConstantFoldingStrategy(),
DistinctPushdownStrategy(),
BooleanSimplificationStrategy(),
SplitConjunctivePredicatesStrategy(),
PredicateRewriteStrategy(),
PredicatePushdownStrategy(),
ProjectionPushdownStrategy(),
DistinctPushdownStrategy(),
OperatorFusionStrategy(),
RedundantOperationsStrategy(),
ConstantFoldingStrategy(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,15 @@
Type: Heuristic
Goal: Reduce Rows
Rules:
- DISTINCT ON can't get pushed
This is a very specific rule, on a CROSS JOIN UNNEST, if the result
is the only column in a DISTINCT clause, we push the DISTINCT into
the JOIN.
We've written as a Optimization rule rather than in the JOIN code
as it is expected other instances of pushing DISTINCT may be found.
Order:
This plan must run after the Projection Pushdown
"""

from opteryx.planner.logical_planner import LogicalPlan
Expand Down Expand Up @@ -47,6 +54,7 @@ def visit(self, node: LogicalPlanNode, context: OptimizerContext) -> OptimizerCo
and context.collected_distincts
and node.type == "cross join"
and node.unnest_target is not None
and node.pre_update_columns == node.unnest_target.identity
):
node.distinct = True
context.optimized_plan[context.node_id] = node
Expand Down

0 comments on commit 93ea9e7

Please sign in to comment.