Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ha-agent] Run HA enabled integrations only on leader agent #31186

Merged
merged 19 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer/demultiplexerimpl"
"github.com/DataDog/datadog-agent/comp/core"
"github.com/DataDog/datadog-agent/comp/core/config"
haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock"
"github.com/DataDog/datadog-agent/pkg/collector/externalhost"
"github.com/DataDog/datadog-agent/pkg/serializer"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
Expand All @@ -36,6 +37,7 @@ func TestExternalHostTags(t *testing.T) {
c := newCollector(fxutil.Test[dependencies](t,
core.MockBundle(),
demultiplexerimpl.MockModule(),
haagentmock.Module(),
fx.Provide(func() optional.Option[serializer.MetricSerializer] {
return optional.NewNoneOption[serializer.MetricSerializer]()
}),
Expand Down
16 changes: 10 additions & 6 deletions comp/collector/collector/collectorimpl/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
flaretypes "github.com/DataDog/datadog-agent/comp/core/flare/types"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
"github.com/DataDog/datadog-agent/comp/core/status"
haagent "github.com/DataDog/datadog-agent/comp/haagent/def"
metadata "github.com/DataDog/datadog-agent/comp/metadata/runner/runnerimpl"
"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
pkgCollector "github.com/DataDog/datadog-agent/pkg/collector"
Expand All @@ -48,17 +49,19 @@ const (
type dependencies struct {
fx.In

Lc fx.Lifecycle
Config config.Component
Log log.Component
Lc fx.Lifecycle
Config config.Component
Log log.Component
HaAgent haagent.Component

SenderManager sender.SenderManager
MetricSerializer optional.Option[serializer.MetricSerializer]
}

type collectorImpl struct {
log log.Component
config config.Component
log log.Component
config config.Component
haAgent haagent.Component

senderManager sender.SenderManager
metricSerializer optional.Option[serializer.MetricSerializer]
Expand Down Expand Up @@ -119,6 +122,7 @@ func newCollector(deps dependencies) *collectorImpl {
c := &collectorImpl{
log: deps.Log,
config: deps.Config,
haAgent: deps.HaAgent,
senderManager: deps.SenderManager,
metricSerializer: deps.MetricSerializer,
checks: make(map[checkid.ID]*middleware.CheckWrapper),
Expand Down Expand Up @@ -186,7 +190,7 @@ func (c *collectorImpl) start(_ context.Context) error {
c.m.Lock()
defer c.m.Unlock()

run := runner.NewRunner(c.senderManager)
run := runner.NewRunner(c.senderManager, c.haAgent)
sched := scheduler.NewScheduler(run.GetChan())

// give the runner some visibility into the scheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/config"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
logmock "github.com/DataDog/datadog-agent/comp/core/log/mock"
haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"

"github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer"
Expand Down Expand Up @@ -85,6 +86,7 @@ func (suite *CollectorDemuxTestSuite) SetupTest() {
suite.SenderManagerMock = NewSenderManagerMock(suite.demux)
suite.c = newCollector(fxutil.Test[dependencies](suite.T(),
core.MockBundle(),
haagentmock.Module(),
fx.Provide(func() sender.SenderManager {
return suite.SenderManagerMock
}),
Expand Down
2 changes: 2 additions & 0 deletions comp/collector/collector/collectorimpl/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/DataDog/datadog-agent/comp/collector/collector/collectorimpl/internal/middleware"
"github.com/DataDog/datadog-agent/comp/core"
"github.com/DataDog/datadog-agent/comp/core/config"
haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock"
"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/collector/check"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
Expand Down Expand Up @@ -97,6 +98,7 @@ func (suite *CollectorTestSuite) SetupTest() {
suite.c = newCollector(fxutil.Test[dependencies](suite.T(),
core.MockBundle(),
demultiplexerimpl.MockModule(),
haagentmock.Module(),
fx.Provide(func() optional.Option[serializer.MetricSerializer] {
return optional.NewNoneOption[serializer.MetricSerializer]()
}),
Expand Down
3 changes: 3 additions & 0 deletions comp/haagent/def/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ type Component interface {
// SetLeader takes the leader agent hostname as input, if it matches the current agent hostname,
// the isLeader state is set to true, otherwise false.
SetLeader(leaderAgentHostname string)

// IsHaIntegration returns true if the integration type is an HA Integration
IsHaIntegration(checkType string) bool
}
8 changes: 8 additions & 0 deletions comp/haagent/impl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ import (
"github.com/DataDog/datadog-agent/comp/core/config"
)

// validHaIntegrations is the set of integration names treated as
// "HA Integrations", i.e. integrations that are meant to run only on the
// leader Agent.
// The set is hardcoded for now; a more dynamic way of configuring which
// integrations count as HA Integrations may be provided later.
var validHaIntegrations = map[string]bool{
	"snmp": true,
}

type haAgentConfigs struct {
enabled bool
group string
Expand Down
41 changes: 41 additions & 0 deletions comp/haagent/impl/haagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ package haagentimpl

import (
"context"
"encoding/json"

log "github.com/DataDog/datadog-agent/comp/core/log/def"
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
"github.com/DataDog/datadog-agent/pkg/util/hostname"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -47,3 +49,42 @@ func (h *haAgentImpl) SetLeader(leaderAgentHostname string) {
}
h.isLeader.Store(agentHostname == leaderAgentHostname)
}

// IsHaIntegration reports whether integrationName is registered in
// validHaIntegrations with a true value. Names that are absent from the map
// yield false.
func (h *haAgentImpl) IsHaIntegration(integrationName string) bool {
	enabled, known := validHaIntegrations[integrationName]
	return known && enabled
}

// onHaAgentUpdate handles a batch of HA_AGENT remote-config updates.
//
// Each entry of updates is JSON-decoded into a haAgentConfig. An entry whose
// payload cannot be decoded, or whose group differs from h.GetGroup(), is
// logged as a warning, reported through applyStateCallback with
// state.ApplyStateError, and skipped. For every other entry the leader carried
// by the payload is handed to h.SetLeader and the entry is reported with
// state.ApplyStateUnacknowledged.
//
// applyStateCallback is invoked exactly once per entry, with the entry's
// config path as its first argument.
func (h *haAgentImpl) onHaAgentUpdate(updates map[string]state.RawConfig, applyStateCallback func(string, state.ApplyStatus)) {
	h.log.Debugf("Updates received: count=%d", len(updates))

	// reject reports a failed update for path with the given reason.
	reject := func(path, reason string) {
		applyStateCallback(path, state.ApplyStatus{
			State: state.ApplyStateError,
			Error: reason,
		})
	}

	for path, raw := range updates {
		h.log.Debugf("Received config %s: %s", path, string(raw.Config))

		var msg haAgentConfig
		if err := json.Unmarshal(raw.Config, &msg); err != nil {
			h.log.Warnf("Skipping invalid HA_AGENT update %s: %v", path, err)
			reject(path, "error unmarshalling payload")
			continue
		}

		// Only updates addressed to this agent's own group are applied.
		if msg.Group != h.GetGroup() {
			h.log.Warnf("Skipping invalid HA_AGENT update %s: expected group %s, got %s",
				path, h.GetGroup(), msg.Group)
			reject(path, "group does not match")
			continue
		}

		h.SetLeader(msg.Leader)
		h.log.Debugf("Processed config %s: %v", path, msg)

		// NOTE(review): a successfully processed update is reported as
		// ApplyStateUnacknowledged — confirm this is intended rather than an
		// acknowledged state.
		applyStateCallback(path, state.ApplyStatus{
			State: state.ApplyStateUnacknowledged,
		})
	}
}
19 changes: 16 additions & 3 deletions comp/haagent/impl/haagent_comp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/DataDog/datadog-agent/comp/core/config"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
haagent "github.com/DataDog/datadog-agent/comp/haagent/def"
rctypes "github.com/DataDog/datadog-agent/comp/remote-config/rcclient/types"
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
)

// Requires defines the dependencies for the haagent component
Expand All @@ -20,14 +22,25 @@ type Requires struct {

// Provides defines the output of the haagent component
type Provides struct {
Comp haagent.Component
Comp haagent.Component
RCListener rctypes.ListenerProvider
}

// NewComponent creates a new haagent component
func NewComponent(reqs Requires) (Provides, error) {
haAgentConfigs := newHaAgentConfigs(reqs.AgentConfig)
haAgentConf := newHaAgentConfigs(reqs.AgentConfig)
haAgent := newHaAgentImpl(reqs.Logger, haAgentConf)
var rcListener rctypes.ListenerProvider
if haAgent.Enabled() {
reqs.Logger.Debug("Add onHaAgentUpdate RCListener")
rcListener.ListenerProvider = rctypes.RCListener{
state.ProductHaAgent: haAgent.onHaAgentUpdate,
}
}

provides := Provides{
Comp: newHaAgentImpl(reqs.Logger, haAgentConfigs),
Comp: haAgent,
RCListener: rcListener,
}
return provides, nil
}
128 changes: 125 additions & 3 deletions comp/haagent/impl/haagent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,17 @@ package haagentimpl
import (
"testing"

"github.com/DataDog/datadog-agent/comp/core/config"
logmock "github.com/DataDog/datadog-agent/comp/core/log/mock"
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/stretchr/testify/assert"
"go.uber.org/fx"
)

// Fixtures used by the tests below.
var (
	// testConfigID is the remote-config path used as the key of the update
	// passed to onHaAgentUpdate.
	testConfigID = "datadog/2/HA_AGENT/group-62345762794c0c0b/65f17d667fb50f8ae28a3c858bdb1be9ea994f20249c119e007c520ac115c807"

	// testGroup is the value given to the "ha_agent.group" setting in tests.
	testGroup = "testGroup01"
)

func Test_Enabled(t *testing.T) {
tests := []struct {
name string
Expand All @@ -34,7 +42,7 @@ func Test_Enabled(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
haAgent := newTestHaAgentComponent(t, tt.configs)
haAgent := newTestHaAgentComponent(t, tt.configs).Comp
assert.Equal(t, tt.expectedEnabled, haAgent.Enabled())
})
}
Expand All @@ -44,19 +52,133 @@ func Test_GetGroup(t *testing.T) {
agentConfigs := map[string]interface{}{
"ha_agent.group": "my-group-01",
}
haAgent := newTestHaAgentComponent(t, agentConfigs)
haAgent := newTestHaAgentComponent(t, agentConfigs).Comp
assert.Equal(t, "my-group-01", haAgent.GetGroup())
}

func Test_IsLeader_SetLeader(t *testing.T) {
agentConfigs := map[string]interface{}{
"hostname": "my-agent-hostname",
}
haAgent := newTestHaAgentComponent(t, agentConfigs)
haAgent := newTestHaAgentComponent(t, agentConfigs).Comp

haAgent.SetLeader("another-agent")
assert.False(t, haAgent.IsLeader())

haAgent.SetLeader("my-agent-hostname")
assert.True(t, haAgent.IsLeader())
}

func Test_IsHaIntegration(t *testing.T) {
agentConfigs := map[string]interface{}{
"hostname": "my-agent-hostname",
"ha_agent.enabled": true,
"ha_agent.group": testGroup,
}
haAgent := newTestHaAgentComponent(t, agentConfigs)

assert.True(t, haAgent.Comp.IsHaIntegration("snmp"))
assert.False(t, haAgent.Comp.IsHaIntegration("unknown_integration"))
assert.False(t, haAgent.Comp.IsHaIntegration("cpu"))
}

// Test_RCListener checks that the component exposes a remote-config listener
// only when "ha_agent.enabled" is true.
func Test_RCListener(t *testing.T) {
	type rcListenerCase struct {
		name             string
		configs          map[string]interface{}
		expectRCListener bool
	}

	cases := []rcListenerCase{
		{
			name:             "enabled",
			configs:          map[string]interface{}{"ha_agent.enabled": true},
			expectRCListener: true,
		},
		{
			name:             "disabled",
			configs:          map[string]interface{}{"ha_agent.enabled": false},
			expectRCListener: false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			listener := newTestHaAgentComponent(t, tc.configs).RCListener.ListenerProvider
			if !tc.expectRCListener {
				assert.Nil(t, listener)
				return
			}
			assert.NotNil(t, listener)
		})
	}
}

// Test_haAgentImpl_onHaAgentUpdate feeds single-entry update batches to
// onHaAgentUpdate and verifies the config ID and apply status that are
// reported back through the callback.
func Test_haAgentImpl_onHaAgentUpdate(t *testing.T) {
	// singleUpdate builds an update batch holding one payload under testConfigID.
	singleUpdate := func(payload string) map[string]state.RawConfig {
		return map[string]state.RawConfig{
			testConfigID: {Config: []byte(payload)},
		}
	}

	cases := []struct {
		name                string
		updates             map[string]state.RawConfig
		expectedApplyID     string
		expectedApplyStatus state.ApplyStatus
	}{
		{
			name:                "successful update",
			updates:             singleUpdate(`{"group":"testGroup01","leader":"ha-agent1"}`),
			expectedApplyID:     testConfigID,
			expectedApplyStatus: state.ApplyStatus{State: state.ApplyStateUnacknowledged},
		},
		{
			name:            "invalid payload",
			updates:         singleUpdate(`invalid-json`),
			expectedApplyID: testConfigID,
			expectedApplyStatus: state.ApplyStatus{
				State: state.ApplyStateError,
				Error: "error unmarshalling payload",
			},
		},
		{
			name:            "invalid group",
			updates:         singleUpdate(`{"group":"invalidGroup","leader":"ha-agent1"}`),
			expectedApplyID: testConfigID,
			expectedApplyStatus: state.ApplyStatus{
				State: state.ApplyStateError,
				Error: "group does not match",
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			overrides := map[string]interface{}{
				"hostname":         "my-agent-hostname",
				"ha_agent.enabled": true,
				"ha_agent.group":   testGroup,
			}
			cfg := fxutil.Test[config.Component](t, fx.Options(
				config.MockModule(),
				fx.Replace(config.MockParams{Overrides: overrides}),
			))
			agent := newHaAgentImpl(logmock.New(t), newHaAgentConfigs(cfg))

			// Record what the handler reports back for the update.
			var (
				gotID     string
				gotStatus state.ApplyStatus
			)
			agent.onHaAgentUpdate(tc.updates, func(id string, status state.ApplyStatus) {
				gotID, gotStatus = id, status
			})

			assert.Equal(t, tc.expectedApplyID, gotID)
			assert.Equal(t, tc.expectedApplyStatus, gotStatus)
		})
	}
}
Loading
Loading