Skip to content

Commit

Permalink
Remove recreateDeletedConsumingSegment flag from RealtimeSegmentVal…
Browse files Browse the repository at this point in the history
…idationManager (#14024)

* Remove recreateDeletedConsumingSegment flag

In favour of always recreating deleted consuming segments if table is not paused.

* handle resumption upon storage quota getting freed up
  • Loading branch information
shounakmk219 committed Sep 19, 2024
1 parent 9748dd0 commit 2de61de
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -912,11 +912,9 @@ private Map<Integer, SegmentZKMetadata> getLatestSegmentZKMetadataMap(String rea
* Check whether there are segments in the PROPERTYSTORE with status DONE, but no new segment in status
* IN_PROGRESS, and the state for the latest segment in the IDEALSTATE is ONLINE.
* If so, it should create a new CONSUMING segment for the partition.
* (this operation is done only if @param recreateDeletedConsumingSegment is set to true,
* which means it's manually triggered by admin not by automatic periodic task)
*/
public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig streamConfig,
boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) {
OffsetCriteria offsetCriteria) {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");

String realtimeTableName = tableConfig.getTableName();
Expand All @@ -938,7 +936,7 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig s
getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
streamConfig.setOffsetCriteria(originalOffsetCriteria);
return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList,
recreateDeletedConsumingSegment, offsetCriteria);
offsetCriteria);
} else {
LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}",
realtimeTableName, isTableEnabled, isTablePaused);
Expand Down Expand Up @@ -1158,8 +1156,7 @@ private boolean isAllInstancesInState(Map<String, String> instanceStateMap, Stri
*/
@VisibleForTesting
IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig streamConfig, IdealState idealState,
List<PartitionGroupMetadata> newPartitionGroupMetadataList, boolean recreateDeletedConsumingSegment,
OffsetCriteria offsetCriteria) {
List<PartitionGroupMetadata> newPartitionGroupMetadataList, OffsetCriteria offsetCriteria) {
String realtimeTableName = tableConfig.getTableName();

InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
Expand Down Expand Up @@ -1275,7 +1272,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig st
instancePartitionsMap, startOffset);
} else {
if (newPartitionGroupSet.contains(partitionGroupId)) {
if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted()
if (latestSegmentZKMetadata.getStatus().isCompleted()
&& isAllInstancesInState(instanceStateMap, SegmentStateModel.ONLINE)) {
// If we get here, that means in IdealState, the latest segment has all replicas ONLINE.
// Create a new IN_PROGRESS segment in PROPERTYSTORE,
Expand Down Expand Up @@ -1737,7 +1734,6 @@ public PauseStatusDetails resumeConsumption(String tableNameWithType, @Nullable

// trigger realtime segment validation job to resume consumption
Map<String, String> taskProperties = new HashMap<>();
taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true");
if (offsetCriteria != null) {
taskProperties.put(RealtimeSegmentValidationManager.OFFSET_CRITERIA, offsetCriteria);
}
Expand All @@ -1749,7 +1745,7 @@ public PauseStatusDetails resumeConsumption(String tableNameWithType, @Nullable
+ "endpoint in a few moments to double check.", new Timestamp(System.currentTimeMillis()).toString());
}

private IdealState updatePauseStateInIdealState(String tableNameWithType, boolean pause,
public IdealState updatePauseStateInIdealState(String tableNameWithType, boolean pause,
PauseState.ReasonCode reasonCode, @Nullable String comment) {
PauseState pauseState = new PauseState(pause, reasonCode, comment,
new Timestamp(System.currentTimeMillis()).toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
private final int _segmentLevelValidationIntervalInSeconds;
private long _lastSegmentLevelValidationRunTimeMs = 0L;

public static final String RECREATE_DELETED_CONSUMING_SEGMENT_KEY = "recreateDeletedConsumingSegment";
public static final String OFFSET_CRITERIA = "offsetCriteria";

public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
Expand Down Expand Up @@ -87,8 +86,6 @@ protected Context preprocess(Properties periodicTaskProperties) {
context._runSegmentLevelValidation = true;
_lastSegmentLevelValidationRunTimeMs = currentTimeMs;
}
context._recreateDeletedConsumingSegment =
Boolean.parseBoolean(periodicTaskProperties.getProperty(RECREATE_DELETED_CONSUMING_SEGMENT_KEY));
String offsetCriteriaStr = periodicTaskProperties.getProperty(OFFSET_CRITERIA);
if (offsetCriteriaStr != null) {
context._offsetCriteria = new OffsetCriteria.OffsetCriteriaBuilder().withOffsetString(offsetCriteriaStr);
Expand All @@ -113,44 +110,41 @@ protected void processTable(String tableNameWithType, Context context) {
runSegmentLevelValidation(tableConfig, streamConfig);
}

if (shouldEnsureConsuming(tableNameWithType, context)) {
_llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig,
context._recreateDeletedConsumingSegment, context._offsetCriteria);
if (shouldEnsureConsuming(tableNameWithType)) {
_llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig, context._offsetCriteria);
}
}

private boolean shouldEnsureConsuming(String tableNameWithType, Context context) {
// Keeps the table paused/unpaused based pause validations.
// Skips updating the pause state if table is paused by admin
PauseState pauseState = computePauseState(tableNameWithType);
if (!pauseState.isPaused()) {
boolean unPausedUponStorageWithinQuota =
pauseState.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED);
if (unPausedUponStorageWithinQuota) {
// recreate consuming segments if table is resumed upon the table storage getting within quota limit
context._recreateDeletedConsumingSegment = true;
}
}
return !pauseState.isPaused();
}

private PauseState computePauseState(String tableNameWithType) {
/**
*
* Updates the table paused state based on pause validations (e.g. storage quota being exceeded).
* Skips updating the pause state if table is paused by admin.
* Returns true if table is not paused
*/
private boolean shouldEnsureConsuming(String tableNameWithType) {
PauseStatusDetails pauseStatus = _llcRealtimeSegmentManager.getPauseStatusDetails(tableNameWithType);
boolean isTablePaused = pauseStatus.getPauseFlag();
// if table is paused by admin then don't compute
if (!isTablePaused || pauseStatus.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED)) {
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
boolean isQuotaExceeded = _storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig);
// if quota breach and pause flag is not in sync, update the IS
if (isQuotaExceeded != isTablePaused) {
String storageQuota = tableConfig.getQuotaConfig() != null ? tableConfig.getQuotaConfig().getStorage() : "NA";
pauseStatus = _llcRealtimeSegmentManager.pauseConsumption(tableNameWithType,
PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED,
isQuotaExceeded ? "Storage quota of " + storageQuota + " exceeded." : "Table storage within quota limits");
}
if (isTablePaused && pauseStatus.getReasonCode().equals(PauseState.ReasonCode.ADMINISTRATIVE)) {
return false;
}
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
boolean isQuotaExceeded = _storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig);
if (isQuotaExceeded == isTablePaused) {
return !isTablePaused;
}
// if quota breach and pause flag is not in sync, update the IS
if (isQuotaExceeded) {
String storageQuota = tableConfig.getQuotaConfig() != null ? tableConfig.getQuotaConfig().getStorage() : "NA";
// as quota is breached pause the consumption right away
_llcRealtimeSegmentManager.pauseConsumption(tableNameWithType, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED,
"Storage quota of " + storageQuota + " exceeded.");
} else {
// as quota limit is being honored, unset the pause state and allow consuming segment recreation.
_llcRealtimeSegmentManager.updatePauseStateInIdealState(tableNameWithType, false,
PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, "Table storage within quota limits");
}
return new PauseState(pauseStatus.getPauseFlag(), pauseStatus.getReasonCode(), pauseStatus.getComment(),
pauseStatus.getTimestamp());
return !isQuotaExceeded;
}

private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig streamConfig) {
Expand Down Expand Up @@ -204,7 +198,6 @@ public void cleanUpTask() {

public static final class Context {
private boolean _runSegmentLevelValidation;
private boolean _recreateDeletedConsumingSegment;
private OffsetCriteria _offsetCriteria;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -878,8 +878,7 @@ public void testStopSegmentManager()
// Expected
}
try {
segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, false,
null);
segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, null);
fail();
} catch (IllegalStateException e) {
// Expected
Expand Down Expand Up @@ -1146,7 +1145,7 @@ public void setUpNewTable() {

public void ensureAllPartitionsConsuming() {
ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), false, null);
getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), null);
}

@Override
Expand Down

0 comments on commit 2de61de

Please sign in to comment.