Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Use resource.Resource throughout
Browse files Browse the repository at this point in the history
Before we had an interface representing resources, they were generally
passed around as a pair of (id string, definition []byte). We can cut
down on the number of representations of resources by just using
`resource.Resource`.

This includes returning synchronisation errors with the whole
resource, rather than simply by name. Doing so means we will be able
to better surface sync problems with individual resources.
  • Loading branch information
squaremo committed Mar 6, 2018
1 parent 26b23cd commit 920d2da
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 85 deletions.
62 changes: 31 additions & 31 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/image"
"github.com/weaveworks/flux/registry"
"github.com/weaveworks/flux/resource"
"github.com/weaveworks/flux/ssh"
)

Expand All @@ -40,15 +41,23 @@ type extendedClient struct {
v1beta1batch.CronJobsGetter
}

// --- internal types for keeping track of syncing

type apiObject struct {
bytes []byte
resource.Resource
Kind string `yaml:"kind"`
Metadata struct {
Name string `yaml:"name"`
Namespace string `yaml:"namespace"`
} `yaml:"metadata"`
}

// A convenience for getting an minimal object from some bytes.
func parseObj(def []byte) (*apiObject, error) {
obj := apiObject{}
return &obj, yaml.Unmarshal(def, &obj)
}

func (o *apiObject) hasNamespace() bool {
return o.Metadata.Namespace != ""
}
Expand Down Expand Up @@ -85,32 +94,27 @@ func isAddon(obj namespacedLabeled) bool {
// --- /add ons

type changeSet struct {
nsObjs map[string][]obj
noNsObjs map[string][]obj
nsObjs map[string][]*apiObject
noNsObjs map[string][]*apiObject
}

func makeChangeSet() changeSet {
return changeSet{
nsObjs: make(map[string][]obj),
noNsObjs: make(map[string][]obj),
nsObjs: make(map[string][]*apiObject),
noNsObjs: make(map[string][]*apiObject),
}
}

func (c *changeSet) stage(cmd, id string, o *apiObject) {
func (c *changeSet) stage(cmd string, o *apiObject) {
if o.hasNamespace() {
c.nsObjs[cmd] = append(c.nsObjs[cmd], obj{id, o})
c.nsObjs[cmd] = append(c.nsObjs[cmd], o)
} else {
c.noNsObjs[cmd] = append(c.noNsObjs[cmd], obj{id, o})
c.noNsObjs[cmd] = append(c.noNsObjs[cmd], o)
}
}

type obj struct {
id string
*apiObject
}

type Applier interface {
apply(log.Logger, changeSet, cluster.SyncError)
apply(log.Logger, changeSet) cluster.SyncErrors
}

// Cluster is a handle to a Kubernetes API server.
Expand Down Expand Up @@ -217,34 +221,38 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error {
logger := log.With(c.logger, "method", "Sync")

cs := makeChangeSet()
errs := cluster.SyncError{}
var errs cluster.SyncErrors
for _, action := range spec.Actions {
stages := []struct {
b []byte
res resource.Resource
cmd string
}{
{action.Delete, "delete"},
{action.Apply, "apply"},
}
for _, stage := range stages {
if len(stage.b) == 0 {
if stage.res == nil {
continue
}
obj, err := definitionObj(stage.b)
id := action.ResourceID
obj, err := parseObj(stage.res.Bytes())
if err == nil {
cs.stage(stage.cmd, id, obj)
obj.Resource = stage.res
cs.stage(stage.cmd, obj)
} else {
errs[id] = err
errs = append(errs, cluster.SyncError{Resource: stage.res, Error: err})
break
}
}
}

c.mu.Lock()
defer c.mu.Unlock()
c.applier.apply(logger, cs, errs)
if len(errs) != 0 {
if applyErrs := c.applier.apply(logger, cs); len(applyErrs) > 0 {
errs = append(errs, applyErrs...)
}

// If `nil`, errs is a cluster.SyncError(nil) rather than error(nil)
if errs != nil {
return errs
}
return nil
Expand Down Expand Up @@ -408,11 +416,3 @@ func (c *Cluster) ImagesToFetch() registry.ImageCreds {

return allImageCreds
}

// --- end cluster.Cluster

// A convenience for getting an minimal object from some bytes.
func definitionObj(bytes []byte) (*apiObject, error) {
obj := apiObject{bytes: bytes}
return &obj, yaml.Unmarshal(bytes, &obj)
}
34 changes: 25 additions & 9 deletions cluster/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,41 @@ import (

"github.com/go-kit/kit/log"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/policy"
)

type mockApplier struct {
commandRun bool
}

func (m *mockApplier) apply(_ log.Logger, c changeSet, _ cluster.SyncError) {
func (m *mockApplier) apply(_ log.Logger, c changeSet) cluster.SyncErrors {
if len(c.nsObjs) != 0 || len(c.noNsObjs) != 0 {
m.commandRun = true
}
return nil
}

func deploymentDef(name string) []byte {
return []byte(`---
kind: Deployment
metadata:
name: ` + name)
type rsc struct {
id string
bytes []byte
}

func (r rsc) ResourceID() flux.ResourceID {
return flux.MustParseResourceID(r.id)
}

func (r rsc) Bytes() []byte {
return r.bytes
}

func (r rsc) Policy() policy.Set {
return nil
}

func (r rsc) Source() string {
return "test"
}

// ---
Expand All @@ -39,7 +56,7 @@ func setup(t *testing.T) (*Cluster, *mockApplier) {
func TestSyncNop(t *testing.T) {
kube, mock := setup(t)
if err := kube.Sync(cluster.SyncDef{}); err != nil {
t.Error(err)
t.Errorf("%#v", err)
}
if mock.commandRun {
t.Error("expected no commands run")
Expand All @@ -51,8 +68,7 @@ func TestSyncMalformed(t *testing.T) {
err := kube.Sync(cluster.SyncDef{
Actions: []cluster.SyncAction{
cluster.SyncAction{
ResourceID: "foobar",
Apply: []byte("garbage"),
Apply: rsc{"id", []byte("garbage")},
},
},
})
Expand Down
15 changes: 8 additions & 7 deletions cluster/kubernetes/release.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func (c *Kubectl) connectArgs() []string {
return args
}

func (c *Kubectl) apply(logger log.Logger, cs changeSet, errs cluster.SyncError) {
f := func(m map[string][]obj, cmd string, args ...string) {
func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncErrors) {
f := func(m map[string][]*apiObject, cmd string, args ...string) {
objs := m[cmd]
if len(objs) == 0 {
return
Expand All @@ -62,9 +62,9 @@ func (c *Kubectl) apply(logger log.Logger, cs changeSet, errs cluster.SyncError)
args = append(args, cmd)
if err := c.doCommand(logger, makeMultidoc(objs), args...); err != nil {
for _, obj := range objs {
r := bytes.NewReader(obj.bytes)
r := bytes.NewReader(obj.Bytes())
if err := c.doCommand(logger, r, args...); err != nil {
errs[obj.id] = err
errs = append(errs, cluster.SyncError{obj.Resource, err})
}
}
}
Expand All @@ -79,7 +79,7 @@ func (c *Kubectl) apply(logger log.Logger, cs changeSet, errs cluster.SyncError)
// first, so we run the commands the other way round.
f(cs.noNsObjs, "apply", "--namespace", "default")
f(cs.nsObjs, "apply")

return errs
}

func (c *Kubectl) doCommand(logger log.Logger, r io.Reader, args ...string) error {
Expand All @@ -101,10 +101,11 @@ func (c *Kubectl) doCommand(logger log.Logger, r io.Reader, args ...string) erro
return err
}

func makeMultidoc(objs []obj) *bytes.Buffer {
func makeMultidoc(objs []*apiObject) *bytes.Buffer {
buf := &bytes.Buffer{}
for _, obj := range objs {
buf.WriteString("\n---\n" + string(obj.bytes))
buf.WriteString("\n---\n")
buf.Write(obj.Bytes())
}
return buf
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/kubernetes/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// the source to learn about them.
func updatePodController(def []byte, container string, newImageID image.Ref) ([]byte, error) {
// Sanity check
obj, err := definitionObj(def)
obj, err := parseObj(def)
if err != nil {
return nil, err
}
Expand Down
32 changes: 15 additions & 17 deletions cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,35 @@ package cluster

import (
"strings"

"github.com/weaveworks/flux/resource"
)

// Definitions for use in synchronising a cluster with a git repo.

// Yep, resources are defined by opaque bytes. It's up to the cluster
// at the other end to do the right thing.
type ResourceDef []byte

// The action(s) to take on a particular resource.
// This should just be done in order, i.e.,:
// 1. delete if something in Delete
// 2. apply if something in Apply
// SyncAction represents either the deletion or application (create or
// update) of a resource.
type SyncAction struct {
// The ID is just a handle for labeling any error. No other
// meaning is attached to it.
ResourceID string
Delete ResourceDef
Apply ResourceDef
Delete resource.Resource // ) one of these
Apply resource.Resource // )
}

type SyncDef struct {
// The actions to undertake
Actions []SyncAction
}

type SyncError map[string]error
type SyncError struct {
resource.Resource
Error error
}

type SyncErrors []SyncError

func (err SyncError) Error() string {
func (err SyncErrors) Error() string {
var errs []string
for id, e := range err {
errs = append(errs, id+": "+e.Error())
for _, e := range err {
errs = append(errs, e.ResourceID().String()+": "+e.Error.Error())
}
return strings.Join(errs, "; ")
}
6 changes: 2 additions & 4 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ func prepareSyncDelete(logger log.Logger, repoResources map[string]resource.Reso
}
if _, ok := repoResources[id]; !ok {
sync.Actions = append(sync.Actions, cluster.SyncAction{
ResourceID: id,
Delete: res.Bytes(),
Delete: res,
})
}
}
Expand All @@ -73,7 +72,6 @@ func prepareSyncApply(logger log.Logger, clusterResources map[string]resource.Re
}
}
sync.Actions = append(sync.Actions, cluster.SyncAction{
ResourceID: id,
Apply: res.Bytes(),
Apply: res,
})
}
Loading

0 comments on commit 920d2da

Please sign in to comment.