Skip to content

Commit

Permalink
Merge pull request #14 from bpineau/controller_listwatch
Browse files Browse the repository at this point in the history
ListWatch is controllers responsibility
  • Loading branch information
bpineau authored Apr 8, 2018
2 parents 69009f5 + 77de993 commit 1899edf
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 37 deletions.
20 changes: 17 additions & 3 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ import (
"github.com/bpineau/katafygio/config"
"github.com/bpineau/katafygio/pkg/event"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

Expand All @@ -34,9 +37,18 @@ type Controller struct {
informer cache.SharedIndexInformer
}

// New return an untyped, generic Kubernetes controller
func New(lw cache.ListerWatcher, notifier *event.Notifier, name string, config *config.KfConfig) *Controller {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// New return a kubernetes controller using the provided client
func New(client cache.ListerWatcher, notifier *event.Notifier, name string, config *config.KfConfig) *Controller {

selector := metav1.ListOptions{LabelSelector: config.Filter}
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.List(selector)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.Watch(selector)
},
}

informer := cache.NewSharedIndexInformer(
lw,
Expand All @@ -45,6 +57,8 @@ func New(lw cache.ListerWatcher, notifier *event.Notifier, name string, config *
cache.Indexers{},
)

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
Expand Down
59 changes: 25 additions & 34 deletions pkg/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@ import (
"github.com/bpineau/katafygio/pkg/event"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
)

const discoveryInterval = 60 * time.Second
Expand All @@ -36,17 +33,13 @@ type Observer struct {
}

type gvk struct {
group string
version string
kind string
gv schema.GroupVersion
ar metav1.APIResource
groupVersion schema.GroupVersion
apiResource metav1.APIResource
}

type resources map[string]*gvk

// New returns a new observer, that will watch for api resource kinds
// and create new controllers for each one.
// New returns a new observer, that will watch API resources and create controllers
func New(config *config.KfConfig, notif *event.Notifier) *Observer {
return &Observer{
config: config,
Expand All @@ -72,7 +65,7 @@ func (c *Observer) Start() *Observer {
for {
err := c.refresh()
if err != nil {
c.config.Logger.Errorf("Failed to refresh: %v", err)
c.config.Logger.Errorf("Refresh failed: %v", err)
}

select {
Expand All @@ -92,8 +85,8 @@ func (c *Observer) Stop() {

close(c.stop)

for _, c := range c.ctrls {
c.Stop()
for _, ct := range c.ctrls {
ct.Stop()
}

<-c.done
Expand All @@ -110,30 +103,27 @@ func (c *Observer) refresh() error {
continue
}

cl, err := c.cpool.ClientForGroupVersionKind(res.gv.WithKind(res.kind))
kind := res.apiResource.Kind
gk := res.groupVersion.WithKind(kind)
cname := strings.ToLower(kind)

cl, err := c.cpool.ClientForGroupVersionKind(gk)
if err != nil {
return fmt.Errorf("failed to get a cpool for %s", name)
return fmt.Errorf("failed to get a client for %s", name)
}

client := cl.Resource(res.ar.DeepCopy(), metav1.NamespaceAll)

selector := metav1.ListOptions{LabelSelector: c.config.Filter}
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.List(selector)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.Watch(selector)
},
}
client := cl.Resource(res.apiResource.DeepCopy(), metav1.NamespaceAll)

c.ctrls[name] = controller.New(lw, c.notif, strings.ToLower(res.ar.Kind), c.config)
c.ctrls[name] = controller.New(client, c.notif, cname, c.config)
go c.ctrls[name].Start()
}

return nil
}

// The api-server may expose a resource under several API groups, for backward
// compatibility. We'll want to ignore lower priorities "cohabitations":
// cf. kubernetes/cmd/kube-apiserver/app/server.go
var preferredVersions = map[string]string{
"apps:deployment": "extensions:deployment",
"apps:daemonset": "extensions:daemonset",
Expand All @@ -149,12 +139,12 @@ func (c *Observer) expandAndFilterAPIResources(groups []*metav1.APIResourceList)
for _, group := range groups {
gv, err := schema.ParseGroupVersion(group.GroupVersion)
if err != nil {
c.config.Logger.Errorf("api-server sent an unparsable group version: %v", err)
c.config.Logger.Errorf("unparsable group version: %v", err)
continue
}

for _, ar := range group.APIResources {
// remove subresources (like job/status or deployments/scale)
// remove subresources (like job/status)
if strings.ContainsRune(ar.Name, '/') {
continue
}
Expand All @@ -169,13 +159,13 @@ func (c *Observer) expandAndFilterAPIResources(groups []*metav1.APIResourceList)
continue
}

resource := &gvk{group: gv.Group, version: gv.Version, kind: ar.Kind, gv: gv, ar: ar}
resources[strings.ToLower(gv.Group+":"+ar.Kind)] = resource
resources[strings.ToLower(gv.Group+":"+ar.Kind)] = &gvk{
groupVersion: gv,
apiResource: ar,
}
}
}

// remove lower priorities "cohabitations". cf. kubernetes/cmd/kube-apiserver/app/server.go
// (the api-server may expose a resource under several api groups, for backward compat)
for preferred, obsolete := range preferredVersions {
if _, ok := resources[preferred]; ok {
delete(resources, obsolete)
Expand All @@ -186,8 +176,9 @@ func (c *Observer) expandAndFilterAPIResources(groups []*metav1.APIResourceList)
}

func isExcluded(excluded []string, name string) bool {
lname := strings.ToLower(name)
for _, ctl := range excluded {
if strings.Compare(strings.ToLower(name), strings.ToLower(ctl)) == 0 {
if strings.Compare(lname, strings.ToLower(ctl)) == 0 {
return true
}
}
Expand Down

0 comments on commit 1899edf

Please sign in to comment.