Skip to content

Commit

Permalink
Support Restrict Secret Access
Browse files Browse the repository at this point in the history
Support Restrict Secret Access, refer to #3668

Signed-off-by: kevin <tengkang@msn.com>
  • Loading branch information
kevinteng525 committed Sep 20, 2022
1 parent 77f31b7 commit 1f5a85d
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 51 deletions.
24 changes: 21 additions & 3 deletions adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (
corev1 "k8s.io/api/core/v1"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericapiserver "k8s.io/apiserver/pkg/server"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
Expand Down Expand Up @@ -132,22 +135,34 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat

broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "keda-metrics-adapter"})
handler := scaling.NewScaleHandler(mgr.GetClient(), nil, scheme, globalHTTPTimeout, recorder)

kubeClientset, _ := kubernetes.NewForConfig(ctrl.GetConfigOrDie())
objectNamespace, err := kedautil.GetClusterObjectNamespace()
if err != nil {
logger.Error(err, "Unable to get cluster object namespace")
return nil, nil, err
}
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClientset, 1*time.Second, kubeinformers.WithNamespace(objectNamespace))
secretInformer := kubeInformerFactory.Core().V1().Secrets()

handler := scaling.NewScaleHandler(mgr.GetClient(), nil, scheme, globalHTTPTimeout, recorder, secretInformer.Lister())
kubeInformerFactory.Start(ctx.Done())

externalMetricsInfo := &[]provider.ExternalMetricInfo{}
externalMetricsInfoLock := &sync.RWMutex{}

prometheusServer := &prommetrics.PrometheusMetricServer{}
go func() { prometheusServer.NewServer(fmt.Sprintf(":%v", prometheusMetricsPort), prometheusMetricsPath) }()
stopCh := make(chan struct{})

if err := runScaledObjectController(ctx, mgr, handler, logger, externalMetricsInfo, externalMetricsInfoLock, maxConcurrentReconciles, stopCh); err != nil {
if err := runScaledObjectController(ctx, mgr, handler, logger, externalMetricsInfo, externalMetricsInfoLock, maxConcurrentReconciles, stopCh, secretInformer.Informer().HasSynced); err != nil {
return nil, nil, err
}

return kedaprovider.NewProvider(ctx, logger, handler, mgr.GetClient(), namespace, externalMetricsInfo, externalMetricsInfoLock), stopCh, nil
}

func runScaledObjectController(ctx context.Context, mgr manager.Manager, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, maxConcurrentReconciles int, stopCh chan<- struct{}) error {
func runScaledObjectController(ctx context.Context, mgr manager.Manager, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, maxConcurrentReconciles int, stopCh chan<- struct{}, secretSynced cache.InformerSynced) error {
if err := (&kedacontrollers.MetricsScaledObjectReconciler{
Client: mgr.GetClient(),
ScaleHandler: scaleHandler,
Expand All @@ -165,6 +180,9 @@ func runScaledObjectController(ctx context.Context, mgr manager.Manager, scaleHa
}
}()

if ok := cache.WaitForCacheSync(ctx.Done(), secretSynced); !ok {
return fmt.Errorf("failed to wait Secrets cache synced")
}
return nil
}

Expand Down
7 changes: 6 additions & 1 deletion controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand All @@ -52,11 +54,14 @@ type ScaledJobReconciler struct {
Recorder record.EventRecorder

scaleHandler scaling.ScaleHandler
SecretsLister corev1listers.SecretLister
SecretsSynced cache.InformerSynced

}

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"))
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"), r.SecretsLister)

return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
Expand Down
8 changes: 7 additions & 1 deletion controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
corev1listers "k8s.io/client-go/listers/core/v1"
)

// +kubebuilder:rbac:groups=keda.sh,resources=scaledobjects;scaledobjects/finalizers;scaledobjects/status,verbs="*"
Expand All @@ -74,6 +75,8 @@ type ScaledObjectReconciler struct {
scaledObjectsGenerations *sync.Map
scaleHandler scaling.ScaleHandler
kubeVersion kedautil.K8sVersion
SecretsLister corev1listers.SecretLister
SecretsSynced cache.InformerSynced
}

// A cache mapping "resource.group" to true or false if we know if this resource is scalable.
Expand Down Expand Up @@ -114,7 +117,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont
// Init the rest of ScaledObjectReconciler
r.restMapper = mgr.GetRESTMapper()
r.scaledObjectsGenerations = &sync.Map{}
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.GlobalHTTPTimeout, r.Recorder)
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.GlobalHTTPTimeout, r.Recorder, r.SecretsLister)

// Start controller
return ctrl.NewControllerManagedBy(mgr).
Expand Down Expand Up @@ -144,6 +147,9 @@ func initScaleClient(mgr manager.Manager, clientset *discovery.DiscoveryClient)

// Reconcile performs reconciliation on the identified ScaledObject resource based on the request information passed, returns the result and an error (if any).
func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
if ok := cache.WaitForCacheSync(ctx.Done(), r.SecretsSynced); !ok {
return ctrl.Result{}, fmt.Errorf("failed to wait Secrets cache synced")
}
reqLogger := log.FromContext(ctx)

// Fetch the ScaledObject instance
Expand Down
19 changes: 19 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"flag"
"fmt"
"os"
Expand All @@ -25,6 +26,7 @@ import (

apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -36,6 +38,7 @@ import (
kedacontrollers "github.com/kedacore/keda/v2/controllers/keda"
kedautil "github.com/kedacore/keda/v2/pkg/util"
"github.com/kedacore/keda/v2/version"
kubeinformers "k8s.io/client-go/informers"
//+kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -140,11 +143,22 @@ func main() {
globalHTTPTimeout := time.Duration(globalHTTPTimeoutMS) * time.Millisecond
eventRecorder := mgr.GetEventRecorderFor("keda-operator")

kubeClientset, _ := kubernetes.NewForConfig(ctrl.GetConfigOrDie())
objectNamespace, err := kedautil.GetClusterObjectNamespace()
if err != nil {
setupLog.Error(err, "Unable to get cluster object namespace")
os.Exit(1)
}
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClientset, 1*time.Second, kubeinformers.WithNamespace(objectNamespace))
secretInformer := kubeInformerFactory.Core().V1().Secrets()

if err = (&kedacontrollers.ScaledObjectReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
GlobalHTTPTimeout: globalHTTPTimeout,
Recorder: eventRecorder,
SecretsLister: secretInformer.Lister(),
SecretsSynced: secretInformer.Informer().HasSynced,
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: scaledObjectMaxReconciles}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledObject")
os.Exit(1)
Expand All @@ -154,6 +168,8 @@ func main() {
Scheme: mgr.GetScheme(),
GlobalHTTPTimeout: globalHTTPTimeout,
Recorder: eventRecorder,
SecretsLister: secretInformer.Lister(),
SecretsSynced: secretInformer.Informer().HasSynced,
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: scaledJobMaxReconciles}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledJob")
os.Exit(1)
Expand Down Expand Up @@ -191,6 +207,9 @@ func main() {
setupLog.Info(fmt.Sprintf("Go Version: %s", runtime.Version()))
setupLog.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH))

ctx := context.Background()
kubeInformerFactory.Start(ctx.Done())

if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
Expand Down
9 changes: 5 additions & 4 deletions pkg/scaling/resolver/azure_keyvault_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
az "github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/go-logr/logr"
corev1listers "k8s.io/client-go/listers/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
Expand All @@ -44,13 +45,13 @@ func NewAzureKeyVaultHandler(v *kedav1alpha1.AzureKeyVault, podIdentity kedav1al
}
}

func (vh *AzureKeyVaultHandler) Initialize(ctx context.Context, client client.Client, logger logr.Logger, triggerNamespace string) error {
func (vh *AzureKeyVaultHandler) Initialize(ctx context.Context, client client.Client, logger logr.Logger, triggerNamespace string, secretsLister corev1listers.SecretLister) error {
keyvaultResourceURL, activeDirectoryEndpoint, err := vh.getPropertiesForCloud()
if err != nil {
return err
}

authConfig, err := vh.getAuthConfig(ctx, client, logger, triggerNamespace, keyvaultResourceURL, activeDirectoryEndpoint)
authConfig, err := vh.getAuthConfig(ctx, client, logger, triggerNamespace, keyvaultResourceURL, activeDirectoryEndpoint, secretsLister)
if err != nil {
return err
}
Expand Down Expand Up @@ -103,15 +104,15 @@ func (vh *AzureKeyVaultHandler) getPropertiesForCloud() (string, string, error)
}

func (vh *AzureKeyVaultHandler) getAuthConfig(ctx context.Context, client client.Client, logger logr.Logger,
triggerNamespace, keyVaultResourceURL, activeDirectoryEndpoint string) (auth.AuthorizerConfig, error) {
triggerNamespace, keyVaultResourceURL, activeDirectoryEndpoint string, secretsLister corev1listers.SecretLister) (auth.AuthorizerConfig, error) {
switch vh.podIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
clientID := vh.vault.Credentials.ClientID
tenantID := vh.vault.Credentials.TenantID

clientSecretName := vh.vault.Credentials.ClientSecret.ValueFrom.SecretKeyRef.Name
clientSecretKey := vh.vault.Credentials.ClientSecret.ValueFrom.SecretKeyRef.Key
clientSecret := resolveAuthSecret(ctx, client, logger, clientSecretName, triggerNamespace, clientSecretKey)
clientSecret := resolveAuthSecret(ctx, client, logger, clientSecretName, triggerNamespace, clientSecretKey, secretsLister)

if clientID == "" || tenantID == "" || clientSecret == "" {
return nil, fmt.Errorf("clientID, tenantID and clientSecret are expected when not using a pod identity provider")
Expand Down
Loading

0 comments on commit 1f5a85d

Please sign in to comment.