Skip to content

Commit

Permalink
Fix categorization of partitioning handles for writer stage
Browse files Browse the repository at this point in the history
We use different configuration of HashSplitAssigner based on partitining
handle used by source stage. We want to determine if we are in write
stage or not. The logic was wrong.
  • Loading branch information
losipiuk committed Oct 24, 2024
1 parent 8820f5a commit 0367798
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,7 @@ private SplitAssigner createSplitAssigner(
standardSplitSizeInBytes,
maxArbitraryDistributionTaskSplitCount);
}
if (partitioning.equals(FIXED_HASH_DISTRIBUTION) || partitioning.getCatalogHandle().isPresent() ||
(partitioning.getConnectorHandle() instanceof MergePartitioningHandle)) {
if (partitioning.equals(FIXED_HASH_DISTRIBUTION)) {
return HashDistributionSplitAssigner.create(
partitioning.getCatalogHandle(),
partitionedSources,
Expand All @@ -240,7 +239,9 @@ private SplitAssigner createSplitAssigner(
toIntExact(round(getFaultTolerantExecutionHashDistributionComputeTasksToNodesMinRatio(session) * nodeManager.getAllNodes().getActiveNodes().size())),
Integer.MAX_VALUE); // compute tasks are bounded by the number of partitions anyways
}
if (partitioning.equals(SCALED_WRITER_HASH_DISTRIBUTION)) {
if (partitioning.equals(SCALED_WRITER_HASH_DISTRIBUTION)
|| partitioning.getCatalogHandle().isPresent()
|| (partitioning.getConnectorHandle() instanceof MergePartitioningHandle)) {
return HashDistributionSplitAssigner.create(
partitioning.getCatalogHandle(),
partitionedSources,
Expand Down

0 comments on commit 0367798

Please sign in to comment.