etcdserver: Send requested progress notifications through watchStream (fix #15220) #15237

Merged · 3 commits · Apr 5, 2023
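For context, the client-side flow this change targets looks roughly like the sketch below (the endpoint `localhost:2379`, the key `foo`, and the timeouts are illustrative; the shape mirrors the integration test added further down in this PR). A watcher is opened from an old revision so it starts unsynced, a progress notification is requested immediately, and with this fix the notification is only delivered once the watcher has caught up:

```go
package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Illustrative endpoint; point this at a real cluster.
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Watch "foo" from revision 1 so the watcher starts unsynced
	// whenever the key already has history.
	wch := cli.Watch(ctx, "foo", clientv3.WithRev(1))

	// Request a progress notification while the watcher may still be
	// catching up; the server now defers it until the watcher is synced.
	if err := cli.RequestProgress(ctx); err != nil {
		panic(err)
	}

	for wr := range wch {
		if wr.Err() != nil {
			panic(wr.Err())
		}
		if wr.IsProgressNotify() {
			fmt.Printf("progress notification at revision %d\n", wr.Header.Revision)
			return
		}
		fmt.Printf("received %d historical events\n", len(wr.Events))
	}
}
```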
37 changes: 29 additions & 8 deletions server/etcdserver/api/v3rpc/watch.go
@@ -145,6 +145,10 @@ type serverWatchStream struct {
// records fragmented watch IDs
fragment map[mvcc.WatchID]bool

// indicates whether we have an outstanding global progress
// notification to send
deferredProgress bool

// closec indicates the stream is closed.
closec chan struct{}

@@ -174,6 +178,8 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
prevKV: make(map[mvcc.WatchID]bool),
fragment: make(map[mvcc.WatchID]bool),

deferredProgress: false,

closec: make(chan struct{}),
}

@@ -360,10 +366,16 @@ func (sws *serverWatchStream) recvLoop() error {
}
case *pb.WatchRequest_ProgressRequest:
if uv.ProgressRequest != nil {
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: clientv3.InvalidWatchID, // response is not associated with any WatchId and will be broadcast to all watch channels
sws.mu.Lock()
// Ignore if deferred progress notification is already in progress
if !sws.deferredProgress {
// Request progress for all watchers,
// force generation of a response
if !sws.watchStream.RequestProgressAll() {
sws.deferredProgress = true
}
}
sws.mu.Unlock()
}
default:
// we probably should not shutdown the entire stream when
@@ -432,11 +444,15 @@ func (sws *serverWatchStream) sendLoop() {
Canceled: canceled,
}

if _, okID := ids[wresp.WatchID]; !okID {
// buffer if id not yet announced
wrs := append(pending[wresp.WatchID], wr)
pending[wresp.WatchID] = wrs
continue
// Progress notifications can have WatchID -1
// if they announce on behalf of multiple watchers
if wresp.WatchID != clientv3.InvalidWatchID {
if _, okID := ids[wresp.WatchID]; !okID {
// buffer if id not yet announced
wrs := append(pending[wresp.WatchID], wr)
pending[wresp.WatchID] = wrs
continue
}
}

mvcc.ReportEventReceived(len(evs))
@@ -467,6 +483,11 @@ func (sws *serverWatchStream) sendLoop() {
// elide next progress update if sent a key update
sws.progress[wresp.WatchID] = false
}
if sws.deferredProgress {
if sws.watchStream.RequestProgressAll() {
sws.deferredProgress = false
}
}
sws.mu.Unlock()

case c, ok := <-sws.ctrlStream:
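The two hunks above form a defer-and-retry handshake: recvLoop answers a progress request immediately only when RequestProgressAll succeeds (all watchers synced); otherwise it records that a notification is owed, and sendLoop retries after it has flushed events. A minimal sketch of that pattern in isolation, with the package, type, and method names invented for illustration (not etcd API):

```go
// Package sketch is illustrative only; progressState and its methods are
// invented names that mirror the deferredProgress bookkeeping above.
package sketch

import "sync"

type progressState struct {
	mu               sync.Mutex
	deferredProgress bool
}

// onProgressRequest mirrors the recvLoop side: try to notify right away,
// and if that fails (some watcher is still unsynced), remember the debt.
func (p *progressState) onProgressRequest(tryNotifyAll func() bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.deferredProgress && !tryNotifyAll() {
		p.deferredProgress = true
	}
}

// afterEventsSent mirrors the sendLoop side: once pending events have been
// flushed (moving watchers toward the synced set), retry the notification.
func (p *progressState) afterEventsSent(tryNotifyAll func() bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.deferredProgress && tryNotifyAll() {
		p.deferredProgress = false
	}
}
```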
30 changes: 26 additions & 4 deletions server/storage/mvcc/watchable_store.go
@@ -19,6 +19,7 @@ import (
"time"

"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/backend"
@@ -41,6 +42,7 @@ var (
type watchable interface {
watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
progress(w *watcher)
progressAll(watchers map[WatchID]*watcher) bool
rev() int64
}

@@ -475,14 +477,34 @@ func (s *watchableStore) addVictim(victim watcherBatch) {
func (s *watchableStore) rev() int64 { return s.store.Rev() }

func (s *watchableStore) progress(w *watcher) {
s.progressIfSync(map[WatchID]*watcher{w.id: w}, w.id)
}

func (s *watchableStore) progressAll(watchers map[WatchID]*watcher) bool {
return s.progressIfSync(watchers, clientv3.InvalidWatchID)
}

func (s *watchableStore) progressIfSync(watchers map[WatchID]*watcher, responseWatchID WatchID) bool {
s.mu.RLock()
defer s.mu.RUnlock()

if _, ok := s.synced.watchers[w]; ok {
w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})
// If the ch is full, this watcher is receiving events.
// We do not need to send progress at all.
// Any watcher unsynced?
for _, w := range watchers {
if _, ok := s.synced.watchers[w]; !ok {
return false
}
}

// If all watchers are synchronised, send out progress
// notification on first watcher. Note that all watchers
// should have the same underlying stream, and the progress
// notification will be broadcasted client-side if required
// (see dispatchEvent in client/v3/watch.go)
for _, w := range watchers {
w.send(WatchResponse{WatchID: responseWatchID, Revision: s.rev()})
return true
}
return true
}

type watcher struct {
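Since the response produced here carries clientv3.InvalidWatchID (-1) rather than a concrete watcher ID, the client library is expected to fan it out to every watch channel on the stream, as the dispatchEvent comment above notes. A rough illustration of what that looks like from the client side, assuming both watchers are already established and synced (endpoint and keys are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Illustrative endpoint.
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Two watches created with the same context share one gRPC watch stream.
	wch1 := cli.Watch(ctx, "key-a")
	wch2 := cli.Watch(ctx, "key-b")

	// A single progress request for the whole stream; the server answers with
	// watch ID -1 and the client fans the notification out to both channels.
	if err := cli.RequestProgress(ctx); err != nil {
		panic(err)
	}

	for i, wch := range []clientv3.WatchChan{wch1, wch2} {
		wr := <-wch
		fmt.Printf("watch %d: progress=%v revision=%d\n", i, wr.IsProgressNotify(), wr.Header.Revision)
	}
}
```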
13 changes: 13 additions & 0 deletions server/storage/mvcc/watcher.go
@@ -58,6 +58,13 @@ type WatchStream interface {
// of the watchers since the watcher is currently synced.
RequestProgress(id WatchID)

// RequestProgressAll requests a progress notification for all
// watchers sharing the stream. If all watchers are synced, a
// progress notification with watch ID -1 will be sent to an
// arbitrary watcher of this stream, and the function returns
// true.
RequestProgressAll() bool

// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
// returned.
Cancel(id WatchID) error
@@ -188,3 +195,9 @@ func (ws *watchStream) RequestProgress(id WatchID) {
}
ws.watchable.progress(w)
}

func (ws *watchStream) RequestProgressAll() bool {
ws.mu.Lock()
defer ws.mu.Unlock()
return ws.watchable.progressAll(ws.watchers)
}
50 changes: 50 additions & 0 deletions server/storage/mvcc/watcher_test.go
@@ -25,6 +25,7 @@ import (
"go.uber.org/zap/zaptest"

"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/lease"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
)
@@ -342,6 +343,55 @@ func TestWatcherRequestProgress(t *testing.T) {
}
}

func TestWatcherRequestProgressAll(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)

// manually create watchableStore instead of newWatchableStore
// because newWatchableStore automatically calls syncWatchers
// method to sync watchers in unsynced map. We want to keep watchers
// in unsynced to test if syncWatchers works as expected.
s := &watchableStore{
store: NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}),
unsynced: newWatcherGroup(),
synced: newWatcherGroup(),
stopc: make(chan struct{}),
}

defer cleanup(s, b)

testKey := []byte("foo")
notTestKey := []byte("bad")
testValue := []byte("bar")
s.Put(testKey, testValue, lease.NoLease)

// Create watch stream with watcher. We will not actually get
// any notifications on it specifically, but there needs to be
// at least one Watch for progress notifications to get
// generated.
w := s.NewWatchStream()
w.Watch(0, notTestKey, nil, 1)

w.RequestProgressAll()
select {
case resp := <-w.Chan():
t.Fatalf("unexpected %+v", resp)
default:
}

s.syncWatchers()

w.RequestProgressAll()
wrs := WatchResponse{WatchID: clientv3.InvalidWatchID, Revision: 2}
select {
case resp := <-w.Chan():
if !reflect.DeepEqual(resp, wrs) {
t.Fatalf("got %+v, expect %+v", resp, wrs)
}
case <-time.After(time.Second):
t.Fatal("failed to receive progress")
}
}

func TestWatcherWatchWithFilter(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
68 changes: 68 additions & 0 deletions tests/integration/v3_watch_test.go
@@ -1397,3 +1397,71 @@ func TestV3WatchCloseCancelRace(t *testing.T) {
t.Fatalf("expected %s watch, got %s", expected, minWatches)
}
}

// TestV3WatchProgressWaitsForSync checks that progress notifications
// don't get sent until the watcher is synchronised
func TestV3WatchProgressWaitsForSync(t *testing.T) {

// Disable for gRPC proxy, as it does not support requesting
// progress notifications
if integration.ThroughProxy {
t.Skip("grpc proxy currently does not support requesting progress notifications")
}

integration.BeforeTest(t)

clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

client := clus.RandClient()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Write a couple values into key to make sure there's a
// non-trivial amount of history.
count := 1001
t.Logf("Writing key 'foo' %d times", count)
for i := 0; i < count; i++ {
_, err := client.Put(ctx, "foo", fmt.Sprintf("bar%d", i))
require.NoError(t, err)
}

// Create watch channel starting at revision 1 (i.e. it starts
// unsynced because of the update above)
wch := client.Watch(ctx, "foo", clientv3.WithRev(1))

// Immediately request a progress notification. As the client
// is unsynchronised, the server will have to defer the
// notification internally.
err := client.RequestProgress(ctx)
require.NoError(t, err)

// Verify that we get the watch responses first. Note that
// events might be spread across multiple packets.
var event_count = 0
for event_count < count {
wr := <-wch
if wr.Err() != nil {
t.Fatal(fmt.Errorf("watch error: %w", wr.Err()))
}
if wr.IsProgressNotify() {
t.Fatal("Progress notification from unsynced client!")
}
if wr.Header.Revision != int64(count+1) {
t.Fatal("Incomplete watch response!")
}
event_count += len(wr.Events)
}

// ... followed by the requested progress notification
wr2 := <-wch
if wr2.Err() != nil {
t.Fatal(fmt.Errorf("watch error: %w", wr2.Err()))
}
if !wr2.IsProgressNotify() {
t.Fatal("Did not receive progress notification!")
}
if wr2.Header.Revision != int64(count+1) {
t.Fatal("Wrong revision in progress notification!")
}
}
46 changes: 36 additions & 10 deletions tests/robustness/linearizability_test.go
@@ -36,10 +36,11 @@ const (

var (
LowTraffic = trafficConfig{
name: "LowTraffic",
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
name: "LowTraffic",
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
requestProgress: false,
traffic: traffic{
keyCount: 10,
leaseTTL: DefaultLeaseTTL,
@@ -56,10 +57,11 @@
},
}
HighTraffic = trafficConfig{
name: "HighTraffic",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
name: "HighTraffic",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
requestProgress: false,
traffic: traffic{
keyCount: 10,
largePutSize: 32769,
@@ -71,6 +73,22 @@
},
},
}
ReqProgTraffic = trafficConfig{
name: "RequestProgressTraffic",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
requestProgress: true,
traffic: traffic{
keyCount: 10,
largePutSize: 8196,
leaseTTL: DefaultLeaseTTL,
writes: []requestChance{
{operation: Put, chance: 95},
{operation: LargePut, chance: 5},
},
},
}
defaultTraffic = LowTraffic
trafficList = []trafficConfig{
LowTraffic, HighTraffic,
@@ -141,6 +159,14 @@ func TestRobustness(t *testing.T) {
e2e.WithSnapshotCount(100),
),
})
scenarios = append(scenarios, scenario{
name: "Issue15220",
failpoint: RandomOneNodeClusterFailpoint,
traffic: &ReqProgTraffic,
config: *e2e.NewConfig(
e2e.WithClusterSize(1),
),
})
snapshotOptions := []e2e.EPClusterOption{
e2e.WithGoFailEnabled(true),
e2e.WithSnapshotCount(100),
@@ -191,7 +217,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
forcestopCluster(r.clus)

watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0
validateWatchResponses(t, r.responses, watchProgressNotifyEnabled)
validateWatchResponses(t, r.responses, traffic.requestProgress || watchProgressNotifyEnabled)

r.events = watchEvents(r.responses)
validateEventsMatch(t, r.events)
Expand All @@ -218,7 +244,7 @@ func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.Et
return nil
})
g.Go(func() error {
responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan)
responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, traffic.requestProgress)
return nil
})
g.Wait()
11 changes: 6 additions & 5 deletions tests/robustness/traffic.go
@@ -109,11 +109,12 @@ func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
}

type trafficConfig struct {
name string
minimalQPS float64
maximalQPS float64
clientCount int
traffic Traffic
name string
minimalQPS float64
maximalQPS float64
clientCount int
traffic Traffic
requestProgress bool // Request progress notifications while watching this traffic
}

type Traffic interface {
Expand Down
Loading