Fix problematic watch cancellation due to context cancellation (#11170)
* Fix problematic watch cancellation due to context cancellation

Right now we pass in the context when starting a Watch that is also
used for the request context. This means that the Watch ends up being
cancelled when the original request that started it as a side effect
completes and cancels its context to clean up.

This is of course not as intended. Before the refactor in #10906 this
wasn't causing a practical issue yet. We'd still have the expired
context internally in the watcher and it would be passed through when
updating entries, but no calls ended up validating the context expiry,
so there was no immediate issue.

This is bound to fail at some point though, if something is added that
does care about the context. What is needed is that the watcher we
start sets up its own context based on the background context, since it
is detached from the original request that might trigger starting the
watcher as a side effect.
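
To illustrate the intended behaviour, here is a minimal, self-contained
Go sketch (the names `startDetachedWatcher` and `work` are illustrative,
not the srvtopo API): the watcher derives its own context from
`context.Background()`, so the request that happened to trigger it
cannot cancel it; only an explicit stop can.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// startDetachedWatcher starts background work whose lifetime is independent
// of any request context; only the returned cancel func can stop it.
func startDetachedWatcher(work func(ctx context.Context)) (stop context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	go work(ctx)
	return cancel
}

func main() {
	reqCtx, reqCancel := context.WithCancel(context.Background())
	_ = reqCtx // the request context plays no role in the watcher's lifetime

	stop := startDetachedWatcher(func(ctx context.Context) {
		<-ctx.Done()
		fmt.Println("watcher stopped:", ctx.Err())
	})

	reqCancel() // the triggering request finishing no longer kills the watcher
	time.Sleep(20 * time.Millisecond)

	stop() // explicit shutdown via the watcher's own cancel
	time.Sleep(20 * time.Millisecond)
}
```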

Additionally, it means that the context tracked for an error isn't
really useful. It would often be an already cancelled context from a
mostly unrelated request, which doesn't provide useful information.
Even more so, it would keep a reference to that context so it could
never be garbage collected, keeping more request data alive than
necessary.

With the fix, the context is always derived from the background context
with a cancel on top for that watcher, so tracking it isn't very useful
either. We also don't use this context tracking for any error messaging
or reporting anywhere, so I believe it's better to clean up this
tracking.

By cleaning up that tracking, we also avoid the need to pass the
context down in entry updates; that is all cleaned up here as well.

Lastly, a failing test is introduced that verifies the original issue.
It retrieves serving keyspace information, cancels the original request
that triggered it, and then validates that the watcher is still running
by updating the value again within the timeout window. This failed
before the fix, as the watcher would be cancelled and the cached old
value was returned until the TTL expired.

The main problem with this bug is not one of correctness, but of a
serious performance degradation in the vtgate. If we ever had a failure
on the path triggered by regular queries, we'd restart the watch setup
every second; the system would not recover from this situation and
would heavily query the topo server, making things very expensive.
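
A toy model of that degradation, assuming the watch inherits each
request's context (`toyWatcher` and its fields are hypothetical, not
vtgate code): every request that finds the watch dead restarts it,
which stands in for an expensive topo call, and the restarted watch
dies again as soon as that request's context is cancelled.

```go
package main

import (
	"context"
	"fmt"
)

// toyWatcher is a hypothetical stand-in for the srvtopo watch entry.
type toyWatcher struct {
	ctx       context.Context
	topoCalls int
}

// ensureWatching restarts the "watch" whenever its context is missing or
// already cancelled. Tying the watch to the request context is the bug.
func (w *toyWatcher) ensureWatching(reqCtx context.Context) {
	if w.ctx == nil || w.ctx.Err() != nil {
		w.topoCalls++  // stands in for a WatchSrvKeyspace round trip
		w.ctx = reqCtx // the bug: the watch lifetime is tied to the request
	}
}

func main() {
	w := &toyWatcher{}
	for i := 0; i < 5; i++ {
		reqCtx, cancel := context.WithCancel(context.Background())
		w.ensureWatching(reqCtx)
		cancel() // the request completes; the watch it started dies with it
	}
	fmt.Println("topo calls for 5 requests:", w.topoCalls) // prints 5, not 1
}
```

With a detached watcher, the counter would stay at 1 regardless of how
many requests come and go.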

Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>

* Improve handling of retries and timer wait

The timer here can stay around if other events fire first, so we want
to use an explicit timer that we can stop immediately once we know the
wait is over.
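
As a sketch of that pattern (`waitOrStop` is an illustrative helper,
not the etcd2topo code): `time.After` keeps its underlying timer alive
until it fires even when another select case wins, while an explicit
`time.NewTimer` can be stopped as soon as we are done waiting.

```go
package main

import (
	"fmt"
	"time"
)

// waitOrStop waits for the timer or the stop channel, whichever comes first,
// and always releases the timer promptly.
func waitOrStop(d time.Duration, stop <-chan struct{}) {
	t := time.NewTimer(d)
	defer t.Stop() // releases the timer even when another case fires first
	select {
	case <-t.C:
		fmt.Println("timer fired")
	case <-stop:
		fmt.Println("stopped early; timer released right away")
	}
}

func main() {
	stop := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(stop)
	}()
	// With time.After(time.Hour) here instead, the underlying timer would
	// linger for the full hour after we returned.
	waitOrStop(time.Hour, stop)
}
```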

Additionally, because of binding issues, a deferred watchCancel() would
not rebind if we start a new inner watcher. Therefore this adds back an
outer context that we can cancel in a defer, so we know for sure we
cancel things properly when stopping the watcher.
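
A minimal sketch of why the outer context helps (illustrative only, not
the etcd2topo code): `defer watchCancel()` captures the function value
when the defer is set up, so if the inner watch is restarted and
watchCancel is rebound, the deferred call covers only the first
attempt, whereas cancelling an outer context reaches every inner
context derived from it.

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	outerCtx, outerCancel := context.WithCancel(context.Background())

	// First watch attempt.
	watch1, cancel1 := context.WithCancel(outerCtx)
	defer cancel1()

	// Retry rebinds the inner context; a defer set up for the first cancel
	// would not cover this one.
	watch2, cancel2 := context.WithCancel(outerCtx)
	defer cancel2()

	outerCancel() // one cancel reliably tears down both attempts

	fmt.Println("first attempt cancelled: ", watch1.Err() != nil)
	fmt.Println("second attempt cancelled:", watch2.Err() != nil)
}
```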

Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>

* Fix leak in etcd2topo tests

We never closed the `cli` instance here, so it would linger until the
process completes.
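
For illustration, a hypothetical standalone sketch (not the test code;
the endpoint is assumed): an etcd `clientv3` client keeps connections
and background goroutines alive until `Close` is called, so its
lifetime should be tied to the scope that created it.

```go
package main

import (
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"}, // assumed local etcd endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println("creating client failed:", err)
		return
	}
	// Without this, the client would linger until the process exits.
	defer cli.Close()

	fmt.Println("client created; it will be closed when main returns")
}
```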

Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>

* Remove unused context

Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>

Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>
dbussink authored Sep 6, 2022
1 parent fa96cd8 commit f95f652
Showing 9 changed files with 61 additions and 43 deletions.
2 changes: 0 additions & 2 deletions go/vt/srvtopo/query.go
@@ -41,7 +41,6 @@ type queryEntry struct {
lastQueryTime time.Time
value any
lastError error
lastErrorCtx context.Context
}

type resilientQuery struct {
@@ -144,7 +143,6 @@ func (q *resilientQuery) getCurrentValue(ctx context.Context, wkey fmt.Stringer,
}

entry.lastError = err
entry.lastErrorCtx = newCtx
}()
}

1 change: 0 additions & 1 deletion go/vt/srvtopo/query_srvkeyspacenames.go
@@ -64,7 +64,6 @@ func (q *SrvKeyspaceNamesQuery) srvKeyspaceNamesCacheStatus() (result []*SrvKeys
ExpirationTime: entry.insertionTime.Add(q.rq.cacheTTL),
LastQueryTime: entry.lastQueryTime,
LastError: entry.lastError,
LastErrorCtx: entry.lastErrorCtx,
})
entry.mutex.Unlock()
}
33 changes: 23 additions & 10 deletions go/vt/srvtopo/resilient_server_test.go
@@ -45,8 +45,8 @@ import (
// TestGetSrvKeyspace will test we properly return updated SrvKeyspace.
func TestGetSrvKeyspace(t *testing.T) {
ts, factory := memorytopo.NewServerAndFactory("test_cell")
*srvTopoCacheTTL = time.Duration(100 * time.Millisecond)
*srvTopoCacheRefresh = time.Duration(40 * time.Millisecond)
*srvTopoCacheTTL = time.Duration(200 * time.Millisecond)
*srvTopoCacheRefresh = time.Duration(80 * time.Millisecond)
defer func() {
*srvTopoCacheTTL = 1 * time.Second
*srvTopoCacheRefresh = 1 * time.Second
@@ -70,9 +70,11 @@ func TestGetSrvKeyspace(t *testing.T) {

// wait until we get the right value
var got *topodatapb.SrvKeyspace
expiry := time.Now().Add(5 * time.Second)
expiry := time.Now().Add(*srvTopoCacheRefresh - 20*time.Millisecond)
for {
got, err = rs.GetSrvKeyspace(context.Background(), "test_cell", "test_ks")
ctx, cancel := context.WithCancel(context.Background())
got, err = rs.GetSrvKeyspace(ctx, "test_cell", "test_ks")
cancel()

if err != nil {
t.Fatalf("GetSrvKeyspace got unexpected error: %v", err)
@@ -86,6 +88,23 @@ func TestGetSrvKeyspace(t *testing.T) {
time.Sleep(2 * time.Millisecond)
}

// Update the value and check it again to verify that the watcher
// is still up and running
want = &topodatapb.SrvKeyspace{Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{{ServedType: topodatapb.TabletType_REPLICA}}}
err = ts.UpdateSrvKeyspace(context.Background(), "test_cell", "test_ks", want)
require.NoError(t, err, "UpdateSrvKeyspace(test_cell, test_ks, %s) failed", want)

// Wait a bit to give the watcher enough time to update the value.
time.Sleep(10 * time.Millisecond)
got, err = rs.GetSrvKeyspace(context.Background(), "test_cell", "test_ks")

if err != nil {
t.Fatalf("GetSrvKeyspace got unexpected error: %v", err)
}
if !proto.Equal(want, got) {
t.Fatalf("GetSrvKeyspace() = %+v, want %+v", got, want)
}

// make sure the HTML template works
funcs := map[string]any{}
for k, v := range status.StatusFuncs {
@@ -363,9 +382,6 @@ func TestSrvKeyspaceCachedError(t *testing.T) {
if err != entry.lastError {
t.Errorf("Error wasn't saved properly")
}
if ctx != entry.lastErrorCtx {
t.Errorf("Context wasn't saved properly")
}

time.Sleep(*srvTopoCacheTTL + 10*time.Millisecond)
// Ask again with a different context, should get an error and
@@ -379,9 +395,6 @@ func TestSrvKeyspaceCachedError(t *testing.T) {
if err2 != entry.lastError {
t.Errorf("Error wasn't saved properly")
}
if ctx != entry.lastErrorCtx {
t.Errorf("Context wasn't saved properly")
}
}

// TestGetSrvKeyspaceCreated will test we properly get the initial
1 change: 0 additions & 1 deletion go/vt/srvtopo/status.go
@@ -119,7 +119,6 @@ type SrvKeyspaceCacheStatus struct {
ExpirationTime time.Time
LastErrorTime time.Time
LastError error
LastErrorCtx context.Context
}

// StatusAsHTML returns an HTML version of our status.
19 changes: 8 additions & 11 deletions go/vt/srvtopo/watch.go
@@ -49,14 +49,13 @@ type watchEntry struct {
lastError error

lastValueTime time.Time
lastErrorCtx context.Context
lastErrorTime time.Time

listeners []func(any, error) bool
}

type resilientWatcher struct {
watcher func(ctx context.Context, entry *watchEntry)
watcher func(entry *watchEntry)

counts *stats.CountersWithSingleLabel
cacheRefreshInterval time.Duration
@@ -101,7 +100,7 @@ func (entry *watchEntry) addListener(ctx context.Context, callback func(any, err
callback(v, err)
}

func (entry *watchEntry) ensureWatchingLocked(ctx context.Context) {
func (entry *watchEntry) ensureWatchingLocked() {
switch entry.watchState {
case watchStateRunning, watchStateStarting:
case watchStateIdle:
@@ -110,7 +109,7 @@ func (entry *watchEntry) ensureWatchingLocked(ctx context.Context) {
if shouldRefresh {
entry.watchState = watchStateStarting
entry.watchStartingChan = make(chan struct{})
go entry.rw.watcher(ctx, entry)
go entry.rw.watcher(entry)
}
}
}
Expand All @@ -122,7 +121,7 @@ func (entry *watchEntry) currentValueLocked(ctx context.Context) (any, error) {
return entry.value, entry.lastError
}

entry.ensureWatchingLocked(ctx)
entry.ensureWatchingLocked()

cacheValid := entry.value != nil && time.Since(entry.lastValueTime) < entry.rw.cacheTTL
if cacheValid {
@@ -147,12 +146,12 @@ func (entry *watchEntry) currentValueLocked(ctx context.Context) (any, error) {
return nil, entry.lastError
}

func (entry *watchEntry) update(ctx context.Context, value any, err error, init bool) {
func (entry *watchEntry) update(value any, err error, init bool) {
entry.mutex.Lock()
defer entry.mutex.Unlock()

if err != nil {
entry.onErrorLocked(ctx, err, init)
entry.onErrorLocked(err, init)
} else {
entry.onValueLocked(value)
}
@@ -177,14 +176,12 @@ func (entry *watchEntry) onValueLocked(value any) {
entry.lastValueTime = time.Now()

entry.lastError = nil
entry.lastErrorCtx = nil
entry.lastErrorTime = time.Time{}
}

func (entry *watchEntry) onErrorLocked(callerCtx context.Context, err error, init bool) {
func (entry *watchEntry) onErrorLocked(err error, init bool) {
entry.rw.counts.Add(errorCategory, 1)

entry.lastErrorCtx = callerCtx
entry.lastErrorTime = time.Now()

// if the node disappears, delete the cached value
@@ -225,7 +222,7 @@ func (entry *watchEntry) onErrorLocked(callerCtx context.Context, err error, ini
time.Sleep(entry.rw.cacheRefreshInterval)

entry.mutex.Lock()
entry.ensureWatchingLocked(context.Background())
entry.ensureWatchingLocked()
entry.mutex.Unlock()
}()
}
15 changes: 7 additions & 8 deletions go/vt/srvtopo/watch_srvkeyspace.go
@@ -38,24 +38,24 @@ func (k *srvKeyspaceKey) String() string {
}

func NewSrvKeyspaceWatcher(topoServer *topo.Server, counts *stats.CountersWithSingleLabel, cacheRefresh, cacheTTL time.Duration) *SrvKeyspaceWatcher {
watch := func(ctx context.Context, entry *watchEntry) {
watch := func(entry *watchEntry) {
key := entry.key.(*srvKeyspaceKey)
watchCtx, watchCancel := context.WithCancel(ctx)
defer watchCancel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

current, changes, err := topoServer.WatchSrvKeyspace(watchCtx, key.cell, key.keyspace)
current, changes, err := topoServer.WatchSrvKeyspace(ctx, key.cell, key.keyspace)
if err != nil {
entry.update(ctx, nil, err, true)
entry.update(nil, err, true)
return
}

entry.update(ctx, current.Value, current.Err, true)
entry.update(current.Value, current.Err, true)
if current.Err != nil {
return
}

for c := range changes {
entry.update(ctx, c.Value, c.Err, false)
entry.update(c.Value, c.Err, false)
if c.Err != nil {
return
}
@@ -109,7 +109,6 @@ func (w *SrvKeyspaceWatcher) srvKeyspaceCacheStatus() (result []*SrvKeyspaceCach
ExpirationTime: expirationTime,
LastErrorTime: entry.lastErrorTime,
LastError: entry.lastError,
LastErrorCtx: entry.lastErrorCtx,
})
entry.mutex.Unlock()
}
10 changes: 5 additions & 5 deletions go/vt/srvtopo/watch_srvvschema.go
@@ -36,25 +36,25 @@ func (k cellName) String() string {
}

func NewSrvVSchemaWatcher(topoServer *topo.Server, counts *stats.CountersWithSingleLabel, cacheRefresh, cacheTTL time.Duration) *SrvVSchemaWatcher {
watch := func(ctx context.Context, entry *watchEntry) {
watch := func(entry *watchEntry) {
key := entry.key.(cellName)
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

current, changes, err := topoServer.WatchSrvVSchema(ctx, key.String())
if err != nil {
entry.update(ctx, nil, err, true)
entry.update(nil, err, true)
return
}

entry.update(ctx, current.Value, current.Err, true)
entry.update(current.Value, current.Err, true)
if current.Err != nil {
return
}

defer cancel()
for c := range changes {
entry.update(ctx, c.Value, c.Err, false)
entry.update(c.Value, c.Err, false)
if c.Err != nil {
return
}
1 change: 1 addition & 0 deletions go/vt/topo/etcd2topo/server_test.go
@@ -68,6 +68,7 @@ func startEtcd(t *testing.T) string {
if err != nil {
t.Fatalf("newCellClient(%v) failed: %v", clientAddr, err)
}
defer cli.Close()

// Wait until we can list "/", or timeout.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
22 changes: 17 additions & 5 deletions go/vt/topo/etcd2topo/watch.go
@@ -54,23 +54,27 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
Version: EtcdVersion(initial.Kvs[0].ModRevision),
}

// Create an outer context that will be canceled on return and will cancel all inner watches.
outerCtx, outerCancel := context.WithCancel(ctx)

// Create a context, will be used to cancel the watch on retry.
watchCtx, watchCancel := context.WithCancel(ctx)
watchCtx, watchCancel := context.WithCancel(outerCtx)

// Create the Watcher. We start watching from the response we
// got, not from the file original version, as the server may
// not have that much history.
watcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(initial.Header.Revision))
if watcher == nil {
watchCancel()
outerCancel()
return nil, nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "Watch failed")
}

// Create the notifications channel, send updates to it.
notifications := make(chan *topo.WatchData, 10)
go func() {
defer close(notifications)
defer watchCancel()
defer outerCancel()

var currVersion = initial.Header.Revision
var watchRetries int
@@ -87,11 +91,15 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
case wresp, ok := <-watcher:
if !ok {
if watchRetries > 10 {
t := time.NewTimer(time.Duration(watchRetries) * time.Second)
select {
case <-time.After(time.Duration(watchRetries) * time.Second):
case <-t.C:
t.Stop()
case <-s.running:
t.Stop()
continue
case <-watchCtx.Done():
t.Stop()
continue
}
}
@@ -170,23 +178,27 @@ func (s *Server) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Wa
initialwd = append(initialwd, &wd)
}

// Create an outer context that will be canceled on return and will cancel all inner watches.
outerCtx, outerCancel := context.WithCancel(ctx)

// Create a context, will be used to cancel the watch on retry.
watchCtx, watchCancel := context.WithCancel(ctx)
watchCtx, watchCancel := context.WithCancel(outerCtx)

// Create the Watcher. We start watching from the response we
// got, not from the file original version, as the server may
// not have that much history.
watcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(initial.Header.Revision), clientv3.WithPrefix())
if watcher == nil {
watchCancel()
outerCancel()
return nil, nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "Watch failed")
}

// Create the notifications channel, send updates to it.
notifications := make(chan *topo.WatchDataRecursive, 10)
go func() {
defer close(notifications)
defer watchCancel()
defer outerCancel()

var currVersion = initial.Header.Revision
var watchRetries int
