Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for queue settings under outputs #36788

Merged
merged 13 commits into from
Oct 19, 2023
4 changes: 2 additions & 2 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ is collected by it.
- Add support for AWS external IDs. {issue}36321[36321] {pull}36322[36322]
- [Enhanncement for host.ip and host.mac] Disabling netinfo.enabled option of add-host-metadata processor {pull}36506[36506]
Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will disable the netinfo.enabled option of add_host_metadata processor
- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}36788[36788]
- Beats will now connect to older Elasticsearch instances by default {pull}36884[36884]

*Auditbeat*
Expand Down Expand Up @@ -216,8 +217,7 @@ is collected by it.
- Added support for Okta OAuth2 provider in the httpjson input. {pull}36273[36273]
- Add support of the interval parameter in Salesforce setupaudittrail-rest fileset. {issue}35917[35917] {pull}35938[35938]
- Add device handling to Okta input package for entity analytics. {pull}36049[36049]
- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}99999[99999]
- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}36286[36286]
- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30916[30916] {pull}36286[36286]
- [Azure] Add input metrics to the azure-eventhub input. {pull}35739[35739]
- Reduce HTTPJSON metrics allocations. {pull}36282[36282]
- Add support for a simplified input configuraton when running under Elastic-Agent {pull}36390[36390]
Expand Down
49 changes: 49 additions & 0 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"github.com/elastic/beats/v7/libbeat/pprof"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
"github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/file"
Expand Down Expand Up @@ -783,6 +784,10 @@ func (b *Beat) configure(settings Settings) error {
return fmt.Errorf("error unpacking config data: %w", err)
}

if err := promoteOutputQueueSettings(&b.Config); err != nil {
return fmt.Errorf("could not promote output queue settings: %w", err)
}

if err := features.UpdateFromConfig(b.RawConfig); err != nil {
return fmt.Errorf("could not parse features: %w", err)
}
Expand Down Expand Up @@ -1482,3 +1487,47 @@ func sanitizeIPs(ips []string) []string {
}
return validIPs
}

// promoteOutputQueueSettings checks to see if the output
// configuration has queue settings defined and if so it promotes them
// to the top level queue settings. This is done to allow existing
// behavior of specifying queue settings at the top level or like
// elastic-agent that specifies queue settings under the output
func promoteOutputQueueSettings(bc *beatConfig) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

promoteOutputQueueSettings handles the use case where the beat is started with a configuration file that has outputs defined (normal stand alone beat).

if bc.Output.IsSet() && bc.Output.Config().Enabled() {
pc := pipeline.Config{}
err := bc.Output.Config().Unpack(&pc)
if err != nil {
return fmt.Errorf("error unpacking output queue settings: %w", err)
}
if pc.Queue.IsSet() {
logp.Info("global queue settings replaced with output queue settings")
bc.Pipeline.Queue = pc.Queue
}
}
return nil
}

func (bc *beatConfig) Validate() error {
if bc.Output.IsSet() && bc.Output.Config().Enabled() {
outputPC := pipeline.Config{}
err := bc.Output.Config().Unpack(&outputPC)
if err != nil {
return fmt.Errorf("error unpacking output queue settings: %w", err)
}
if bc.Pipeline.Queue.IsSet() && outputPC.Queue.IsSet() {
return fmt.Errorf("top level queue and output level queue settings defined, only one is allowed")
}
//elastic-agent doesn't support disk queue yet
if bc.Management.Enabled() && outputPC.Queue.Config().Enabled() && outputPC.Queue.Name() == diskqueue.QueueType {
return fmt.Errorf("disk queue is not supported when management is enabled")
}
}

//elastic-agent doesn't support disk queue yet
if bc.Management.Enabled() && bc.Pipeline.Queue.Config().Enabled() && bc.Pipeline.Queue.Name() == diskqueue.QueueType {
return fmt.Errorf("disk queue is not supported when management is enabled")
}

return nil
}
164 changes: 164 additions & 0 deletions libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/go-ucfg/yaml"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -269,3 +271,165 @@ func (r *outputReloaderMock) Reload(
r.cfg = cfg
return nil
}

func TestPromoteOutputQueueSettings(t *testing.T) {
tests := map[string]struct {
input []byte
memEvents int
}{
"blank": {
input: []byte(""),
memEvents: 4096,
},
"defaults": {
input: []byte(`
name: mockbeat
output:
elasticsearch:
hosts:
- "localhost:9200"
`),
memEvents: 4096,
},
"topLevelQueue": {
input: []byte(`
name: mockbeat
queue:
mem:
events: 8096
output:
elasticsearch:
hosts:
- "localhost:9200"
`),
memEvents: 8096,
},
"outputLevelQueue": {
input: []byte(`
name: mockbeat
output:
elasticsearch:
hosts:
- "localhost:9200"
queue:
mem:
events: 8096
`),
memEvents: 8096,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
cfg, err := yaml.NewConfig(tc.input)
require.NoError(t, err)

config := beatConfig{}
err = cfg.Unpack(&config)
require.NoError(t, err)

err = promoteOutputQueueSettings(&config)
require.NoError(t, err)

ms, err := memqueue.SettingsForUserConfig(config.Pipeline.Queue.Config())
require.NoError(t, err)
require.Equalf(t, tc.memEvents, ms.Events, "config was: %v", config.Pipeline.Queue.Config())
})
}
}

func TestValidateBeatConfig(t *testing.T) {
tests := map[string]struct {
input []byte
expectValidationError string
}{
"blank": {
input: []byte(""),
expectValidationError: "",
},
"defaults": {
input: []byte(`
name: mockbeat
output:
elasticsearch:
hosts:
- "localhost:9200"
`),
expectValidationError: "",
},
"topAndOutputLevelQueue": {
input: []byte(`
name: mockbeat
queue:
mem:
events: 2048
output:
elasticsearch:
hosts:
- "localhost:9200"
queue:
mem:
events: 8096
`),
expectValidationError: "top level queue and output level queue settings defined, only one is allowed accessing config",
},
"managementTopLevelDiskQueue": {
input: []byte(`
name: mockbeat
management:
enabled: true
queue:
disk:
max_size: 1G
output:
elasticsearch:
hosts:
- "localhost:9200"
`),
expectValidationError: "disk queue is not supported when management is enabled accessing config",
},
"managementOutputLevelDiskQueue": {
input: []byte(`
name: mockbeat
management:
enabled: true
output:
elasticsearch:
hosts:
- "localhost:9200"
queue:
disk:
max_size: 1G
`),
expectValidationError: "disk queue is not supported when management is enabled accessing config",
},
"managementFalseOutputLevelDiskQueue": {
input: []byte(`
name: mockbeat
management:
enabled: false
output:
elasticsearch:
hosts:
- "localhost:9200"
queue:
disk:
max_size: 1G
`),
expectValidationError: "",
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
cfg, err := yaml.NewConfig(tc.input)
require.NoError(t, err)
config := beatConfig{}
err = cfg.Unpack(&config)
if tc.expectValidationError != "" {
require.Error(t, err)
require.Equal(t, tc.expectValidationError, err.Error())
} else {
require.NoError(t, err)
}
})
}
}
8 changes: 4 additions & 4 deletions libbeat/docs/queueconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ queue is responsible for buffering and combining events into batches that can
be consumed by the outputs. The outputs will use bulk operations to send a
batch of events in one transaction.

You can configure the type and behavior of the internal queue by setting
options in the `queue` section of the +{beatname_lc}.yml+ config file. Only one
queue type can be configured.

You can configure the type and behavior of the internal queue by
setting options in the `queue` section of the +{beatname_lc}.yml+
config file or by setting options in the `queue` section of the
output. Only one queue type can be configured.

This sample configuration sets the memory queue to buffer up to 4096 events:

Expand Down
6 changes: 5 additions & 1 deletion libbeat/outputs/console/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package console

import "github.com/elastic/beats/v7/libbeat/outputs/codec"
import (
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/elastic-agent-libs/config"
)

type Config struct {
Codec codec.Config `config:"codec"`
Expand All @@ -26,6 +29,7 @@ type Config struct {
Pretty bool `config:"pretty"`

BatchSize int
Queue config.Namespace `config:"queue"`
}

var defaultConfig = Config{}
14 changes: 3 additions & 11 deletions libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"os"
"runtime"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
Expand All @@ -43,13 +42,6 @@ type console struct {
index string
}

type consoleEvent struct {
Timestamp time.Time `json:"@timestamp" struct:"@timestamp"`

// Note: stdlib json doesn't support inlining :( -> use `codec: 2`, to generate proper event
Fields interface{} `struct:",inline"`
}

func init() {
outputs.RegisterType("console", makeConsole)
}
Expand Down Expand Up @@ -82,18 +74,18 @@ func makeConsole(
index := beat.Beat
c, err := newConsole(index, observer, enc)
if err != nil {
return outputs.Fail(fmt.Errorf("console output initialization failed with: %v", err))
return outputs.Fail(fmt.Errorf("console output initialization failed with: %w", err))
}

// check stdout actually being available
if runtime.GOOS != "windows" {
if _, err = c.out.Stat(); err != nil {
err = fmt.Errorf("console output initialization failed with: %v", err)
err = fmt.Errorf("console output initialization failed with: %w", err)
return outputs.Fail(err)
}
}

return outputs.Success(config.BatchSize, 0, c)
return outputs.Success(config.Queue, config.BatchSize, 0, c)
}

func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) {
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/elasticsearch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type elasticsearchConfig struct {
AllowOlderVersion bool `config:"allow_older_versions"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`
Queue config.Namespace `config:"queue"`
}

type Backoff struct {
Expand Down
Loading
Loading