This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Let fluxd answer queries without a git repo supplied #962

Merged: 4 commits, Mar 19, 2018

15 changes: 15 additions & 0 deletions api/v6/api.go
@@ -16,9 +16,24 @@ type ImageStatus struct {
Containers []Container
}

// ReadOnlyReason enumerates the reasons that a controller is
// considered read-only. The zero value is considered "OK", since the
// zero value is what prior versions of the daemon will effectively
// send.
type ReadOnlyReason string

const (
ReadOnlyOK ReadOnlyReason = ""
ReadOnlyMissing ReadOnlyReason = "NotInRepo"
ReadOnlySystem ReadOnlyReason = "System"
ReadOnlyNoRepo ReadOnlyReason = "NoRepo"
ReadOnlyNotReady ReadOnlyReason = "NotReady"
)

type ControllerStatus struct {
ID flux.ResourceID
Containers []Container
ReadOnly ReadOnlyReason
Status string
Automated bool
Locked bool
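A note on the zero value: a ControllerStatus serialized by an older daemon carries no ReadOnly field at all, so decoding it yields the empty string, i.e. ReadOnlyOK, which is exactly the compatibility the doc comment promises. A minimal, self-contained Go sketch of that behaviour (struct trimmed to two fields for brevity):

package main

import (
	"encoding/json"
	"fmt"
)

type ReadOnlyReason string

const ReadOnlyOK ReadOnlyReason = ""

type ControllerStatus struct {
	Status   string
	ReadOnly ReadOnlyReason
}

func main() {
	// A payload from a daemon that predates the ReadOnly field.
	old := []byte(`{"Status":"deployed"}`)

	var s ControllerStatus
	if err := json.Unmarshal(old, &s); err != nil {
		panic(err)
	}
	// The absent field decodes to the zero value, which reads as "OK".
	fmt.Println(s.ReadOnly == ReadOnlyOK) // true
}
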
4 changes: 4 additions & 0 deletions cluster/cluster.go
@@ -29,6 +29,10 @@ type Cluster interface {
type Controller struct {
ID flux.ResourceID
Status string // A status summary for display
// Whether the controller is considered read-only because it's under
// the control of the platform. In the case of Kubernetes, we simply
// omit these controllers; but this may not always be the case.
IsSystem bool

Containers ContainersOrExcuse
}
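Kubernetes currently omits platform-owned controllers rather than flagging them, as the comment above says; a cluster implementation that does surface them might set IsSystem along these lines. This is a hypothetical helper, not code from the PR, and it assumes flux.ResourceID's Components() accessor returning (namespace, kind, name):

package cluster

// markSystem is a hypothetical helper: flag controllers that live in a
// platform-owned namespace as "system", so ListServices can report them
// with the ReadOnlySystem reason.
func markSystem(controllers []Controller, systemNamespaces map[string]bool) {
	for i, c := range controllers {
		ns, _, _ := c.ID.Components() // assumed accessor on flux.ResourceID
		if systemNamespaces[ns] {
			controllers[i].IsSystem = true
		}
	}
}

Invoked with, say, map[string]bool{"kube-system": true}, every controller in that namespace would then surface as read-only.
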
140 changes: 50 additions & 90 deletions cmd/fluxd/main.go
@@ -8,22 +8,20 @@ import (
"os"
"os/exec"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/pflag"
"github.com/weaveworks/go-checkpoint"
k8sclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/weaveworks/flux/api/v6"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/cluster/kubernetes"
"github.com/weaveworks/flux/daemon"
"github.com/weaveworks/flux/event"
"github.com/weaveworks/flux/git"
transport "github.com/weaveworks/flux/http"
"github.com/weaveworks/flux/http/client"
@@ -278,40 +276,6 @@ func main() {
}
}

// Indirect reference to a daemon, initially of the NotReady variety
notReadyDaemon := daemon.NewNotReadyDaemon(version, k8s, v6.GitRemoteConfig{
URL: *gitURL,
Branch: *gitBranch,
Path: *gitPath,
})
daemonRef := daemon.NewRef(notReadyDaemon)

var eventWriter event.EventWriter
{
// Connect to fluxsvc if given an upstream address
if *upstreamURL != "" {
upstreamLogger := log.With(logger, "component", "upstream")
upstreamLogger.Log("URL", *upstreamURL)
upstream, err := daemonhttp.NewUpstream(
&http.Client{Timeout: 10 * time.Second},
fmt.Sprintf("fluxd/%v", version),
client.Token(*token),
transport.NewUpstreamRouter(),
*upstreamURL,
remote.NewErrorLoggingUpstreamServer(daemonRef, upstreamLogger),
upstreamLogger,
)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}
eventWriter = upstream
defer upstream.Close()
} else {
logger.Log("upstream", "no upstream URL given")
}
}

// Mechanical components.

// When we can receive from this channel, it indicates that we
@@ -337,23 +301,13 @@ func main() {
shutdownWg.Wait()
}()

// HTTP transport component, for metrics
go func() {
mux := http.DefaultServeMux
mux.Handle("/metrics", promhttp.Handler())
handler := daemonhttp.NewHandler(daemonRef, daemonhttp.NewRouter())
mux.Handle("/api/flux/", http.StripPrefix("/api/flux", handler))
logger.Log("addr", *listenAddr)
errc <- http.ListenAndServe(*listenAddr, mux)
}()

// Checkpoint: we want to include the fact of whether the daemon
// was given a Git repo it could clone; but the expected scenario
// is that it will have been set up already, and we don't want to
// report anything before seeing if it works. So, don't start
// until we have failed or succeeded.
var checker *checkpoint.Checker
updateCheckLogger := log.With(logger, "component", "checkpoint")
checkForUpdates(clusterVersion, strconv.FormatBool(*gitURL != ""), updateCheckLogger)

gitRemote := git.Remote{URL: *gitURL}
gitConfig := git.Config{
@@ -368,48 +322,24 @@

repo := git.NewRepo(gitRemote)
{

// If there's no URL here, we will not be able to do anything else.
if gitRemote.URL == "" {
checker = checkForUpdates(clusterVersion, "false", updateCheckLogger)
return
}

shutdownWg.Add(1)
go func() {
errc <- repo.Start(shutdown, shutdownWg)
}()
for {
status, err := repo.Status()
logger.Log("repo", repo.Origin().URL, "status", status, "err", err)
notReadyDaemon.UpdateStatus(status, err)

if status == git.RepoReady {
checker = checkForUpdates(clusterVersion, "true", updateCheckLogger)
logger.Log("working-dir", repo.Dir(),
"user", *gitUser,
"email", *gitEmail,
"sync-tag", *gitSyncTag,
"notes-ref", *gitNotesRef,
"set-author", *gitSetAuthor)
break
}

if checker == nil {
checker = checkForUpdates(clusterVersion, "false", updateCheckLogger)
}

tryAgain := time.NewTimer(10 * time.Second)
select {
case err := <-errc:
go func() { errc <- err }()
return
case <-tryAgain.C:
continue
err := repo.Start(shutdown, shutdownWg)
if err != nil {
errc <- err
}
}
}()
}

logger.Log(
"url", *gitURL,
"user", *gitUser,
"email", *gitEmail,
"sync-tag", *gitSyncTag,
"notes-ref", *gitNotesRef,
"set-author", *gitSetAuthor,
)

var jobs *job.Queue
{
jobs = job.NewQueue(shutdown, shutdownWg)
@@ -425,14 +355,38 @@
GitConfig: gitConfig,
Jobs: jobs,
JobStatusCache: &job.StatusCache{Size: 100},

EventWriter: eventWriter,
Logger: log.With(logger, "component", "daemon"),
LoopVars: &daemon.LoopVars{
Logger: log.With(logger, "component", "daemon"),
LoopVars: &daemon.LoopVars{
SyncInterval: *gitPollInterval,
RegistryPollInterval: *registryPollInterval,
},
}

{
// Connect to fluxsvc if given an upstream address
if *upstreamURL != "" {
upstreamLogger := log.With(logger, "component", "upstream")
upstreamLogger.Log("URL", *upstreamURL)
upstream, err := daemonhttp.NewUpstream(
&http.Client{Timeout: 10 * time.Second},
fmt.Sprintf("fluxd/%v", version),
client.Token(*token),
transport.NewUpstreamRouter(),
*upstreamURL,
remote.NewErrorLoggingUpstreamServer(daemon, upstreamLogger),
upstreamLogger,
)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}
daemon.EventWriter = upstream
defer upstream.Close()
} else {
logger.Log("upstream", "no upstream URL given")
}
}

shutdownWg.Add(1)
go daemon.Loop(shutdown, shutdownWg, log.With(logger, "component", "sync-loop"))

@@ -441,8 +395,14 @@
shutdownWg.Add(1)
go cacheWarmer.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, imageCreds)

// Update daemonRef so that upstream and handlers point to fully working daemon
daemonRef.UpdateServer(daemon)
go func() {
mux := http.DefaultServeMux
mux.Handle("/metrics", promhttp.Handler())
handler := daemonhttp.NewHandler(daemon, daemonhttp.NewRouter())
mux.Handle("/api/flux/", http.StripPrefix("/api/flux", handler))
logger.Log("addr", *listenAddr)
errc <- http.ListenAndServe(*listenAddr, mux)
}()

// Fall off the end, into the waiting procedure.
}
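The upshot of the main.go reshuffle: the daemon, the upstream connection, and the HTTP handler are all wired up unconditionally, and repo.Start runs in a background goroutine instead of gating the rest of startup. Here is the pattern in miniature as a self-contained program; the endpoint path and JSON payloads are illustrative stand-ins, not flux's actual API:

package main

import (
	"fmt"
	"net/http"
	"sync/atomic"
	"time"
)

func main() {
	var repoReady int32

	// Background "clone": the HTTP server does not wait for it.
	go func() {
		time.Sleep(5 * time.Second) // stand-in for cloning a git repo
		atomic.StoreInt32(&repoReady, 1)
	}()

	// Queries are answered immediately; git-dependent answers carry a
	// read-only reason rather than the whole request failing.
	http.HandleFunc("/api/flux/v6/services", func(w http.ResponseWriter, r *http.Request) {
		if atomic.LoadInt32(&repoReady) == 0 {
			fmt.Fprintln(w, `[{"ID":"default:deployment/helloworld","ReadOnly":"NotReady"}]`)
			return
		}
		fmt.Fprintln(w, `[{"ID":"default:deployment/helloworld","ReadOnly":""}]`)
	})
	fmt.Println(http.ListenAndServe(":3030", nil))
}
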
20 changes: 18 additions & 2 deletions daemon/daemon.go
@@ -76,21 +76,37 @@ func (d *Daemon) ListServices(ctx context.Context, namespace string) ([]v6.Contr
}

var services policy.ResourceMap
var globalReadOnly v6.ReadOnlyReason
err = d.WithClone(ctx, func(checkout *git.Checkout) error {
var err error
services, err = d.Manifests.ServicesWithPolicies(checkout.ManifestDir())
return err
})
if err != nil {
switch {
case err == git.ErrNotReady:
globalReadOnly = v6.ReadOnlyNotReady
case err == git.ErrNoConfig:
globalReadOnly = v6.ReadOnlyNoRepo
case err != nil:
return nil, errors.Wrap(err, "getting service policies")
}

var res []v6.ControllerStatus
for _, service := range clusterServices {
policies := services[service.ID]
var readOnly v6.ReadOnlyReason
policies, ok := services[service.ID]
switch {
case globalReadOnly != "":
readOnly = globalReadOnly
case !ok:
readOnly = v6.ReadOnlyMissing
case service.IsSystem:
readOnly = v6.ReadOnlySystem
}
res = append(res, v6.ControllerStatus{
ID: service.ID,
Containers: containers2containers(service.ContainersOrNil()),
ReadOnly: readOnly,
Status: service.Status,
Automated: policies.Contains(policy.Automated),
Locked: policies.Contains(policy.Locked),
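The precedence encoded above is worth spelling out: a repo-wide condition (NotReady or NoRepo) overrides everything; a controller missing from the repo comes next; a platform-owned controller last; otherwise the zero value means writable. The same decision as a self-contained function, with the types inlined from api/v6 for illustration:

package main

import "fmt"

type ReadOnlyReason string

const (
	ReadOnlyOK       ReadOnlyReason = ""
	ReadOnlyMissing  ReadOnlyReason = "NotInRepo"
	ReadOnlySystem   ReadOnlyReason = "System"
	ReadOnlyNoRepo   ReadOnlyReason = "NoRepo"
	ReadOnlyNotReady ReadOnlyReason = "NotReady"
)

// readOnlyFor mirrors the precedence in ListServices: a repo-wide reason
// wins, then "not in repo", then "system"; otherwise writable.
func readOnlyFor(global ReadOnlyReason, inRepo, isSystem bool) ReadOnlyReason {
	switch {
	case global != "":
		return global
	case !inRepo:
		return ReadOnlyMissing
	case isSystem:
		return ReadOnlySystem
	}
	return ReadOnlyOK
}

func main() {
	fmt.Println(readOnlyFor(ReadOnlyNotReady, false, true)) // NotReady: repo-wide reason wins
	fmt.Println(readOnlyFor(ReadOnlyOK, false, false))      // NotInRepo
	fmt.Println(readOnlyFor(ReadOnlyOK, true, true))        // System
	fmt.Println(readOnlyFor(ReadOnlyOK, true, false) == ReadOnlyOK) // true
}
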
103 changes: 0 additions & 103 deletions daemon/notready.go

This file was deleted.

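daemon/notready.go held the NotReadyDaemon: a stand-in API server that answered every call with a "not ready" error until main swapped in the real daemon through the daemon.Ref indirection. With per-answer read-only reasons, both the stand-in and the swap become unnecessary. The removed pattern in miniature; this is an illustrative reconstruction, not the deleted code verbatim:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// Server stands in for the daemon's API surface.
type Server interface {
	ListServices(namespace string) (string, error)
}

// notReady rejects every call until the real server replaces it.
type notReady struct{}

func (notReady) ListServices(string) (string, error) {
	return "", errors.New("daemon not ready: git repo not cloned")
}

// ref is a swappable indirection to a Server, like daemon.Ref.
type ref struct {
	mu sync.RWMutex
	s  Server
}

func (r *ref) ListServices(ns string) (string, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.s.ListServices(ns)
}

func main() {
	r := &ref{s: notReady{}}
	_, err := r.ListServices("default")
	fmt.Println(err) // daemon not ready: git repo not cloned
	// Once the repo was cloned, main would swap in the real server:
	// r.mu.Lock(); r.s = realDaemon; r.mu.Unlock()
}
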