Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Restrict Secret Access #3677

Merged
merged 14 commits into from
Dec 8, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General:** Add support to use pod identities for authentication in Azure Key Vault ([#3813](https://github.com/kedacore/keda/issues/3813)
- **General:** Support disable keep http connection alive([#3874](https://github.com/kedacore/keda/issues/3874)
- **General:** Improve the function used to normalize metric names ([#3789](https://github.com/kedacore/keda/issues/3789)
- **General:** Support Restrict Secret Access to mitigate the security risk ([#3668](https://github.com/kedacore/keda/issues/3668)
- **Apache Kafka Scaler:** SASL/OAuthbearer Implementation ([#3681](https://github.com/kedacore/keda/issues/3681))
- **Azure AD Pod Identity Authentication:** Improve error messages to emphasize problems around the integration with aad-pod-identity itself ([#3610](https://github.com/kedacore/keda/issues/3610))
- **Azure Event Hub Scaler:** Support Azure Active Direcotry Pod & Workload Identity for Storage Blobs ([#3569](https://github.com/kedacore/keda/issues/3569))
Expand Down
30 changes: 27 additions & 3 deletions adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import (
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
Expand Down Expand Up @@ -131,7 +134,25 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat

broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "keda-metrics-adapter"})
handler := scaling.NewScaleHandler(mgr.GetClient(), nil, scheme, globalHTTPTimeout, recorder)

kubeClientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
logger.Error(err, "Unable to create kube clientset")
return nil, nil, err
}
objectNamespace, err := kedautil.GetClusterObjectNamespace()
if err != nil {
logger.Error(err, "Unable to get cluster object namespace")
return nil, nil, err
}
// the namespaced kubeInformerFactory is used to restrict secret informer to only list/watch secrets in KEDA cluster object namespace,
// refer to https://github.com/kedacore/keda/issues/3668
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClientset, 1*time.Hour, kubeinformers.WithNamespace(objectNamespace))
JorTurFer marked this conversation as resolved.
Show resolved Hide resolved
JorTurFer marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

@zroubalik zroubalik Dec 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a short comment for these lines, why kubeInformerFactory and secret informer is needed? Ideally with a link to KEDA issue about this feature? for future reference? Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, added.

secretInformer := kubeInformerFactory.Core().V1().Secrets()

handler := scaling.NewScaleHandler(mgr.GetClient(), nil, scheme, globalHTTPTimeout, recorder, secretInformer.Lister())
kubeInformerFactory.Start(ctx.Done())

externalMetricsInfo := &[]provider.ExternalMetricInfo{}
externalMetricsInfoLock := &sync.RWMutex{}

Expand All @@ -146,13 +167,13 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat
}

stopCh := make(chan struct{})
if err := runScaledObjectController(ctx, mgr, handler, logger, externalMetricsInfo, externalMetricsInfoLock, maxConcurrentReconciles, stopCh); err != nil {
if err := runScaledObjectController(ctx, mgr, handler, logger, externalMetricsInfo, externalMetricsInfoLock, maxConcurrentReconciles, stopCh, secretInformer.Informer().HasSynced); err != nil {
return nil, nil, err
}
return kedaprovider.NewProvider(ctx, logger, handler, mgr.GetClient(), *grpcClient, useMetricsServiceGrpc, namespace, externalMetricsInfo, externalMetricsInfoLock), stopCh, nil
}

func runScaledObjectController(ctx context.Context, mgr manager.Manager, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, maxConcurrentReconciles int, stopCh chan<- struct{}) error {
func runScaledObjectController(ctx context.Context, mgr manager.Manager, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, maxConcurrentReconciles int, stopCh chan<- struct{}, secretSynced cache.InformerSynced) error {
if err := (&kedacontrollers.MetricsScaledObjectReconciler{
Client: mgr.GetClient(),
ScaleHandler: scaleHandler,
Expand All @@ -170,6 +191,9 @@ func runScaledObjectController(ctx context.Context, mgr manager.Manager, scaleHa
}
}()

if ok := cache.WaitForCacheSync(ctx.Done(), secretSynced); !ok {
return fmt.Errorf("failed to wait Secrets cache synced")
}
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -56,6 +57,8 @@ type ScaledJobReconciler struct {

scaledJobGenerations *sync.Map
scaleHandler scaling.ScaleHandler
SecretsLister corev1listers.SecretLister
SecretsSynced cache.InformerSynced
}

type scaledJobMetricsData struct {
Expand All @@ -75,9 +78,8 @@ func init() {

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"))
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"), r.SecretsLister)
r.scaledJobGenerations = &sync.Map{}

return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
// Ignore updates to ScaledJob Status (in this case metadata.Generation does not change)
Expand Down
1 change: 0 additions & 1 deletion controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont
if r.Recorder == nil {
return fmt.Errorf("ScaledObjectReconciler.Recorder is not initialized")
}

// Start controller
return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
Expand Down
2 changes: 1 addition & 1 deletion controllers/keda/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var _ = BeforeSuite(func(done Done) {
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Recorder: k8sManager.GetEventRecorderFor("keda-operator"),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator")),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil),
ScaleClient: scaleClient,
}).SetupWithManager(k8sManager, controller.Options{})
Expect(err).ToNot(HaveOccurred())
Expand Down
32 changes: 30 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ import (
"github.com/spf13/pflag"
apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/tools/cache"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand Down Expand Up @@ -158,13 +161,28 @@ func main() {
globalHTTPTimeout := time.Duration(globalHTTPTimeoutMS) * time.Millisecond
eventRecorder := mgr.GetEventRecorderFor("keda-operator")

kubeClientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
setupLog.Error(err, "Unable to create kube clientset")
os.Exit(1)
}
objectNamespace, err := kedautil.GetClusterObjectNamespace()
if err != nil {
setupLog.Error(err, "Unable to get cluster object namespace")
os.Exit(1)
}
// the namespaced kubeInformerFactory is used to restrict secret informer to only list/watch secrets in KEDA cluster object namespace,
// refer to https://github.com/kedacore/keda/issues/3668
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClientset, 1*time.Hour, kubeinformers.WithNamespace(objectNamespace))
JorTurFer marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

@zroubalik zroubalik Dec 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a short comment for these lines, why kubeInformerFactory and secret informer is needed? Ideally with a link to KEDA issue about this feature? for future reference? Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, added.

secretInformer := kubeInformerFactory.Core().V1().Secrets()

scaleClient, kubeVersion, err := k8s.InitScaleClient(mgr)
if err != nil {
setupLog.Error(err, "unable to init scale client")
os.Exit(1)
}

scaledHandler := scaling.NewScaleHandler(mgr.GetClient(), scaleClient, mgr.GetScheme(), globalHTTPTimeout, eventRecorder)
scaledHandler := scaling.NewScaleHandler(mgr.GetClient(), scaleClient, mgr.GetScheme(), globalHTTPTimeout, eventRecorder, secretInformer.Lister())

if err = (&kedacontrollers.ScaledObjectReconciler{
Client: mgr.GetClient(),
Expand All @@ -181,6 +199,8 @@ func main() {
Scheme: mgr.GetScheme(),
GlobalHTTPTimeout: globalHTTPTimeout,
Recorder: eventRecorder,
SecretsLister: secretInformer.Lister(),
SecretsSynced: secretInformer.Informer().HasSynced,
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: scaledJobMaxReconciles}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledJob")
os.Exit(1)
Expand Down Expand Up @@ -223,7 +243,15 @@ func main() {
setupLog.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH))
setupLog.Info(fmt.Sprintf("Running on Kubernetes %s", kubeVersion.PrettyVersion), "version", kubeVersion.Version)

if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
ctx := ctrl.SetupSignalHandler()
kubeInformerFactory.Start(ctx.Done())
JorTurFer marked this conversation as resolved.
Show resolved Hide resolved

if ok := cache.WaitForCacheSync(ctx.Done(), secretInformer.Informer().HasSynced); !ok {
setupLog.Error(nil, "failed to wait Secrets cache synced")
os.Exit(1)
}

if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/scaling/resolver/azure_keyvault_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
az "github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/go-logr/logr"
corev1listers "k8s.io/client-go/listers/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
Expand All @@ -42,13 +43,13 @@ func NewAzureKeyVaultHandler(v *kedav1alpha1.AzureKeyVault) *AzureKeyVaultHandle
}
}

func (vh *AzureKeyVaultHandler) Initialize(ctx context.Context, client client.Client, logger logr.Logger, triggerNamespace string) error {
func (vh *AzureKeyVaultHandler) Initialize(ctx context.Context, client client.Client, logger logr.Logger, triggerNamespace string, secretsLister corev1listers.SecretLister) error {
keyvaultResourceURL, activeDirectoryEndpoint, err := vh.getPropertiesForCloud()
if err != nil {
return err
}

authConfig, err := vh.getAuthConfig(ctx, client, logger, triggerNamespace, keyvaultResourceURL, activeDirectoryEndpoint)
authConfig, err := vh.getAuthConfig(ctx, client, logger, triggerNamespace, keyvaultResourceURL, activeDirectoryEndpoint, secretsLister)
if err != nil {
return err
}
Expand Down Expand Up @@ -101,20 +102,19 @@ func (vh *AzureKeyVaultHandler) getPropertiesForCloud() (string, string, error)
}

func (vh *AzureKeyVaultHandler) getAuthConfig(ctx context.Context, client client.Client, logger logr.Logger,
triggerNamespace, keyVaultResourceURL, activeDirectoryEndpoint string) (auth.AuthorizerConfig, error) {
triggerNamespace, keyVaultResourceURL, activeDirectoryEndpoint string, secretsLister corev1listers.SecretLister) (auth.AuthorizerConfig, error) {
podIdentity := vh.vault.PodIdentity
if podIdentity == nil {
podIdentity = &kedav1alpha1.AuthPodIdentity{}
}

switch podIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
clientID := vh.vault.Credentials.ClientID
tenantID := vh.vault.Credentials.TenantID

clientSecretName := vh.vault.Credentials.ClientSecret.ValueFrom.SecretKeyRef.Name
clientSecretKey := vh.vault.Credentials.ClientSecret.ValueFrom.SecretKeyRef.Key
clientSecret := resolveAuthSecret(ctx, client, logger, clientSecretName, triggerNamespace, clientSecretKey)
clientSecret := resolveAuthSecret(ctx, client, logger, clientSecretName, triggerNamespace, clientSecretKey, secretsLister)

if clientID == "" || tenantID == "" || clientSecret == "" {
return nil, fmt.Errorf("clientID, tenantID and clientSecret are expected when not using a pod identity provider")
Expand Down
Loading