From b239df940880143adb37652cdb4bfc75a578542a Mon Sep 17 00:00:00 2001 From: Dirkjan Bussink Date: Mon, 1 Aug 2022 10:56:14 +0200 Subject: [PATCH 1/7] Add List() implementation for memory topo This adds a List() implementation for the memory topo and adds a bunch of tests for this function as well. These tests are only run when the implementation indicates proper support for List. Signed-off-by: Dirkjan Bussink --- go/vt/topo/memorytopo/file.go | 60 +++++++++++++++++++++++++++++++++-- go/vt/topo/test/file.go | 48 ++++++++++++++++++++++++++-- go/vt/topo/test/testing.go | 4 +++ 3 files changed, 107 insertions(+), 5 deletions(-) diff --git a/go/vt/topo/memorytopo/file.go b/go/vt/topo/memorytopo/file.go index a8b4bf19a0c..6a6f2e5f79b 100644 --- a/go/vt/topo/memorytopo/file.go +++ b/go/vt/topo/memorytopo/file.go @@ -17,10 +17,10 @@ limitations under the License. package memorytopo import ( + "context" "fmt" "path" - - "context" + "strings" "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" @@ -158,7 +158,61 @@ func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version, // List is part of the topo.Conn interface. func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) { - return nil, topo.NewError(topo.NoImplementation, "List not supported in memory topo") + if err := c.dial(ctx); err != nil { + return nil, err + } + + c.factory.mu.Lock() + defer c.factory.mu.Unlock() + + if c.factory.err != nil { + return nil, c.factory.err + } + + dir, file := path.Split(filePathPrefix) + // Get the node to list. + n := c.factory.nodeByPath(c.cell, dir) + if n == nil { + return []topo.KVInfo{}, topo.NewError(topo.NoNode, filePathPrefix) + } + + var result []topo.KVInfo + for name, child := range n.children { + if !strings.HasPrefix(name, file) { + continue + } + if child.isDirectory() { + result = append(result, gatherChildren(child, path.Join(dir, name))...) 
+ } else { + result = append(result, topo.KVInfo{ + Key: []byte(path.Join(dir, name)), + Value: child.contents, + Version: NodeVersion(child.version), + }) + } + } + + if len(result) == 0 { + return []topo.KVInfo{}, topo.NewError(topo.NoNode, filePathPrefix) + } + + return result, nil +} + +func gatherChildren(n *node, dirPath string) []topo.KVInfo { + var result []topo.KVInfo + for name, child := range n.children { + if child.isDirectory() { + result = append(result, gatherChildren(child, path.Join(dirPath, name))...) + } else { + result = append(result, topo.KVInfo{ + Key: []byte(path.Join(dirPath, name)), + Value: child.contents, + Version: NodeVersion(child.version), + }) + } + } + return result } // Delete is part of topo.Conn interface. diff --git a/go/vt/topo/test/file.go b/go/vt/topo/test/file.go index f32c3775c0a..01a72c64cfc 100644 --- a/go/vt/topo/test/file.go +++ b/go/vt/topo/test/file.go @@ -17,11 +17,11 @@ limitations under the License. package test import ( + "context" "reflect" + "strings" "testing" - - "context" - "vitess.io/vitess/go/vt/topo" ) @@ -201,3 +201,47 @@ func checkFileInCell(t *testing.T, conn topo.Conn, hasCells bool) { expected = expected[:len(expected)-1] checkListDir(ctx, t, conn, "/", expected) } + +// checkList tests the List part of the Conn API. 
+func checkList(t *testing.T, ts *topo.Server) { + ctx := context.Background() + // global cell + conn, err := ts.ConnForCell(ctx, LocalCellName) + if err != nil { + t.Fatalf("ConnForCell(LocalCellName) failed: %v", err) + } + _, err = conn.List(ctx, "/") + if topo.IsErrType(err, topo.NoImplementation) { + // If this is not supported, skip the test + t.Skipf("%T does not support List()", conn) + return + } + if err != nil { + t.Fatalf("List(test) failed: %v", err) + } + + _, err = conn.Create(ctx, "/toplevel/nested/myfile", []byte{'a'}) + if err != nil { + t.Fatalf("Create('/toplevel/nested/myfile') failed: %v", err) + } + + for _, path := range []string{"/top", "/toplevel", "/toplevel/", "/toplevel/nes", "/toplevel/nested/myfile"} { + entries, err := conn.List(ctx, path) + if err != nil { + t.Fatalf("List failed(path: %q): %v", path, err) + } + + if len(entries) != 1 { + t.Fatalf("List(test) returned incorrect number of elements for path %q. Expected 1, got %d: %v", path, len(entries), entries) + } + + if !strings.HasSuffix(string(entries[0].Key), "/toplevel/nested/myfile") { + t.Fatalf("found entry doesn't end with /toplevel/nested/myfile for path %q: %s", path, string(entries[0].Key)) + } + + if string(entries[0].Value) != "a" { + t.Fatalf("found entry doesn't have value \"a\" for path %q: %s", path, string(entries[0].Value)) + } + } + +} diff --git a/go/vt/topo/test/testing.go b/go/vt/topo/test/testing.go index 66faf77bcbf..c7b448bf2f4 100644 --- a/go/vt/topo/test/testing.go +++ b/go/vt/topo/test/testing.go @@ -111,5 +111,9 @@ func TopoServerTestSuite(t *testing.T, factory func() *topo.Server) { checkWatch(t, ts) t.Log("=== checkWatchInterrupt") checkWatchInterrupt(t, ts) + + t.Log("=== checkList") + checkList(t, ts) + ts.Close() } From e3551aefe236f4ecb1f60788e07b0c65c779f041 Mon Sep 17 00:00:00 2001 From: Dirkjan Bussink Date: Mon, 1 Aug 2022 14:18:03 +0200 Subject: [PATCH 2/7] Refactor Watch() to use context properly A function that can spawn a goroutine should use the 
passed in context for that and not return a cancel function itself. That is considered an anti-pattern, so let's change this all to more idiomatic Go patterns. This also ensures we can now set a proper timeout on the initial data retrieval for a watch using the configured topo timeout. Signed-off-by: Dirkjan Bussink --- go/vt/srvtopo/watch_srvkeyspace.go | 9 ++++- go/vt/srvtopo/watch_srvvschema.go | 8 +++- go/vt/topo/conn.go | 19 ++++------ go/vt/topo/consultopo/watch.go | 27 +++++++------- go/vt/topo/etcd2topo/watch.go | 27 +++++++------- go/vt/topo/faketopo/faketopo.go | 12 +++--- go/vt/topo/helpers/tee.go | 2 +- go/vt/topo/k8stopo/watch.go | 26 +++++-------- go/vt/topo/memorytopo/watch.go | 18 ++++----- go/vt/topo/shard.go | 16 +++++--- go/vt/topo/srv_keyspace.go | 30 +++++---------- go/vt/topo/srv_vschema.go | 20 +++++----- go/vt/topo/stats_conn.go | 5 +-- go/vt/topo/stats_conn_test.go | 7 ++-- go/vt/topo/test/watch.go | 27 ++++++++------ go/vt/topo/topotests/shard_watch_test.go | 35 +++++++++--------- go/vt/topo/topotests/srv_keyspace_test.go | 37 ++++++++++--------- go/vt/topo/zk2topo/watch.go | 37 ++++++------------- .../topocustomrule/topocustomrule.go | 9 +++-- go/vt/vttablet/tabletmanager/shard_sync.go | 5 +-- go/vt/vttablet/tabletmanager/shard_watcher.go | 17 ++++----- 21 files changed, 190 insertions(+), 203 deletions(-) diff --git a/go/vt/srvtopo/watch_srvkeyspace.go b/go/vt/srvtopo/watch_srvkeyspace.go index e3edcdd5250..c12216f53f5 100644 --- a/go/vt/srvtopo/watch_srvkeyspace.go +++ b/go/vt/srvtopo/watch_srvkeyspace.go @@ -40,14 +40,19 @@ func (k *srvKeyspaceKey) String() string { func NewSrvKeyspaceWatcher(topoServer *topo.Server, counts *stats.CountersWithSingleLabel, cacheRefresh, cacheTTL time.Duration) *SrvKeyspaceWatcher { watch := func(ctx context.Context, entry *watchEntry) { key := entry.key.(*srvKeyspaceKey) - current, changes, cancel := topoServer.WatchSrvKeyspace(context.Background(), key.cell, key.keyspace) + watchCtx, watchCancel := 
context.WithCancel(ctx) + defer watchCancel() + + current, changes, err := topoServer.WatchSrvKeyspace(watchCtx, key.cell, key.keyspace) + if err != nil { + return + } entry.update(ctx, current.Value, current.Err, true) if current.Err != nil { return } - defer cancel() for c := range changes { entry.update(ctx, c.Value, c.Err, false) if c.Err != nil { diff --git a/go/vt/srvtopo/watch_srvvschema.go b/go/vt/srvtopo/watch_srvvschema.go index 14d30829f69..9eb8c6d3648 100644 --- a/go/vt/srvtopo/watch_srvvschema.go +++ b/go/vt/srvtopo/watch_srvvschema.go @@ -38,7 +38,13 @@ func (k cellName) String() string { func NewSrvVSchemaWatcher(topoServer *topo.Server, counts *stats.CountersWithSingleLabel, cacheRefresh, cacheTTL time.Duration) *SrvVSchemaWatcher { watch := func(ctx context.Context, entry *watchEntry) { key := entry.key.(cellName) - current, changes, cancel := topoServer.WatchSrvVSchema(context.Background(), key.String()) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + current, changes, err := topoServer.WatchSrvVSchema(ctx, key.String()) + if err != nil { + return + } entry.update(ctx, current.Value, current.Err, true) if current.Err != nil { diff --git a/go/vt/topo/conn.go b/go/vt/topo/conn.go index cf75617ef45..12166be2bc2 100644 --- a/go/vt/topo/conn.go +++ b/go/vt/topo/conn.go @@ -17,9 +17,8 @@ limitations under the License. package topo import ( - "sort" - "context" + "sort" ) // Conn defines the interface that must be implemented by topology @@ -120,17 +119,13 @@ type Conn interface { // Watch starts watching a file in the provided cell. It // returns the current value, a 'changes' channel to read the - // changes from, and a 'cancel' function to call to stop the - // watch. If the initial read fails, or the file doesn't - // exist, current.Err is set, and 'changes'/'cancel' are nil. - // Otherwise current.Err is nil, and current.Contents / - // current.Version are accurate. 
The provided context is only - // used to setup the current watch, and not after Watch() - // returns. + // changes from, and an error. + // If the initial read fails, or the file doesn't + // exist, an error is returned. // - // To stop the watch, just call the returned 'cancel' function. + // To stop the watch, cancel the provided context. // This will eventually result in a final WatchData result with Err = - // ErrInterrupted. It should be safe to call the 'cancel' function + // ErrInterrupted. It should be safe to cancel the context // multiple times, or after the Watch already errored out. // // The 'changes' channel may return a record with Err != nil. @@ -158,7 +153,7 @@ type Conn interface { // being correct quickly, as long as it eventually gets there. // // filePath is a path relative to the root directory of the cell. - Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, cancel CancelFunc) + Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, err error) // // Leader election methods. This is meant to have a small diff --git a/go/vt/topo/consultopo/watch.go b/go/vt/topo/consultopo/watch.go index a4da011dfdd..ca6e15d3ffa 100644 --- a/go/vt/topo/consultopo/watch.go +++ b/go/vt/topo/consultopo/watch.go @@ -17,12 +17,11 @@ limitations under the License. package consultopo import ( + "context" "flag" "path" "time" - "context" - "github.com/hashicorp/consul/api" "vitess.io/vitess/go/vt/topo" @@ -33,16 +32,21 @@ var ( ) // Watch is part of the topo.Conn interface. -func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) { +func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { // Initial get. 
nodePath := path.Join(s.root, filePath) - pair, _, err := s.kv.Get(nodePath, nil) + options := &api.QueryOptions{} + + initialCtx, initialCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer initialCancel() + + pair, _, err := s.kv.Get(nodePath, options.WithContext(initialCtx)) if err != nil { - return &topo.WatchData{Err: err}, nil, nil + return nil, nil, err } if pair == nil { // Node doesn't exist. - return &topo.WatchData{Err: topo.NewError(topo.NoNode, nodePath)}, nil, nil + return nil, nil, topo.NewError(topo.NoNode, nodePath) } // Initial value to return. @@ -51,9 +55,6 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < Version: ConsulVersion(pair.ModifyIndex), } - // Create a context, will be used to cancel the watch. - watchCtx, watchCancel := context.WithCancel(context.Background()) - // Create the notifications channel, send updates to it. notifications := make(chan *topo.WatchData, 10) go func() { @@ -83,7 +84,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < // This essentially uses WaitTime as a heartbeat interval to detect // a dead connection. cancelGetCtx() - getCtx, cancelGetCtx = context.WithTimeout(watchCtx, 2*opts.WaitTime) + getCtx, cancelGetCtx = context.WithTimeout(ctx, 2*opts.WaitTime) pair, _, err = s.kv.Get(nodePath, opts.WithContext(getCtx)) if err != nil { @@ -114,9 +115,9 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < // See if the watch was canceled. 
select { - case <-watchCtx.Done(): + case <-ctx.Done(): notifications <- &topo.WatchData{ - Err: convertError(watchCtx.Err(), nodePath), + Err: convertError(ctx.Err(), nodePath), } cancelGetCtx() return @@ -125,5 +126,5 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < } }() - return wd, notifications, topo.CancelFunc(watchCancel) + return wd, notifications, nil } diff --git a/go/vt/topo/etcd2topo/watch.go b/go/vt/topo/etcd2topo/watch.go index 239e73aa3f9..a50f6b90a4f 100644 --- a/go/vt/topo/etcd2topo/watch.go +++ b/go/vt/topo/etcd2topo/watch.go @@ -17,11 +17,10 @@ limitations under the License. package etcd2topo import ( + "context" "path" "time" - "context" - "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" @@ -33,29 +32,29 @@ import ( ) // Watch is part of the topo.Conn interface. -func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) { +func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { nodePath := path.Join(s.root, filePath) // Get the initial version of the file - initial, err := s.cli.Get(ctx, nodePath) + initialCtx, initialCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer initialCancel() + initial, err := s.cli.Get(initialCtx, nodePath) if err != nil { // Generic error. - return &topo.WatchData{Err: convertError(err, nodePath)}, nil, nil + return nil, nil, convertError(err, nodePath) } + if len(initial.Kvs) != 1 { // Node doesn't exist. - return &topo.WatchData{Err: topo.NewError(topo.NoNode, nodePath)}, nil, nil + return nil, nil, topo.NewError(topo.NoNode, nodePath) } wd := &topo.WatchData{ Contents: initial.Kvs[0].Value, Version: EtcdVersion(initial.Kvs[0].ModRevision), } - // Create an outer context that will be canceled on return and will cancel all inner watches. 
- outerCtx, outerCancel := context.WithCancel(context.Background()) - // Create a context, will be used to cancel the watch on retry. - watchCtx, watchCancel := context.WithCancel(outerCtx) + watchCtx, watchCancel := context.WithCancel(ctx) // Create the Watcher. We start watching from the response we // got, not from the file original version, as the server may @@ -63,14 +62,14 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < watcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(initial.Header.Revision)) if watcher == nil { watchCancel() - outerCancel() - return &topo.WatchData{Err: vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "Watch failed")}, nil, nil + return nil, nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "Watch failed") } // Create the notifications channel, send updates to it. notifications := make(chan *topo.WatchData, 10) go func() { defer close(notifications) + defer watchCancel() var currVersion = initial.Header.Revision var watchRetries int @@ -91,7 +90,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < watchRetries++ // Cancel inner context on retry and create new one. 
watchCancel() - watchCtx, watchCancel = context.WithCancel(outerCtx) + watchCtx, watchCancel = context.WithCancel(ctx) newWatcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(currVersion)) if newWatcher == nil { log.Warningf("watch %v failed and get a nil channel returned, currVersion: %v", nodePath, currVersion) @@ -137,5 +136,5 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < } }() - return wd, notifications, topo.CancelFunc(outerCancel) + return wd, notifications, nil } diff --git a/go/vt/topo/faketopo/faketopo.go b/go/vt/topo/faketopo/faketopo.go index be56315a594..4d47c656c69 100644 --- a/go/vt/topo/faketopo/faketopo.go +++ b/go/vt/topo/faketopo/faketopo.go @@ -288,12 +288,12 @@ func (f *FakeConn) Lock(ctx context.Context, dirPath, contents string) (topo.Loc } // Watch implements the Conn interface -func (f *FakeConn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) { +func (f *FakeConn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { f.mu.Lock() defer f.mu.Unlock() res, isPresent := f.getResultMap[filePath] if !isPresent { - return &topo.WatchData{Err: topo.NewError(topo.NoNode, filePath)}, nil, nil + return nil, nil, topo.NewError(topo.NoNode, filePath) } current := &topo.WatchData{ Contents: res.contents, @@ -303,7 +303,9 @@ func (f *FakeConn) Watch(ctx context.Context, filePath string) (*topo.WatchData, notifications := make(chan *topo.WatchData, 100) f.watches[filePath] = append(f.watches[filePath], notifications) - cancel := func() { + go func() { + defer close(notifications) + <-ctx.Done() watches, isPresent := f.watches[filePath] if !isPresent { return @@ -315,8 +317,8 @@ func (f *FakeConn) Watch(ctx context.Context, filePath string) (*topo.WatchData, break } } - } - return current, notifications, cancel + }() + return current, notifications, nil } // NewLeaderParticipation implements the Conn interface diff 
--git a/go/vt/topo/helpers/tee.go b/go/vt/topo/helpers/tee.go index abec09d0713..9c611012a09 100644 --- a/go/vt/topo/helpers/tee.go +++ b/go/vt/topo/helpers/tee.go @@ -164,7 +164,7 @@ func (c *TeeConn) Delete(ctx context.Context, filePath string, version topo.Vers } // Watch is part of the topo.Conn interface -func (c *TeeConn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) { +func (c *TeeConn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { return c.primary.Watch(ctx, filePath) } diff --git a/go/vt/topo/k8stopo/watch.go b/go/vt/topo/k8stopo/watch.go index 4c86736e52a..96014119bca 100644 --- a/go/vt/topo/k8stopo/watch.go +++ b/go/vt/topo/k8stopo/watch.go @@ -28,26 +28,22 @@ import ( ) // Watch is part of the topo.Conn interface. -func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) { +func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { log.Info("Starting Kubernetes topo Watch on ", filePath) current := &topo.WatchData{} // get current - contents, ver, err := s.Get(ctx, filePath) + initialCtx, initialCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer initialCancel() + + contents, ver, err := s.Get(initialCtx, filePath) if err != nil { - // Per the topo.Conn interface: - // "If the initial read fails, or the file doesn't - // exist, current.Err is set, and 'changes'/'cancel' are nil." - current.Err = err - return current, nil, nil + return nil, nil, err } current.Contents = contents current.Version = ver - // Create a context, will be used to cancel the watch. 
- watchCtx, watchCancel := context.WithCancel(context.Background()) - // Create the changes channel changes := make(chan *topo.WatchData, 10) @@ -56,11 +52,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < resource, err := s.buildFileResource(filePath, []byte{}) if err != nil { - // Per the topo.Conn interface: - // current.Err is set, and 'changes'/'cancel' are nil - watchCancel() - current.Err = err - return current, nil, nil + return nil, nil, err } // Create the informer / indexer to watch the single resource @@ -111,9 +103,9 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < go memberInformer.Run(informerChan) // Handle interrupts - go closeOnDone(watchCtx, filePath, informerChan, gracefulShutdown, changes) + go closeOnDone(ctx, filePath, informerChan, gracefulShutdown, changes) - return current, changes, topo.CancelFunc(watchCancel) + return current, changes, nil } func closeOnDone(ctx context.Context, filePath string, informerChan chan struct{}, gracefulShutdown chan struct{}, changes chan *topo.WatchData) { diff --git a/go/vt/topo/memorytopo/watch.go b/go/vt/topo/memorytopo/watch.go index 1ac41717839..0f651da1a60 100644 --- a/go/vt/topo/memorytopo/watch.go +++ b/go/vt/topo/memorytopo/watch.go @@ -17,29 +17,28 @@ limitations under the License. package memorytopo import ( - "fmt" - "context" + "fmt" "vitess.io/vitess/go/vt/topo" ) // Watch is part of the topo.Conn interface. 
-func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) { +func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { c.factory.mu.Lock() defer c.factory.mu.Unlock() if c.factory.err != nil { - return &topo.WatchData{Err: c.factory.err}, nil, nil + return nil, nil, c.factory.err } n := c.factory.nodeByPath(c.cell, filePath) if n == nil { - return &topo.WatchData{Err: topo.NewError(topo.NoNode, filePath)}, nil, nil + return nil, nil, topo.NewError(topo.NoNode, filePath) } if n.contents == nil { // it's a directory - return &topo.WatchData{Err: fmt.Errorf("cannot watch directory %v in cell %v", filePath, c.cell)}, nil, nil + return nil, nil, fmt.Errorf("cannot watch directory %v in cell %v", filePath, c.cell) } current := &topo.WatchData{ Contents: n.contents, @@ -51,7 +50,8 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c nextWatchIndex++ n.watches[watchIndex] = notifications - cancel := func() { + go func() { + <-ctx.Done() // This function can be called at any point, so we first need // to make sure the watch is still valid. c.factory.mu.Lock() @@ -67,6 +67,6 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c w <- &topo.WatchData{Err: topo.NewError(topo.Interrupted, "watch")} close(w) } - } - return current, notifications, cancel + }() + return current, notifications, nil } diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index d1da7c94256..e1f5034db9e 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -656,11 +656,14 @@ type WatchShardData struct { // WatchShard will set a watch on the Shard object. 
// It has the same contract as conn.Watch, but it also unpacks the // contents into a Shard object -func (ts *Server) WatchShard(ctx context.Context, keyspace, shard string) (*WatchShardData, <-chan *WatchShardData, CancelFunc) { +func (ts *Server) WatchShard(ctx context.Context, keyspace, shard string) (*WatchShardData, <-chan *WatchShardData, error) { shardPath := shardFilePath(keyspace, shard) - current, wdChannel, cancel := ts.globalCell.Watch(ctx, shardPath) - if current.Err != nil { - return &WatchShardData{Err: current.Err}, nil, nil + ctx, cancel := context.WithCancel(ctx) + + current, wdChannel, err := ts.globalCell.Watch(ctx, shardPath) + if err != nil { + cancel() + return nil, nil, err } value := &topodatapb.Shard{} if err := proto.Unmarshal(current.Contents, value); err != nil { @@ -668,7 +671,7 @@ func (ts *Server) WatchShard(ctx context.Context, keyspace, shard string) (*Watc cancel() for range wdChannel { } - return &WatchShardData{Err: vterrors.Wrapf(err, "error unpacking initial Shard object")}, nil, nil + return nil, nil, vterrors.Wrapf(err, "error unpacking initial Shard object") } changes := make(chan *WatchShardData, 10) @@ -678,6 +681,7 @@ func (ts *Server) WatchShard(ctx context.Context, keyspace, shard string) (*Watc // send an ErrInterrupted and then close the channel. We'll // just propagate that back to our caller. go func() { + defer cancel() defer close(changes) for wd := range wdChannel { @@ -702,5 +706,5 @@ func (ts *Server) WatchShard(ctx context.Context, keyspace, shard string) (*Watc } }() - return &WatchShardData{Value: value}, changes, cancel + return &WatchShardData{Value: value}, changes, nil } diff --git a/go/vt/topo/srv_keyspace.go b/go/vt/topo/srv_keyspace.go index 264b01a3edc..ee1db1100e1 100644 --- a/go/vt/topo/srv_keyspace.go +++ b/go/vt/topo/srv_keyspace.go @@ -17,6 +17,7 @@ limitations under the License. 
package topo import ( + "context" "encoding/hex" "fmt" "path" @@ -24,8 +25,6 @@ import ( "google.golang.org/protobuf/proto" - "context" - "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/concurrency" @@ -51,16 +50,18 @@ type WatchSrvKeyspaceData struct { // WatchSrvKeyspace will set a watch on the SrvKeyspace object. // It has the same contract as Conn.Watch, but it also unpacks the // contents into a SrvKeyspace object. -func (ts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (*WatchSrvKeyspaceData, <-chan *WatchSrvKeyspaceData, CancelFunc) { +func (ts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (*WatchSrvKeyspaceData, <-chan *WatchSrvKeyspaceData, error) { conn, err := ts.ConnForCell(ctx, cell) if err != nil { return &WatchSrvKeyspaceData{Err: err}, nil, nil } filePath := srvKeyspaceFileName(keyspace) - current, wdChannel, cancel := conn.Watch(ctx, filePath) - if current.Err != nil { - return &WatchSrvKeyspaceData{Err: current.Err}, nil, nil + ctx, cancel := context.WithCancel(ctx) + current, wdChannel, err := conn.Watch(ctx, filePath) + if err != nil { + cancel() + return nil, nil, err } value := &topodatapb.SrvKeyspace{} if err := proto.Unmarshal(current.Contents, value); err != nil { @@ -68,7 +69,7 @@ func (ts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) ( cancel() for range wdChannel { } - return &WatchSrvKeyspaceData{Err: vterrors.Wrapf(err, "error unpacking initial SrvKeyspace object")}, nil, nil + return nil, nil, vterrors.Wrapf(err, "error unpacking initial SrvKeyspace object") } changes := make(chan *WatchSrvKeyspaceData, 10) @@ -79,6 +80,7 @@ func (ts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) ( // send an ErrInterrupted and then close the channel. We'll // just propagate that back to our caller. 
go func() { + defer cancel() defer close(changes) for wd := range wdChannel { @@ -103,7 +105,7 @@ func (ts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) ( } }() - return &WatchSrvKeyspaceData{Value: value}, changes, cancel + return &WatchSrvKeyspaceData{Value: value}, changes, nil } // GetSrvKeyspaceNames returns the SrvKeyspace objects for a cell. @@ -689,18 +691,6 @@ func OrderAndCheckPartitions(cell string, srvKeyspace *topodatapb.SrvKeyspace) e return nil } -// ShardIsServing returns true if this shard is found in any of the partitions in the srvKeyspace -func ShardIsServing(srvKeyspace *topodatapb.SrvKeyspace, shard *topodatapb.Shard) bool { - for _, partition := range srvKeyspace.GetPartitions() { - for _, shardReference := range partition.GetShardReferences() { - if key.KeyRangeEqual(shardReference.GetKeyRange(), shard.GetKeyRange()) { - return true - } - } - } - return false -} - // ValidateSrvKeyspace validates that the SrvKeyspace for given keyspace in the provided cells is not corrupted func (ts *Server) ValidateSrvKeyspace(ctx context.Context, keyspace, cells string) error { cellsToValidate, err := ts.ExpandCells(ctx, cells) diff --git a/go/vt/topo/srv_vschema.go b/go/vt/topo/srv_vschema.go index db04379f7a1..25bff7dcf73 100644 --- a/go/vt/topo/srv_vschema.go +++ b/go/vt/topo/srv_vschema.go @@ -17,13 +17,12 @@ limitations under the License. package topo import ( + "context" "fmt" "sync" "google.golang.org/protobuf/proto" - "context" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vterrors" @@ -42,15 +41,17 @@ type WatchSrvVSchemaData struct { // WatchSrvVSchema will set a watch on the SrvVSchema object. // It has the same contract as Conn.Watch, but it also unpacks the // contents into a SrvVSchema object. 
-func (ts *Server) WatchSrvVSchema(ctx context.Context, cell string) (*WatchSrvVSchemaData, <-chan *WatchSrvVSchemaData, CancelFunc) { +func (ts *Server) WatchSrvVSchema(ctx context.Context, cell string) (*WatchSrvVSchemaData, <-chan *WatchSrvVSchemaData, error) { conn, err := ts.ConnForCell(ctx, cell) if err != nil { - return &WatchSrvVSchemaData{Err: err}, nil, nil + return nil, nil, err } - current, wdChannel, cancel := conn.Watch(ctx, SrvVSchemaFile) - if current.Err != nil { - return &WatchSrvVSchemaData{Err: current.Err}, nil, nil + ctx, cancel := context.WithCancel(ctx) + current, wdChannel, err := conn.Watch(ctx, SrvVSchemaFile) + if err != nil { + cancel() + return nil, nil, err } value := &vschemapb.SrvVSchema{} if err := proto.Unmarshal(current.Contents, value); err != nil { @@ -58,7 +59,7 @@ func (ts *Server) WatchSrvVSchema(ctx context.Context, cell string) (*WatchSrvVS cancel() for range wdChannel { } - return &WatchSrvVSchemaData{Err: vterrors.Wrapf(err, "error unpacking initial SrvVSchema object")}, nil, nil + return nil, nil, vterrors.Wrapf(err, "error unpacking initial SrvVSchema object") } changes := make(chan *WatchSrvVSchemaData, 10) @@ -69,6 +70,7 @@ func (ts *Server) WatchSrvVSchema(ctx context.Context, cell string) (*WatchSrvVS // send an ErrInterrupted and then close the channel. We'll // just propagate that back to our caller. go func() { + defer cancel() defer close(changes) for wd := range wdChannel { @@ -92,7 +94,7 @@ func (ts *Server) WatchSrvVSchema(ctx context.Context, cell string) (*WatchSrvVS } }() - return &WatchSrvVSchemaData{Value: value}, changes, cancel + return &WatchSrvVSchemaData{Value: value}, changes, nil } // UpdateSrvVSchema updates the SrvVSchema file for a cell. diff --git a/go/vt/topo/stats_conn.go b/go/vt/topo/stats_conn.go index 2d30503f645..5a57e647373 100644 --- a/go/vt/topo/stats_conn.go +++ b/go/vt/topo/stats_conn.go @@ -17,9 +17,8 @@ limitations under the License. 
package topo import ( - "time" - "context" + "time" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/proto/vtrpc" @@ -162,7 +161,7 @@ func (st *StatsConn) Lock(ctx context.Context, dirPath, contents string) (LockDe } // Watch is part of the Conn interface -func (st *StatsConn) Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, cancel CancelFunc) { +func (st *StatsConn) Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, err error) { startTime := time.Now() statsKey := []string{"Watch", st.cell} defer topoStatsConnTimings.Record(statsKey, startTime) diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go index bffc47f81df..72f15cd17cb 100644 --- a/go/vt/topo/stats_conn_test.go +++ b/go/vt/topo/stats_conn_test.go @@ -17,11 +17,10 @@ limitations under the License. package topo import ( + "context" "fmt" "testing" - "context" - "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" ) @@ -106,8 +105,8 @@ func (st *fakeConn) Lock(ctx context.Context, dirPath, contents string) (lock Lo } // Watch is part of the Conn interface -func (st *fakeConn) Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, cancel CancelFunc) { - return current, changes, cancel +func (st *fakeConn) Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, err error) { + return current, changes, err } // NewLeaderParticipation is part of the Conn interface diff --git a/go/vt/topo/test/watch.go b/go/vt/topo/test/watch.go index d89e8a85aff..05dec8db717 100644 --- a/go/vt/topo/test/watch.go +++ b/go/vt/topo/test/watch.go @@ -17,6 +17,7 @@ limitations under the License. 
package test import ( + "context" "testing" "time" @@ -24,8 +25,6 @@ import ( "google.golang.org/protobuf/proto" - "context" - "vitess.io/vitess/go/vt/topo" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -36,19 +35,22 @@ import ( // provided srvKeyspace. func waitForInitialValue(t *testing.T, conn topo.Conn, srvKeyspace *topodatapb.SrvKeyspace) (changes <-chan *topo.WatchData, cancel func()) { var current *topo.WatchData - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) start := time.Now() + var err error for { - current, changes, cancel = conn.Watch(ctx, "keyspaces/test_keyspace/SrvKeyspace") - if topo.IsErrType(current.Err, topo.NoNode) { + current, changes, err = conn.Watch(ctx, "keyspaces/test_keyspace/SrvKeyspace") + if topo.IsErrType(err, topo.NoNode) { // hasn't appeared yet if time.Since(start) > 10*time.Second { + cancel() t.Fatalf("time out waiting for file to appear") } time.Sleep(10 * time.Millisecond) continue } - if current.Err != nil { + if err != nil { + cancel() t.Fatalf("watch failed: %v", current.Err) } // we got a valid result @@ -56,9 +58,11 @@ func waitForInitialValue(t *testing.T, conn topo.Conn, srvKeyspace *topodatapb.S } got := &topodatapb.SrvKeyspace{} if err := proto.Unmarshal(current.Contents, got); err != nil { + cancel() t.Fatalf("cannot proto-unmarshal data: %v", err) } if !proto.Equal(got, srvKeyspace) { + cancel() t.Fatalf("got bad data: %v expected: %v", got, srvKeyspace) } @@ -68,15 +72,16 @@ func waitForInitialValue(t *testing.T, conn topo.Conn, srvKeyspace *topodatapb.S // checkWatch runs the tests on the Watch part of the Conn API. // We use a SrvKeyspace object. 
func checkWatch(t *testing.T, ts *topo.Server) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() conn, err := ts.ConnForCell(ctx, LocalCellName) if err != nil { t.Fatalf("ConnForCell(test) failed: %v", err) } // start watching something that doesn't exist -> error - current, changes, _ := conn.Watch(ctx, "keyspaces/test_keyspace/SrvKeyspace") - if !topo.IsErrType(current.Err, topo.NoNode) { + current, changes, err := conn.Watch(ctx, "keyspaces/test_keyspace/SrvKeyspace") + if !topo.IsErrType(err, topo.NoNode) { t.Errorf("watch on missing node didn't return ErrNoNode: %v %v", current, changes) } @@ -104,8 +109,8 @@ func checkWatch(t *testing.T, ts *topo.Server) { } // start watching again, it should work - changes, cancel := waitForInitialValue(t, conn, srvKeyspace) - defer cancel() + changes, secondCancel := waitForInitialValue(t, conn, srvKeyspace) + defer secondCancel() // change the data srvKeyspace.Partitions[0].ShardReferences[0].Name = "new_name" diff --git a/go/vt/topo/topotests/shard_watch_test.go b/go/vt/topo/topotests/shard_watch_test.go index 7b04224a8f5..3cf64378e41 100644 --- a/go/vt/topo/topotests/shard_watch_test.go +++ b/go/vt/topo/topotests/shard_watch_test.go @@ -31,23 +31,24 @@ import ( ) // waitForInitialShard waits for the initial Shard to appear. 
-func waitForInitialShard(t *testing.T, ts *topo.Server, keyspace, shard string) (current *topo.WatchShardData, changes <-chan *topo.WatchShardData, cancel topo.CancelFunc) { - ctx := context.Background() +func waitForInitialShard(t *testing.T, ts *topo.Server, keyspace, shard string) (current *topo.WatchShardData, changes <-chan *topo.WatchShardData, cancel context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) start := time.Now() + var err error for { - current, changes, cancel = ts.WatchShard(ctx, keyspace, shard) + current, changes, err = ts.WatchShard(ctx, keyspace, shard) switch { - case topo.IsErrType(current.Err, topo.NoNode): + case topo.IsErrType(err, topo.NoNode): // hasn't appeared yet if time.Since(start) > 10*time.Second { t.Fatalf("time out waiting for file to appear") } time.Sleep(10 * time.Millisecond) continue - case current.Err == nil: + case err == nil: return default: - t.Fatalf("watch failed: %v", current.Err) + t.Fatalf("watch failed: %v", err) } } } @@ -59,8 +60,8 @@ func TestWatchShardNoNode(t *testing.T) { ts := memorytopo.NewServer("cell1") // No Shard -> ErrNoNode - current, _, _ := ts.WatchShard(ctx, keyspace, shard) - if !topo.IsErrType(current.Err, topo.NoNode) { + current, _, err := ts.WatchShard(ctx, keyspace, shard) + if !topo.IsErrType(err, topo.NoNode) { t.Errorf("Got invalid result from WatchShard(not there): %v", current.Err) } } @@ -145,9 +146,9 @@ func TestWatchShard(t *testing.T) { cancel() // Bad data in topo, setting the watch should now fail. 
- current, _, _ = ts.WatchShard(ctx, keyspace, shard) - if current.Err == nil || !strings.Contains(current.Err.Error(), "error unpacking initial Shard object") { - t.Fatalf("expected an initial error setting watch on bad content, but got: %v", current.Err) + _, _, err = ts.WatchShard(ctx, keyspace, shard) + if err == nil || !strings.Contains(err.Error(), "error unpacking initial Shard object") { + t.Fatalf("expected an initial error setting watch on bad content, but got: %v", err) } data, err := proto.Marshal(wanted) @@ -160,9 +161,9 @@ func TestWatchShard(t *testing.T) { } start := time.Now() for { - current, changes, _ = ts.WatchShard(ctx, keyspace, shard) - if current.Err != nil { - if strings.Contains(current.Err.Error(), "error unpacking initial Shard object") { + current, changes, err = ts.WatchShard(ctx, keyspace, shard) + if err != nil { + if strings.Contains(err.Error(), "error unpacking initial Shard object") { // hasn't changed yet if time.Since(start) > 10*time.Second { t.Fatalf("time out waiting for file to appear") @@ -208,9 +209,9 @@ func TestWatchShardCancel(t *testing.T) { ts := memorytopo.NewServer(cell) // No Shard -> ErrNoNode - current, _, _ := ts.WatchShard(ctx, keyspace, shard) - if !topo.IsErrType(current.Err, topo.NoNode) { - t.Errorf("Got invalid result from WatchShard(not there): %v", current.Err) + _, _, err := ts.WatchShard(ctx, keyspace, shard) + if !topo.IsErrType(err, topo.NoNode) { + t.Errorf("Got invalid result from WatchShard(not there): %v", err) } // Create keyspace diff --git a/go/vt/topo/topotests/srv_keyspace_test.go b/go/vt/topo/topotests/srv_keyspace_test.go index 54da260ebf8..8eeaf3f07ac 100644 --- a/go/vt/topo/topotests/srv_keyspace_test.go +++ b/go/vt/topo/topotests/srv_keyspace_test.go @@ -37,23 +37,24 @@ import ( // waitForInitialSrvKeyspace waits for the initial SrvKeyspace to // appear, and match the provided srvKeyspace. 
-func waitForInitialSrvKeyspace(t *testing.T, ts *topo.Server, cell, keyspace string) (current *topo.WatchSrvKeyspaceData, changes <-chan *topo.WatchSrvKeyspaceData, cancel topo.CancelFunc) { - ctx := context.Background() +func waitForInitialSrvKeyspace(t *testing.T, ts *topo.Server, cell, keyspace string) (current *topo.WatchSrvKeyspaceData, changes <-chan *topo.WatchSrvKeyspaceData, cancel context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) start := time.Now() + var err error for { - current, changes, cancel = ts.WatchSrvKeyspace(ctx, cell, keyspace) + current, changes, err = ts.WatchSrvKeyspace(ctx, cell, keyspace) switch { - case topo.IsErrType(current.Err, topo.NoNode): + case topo.IsErrType(err, topo.NoNode): // hasn't appeared yet if time.Since(start) > 10*time.Second { t.Fatalf("time out waiting for file to appear") } time.Sleep(10 * time.Millisecond) continue - case current.Err == nil: + case err == nil: return default: - t.Fatalf("watch failed: %v", current.Err) + t.Fatalf("watch failed: %v", err) } } } @@ -65,9 +66,9 @@ func TestWatchSrvKeyspaceNoNode(t *testing.T) { ts := memorytopo.NewServer(cell) // No SrvKeyspace -> ErrNoNode - current, _, _ := ts.WatchSrvKeyspace(ctx, cell, keyspace) - if !topo.IsErrType(current.Err, topo.NoNode) { - t.Errorf("Got invalid result from WatchSrvKeyspace(not there): %v", current.Err) + _, _, err := ts.WatchSrvKeyspace(ctx, cell, keyspace) + if !topo.IsErrType(err, topo.NoNode) { + t.Errorf("Got invalid result from WatchSrvKeyspace(not there): %v", err) } } @@ -120,9 +121,9 @@ func TestWatchSrvKeyspace(t *testing.T) { cancel() // Bad data in topo, setting the watch should now fail. 
- current, _, _ = ts.WatchSrvKeyspace(ctx, cell, keyspace) - if current.Err == nil || !strings.Contains(current.Err.Error(), "error unpacking initial SrvKeyspace object") { - t.Fatalf("expected an initial error setting watch on bad content, but got: %v", current.Err) + _, _, err = ts.WatchSrvKeyspace(ctx, cell, keyspace) + if err == nil || !strings.Contains(err.Error(), "error unpacking initial SrvKeyspace object") { + t.Fatalf("expected an initial error setting watch on bad content, but got: %v", err) } // Update content, wait until Watch works again @@ -131,9 +132,9 @@ func TestWatchSrvKeyspace(t *testing.T) { } start := time.Now() for { - current, changes, _ = ts.WatchSrvKeyspace(ctx, cell, keyspace) - if current.Err != nil { - if strings.Contains(current.Err.Error(), "error unpacking initial SrvKeyspace object") { + current, changes, err = ts.WatchSrvKeyspace(ctx, cell, keyspace) + if err != nil { + if strings.Contains(err.Error(), "error unpacking initial SrvKeyspace object") { // hasn't changed yet if time.Since(start) > 10*time.Second { t.Fatalf("time out waiting for file to appear") @@ -178,9 +179,9 @@ func TestWatchSrvKeyspaceCancel(t *testing.T) { ts := memorytopo.NewServer(cell) // No SrvKeyspace -> ErrNoNode - current, _, _ := ts.WatchSrvKeyspace(ctx, cell, keyspace) - if !topo.IsErrType(current.Err, topo.NoNode) { - t.Errorf("Got invalid result from WatchSrvKeyspace(not there): %v", current.Err) + _, _, err := ts.WatchSrvKeyspace(ctx, cell, keyspace) + if !topo.IsErrType(err, topo.NoNode) { + t.Errorf("Got invalid result from WatchSrvKeyspace(not there): %v", err) } // Create initial value diff --git a/go/vt/topo/zk2topo/watch.go b/go/vt/topo/zk2topo/watch.go index 0bdedc7d931..69e8f2df48c 100644 --- a/go/vt/topo/zk2topo/watch.go +++ b/go/vt/topo/zk2topo/watch.go @@ -17,11 +17,9 @@ limitations under the License. 
package zk2topo import ( + "context" "fmt" "path" - "sync" - - "context" "vitess.io/vitess/go/vt/vterrors" @@ -29,39 +27,28 @@ import ( ) // Watch is part of the topo.Conn interface. -func (zs *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) { +func (zs *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { zkPath := path.Join(zs.root, filePath) // Get the initial value, set the initial watch - data, stats, watch, err := zs.conn.GetW(ctx, zkPath) + initialCtx, initialCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer initialCancel() + + data, stats, watch, err := zs.conn.GetW(initialCtx, zkPath) if err != nil { - return &topo.WatchData{Err: convertError(err, zkPath)}, nil, nil + return nil, nil, convertError(err, zkPath) } if stats == nil { // No stats --> node doesn't exist. - return &topo.WatchData{Err: topo.NewError(topo.NoNode, zkPath)}, nil, nil + return nil, nil, topo.NewError(topo.NoNode, zkPath) } wd := &topo.WatchData{ Contents: data, Version: ZKVersion(stats.Version), } - // mu protects the stop channel. We need to make sure the 'cancel' - // func can be called multiple times, and that we don't close 'stop' - // too many times. 
- mu := sync.Mutex{} - stop := make(chan struct{}) - cancel := func() { - mu.Lock() - defer mu.Unlock() - if stop != nil { - close(stop) - stop = nil - } - } - c := make(chan *topo.WatchData, 10) - go func(stop chan struct{}) { + go func() { defer close(c) for { @@ -78,7 +65,7 @@ func (zs *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, return } - case <-stop: + case <-ctx.Done(): // user is not interested any more c <- &topo.WatchData{Err: topo.NewError(topo.Interrupted, "watch")} return @@ -104,7 +91,7 @@ func (zs *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, return } } - }(stop) + }() - return wd, c, cancel + return wd, c, nil } diff --git a/go/vt/vttablet/customrule/topocustomrule/topocustomrule.go b/go/vt/vttablet/customrule/topocustomrule/topocustomrule.go index 513a2b4f9c2..4ad57bd6a93 100644 --- a/go/vt/vttablet/customrule/topocustomrule/topocustomrule.go +++ b/go/vt/vttablet/customrule/topocustomrule/topocustomrule.go @@ -138,10 +138,11 @@ func (cr *topoCustomRule) oneWatch() error { cr.mu.Unlock() }() - ctx := context.Background() - current, wdChannel, cancel := cr.conn.Watch(ctx, cr.filePath) - if current.Err != nil { - return current.Err + ctx, cancel := context.WithCancel(context.Background()) + current, wdChannel, err := cr.conn.Watch(ctx, cr.filePath) + if err != nil { + cancel() + return err } cr.mu.Lock() diff --git a/go/vt/vttablet/tabletmanager/shard_sync.go b/go/vt/vttablet/tabletmanager/shard_sync.go index 0b257537b4b..41d2c608e0d 100644 --- a/go/vt/vttablet/tabletmanager/shard_sync.go +++ b/go/vt/vttablet/tabletmanager/shard_sync.go @@ -17,11 +17,10 @@ limitations under the License. package tabletmanager import ( + "context" "flag" "time" - "context" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl" @@ -133,7 +132,7 @@ func (tm *TabletManager) shardSyncLoop(ctx context.Context, notifyChan <-chan st // We already have an active watch. Nothing to do. 
continue } - if err := shardWatch.start(ctx, tm.TopoServer, tablet.Keyspace, tablet.Shard); err != nil { + if err := shardWatch.start(tm.TopoServer, tablet.Keyspace, tablet.Shard); err != nil { log.Errorf("Failed to start shard watch: %v", err) // Start retry timer and go back to sleep. retryChan = time.After(*shardSyncRetryDelay) diff --git a/go/vt/vttablet/tabletmanager/shard_watcher.go b/go/vt/vttablet/tabletmanager/shard_watcher.go index 5a98f8a83fd..d8a6dd2c2dc 100644 --- a/go/vt/vttablet/tabletmanager/shard_watcher.go +++ b/go/vt/vttablet/tabletmanager/shard_watcher.go @@ -25,26 +25,25 @@ import ( type shardWatcher struct { watchChan <-chan *topo.WatchShardData - watchCancel topo.CancelFunc + watchCancel context.CancelFunc } func (sw *shardWatcher) active() bool { return sw.watchChan != nil } -func (sw *shardWatcher) start(ctx context.Context, ts *topo.Server, keyspace, shard string) error { - ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) - defer cancel() - +func (sw *shardWatcher) start(ts *topo.Server, keyspace, shard string) error { log.Infof("Starting shard watch of %v/%v", keyspace, shard) - event, c, watchCancel := ts.WatchShard(ctx, keyspace, shard) - if event.Err != nil { - return event.Err + ctx, cancel := context.WithCancel(context.Background()) + _, c, err := ts.WatchShard(ctx, keyspace, shard) + if err != nil { + cancel() + return err } sw.watchChan = c - sw.watchCancel = watchCancel + sw.watchCancel = cancel return nil } From fc655ff98b2e2ec48ae1a9932dc1041b6203f230 Mon Sep 17 00:00:00 2001 From: Dirkjan Bussink Date: Mon, 1 Aug 2022 15:33:38 +0200 Subject: [PATCH 3/7] Add recursive watcher for topo This adds a new function to the topo interface which allows for recursive watching on the topo. Recursive watching can be used to significantly improve performance in a few cases where we want to monitor changes across a prefix. There's no immediate usage added yet, that will be done in follow up changes. 
Signed-off-by: Dirkjan Bussink --- go/vt/topo/conn.go | 49 +++++++ go/vt/topo/consultopo/watch.go | 9 ++ go/vt/topo/etcd2topo/watch.go | 120 ++++++++++++++++- go/vt/topo/faketopo/faketopo.go | 4 + go/vt/topo/helpers/tee.go | 4 + go/vt/topo/k8stopo/watch.go | 7 + go/vt/topo/memorytopo/file.go | 43 ++++++- go/vt/topo/memorytopo/memorytopo.go | 39 +++++- go/vt/topo/memorytopo/watch.go | 55 +++++++- go/vt/topo/stats_conn.go | 7 + go/vt/topo/stats_conn_test.go | 5 + go/vt/topo/test/file.go | 1 - go/vt/topo/test/testing.go | 3 + go/vt/topo/test/watch.go | 193 +++++++++++++++++++++++++++- go/vt/topo/zk2topo/watch.go | 8 ++ 15 files changed, 528 insertions(+), 19 deletions(-) diff --git a/go/vt/topo/conn.go b/go/vt/topo/conn.go index 12166be2bc2..4d4b2329233 100644 --- a/go/vt/topo/conn.go +++ b/go/vt/topo/conn.go @@ -155,6 +155,44 @@ type Conn interface { // filePath is a path relative to the root directory of the cell. Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, err error) + // WatchRecursive starts watching a file prefix in the provided cell. It + // returns all the current values for existing files with the given + // prefix, a 'changes' channel to read the changes from and an error. + // + // The provided context should be canceled when stopping WatchRecursive(). + // This API is different from Watch() and Watch() will be changed + // to match this API as well in the future. + // + // Canceling will eventually result in a final WatchDataRecursive result with Err = + // ErrInterrupted. + // + // The 'changes' channel may return a record with Err != nil. + // In that case, the channel will also be closed right after + // that record. In any case, 'changes' has to be drained of + // all events, even when 'stop' is closed. + // + // Note the 'changes' channel can return twice the same + // Version/Contents (for instance, if the watch is interrupted + // and restarted within the Conn implementation). 
+ // Similarly, the 'changes' channel may skip versions / changes + // (that is, if value goes [A, B, C, D, E, F], the watch may only + // receive [A, B, F]). This should only happen for rapidly + // changing values though. Usually, the initial value will come + // back right away. And a stable value (that hasn't changed for + // a while) should be seen shortly. + // + // The WatchRecursive call is not guaranteed to return exactly up to + // date data right away. For instance, if a file is created + // and saved, and then a watch is set on that file, it may + // return ErrNoNode (as the underlying configuration service + // may use asynchronous caches that are not up to date + // yet). The only guarantee is that the watch data will + // eventually converge. Vitess doesn't explicitly depend on the data + // being correct quickly, as long as it eventually gets there. + // + // path is a path relative to the root directory of the cell. + WatchRecursive(ctx context.Context, path string) ([]*WatchDataRecursive, <-chan *WatchDataRecursive, error) + // // Leader election methods. This is meant to have a small // number of processes elect a primary within a group. The @@ -271,6 +309,17 @@ type WatchData struct { Err error } +// WatchDataRecursive is the structure returned by the WatchRecursive() API. +// It contains the same data as WatchData, but additionally also the specific +// path of the entry that the recursive watch applies to, since an entire +// file prefix can be watched. +type WatchDataRecursive struct { + // Path is the path that has changed + Path string + + WatchData +} + // KVInfo is a structure that contains a generic key/value pair from // the topo server, along with important metadata about it. 
// This should be used to provide multiple entries in List like calls diff --git a/go/vt/topo/consultopo/watch.go b/go/vt/topo/consultopo/watch.go index ca6e15d3ffa..8c8abbb992c 100644 --- a/go/vt/topo/consultopo/watch.go +++ b/go/vt/topo/consultopo/watch.go @@ -128,3 +128,12 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < return wd, notifications, nil } + +// WatchRecursive is part of the topo.Conn interface. +func (s *Server) WatchRecursive(_ context.Context, path string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { + // This isn't implemented yet, but likely can be implemented using List + // with blocking logic like how we use Get with blocking for regular Watch. + // See also how https://www.consul.io/docs/dynamic-app-config/watches#keyprefix + // works under the hood. + return nil, nil, topo.NewError(topo.NoImplementation, path) +} diff --git a/go/vt/topo/etcd2topo/watch.go b/go/vt/topo/etcd2topo/watch.go index a50f6b90a4f..e653fe1ca2e 100644 --- a/go/vt/topo/etcd2topo/watch.go +++ b/go/vt/topo/etcd2topo/watch.go @@ -19,6 +19,7 @@ package etcd2topo import ( "context" "path" + "strings" "time" "go.etcd.io/etcd/api/v3/mvccpb" @@ -75,7 +76,6 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < var watchRetries int for { select { - case <-watchCtx.Done(): // This includes context cancellation errors. notifications <- &topo.WatchData{ @@ -85,7 +85,11 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < case wresp, ok := <-watcher: if !ok { if watchRetries > 10 { - time.Sleep(time.Duration(watchRetries) * time.Second) + select { + case <-time.After(time.Duration(watchRetries) * time.Second): + case <-watchCtx.Done(): + continue + } } watchRetries++ // Cancel inner context on retry and create new one. 
@@ -138,3 +142,115 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < return wd, notifications, nil } + +// WatchRecursive is part of the topo.Conn interface. +func (s *Server) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { + nodePath := path.Join(s.root, dirpath) + if !strings.HasSuffix(nodePath, "/") { + nodePath = nodePath + "/" + } + + // Get the initial version of the file + initial, err := s.cli.Get(ctx, nodePath, clientv3.WithPrefix()) + if err != nil { + return nil, nil, convertError(err, nodePath) + } + + var initialwd []*topo.WatchDataRecursive + + for _, kv := range initial.Kvs { + var wd topo.WatchDataRecursive + wd.Path = string(kv.Key) + wd.Contents = kv.Value + wd.Version = EtcdVersion(initial.Kvs[0].ModRevision) + initialwd = append(initialwd, &wd) + } + + // Create a context, will be used to cancel the watch on retry. + watchCtx, watchCancel := context.WithCancel(ctx) + + // Create the Watcher. We start watching from the response we + // got, not from the file original version, as the server may + // not have that much history. + watcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(initial.Header.Revision), clientv3.WithPrefix()) + if watcher == nil { + watchCancel() + return nil, nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "Watch failed") + } + + // Create the notifications channel, send updates to it. + notifications := make(chan *topo.WatchDataRecursive, 10) + go func() { + defer close(notifications) + defer watchCancel() + + var currVersion = initial.Header.Revision + var watchRetries int + for { + select { + case <-watchCtx.Done(): + // This includes context cancellation errors. 
+ notifications <- &topo.WatchDataRecursive{ + WatchData: topo.WatchData{Err: convertError(watchCtx.Err(), nodePath)}, + } + return + case wresp, ok := <-watcher: + if !ok { + if watchRetries > 10 { + select { + case <-time.After(time.Duration(watchRetries) * time.Second): + case <-watchCtx.Done(): + continue + } + } + watchRetries++ + // Cancel inner context on retry and create new one. + watchCancel() + watchCtx, watchCancel = context.WithCancel(ctx) + + newWatcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(currVersion), clientv3.WithPrefix()) + if newWatcher == nil { + log.Warningf("watch %v failed and get a nil channel returned, currVersion: %v", nodePath, currVersion) + } else { + watcher = newWatcher + } + continue + } + + watchRetries = 0 + + if wresp.Canceled { + // Final notification. + notifications <- &topo.WatchDataRecursive{ + WatchData: topo.WatchData{Err: convertError(wresp.Err(), nodePath)}, + } + return + } + + currVersion = wresp.Header.GetRevision() + + for _, ev := range wresp.Events { + switch ev.Type { + case mvccpb.PUT: + notifications <- &topo.WatchDataRecursive{ + Path: string(ev.Kv.Key), + WatchData: topo.WatchData{ + Contents: ev.Kv.Value, + Version: EtcdVersion(ev.Kv.Version), + }, + } + case mvccpb.DELETE: + notifications <- &topo.WatchDataRecursive{ + Path: string(ev.Kv.Key), + WatchData: topo.WatchData{ + Err: topo.NewError(topo.NoNode, nodePath), + }, + } + } + } + } + } + }() + + return initialwd, notifications, nil +} diff --git a/go/vt/topo/faketopo/faketopo.go b/go/vt/topo/faketopo/faketopo.go index 4d47c656c69..4e0a1b5409c 100644 --- a/go/vt/topo/faketopo/faketopo.go +++ b/go/vt/topo/faketopo/faketopo.go @@ -321,6 +321,10 @@ func (f *FakeConn) Watch(ctx context.Context, filePath string) (*topo.WatchData, return current, notifications, nil } +func (f *FakeConn) WatchRecursive(ctx context.Context, path string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { + panic("implement me") +} + // 
NewLeaderParticipation implements the Conn interface func (f *FakeConn) NewLeaderParticipation(string, string) (topo.LeaderParticipation, error) { panic("implement me") diff --git a/go/vt/topo/helpers/tee.go b/go/vt/topo/helpers/tee.go index 9c611012a09..2df1b8bbadb 100644 --- a/go/vt/topo/helpers/tee.go +++ b/go/vt/topo/helpers/tee.go @@ -168,6 +168,10 @@ func (c *TeeConn) Watch(ctx context.Context, filePath string) (*topo.WatchData, return c.primary.Watch(ctx, filePath) } +func (c *TeeConn) WatchRecursive(ctx context.Context, path string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { + return c.primary.WatchRecursive(ctx, path) +} + // // Lock management. // diff --git a/go/vt/topo/k8stopo/watch.go b/go/vt/topo/k8stopo/watch.go index 96014119bca..2707e22ad27 100644 --- a/go/vt/topo/k8stopo/watch.go +++ b/go/vt/topo/k8stopo/watch.go @@ -119,3 +119,10 @@ func closeOnDone(ctx context.Context, filePath string, informerChan chan struct{ close(informerChan) close(changes) } + +// WatchRecursive is part of the topo.Conn interface. +func (s *Server) WatchRecursive(_ context.Context, path string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { + // Kubernetes doesn't seem to provide a primitive that watches a prefix + // or directory, so this likely can never be implemented. + return nil, nil, topo.NewError(topo.NoImplementation, path) +} diff --git a/go/vt/topo/memorytopo/file.go b/go/vt/topo/memorytopo/file.go index 6a6f2e5f79b..0abfc56cb80 100644 --- a/go/vt/topo/memorytopo/file.go +++ b/go/vt/topo/memorytopo/file.go @@ -60,6 +60,15 @@ func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (to // Create the file. 
n := c.factory.newFile(file, contents, p) p.children[file] = n + + n.propagateRecursiveWatch(&topo.WatchDataRecursive{ + Path: filePath, + WatchData: topo.WatchData{ + Contents: n.contents, + Version: NodeVersion(n.version), + }, + }) + return NodeVersion(n.version), nil } @@ -122,12 +131,22 @@ func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, ver // Call the watches for _, w := range n.watches { - w <- &topo.WatchData{ - Contents: n.contents, - Version: NodeVersion(n.version), + if w.contents != nil { + w.contents <- &topo.WatchData{ + Contents: n.contents, + Version: NodeVersion(n.version), + } } } + n.propagateRecursiveWatch(&topo.WatchDataRecursive{ + Path: filePath, + WatchData: topo.WatchData{ + Contents: n.contents, + Version: NodeVersion(n.version), + }, + }) + return NodeVersion(n.version), nil } @@ -257,11 +276,23 @@ func (c *Conn) Delete(ctx context.Context, filePath string, version topo.Version // Call the watches for _, w := range n.watches { - w <- &topo.WatchData{ - Err: topo.NewError(topo.NoNode, filePath), + if w.contents != nil { + w.contents <- &topo.WatchData{ + Err: topo.NewError(topo.NoNode, filePath), + } + close(w.contents) + } + if w.lock != nil { + close(w.lock) } - close(w) } + n.propagateRecursiveWatch(&topo.WatchDataRecursive{ + Path: filePath, + WatchData: topo.WatchData{ + Err: topo.NewError(topo.NoNode, filePath), + }, + }) + return nil } diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index 89c107636b5..c54c9702c82 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -20,13 +20,12 @@ limitations under the License. 
package memorytopo import ( + "context" "math/rand" "strings" "sync" "time" - "context" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -146,6 +145,12 @@ func (c *Conn) Close() { c.factory = nil } +type watch struct { + contents chan *topo.WatchData + recursive chan *topo.WatchDataRecursive + lock chan string +} + // node contains a directory or a file entry. // Exactly one of contents or children is not nil. type node struct { @@ -159,7 +164,7 @@ type node struct { parent *node // watches is a map of all watches for this node. - watches map[int]chan *topo.WatchData + watches map[int]watch // lock is nil when the node is not locked. // otherwise it has a channel that is closed by unlock. @@ -175,11 +180,34 @@ func (n *node) isDirectory() bool { return n.children != nil } +func (n *node) recurseContents(callback func(n *node)) { + if n.isDirectory() { + for _, child := range n.children { + child.recurseContents(callback) + } + } else { + callback(n) + } +} + +func (n *node) propagateRecursiveWatch(ev *topo.WatchDataRecursive) { + for parent := n.parent; parent != nil; parent = parent.parent { + for _, w := range parent.watches { + if w.recursive != nil { + w.recursive <- ev + } + } + } +} + // PropagateWatchError propagates the given error to all watches on this node // and recursively applies to all children func (n *node) PropagateWatchError(err error) { for _, ch := range n.watches { - ch <- &topo.WatchData{ + if ch.contents == nil { + continue + } + ch.contents <- &topo.WatchData{ Err: err, } } @@ -230,7 +258,7 @@ func (f *Factory) newFile(name string, contents []byte, parent *node) *node { version: f.getNextVersion(), contents: contents, parent: parent, - watches: make(map[int]chan *topo.WatchData), + watches: make(map[int]watch), } } @@ -240,6 +268,7 @@ func (f *Factory) newDirectory(name string, parent *node) *node { version: f.getNextVersion(), children: make(map[string]*node), parent: parent, + watches: make(map[int]watch), } } diff --git 
a/go/vt/topo/memorytopo/watch.go b/go/vt/topo/memorytopo/watch.go index 0f651da1a60..3991911da3a 100644 --- a/go/vt/topo/memorytopo/watch.go +++ b/go/vt/topo/memorytopo/watch.go @@ -48,7 +48,7 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c notifications := make(chan *topo.WatchData, 100) watchIndex := nextWatchIndex nextWatchIndex++ - n.watches[watchIndex] = notifications + n.watches[watchIndex] = watch{contents: notifications} go func() { <-ctx.Done() @@ -64,9 +64,58 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c if w, ok := n.watches[watchIndex]; ok { delete(n.watches, watchIndex) - w <- &topo.WatchData{Err: topo.NewError(topo.Interrupted, "watch")} - close(w) + w.contents <- &topo.WatchData{Err: topo.NewError(topo.Interrupted, "watch")} + close(w.contents) } }() return current, notifications, nil } + +// WatchRecursive is part of the topo.Conn interface. +func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { + c.factory.mu.Lock() + defer c.factory.mu.Unlock() + + if c.factory.err != nil { + return nil, nil, c.factory.err + } + + n := c.factory.getOrCreatePath(c.cell, dirpath) + if n == nil { + return nil, nil, topo.NewError(topo.NoNode, dirpath) + } + + var initialwd []*topo.WatchDataRecursive + n.recurseContents(func(n *node) { + initialwd = append(initialwd, &topo.WatchDataRecursive{ + Path: n.name, + WatchData: topo.WatchData{ + Contents: n.contents, + Version: NodeVersion(n.version), + }, + }) + }) + + notifications := make(chan *topo.WatchDataRecursive, 100) + watchIndex := nextWatchIndex + nextWatchIndex++ + n.watches[watchIndex] = watch{recursive: notifications} + + go func() { + defer close(notifications) + + <-ctx.Done() + + c.factory.mu.Lock() + defer c.factory.mu.Unlock() + + n := c.factory.nodeByPath(c.cell, dirpath) + if n != nil { + delete(n.watches, watchIndex) + } + + notifications <- 
&topo.WatchDataRecursive{WatchData: topo.WatchData{Err: topo.NewError(topo.Interrupted, "watch")}} + }() + + return initialwd, notifications, nil +} diff --git a/go/vt/topo/stats_conn.go b/go/vt/topo/stats_conn.go index 5a57e647373..c1959bf5d3d 100644 --- a/go/vt/topo/stats_conn.go +++ b/go/vt/topo/stats_conn.go @@ -168,6 +168,13 @@ func (st *StatsConn) Watch(ctx context.Context, filePath string) (current *Watch return st.conn.Watch(ctx, filePath) } +func (st *StatsConn) WatchRecursive(ctx context.Context, path string) ([]*WatchDataRecursive, <-chan *WatchDataRecursive, error) { + startTime := time.Now() + statsKey := []string{"WatchRecursive", st.cell} + defer topoStatsConnTimings.Record(statsKey, startTime) + return st.conn.WatchRecursive(ctx, path) +} + // NewLeaderParticipation is part of the Conn interface func (st *StatsConn) NewLeaderParticipation(name, id string) (LeaderParticipation, error) { startTime := time.Now() diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go index 72f15cd17cb..8c6d9a94026 100644 --- a/go/vt/topo/stats_conn_test.go +++ b/go/vt/topo/stats_conn_test.go @@ -109,6 +109,11 @@ func (st *fakeConn) Watch(ctx context.Context, filePath string) (current *WatchD return current, changes, err } +// WatchRecursive is part of the Conn interface +func (st *fakeConn) WatchRecursive(ctx context.Context, path string) (current []*WatchDataRecursive, changes <-chan *WatchDataRecursive, err error) { + return current, changes, err +} + // NewLeaderParticipation is part of the Conn interface func (st *fakeConn) NewLeaderParticipation(name, id string) (mp LeaderParticipation, err error) { if name == "error" { diff --git a/go/vt/topo/test/file.go b/go/vt/topo/test/file.go index 01a72c64cfc..4a1fe2a51a9 100644 --- a/go/vt/topo/test/file.go +++ b/go/vt/topo/test/file.go @@ -243,5 +243,4 @@ func checkList(t *testing.T, ts *topo.Server) { t.Fatalf("found entry doesn't have value \"a\" for path %q: %s", path, string(entries[0].Value)) } } - 
} diff --git a/go/vt/topo/test/testing.go b/go/vt/topo/test/testing.go index c7b448bf2f4..0861df0c708 100644 --- a/go/vt/topo/test/testing.go +++ b/go/vt/topo/test/testing.go @@ -115,5 +115,8 @@ func TopoServerTestSuite(t *testing.T, factory func() *topo.Server) { t.Log("=== checkList") checkList(t, ts) + t.Log("=== checkWatchRecursive") + checkWatchRecursive(t, ts) + ts.Close() } diff --git a/go/vt/topo/test/watch.go b/go/vt/topo/test/watch.go index 05dec8db717..24e831718d1 100644 --- a/go/vt/topo/test/watch.go +++ b/go/vt/topo/test/watch.go @@ -33,7 +33,7 @@ import ( // waitForInitialValue waits for the initial value of // keyspaces/test_keyspace/SrvKeyspace to appear, and match the // provided srvKeyspace. -func waitForInitialValue(t *testing.T, conn topo.Conn, srvKeyspace *topodatapb.SrvKeyspace) (changes <-chan *topo.WatchData, cancel func()) { +func waitForInitialValue(t *testing.T, conn topo.Conn, srvKeyspace *topodatapb.SrvKeyspace) (changes <-chan *topo.WatchData, cancel context.CancelFunc) { var current *topo.WatchData ctx, cancel := context.WithCancel(context.Background()) start := time.Now() @@ -51,7 +51,7 @@ func waitForInitialValue(t *testing.T, conn topo.Conn, srvKeyspace *topodatapb.S } if err != nil { cancel() - t.Fatalf("watch failed: %v", current.Err) + t.Fatalf("watch failed: %v", err) } // we got a valid result break @@ -69,6 +69,49 @@ func waitForInitialValue(t *testing.T, conn topo.Conn, srvKeyspace *topodatapb.S return changes, cancel } +// waitForInitialValue waits for the initial value of +// keyspaces/test_keyspace/SrvKeyspace to appear, and match the +// provided srvKeyspace. 
+func waitForInitialValueRecursive(t *testing.T, conn topo.Conn, srvKeyspace *topodatapb.SrvKeyspace) (changes <-chan *topo.WatchDataRecursive, cancel context.CancelFunc, err error) { + var current []*topo.WatchDataRecursive + ctx, cancel := context.WithCancel(context.Background()) + start := time.Now() + for { + current, changes, err = conn.WatchRecursive(ctx, "keyspaces/test_keyspace") + if topo.IsErrType(err, topo.NoNode) { + // hasn't appeared yet + if time.Since(start) > 10*time.Second { + cancel() + t.Fatalf("time out waiting for file to appear") + } + time.Sleep(10 * time.Millisecond) + continue + } + if topo.IsErrType(err, topo.NoImplementation) { + // If this is not supported, skip the test + cancel() + return nil, nil, err + } + if err != nil { + cancel() + t.Fatalf("watch failed: %v", err) + } + // we got a valid result + break + } + got := &topodatapb.SrvKeyspace{} + if err := proto.Unmarshal(current[0].Contents, got); err != nil { + cancel() + t.Fatalf("cannot proto-unmarshal data: %v", err) + } + if !proto.Equal(got, srvKeyspace) { + cancel() + t.Fatalf("got bad data: %v expected: %v", got, srvKeyspace) + } + + return changes, cancel, nil +} + // checkWatch runs the tests on the Watch part of the Conn API. // We use a SrvKeyspace object. func checkWatch(t *testing.T, ts *topo.Server) { @@ -253,3 +296,149 @@ func checkWatchInterrupt(t *testing.T, ts *topo.Server) { // And calling cancel() again should just work. cancel() } + +// checkWatchRecursive tests we can setup a recursive watch +func checkWatchRecursive(t *testing.T, ts *topo.Server) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + conn, err := ts.ConnForCell(ctx, LocalCellName) + if err != nil { + t.Fatalf("ConnForCell(test) failed: %v", err) + } + + // create some data + keyRange, err := key.ParseShardingSpec("-") + if err != nil || len(keyRange) != 1 { + t.Fatalf("ParseShardingSpec failed. Expected non error and only one element. 
Got err: %v, len(%v)", err, len(keyRange)) + } + + srvKeyspace := &topodatapb.SrvKeyspace{ + Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{ + { + ServedType: topodatapb.TabletType_PRIMARY, + ShardReferences: []*topodatapb.ShardReference{ + { + Name: "name", + KeyRange: keyRange[0], + }, + }, + }, + }, + } + if err := ts.UpdateSrvKeyspace(ctx, LocalCellName, "test_keyspace", srvKeyspace); err != nil { + t.Fatalf("UpdateSrvKeyspace(1): %v", err) + } + + // start watching again, it should work + changes, secondCancel, err := waitForInitialValueRecursive(t, conn, srvKeyspace) + if topo.IsErrType(err, topo.NoImplementation) { + // Skip the rest if there's no implementation + t.Logf("%T does not support WatchRecursive()", conn) + return + } + defer secondCancel() + + // change the data + srvKeyspace.Partitions[0].ShardReferences[0].Name = "new_name" + if err := ts.UpdateSrvKeyspace(ctx, LocalCellName, "test_keyspace", srvKeyspace); err != nil { + t.Fatalf("UpdateSrvKeyspace(2): %v", err) + } + + // Make sure we get the watch data, maybe not as first notice, + // but eventually. The API specifies it is possible to get duplicate + // notifications. + for { + wd, ok := <-changes + if !ok { + t.Fatalf("watch channel unexpectedly closed") + } + if wd.Err != nil { + t.Fatalf("watch interrupted: %v", wd.Err) + } + got := &topodatapb.SrvKeyspace{} + if err := proto.Unmarshal(wd.Contents, got); err != nil { + t.Fatalf("cannot proto-unmarshal data: %v", err) + } + + if got.Partitions[0].ShardReferences[0].Name == "name" { + // extra first value, still good + continue + } + if got.Partitions[0].ShardReferences[0].Name == "new_name" { + // watch worked, good + break + } + t.Fatalf("got unknown SrvKeyspace: %v", got) + } + + // remove the SrvKeyspace + if err := ts.DeleteSrvKeyspace(ctx, LocalCellName, "test_keyspace"); err != nil { + t.Fatalf("DeleteSrvKeyspace: %v", err) + } + + // Make sure we get the ErrNoNode notification eventually. 
+ // The API specifies it is possible to get duplicate + // notifications. + for { + wd, ok := <-changes + if !ok { + t.Fatalf("watch channel unexpectedly closed") + } + + if topo.IsErrType(wd.Err, topo.NoNode) { + // good + break + } + if wd.Err != nil { + t.Fatalf("bad error returned for deletion: %v", wd.Err) + } + // we got something, better be the right value + got := &topodatapb.SrvKeyspace{} + if err := proto.Unmarshal(wd.Contents, got); err != nil { + t.Fatalf("cannot proto-unmarshal data: %v", err) + } + if got.Partitions[0].ShardReferences[0].Name == "new_name" { + // good value + continue + } + t.Fatalf("got unknown SrvKeyspace waiting for deletion: %v", got) + } + + // We now have to stop watching. This doesn't automatically + // happen for recursive watches on a single file since others + // can still be seen. + secondCancel() + + // Make sure we get the topo.ErrInterrupted notification eventually. + for { + wd, ok := <-changes + if !ok { + t.Fatalf("watch channel unexpectedly closed") + } + if topo.IsErrType(wd.Err, topo.Interrupted) { + // good + break + } + if wd.Err != nil { + t.Fatalf("bad error returned for cancellation: %v", wd.Err) + } + // we got something, better be the right value + got := &topodatapb.SrvKeyspace{} + if err := proto.Unmarshal(wd.Contents, got); err != nil { + t.Fatalf("cannot proto-unmarshal data: %v", err) + } + if got.Partitions[0].ShardReferences[0].Name == "name" { + // good value + continue + } + t.Fatalf("got unknown SrvKeyspace waiting for deletion: %v", got) + } + + // Now the channel should be closed. + if wd, ok := <-changes; ok { + t.Fatalf("got unexpected event after error: %v", wd) + } + + // And calling cancel() again should just work. 
+ secondCancel() +} diff --git a/go/vt/topo/zk2topo/watch.go b/go/vt/topo/zk2topo/watch.go index 69e8f2df48c..06d466d8cef 100644 --- a/go/vt/topo/zk2topo/watch.go +++ b/go/vt/topo/zk2topo/watch.go @@ -95,3 +95,11 @@ func (zs *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, return wd, c, nil } + +// WatchRecursive is part of the topo.Conn interface. +func (zs *Server) WatchRecursive(_ context.Context, path string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { + // This isn't implemented yet, but potentially can be implemented if we want + // to update the minimum ZooKeeper requirement to 3.6.0 and use recursive watches. + // Also see https://zookeeper.apache.org/doc/r3.6.3/zookeeperProgrammers.html#sc_WatchPersistentRecursive + return nil, nil, topo.NewError(topo.NoImplementation, path) +} From f6b166e93e599714a3539b41605501565a5ecce3 Mon Sep 17 00:00:00 2001 From: Dirkjan Bussink Date: Mon, 1 Aug 2022 15:39:11 +0200 Subject: [PATCH 4/7] Don't allow expired contexts to retrieve topo connection Once a context is expired, we shouldn't return a valid cell anymore, even if it's the global cell. Local cells would fail in this case as well, so this ensures we match that behavior. Signed-off-by: Dirkjan Bussink --- go/vt/topo/server.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/go/vt/topo/server.go b/go/vt/topo/server.go index 4d783379443..8eb72aeecc3 100644 --- a/go/vt/topo/server.go +++ b/go/vt/topo/server.go @@ -43,14 +43,13 @@ There are two test sub-packages associated with this code: package topo import ( + "context" "flag" "fmt" "sync" "vitess.io/vitess/go/vt/proto/topodata" - "context" - "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/log" @@ -245,6 +244,9 @@ func Open() *Server { func (ts *Server) ConnForCell(ctx context.Context, cell string) (Conn, error) { // Global cell is the easy case. 
if cell == GlobalCell { + if ctx.Err() != nil { + return nil, ctx.Err() + } return ts.globalCell, nil } From e723738cfd61727ff7bbad3f9afba76953a3f7c2 Mon Sep 17 00:00:00 2001 From: Dirkjan Bussink Date: Mon, 1 Aug 2022 16:10:53 +0200 Subject: [PATCH 5/7] Add WaitForNewLeader to election API This allows for more efficiently waiting for a new leader to be elected. It also allows for a continued stream of changes during the election lifetime. This is very useful for example when something else needs to be triggered on an election change and we don't need to use a busy wait loop for it in that case. Signed-off-by: Dirkjan Bussink --- go/vt/topo/conn.go | 10 +++++ go/vt/topo/consultopo/election.go | 11 ++++- go/vt/topo/etcd2topo/election.go | 73 ++++++++++++++++++++++++++++-- go/vt/topo/etcd2topo/server.go | 8 +++- go/vt/topo/etcd2topo/watch.go | 8 ++++ go/vt/topo/k8stopo/election.go | 10 ++++- go/vt/topo/memorytopo/election.go | 44 +++++++++++++++++- go/vt/topo/memorytopo/lock.go | 9 +++- go/vt/topo/test/election.go | 74 ++++++++++++++++++++++++++++++- go/vt/topo/test/testing.go | 5 +++ go/vt/topo/zk2topo/election.go | 10 ++++- 11 files changed, 245 insertions(+), 17 deletions(-) diff --git a/go/vt/topo/conn.go b/go/vt/topo/conn.go index 4d4b2329233..dd5e2622abe 100644 --- a/go/vt/topo/conn.go +++ b/go/vt/topo/conn.go @@ -386,4 +386,14 @@ type LeaderParticipation interface { // GetCurrentLeaderID returns the current primary id. // This may not work after Stop has been called. GetCurrentLeaderID(ctx context.Context) (string, error) + + // WaitForNewLeader allows for nodes to wait until a leadership + // election cycle completes and to get subsequent updates of + // leadership changes. This way logic that needs to know if leadership + // changes also if we're not the leader ourselves doesn't need to + // poll for leadership status. + // + // For topo implementation that have this, it can be used more + // efficiently than needing a busy wait loop. 
+ WaitForNewLeader(ctx context.Context) (<-chan string, error) } diff --git a/go/vt/topo/consultopo/election.go b/go/vt/topo/consultopo/election.go index 0c595dd27a8..badcff19fff 100644 --- a/go/vt/topo/consultopo/election.go +++ b/go/vt/topo/consultopo/election.go @@ -17,9 +17,8 @@ limitations under the License. package consultopo import ( - "path" - "context" + "path" "github.com/hashicorp/consul/api" @@ -133,3 +132,11 @@ func (mp *consulLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (st } return string(pair.Value), nil } + +// WaitForNewLeader is part of the topo.LeaderParticipation interface +func (mp *consulLeaderParticipation) WaitForNewLeader(context.Context) (<-chan string, error) { + // This isn't implemented yet, but likely can be implemented using List + // with blocking logic on election path. + // See also how WatchRecursive could be implemented as well. + return nil, topo.NewError(topo.NoImplementation, "wait for leader not supported in Consul topo") +} diff --git a/go/vt/topo/etcd2topo/election.go b/go/vt/topo/etcd2topo/election.go index 667b98562f4..276d9e60355 100644 --- a/go/vt/topo/etcd2topo/election.go +++ b/go/vt/topo/etcd2topo/election.go @@ -17,9 +17,8 @@ limitations under the License. package etcd2topo import ( - "path" - "context" + "path" clientv3 "go.etcd.io/etcd/client/v3" @@ -76,7 +75,11 @@ func (mp *etcdLeaderParticipation) WaitForLeadership() (context.Context, error) // we just cancel that context. 
lockCtx, lockCancel := context.WithCancel(context.Background()) go func() { - <-mp.stop + select { + case <-mp.s.running: + return + case <-mp.stop: + } if ld != nil { if err := ld.Unlock(context.Background()); err != nil { log.Errorf("failed to unlock electionPath %v: %v", electionPath, err) @@ -123,3 +126,67 @@ func (mp *etcdLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (stri } return string(resp.Kvs[0].Value), nil } + +func (mp *etcdLeaderParticipation) WaitForNewLeader(ctx context.Context) (<-chan string, error) { + electionPath := path.Join(mp.s.root, electionsPath, mp.name) + + notifications := make(chan string, 8) + ctx, cancel := context.WithCancel(ctx) + + // Get the current leader + initial, err := mp.s.cli.Get(ctx, electionPath+"/", + clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByModRevision, clientv3.SortAscend), + clientv3.WithLimit(1)) + if err != nil { + cancel() + return nil, err + } + + if len(initial.Kvs) == 1 { + leader := initial.Kvs[0].Value + notifications <- string(leader) + } + + // Create the Watcher. We start watching from the response we + // got, not from the file original version, as the server may + // not have that much history. 
+ watcher := mp.s.cli.Watch(ctx, electionPath, clientv3.WithPrefix(), clientv3.WithRev(initial.Header.Revision)) + if watcher == nil { + cancel() + return nil, convertError(err, electionPath) + } + + go func() { + defer cancel() + defer close(notifications) + for { + select { + case <-mp.s.running: + return + case <-mp.done: + return + case <-ctx.Done(): + return + case wresp, ok := <-watcher: + if !ok || wresp.Canceled { + return + } + + currentLeader, err := mp.s.cli.Get(ctx, electionPath+"/", + clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByModRevision, clientv3.SortAscend), + clientv3.WithLimit(1)) + if err != nil { + continue + } + if len(currentLeader.Kvs) != 1 { + continue + } + notifications <- string(currentLeader.Kvs[0].Value) + } + } + }() + + return notifications, nil +} diff --git a/go/vt/topo/etcd2topo/server.go b/go/vt/topo/etcd2topo/server.go index 062fe0bcec5..3ca60f15e6e 100644 --- a/go/vt/topo/etcd2topo/server.go +++ b/go/vt/topo/etcd2topo/server.go @@ -74,12 +74,15 @@ type Server struct { // root is the root path for this client. root string + + running chan struct{} } // Close implements topo.Server.Close. // It will nil out the global and cells fields, so any attempt to // re-use this server will panic. func (s *Server) Close() { + close(s.running) s.cli.Close() s.cli = nil } @@ -140,8 +143,9 @@ func NewServerWithOpts(serverAddr, root, certPath, keyPath, caPath string) (*Ser } return &Server{ - cli: cli, - root: root, + cli: cli, + root: root, + running: make(chan struct{}), }, nil } diff --git a/go/vt/topo/etcd2topo/watch.go b/go/vt/topo/etcd2topo/watch.go index e653fe1ca2e..aa930ca876d 100644 --- a/go/vt/topo/etcd2topo/watch.go +++ b/go/vt/topo/etcd2topo/watch.go @@ -76,6 +76,8 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < var watchRetries int for { select { + case <-s.running: + return case <-watchCtx.Done(): // This includes context cancellation errors. 
notifications <- &topo.WatchData{ @@ -87,6 +89,8 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < if watchRetries > 10 { select { case <-time.After(time.Duration(watchRetries) * time.Second): + case <-s.running: + continue case <-watchCtx.Done(): continue } @@ -188,6 +192,8 @@ func (s *Server) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Wa var watchRetries int for { select { + case <-s.running: + return case <-watchCtx.Done(): // This includes context cancellation errors. notifications <- &topo.WatchDataRecursive{ @@ -199,6 +205,8 @@ func (s *Server) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Wa if watchRetries > 10 { select { case <-time.After(time.Duration(watchRetries) * time.Second): + case <-s.running: + continue case <-watchCtx.Done(): continue } diff --git a/go/vt/topo/k8stopo/election.go b/go/vt/topo/k8stopo/election.go index 1106156f88f..9c89faf445d 100644 --- a/go/vt/topo/k8stopo/election.go +++ b/go/vt/topo/k8stopo/election.go @@ -17,9 +17,8 @@ limitations under the License. package k8stopo import ( - "path" - "context" + "path" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -121,3 +120,10 @@ func (mp *kubernetesLeaderParticipation) GetCurrentLeaderID(ctx context.Context) } return string(id), nil } + +// WaitForNewLeader is part of the topo.LeaderParticipation interface +func (mp *kubernetesLeaderParticipation) WaitForNewLeader(context.Context) (<-chan string, error) { + // Kubernetes doesn't seem to provide a primitive that watches a prefix + // or directory, so this likely can never be implemented. + return nil, topo.NewError(topo.NoImplementation, "wait for leader not supported in K8s topo") +} diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go index e10d09527a7..5ad16622974 100644 --- a/go/vt/topo/memorytopo/election.go +++ b/go/vt/topo/memorytopo/election.go @@ -17,9 +17,8 @@ limitations under the License. 
package memorytopo import ( - "path" - "context" + "path" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -126,3 +125,44 @@ func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, return n.lockContents, nil } + +// WaitForNewLeader is part of the topo.LeaderParticipation interface +func (mp *cLeaderParticipation) WaitForNewLeader(ctx context.Context) (<-chan string, error) { + mp.c.factory.mu.Lock() + defer mp.c.factory.mu.Unlock() + + electionPath := path.Join(electionsPath, mp.name) + n := mp.c.factory.nodeByPath(mp.c.cell, electionPath) + if n == nil { + return nil, topo.NewError(topo.NoNode, electionPath) + } + + notifications := make(chan string, 8) + watchIndex := nextWatchIndex + nextWatchIndex++ + n.watches[watchIndex] = watch{lock: notifications} + + if n.lock != nil { + notifications <- n.lockContents + } + + go func() { + defer close(notifications) + + select { + case <-mp.stop: + case <-ctx.Done(): + } + + mp.c.factory.mu.Lock() + defer mp.c.factory.mu.Unlock() + + n := mp.c.factory.nodeByPath(mp.c.cell, electionPath) + if n == nil { + return + } + delete(n.watches, watchIndex) + }() + + return notifications, nil +} diff --git a/go/vt/topo/memorytopo/lock.go b/go/vt/topo/memorytopo/lock.go index ab9e8088873..8da3d446471 100644 --- a/go/vt/topo/memorytopo/lock.go +++ b/go/vt/topo/memorytopo/lock.go @@ -17,9 +17,8 @@ limitations under the License. package memorytopo import ( - "fmt" - "context" + "fmt" "vitess.io/vitess/go/vt/topo" ) @@ -77,6 +76,12 @@ func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDes // No one has the lock, grab it. 
n.lock = make(chan struct{}) n.lockContents = contents + for _, w := range n.watches { + if w.lock == nil { + continue + } + w.lock <- contents + } c.factory.mu.Unlock() return &memoryTopoLockDescriptor{ c: c, diff --git a/go/vt/topo/test/election.go b/go/vt/topo/test/election.go index bd92a452c4e..594e6562eb2 100644 --- a/go/vt/topo/test/election.go +++ b/go/vt/topo/test/election.go @@ -17,11 +17,10 @@ limitations under the License. package test import ( + "context" "testing" "time" - "context" - "vitess.io/vitess/go/vt/topo" ) @@ -147,3 +146,74 @@ func checkElection(t *testing.T, ts *topo.Server) { t.Errorf("wrong error returned by WaitForLeadership, got %v expected %v", err, topo.NewError(topo.Interrupted, "")) } } + +// checkWaitForNewLeader runs the WaitForLeadership test on the LeaderParticipation +func checkWaitForNewLeader(t *testing.T, ts *topo.Server) { + conn, err := ts.ConnForCell(context.Background(), topo.GlobalCell) + if err != nil { + t.Fatalf("ConnForCell(global) failed: %v", err) + } + name := "testmp" + + // create a new LeaderParticipation + id1 := "id1" + mp1, err := conn.NewLeaderParticipation(name, id1) + if err != nil { + t.Fatalf("cannot create mp1: %v", err) + } + + // no primary yet, check name + waitForLeaderID(t, mp1, "") + + // wait for id1 to be the primary + _, err = mp1.WaitForLeadership() + if err != nil { + t.Fatalf("mp1 cannot become Leader: %v", err) + } + + // A lot of implementations use a toplevel directory for their elections. + // Make sure it is marked as 'Ephemeral'. 
+ entries, err := conn.ListDir(context.Background(), "/", true /*full*/) + if err != nil { + t.Fatalf("ListDir(/) failed: %v", err) + } + for _, e := range entries { + if e.Name != topo.CellsPath { + if !e.Ephemeral { + t.Errorf("toplevel directory that is not ephemeral: %v", e) + } + } + } + + // get the current primary name, better be id1 + waitForLeaderID(t, mp1, id1) + + // create a second LeaderParticipation on same name + id2 := "id2" + mp2, err := conn.NewLeaderParticipation(name, id2) + if err != nil { + t.Fatalf("cannot create mp2: %v", err) + } + + leaders, err := mp2.WaitForNewLeader(context.Background()) + if topo.IsErrType(err, topo.NoImplementation) { + t.Logf("%T does not support WaitForNewLeader()", mp2) + return + } + if err != nil { + t.Fatalf("cannot wait for leadership: %v", err) + return + } + + // ask mp2 for primary name, should get id1 + waitForLeaderID(t, mp2, id1) + + // stop mp1 + mp1.Stop() + + leader := <-leaders + + if leader != id1 { + t.Fatalf("wrong node elected: %v", leader) + } +} diff --git a/go/vt/topo/test/testing.go b/go/vt/topo/test/testing.go index 0861df0c708..fbddc65e98d 100644 --- a/go/vt/topo/test/testing.go +++ b/go/vt/topo/test/testing.go @@ -96,6 +96,11 @@ func TopoServerTestSuite(t *testing.T, factory func() *topo.Server) { checkElection(t, ts) ts.Close() + t.Log("=== checkWaitForNewLeader") + ts = factory() + checkWaitForNewLeader(t, ts) + ts.Close() + t.Log("=== checkDirectory") ts = factory() checkDirectory(t, ts) diff --git a/go/vt/topo/zk2topo/election.go b/go/vt/topo/zk2topo/election.go index 1ff2f0a9577..a7f066db70c 100644 --- a/go/vt/topo/zk2topo/election.go +++ b/go/vt/topo/zk2topo/election.go @@ -17,11 +17,10 @@ limitations under the License. 
package zk2topo import ( + "context" "path" "sort" - "context" - "github.com/z-division/go-zookeeper/zk" "vitess.io/vitess/go/vt/vterrors" @@ -198,3 +197,10 @@ func (mp *zkLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string return string(data), nil } } + +// WaitForNewLeader is part of the topo.LeaderParticipation interface +func (mp *zkLeaderParticipation) WaitForNewLeader(context.Context) (<-chan string, error) { + // This isn't implemented yet, but likely can be implemented in the same way + // as how WatchRecursive could be implemented as well. + return nil, topo.NewError(topo.NoImplementation, "wait for leader not supported in ZK2 topo") +} From a6cda32653349af1416540297143c1a88e3a4515 Mon Sep 17 00:00:00 2001 From: Dirkjan Bussink Date: Mon, 1 Aug 2022 17:48:23 +0200 Subject: [PATCH 6/7] Update entries also when receiving direct errors Also when we get an error directly from the watcher on the initial entry, we want to update the tracking value with that error. This is needed to ensure we handle errors like a file not existing and appearing later properly. 
Signed-off-by: Dirkjan Bussink --- go/vt/srvtopo/watch_srvkeyspace.go | 1 + go/vt/srvtopo/watch_srvvschema.go | 1 + go/vt/topo/topotests/shard_watch_test.go | 4 ++-- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/go/vt/srvtopo/watch_srvkeyspace.go b/go/vt/srvtopo/watch_srvkeyspace.go index c12216f53f5..34c4a7afa08 100644 --- a/go/vt/srvtopo/watch_srvkeyspace.go +++ b/go/vt/srvtopo/watch_srvkeyspace.go @@ -45,6 +45,7 @@ func NewSrvKeyspaceWatcher(topoServer *topo.Server, counts *stats.CountersWithSi current, changes, err := topoServer.WatchSrvKeyspace(watchCtx, key.cell, key.keyspace) if err != nil { + entry.update(ctx, nil, err, true) return } diff --git a/go/vt/srvtopo/watch_srvvschema.go b/go/vt/srvtopo/watch_srvvschema.go index 9eb8c6d3648..2b8ab2b80e6 100644 --- a/go/vt/srvtopo/watch_srvvschema.go +++ b/go/vt/srvtopo/watch_srvvschema.go @@ -43,6 +43,7 @@ func NewSrvVSchemaWatcher(topoServer *topo.Server, counts *stats.CountersWithSin current, changes, err := topoServer.WatchSrvVSchema(ctx, key.String()) if err != nil { + entry.update(ctx, nil, err, true) return } diff --git a/go/vt/topo/topotests/shard_watch_test.go b/go/vt/topo/topotests/shard_watch_test.go index 3cf64378e41..89be0064f31 100644 --- a/go/vt/topo/topotests/shard_watch_test.go +++ b/go/vt/topo/topotests/shard_watch_test.go @@ -60,9 +60,9 @@ func TestWatchShardNoNode(t *testing.T) { ts := memorytopo.NewServer("cell1") // No Shard -> ErrNoNode - current, _, err := ts.WatchShard(ctx, keyspace, shard) + _, _, err := ts.WatchShard(ctx, keyspace, shard) if !topo.IsErrType(err, topo.NoNode) { - t.Errorf("Got invalid result from WatchShard(not there): %v", current.Err) + t.Errorf("Got invalid result from WatchShard(not there): %v", err) } } From d021712cd24d0658794654d4bb31dd9f790e4485 Mon Sep 17 00:00:00 2001 From: Dirkjan Bussink Date: Tue, 2 Aug 2022 09:14:24 +0200 Subject: [PATCH 7/7] Fix broken k8stopo List implementation Signed-off-by: Dirkjan Bussink --- go/vt/topo/k8stopo/file.go | 
12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/go/vt/topo/k8stopo/file.go b/go/vt/topo/k8stopo/file.go index 0cc96d33ea0..bdc1cf42f13 100644 --- a/go/vt/topo/k8stopo/file.go +++ b/go/vt/topo/k8stopo/file.go @@ -19,6 +19,7 @@ package k8stopo import ( "bytes" "compress/gzip" + "context" "encoding/base64" "fmt" "hash/fnv" @@ -28,8 +29,6 @@ import ( "strings" "time" - "context" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/retry" @@ -243,11 +242,16 @@ func (s *Server) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo if len(nodes) == 0 { return results, topo.NewError(topo.NoNode, filePathPrefix) } + rootPrefix := filepath.Join(s.root, filePathPrefix) for _, node := range nodes { - if strings.HasPrefix(node.Data.Value, filePathPrefix) { + if strings.HasPrefix(node.Data.Key, rootPrefix) { + out, err := unpackValue([]byte(node.Data.Value)) + if err != nil { + return results, convertError(err, node.Data.Key) + } results = append(results, topo.KVInfo{ Key: []byte(node.Data.Key), - Value: []byte(node.Data.Value), + Value: out, Version: KubernetesVersion(node.GetResourceVersion()), }) }