Skip to content

Commit

Permalink
libbeat outputs: support worker or workers settings (#38257)
Browse files Browse the repository at this point in the history
* Support worker or workers

* Update docs

* Add CHANGELOG entry

* Add license header

* Fix imports
  • Loading branch information
ycombinator authored Mar 12, 2024
1 parent 7987272 commit 4340e02
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Enhance add_cloud_metadata processor with `orchestrator.cluster.name`, `orchestrator.cluster.id` and `azure.resourcegroup.name` when running inside an AKS cluster. {issue}33081[33081] {pull}37685[37685]
- Upgrade go-sysinfo from 1.12.0 to 1.13.1. {pull}37996[37996]
- Make `range` condition work with numeric values as strings. {pull}38080[38080]
- Allow users to configure number of output workers (for outputs that support workers) with either `worker` or `workers`. {pull}38257[38257]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ The default value is `false`.


[[worker-option]]
===== `worker`
===== `worker` or `workers`

The number of workers per configured host publishing events to Elasticsearch. This
is best used with load balancing mode enabled. Example: If you have 2 hosts and
Expand Down
52 changes: 40 additions & 12 deletions libbeat/outputs/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,61 @@

package outputs

import "github.com/elastic/elastic-agent-libs/config"
import (
"github.com/elastic/elastic-agent-libs/config"
)

type hostWorkerCfg struct {
Hosts []string `config:"hosts" validate:"required"`

// Worker is the number of output workers desired.
Worker int `config:"worker"`

// Workers is an alias for Worker. If both Worker and Workers are set,
// the value of Worker should take precedence. To always retrieve the correct
// value, use the NumWorkers() method.
Workers int `config:"workers"`
}

// NumWorkers returns the number of output workers desired.
func (hwc hostWorkerCfg) NumWorkers() int {
// Both Worker and Workers are set; give precedence to Worker.
if hwc.Worker != 0 && hwc.Workers != 0 {
return hwc.Worker
}

// Only one is set; figure out which one and return its value.
if hwc.Worker != 0 {
return hwc.Worker
}

return hwc.Workers
}

// ReadHostList reads a list of hosts to connect to from an configuration
// object. If the `worker` settings is > 1, each host is duplicated in the final
// host list by the number of `worker`.
func ReadHostList(cfg *config.C) ([]string, error) {
config := struct {
Hosts []string `config:"hosts" validate:"required"`
Worker int `config:"worker" validate:"min=1"`
}{
Worker: 1,
}

var config hostWorkerCfg
err := cfg.Unpack(&config)
if err != nil {
return nil, err
}

// Default to one worker
if config.NumWorkers() < 1 {
config.Worker = 1
}

lst := config.Hosts
if len(lst) == 0 || config.Worker <= 1 {
if len(lst) == 0 || config.NumWorkers() <= 1 {
return lst, nil
}

// duplicate entries config.Worker times
hosts := make([]string, 0, len(lst)*config.Worker)
// duplicate entries config.NumWorkers() times
hosts := make([]string, 0, len(lst)*config.NumWorkers())
for _, entry := range lst {
for i := 0; i < config.Worker; i++ {
for i := 0; i < config.NumWorkers(); i++ {
hosts = append(hosts, entry)
}
}
Expand Down
90 changes: 90 additions & 0 deletions libbeat/outputs/hosts_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package outputs

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/config"
)

func TestHostsNumWorkers(t *testing.T) {
tests := map[string]struct {
hwc hostWorkerCfg
expectedNumWorkers int
}{
"worker_set": {hwc: hostWorkerCfg{Worker: 17}, expectedNumWorkers: 17},
"workers_set": {hwc: hostWorkerCfg{Workers: 23}, expectedNumWorkers: 23},
"both_set": {hwc: hostWorkerCfg{Worker: 17, Workers: 23}, expectedNumWorkers: 17},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
require.Equal(t, test.expectedNumWorkers, test.hwc.NumWorkers())
})
}
}

func TestReadHostList(t *testing.T) {
tests := map[string]struct {
cfg map[string]interface{}
expectedHosts []string
}{
"one_host_no_worker_set": {
cfg: map[string]interface{}{
"hosts": []string{"foo.bar"},
},
expectedHosts: []string{"foo.bar"},
},
"one_host_worker_set": {
cfg: map[string]interface{}{
"hosts": []string{"foo.bar"},
"worker": 3,
},
expectedHosts: []string{"foo.bar", "foo.bar", "foo.bar"},
},
"one_host_workers_set": {
cfg: map[string]interface{}{
"hosts": []string{"foo.bar"},
"workers": 2,
},
expectedHosts: []string{"foo.bar", "foo.bar"},
},
"one_host_worker_workers_both_set": {
cfg: map[string]interface{}{
"hosts": []string{"foo.bar"},
"worker": 3,
"workers": 2,
},
expectedHosts: []string{"foo.bar", "foo.bar", "foo.bar"},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
cfg, err := config.NewConfigFrom(test.cfg)
require.NoError(t, err)

hosts, err := ReadHostList(cfg)
require.NoError(t, err)
require.Equal(t, test.expectedHosts, hosts)
})
}
}
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/docs/logstash.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ Configure escaping of HTML in strings. Set to `true` to enable escaping.

The default value is `false`.

===== `worker`
===== `worker` or `workers`

The number of workers per configured host publishing events to {ls}. This
is best used with load balancing mode enabled. Example: If you have 2 hosts and
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/redis/docs/redis.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ Output codec configuration. If the `codec` section is missing, events will be js

See <<configuration-output-codec>> for more information.

===== `worker`
===== `worker` or `workers`

The number of workers to use for each host configured to publish events to Redis. Use this setting along with the
`loadbalance` option. For example, if you have 2 hosts and 3 workers, in total 6 workers are started (3 for each host).
Expand Down

0 comments on commit 4340e02

Please sign in to comment.