Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ha-agent] Run HA enabled integrations only on leader agent #31186

Merged
merged 19 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer/demultiplexerimpl"
"github.com/DataDog/datadog-agent/comp/core"
"github.com/DataDog/datadog-agent/comp/core/config"
haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock"
"github.com/DataDog/datadog-agent/pkg/collector/externalhost"
"github.com/DataDog/datadog-agent/pkg/serializer"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
Expand All @@ -36,6 +37,7 @@ func TestExternalHostTags(t *testing.T) {
c := newCollector(fxutil.Test[dependencies](t,
core.MockBundle(),
demultiplexerimpl.MockModule(),
haagentmock.Module(),
fx.Provide(func() optional.Option[serializer.MetricSerializer] {
return optional.NewNoneOption[serializer.MetricSerializer]()
}),
Expand Down
16 changes: 10 additions & 6 deletions comp/collector/collector/collectorimpl/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
flaretypes "github.com/DataDog/datadog-agent/comp/core/flare/types"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
"github.com/DataDog/datadog-agent/comp/core/status"
haagent "github.com/DataDog/datadog-agent/comp/haagent/def"
metadata "github.com/DataDog/datadog-agent/comp/metadata/runner/runnerimpl"
"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
pkgCollector "github.com/DataDog/datadog-agent/pkg/collector"
Expand All @@ -48,17 +49,19 @@ const (
type dependencies struct {
fx.In

Lc fx.Lifecycle
Config config.Component
Log log.Component
Lc fx.Lifecycle
Config config.Component
Log log.Component
HaAgent haagent.Component

SenderManager sender.SenderManager
MetricSerializer optional.Option[serializer.MetricSerializer]
}

type collectorImpl struct {
log log.Component
config config.Component
log log.Component
config config.Component
haAgent haagent.Component

senderManager sender.SenderManager
metricSerializer optional.Option[serializer.MetricSerializer]
Expand Down Expand Up @@ -119,6 +122,7 @@ func newCollector(deps dependencies) *collectorImpl {
c := &collectorImpl{
log: deps.Log,
config: deps.Config,
haAgent: deps.HaAgent,
senderManager: deps.SenderManager,
metricSerializer: deps.MetricSerializer,
checks: make(map[checkid.ID]*middleware.CheckWrapper),
Expand Down Expand Up @@ -186,7 +190,7 @@ func (c *collectorImpl) start(_ context.Context) error {
c.m.Lock()
defer c.m.Unlock()

run := runner.NewRunner(c.senderManager)
run := runner.NewRunner(c.senderManager, c.haAgent)
sched := scheduler.NewScheduler(run.GetChan())

// let the runner some visibility into the scheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/config"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
logmock "github.com/DataDog/datadog-agent/comp/core/log/mock"
haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock"
compressionmock "github.com/DataDog/datadog-agent/comp/serializer/compression/fx-mock"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"

Expand Down Expand Up @@ -85,6 +86,7 @@ func (suite *CollectorDemuxTestSuite) SetupTest() {
suite.SenderManagerMock = NewSenderManagerMock(suite.demux)
suite.c = newCollector(fxutil.Test[dependencies](suite.T(),
core.MockBundle(),
haagentmock.Module(),
fx.Provide(func() sender.SenderManager {
return suite.SenderManagerMock
}),
Expand Down
2 changes: 2 additions & 0 deletions comp/collector/collector/collectorimpl/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/DataDog/datadog-agent/comp/collector/collector/collectorimpl/internal/middleware"
"github.com/DataDog/datadog-agent/comp/core"
"github.com/DataDog/datadog-agent/comp/core/config"
haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock"
"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/collector/check"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
Expand Down Expand Up @@ -97,6 +98,7 @@ func (suite *CollectorTestSuite) SetupTest() {
suite.c = newCollector(fxutil.Test[dependencies](suite.T(),
core.MockBundle(),
demultiplexerimpl.MockModule(),
haagentmock.Module(),
fx.Provide(func() optional.Option[serializer.MetricSerializer] {
return optional.NewNoneOption[serializer.MetricSerializer]()
}),
Expand Down
3 changes: 3 additions & 0 deletions comp/haagent/def/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ type Component interface {
// SetLeader takes the leader agent hostname as input; if it matches the current agent hostname,
// the isLeader state is set to true, otherwise false.
SetLeader(leaderAgentHostname string)

// ShouldRunIntegration returns true if the integration should be run
ShouldRunIntegration(integrationName string) bool
}
9 changes: 9 additions & 0 deletions comp/haagent/impl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ import (
"github.com/DataDog/datadog-agent/comp/core/config"
)

// validHaIntegrations represents the set of integrations that are considered
// "HA Integrations", meaning they will only run on the leader Agent.
// At the moment, the list of HA Integrations is hardcoded here, but we might provide
// a more dynamic way to configure which integrations should be considered HA Integrations.
var validHaIntegrations = map[string]bool{
"snmp": true,
"network_path": true,
}

type haAgentConfigs struct {
enabled bool
group string
Expand Down
9 changes: 9 additions & 0 deletions comp/haagent/impl/haagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ func (h *haAgentImpl) SetLeader(leaderAgentHostname string) {
h.isLeader.Store(agentHostname == leaderAgentHostname)
}

// ShouldRunIntegration reports whether the given integration should run on this agent.
// HA Integrations (see validHaIntegrations) run only while this agent is the leader;
// every other integration always runs. When the HA feature is disabled, the agent
// behaves as a standalone (non-HA) agent and runs all integrations unconditionally.
func (h *haAgentImpl) ShouldRunIntegration(integrationName string) bool {
	if !h.Enabled() || !validHaIntegrations[integrationName] {
		return true
	}
	return h.isLeader.Load()
}

func (h *haAgentImpl) onHaAgentUpdate(updates map[string]state.RawConfig, applyStateCallback func(string, state.ApplyStatus)) {
h.log.Debugf("Updates received: count=%d", len(updates))

Expand Down
4 changes: 2 additions & 2 deletions comp/haagent/impl/haagent_comp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type Provides struct {

// NewComponent creates a new haagent component
func NewComponent(reqs Requires) (Provides, error) {
haAgentConfigs := newHaAgentConfigs(reqs.AgentConfig)
haAgent := newHaAgentImpl(reqs.Logger, haAgentConfigs)
haAgentConf := newHaAgentConfigs(reqs.AgentConfig)
haAgent := newHaAgentImpl(reqs.Logger, haAgentConf)
var rcListener rctypes.ListenerProvider
if haAgent.Enabled() {
reqs.Logger.Debug("Add onHaAgentUpdate RCListener")
Expand Down
71 changes: 71 additions & 0 deletions comp/haagent/impl/haagent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,74 @@ func Test_haAgentImpl_onHaAgentUpdate(t *testing.T) {
})
}
}

// Test_haAgentImpl_ShouldRunIntegration verifies that HA Integrations run only on
// the leader agent, that non-HA integrations always run regardless of leadership,
// and that every integration runs when the HA feature is disabled.
func Test_haAgentImpl_ShouldRunIntegration(t *testing.T) {
	testAgentHostname := "my-agent-hostname"

	type testCase struct {
		name                       string
		leader                     string
		agentConfigs               map[string]interface{}
		expectShouldRunIntegration map[string]bool
	}

	testCases := []testCase{
		{
			name: "ha agent enabled and agent is leader",
			// should run HA-integrations
			// should run "non HA integrations"
			agentConfigs: map[string]interface{}{
				"hostname":         testAgentHostname,
				"ha_agent.enabled": true,
				"ha_agent.group":   testGroup,
			},
			leader: testAgentHostname,
			expectShouldRunIntegration: map[string]bool{
				"snmp":                true,
				"network_path":        true,
				"unknown_integration": true,
				"cpu":                 true,
			},
		},
		{
			name: "ha agent enabled and agent is not leader",
			// should skip HA-integrations
			// should run "non HA integrations"
			agentConfigs: map[string]interface{}{
				"hostname":         testAgentHostname,
				"ha_agent.enabled": true,
				"ha_agent.group":   testGroup,
			},
			leader: "another-agent-is-leader",
			expectShouldRunIntegration: map[string]bool{
				"snmp":                false,
				"network_path":        false,
				"unknown_integration": true,
				"cpu":                 true,
			},
		},
		{
			name: "ha agent not enabled",
			// should run all integrations
			agentConfigs: map[string]interface{}{
				"hostname":         testAgentHostname,
				"ha_agent.enabled": false,
				"ha_agent.group":   testGroup,
			},
			leader: testAgentHostname,
			expectShouldRunIntegration: map[string]bool{
				"snmp":                true,
				"network_path":        true,
				"unknown_integration": true,
				"cpu":                 true,
			},
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			component := newTestHaAgentComponent(t, tc.agentConfigs)
			component.Comp.SetLeader(tc.leader)

			for name, expected := range tc.expectShouldRunIntegration {
				assert.Equalf(t, expected, component.Comp.ShouldRunIntegration(name), "fail for integration: "+name)
			}
		})
	}
}
4 changes: 4 additions & 0 deletions comp/haagent/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (m *mockHaAgent) SetEnabled(enabled bool) {
m.enabled = enabled
}

// ShouldRunIntegration returns true for any integration name: the mock never
// gates integrations on leadership, so tests always run every integration.
func (m *mockHaAgent) ShouldRunIntegration(_ string) bool {
return true
}

// Component is the component type.
type Component interface {
haagent.Component
Expand Down
6 changes: 5 additions & 1 deletion pkg/collector/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"go.uber.org/atomic"

haagent "github.com/DataDog/datadog-agent/comp/haagent/def"
"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
"github.com/DataDog/datadog-agent/pkg/collector/check"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
Expand All @@ -40,6 +41,7 @@ var (
// Runner is the object in charge of running all the checks
type Runner struct {
senderManager sender.SenderManager
haAgent haagent.Component
isRunning *atomic.Bool
id int // Globally unique identifier for the Runner
workers map[int]*worker.Worker // Workers currently under this Runner's management
Expand All @@ -52,11 +54,12 @@ type Runner struct {
}

// NewRunner takes the number of desired goroutines processing incoming checks.
func NewRunner(senderManager sender.SenderManager) *Runner {
func NewRunner(senderManager sender.SenderManager, haAgent haagent.Component) *Runner {
numWorkers := pkgconfigsetup.Datadog().GetInt("check_runners")

r := &Runner{
senderManager: senderManager,
haAgent: haAgent,
id: int(runnerIDGenerator.Inc()),
isRunning: atomic.NewBool(true),
workers: make(map[int]*worker.Worker),
Expand Down Expand Up @@ -117,6 +120,7 @@ func (r *Runner) AddWorker() {
func (r *Runner) newWorker() (*worker.Worker, error) {
worker, err := worker.NewWorker(
r.senderManager,
r.haAgent,
r.id,
int(workerIDGenerator.Inc()),
r.pendingChecksChan,
Expand Down
21 changes: 11 additions & 10 deletions pkg/collector/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock"
"github.com/DataDog/datadog-agent/pkg/aggregator"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
"github.com/DataDog/datadog-agent/pkg/collector/check/stub"
Expand Down Expand Up @@ -152,7 +153,7 @@ func TestNewRunner(t *testing.T) {
testSetUp(t)
pkgconfigsetup.Datadog().SetWithoutSource("check_runners", "3")

r := NewRunner(aggregator.NewNoOpSenderManager())
r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent())
require.NotNil(t, r)
defer r.Stop()

Expand All @@ -166,7 +167,7 @@ func TestRunnerAddWorker(t *testing.T) {
testSetUp(t)
pkgconfigsetup.Datadog().SetWithoutSource("check_runners", "1")

r := NewRunner(aggregator.NewNoOpSenderManager())
r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent())
require.NotNil(t, r)
defer r.Stop()

Expand All @@ -181,7 +182,7 @@ func TestRunnerStaticUpdateNumWorkers(t *testing.T) {
testSetUp(t)
pkgconfigsetup.Datadog().SetWithoutSource("check_runners", "2")

r := NewRunner(aggregator.NewNoOpSenderManager())
r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent())
require.NotNil(t, r)
defer func() {
r.Stop()
Expand Down Expand Up @@ -212,7 +213,7 @@ func TestRunnerDynamicUpdateNumWorkers(t *testing.T) {
assertAsyncWorkerCount(t, 0)
min, max, expectedWorkers := testCase[0], testCase[1], testCase[2]

r := NewRunner(aggregator.NewNoOpSenderManager())
r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent())
require.NotNil(t, r)

for checks := min; checks <= max; checks++ {
Expand All @@ -234,7 +235,7 @@ func TestRunner(t *testing.T) {
checks[idx] = newCheck(t, fmt.Sprintf("mycheck_%d:123", idx), false, nil)
}

r := NewRunner(aggregator.NewNoOpSenderManager())
r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent())
require.NotNil(t, r)
defer r.Stop()

Expand Down Expand Up @@ -262,7 +263,7 @@ func TestRunnerStop(t *testing.T) {
checks[idx].RunLock.Lock()
}

r := NewRunner(aggregator.NewNoOpSenderManager())
r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent())
require.NotNil(t, r)
defer r.Stop()

Expand Down Expand Up @@ -320,7 +321,7 @@ func TestRunnerStopWithStuckCheck(t *testing.T) {
blockedCheck.RunLock.Lock()
blockedCheck.StopLock.Lock()

r := NewRunner(aggregator.NewNoOpSenderManager())
r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent())
require.NotNil(t, r)
defer r.Stop()

Expand Down Expand Up @@ -369,7 +370,7 @@ func TestRunnerStopCheck(t *testing.T) {
blockedCheck.RunLock.Lock()
blockedCheck.StopLock.Lock()

r := NewRunner(aggregator.NewNoOpSenderManager())
r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent())
require.NotNil(t, r)
defer func() {
r.Stop()
Expand Down Expand Up @@ -413,7 +414,7 @@ func TestRunnerScheduler(t *testing.T) {
sched1 := newScheduler()
sched2 := newScheduler()

r := NewRunner(aggregator.NewNoOpSenderManager())
r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent())
require.NotNil(t, r)
defer r.Stop()

Expand All @@ -433,7 +434,7 @@ func TestRunnerShouldAddCheckStats(t *testing.T) {
testCheck := newCheck(t, "test", false, nil)
sched := newScheduler()

r := NewRunner(aggregator.NewNoOpSenderManager())
r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent())
require.NotNil(t, r)
defer r.Stop()

Expand Down
Loading
Loading