Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #970 from weaveworks/feature/756-include-sync-errors
Browse files Browse the repository at this point in the history
Include individual resource sync errors in Sync events
  • Loading branch information
squaremo authored Mar 12, 2018
2 parents e78dc32 + cdca842 commit 1a860b1
Show file tree
Hide file tree
Showing 17 changed files with 148 additions and 113 deletions.
7 changes: 4 additions & 3 deletions cluster/kubernetes/files.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package kubernetes

import (
"path/filepath"

"github.com/pkg/errors"

"github.com/weaveworks/flux"

"github.com/weaveworks/flux/cluster/kubernetes/resource"
)

Expand All @@ -13,7 +14,7 @@ import (
// specified namespace and name) to the paths of resource definition
// files.
func (c *Manifests) FindDefinedServices(path string) (map[flux.ResourceID][]string, error) {
objects, err := resource.Load(path)
objects, err := resource.Load(path, path)
if err != nil {
return nil, errors.Wrap(err, "loading resources")
}
Expand All @@ -23,7 +24,7 @@ func (c *Manifests) FindDefinedServices(path string) (map[flux.ResourceID][]stri
id := obj.ResourceID()
_, kind, _ := id.Components()
if _, ok := resourceKinds[kind]; ok {
result[id] = append(result[id], obj.Source())
result[id] = append(result[id], filepath.Join(path, obj.Source()))
}
}
return result, nil
Expand Down
62 changes: 31 additions & 31 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/image"
"github.com/weaveworks/flux/registry"
"github.com/weaveworks/flux/resource"
"github.com/weaveworks/flux/ssh"
)

Expand All @@ -40,15 +41,23 @@ type extendedClient struct {
v1beta1batch.CronJobsGetter
}

// --- internal types for keeping track of syncing

type apiObject struct {
bytes []byte
resource.Resource
Kind string `yaml:"kind"`
Metadata struct {
Name string `yaml:"name"`
Namespace string `yaml:"namespace"`
} `yaml:"metadata"`
}

// A convenience for getting an minimal object from some bytes.
func parseObj(def []byte) (*apiObject, error) {
obj := apiObject{}
return &obj, yaml.Unmarshal(def, &obj)
}

func (o *apiObject) hasNamespace() bool {
return o.Metadata.Namespace != ""
}
Expand Down Expand Up @@ -85,32 +94,27 @@ func isAddon(obj namespacedLabeled) bool {
// --- /add ons

type changeSet struct {
nsObjs map[string][]obj
noNsObjs map[string][]obj
nsObjs map[string][]*apiObject
noNsObjs map[string][]*apiObject
}

func makeChangeSet() changeSet {
return changeSet{
nsObjs: make(map[string][]obj),
noNsObjs: make(map[string][]obj),
nsObjs: make(map[string][]*apiObject),
noNsObjs: make(map[string][]*apiObject),
}
}

func (c *changeSet) stage(cmd, id string, o *apiObject) {
func (c *changeSet) stage(cmd string, o *apiObject) {
if o.hasNamespace() {
c.nsObjs[cmd] = append(c.nsObjs[cmd], obj{id, o})
c.nsObjs[cmd] = append(c.nsObjs[cmd], o)
} else {
c.noNsObjs[cmd] = append(c.noNsObjs[cmd], obj{id, o})
c.noNsObjs[cmd] = append(c.noNsObjs[cmd], o)
}
}

type obj struct {
id string
*apiObject
}

type Applier interface {
apply(log.Logger, changeSet, cluster.SyncError)
apply(log.Logger, changeSet) cluster.SyncError
}

// Cluster is a handle to a Kubernetes API server.
Expand Down Expand Up @@ -217,34 +221,38 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error {
logger := log.With(c.logger, "method", "Sync")

cs := makeChangeSet()
errs := cluster.SyncError{}
var errs cluster.SyncError
for _, action := range spec.Actions {
stages := []struct {
b []byte
res resource.Resource
cmd string
}{
{action.Delete, "delete"},
{action.Apply, "apply"},
}
for _, stage := range stages {
if len(stage.b) == 0 {
if stage.res == nil {
continue
}
obj, err := definitionObj(stage.b)
id := action.ResourceID
obj, err := parseObj(stage.res.Bytes())
if err == nil {
cs.stage(stage.cmd, id, obj)
obj.Resource = stage.res
cs.stage(stage.cmd, obj)
} else {
errs[id] = err
errs = append(errs, cluster.ResourceError{Resource: stage.res, Error: err})
break
}
}
}

c.mu.Lock()
defer c.mu.Unlock()
c.applier.apply(logger, cs, errs)
if len(errs) != 0 {
if applyErrs := c.applier.apply(logger, cs); len(applyErrs) > 0 {
errs = append(errs, applyErrs...)
}

// If `nil`, errs is a cluster.SyncError(nil) rather than error(nil)
if errs != nil {
return errs
}
return nil
Expand Down Expand Up @@ -408,11 +416,3 @@ func (c *Cluster) ImagesToFetch() registry.ImageCreds {

return allImageCreds
}

// --- end cluster.Cluster

// A convenience for getting an minimal object from some bytes.
func definitionObj(bytes []byte) (*apiObject, error) {
obj := apiObject{bytes: bytes}
return &obj, yaml.Unmarshal(bytes, &obj)
}
34 changes: 25 additions & 9 deletions cluster/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,41 @@ import (

"github.com/go-kit/kit/log"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/policy"
)

type mockApplier struct {
commandRun bool
}

func (m *mockApplier) apply(_ log.Logger, c changeSet, _ cluster.SyncError) {
func (m *mockApplier) apply(_ log.Logger, c changeSet) cluster.SyncError {
if len(c.nsObjs) != 0 || len(c.noNsObjs) != 0 {
m.commandRun = true
}
return nil
}

func deploymentDef(name string) []byte {
return []byte(`---
kind: Deployment
metadata:
name: ` + name)
type rsc struct {
id string
bytes []byte
}

func (r rsc) ResourceID() flux.ResourceID {
return flux.MustParseResourceID(r.id)
}

func (r rsc) Bytes() []byte {
return r.bytes
}

func (r rsc) Policy() policy.Set {
return nil
}

func (r rsc) Source() string {
return "test"
}

// ---
Expand All @@ -39,7 +56,7 @@ func setup(t *testing.T) (*Cluster, *mockApplier) {
func TestSyncNop(t *testing.T) {
kube, mock := setup(t)
if err := kube.Sync(cluster.SyncDef{}); err != nil {
t.Error(err)
t.Errorf("%#v", err)
}
if mock.commandRun {
t.Error("expected no commands run")
Expand All @@ -51,8 +68,7 @@ func TestSyncMalformed(t *testing.T) {
err := kube.Sync(cluster.SyncDef{
Actions: []cluster.SyncAction{
cluster.SyncAction{
ResourceID: "foobar",
Apply: []byte("garbage"),
Apply: rsc{"id", []byte("garbage")},
},
},
})
Expand Down
4 changes: 2 additions & 2 deletions cluster/kubernetes/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ type Manifests struct {

// FindDefinedServices implementation in files.go

func (c *Manifests) LoadManifests(paths ...string) (map[string]resource.Resource, error) {
return kresource.Load(paths...)
func (c *Manifests) LoadManifests(base, first string, rest ...string) (map[string]resource.Resource, error) {
return kresource.Load(base, first, rest...)
}

func (c *Manifests) ParseManifests(allDefs []byte) (map[string]resource.Resource, error) {
Expand Down
15 changes: 8 additions & 7 deletions cluster/kubernetes/release.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func (c *Kubectl) connectArgs() []string {
return args
}

func (c *Kubectl) apply(logger log.Logger, cs changeSet, errs cluster.SyncError) {
f := func(m map[string][]obj, cmd string, args ...string) {
func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncError) {
f := func(m map[string][]*apiObject, cmd string, args ...string) {
objs := m[cmd]
if len(objs) == 0 {
return
Expand All @@ -62,9 +62,9 @@ func (c *Kubectl) apply(logger log.Logger, cs changeSet, errs cluster.SyncError)
args = append(args, cmd)
if err := c.doCommand(logger, makeMultidoc(objs), args...); err != nil {
for _, obj := range objs {
r := bytes.NewReader(obj.bytes)
r := bytes.NewReader(obj.Bytes())
if err := c.doCommand(logger, r, args...); err != nil {
errs[obj.id] = err
errs = append(errs, cluster.ResourceError{obj.Resource, err})
}
}
}
Expand All @@ -79,7 +79,7 @@ func (c *Kubectl) apply(logger log.Logger, cs changeSet, errs cluster.SyncError)
// first, so we run the commands the other way round.
f(cs.noNsObjs, "apply", "--namespace", "default")
f(cs.nsObjs, "apply")

return errs
}

func (c *Kubectl) doCommand(logger log.Logger, r io.Reader, args ...string) error {
Expand All @@ -101,10 +101,11 @@ func (c *Kubectl) doCommand(logger log.Logger, r io.Reader, args ...string) erro
return err
}

func makeMultidoc(objs []obj) *bytes.Buffer {
func makeMultidoc(objs []*apiObject) *bytes.Buffer {
buf := &bytes.Buffer{}
for _, obj := range objs {
buf.WriteString("\n---\n" + string(obj.bytes))
buf.WriteString("\n---\n")
buf.Write(obj.Bytes())
}
return buf
}
Expand Down
11 changes: 8 additions & 3 deletions cluster/kubernetes/resource/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
// Load takes paths to directories or files, and creates an object set
// based on the file(s) therein. Resources are named according to the
// file content, rather than the file name of directory structure.
func Load(roots ...string) (map[string]resource.Resource, error) {
func Load(base, atLeastOne string, more ...string) (map[string]resource.Resource, error) {
roots := append([]string{atLeastOne}, more...)
objs := map[string]resource.Resource{}
for _, root := range roots {
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
Expand All @@ -32,13 +33,17 @@ func Load(roots ...string) (map[string]resource.Resource, error) {
if err != nil {
return errors.Wrapf(err, "reading file at %q", path)
}
docsInFile, err := ParseMultidoc(bytes, path)
source, err := filepath.Rel(base, path)
if err != nil {
return errors.Wrapf(err, "finding relative path for %q", path)
}
docsInFile, err := ParseMultidoc(bytes, source)
if err != nil {
return errors.Wrapf(err, "parsing file at %q", path)
}
for id, obj := range docsInFile {
if alreadyDefined, ok := objs[id]; ok {
return fmt.Errorf(`resource '%s' defined more than once (in %s and %s)`, id, alreadyDefined.Source(), path)
return fmt.Errorf(`resource '%s' defined more than once (in %s and %s)`, id, alreadyDefined.Source(), source)
}
objs[id] = obj
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/kubernetes/resource/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestLoadSome(t *testing.T) {
if err := testfiles.WriteTestFiles(dir); err != nil {
t.Fatal(err)
}
objs, err := Load(dir)
objs, err := Load(dir, dir)
if err != nil {
t.Error(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/kubernetes/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// the source to learn about them.
func updatePodController(def []byte, container string, newImageID image.Ref) ([]byte, error) {
// Sanity check
obj, err := definitionObj(def)
obj, err := parseObj(def)
if err != nil {
return nil, err
}
Expand Down
7 changes: 5 additions & 2 deletions cluster/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ type Manifests interface {
// Update the definitions in a manifests bytes according to the
// spec given.
UpdateDefinition(def []byte, container string, newImageID image.Ref) ([]byte, error)
// Load all the resource manifests under the path given
LoadManifests(paths ...string) (map[string]resource.Resource, error)
// Load all the resource manifests under the path given. `baseDir`
// is used to relativise the paths, which are supplied as absolute
// paths to directories or files; at least one path must be
// supplied.
LoadManifests(baseDir, first string, rest ...string) (map[string]resource.Resource, error)
// Parse the manifests given in an exported blob
ParseManifests([]byte) (map[string]resource.Resource, error)
// UpdatePolicies modifies a manifest to apply the policy update specified
Expand Down
6 changes: 3 additions & 3 deletions cluster/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Mock struct {
PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error)
FindDefinedServicesFunc func(path string) (map[flux.ResourceID][]string, error)
UpdateDefinitionFunc func(def []byte, container string, newImageID image.Ref) ([]byte, error)
LoadManifestsFunc func(paths ...string) (map[string]resource.Resource, error)
LoadManifestsFunc func(base, first string, rest ...string) (map[string]resource.Resource, error)
ParseManifestsFunc func([]byte) (map[string]resource.Resource, error)
UpdateManifestFunc func(path, resourceID string, f func(def []byte) ([]byte, error)) error
UpdatePoliciesFunc func([]byte, policy.Update) ([]byte, error)
Expand Down Expand Up @@ -57,8 +57,8 @@ func (m *Mock) UpdateDefinition(def []byte, container string, newImageID image.R
return m.UpdateDefinitionFunc(def, container, newImageID)
}

func (m *Mock) LoadManifests(paths ...string) (map[string]resource.Resource, error) {
return m.LoadManifestsFunc(paths...)
func (m *Mock) LoadManifests(base, first string, rest ...string) (map[string]resource.Resource, error) {
return m.LoadManifestsFunc(base, first, rest...)
}

func (m *Mock) ParseManifests(def []byte) (map[string]resource.Resource, error) {
Expand Down
Loading

0 comments on commit 1a860b1

Please sign in to comment.