v3rpc, mvcc: Guarantee order of requested progress notifications
Progress notifications requested using ProgressRequest were sent
directly using the ctrlStream, which means that they could race
against watch responses in the watchStream.

This would especially happen when the stream was not synced - e.g. if
you requested a progress notification on a freshly created unsynced
watcher, the notification would typically arrive indicating a revision
for which not all watch responses had been sent.

This changes the behaviour so that v3rpc always goes through the watch
stream, using a new RequestProgressAll function that closely matches
the behaviour of the old v3rpc code - i.e.

1. Generate a message with WatchId -1, indicating the revision for
   *all* watchers in the stream

2. Guarantee that a response is (eventually) sent

The latter might require us to defer the response until all watchers
are synced, which is arguably how it should be. Note that we do *not*
guarantee that the number of progress notifications matches the number
of requests, only that at least one is eventually sent.

Signed-off-by: Peter Wortmann <peter.wortmann@skao.int>
scpmw committed Feb 8, 2023
1 parent 9505d72 commit 1725aa9
Showing 3 changed files with 64 additions and 8 deletions.
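
An aside before the diff: a minimal client-side sketch of the guarantee this
commit establishes, assuming an etcd server on localhost:2379 (endpoint, key
name and start revision are illustrative). A watcher opened at an old
revision starts unsynced; after this fix, a requested progress notification
can only arrive once every event up to its revision has been delivered on the
same channel.

package main

import (
	"context"
	"fmt"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx := context.Background()
	// Watch from revision 1 so the watcher has history to catch up on (unsynced).
	wch := cli.Watch(ctx, "key", clientv3.WithRev(1))

	// Ask the server for a progress notification on this stream.
	if err := cli.RequestProgress(ctx); err != nil {
		panic(err)
	}

	for resp := range wch {
		if resp.IsProgressNotify() {
			// Every event with revision <= resp.Header.Revision has
			// already been received on wch at this point.
			fmt.Println("progress at revision", resp.Header.Revision)
			return
		}
		for _, ev := range resp.Events {
			fmt.Println("event at revision", ev.Kv.ModRevision)
		}
	}
}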
38 changes: 30 additions & 8 deletions server/etcdserver/api/v3rpc/watch.go
@@ -145,6 +145,10 @@ type serverWatchStream struct {
 	// records fragmented watch IDs
 	fragment map[mvcc.WatchID]bool
 
+	// indicates whether we have an outstanding global progress
+	// notification to send
+	deferredProgress bool
+
 	// closec indicates the stream is closed.
 	closec chan struct{}
 
@@ -174,6 +178,8 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 		prevKV:   make(map[mvcc.WatchID]bool),
 		fragment: make(map[mvcc.WatchID]bool),
 
+		deferredProgress: false,
+
 		closec: make(chan struct{}),
 	}
 
@@ -360,10 +366,16 @@ func (sws *serverWatchStream) recvLoop() error {
 			}
 		case *pb.WatchRequest_ProgressRequest:
 			if uv.ProgressRequest != nil {
-				sws.ctrlStream <- &pb.WatchResponse{
-					Header:  sws.newResponseHeader(sws.watchStream.Rev()),
-					WatchId: clientv3.InvalidWatchID, // response is not associated with any WatchId and will be broadcast to all watch channels
+				sws.mu.Lock()
+				// Ignore if deferred progress notification is already in progress
+				if !sws.deferredProgress {
+					// Request progress for all watchers,
+					// force generation of a response
+					if !sws.watchStream.RequestProgressAll() {
+						sws.deferredProgress = true
+					}
 				}
+				sws.mu.Unlock()
 			}
 		default:
 			// we probably should not shutdown the entire stream when
@@ -408,6 +420,7 @@ func (sws *serverWatchStream) sendLoop() {
 			// either return []*mvccpb.Event from the mvcc package
 			// or define protocol buffer with []mvccpb.Event.
 			evs := wresp.Events
+			progressNotify := len(evs) == 0
 			events := make([]*mvccpb.Event, len(evs))
 			sws.mu.RLock()
 			needPrevKV := sws.prevKV[wresp.WatchID]
@@ -432,11 +445,15 @@ func (sws *serverWatchStream) sendLoop() {
 				Canceled: canceled,
 			}
 
-			if _, okID := ids[wresp.WatchID]; !okID {
-				// buffer if id not yet announced
-				wrs := append(pending[wresp.WatchID], wr)
-				pending[wresp.WatchID] = wrs
-				continue
+			// Progress notifications can have WatchID -1
+			// if they announce on behalf of multiple watchers
+			if !progressNotify || wresp.WatchID != clientv3.InvalidWatchID {
+				if _, okID := ids[wresp.WatchID]; !okID {
+					// buffer if id not yet announced
+					wrs := append(pending[wresp.WatchID], wr)
+					pending[wresp.WatchID] = wrs
+					continue
+				}
 			}
 
 			mvcc.ReportEventReceived(len(evs))
@@ -467,6 +484,11 @@ func (sws *serverWatchStream) sendLoop() {
 				// elide next progress update if sent a key update
 				sws.progress[wresp.WatchID] = false
 			}
+			if sws.deferredProgress {
+				if sws.watchStream.RequestProgressAll() {
+					sws.deferredProgress = false
+				}
+			}
 			sws.mu.Unlock()
 
 		case c, ok := <-sws.ctrlStream:
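
The deferral above is split across two goroutines: recvLoop tries to answer a
progress request immediately and records a failure in deferredProgress, while
sendLoop retries after each batch of events it delivers, since delivering
events is what moves watchers towards being synced. A standalone sketch of
this defer-and-retry pattern (type and function names are illustrative, not
etcd identifiers):

package progress

import "sync"

type progressState struct {
	mu       sync.Mutex
	deferred bool
}

// onProgressRequest mirrors the recvLoop side: try once, defer on failure.
// tryProgressAll reports whether a notification could be sent right away.
func (p *progressState) onProgressRequest(tryProgressAll func() bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.deferred && !tryProgressAll() {
		p.deferred = true
	}
}

// afterSend mirrors the sendLoop side: retry a deferred request, which may
// now succeed because the delivered events can have synced all watchers.
func (p *progressState) afterSend(tryProgressAll func() bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.deferred && tryProgressAll() {
		p.deferred = false
	}
}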
21 changes: 21 additions & 0 deletions server/storage/mvcc/watchable_store.go
@@ -41,6 +41,7 @@ var (
 type watchable interface {
 	watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
 	progress(w *watcher)
+	progress_all(watchers map[WatchID]*watcher) bool
 	rev() int64
 }
 
@@ -482,6 +483,26 @@ func (s *watchableStore) progress(w *watcher) {
 	}
 }
 
+func (s *watchableStore) progress_all(watchers map[WatchID]*watcher) bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Any watcher unsynced?
+	for w := range watchers {
+		if _, ok := s.synced.watchers[watchers[w]]; !ok {
+			return false
+		}
+	}
+
+	// If all watchers are synchronised, send out progress
+	// notification on first watcher
+	for w := range watchers {
+		watchers[w].send(WatchResponse{WatchID: -1, Revision: s.rev()})
+		return true
+	}
+	return true
+}
+
 type watcher struct {
 	// the watcher key
 	key []byte
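
progress_all can answer on the first watcher alone because all watchers
created on one WatchStream share a single response channel, so a send through
any member reaches the stream. A toy sketch of its all-or-nothing sync check
(names are illustrative, not etcd code):

package main

import "fmt"

// allSynced mirrors the first loop in progress_all: progress may be
// reported only if no watcher in the group is missing from the synced set.
func allSynced(group []int, synced map[int]bool) bool {
	for _, id := range group {
		if !synced[id] {
			return false
		}
	}
	return true
}

func main() {
	synced := map[int]bool{1: true, 2: true}
	fmt.Println(allSynced([]int{1, 2}, synced)) // true: one WatchID -1 response would be sent
	fmt.Println(allSynced([]int{1, 3}, synced)) // false: the caller defers and retries later
}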
13 changes: 13 additions & 0 deletions server/storage/mvcc/watcher.go
@@ -58,6 +58,13 @@ type WatchStream interface {
 	// of the watchers since the watcher is currently synced.
 	RequestProgress(id WatchID)
 
+	// RequestProgressAll requests a progress notification for the
+	// entire watcher group. The response will be sent through the
+	// WatchResponse Chan of the first watcher of this stream, if
+	// any. The response will only be sent if all watchers are
+	// synced, in which case the function returns true.
+	RequestProgressAll() bool
+
 	// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
 	// returned.
 	Cancel(id WatchID) error
@@ -188,3 +195,9 @@ func (ws *watchStream) RequestProgress(id WatchID) {
 	}
 	ws.watchable.progress(w)
 }
+
+func (ws *watchStream) RequestProgressAll() bool {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	return ws.watchable.progress_all(ws.watchers)
+}
