From 0367798dcc6aca2fe93bc46b65f9f73dd1b9a2da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Wed, 23 Oct 2024 19:05:43 +0200 Subject: [PATCH] Fix categorization of partitioning handles for writer stage We use different configuration of HashSplitAssigner based on partitining handle used by source stage. We want to determine if we are in write stage or not. The logic was wrong. --- .../faulttolerant/EventDrivenTaskSourceFactory.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSourceFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSourceFactory.java index 80f03d3eff8cc..452893ce62799 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSourceFactory.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSourceFactory.java @@ -227,8 +227,7 @@ private SplitAssigner createSplitAssigner( standardSplitSizeInBytes, maxArbitraryDistributionTaskSplitCount); } - if (partitioning.equals(FIXED_HASH_DISTRIBUTION) || partitioning.getCatalogHandle().isPresent() || - (partitioning.getConnectorHandle() instanceof MergePartitioningHandle)) { + if (partitioning.equals(FIXED_HASH_DISTRIBUTION)) { return HashDistributionSplitAssigner.create( partitioning.getCatalogHandle(), partitionedSources, @@ -240,7 +239,9 @@ private SplitAssigner createSplitAssigner( toIntExact(round(getFaultTolerantExecutionHashDistributionComputeTasksToNodesMinRatio(session) * nodeManager.getAllNodes().getActiveNodes().size())), Integer.MAX_VALUE); // compute tasks are bounded by the number of partitions anyways } - if (partitioning.equals(SCALED_WRITER_HASH_DISTRIBUTION)) { + if (partitioning.equals(SCALED_WRITER_HASH_DISTRIBUTION) + || partitioning.getCatalogHandle().isPresent() + || (partitioning.getConnectorHandle() instanceof MergePartitioningHandle)) { return HashDistributionSplitAssigner.create( partitioning.getCatalogHandle(), partitionedSources,