From 5ed60bbfe99f9fa395e9f6cd87f830836a7d29a9 Mon Sep 17 00:00:00 2001 From: Alexandre Yang Date: Tue, 26 Nov 2024 18:27:34 +0100 Subject: [PATCH] move logic to haagent comp --- comp/haagent/def/component.go | 4 +- comp/haagent/impl/config.go | 3 +- comp/haagent/impl/haagent.go | 4 +- comp/haagent/impl/haagent_test.go | 79 ++++++++++++++++++++++++++----- comp/haagent/mock/mock.go | 2 +- pkg/collector/worker/worker.go | 19 ++------ 6 files changed, 78 insertions(+), 33 deletions(-) diff --git a/comp/haagent/def/component.go b/comp/haagent/def/component.go index 4d10fb4e16058..f1d3f53ce3fa5 100644 --- a/comp/haagent/def/component.go +++ b/comp/haagent/def/component.go @@ -23,6 +23,6 @@ type Component interface { // the isLeader state is set to true, otherwise false. SetLeader(leaderAgentHostname string) - // IsHaIntegration returns true if the integration type is an HA Integration - IsHaIntegration(checkType string) bool + // ShouldRunIntegration returns true if the integration should be run + ShouldRunIntegration(integrationName string) bool } diff --git a/comp/haagent/impl/config.go b/comp/haagent/impl/config.go index 037480284877a..4fea0af76d0fd 100644 --- a/comp/haagent/impl/config.go +++ b/comp/haagent/impl/config.go @@ -14,7 +14,8 @@ import ( // At the moment, the list of HA Integrations is hardcoded here, but we might provide // more dynamic way to configure which integration should be considered HA Integration. var validHaIntegrations = map[string]bool{ - "snmp": true, + "snmp": true, + "network_path": true, } type haAgentConfigs struct { diff --git a/comp/haagent/impl/haagent.go b/comp/haagent/impl/haagent.go index 4f04f600c6a10..3c1dfc27d9d94 100644 --- a/comp/haagent/impl/haagent.go +++ b/comp/haagent/impl/haagent.go @@ -50,8 +50,8 @@ func (h *haAgentImpl) SetLeader(leaderAgentHostname string) { h.isLeader.Store(agentHostname == leaderAgentHostname) } -func (h *haAgentImpl) IsHaIntegration(integrationName string) bool { - return validHaIntegrations[integrationName] +func (h *haAgentImpl) ShouldRunIntegration(integrationName string) bool { + return h.Enabled() && validHaIntegrations[integrationName] && h.isLeader.Load() } func (h *haAgentImpl) onHaAgentUpdate(updates map[string]state.RawConfig, applyStateCallback func(string, state.ApplyStatus)) { diff --git a/comp/haagent/impl/haagent_test.go b/comp/haagent/impl/haagent_test.go index 347f192bbabcf..3bd2cf75b9baf 100644 --- a/comp/haagent/impl/haagent_test.go +++ b/comp/haagent/impl/haagent_test.go @@ -69,19 +69,6 @@ func Test_IsLeader_SetLeader(t *testing.T) { assert.True(t, haAgent.IsLeader()) } -func Test_IsHaIntegration(t *testing.T) { - agentConfigs := map[string]interface{}{ - "hostname": "my-agent-hostname", - "ha_agent.enabled": true, - "ha_agent.group": testGroup, - } - haAgent := newTestHaAgentComponent(t, agentConfigs) - - assert.True(t, haAgent.Comp.IsHaIntegration("snmp")) - assert.False(t, haAgent.Comp.IsHaIntegration("unknown_integration")) - assert.False(t, haAgent.Comp.IsHaIntegration("cpu")) -} - func Test_RCListener(t *testing.T) { tests := []struct { name string @@ -182,3 +169,69 @@ func Test_haAgentImpl_onHaAgentUpdate(t *testing.T) { }) } } + +func Test_haAgentImpl_ShouldRunIntegration(t *testing.T) { + testAgentHostname := "my-agent-hostname" + tests := []struct { + name string + leader string + agentConfigs map[string]interface{} + expectShouldRunIntegration map[string]bool + }{ + { + name: "should run: for HA integration", + agentConfigs: map[string]interface{}{ + "hostname": testAgentHostname, + "ha_agent.enabled": true, + "ha_agent.group": testGroup, + }, + leader: testAgentHostname, + expectShouldRunIntegration: map[string]bool{ + "snmp": true, + "network_path": true, + "unknown_integration": false, + "cpu": false, + }, + }, + { + name: "should not run: current agent is not leader", + agentConfigs: map[string]interface{}{ + "hostname": testAgentHostname, + "ha_agent.enabled": true, + "ha_agent.group": testGroup, + }, + leader: "another-agent-is-leader", + expectShouldRunIntegration: map[string]bool{ + "snmp": false, + "network_path": false, + "unknown_integration": false, + "cpu": false, + }, + }, + { + name: "should not run: HA Agent not enabled", + agentConfigs: map[string]interface{}{ + "hostname": testAgentHostname, + "ha_agent.enabled": false, + "ha_agent.group": testGroup, + }, + leader: testAgentHostname, + expectShouldRunIntegration: map[string]bool{ + "snmp": false, + "network_path": false, + "unknown_integration": false, + "cpu": false, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + haAgent := newTestHaAgentComponent(t, tt.agentConfigs) + haAgent.Comp.SetLeader(tt.leader) + + for integrationName, shouldRun := range tt.expectShouldRunIntegration { + assert.Equal(t, shouldRun, haAgent.Comp.ShouldRunIntegration(integrationName)) + } + }) + } +} diff --git a/comp/haagent/mock/mock.go b/comp/haagent/mock/mock.go index 28b3b95f68496..3e3ecea569456 100644 --- a/comp/haagent/mock/mock.go +++ b/comp/haagent/mock/mock.go @@ -44,7 +44,7 @@ func (m *mockHaAgent) SetEnabled(enabled bool) { m.enabled = enabled } -func (m *mockHaAgent) IsHaIntegration(_ string) bool { +func (m *mockHaAgent) ShouldRunIntegration(_ string) bool { return false } diff --git a/pkg/collector/worker/worker.go b/pkg/collector/worker/worker.go index ce2790f3b36ed..144ecf58240f2 100644 --- a/pkg/collector/worker/worker.go +++ b/pkg/collector/worker/worker.go @@ -141,16 +141,14 @@ func (w *Worker) Run() { checkLogger := CheckLogger{Check: check} longRunning := check.Interval() == 0 - // Add check to tracker if it's not already running - if !w.checksTracker.AddCheck(check) { - checkLogger.Debug("Check is already running, skipping execution...") + if !w.haAgent.ShouldRunIntegration(check.String()) { + checkLogger.Debug("Check is an HA integration and current agent is not leader, skipping execution...") continue } - if !w.shouldRunIntegrationInstance(check) { - checkLogger.Debug("HA Integration skipped") - // Remove the check from the running list - w.checksTracker.DeleteCheck(check.ID()) + // Add check to tracker if it's not already running + if !w.checksTracker.AddCheck(check) { + checkLogger.Debug("Check is already running, skipping execution...") continue } @@ -225,13 +223,6 @@ func (w *Worker) Run() { log.Debugf("Runner %d, worker %d: Finished processing checks.", w.runnerID, w.ID) } -func (w *Worker) shouldRunIntegrationInstance(check check.Check) bool { - if w.haAgent.Enabled() && w.haAgent.IsHaIntegration(check.String()) { - return w.haAgent.IsLeader() - } - return true -} - func startUtilizationUpdater(name string, ut *utilizationtracker.UtilizationTracker) { expvars.SetWorkerStats(name, &expvars.WorkerStats{ Utilization: 0.0,