Skip to content

Commit

Permalink
handle resumption upon storage quota getting freed up
Browse files Browse the repository at this point in the history
  • Loading branch information
shounakmk219 committed Sep 19, 2024
1 parent c31eb9e commit a91b6cd
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1745,7 +1745,7 @@ public PauseStatusDetails resumeConsumption(String tableNameWithType, @Nullable
+ "endpoint in a few moments to double check.", new Timestamp(System.currentTimeMillis()).toString());
}

private IdealState updatePauseStateInIdealState(String tableNameWithType, boolean pause,
public IdealState updatePauseStateInIdealState(String tableNameWithType, boolean pause,
PauseState.ReasonCode reasonCode, @Nullable String comment) {
PauseState pauseState = new PauseState(pause, reasonCode, comment,
new Timestamp(System.currentTimeMillis()).toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,30 +115,36 @@ protected void processTable(String tableNameWithType, Context context) {
}
}

/**
*
* Updates the table paused state based on pause validations (e.g. storage quota being exceeded).
* Skips updating the pause state if table is paused by admin.
* Returns true if table is not paused
*/
private boolean shouldEnsureConsuming(String tableNameWithType) {
PauseState pauseState = computePauseState(tableNameWithType);
return !pauseState.isPaused();
}

// Updates the table paused state based on pause validations (e.g. storage quota being exceeded).
// Skips updating the pause state if table is paused by admin
private PauseState computePauseState(String tableNameWithType) {
PauseStatusDetails pauseStatus = _llcRealtimeSegmentManager.getPauseStatusDetails(tableNameWithType);
boolean isTablePaused = pauseStatus.getPauseFlag();
// if table is paused by admin then don't compute
if (!isTablePaused || pauseStatus.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED)) {
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
boolean isQuotaExceeded = _storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig);
// if quota breach and pause flag is not in sync, update the IS
if (isQuotaExceeded != isTablePaused) {
String storageQuota = tableConfig.getQuotaConfig() != null ? tableConfig.getQuotaConfig().getStorage() : "NA";
pauseStatus = _llcRealtimeSegmentManager.pauseConsumption(tableNameWithType,
PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED,
isQuotaExceeded ? "Storage quota of " + storageQuota + " exceeded." : "Table storage within quota limits");
}
if (isTablePaused && pauseStatus.getReasonCode().equals(PauseState.ReasonCode.ADMINISTRATIVE)) {
return false;
}
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
boolean isQuotaExceeded = _storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig);
if (isQuotaExceeded == isTablePaused) {
return !isTablePaused;
}
// if quota breach and pause flag is not in sync, update the IS
if (isQuotaExceeded) {
String storageQuota = tableConfig.getQuotaConfig() != null ? tableConfig.getQuotaConfig().getStorage() : "NA";
// as quota is breached pause the consumption right away
_llcRealtimeSegmentManager.pauseConsumption(tableNameWithType, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED,
"Storage quota of " + storageQuota + " exceeded.");
} else {
// as quota limit is being honored, unset the pause state and allow consuming segment recreation.
_llcRealtimeSegmentManager.updatePauseStateInIdealState(tableNameWithType, false,
PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, "Table storage within quota limits");
}
return new PauseState(pauseStatus.getPauseFlag(), pauseStatus.getReasonCode(), pauseStatus.getComment(),
pauseStatus.getTimestamp());
return !isQuotaExceeded;
}

private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig streamConfig) {
Expand Down

0 comments on commit a91b6cd

Please sign in to comment.