Skip to content

Commit

Permalink
Support custom watches on controller
Browse files Browse the repository at this point in the history
  • Loading branch information
thisisnotashwin committed Aug 15, 2023
1 parent 0e94f48 commit a4138fa
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 8 deletions.
3 changes: 3 additions & 0 deletions .changelog/18439.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
Support custom watches on the Consul Controller framework.
```
64 changes: 57 additions & 7 deletions internal/controller/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul/agent/consul/controller/queue"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/proto-public/pbresource"
)
Expand Down Expand Up @@ -46,6 +47,22 @@ func (c Controller) WithWatch(watchedType *pbresource.Type, mapper DependencyMap
return c
}

// WithCustomWatch adds a custom watch on the given type/dependency to the controller. custom mapper
// will be called to determine which resources must be reconciled as a result of
// an event.
func (c Controller) WithCustomWatch(source *Source, mapper CustomDependencyMapper) Controller {
if source == nil {
panic("source must not be nil")
}

if mapper == nil {
panic("mapper must not be nil")
}

c.customWatches = append(c.customWatches, customWatch{source, mapper})
return c
}

// WithLogger changes the controller's logger.
func (c Controller) WithLogger(logger hclog.Logger) Controller {
if logger == nil {
Expand Down Expand Up @@ -107,20 +124,53 @@ func (c Controller) backoff() (time.Duration, time.Duration) {
// Use the builder methods in this package (starting with ForType) to construct
// a controller, and then pass it to a Manager to be executed.
type Controller struct {
managedType *pbresource.Type
reconciler Reconciler
logger hclog.Logger
watches []watch
baseBackoff time.Duration
maxBackoff time.Duration
placement Placement
managedType *pbresource.Type
reconciler Reconciler
logger hclog.Logger
watches []watch
customWatches []customWatch
baseBackoff time.Duration
maxBackoff time.Duration
placement Placement
}

type watch struct {
watchedType *pbresource.Type
mapper DependencyMapper
}

// Watch is responsible for watching for custom events from source and adding them to
// the event queue.
func (s *Source) Watch(ctx context.Context, add func(e Event)) error {
for {
select {
case <-ctx.Done():
return nil
case evt := <-s.Source:
add(evt)
}
}
}

// Source is used as a generic source of events. This can be used when events aren't coming from resources
// stored by the resource API.
type Source struct {
Source <-chan Event
}

type Event struct {
Obj queue.ItemType
}

func (e Event) Key() string {
return e.Obj.Key()
}

type customWatch struct {
source *Source
mapper CustomDependencyMapper
}

// Request represents a request to reconcile the resource with the given ID.
type Request struct {
// ID of the resource that needs to be reconciled.
Expand Down
45 changes: 45 additions & 0 deletions internal/controller/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,19 @@ func TestController_API(t *testing.T) {
rec := newTestReconciler()
client := svctest.RunResourceService(t, demo.RegisterTypes)

concertsChan := make(chan controller.Event)
concertSource := &controller.Source{Source: concertsChan}
concertMapper := func(ctx context.Context, rt controller.Runtime, event controller.Event) ([]controller.Request, error) {
artistID := event.Obj.(*Concert).artistID
var requests []controller.Request
requests = append(requests, controller.Request{ID: artistID})
return requests, nil
}

ctrl := controller.
ForType(demo.TypeV2Artist).
WithWatch(demo.TypeV2Album, controller.MapOwner).
WithCustomWatch(concertSource, concertMapper).
WithBackoff(10*time.Millisecond, 100*time.Millisecond).
WithReconciler(rec)

Expand Down Expand Up @@ -69,6 +79,32 @@ func TestController_API(t *testing.T) {
prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID)
})

t.Run("custom watched resource type", func(t *testing.T) {
res, err := demo.GenerateV2Artist()
require.NoError(t, err)

rsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
require.NoError(t, err)

req := rec.wait(t)
prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID)

rec.expectNoRequest(t, 500*time.Millisecond)

concertsChan <- controller.Event{Obj: &Concert{name: "test-concert", artistID: rsp.Resource.Id}}

watchedReq := rec.wait(t)
prototest.AssertDeepEqual(t, req.ID, watchedReq.ID)

otherArtist, err := demo.GenerateV2Artist()
require.NoError(t, err)

concertsChan <- controller.Event{Obj: &Concert{name: "test-concert", artistID: otherArtist.Id}}

watchedReq = rec.wait(t)
prototest.AssertDeepEqual(t, otherArtist.Id, watchedReq.ID)
})

t.Run("error retries", func(t *testing.T) {
rec.failNext(errors.New("KABOOM"))

Expand Down Expand Up @@ -266,3 +302,12 @@ func testContext(t *testing.T) context.Context {

return ctx
}

type Concert struct {
name string
artistID *pbresource.ID
}

func (c Concert) Key() string {
return c.name
}
59 changes: 58 additions & 1 deletion internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func (c *controllerRunner) run(ctx context.Context) error {
})

for _, watch := range c.ctrl.watches {
watch := watch
mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl)

// Watched Type Events → Mapper Queue
Expand All @@ -57,6 +56,22 @@ func (c *controllerRunner) run(ctx context.Context) error {
})
}

for _, customWatch := range c.ctrl.customWatches {
customMapQueue := runQueue[Event](groupCtx, c.ctrl)

// Custom Events → Mapper Queue
group.Go(func() error {
return customWatch.source.Watch(groupCtx, func(e Event) {
customMapQueue.Add(e)
})
})

// Mapper Queue → Mapper → Reconciliation Queue
group.Go(func() error {
return c.runCustomMapper(groupCtx, customWatch, customMapQueue, recQueue)
})
}

// Reconciliation Queue → Reconciler
group.Go(func() error {
return c.runReconciler(groupCtx, recQueue)
Expand Down Expand Up @@ -136,6 +151,48 @@ func (c *controllerRunner) runMapper(
}
}

func (c *controllerRunner) runCustomMapper(
ctx context.Context,
w customWatch,
from queue.WorkQueue[Event],
to queue.WorkQueue[Request],
) error {
logger := c.logger.With("watched_event", w.source)

for {
item, shutdown := from.Get()
if shutdown {
return nil
}

var reqs []Request
err := c.handlePanic(func() error {
var err error
reqs, err = w.mapper(ctx, c.runtime(), item)
return err
})
if err != nil {
from.AddRateLimited(item)
from.Done(item)
continue
}

for _, r := range reqs {
if !resource.EqualType(r.ID.Type, c.ctrl.managedType) {
logger.Error("dependency mapper returned request for a resource of the wrong type",
"type_expected", resource.ToGVK(c.ctrl.managedType),
"type_got", resource.ToGVK(r.ID.Type),
)
continue
}
to.Add(r)
}

from.Forget(item)
from.Done(item)
}
}

func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error {
for {
req, shutdown := queue.Get()
Expand Down
6 changes: 6 additions & 0 deletions internal/controller/dependency_mappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ type DependencyMapper func(
res *pbresource.Resource,
) ([]Request, error)

type CustomDependencyMapper func(
ctx context.Context,
rt Runtime,
event Event,
) ([]Request, error)

// MapOwner implements a DependencyMapper that returns the updated resource's owner.
func MapOwner(_ context.Context, _ Runtime, res *pbresource.Resource) ([]Request, error) {
var reqs []Request
Expand Down

0 comments on commit a4138fa

Please sign in to comment.