
Storage Quota imposition on Realtime tables #13584

Merged · 14 commits · Sep 18, 2024

Conversation

@shounakmk219 (Collaborator) commented Jul 11, 2024

Description

This PR allows Pinot to impose storage quota restrictions on realtime tables.
To block ingestion on realtime tables, the following considerations are kept in mind:

  1. Replicas should be consistent.
  2. Ingestion should auto-resume upon a quota increase or freed-up storage.
  3. Easy observability into blocked ingestion, with relevant info.
  4. Query results should be consistent.

Also, to keep it simple, ingestion is blocked only when storage quotas are breached.

Blocking ingestion

  • The storage quota is imposed as part of the RealtimeSegmentValidationManager controller job.
  • If the table is breaching its quota, the PauseState is updated accordingly by setting the pause reasonCode to STORAGE_QUOTA_EXCEEDED on the table IdealState (IS).
  • The pause state from the table IS is checked during new consuming segment creation (see the sketch below).
  • If the table is paused, new consuming segments will not be created.
  • Ongoing segment commits are unaffected.
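To make the gating concrete, here is a minimal sketch, assuming a hypothetical accessor for the pause state on the table IS; none of these names are the PR's actual classes:

// Illustrative sketch only (hypothetical names): gate new consuming segment
// creation on the pause state stored in the table IdealState.
public class ConsumingSegmentGate {

  /** Hypothetical accessor for the pause state persisted on the table IS. */
  public interface PauseStateReader {
    boolean isTablePaused(String tableNameWithType);
  }

  private final PauseStateReader _pauseStateReader;

  public ConsumingSegmentGate(PauseStateReader pauseStateReader) {
    _pauseStateReader = pauseStateReader;
  }

  // Called when a consuming segment commits. The ongoing commit itself is
  // unaffected; only the creation of the next consuming segment is skipped
  // while the table is paused (by an admin or for STORAGE_QUOTA_EXCEEDED).
  public boolean shouldCreateNewConsumingSegment(String tableNameWithType) {
    return !_pauseStateReader.isTablePaused(tableNameWithType);
  }
}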

What if the pauseState is updated manually on ZK directly to resume the table ingestion?

Updating the pauseState alone will not create new consuming segments. New consuming segments can be created by:

  1. Calling the /resumeConsumption API on the controller
  2. Running the RealtimeSegmentValidationManager job with recreateDeletedConsumingSegment set to true

The resume consumption API internally depends on the RealtimeSegmentValidationManager task itself to handle the consuming segment creation.
As the storage quota validation is part of the RealtimeSegmentValidationManager job, it will again check the storage quota and set the pauseState to the right value.

Resuming ingestion

Once a table exceeds the storage quota, we may need to resume consumption in the following cases:

  1. The storage quota is extended for the table
  2. Storage frees up as older segments are deleted (by the retention manager)

To automate the resume-consumption flow we depend on the RealtimeSegmentValidationManager job itself, as sketched below.
In case the user does not want to wait for the periodic job run, they can manually trigger the job from the controller API or use the resume consumption endpoint.
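To illustrate, a sketch of that periodic check, assuming the reason-code-bearing pause state proposed later in this thread; all helper names here are hypothetical, not the PR's actual code:

// Illustrative control flow for the RealtimeSegmentValidationManager quota
// check; helper methods are stubs and all names are hypothetical.
public class StorageQuotaEnforcer {

  public enum ReasonCode {
    ADMINISTRATIVE, STORAGE_QUOTA_EXCEEDED
  }

  public void enforceStorageQuota(String tableNameWithType) {
    boolean quotaExceeded = isStorageQuotaExceeded(tableNameWithType);
    boolean paused = isPaused(tableNameWithType);

    // An administrative pause always wins; do not touch it here.
    if (paused && getPauseReason(tableNameWithType) == ReasonCode.ADMINISTRATIVE) {
      return;
    }
    if (quotaExceeded && !paused) {
      // Breach detected: pause ingestion and record the reason on the table IS.
      setPauseState(tableNameWithType, true, ReasonCode.STORAGE_QUOTA_EXCEEDED);
    } else if (!quotaExceeded && paused) {
      // Quota extended or storage freed by retention: auto-resume and
      // recreate the consuming segments that were skipped while paused.
      setPauseState(tableNameWithType, false, ReasonCode.STORAGE_QUOTA_EXCEEDED);
      recreateConsumingSegments(tableNameWithType);
    }
  }

  // Hypothetical helpers, e.g. backed by the table IS in ZK and the table
  // size reader; stubbed out here for brevity.
  private boolean isStorageQuotaExceeded(String t) { return false; }
  private boolean isPaused(String t) { return false; }
  private ReasonCode getPauseReason(String t) { return null; }
  private void setPauseState(String t, boolean paused, ReasonCode reason) { }
  private void recreateConsumingSegments(String t) { }
}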

@shounakmk219 added the Configuration (config changes: addition/deletion/change in behavior) and real-time labels on Jul 11, 2024
@shounakmk219 added the release-notes (referenced by PRs that need attention when compiling the next release notes) label on Jul 11, 2024
@codecov-commenter commented Jul 11, 2024

Codecov Report

Attention: Patch coverage is 54.05405% with 17 lines in your changes missing coverage. Please review.

Project coverage is 57.93%. Comparing base (59551e4) to head (0739028).
Report is 1037 commits behind head on master.

Files with missing lines Patch % Lines
...r/validation/RealtimeSegmentValidationManager.java 54.54% 4 Missing and 6 partials ⚠️
...not/controller/validation/StorageQuotaChecker.java 54.54% 3 Missing and 2 partials ⚠️
.../core/realtime/PinotLLCRealtimeSegmentManager.java 0.00% 1 Missing ⚠️
.../org/apache/pinot/spi/config/table/PauseState.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #13584      +/-   ##
============================================
- Coverage     61.75%   57.93%   -3.82%     
- Complexity      207      219      +12     
============================================
  Files          2436     2613     +177     
  Lines        133233   143294   +10061     
  Branches      20636    22004    +1368     
============================================
+ Hits          82274    83021     +747     
- Misses        44911    53766    +8855     
- Partials       6048     6507     +459     
Flag Coverage Δ
custom-integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration <0.01% <0.00%> (-0.01%) ⬇️
integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration2 0.00% <0.00%> (ø)
java-11 57.86% <24.32%> (-3.85%) ⬇️
java-21 57.82% <54.05%> (-3.81%) ⬇️
skip-bytebuffers-false 57.93% <54.05%> (-3.82%) ⬇️
skip-bytebuffers-true 57.75% <24.32%> (+30.02%) ⬆️
temurin 57.93% <54.05%> (-3.82%) ⬇️
unittests 57.93% <54.05%> (-3.82%) ⬇️
unittests1 40.72% <0.00%> (-6.17%) ⬇️
unittests2 27.99% <54.05%> (+0.26%) ⬆️

Flags with carried forward coverage won't be shown.

@Jackie-Jiang (Contributor) left a comment:

Mostly good. Can we add a test to verify this?

@swaminathanmanish @sajjad-moradi please also take a look

ZNRecord znRecord = idealState.getRecord();
znRecord.setSimpleField(IS_QUOTA_EXCEEDED, Boolean.valueOf(quotaExceeded).toString());
return new IdealState(znRecord);
}, RetryPolicies.noDelayRetryPolicy(1));
Contributor:

Why not allow retry here?

Collaborator (author):

No strong reason not to retry; I was keeping it in line with the table pause IS update.
Right now, in case of failure, the next segment commit or validation job, whichever runs first, will update the IS.
Let me know if you have a magic number for retries in mind, maybe 3?

Contributor:

If we do retry, we will have to move this check inside the updater lambda too, right?

IdealState is = getIdealState(tableNameWithType);
if (is.getRecord().getBooleanField(IS_QUOTA_EXCEEDED, false) != quotaExceeded)

@sajjad-moradi (Contributor):

> Mostly good. Can we add a test to verify this?
>
> @swaminathanmanish @sajjad-moradi please also take a look

Will review next week.


@swaminathanmanish (Contributor) left a comment:

LGTM

@shounakmk219 (Collaborator, author):

@sajjad-moradi can you please review the PR?
Thanks!

@sajjad-moradi (Contributor):

I just had a discussion with @mcvsubbu about this. Here's what we think:
With the proposed solution in this PR, there will be two IS flags indicating whether the table is paused: "isTablePaused" and "isQuotaExceeded". That doesn't sound right. Instead, we can have a single, more detailed field called pauseStatus. Here's an example for the storage-quota-exceeded case:

"pauseStatus": {
  "isPaused": true,
  "reasonCode": "STORAGE_QUOTA_EXCEEDED",
  "comment": "Current storage 2.3G exceeds quota 2.0G",
  "time": "2024-08-01T09:30:00Z"
}

And here is an example for a regular pause that's caused by an admin hitting the pause endpoint:

"pauseStatus": {
  "isPaused": true,
  "reasonCode": "ADMINISTRATIVE",
  "comment": "Need to change the underlying topic",
  "time": "2024-08-01T09:30:00Z"
}

This way, if in the future there's another reason for pausing the table, there's no need to add yet another field to the IS; we can simply add another reasonCode, which is an enum in the source code.
Moreover, if in the future we want to extend the pause functionality, e.g. adding support for partition-level pausing, we can simply add more fields to pauseStatus.
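A sketch of how the proposed pauseStatus could look as a config object; the codecov file list above shows the PR ultimately adds org.apache.pinot.spi.config.table.PauseState, but the field and method names below just follow the JSON example and may differ from the final code:

// Illustrative POJO following the JSON example above.
public class PauseState {

  public enum ReasonCode {
    ADMINISTRATIVE, STORAGE_QUOTA_EXCEEDED
  }

  private boolean _paused;
  private ReasonCode _reasonCode;
  private String _comment;
  private String _time; // ISO-8601 timestamp, as in the example above

  public PauseState(boolean paused, ReasonCode reasonCode, String comment, String time) {
    _paused = paused;
    _reasonCode = reasonCode;
    _comment = comment;
    _time = time;
  }

  public boolean isPaused() {
    return _paused;
  }

  public ReasonCode getReasonCode() {
    return _reasonCode;
  }

  public String getComment() {
    return _comment;
  }

  public String getTime() {
    return _time;
  }
}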

@shounakmk219 (Collaborator, author):

@sajjad-moradi @mcvsubbu, the pauseStatus field makes sense. Do you think we also need to maintain the set of different pause reasons active on a table? Context may be lost when one reason is overridden by, or not recorded because of, another, depending on how we prioritize these reasons:

  • An administrative pause should not be overridden by a quota breach, since in that case consumption would resume once the quota is honoured.
  • A quota breach can be overridden by an administrative pause, since the quota limit is checked every time before resuming consumption anyway.

Here the priority is well established, so it may not be a big concern.

@sajjad-moradi (Contributor) commented Aug 7, 2024:

The priority you described makes sense. Basically, the storage quota should not even be checked if the table is paused by an admin.

@sajjad-moradi (Contributor):

On a separate note, @mcvsubbu thinks it's better to have all the logic for handling storage quota (pausing/resuming) in one place: the "realtime segment validation manager" periodic job.
The benefit is that all the logic is in one place, so it's better in terms of maintainability. The downside is that the storage won't be checked until the next round of the periodic job. Maybe that's OK, because normally consuming segments take hours to complete, and even checking the quota upon segment completion is also delayed. WDYT?

@sajjad-moradi (Contributor):

@Jackie-Jiang any thoughts here?

@mcvsubbu (Contributor) commented Aug 7, 2024:

> On a separate note, @mcvsubbu thinks it's better to have all the logic for handling storage quota (pausing/resuming) in one place: the "realtime segment validation manager" periodic job. The benefit is that all the logic is in one place, so it's better in terms of maintainability. The downside is that the storage won't be checked until the next round of the periodic job. Maybe that's OK, because normally consuming segments take hours to complete, and even checking the quota upon segment completion is also delayed. WDYT?

There is enough going on during segment completion that I would hate to add more logic and conditions there. Can you state why you need the quota to be checked during segment completion?

I am even fine with adding logic in the background task to remove the latest segments (and adjust consumption offsets, of course) if the quota is exceeded; it may matter in some cases that the excess storage will result in charges. On the other hand, if that is the case, the metric on current storage use should be monitored in order to flag such situations early and in advance.

@shounakmk219 (Collaborator, author) commented Aug 7, 2024:

Hey @mcvsubbu @sajjad-moradi, it is OK if the table storage spills over the quota by a few segments before consumption is stopped, so we can keep the quota validation + IS update logic in one place, in the segment validation manager.
Also, as per the suggestion of introducing pauseStatus, a single check on pauseStatus will cover both pause reasons, eliminating the need for any extra checks during segment completion.

> Can you state why you need the quota to be checked during segment completion?

There is no strict storage quota imposition requirement, but I also didn't see any concern with putting the check in segment completion.

> remove latest segments (and adjust consumption offsets, of course) if the quota is exceeded

We don't want to do this, as it would affect the query results (moving back in time).

If it turns out that the segment validation manager job frequency does not work for the storage quota imposition, we can introduce a separate job when needed.
Let me know if this sounds good.

@shounakmk219 force-pushed the realtime-table-storage-quota branch 2 times, most recently from ec9e673 to 17ae277, on August 30, 2024 15:02
@shounakmk219 (Collaborator, author):

Hey @mcvsubbu @sajjad-moradi, I have updated the PR with the changes from the PauseState PR, removed the storage quota check from the segment commit process, and now run it only as part of the RealtimeSegmentValidationManager.
Please take another look!
Thanks

@sajjad-moradi (Contributor) left a comment:

Thanks for making all the changes.

_tableSizeReader = tableSizeReader;
_controllerMetrics = controllerMetrics;
_leadControllerManager = leadControllerManager;
_pinotHelixResourceManager = pinotHelixResourceManager;
_controllerConf = controllerConf;
Contributor:

Nit: I'd pick up only what is needed from the config rather than keeping the entire config object here. In this case, would an _isEnabled boolean suffice?

Collaborator (author):

Makes sense, will make the change.

@@ -43,14 +44,19 @@ public class StorageQuotaChecker {
private final ControllerMetrics _controllerMetrics;
private final LeadControllerManager _leadControllerManager;
private final PinotHelixResourceManager _pinotHelixResourceManager;
private final ControllerConf _controllerConf;
private final int _timeout;
Contributor:

I think it is useful to call it _timeoutMs since it improves readability.

_llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig,
context._recreateDeletedConsumingSegment, context._offsetCriteria);
context._recreateDeletedConsumingSegment || !pauseState.isPaused(), context._offsetCriteria);
Contributor:

Please change this to not call ensureAllPartitionsConsuming if the table is paused (or storage is exceeded), rather than overloading the paused boolean with recreation of deleted segments.

// Skips updating the pause state if table is paused by admin
PauseState pauseState = computePauseState(tableNameWithType);

if (!pauseState.isPaused()) {
Contributor:

Sorry for the nit, but this reads as if the periodic consumption fix is invoked whenever the table is not paused. That is partially true, but another factor is that the table may have run out of quota.

So it is better to create a method like shouldEnsurePartitionsConsuming() that returns a boolean, instead of returning the PauseState.

Thanks.

Collaborator (author):

Had a chat offline, will handle the future of _recreateDeletedConsumingSegment flag in a separate PR.

Contributor:

I learned that the flag is not quite what I had imagined. Yes, it is a config, and if set to true it ends up invoking the periodic job with 'true' (which handles the case of someone accidentally or intentionally removing the consuming segments of a table).

The flag is also used in the "resume" part, because resume happens via kicking off the periodic task.

cc: @sajjad-moradi

boolean unPausedUponStorageWithinQuota =
pauseState.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED);
_llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig,
context._recreateDeletedConsumingSegment || unPausedUponStorageWithinQuota, context._offsetCriteria);
Contributor:

Let us refactor this as follows:

  • Create a method called shouldRestartConsumption() that returns a boolean.
  • Change the original call to ensureConsuming to be under an if check based on the return value of this method.
  • shouldRestartConsumption() returns false if either of these conditions holds: (a) the table has been paused by an admin, or (b) the table is out of quota.

Agree?

Collaborator (author):

What about _recreateDeletedConsumingSegment?

Contributor:

Leave it as it is. That variable is not needed to evaluate shouldRestartConsumption() (actually, a better name could be shouldEnsureConsuming()). If the condition evaluates to true, then call it the same way as before.
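For reference, a sketch of the refactor being discussed, with _recreateDeletedConsumingSegment left as-is per the comment above; computePauseState is the helper visible in the diff earlier in this review, and everything else is illustrative rather than the PR's final code:

// Returns false when the table is paused, whether by an admin or because
// the storage quota is exceeded (the periodic job records the quota case
// in the pause state before this point).
private boolean shouldEnsureConsuming(String tableNameWithType) {
  PauseState pauseState = computePauseState(tableNameWithType);
  return !pauseState.isPaused();
}

// Call site: invoke ensureAllPartitionsConsuming the same way as before,
// but only when the table is allowed to consume.
if (shouldEnsureConsuming(tableNameWithType)) {
  _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig,
      context._recreateDeletedConsumingSegment, context._offsetCriteria);
}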

pauseState.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED);
if (unPausedUponStorageWithinQuota) {
// recreate consuming segments if table is resumed upon the table storage getting within quota limit
context._recreateDeletedConsumingSegment = true;
Contributor:

Suggested change:
- context._recreateDeletedConsumingSegment = true;
+ return true;

Collaborator (author):

Once the quota is increased on the table, or storage is freed due to retention, how will the consuming segments be recreated unless we set this flag?

Contributor:

Fair enough. I was thinking of the world where the flag is not there. Maybe it is good to remove the flag in this PR. Alternatively, you can commit this and either one of us can remove the flag.

Here is what I have for the flag removal:

  • Change the ensureConsuming() method to behave as if the flag is always true.
  • Remove the flag from the config and the map. Anyone who deletes the consuming segments by mistake will want to restart consumption, so it will still work. Anyone who deliberately wants to discard consuming segments can do so with the pause consumption operation command we have. If necessary, we can enhance the pause command to optionally discard the current consuming segments.

I will give a ship-it; let me know if you want me to raise a PR to remove that flag.

Collaborator (author):

No problem, I'll raise a PR to remove the flag.

@KKcorps merged commit e9271f6 into apache:master on Sep 18, 2024. 21 checks passed.