From 9eaf56bfe33e700d913ff75a57928353a42ab37a Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Mon, 26 Jun 2017 15:52:03 -0500 Subject: [PATCH 1/3] Fix socket file handle leaks from old blocking queries upon consul reload. This fixes issue #3018 --- api/api.go | 13 +++++++++++-- watch/funcs.go | 29 ++++++++++++++++++++++------- watch/plan.go | 3 +++ watch/watch.go | 9 ++++++--- 4 files changed, 42 insertions(+), 12 deletions(-) diff --git a/api/api.go b/api/api.go index 1c9f1e2b714b..ff06c5cc1fbc 100644 --- a/api/api.go +++ b/api/api.go @@ -105,6 +105,10 @@ type QueryOptions struct { // relayed back to the sender through N other random nodes. Must be // a value from 0 to 5 (inclusive). RelayFactor uint8 + + // Context (optional) is passed through to the underlying http request layer, can be used + // to set timeouts and deadlines as well as to cancel requests + Context context.Context } // WriteOptions are used to parameterize a write @@ -457,6 +461,7 @@ type request struct { body io.Reader header http.Header obj interface{} + ctx context.Context } // setQueryOptions is used to annotate the request with @@ -494,6 +499,7 @@ func (r *request) setQueryOptions(q *QueryOptions) { if q.RelayFactor != 0 { r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor))) } + r.ctx = q.Context } // durToMsec converts a duration to a millisecond specified string. If the @@ -569,8 +575,11 @@ func (r *request) toHTTP() (*http.Request, error) { if r.config.HttpAuth != nil { req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password) } - - return req, nil + if r.ctx != nil { + return req.WithContext(r.ctx), nil + } else { + return req, nil + } } // newRequest is used to create a new request diff --git a/watch/funcs.go b/watch/funcs.go index 0fd7fdb9e349..21bf593ae3a6 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -1,6 +1,7 @@ package watch import ( + "context" "fmt" consulapi "github.com/hashicorp/consul/api" @@ -41,7 +42,9 @@ func keyWatch(params map[string]interface{}) (WatcherFunc, error) { } fn := func(p *Plan) (uint64, interface{}, error) { kv := p.client.KV() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} pair, meta, err := kv.Get(key, &opts) if err != nil { return 0, nil, err @@ -70,7 +73,9 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) { } fn := func(p *Plan) (uint64, interface{}, error) { kv := p.client.KV() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} pairs, meta, err := kv.List(prefix, &opts) if err != nil { return 0, nil, err @@ -89,7 +94,9 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { catalog := p.client.Catalog() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} services, meta, err := catalog.Services(&opts) if err != nil { return 0, nil, err @@ -108,7 +115,9 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { catalog := p.client.Catalog() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} nodes, meta, err := catalog.Nodes(&opts) if err != nil { return 0, nil, err @@ -144,7 +153,9 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { health := p.client.Health() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} nodes, meta, err := health.Service(service, tag, passingOnly, &opts) if err != nil { return 0, nil, err @@ -177,7 +188,9 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { health := p.client.Health() - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} var checks []*consulapi.HealthCheck var meta *consulapi.QueryMeta var err error @@ -205,7 +218,9 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { event := p.client.Event() - opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex, Context: ctx} events, meta, err := event.List(name, &opts) if err != nil { return 0, nil, err diff --git a/watch/plan.go b/watch/plan.go index 2b92ca94a42b..c925425f6f92 100644 --- a/watch/plan.go +++ b/watch/plan.go @@ -107,6 +107,9 @@ func (p *Plan) Stop() { return } p.stop = true + if p.cancelFunc != nil { + p.cancelFunc() + } close(p.stopCh) } diff --git a/watch/watch.go b/watch/watch.go index 79a8bbcdec4d..e04614620ce4 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -5,6 +5,8 @@ import ( "io" "sync" + "context" + consulapi "github.com/hashicorp/consul/api" ) @@ -27,9 +29,10 @@ type Plan struct { lastIndex uint64 lastResult interface{} - stop bool - stopCh chan struct{} - stopLock sync.Mutex + stop bool + stopCh chan struct{} + stopLock sync.Mutex + cancelFunc context.CancelFunc } // WatcherFunc is used to watch for a diff From a6c3cf04036762af79bba3746bc2b0acadb49290 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Mon, 26 Jun 2017 16:11:14 -0500 Subject: [PATCH 2/3] Made helper method for query options with context --- watch/funcs.go | 35 ++++++++++++++--------------------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/watch/funcs.go b/watch/funcs.go index 21bf593ae3a6..dfc12220a760 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -42,9 +42,7 @@ func keyWatch(params map[string]interface{}) (WatcherFunc, error) { } fn := func(p *Plan) (uint64, interface{}, error) { kv := p.client.KV() - ctx, cancel := context.WithCancel(context.Background()) - p.cancelFunc = cancel - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} + opts := makeQueryOptionsWithContext(p, stale) pair, meta, err := kv.Get(key, &opts) if err != nil { return 0, nil, err @@ -73,9 +71,7 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) { } fn := func(p *Plan) (uint64, interface{}, error) { kv := p.client.KV() - ctx, cancel := context.WithCancel(context.Background()) - p.cancelFunc = cancel - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} + opts := makeQueryOptionsWithContext(p, stale) pairs, meta, err := kv.List(prefix, &opts) if err != nil { return 0, nil, err @@ -94,9 +90,7 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { catalog := p.client.Catalog() - ctx, cancel := context.WithCancel(context.Background()) - p.cancelFunc = cancel - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} + opts := makeQueryOptionsWithContext(p, stale) services, meta, err := catalog.Services(&opts) if err != nil { return 0, nil, err @@ -115,9 +109,7 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { catalog := p.client.Catalog() - ctx, cancel := context.WithCancel(context.Background()) - p.cancelFunc = cancel - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} + opts := makeQueryOptionsWithContext(p, stale) nodes, meta, err := catalog.Nodes(&opts) if err != nil { return 0, nil, err @@ -153,9 +145,7 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { health := p.client.Health() - ctx, cancel := context.WithCancel(context.Background()) - p.cancelFunc = cancel - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} + opts := makeQueryOptionsWithContext(p, stale) nodes, meta, err := health.Service(service, tag, passingOnly, &opts) if err != nil { return 0, nil, err @@ -188,9 +178,7 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { health := p.client.Health() - ctx, cancel := context.WithCancel(context.Background()) - p.cancelFunc = cancel - opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} + opts := makeQueryOptionsWithContext(p, stale) var checks []*consulapi.HealthCheck var meta *consulapi.QueryMeta var err error @@ -218,9 +206,7 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { event := p.client.Event() - ctx, cancel := context.WithCancel(context.Background()) - p.cancelFunc = cancel - opts := consulapi.QueryOptions{WaitIndex: p.lastIndex, Context: ctx} + opts := makeQueryOptionsWithContext(p, false) events, meta, err := event.List(name, &opts) if err != nil { return 0, nil, err @@ -237,3 +223,10 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) { } return fn, nil } + +func makeQueryOptionsWithContext(p *Plan, stale bool) consulapi.QueryOptions { + ctx, cancel := context.WithCancel(context.Background()) + p.cancelFunc = cancel + opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx} + return opts +} From d25be8671ad74a9af39a996c36d92eab6d0b1647 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 27 Jun 2017 16:22:57 -0500 Subject: [PATCH 3/3] Make sure to call cancel on the context --- watch/funcs.go | 7 +++++++ watch/watch.go | 3 +-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/watch/funcs.go b/watch/funcs.go index dfc12220a760..7383430096ba 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -43,6 +43,7 @@ func keyWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { kv := p.client.KV() opts := makeQueryOptionsWithContext(p, stale) + defer p.cancelFunc() pair, meta, err := kv.Get(key, &opts) if err != nil { return 0, nil, err @@ -72,6 +73,7 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { kv := p.client.KV() opts := makeQueryOptionsWithContext(p, stale) + defer p.cancelFunc() pairs, meta, err := kv.List(prefix, &opts) if err != nil { return 0, nil, err @@ -91,6 +93,7 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { catalog := p.client.Catalog() opts := makeQueryOptionsWithContext(p, stale) + defer p.cancelFunc() services, meta, err := catalog.Services(&opts) if err != nil { return 0, nil, err @@ -110,6 +113,7 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { catalog := p.client.Catalog() opts := makeQueryOptionsWithContext(p, stale) + defer p.cancelFunc() nodes, meta, err := catalog.Nodes(&opts) if err != nil { return 0, nil, err @@ -146,6 +150,7 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { health := p.client.Health() opts := makeQueryOptionsWithContext(p, stale) + defer p.cancelFunc() nodes, meta, err := health.Service(service, tag, passingOnly, &opts) if err != nil { return 0, nil, err @@ -179,6 +184,7 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { health := p.client.Health() opts := makeQueryOptionsWithContext(p, stale) + defer p.cancelFunc() var checks []*consulapi.HealthCheck var meta *consulapi.QueryMeta var err error @@ -207,6 +213,7 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (uint64, interface{}, error) { event := p.client.Event() opts := makeQueryOptionsWithContext(p, false) + defer p.cancelFunc() events, meta, err := event.List(name, &opts) if err != nil { return 0, nil, err diff --git a/watch/watch.go b/watch/watch.go index e04614620ce4..bfc33628a57e 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -1,12 +1,11 @@ package watch import ( + "context" "fmt" "io" "sync" - "context" - consulapi "github.com/hashicorp/consul/api" )