From 4e36cb7f3893d3e248ba3301db784d173d8b902f Mon Sep 17 00:00:00 2001 From: prydin Date: Wed, 10 Oct 2018 08:52:03 -0400 Subject: [PATCH 1/2] Implemented timeouts --- plugins/inputs/vsphere/client.go | 36 ++++++++++------ plugins/inputs/vsphere/endpoint.go | 58 ++++++++++++++++++-------- plugins/inputs/vsphere/vsphere_test.go | 22 ++++++++++ 3 files changed, 85 insertions(+), 31 deletions(-) diff --git a/plugins/inputs/vsphere/client.go b/plugins/inputs/vsphere/client.go index 2148a72ff42f3..63e76408a3d5d 100644 --- a/plugins/inputs/vsphere/client.go +++ b/plugins/inputs/vsphere/client.go @@ -6,6 +6,7 @@ import ( "log" "net/url" "sync" + "time" "github.com/vmware/govmomi" "github.com/vmware/govmomi/performance" @@ -33,6 +34,7 @@ type Client struct { Root *view.ContainerView Perf *performance.Manager Valid bool + Timeout time.Duration closeGate sync.Once } @@ -52,7 +54,7 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) { defer cf.mux.Unlock() if cf.client == nil { var err error - if cf.client, err = NewClient(cf.url, cf.parent); err != nil { + if cf.client, err = NewClient(ctx, cf.url, cf.parent); err != nil { return nil, err } } @@ -60,9 +62,13 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) { // Execute a dummy call against the server to make sure the client is // still functional. If not, try to log back in. If that doesn't work, // we give up. - if _, err := methods.GetCurrentTime(ctx, cf.client.Client); err != nil { + ctx1, cancel1 := context.WithTimeout(ctx, cf.parent.Timeout.Duration) + defer cancel1() + if _, err := methods.GetCurrentTime(ctx1, cf.client.Client); err != nil { log.Printf("I! [input.vsphere]: Client session seems to have time out. Reauthenticating!") - if cf.client.Client.SessionManager.Login(ctx, url.UserPassword(cf.parent.Username, cf.parent.Password)) != nil { + ctx2, cancel2 := context.WithTimeout(ctx, cf.parent.Timeout.Duration) + defer cancel2() + if cf.client.Client.SessionManager.Login(ctx2, url.UserPassword(cf.parent.Username, cf.parent.Password)) != nil { return nil, err } } @@ -71,7 +77,7 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) { } // NewClient creates a new vSphere client based on the url and setting passed as parameters. -func NewClient(u *url.URL, vs *VSphere) (*Client, error) { +func NewClient(ctx context.Context, u *url.URL, vs *VSphere) (*Client, error) { sw := NewStopwatch("connect", u.Host) tlsCfg, err := vs.ClientConfig.TLSConfig() if err != nil { @@ -84,7 +90,6 @@ func NewClient(u *url.URL, vs *VSphere) (*Client, error) { if vs.Username != "" { u.User = url.UserPassword(vs.Username, vs.Password) } - ctx := context.Background() log.Printf("D! [input.vsphere]: Creating client: %s", u.Host) soapClient := soap.NewClient(u, tlsCfg.InsecureSkipVerify) @@ -102,7 +107,9 @@ func NewClient(u *url.URL, vs *VSphere) (*Client, error) { } } - vimClient, err := vim25.NewClient(ctx, soapClient) + ctx1, cancel1 := context.WithTimeout(ctx, vs.Timeout.Duration) + defer cancel1() + vimClient, err := vim25.NewClient(ctx1, soapClient) if err != nil { return nil, err } @@ -110,7 +117,9 @@ func NewClient(u *url.URL, vs *VSphere) (*Client, error) { // If TSLKey is specified, try to log in as an extension using a cert. if vs.TLSKey != "" { - if err := sm.LoginExtensionByCertificate(ctx, vs.TLSKey); err != nil { + ctx2, cancel2 := context.WithTimeout(ctx, vs.Timeout.Duration) + defer cancel2() + if err := sm.LoginExtensionByCertificate(ctx2, vs.TLSKey); err != nil { return nil, err } } @@ -141,11 +150,12 @@ func NewClient(u *url.URL, vs *VSphere) (*Client, error) { sw.Stop() return &Client{ - Client: c, - Views: m, - Root: v, - Perf: p, - Valid: true, + Client: c, + Views: m, + Root: v, + Perf: p, + Valid: true, + Timeout: vs.Timeout.Duration, }, nil } @@ -163,7 +173,7 @@ func (c *Client) close() { // Use a Once to prevent us from panics stemming from trying // to close it multiple times. c.closeGate.Do(func() { - ctx := context.Background() + ctx, _ := context.WithTimeout(context.Background(), c.Timeout) if c.Client != nil { if err := c.Client.Logout(ctx); err != nil { log.Printf("E! [input.vsphere]: Error during logout: %s", err) diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go index 9f4c55250909f..ed41cb2b06107 100644 --- a/plugins/inputs/vsphere/endpoint.go +++ b/plugins/inputs/vsphere/endpoint.go @@ -46,7 +46,7 @@ type resourceKind struct { objects objectMap filters filter.Filter collectInstances bool - getObjects func(context.Context, *view.ContainerView) (objectMap, error) + getObjects func(context.Context, *Endpoint, *view.ContainerView) (objectMap, error) } type metricEntry struct { @@ -253,7 +253,9 @@ func (e *Endpoint) getMetricNameMap(ctx context.Context) (map[int32]string, erro return nil, err } - mn, err := client.Perf.CounterInfoByName(ctx) + ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) + defer cancel1() + mn, err := client.Perf.CounterInfoByName(ctx1) if err != nil { return nil, err @@ -272,7 +274,9 @@ func (e *Endpoint) getMetadata(ctx context.Context, in interface{}) interface{} } rq := in.(*metricQRequest) - metrics, err := client.Perf.AvailableMetric(ctx, rq.obj.ref.Reference(), rq.res.sampling) + ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) + defer cancel1() + metrics, err := client.Perf.AvailableMetric(ctx1, rq.obj.ref.Reference(), rq.res.sampling) if err != nil && err != context.Canceled { log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err) } @@ -292,7 +296,9 @@ func (e *Endpoint) getDatacenterName(ctx context.Context, client *Client, cache path = append(path, here.Reference().String()) o := object.NewCommon(client.Client.Client, r) var result mo.ManagedEntity - err := o.Properties(ctx, here, []string{"parent", "name"}, &result) + ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) + defer cancel1() + err := o.Properties(ctx1, here, []string{"parent", "name"}, &result) if err != nil { log.Printf("W! [input.vsphere]: Error while resolving parent. Assuming no parent exists. Error: %s", err) break @@ -344,7 +350,7 @@ func (e *Endpoint) discover(ctx context.Context) error { log.Printf("D! [input.vsphere] Discovering resources for %s", res.name) // Need to do this for all resource types even if they are not enabled (but datastore) if res.enabled || (k != "datastore" && k != "vm") { - objects, err := res.getObjects(ctx, client.Root) + objects, err := res.getObjects(ctx, e, client.Root) if err != nil { return err } @@ -411,9 +417,11 @@ func (e *Endpoint) discover(ctx context.Context) error { return nil } -func getDatacenters(ctx context.Context, root *view.ContainerView) (objectMap, error) { +func getDatacenters(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) { var resources []mo.Datacenter - err := root.Retrieve(ctx, []string{"Datacenter"}, []string{"name", "parent"}, &resources) + ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) + defer cancel1() + err := root.Retrieve(ctx1, []string{"Datacenter"}, []string{"name", "parent"}, &resources) if err != nil { return nil, err } @@ -425,9 +433,11 @@ func getDatacenters(ctx context.Context, root *view.ContainerView) (objectMap, e return m, nil } -func getClusters(ctx context.Context, root *view.ContainerView) (objectMap, error) { +func getClusters(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) { var resources []mo.ClusterComputeResource - err := root.Retrieve(ctx, []string{"ClusterComputeResource"}, []string{"name", "parent"}, &resources) + ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) + defer cancel1() + err := root.Retrieve(ctx1, []string{"ClusterComputeResource"}, []string{"name", "parent"}, &resources) if err != nil { return nil, err } @@ -439,7 +449,9 @@ func getClusters(ctx context.Context, root *view.ContainerView) (objectMap, erro if !ok { o := object.NewFolder(root.Client(), *r.Parent) var folder mo.Folder - err := o.Properties(ctx, *r.Parent, []string{"parent"}, &folder) + ctx2, cancel2 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) + defer cancel2() + err := o.Properties(ctx2, *r.Parent, []string{"parent"}, &folder) if err != nil { log.Printf("W! [input.vsphere] Error while getting folder parent: %e", err) p = nil @@ -455,7 +467,7 @@ func getClusters(ctx context.Context, root *view.ContainerView) (objectMap, erro return m, nil } -func getHosts(ctx context.Context, root *view.ContainerView) (objectMap, error) { +func getHosts(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) { var resources []mo.HostSystem err := root.Retrieve(ctx, []string{"HostSystem"}, []string{"name", "parent"}, &resources) if err != nil { @@ -469,9 +481,11 @@ func getHosts(ctx context.Context, root *view.ContainerView) (objectMap, error) return m, nil } -func getVMs(ctx context.Context, root *view.ContainerView) (objectMap, error) { +func getVMs(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) { var resources []mo.VirtualMachine - err := root.Retrieve(ctx, []string{"VirtualMachine"}, []string{"name", "runtime.host", "config.guestId", "config.uuid"}, &resources) + ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) + defer cancel1() + err := root.Retrieve(ctx1, []string{"VirtualMachine"}, []string{"name", "runtime.host", "config.guestId", "config.uuid"}, &resources) if err != nil { return nil, err } @@ -491,9 +505,11 @@ func getVMs(ctx context.Context, root *view.ContainerView) (objectMap, error) { return m, nil } -func getDatastores(ctx context.Context, root *view.ContainerView) (objectMap, error) { +func getDatastores(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) { var resources []mo.Datastore - err := root.Retrieve(ctx, []string{"Datastore"}, []string{"name", "parent"}, &resources) + ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) + defer cancel1() + err := root.Retrieve(ctx1, []string{"Datastore"}, []string{"name", "parent"}, &resources) if err != nil { return nil, err } @@ -689,17 +705,23 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, return 0, err } - metricInfo, err := client.Perf.CounterInfoByName(ctx) + ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) + defer cancel1() + metricInfo, err := client.Perf.CounterInfoByName(ctx1) if err != nil { return count, err } - metrics, err := client.Perf.Query(ctx, pqs) + ctx2, cancel2 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) + defer cancel2() + metrics, err := client.Perf.Query(ctx2, pqs) if err != nil { return count, err } - ems, err := client.Perf.ToMetricSeries(ctx, metrics) + ctx3, cancel3 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) + defer cancel3() + ems, err := client.Perf.ToMetricSeries(ctx3, metrics) if err != nil { return count, err } diff --git a/plugins/inputs/vsphere/vsphere_test.go b/plugins/inputs/vsphere/vsphere_test.go index 20c61d92bf2cd..3290da2e9080a 100644 --- a/plugins/inputs/vsphere/vsphere_test.go +++ b/plugins/inputs/vsphere/vsphere_test.go @@ -6,6 +6,7 @@ import ( "fmt" "regexp" "sort" + "strings" "testing" "time" @@ -229,6 +230,27 @@ func TestWorkerPool(t *testing.T) { } } +func TestTimeout(t *testing.T) { + m, s, err := createSim() + if err != nil { + t.Fatal(err) + } + defer m.Remove() + defer s.Close() + + var acc testutil.Accumulator + v := defaultVSphere() + v.Vcenters = []string{s.URL.String()} + v.Timeout = internal.Duration{Duration: 1 * time.Nanosecond} + require.NoError(t, v.Start(nil)) // We're not using the Accumulator, so it can be nil. + defer v.Stop() + require.NoError(t, v.Gather(&acc)) + + // The accumulator must contain exactly one error and it must be a deadline exceeded. + require.Equal(t, 1, len(acc.Errors)) + require.True(t, strings.Contains(acc.Errors[0].Error(), "context deadline exceeded")) +} + func TestAll(t *testing.T) { m, s, err := createSim() if err != nil { From 4ae1ac175d103b99226f82130ce1f819d242f484 Mon Sep 17 00:00:00 2001 From: prydin Date: Wed, 10 Oct 2018 13:20:08 -0400 Subject: [PATCH 2/2] Fixed cancel issue --- plugins/inputs/vsphere/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/vsphere/client.go b/plugins/inputs/vsphere/client.go index 63e76408a3d5d..779a249da032c 100644 --- a/plugins/inputs/vsphere/client.go +++ b/plugins/inputs/vsphere/client.go @@ -173,7 +173,8 @@ func (c *Client) close() { // Use a Once to prevent us from panics stemming from trying // to close it multiple times. c.closeGate.Do(func() { - ctx, _ := context.WithTimeout(context.Background(), c.Timeout) + ctx, cancel := context.WithTimeout(context.Background(), c.Timeout) + defer cancel() if c.Client != nil { if err := c.Client.Logout(ctx); err != nil { log.Printf("E! [input.vsphere]: Error during logout: %s", err)