Skip to content

Commit

Permalink
Merge pull request #170 from ulucinar/fix-167
Browse files Browse the repository at this point in the history
Add upjet runtime Prometheus metrics
  • Loading branch information
ulucinar authored Mar 8, 2023
2 parents 4a422c6 + 7a3f20a commit 5377e5d
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 58 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ require (
github.com/hashicorp/terraform-plugin-sdk/v2 v2.24.0
github.com/iancoleman/strcase v0.2.0
github.com/json-iterator/go v1.1.12
github.com/mitchellh/go-ps v1.0.0
github.com/muvaf/typewriter v0.0.0-20210910160850-80e49fe1eb32
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.2
github.com/spf13/afero v1.9.2
github.com/tmccombs/hcl2json v0.3.3
github.com/yuin/goldmark v1.4.13
Expand Down Expand Up @@ -79,7 +81,6 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/prometheus/client_golang v1.12.2 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa1
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc=
github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU=
github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8=
Expand Down
10 changes: 9 additions & 1 deletion pkg/controller/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package controller

import (
"context"
"time"

xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
Expand All @@ -14,6 +15,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/upbound/upjet/pkg/config"
"github.com/upbound/upjet/pkg/metrics"
"github.com/upbound/upjet/pkg/resource"
"github.com/upbound/upjet/pkg/resource/json"
"github.com/upbound/upjet/pkg/terraform"
Expand Down Expand Up @@ -113,7 +115,7 @@ func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed.
return managed.ExternalObservation{}, errors.Wrap(err, errRefresh)
}
switch {
case res.IsApplying, res.IsDestroying:
case res.ASyncInProgress:
mg.SetConditions(resource.AsyncOperationOngoingCondition())
return managed.ExternalObservation{
ResourceExists: true,
Expand Down Expand Up @@ -179,6 +181,7 @@ func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed.
}, nil
// we prioritize status updates over late-init'ed spec updates
case !markedAvailable:
addTTR(tr)
tr.SetConditions(xpv1.Available())
return managed.ExternalObservation{
ResourceExists: true,
Expand Down Expand Up @@ -211,6 +214,11 @@ func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed.
}
}

func addTTR(mg xpresource.Managed) {
gvk := mg.GetObjectKind().GroupVersionKind()
metrics.TTRMeasurements.WithLabelValues(gvk.Group, gvk.Version, gvk.Kind).Observe(time.Since(mg.GetCreationTimestamp().Time).Seconds())
}

func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.ExternalCreation, error) {
if e.config.UseAsync {
return managed.ExternalCreation{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func TestObserve(t *testing.T) {
w: WorkspaceFns{
RefreshFn: func(_ context.Context) (terraform.RefreshResult, error) {
return terraform.RefreshResult{
IsApplying: true,
ASyncInProgress: true,
}, nil
},
},
Expand Down
90 changes: 90 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2023 Upbound Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

const (
promNSUpjet = "upjet"
promSysTF = "terraform"
promSysResource = "resource"
)

// ExecMode is the Terraform CLI execution mode label
type ExecMode int

const (
// ModeSync represents the synchronous execution mode
ModeSync ExecMode = iota
// ModeASync represents the asynchronous execution mode
ModeASync
)

// String converts an execMode to string
func (em ExecMode) String() string {
switch em {
case ModeSync:
return "sync"
case ModeASync:
return "async"
default:
return "unknown"
}
}

var (
// CLITime is the Terraform CLI execution times histogram.
CLITime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: promNSUpjet,
Subsystem: promSysTF,
Name: "cli_duration",
Help: "Measures in seconds how long it takes a Terraform CLI invocation to complete",
Buckets: []float64{1.0, 3, 5, 10, 15, 30, 60, 120, 300},
}, []string{"subcommand", "mode"})

// CLIExecutions are the active number of terraform CLI invocations.
CLIExecutions = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: promNSUpjet,
Subsystem: promSysTF,
Name: "active_cli_invocations",
Help: "The number of active (running) Terraform CLI invocations",
}, []string{"subcommand", "mode"})

// TFProcesses are the active number of
// terraform CLI & Terraform provider processes running.
TFProcesses = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: promNSUpjet,
Subsystem: promSysTF,
Name: "running_processes",
Help: "The number of running Terraform CLI and Terraform provider processes",
}, []string{"type"})

// TTRMeasurements are the time-to-readiness measurements for
// the managed resources.
TTRMeasurements = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: promNSUpjet,
Subsystem: promSysResource,
Name: "ttr",
Help: "Measures in seconds the time-to-readiness (TTR) for managed resources",
Buckets: []float64{10, 15, 30, 60, 120, 300, 600, 1800, 3600},
}, []string{"group", "version", "kind"})
)

func init() {
metrics.Registry.MustRegister(CLITime, CLIExecutions, TFProcesses, TTRMeasurements)
}
18 changes: 12 additions & 6 deletions pkg/terraform/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ type Operation struct {
mu sync.RWMutex
}

// MarkStart marks the operation as started.
func (o *Operation) MarkStart(t string) {
// MarkStart marks the operation as started atomically after checking
// no previous operation is already running.
// Returns `false` if a previous operation is still in progress.
func (o *Operation) MarkStart(t string) bool {
o.mu.Lock()
defer o.mu.Unlock()
if o.startTime != nil && o.endTime == nil {
return false
}
now := time.Now()
o.Type = t
o.startTime = &now
o.endTime = nil
return true
}

// MarkEnd marks the operation as ended.
Expand Down Expand Up @@ -60,15 +66,15 @@ func (o *Operation) IsRunning() bool {
}

// StartTime returns the start time of the current operation.
func (o *Operation) StartTime() *time.Time {
func (o *Operation) StartTime() time.Time {
o.mu.RLock()
defer o.mu.RUnlock()
return o.startTime
return *o.startTime
}

// EndTime returns the end time of the current operation.
func (o *Operation) EndTime() *time.Time {
func (o *Operation) EndTime() time.Time {
o.mu.RLock()
defer o.mu.RUnlock()
return o.endTime
return *o.endTime
}
2 changes: 1 addition & 1 deletion pkg/terraform/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestOperation(t *testing.T) {
},
want: want{
checks: func(o *Operation) bool {
return o.Type == "" && o.StartTime() == nil && o.EndTime() == nil
return o.Type == "" && o.startTime == nil && o.endTime == nil
},
result: true,
},
Expand Down
72 changes: 59 additions & 13 deletions pkg/terraform/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@ import (
"path/filepath"
"strings"
"sync"
"time"

"github.com/crossplane/crossplane-runtime/pkg/logging"
xpresource "github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/mitchellh/go-ps"
"github.com/pkg/errors"
"github.com/spf13/afero"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/exec"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/upbound/upjet/pkg/config"
"github.com/upbound/upjet/pkg/metrics"
"github.com/upbound/upjet/pkg/resource"
)

Expand Down Expand Up @@ -109,6 +112,15 @@ func WithProviderRunner(pr ProviderRunner) WorkspaceStoreOption {
}
}

// WithProcessReportInterval enables the upjet.terraform.running_processes
// metric, which periodically reports the total number of Terraform CLI and
// Terraform provider processes in the system.
func WithProcessReportInterval(d time.Duration) WorkspaceStoreOption {
return func(ws *WorkspaceStore) {
ws.processReportInterval = d
}
}

// NewWorkspaceStore returns a new WorkspaceStore.
func NewWorkspaceStore(l logging.Logger, opts ...WorkspaceStoreOption) *WorkspaceStore {
ws := &WorkspaceStore{
Expand All @@ -122,6 +134,10 @@ func NewWorkspaceStore(l logging.Logger, opts ...WorkspaceStoreOption) *Workspac
for _, f := range opts {
f(ws)
}
ws.initMetrics()
if ws.processReportInterval != 0 {
go ws.reportTFProcesses(ws.processReportInterval)
}
return ws
}

Expand All @@ -131,13 +147,13 @@ type WorkspaceStore struct {
// Since there can be multiple calls that add/remove values from the map at
// the same time, it has to be safe for concurrency since those operations
// cause rehashing in some cases.
store map[types.UID]*Workspace
logger logging.Logger
providerRunner ProviderRunner
mu sync.Mutex

fs afero.Afero
executor exec.Interface
store map[types.UID]*Workspace
logger logging.Logger
providerRunner ProviderRunner
mu sync.Mutex
processReportInterval time.Duration
fs afero.Afero
executor exec.Interface
}

// Workspace makes sure the Terraform workspace for the given resource is ready
Expand Down Expand Up @@ -176,9 +192,7 @@ func (ws *WorkspaceStore) Workspace(ctx context.Context, c resource.SecretClient
return nil, errors.Wrap(err, "cannot write main tf file")
}
if isNeedProviderUpgrade {
cmd := w.executor.CommandContext(ctx, "terraform", "init", "-upgrade", "-input=false")
cmd.SetDir(w.dir)
out, err := cmd.CombinedOutput()
out, err := w.runTF(ctx, metrics.ModeSync, "init", "-upgrade", "-input=false")
w.logger.Debug("init -upgrade ended", "out", ts.filterSensitiveInformation(string(out)))
if err != nil {
return w, errors.Wrapf(err, "cannot upgrade workspace: %s", ts.filterSensitiveInformation(string(out)))
Expand All @@ -198,9 +212,7 @@ func (ws *WorkspaceStore) Workspace(ctx context.Context, c resource.SecretClient
if !os.IsNotExist(err) {
return w, nil
}
cmd := w.executor.CommandContext(ctx, "terraform", "init", "-input=false")
cmd.SetDir(w.dir)
out, err := cmd.CombinedOutput()
out, err := w.runTF(ctx, metrics.ModeSync, "init", "-input=false")
w.logger.Debug("init ended", "out", ts.filterSensitiveInformation(string(out)))
return w, errors.Wrapf(err, "cannot init workspace: %s", ts.filterSensitiveInformation(string(out)))
}
Expand All @@ -221,6 +233,14 @@ func (ws *WorkspaceStore) Remove(obj xpresource.Object) error {
return nil
}

func (ws *WorkspaceStore) initMetrics() {
for _, mode := range []metrics.ExecMode{metrics.ModeSync, metrics.ModeASync} {
for _, subcommand := range []string{"init", "apply", "destroy", "plan"} {
metrics.CLIExecutions.WithLabelValues(subcommand, mode.String()).Set(0)
}
}
}

func (ts Setup) filterSensitiveInformation(s string) string {
for _, v := range ts.Configuration {
if str, ok := v.(string); ok && str != "" {
Expand All @@ -229,3 +249,29 @@ func (ts Setup) filterSensitiveInformation(s string) string {
}
return s
}

func (ws *WorkspaceStore) reportTFProcesses(interval time.Duration) {
for _, t := range []string{"cli", "provider"} {
metrics.TFProcesses.WithLabelValues(t).Set(0)
}
t := time.NewTicker(interval)
for range t.C {
processes, err := ps.Processes()
if err != nil {
ws.logger.Debug("Failed to list processes", "err", err)
continue
}
cliCount, providerCount := 0.0, 0.0
for _, p := range processes {
e := p.Executable()
switch {
case e == "terraform":
cliCount++
case strings.HasPrefix(e, "terraform-"):
providerCount++
}
}
metrics.TFProcesses.WithLabelValues("cli").Set(cliCount)
metrics.TFProcesses.WithLabelValues("provider").Set(providerCount)
}
}
Loading

0 comments on commit 5377e5d

Please sign in to comment.