Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Add Prometheus metrics exporter to helm-op #1653

Merged
merged 2 commits into from
Jan 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions chart/flux/templates/helm-operator-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ spec:
- name: flux-helm-operator
image: "{{ .Values.helmOperator.repository }}:{{ .Values.helmOperator.tag }}"
imagePullPolicy: {{ .Values.helmOperator.pullPolicy }}
ports:
- name: http
containerPort: 3030
volumeMounts:
{{- if .Values.ssh.known_hosts }}
- name: sshdir
Expand Down
30 changes: 13 additions & 17 deletions cmd/helm-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
ifinformers "github.com/weaveworks/flux/integrations/client/informers/externalversions"
fluxhelm "github.com/weaveworks/flux/integrations/helm"
"github.com/weaveworks/flux/integrations/helm/chartsync"
daemonhttp "github.com/weaveworks/flux/integrations/helm/http/daemon"
"github.com/weaveworks/flux/integrations/helm/operator"
"github.com/weaveworks/flux/integrations/helm/release"
"github.com/weaveworks/flux/integrations/helm/status"
)

var (
fs *pflag.FlagSet
err error
logger log.Logger

versionFlag *bool
Expand Down Expand Up @@ -77,6 +77,8 @@ func init() {
kubeconfig = fs.String("kubeconfig", "", "path to a kubeconfig; required if out-of-cluster")
master = fs.String("master", "", "address of the Kubernetes API server; overrides any value in kubeconfig; required if out-of-cluster")

listenAddr = fs.StringP("listen", "l", ":3030", "Listen address where /metrics and API will be served")

tillerIP = fs.String("tiller-ip", "", "Tiller IP address; required if run out-of-cluster")
tillerPort = fs.String("tiller-port", "", "Tiller port; required if run out-of-cluster")
tillerNamespace = fs.String("tiller-namespace", "kube-system", "Tiller namespace")
Expand All @@ -99,30 +101,30 @@ func init() {
}

func main() {
// Stop glog complaining
// set glog output to stderr
flag.CommandLine.Parse([]string{"-logtostderr"})
// Now do our own
fs.Parse(os.Args)

if *versionFlag {
println(version)
os.Exit(0)
}

// LOGGING ------------------------------------------------------------------------------
// init go-kit log
{
logger = log.NewLogfmtLogger(os.Stderr)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
}

// SHUTDOWN ----------------------------------------------------------------------------
// error channel
errc := make(chan error)

// Shutdown trigger for goroutines
// shutdown triggers
shutdown := make(chan struct{})
shutdownWg := &sync.WaitGroup{}

// wait for SIGTERM
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -137,7 +139,6 @@ func main() {

mainLogger := log.With(logger, "component", "helm-operator")

// CLUSTER ACCESS -----------------------------------------------------------------------
cfg, err := clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
if err != nil {
mainLogger.Log("error", fmt.Sprintf("Error building kubeconfig: %v", err))
Expand All @@ -150,15 +151,12 @@ func main() {
os.Exit(1)
}

// CUSTOM RESOURCES CLIENT --------------------------------------------------------------
ifClient, err := clientset.NewForConfig(cfg)
if err != nil {
mainLogger.Log("error", fmt.Sprintf("Error building integrations clientset: %v", err))
//errc <- fmt.Errorf("Error building integrations clientset: %v", err)
os.Exit(1)
}

// HELM ---------------------------------------------------------------------------------
helmClient := fluxhelm.ClientSetup(log.With(logger, "component", "helm"), kubeClient, fluxhelm.TillerOptions{
Host: *tillerIP,
Port: *tillerPort,
Expand All @@ -178,27 +176,25 @@ func main() {

// release instance is needed during the sync of Charts changes and during the sync of HelmRelease changes
rel := release.New(log.With(logger, "component", "release"), helmClient)
// CHARTS CHANGES SYNC ------------------------------------------------------------------
chartSync := chartsync.New(log.With(logger, "component", "chartsync"),
chartsync.Polling{Interval: *chartsSyncInterval},
chartsync.Clients{KubeClient: *kubeClient, IfClient: *ifClient},
rel, chartsync.Config{LogDiffs: *logReleaseDiffs, UpdateDeps: *updateDependencies, GitTimeout: *gitTimeout})
chartSync.Run(shutdown, errc, shutdownWg)

// OPERATOR - CUSTOM RESOURCE CHANGE SYNC -----------------------------------------------
// CUSTOM RESOURCES CACHING SETUP -------------------------------------------------------
// SharedInformerFactory sets up informer, that maps resource type to a cache shared informer.
// operator attaches event handler to the informer and syncs the informer cache
ifInformerFactory := ifinformers.NewSharedInformerFactory(ifClient, 30*time.Second)
// Reference to shared index informers for the HelmRelease
fhrInformer := ifInformerFactory.Flux().V1beta1().HelmReleases()

// start FluxRelease informer
opr := operator.New(log.With(logger, "component", "operator"), *logReleaseDiffs, kubeClient, fhrInformer, chartSync)
// Starts handling k8s events related to the given resource kind
go ifInformerFactory.Start(shutdown)

checkpoint.CheckForUpdates(product, version, nil, log.With(logger, "component", "checkpoint"))

// start HTTP server
go daemonhttp.ListenAndServe(*listenAddr, log.With(logger, "component", "daemonhttp"), shutdown)

// start operator
if err = opr.Run(1, shutdown, shutdownWg); err != nil {
msg := fmt.Sprintf("Failure to run controller: %s", err.Error())
logger.Log("error", msg)
Expand Down
7 changes: 6 additions & 1 deletion deploy-helm/helm-operator-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ spec:
metadata:
labels:
name: flux-helm-operator
annotations:
prometheus.io/scrape: "true"
spec:
serviceAccount: flux
serviceAccountName: flux
volumes:
# The following volume is for using a customised known_hosts file,
# which you will need to do if you host your own git repo rather
Expand Down Expand Up @@ -62,6 +64,9 @@ spec:
# and replace the tag here.
image: quay.io/weaveworks/helm-operator:0.5.2
imagePullPolicy: IfNotPresent
ports:
- name: http
containerPort: 3030
resources:
requests:
cpu: 50m
Expand Down
48 changes: 48 additions & 0 deletions integrations/helm/http/daemon/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package daemon

import (
"context"
"fmt"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
"time"
)

// ListenAndServe starts a HTTP server instrumented with Prometheus on the specified address
func ListenAndServe(listenAddr string, logger log.Logger, stopCh <-chan struct{}) {
mux := http.DefaultServeMux
mux.Handle("/metrics", promhttp.Handler())
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
})

srv := &http.Server{
Addr: listenAddr,
Handler: mux,
ReadTimeout: 5 * time.Second,
WriteTimeout: 1 * time.Minute,
IdleTimeout: 15 * time.Second,
}

logger.Log("info", fmt.Sprintf("Starting HTTP server on %s", listenAddr))

// run server in background
go func() {
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
logger.Log("error", fmt.Sprintf("HTTP server crashed %v", err))
}
}()

// wait for close signal and attempt graceful shutdown
<-stopCh
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := srv.Shutdown(ctx); err != nil {
logger.Log("warn", fmt.Sprintf("HTTP server graceful shutdown failed %v", err))
} else {
logger.Log("info", "HTTP server stopped")
}
}