Skip to content

Commit

Permalink
Add APM trace configuration to V2AgentManager (#40030)
Browse files Browse the repository at this point in the history
  • Loading branch information
blakerouse authored Jun 28, 2024
1 parent 5323dc6 commit 0bb4e10
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 148 deletions.
19 changes: 19 additions & 0 deletions libbeat/common/reload/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const InputRegName = "input"
// OutputRegName is the registation name for V2 Outputs
const OutputRegName = "output"

// APMRegName is the registation name for APM tracing.
const APMRegName = "apm"

// ConfigWithMeta holds a pair of config.C and optional metadata for it
type ConfigWithMeta struct {
// Config to store
Expand Down Expand Up @@ -146,6 +149,14 @@ func (r *Registry) MustRegisterInput(list ReloadableList) {
}
}

// MustRegisterAPM is a V2-specific registration function
// that declares a reloadable APM tracing configuration
func (r *Registry) MustRegisterAPM(list Reloadable) {
if err := r.Register(APMRegName, list); err != nil {
panic(err)
}
}

// GetInputList is a V2-specific function
// That returns the reloadable list created for an input
func (r *Registry) GetInputList() ReloadableList {
Expand All @@ -162,6 +173,14 @@ func (r *Registry) GetReloadableOutput() Reloadable {
return r.confs[OutputRegName]
}

// GetReloadableAPM is a V2-specific function
// That returns the reloader for the registered APM trace
func (r *Registry) GetReloadableAPM() Reloadable {
r.RLock()
defer r.RUnlock()
return r.confs[APMRegName]
}

// GetRegisteredNames returns the list of names registered
func (r *Registry) GetRegisteredNames() []string {
r.RLock()
Expand Down
42 changes: 11 additions & 31 deletions libbeat/tests/integration/mockserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/client/mock"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
Expand All @@ -51,49 +50,32 @@ type unitKey struct {
// sent units, the server will wait for `delay` before sending the
// next state. This will block the check-in call from the Beat.
func NewMockServer(
units [][]*proto.UnitExpected,
featuresIdxs []uint64,
features []*proto.Features,
expected []*proto.CheckinExpected,
observedCallback func(*proto.CheckinObserved, int),
delay time.Duration,
) *mock.StubServerV2 {
i := 0
agentInfo := &proto.AgentInfo{
Id: "elastic-agent-id",
Version: version.GetDefaultVersion(),
Snapshot: true,
}
return &mock.StubServerV2{
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
if observedCallback != nil {
observedCallback(observed, i)
}
matches := doesStateMatch(observed, units[i], featuresIdxs[i])
matches := doesStateMatch(observed, expected[i])
if !matches {
// send same set of units and features
return &proto.CheckinExpected{
AgentInfo: agentInfo,
Units: units[i],
Features: features[i],
FeaturesIdx: featuresIdxs[i],
}
return expected[i]
}
// delay sending next expected based on delay
if delay > 0 {
<-time.After(delay)
}
// send next set of units and features
// send next expected
i += 1
if i >= len(units) {
if i >= len(expected) {
// stay on last index
i = len(units) - 1
}
return &proto.CheckinExpected{
AgentInfo: agentInfo,
Units: units[i],
Features: features[i],
FeaturesIdx: featuresIdxs[i],
i = len(expected) - 1
}
return expected[i]
},
ActionImpl: func(response *proto.ActionResponse) error {
// actions not tested here
Expand All @@ -105,14 +87,13 @@ func NewMockServer(

func doesStateMatch(
observed *proto.CheckinObserved,
expectedUnits []*proto.UnitExpected,
expectedFeaturesIdx uint64,
expected *proto.CheckinExpected,
) bool {
if len(observed.Units) != len(expectedUnits) {
if len(observed.Units) != len(expected.Units) {
return false
}
expectedMap := make(map[unitKey]*proto.UnitExpected)
for _, exp := range expectedUnits {
for _, exp := range expected.Units {
expectedMap[unitKey{client.UnitType(exp.Type), exp.Id}] = exp
}
for _, unit := range observed.Units {
Expand All @@ -124,8 +105,7 @@ func doesStateMatch(
return false
}
}

return observed.FeaturesIdx == expectedFeaturesIdx
return observed.FeaturesIdx == expected.FeaturesIdx
}

func RequireNewStruct(t *testing.T, v map[string]interface{}) *structpb.Struct {
Expand Down
39 changes: 27 additions & 12 deletions x-pack/filebeat/tests/integration/managerV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,36 +468,51 @@ func TestRecoverFromInvalidOutputConfiguration(t *testing.T) {
// Those are the 'states' Filebeat will go through.
// After each state is reached the mockServer will
// send the next.
protoUnits := [][]*proto.UnitExpected{
agentInfo := &proto.AgentInfo{
Id: "elastic-agent-id",
Version: version.GetDefaultVersion(),
Snapshot: true,
}
protos := []*proto.CheckinExpected{
{
&healthyOutput,
&filestreamInputHealthy,
AgentInfo: agentInfo,
Units: []*proto.UnitExpected{
&healthyOutput,
&filestreamInputHealthy,
},
},
{
&brokenOutput,
&filestreamInputStarting,
AgentInfo: agentInfo,
Units: []*proto.UnitExpected{
&brokenOutput,
&filestreamInputStarting,
},
},
{
AgentInfo: agentInfo,
Units: []*proto.UnitExpected{
&healthyOutput,
&filestreamInputHealthy,
},
},
{
&healthyOutput,
&filestreamInputHealthy,
AgentInfo: agentInfo,
Units: []*proto.UnitExpected{}, // An empty one makes the Beat exit
},
{}, // An empty one makes the Beat exit
}

// We use `success` to signal the test has ended successfully
// if `success` is never closed, then the test will fail with a timeout.
success := make(chan struct{})
// The test is successful when we reach the last element of `protoUnits`
onObserved := func(observed *proto.CheckinObserved, protoUnitsIdx int) {
if protoUnitsIdx == len(protoUnits)-1 {
if protoUnitsIdx == len(protos)-1 {
close(success)
}
}

server := integration.NewMockServer(
protoUnits,
[]uint64{0, 0, 0, 0},
[]*proto.Features{nil, nil, nil, nil},
protos,
onObserved,
100*time.Millisecond,
)
Expand Down
76 changes: 76 additions & 0 deletions x-pack/libbeat/management/managerV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,14 @@ type BeatV2Manager struct {
// set with the last applied input configs
lastInputCfgs map[string]*proto.UnitExpectedConfig

// set with the last applied APM config
lastAPMCfg *proto.APMConfig

// used for the debug callback to report as-running config
lastBeatOutputCfg *reload.ConfigWithMeta
lastBeatInputCfgs []*reload.ConfigWithMeta
lastBeatFeaturesCfg *conf.C
lastBeatAPMCfg *reload.ConfigWithMeta

// changeDebounce is the debounce time for a configuration change
changeDebounce time.Duration
Expand Down Expand Up @@ -669,6 +673,10 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*agentUnit) {
cm.logger.Errorw("setting output state", "error", err)
}

// reload APM tracing configuration
// all error handling is handled inside of reloadAPM
cm.reloadAPM(outputUnit)

// compute the input configuration
//
// in v2 only a single input type will be started per component, so we don't need to
Expand Down Expand Up @@ -865,6 +873,63 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*agentUnit) error {
return nil
}

// reloadAPM reloads APM tracing
//
// An error is not returned from this function, because in no case do we want APM trace configuration
// to cause the beat to fail. The error is logged appropriately in the case of a failure on reload.
func (cm *BeatV2Manager) reloadAPM(unit *agentUnit) {
// Assuming that the output reloadable isn't a list, see createBeater() in cmd/instance/beat.go
apm := cm.registry.GetReloadableAPM()
if apm == nil {
// no APM reloadable, nothing to do
cm.logger.Debug("Unable to reload APM tracing; no APM reloadable registered")
return
}

var apmConfig *proto.APMConfig
if unit != nil {
expected := unit.Expected()
if expected.APMConfig != nil {
apmConfig = expected.APMConfig
}
}
if apmConfig == nil {
// APM tracing is being stopped
cm.logger.Debug("Stopping APM tracing")
err := apm.Reload(nil)
if err != nil {
cm.logger.Errorf("Error stopping APM tracing: %s", err)
return
}
cm.lastAPMCfg = nil
cm.lastBeatAPMCfg = nil
cm.logger.Debug("Stopped APM tracing")
return
}

if cm.lastAPMCfg != nil && gproto.Equal(cm.lastAPMCfg, apmConfig) {
// configuration for the APM tracing did not change; do nothing
cm.logger.Debug("Skipped reloading APM tracing; configuration didn't change")
return
}

uconfig, err := conf.NewConfigFrom(apmConfig)
if err != nil {
cm.logger.Errorf("Failed to create uconfig from APM configuration: %s", err)
return
}
reloadConfig := &reload.ConfigWithMeta{Config: uconfig}
cm.logger.Debug("Reloading APM tracing")
err = apm.Reload(reloadConfig)
if err != nil {
cm.logger.Debugf("Error reloading APM tracing: %s", err)
return
}
cm.lastAPMCfg = apmConfig
cm.lastBeatAPMCfg = reloadConfig
cm.logger.Debugf("Reloaded APM tracing")
}

// this function is registered as a debug hook
// it prints the last known configuration generated by the beat
func (cm *BeatV2Manager) handleDebugYaml() []byte {
Expand Down Expand Up @@ -899,17 +964,28 @@ func (cm *BeatV2Manager) handleDebugYaml() []byte {
}
}

// generate APM
var apmCfg map[string]interface{}
if cm.lastBeatAPMCfg != nil {
if err := cm.lastBeatAPMCfg.Config.Unpack(&apmCfg); err != nil {
cm.logger.Errorf("error unpacking APM tracing config for debug callback: %s", err)
return nil
}
}

// combine all of the above in a somewhat coherent way
// This isn't perfect, but generating a config that can actually be fed back into the beat
// would require
beatCfg := struct {
Inputs []map[string]interface{}
Outputs map[string]interface{}
Features map[string]interface{}
APM map[string]interface{}
}{
Inputs: inputList,
Outputs: outputCfg,
Features: featuresCfg,
APM: apmCfg,
}

data, err := yaml.Marshal(beatCfg)
Expand Down
Loading

0 comments on commit 0bb4e10

Please sign in to comment.