diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 57659a101e6c..9250d7d344d5 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -162,8 +162,8 @@ is collected by it.
 - Add support for AWS external IDs. {issue}36321[36321] {pull}36322[36322]
 - [Enhanncement for host.ip and host.mac] Disabling netinfo.enabled option of add-host-metadata processor {pull}36506[36506]
 Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will disable the netinfo.enabled option of add_host_metadata processor
-- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}36693[36693]
-- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}36693[36693]
+- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}36788[36788]
+- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}36788[36788]
 
 *Auditbeat*
 
diff --git a/libbeat/outputs/console/config.go b/libbeat/outputs/console/config.go
index 44869e388fa9..e0a1cc9ff280 100644
--- a/libbeat/outputs/console/config.go
+++ b/libbeat/outputs/console/config.go
@@ -17,7 +17,10 @@
 
 package console
 
-import "github.com/elastic/beats/v7/libbeat/outputs/codec"
+import (
+    "github.com/elastic/beats/v7/libbeat/outputs/codec"
+    "github.com/elastic/elastic-agent-libs/config"
+)
 
 type Config struct {
     Codec codec.Config `config:"codec"`
@@ -26,6 +29,7 @@ type Config struct {
     Pretty bool `config:"pretty"`
 
     BatchSize int
+    Queue     config.Namespace `config:"queue"`
 }
 
 var defaultConfig = Config{}
diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go
index 905aa778998c..b81bf3363486 100644
--- a/libbeat/outputs/console/console.go
+++ b/libbeat/outputs/console/console.go
@@ -23,7 +23,6 @@ import (
     "fmt"
     "os"
     "runtime"
-    "time"
 
     "github.com/elastic/beats/v7/libbeat/beat"
     "github.com/elastic/beats/v7/libbeat/outputs"
@@ -43,13 +42,6 @@ type console struct {
     index string
 }
 
-type consoleEvent struct {
-    Timestamp time.Time `json:"@timestamp" struct:"@timestamp"`
-
-    // Note: stdlib json doesn't support inlining :( -> use `codec: 2`, to generate proper event
-    Fields interface{} `struct:",inline"`
-}
-
 func init() {
     outputs.RegisterType("console", makeConsole)
 }
@@ -82,18 +74,18 @@ func makeConsole(
     index := beat.Beat
     c, err := newConsole(index, observer, enc)
     if err != nil {
-        return outputs.Fail(fmt.Errorf("console output initialization failed with: %v", err))
+        return outputs.Fail(fmt.Errorf("console output initialization failed with: %w", err))
     }
 
     // check stdout actually being available
     if runtime.GOOS != "windows" {
         if _, err = c.out.Stat(); err != nil {
-            err = fmt.Errorf("console output initialization failed with: %v", err)
+            err = fmt.Errorf("console output initialization failed with: %w", err)
             return outputs.Fail(err)
         }
     }
 
-    return outputs.Success(config.BatchSize, 0, c)
+    return outputs.Success(config.Queue, config.BatchSize, 0, c)
 }
 
 func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) {
diff --git a/libbeat/outputs/elasticsearch/config.go b/libbeat/outputs/elasticsearch/config.go
index ca77a44b8331..e504f2dc213c 100644
--- a/libbeat/outputs/elasticsearch/config.go
+++ b/libbeat/outputs/elasticsearch/config.go
@@ -45,6 +45,7 @@ type elasticsearchConfig struct {
     AllowOlderVersion bool `config:"allow_older_versions"`
 
     Transport httpcommon.HTTPTransportSettings `config:",inline"`
+    Queue     config.Namespace                 `config:"queue"`
 }
 
 type Backoff struct {
diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go
index 04b43fdadbb7..bc3cca2d5608 100644
--- a/libbeat/outputs/elasticsearch/elasticsearch.go
+++ b/libbeat/outputs/elasticsearch/elasticsearch.go
@@ -49,12 +49,12 @@ func makeES(
         return outputs.Fail(err)
     }
 
-    config := defaultConfig
-    if err := cfg.Unpack(&config); err != nil {
+    esConfig := defaultConfig
+    if err := cfg.Unpack(&esConfig); err != nil {
         return outputs.Fail(err)
     }
 
-    policy, err := newNonIndexablePolicy(config.NonIndexablePolicy)
+    policy, err := newNonIndexablePolicy(esConfig.NonIndexablePolicy)
     if err != nil {
         log.Errorf("error while creating file identifier: %v", err)
         return outputs.Fail(err)
@@ -65,12 +65,12 @@ func makeES(
         return outputs.Fail(err)
     }
 
-    if proxyURL := config.Transport.Proxy.URL; proxyURL != nil && !config.Transport.Proxy.Disable {
+    if proxyURL := esConfig.Transport.Proxy.URL; proxyURL != nil && !esConfig.Transport.Proxy.Disable {
         log.Debugf("breaking down proxy URL. Scheme: '%s', host[:port]: '%s', path: '%s'", proxyURL.Scheme, proxyURL.Host, proxyURL.Path)
         log.Infof("Using proxy URL: %s", proxyURL)
     }
 
-    params := config.Params
+    params := esConfig.Params
     if len(params) == 0 {
         params = nil
     }
@@ -84,7 +84,7 @@ func makeES(
     clients := make([]outputs.NetworkClient, len(hosts))
     for i, host := range hosts {
-        esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200)
+        esURL, err := common.MakeURL(esConfig.Protocol, esConfig.Path, host, 9200)
         if err != nil {
             log.Errorf("Invalid host param set: %s, Error: %+v", host, err)
             return outputs.Fail(err)
         }
@@ -95,17 +95,17 @@
             ConnectionSettings: eslegclient.ConnectionSettings{
                 URL:      esURL,
                 Beatname: beat.Beat,
-                Kerberos: config.Kerberos,
-                Username: config.Username,
-                Password: config.Password,
-                APIKey:   config.APIKey,
+                Kerberos: esConfig.Kerberos,
+                Username: esConfig.Username,
+                Password: esConfig.Password,
+                APIKey:   esConfig.APIKey,
                 Parameters: params,
-                Headers:          config.Headers,
-                CompressionLevel: config.CompressionLevel,
+                Headers:          esConfig.Headers,
+                CompressionLevel: esConfig.CompressionLevel,
                 Observer: observer,
-                EscapeHTML:      config.EscapeHTML,
-                Transport:       config.Transport,
-                IdleConnTimeout: config.Transport.IdleConnTimeout,
+                EscapeHTML:      esConfig.EscapeHTML,
+                Transport:       esConfig.Transport,
+                IdleConnTimeout: esConfig.Transport.IdleConnTimeout,
             },
             Index:    index,
             Pipeline: pipeline,
@@ -116,11 +116,11 @@
             return outputs.Fail(err)
         }
 
-        client = outputs.WithBackoff(client, config.Backoff.Init, config.Backoff.Max)
+        client = outputs.WithBackoff(client, esConfig.Backoff.Init, esConfig.Backoff.Max)
         clients[i] = client
     }
 
-    return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients)
+    return outputs.SuccessNet(esConfig.Queue, esConfig.LoadBalance, esConfig.BulkMaxSize, esConfig.MaxRetries, clients)
 }
 
 func buildSelectors(
diff --git a/libbeat/outputs/fileout/config.go b/libbeat/outputs/fileout/config.go
index cfd28bfaaf26..e72a9f87d6fc 100644
--- a/libbeat/outputs/fileout/config.go
+++ b/libbeat/outputs/fileout/config.go
@@ -21,21 +21,23 @@ import (
     "fmt"
 
     "github.com/elastic/beats/v7/libbeat/outputs/codec"
+    "github.com/elastic/elastic-agent-libs/config"
     "github.com/elastic/elastic-agent-libs/file"
 )
 
-type config struct {
-    Path            string       `config:"path"`
-    Filename        string       `config:"filename"`
-    RotateEveryKb   uint         `config:"rotate_every_kb" validate:"min=1"`
-    NumberOfFiles   uint         `config:"number_of_files"`
-    Codec           codec.Config `config:"codec"`
-    Permissions     uint32       `config:"permissions"`
-    RotateOnStartup bool         `config:"rotate_on_startup"`
+type fileOutConfig struct {
+    Path            string           `config:"path"`
+    Filename        string           `config:"filename"`
+    RotateEveryKb   uint             `config:"rotate_every_kb" validate:"min=1"`
+    NumberOfFiles   uint             `config:"number_of_files"`
+    Codec           codec.Config     `config:"codec"`
+    Permissions     uint32           `config:"permissions"`
+    RotateOnStartup bool             `config:"rotate_on_startup"`
+    Queue           config.Namespace `config:"queue"`
 }
 
-func defaultConfig() config {
-    return config{
+func defaultConfig() fileOutConfig {
+    return fileOutConfig{
         NumberOfFiles: 7,
         RotateEveryKb: 10 * 1024,
         Permissions:   0600,
@@ -43,7 +45,7 @@ func defaultConfig() config {
     }
 }
 
-func (c *config) Validate() error {
+func (c *fileOutConfig) Validate() error {
     if c.NumberOfFiles < 2 || c.NumberOfFiles > file.MaxBackupsLimit {
         return fmt.Errorf("the number_of_files to keep should be between 2 and %v",
             file.MaxBackupsLimit)
diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go
index 949d835f5419..d12a11b25c3c 100644
--- a/libbeat/outputs/fileout/file.go
+++ b/libbeat/outputs/fileout/file.go
@@ -51,8 +51,8 @@ func makeFileout(
     observer outputs.Observer,
     cfg *c.C,
 ) (outputs.Group, error) {
-    config := defaultConfig()
-    if err := cfg.Unpack(&config); err != nil {
+    foConfig := defaultConfig()
+    if err := cfg.Unpack(&foConfig); err != nil {
         return outputs.Fail(err)
     }
 
@@ -64,14 +64,14 @@ func makeFileout(
         beat:     beat,
         observer: observer,
     }
-    if err := fo.init(beat, config); err != nil {
+    if err := fo.init(beat, foConfig); err != nil {
         return outputs.Fail(err)
     }
 
-    return outputs.Success(-1, 0, fo)
+    return outputs.Success(foConfig.Queue, -1, 0, fo)
 }
 
-func (out *fileOutput) init(beat beat.Info, c config) error {
+func (out *fileOutput) init(beat beat.Info, c fileOutConfig) error {
     var path string
     if c.Filename != "" {
         path = filepath.Join(c.Path, c.Filename)
diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go
index 7247699500f5..8fff8dad0d5c 100644
--- a/libbeat/outputs/kafka/config.go
+++ b/libbeat/outputs/kafka/config.go
@@ -76,6 +76,7 @@ type kafkaConfig struct {
     Codec      codec.Config     `config:"codec"`
     Sasl       kafka.SaslConfig `config:"sasl"`
     EnableFAST bool             `config:"enable_krb5_fast"`
+    Queue      config.Namespace `config:"queue"`
 }
 
 type metaConfig struct {
@@ -101,12 +102,6 @@ var compressionModes = map[string]sarama.CompressionCodec{
     "snappy": sarama.CompressionSnappy,
 }
 
-const (
-    saslTypePlaintext   = sarama.SASLTypePlaintext
-    saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
-    saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
-)
-
 func defaultConfig() kafkaConfig {
     return kafkaConfig{
         Hosts: nil,
diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go
index ef1c253981f3..0c856ea425db 100644
--- a/libbeat/outputs/kafka/kafka.go
+++ b/libbeat/outputs/kafka/kafka.go
@@ -47,7 +47,7 @@ func makeKafka(
     log := logp.NewLogger(logSelector)
     log.Debug("initialize kafka output")
 
-    config, err := readConfig(cfg)
+    kConfig, err := readConfig(cfg)
     if err != nil {
         return outputs.Fail(err)
     }
@@ -57,7 +57,7 @@ func makeKafka(
         return outputs.Fail(err)
     }
 
-    libCfg, err := newSaramaConfig(log, config)
+    libCfg, err := newSaramaConfig(log, kConfig)
     if err != nil {
         return outputs.Fail(err)
     }
@@ -67,21 +67,21 @@ func makeKafka(
         return outputs.Fail(err)
     }
 
-    codec, err := codec.CreateEncoder(beat, config.Codec)
+    codec, err := codec.CreateEncoder(beat, kConfig.Codec)
     if err != nil {
         return outputs.Fail(err)
     }
 
-    client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, config.Headers, codec, libCfg)
+    client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, kConfig.Key, topic, kConfig.Headers, codec, libCfg)
     if err != nil {
         return outputs.Fail(err)
     }
 
     retry := 0
-    if config.MaxRetries < 0 {
+    if kConfig.MaxRetries < 0 {
         retry = -1
     }
-    return outputs.Success(config.BulkMaxSize, retry, client)
+    return outputs.Success(kConfig.Queue, kConfig.BulkMaxSize, retry, client)
 }
 
 func buildTopicSelector(cfg *config.C) (outil.Selector, error) {
diff --git a/libbeat/outputs/logstash/config.go b/libbeat/outputs/logstash/config.go
index 82747fe01d09..9df57514495b 100644
--- a/libbeat/outputs/logstash/config.go
+++ b/libbeat/outputs/logstash/config.go
@@ -43,6 +43,7 @@ type Config struct {
     Proxy      transport.ProxyConfig `config:",inline"`
     Backoff    Backoff               `config:"backoff"`
     EscapeHTML bool                  `config:"escape_html"`
+    Queue      config.Namespace      `config:"queue"`
 }
 
 type Backoff struct {
diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go
index 5e7cdfeee7a5..072ec049f6fb 100644
--- a/libbeat/outputs/logstash/logstash.go
+++ b/libbeat/outputs/logstash/logstash.go
@@ -41,7 +41,7 @@ func makeLogstash(
     observer outputs.Observer,
     cfg *conf.C,
 ) (outputs.Group, error) {
-    config, err := readConfig(cfg, beat)
+    lsConfig, err := readConfig(cfg, beat)
     if err != nil {
         return outputs.Fail(err)
     }
@@ -51,14 +51,14 @@ func makeLogstash(
         return outputs.Fail(err)
     }
 
-    tls, err := tlscommon.LoadTLSConfig(config.TLS)
+    tls, err := tlscommon.LoadTLSConfig(lsConfig.TLS)
     if err != nil {
         return outputs.Fail(err)
     }
 
     transp := transport.Config{
-        Timeout: config.Timeout,
-        Proxy:   &config.Proxy,
+        Timeout: lsConfig.Timeout,
+        Proxy:   &lsConfig.Proxy,
         TLS:     tls,
         Stats:   observer,
     }
@@ -72,18 +72,18 @@ func makeLogstash(
         return outputs.Fail(err)
     }
 
-    if config.Pipelining > 0 {
-        client, err = newAsyncClient(beat, conn, observer, config)
+    if lsConfig.Pipelining > 0 {
+        client, err = newAsyncClient(beat, conn, observer, lsConfig)
     } else {
-        client, err = newSyncClient(beat, conn, observer, config)
+        client, err = newSyncClient(beat, conn, observer, lsConfig)
     }
     if err != nil {
         return outputs.Fail(err)
     }
 
-    client = outputs.WithBackoff(client, config.Backoff.Init, config.Backoff.Max)
+    client = outputs.WithBackoff(client, lsConfig.Backoff.Init, lsConfig.Backoff.Max)
     clients[i] = client
 }
 
-    return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients)
+    return outputs.SuccessNet(lsConfig.Queue, lsConfig.LoadBalance, lsConfig.BulkMaxSize, lsConfig.MaxRetries, clients)
 }
diff --git a/libbeat/outputs/redis/config.go b/libbeat/outputs/redis/config.go
index 01c8f2e0238b..4785af137f10 100644
--- a/libbeat/outputs/redis/config.go
+++ b/libbeat/outputs/redis/config.go
@@ -22,6 +22,7 @@ import (
     "time"
 
     "github.com/elastic/beats/v7/libbeat/outputs/codec"
+    "github.com/elastic/elastic-agent-libs/config"
     "github.com/elastic/elastic-agent-libs/transport"
     "github.com/elastic/elastic-agent-libs/transport/tlscommon"
 )
@@ -40,6 +41,7 @@ type redisConfig struct {
     Db       int              `config:"db"`
     DataType string           `config:"datatype"`
     Backoff  backoff          `config:"backoff"`
+    Queue    config.Namespace `config:"queue"`
 }
 
 type backoff struct {
diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go
index 026cb04d4f8e..9814d6abee7b 100644
--- a/libbeat/outputs/redis/redis.go
+++ b/libbeat/outputs/redis/redis.go
@@ -34,10 +34,6 @@ import (
     "github.com/elastic/elastic-agent-libs/transport/tlscommon"
 )
 
-type redisOut struct {
-    beat beat.Info
-}
-
 const (
     defaultWaitRetry    = 1 * time.Second
     defaultMaxWaitRetry = 60 * time.Second
@@ -58,7 +54,9 @@ func makeRedis(
 ) (outputs.Group, error) {
 
     if !cfg.HasField("index") {
-        cfg.SetString("index", -1, beat.Beat)
+        if err := cfg.SetString("index", -1, beat.Beat); err != nil {
+            return outputs.Fail(err)
+        }
     }
 
     err := cfgwarn.CheckRemoved6xSettings(cfg, "port")
@@ -77,13 +75,13 @@ func makeRedis(
         }
     }
 
-    config := defaultConfig
-    if err := cfg.Unpack(&config); err != nil {
+    rConfig := defaultConfig
+    if err := cfg.Unpack(&rConfig); err != nil {
         return outputs.Fail(err)
     }
 
     var dataType redisDataType
-    switch config.DataType {
+    switch rConfig.DataType {
     case "", "list":
         dataType = redisListType
     case "channel":
@@ -102,7 +100,7 @@ func makeRedis(
         return outputs.Fail(err)
     }
 
-    tls, err := tlscommon.LoadTLSConfig(config.TLS)
+    tls, err := tlscommon.LoadTLSConfig(rConfig.TLS)
     if err != nil {
         return outputs.Fail(err)
     }
@@ -129,8 +127,8 @@ func makeRedis(
         }
 
         transp := transport.Config{
-            Timeout: config.Timeout,
-            Proxy:   &config.Proxy,
+            Timeout: rConfig.Timeout,
+            Proxy:   &rConfig.Proxy,
             TLS:     tls,
             Stats:   observer,
         }
@@ -138,7 +136,7 @@ func makeRedis(
         switch hostUrl.Scheme {
         case redisScheme:
             if hasScheme {
-                transp.TLS = nil // disable TLS if user explicitely set `redis` scheme
+                transp.TLS = nil // disable TLS if user explicitly set `redis` scheme
             }
         case tlsRedisScheme:
             if transp.TLS == nil {
@@ -151,23 +149,23 @@ func makeRedis(
             return outputs.Fail(err)
         }
 
-        pass := config.Password
+        pass := rConfig.Password
         hostPass, passSet := hostUrl.User.Password()
         if passSet {
             pass = hostPass
         }
 
-        enc, err := codec.CreateEncoder(beat, config.Codec)
+        enc, err := codec.CreateEncoder(beat, rConfig.Codec)
         if err != nil {
             return outputs.Fail(err)
         }
 
-        client := newClient(conn, observer, config.Timeout,
-            pass, config.Db, key, dataType, config.Index, enc)
-        clients[i] = newBackoffClient(client, config.Backoff.Init, config.Backoff.Max)
+        client := newClient(conn, observer, rConfig.Timeout,
+            pass, rConfig.Db, key, dataType, rConfig.Index, enc)
+        clients[i] = newBackoffClient(client, rConfig.Backoff.Init, rConfig.Backoff.Max)
     }
 
-    return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients)
+    return outputs.SuccessNet(rConfig.Queue, rConfig.LoadBalance, rConfig.BulkMaxSize, rConfig.MaxRetries, clients)
 }
 
 func buildKeySelector(cfg *config.C) (outil.Selector, error) {
diff --git a/libbeat/outputs/util.go b/libbeat/outputs/util.go
index 15068910f8c6..7c096916a6cb 100644
--- a/libbeat/outputs/util.go
+++ b/libbeat/outputs/util.go
@@ -17,16 +17,49 @@
 
 package outputs
 
+import (
+    "fmt"
+
+    "github.com/elastic/beats/v7/libbeat/publisher"
+    "github.com/elastic/beats/v7/libbeat/publisher/queue"
+    "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
+    "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
+    "github.com/elastic/elastic-agent-libs/config"
+)
+
 // Fail helper can be used by output factories, to create a failure response when
 // loading an output must return an error.
 func Fail(err error) (Group, error) { return Group{}, err }
 
 // Success create a valid output Group response for a set of client instances.
-func Success(batchSize, retry int, clients ...Client) (Group, error) {
+func Success(cfg config.Namespace, batchSize, retry int, clients ...Client) (Group, error) {
+    var q queue.QueueFactory
+    if cfg.IsSet() && cfg.Config().Enabled() {
+        switch cfg.Name() {
+        case memqueue.QueueType:
+            settings, err := memqueue.SettingsForUserConfig(cfg.Config())
+            if err != nil {
+                return Group{}, fmt.Errorf("unable to get memory queue settings: %w", err)
+            }
+            q = memqueue.FactoryForSettings(settings)
+        case diskqueue.QueueType:
+            if publisher.UnderAgent() {
+                return Group{}, fmt.Errorf("disk queue not supported under agent")
+            }
+            settings, err := diskqueue.SettingsForUserConfig(cfg.Config())
+            if err != nil {
+                return Group{}, fmt.Errorf("unable to get disk queue settings: %w", err)
+            }
+            q = diskqueue.FactoryForSettings(settings)
+        default:
+            return Group{}, fmt.Errorf("unknown queue type: %s", cfg.Name())
+        }
+    }
     return Group{
-        Clients:   clients,
-        BatchSize: batchSize,
-        Retry:     retry,
+        Clients:      clients,
+        BatchSize:    batchSize,
+        Retry:        retry,
+        QueueFactory: q,
     }, nil
 }
 
@@ -39,11 +72,13 @@ func NetworkClients(netclients []NetworkClient) []Client {
     return clients
 }
 
-func SuccessNet(loadbalance bool, batchSize, retry int, netclients []NetworkClient) (Group, error) {
+// SuccessNet creates a valid output Group for a set of network client instances
+func SuccessNet(cfg config.Namespace, loadbalance bool, batchSize, retry int, netclients []NetworkClient) (Group, error) {
+
     if !loadbalance {
-        return Success(batchSize, retry, NewFailoverClient(netclients))
+        return Success(cfg, batchSize, retry, NewFailoverClient(netclients))
     }
 
     clients := NetworkClients(netclients)
-    return Success(batchSize, retry, clients...)
+    return Success(cfg, batchSize, retry, clients...)
 }
diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go
index 4a212092c7e7..15260172ff54 100644
--- a/libbeat/publisher/pipeline/client_test.go
+++ b/libbeat/publisher/pipeline/client_test.go
@@ -144,7 +144,7 @@ func TestClientWaitClose(t *testing.T) {
     err := logp.TestingSetup()
     assert.Nil(t, err)
 
-    q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1})
+    q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1}, 0)
     pipeline := makePipeline(Settings{}, q)
     defer pipeline.Close()
 
diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go
index bf080677ef44..21fb22f45ceb 100644
--- a/libbeat/publisher/pipeline/controller.go
+++ b/libbeat/publisher/pipeline/controller.go
@@ -60,8 +60,9 @@ type outputController struct {
 
     workerChan chan publisher.Batch
 
-    consumer *eventConsumer
-    workers  []outputWorker
+    consumer       *eventConsumer
+    workers        []outputWorker
+    inputQueueSize int
 }
 
 type producerRequest struct {
@@ -81,6 +82,7 @@ func newOutputController(
     observer outputObserver,
     eventWaitGroup *sync.WaitGroup,
     queueFactory queue.QueueFactory,
+    inputQueueSize int,
 ) (*outputController, error) {
     controller := &outputController{
         beat: beat,
@@ -90,6 +92,7 @@ func newOutputController(
         queueFactory:   queueFactory,
         workerChan:     make(chan publisher.Batch),
         consumer:       newEventConsumer(monitors.Logger, observer),
+        inputQueueSize: inputQueueSize,
     }
 
     return controller, nil
@@ -258,11 +261,11 @@ func (c *outputController) createQueueIfNeeded(outGrp outputs.Group) {
         factory = c.queueFactory
     }
 
-    queue, err := factory(logger, c.onACK)
+    queue, err := factory(logger, c.onACK, c.inputQueueSize)
     if err != nil {
         logger.Errorf("queue creation failed, falling back to default memory queue, check your queue configuration")
         s, _ := memqueue.SettingsForUserConfig(nil)
-        queue = memqueue.NewQueue(logger, c.onACK, s)
+        queue = memqueue.NewQueue(logger, c.onACK, s, c.inputQueueSize)
     }
     c.queue = queue
 
diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go
index 366f4bff1d94..7384e5f71287 100644
--- a/libbeat/publisher/pipeline/controller_test.go
+++ b/libbeat/publisher/pipeline/controller_test.go
@@ -189,7 +189,7 @@ func TestOutputQueueFactoryTakesPrecedence(t *testing.T) {
 
 func TestFailedQueueFactoryRevertsToDefault(t *testing.T) {
     defaultSettings, _ := memqueue.SettingsForUserConfig(nil)
-    failedFactory := func(_ *logp.Logger, _ func(int)) (queue.Queue, error) {
+    failedFactory := func(_ *logp.Logger, _ func(int), _ int) (queue.Queue, error) {
         return nil, fmt.Errorf("This queue creation intentionally failed")
     }
     controller := outputController{
diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go
index 209688bb5c2d..cf03163750ee 100644
--- a/libbeat/publisher/pipeline/pipeline.go
+++ b/libbeat/publisher/pipeline/pipeline.go
@@ -153,13 +153,12 @@ func New(
     if b := userQueueConfig.Name(); b != "" {
         queueType = b
     }
-    queueFactory, err := queueFactoryForUserConfig(
-        queueType, userQueueConfig.Config(), settings.InputQueueSize)
+    queueFactory, err := queueFactoryForUserConfig(queueType, userQueueConfig.Config())
     if err != nil {
         return nil, err
     }
 
-    output, err := newOutputController(beat, monitors, p.observer, p.eventWaitGroup, queueFactory)
+    output, err := newOutputController(beat, monitors, p.observer, p.eventWaitGroup, queueFactory, settings.InputQueueSize)
     if err != nil {
         return nil, err
     }
@@ -399,16 +398,13 @@ func (p *Pipeline) OutputReloader() OutputReloader {
 // This helper exists to frontload config parsing errors: if there is an
 // error in the queue config, we want it to show up as fatal during
 // initialization, even if the queue itself isn't created until later.
-func queueFactoryForUserConfig(queueType string, userConfig *conf.C, inQueueSize int) (queue.QueueFactory, error) {
+func queueFactoryForUserConfig(queueType string, userConfig *conf.C) (queue.QueueFactory, error) {
     switch queueType {
     case memqueue.QueueType:
         settings, err := memqueue.SettingsForUserConfig(userConfig)
         if err != nil {
             return nil, err
         }
-        // The memory queue has a special override during pipeline
-        // initialization for the size of its API channel buffer.
-        settings.InputQueueSize = inQueueSize
         return memqueue.FactoryForSettings(settings), nil
     case diskqueue.QueueType:
         settings, err := diskqueue.SettingsForUserConfig(userConfig)
diff --git a/libbeat/publisher/pipeline/stress/out.go b/libbeat/publisher/pipeline/stress/out.go
index 6aa510de1b0e..d1014b8d782b 100644
--- a/libbeat/publisher/pipeline/stress/out.go
+++ b/libbeat/publisher/pipeline/stress/out.go
@@ -35,11 +35,12 @@ type testOutput struct {
 }
 
 type testOutputConfig struct {
-    Worker      int           `config:"worker" validate:"min=1"`
-    BulkMaxSize int           `config:"bulk_max_size"`
-    Retry       int           `config:"retry"`
-    MinWait     time.Duration `config:"min_wait"`
-    MaxWait     time.Duration `config:"max_wait"`
+    Worker      int            `config:"worker" validate:"min=1"`
+    BulkMaxSize int            `config:"bulk_max_size"`
+    Retry       int            `config:"retry"`
+    MinWait     time.Duration  `config:"min_wait"`
+    MaxWait     time.Duration  `config:"max_wait"`
+    Queue       conf.Namespace `config:"queue"`
     Fail        struct {
         EveryBatch int
     }
@@ -66,7 +67,7 @@ func makeTestOutput(_ outputs.IndexManager, beat beat.Info, observer outputs.Obs
         clients[i] = client
     }
 
-    return outputs.Success(config.BulkMaxSize, config.Retry, clients...)
+    return outputs.Success(config.Queue, config.BulkMaxSize, config.Retry, clients...)
 }
 
 func (*testOutput) Close() error { return nil }
diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go
index 2b7548908822..74fff3fea647 100644
--- a/libbeat/publisher/queue/diskqueue/queue.go
+++ b/libbeat/publisher/queue/diskqueue/queue.go
@@ -109,6 +109,7 @@ func FactoryForSettings(settings Settings) queue.QueueFactory {
     return func(
         logger *logp.Logger,
         ackCallback func(eventCount int),
+        inputQueueSize int,
     ) (queue.Queue, error) {
         return NewQueue(logger, ackCallback, settings)
     }
diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go
index 0bb3ff9ed8e6..ac5b9dc66159 100644
--- a/libbeat/publisher/queue/memqueue/broker.go
+++ b/libbeat/publisher/queue/memqueue/broker.go
@@ -84,7 +84,6 @@ type Settings struct {
     Events         int
     FlushMinEvents int
     FlushTimeout   time.Duration
-    InputQueueSize int
 }
 
 type queueEntry struct {
@@ -123,8 +122,9 @@ func FactoryForSettings(settings Settings) queue.QueueFactory {
     return func(
         logger *logp.Logger,
         ackCallback func(eventCount int),
+        inputQueueSize int,
     ) (queue.Queue, error) {
-        return NewQueue(logger, ackCallback, settings), nil
+        return NewQueue(logger, ackCallback, settings, inputQueueSize), nil
     }
 }
 
@@ -135,6 +135,7 @@ func NewQueue(
     logger *logp.Logger,
     ackCallback func(eventCount int),
     settings Settings,
+    inputQueueSize int,
 ) *broker {
     var (
         sz = settings.Events
@@ -142,7 +143,7 @@
         flushTimeout = settings.FlushTimeout
     )
 
-    chanSize := AdjustInputQueueSize(settings.InputQueueSize, sz)
+    chanSize := AdjustInputQueueSize(inputQueueSize, sz)
 
     if minEvents < 1 {
         minEvents = 1
diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go
index ef9ee52a9448..28cc38025c38 100644
--- a/libbeat/publisher/queue/memqueue/queue_test.go
+++ b/libbeat/publisher/queue/memqueue/queue_test.go
@@ -103,7 +103,7 @@ func TestQueueMetricsBuffer(t *testing.T) {
 }
 
 func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, testName string) {
-    testQueue := NewQueue(nil, nil, settings)
+    testQueue := NewQueue(nil, nil, settings, 0)
     defer testQueue.Close()
 
     // Send events to queue
@@ -147,7 +147,7 @@ func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.Queu
         Events:         sz,
         FlushMinEvents: minEvents,
         FlushTimeout:   flushTimeout,
-    })
+    }, 0)
 }
 }
 
@@ -258,22 +258,22 @@
 }
 
 t.Run("acking in forward order with directEventLoop reports the right event IDs", func(t *testing.T) {
-    testQueue := NewQueue(nil, nil, Settings{Events: 1000})
+    testQueue := NewQueue(nil, nil, Settings{Events: 1000}, 0)
     testForward(testQueue)
 })
 
 t.Run("acking in reverse order with directEventLoop reports the right event IDs", func(t *testing.T) {
-    testQueue := NewQueue(nil, nil, Settings{Events: 1000})
+    testQueue := NewQueue(nil, nil, Settings{Events: 1000}, 0)
     testBackward(testQueue)
 })
 
 t.Run("acking in forward order with bufferedEventLoop reports the right event IDs", func(t *testing.T) {
-    testQueue := NewQueue(nil, nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond})
+    testQueue := NewQueue(nil, nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond}, 0)
     testForward(testQueue)
 })
 
 t.Run("acking in reverse order with bufferedEventLoop reports the right event IDs", func(t *testing.T) {
-    testQueue := NewQueue(nil, nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond})
+    testQueue := NewQueue(nil, nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond}, 0)
     testBackward(testQueue)
 })
 }
diff --git a/libbeat/publisher/queue/proxy/broker.go b/libbeat/publisher/queue/proxy/broker.go
index 20400e3ab75d..832739cc26d9 100644
--- a/libbeat/publisher/queue/proxy/broker.go
+++ b/libbeat/publisher/queue/proxy/broker.go
@@ -90,6 +90,7 @@ func FactoryForSettings(settings Settings) queue.QueueFactory {
     return func(
         logger *logp.Logger,
         ackCallback func(eventCount int),
+        inputQueueSize int,
     ) (queue.Queue, error) {
         return NewQueue(logger, ackCallback, settings), nil
     }
diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go
index d0e1c0476109..101a32901177 100644
--- a/libbeat/publisher/queue/queue.go
+++ b/libbeat/publisher/queue/queue.go
@@ -74,7 +74,7 @@ type Queue interface {
     Metrics() (Metrics, error)
 }
 
-type QueueFactory func(logger *logp.Logger, ack func(eventCount int)) (Queue, error)
+type QueueFactory func(logger *logp.Logger, ack func(eventCount int), inputQueueSize int) (Queue, error)
 
 // BufferConfig returns the pipelines buffering settings,
 // for the pipeline to use.
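
Editor's note (not part of the patch): the diff repeats one pattern in every output. The output's config struct gains a `config.Namespace` field bound to `queue`, and that namespace becomes the new first argument of `outputs.Success`/`outputs.SuccessNet`. Below is a minimal sketch of that pattern for a hypothetical `sample` output; the package name, `sampleConfig`, and `sampleClient` are invented for illustration, while the factory signature and the `outputs.Success` call mirror the files changed above.

```go
package sample

import (
	"context"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/outputs"
	"github.com/elastic/beats/v7/libbeat/publisher"
	"github.com/elastic/elastic-agent-libs/config"
)

// sampleConfig shows the new per-output `queue` namespace next to the
// usual batching/retry settings.
type sampleConfig struct {
	BulkMaxSize int              `config:"bulk_max_size"`
	MaxRetries  int              `config:"max_retries"`
	Queue       config.Namespace `config:"queue"`
}

// sampleClient is a stand-in for a real output client.
type sampleClient struct{}

func (sampleClient) String() string { return "sample" }
func (sampleClient) Close() error   { return nil }
func (sampleClient) Publish(_ context.Context, batch publisher.Batch) error {
	batch.ACK() // pretend every event was delivered
	return nil
}

// makeSample follows the same factory shape as the outputs in this patch.
func makeSample(
	_ outputs.IndexManager,
	_ beat.Info,
	_ outputs.Observer,
	cfg *config.C,
) (outputs.Group, error) {
	sc := sampleConfig{BulkMaxSize: 64, MaxRetries: 3}
	if err := cfg.Unpack(&sc); err != nil {
		return outputs.Fail(err)
	}
	// Passing sc.Queue lets outputs.Success attach a per-output queue
	// factory to the returned Group (see libbeat/outputs/util.go above).
	return outputs.Success(sc.Queue, sc.BulkMaxSize, sc.MaxRetries, sampleClient{})
}
```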
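Editor's note (not part of the patch): for end users, the change means a `queue` block can sit under an individual output. The sketch below only illustrates how such a block unpacks into the new `config.Namespace` field; the values are arbitrary, and the `mem` option names (`events`, `flush.min_events`, `flush.timeout`) are the existing memory-queue settings that `memqueue.SettingsForUserConfig` already parses.

```go
package main

import (
	"fmt"

	"github.com/elastic/elastic-agent-libs/config"
)

type outConfig struct {
	Hosts []string         `config:"hosts"`
	Queue config.Namespace `config:"queue"`
}

func main() {
	// Equivalent YAML under a hypothetical output:
	//   output.sample:
	//     hosts: ["localhost:9200"]
	//     queue.mem:
	//       events: 4096
	//       flush:
	//         min_events: 512
	//         timeout: 5s
	raw := map[string]interface{}{
		"hosts": []string{"localhost:9200"},
		"queue": map[string]interface{}{
			"mem": map[string]interface{}{
				"events": 4096,
				"flush": map[string]interface{}{
					"min_events": 512,
					"timeout":    "5s",
				},
			},
		},
	}

	cfg, err := config.NewConfigFrom(raw)
	if err != nil {
		panic(err)
	}
	var oc outConfig
	if err := cfg.Unpack(&oc); err != nil {
		panic(err)
	}
	// outputs.Success switches on this name ("mem" or "disk") to build the
	// matching queue factory for the output's Group.
	fmt.Println("queue set:", oc.Queue.IsSet(), "type:", oc.Queue.Name())
}
```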
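Editor's note (not part of the patch): the `queue.QueueFactory` change in `libbeat/publisher/queue/queue.go` means every factory now receives the pipeline's input queue size as a third argument, since it is no longer carried inside `memqueue.Settings`. The sketch below shows a custom factory under that contract, assuming the memqueue API as changed above; only the memory queue uses the value, while the disk and proxy queues ignore it. The settings values are arbitrary.

```go
package queuefactory

import (
	"github.com/elastic/beats/v7/libbeat/publisher/queue"
	"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
	"github.com/elastic/elastic-agent-libs/logp"
)

// newMemFactory returns a queue.QueueFactory matching the revised signature:
// (logger, ack callback, input queue size).
func newMemFactory() queue.QueueFactory {
	settings := memqueue.Settings{Events: 4096}
	return func(logger *logp.Logger, ack func(eventCount int), inputQueueSize int) (queue.Queue, error) {
		// The input queue size is now an explicit argument of NewQueue
		// instead of a Settings field.
		return memqueue.NewQueue(logger, ack, settings, inputQueueSize), nil
	}
}
```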