Skip to content

Commit

Permalink
notify leader election subscribers on leadership state change
Browse files Browse the repository at this point in the history
  • Loading branch information
adel121 committed Dec 18, 2024
1 parent 188825c commit 3270cee
Show file tree
Hide file tree
Showing 19 changed files with 183 additions and 111 deletions.
32 changes: 16 additions & 16 deletions cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,13 +455,13 @@ func start(log log.Component,
if config.GetBool("admission_controller.enabled") {
if config.GetBool("admission_controller.auto_instrumentation.patcher.enabled") {
patchCtx := admissionpatch.ControllerContext{
IsLeaderFunc: le.IsLeader,
LeaderSubscribeFunc: le.Subscribe,
K8sClient: apiCl.Cl,
RcClient: rcClient,
ClusterName: clusterName,
ClusterID: clusterID,
StopCh: stopCh,
IsLeaderFunc: le.IsLeader,
LeadershipStateSubscribeFunc: le.Subscribe,
K8sClient: apiCl.Cl,
RcClient: rcClient,
ClusterName: clusterName,
ClusterID: clusterID,
StopCh: stopCh,
}
if err := admissionpatch.StartControllers(patchCtx); err != nil {
log.Errorf("Cannot start auto instrumentation patcher: %v", err)
Expand All @@ -471,15 +471,15 @@ func start(log log.Component,
}

admissionCtx := admissionpkg.ControllerContext{
IsLeaderFunc: le.IsLeader,
LeaderSubscribeFunc: le.Subscribe,
SecretInformers: apiCl.CertificateSecretInformerFactory,
ValidatingInformers: apiCl.WebhookConfigInformerFactory,
MutatingInformers: apiCl.WebhookConfigInformerFactory,
Client: apiCl.Cl,
StopCh: stopCh,
ValidatingStopCh: validatingStopCh,
Demultiplexer: demultiplexer,
IsLeaderFunc: le.IsLeader,
LeadershipStateSubscribeFunc: le.Subscribe,
SecretInformers: apiCl.CertificateSecretInformerFactory,
ValidatingInformers: apiCl.WebhookConfigInformerFactory,
MutatingInformers: apiCl.WebhookConfigInformerFactory,
Client: apiCl.Cl,
StopCh: stopCh,
ValidatingStopCh: validatingStopCh,
Demultiplexer: demultiplexer,
}

webhooks, err := admissionpkg.StartControllers(admissionCtx, wmeta, pa, datadogConfig)
Expand Down
33 changes: 18 additions & 15 deletions pkg/clusteragent/admission/controllers/secret/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/DataDog/datadog-agent/pkg/clusteragent/admission/metrics"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/certificate"
"github.com/DataDog/datadog-agent/pkg/util/log"

Expand All @@ -34,19 +35,19 @@ import (
// Controller is responsible for creating and refreshing the Secret object
// that contains the certificate of the Admission Webhook.
type Controller struct {
clientSet kubernetes.Interface
secretsLister corelisters.SecretLister
secretsSynced cache.InformerSynced
config Config
dnsNames []string
dnsNamesDigest uint64
queue workqueue.TypedRateLimitingInterface[string]
isLeaderFunc func() bool
isLeaderNotif <-chan struct{}
clientSet kubernetes.Interface
secretsLister corelisters.SecretLister
secretsSynced cache.InformerSynced
config Config
dnsNames []string
dnsNamesDigest uint64
queue workqueue.TypedRateLimitingInterface[string]
isLeaderFunc func() bool
leadershipStateNotif <-chan leaderelection.LeadershipState
}

// NewController returns a new Secret Controller.
func NewController(client kubernetes.Interface, secretInformer coreinformers.SecretInformer, isLeaderFunc func() bool, isLeaderNotif <-chan struct{}, config Config) *Controller {
func NewController(client kubernetes.Interface, secretInformer coreinformers.SecretInformer, isLeaderFunc func() bool, leadershipStateNotif <-chan leaderelection.LeadershipState, config Config) *Controller {
dnsNames := generateDNSNames(config.GetNs(), config.GetSvc())
controller := &Controller{
clientSet: client,
Expand All @@ -59,8 +60,8 @@ func NewController(client kubernetes.Interface, secretInformer coreinformers.Sec
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{Name: "secrets"},
),
isLeaderFunc: isLeaderFunc,
isLeaderNotif: isLeaderNotif,
isLeaderFunc: isLeaderFunc,
leadershipStateNotif: leadershipStateNotif,
}
if _, err := secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
Expand Down Expand Up @@ -101,9 +102,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
func (c *Controller) enqueueOnLeaderNotif(stop <-chan struct{}) {
for {
select {
case <-c.isLeaderNotif:
log.Infof("Got a leader notification, enqueuing a reconciliation for %s/%s", c.config.GetNs(), c.config.GetName())
c.triggerReconciliation()
case leadership := <-c.leadershipStateNotif:
if leadership == leaderelection.LEADER {
log.Infof("Got a leader notification, enqueuing a reconciliation for %s/%s", c.config.GetNs(), c.config.GetName())
c.triggerReconciliation()
}
case <-stop:
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"testing"
"time"

"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/certificate"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -175,7 +176,7 @@ func (f *fixture) run(stopCh <-chan struct{}) *Controller {
f.client,
factory.Core().V1().Secrets(),
func() bool { return true },
make(chan struct{}),
make(<-chan leaderelection.LeadershipState),
cfg,
)

Expand Down
17 changes: 10 additions & 7 deletions pkg/clusteragent/admission/controllers/webhook/controller_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/clusteragent/admission/mutate/tagsfromlabels"
"github.com/DataDog/datadog-agent/pkg/clusteragent/admission/validate/kubernetesadmissionevents"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

Expand All @@ -51,17 +52,17 @@ func NewController(
validatingInformers admissionregistration.Interface,
mutatingInformers admissionregistration.Interface,
isLeaderFunc func() bool,
isLeaderNotif <-chan struct{},
leadershipStateNotif <-chan leaderelection.LeadershipState,
config Config,
wmeta workloadmeta.Component,
pa workload.PodPatcher,
datadogConfig config.Component,
demultiplexer demultiplexer.Component,
) Controller {
if config.useAdmissionV1() {
return NewControllerV1(client, secretInformer, validatingInformers.V1().ValidatingWebhookConfigurations(), mutatingInformers.V1().MutatingWebhookConfigurations(), isLeaderFunc, isLeaderNotif, config, wmeta, pa, datadogConfig, demultiplexer)
return NewControllerV1(client, secretInformer, validatingInformers.V1().ValidatingWebhookConfigurations(), mutatingInformers.V1().MutatingWebhookConfigurations(), isLeaderFunc, leadershipStateNotif, config, wmeta, pa, datadogConfig, demultiplexer)
}
return NewControllerV1beta1(client, secretInformer, validatingInformers.V1beta1().ValidatingWebhookConfigurations(), mutatingInformers.V1beta1().MutatingWebhookConfigurations(), isLeaderFunc, isLeaderNotif, config, wmeta, pa, datadogConfig, demultiplexer)
return NewControllerV1beta1(client, secretInformer, validatingInformers.V1beta1().ValidatingWebhookConfigurations(), mutatingInformers.V1beta1().MutatingWebhookConfigurations(), isLeaderFunc, leadershipStateNotif, config, wmeta, pa, datadogConfig, demultiplexer)
}

// Webhook represents an admission webhook
Expand Down Expand Up @@ -162,7 +163,7 @@ type controllerBase struct {
mutatingWebhooksSynced cache.InformerSynced //nolint:structcheck
queue workqueue.TypedRateLimitingInterface[string]
isLeaderFunc func() bool
isLeaderNotif <-chan struct{}
leadershipStateNotif <-chan leaderelection.LeadershipState
webhooks []Webhook
}

Expand All @@ -186,9 +187,11 @@ func (c *controllerBase) EnabledWebhooks() []Webhook {
func (c *controllerBase) enqueueOnLeaderNotif(stop <-chan struct{}) {
for {
select {
case <-c.isLeaderNotif:
log.Infof("Got a leader notification, enqueuing a reconciliation for %q", c.config.getWebhookName())
c.triggerReconciliation()
case leadership := <-c.leadershipStateNotif:
if leadership == leaderelection.LEADER {
log.Infof("Got a leader notification, enqueuing a reconciliation for %q", c.config.getWebhookName())
c.triggerReconciliation()
}
case <-stop:
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
workloadmetafxmock "github.com/DataDog/datadog-agent/comp/core/workloadmeta/fx-mock"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection"
)

func TestNewController(t *testing.T) {
Expand All @@ -35,7 +36,7 @@ func TestNewController(t *testing.T) {
factory.Admissionregistration(),
factory.Admissionregistration(),
func() bool { return true },
make(chan struct{}),
make(<-chan leaderelection.LeadershipState),
getV1Cfg(t),
wmeta,
nil,
Expand All @@ -52,7 +53,7 @@ func TestNewController(t *testing.T) {
factory.Admissionregistration(),
factory.Admissionregistration(),
func() bool { return true },
make(chan struct{}),
make(<-chan leaderelection.LeadershipState),
getV1beta1Cfg(t),
wmeta,
nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/clusteragent/admission/common"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/certificate"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
Expand All @@ -53,7 +54,7 @@ func NewControllerV1(
validatingWebhookInformer admissioninformers.ValidatingWebhookConfigurationInformer,
mutatingWebhookInformer admissioninformers.MutatingWebhookConfigurationInformer,
isLeaderFunc func() bool,
isLeaderNotif <-chan struct{},
leadershipStateNotif <-chan leaderelection.LeadershipState,
config Config,
wmeta workloadmeta.Component,
pa workload.PodPatcher,
Expand All @@ -75,7 +76,7 @@ func NewControllerV1(
workqueue.TypedRateLimitingQueueConfig[string]{Name: "webhooks"},
)
controller.isLeaderFunc = isLeaderFunc
controller.isLeaderNotif = isLeaderNotif
controller.leadershipStateNotif = leadershipStateNotif
controller.webhooks = controller.generateWebhooks(wmeta, pa, datadogConfig, demultiplexer)
controller.generateTemplates()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/config/model"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/certificate"
)

Expand Down Expand Up @@ -1190,7 +1191,7 @@ func (f *fixtureV1) createController() (*ControllerV1, informers.SharedInformerF
factory.Admissionregistration().V1().ValidatingWebhookConfigurations(),
factory.Admissionregistration().V1().MutatingWebhookConfigurations(),
func() bool { return true },
make(chan struct{}),
make(<-chan leaderelection.LeadershipState),
getV1Cfg(f.t),
wmeta,
nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/clusteragent/admission/common"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/certificate"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
Expand All @@ -54,7 +55,7 @@ func NewControllerV1beta1(
validatingWebhookInformer admissioninformers.ValidatingWebhookConfigurationInformer,
mutatingWebhookInformer admissioninformers.MutatingWebhookConfigurationInformer,
isLeaderFunc func() bool,
isLeaderNotif <-chan struct{},
leadershipStateNotif <-chan leaderelection.LeadershipState,
config Config,
wmeta workloadmeta.Component,
pa workload.PodPatcher,
Expand All @@ -76,7 +77,7 @@ func NewControllerV1beta1(
workqueue.TypedRateLimitingQueueConfig[string]{Name: "webhooks"},
)
controller.isLeaderFunc = isLeaderFunc
controller.isLeaderNotif = isLeaderNotif
controller.leadershipStateNotif = leadershipStateNotif
controller.webhooks = controller.generateWebhooks(wmeta, pa, datadogConfig, demultiplexer)
controller.generateTemplates()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/config/model"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/certificate"
)

Expand Down Expand Up @@ -1184,7 +1185,7 @@ func (f *fixtureV1beta1) createController() (*ControllerV1beta1, informers.Share
factory.Admissionregistration().V1beta1().ValidatingWebhookConfigurations(),
factory.Admissionregistration().V1beta1().MutatingWebhookConfigurations(),
func() bool { return true },
make(chan struct{}),
make(<-chan leaderelection.LeadershipState),
getV1beta1Cfg(f.t),
wmeta,
nil,
Expand Down
23 changes: 13 additions & 10 deletions pkg/clusteragent/admission/patch/file_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (
"os"
"time"

"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// filePatchProvider this is a stub and will be used for e2e testing only
type filePatchProvider struct {
file string
isLeaderNotif <-chan struct{}
leadershipStateNotif <-chan leaderelection.LeadershipState
pollInterval time.Duration
subscribers map[TargetObjKind]chan Request
lastSuccessfulRefresh time.Time
Expand All @@ -27,13 +28,13 @@ type filePatchProvider struct {

var _ patchProvider = &filePatchProvider{}

func newfileProvider(file string, isLeaderNotif <-chan struct{}, clusterName string) *filePatchProvider {
func newfileProvider(file string, leadershipStateNotif <-chan leaderelection.LeadershipState, clusterName string) *filePatchProvider {
return &filePatchProvider{
file: file,
isLeaderNotif: isLeaderNotif,
pollInterval: 15 * time.Second,
subscribers: make(map[TargetObjKind]chan Request),
clusterName: clusterName,
file: file,
leadershipStateNotif: leadershipStateNotif,
pollInterval: 15 * time.Second,
subscribers: make(map[TargetObjKind]chan Request),
clusterName: clusterName,
}
}

Expand All @@ -49,9 +50,11 @@ func (fpp *filePatchProvider) start(stopCh <-chan struct{}) {
defer ticker.Stop()
for {
select {
case <-fpp.isLeaderNotif:
log.Info("Got a leader notification, polling from file")
fpp.process(true)
case leadership := <-fpp.leadershipStateNotif:
if leadership == leaderelection.LEADER {
log.Info("Got a leader notification, polling from file")
fpp.process(true)
}
case <-ticker.C:
fpp.process(false)
case <-stopCh:
Expand Down
4 changes: 3 additions & 1 deletion pkg/clusteragent/admission/patch/file_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ package patch
import (
"testing"

"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection"

"github.com/stretchr/testify/require"
)

func TestFileProviderProcess(t *testing.T) {
fpp := newfileProvider("testdata/auto-instru.json", make(chan struct{}), "dev")
fpp := newfileProvider("testdata/auto-instru.json", make(<-chan leaderelection.LeadershipState), "dev")
notifs := fpp.subscribe(KindDeployment)
fpp.process(false)
require.Len(t, notifs, 1)
Expand Down
7 changes: 4 additions & 3 deletions pkg/clusteragent/admission/patch/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,22 @@ import (
"github.com/DataDog/datadog-agent/pkg/clusteragent/telemetry"
rcclient "github.com/DataDog/datadog-agent/pkg/config/remote/client"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection"
)

type patchProvider interface {
start(stopCh <-chan struct{})
subscribe(kind TargetObjKind) chan Request
}

func newPatchProvider(rcClient *rcclient.Client, isLeaderNotif <-chan struct{}, telemetryCollector telemetry.TelemetryCollector, clusterName string) (patchProvider, error) {
func newPatchProvider(rcClient *rcclient.Client, leadershipStateNotif <-chan leaderelection.LeadershipState, telemetryCollector telemetry.TelemetryCollector, clusterName string) (patchProvider, error) {
if pkgconfigsetup.IsRemoteConfigEnabled(pkgconfigsetup.Datadog()) {
return newRemoteConfigProvider(rcClient, isLeaderNotif, telemetryCollector, clusterName)
return newRemoteConfigProvider(rcClient, leadershipStateNotif, telemetryCollector, clusterName)
}
if pkgconfigsetup.Datadog().GetBool("admission_controller.auto_instrumentation.patcher.fallback_to_file_provider") {
// Use the file config provider for e2e testing only (it replaces RC as a source of configs)
file := pkgconfigsetup.Datadog().GetString("admission_controller.auto_instrumentation.patcher.file_provider_path")
return newfileProvider(file, isLeaderNotif, clusterName), nil
return newfileProvider(file, leadershipStateNotif, clusterName), nil
}
return nil, errors.New("remote config is disabled")
}
Loading

0 comments on commit 3270cee

Please sign in to comment.