-
Notifications
You must be signed in to change notification settings - Fork 9.8k
/
watcher.go
203 lines (169 loc) · 5.78 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"bytes"
"errors"
"sync"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty")
ErrWatcherDuplicateID = errors.New("mvcc: duplicate watch ID provided on the WatchStream")
)
type WatchID int64
// FilterFunc returns true if the given event should be filtered out.
type FilterFunc func(e mvccpb.Event) bool
type WatchStream interface {
// Watch creates a watcher. The watcher watches the events happening or
// happened on the given key or range [key, end) from the given startRev.
//
// The whole event history can be watched unless compacted.
// If "startRev" <=0, watch observes events after currentRev.
//
// The returned "id" is the ID of this watcher. It appears as WatchID
// in events that are sent to the created watcher through stream channel.
// The watch ID is used when it's not equal to AutoWatchID. Otherwise,
// an auto-generated watch ID is returned.
Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error)
// Chan returns a chan. All watch response will be sent to the returned chan.
Chan() <-chan WatchResponse
// RequestProgress requests the progress of the watcher with given ID. The response
// will only be sent if the watcher is currently synced.
// The responses will be sent through the WatchRespone Chan attached
// with this stream to ensure correct ordering.
// The responses contains no events. The revision in the response is the progress
// of the watchers since the watcher is currently synced.
RequestProgress(id WatchID)
// RequestProgressAll requests a progress notification for all
// watchers sharing the stream. If all watchers are synced, a
// progress notification with watch ID -1 will be sent to an
// arbitrary watcher of this stream, and the function returns
// true.
RequestProgressAll() bool
// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
// returned.
Cancel(id WatchID) error
// Close closes Chan and release all related resources.
Close()
// Rev returns the current revision of the KV the stream watches on.
Rev() int64
}
type WatchResponse struct {
// WatchID is the WatchID of the watcher this response sent to.
WatchID WatchID
// Events contains all the events that needs to send.
Events []mvccpb.Event
// Revision is the revision of the KV when the watchResponse is created.
// For a normal response, the revision should be the same as the last
// modified revision inside Events. For a delayed response to a unsynced
// watcher, the revision is greater than the last modified revision
// inside Events.
Revision int64
// CompactRevision is set when the watcher is cancelled due to compaction.
CompactRevision int64
}
// watchStream contains a collection of watchers that share
// one streaming chan to send out watched events and other control events.
type watchStream struct {
watchable watchable
ch chan WatchResponse
mu sync.Mutex // guards fields below it
// nextID is the ID pre-allocated for next new watcher in this stream
nextID WatchID
closed bool
cancels map[WatchID]cancelFunc
watchers map[WatchID]*watcher
}
// Watch creates a new watcher in the stream and returns its WatchID.
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
// prevent wrong range where key >= end lexicographically
// watch request with 'WithFromKey' has empty-byte range end
if len(end) != 0 && bytes.Compare(key, end) != -1 {
return -1, ErrEmptyWatcherRange
}
ws.mu.Lock()
defer ws.mu.Unlock()
if ws.closed {
return -1, ErrEmptyWatcherRange
}
if id == clientv3.AutoWatchID {
for ws.watchers[ws.nextID] != nil {
ws.nextID++
}
id = ws.nextID
ws.nextID++
} else if _, ok := ws.watchers[id]; ok {
return -1, ErrWatcherDuplicateID
}
w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
ws.cancels[id] = c
ws.watchers[id] = w
return id, nil
}
func (ws *watchStream) Chan() <-chan WatchResponse {
return ws.ch
}
func (ws *watchStream) Cancel(id WatchID) error {
ws.mu.Lock()
cancel, ok := ws.cancels[id]
w := ws.watchers[id]
ok = ok && !ws.closed
ws.mu.Unlock()
if !ok {
return ErrWatcherNotExist
}
cancel()
ws.mu.Lock()
// The watch isn't removed until cancel so that if Close() is called,
// it will wait for the cancel. Otherwise, Close() could close the
// watch channel while the store is still posting events.
if ww := ws.watchers[id]; ww == w {
delete(ws.cancels, id)
delete(ws.watchers, id)
}
ws.mu.Unlock()
return nil
}
func (ws *watchStream) Close() {
ws.mu.Lock()
defer ws.mu.Unlock()
for _, cancel := range ws.cancels {
cancel()
}
ws.closed = true
close(ws.ch)
watchStreamGauge.Dec()
}
func (ws *watchStream) Rev() int64 {
ws.mu.Lock()
defer ws.mu.Unlock()
return ws.watchable.rev()
}
func (ws *watchStream) RequestProgress(id WatchID) {
ws.mu.Lock()
w, ok := ws.watchers[id]
ws.mu.Unlock()
if !ok {
return
}
ws.watchable.progress(w)
}
func (ws *watchStream) RequestProgressAll() bool {
ws.mu.Lock()
defer ws.mu.Unlock()
return ws.watchable.progressAll(ws.watchers)
}