Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add APM trace configuration to V2AgentManager #40030

Merged
merged 6 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions libbeat/common/reload/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const InputRegName = "input"
// OutputRegName is the registration name for V2 Outputs
const OutputRegName = "output"

// APMRegName is the registration name for APM tracing.
const APMRegName = "apm"

// ConfigWithMeta holds a pair of config.C and optional metadata for it
type ConfigWithMeta struct {
// Config to store
Expand Down Expand Up @@ -146,6 +149,14 @@ func (r *Registry) MustRegisterInput(list ReloadableList) {
}
}

// MustRegisterAPM is a V2-specific registration function that stores the
// given reloadable APM tracing configuration under APMRegName.
// It panics when the registration returns an error.
func (r *Registry) MustRegisterAPM(list Reloadable) {
	err := r.Register(APMRegName, list)
	if err == nil {
		return
	}
	panic(err)
}

// GetInputList is a V2-specific function
// That returns the reloadable list created for an input
func (r *Registry) GetInputList() ReloadableList {
Expand All @@ -162,6 +173,14 @@ func (r *Registry) GetReloadableOutput() Reloadable {
return r.confs[OutputRegName]
}

// GetReloadableAPM is a V2-specific function that looks up the reloader
// stored under APMRegName while holding the registry read lock.
// Callers should check the result for nil (as reloadAPM does), since
// nothing may have been registered under that name.
func (r *Registry) GetReloadableAPM() Reloadable {
	r.RLock()
	defer r.RUnlock()

	reloadable := r.confs[APMRegName]
	return reloadable
}

// GetRegisteredNames returns the list of names registered
func (r *Registry) GetRegisteredNames() []string {
r.RLock()
Expand Down
42 changes: 11 additions & 31 deletions libbeat/tests/integration/mockserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/client/mock"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
Expand All @@ -51,49 +50,32 @@ type unitKey struct {
// sent units, the server will wait for `delay` before sending the
// next state. This will block the check-in call from the Beat.
func NewMockServer(
units [][]*proto.UnitExpected,
featuresIdxs []uint64,
features []*proto.Features,
expected []*proto.CheckinExpected,
observedCallback func(*proto.CheckinObserved, int),
delay time.Duration,
) *mock.StubServerV2 {
i := 0
agentInfo := &proto.AgentInfo{
Id: "elastic-agent-id",
Version: version.GetDefaultVersion(),
Snapshot: true,
}
return &mock.StubServerV2{
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
if observedCallback != nil {
observedCallback(observed, i)
}
matches := doesStateMatch(observed, units[i], featuresIdxs[i])
matches := doesStateMatch(observed, expected[i])
if !matches {
// send same set of units and features
return &proto.CheckinExpected{
AgentInfo: agentInfo,
Units: units[i],
Features: features[i],
FeaturesIdx: featuresIdxs[i],
}
return expected[i]
}
// delay sending next expected based on delay
if delay > 0 {
<-time.After(delay)
}
// send next set of units and features
// send next expected
i += 1
if i >= len(units) {
if i >= len(expected) {
// stay on last index
i = len(units) - 1
}
return &proto.CheckinExpected{
AgentInfo: agentInfo,
Units: units[i],
Features: features[i],
FeaturesIdx: featuresIdxs[i],
i = len(expected) - 1
}
return expected[i]
},
ActionImpl: func(response *proto.ActionResponse) error {
// actions not tested here
Expand All @@ -105,14 +87,13 @@ func NewMockServer(

func doesStateMatch(
observed *proto.CheckinObserved,
expectedUnits []*proto.UnitExpected,
expectedFeaturesIdx uint64,
expected *proto.CheckinExpected,
) bool {
if len(observed.Units) != len(expectedUnits) {
if len(observed.Units) != len(expected.Units) {
return false
}
expectedMap := make(map[unitKey]*proto.UnitExpected)
for _, exp := range expectedUnits {
for _, exp := range expected.Units {
expectedMap[unitKey{client.UnitType(exp.Type), exp.Id}] = exp
}
for _, unit := range observed.Units {
Expand All @@ -124,8 +105,7 @@ func doesStateMatch(
return false
}
}

return observed.FeaturesIdx == expectedFeaturesIdx
return observed.FeaturesIdx == expected.FeaturesIdx
}

func RequireNewStruct(t *testing.T, v map[string]interface{}) *structpb.Struct {
Expand Down
39 changes: 27 additions & 12 deletions x-pack/filebeat/tests/integration/managerV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,36 +468,51 @@ func TestRecoverFromInvalidOutputConfiguration(t *testing.T) {
// Those are the 'states' Filebeat will go through.
// After each state is reached the mockServer will
// send the next.
protoUnits := [][]*proto.UnitExpected{
agentInfo := &proto.AgentInfo{
Id: "elastic-agent-id",
Version: version.GetDefaultVersion(),
Snapshot: true,
}
protos := []*proto.CheckinExpected{
{
&healthyOutput,
&filestreamInputHealthy,
AgentInfo: agentInfo,
Units: []*proto.UnitExpected{
&healthyOutput,
&filestreamInputHealthy,
},
},
{
&brokenOutput,
&filestreamInputStarting,
AgentInfo: agentInfo,
Units: []*proto.UnitExpected{
&brokenOutput,
&filestreamInputStarting,
},
},
{
AgentInfo: agentInfo,
Units: []*proto.UnitExpected{
&healthyOutput,
&filestreamInputHealthy,
},
},
{
&healthyOutput,
&filestreamInputHealthy,
AgentInfo: agentInfo,
Units: []*proto.UnitExpected{}, // An empty one makes the Beat exit
},
{}, // An empty one makes the Beat exit
}

// We use `success` to signal the test has ended successfully
// if `success` is never closed, then the test will fail with a timeout.
success := make(chan struct{})
// The test is successful when we reach the last element of `protos`
onObserved := func(observed *proto.CheckinObserved, protoUnitsIdx int) {
if protoUnitsIdx == len(protoUnits)-1 {
if protoUnitsIdx == len(protos)-1 {
close(success)
}
}

server := integration.NewMockServer(
protoUnits,
[]uint64{0, 0, 0, 0},
[]*proto.Features{nil, nil, nil, nil},
protos,
onObserved,
100*time.Millisecond,
)
Expand Down
76 changes: 76 additions & 0 deletions x-pack/libbeat/management/managerV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,14 @@ type BeatV2Manager struct {
// set with the last applied input configs
lastInputCfgs map[string]*proto.UnitExpectedConfig

// set with the last applied APM config
lastAPMCfg *proto.APMConfig

// used for the debug callback to report as-running config
lastBeatOutputCfg *reload.ConfigWithMeta
lastBeatInputCfgs []*reload.ConfigWithMeta
lastBeatFeaturesCfg *conf.C
lastBeatAPMCfg *reload.ConfigWithMeta

// changeDebounce is the debounce time for a configuration change
changeDebounce time.Duration
Expand Down Expand Up @@ -669,6 +673,10 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*agentUnit) {
cm.logger.Errorw("setting output state", "error", err)
}

// reload APM tracing configuration
// all error handling is handled inside of reloadAPM
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both reloadInputs and reloadOutput return an error, why does reloadAPM not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we never want APM trace configuration to cause the beat to fail, I handle all of that error logic internally inside of reloadAPM.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the docstring on reloadAPM to provide this reasoning.

cm.reloadAPM(outputUnit)

// compute the input configuration
//
// in v2 only a single input type will be started per component, so we don't need to
Expand Down Expand Up @@ -865,6 +873,63 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*agentUnit) error {
return nil
}

// reloadAPM reloads APM tracing
//
// An error is not returned from this function, because in no case do we want APM trace configuration
// to cause the beat to fail. The error is logged appropriately in the case of a failure on reload.
func (cm *BeatV2Manager) reloadAPM(unit *agentUnit) {
// Assuming that the output reloadable isn't a list, see createBeater() in cmd/instance/beat.go
apm := cm.registry.GetReloadableAPM()
if apm == nil {
// no APM reloadable, nothing to do
cm.logger.Debug("Unable to reload APM tracing; no APM reloadable registered")
return
}

var apmConfig *proto.APMConfig
if unit != nil {
expected := unit.Expected()
if expected.APMConfig != nil {
apmConfig = expected.APMConfig
}
}
if apmConfig == nil {
// APM tracing is being stopped
cm.logger.Debug("Stopping APM tracing")
err := apm.Reload(nil)
if err != nil {
cm.logger.Errorf("Error stopping APM tracing: %s", err)
return
}
cm.lastAPMCfg = nil
cm.lastBeatAPMCfg = nil
cm.logger.Debug("Stopped APM tracing")
return
}

if cm.lastAPMCfg != nil && gproto.Equal(cm.lastAPMCfg, apmConfig) {
// configuration for the APM tracing did not change; do nothing
cm.logger.Debug("Skipped reloading APM tracing; configuration didn't change")
return
}

uconfig, err := conf.NewConfigFrom(apmConfig)
if err != nil {
cm.logger.Errorf("Failed to create uconfig from APM configuration: %s", err)
return
}
reloadConfig := &reload.ConfigWithMeta{Config: uconfig}
cm.logger.Debug("Reloading APM tracing")
err = apm.Reload(reloadConfig)
if err != nil {
cm.logger.Debugf("Error reloading APM tracing: %s", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: shouldn't this be logged as an error, similar to line 918?

return
}
cm.lastAPMCfg = apmConfig
cm.lastBeatAPMCfg = reloadConfig
cm.logger.Debugf("Reloaded APM tracing")
}

// this function is registered as a debug hook
// it prints the last known configuration generated by the beat
func (cm *BeatV2Manager) handleDebugYaml() []byte {
Expand Down Expand Up @@ -899,17 +964,28 @@ func (cm *BeatV2Manager) handleDebugYaml() []byte {
}
}

// generate APM
var apmCfg map[string]interface{}
if cm.lastBeatAPMCfg != nil {
if err := cm.lastBeatAPMCfg.Config.Unpack(&apmCfg); err != nil {
cm.logger.Errorf("error unpacking APM tracing config for debug callback: %s", err)
return nil
}
}

// combine all of the above in a somewhat coherent way
// This isn't perfect, but generating a config that can actually be fed back into the beat
// would require
beatCfg := struct {
Inputs []map[string]interface{}
Outputs map[string]interface{}
Features map[string]interface{}
APM map[string]interface{}
}{
Inputs: inputList,
Outputs: outputCfg,
Features: featuresCfg,
APM: apmCfg,
}

data, err := yaml.Marshal(beatCfg)
Expand Down
Loading
Loading