From a91b6cdd373d22407c64cb36e56360e90bcc2ff4 Mon Sep 17 00:00:00 2001 From: Shounak kulkarni Date: Thu, 19 Sep 2024 16:17:40 +0530 Subject: [PATCH] handle resumption upon storage quota getting freed up --- .../PinotLLCRealtimeSegmentManager.java | 2 +- .../RealtimeSegmentValidationManager.java | 44 +++++++++++-------- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index e02e9810352..7a459d7ddbe 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -1745,7 +1745,7 @@ public PauseStatusDetails resumeConsumption(String tableNameWithType, @Nullable + "endpoint in a few moments to double check.", new Timestamp(System.currentTimeMillis()).toString()); } - private IdealState updatePauseStateInIdealState(String tableNameWithType, boolean pause, + public IdealState updatePauseStateInIdealState(String tableNameWithType, boolean pause, PauseState.ReasonCode reasonCode, @Nullable String comment) { PauseState pauseState = new PauseState(pause, reasonCode, comment, new Timestamp(System.currentTimeMillis()).toString()); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 4c04e88532e..b8460a406a1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -115,30 +115,36 @@ protected void processTable(String tableNameWithType, Context context) { } } + /** + * + * Updates the table paused state based on pause validations (e.g. storage quota being exceeded). + * Skips updating the pause state if table is paused by admin. + * Returns true if table is not paused + */ private boolean shouldEnsureConsuming(String tableNameWithType) { - PauseState pauseState = computePauseState(tableNameWithType); - return !pauseState.isPaused(); - } - - // Updates the table paused state based on pause validations (e.g. storage quota being exceeded). - // Skips updating the pause state if table is paused by admin - private PauseState computePauseState(String tableNameWithType) { PauseStatusDetails pauseStatus = _llcRealtimeSegmentManager.getPauseStatusDetails(tableNameWithType); boolean isTablePaused = pauseStatus.getPauseFlag(); // if table is paused by admin then don't compute - if (!isTablePaused || pauseStatus.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED)) { - TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); - boolean isQuotaExceeded = _storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig); - // if quota breach and pause flag is not in sync, update the IS - if (isQuotaExceeded != isTablePaused) { - String storageQuota = tableConfig.getQuotaConfig() != null ? tableConfig.getQuotaConfig().getStorage() : "NA"; - pauseStatus = _llcRealtimeSegmentManager.pauseConsumption(tableNameWithType, - PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, - isQuotaExceeded ? "Storage quota of " + storageQuota + " exceeded." : "Table storage within quota limits"); - } + if (isTablePaused && pauseStatus.getReasonCode().equals(PauseState.ReasonCode.ADMINISTRATIVE)) { + return false; + } + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + boolean isQuotaExceeded = _storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig); + if (isQuotaExceeded == isTablePaused) { + return !isTablePaused; + } + // if quota breach and pause flag is not in sync, update the IS + if (isQuotaExceeded) { + String storageQuota = tableConfig.getQuotaConfig() != null ? tableConfig.getQuotaConfig().getStorage() : "NA"; + // as quota is breached pause the consumption right away + _llcRealtimeSegmentManager.pauseConsumption(tableNameWithType, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, + "Storage quota of " + storageQuota + " exceeded."); + } else { + // as quota limit is being honored, unset the pause state and allow consuming segment recreation. + _llcRealtimeSegmentManager.updatePauseStateInIdealState(tableNameWithType, false, + PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, "Table storage within quota limits"); } - return new PauseState(pauseStatus.getPauseFlag(), pauseStatus.getReasonCode(), pauseStatus.getComment(), - pauseStatus.getTimestamp()); + return !isQuotaExceeded; } private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig streamConfig) {