From 26e1b06f2c48688d5d67fd385b9eba6692990b19 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 5 Nov 2024 19:54:12 +0800 Subject: [PATCH] Revert "[fix](load) fix broker load progress due to retry" This reverts commit c9ec9ecdf9ddbdcbb851548e1bc4dca47ad2c4d7. --- .../org/apache/doris/load/loadv2/ProgressManager.java | 8 ++++---- .../src/main/java/org/apache/doris/qe/Coordinator.java | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java index 87767e484771825..c333f88ad98a3a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java @@ -60,10 +60,10 @@ public void updateProgress(String id, TUniqueId queryId, TUniqueId fragmentId, i } } - public void setTotalScanNums(String id, int num) { + public void addTotalScanNums(String id, int num) { Progress progress = idToProgress.get(id); if (progress != null) { - progress.setTotalScanNums(num); + progress.addTotalScanNums(num); } } @@ -87,8 +87,8 @@ static class Progress { private Table finishedScanNums = HashBasedTable.create(); private int totalScanNums = 0; - public synchronized void setTotalScanNums(int num) { - totalScanNums = num; + public synchronized void addTotalScanNums(int num) { + totalScanNums += num; } public synchronized void updateFinishedScanNums(TUniqueId queryId, TUniqueId fragmentId, int finishedScanNum) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index e42e9d67f6c1ac8..294e9e3405690c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -628,7 +628,7 @@ public TPipelineFragmentParams getStreamLoadPlan() throws Exception { List relatedBackendIds = Lists.newArrayList(addressToBackendID.values()); Env.getCurrentEnv().getLoadManager().initJobProgress(jobId, queryId, instanceIds, relatedBackendIds); - Env.getCurrentEnv().getProgressManager().setTotalScanNums(String.valueOf(jobId), scanRangeNum); + Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId), scanRangeNum); LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet()); Map tExecPlanFragmentParams @@ -777,7 +777,7 @@ private void execInternal() throws Exception { List relatedBackendIds = Lists.newArrayList(addressToBackendID.values()); Env.getCurrentEnv().getLoadManager().initJobProgress(jobId, queryId, instanceIds, relatedBackendIds); - Env.getCurrentEnv().getProgressManager().setTotalScanNums(String.valueOf(jobId), scanRangeNum); + Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId), scanRangeNum); LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet()); }