-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
.../input/entityanalytics/provider/okta: Rate limiting fix, improvements #41977
.../input/entityanalytics/provider/okta: Rate limiting fix, improvements #41977
Conversation
Pinging @elastic/security-service-integrations (Team:Security-Service Integrations) |
This pull request does not have a backport label.
To fixup this pull request, you need to add the backport labels for the needed
|
|
@@ -27,6 +27,7 @@ func defaultConfig() conf { | |||
SyncInterval: 24 * time.Hour, | |||
UpdateInterval: 15 * time.Minute, | |||
LimitWindow: time.Minute, | |||
LimitFixed: nil, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't needed. Is it here for explicitness?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I was thinking out loud. Doesn't need to be there. Removed.
x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go
Show resolved
Hide resolved
x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter.go
Show resolved
Hide resolved
ready := make(chan struct{}) | ||
close(ready) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest something like
// alwaysReady is an non-blocking chan.
var alwaysReady = make(chan struct{})
func init() { close(alwaysReady) }
func (r RateLimiter) endpoint(path string) endpointRateLimiter {
if existing, ok := r.byEndpoint[path]; ok {
return existing
}
limit := rate.Limit(1)
if r.fixedLimit != nil {
limit = rate.Limit(float64(*r.fixedLimit) / r.window.Seconds())
}
limiter := rate.NewLimiter(limit, 1) // Allow a single fetch operation to obtain limits from the API
newEndpointRateLimiter := endpointRateLimiter{
limiter: limiter,
ready: alwaysReady,
}
r.byEndpoint[path] = newEndpointRateLimiter
return newEndpointRateLimiter
}
so that we have a single never-blocking chan.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done with the name immediatelyReady
.
It's true that channel will always be ready, but the endpointRateLimiter
's ready
field won't always have that value...
I don't know. Is this better, or the same? (or less good?)
log.Debugw("rate limit", "limit", limiter.Limit(), "burst", limiter.Burst(), "url", url.String()) | ||
return limiter.Wait(ctx) | ||
e := r.endpoint(endpoint) | ||
<-e.ready |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
<-e.ready | |
<-e.ready | |
select { | |
case <-e.ready: | |
case <-ctx.Done(): | |
return ctx.Err() | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
e := r.endpoint(endpoint) | ||
<-e.ready | ||
log.Debugw("rate limit", "limit", e.limiter.Limit(), "burst", e.limiter.Burst(), "url", url.String()) | ||
ctxWithDeadline, cancel := context.WithDeadline(ctx, time.Now().Add(waitDeadline)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the deadline be calculated before the e.ready
recv?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I intended to apply it only when using the rate limiter, not when requests are shut down, but I'll change it to apply to both.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
target := 30.0 | ||
buffer := 0.01 | ||
|
||
if tokens < target-buffer || tokens > target+buffer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if tokens < target-buffer || tokens > target+buffer { | |
if tokens < target-buffer || target+buffer < tokens { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
wait := time.Since(start) | ||
|
||
if wait > 1010*time.Millisecond { | ||
t.Errorf("doesn't allow requests to resume after reset. had to wait %d milliseconds", wait.Milliseconds()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
t.Errorf("doesn't allow requests to resume after reset. had to wait %d milliseconds", wait.Milliseconds()) | |
t.Errorf("doesn't allow requests to resume after reset. had to wait %s", wait) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
const endpoint = "/foo" | ||
limiter := r.limiter(endpoint) | ||
url, _ := url.Parse(endpoint) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
url, _ := url.Parse(endpoint) | |
url, err := url.Parse(endpoint) | |
if err != nil { | |
t.Fatalf("failed to parse endpoint: %v", err) | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
|
||
const endpoint = "/foo" | ||
|
||
url, _ := url.Parse(endpoint) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@efd6 Addressed comments. Also added documentation changes that I just forgot to commit before. Now also has |
This pull request is now in conflicts. Could you fix it? 🙏
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks
…limiting fix, improvements (#42008) * .../input/entityanalytics/provider/okta: Rate limiting fix, improvements (#41977) - Fix a bug in the stopping of requests when `x-rate-limit-remaining: 0`. - Add a deadline so long waits return immediately as errors. - Add an option to set a fixed request rate. (cherry picked from commit 08b7d84) --------- Co-authored-by: Chris Berkhout <chris.berkhout@elastic.co>
…nts (elastic#41977) - Fix a bug in the stopping of requests when `x-rate-limit-remaining: 0`. - Add a deadline so long waits return immediately as errors. - Add an option to set a fixed request rate.
Proposed commit message
Checklist
CHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.Related issues