Skip to content
This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Add a way to create gauge metrics in stackdriver surfacer. #542

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions surfacers/stackdriver/proto/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ message SurfacerConf {
// processing is paused while serving data to Stackdriver. This buffer is to
// make writes to Stackdriver surfacer non-blocking.
optional int64 metrics_buffer_size = 5 [default = 10000];

// If set to true, cloudprober will export all metrics as gauge metrics.
optional bool export_as_gauge = 6;
}
118 changes: 56 additions & 62 deletions surfacers/stackdriver/stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ import (
configpb "github.com/google/cloudprober/surfacers/stackdriver/proto"
)

const batchSize = 200
const (
batchSize = 200
// MaxMetricNameLength is the maximum length of a stackdriver metric name.
MaxMetricNameLength = 100
cumulative = "CUMULATIVE"
gauge = "GAUGE"
)

//-----------------------------------------------------------------------------
// Stack Driver Surfacer Specific Code
Expand All @@ -55,6 +61,7 @@ type SDSurfacer struct {

// Internal cache for saving metric data until a batch is sent
cache map[string]*monitoring.TimeSeries
lvCache map[string]float64
knownMetrics map[string]bool

// Channel for writing the data without blocking
Expand Down Expand Up @@ -87,6 +94,7 @@ func New(ctx context.Context, config *configpb.SurfacerConf, l *logger.Logger) (
// and a channel for writing data.
s := SDSurfacer{
cache: make(map[string]*monitoring.TimeSeries),
lvCache: make(map[string]float64),
knownMetrics: make(map[string]bool),
writeChan: make(chan *metrics.EventMetrics, config.GetMetricsBufferSize()),
c: config,
Expand Down Expand Up @@ -230,7 +238,10 @@ func (s *SDSurfacer) writeBatch(ctx context.Context) {
// We batch the time series into appropriately-sized sets
// and write them
for i := 0; i < len(ts); i += batchSize {
endIndex := min(len(ts), i+batchSize)
endIndex := i + batchSize
if endIndex > len(ts) {
endIndex = len(ts)
}

s.l.Infof("Sending entries %d through %d of %d", i, endIndex, len(ts))

Expand Down Expand Up @@ -258,37 +269,52 @@ func (s *SDSurfacer) writeBatch(ctx context.Context) {

}

//-----------------------------------------------------------------------------
// StackDriver Object Creation and Helper Functions
//-----------------------------------------------------------------------------
func (s *SDSurfacer) recordDouble(kind, name string, labels map[string]string, timestamp time.Time, val float64, unit, cacheKey string) *monitoring.TimeSeries {
cacheKey = name + "," + cacheKey

if s.c.GetExportAsGauge() && kind == cumulative {
// If configured to export metrics as gauge metrics, compute and export the
// increase since the last value (s.lvCache). If last value is bigger than
// the new value, we assume the metric has reset.
kind = gauge
oldVal := s.lvCache[cacheKey]
if val < oldVal {
oldVal = 0 // metric reset.
}
s.lvCache[cacheKey] = val
val = val - oldVal
}

ts := s.timeSeries(kind, name, "DOUBLE", labels, timestamp, &monitoring.TypedValue{DoubleValue: &val}, unit)
s.cache[cacheKey] = ts

// recordTimeSeries forms a timeseries object from the given arguments, records
// it in the cache if batch processing is enabled, and returns it.
return ts
}

func (s *SDSurfacer) recordDistribution(kind, name string, labels map[string]string, timestamp time.Time, distValue *metrics.Distribution, unit, cacheKey string) *monitoring.TimeSeries {
ts := s.timeSeries(kind, name, "DISTRIBUTION", labels, timestamp, distValue.StackdriverTypedValue(), unit)
s.cache[name+","+cacheKey] = ts
return ts
}

// timeSeries creates a timeseries object from the given arguments.
//
// More information on the object and specific fields can be found here:
// https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries
func (s *SDSurfacer) recordTimeSeries(metricKind, metricName, msgType string, labels map[string]string, timestamp time.Time, tv *monitoring.TypedValue, unit, cacheKey string) *monitoring.TimeSeries {
func (s *SDSurfacer) timeSeries(kind, metricName, msgType string, labels map[string]string, timestamp time.Time, tv *monitoring.TypedValue, unit string) *monitoring.TimeSeries {
startTime := s.startTime.Format(time.RFC3339Nano)
if metricKind == "GAUGE" {
if kind == "GAUGE" {
startTime = timestamp.Format(time.RFC3339Nano)
}

ts := &monitoring.TimeSeries{
// The URL address for our custom metric, must match the
// name we used in the MetricDescriptor.
Metric: &monitoring.Metric{
Type: s.c.GetMonitoringUrl() + metricName,
Labels: labels,
},

// Must match the MetricKind and ValueType of the MetricDescriptor.
MetricKind: metricKind,
MetricKind: kind,
ValueType: msgType,
Unit: unit,

// Create a single data point, this could be utilized to create
// a batch of points instead of a single point if the write
// rate is too high.
Points: []*monitoring.Point{
{
Interval: &monitoring.TimeInterval{
Expand All @@ -312,22 +338,16 @@ func (s *SDSurfacer) recordTimeSeries(metricKind, metricName, msgType string, la
}
}

// We create a key that is a composite of both the name and the
// labels so we can make sure that the cache holds all distinct
// values and not just the ones with different names.
s.cache[metricName+","+cacheKey] = ts

return ts

}

// sdKind converts EventMetrics kind to StackDriver kind string.
func (s *SDSurfacer) sdKind(kind metrics.Kind) string {
switch kind {
case metrics.GAUGE:
return "GAUGE"
return gauge
case metrics.CUMULATIVE:
return "CUMULATIVE"
return cumulative
default:
return ""
}
Expand Down Expand Up @@ -372,8 +392,8 @@ func (s *SDSurfacer) ignoreMetric(name string) bool {
}
}

if !validMetricLength(name, s.c.GetMonitoringUrl()) {
s.l.Warningf("Message name %q is greater than the 100 character limit, skipping write", name)
if len(name)+len(s.c.GetMonitoringUrl()) > MaxMetricNameLength {
s.l.Warningf("Message name %q is greater than the %d character limit, skipping write", MaxMetricNameLength, name)
return true
}

Expand Down Expand Up @@ -410,7 +430,7 @@ func (s *SDSurfacer) failureCountForDefaultMetrics(em *metrics.EventMetrics, nam
// Since stackdriver doesn't support metrics.String and metrics.Map value types,
// it converts them to a numerical types (stackdriver type Double) with
// additional labels. See the inline comments for this conversion is done.
func (s *SDSurfacer) recordEventMetrics(em *metrics.EventMetrics) (ts []*monitoring.TimeSeries) {
func (s *SDSurfacer) recordEventMetrics(em *metrics.EventMetrics) {
metricKind := s.sdKind(em.Kind)
if metricKind == "" {
s.l.Warningf("Unknown event metrics type (not CUMULATIVE or GAUGE): %v", em.Kind)
Expand All @@ -423,7 +443,7 @@ func (s *SDSurfacer) recordEventMetrics(em *metrics.EventMetrics) (ts []*monitor
fName := metricPrefix + "failure"
creatFailureMetric, fVal := s.failureCountForDefaultMetrics(em, fName)
if creatFailureMetric {
ts = append(ts, s.recordTimeSeries(metricKind, fName, "DOUBLE", emLabels, em.Timestamp, &monitoring.TypedValue{DoubleValue: &fVal}, "1", cacheKey))
s.recordDouble(metricKind, fName, emLabels, em.Timestamp, fVal, "1", cacheKey)
}

for _, k := range em.MetricsKeys() {
Expand Down Expand Up @@ -453,8 +473,7 @@ func (s *SDSurfacer) recordEventMetrics(em *metrics.EventMetrics) (ts []*monitor

// If metric value is of type numerical value.
if v, ok := val.(metrics.NumValue); ok {
f := float64(v.Int64())
ts = append(ts, s.recordTimeSeries(metricKind, name, "DOUBLE", mLabels, em.Timestamp, &monitoring.TypedValue{DoubleValue: &f}, unit, cacheKey))
s.recordDouble(metricKind, name, mLabels, em.Timestamp, float64(v.Int64()), unit, cacheKey)
continue
}

Expand All @@ -467,8 +486,7 @@ func (s *SDSurfacer) recordEventMetrics(em *metrics.EventMetrics) (ts []*monitor
// metrics.String stringer wraps string values in a single "". Remove those
// for stackdriver.
mLabels["val"] = strings.Trim(v.String(), "\"")
f := float64(1)
ts = append(ts, s.recordTimeSeries(metricKind, name, "DOUBLE", mLabels, em.Timestamp, &monitoring.TypedValue{DoubleValue: &f}, unit, cacheKey))
s.recordDouble(gauge, name, mLabels, em.Timestamp, float64(1), unit, cacheKey)
continue
}

Expand All @@ -477,49 +495,25 @@ func (s *SDSurfacer) recordEventMetrics(em *metrics.EventMetrics) (ts []*monitor
// Since StackDriver doesn't support Map value type, we convert Map values
// to multiple timeseries with map's KeyName and key as labels.
for _, mapKey := range mapValue.Keys() {
// Create a new map so that we don't modify the overall labels map.
mmLabels := make(map[string]string)
for lk, lv := range mLabels {
mmLabels[lk] = lv
}
mmLabels[mapValue.MapName] = mapKey
f := float64(mapValue.GetKey(mapKey).Int64())
ts = append(ts, s.recordTimeSeries(metricKind, name, "DOUBLE", mmLabels, em.Timestamp, &monitoring.TypedValue{DoubleValue: &f}, unit, cacheKey))
mCacheKey := fmt.Sprintf("%s,%s=%s", cacheKey, mapValue.MapName, mapKey)
s.recordDouble(metricKind, name, mmLabels, em.Timestamp, float64(mapValue.GetKey(mapKey).Int64()), unit, mCacheKey)
}
continue
}

// If metric value is of type Distribution.
if distValue, ok := val.(*metrics.Distribution); ok {
ts = append(ts, s.recordTimeSeries(metricKind, name, "DISTRIBUTION", mLabels, em.Timestamp, distValue.StackdriverTypedValue(), unit, cacheKey))
s.recordDistribution(metricKind, name, mLabels, em.Timestamp, distValue, unit, cacheKey)
continue
}

// We'll reach here only if encounter an unsupported value type.
s.l.Warningf("Unsupported value type: %v", val)
}
return ts
}

//-----------------------------------------------------------------------------
// Non-stackdriver Helper Functions
//-----------------------------------------------------------------------------

// checkMetricLength checks if the combination of the metricName and the url
// prefix are longer than 100 characters, which is illegal in a Stackdriver
// call. Stack Driver doesn't allow custom metrics with more than 100 character
// names, so we have a check to see if we are going over the limit.
// Ref: https://cloud.google.com/monitoring/api/v3/metrics#metric_names
func validMetricLength(metricName string, monitoringURL string) bool {
if len(metricName)+len(monitoringURL) > 100 {
return false
}
return true
}

// Function to return the min of two integers
func min(a, b int) int {
if a < b {
return a
}
return b
}