From 465d221a9cc9c51b115f829dd05b30c60c1303ac Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Wed, 29 Mar 2023 20:10:06 +0200 Subject: [PATCH 1/7] Update `allow_older_versions` when running under Elastic Agent When Beats are running under Elastic Agent their initial output configuration is empty. Only a few moments later the output configuration arrives as an update via the control protocol. On startup Beats register a global Elasticsearch connection callback to validate the Elasticsearch version. Unfortunately, this callback didn't account for the later `allow_older_versions` update via the control protocol and the updated value was not used. This fixes that broken behaviour and makes an update to the entire in-memory output configuration on each control protocol update. The flag is extracted in a separate struct field for quicker access without a need to parse the configuration again. --- libbeat/cmd/instance/beat.go | 46 ++++++++++++++---- libbeat/cmd/instance/beat_test.go | 79 +++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 10 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index bdd010e68f2..d2d10a50161 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -102,6 +102,12 @@ type Beat struct { // shouldReexec is a flag to indicate the Beat should restart shouldReexec bool + + // allowOlderESVersions is used in a global Elasticsearch connection callback + // this is a pre-cached value for the callback, so we don't have to parse the + // config on each callback. + // This value must be updated every time we update/reload the elasticsearch output configuration. + allowOlderESVersions bool } type beatConfig struct { @@ -336,7 +342,12 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { logSystemInfo(b.Info) logp.Info("Setup Beat: %s; Version: %s", b.Info.Beat, b.Info.Version) - b.checkElasticsearchVersion() + b.allowOlderESVersions = b.isConnectionToOlderVersionAllowed() + + err = b.registerESVersionCheckCallback() + if err != nil { + return nil, err + } err = b.registerESIndexManagement() if err != nil { @@ -972,15 +983,15 @@ func (b *Beat) loadDashboards(ctx context.Context, force bool) error { return nil } -// checkElasticsearchVersion registers a global callback to make sure ES instance we are connecting +// registerESVersionCheckCallback registers a global callback to make sure ES instance we are connecting // to is at least on the same version as the Beat. // If the check is disabled or the output is not Elasticsearch, nothing happens. -func (b *Beat) checkElasticsearchVersion() { - if b.isConnectionToOlderVersionAllowed() { - return - } +func (b *Beat) registerESVersionCheckCallback() error { + _, err := elasticsearch.RegisterGlobalCallback(func(conn *eslegclient.Connection) error { + if b.allowOlderESVersions { + return nil + } - _, _ = elasticsearch.RegisterGlobalCallback(func(conn *eslegclient.Connection) error { esVersion := conn.GetVersion() beatVersion, err := libversion.New(b.Info.Version) if err != nil { @@ -991,6 +1002,8 @@ func (b *Beat) checkElasticsearchVersion() { } return nil }) + + return err } func (b *Beat) isConnectionToOlderVersionAllowed() bool { @@ -1026,13 +1039,26 @@ func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback { } func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Reloadable { - return reload.ReloadableFunc(func(config *reload.ConfigWithMeta) error { + return reload.ReloadableFunc(func(update *reload.ConfigWithMeta) error { if b.OutputConfigReloader != nil { - if err := b.OutputConfigReloader.Reload(config); err != nil { + if err := b.OutputConfigReloader.Reload(update); err != nil { return err } } - return outReloader.Reload(config, b.createOutput) + + err := b.Config.Output.Unpack(update.Config) + if err != nil { + return err + } + + // after the output configuration change + if isElasticsearchOutput(b.Config.Output.Name()) { + // if the output is Elasticsearch, we need to update some pre-saved flags + // for its global connection callbacks + b.allowOlderESVersions = b.isConnectionToOlderVersionAllowed() + } + + return outReloader.Reload(update, b.createOutput) }) } diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index 3737b509964..1ef457e9fa5 100644 --- a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -26,6 +26,9 @@ import ( "testing" "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/elastic-agent-libs/config" "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" @@ -224,3 +227,79 @@ func TestSanitizeIPs(t *testing.T) { }) } } + +func TestReloader(t *testing.T) { + t.Run("updates the output configuration on the beat", func(t *testing.T) { + b, err := NewBeat("testbeat", "testidx", "0.9", false) + require.NoError(t, err) + + cfg := ` +elasticsearch: + hosts: ["https://127.0.0.1:9200"] + username: "elastic" + allow_older_versions: true +` + c, err := config.NewConfigWithYAML([]byte(cfg), cfg) + require.NoError(t, err) + outCfg, err := c.Child("elasticsearch", -1) + require.NoError(t, err) + + update := &reload.ConfigWithMeta{Config: c} + m := &outputReloaderMock{} + reloader := b.makeOutputReloader(m) + + require.False(t, b.allowOlderESVersions) + require.False(t, b.Config.Output.IsSet(), "the output should not be set yet") + + err = reloader.Reload(update) + require.NoError(t, err) + + require.True(t, b.Config.Output.IsSet(), "now the output should be set") + require.True(t, b.allowOlderESVersions) + require.Equal(t, outCfg, b.Config.Output.Config()) + require.Same(t, c, m.cfg.Config) + }) + + t.Run("does not update `allowOlderESVersions` if output is not Elasticsearch", func(t *testing.T) { + b, err := NewBeat("testbeat", "testidx", "0.9", false) + require.NoError(t, err) + + cfg := ` +logstash: + hosts: ["https://127.0.0.1:9200"] + username: "elastic" + allow_older_versions: true +` + c, err := config.NewConfigWithYAML([]byte(cfg), cfg) + require.NoError(t, err) + outCfg, err := c.Child("logstash", -1) + require.NoError(t, err) + + update := &reload.ConfigWithMeta{Config: c} + m := &outputReloaderMock{} + reloader := b.makeOutputReloader(m) + + require.False(t, b.allowOlderESVersions) + require.False(t, b.Config.Output.IsSet(), "the output should not be set yet") + + err = reloader.Reload(update) + require.NoError(t, err) + + require.True(t, b.Config.Output.IsSet(), "now the output should be set") + require.False(t, b.allowOlderESVersions, "the flag should not be updated") + require.Equal(t, outCfg, b.Config.Output.Config()) + require.Same(t, c, m.cfg.Config) + }) +} + +type outputReloaderMock struct { + cfg *reload.ConfigWithMeta +} + +func (r *outputReloaderMock) Reload( + cfg *reload.ConfigWithMeta, + factory func(o outputs.Observer, cfg config.Namespace) (outputs.Group, error), +) error { + r.cfg = cfg + return nil +} From c548f2489f20303f6f7d2fb93ae87ddbefa8acc8 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Wed, 29 Mar 2023 20:57:20 +0200 Subject: [PATCH 2/7] Add changelog entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a166cef96fc..87459e4fd7c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -99,6 +99,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix for httpjson first_response object throwing false positive errors by making it a flag based object {issue}34747[34747] {pull}34748[34748] - Fix errors and panics due to re-used processors {pull}34761[34761] - Add missing Basic Authentication support to CEL input {issue}34609[34609] {pull}34689[34689] +- Fix Beats started by agent do not respect the allow_older_versions: true configuration flag {issue}34227[34227] {pull}34964[34964] *Heartbeat* From 04159b31654e4e55653b9ddcf6d012474975da30 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Wed, 29 Mar 2023 20:59:07 +0200 Subject: [PATCH 3/7] Move changelog entry to the right place --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 87459e4fd7c..a4d75abc9be 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -60,6 +60,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491] - Fix panics when a processor is closed twice {pull}34647[34647] - Update elastic-agent-system-metrics to v0.4.6 to allow builds on mips platforms. {pull}34674[34674] +- Fix Beats started by agent do not respect the allow_older_versions: true configuration flag {issue}34227[34227] {pull}34964[34964] *Auditbeat* @@ -99,7 +100,6 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix for httpjson first_response object throwing false positive errors by making it a flag based object {issue}34747[34747] {pull}34748[34748] - Fix errors and panics due to re-used processors {pull}34761[34761] - Add missing Basic Authentication support to CEL input {issue}34609[34609] {pull}34689[34689] -- Fix Beats started by agent do not respect the allow_older_versions: true configuration flag {issue}34227[34227] {pull}34964[34964] *Heartbeat* From 9eb4c54373c709e14678864a1d98bfc3125ae7e5 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Wed, 29 Mar 2023 22:06:12 +0200 Subject: [PATCH 4/7] Fix panic in tests --- libbeat/cmd/instance/beat.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index d2d10a50161..62cce44d9ca 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -1040,22 +1040,28 @@ func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback { func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Reloadable { return reload.ReloadableFunc(func(update *reload.ConfigWithMeta) error { + if update == nil { + return nil + } + if b.OutputConfigReloader != nil { if err := b.OutputConfigReloader.Reload(update); err != nil { return err } } - err := b.Config.Output.Unpack(update.Config) - if err != nil { - return err - } + if update.Config != nil { + err := b.Config.Output.Unpack(update.Config) + if err != nil { + return err + } - // after the output configuration change - if isElasticsearchOutput(b.Config.Output.Name()) { - // if the output is Elasticsearch, we need to update some pre-saved flags - // for its global connection callbacks - b.allowOlderESVersions = b.isConnectionToOlderVersionAllowed() + // after the output configuration change + if isElasticsearchOutput(b.Config.Output.Name()) { + // if the output is Elasticsearch, we need to update some pre-saved flags + // for its global connection callbacks + b.allowOlderESVersions = b.isConnectionToOlderVersionAllowed() + } } return outReloader.Reload(update, b.createOutput) From 69f1fa71492902932709a38abf7680765ef17155 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Thu, 30 Mar 2023 18:30:45 +0200 Subject: [PATCH 5/7] Remove the cached flag --- libbeat/cmd/instance/beat.go | 20 ++++---------------- libbeat/cmd/instance/beat_test.go | 7 ------- 2 files changed, 4 insertions(+), 23 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index bce6a4d868b..755b8529ac3 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -102,12 +102,6 @@ type Beat struct { // shouldReexec is a flag to indicate the Beat should restart shouldReexec bool - - // allowOlderESVersions is used in a global Elasticsearch connection callback - // this is a pre-cached value for the callback, so we don't have to parse the - // config on each callback. - // This value must be updated every time we update/reload the elasticsearch output configuration. - allowOlderESVersions bool } type beatConfig struct { @@ -335,8 +329,6 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { logSystemInfo(b.Info) logp.Info("Setup Beat: %s; Version: %s", b.Info.Beat, b.Info.Version) - b.allowOlderESVersions = b.isConnectionToOlderVersionAllowed() - err = b.registerESVersionCheckCallback() if err != nil { return nil, err @@ -997,7 +989,7 @@ func (b *Beat) loadDashboards(ctx context.Context, force bool) error { // If the check is disabled or the output is not Elasticsearch, nothing happens. func (b *Beat) registerESVersionCheckCallback() error { _, err := elasticsearch.RegisterGlobalCallback(func(conn *eslegclient.Connection) error { - if b.allowOlderESVersions { + if isElasticsearchOutput(b.Config.Output.Name()) && b.isConnectionToOlderVersionAllowed() { return nil } @@ -1059,18 +1051,14 @@ func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Re } } + // we need to update the output configuration because + // some callbacks are relying on it to be up to date. + // e.g. the Elasticsearch version validation if update.Config != nil { err := b.Config.Output.Unpack(update.Config) if err != nil { return err } - - // after the output configuration change - if isElasticsearchOutput(b.Config.Output.Name()) { - // if the output is Elasticsearch, we need to update some pre-saved flags - // for its global connection callbacks - b.allowOlderESVersions = b.isConnectionToOlderVersionAllowed() - } } return outReloader.Reload(update, b.createOutput) diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index 1ef457e9fa5..60462dc0232 100644 --- a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -248,14 +248,10 @@ elasticsearch: m := &outputReloaderMock{} reloader := b.makeOutputReloader(m) - require.False(t, b.allowOlderESVersions) require.False(t, b.Config.Output.IsSet(), "the output should not be set yet") - err = reloader.Reload(update) require.NoError(t, err) - require.True(t, b.Config.Output.IsSet(), "now the output should be set") - require.True(t, b.allowOlderESVersions) require.Equal(t, outCfg, b.Config.Output.Config()) require.Same(t, c, m.cfg.Config) }) @@ -279,14 +275,11 @@ logstash: m := &outputReloaderMock{} reloader := b.makeOutputReloader(m) - require.False(t, b.allowOlderESVersions) require.False(t, b.Config.Output.IsSet(), "the output should not be set yet") err = reloader.Reload(update) require.NoError(t, err) - require.True(t, b.Config.Output.IsSet(), "now the output should be set") - require.False(t, b.allowOlderESVersions, "the flag should not be updated") require.Equal(t, outCfg, b.Config.Output.Config()) require.Same(t, c, m.cfg.Config) }) From 304aae3cdf85e4799c05d8d92ffcc0007306afd2 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Thu, 30 Mar 2023 18:37:26 +0200 Subject: [PATCH 6/7] Remove unnecessary test, add more checks --- libbeat/cmd/instance/beat_test.go | 30 ++---------------------------- 1 file changed, 2 insertions(+), 28 deletions(-) diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index 60462dc0232..6520767d550 100644 --- a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -249,39 +249,13 @@ elasticsearch: reloader := b.makeOutputReloader(m) require.False(t, b.Config.Output.IsSet(), "the output should not be set yet") + require.False(t, b.isConnectionToOlderVersionAllowed(), "the flag should not be present in the empty configuration") err = reloader.Reload(update) require.NoError(t, err) require.True(t, b.Config.Output.IsSet(), "now the output should be set") require.Equal(t, outCfg, b.Config.Output.Config()) require.Same(t, c, m.cfg.Config) - }) - - t.Run("does not update `allowOlderESVersions` if output is not Elasticsearch", func(t *testing.T) { - b, err := NewBeat("testbeat", "testidx", "0.9", false) - require.NoError(t, err) - - cfg := ` -logstash: - hosts: ["https://127.0.0.1:9200"] - username: "elastic" - allow_older_versions: true -` - c, err := config.NewConfigWithYAML([]byte(cfg), cfg) - require.NoError(t, err) - outCfg, err := c.Child("logstash", -1) - require.NoError(t, err) - - update := &reload.ConfigWithMeta{Config: c} - m := &outputReloaderMock{} - reloader := b.makeOutputReloader(m) - - require.False(t, b.Config.Output.IsSet(), "the output should not be set yet") - - err = reloader.Reload(update) - require.NoError(t, err) - require.True(t, b.Config.Output.IsSet(), "now the output should be set") - require.Equal(t, outCfg, b.Config.Output.Config()) - require.Same(t, c, m.cfg.Config) + require.True(t, b.isConnectionToOlderVersionAllowed(), "the flag should be present") }) } From 38987783f4de622834b9facf05e0a8b208af93f3 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Thu, 30 Mar 2023 18:39:00 +0200 Subject: [PATCH 7/7] Improve output type handling --- libbeat/cmd/instance/beat.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 755b8529ac3..57064b2e708 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -989,7 +989,10 @@ func (b *Beat) loadDashboards(ctx context.Context, force bool) error { // If the check is disabled or the output is not Elasticsearch, nothing happens. func (b *Beat) registerESVersionCheckCallback() error { _, err := elasticsearch.RegisterGlobalCallback(func(conn *eslegclient.Connection) error { - if isElasticsearchOutput(b.Config.Output.Name()) && b.isConnectionToOlderVersionAllowed() { + if !isElasticsearchOutput(b.Config.Output.Name()) { + return errors.New("Elasticsearch output is not configured") + } + if b.isConnectionToOlderVersionAllowed() { return nil }