Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.16](backport #41583) x-pack/filebeat/input/entityanalytics/provider/okta: Rate limiting fixes #41901

Merged
merged 3 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix double encoding of client_secret in the Entity Analytics input's Azure Active Directory provider {pull}41393[41393]
- The azure-eventhub input now correctly reports its status to the Elastic Agent on fatal errors {pull}41469[41469]
- Fix handling of http_endpoint request exceeding memory limits. {issue}41764[41764] {pull}41765[41765]
- Rate limiting fixes in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41583[41583]

*Heartbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,9 @@
"io"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"

"golang.org/x/time/rate"

"github.com/elastic/elastic-agent-libs/logp"
)

Expand Down Expand Up @@ -164,56 +160,144 @@
// https://${yourOktaDomain}/reports/rate-limit.
//
// See https://developer.okta.com/docs/reference/api/users/#list-users for details.
func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, omit Response, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) {
const endpoint = "/api/v1/users"
func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) {
var endpoint, path string
if user == "" {
endpoint = "/api/v1/users"
path = endpoint
} else {
endpoint = "/api/v1/users/{user}"
path = strings.Replace(endpoint, "{user}", user, 1)
}

u := &url.URL{
Scheme: "https",
Host: host,
Path: path.Join(endpoint, user),
Path: path,
RawQuery: query.Encode(),
}
return getDetails[User](ctx, cli, u, key, user == "", omit, lim, window, log)
return getDetails[User](ctx, cli, u, endpoint, key, user == "", omit, lim, window, log)
}

<<<<<<< HEAD

Check failure on line 182 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected declaration, found '<<' (typecheck)
=======
// GetUserFactors returns Okta group roles using the groups API endpoint. host is the
// Okta user domain and key is the API token to use for the query. group must not be empty.
//
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/UserFactor/#tag/UserFactor/operation/listFactors.
func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Factor, http.Header, error) {
if user == "" {
return nil, nil, errors.New("no user specified")
}

const endpoint = "/api/v1/users/{user}/factors"
path := strings.Replace(endpoint, "{user}", user, 1)

Check failure on line 196 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected declaration, found path (typecheck)

u := &url.URL{
Scheme: "https",
Host: host,
Path: path,
}
return getDetails[Factor](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log)
}

// GetUserRoles returns Okta group roles using the groups API endpoint. host is the
// Okta user domain and key is the API token to use for the query. group must not be empty.
//
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/RoleAssignmentBGroup/#tag/RoleAssignmentBGroup/operation/listGroupAssignedRoles.
func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Role, http.Header, error) {
if user == "" {
return nil, nil, errors.New("no user specified")
}

const endpoint = "/api/v1/users/{user}/roles"
path := strings.Replace(endpoint, "{user}", user, 1)

Check failure on line 218 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected declaration, found path (typecheck)

u := &url.URL{
Scheme: "https",
Host: host,
Path: path,
}
return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log)
}

>>>>>>> 4e19d09ab2 (x-pack/filebeat/input/entityanalytics/provider/okta: Rate limiting fixes (#41583))

Check failure on line 228 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

illegal character U+0023 '#' (typecheck)
// GetUserGroupDetails returns Okta group details using the users API endpoint. host is the
// Okta user domain and key is the API token to use for the query. user must not be empty.
//
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/reference/api/users/#request-parameters-8 (no anchor exists on the page for this endpoint) for details.
func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user string, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]Group, http.Header, error) {
const endpoint = "/api/v1/users"

func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Group, http.Header, error) {
if user == "" {
return nil, nil, errors.New("no user specified")
}

const endpoint = "/api/v1/users/{user}/groups"
path := strings.Replace(endpoint, "{user}", user, 1)

Check failure on line 241 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected declaration, found path (typecheck)

u := &url.URL{
Scheme: "https",
Host: host,
Path: path,
}
return getDetails[Group](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log)
}

<<<<<<< HEAD
=======
// GetGroupRoles returns Okta group roles using the groups API endpoint. host is the
// Okta user domain and key is the API token to use for the query. group must not be empty.
//
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/RoleAssignmentBGroup/#tag/RoleAssignmentBGroup/operation/listGroupAssignedRoles.
func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Role, http.Header, error) {
if group == "" {
return nil, nil, errors.New("no group specified")
}

const endpoint = "/api/v1/groups/{group}/rules"
path := strings.Replace(endpoint, "{group}", group, 1)

u := &url.URL{
Scheme: "https",
Host: host,
Path: path.Join(endpoint, user, "groups"),
Path: path,
}
return getDetails[Group](ctx, cli, u, key, true, OmitNone, lim, window, log)
return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log)
}

>>>>>>> 4e19d09ab2 (x-pack/filebeat/input/entityanalytics/provider/okta: Rate limiting fixes (#41583))

Check failure on line 275 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

illegal character U+0023 '#' (typecheck)
// GetDeviceDetails returns Okta device details using the list devices API endpoint. host is the
// Okta user domain and key is the API token to use for the query. If device is not empty,
// details for the specific device are returned, otherwise a list of all devices is returned.
//
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDevices for details.
func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]Device, http.Header, error) {
const endpoint = "/api/v1/devices"
func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Device, http.Header, error) {
var endpoint string
var path string
if device == "" {

Check failure on line 286 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected declaration, found 'if' (typecheck)
endpoint = "/api/v1/devices"
path = endpoint
} else {
endpoint = "/api/v1/devices/{device}"
path = strings.Replace(endpoint, "{device}", device, 1)
}

u := &url.URL{
Scheme: "https",
Host: host,
Path: path.Join(endpoint, device),
Path: path,
RawQuery: query.Encode(),
}
return getDetails[Device](ctx, cli, u, key, device == "", OmitNone, lim, window, log)
return getDetails[Device](ctx, cli, u, endpoint, key, device == "", OmitNone, lim, window, log)
}

// GetDeviceUsers returns Okta user details for users associated with the provided device identifier
Expand All @@ -223,21 +307,22 @@
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDeviceUsers for details.
func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, omit Response, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) {
func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) {
if device == "" {
// No user associated with a null device. Not an error.
return nil, nil, nil
}

const endpoint = "/api/v1/devices"
const endpoint = "/api/v1/devices/{device}/users"
path := strings.Replace(endpoint, "{device}", device, 1)

u := &url.URL{
Scheme: "https",
Host: host,
Path: path.Join(endpoint, device, "users"),
Path: path,
RawQuery: query.Encode(),
}
du, h, err := getDetails[devUser](ctx, cli, u, key, true, omit, lim, window, log)
du, h, err := getDetails[devUser](ctx, cli, u, endpoint, key, true, omit, lim, window, log)
if err != nil {
return nil, h, err
}
Expand All @@ -262,7 +347,7 @@
// for the specific user are returned, otherwise a list of all users is returned.
//
// See GetUserDetails for details of the query and rate limit parameters.
func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, key string, all bool, omit Response, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]E, http.Header, error) {
func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, endpoint string, key string, all bool, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]E, http.Header, error) {
url := u.String()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
Expand All @@ -276,8 +361,7 @@
req.Header.Set("Content-Type", contentType)
req.Header.Set("Authorization", fmt.Sprintf("SSWS %s", key))

log.Debugw("rate limit", "limit", lim.Limit(), "burst", lim.Burst(), "url", url)
err = lim.Wait(ctx)
err = lim.Wait(ctx, endpoint, u, log)
if err != nil {
return nil, nil, err
}
Expand All @@ -286,7 +370,7 @@
return nil, nil, err
}
defer resp.Body.Close()
err = oktaRateLimit(resp.Header, window, lim, log)
err = lim.Update(endpoint, resp.Header, window, log)
if err != nil {
io.Copy(io.Discard, resp.Body)
return nil, nil, err
Expand Down Expand Up @@ -349,59 +433,6 @@
return fmt.Sprintf("%s: %s", summary, strings.Join(causes, ","))
}

// oktaRateLimit implements the Okta rate limit policy translation.
//
// See https://developer.okta.com/docs/reference/rl-best-practices/ for details.
func oktaRateLimit(h http.Header, window time.Duration, limiter *rate.Limiter, log *logp.Logger) error {
limit := h.Get("X-Rate-Limit-Limit")
remaining := h.Get("X-Rate-Limit-Remaining")
reset := h.Get("X-Rate-Limit-Reset")
log.Debugw("rate limit header", "X-Rate-Limit-Limit", limit, "X-Rate-Limit-Remaining", remaining, "X-Rate-Limit-Reset", reset)
if limit == "" || remaining == "" || reset == "" {
return nil
}

lim, err := strconv.ParseFloat(limit, 64)
if err != nil {
return err
}
rem, err := strconv.ParseFloat(remaining, 64)
if err != nil {
return err
}
rst, err := strconv.ParseInt(reset, 10, 64)
if err != nil {
return err
}
resetTime := time.Unix(rst, 0)
per := time.Until(resetTime).Seconds()

// Be conservative here; the docs don't exactly specify burst rates.
// Make sure we can make at least one new request, even if we fail
// to get a non-zero rate.Limit. We could set to zero for the case
// that limit=rate.Inf, but that detail is not important.
burst := 1

rateLimit := rate.Limit(rem / per)

// Process reset if we need to wait until reset to avoid a request against a zero quota.
if rateLimit <= 0 {
waitUntil := resetTime.UTC()
// next gives us a sane next window estimate, but the
// estimate will be overwritten when we make the next
// permissible API request.
next := rate.Limit(lim / window.Seconds())
limiter.SetLimitAt(waitUntil, next)
limiter.SetBurstAt(waitUntil, burst)
log.Debugw("rate limit adjust", "reset_time", waitUntil, "next_rate", next, "next_burst", burst)
return nil
}
limiter.SetLimit(rateLimit)
limiter.SetBurst(burst)
log.Debugw("rate limit adjust", "set_rate", rateLimit, "set_burst", burst)
return nil
}

// Next returns the next URL query for a pagination sequence. If no further
// page is available, Next returns io.EOF.
func Next(h http.Header) (query url.Values, err error) {
Expand Down
Loading
Loading