Skip to content

Commit

Permalink
Merge pull request #12 from akartsky/namespace_scope
Browse files Browse the repository at this point in the history
Namespace scope service controllers
  • Loading branch information
jaypipes authored May 14, 2021
2 parents ef007e3 + 13280e0 commit 0ff9ffa
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 16 deletions.
8 changes: 8 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
flagAWSEndpointURL = "aws-endpoint-url"
flagLogLevel = "log-level"
flagResourceTags = "resource-tags"
flagWatchNamespace = "watch-namespace"
)

// Config contains configuration otpions for ACK service controllers
Expand All @@ -46,6 +47,7 @@ type Config struct {
EndpointURL string
LogLevel string
ResourceTags []string
WatchNamespace string
}

// BindFlags defines CLI/runtime configuration options
Expand Down Expand Up @@ -99,6 +101,12 @@ func (cfg *Config) BindFlags() {
[]string{},
"Configures the ACK service controller to always set key/value pairs tags on resources that it manages.",
)
flag.StringVar(
&cfg.WatchNamespace , flagWatchNamespace,
"",
"Specific namespace the service controller will watch for object creation from CRD. "+
" By default it will listen to all namespaces",
)
}

// SetupLogger initializes the logger used in the service controller
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/adoption_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (r *adoptionReconciler) BindControllerManager(mgr ctrlrt.Manager) error {
return err
}
r.kc = mgr.GetClient()
r.cache = ackrtcache.New(clientset, r.log)
r.cache = ackrtcache.New(clientset, r.log, r.cfg.WatchNamespace)
r.cache.Run()
return ctrlrt.NewControllerManagedBy(
mgr,
Expand Down
4 changes: 2 additions & 2 deletions pkg/runtime/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ type Caches struct {

// New creates a new Caches object from a kubernetes.Interface and
// a logr.Logger
func New(clientset kubernetes.Interface, log logr.Logger) Caches {
func New(clientset kubernetes.Interface, log logr.Logger, watchNamespace string) Caches {
return Caches{
Accounts: NewAccountCache(clientset, log),
Namespaces: NewNamespaceCache(clientset, log),
Namespaces: NewNamespaceCache(clientset, log, watchNamespace),
}
}

Expand Down
30 changes: 19 additions & 11 deletions pkg/runtime/cache/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type NamespaceCache struct {
sync.RWMutex

log logr.Logger
// Provide a namespace specifically to listen to.
// Provide empty string to listen to all namespaces except kube-system and kube-public.
watchNamespace string

// Namespace informer
informer k8scache.SharedInformer
// namespaceInfos maps namespaces names to their known namespaceInfo
Expand All @@ -63,7 +67,7 @@ type NamespaceCache struct {

// NewNamespaceCache makes a new NamespaceCache from a
// kubernetes.Interface and a logr.Logger
func NewNamespaceCache(clientset kubernetes.Interface, log logr.Logger) *NamespaceCache {
func NewNamespaceCache(clientset kubernetes.Interface, log logr.Logger, watchNamespace string) *NamespaceCache {
sharedInformer := informersv1.NewNamespaceInformer(
clientset,
informerResyncPeriod,
Expand All @@ -72,40 +76,44 @@ func NewNamespaceCache(clientset kubernetes.Interface, log logr.Logger) *Namespa
return &NamespaceCache{
informer: sharedInformer,
log: log.WithName("cache.namespace"),
watchNamespace: watchNamespace,
namespaceInfos: make(map[string]*namespaceInfo),
}
}

// isIgnoredNamespace returns true if an object is of type corev1.Namespace
// and it metadata name is one of 'ack-system', 'kube-system' or 'kube-public'
func isIgnoredNamespace(raw interface{}) bool {
// Check if the provided namespace should be listened to or not
func isWatchNamespace(raw interface{}, watchNamespace string) bool {
object, ok := raw.(*corev1.Namespace)
return ok &&
(object.ObjectMeta.Name == "ack-system" ||
object.ObjectMeta.Name == "kube-system" ||
object.ObjectMeta.Name == "kube-public")
if !ok {
return false
}

if watchNamespace != "" {
return watchNamespace == object.ObjectMeta.Name
}
return object.ObjectMeta.Name != "kube-system" && object.ObjectMeta.Name != "kube-public"
}

// Run adds event handler functions to the SharedInformer and
// runs the informer to begin processing items.
func (c *NamespaceCache) Run(stopCh <-chan struct{}) {
c.informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if !isIgnoredNamespace(obj) {
if isWatchNamespace(obj, c.watchNamespace) {
ns := obj.(*corev1.Namespace)
c.setNamespaceInfoFromK8sObject(ns)
c.log.V(1).Info("created namespace", "name", ns.ObjectMeta.Name)
}
},
UpdateFunc: func(orig, desired interface{}) {
if !isIgnoredNamespace(desired) {
if isWatchNamespace(desired, c.watchNamespace) {
ns := desired.(*corev1.Namespace)
c.setNamespaceInfoFromK8sObject(ns)
c.log.V(1).Info("updated namespace", "name", ns.ObjectMeta.Name)
}
},
DeleteFunc: func(obj interface{}) {
if !isIgnoredNamespace(obj) {
if isWatchNamespace(obj, c.watchNamespace) {
ns := obj.(*corev1.Namespace)
c.deleteNamespaceInfo(ns.ObjectMeta.Name)
c.log.V(1).Info("deleted namespace", "name", ns.ObjectMeta.Name)
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/cache/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestNamespaceCache(t *testing.T) {
fakeLogger := ctrlrtzap.New(ctrlrtzap.UseFlagOptions(&zapOptions))

// initlizing account cache
namespaceCache := ackrtcache.NewNamespaceCache(k8sClient, fakeLogger)
namespaceCache := ackrtcache.NewNamespaceCache(k8sClient, fakeLogger, "")
stopCh := make(chan struct{})

namespaceCache.Run(stopCh)
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (r *resourceReconciler) BindControllerManager(mgr ctrlrt.Manager) error {
return err
}
r.kc = mgr.GetClient()
r.cache = ackrtcache.New(clientset, r.log)
r.cache = ackrtcache.New(clientset, r.log, r.cfg.WatchNamespace)
r.cache.Run()
rd := r.rmf.ResourceDescriptor()
return ctrlrt.NewControllerManagedBy(
Expand Down

0 comments on commit 0ff9ffa

Please sign in to comment.