diff --git a/server/etcdserver/api/v3rpc/watch.go b/server/etcdserver/api/v3rpc/watch.go index 9b28319806ba..06e1fa8eb72f 100644 --- a/server/etcdserver/api/v3rpc/watch.go +++ b/server/etcdserver/api/v3rpc/watch.go @@ -449,6 +449,7 @@ func (sws *serverWatchStream) sendLoop() { sws.mu.RUnlock() var serr error + // gofail: var beforeSendWatchResponse struct{} if !fragmented && !ok { serr = sws.gRPCStream.Send(wr) } else { diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go index ae5c551fdfe6..ad17b2be7ace 100644 --- a/server/storage/mvcc/watchable_store.go +++ b/server/storage/mvcc/watchable_store.go @@ -39,6 +39,9 @@ var ( maxWatchersPerSync = 512 ) +// ChanBufLen returns the current value of chanBufLen so that code outside this package, such as integration tests, can size a workload against it. +func ChanBufLen() int { return chanBufLen } + type watchable interface { watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) progress(w *watcher) @@ -370,6 +373,11 @@ func (s *watchableStore) syncWatchers() int { victims := make(watcherBatch) wb := newWatcherBatch(wg, evs) for w := range wg.watchers { + if w.minRev < compactionRev { + // Skip a watcher whose compacted watch response could not be sent because w.ch is full. 
+ // The next run of syncWatchers will retry sending the compacted watch response to w.ch. + continue + } w.minRev = curRev + 1 eb, ok := wb[w] diff --git a/server/storage/mvcc/watchable_store_test.go b/server/storage/mvcc/watchable_store_test.go index 86e35697f326..11d397708cba 100644 --- a/server/storage/mvcc/watchable_store_test.go +++ b/server/storage/mvcc/watchable_store_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" "go.etcd.io/etcd/api/v3/mvccpb" @@ -250,6 +251,68 @@ func TestWatchCompacted(t *testing.T) { } } +// TestWatchNoEventLossOnCompact verifies, with chanBufLen shrunk to 1, that every unsynced watcher ends with either a compacted watch response at compactRev or a gap-free run of events from its start revision up to the store's current revision. +func TestWatchNoEventLossOnCompact(t *testing.T) { + oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync + + b, _ := betesting.NewDefaultTmpBackend(t) + lg := zaptest.NewLogger(t) + s := newWatchableStore(lg, b, &lease.FakeLessor{}, StoreConfig{}) + + defer func() { + cleanup(s, b) + chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync + }() + + chanBufLen, maxWatchersPerSync = 1, 4 + testKey, testValue := []byte("foo"), []byte("bar") + + maxRev := 10 + compactRev := int64(5) + for i := 0; i < maxRev; i++ { + s.Put(testKey, testValue, lease.NoLease) + } + _, err := s.Compact(traceutil.TODO(), compactRev) + if err != nil { + t.Fatalf("failed to compact kv (%v)", err) + } + + w := s.NewWatchStream() + defer w.Close() + + watchers := map[WatchID]int64{ + 0: 1, + 1: 1, // create unsyncd watchers with startRev < compactRev + 2: 6, // create unsyncd watchers with compactRev < startRev < currentRev + } + for id, startRev := range watchers { + _, err = w.Watch(id, testKey, nil, startRev) + require.NoError(t, err) + } + // fill up w.Chan() with 1 buf via 2 compacted watch response + s.syncWatchers() + + for len(watchers) > 0 { + select { + case resp := <-w.Chan(): + if resp.CompactRevision != 0 { + require.Equal(t, compactRev, resp.CompactRevision) + require.Contains(t, watchers, resp.WatchID) + delete(watchers, resp.WatchID) + continue + } + nextRev := watchers[resp.WatchID] + for _, ev := 
range resp.Events { + require.Equalf(t, nextRev, ev.Kv.ModRevision, "got event revision %d but want %d for watcher with watch ID %d", ev.Kv.ModRevision, nextRev, resp.WatchID) + nextRev++ + } + if nextRev == s.rev()+1 { + delete(watchers, resp.WatchID) + } + } + } +} + func TestWatchFutureRev(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) diff --git a/tests/integration/v3_watch_test.go b/tests/integration/v3_watch_test.go index 40b07fc5f319..fe41a27c151d 100644 --- a/tests/integration/v3_watch_test.go +++ b/tests/integration/v3_watch_test.go @@ -17,6 +17,7 @@ package integration import ( "bytes" "context" + "errors" "fmt" "reflect" "sort" @@ -24,13 +25,17 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc" + "go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/tests/v3/framework/integration" + gofail "go.etcd.io/gofail/runtime" ) // TestV3WatchFromCurrentRevision tests Watch APIs from current revision. @@ -1512,3 +1517,56 @@ func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) { } require.True(t, gotProgressNotification, "Expected to get progress notification") } + +// TestV3NoEventsLostOnCompact verifies that a slow watcher exits with a compacted watch response +// if its next revision of events has been compacted, and that no events are silently lost before reaching the client. 
+func TestV3NoEventsLostOnCompact(t *testing.T) { + if integration.ThroughProxy { + t.Skip("grpc proxy currently does not support requesting progress notifications") + } + integration.BeforeTest(t) + if len(gofail.List()) == 0 { + t.Skip("please run 'make gofail-enable' before running the test") + } + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + client := clus.RandClient() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // sendLoop throughput is rate-limited to at most 1 watch response per second + require.NoError(t, gofail.Enable("beforeSendWatchResponse", `sleep("1s")`)) + defer func() { _ = gofail.Disable("beforeSendWatchResponse") }() // best effort: a no-op error on the success path, but keeps the failpoint from staying enabled if the test aborts early + wch := client.Watch(ctx, "foo") + + var rev int64 + writeCount := mvcc.ChanBufLen() * 11 / 10 + for i := 0; i < writeCount; i++ { + resp, err := client.Put(ctx, "foo", "bar") + require.NoError(t, err) + rev = resp.Header.Revision + } + _, err := client.Compact(ctx, rev) + require.NoError(t, err) + + time.Sleep(time.Second) + require.NoError(t, gofail.Disable("beforeSendWatchResponse")) + + eventCount := 0 + compacted := false + for resp := range wch { + if err := resp.Err(); err != nil { + if !errors.Is(err, rpctypes.ErrCompacted) { + t.Fatalf("want watch response err %v but got %v", rpctypes.ErrCompacted, err) + } + compacted = true + break + } + eventCount += len(resp.Events) + if eventCount == writeCount { + break + } + } + assert.Truef(t, compacted, "Expected stream to get compacted, instead we got %d events out of %d events", eventCount, writeCount) +} diff --git a/tests/robustness/makefile.mk b/tests/robustness/makefile.mk index 381db6301b99..dee968eb63ab 100644 --- a/tests/robustness/makefile.mk +++ b/tests/robustness/makefile.mk @@ -36,7 +36,7 @@ GOFAIL_VERSION = $(shell cd tools/mod && go list -m -f {{.Version}} go.etcd.io/g .PHONY: gofail-enable gofail-enable: install-gofail - gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ + gofail enable 
server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/ cd ./server && go get go.etcd.io/gofail@${GOFAIL_VERSION} cd ./etcdutl && go get go.etcd.io/gofail@${GOFAIL_VERSION} cd ./etcdctl && go get go.etcd.io/gofail@${GOFAIL_VERSION} @@ -44,7 +44,7 @@ gofail-enable: install-gofail .PHONY: gofail-disable gofail-disable: install-gofail - gofail disable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ + gofail disable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/ cd ./server && go mod tidy cd ./etcdutl && go mod tidy cd ./etcdctl && go mod tidy