Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Report sync errors including source file
Browse files Browse the repository at this point in the history
Include an error report in the sync notification, with resources
v. errors (with the latter coming from `kubectl` most likely).

The most useful bit of information when a resource fails to sync --
more useful than the error from kubectl, even -- is the file that had
a problem. Include that in the notification.

Secondarily: to avoid having a long, tmpfile path in messages, make
the source of resources relative to the repo.
  • Loading branch information
squaremo committed Mar 6, 2018
1 parent 920d2da commit 65d88d5
Show file tree
Hide file tree
Showing 15 changed files with 61 additions and 40 deletions.
7 changes: 4 additions & 3 deletions cluster/kubernetes/files.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package kubernetes

import (
"path/filepath"

"github.com/pkg/errors"

"github.com/weaveworks/flux"

"github.com/weaveworks/flux/cluster/kubernetes/resource"
)

Expand All @@ -13,7 +14,7 @@ import (
// specified namespace and name) to the paths of resource definition
// files.
func (c *Manifests) FindDefinedServices(path string) (map[flux.ResourceID][]string, error) {
objects, err := resource.Load(path)
objects, err := resource.Load(path, path)
if err != nil {
return nil, errors.Wrap(err, "loading resources")
}
Expand All @@ -23,7 +24,7 @@ func (c *Manifests) FindDefinedServices(path string) (map[flux.ResourceID][]stri
id := obj.ResourceID()
_, kind, _ := id.Components()
if _, ok := resourceKinds[kind]; ok {
result[id] = append(result[id], obj.Source())
result[id] = append(result[id], filepath.Join(path, obj.Source()))
}
}
return result, nil
Expand Down
6 changes: 3 additions & 3 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (c *changeSet) stage(cmd string, o *apiObject) {
}

type Applier interface {
apply(log.Logger, changeSet) cluster.SyncErrors
apply(log.Logger, changeSet) cluster.SyncError
}

// Cluster is a handle to a Kubernetes API server.
Expand Down Expand Up @@ -221,7 +221,7 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error {
logger := log.With(c.logger, "method", "Sync")

cs := makeChangeSet()
var errs cluster.SyncErrors
var errs cluster.SyncError
for _, action := range spec.Actions {
stages := []struct {
res resource.Resource
Expand All @@ -239,7 +239,7 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error {
obj.Resource = stage.res
cs.stage(stage.cmd, obj)
} else {
errs = append(errs, cluster.SyncError{Resource: stage.res, Error: err})
errs = append(errs, cluster.ResourceError{Resource: stage.res, Error: err})
break
}
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type mockApplier struct {
commandRun bool
}

func (m *mockApplier) apply(_ log.Logger, c changeSet) cluster.SyncErrors {
func (m *mockApplier) apply(_ log.Logger, c changeSet) cluster.SyncError {
if len(c.nsObjs) != 0 || len(c.noNsObjs) != 0 {
m.commandRun = true
}
Expand Down
4 changes: 2 additions & 2 deletions cluster/kubernetes/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ type Manifests struct {

// FindDefinedServices implementation in files.go

func (c *Manifests) LoadManifests(paths ...string) (map[string]resource.Resource, error) {
return kresource.Load(paths...)
func (c *Manifests) LoadManifests(base, first string, rest ...string) (map[string]resource.Resource, error) {
return kresource.Load(base, first, rest...)
}

func (c *Manifests) ParseManifests(allDefs []byte) (map[string]resource.Resource, error) {
Expand Down
4 changes: 2 additions & 2 deletions cluster/kubernetes/release.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (c *Kubectl) connectArgs() []string {
return args
}

func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncErrors) {
func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncError) {
f := func(m map[string][]*apiObject, cmd string, args ...string) {
objs := m[cmd]
if len(objs) == 0 {
Expand All @@ -64,7 +64,7 @@ func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncError
for _, obj := range objs {
r := bytes.NewReader(obj.Bytes())
if err := c.doCommand(logger, r, args...); err != nil {
errs = append(errs, cluster.SyncError{obj.Resource, err})
errs = append(errs, cluster.ResourceError{obj.Resource, err})
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions cluster/kubernetes/resource/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
// Load takes paths to directories or files, and creates an object set
// based on the file(s) therein. Resources are named according to the
// file content, rather than the file name of directory structure.
func Load(roots ...string) (map[string]resource.Resource, error) {
func Load(base, atLeastOne string, more ...string) (map[string]resource.Resource, error) {
roots := append([]string{atLeastOne}, more...)
objs := map[string]resource.Resource{}
for _, root := range roots {
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
Expand All @@ -27,13 +28,17 @@ func Load(roots ...string) (map[string]resource.Resource, error) {
if err != nil {
return errors.Wrapf(err, "reading file at %q", path)
}
docsInFile, err := ParseMultidoc(bytes, path)
source, err := filepath.Rel(base, path)
if err != nil {
return errors.Wrapf(err, "finding relative path for %q", path)
}
docsInFile, err := ParseMultidoc(bytes, source)
if err != nil {
return errors.Wrapf(err, "parsing file at %q", path)
}
for id, obj := range docsInFile {
if alreadyDefined, ok := objs[id]; ok {
return fmt.Errorf(`resource '%s' defined more than once (in %s and %s)`, id, alreadyDefined.Source(), path)
return fmt.Errorf(`resource '%s' defined more than once (in %s and %s)`, id, alreadyDefined.Source(), source)
}
objs[id] = obj
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/kubernetes/resource/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestLoadSome(t *testing.T) {
if err := testfiles.WriteTestFiles(dir); err != nil {
t.Fatal(err)
}
objs, err := Load(dir)
objs, err := Load(dir, dir)
if err != nil {
t.Error(err)
}
Expand Down
7 changes: 5 additions & 2 deletions cluster/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ type Manifests interface {
// Update the definitions in a manifests bytes according to the
// spec given.
UpdateDefinition(def []byte, container string, newImageID image.Ref) ([]byte, error)
// Load all the resource manifests under the path given
LoadManifests(paths ...string) (map[string]resource.Resource, error)
// Load all the resource manifests under the path given. `baseDir`
// is used to relativise the paths, which are supplied as absolute
// paths to directories or files; at least one path must be
// supplied.
LoadManifests(baseDir, first string, rest ...string) (map[string]resource.Resource, error)
// Parse the manifests given in an exported blob
ParseManifests([]byte) (map[string]resource.Resource, error)
// UpdatePolicies modifies a manifest to apply the policy update specified
Expand Down
6 changes: 3 additions & 3 deletions cluster/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Mock struct {
PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error)
FindDefinedServicesFunc func(path string) (map[flux.ResourceID][]string, error)
UpdateDefinitionFunc func(def []byte, container string, newImageID image.Ref) ([]byte, error)
LoadManifestsFunc func(paths ...string) (map[string]resource.Resource, error)
LoadManifestsFunc func(base, first string, rest ...string) (map[string]resource.Resource, error)
ParseManifestsFunc func([]byte) (map[string]resource.Resource, error)
UpdateManifestFunc func(path, resourceID string, f func(def []byte) ([]byte, error)) error
UpdatePoliciesFunc func([]byte, policy.Update) ([]byte, error)
Expand Down Expand Up @@ -57,8 +57,8 @@ func (m *Mock) UpdateDefinition(def []byte, container string, newImageID image.R
return m.UpdateDefinitionFunc(def, container, newImageID)
}

func (m *Mock) LoadManifests(paths ...string) (map[string]resource.Resource, error) {
return m.LoadManifestsFunc(paths...)
func (m *Mock) LoadManifests(base, first string, rest ...string) (map[string]resource.Resource, error) {
return m.LoadManifestsFunc(base, first, rest...)
}

func (m *Mock) ParseManifests(def []byte) (map[string]resource.Resource, error) {
Expand Down
10 changes: 5 additions & 5 deletions cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,23 @@ import (
// SyncAction represents either the deletion or application (create or
// update) of a resource.
type SyncAction struct {
Delete resource.Resource // ) one of these
Apply resource.Resource // )
Delete resource.Resource // ) one of these
Apply resource.Resource // )
}

type SyncDef struct {
// The actions to undertake
Actions []SyncAction
}

type SyncError struct {
type ResourceError struct {
resource.Resource
Error error
}

type SyncErrors []SyncError
type SyncError []ResourceError

func (err SyncErrors) Error() string {
func (err SyncError) Error() string {
var errs []string
for _, e := range err {
errs = append(errs, e.ResourceID().String()+": "+e.Error.Error())
Expand Down
2 changes: 1 addition & 1 deletion daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func TestDaemon_PolicyUpdate(t *testing.T) {
return false
}
defer co.Clean()
m, err := d.Manifests.LoadManifests(co.ManifestDir())
m, err := d.Manifests.LoadManifests(co.Dir(), co.ManifestDir())
if err != nil {
t.Fatalf("Error: %s", err.Error())
}
Expand Down
25 changes: 15 additions & 10 deletions daemon/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"context"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/event"
"github.com/weaveworks/flux/git"
fluxmetrics "github.com/weaveworks/flux/metrics"
Expand Down Expand Up @@ -187,21 +188,24 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
}

// Get a map of all resources defined in the repo
allResources, err := d.Manifests.LoadManifests(working.ManifestDir())
allResources, err := d.Manifests.LoadManifests(working.Dir(), working.ManifestDir())
if err != nil {
return errors.Wrap(err, "loading resources from repo")
}

var syncErrors map[string]string
// TODO supply deletes argument from somewhere (command-line?)
if err := fluxsync.Sync(d.Manifests, allResources, d.Cluster, false, logger); err != nil {
logger.Log("err", err)
// TODO(michael): we should distinguish between "fully mostly
// succeeded" and "failed utterly", since we want to abandon
// this and not move the tag (and send a SyncFail event
// upstream?), if the latter. For now, it's presumed that any
// error returned is at worst a minor, partial failure (e.g.,
// a small number of resources failed to sync, for unimportant
// reasons)
switch syncerr := err.(type) {
case cluster.SyncError:
syncErrors = map[string]string{}
for _, e := range syncerr {
syncErrors[fmt.Sprintf("%s (%s)", e.ResourceID(), e.Source())] = e.Error.Error()
}
default:
return err
}
}

// update notes and emit events for applied commits
Expand Down Expand Up @@ -232,9 +236,9 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
} else {
ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
changedFiles, err := working.ChangedFiles(ctx, oldTagRev)
if err == nil {
if err == nil && len(changedFiles) > 0 {
// We had some changed files, we're syncing a diff
changedResources, err = d.Manifests.LoadManifests(changedFiles...)
changedResources, err = d.Manifests.LoadManifests(working.Dir(), changedFiles[0], changedFiles[1:]...)
}
cancel()
if err != nil {
Expand Down Expand Up @@ -361,6 +365,7 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
Commits: cs,
InitialSync: initialSync,
Includes: includes,
Errors: syncErrors,
},
}); err != nil {
logger.Log("err", err)
Expand Down
2 changes: 2 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ type SyncEventMetadata struct {
// policy changes, and "other" (meaning things we didn't commit
// ourselves)
Includes map[string]bool `json:"includes,omitempty"`
// Per-resource errors
Errors map[string]string `json:"errors,omitempty"`
// `true` if we have no record of having synced before
InitialSync bool `json:"initialSync,omitempty"`
}
Expand Down
7 changes: 6 additions & 1 deletion git/working.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,12 @@ func (c *Checkout) Clean() {
}
}

// ManifestDir returns a path to where the files are
// Dir returns the path to the repo
func (c *Checkout) Dir() string {
return c.dir
}

// ManifestDir returns the path to the manifests files
func (c *Checkout) ManifestDir() string {
return filepath.Join(c.dir, c.config.Path)
}
Expand Down
6 changes: 3 additions & 3 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestSync(t *testing.T) {
manifests := &kubernetes.Manifests{}
var clus cluster.Cluster = &syncCluster{mockCluster, map[string][]byte{}}

resources, err := manifests.LoadManifests(checkout.ManifestDir())
resources, err := manifests.LoadManifests(checkout.Dir(), checkout.ManifestDir())
if err != nil {
t.Fatal(err)
}
Expand All @@ -56,7 +56,7 @@ func TestSync(t *testing.T) {
break
}

resources, err = manifests.LoadManifests(checkout.ManifestDir())
resources, err = manifests.LoadManifests(checkout.Dir(), checkout.ManifestDir())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -245,7 +245,7 @@ func checkClusterMatchesFiles(t *testing.T, m cluster.Manifests, c cluster.Clust
if err != nil {
t.Fatal(err)
}
files, err := m.LoadManifests(dir)
files, err := m.LoadManifests(dir, dir)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 65d88d5

Please sign in to comment.