Fix potential missing datastore metrics in vSphere plugin (#4968)
prydin authored and danielnelson committed Nov 6, 2018
1 parent 0e07bbb commit 2d782fb
Showing 6 changed files with 228 additions and 24 deletions.
2 changes: 1 addition & 1 deletion plugins/inputs/vsphere/README.md
@@ -159,7 +159,7 @@ vm_metric_exclude = [ "*" ]
# object_discovery_interval = "300s"
## timeout applies to any of the api request made to vcenter
# timeout = "20s"
# timeout = "60s"
## Optional SSL Config
# ssl_ca = "/path/to/cafile"
72 changes: 68 additions & 4 deletions plugins/inputs/vsphere/client.go
@@ -5,10 +5,13 @@ import (
"crypto/tls"
"log"
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/vmware/govmomi"
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/performance"
"github.com/vmware/govmomi/session"
"github.com/vmware/govmomi/view"
@@ -17,6 +20,10 @@ import (
"github.com/vmware/govmomi/vim25/soap"
)

// The highest number of metrics we can query for, no matter what the settings
// and the server say.
const absoluteMaxMetrics = 10000

// ClientFactory is used to obtain Clients to be used throughout the plugin. Typically,
// a single Client is reused across all functions and goroutines, but the client
// is periodically recycled to avoid authentication expiration issues.
@@ -79,6 +86,8 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) {
// NewClient creates a new vSphere client based on the url and setting passed as parameters.
func NewClient(ctx context.Context, u *url.URL, vs *VSphere) (*Client, error) {
sw := NewStopwatch("connect", u.Host)
defer sw.Stop()

tlsCfg, err := vs.ClientConfig.TLSConfig()
if err != nil {
return nil, err
@@ -147,16 +156,27 @@ func NewClient(ctx context.Context, u *url.URL, vs *VSphere) (*Client, error) {

p := performance.NewManager(c.Client)

sw.Stop()

return &Client{
client := &Client{
Client: c,
Views: m,
Root: v,
Perf: p,
Valid: true,
Timeout: vs.Timeout.Duration,
}, nil
}
// Adjust max query size if needed
ctx3, cancel3 := context.WithTimeout(ctx, vs.Timeout.Duration)
defer cancel3()
n, err := client.GetMaxQueryMetrics(ctx3)
if err != nil {
return nil, err
}
log.Printf("D! [input.vsphere] vCenter says max_query_metrics should be %d", n)
if n < vs.MaxQueryMetrics {
log.Printf("W! [input.vsphere] Configured max_query_metrics is %d, but server limits it to %d. Reducing.", vs.MaxQueryMetrics, n)
vs.MaxQueryMetrics = n
}
return client, nil
}

// Close shuts down a ClientFactory and releases any resources associated with it.
@@ -191,3 +211,47 @@ func (c *Client) GetServerTime(ctx context.Context) (time.Time, error) {
}
return *t, nil
}

// GetMaxQueryMetrics returns the max_query_metrics setting as configured in vCenter
func (c *Client) GetMaxQueryMetrics(ctx context.Context) (int, error) {
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
defer cancel()

om := object.NewOptionManager(c.Client.Client, *c.Client.Client.ServiceContent.Setting)
res, err := om.Query(ctx, "config.vpxd.stats.maxQueryMetrics")
if err == nil {
if len(res) > 0 {
if s, ok := res[0].GetOptionValue().Value.(string); ok {
v, err := strconv.Atoi(s)
if err == nil {
log.Printf("D! [input.vsphere] vCenter maxQueryMetrics is defined: %d", v)
if v == -1 {
// Whatever the server says, we never ask for more metrics than this.
return absoluteMaxMetrics, nil
}
return v, nil
}
}
// Fall through to version-based inference if the value isn't usable
}
} else {
log.Println("I! [input.vsphere] Option query for maxQueryMetrics failed. Using default")
}

// No usable maxQueryMetrics setting. Infer based on version
ver := c.Client.Client.ServiceContent.About.Version
parts := strings.Split(ver, ".")
if len(parts) < 2 {
log.Printf("W! [input.vsphere] vCenter returned an invalid version string: %s. Using default query size=64", ver)
return 64, nil
}
log.Printf("D! [input.vsphere] vCenter version is: %s", ver)
major, err := strconv.Atoi(parts[0])
if err != nil {
return 0, err
}
if major < 6 || major == 6 && parts[1] == "0" {
return 64, nil
}
return 256, nil
}
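
For reference, the version-based fallback above reduces to a small decision: when `config.vpxd.stats.maxQueryMetrics` is unavailable, vCenter 6.0 and earlier are assumed to enforce the old 64-metric limit while newer releases get 256, a server value of -1 ("unlimited") is capped at `absoluteMaxMetrics`, and `NewClient` then clamps the configured `max_query_metrics` down to whatever the server allows. A minimal standalone sketch of that inference using only the standard library (the `inferMaxQueryMetrics` function and its `main` are illustrative, not part of the plugin):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// absoluteMaxMetrics mirrors the cap used in client.go: never ask for more
// metrics than this, even if vCenter reports an unlimited (-1) setting.
const absoluteMaxMetrics = 10000

// inferMaxQueryMetrics is an illustrative standalone version of the fallback
// in GetMaxQueryMetrics: given a vCenter version string, return a safe query
// size when the maxQueryMetrics option cannot be read.
func inferMaxQueryMetrics(version string) int {
	parts := strings.Split(version, ".")
	if len(parts) < 2 {
		return 64 // invalid version string: fall back to the conservative default
	}
	major, err := strconv.Atoi(parts[0])
	if err != nil {
		return 64 // non-numeric major version: also fall back (the plugin returns an error here)
	}
	// vCenter 6.0 and earlier enforce a 64-metric limit; later releases allow 256.
	if major < 6 || (major == 6 && parts[1] == "0") {
		return 64
	}
	return 256
}

func main() {
	for _, v := range []string{"5.5.0", "6.0.0", "6.5.0", "6.7.0", "bogus"} {
		fmt.Printf("%-7s -> %d\n", v, inferMaxQueryMetrics(v))
	}
}
```
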
68 changes: 54 additions & 14 deletions plugins/inputs/vsphere/endpoint.go
@@ -24,6 +24,8 @@ import (

var isolateLUN = regexp.MustCompile(".*/([^/]+)/?$")

const metricLookback = 3

// Endpoint is a high-level representation of a connected vCenter endpoint. It is backed by the lower
// level Client type.
type Endpoint struct {
@@ -32,6 +34,7 @@ type Endpoint struct {
lastColls map[string]time.Time
instanceInfo map[string]resourceInfo
resourceKinds map[string]resourceKind
hwMarks *TSCache
lun2ds map[string]string
discoveryTicker *time.Ticker
collectMux sync.RWMutex
@@ -96,6 +99,7 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint,
URL: url,
Parent: parent,
lastColls: make(map[string]time.Time),
hwMarks: NewTSCache(1 * time.Hour),
instanceInfo: make(map[string]resourceInfo),
lun2ds: make(map[string]string),
initialized: false,
@@ -353,8 +357,8 @@ func (e *Endpoint) discover(ctx context.Context) error {
// Populate resource objects, and endpoint instance info.
for k, res := range e.resourceKinds {
log.Printf("D! [input.vsphere] Discovering resources for %s", res.name)
// Need to do this for all resource types even if they are not enabled (but datastore)
if res.enabled || (k != "datastore" && k != "vm") {
// Need to do this for all resource types even if they are not enabled
if res.enabled || k != "vm" {
objects, err := res.getObjects(ctx, e, client.Root)
if err != nil {
return err
@@ -416,7 +420,6 @@ func (e *Endpoint) discover(ctx context.Context) error {
url := ds.altID
m := isolateLUN.FindStringSubmatch(url)
if m != nil {
log.Printf("D! [input.vsphere]: LUN: %s", m[1])
l2d[m[1]] = ds.name
}
}
@@ -539,7 +542,6 @@ func getDatastores(ctx context.Context, e *Endpoint, root *view.ContainerView) (
url = info.Url
}
}
log.Printf("D! [input.vsphere]: DS URL: %s %s", url, r.Name)
m[r.ExtensibleManagedObject.Reference().Value] = objectRef{
name: r.Name, ref: r.ExtensibleManagedObject.Reference(), parentRef: r.Parent, altID: url}
}
@@ -584,10 +586,24 @@ func (e *Endpoint) Collect(ctx context.Context, acc telegraf.Accumulator) error
}
}
}

// Purge old timestamps from the cache
e.hwMarks.Purge()
return nil
}

func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, now time.Time, latest time.Time) {
maxMetrics := e.Parent.MaxQueryMetrics
if maxMetrics < 1 {
maxMetrics = 1
}

// Workaround for vCenter weirdness. Cluster metrics seem to count multiple times
// when checking query size, so keep it at a low value.
// Revisit this when we better understand the reason why vCenter counts it this way!
if res.name == "cluster" && maxMetrics > 10 {
maxMetrics = 10
}
pqs := make([]types.PerfQuerySpec, 0, e.Parent.MaxQueryObjects)
metrics := 0
total := 0
@@ -600,7 +616,7 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n
mr := len(info.metrics)
for mr > 0 {
mc := mr
headroom := e.Parent.MaxQueryMetrics - metrics
headroom := maxMetrics - metrics
if !res.realTime && mc > headroom { // Metric query limit only applies to non-realtime metrics
mc = headroom
}
Expand All @@ -610,10 +626,19 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n
MaxSample: 1,
MetricId: info.metrics[fm : fm+mc],
IntervalId: res.sampling,
Format: "normal",
}

// For non-realtime metrics, we need to look back a few samples in case
// the vCenter is late reporting metrics.
if !res.realTime {
pq.MaxSample = metricLookback
}

// Look back 3 sampling periods
start := latest.Add(time.Duration(-res.sampling) * time.Second * (metricLookback - 1))
if !res.realTime {
pq.StartTime = &latest
pq.StartTime = &start
pq.EndTime = &now
}
pqs = append(pqs, pq)
@@ -623,8 +648,8 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n
// We need to dump the current chunk of metrics for one of two reasons:
// 1) We filled up the metric quota while processing the current resource
// 2) We are at the last resource and have no more data to process.
if mr > 0 || (!res.realTime && metrics >= e.Parent.MaxQueryMetrics) || nRes >= e.Parent.MaxQueryObjects {
log.Printf("D! [input.vsphere]: Querying %d objects, %d metrics (%d remaining) of type %s for %s. Processed objects: %d. Total objects %d",
if mr > 0 || (!res.realTime && metrics >= maxMetrics) || nRes >= e.Parent.MaxQueryObjects {
log.Printf("D! [input.vsphere]: Queueing query: %d objects, %d metrics (%d remaining) of type %s for %s. Processed objects: %d. Total objects %d",
len(pqs), metrics, mr, res.name, e.URL.Host, total+1, len(res.objects))

// To prevent deadlocks, don't send work items if the context has been cancelled.
Expand All @@ -646,6 +671,8 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n
//
if len(pqs) > 0 {
// Call push function
log.Printf("D! [input.vsphere]: Queuing query: %d objects, %d metrics (0 remaining) of type %s for %s. Total objects %d (final chunk)",
len(pqs), metrics, res.name, e.URL.Host, len(res.objects))
f(ctx, pqs)
}
}
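
The lookback logic above is simple date arithmetic: for non-realtime resources the query asks for up to `metricLookback` samples and moves `StartTime` back `(metricLookback - 1)` sampling intervals before the last successful collection, so samples that vCenter reported late still fall inside the window. A minimal sketch of that window calculation (the `lookbackStart` helper and the concrete times are illustrative assumptions; datastore metrics typically use a 300 s sampling interval):

```go
package main

import (
	"fmt"
	"time"
)

// metricLookback mirrors the constant in endpoint.go: how many historical
// samples to request for non-realtime metrics.
const metricLookback = 3

// lookbackStart is an illustrative helper: given the timestamp of the last
// successful collection and the resource's sampling interval in seconds,
// compute the StartTime used in the PerfQuerySpec. The window reaches back
// (metricLookback - 1) sampling periods before "latest".
func lookbackStart(latest time.Time, samplingSec int32) time.Time {
	return latest.Add(time.Duration(-samplingSec) * time.Second * (metricLookback - 1))
}

func main() {
	latest := time.Date(2018, 11, 6, 12, 0, 0, 0, time.UTC)
	// With a 300 s (5 min) interval the query window starts 10 minutes
	// before the last collection.
	start := lookbackStart(latest, 300)
	fmt.Println("latest:", latest.Format(time.RFC3339))
	fmt.Println("start: ", start.Format(time.RFC3339)) // 2018-11-06T11:50:00Z
}
```
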
@@ -668,7 +695,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
log.Printf("D! [input.vsphere]: Latest: %s, elapsed: %f, resource: %s", latest, elapsed, resourceType)
if !res.realTime && elapsed < float64(res.sampling) {
// No new data would be available. We're outta here!
log.Printf("D! [input.vsphere]: Sampling period for %s of %d has not elapsed for %s",
log.Printf("D! [input.vsphere]: Sampling period for %s of %d has not elapsed on %s",
resourceType, res.sampling, e.URL.Host)
return nil
}
Expand All @@ -679,7 +706,6 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
internalTags := map[string]string{"resourcetype": resourceType}
sw := NewStopwatchWithTags("gather_duration", e.URL.Host, internalTags)

log.Printf("D! [input.vsphere]: Start of sample period deemed to be %s", latest)
log.Printf("D! [input.vsphere]: Collecting metrics for %d objects of type %s for %s",
len(res.objects), resourceType, e.URL.Host)

@@ -690,7 +716,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
wp.Run(ctx, func(ctx context.Context, in interface{}) interface{} {
chunk := in.([]types.PerfQuerySpec)
n, err := e.collectChunk(ctx, chunk, resourceType, res, acc)
log.Printf("D! [input.vsphere]: Query returned %d metrics", n)
log.Printf("D! [input.vsphere] CollectChunk for %s returned %d metrics", resourceType, n)
if err != nil {
return err
}
@@ -722,7 +748,7 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc
sw.Stop()
SendInternalCounterWithTags("gather_count", e.URL.Host, internalTags, count)
if len(merr) > 0 {
return err
return merr
}
return nil
}
@@ -757,6 +783,7 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
if err != nil {
return count, err
}
log.Printf("D! [input.vsphere] Query for %s returned metrics for %d objects", resourceType, len(ems))

// Iterate through results
for _, em := range ems {
@@ -783,10 +810,18 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
}
e.populateTags(&objectRef, resourceType, &res, t, &v)

// Now deal with the values
for idx, value := range v.Value {
// Now deal with the values. Iterate backwards so we start with the latest value
tsKey := moid + "|" + name + "|" + v.Instance
for idx := len(v.Value) - 1; idx >= 0; idx-- {
ts := em.SampleInfo[idx].Timestamp

// Since non-realtime metrics are queried with a lookback, we need to check the high-water mark
// to determine whether this sample should be included. Only samples not seen before should be included.
if !(res.realTime || e.hwMarks.IsNew(tsKey, ts)) {
continue
}
value := v.Value[idx]

// Organize the metrics into a bucket per measurement.
// Data SHOULD be presented to us with the same timestamp for all samples, but in case
// it isn't, we use the measurement name + timestamp as the key for the bucket.
@@ -813,6 +848,11 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
bucket.fields[fn] = value
}
count++

// Update highwater marks for non-realtime metrics.
if !res.realTime {
e.hwMarks.Put(tsKey, ts)
}
}
}
// We've iterated through all the metrics and collected buckets for each
57 changes: 57 additions & 0 deletions plugins/inputs/vsphere/tscache.go
@@ -0,0 +1,57 @@
package vsphere

import (
"log"
"sync"
"time"
)

// TSCache is a cache of timestamps used to determine the validity of datapoints
type TSCache struct {
ttl time.Duration
table map[string]time.Time
done chan struct{}
mux sync.RWMutex
}

// NewTSCache creates a new TSCache with a specified time-to-live after which timestamps are discarded.
func NewTSCache(ttl time.Duration) *TSCache {
return &TSCache{
ttl: ttl,
table: make(map[string]time.Time),
done: make(chan struct{}),
}
}

// Purge removes timestamps that are older than the time-to-live
func (t *TSCache) Purge() {
t.mux.Lock()
defer t.mux.Unlock()
n := 0
for k, v := range t.table {
if time.Now().Sub(v) > t.ttl {
delete(t.table, k)
n++
}
}
log.Printf("D! [input.vsphere] Purged timestamp cache. %d deleted with %d remaining", n, len(t.table))
}

// IsNew returns true if the supplied timestamp for the supplied key is more recent than the
// timestamp we have on record.
func (t *TSCache) IsNew(key string, tm time.Time) bool {
t.mux.RLock()
defer t.mux.RUnlock()
v, ok := t.table[key]
if !ok {
return true // We've never seen this before, so consider everything a new sample
}
return !tm.Before(v)
}

// Put updates the latest timestamp for the supplied key.
func (t *TSCache) Put(key string, time time.Time) {
t.mux.Lock()
defer t.mux.Unlock()
t.table[key] = time
}
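
Taken together with the `collectChunk` changes above, the cache implements a simple high-water-mark check: each sample is keyed as `moid + "|" + metric name + "|" + instance`, dropped if its timestamp is strictly older than the recorded mark, and recorded once emitted; `Collect` then purges stale entries every cycle. A rough sketch of that flow, assuming the `TSCache` type above (the key, metric name, timestamps, and the `exampleTSCacheDedup` function itself are made up for illustration):

```go
package vsphere

import (
	"fmt"
	"time"
)

// exampleTSCacheDedup sketches how collectChunk uses the cache: samples that
// fall inside the lookback window but are older than the high-water mark are
// dropped, so re-queried datapoints are not emitted twice.
func exampleTSCacheDedup() {
	hwMarks := NewTSCache(1 * time.Hour)
	// Keys follow the moid + "|" + metric name + "|" + instance pattern from collectChunk.
	key := "datastore-123|disk.used.latest|"

	base := time.Date(2018, 11, 6, 12, 0, 0, 0, time.UTC)
	samples := []time.Time{
		base.Add(-10 * time.Minute),
		base.Add(-5 * time.Minute),
		base,
	}

	// Pretend the previous collection already advanced the mark to the -5 min sample.
	hwMarks.Put(key, samples[1])

	for _, ts := range samples {
		if !hwMarks.IsNew(key, ts) {
			fmt.Println("skip", ts.Format(time.RFC3339)) // strictly older than the mark
			continue
		}
		fmt.Println("emit", ts.Format(time.RFC3339)) // equal or newer counts as new
		hwMarks.Put(key, ts)                         // advance the high-water mark
	}

	// Entries older than the TTL are dropped by the periodic Purge in Collect().
	hwMarks.Purge()
}
```
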