Skip to content

Commit

Permalink
Fix socket file handle leaks from old blocking queries upon consul re…
Browse files Browse the repository at this point in the history
…load. This fixes issue #3018
  • Loading branch information
Preetha Appan committed Jun 26, 2017
1 parent 4b51d00 commit 9eaf56b
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 12 deletions.
13 changes: 11 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ type QueryOptions struct {
// relayed back to the sender through N other random nodes. Must be
// a value from 0 to 5 (inclusive).
RelayFactor uint8

// Context (optional) is passed through to the underlying http request layer, can be used
// to set timeouts and deadlines as well as to cancel requests
Context context.Context
}

// WriteOptions are used to parameterize a write
Expand Down Expand Up @@ -457,6 +461,7 @@ type request struct {
body io.Reader
header http.Header
obj interface{}
ctx context.Context
}

// setQueryOptions is used to annotate the request with
Expand Down Expand Up @@ -494,6 +499,7 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.RelayFactor != 0 {
r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor)))
}
r.ctx = q.Context
}

// durToMsec converts a duration to a millisecond specified string. If the
Expand Down Expand Up @@ -569,8 +575,11 @@ func (r *request) toHTTP() (*http.Request, error) {
if r.config.HttpAuth != nil {
req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password)
}

return req, nil
if r.ctx != nil {
return req.WithContext(r.ctx), nil
} else {
return req, nil
}
}

// newRequest is used to create a new request
Expand Down
29 changes: 22 additions & 7 deletions watch/funcs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package watch

import (
"context"
"fmt"

consulapi "github.com/hashicorp/consul/api"
Expand Down Expand Up @@ -41,7 +42,9 @@ func keyWatch(params map[string]interface{}) (WatcherFunc, error) {
}
fn := func(p *Plan) (uint64, interface{}, error) {
kv := p.client.KV()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
pair, meta, err := kv.Get(key, &opts)
if err != nil {
return 0, nil, err
Expand Down Expand Up @@ -70,7 +73,9 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
}
fn := func(p *Plan) (uint64, interface{}, error) {
kv := p.client.KV()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
pairs, meta, err := kv.List(prefix, &opts)
if err != nil {
return 0, nil, err
Expand All @@ -89,7 +94,9 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {

fn := func(p *Plan) (uint64, interface{}, error) {
catalog := p.client.Catalog()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
services, meta, err := catalog.Services(&opts)
if err != nil {
return 0, nil, err
Expand All @@ -108,7 +115,9 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {

fn := func(p *Plan) (uint64, interface{}, error) {
catalog := p.client.Catalog()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
nodes, meta, err := catalog.Nodes(&opts)
if err != nil {
return 0, nil, err
Expand Down Expand Up @@ -144,7 +153,9 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {

fn := func(p *Plan) (uint64, interface{}, error) {
health := p.client.Health()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
nodes, meta, err := health.Service(service, tag, passingOnly, &opts)
if err != nil {
return 0, nil, err
Expand Down Expand Up @@ -177,7 +188,9 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {

fn := func(p *Plan) (uint64, interface{}, error) {
health := p.client.Health()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
var checks []*consulapi.HealthCheck
var meta *consulapi.QueryMeta
var err error
Expand Down Expand Up @@ -205,7 +218,9 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) {

fn := func(p *Plan) (uint64, interface{}, error) {
event := p.client.Event()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex, Context: ctx}
events, meta, err := event.List(name, &opts)
if err != nil {
return 0, nil, err
Expand Down
3 changes: 3 additions & 0 deletions watch/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ func (p *Plan) Stop() {
return
}
p.stop = true
if p.cancelFunc != nil {
p.cancelFunc()
}
close(p.stopCh)
}

Expand Down
9 changes: 6 additions & 3 deletions watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"io"
"sync"

"context"

consulapi "github.com/hashicorp/consul/api"
)

Expand All @@ -27,9 +29,10 @@ type Plan struct {
lastIndex uint64
lastResult interface{}

stop bool
stopCh chan struct{}
stopLock sync.Mutex
stop bool
stopCh chan struct{}
stopLock sync.Mutex
cancelFunc context.CancelFunc
}

// WatcherFunc is used to watch for a diff
Expand Down

0 comments on commit 9eaf56b

Please sign in to comment.