Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #962 from weaveworks/feature/skipgit
Browse files Browse the repository at this point in the history
Let fluxd answer queries without a git repo supplied
  • Loading branch information
squaremo authored Mar 19, 2018
2 parents a3149f3 + d5bfb67 commit 9da7c57
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 289 deletions.
15 changes: 15 additions & 0 deletions api/v6/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,24 @@ type ImageStatus struct {
Containers []Container
}

// ReadOnlyReason enumerates the reasons that a controller is
// considered read-only. The zero value is considered "OK", since the
// zero value is what prior versions of the daemon will effectively
// send.
type ReadOnlyReason string

const (
ReadOnlyOK ReadOnlyReason = ""
ReadOnlyMissing ReadOnlyReason = "NotInRepo"
ReadOnlySystem ReadOnlyReason = "System"
ReadOnlyNoRepo ReadOnlyReason = "NoRepo"
ReadOnlyNotReady ReadOnlyReason = "NotReady"
)

type ControllerStatus struct {
ID flux.ResourceID
Containers []Container
ReadOnly ReadOnlyReason
Status string
Automated bool
Locked bool
Expand Down
4 changes: 4 additions & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type Cluster interface {
type Controller struct {
ID flux.ResourceID
Status string // A status summary for display
// Is the controller considered read-only because it's under the
// control of the platform. In the case of Kubernetes, we simply
// omit these controllers; but this may not always be the case.
IsSystem bool

Containers ContainersOrExcuse
}
Expand Down
140 changes: 50 additions & 90 deletions cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,20 @@ import (
"os"
"os/exec"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/pflag"
"github.com/weaveworks/go-checkpoint"
k8sclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/weaveworks/flux/api/v6"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/cluster/kubernetes"
"github.com/weaveworks/flux/daemon"
"github.com/weaveworks/flux/event"
"github.com/weaveworks/flux/git"
transport "github.com/weaveworks/flux/http"
"github.com/weaveworks/flux/http/client"
Expand Down Expand Up @@ -287,40 +285,6 @@ func main() {
}
}

// Indirect reference to a daemon, initially of the NotReady variety
notReadyDaemon := daemon.NewNotReadyDaemon(version, k8s, v6.GitRemoteConfig{
URL: *gitURL,
Branch: *gitBranch,
Path: *gitPath,
})
daemonRef := daemon.NewRef(notReadyDaemon)

var eventWriter event.EventWriter
{
// Connect to fluxsvc if given an upstream address
if *upstreamURL != "" {
upstreamLogger := log.With(logger, "component", "upstream")
upstreamLogger.Log("URL", *upstreamURL)
upstream, err := daemonhttp.NewUpstream(
&http.Client{Timeout: 10 * time.Second},
fmt.Sprintf("fluxd/%v", version),
client.Token(*token),
transport.NewUpstreamRouter(),
*upstreamURL,
remote.NewErrorLoggingUpstreamServer(daemonRef, upstreamLogger),
upstreamLogger,
)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}
eventWriter = upstream
defer upstream.Close()
} else {
logger.Log("upstream", "no upstream URL given")
}
}

// Mechanical components.

// When we can receive from this channel, it indicates that we
Expand All @@ -346,23 +310,13 @@ func main() {
shutdownWg.Wait()
}()

// HTTP transport component, for metrics
go func() {
mux := http.DefaultServeMux
mux.Handle("/metrics", promhttp.Handler())
handler := daemonhttp.NewHandler(daemonRef, daemonhttp.NewRouter())
mux.Handle("/api/flux/", http.StripPrefix("/api/flux", handler))
logger.Log("addr", *listenAddr)
errc <- http.ListenAndServe(*listenAddr, mux)
}()

// Checkpoint: we want to include the fact of whether the daemon
// was given a Git repo it could clone; but the expected scenario
// is that it will have been set up already, and we don't want to
// report anything before seeing if it works. So, don't start
// until we have failed or succeeded.
var checker *checkpoint.Checker
updateCheckLogger := log.With(logger, "component", "checkpoint")
checkForUpdates(clusterVersion, strconv.FormatBool(*gitURL != ""), updateCheckLogger)

gitRemote := git.Remote{URL: *gitURL}
gitConfig := git.Config{
Expand All @@ -377,48 +331,24 @@ func main() {

repo := git.NewRepo(gitRemote)
{

// If there's no URL here, we will not be able to do anything else.
if gitRemote.URL == "" {
checker = checkForUpdates(clusterVersion, "false", updateCheckLogger)
return
}

shutdownWg.Add(1)
go func() {
errc <- repo.Start(shutdown, shutdownWg)
}()
for {
status, err := repo.Status()
logger.Log("repo", repo.Origin().URL, "status", status, "err", err)
notReadyDaemon.UpdateStatus(status, err)

if status == git.RepoReady {
checker = checkForUpdates(clusterVersion, "true", updateCheckLogger)
logger.Log("working-dir", repo.Dir(),
"user", *gitUser,
"email", *gitEmail,
"sync-tag", *gitSyncTag,
"notes-ref", *gitNotesRef,
"set-author", *gitSetAuthor)
break
}

if checker == nil {
checker = checkForUpdates(clusterVersion, "false", updateCheckLogger)
}

tryAgain := time.NewTimer(10 * time.Second)
select {
case err := <-errc:
go func() { errc <- err }()
return
case <-tryAgain.C:
continue
err := repo.Start(shutdown, shutdownWg)
if err != nil {
errc <- err
}
}
}()
}

logger.Log(
"url", *gitURL,
"user", *gitUser,
"email", *gitEmail,
"sync-tag", *gitSyncTag,
"notes-ref", *gitNotesRef,
"set-author", *gitSetAuthor,
)

var jobs *job.Queue
{
jobs = job.NewQueue(shutdown, shutdownWg)
Expand All @@ -434,14 +364,38 @@ func main() {
GitConfig: gitConfig,
Jobs: jobs,
JobStatusCache: &job.StatusCache{Size: 100},

EventWriter: eventWriter,
Logger: log.With(logger, "component", "daemon"), LoopVars: &daemon.LoopVars{
Logger: log.With(logger, "component", "daemon"),
LoopVars: &daemon.LoopVars{
SyncInterval: *gitPollInterval,
RegistryPollInterval: *registryPollInterval,
},
}

{
// Connect to fluxsvc if given an upstream address
if *upstreamURL != "" {
upstreamLogger := log.With(logger, "component", "upstream")
upstreamLogger.Log("URL", *upstreamURL)
upstream, err := daemonhttp.NewUpstream(
&http.Client{Timeout: 10 * time.Second},
fmt.Sprintf("fluxd/%v", version),
client.Token(*token),
transport.NewUpstreamRouter(),
*upstreamURL,
remote.NewErrorLoggingUpstreamServer(daemon, upstreamLogger),
upstreamLogger,
)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}
daemon.EventWriter = upstream
defer upstream.Close()
} else {
logger.Log("upstream", "no upstream URL given")
}
}

shutdownWg.Add(1)
go daemon.Loop(shutdown, shutdownWg, log.With(logger, "component", "sync-loop"))

Expand All @@ -450,8 +404,14 @@ func main() {
shutdownWg.Add(1)
go cacheWarmer.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, imageCreds)

// Update daemonRef so that upstream and handlers point to fully working daemon
daemonRef.UpdateServer(daemon)
go func() {
mux := http.DefaultServeMux
mux.Handle("/metrics", promhttp.Handler())
handler := daemonhttp.NewHandler(daemon, daemonhttp.NewRouter())
mux.Handle("/api/flux/", http.StripPrefix("/api/flux", handler))
logger.Log("addr", *listenAddr)
errc <- http.ListenAndServe(*listenAddr, mux)
}()

// Fall off the end, into the waiting procedure.
}
20 changes: 18 additions & 2 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,37 @@ func (d *Daemon) ListServices(ctx context.Context, namespace string) ([]v6.Contr
}

var services policy.ResourceMap
var globalReadOnly v6.ReadOnlyReason
err = d.WithClone(ctx, func(checkout *git.Checkout) error {
var err error
services, err = d.Manifests.ServicesWithPolicies(checkout.ManifestDir())
return err
})
if err != nil {
switch {
case err == git.ErrNotReady:
globalReadOnly = v6.ReadOnlyNotReady
case err == git.ErrNoConfig:
globalReadOnly = v6.ReadOnlyNoRepo
case err != nil:
return nil, errors.Wrap(err, "getting service policies")
}

var res []v6.ControllerStatus
for _, service := range clusterServices {
policies := services[service.ID]
var readOnly v6.ReadOnlyReason
policies, ok := services[service.ID]
switch {
case globalReadOnly != "":
readOnly = globalReadOnly
case !ok:
readOnly = v6.ReadOnlyMissing
case service.IsSystem:
readOnly = v6.ReadOnlySystem
}
res = append(res, v6.ControllerStatus{
ID: service.ID,
Containers: containers2containers(service.ContainersOrNil()),
ReadOnly: readOnly,
Status: service.Status,
Automated: policies.Contains(policy.Automated),
Locked: policies.Contains(policy.Locked),
Expand Down
103 changes: 0 additions & 103 deletions daemon/notready.go

This file was deleted.

Loading

0 comments on commit 9da7c57

Please sign in to comment.