Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add reload listener for apm tracing config #13514

Merged
merged 7 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ https://github.com/elastic/apm-server/compare/8.14\...main[View commits]
- Upgraded bundled APM Java agent attacher CLI to version 1.50.0 {pull}13326[13326]
- Enable Kibana curated UIs to work with hostmetrics from OpenTelemetry's https://pkg.go.dev/go.opentelemetry.io/collector/receiver/hostmetricsreceiver[hostmetricsreceiver] {pull}13196[13196]
- Add require data stream to bulk index requests {pull}13398[13398]
- Support self-instrumentation when in managed mode by getting tracing configs via reloader {pull}13514[13514]
9 changes: 9 additions & 0 deletions internal/beatcmd/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ func TestRunManager(t *testing.T) {
})
assert.NoError(t, err)

expectRunnerParams(t, calls)
err = reload.RegisterV2.GetReloadableAPM().Reload(&reload.ConfigWithMeta{
Config: config.MustNewConfigFrom(`{"elastic.enabled": true, "elastic.environment": "testenv"}`),
})
assert.NoError(t, err)
args := expectRunnerParams(t, calls)
var m map[string]interface{}
err = args.Config.Unpack(&m)
Expand All @@ -177,6 +182,10 @@ func TestRunManager(t *testing.T) {
"enabled": true,
},
},
"instrumentation": map[string]interface{}{
"enabled": true,
"environment": "testenv",
},
}, m)

require.NotNil(t, manager.stopCallback)
Expand Down
55 changes: 46 additions & 9 deletions internal/beatcmd/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ func NewReloader(info beat.Info, newRunner NewRunnerFunc) (*Reloader, error) {
if err := reload.RegisterV2.Register(reload.OutputRegName, reload.ReloadableFunc(r.reloadOutput)); err != nil {
return nil, fmt.Errorf("failed to register output reloader: %w", err)
}
if err := reload.RegisterV2.Register(reload.APMRegName, reload.ReloadableFunc(r.reloadAPMTracing)); err != nil {
return nil, fmt.Errorf("failed to register apm tracing reloader: %w", err)
}
return r, nil
}

Expand All @@ -84,10 +87,11 @@ type Reloader struct {
runner Runner
stopRunner func() error

mu sync.Mutex
inputConfig *config.C
outputConfig *config.C
stopped chan struct{}
mu sync.Mutex
inputConfig *config.C
outputConfig *config.C
apmTracingConfig *config.C
stopped chan struct{}
}

// Run runs the Reloader, blocking until ctx is cancelled or a fatal error occurs.
Expand Down Expand Up @@ -122,7 +126,7 @@ func (r *Reloader) reloadInputs(configs []*reload.ConfigWithMeta) error {
return fmt.Errorf("failed to extract input config revision: %w", err)
}

if err := r.reload(cfg, r.outputConfig); err != nil {
if err := r.reload(cfg, r.outputConfig, r.apmTracingConfig); err != nil {
return fmt.Errorf("failed to load input config: %w", err)
}
r.inputConfig = cfg
Expand All @@ -136,15 +140,31 @@ func (r *Reloader) reloadInputs(configs []*reload.ConfigWithMeta) error {
func (r *Reloader) reloadOutput(cfg *reload.ConfigWithMeta) error {
r.mu.Lock()
defer r.mu.Unlock()
if err := r.reload(r.inputConfig, cfg.Config); err != nil {
if err := r.reload(r.inputConfig, cfg.Config, r.apmTracingConfig); err != nil {
return fmt.Errorf("failed to load output config: %w", err)
}
r.outputConfig = cfg.Config
r.logger.Info("loaded output config")
return nil
}

func (r *Reloader) reload(inputConfig, outputConfig *config.C) error {
// reloadAPMTracing (re)loads apm tracing configuration.
func (r *Reloader) reloadAPMTracing(cfg *reload.ConfigWithMeta) error {
r.mu.Lock()
defer r.mu.Unlock()
var c *config.C
if cfg != nil {
c = cfg.Config
}
if err := r.reload(r.inputConfig, r.outputConfig, c); err != nil {
return fmt.Errorf("failed to load apm tracing config: %w", err)
}
r.apmTracingConfig = c
r.logger.Info("loaded apm tracing config")
return nil
}

func (r *Reloader) reload(inputConfig, outputConfig, apmTracingConfig *config.C) error {
var outputNamespace config.Namespace
if outputConfig != nil {
if err := outputConfig.Unpack(&outputNamespace); err != nil {
Expand All @@ -153,6 +173,7 @@ func (r *Reloader) reload(inputConfig, outputConfig *config.C) error {
}
if inputConfig == nil || !outputNamespace.IsSet() {
// Wait until both input and output have been received.
// apm tracing config is not mandatory so not waiting for it
return nil
}
select {
Expand All @@ -165,11 +186,27 @@ func (r *Reloader) reload(inputConfig, outputConfig *config.C) error {
wrappedOutputConfig := config.MustNewConfigFrom(map[string]interface{}{
"output": outputConfig,
})
mergedConfig, err := config.MergeConfigs(inputConfig, wrappedOutputConfig)

var wrappedApmTracingConfig *config.C
// apmTracingConfig is nil when disabled
if apmTracingConfig != nil {
c, err := apmTracingConfig.Child("elastic", -1)
if err != nil {
return fmt.Errorf("APM tracing config for elastic not found")
}
// set enabled manually as APMConfig doesn't contain it
c.SetBool("enabled", -1, true)
wrappedApmTracingConfig = config.MustNewConfigFrom(map[string]interface{}{
"instrumentation": c,
})
} else {
// empty instrumentation config
wrappedApmTracingConfig = config.NewConfig()
}
mergedConfig, err := config.MergeConfigs(inputConfig, wrappedOutputConfig, wrappedApmTracingConfig)
if err != nil {
return err
}

// Create a new runner. We separate creation from starting to
// allow the runner to perform initialisations that must run
// synchronously.
Expand Down
27 changes: 24 additions & 3 deletions internal/beatcmd/reloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestReloader(t *testing.T) {
defer func() { assert.NoError(t, g.Wait()) }()
defer cancel()

// No reload until there's input and output configuration.
// No reload until there's input, output, apm tracing configuration.
assertNoReload()

err = reload.RegisterV2.GetInputList().Reload([]*reload.ConfigWithMeta{{
Expand All @@ -104,6 +104,10 @@ func TestReloader(t *testing.T) {
assert.NoError(t, err)
assertNoReload() // an output must be set

err = reload.RegisterV2.GetReloadableAPM().Reload(nil)
kyungeunni marked this conversation as resolved.
Show resolved Hide resolved
assert.NoError(t, err)
assertNoReload()

err = reload.RegisterV2.GetReloadableOutput().Reload(&reload.ConfigWithMeta{
Config: config.MustNewConfigFrom(`{"console.enabled": true}`),
})
Expand All @@ -130,8 +134,17 @@ func TestReloader(t *testing.T) {
expectEvent(t, r2.running, "new runner should have been started")
expectNoEvent(t, r2.stopped, "new runner should not have been stopped")

err = reload.RegisterV2.GetReloadableAPM().Reload(&reload.ConfigWithMeta{
Config: config.MustNewConfigFrom(`{"elastic.enabled": true, "elastic.api_key": "boo"}`),
})
assert.NoError(t, err)
r3 := assertReload()
expectEvent(t, r2.stopped, "old runner should have been stopped")
expectEvent(t, r3.running, "new runner should have been started")
expectNoEvent(t, r3.stopped, "new runner should not have been stopped")

cancel()
expectEvent(t, r2.stopped, "runner should have been stopped")
expectEvent(t, r3.stopped, "runner should have been stopped")
}

func TestReloaderNewRunnerParams(t *testing.T) {
Expand Down Expand Up @@ -159,13 +172,21 @@ func TestReloaderNewRunnerParams(t *testing.T) {
reload.RegisterV2.GetInputList().Reload([]*reload.ConfigWithMeta{{
Config: config.MustNewConfigFrom(`{"revision": 1, "input": 123}`),
}})

// reloader will wait until input and output are available.
// triggering APM reload before output reload will let the params to contain
// the apm tracing config too in this test setup
reload.RegisterV2.GetReloadableAPM().Reload(&reload.ConfigWithMeta{
Config: config.MustNewConfigFrom(`{"elastic.environment": "test"}`),
})

reload.RegisterV2.GetReloadableOutput().Reload(&reload.ConfigWithMeta{
Config: config.MustNewConfigFrom(`{"console.enabled": true}`),
})
args := <-calls
assert.NotNil(t, args.Logger)
assert.Equal(t, info, args.Info)
assert.Equal(t, config.MustNewConfigFrom(`{"revision": 1, "input": 123, "output.console.enabled": true}`), args.Config)
assert.Equal(t, config.MustNewConfigFrom(`{"revision": 1, "input": 123, "output.console.enabled": true, "instrumentation.enabled":true, "instrumentation.environment":"test"}`), args.Config)
}

func expectNoEvent(t testing.TB, ch <-chan struct{}, message string) {
Expand Down