From 9eaf56bfe33e700d913ff75a57928353a42ab37a Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Mon, 26 Jun 2017 15:52:03 -0500 Subject: [PATCH] Fix socket file handle leaks from old blocking queries upon consul reload. This fixes issue #3018 --- api/api.go | 13 +++++++++++-- watch/funcs.go | 29 ++++++++++++++++++++++------- watch/plan.go | 3 +++ watch/watch.go | 9 ++++++--- 4 files changed, 42 insertions(+), 12 deletions(-) diff --git a/api/api.go b/api/api.go index 1c9f1e2b714b..ff06c5cc1fbc 100644 --- a/api/api.go +++ b/api/api.go @@ -105,6 +105,10 @@ type QueryOptions struct { // relayed back to the sender through N other random nodes. Must be // a value from 0 to 5 (inclusive). RelayFactor uint8 + + // Context (optional) is passed through to the underlying http request layer, can be used + // to set timeouts and deadlines as well as to cancel requests + Context context.Context } // WriteOptions are used to parameterize a write @@ -457,6 +461,7 @@ type request struct { body io.Reader header http.Header obj interface{} + ctx context.Context } // setQueryOptions is used to annotate the request with @@ -494,6 +499,7 @@ func (r *request) setQueryOptions(q *QueryOptions) { if q.RelayFactor != 0 { r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor))) } + r.ctx = q.Context } // durToMsec converts a duration to a millisecond specified string. If the @@ -569,8 +575,11 @@ func (r *request) toHTTP() (*http.Request, error) { if r.config.HttpAuth != nil { req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password) } - - return req, nil + if r.ctx != nil { + return req.WithContext(r.ctx), nil + } else { + return req, nil + } } // newRequest is used to create a new request diff --git a/watch/funcs.go b/watch/funcs.go index 0fd7fdb9e349..21bf593ae3a6 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -1,6 +1,7 @@ package watch import ( + "context" "fmt" consulapi "github.com/hashicorp/consul/api" @@ -41,7 +42,9 @@ func keyWatch(params map[string]interface{}) (WatcherFunc, error) { } fn := func(p *Plan) (uint64, interface{}, error) { kv := p.client.KV() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} pair, meta, err := kv.Get(key, &opts) if err != nil { return 0, nil, err @@ -70,7 +73,9 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) { } fn := func(p *Plan) (uint64, interface{}, error) { kv := p.client.KV() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} pairs, meta, err := kv.List(prefix, &opts) if err != nil { return 0, nil, err @@ -89,7 +94,9 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { catalog := p.client.Catalog() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} services, meta, err := catalog.Services(&opts) if err != nil { return 0, nil, err @@ -108,7 +115,9 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { catalog := p.client.Catalog() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} nodes, meta, err := catalog.Nodes(&opts) if err != nil { return 0, nil, err @@ -144,7 +153,9 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { health := p.client.Health() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} nodes, meta, err := health.Service(service, tag, passingOnly, &opts) if err != nil { return 0, nil, err @@ -177,7 +188,9 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { health := p.client.Health() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} var checks []*consulapi.HealthCheck var meta *consulapi.QueryMeta var err error @@ -205,7 +218,9 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { event := p.client.Event() - opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex, Context: ctx} events, meta, err := event.List(name, &opts) if err != nil { return 0, nil, err diff --git a/watch/plan.go b/watch/plan.go index 2b92ca94a42b..c925425f6f92 100644 --- a/watch/plan.go +++ b/watch/plan.go @@ -107,6 +107,9 @@ func (p *Plan) Stop() { return } p.stop = true + if p.cancelFunc != nil { + p.cancelFunc() + } close(p.stopCh) } diff --git a/watch/watch.go b/watch/watch.go index 79a8bbcdec4d..e04614620ce4 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -5,6 +5,8 @@ import ( "io" "sync" + "context" + consulapi "github.com/hashicorp/consul/api" ) @@ -27,9 +29,10 @@ type Plan struct { lastIndex uint64 lastResult interface{} - stop bool - stopCh chan struct{} - stopLock sync.Mutex + stop bool + stopCh chan struct{} + stopLock sync.Mutex + cancelFunc context.CancelFunc } // WatcherFunc is used to watch for a diff