From 95ded73d0a61cd16578bf13ad2d03a10a83335eb Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Tue, 19 Sep 2023 12:00:46 -0500 Subject: [PATCH 01/12] add support for queue settings under outputs - add support for `idle_connection_timeout` for ES output - add support for queue settings under output Closes #35615 --- CHANGELOG.next.asciidoc | 2 + auditbeat/auditbeat.reference.yml | 5 ++ filebeat/filebeat.reference.yml | 5 ++ heartbeat/heartbeat.reference.yml | 5 ++ .../output-elasticsearch.reference.yml.tmpl | 5 ++ libbeat/cmd/instance/beat.go | 18 +++++ libbeat/cmd/instance/beat_test.go | 73 +++++++++++++++++++ libbeat/docs/queueconfig.asciidoc | 9 ++- libbeat/outputs/elasticsearch/client.go | 1 + .../elasticsearch/docs/elasticsearch.asciidoc | 4 + .../outputs/elasticsearch/elasticsearch.go | 1 + metricbeat/metricbeat.reference.yml | 5 ++ packetbeat/packetbeat.reference.yml | 5 ++ winlogbeat/winlogbeat.reference.yml | 5 ++ x-pack/auditbeat/auditbeat.reference.yml | 5 ++ x-pack/filebeat/filebeat.reference.yml | 5 ++ .../functionbeat/functionbeat.reference.yml | 5 ++ x-pack/heartbeat/heartbeat.reference.yml | 5 ++ x-pack/metricbeat/metricbeat.reference.yml | 5 ++ x-pack/osquerybeat/osquerybeat.reference.yml | 5 ++ x-pack/packetbeat/packetbeat.reference.yml | 5 ++ x-pack/winlogbeat/winlogbeat.reference.yml | 5 ++ 22 files changed, 180 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 37bb60d04e3..22e99bfe8a7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -166,6 +166,8 @@ is collected by it. - Add support for AWS external IDs. {issue}36321[36321] {pull}36322[36322] - [Enhanncement for host.ip and host.mac] Disabling netinfo.enabled option of add-host-metadata processor {pull}36506[36506] Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will disable the netinfo.enabled option of add_host_metadata processor +- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}99999[99999] +- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}99999[99999] *Auditbeat* diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index 32fbef2da04..c55940a33eb 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -522,6 +522,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 7f86b5aa9d2..2467517240f 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -1618,6 +1618,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index 89fc08ef8e6..24e29870da5 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -614,6 +614,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl b/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl index 48f1ba2c007..25698efe4a3 100644 --- a/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl +++ b/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl @@ -81,6 +81,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 4e72996c966..e0a9de02c19 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -783,6 +783,10 @@ func (b *Beat) configure(settings Settings) error { return fmt.Errorf("error unpacking config data: %w", err) } + if err := mergeOutputQueueSettings(&b.Config); err != nil { + return fmt.Errorf("could not merge output queue settings: %w", err) + } + if err := features.UpdateFromConfig(b.RawConfig); err != nil { return fmt.Errorf("could not parse features: %w", err) } @@ -1480,3 +1484,17 @@ func sanitizeIPs(ips []string) []string { } return validIPs } + +func mergeOutputQueueSettings(bc *beatConfig) error { + if bc.Output.IsSet() && bc.Output.Config().Enabled() { + pc := pipeline.Config{} + err := bc.Output.Config().Unpack(&pc) + if err != nil { + return fmt.Errorf("error unpacking output queue settings: %w", err) + } + if pc.Queue.IsSet() { + bc.Pipeline.Queue = pc.Queue + } + } + return nil +} diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index b6834d89b5d..bd7bfba39ee 100644 --- a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -27,7 +27,9 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/go-ucfg/yaml" "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" @@ -269,3 +271,74 @@ func (r *outputReloaderMock) Reload( r.cfg = cfg return nil } + +func TestMergeOutputQueueSettings(t *testing.T) { + tests := map[string]struct { + input []byte + memEvents int + }{ + "blank": {input: []byte(""), + memEvents: 4096}, + "defaults": {input: []byte(` +name: mockbeat +output: + elasticsearch: + hosts: + - "localhost:9200" +`), + memEvents: 4096}, + "topLevelQueue": {input: []byte(` +name: mockbeat +queue: + mem: + events: 8096 +output: + elasticsearch: + hosts: + - "localhost:9200" +`), + memEvents: 8096}, + "outputLevelQueue": {input: []byte(` +name: mockbeat +output: + elasticsearch: + hosts: + - "localhost:9200" + queue: + mem: + events: 8096 +`), + memEvents: 8096}, + "topAndOutputLevelQueue": {input: []byte(` +name: mockbeat +queue: + mem: + events: 2048 +output: + elasticsearch: + hosts: + - "localhost:9200" + queue: + mem: + events: 8096 +`), + memEvents: 8096}, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + cfg, err := yaml.NewConfig(tc.input) + require.NoError(t, err) + + config := beatConfig{} + err = cfg.Unpack(&config) + require.NoError(t, err) + + err = mergeOutputQueueSettings(&config) + require.NoError(t, err) + + ms, err := memqueue.SettingsForUserConfig(config.Pipeline.Queue.Config()) + require.NoError(t, err) + require.Equalf(t, tc.memEvents, ms.Events, "config was: %v", config.Pipeline.Queue.Config()) + }) + } +} diff --git a/libbeat/docs/queueconfig.asciidoc b/libbeat/docs/queueconfig.asciidoc index fb930831dac..a9a50288df7 100644 --- a/libbeat/docs/queueconfig.asciidoc +++ b/libbeat/docs/queueconfig.asciidoc @@ -9,9 +9,12 @@ queue is responsible for buffering and combining events into batches that can be consumed by the outputs. The outputs will use bulk operations to send a batch of events in one transaction. -You can configure the type and behavior of the internal queue by setting -options in the `queue` section of the +{beatname_lc}.yml+ config file. Only one -queue type can be configured. +You can configure the type and behavior of the internal queue by +setting options in the `queue` section of the +{beatname_lc}.yml+ +config file or by setting options in the `queue` section of the +output. Only one queue type can be configured. If both the top level +queue section and the output section are specified the output section +takes precedence. This sample configuration sets the memory queue to buffer up to 4096 events: diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index c80e95ebc90..b485807776e 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -102,6 +102,7 @@ func NewClient( CompressionLevel: s.CompressionLevel, EscapeHTML: s.EscapeHTML, Transport: s.Transport, + IdleConnTimeout: s.IdleConnTimeout, }) if err != nil { return nil, err diff --git a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc index 5ea65c16dc4..482c9e56bb9 100644 --- a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc +++ b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc @@ -689,6 +689,10 @@ default is `1s`. The maximum number of seconds to wait before attempting to connect to Elasticsearch after a network error. The default is `60s`. +===== `idle_connection_timeout` + +The maximum amount of time an idle connection will remain idle before closing itself. Zero means no limit. The format is a Go language duration (example 60s is 60 seconds). The default is 0. + ===== `timeout` The http request timeout in seconds for the Elasticsearch request. The default is 90. diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 9cd33ea8d8a..4d2d9aebe67 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -105,6 +105,7 @@ func makeES( Observer: observer, EscapeHTML: config.EscapeHTML, Transport: config.Transport, + IdleConnTimeout: config.Transport.IdleConnTimeout, }, Index: index, Pipeline: pipeline, diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 88e353e883b..62f8a2580fd 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -1357,6 +1357,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index aebeb1947a6..879164c73bb 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -988,6 +988,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index 5619e9a6375..f0c6e0f4d25 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -404,6 +404,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/auditbeat/auditbeat.reference.yml b/x-pack/auditbeat/auditbeat.reference.yml index 999f0416354..c40b92cf45d 100644 --- a/x-pack/auditbeat/auditbeat.reference.yml +++ b/x-pack/auditbeat/auditbeat.reference.yml @@ -578,6 +578,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 29704a80ad4..0d2b83316a3 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -3988,6 +3988,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml index b9a54bd88ec..de3fb3dd171 100644 --- a/x-pack/functionbeat/functionbeat.reference.yml +++ b/x-pack/functionbeat/functionbeat.reference.yml @@ -646,6 +646,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/heartbeat/heartbeat.reference.yml b/x-pack/heartbeat/heartbeat.reference.yml index 89fc08ef8e6..24e29870da5 100644 --- a/x-pack/heartbeat/heartbeat.reference.yml +++ b/x-pack/heartbeat/heartbeat.reference.yml @@ -614,6 +614,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 63acc6f5245..41853b74de6 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -1918,6 +1918,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/osquerybeat/osquerybeat.reference.yml b/x-pack/osquerybeat/osquerybeat.reference.yml index d6770083e62..1da85a9e595 100644 --- a/x-pack/osquerybeat/osquerybeat.reference.yml +++ b/x-pack/osquerybeat/osquerybeat.reference.yml @@ -365,6 +365,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/packetbeat/packetbeat.reference.yml b/x-pack/packetbeat/packetbeat.reference.yml index aebeb1947a6..879164c73bb 100644 --- a/x-pack/packetbeat/packetbeat.reference.yml +++ b/x-pack/packetbeat/packetbeat.reference.yml @@ -988,6 +988,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/winlogbeat/winlogbeat.reference.yml b/x-pack/winlogbeat/winlogbeat.reference.yml index 5d52a07b37b..6a82c7d0d84 100644 --- a/x-pack/winlogbeat/winlogbeat.reference.yml +++ b/x-pack/winlogbeat/winlogbeat.reference.yml @@ -406,6 +406,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 From 1c1c16039d19f7ea5c2667af701884709565517a Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Wed, 27 Sep 2023 13:30:31 -0500 Subject: [PATCH 02/12] update changelog with PR number --- CHANGELOG.next.asciidoc | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 22e99bfe8a7..818531fb588 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -166,8 +166,8 @@ is collected by it. - Add support for AWS external IDs. {issue}36321[36321] {pull}36322[36322] - [Enhanncement for host.ip and host.mac] Disabling netinfo.enabled option of add-host-metadata processor {pull}36506[36506] Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will disable the netinfo.enabled option of add_host_metadata processor -- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}99999[99999] -- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}99999[99999] +- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}36693[36693] +- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}36693[36693] *Auditbeat* @@ -217,8 +217,7 @@ is collected by it. - Added support for Okta OAuth2 provider in the httpjson input. {pull}36273[36273] - Add support of the interval parameter in Salesforce setupaudittrail-rest fileset. {issue}35917[35917] {pull}35938[35938] - Add device handling to Okta input package for entity analytics. {pull}36049[36049] -- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}99999[99999] -- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}36286[36286] +- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30916[30916] {pull}36286[36286] - [Azure] Add input metrics to the azure-eventhub input. {pull}35739[35739] - Reduce HTTPJSON metrics allocations. {pull}36282[36282] - Add support for a simplified input configuraton when running under Elastic-Agent {pull}36390[36390] From d6e8568c2a909f90a0bc702bc57073abfd10517f Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Wed, 27 Sep 2023 14:04:02 -0500 Subject: [PATCH 03/12] fix lint check --- libbeat/outputs/elasticsearch/elasticsearch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 4d2d9aebe67..04b43fdadbb 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -41,7 +41,7 @@ func makeES( ) (outputs.Group, error) { log := logp.NewLogger(logSelector) if !cfg.HasField("bulk_max_size") { - cfg.SetInt("bulk_max_size", -1, defaultBulkSize) + _ = cfg.SetInt("bulk_max_size", -1, defaultBulkSize) } index, pipeline, err := buildSelectors(im, beat, cfg) From 008f9a857dc26ecb64d4ced2db2fc4abba894944 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Thu, 28 Sep 2023 11:35:59 -0500 Subject: [PATCH 04/12] add config validation --- libbeat/cmd/instance/beat.go | 21 ++++++++++++++++++--- libbeat/cmd/instance/beat_test.go | 15 ++++++++++----- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index e0a9de02c19..27af7105020 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -783,8 +783,8 @@ func (b *Beat) configure(settings Settings) error { return fmt.Errorf("error unpacking config data: %w", err) } - if err := mergeOutputQueueSettings(&b.Config); err != nil { - return fmt.Errorf("could not merge output queue settings: %w", err) + if err := promoteOutputQueueSettings(&b.Config); err != nil { + return fmt.Errorf("could not promote output queue settings: %w", err) } if err := features.UpdateFromConfig(b.RawConfig); err != nil { @@ -1485,7 +1485,7 @@ func sanitizeIPs(ips []string) []string { return validIPs } -func mergeOutputQueueSettings(bc *beatConfig) error { +func promoteOutputQueueSettings(bc *beatConfig) error { if bc.Output.IsSet() && bc.Output.Config().Enabled() { pc := pipeline.Config{} err := bc.Output.Config().Unpack(&pc) @@ -1493,8 +1493,23 @@ func mergeOutputQueueSettings(bc *beatConfig) error { return fmt.Errorf("error unpacking output queue settings: %w", err) } if pc.Queue.IsSet() { + logp.Info("global queue settings replaced with output queue settings") bc.Pipeline.Queue = pc.Queue } } return nil } + +func (bc *beatConfig) Validate() error { + if bc.Output.IsSet() && bc.Output.Config().Enabled() { + pc := pipeline.Config{} + err := bc.Output.Config().Unpack(&pc) + if err != nil { + return fmt.Errorf("error unpacking output queue settings: %w", err) + } + if bc.Pipeline.Queue.IsSet() && pc.Queue.IsSet() { + return fmt.Errorf("top level queue and output level queue settings defined, only one is allowed") + } + } + return nil +} diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index bd7bfba39ee..4c8443dfb72 100644 --- a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -272,10 +272,11 @@ func (r *outputReloaderMock) Reload( return nil } -func TestMergeOutputQueueSettings(t *testing.T) { +func TestPromoteOutputQueueSettings(t *testing.T) { tests := map[string]struct { - input []byte - memEvents int + input []byte + memEvents int + expectValidationError bool }{ "blank": {input: []byte(""), memEvents: 4096}, @@ -322,7 +323,7 @@ output: mem: events: 8096 `), - memEvents: 8096}, + expectValidationError: true}, } for name, tc := range tests { t.Run(name, func(t *testing.T) { @@ -331,9 +332,13 @@ output: config := beatConfig{} err = cfg.Unpack(&config) + if tc.expectValidationError { + require.Error(t, err) + return + } require.NoError(t, err) - err = mergeOutputQueueSettings(&config) + err = promoteOutputQueueSettings(&config) require.NoError(t, err) ms, err := memqueue.SettingsForUserConfig(config.Pipeline.Queue.Config()) From 270499b099b91a9cd0c8acea702956f71c3c2eef Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Thu, 28 Sep 2023 13:20:54 -0500 Subject: [PATCH 05/12] remove doc about top level precedence --- libbeat/docs/queueconfig.asciidoc | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/libbeat/docs/queueconfig.asciidoc b/libbeat/docs/queueconfig.asciidoc index a9a50288df7..ade3bd2ec8e 100644 --- a/libbeat/docs/queueconfig.asciidoc +++ b/libbeat/docs/queueconfig.asciidoc @@ -12,10 +12,7 @@ batch of events in one transaction. You can configure the type and behavior of the internal queue by setting options in the `queue` section of the +{beatname_lc}.yml+ config file or by setting options in the `queue` section of the -output. Only one queue type can be configured. If both the top level -queue section and the output section are specified the output section -takes precedence. - +output. Only one queue type can be configured. This sample configuration sets the memory queue to buffer up to 4096 events: From 95679d68cc7daab611f38ca0090facfc7b0859aa Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Thu, 28 Sep 2023 14:30:27 -0500 Subject: [PATCH 06/12] add check for underAgent and DiskQueue --- libbeat/cmd/instance/beat.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 27af7105020..925246b8038 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -64,8 +64,10 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" "github.com/elastic/beats/v7/libbeat/plugin" "github.com/elastic/beats/v7/libbeat/pprof" + "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/beats/v7/libbeat/publisher/processing" + "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue" "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/file" @@ -838,6 +840,10 @@ func (b *Beat) configure(settings Settings) error { if err != nil { return err } + err = checkAgentDiskQueue(&b.Config) + if err != nil { + return err + } if err := b.Manager.CheckRawConfig(b.RawConfig); err != nil { return err @@ -1513,3 +1519,20 @@ func (bc *beatConfig) Validate() error { } return nil } + +// checkAgentDiskQueue should be run after management.NewManager() so +// that publisher.UnderAgent will be set with correct value +func checkAgentDiskQueue(bc *beatConfig) error { + //restriction is only if under agent + if !publisher.UnderAgent() { + return nil + } + //default queue settings are always allowed + if !bc.Pipeline.Queue.IsSet() { + return nil + } + if bc.Pipeline.Queue.Config().Enabled() && bc.Pipeline.Queue.Name() == diskqueue.QueueType { + return fmt.Errorf("disk queue is not supported under elastic-agent") + } + return nil +} From 422fe006d955707d1e9bfba753dccb5d4c047fdd Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Fri, 29 Sep 2023 12:09:25 -0500 Subject: [PATCH 07/12] catch elastic-agent and disk queue at validation --- libbeat/cmd/instance/beat.go | 34 ++++------- libbeat/cmd/instance/beat_test.go | 96 +++++++++++++++++++++++++++---- 2 files changed, 95 insertions(+), 35 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 925246b8038..a0d830bf0b0 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -64,7 +64,6 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" "github.com/elastic/beats/v7/libbeat/plugin" "github.com/elastic/beats/v7/libbeat/pprof" - "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/beats/v7/libbeat/publisher/processing" "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue" @@ -840,10 +839,6 @@ func (b *Beat) configure(settings Settings) error { if err != nil { return err } - err = checkAgentDiskQueue(&b.Config) - if err != nil { - return err - } if err := b.Manager.CheckRawConfig(b.RawConfig); err != nil { return err @@ -1508,31 +1503,24 @@ func promoteOutputQueueSettings(bc *beatConfig) error { func (bc *beatConfig) Validate() error { if bc.Output.IsSet() && bc.Output.Config().Enabled() { - pc := pipeline.Config{} - err := bc.Output.Config().Unpack(&pc) + outputPC := pipeline.Config{} + err := bc.Output.Config().Unpack(&outputPC) if err != nil { return fmt.Errorf("error unpacking output queue settings: %w", err) } - if bc.Pipeline.Queue.IsSet() && pc.Queue.IsSet() { + if bc.Pipeline.Queue.IsSet() && outputPC.Queue.IsSet() { return fmt.Errorf("top level queue and output level queue settings defined, only one is allowed") } + //elastic-agent doesn't support disk queue yet + if bc.Management.Enabled() && outputPC.Queue.Config().Enabled() && outputPC.Queue.Name() == diskqueue.QueueType { + return fmt.Errorf("disk queue is not supported when management is enabled") + } } - return nil -} -// checkAgentDiskQueue should be run after management.NewManager() so -// that publisher.UnderAgent will be set with correct value -func checkAgentDiskQueue(bc *beatConfig) error { - //restriction is only if under agent - if !publisher.UnderAgent() { - return nil - } - //default queue settings are always allowed - if !bc.Pipeline.Queue.IsSet() { - return nil - } - if bc.Pipeline.Queue.Config().Enabled() && bc.Pipeline.Queue.Name() == diskqueue.QueueType { - return fmt.Errorf("disk queue is not supported under elastic-agent") + //elastic-agent doesn't support disk queue yet + if bc.Management.Enabled() && bc.Pipeline.Queue.Config().Enabled() && bc.Pipeline.Queue.Name() == diskqueue.QueueType { + return fmt.Errorf("disk queue is not supported when management is enabled") } + return nil } diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index 4c8443dfb72..6f94d8fd62d 100644 --- a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -310,6 +310,46 @@ output: events: 8096 `), memEvents: 8096}, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + cfg, err := yaml.NewConfig(tc.input) + require.NoError(t, err) + + config := beatConfig{} + err = cfg.Unpack(&config) + if tc.expectValidationError { + require.Error(t, err) + return + } + require.NoError(t, err) + + err = promoteOutputQueueSettings(&config) + require.NoError(t, err) + + ms, err := memqueue.SettingsForUserConfig(config.Pipeline.Queue.Config()) + require.NoError(t, err) + require.Equalf(t, tc.memEvents, ms.Events, "config was: %v", config.Pipeline.Queue.Config()) + }) + } +} + +func TestValidateBeatConfig(t *testing.T) { + tests := map[string]struct { + input []byte + expectValidationError string + }{ + "blank": {input: []byte(""), + expectValidationError: "", + }, + "defaults": {input: []byte(` +name: mockbeat +output: + elasticsearch: + hosts: + - "localhost:9200" +`), + expectValidationError: ""}, "topAndOutputLevelQueue": {input: []byte(` name: mockbeat queue: @@ -323,27 +363,59 @@ output: mem: events: 8096 `), - expectValidationError: true}, + expectValidationError: "top level queue and output level queue settings defined, only one is allowed accessing config"}, + "managementTopLevelDiskQueue": {input: []byte(` +name: mockbeat +management: + enabled: true +queue: + disk: + max_size: 1G +output: + elasticsearch: + hosts: + - "localhost:9200" +`), + expectValidationError: "disk queue is not supported when management is enabled accessing config"}, + "managementOutputLevelDiskQueue": {input: []byte(` +name: mockbeat +management: + enabled: true +output: + elasticsearch: + hosts: + - "localhost:9200" + queue: + disk: + max_size: 1G +`), + expectValidationError: "disk queue is not supported when management is enabled accessing config"}, + "managementFalseOutputLevelDiskQueue": {input: []byte(` +name: mockbeat +management: + enabled: false +output: + elasticsearch: + hosts: + - "localhost:9200" + queue: + disk: + max_size: 1G +`), + expectValidationError: ""}, } for name, tc := range tests { t.Run(name, func(t *testing.T) { cfg, err := yaml.NewConfig(tc.input) require.NoError(t, err) - config := beatConfig{} err = cfg.Unpack(&config) - if tc.expectValidationError { + if tc.expectValidationError != "" { require.Error(t, err) - return + require.Equal(t, tc.expectValidationError, err.Error()) + } else { + require.NoError(t, err) } - require.NoError(t, err) - - err = promoteOutputQueueSettings(&config) - require.NoError(t, err) - - ms, err := memqueue.SettingsForUserConfig(config.Pipeline.Queue.Config()) - require.NoError(t, err) - require.Equalf(t, tc.memEvents, ms.Events, "config was: %v", config.Pipeline.Queue.Config()) }) } } From b1a86665fd5591d9b431670eda19ea26d56e95c7 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Mon, 2 Oct 2023 13:09:30 -0500 Subject: [PATCH 08/12] support for reloading --- CHANGELOG.next.asciidoc | 4 +- libbeat/outputs/console/config.go | 6 ++- libbeat/outputs/console/console.go | 14 ++---- libbeat/outputs/elasticsearch/config.go | 1 + .../outputs/elasticsearch/elasticsearch.go | 34 ++++++------- libbeat/outputs/fileout/config.go | 24 ++++----- libbeat/outputs/fileout/file.go | 10 ++-- libbeat/outputs/kafka/config.go | 7 +-- libbeat/outputs/kafka/kafka.go | 12 ++--- libbeat/outputs/logstash/config.go | 1 + libbeat/outputs/logstash/logstash.go | 18 +++---- libbeat/outputs/redis/config.go | 2 + libbeat/outputs/redis/redis.go | 34 ++++++------- libbeat/outputs/util.go | 49 ++++++++++++++++--- libbeat/publisher/pipeline/client_test.go | 2 +- libbeat/publisher/pipeline/controller.go | 11 +++-- libbeat/publisher/pipeline/controller_test.go | 2 +- libbeat/publisher/pipeline/pipeline.go | 10 ++-- libbeat/publisher/pipeline/stress/out.go | 13 ++--- libbeat/publisher/queue/diskqueue/queue.go | 1 + libbeat/publisher/queue/memqueue/broker.go | 7 +-- .../publisher/queue/memqueue/queue_test.go | 12 ++--- libbeat/publisher/queue/proxy/broker.go | 1 + libbeat/publisher/queue/queue.go | 2 +- 24 files changed, 155 insertions(+), 122 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 818531fb588..1ab4e02e129 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -166,8 +166,8 @@ is collected by it. - Add support for AWS external IDs. {issue}36321[36321] {pull}36322[36322] - [Enhanncement for host.ip and host.mac] Disabling netinfo.enabled option of add-host-metadata processor {pull}36506[36506] Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will disable the netinfo.enabled option of add_host_metadata processor -- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}36693[36693] -- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}36693[36693] +- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}36788[36788] +- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}36788[36788] *Auditbeat* diff --git a/libbeat/outputs/console/config.go b/libbeat/outputs/console/config.go index 44869e388fa..e0a1cc9ff28 100644 --- a/libbeat/outputs/console/config.go +++ b/libbeat/outputs/console/config.go @@ -17,7 +17,10 @@ package console -import "github.com/elastic/beats/v7/libbeat/outputs/codec" +import ( + "github.com/elastic/beats/v7/libbeat/outputs/codec" + "github.com/elastic/elastic-agent-libs/config" +) type Config struct { Codec codec.Config `config:"codec"` @@ -26,6 +29,7 @@ type Config struct { Pretty bool `config:"pretty"` BatchSize int + Queue config.Namespace `config:"queue"` } var defaultConfig = Config{} diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index 905aa778998..b81bf336348 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -23,7 +23,6 @@ import ( "fmt" "os" "runtime" - "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" @@ -43,13 +42,6 @@ type console struct { index string } -type consoleEvent struct { - Timestamp time.Time `json:"@timestamp" struct:"@timestamp"` - - // Note: stdlib json doesn't support inlining :( -> use `codec: 2`, to generate proper event - Fields interface{} `struct:",inline"` -} - func init() { outputs.RegisterType("console", makeConsole) } @@ -82,18 +74,18 @@ func makeConsole( index := beat.Beat c, err := newConsole(index, observer, enc) if err != nil { - return outputs.Fail(fmt.Errorf("console output initialization failed with: %v", err)) + return outputs.Fail(fmt.Errorf("console output initialization failed with: %w", err)) } // check stdout actually being available if runtime.GOOS != "windows" { if _, err = c.out.Stat(); err != nil { - err = fmt.Errorf("console output initialization failed with: %v", err) + err = fmt.Errorf("console output initialization failed with: %w", err) return outputs.Fail(err) } } - return outputs.Success(config.BatchSize, 0, c) + return outputs.Success(config.Queue, config.BatchSize, 0, c) } func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) { diff --git a/libbeat/outputs/elasticsearch/config.go b/libbeat/outputs/elasticsearch/config.go index ca77a44b833..e504f2dc213 100644 --- a/libbeat/outputs/elasticsearch/config.go +++ b/libbeat/outputs/elasticsearch/config.go @@ -45,6 +45,7 @@ type elasticsearchConfig struct { AllowOlderVersion bool `config:"allow_older_versions"` Transport httpcommon.HTTPTransportSettings `config:",inline"` + Queue config.Namespace `config:"queue"` } type Backoff struct { diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 04b43fdadbb..bc3cca2d560 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -49,12 +49,12 @@ func makeES( return outputs.Fail(err) } - config := defaultConfig - if err := cfg.Unpack(&config); err != nil { + esConfig := defaultConfig + if err := cfg.Unpack(&esConfig); err != nil { return outputs.Fail(err) } - policy, err := newNonIndexablePolicy(config.NonIndexablePolicy) + policy, err := newNonIndexablePolicy(esConfig.NonIndexablePolicy) if err != nil { log.Errorf("error while creating file identifier: %v", err) return outputs.Fail(err) @@ -65,12 +65,12 @@ func makeES( return outputs.Fail(err) } - if proxyURL := config.Transport.Proxy.URL; proxyURL != nil && !config.Transport.Proxy.Disable { + if proxyURL := esConfig.Transport.Proxy.URL; proxyURL != nil && !esConfig.Transport.Proxy.Disable { log.Debugf("breaking down proxy URL. Scheme: '%s', host[:port]: '%s', path: '%s'", proxyURL.Scheme, proxyURL.Host, proxyURL.Path) log.Infof("Using proxy URL: %s", proxyURL) } - params := config.Params + params := esConfig.Params if len(params) == 0 { params = nil } @@ -84,7 +84,7 @@ func makeES( clients := make([]outputs.NetworkClient, len(hosts)) for i, host := range hosts { - esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200) + esURL, err := common.MakeURL(esConfig.Protocol, esConfig.Path, host, 9200) if err != nil { log.Errorf("Invalid host param set: %s, Error: %+v", host, err) return outputs.Fail(err) @@ -95,17 +95,17 @@ func makeES( ConnectionSettings: eslegclient.ConnectionSettings{ URL: esURL, Beatname: beat.Beat, - Kerberos: config.Kerberos, - Username: config.Username, - Password: config.Password, - APIKey: config.APIKey, + Kerberos: esConfig.Kerberos, + Username: esConfig.Username, + Password: esConfig.Password, + APIKey: esConfig.APIKey, Parameters: params, - Headers: config.Headers, - CompressionLevel: config.CompressionLevel, + Headers: esConfig.Headers, + CompressionLevel: esConfig.CompressionLevel, Observer: observer, - EscapeHTML: config.EscapeHTML, - Transport: config.Transport, - IdleConnTimeout: config.Transport.IdleConnTimeout, + EscapeHTML: esConfig.EscapeHTML, + Transport: esConfig.Transport, + IdleConnTimeout: esConfig.Transport.IdleConnTimeout, }, Index: index, Pipeline: pipeline, @@ -116,11 +116,11 @@ func makeES( return outputs.Fail(err) } - client = outputs.WithBackoff(client, config.Backoff.Init, config.Backoff.Max) + client = outputs.WithBackoff(client, esConfig.Backoff.Init, esConfig.Backoff.Max) clients[i] = client } - return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients) + return outputs.SuccessNet(esConfig.Queue, esConfig.LoadBalance, esConfig.BulkMaxSize, esConfig.MaxRetries, clients) } func buildSelectors( diff --git a/libbeat/outputs/fileout/config.go b/libbeat/outputs/fileout/config.go index cfd28bfaaf2..e72a9f87d6f 100644 --- a/libbeat/outputs/fileout/config.go +++ b/libbeat/outputs/fileout/config.go @@ -21,21 +21,23 @@ import ( "fmt" "github.com/elastic/beats/v7/libbeat/outputs/codec" + "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/file" ) -type config struct { - Path string `config:"path"` - Filename string `config:"filename"` - RotateEveryKb uint `config:"rotate_every_kb" validate:"min=1"` - NumberOfFiles uint `config:"number_of_files"` - Codec codec.Config `config:"codec"` - Permissions uint32 `config:"permissions"` - RotateOnStartup bool `config:"rotate_on_startup"` +type fileOutConfig struct { + Path string `config:"path"` + Filename string `config:"filename"` + RotateEveryKb uint `config:"rotate_every_kb" validate:"min=1"` + NumberOfFiles uint `config:"number_of_files"` + Codec codec.Config `config:"codec"` + Permissions uint32 `config:"permissions"` + RotateOnStartup bool `config:"rotate_on_startup"` + Queue config.Namespace `config:"queue"` } -func defaultConfig() config { - return config{ +func defaultConfig() fileOutConfig { + return fileOutConfig{ NumberOfFiles: 7, RotateEveryKb: 10 * 1024, Permissions: 0600, @@ -43,7 +45,7 @@ func defaultConfig() config { } } -func (c *config) Validate() error { +func (c *fileOutConfig) Validate() error { if c.NumberOfFiles < 2 || c.NumberOfFiles > file.MaxBackupsLimit { return fmt.Errorf("the number_of_files to keep should be between 2 and %v", file.MaxBackupsLimit) diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index 949d835f541..d12a11b25c3 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -51,8 +51,8 @@ func makeFileout( observer outputs.Observer, cfg *c.C, ) (outputs.Group, error) { - config := defaultConfig() - if err := cfg.Unpack(&config); err != nil { + foConfig := defaultConfig() + if err := cfg.Unpack(&foConfig); err != nil { return outputs.Fail(err) } @@ -64,14 +64,14 @@ func makeFileout( beat: beat, observer: observer, } - if err := fo.init(beat, config); err != nil { + if err := fo.init(beat, foConfig); err != nil { return outputs.Fail(err) } - return outputs.Success(-1, 0, fo) + return outputs.Success(foConfig.Queue, -1, 0, fo) } -func (out *fileOutput) init(beat beat.Info, c config) error { +func (out *fileOutput) init(beat beat.Info, c fileOutConfig) error { var path string if c.Filename != "" { path = filepath.Join(c.Path, c.Filename) diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 7247699500f..8fff8dad0d5 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -76,6 +76,7 @@ type kafkaConfig struct { Codec codec.Config `config:"codec"` Sasl kafka.SaslConfig `config:"sasl"` EnableFAST bool `config:"enable_krb5_fast"` + Queue config.Namespace `config:"queue"` } type metaConfig struct { @@ -101,12 +102,6 @@ var compressionModes = map[string]sarama.CompressionCodec{ "snappy": sarama.CompressionSnappy, } -const ( - saslTypePlaintext = sarama.SASLTypePlaintext - saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256 - saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512 -) - func defaultConfig() kafkaConfig { return kafkaConfig{ Hosts: nil, diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index ef1c253981f..0c856ea425d 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -47,7 +47,7 @@ func makeKafka( log := logp.NewLogger(logSelector) log.Debug("initialize kafka output") - config, err := readConfig(cfg) + kConfig, err := readConfig(cfg) if err != nil { return outputs.Fail(err) } @@ -57,7 +57,7 @@ func makeKafka( return outputs.Fail(err) } - libCfg, err := newSaramaConfig(log, config) + libCfg, err := newSaramaConfig(log, kConfig) if err != nil { return outputs.Fail(err) } @@ -67,21 +67,21 @@ func makeKafka( return outputs.Fail(err) } - codec, err := codec.CreateEncoder(beat, config.Codec) + codec, err := codec.CreateEncoder(beat, kConfig.Codec) if err != nil { return outputs.Fail(err) } - client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, config.Headers, codec, libCfg) + client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, kConfig.Key, topic, kConfig.Headers, codec, libCfg) if err != nil { return outputs.Fail(err) } retry := 0 - if config.MaxRetries < 0 { + if kConfig.MaxRetries < 0 { retry = -1 } - return outputs.Success(config.BulkMaxSize, retry, client) + return outputs.Success(kConfig.Queue, kConfig.BulkMaxSize, retry, client) } func buildTopicSelector(cfg *config.C) (outil.Selector, error) { diff --git a/libbeat/outputs/logstash/config.go b/libbeat/outputs/logstash/config.go index 82747fe01d0..9df57514495 100644 --- a/libbeat/outputs/logstash/config.go +++ b/libbeat/outputs/logstash/config.go @@ -43,6 +43,7 @@ type Config struct { Proxy transport.ProxyConfig `config:",inline"` Backoff Backoff `config:"backoff"` EscapeHTML bool `config:"escape_html"` + Queue config.Namespace `config:"queue"` } type Backoff struct { diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index 5e7cdfeee7a..072ec049f6f 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -41,7 +41,7 @@ func makeLogstash( observer outputs.Observer, cfg *conf.C, ) (outputs.Group, error) { - config, err := readConfig(cfg, beat) + lsConfig, err := readConfig(cfg, beat) if err != nil { return outputs.Fail(err) } @@ -51,14 +51,14 @@ func makeLogstash( return outputs.Fail(err) } - tls, err := tlscommon.LoadTLSConfig(config.TLS) + tls, err := tlscommon.LoadTLSConfig(lsConfig.TLS) if err != nil { return outputs.Fail(err) } transp := transport.Config{ - Timeout: config.Timeout, - Proxy: &config.Proxy, + Timeout: lsConfig.Timeout, + Proxy: &lsConfig.Proxy, TLS: tls, Stats: observer, } @@ -72,18 +72,18 @@ func makeLogstash( return outputs.Fail(err) } - if config.Pipelining > 0 { - client, err = newAsyncClient(beat, conn, observer, config) + if lsConfig.Pipelining > 0 { + client, err = newAsyncClient(beat, conn, observer, lsConfig) } else { - client, err = newSyncClient(beat, conn, observer, config) + client, err = newSyncClient(beat, conn, observer, lsConfig) } if err != nil { return outputs.Fail(err) } - client = outputs.WithBackoff(client, config.Backoff.Init, config.Backoff.Max) + client = outputs.WithBackoff(client, lsConfig.Backoff.Init, lsConfig.Backoff.Max) clients[i] = client } - return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients) + return outputs.SuccessNet(lsConfig.Queue, lsConfig.LoadBalance, lsConfig.BulkMaxSize, lsConfig.MaxRetries, clients) } diff --git a/libbeat/outputs/redis/config.go b/libbeat/outputs/redis/config.go index 01c8f2e0238..4785af137f1 100644 --- a/libbeat/outputs/redis/config.go +++ b/libbeat/outputs/redis/config.go @@ -22,6 +22,7 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/outputs/codec" + "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/transport" "github.com/elastic/elastic-agent-libs/transport/tlscommon" ) @@ -40,6 +41,7 @@ type redisConfig struct { Db int `config:"db"` DataType string `config:"datatype"` Backoff backoff `config:"backoff"` + Queue config.Namespace `config:"queue"` } type backoff struct { diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index 026cb04d4f8..9814d6abee7 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -34,10 +34,6 @@ import ( "github.com/elastic/elastic-agent-libs/transport/tlscommon" ) -type redisOut struct { - beat beat.Info -} - const ( defaultWaitRetry = 1 * time.Second defaultMaxWaitRetry = 60 * time.Second @@ -58,7 +54,9 @@ func makeRedis( ) (outputs.Group, error) { if !cfg.HasField("index") { - cfg.SetString("index", -1, beat.Beat) + if err := cfg.SetString("index", -1, beat.Beat); err != nil { + return outputs.Fail(err) + } } err := cfgwarn.CheckRemoved6xSettings(cfg, "port") @@ -77,13 +75,13 @@ func makeRedis( } } - config := defaultConfig - if err := cfg.Unpack(&config); err != nil { + rConfig := defaultConfig + if err := cfg.Unpack(&rConfig); err != nil { return outputs.Fail(err) } var dataType redisDataType - switch config.DataType { + switch rConfig.DataType { case "", "list": dataType = redisListType case "channel": @@ -102,7 +100,7 @@ func makeRedis( return outputs.Fail(err) } - tls, err := tlscommon.LoadTLSConfig(config.TLS) + tls, err := tlscommon.LoadTLSConfig(rConfig.TLS) if err != nil { return outputs.Fail(err) } @@ -129,8 +127,8 @@ func makeRedis( } transp := transport.Config{ - Timeout: config.Timeout, - Proxy: &config.Proxy, + Timeout: rConfig.Timeout, + Proxy: &rConfig.Proxy, TLS: tls, Stats: observer, } @@ -138,7 +136,7 @@ func makeRedis( switch hostUrl.Scheme { case redisScheme: if hasScheme { - transp.TLS = nil // disable TLS if user explicitely set `redis` scheme + transp.TLS = nil // disable TLS if user explicitly set `redis` scheme } case tlsRedisScheme: if transp.TLS == nil { @@ -151,23 +149,23 @@ func makeRedis( return outputs.Fail(err) } - pass := config.Password + pass := rConfig.Password hostPass, passSet := hostUrl.User.Password() if passSet { pass = hostPass } - enc, err := codec.CreateEncoder(beat, config.Codec) + enc, err := codec.CreateEncoder(beat, rConfig.Codec) if err != nil { return outputs.Fail(err) } - client := newClient(conn, observer, config.Timeout, - pass, config.Db, key, dataType, config.Index, enc) - clients[i] = newBackoffClient(client, config.Backoff.Init, config.Backoff.Max) + client := newClient(conn, observer, rConfig.Timeout, + pass, rConfig.Db, key, dataType, rConfig.Index, enc) + clients[i] = newBackoffClient(client, rConfig.Backoff.Init, rConfig.Backoff.Max) } - return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients) + return outputs.SuccessNet(rConfig.Queue, rConfig.LoadBalance, rConfig.BulkMaxSize, rConfig.MaxRetries, clients) } func buildKeySelector(cfg *config.C) (outil.Selector, error) { diff --git a/libbeat/outputs/util.go b/libbeat/outputs/util.go index 15068910f8c..7c096916a6c 100644 --- a/libbeat/outputs/util.go +++ b/libbeat/outputs/util.go @@ -17,16 +17,49 @@ package outputs +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/beats/v7/libbeat/publisher/queue" + "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue" + "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" + "github.com/elastic/elastic-agent-libs/config" +) + // Fail helper can be used by output factories, to create a failure response when // loading an output must return an error. func Fail(err error) (Group, error) { return Group{}, err } // Success create a valid output Group response for a set of client instances. -func Success(batchSize, retry int, clients ...Client) (Group, error) { +func Success(cfg config.Namespace, batchSize, retry int, clients ...Client) (Group, error) { + var q queue.QueueFactory + if cfg.IsSet() && cfg.Config().Enabled() { + switch cfg.Name() { + case memqueue.QueueType: + settings, err := memqueue.SettingsForUserConfig(cfg.Config()) + if err != nil { + return Group{}, fmt.Errorf("unable to get memory queue settings: %w", err) + } + q = memqueue.FactoryForSettings(settings) + case diskqueue.QueueType: + if publisher.UnderAgent() { + return Group{}, fmt.Errorf("disk queue not supported under agent") + } + settings, err := diskqueue.SettingsForUserConfig(cfg.Config()) + if err != nil { + return Group{}, fmt.Errorf("unable to get disk queue settings: %w", err) + } + q = diskqueue.FactoryForSettings(settings) + default: + return Group{}, fmt.Errorf("unknown queue type: %s", cfg.Name()) + } + } return Group{ - Clients: clients, - BatchSize: batchSize, - Retry: retry, + Clients: clients, + BatchSize: batchSize, + Retry: retry, + QueueFactory: q, }, nil } @@ -39,11 +72,13 @@ func NetworkClients(netclients []NetworkClient) []Client { return clients } -func SuccessNet(loadbalance bool, batchSize, retry int, netclients []NetworkClient) (Group, error) { +// SuccessNet create a valid output Group and creates client instances +func SuccessNet(cfg config.Namespace, loadbalance bool, batchSize, retry int, netclients []NetworkClient) (Group, error) { + if !loadbalance { - return Success(batchSize, retry, NewFailoverClient(netclients)) + return Success(cfg, batchSize, retry, NewFailoverClient(netclients)) } clients := NetworkClients(netclients) - return Success(batchSize, retry, clients...) + return Success(cfg, batchSize, retry, clients...) } diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go index 4a212092c7e..15260172ff5 100644 --- a/libbeat/publisher/pipeline/client_test.go +++ b/libbeat/publisher/pipeline/client_test.go @@ -144,7 +144,7 @@ func TestClientWaitClose(t *testing.T) { err := logp.TestingSetup() assert.Nil(t, err) - q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1}) + q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1}, 0) pipeline := makePipeline(Settings{}, q) defer pipeline.Close() diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index bf080677ef4..21fb22f45ce 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -60,8 +60,9 @@ type outputController struct { workerChan chan publisher.Batch - consumer *eventConsumer - workers []outputWorker + consumer *eventConsumer + workers []outputWorker + inputQueueSize int } type producerRequest struct { @@ -81,6 +82,7 @@ func newOutputController( observer outputObserver, eventWaitGroup *sync.WaitGroup, queueFactory queue.QueueFactory, + inputQueueSize int, ) (*outputController, error) { controller := &outputController{ beat: beat, @@ -90,6 +92,7 @@ func newOutputController( queueFactory: queueFactory, workerChan: make(chan publisher.Batch), consumer: newEventConsumer(monitors.Logger, observer), + inputQueueSize: inputQueueSize, } return controller, nil @@ -258,11 +261,11 @@ func (c *outputController) createQueueIfNeeded(outGrp outputs.Group) { factory = c.queueFactory } - queue, err := factory(logger, c.onACK) + queue, err := factory(logger, c.onACK, c.inputQueueSize) if err != nil { logger.Errorf("queue creation failed, falling back to default memory queue, check your queue configuration") s, _ := memqueue.SettingsForUserConfig(nil) - queue = memqueue.NewQueue(logger, c.onACK, s) + queue = memqueue.NewQueue(logger, c.onACK, s, c.inputQueueSize) } c.queue = queue diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index 366f4bff1d9..7384e5f7128 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -189,7 +189,7 @@ func TestOutputQueueFactoryTakesPrecedence(t *testing.T) { func TestFailedQueueFactoryRevertsToDefault(t *testing.T) { defaultSettings, _ := memqueue.SettingsForUserConfig(nil) - failedFactory := func(_ *logp.Logger, _ func(int)) (queue.Queue, error) { + failedFactory := func(_ *logp.Logger, _ func(int), _ int) (queue.Queue, error) { return nil, fmt.Errorf("This queue creation intentionally failed") } controller := outputController{ diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 209688bb5c2..cf03163750e 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -153,13 +153,12 @@ func New( if b := userQueueConfig.Name(); b != "" { queueType = b } - queueFactory, err := queueFactoryForUserConfig( - queueType, userQueueConfig.Config(), settings.InputQueueSize) + queueFactory, err := queueFactoryForUserConfig(queueType, userQueueConfig.Config()) if err != nil { return nil, err } - output, err := newOutputController(beat, monitors, p.observer, p.eventWaitGroup, queueFactory) + output, err := newOutputController(beat, monitors, p.observer, p.eventWaitGroup, queueFactory, settings.InputQueueSize) if err != nil { return nil, err } @@ -399,16 +398,13 @@ func (p *Pipeline) OutputReloader() OutputReloader { // This helper exists to frontload config parsing errors: if there is an // error in the queue config, we want it to show up as fatal during // initialization, even if the queue itself isn't created until later. -func queueFactoryForUserConfig(queueType string, userConfig *conf.C, inQueueSize int) (queue.QueueFactory, error) { +func queueFactoryForUserConfig(queueType string, userConfig *conf.C) (queue.QueueFactory, error) { switch queueType { case memqueue.QueueType: settings, err := memqueue.SettingsForUserConfig(userConfig) if err != nil { return nil, err } - // The memory queue has a special override during pipeline - // initialization for the size of its API channel buffer. - settings.InputQueueSize = inQueueSize return memqueue.FactoryForSettings(settings), nil case diskqueue.QueueType: settings, err := diskqueue.SettingsForUserConfig(userConfig) diff --git a/libbeat/publisher/pipeline/stress/out.go b/libbeat/publisher/pipeline/stress/out.go index 6aa510de1b0..d1014b8d782 100644 --- a/libbeat/publisher/pipeline/stress/out.go +++ b/libbeat/publisher/pipeline/stress/out.go @@ -35,11 +35,12 @@ type testOutput struct { } type testOutputConfig struct { - Worker int `config:"worker" validate:"min=1"` - BulkMaxSize int `config:"bulk_max_size"` - Retry int `config:"retry"` - MinWait time.Duration `config:"min_wait"` - MaxWait time.Duration `config:"max_wait"` + Worker int `config:"worker" validate:"min=1"` + BulkMaxSize int `config:"bulk_max_size"` + Retry int `config:"retry"` + MinWait time.Duration `config:"min_wait"` + MaxWait time.Duration `config:"max_wait"` + Queue conf.Namespace `config:"queue"` Fail struct { EveryBatch int } @@ -66,7 +67,7 @@ func makeTestOutput(_ outputs.IndexManager, beat beat.Info, observer outputs.Obs clients[i] = client } - return outputs.Success(config.BulkMaxSize, config.Retry, clients...) + return outputs.Success(config.Queue, config.BulkMaxSize, config.Retry, clients...) } func (*testOutput) Close() error { return nil } diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go index 2b754890882..74fff3fea64 100644 --- a/libbeat/publisher/queue/diskqueue/queue.go +++ b/libbeat/publisher/queue/diskqueue/queue.go @@ -109,6 +109,7 @@ func FactoryForSettings(settings Settings) queue.QueueFactory { return func( logger *logp.Logger, ackCallback func(eventCount int), + inputQueueSize int, ) (queue.Queue, error) { return NewQueue(logger, ackCallback, settings) } diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index 0bb3ff9ed8e..ac5b9dc6615 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -84,7 +84,6 @@ type Settings struct { Events int FlushMinEvents int FlushTimeout time.Duration - InputQueueSize int } type queueEntry struct { @@ -123,8 +122,9 @@ func FactoryForSettings(settings Settings) queue.QueueFactory { return func( logger *logp.Logger, ackCallback func(eventCount int), + inputQueueSize int, ) (queue.Queue, error) { - return NewQueue(logger, ackCallback, settings), nil + return NewQueue(logger, ackCallback, settings, inputQueueSize), nil } } @@ -135,6 +135,7 @@ func NewQueue( logger *logp.Logger, ackCallback func(eventCount int), settings Settings, + inputQueueSize int, ) *broker { var ( sz = settings.Events @@ -142,7 +143,7 @@ func NewQueue( flushTimeout = settings.FlushTimeout ) - chanSize := AdjustInputQueueSize(settings.InputQueueSize, sz) + chanSize := AdjustInputQueueSize(inputQueueSize, sz) if minEvents < 1 { minEvents = 1 diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index ef9ee52a944..28cc38025c3 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -103,7 +103,7 @@ func TestQueueMetricsBuffer(t *testing.T) { } func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, testName string) { - testQueue := NewQueue(nil, nil, settings) + testQueue := NewQueue(nil, nil, settings, 0) defer testQueue.Close() // Send events to queue @@ -147,7 +147,7 @@ func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.Queu Events: sz, FlushMinEvents: minEvents, FlushTimeout: flushTimeout, - }) + }, 0) } } @@ -258,22 +258,22 @@ func TestEntryIDs(t *testing.T) { } t.Run("acking in forward order with directEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000}) + testQueue := NewQueue(nil, nil, Settings{Events: 1000}, 0) testForward(testQueue) }) t.Run("acking in reverse order with directEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000}) + testQueue := NewQueue(nil, nil, Settings{Events: 1000}, 0) testBackward(testQueue) }) t.Run("acking in forward order with bufferedEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond}) + testQueue := NewQueue(nil, nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond}, 0) testForward(testQueue) }) t.Run("acking in reverse order with bufferedEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond}) + testQueue := NewQueue(nil, nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond}, 0) testBackward(testQueue) }) } diff --git a/libbeat/publisher/queue/proxy/broker.go b/libbeat/publisher/queue/proxy/broker.go index 20400e3ab75..832739cc26d 100644 --- a/libbeat/publisher/queue/proxy/broker.go +++ b/libbeat/publisher/queue/proxy/broker.go @@ -90,6 +90,7 @@ func FactoryForSettings(settings Settings) queue.QueueFactory { return func( logger *logp.Logger, ackCallback func(eventCount int), + inputQueueSize int, ) (queue.Queue, error) { return NewQueue(logger, ackCallback, settings), nil } diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index d0e1c047610..101a3290117 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -74,7 +74,7 @@ type Queue interface { Metrics() (Metrics, error) } -type QueueFactory func(logger *logp.Logger, ack func(eventCount int)) (Queue, error) +type QueueFactory func(logger *logp.Logger, ack func(eventCount int), inputQueueSize int) (Queue, error) // BufferConfig returns the pipelines buffering settings, // for the pipeline to use. From a4a5843f4bd0bf9f6ed2ab08c351933ccdea53fd Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Fri, 13 Oct 2023 13:18:41 -0500 Subject: [PATCH 09/12] remove idle_connection_timeout --- CHANGELOG.next.asciidoc | 1 - NOTICE.txt | 1 + auditbeat/auditbeat.reference.yml | 5 ----- filebeat/filebeat.reference.yml | 5 ----- heartbeat/heartbeat.reference.yml | 5 ----- libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl | 5 ----- libbeat/outputs/elasticsearch/client.go | 1 - libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc | 4 ---- libbeat/outputs/elasticsearch/elasticsearch.go | 1 - metricbeat/metricbeat.reference.yml | 5 ----- packetbeat/packetbeat.reference.yml | 5 ----- winlogbeat/winlogbeat.reference.yml | 5 ----- x-pack/auditbeat/auditbeat.reference.yml | 5 ----- x-pack/filebeat/filebeat.reference.yml | 5 ----- x-pack/functionbeat/functionbeat.reference.yml | 5 ----- x-pack/heartbeat/heartbeat.reference.yml | 5 ----- x-pack/metricbeat/metricbeat.reference.yml | 5 ----- x-pack/osquerybeat/osquerybeat.reference.yml | 5 ----- x-pack/packetbeat/packetbeat.reference.yml | 5 ----- x-pack/winlogbeat/winlogbeat.reference.yml | 5 ----- 20 files changed, 1 insertion(+), 82 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1ab4e02e129..9361608160b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -167,7 +167,6 @@ is collected by it. - [Enhanncement for host.ip and host.mac] Disabling netinfo.enabled option of add-host-metadata processor {pull}36506[36506] Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will disable the netinfo.enabled option of add_host_metadata processor - allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}36788[36788] -- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}36788[36788] *Auditbeat* diff --git a/NOTICE.txt b/NOTICE.txt index 6ae9f58daba..60ed10e38fc 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -12718,6 +12718,7 @@ Licence type (autodetected): Apache-2.0 Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.6.0/LICENSE: + Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index c55940a33eb..32fbef2da04 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -522,11 +522,6 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s - # The maximum amount of time an idle connection will remain idle - # before closing itself. Zero means no limit. The format is a Go - # language duration (example 60s is 60 seconds). The default is 0. - #idle_connection_timeout: 60s - # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 2467517240f..7f86b5aa9d2 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -1618,11 +1618,6 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s - # The maximum amount of time an idle connection will remain idle - # before closing itself. Zero means no limit. The format is a Go - # language duration (example 60s is 60 seconds). The default is 0. - #idle_connection_timeout: 60s - # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index 24e29870da5..89fc08ef8e6 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -614,11 +614,6 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s - # The maximum amount of time an idle connection will remain idle - # before closing itself. Zero means no limit. The format is a Go - # language duration (example 60s is 60 seconds). The default is 0. - #idle_connection_timeout: 60s - # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl b/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl index 25698efe4a3..48f1ba2c007 100644 --- a/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl +++ b/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl @@ -81,11 +81,6 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s - # The maximum amount of time an idle connection will remain idle - # before closing itself. Zero means no limit. The format is a Go - # language duration (example 60s is 60 seconds). The default is 0. - #idle_connection_timeout: 60s - # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index b485807776e..c80e95ebc90 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -102,7 +102,6 @@ func NewClient( CompressionLevel: s.CompressionLevel, EscapeHTML: s.EscapeHTML, Transport: s.Transport, - IdleConnTimeout: s.IdleConnTimeout, }) if err != nil { return nil, err diff --git a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc index 482c9e56bb9..5ea65c16dc4 100644 --- a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc +++ b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc @@ -689,10 +689,6 @@ default is `1s`. The maximum number of seconds to wait before attempting to connect to Elasticsearch after a network error. The default is `60s`. -===== `idle_connection_timeout` - -The maximum amount of time an idle connection will remain idle before closing itself. Zero means no limit. The format is a Go language duration (example 60s is 60 seconds). The default is 0. - ===== `timeout` The http request timeout in seconds for the Elasticsearch request. The default is 90. diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index bc3cca2d560..f7e38853924 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -105,7 +105,6 @@ func makeES( Observer: observer, EscapeHTML: esConfig.EscapeHTML, Transport: esConfig.Transport, - IdleConnTimeout: esConfig.Transport.IdleConnTimeout, }, Index: index, Pipeline: pipeline, diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 62f8a2580fd..88e353e883b 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -1357,11 +1357,6 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s - # The maximum amount of time an idle connection will remain idle - # before closing itself. Zero means no limit. The format is a Go - # language duration (example 60s is 60 seconds). The default is 0. - #idle_connection_timeout: 60s - # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index 879164c73bb..aebeb1947a6 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -988,11 +988,6 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s - # The maximum amount of time an idle connection will remain idle - # before closing itself. Zero means no limit. The format is a Go - # language duration (example 60s is 60 seconds). The default is 0. - #idle_connection_timeout: 60s - # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index f0c6e0f4d25..5619e9a6375 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -404,11 +404,6 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s - # The maximum amount of time an idle connection will remain idle - # before closing itself. Zero means no limit. The format is a Go - # language duration (example 60s is 60 seconds). The default is 0. - #idle_connection_timeout: 60s - # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/auditbeat/auditbeat.reference.yml b/x-pack/auditbeat/auditbeat.reference.yml index c40b92cf45d..999f0416354 100644 --- a/x-pack/auditbeat/auditbeat.reference.yml +++ b/x-pack/auditbeat/auditbeat.reference.yml @@ -578,11 +578,6 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s - # The maximum amount of time an idle connection will remain idle - # before closing itself. Zero means no limit. The format is a Go - # language duration (example 60s is 60 seconds). The default is 0. - #idle_connection_timeout: 60s - # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 0d2b83316a3..29704a80ad4 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -3988,11 +3988,6 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s - # The maximum amount of time an idle connection will remain idle - # before closing itself. Zero means no limit. The format is a Go - # language duration (example 60s is 60 seconds). The default is 0. - #idle_connection_timeout: 60s - # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml index de3fb3dd171..b9a54bd88ec 100644 --- a/x-pack/functionbeat/functionbeat.reference.yml +++ b/x-pack/functionbeat/functionbeat.reference.yml @@ -646,11 +646,6 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s - # The maximum amount of time an idle connection will remain idle - # before closing itself. Zero means no limit. The format is a Go - # language duration (example 60s is 60 seconds). The default is 0. - #idle_connection_timeout: 60s - # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/heartbeat/heartbeat.reference.yml b/x-pack/heartbeat/heartbeat.reference.yml index 24e29870da5..89fc08ef8e6 100644 --- a/x-pack/heartbeat/heartbeat.reference.yml +++ b/x-pack/heartbeat/heartbeat.reference.yml @@ -614,11 +614,6 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s - # The maximum amount of time an idle connection will remain idle - # before closing itself. Zero means no limit. The format is a Go - # language duration (example 60s is 60 seconds). The default is 0. - #idle_connection_timeout: 60s - # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 41853b74de6..63acc6f5245 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -1918,11 +1918,6 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s - # The maximum amount of time an idle connection will remain idle - # before closing itself. Zero means no limit. The format is a Go - # language duration (example 60s is 60 seconds). The default is 0. - #idle_connection_timeout: 60s - # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/osquerybeat/osquerybeat.reference.yml b/x-pack/osquerybeat/osquerybeat.reference.yml index 1da85a9e595..d6770083e62 100644 --- a/x-pack/osquerybeat/osquerybeat.reference.yml +++ b/x-pack/osquerybeat/osquerybeat.reference.yml @@ -365,11 +365,6 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s - # The maximum amount of time an idle connection will remain idle - # before closing itself. Zero means no limit. The format is a Go - # language duration (example 60s is 60 seconds). The default is 0. - #idle_connection_timeout: 60s - # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/packetbeat/packetbeat.reference.yml b/x-pack/packetbeat/packetbeat.reference.yml index 879164c73bb..aebeb1947a6 100644 --- a/x-pack/packetbeat/packetbeat.reference.yml +++ b/x-pack/packetbeat/packetbeat.reference.yml @@ -988,11 +988,6 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s - # The maximum amount of time an idle connection will remain idle - # before closing itself. Zero means no limit. The format is a Go - # language duration (example 60s is 60 seconds). The default is 0. - #idle_connection_timeout: 60s - # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/winlogbeat/winlogbeat.reference.yml b/x-pack/winlogbeat/winlogbeat.reference.yml index 6a82c7d0d84..5d52a07b37b 100644 --- a/x-pack/winlogbeat/winlogbeat.reference.yml +++ b/x-pack/winlogbeat/winlogbeat.reference.yml @@ -406,11 +406,6 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s - # The maximum amount of time an idle connection will remain idle - # before closing itself. Zero means no limit. The format is a Go - # language duration (example 60s is 60 seconds). The default is 0. - #idle_connection_timeout: 60s - # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 From d075980277bd310c4016aec9403d3e6f29477ba1 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Fri, 13 Oct 2023 14:27:38 -0500 Subject: [PATCH 10/12] respond to comments --- libbeat/cmd/instance/beat_test.go | 66 +++++++++++++++++++------------ libbeat/outputs/util.go | 2 + 2 files changed, 42 insertions(+), 26 deletions(-) diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index 6f94d8fd62d..2753dd3a955 100644 --- a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -274,21 +274,25 @@ func (r *outputReloaderMock) Reload( func TestPromoteOutputQueueSettings(t *testing.T) { tests := map[string]struct { - input []byte - memEvents int - expectValidationError bool + input []byte + memEvents int }{ - "blank": {input: []byte(""), - memEvents: 4096}, - "defaults": {input: []byte(` + "blank": { + input: []byte(""), + memEvents: 4096, + }, + "defaults": { + input: []byte(` name: mockbeat output: elasticsearch: hosts: - "localhost:9200" `), - memEvents: 4096}, - "topLevelQueue": {input: []byte(` + memEvents: 4096, + }, + "topLevelQueue": { + input: []byte(` name: mockbeat queue: mem: @@ -298,8 +302,10 @@ output: hosts: - "localhost:9200" `), - memEvents: 8096}, - "outputLevelQueue": {input: []byte(` + memEvents: 8096, + }, + "outputLevelQueue": { + input: []byte(` name: mockbeat output: elasticsearch: @@ -309,7 +315,8 @@ output: mem: events: 8096 `), - memEvents: 8096}, + memEvents: 8096, + }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { @@ -318,10 +325,6 @@ output: config := beatConfig{} err = cfg.Unpack(&config) - if tc.expectValidationError { - require.Error(t, err) - return - } require.NoError(t, err) err = promoteOutputQueueSettings(&config) @@ -339,18 +342,22 @@ func TestValidateBeatConfig(t *testing.T) { input []byte expectValidationError string }{ - "blank": {input: []byte(""), + "blank": { + input: []byte(""), expectValidationError: "", }, - "defaults": {input: []byte(` + "defaults": { + input: []byte(` name: mockbeat output: elasticsearch: hosts: - "localhost:9200" `), - expectValidationError: ""}, - "topAndOutputLevelQueue": {input: []byte(` + expectValidationError: "", + }, + "topAndOutputLevelQueue": { + input: []byte(` name: mockbeat queue: mem: @@ -363,8 +370,10 @@ output: mem: events: 8096 `), - expectValidationError: "top level queue and output level queue settings defined, only one is allowed accessing config"}, - "managementTopLevelDiskQueue": {input: []byte(` + expectValidationError: "top level queue and output level queue settings defined, only one is allowed accessing config", + }, + "managementTopLevelDiskQueue": { + input: []byte(` name: mockbeat management: enabled: true @@ -376,8 +385,10 @@ output: hosts: - "localhost:9200" `), - expectValidationError: "disk queue is not supported when management is enabled accessing config"}, - "managementOutputLevelDiskQueue": {input: []byte(` + expectValidationError: "disk queue is not supported when management is enabled accessing config", + }, + "managementOutputLevelDiskQueue": { + input: []byte(` name: mockbeat management: enabled: true @@ -389,8 +400,10 @@ output: disk: max_size: 1G `), - expectValidationError: "disk queue is not supported when management is enabled accessing config"}, - "managementFalseOutputLevelDiskQueue": {input: []byte(` + expectValidationError: "disk queue is not supported when management is enabled accessing config", + }, + "managementFalseOutputLevelDiskQueue": { + input: []byte(` name: mockbeat management: enabled: false @@ -402,7 +415,8 @@ output: disk: max_size: 1G `), - expectValidationError: ""}, + expectValidationError: "", + }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { diff --git a/libbeat/outputs/util.go b/libbeat/outputs/util.go index 7c096916a6c..caf9280af15 100644 --- a/libbeat/outputs/util.go +++ b/libbeat/outputs/util.go @@ -32,6 +32,7 @@ import ( func Fail(err error) (Group, error) { return Group{}, err } // Success create a valid output Group response for a set of client instances. +// The first argument is expected to contain a queue config.Namespace. func Success(cfg config.Namespace, batchSize, retry int, clients ...Client) (Group, error) { var q queue.QueueFactory if cfg.IsSet() && cfg.Config().Enabled() { @@ -73,6 +74,7 @@ func NetworkClients(netclients []NetworkClient) []Client { } // SuccessNet create a valid output Group and creates client instances +// The first argument is expected to contain a queue config.Namespace. func SuccessNet(cfg config.Namespace, loadbalance bool, batchSize, retry int, netclients []NetworkClient) (Group, error) { if !loadbalance { From 4dc26ba173df796a7e2a758b225c77cb615c2ec5 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Thu, 19 Oct 2023 13:39:01 -0500 Subject: [PATCH 11/12] Add doc strings to help explain why changes are the way they are --- libbeat/cmd/instance/beat.go | 5 +++++ libbeat/outputs/util.go | 8 ++++++-- libbeat/publisher/pipeline/controller.go | 10 ++++++++-- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index a0d830bf0b0..bae4c18fa93 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -1486,6 +1486,11 @@ func sanitizeIPs(ips []string) []string { return validIPs } +// promoteOutputQueueSettings checks to see if the output +// configuration has queue settings defined and if so it promotes them +// to the top level queue settings. This is done to allow existing +// behavior of specifying queue settings at the top level or like +// elastic-agent that specifies queue settings under the output func promoteOutputQueueSettings(bc *beatConfig) error { if bc.Output.IsSet() && bc.Output.Config().Enabled() { pc := pipeline.Config{} diff --git a/libbeat/outputs/util.go b/libbeat/outputs/util.go index caf9280af15..ce8765b5c2e 100644 --- a/libbeat/outputs/util.go +++ b/libbeat/outputs/util.go @@ -31,8 +31,10 @@ import ( // loading an output must return an error. func Fail(err error) (Group, error) { return Group{}, err } -// Success create a valid output Group response for a set of client instances. -// The first argument is expected to contain a queue config.Namespace. +// Success create a valid output Group response for a set of client +// instances. The first argument is expected to contain a queue +// config.Namespace. The queue config is passed to assign the queue +// factory when elastic-agent reloads the output. func Success(cfg config.Namespace, batchSize, retry int, clients ...Client) (Group, error) { var q queue.QueueFactory if cfg.IsSet() && cfg.Config().Enabled() { @@ -75,6 +77,8 @@ func NetworkClients(netclients []NetworkClient) []Client { // SuccessNet create a valid output Group and creates client instances // The first argument is expected to contain a queue config.Namespace. +// The queue config is passed to assign the queue factory when +// elastic-agent reloads the output. func SuccessNet(cfg config.Namespace, loadbalance bool, batchSize, retry int, netclients []NetworkClient) (Group, error) { if !loadbalance { diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 21fb22f45ce..1c480c01bce 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -60,8 +60,14 @@ type outputController struct { workerChan chan publisher.Batch - consumer *eventConsumer - workers []outputWorker + consumer *eventConsumer + workers []outputWorker + // The InputQueueSize can be set when the Beat is started, in + // libbeat/cmd/instance/Settings we need to preserve that + // value and pass it into the queue factory. The queue + // factory could be made from elastic-agent output + // configuration reloading which doesn't have access to this + // setting. inputQueueSize int } From b005670c1ee222c12550fe2a514c20c37c6c6bd3 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Thu, 19 Oct 2023 14:14:55 -0500 Subject: [PATCH 12/12] fix NOTICE.txt --- NOTICE.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/NOTICE.txt b/NOTICE.txt index 60ed10e38fc..6ae9f58daba 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -12718,7 +12718,6 @@ Licence type (autodetected): Apache-2.0 Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.6.0/LICENSE: - Apache License Version 2.0, January 2004 http://www.apache.org/licenses/