Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a header round tripper option to httpcommon #27509

Merged
Merged
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Bump AWS SDK version to v0.24.0 for WebIdentity authentication flow {issue}19393[19393] {pull}27126[27126]
- Add Linux pressure metricset {pull}27355[27355]
- Add support for kube-state-metrics v2.0.0 {pull}27552[27552]
- Add User-Agent header to HTTP requests. {issue}18160[18160] {pull}27509[27509]

*Packetbeat*

Expand Down
2 changes: 1 addition & 1 deletion dev-tools/cmd/dashboards/export_dashboards.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func main() {
Path: u.Path,
SpaceID: *spaceID,
Transport: transport,
})
}, "Beat Development Tools")
if err != nil {
log.Fatalf("Error while connecting to Kibana: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {

overwritePipelines := true
b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
esClient, err := eslegclient.NewConnectedClient(esConfig)
esClient, err := eslegclient.NewConnectedClient(esConfig, "Filebeat")
if err != nil {
return err
}
Expand Down Expand Up @@ -428,7 +428,7 @@ func (fb *Filebeat) Stop() {
// Create a new pipeline loader (es client) factory
func newPipelineLoaderFactory(esConfig *common.Config) fileset.PipelineLoaderFactory {
pipelineLoaderFactory := func() (fileset.PipelineLoader, error) {
esClient, err := eslegclient.NewConnectedClient(esConfig)
esClient, err := eslegclient.NewConnectedClient(esConfig, "Filebeat")
if err != nil {
return nil, errors.Wrap(err, "Error creating Elasticsearch client")
}
Expand Down
4 changes: 4 additions & 0 deletions heartbeat/monitors/active/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport/httpcommon"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/common/useragent"
"github.com/elastic/beats/v7/libbeat/logp"
)

Expand All @@ -38,6 +39,8 @@ func init() {

var debugf = logp.MakeDebug("http")

var userAgent = useragent.UserAgent("Heartbeat")

// Create makes a new HTTP monitor
func create(
name string,
Expand Down Expand Up @@ -128,5 +131,6 @@ func newRoundTripper(config *Config) (http.RoundTripper, error) {
httpcommon.WithKeepaliveSettings{
Disable: true,
},
httpcommon.WithHeaderRoundTripper(map[string]string{"User-Agent": userAgent}),
)
}
26 changes: 26 additions & 0 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/heartbeat/hbtest"
Expand Down Expand Up @@ -674,3 +675,28 @@ func mustParseURL(t *testing.T, url string) *url.URL {
}
return parsed
}

func TestUserAgentInject(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for writing this excellent test!

ua := ""
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ua = r.Header.Get("User-Agent")
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()

cfg, err := common.NewConfigFrom(map[string]interface{}{
"urls": ts.URL,
})
require.NoError(t, err)

p, err := create("ua", cfg)
require.NoError(t, err)

sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0]

event := &beat.Event{}
_, err = job(event)
require.NoError(t, err)
assert.Contains(t, ua, "Heartbeat")
}
44 changes: 20 additions & 24 deletions heartbeat/monitors/active/http/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ import (
"github.com/elastic/beats/v7/heartbeat/reason"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport/httpcommon"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/common/useragent"
)

var userAgent = useragent.UserAgent("Heartbeat")

func newHTTPMonitorHostJob(
addr string,
config *Config,
Expand Down Expand Up @@ -141,27 +139,28 @@ func createPingFactory(
// prevents following redirects in this case, we know that
// config.MaxRedirects must be zero to even be here
checkRedirect := makeCheckRedirect(0, nil)
transport := &SimpleTransport{
Dialer: dialer,
OnStartWrite: func() {
cbMutex.Lock()
writeStart = time.Now()
cbMutex.Unlock()
},
OnEndWrite: func() {
cbMutex.Lock()
writeEnd = time.Now()
cbMutex.Unlock()
},
OnStartRead: func() {
cbMutex.Lock()
readStart = time.Now()
cbMutex.Unlock()
},
}
client := &http.Client{
CheckRedirect: checkRedirect,
Timeout: timeout,
Transport: &SimpleTransport{
Dialer: dialer,
OnStartWrite: func() {
cbMutex.Lock()
writeStart = time.Now()
cbMutex.Unlock()
},
OnEndWrite: func() {
cbMutex.Lock()
writeEnd = time.Now()
cbMutex.Unlock()
},
OnStartRead: func() {
cbMutex.Lock()
readStart = time.Now()
cbMutex.Unlock()
},
},
Transport: httpcommon.HeaderRoundTripper(transport, map[string]string{"User-Agent": userAgent}),
}

_, end, err := execPing(event, client, request, body, timeout, validator, config.Response)
Expand Down Expand Up @@ -206,9 +205,6 @@ func buildRequest(addr string, config *Config, enc contentEncoder) (*http.Reques

request.Header.Add(k, v)
}
if ua := request.Header.Get("User-Agent"); ua == "" {
request.Header.Set("User-Agent", userAgent)
}

if enc != nil {
enc.AddHeaders(&request.Header)
Expand Down
9 changes: 0 additions & 9 deletions heartbeat/monitors/active/http/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"reflect"
"testing"

"github.com/elastic/beats/v7/libbeat/common/useragent"

"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -174,13 +172,6 @@ func TestRequestBuildingWithCustomHost(t *testing.T) {
}
}

func TestRequestBuildingWithNoUserAgent(t *testing.T) {
request, err := buildRequest("localhost", &Config{}, nilEncoder{})

require.NoError(t, err)
assert.Equal(t, useragent.UserAgent("Heartbeat"), request.Header.Get("User-Agent"))
}

func TestRequestBuildingWithExplicitUserAgent(t *testing.T) {
expectedUserAgent := "some-user-agent"

Expand Down
2 changes: 1 addition & 1 deletion libbeat/cmd/export/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func GenDashboardCmd(settings instance.Settings) *cobra.Command {
// part of the initialization.
initConfig := instance.InitKibanaConfig(b.Config)

client, err := kibana.NewKibanaClient(initConfig)
client, err := kibana.NewKibanaClient(initConfig, b.Info.Beat)
if err != nil {
fatalf("Error creating Kibana client: %+v.\n", err)
}
Expand Down
4 changes: 2 additions & 2 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er
if outCfg.Name() != "elasticsearch" {
return fmt.Errorf("Index management requested but the Elasticsearch output is not configured/enabled")
}
esClient, err := eslegclient.NewConnectedClient(outCfg.Config())
esClient, err := eslegclient.NewConnectedClient(outCfg.Config(), b.Info.Beat)
if err != nil {
return err
}
Expand Down Expand Up @@ -808,7 +808,7 @@ func (b *Beat) loadDashboards(ctx context.Context, force bool) error {
// initKibanaConfig will attach the username and password into kibana config as a part of the initialization.
kibanaConfig := InitKibanaConfig(b.Config)

client, err := kibana.NewKibanaClient(kibanaConfig)
client, err := kibana.NewKibanaClient(kibanaConfig, b.Info.Beat)
if err != nil {
return fmt.Errorf("error connecting to Kibana: %v", err)
}
Expand Down
27 changes: 27 additions & 0 deletions libbeat/common/transport/httpcommon/httpcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,20 @@ type extraOptionFunc func(*extraSettings)
func (extraOptionFunc) sealTransportOption() {}
func (fn extraOptionFunc) applyExtra(s *extraSettings) { fn(s) }

type headerRoundTripper struct {
headers map[string]string
rt http.RoundTripper
}

func (rt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
for k, v := range rt.headers {
if len(req.Header.Get(k)) == 0 {
req.Header.Set(k, v)
}
}
return rt.rt.RoundTrip(req)
}

// DefaultHTTPTransportSettings returns the default HTTP transport setting.
func DefaultHTTPTransportSettings() HTTPTransportSettings {
return HTTPTransportSettings{
Expand Down Expand Up @@ -373,6 +387,19 @@ func WithAPMHTTPInstrumentation() TransportOption {
return withAPMHTTPRountTripper
}

// HeaderRoundTripper will return a RoundTripper that sets header KVs if the key is not present.
func HeaderRoundTripper(rt http.RoundTripper, headers map[string]string) http.RoundTripper {
return &headerRoundTripper{headers, rt}
}

// WithHeaderRoundTripper instuments the HTTP client via a custom http.RoundTripper.
// This RoundTripper will add headers to each request if the key is not present.
func WithHeaderRoundTripper(headers map[string]string) TransportOption {
return WithModRoundtripper(func(rt http.RoundTripper) http.RoundTripper {
return HeaderRoundTripper(rt, headers)
})
}

// WithLogger sets the internal logger that will be used to log dial or TCP level errors.
// Logging at the connection level will only happen if the logger has been set.
func WithLogger(logger *logp.Logger) TransportOption {
Expand Down
6 changes: 3 additions & 3 deletions libbeat/dashboards/dashboards.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ func ImportDashboards(
return errors.New("kibana configuration missing for loading dashboards")
}

return setupAndImportDashboardsViaKibana(ctx, beatInfo.Hostname, kibanaConfig, &dashConfig, msgOutputter, pattern)
return setupAndImportDashboardsViaKibana(ctx, beatInfo.Hostname, beatInfo.Beat, kibanaConfig, &dashConfig, msgOutputter, pattern)
}

func setupAndImportDashboardsViaKibana(ctx context.Context, hostname string, kibanaConfig *common.Config,
func setupAndImportDashboardsViaKibana(ctx context.Context, hostname, beatname string, kibanaConfig *common.Config,
dashboardsConfig *Config, msgOutputter MessageOutputter, fields common.MapStr) error {

kibanaLoader, err := NewKibanaLoader(ctx, kibanaConfig, dashboardsConfig, hostname, msgOutputter)
kibanaLoader, err := NewKibanaLoader(ctx, kibanaConfig, dashboardsConfig, hostname, msgOutputter, beatname)
if err != nil {
return fmt.Errorf("fail to create the Kibana loader: %v", err)
}
Expand Down
10 changes: 5 additions & 5 deletions libbeat/dashboards/kibana_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ type KibanaLoader struct {
}

// NewKibanaLoader creates a new loader to load Kibana files
func NewKibanaLoader(ctx context.Context, cfg *common.Config, dashboardsConfig *Config, hostname string, msgOutputter MessageOutputter) (*KibanaLoader, error) {
func NewKibanaLoader(ctx context.Context, cfg *common.Config, dashboardsConfig *Config, hostname string, msgOutputter MessageOutputter, beatname string) (*KibanaLoader, error) {

if cfg == nil || !cfg.Enabled() {
return nil, fmt.Errorf("Kibana is not configured or enabled")
}

client, err := getKibanaClient(ctx, cfg, dashboardsConfig.Retry, 0)
client, err := getKibanaClient(ctx, cfg, dashboardsConfig.Retry, 0, beatname)
if err != nil {
return nil, fmt.Errorf("Error creating Kibana client: %v", err)
}
Expand All @@ -76,15 +76,15 @@ func NewKibanaLoader(ctx context.Context, cfg *common.Config, dashboardsConfig *
return &loader, nil
}

func getKibanaClient(ctx context.Context, cfg *common.Config, retryCfg *Retry, retryAttempt uint) (*kibana.Client, error) {
client, err := kibana.NewKibanaClient(cfg)
func getKibanaClient(ctx context.Context, cfg *common.Config, retryCfg *Retry, retryAttempt uint, beatname string) (*kibana.Client, error) {
client, err := kibana.NewKibanaClient(cfg, beatname)
if err != nil {
if retryCfg.Enabled && (retryCfg.Maximum == 0 || retryCfg.Maximum > retryAttempt) {
select {
case <-ctx.Done():
return nil, err
case <-time.After(retryCfg.Interval):
return getKibanaClient(ctx, cfg, retryCfg, retryAttempt+1)
return getKibanaClient(ctx, cfg, retryCfg, retryAttempt+1, beatname)
}
}
return nil, fmt.Errorf("Error creating Kibana client: %v", err)
Expand Down
17 changes: 13 additions & 4 deletions libbeat/esleg/eslegclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/transport/httpcommon"
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/common/useragent"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/testing"
)
Expand All @@ -57,7 +58,8 @@ type Connection struct {

// ConnectionSettings are the settings needed for a Connection
type ConnectionSettings struct {
URL string
URL string
Beatname string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this client useful for things other than a Beat? Having a parameter that is the beat name sort of implies that this is only usable by a Beat. An alternative would be to accept the user agent string as a parameter or call it "product name". Even if it were called "product name" the code would still assume it's a beat since it creates the user-agent string based on the beat version info. (I'm not saying you need to change it.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to expose this one? this is also not serialized so it hiding it should not matter, wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrewkroh, I think that Beatname is OK; i'm not aware of it being used in other locations
@michalpristas the outputs/elasticsearch and monitoring/report/elasticsearch both construct ConnectionSettings structs when creating clients. Having this exposed allows them to do them to fill it in


Username string
Password string
Expand Down Expand Up @@ -110,6 +112,11 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
}
}

if s.Beatname == "" {
s.Beatname = "Libbeat"
}
userAgent := useragent.UserAgent(s.Beatname)

httpClient, err := s.Transport.Client(
httpcommon.WithLogger(logger),
httpcommon.WithIOStats(s.Observer),
Expand All @@ -119,6 +126,7 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
// eg, like in https://github.com/elastic/apm-server/blob/7.7/elasticsearch/client.go
return apmelasticsearch.WrapRoundTripper(rt)
}),
httpcommon.WithHeaderRoundTripper(map[string]string{"User-Agent": userAgent}),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -160,7 +168,7 @@ func settingsWithDefaults(s ConnectionSettings) ConnectionSettings {
// configuration. It accepts the same configuration parameters as the Elasticsearch
// output, except for the output specific configuration options. If multiple hosts
// are defined in the configuration, a client is returned for each of them.
func NewClients(cfg *common.Config) ([]Connection, error) {
func NewClients(cfg *common.Config, beatname string) ([]Connection, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, err
Expand All @@ -185,6 +193,7 @@ func NewClients(cfg *common.Config) ([]Connection, error) {

client, err := NewConnection(ConnectionSettings{
URL: esURL,
Beatname: beatname,
Kerberos: config.Kerberos,
Username: config.Username,
Password: config.Password,
Expand All @@ -205,8 +214,8 @@ func NewClients(cfg *common.Config) ([]Connection, error) {
return clients, nil
}

func NewConnectedClient(cfg *common.Config) (*Connection, error) {
clients, err := NewClients(cfg)
func NewConnectedClient(cfg *common.Config, beatname string) (*Connection, error) {
clients, err := NewClients(cfg, beatname)
if err != nil {
return nil, err
}
Expand Down
Loading