Skip to content

Commit

Permalink
[8.16](backport #41583) x-pack/filebeat/input/entityanalytics/provide…
Browse files Browse the repository at this point in the history
…r/okta: Rate limiting fixes (#41901)

x-pack/filebeat/input/entityanalytics/provider/okta: Rate limiting fixes (#41583)

- Separate rate limits by endpoint.
- Stop requests until reset when `x-rate-limit-remaining: 0`.

(cherry picked from commit 4e19d09)

- Resolved conflicts by removing changes to `GetUserFactors`, `GetUserRoles` and `GetGroupRoles`, because 8.16 doesn't have #41044.
  • Loading branch information
mergify[bot] authored Dec 6, 2024
1 parent d0c8b9f commit 8c7d219
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 97 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix double encoding of client_secret in the Entity Analytics input's Azure Active Directory provider {pull}41393[41393]
- The azure-eventhub input now correctly reports its status to the Elastic Agent on fatal errors {pull}41469[41469]
- Fix handling of http_endpoint request exceeding memory limits. {issue}41764[41764] {pull}41765[41765]
- Rate limiting fixes in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41583[41583]

*Heartbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,9 @@ import (
"io"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"

"golang.org/x/time/rate"

"github.com/elastic/elastic-agent-libs/logp"
)

Expand Down Expand Up @@ -164,16 +160,23 @@ func (o Response) String() string {
// https://${yourOktaDomain}/reports/rate-limit.
//
// See https://developer.okta.com/docs/reference/api/users/#list-users for details.
func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, omit Response, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) {
const endpoint = "/api/v1/users"
func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) {
var endpoint, path string
if user == "" {
endpoint = "/api/v1/users"
path = endpoint
} else {
endpoint = "/api/v1/users/{user}"
path = strings.Replace(endpoint, "{user}", user, 1)
}

u := &url.URL{
Scheme: "https",
Host: host,
Path: path.Join(endpoint, user),
Path: path,
RawQuery: query.Encode(),
}
return getDetails[User](ctx, cli, u, key, user == "", omit, lim, window, log)
return getDetails[User](ctx, cli, u, endpoint, key, user == "", omit, lim, window, log)
}

// GetUserGroupDetails returns Okta group details using the users API endpoint. host is the
Expand All @@ -182,19 +185,20 @@ func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user strin
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/reference/api/users/#request-parameters-8 (no anchor exists on the page for this endpoint) for details.
func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user string, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]Group, http.Header, error) {
const endpoint = "/api/v1/users"

func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Group, http.Header, error) {
if user == "" {
return nil, nil, errors.New("no user specified")
}

const endpoint = "/api/v1/users/{user}/groups"
path := strings.Replace(endpoint, "{user}", user, 1)

u := &url.URL{
Scheme: "https",
Host: host,
Path: path.Join(endpoint, user, "groups"),
Path: path,
}
return getDetails[Group](ctx, cli, u, key, true, OmitNone, lim, window, log)
return getDetails[Group](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log)
}

// GetDeviceDetails returns Okta device details using the list devices API endpoint. host is the
Expand All @@ -204,16 +208,24 @@ func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDevices for details.
func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]Device, http.Header, error) {
const endpoint = "/api/v1/devices"
func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Device, http.Header, error) {
var endpoint string
var path string
if device == "" {
endpoint = "/api/v1/devices"
path = endpoint
} else {
endpoint = "/api/v1/devices/{device}"
path = strings.Replace(endpoint, "{device}", device, 1)
}

u := &url.URL{
Scheme: "https",
Host: host,
Path: path.Join(endpoint, device),
Path: path,
RawQuery: query.Encode(),
}
return getDetails[Device](ctx, cli, u, key, device == "", OmitNone, lim, window, log)
return getDetails[Device](ctx, cli, u, endpoint, key, device == "", OmitNone, lim, window, log)
}

// GetDeviceUsers returns Okta user details for users associated with the provided device identifier
Expand All @@ -223,21 +235,22 @@ func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device s
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDeviceUsers for details.
func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, omit Response, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) {
func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) {
if device == "" {
// No user associated with a null device. Not an error.
return nil, nil, nil
}

const endpoint = "/api/v1/devices"
const endpoint = "/api/v1/devices/{device}/users"
path := strings.Replace(endpoint, "{device}", device, 1)

u := &url.URL{
Scheme: "https",
Host: host,
Path: path.Join(endpoint, device, "users"),
Path: path,
RawQuery: query.Encode(),
}
du, h, err := getDetails[devUser](ctx, cli, u, key, true, omit, lim, window, log)
du, h, err := getDetails[devUser](ctx, cli, u, endpoint, key, true, omit, lim, window, log)
if err != nil {
return nil, h, err
}
Expand All @@ -262,7 +275,7 @@ type devUser struct {
// for the specific user are returned, otherwise a list of all users is returned.
//
// See GetUserDetails for details of the query and rate limit parameters.
func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, key string, all bool, omit Response, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]E, http.Header, error) {
func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, endpoint string, key string, all bool, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]E, http.Header, error) {
url := u.String()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
Expand All @@ -276,8 +289,7 @@ func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, key
req.Header.Set("Content-Type", contentType)
req.Header.Set("Authorization", fmt.Sprintf("SSWS %s", key))

log.Debugw("rate limit", "limit", lim.Limit(), "burst", lim.Burst(), "url", url)
err = lim.Wait(ctx)
err = lim.Wait(ctx, endpoint, u, log)
if err != nil {
return nil, nil, err
}
Expand All @@ -286,7 +298,7 @@ func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, key
return nil, nil, err
}
defer resp.Body.Close()
err = oktaRateLimit(resp.Header, window, lim, log)
err = lim.Update(endpoint, resp.Header, window, log)
if err != nil {
io.Copy(io.Discard, resp.Body)
return nil, nil, err
Expand Down Expand Up @@ -349,59 +361,6 @@ func (e *Error) Error() string {
return fmt.Sprintf("%s: %s", summary, strings.Join(causes, ","))
}

// oktaRateLimit implements the Okta rate limit policy translation.
//
// See https://developer.okta.com/docs/reference/rl-best-practices/ for details.
func oktaRateLimit(h http.Header, window time.Duration, limiter *rate.Limiter, log *logp.Logger) error {
limit := h.Get("X-Rate-Limit-Limit")
remaining := h.Get("X-Rate-Limit-Remaining")
reset := h.Get("X-Rate-Limit-Reset")
log.Debugw("rate limit header", "X-Rate-Limit-Limit", limit, "X-Rate-Limit-Remaining", remaining, "X-Rate-Limit-Reset", reset)
if limit == "" || remaining == "" || reset == "" {
return nil
}

lim, err := strconv.ParseFloat(limit, 64)
if err != nil {
return err
}
rem, err := strconv.ParseFloat(remaining, 64)
if err != nil {
return err
}
rst, err := strconv.ParseInt(reset, 10, 64)
if err != nil {
return err
}
resetTime := time.Unix(rst, 0)
per := time.Until(resetTime).Seconds()

// Be conservative here; the docs don't exactly specify burst rates.
// Make sure we can make at least one new request, even if we fail
// to get a non-zero rate.Limit. We could set to zero for the case
// that limit=rate.Inf, but that detail is not important.
burst := 1

rateLimit := rate.Limit(rem / per)

// Process reset if we need to wait until reset to avoid a request against a zero quota.
if rateLimit <= 0 {
waitUntil := resetTime.UTC()
// next gives us a sane next window estimate, but the
// estimate will be overwritten when we make the next
// permissible API request.
next := rate.Limit(lim / window.Seconds())
limiter.SetLimitAt(waitUntil, next)
limiter.SetBurstAt(waitUntil, burst)
log.Debugw("rate limit adjust", "reset_time", waitUntil, "next_rate", next, "next_burst", burst)
return nil
}
limiter.SetLimit(rateLimit)
limiter.SetBurst(burst)
log.Debugw("rate limit adjust", "set_rate", rateLimit, "set_burst", burst)
return nil
}

// Next returns the next URL query for a pagination sequence. If no further
// page is available, Next returns io.EOF.
func Next(h http.Header) (query url.Values, err error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func Test(t *testing.T) {
t.Skip("okta tests require ${OKTA_TOKEN} to be set")
}

// Make a global limiter with the capacity to proceed once.
limiter := rate.NewLimiter(1, 1)
// Make a global limiter
limiter := NewRateLimiter()

// There are a variety of windows, the most conservative is one minute.
// The rate limit will be adjusted on the second call to the API if
Expand Down Expand Up @@ -213,14 +213,14 @@ var localTests = []struct {
name string
msg string
id string
fn func(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, lim *rate.Limiter, window time.Duration, log *logp.Logger) (any, http.Header, error)
fn func(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) (any, http.Header, error)
mkWant func(string) (any, error)
}{
{
// Test case constructed from API-returned value with details anonymised.
name: "users",
msg: `[{"id":"userid","status":"STATUS","created":"2023-05-14T13:37:20.000Z","activated":null,"statusChanged":"2023-05-15T01:50:30.000Z","lastLogin":"2023-05-15T01:59:20.000Z","lastUpdated":"2023-05-15T01:50:32.000Z","passwordChanged":"2023-05-15T01:50:32.000Z","recovery_question":{"question":"Who's a major player in the cowboy scene?","answer":"Annie Oakley"},"type":{"id":"typeid"},"profile":{"firstName":"name","lastName":"surname","mobilePhone":null,"secondEmail":null,"login":"name.surname@example.com","email":"name.surname@example.com"},"credentials":{"password":{"value":"secret"},"emails":[{"value":"name.surname@example.com","status":"VERIFIED","type":"PRIMARY"}],"provider":{"type":"OKTA","name":"OKTA"}},"_links":{"self":{"href":"https://localhost/api/v1/users/userid"}}}]`,
fn: func(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, lim *rate.Limiter, window time.Duration, log *logp.Logger) (any, http.Header, error) {
fn: func(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) (any, http.Header, error) {
return GetUserDetails(context.Background(), cli, host, key, user, query, OmitNone, lim, window, log)
},
mkWant: mkWant[User],
Expand All @@ -229,7 +229,7 @@ var localTests = []struct {
// Test case from https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDevices
name: "devices",
msg: `[{"id":"devid","status":"CREATED","created":"2019-10-02T18:03:07.000Z","lastUpdated":"2019-10-02T18:03:07.000Z","profile":{"displayName":"Example Device name 1","platform":"WINDOWS","serialNumber":"XXDDRFCFRGF3M8MD6D","sid":"S-1-11-111","registered":true,"secureHardwarePresent":false,"diskEncryptionType":"ALL_INTERNAL_VOLUMES"},"resourceType":"UDDevice","resourceDisplayName":{"value":"Example Device name 1","sensitive":false},"resourceAlternateId":null,"resourceId":"guo4a5u7YAHhjXrMK0g4","_links":{"activate":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g4/lifecycle/activate","hints":{"allow":["POST"]}},"self":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g4","hints":{"allow":["GET","PATCH","PUT"]}},"users":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g4/users","hints":{"allow":["GET"]}}}},{"id":"guo4a5u7YAHhjXrMK0g5","status":"ACTIVE","created":"2023-06-21T23:24:02.000Z","lastUpdated":"2023-06-21T23:24:02.000Z","profile":{"displayName":"Example Device name 2","platform":"ANDROID","manufacturer":"Google","model":"Pixel 6","osVersion":"13:2023-05-05","registered":true,"secureHardwarePresent":true,"diskEncryptionType":"USER"},"resourceType":"UDDevice","resourceDisplayName":{"value":"Example Device name 2","sensitive":false},"resourceAlternateId":null,"resourceId":"guo4a5u7YAHhjXrMK0g5","_links":{"activate":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g5/lifecycle/activate","hints":{"allow":["POST"]}},"self":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g5","hints":{"allow":["GET","PATCH","PUT"]}},"users":{"href":"https://{yourOktaDomain}/api/v1/devices/guo4a5u7YAHhjXrMK0g5/users","hints":{"allow":["GET"]}}}}]`,
fn: func(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *rate.Limiter, window time.Duration, log *logp.Logger) (any, http.Header, error) {
fn: func(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) (any, http.Header, error) {
return GetDeviceDetails(context.Background(), cli, host, key, device, query, lim, window, log)
},
mkWant: mkWant[Device],
Expand All @@ -239,7 +239,7 @@ var localTests = []struct {
name: "devices_users",
msg: `[{"created":"2023-08-07T21:48:27.000Z","managementStatus":"NOT_MANAGED","user":{"id":"userid","status":"STATUS","created":"2023-05-14T13:37:20.000Z","activated":null,"statusChanged":"2023-05-15T01:50:30.000Z","lastLogin":"2023-05-15T01:59:20.000Z","lastUpdated":"2023-05-15T01:50:32.000Z","passwordChanged":"2023-05-15T01:50:32.000Z","type":{"id":"typeid"},"profile":{"firstName":"name","lastName":"surname","mobilePhone":null,"secondEmail":null,"login":"name.surname@example.com","email":"name.surname@example.com"},"credentials":{"password":{"value":"secret"},"recovery_question":{"question":"Who's a major player in the cowboy scene?","answer":"Annie Oakley"},"emails":[{"value":"name.surname@example.com","status":"VERIFIED","type":"PRIMARY"}],"provider":{"type":"OKTA","name":"OKTA"}},"_links":{"self":{"href":"https://localhost/api/v1/users/userid"}}}}]`,
id: "devid",
fn: func(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *rate.Limiter, window time.Duration, log *logp.Logger) (any, http.Header, error) {
fn: func(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) (any, http.Header, error) {
return GetDeviceUsers(context.Background(), cli, host, key, device, query, OmitNone, lim, window, log)
},
mkWant: mkWant[devUser],
Expand All @@ -265,9 +265,7 @@ func TestLocal(t *testing.T) {

for _, test := range localTests {
t.Run(test.name, func(t *testing.T) {
// Make a global limiter with more capacity than will be set by the mock API.
// This will show the burst drop.
limiter := rate.NewLimiter(10, 10)
limiter := NewRateLimiter()

// There are a variety of windows, the most conservative is one minute.
// The rate limit will be adjusted on the second call to the API if
Expand Down Expand Up @@ -327,12 +325,23 @@ func TestLocal(t *testing.T) {
t.Errorf("unexpected result:\n- want\n+ got\n%s", cmp.Diff(want, got))
}

lim := limiter.Limit()
if lim < 49.0/60.0 || 50.0/60.0 < lim {
t.Errorf("unexpected rate limit (outside [49/60, 50/60]: %f", lim)
if len(limiter) != 1 {
t.Errorf("unexpected number endpoints track by rate limiter: %d", len(limiter))
}
if limiter.Burst() != 1 { // Set in GetUserDetails.
t.Errorf("unexpected burst: got:%d want:1", limiter.Burst())
// retrieve the rate.Limiter parameters for the one endpoint
var limit rate.Limit
var burst int
for _, l := range limiter {
limit = l.Limit()
burst = l.Burst()
break
}

if limit < 49.0/60.0 || 50.0/60.0 < limit {
t.Errorf("unexpected rate limit (outside [49/60, 50/60]: %f", limit)
}
if burst != 1 {
t.Errorf("unexpected burst: got:%d want:1", burst)
}

next, err := Next(h)
Expand Down
Loading

0 comments on commit 8c7d219

Please sign in to comment.