diff --git a/consul/notify.go b/consul/notify.go index 88da09242cf7..2fe5acbe2b79 100644 --- a/consul/notify.go +++ b/consul/notify.go @@ -10,7 +10,7 @@ import ( // notify list. type NotifyGroup struct { l sync.Mutex - notify []chan struct{} + notify map[chan struct{}]struct{} } // Notify will do a non-blocking send to all waiting channels, and @@ -18,20 +18,33 @@ type NotifyGroup struct { func (n *NotifyGroup) Notify() { n.l.Lock() defer n.l.Unlock() - for _, ch := range n.notify { + for ch, _ := range n.notify { select { case ch <- struct{}{}: default: } } - n.notify = n.notify[:0] + n.notify = nil } // Wait adds a channel to the notify group func (n *NotifyGroup) Wait(ch chan struct{}) { n.l.Lock() defer n.l.Unlock() - n.notify = append(n.notify, ch) + if n.notify == nil { + n.notify = make(map[chan struct{}]struct{}) + } + n.notify[ch] = struct{}{} +} + +// Clear removes a channel from the notify group +func (n *NotifyGroup) Clear(ch chan struct{}) { + n.l.Lock() + defer n.l.Unlock() + if n.notify == nil { + return + } + delete(n.notify, ch) } // WaitCh allocates a channel that is subscribed to notifications diff --git a/consul/notify_test.go b/consul/notify_test.go index 4c3be5590121..2133e9b3125c 100644 --- a/consul/notify_test.go +++ b/consul/notify_test.go @@ -54,3 +54,19 @@ func TestNotifyGroup(t *testing.T) { t.Fatalf("should not block") } } + +func TestNotifyGroup_Clear(t *testing.T) { + grp := &NotifyGroup{} + + ch1 := grp.WaitCh() + grp.Clear(ch1) + + grp.Notify() + + // Should not get anything + select { + case <-ch1: + t.Fatalf("should not get message") + default: + } +} diff --git a/consul/rpc.go b/consul/rpc.go index 6359d1cf6fed..ae3fd6781227 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -30,6 +30,15 @@ const ( // maxQueryTime is used to bound the limit of a blocking query maxQueryTime = 600 * time.Second + // defaultQueryTime is the amount of time we block waiting for a change + // if no time is specified. Previously we would wait the maxQueryTime. + defaultQueryTime = 300 * time.Second + + // jitterFraction is a the limit to the amount of jitter we apply + // to a user specified MaxQueryTime. We divide the specified time by + // the fraction. So 16 == 6.25% limit of jitter + jitterFraction = 16 + // Warn if the Raft command is larger than this. // If it's over 1MB something is probably being abusive. raftWarnSize = 1024 * 1024 @@ -314,8 +323,9 @@ type blockingRPCOptions struct { // blockingRPCOpt is the replacement for blockingRPC as it allows // for more parameterization easily. It should be prefered over blockingRPC. func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error { - var timeout <-chan time.Time + var timeout *time.Timer var notifyCh chan struct{} + var state *StateStore // Fast path non-blocking if opts.queryOpts.MinQueryIndex == 0 { @@ -327,30 +337,38 @@ func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error { panic("no tables to block on") } - // Restrict the max query time + // Restrict the max query time, and ensure there is always one if opts.queryOpts.MaxQueryTime > maxQueryTime { opts.queryOpts.MaxQueryTime = maxQueryTime + } else if opts.queryOpts.MaxQueryTime <= 0 { + opts.queryOpts.MaxQueryTime = defaultQueryTime } - // Ensure a time limit is set if we have an index - if opts.queryOpts.MinQueryIndex > 0 && opts.queryOpts.MaxQueryTime == 0 { - opts.queryOpts.MaxQueryTime = maxQueryTime - } + // Apply a small amount of jitter to the request + opts.queryOpts.MaxQueryTime += randomStagger(opts.queryOpts.MaxQueryTime / jitterFraction) // Setup a query timeout - if opts.queryOpts.MaxQueryTime > 0 { - timeout = time.After(opts.queryOpts.MaxQueryTime) - } + timeout = time.NewTimer(opts.queryOpts.MaxQueryTime) + + // Setup the notify channel + notifyCh = make(chan struct{}, 1) - // Setup a notification channel for changes -SETUP_NOTIFY: - if opts.queryOpts.MinQueryIndex > 0 { - notifyCh = make(chan struct{}, 1) - state := s.fsm.State() - state.Watch(opts.tables, notifyCh) + // Ensure we tear down any watchers on return + state = s.fsm.State() + defer func() { + timeout.Stop() + state.StopWatch(opts.tables, notifyCh) if opts.kvWatch { - state.WatchKV(opts.kvPrefix, notifyCh) + state.StopWatchKV(opts.kvPrefix, notifyCh) } + }() + +REGISTER_NOTIFY: + // Register the notification channel. This may be done + // multiple times if we have not reached the target wait index. + state.Watch(opts.tables, notifyCh) + if opts.kvWatch { + state.WatchKV(opts.kvPrefix, notifyCh) } RUN_QUERY: @@ -372,8 +390,8 @@ RUN_QUERY: if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex { select { case <-notifyCh: - goto SETUP_NOTIFY - case <-timeout: + goto REGISTER_NOTIFY + case <-timeout.C: } } return err diff --git a/consul/state_store.go b/consul/state_store.go index 074261a1b9bc..f008c78ea9b5 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -421,6 +421,13 @@ func (s *StateStore) Watch(tables MDBTables, notify chan struct{}) { } } +// StopWatch is used to unsubscribe a channel to a set of MDBTables +func (s *StateStore) StopWatch(tables MDBTables, notify chan struct{}) { + for _, t := range tables { + s.watch[t].Clear(notify) + } +} + // WatchKV is used to subscribe a channel to changes in KV data func (s *StateStore) WatchKV(prefix string, notify chan struct{}) { s.kvWatchLock.Lock() @@ -439,6 +446,18 @@ func (s *StateStore) WatchKV(prefix string, notify chan struct{}) { s.kvWatch.Insert(prefix, grp) } +// StopWatchKV is used to unsubscribe a channel from changes in KV data +func (s *StateStore) StopWatchKV(prefix string, notify chan struct{}) { + s.kvWatchLock.Lock() + defer s.kvWatchLock.Unlock() + + // Check for an existing notify group + if raw, ok := s.kvWatch.Get(prefix); ok { + grp := raw.(*NotifyGroup) + grp.Clear(notify) + } +} + // notifyKV is used to notify any KV listeners of a change // on a prefix func (s *StateStore) notifyKV(path string, prefix bool) { diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 1c605c8756a4..97592c11d5ea 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -127,6 +127,37 @@ func TestGetNodes(t *testing.T) { } } +func TestGetNodes_Watch_StopWatch(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + notify1 := make(chan struct{}, 1) + notify2 := make(chan struct{}, 1) + + store.Watch(store.QueryTables("Nodes"), notify1) + store.Watch(store.QueryTables("Nodes"), notify2) + store.StopWatch(store.QueryTables("Nodes"), notify2) + + if err := store.EnsureNode(40, structs.Node{"foo", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } + + select { + case <-notify1: + default: + t.Fatalf("should be notified") + } + + select { + case <-notify2: + t.Fatalf("should not be notified") + default: + } +} + func BenchmarkGetNodes(b *testing.B) { store, err := testStateStore() if err != nil { @@ -1429,6 +1460,32 @@ func TestKVSSet_Watch(t *testing.T) { } } +func TestKVSSet_Watch_Stop(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + notify1 := make(chan struct{}, 1) + + store.WatchKV("", notify1) + store.StopWatchKV("", notify1) + + // Create the entry + d := &structs.DirEntry{Key: "foo/baz", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Check that we've not fired notify1 + select { + case <-notify1: + t.Fatalf("should not notify ") + default: + } +} + func TestKVSSet_Get(t *testing.T) { store, err := testStateStore() if err != nil { diff --git a/consul/util.go b/consul/util.go index d0ce256fe68c..6cc7858995a6 100644 --- a/consul/util.go +++ b/consul/util.go @@ -4,12 +4,14 @@ import ( crand "crypto/rand" "encoding/binary" "fmt" + "math/rand" "net" "os" "path/filepath" "runtime" "strconv" "strings" + "time" "github.com/hashicorp/serf/serf" ) @@ -222,3 +224,8 @@ func generateUUID() string { buf[8:10], buf[10:16]) } + +// Returns a random stagger interval between 0 and the duration +func randomStagger(intv time.Duration) time.Duration { + return time.Duration(uint64(rand.Int63()) % uint64(intv)) +} diff --git a/consul/util_test.go b/consul/util_test.go index 24d9f7299bf9..5803e95a9bc8 100644 --- a/consul/util_test.go +++ b/consul/util_test.go @@ -4,6 +4,7 @@ import ( "net" "regexp" "testing" + "time" "github.com/hashicorp/serf/serf" ) @@ -124,3 +125,13 @@ func TestGenerateUUID(t *testing.T) { } } } + +func TestRandomStagger(t *testing.T) { + intv := time.Minute + for i := 0; i < 10; i++ { + stagger := randomStagger(intv) + if stagger < 0 || stagger >= intv { + t.Fatalf("Bad: %v", stagger) + } + } +} diff --git a/website/source/docs/agent/http.html.markdown b/website/source/docs/agent/http.html.markdown index 7ada4d670fa1..9e1ae0b9be50 100644 --- a/website/source/docs/agent/http.html.markdown +++ b/website/source/docs/agent/http.html.markdown @@ -39,9 +39,9 @@ query string parameter to the value of `X-Consul-Index`, indicating that the cli to wait for any changes subsequent to that index. In addition to `index`, endpoints that support blocking will also honor a `wait` -parameter specifying a maximum duration for the blocking request. If not set, it will -default to 10 minutes. This value can be specified in the form of "10s" or "5m" (i.e., -10 seconds or 5 minutes, respectively). +parameter specifying a maximum duration for the blocking request. This is limited to +10 minutes. If not set, the wait time defaults to 5 minutes. This value can be specified +in the form of "10s" or "5m" (i.e., 10 seconds or 5 minutes, respectively). A critical note is that the return of a blocking request is **no guarantee** of a change. It is possible that the timeout was reached or that there was an idempotent write that does