Add base class of rate limited queue #60
HCAD can bombard ES with "thundering herd" traffic, in which many entities make requests that need similar ES reads/writes at approximately the same time. To remedy this issue, we queue the requests and ensure that only a limited number of requests are out for ES reads/writes at any time.

This PR adds the class RateLimitedQueue, the abstract parent class of all of the queues. The process is asynchronous, as the put and execute operations do not block each other. How to execute requests is abstracted out and left to RateLimitedQueue's subclasses to implement.

Each request is associated with a segment. That is, a queue consists of segments. Each segment has a priority: HIGH, MEDIUM, or LOW. An example of a HIGH priority request is an anomaly result with an error or with an anomaly grade larger than zero. An example of a MEDIUM priority request is a cold start request for an entity. An example of a LOW priority request is a checkpoint write request for a cold entity. LOW priority requests have the lowest chance of being selected for execution; MEDIUM and HIGH priority requests have higher chances. When the size of the queue grows beyond a limit, LOW priority requests are more likely to be deleted than MEDIUM/HIGH priority requests.

Testing done:
1. Manual tests using 10 HCAD detectors and 12,000 entities in a 3 node cluster.
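To make the priority scheme in the description concrete, here is a minimal single-threaded sketch. All names (SketchPriority, PrioritySegmentsSketch) are hypothetical and not taken from the PR, and the eviction is simplified to be deterministic: it always drops from the lowest-priority non-empty segment, whereas the description only says LOW priority requests are more likely to be deleted.

```java
import java.util.ArrayDeque;
import java.util.EnumMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical names; a single-threaded illustration, not the PR's RateLimitedQueue.
enum SketchPriority { LOW, MEDIUM, HIGH } // declared lowest priority first

class PrioritySegmentsSketch {
    private final Map<SketchPriority, Queue<String>> segments = new EnumMap<>(SketchPriority.class);
    private final int maxSize;

    PrioritySegmentsSketch(int maxSize) {
        this.maxSize = maxSize;
        for (SketchPriority p : SketchPriority.values()) {
            segments.put(p, new ArrayDeque<>());
        }
    }

    // Total number of queued requests across all segments.
    int size() {
        int total = 0;
        for (Queue<String> q : segments.values()) {
            total += q.size();
        }
        return total;
    }

    int sizeOf(SketchPriority priority) {
        return segments.get(priority).size();
    }

    // Enqueue, then evict until the total size is back within the limit.
    void put(SketchPriority priority, String request) {
        segments.get(priority).add(request);
        while (size() > maxSize && evictOne()) {
            // keep evicting
        }
    }

    // Assumption: evict from the lowest-priority non-empty segment first.
    private boolean evictOne() {
        for (SketchPriority p : SketchPriority.values()) {
            if (segments.get(p).poll() != null) {
                return true;
            }
        }
        return false;
    }
}
```

With a limit of 2, putting one LOW, one HIGH, and then one MEDIUM request leaves the HIGH and MEDIUM requests queued and drops the LOW one.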
src/main/java/org/opensearch/ad/ratelimit/RateLimitedQueue.java
 */
public abstract class RateLimitedQueue<RequestType extends QueuedRequest> implements MaintenanceState {
    /**
     * Each request is associated with a segment. That is, a queue consists of segments.
Where does the term "segment" come from? Why doesn't the queue just consist of requests?
I coined the term. Different requests have different priorities, and I want to differentiate them using segments.
I see. Segment doesn't seem intuitive to me, but I'm not sure I have a better name. A segment just appears to be a queue of requests, so why can't it just be RequestQueue?
It is a bit strange to call a variable queue inside a queue. Any other name?
But a segment is just a queue of requests with additional metadata describing when it was accessed. Therefore, RequestQueue or AccessTimeStampedRequestQueue would be good names: I can understand what the class is just from the name.
How about we rename RateLimitedQueue to RateLimitedRequestWorker and rename segment to RequestQueue?
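For readers following the naming discussion, this is roughly what "a queue of requests with additional metadata describing when it was accessed" could look like. It is only a sketch: the class name and the isExpired check are hypothetical, and the caller passes the current time explicitly to keep the example deterministic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of an access-time-stamped request queue; not the PR's Segment class.
class TimestampedRequestQueueSketch<T> {
    // data structure to hold requests
    private final BlockingQueue<T> content = new LinkedBlockingQueue<>();
    // last time the queue was read from or written to
    private volatile Instant lastAccessTime;

    TimestampedRequestQueueSketch(Instant createdAt) {
        this.lastAccessTime = createdAt;
    }

    // Non-blocking enqueue that records the access time.
    boolean offer(T request, Instant now) {
        lastAccessTime = now;
        return content.offer(request);
    }

    // Non-blocking dequeue that records the access time; returns null when empty.
    T poll(Instant now) {
        lastAccessTime = now;
        return content.poll();
    }

    // Assumption: an empty queue untouched for longer than ttl can be cleaned up.
    boolean isExpired(Instant now, Duration ttl) {
        return content.isEmpty() && lastAccessTime.plus(ttl).isBefore(now);
    }
}
```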
src/main/java/org/opensearch/ad/ratelimit/RateLimitedQueue.java
private Instant lastAccessTime;
// data structure to hold requests
private BlockingQueue<RequestType> content;
Why do we access these two variables in different ways? On line 229, segment.content is used to access content directly, but accessor methods such as setLastAccessTime are also provided.
Yeah, I removed setLastAccessTime and now access the variables directly.
// For medium priority requests, the segment id is detector id. The objective
// is to separate requests from different detectors and fairly process requests
// from each detector.
protected final ConcurrentSkipListMap<String, Segment> requestSegments;
Do we need this map? This class is always aware of segment priority, so we could add three separate queue fields instead of a map.
For medium priority requests, the segment id is the detector id, so we need the map.
 *
 * @param <RequestType> Individual request type that is a subtype of ADRequest
 */
public abstract class RateLimitedQueue<RequestType extends QueuedRequest> implements MaintenanceState {
Should we use a better name? This class isn't just a queue; it also includes the logic to handle the requests.
Any suggestion?
}

public void put(RequestType request) throws InterruptedException {
    this.content.put(request);
Minor: put is a blocking call, although blocking is rare since the queue capacity is Integer.MAX_VALUE. If we want a capacity limit in the future, we probably don't want the putting thread to block when the queue is full.
The queue's size is maintained elsewhere; please check RateLimitedQueue.maintainForMemory. I know the queue won't be full, so I don't do anything about the blocking call here.
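As a reference for the blocking concern raised above: BlockingQueue.put waits when a bounded queue is full, while BlockingQueue.offer returns false immediately. A minimal sketch follows, using a hypothetical helper named tryEnqueue and a LinkedBlockingQueue bounded to one element purely for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper; shows the non-blocking alternative to put().
class NonBlockingPutSketch {
    // offer() never blocks: it returns false when the queue is at capacity.
    static <T> boolean tryEnqueue(BlockingQueue<T> queue, T request) {
        return queue.offer(request);
    }

    public static void main(String[] args) {
        // Capacity of 1 is chosen only to make the full-queue case easy to trigger.
        BlockingQueue<String> bounded = new LinkedBlockingQueue<>(1);
        System.out.println(tryEnqueue(bounded, "first"));  // accepted
        System.out.println(tryEnqueue(bounded, "second")); // rejected, queue is full
    }
}
```

With the default LinkedBlockingQueue capacity of Integer.MAX_VALUE the two calls behave the same in practice, which matches the author's reply; the difference only matters once a capacity limit is introduced.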
private final long dataStartTimeMillis;

public EntityFeatureRequest(
    long expirationEpochMs,
How do we decide this value? Does it need to be set per request?
The expiry time is the start timestamp of the next detector run. Yes, it needs to be set per request.
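A small sketch of what a per-request expiry could look like under this answer. The class and the nextRunStart helper are hypothetical, and computing the next run start as the current run start plus the detector interval is an assumption, not something the PR states.

```java
// Hypothetical sketch; not the PR's EntityFeatureRequest.
class ExpiringRequestSketch {
    private final long expirationEpochMs;
    private final String detectorId;

    ExpiringRequestSketch(long expirationEpochMs, String detectorId) {
        this.expirationEpochMs = expirationEpochMs;
        this.detectorId = detectorId;
    }

    String getDetectorId() {
        return detectorId;
    }

    // A request is stale once the next detector run has started.
    boolean isExpired(long nowEpochMs) {
        return nowEpochMs >= expirationEpochMs;
    }

    // Assumption: the next run starts one detector interval after the current run.
    static long nextRunStart(long currentRunStartMs, long intervalMs) {
        return currentRunStartMs + intervalMs;
    }
}
```

Because each detector run has its own start time, the expiry differs from run to run, which is why it would be passed in per request.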
private static final Logger LOG = LogManager.getLogger(RateLimitedQueue.class);

protected int queueSize;
Add volatile, since it will be used in the setting update consumer on line 159?
Added.
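The reasoning behind the suggestion: the field is written by whichever thread applies a settings update and read by the threads doing queue maintenance, so without volatile a reader is not guaranteed to see the new value. A minimal sketch, where a plain java.util.function.Consumer stands in for the settings update consumer and all names other than queueSize are hypothetical:

```java
import java.util.function.Consumer;

// Hypothetical sketch; a plain Consumer stands in for the settings update consumer.
class VolatileSettingSketch {
    // volatile makes a write from the updating thread visible to reader threads
    protected volatile int queueSize;

    VolatileSettingSketch(int initialQueueSize) {
        this.queueSize = initialQueueSize;
    }

    // Callback to register wherever setting changes are delivered.
    Consumer<Integer> updateConsumer() {
        return newSize -> this.queueSize = newSize;
    }

    // Reader side: compares the current request count with the latest limit.
    boolean isSizeExceeded(int currentCount) {
        return currentCount > queueSize;
    }
}
```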
try {
    Segment requestQueue = requestSegments
        .computeIfAbsent(
            SegmentPriority.MEDIUM == request.getPriority() ? request.getDetectorId() : request.getPriority().name(),
Why only consider MEDIUM priority here? Can you add some comments?
Because only medium priority segments use the detector id as the key of the segment map. Low and high priority requests just use the segment priority (i.e., low or high) as the key. Added the comments.
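The keying rule described in this reply can be sketched as follows. The ternary mirrors the one in the diff; everything else (KeyPriority, SegmentKeySketch, String payloads in place of real request objects) is a hypothetical stand-in.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-ins for SegmentPriority and the request type.
enum KeyPriority { LOW, MEDIUM, HIGH }

class SegmentKeySketch {
    // map from segment id to its queue of requests
    final ConcurrentSkipListMap<String, BlockingQueue<String>> segments = new ConcurrentSkipListMap<>();

    // MEDIUM requests are keyed by detector id; LOW and HIGH share one segment each.
    static String segmentId(KeyPriority priority, String detectorId) {
        return KeyPriority.MEDIUM == priority ? detectorId : priority.name();
    }

    void put(KeyPriority priority, String detectorId, String request) {
        segments
            .computeIfAbsent(segmentId(priority, detectorId), key -> new LinkedBlockingQueue<>())
            .offer(request);
    }
}
```

Two detectors sending requests at all three priorities therefore produce four segments: HIGH, LOW, and one per detector id.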
// map from segment Id to its segment.
// For high priority requests, the segment id is SegmentPriority.HIGH.name().
// For low priority requests, the segment id is SegmentPriority.LOW.name().
// For medium priority requests, the segment id is detector id. The objective
Why not separate high priority requests at the detector level as well?
It makes things more complicated, and I don't see the need for now :)
BlockingQueue<RequestType> requests = segment.content;

if (requests != null && false == requests.isEmpty()) {
Minor: false == requests.isEmpty() -> !requests.isEmpty()?
It is a style choice: I try to use false == instead of ! because I find it easier to read: https://stackoverflow.com/questions/11831881/if-boolean-false-vs-if-boolean
Different people have different opinions on this: https://softwareengineering.stackexchange.com/questions/136908/why-use-boolean-variable-over-boolean-variable-false
Not a big problem.
    startId = requestSegments.higherKey(startId);
}

if (startId.equals(SegmentPriority.LOW.name())) {
Should we have different logic for high priority as well? Do we just pull high and medium requests in a round-robin way?
Yes, we just pull high and medium requests in a round-robin way. We can definitely add a special branch for high priority requests in the future :)
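For context, round-robin over a ConcurrentSkipListMap can be built on higherKey, which the diff above also uses. The sketch below is a hypothetical helper, not the PR's selection logic: it returns the key after the current one and wraps around to the first key at the end. It does not model the special handling of the LOW segment.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical helper illustrating round-robin key traversal with wrap-around.
class RoundRobinKeysSketch {
    // Returns the segment id to visit after `current`, or null if the map is empty.
    static <V> String nextKey(ConcurrentSkipListMap<String, V> segments, String current) {
        String next = current == null ? null : segments.higherKey(current);
        if (next != null) {
            return next;
        }
        // Reached the end (or no cursor yet): wrap around to the smallest key.
        Map.Entry<String, V> first = segments.firstEntry();
        return first == null ? null : first.getKey();
    }
}
```

Note that keys are visited in natural String order, so with ids such as "HIGH", "LOW", "detA", and "detB" the uppercase priority names sort before the lowercase detector ids.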
// remove until reaching below queueSize
do {
    prune(requestSegments);
} while (isSizeExceeded());
Can we calculate how many requests we should prune directly to avoid this while loop?
Yes. Created an issue: #66. Will do it after the cutoff.
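One possible shape for the suggestion, as a sketch only (issue #66 may end up looking different): compute the excess once and remove exactly that many requests, taking from the lowest-priority segments first. The class name, the list-of-queues parameter, and the removal order are all assumptions, and the sketch ignores concurrent puts that could change the sizes while pruning.

```java
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of pruning by a precomputed count instead of a do/while loop.
class DirectPruneSketch {
    // Removes (total - limit) requests, draining earlier (lower priority) queues first.
    // Returns the number of requests removed.
    static <T> int prune(List<Queue<T>> segmentsLowestPriorityFirst, int limit) {
        int total = 0;
        for (Queue<T> q : segmentsLowestPriorityFirst) {
            total += q.size();
        }
        int excess = Math.max(0, total - limit);
        int removed = 0;
        for (Queue<T> q : segmentsLowestPriorityFirst) {
            while (removed < excess && q.poll() != null) {
                removed++;
            }
            if (removed == excess) {
                break;
            }
        }
        return removed;
    }
}
```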
This PR is a conglomerate of the following PRs. #60 #64 #65 #67 #68 #69 #70 #71 #74 #75 #76 #77 #78 #79 #82 #83 #84 #92 #94 #93 #95 kaituo#1 kaituo#2 kaituo#3 kaituo#4 kaituo#5 kaituo#6 kaituo#7 kaituo#8 kaituo#9 kaituo#10 This spreadsheet contains the mappings from files to PR number (bug fix in my AD fork and tests are not included): https://gist.github.com/kaituo/9e1592c4ac4f2f449356cb93d0591167
Note: since there are a lot of dependencies, I only list the main class and test code to save reviewers' time. The build will fail due to missing dependencies. I will use that PR just for review and will not merge it. I will create one big PR at the end and merge it once all the review PRs are approved. The code is currently missing unit tests; I am posting the PRs now to meet the cutoff date (June 1). I will add unit tests, run performance tests, and fix bugs before the official release.
Description
HCAD can bombard OpenSearch with "thundering herd" traffic, in which many entities make requests that need similar OpenSearch reads/writes at approximately the same time. To remedy this issue, we queue the requests and ensure that only a limited number of requests are out for OpenSearch reads/writes at any time.
This PR adds the class RateLimitedQueue, the abstract parent class of all of the queues. The process is asynchronous, as the put and execute operations do not block each other. How to execute requests is abstracted out and left to RateLimitedQueue's subclasses to implement.
Each request is associated with a segment. That is, a queue consists of segments. Each segment has a priority: HIGH, MEDIUM, or LOW. An example of a HIGH priority request is an anomaly result with an error or with an anomaly grade larger than zero. An example of a MEDIUM priority request is a cold start request for an entity. An example of a LOW priority request is a checkpoint write request for a cold entity. LOW priority requests have the lowest chance of being selected for execution; MEDIUM and HIGH priority requests have higher chances. When the size of the queue grows beyond a limit, LOW priority requests are more likely to be deleted than MEDIUM/HIGH priority requests.