x-pack/filebeat/input/entityanalytics/provider/{azuread,okta}: add request tracing support #39821

Merged
merged 5 commits into from
Jun 10, 2024
Changes from 4 commits
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -291,6 +291,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Update CEL mito extensions to v1.12.2. {pull}39755[39755]
- Add support for base64-encoded HMAC headers to HTTP Endpoint. {pull}39655[39655]
- Add user group membership support to Okta entity analytics provider. {issue}39814[39814] {pull}39815[39815]
- Add request trace support for Okta and EntraID entity analytics providers. {pull}39821[39821]

*Auditbeat*

24 changes: 24 additions & 0 deletions x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc
@@ -509,6 +509,18 @@ This is a list of optional query parameters. The default is `["accountEnabled",
"displayName", "operatingSystem", "operatingSystemVersion", "physicalIds", "extensionAttributes",
"alternativeSecurityIds"]`.

[float]
==== `tracer.filename`

HTTP requests to the EntraID API, and the responses it returns, can be logged to a file on the local file system to help debug configurations.
This option is enabled by setting the `tracer.filename` value. Additional options are available to
tune log rotation behavior.

To differentiate the trace files generated by different input instances, a placeholder `*` can be added to the filename; it is replaced with the input instance ID.
For example, `http-request-trace-*.ndjson`.

Enabling this option compromises security and should only be used for debugging.

[id="provider-okta"]
==== Okta User Identities (`okta`)

@@ -797,6 +809,18 @@ The interval in which incremental updates should occur. The interval must be
shorter than the full synchronization interval (`sync_interval`). Expressed as a
duration string (e.g., 1m, 3h, 24h). Defaults to `15m` (15 minutes).

[float]
==== `tracer.filename`

HTTP requests to the Okta API, and the responses it returns, can be logged to a file on the local file system to help debug configurations.
This option is enabled by setting the `tracer.filename` value. Additional options are available to
tune log rotation behavior.

To differentiate the trace files generated by different input instances, a placeholder `*` can be added to the filename; it is replaced with the input instance ID.
For example, `http-request-trace-*.ndjson`.

Enabling this option compromises security and should only be used for debugging.

[float]
==== Metrics

@@ -48,6 +48,8 @@ type azure struct {
logger *logp.Logger
auth authenticator.Authenticator
fetcher fetcher.Fetcher

ctx v2.Context
}

// Name returns the name of this provider.
@@ -71,6 +73,7 @@ func (p *azure) Test(testCtx v2.TestContext) error {
// Run will start data collection on this provider.
func (p *azure) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error {
p.logger = inputCtx.Logger.With("tenant_id", p.conf.TenantID, "provider", Name)
p.ctx = inputCtx
p.auth.SetLogger(p.logger)
p.fetcher.SetLogger(p.logger)
p.metrics = newMetrics(inputCtx.ID, nil)
@@ -575,7 +578,7 @@ func (p *azure) configure(cfg *config.C) (kvstore.Input, error) {
if p.auth, err = oauth2.New(cfg, p.Manager.Logger); err != nil {
return nil, fmt.Errorf("unable to create authenticator: %w", err)
}
- if p.fetcher, err = graph.New(cfg, p.Manager.Logger, p.auth); err != nil {
+ if p.fetcher, err = graph.New(ctxtool.FromCanceller(p.ctx.Cancelation), p.ctx.ID, cfg, p.Manager.Logger, p.auth); err != nil {
return nil, fmt.Errorf("unable to create fetcher: %w", err)
}

@@ -0,0 +1 @@
*.ndjson
@@ -15,13 +15,19 @@ import (
"io"
"net/http"
"net/url"
"path/filepath"
"strings"

"github.com/google/uuid"
"go.elastic.co/ecszap"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"

"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/internal/collections"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/azuread/authenticator"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/azuread/fetcher"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
@@ -104,6 +110,9 @@ type graphConf struct {
Select selection `config:"select"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`

// Tracer allows configuration of request trace logging.
Tracer *lumberjack.Logger `config:"tracer"`
}

type selection struct {
@@ -329,16 +338,22 @@ func (f *graph) doRequest(ctx context.Context, method, url string, body io.Reade
}

// New creates a new instance of the graph fetcher.
- func New(cfg *config.C, logger *logp.Logger, auth authenticator.Authenticator) (fetcher.Fetcher, error) {
+ func New(ctx context.Context, id string, cfg *config.C, logger *logp.Logger, auth authenticator.Authenticator) (fetcher.Fetcher, error) {
var c graphConf
if err := cfg.Unpack(&c); err != nil {
return nil, fmt.Errorf("unable to unpack Graph API Fetcher config: %w", err)
}

if c.Tracer != nil {
id = sanitizeFileName(id)
c.Tracer.Filename = strings.ReplaceAll(c.Tracer.Filename, "*", id)
}

client, err := c.Transport.Client()
if err != nil {
return nil, fmt.Errorf("unable to create HTTP client: %w", err)
}
client = requestTrace(ctx, client, c, logger)

f := graph{
conf: c,
@@ -383,6 +398,41 @@ func New(cfg *config.C, logger *logp.Logger, auth authenticator.Authenticator) (
return &f, nil
}

// requestTrace decorates cli with an httplog.LoggingRoundTripper if cfg.Trace
Contributor

Suggested change
// requestTrace decorates cli with an httplog.LoggingRoundTripper if cfg.Trace
// requestTrace decorates cli with an httplog.LoggingRoundTripper if cfg.Tracer

// is non-nil.
func requestTrace(ctx context.Context, cli *http.Client, cfg graphConf, log *logp.Logger) *http.Client {
if cfg.Tracer == nil {
return cli
}
w := zapcore.AddSync(cfg.Tracer)
go func() {
// Close the logger when we are done.
<-ctx.Done()
cfg.Tracer.Close()
}()
core := ecszap.NewCore(
ecszap.NewDefaultEncoderConfig(),
w,
zap.DebugLevel,
)
traceLogger := zap.New(core)

const margin = 1e3 // 1OkB ought to be enough room for all the remainder of the trace details.
Contributor

Isn't 1e3 == 1kB?

Contributor Author

Indeed it is 🤦.

maxSize := cfg.Tracer.MaxSize * 1e6
cli.Transport = httplog.NewLoggingRoundTripper(cli.Transport, traceLogger, max(0, maxSize-margin), log)
return cli
}

// sanitizeFileName returns name with ":" and "/" replaced with "_", removing
// repeated instances. The request.tracer.filename may contain ":" when an
// input has cursor config; the macOS Finder treats ":" as a path separator,
// which causes strange file paths to be displayed.
func sanitizeFileName(name string) string {
name = strings.ReplaceAll(name, ":", string(filepath.Separator))
name = filepath.Clean(name)
return strings.ReplaceAll(name, string(filepath.Separator), "_")
}

func formatQuery(name string, query []string, dflt string) string {
q := dflt
if len(query) != 0 {
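To make the filename handling above concrete, here is a small standalone sketch. `sanitizeFileName` is copied from the diff; `expandTraceFilename` is a hypothetical helper that combines the two steps `New` performs inline, and the input IDs are invented for illustration.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// sanitizeFileName mirrors the helper added in this diff: ":" and path
// separators become "_", with repeated separators collapsed by filepath.Clean.
func sanitizeFileName(name string) string {
	name = strings.ReplaceAll(name, ":", string(filepath.Separator))
	name = filepath.Clean(name)
	return strings.ReplaceAll(name, string(filepath.Separator), "_")
}

// expandTraceFilename is a hypothetical helper: it sanitizes the input ID
// and substitutes it for every "*" in the configured filename pattern.
func expandTraceFilename(pattern, id string) string {
	return strings.ReplaceAll(pattern, "*", sanitizeFileName(id))
}

func main() {
	// These IDs are made up; real input IDs come from the input context.
	for _, id := range []string{"TestGraph_Users", "entity-analytics::okta:1"} {
		fmt.Println(expandTraceFilename("http-request-trace-*.ndjson", id))
	}
}
```

With these inputs the second ID expands to `http-request-trace-entity-analytics_okta_1.ndjson`: the doubled colon collapses to a single underscore because `filepath.Clean` removes the repeated separator before the final replacement.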
@@ -7,6 +7,7 @@ package graph
import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"net/http/httptest"
@@ -19,6 +20,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"gopkg.in/natefinch/lumberjack.v2"

"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/internal/collections"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/azuread/authenticator/mock"
@@ -27,6 +29,8 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
)

var trace = flag.Bool("request_trace", false, "enable request tracing during tests")

var usersResponse1 = apiUserResponse{
Users: []userAPI{
{
@@ -313,11 +317,16 @@ func TestGraph_Groups(t *testing.T) {
rawConf := graphConf{
APIEndpoint: "http://" + testSrv.addr,
}
if *trace {
rawConf.Tracer = &lumberjack.Logger{
Filename: "test_trace-*.ndjson",
}
}
c, err := config.NewConfigFrom(&rawConf)
require.NoError(t, err)
auth := mock.New(mock.DefaultTokenValue)

- f, err := New(c, logp.L(), auth)
+ f, err := New(context.Background(), t.Name(), c, logp.L(), auth)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -372,11 +381,16 @@ func TestGraph_Users(t *testing.T) {
rawConf := graphConf{
APIEndpoint: "http://" + testSrv.addr,
}
if *trace {
rawConf.Tracer = &lumberjack.Logger{
Filename: "test_trace-*.ndjson",
}
}
c, err := config.NewConfigFrom(&rawConf)
require.NoError(t, err)
auth := mock.New(mock.DefaultTokenValue)

- f, err := New(c, logp.L(), auth)
+ f, err := New(context.Background(), t.Name(), c, logp.L(), auth)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -477,11 +491,16 @@ func TestGraph_Devices(t *testing.T) {
APIEndpoint: "http://" + testSrv.addr,
Select: test.selection,
}
if *trace {
rawConf.Tracer = &lumberjack.Logger{
Filename: "test_trace-*.ndjson",
}
}
c, err := config.NewConfigFrom(&rawConf)
require.NoError(t, err)
auth := mock.New(mock.DefaultTokenValue)

- f, err := New(c, logp.L(), auth)
+ f, err := New(context.Background(), t.Name(), c, logp.L(), auth)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -0,0 +1 @@
*.ndjson
20 changes: 19 additions & 1 deletion x-pack/filebeat/input/entityanalytics/provider/okta/conf.go
@@ -9,6 +9,8 @@ import (
"strings"
"time"

"gopkg.in/natefinch/lumberjack.v2"

"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

@@ -62,6 +64,9 @@ type conf struct {
// Request is the configuration for establishing
// HTTP requests to the API.
Request *requestConfig `config:"request"`

// Tracer allows configuration of request trace logging.
Tracer *lumberjack.Logger `config:"tracer"`
}

type requestConfig struct {
@@ -163,10 +168,23 @@ func (c *conf) Validate() error {
}
switch strings.ToLower(c.Dataset) {
case "", "all", "users", "devices":
- return nil
default:
return errors.New("dataset must be 'all', 'users', 'devices' or empty")
}

if c.Tracer == nil {
return nil
}
if c.Tracer.Filename == "" {
return errors.New("request tracer must have a filename if used")
}
if c.Tracer.MaxSize == 0 {
// By default Lumberjack caps file sizes at 100MB which
// is excessive for a debugging logger, so default to 1MB
// which is the minimum.
c.Tracer.MaxSize = 1
}
return nil
}

func (c *conf) wantUsers() bool {
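The tracer checks in `Validate` only take effect if the dataset switch does not return early for valid values. The sketch below shows that control flow in isolation. `tracerConf` is a hypothetical stand-in for the two `lumberjack.Logger` fields involved, and this `conf` is trimmed to the fields the example needs; it is not the provider's real configuration type.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// tracerConf is a hypothetical stand-in for the lumberjack.Logger fields
// that validation touches.
type tracerConf struct {
	Filename string
	MaxSize  int // megabytes
}

// conf is a trimmed-down illustration, not the provider's real type.
type conf struct {
	Dataset string
	Tracer  *tracerConf
}

func (c *conf) Validate() error {
	switch strings.ToLower(c.Dataset) {
	case "", "all", "users", "devices":
		// Valid dataset: leave the switch so the tracer is checked too.
	default:
		return errors.New("dataset must be 'all', 'users', 'devices' or empty")
	}

	if c.Tracer == nil {
		return nil
	}
	if c.Tracer.Filename == "" {
		return errors.New("request tracer must have a filename if used")
	}
	if c.Tracer.MaxSize == 0 {
		c.Tracer.MaxSize = 1 // default to 1MB, as in the diff
	}
	return nil
}

func main() {
	ok := conf{Dataset: "users", Tracer: &tracerConf{Filename: "trace-*.ndjson"}}
	fmt.Println(ok.Validate(), ok.Tracer.MaxSize)

	bad := conf{Tracer: &tracerConf{}}
	fmt.Println(bad.Validate())
}
```

Note that `Validate` mutates the configuration when it applies the size default, so it needs a pointer receiver for the default to be visible to the caller.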
52 changes: 49 additions & 3 deletions x-pack/filebeat/input/entityanalytics/provider/okta/okta.go
@@ -12,17 +12,22 @@
"io"
"net/http"
"net/url"
"path/filepath"
"strings"
"time"

"github.com/hashicorp/go-retryablehttp"
"go.elastic.co/ecszap"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/internal/kvstore"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
@@ -105,8 +110,13 @@
// Allow a single fetch operation to obtain limits from the API.
p.lim = rate.NewLimiter(1, 1)

if p.cfg.Tracer != nil {
id := sanitizeFileName(inputCtx.ID)
p.cfg.Tracer.Filename = strings.ReplaceAll(p.cfg.Tracer.Filename, "*", id)
}

var err error
- p.client, err = newClient(p.cfg, p.logger)
+ p.client, err = newClient(ctxtool.FromCanceller(inputCtx.Cancelation), p.cfg, p.logger)
if err != nil {
return err
}
@@ -152,12 +162,14 @@
}
}

- func newClient(cfg conf, log *logp.Logger) (*http.Client, error) {
+ func newClient(ctx context.Context, cfg conf, log *logp.Logger) (*http.Client, error) {
c, err := cfg.Request.Transport.Client(clientOptions(cfg.Request.KeepAlive.settings())...)
if err != nil {
return nil, err
}

c = requestTrace(ctx, c, cfg, log)

c.CheckRedirect = checkRedirect(cfg.Request, log)

client := &retryablehttp.Client{
@@ -169,10 +181,44 @@
CheckRetry: retryablehttp.DefaultRetryPolicy,
Backoff: retryablehttp.DefaultBackoff,
}

return client.StandardClient(), nil
}

// requestTrace decorates cli with an httplog.LoggingRoundTripper if cfg.Tracer
// is non-nil.
func requestTrace(ctx context.Context, cli *http.Client, cfg conf, log *logp.Logger) *http.Client {
if cfg.Tracer == nil {
return cli
}
w := zapcore.AddSync(cfg.Tracer)
go func() {
// Close the logger when we are done.
<-ctx.Done()
cfg.Tracer.Close()
}()
core := ecszap.NewCore(
ecszap.NewDefaultEncoderConfig(),
w,
zap.DebugLevel,
)
traceLogger := zap.New(core)

const margin = 1e3 // 1kB ought to be enough room for all the remainder of the trace details.
maxSize := cfg.Tracer.MaxSize * 1e6
cli.Transport = httplog.NewLoggingRoundTripper(cli.Transport, traceLogger, max(0, maxSize-margin), log)
return cli
}

// sanitizeFileName returns name with ":" and "/" replaced with "_", removing
// repeated instances. The request.tracer.filename may contain ":" when an
// input has cursor config; the macOS Finder treats ":" as a path separator,
// which causes strange file paths to be displayed.
func sanitizeFileName(name string) string {
name = strings.ReplaceAll(name, ":", string(filepath.Separator))
name = filepath.Clean(name)
return strings.ReplaceAll(name, string(filepath.Separator), "_")
}

// clientOption returns constructed client configuration options, including
// setting up http+unix and http+npipe transports if requested.
func clientOptions(keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption {
@@ -403,7 +449,7 @@

next, err := okta.Next(h)
if err != nil {
if err == io.EOF {

Check failure on line 452 in x-pack/filebeat/input/entityanalytics/provider/okta/okta.go

GitHub Actions / lint (windows)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
break
}
p.logger.Debugf("received %d users from API", len(users))
@@ -513,7 +559,7 @@

next, err := okta.Next(h)
if err != nil {
if err == io.EOF {

Check failure on line 562 in x-pack/filebeat/input/entityanalytics/provider/okta/okta.go

GitHub Actions / lint (windows)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
break
}
p.logger.Debugf("received %d devices from API", len(devices))
@@ -542,7 +588,7 @@

next, err := okta.Next(h)
if err != nil {
if err == io.EOF {

Check failure on line 591 in x-pack/filebeat/input/entityanalytics/provider/okta/okta.go

GitHub Actions / lint (windows)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
break
}
p.logger.Debugf("received %d devices from API", len(devices))