diff --git a/go/vt/srvtopo/watch_srvkeyspace.go b/go/vt/srvtopo/watch_srvkeyspace.go index e3edcdd5250..34c4a7afa08 100644 --- a/go/vt/srvtopo/watch_srvkeyspace.go +++ b/go/vt/srvtopo/watch_srvkeyspace.go @@ -40,14 +40,20 @@ func (k *srvKeyspaceKey) String() string { func NewSrvKeyspaceWatcher(topoServer *topo.Server, counts *stats.CountersWithSingleLabel, cacheRefresh, cacheTTL time.Duration) *SrvKeyspaceWatcher { watch := func(ctx context.Context, entry *watchEntry) { key := entry.key.(*srvKeyspaceKey) - current, changes, cancel := topoServer.WatchSrvKeyspace(context.Background(), key.cell, key.keyspace) + watchCtx, watchCancel := context.WithCancel(ctx) + defer watchCancel() + + current, changes, err := topoServer.WatchSrvKeyspace(watchCtx, key.cell, key.keyspace) + if err != nil { + entry.update(ctx, nil, err, true) + return + } entry.update(ctx, current.Value, current.Err, true) if current.Err != nil { return } - defer cancel() for c := range changes { entry.update(ctx, c.Value, c.Err, false) if c.Err != nil { diff --git a/go/vt/srvtopo/watch_srvvschema.go b/go/vt/srvtopo/watch_srvvschema.go index 14d30829f69..2b8ab2b80e6 100644 --- a/go/vt/srvtopo/watch_srvvschema.go +++ b/go/vt/srvtopo/watch_srvvschema.go @@ -38,7 +38,14 @@ func (k cellName) String() string { func NewSrvVSchemaWatcher(topoServer *topo.Server, counts *stats.CountersWithSingleLabel, cacheRefresh, cacheTTL time.Duration) *SrvVSchemaWatcher { watch := func(ctx context.Context, entry *watchEntry) { key := entry.key.(cellName) - current, changes, cancel := topoServer.WatchSrvVSchema(context.Background(), key.String()) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + current, changes, err := topoServer.WatchSrvVSchema(ctx, key.String()) + if err != nil { + entry.update(ctx, nil, err, true) + return + } entry.update(ctx, current.Value, current.Err, true) if current.Err != nil { diff --git a/go/vt/topo/conn.go b/go/vt/topo/conn.go index cf75617ef45..dd5e2622abe 100644 --- a/go/vt/topo/conn.go +++ b/go/vt/topo/conn.go @@ -17,9 +17,8 @@ limitations under the License. package topo import ( - "sort" - "context" + "sort" ) // Conn defines the interface that must be implemented by topology @@ -120,17 +119,13 @@ type Conn interface { // Watch starts watching a file in the provided cell. It // returns the current value, a 'changes' channel to read the - // changes from, and a 'cancel' function to call to stop the - // watch. If the initial read fails, or the file doesn't - // exist, current.Err is set, and 'changes'/'cancel' are nil. - // Otherwise current.Err is nil, and current.Contents / - // current.Version are accurate. The provided context is only - // used to setup the current watch, and not after Watch() - // returns. + // changes from, and an error. + // If the initial read fails, or the file doesn't + // exist, an error is returned. // - // To stop the watch, just call the returned 'cancel' function. + // To stop the watch, cancel the provided context. // This will eventually result in a final WatchData result with Err = - // ErrInterrupted. It should be safe to call the 'cancel' function + // ErrInterrupted. It should be safe to cancel the context // multiple times, or after the Watch already errored out. // // The 'changes' channel may return a record with Err != nil. @@ -158,7 +153,45 @@ type Conn interface { // being correct quickly, as long as it eventually gets there. // // filePath is a path relative to the root directory of the cell. 
-	Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, cancel CancelFunc)
+	Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, err error)
+
+	// WatchRecursive starts watching a file prefix in the provided cell. It
+	// returns all the current values for existing files with the given
+	// prefix, a 'changes' channel to read the changes from, and an error.
+	//
+	// The provided context should be canceled to stop WatchRecursive().
+	// This API is different from Watch(), and Watch() will be changed
+	// to match this API in the future.
+	//
+	// Canceling will eventually result in a final WatchDataRecursive with Err =
+	// ErrInterrupted.
+	//
+	// The 'changes' channel may return a record with Err != nil.
+	// In that case, the channel will also be closed right after
+	// that record. In any case, 'changes' has to be drained of
+	// all events, even after the context is canceled.
+	//
+	// Note the 'changes' channel can return the same
+	// Version/Contents twice (for instance, if the watch is interrupted
+	// and restarted within the Conn implementation).
+	// Similarly, the 'changes' channel may skip versions / changes
+	// (that is, if value goes [A, B, C, D, E, F], the watch may only
+	// receive [A, B, F]). This should only happen for rapidly
+	// changing values though. Usually, the initial value will come
+	// back right away. And a stable value (that hasn't changed for
+	// a while) should be seen shortly.
+	//
+	// The WatchRecursive call is not guaranteed to return exactly up to
+	// date data right away. For instance, if a file is created
+	// and saved, and then a watch is set on that file, it may
+	// return ErrNoNode (as the underlying configuration service
+	// may use asynchronous caches that are not up to date
+	// yet). The only guarantee is that the watch data will
+	// eventually converge. Vitess doesn't explicitly depend on the data
+	// being correct quickly, as long as it eventually gets there.
+	//
+	// path is a path relative to the root directory of the cell.
+	WatchRecursive(ctx context.Context, path string) ([]*WatchDataRecursive, <-chan *WatchDataRecursive, error)
 
 	//
 	// Leader election methods. This is meant to have a small
@@ -276,6 +309,17 @@ type WatchData struct {
 	Err error
 }
 
+// WatchDataRecursive is the structure returned by the WatchRecursive() API.
+// It contains the same data as WatchData, but additionally the specific
+// path of the entry that the recursive watch applies to, since an entire
+// file prefix can be watched.
+type WatchDataRecursive struct {
+	// Path is the path that has changed
+	Path string
+
+	WatchData
+}
+
 // KVInfo is a structure that contains a generic key/value pair from
 // the topo server, along with important metadata about it.
 // This should be used to provide multiple entries in List like calls
@@ -342,4 +386,14 @@ type LeaderParticipation interface {
 	// GetCurrentLeaderID returns the current primary id.
 	// This may not work after Stop has been called.
 	GetCurrentLeaderID(ctx context.Context) (string, error)
+
+	// WaitForNewLeader allows nodes to wait until a leadership
+	// election cycle completes and to get subsequent updates of
+	// leadership changes. This way, logic that needs to know about
+	// leadership changes, even when we are not the leader ourselves,
+	// does not need to poll for leadership status.
+	//
+	// For topo implementations that support this, it is more
+	// efficient than a busy-wait loop.
+ WaitForNewLeader(ctx context.Context) (<-chan string, error) } diff --git a/go/vt/topo/consultopo/election.go b/go/vt/topo/consultopo/election.go index 0c595dd27a8..badcff19fff 100644 --- a/go/vt/topo/consultopo/election.go +++ b/go/vt/topo/consultopo/election.go @@ -17,9 +17,8 @@ limitations under the License. package consultopo import ( - "path" - "context" + "path" "github.com/hashicorp/consul/api" @@ -133,3 +132,11 @@ func (mp *consulLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (st } return string(pair.Value), nil } + +// WaitForNewLeader is part of the topo.LeaderParticipation interface +func (mp *consulLeaderParticipation) WaitForNewLeader(context.Context) (<-chan string, error) { + // This isn't implemented yet, but likely can be implemented using List + // with blocking logic on election path. + // See also how WatchRecursive could be implemented as well. + return nil, topo.NewError(topo.NoImplementation, "wait for leader not supported in Consul topo") +} diff --git a/go/vt/topo/consultopo/watch.go b/go/vt/topo/consultopo/watch.go index a4da011dfdd..8c8abbb992c 100644 --- a/go/vt/topo/consultopo/watch.go +++ b/go/vt/topo/consultopo/watch.go @@ -17,12 +17,11 @@ limitations under the License. package consultopo import ( + "context" "flag" "path" "time" - "context" - "github.com/hashicorp/consul/api" "vitess.io/vitess/go/vt/topo" @@ -33,16 +32,21 @@ var ( ) // Watch is part of the topo.Conn interface. -func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) { +func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { // Initial get. nodePath := path.Join(s.root, filePath) - pair, _, err := s.kv.Get(nodePath, nil) + options := &api.QueryOptions{} + + initialCtx, initialCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer initialCancel() + + pair, _, err := s.kv.Get(nodePath, options.WithContext(initialCtx)) if err != nil { - return &topo.WatchData{Err: err}, nil, nil + return nil, nil, err } if pair == nil { // Node doesn't exist. - return &topo.WatchData{Err: topo.NewError(topo.NoNode, nodePath)}, nil, nil + return nil, nil, topo.NewError(topo.NoNode, nodePath) } // Initial value to return. @@ -51,9 +55,6 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < Version: ConsulVersion(pair.ModifyIndex), } - // Create a context, will be used to cancel the watch. - watchCtx, watchCancel := context.WithCancel(context.Background()) - // Create the notifications channel, send updates to it. notifications := make(chan *topo.WatchData, 10) go func() { @@ -83,7 +84,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < // This essentially uses WaitTime as a heartbeat interval to detect // a dead connection. cancelGetCtx() - getCtx, cancelGetCtx = context.WithTimeout(watchCtx, 2*opts.WaitTime) + getCtx, cancelGetCtx = context.WithTimeout(ctx, 2*opts.WaitTime) pair, _, err = s.kv.Get(nodePath, opts.WithContext(getCtx)) if err != nil { @@ -114,9 +115,9 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < // See if the watch was canceled. 
select { - case <-watchCtx.Done(): + case <-ctx.Done(): notifications <- &topo.WatchData{ - Err: convertError(watchCtx.Err(), nodePath), + Err: convertError(ctx.Err(), nodePath), } cancelGetCtx() return @@ -125,5 +126,14 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < } }() - return wd, notifications, topo.CancelFunc(watchCancel) + return wd, notifications, nil +} + +// WatchRecursive is part of the topo.Conn interface. +func (s *Server) WatchRecursive(_ context.Context, path string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { + // This isn't implemented yet, but likely can be implemented using List + // with blocking logic like how we use Get with blocking for regular Watch. + // See also how https://www.consul.io/docs/dynamic-app-config/watches#keyprefix + // works under the hood. + return nil, nil, topo.NewError(topo.NoImplementation, path) } diff --git a/go/vt/topo/etcd2topo/election.go b/go/vt/topo/etcd2topo/election.go index 667b98562f4..276d9e60355 100644 --- a/go/vt/topo/etcd2topo/election.go +++ b/go/vt/topo/etcd2topo/election.go @@ -17,9 +17,8 @@ limitations under the License. package etcd2topo import ( - "path" - "context" + "path" clientv3 "go.etcd.io/etcd/client/v3" @@ -76,7 +75,11 @@ func (mp *etcdLeaderParticipation) WaitForLeadership() (context.Context, error) // we just cancel that context. lockCtx, lockCancel := context.WithCancel(context.Background()) go func() { - <-mp.stop + select { + case <-mp.s.running: + return + case <-mp.stop: + } if ld != nil { if err := ld.Unlock(context.Background()); err != nil { log.Errorf("failed to unlock electionPath %v: %v", electionPath, err) @@ -123,3 +126,67 @@ func (mp *etcdLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (stri } return string(resp.Kvs[0].Value), nil } + +func (mp *etcdLeaderParticipation) WaitForNewLeader(ctx context.Context) (<-chan string, error) { + electionPath := path.Join(mp.s.root, electionsPath, mp.name) + + notifications := make(chan string, 8) + ctx, cancel := context.WithCancel(ctx) + + // Get the current leader + initial, err := mp.s.cli.Get(ctx, electionPath+"/", + clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByModRevision, clientv3.SortAscend), + clientv3.WithLimit(1)) + if err != nil { + cancel() + return nil, err + } + + if len(initial.Kvs) == 1 { + leader := initial.Kvs[0].Value + notifications <- string(leader) + } + + // Create the Watcher. We start watching from the response we + // got, not from the file original version, as the server may + // not have that much history. 
+ watcher := mp.s.cli.Watch(ctx, electionPath, clientv3.WithPrefix(), clientv3.WithRev(initial.Header.Revision)) + if watcher == nil { + cancel() + return nil, convertError(err, electionPath) + } + + go func() { + defer cancel() + defer close(notifications) + for { + select { + case <-mp.s.running: + return + case <-mp.done: + return + case <-ctx.Done(): + return + case wresp, ok := <-watcher: + if !ok || wresp.Canceled { + return + } + + currentLeader, err := mp.s.cli.Get(ctx, electionPath+"/", + clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByModRevision, clientv3.SortAscend), + clientv3.WithLimit(1)) + if err != nil { + continue + } + if len(currentLeader.Kvs) != 1 { + continue + } + notifications <- string(currentLeader.Kvs[0].Value) + } + } + }() + + return notifications, nil +} diff --git a/go/vt/topo/etcd2topo/server.go b/go/vt/topo/etcd2topo/server.go index 062fe0bcec5..3ca60f15e6e 100644 --- a/go/vt/topo/etcd2topo/server.go +++ b/go/vt/topo/etcd2topo/server.go @@ -74,12 +74,15 @@ type Server struct { // root is the root path for this client. root string + + running chan struct{} } // Close implements topo.Server.Close. // It will nil out the global and cells fields, so any attempt to // re-use this server will panic. func (s *Server) Close() { + close(s.running) s.cli.Close() s.cli = nil } @@ -140,8 +143,9 @@ func NewServerWithOpts(serverAddr, root, certPath, keyPath, caPath string) (*Ser } return &Server{ - cli: cli, - root: root, + cli: cli, + root: root, + running: make(chan struct{}), }, nil } diff --git a/go/vt/topo/etcd2topo/watch.go b/go/vt/topo/etcd2topo/watch.go index 239e73aa3f9..aa930ca876d 100644 --- a/go/vt/topo/etcd2topo/watch.go +++ b/go/vt/topo/etcd2topo/watch.go @@ -17,11 +17,11 @@ limitations under the License. package etcd2topo import ( + "context" "path" + "strings" "time" - "context" - "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" @@ -33,29 +33,29 @@ import ( ) // Watch is part of the topo.Conn interface. -func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) { +func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { nodePath := path.Join(s.root, filePath) // Get the initial version of the file - initial, err := s.cli.Get(ctx, nodePath) + initialCtx, initialCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer initialCancel() + initial, err := s.cli.Get(initialCtx, nodePath) if err != nil { // Generic error. - return &topo.WatchData{Err: convertError(err, nodePath)}, nil, nil + return nil, nil, convertError(err, nodePath) } + if len(initial.Kvs) != 1 { // Node doesn't exist. - return &topo.WatchData{Err: topo.NewError(topo.NoNode, nodePath)}, nil, nil + return nil, nil, topo.NewError(topo.NoNode, nodePath) } wd := &topo.WatchData{ Contents: initial.Kvs[0].Value, Version: EtcdVersion(initial.Kvs[0].ModRevision), } - // Create an outer context that will be canceled on return and will cancel all inner watches. - outerCtx, outerCancel := context.WithCancel(context.Background()) - // Create a context, will be used to cancel the watch on retry. - watchCtx, watchCancel := context.WithCancel(outerCtx) + watchCtx, watchCancel := context.WithCancel(ctx) // Create the Watcher. 
We start watching from the response we // got, not from the file original version, as the server may @@ -63,20 +63,21 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < watcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(initial.Header.Revision)) if watcher == nil { watchCancel() - outerCancel() - return &topo.WatchData{Err: vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "Watch failed")}, nil, nil + return nil, nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "Watch failed") } // Create the notifications channel, send updates to it. notifications := make(chan *topo.WatchData, 10) go func() { defer close(notifications) + defer watchCancel() var currVersion = initial.Header.Revision var watchRetries int for { select { - + case <-s.running: + return case <-watchCtx.Done(): // This includes context cancellation errors. notifications <- &topo.WatchData{ @@ -86,12 +87,18 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < case wresp, ok := <-watcher: if !ok { if watchRetries > 10 { - time.Sleep(time.Duration(watchRetries) * time.Second) + select { + case <-time.After(time.Duration(watchRetries) * time.Second): + case <-s.running: + continue + case <-watchCtx.Done(): + continue + } } watchRetries++ // Cancel inner context on retry and create new one. watchCancel() - watchCtx, watchCancel = context.WithCancel(outerCtx) + watchCtx, watchCancel = context.WithCancel(ctx) newWatcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(currVersion)) if newWatcher == nil { log.Warningf("watch %v failed and get a nil channel returned, currVersion: %v", nodePath, currVersion) @@ -137,5 +144,121 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < } }() - return wd, notifications, topo.CancelFunc(outerCancel) + return wd, notifications, nil +} + +// WatchRecursive is part of the topo.Conn interface. +func (s *Server) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { + nodePath := path.Join(s.root, dirpath) + if !strings.HasSuffix(nodePath, "/") { + nodePath = nodePath + "/" + } + + // Get the initial version of the file + initial, err := s.cli.Get(ctx, nodePath, clientv3.WithPrefix()) + if err != nil { + return nil, nil, convertError(err, nodePath) + } + + var initialwd []*topo.WatchDataRecursive + + for _, kv := range initial.Kvs { + var wd topo.WatchDataRecursive + wd.Path = string(kv.Key) + wd.Contents = kv.Value + wd.Version = EtcdVersion(initial.Kvs[0].ModRevision) + initialwd = append(initialwd, &wd) + } + + // Create a context, will be used to cancel the watch on retry. + watchCtx, watchCancel := context.WithCancel(ctx) + + // Create the Watcher. We start watching from the response we + // got, not from the file original version, as the server may + // not have that much history. + watcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(initial.Header.Revision), clientv3.WithPrefix()) + if watcher == nil { + watchCancel() + return nil, nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "Watch failed") + } + + // Create the notifications channel, send updates to it. + notifications := make(chan *topo.WatchDataRecursive, 10) + go func() { + defer close(notifications) + defer watchCancel() + + var currVersion = initial.Header.Revision + var watchRetries int + for { + select { + case <-s.running: + return + case <-watchCtx.Done(): + // This includes context cancellation errors. 
+ notifications <- &topo.WatchDataRecursive{ + WatchData: topo.WatchData{Err: convertError(watchCtx.Err(), nodePath)}, + } + return + case wresp, ok := <-watcher: + if !ok { + if watchRetries > 10 { + select { + case <-time.After(time.Duration(watchRetries) * time.Second): + case <-s.running: + continue + case <-watchCtx.Done(): + continue + } + } + watchRetries++ + // Cancel inner context on retry and create new one. + watchCancel() + watchCtx, watchCancel = context.WithCancel(ctx) + + newWatcher := s.cli.Watch(watchCtx, nodePath, clientv3.WithRev(currVersion), clientv3.WithPrefix()) + if newWatcher == nil { + log.Warningf("watch %v failed and get a nil channel returned, currVersion: %v", nodePath, currVersion) + } else { + watcher = newWatcher + } + continue + } + + watchRetries = 0 + + if wresp.Canceled { + // Final notification. + notifications <- &topo.WatchDataRecursive{ + WatchData: topo.WatchData{Err: convertError(wresp.Err(), nodePath)}, + } + return + } + + currVersion = wresp.Header.GetRevision() + + for _, ev := range wresp.Events { + switch ev.Type { + case mvccpb.PUT: + notifications <- &topo.WatchDataRecursive{ + Path: string(ev.Kv.Key), + WatchData: topo.WatchData{ + Contents: ev.Kv.Value, + Version: EtcdVersion(ev.Kv.Version), + }, + } + case mvccpb.DELETE: + notifications <- &topo.WatchDataRecursive{ + Path: string(ev.Kv.Key), + WatchData: topo.WatchData{ + Err: topo.NewError(topo.NoNode, nodePath), + }, + } + } + } + } + } + }() + + return initialwd, notifications, nil } diff --git a/go/vt/topo/faketopo/faketopo.go b/go/vt/topo/faketopo/faketopo.go index be56315a594..4e0a1b5409c 100644 --- a/go/vt/topo/faketopo/faketopo.go +++ b/go/vt/topo/faketopo/faketopo.go @@ -288,12 +288,12 @@ func (f *FakeConn) Lock(ctx context.Context, dirPath, contents string) (topo.Loc } // Watch implements the Conn interface -func (f *FakeConn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) { +func (f *FakeConn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { f.mu.Lock() defer f.mu.Unlock() res, isPresent := f.getResultMap[filePath] if !isPresent { - return &topo.WatchData{Err: topo.NewError(topo.NoNode, filePath)}, nil, nil + return nil, nil, topo.NewError(topo.NoNode, filePath) } current := &topo.WatchData{ Contents: res.contents, @@ -303,7 +303,9 @@ func (f *FakeConn) Watch(ctx context.Context, filePath string) (*topo.WatchData, notifications := make(chan *topo.WatchData, 100) f.watches[filePath] = append(f.watches[filePath], notifications) - cancel := func() { + go func() { + defer close(notifications) + <-ctx.Done() watches, isPresent := f.watches[filePath] if !isPresent { return @@ -315,8 +317,12 @@ func (f *FakeConn) Watch(ctx context.Context, filePath string) (*topo.WatchData, break } } - } - return current, notifications, cancel + }() + return current, notifications, nil +} + +func (f *FakeConn) WatchRecursive(ctx context.Context, path string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { + panic("implement me") } // NewLeaderParticipation implements the Conn interface diff --git a/go/vt/topo/helpers/tee.go b/go/vt/topo/helpers/tee.go index abec09d0713..2df1b8bbadb 100644 --- a/go/vt/topo/helpers/tee.go +++ b/go/vt/topo/helpers/tee.go @@ -164,10 +164,14 @@ func (c *TeeConn) Delete(ctx context.Context, filePath string, version topo.Vers } // Watch is part of the topo.Conn interface -func (c *TeeConn) Watch(ctx context.Context, filePath string) (*topo.WatchData, 
<-chan *topo.WatchData, topo.CancelFunc) { +func (c *TeeConn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { return c.primary.Watch(ctx, filePath) } +func (c *TeeConn) WatchRecursive(ctx context.Context, path string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { + return c.primary.WatchRecursive(ctx, path) +} + // // Lock management. // diff --git a/go/vt/topo/k8stopo/election.go b/go/vt/topo/k8stopo/election.go index 1106156f88f..9c89faf445d 100644 --- a/go/vt/topo/k8stopo/election.go +++ b/go/vt/topo/k8stopo/election.go @@ -17,9 +17,8 @@ limitations under the License. package k8stopo import ( - "path" - "context" + "path" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -121,3 +120,10 @@ func (mp *kubernetesLeaderParticipation) GetCurrentLeaderID(ctx context.Context) } return string(id), nil } + +// WaitForNewLeader is part of the topo.LeaderParticipation interface +func (mp *kubernetesLeaderParticipation) WaitForNewLeader(context.Context) (<-chan string, error) { + // Kubernetes doesn't seem to provide a primitive that watches a prefix + // or directory, so this likely can never be implemented. + return nil, topo.NewError(topo.NoImplementation, "wait for leader not supported in K8s topo") +} diff --git a/go/vt/topo/k8stopo/file.go b/go/vt/topo/k8stopo/file.go index 0cc96d33ea0..bdc1cf42f13 100644 --- a/go/vt/topo/k8stopo/file.go +++ b/go/vt/topo/k8stopo/file.go @@ -19,6 +19,7 @@ package k8stopo import ( "bytes" "compress/gzip" + "context" "encoding/base64" "fmt" "hash/fnv" @@ -28,8 +29,6 @@ import ( "strings" "time" - "context" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/retry" @@ -243,11 +242,16 @@ func (s *Server) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo if len(nodes) == 0 { return results, topo.NewError(topo.NoNode, filePathPrefix) } + rootPrefix := filepath.Join(s.root, filePathPrefix) for _, node := range nodes { - if strings.HasPrefix(node.Data.Value, filePathPrefix) { + if strings.HasPrefix(node.Data.Key, rootPrefix) { + out, err := unpackValue([]byte(node.Data.Value)) + if err != nil { + return results, convertError(err, node.Data.Key) + } results = append(results, topo.KVInfo{ Key: []byte(node.Data.Key), - Value: []byte(node.Data.Value), + Value: out, Version: KubernetesVersion(node.GetResourceVersion()), }) } diff --git a/go/vt/topo/k8stopo/watch.go b/go/vt/topo/k8stopo/watch.go index 4c86736e52a..2707e22ad27 100644 --- a/go/vt/topo/k8stopo/watch.go +++ b/go/vt/topo/k8stopo/watch.go @@ -28,26 +28,22 @@ import ( ) // Watch is part of the topo.Conn interface. -func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) { +func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { log.Info("Starting Kubernetes topo Watch on ", filePath) current := &topo.WatchData{} // get current - contents, ver, err := s.Get(ctx, filePath) + initialCtx, initialCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer initialCancel() + + contents, ver, err := s.Get(initialCtx, filePath) if err != nil { - // Per the topo.Conn interface: - // "If the initial read fails, or the file doesn't - // exist, current.Err is set, and 'changes'/'cancel' are nil." 
- current.Err = err - return current, nil, nil + return nil, nil, err } current.Contents = contents current.Version = ver - // Create a context, will be used to cancel the watch. - watchCtx, watchCancel := context.WithCancel(context.Background()) - // Create the changes channel changes := make(chan *topo.WatchData, 10) @@ -56,11 +52,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < resource, err := s.buildFileResource(filePath, []byte{}) if err != nil { - // Per the topo.Conn interface: - // current.Err is set, and 'changes'/'cancel' are nil - watchCancel() - current.Err = err - return current, nil, nil + return nil, nil, err } // Create the informer / indexer to watch the single resource @@ -111,9 +103,9 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, < go memberInformer.Run(informerChan) // Handle interrupts - go closeOnDone(watchCtx, filePath, informerChan, gracefulShutdown, changes) + go closeOnDone(ctx, filePath, informerChan, gracefulShutdown, changes) - return current, changes, topo.CancelFunc(watchCancel) + return current, changes, nil } func closeOnDone(ctx context.Context, filePath string, informerChan chan struct{}, gracefulShutdown chan struct{}, changes chan *topo.WatchData) { @@ -127,3 +119,10 @@ func closeOnDone(ctx context.Context, filePath string, informerChan chan struct{ close(informerChan) close(changes) } + +// WatchRecursive is part of the topo.Conn interface. +func (s *Server) WatchRecursive(_ context.Context, path string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { + // Kubernetes doesn't seem to provide a primitive that watches a prefix + // or directory, so this likely can never be implemented. + return nil, nil, topo.NewError(topo.NoImplementation, path) +} diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go index e10d09527a7..5ad16622974 100644 --- a/go/vt/topo/memorytopo/election.go +++ b/go/vt/topo/memorytopo/election.go @@ -17,9 +17,8 @@ limitations under the License. package memorytopo import ( - "path" - "context" + "path" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -126,3 +125,44 @@ func (mp *cLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string, return n.lockContents, nil } + +// WaitForNewLeader is part of the topo.LeaderParticipation interface +func (mp *cLeaderParticipation) WaitForNewLeader(ctx context.Context) (<-chan string, error) { + mp.c.factory.mu.Lock() + defer mp.c.factory.mu.Unlock() + + electionPath := path.Join(electionsPath, mp.name) + n := mp.c.factory.nodeByPath(mp.c.cell, electionPath) + if n == nil { + return nil, topo.NewError(topo.NoNode, electionPath) + } + + notifications := make(chan string, 8) + watchIndex := nextWatchIndex + nextWatchIndex++ + n.watches[watchIndex] = watch{lock: notifications} + + if n.lock != nil { + notifications <- n.lockContents + } + + go func() { + defer close(notifications) + + select { + case <-mp.stop: + case <-ctx.Done(): + } + + mp.c.factory.mu.Lock() + defer mp.c.factory.mu.Unlock() + + n := mp.c.factory.nodeByPath(mp.c.cell, electionPath) + if n == nil { + return + } + delete(n.watches, watchIndex) + }() + + return notifications, nil +} diff --git a/go/vt/topo/memorytopo/file.go b/go/vt/topo/memorytopo/file.go index a8b4bf19a0c..0abfc56cb80 100644 --- a/go/vt/topo/memorytopo/file.go +++ b/go/vt/topo/memorytopo/file.go @@ -17,10 +17,10 @@ limitations under the License. 
package memorytopo import ( + "context" "fmt" "path" - - "context" + "strings" "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" @@ -60,6 +60,15 @@ func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (to // Create the file. n := c.factory.newFile(file, contents, p) p.children[file] = n + + n.propagateRecursiveWatch(&topo.WatchDataRecursive{ + Path: filePath, + WatchData: topo.WatchData{ + Contents: n.contents, + Version: NodeVersion(n.version), + }, + }) + return NodeVersion(n.version), nil } @@ -122,12 +131,22 @@ func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, ver // Call the watches for _, w := range n.watches { - w <- &topo.WatchData{ - Contents: n.contents, - Version: NodeVersion(n.version), + if w.contents != nil { + w.contents <- &topo.WatchData{ + Contents: n.contents, + Version: NodeVersion(n.version), + } } } + n.propagateRecursiveWatch(&topo.WatchDataRecursive{ + Path: filePath, + WatchData: topo.WatchData{ + Contents: n.contents, + Version: NodeVersion(n.version), + }, + }) + return NodeVersion(n.version), nil } @@ -158,7 +177,61 @@ func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version, // List is part of the topo.Conn interface. func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) { - return nil, topo.NewError(topo.NoImplementation, "List not supported in memory topo") + if err := c.dial(ctx); err != nil { + return nil, err + } + + c.factory.mu.Lock() + defer c.factory.mu.Unlock() + + if c.factory.err != nil { + return nil, c.factory.err + } + + dir, file := path.Split(filePathPrefix) + // Get the node to list. + n := c.factory.nodeByPath(c.cell, dir) + if n == nil { + return []topo.KVInfo{}, topo.NewError(topo.NoNode, filePathPrefix) + } + + var result []topo.KVInfo + for name, child := range n.children { + if !strings.HasPrefix(name, file) { + continue + } + if child.isDirectory() { + result = append(result, gatherChildren(child, path.Join(dir, name))...) + } else { + result = append(result, topo.KVInfo{ + Key: []byte(path.Join(dir, name)), + Value: child.contents, + Version: NodeVersion(child.version), + }) + } + } + + if len(result) == 0 { + return []topo.KVInfo{}, topo.NewError(topo.NoNode, filePathPrefix) + } + + return result, nil +} + +func gatherChildren(n *node, dirPath string) []topo.KVInfo { + var result []topo.KVInfo + for name, child := range n.children { + if child.isDirectory() { + result = append(result, gatherChildren(child, path.Join(dirPath, name))...) + } else { + result = append(result, topo.KVInfo{ + Key: []byte(path.Join(dirPath, name)), + Value: child.contents, + Version: NodeVersion(child.version), + }) + } + } + return result } // Delete is part of topo.Conn interface. 
@@ -203,11 +276,23 @@ func (c *Conn) Delete(ctx context.Context, filePath string, version topo.Version // Call the watches for _, w := range n.watches { - w <- &topo.WatchData{ - Err: topo.NewError(topo.NoNode, filePath), + if w.contents != nil { + w.contents <- &topo.WatchData{ + Err: topo.NewError(topo.NoNode, filePath), + } + close(w.contents) + } + if w.lock != nil { + close(w.lock) } - close(w) } + n.propagateRecursiveWatch(&topo.WatchDataRecursive{ + Path: filePath, + WatchData: topo.WatchData{ + Err: topo.NewError(topo.NoNode, filePath), + }, + }) + return nil } diff --git a/go/vt/topo/memorytopo/lock.go b/go/vt/topo/memorytopo/lock.go index ab9e8088873..8da3d446471 100644 --- a/go/vt/topo/memorytopo/lock.go +++ b/go/vt/topo/memorytopo/lock.go @@ -17,9 +17,8 @@ limitations under the License. package memorytopo import ( - "fmt" - "context" + "fmt" "vitess.io/vitess/go/vt/topo" ) @@ -77,6 +76,12 @@ func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDes // No one has the lock, grab it. n.lock = make(chan struct{}) n.lockContents = contents + for _, w := range n.watches { + if w.lock == nil { + continue + } + w.lock <- contents + } c.factory.mu.Unlock() return &memoryTopoLockDescriptor{ c: c, diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index 89c107636b5..c54c9702c82 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -20,13 +20,12 @@ limitations under the License. package memorytopo import ( + "context" "math/rand" "strings" "sync" "time" - "context" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -146,6 +145,12 @@ func (c *Conn) Close() { c.factory = nil } +type watch struct { + contents chan *topo.WatchData + recursive chan *topo.WatchDataRecursive + lock chan string +} + // node contains a directory or a file entry. // Exactly one of contents or children is not nil. type node struct { @@ -159,7 +164,7 @@ type node struct { parent *node // watches is a map of all watches for this node. - watches map[int]chan *topo.WatchData + watches map[int]watch // lock is nil when the node is not locked. // otherwise it has a channel that is closed by unlock. 
@@ -175,11 +180,34 @@ func (n *node) isDirectory() bool { return n.children != nil } +func (n *node) recurseContents(callback func(n *node)) { + if n.isDirectory() { + for _, child := range n.children { + child.recurseContents(callback) + } + } else { + callback(n) + } +} + +func (n *node) propagateRecursiveWatch(ev *topo.WatchDataRecursive) { + for parent := n.parent; parent != nil; parent = parent.parent { + for _, w := range parent.watches { + if w.recursive != nil { + w.recursive <- ev + } + } + } +} + // PropagateWatchError propagates the given error to all watches on this node // and recursively applies to all children func (n *node) PropagateWatchError(err error) { for _, ch := range n.watches { - ch <- &topo.WatchData{ + if ch.contents == nil { + continue + } + ch.contents <- &topo.WatchData{ Err: err, } } @@ -230,7 +258,7 @@ func (f *Factory) newFile(name string, contents []byte, parent *node) *node { version: f.getNextVersion(), contents: contents, parent: parent, - watches: make(map[int]chan *topo.WatchData), + watches: make(map[int]watch), } } @@ -240,6 +268,7 @@ func (f *Factory) newDirectory(name string, parent *node) *node { version: f.getNextVersion(), children: make(map[string]*node), parent: parent, + watches: make(map[int]watch), } } diff --git a/go/vt/topo/memorytopo/watch.go b/go/vt/topo/memorytopo/watch.go index 1ac41717839..3991911da3a 100644 --- a/go/vt/topo/memorytopo/watch.go +++ b/go/vt/topo/memorytopo/watch.go @@ -17,29 +17,28 @@ limitations under the License. package memorytopo import ( - "fmt" - "context" + "fmt" "vitess.io/vitess/go/vt/topo" ) // Watch is part of the topo.Conn interface. -func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) { +func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { c.factory.mu.Lock() defer c.factory.mu.Unlock() if c.factory.err != nil { - return &topo.WatchData{Err: c.factory.err}, nil, nil + return nil, nil, c.factory.err } n := c.factory.nodeByPath(c.cell, filePath) if n == nil { - return &topo.WatchData{Err: topo.NewError(topo.NoNode, filePath)}, nil, nil + return nil, nil, topo.NewError(topo.NoNode, filePath) } if n.contents == nil { // it's a directory - return &topo.WatchData{Err: fmt.Errorf("cannot watch directory %v in cell %v", filePath, c.cell)}, nil, nil + return nil, nil, fmt.Errorf("cannot watch directory %v in cell %v", filePath, c.cell) } current := &topo.WatchData{ Contents: n.contents, @@ -49,9 +48,10 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c notifications := make(chan *topo.WatchData, 100) watchIndex := nextWatchIndex nextWatchIndex++ - n.watches[watchIndex] = notifications + n.watches[watchIndex] = watch{contents: notifications} - cancel := func() { + go func() { + <-ctx.Done() // This function can be called at any point, so we first need // to make sure the watch is still valid. c.factory.mu.Lock() @@ -64,9 +64,58 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c if w, ok := n.watches[watchIndex]; ok { delete(n.watches, watchIndex) - w <- &topo.WatchData{Err: topo.NewError(topo.Interrupted, "watch")} - close(w) + w.contents <- &topo.WatchData{Err: topo.NewError(topo.Interrupted, "watch")} + close(w.contents) } + }() + return current, notifications, nil +} + +// WatchRecursive is part of the topo.Conn interface. 
+func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { + c.factory.mu.Lock() + defer c.factory.mu.Unlock() + + if c.factory.err != nil { + return nil, nil, c.factory.err + } + + n := c.factory.getOrCreatePath(c.cell, dirpath) + if n == nil { + return nil, nil, topo.NewError(topo.NoNode, dirpath) } - return current, notifications, cancel + + var initialwd []*topo.WatchDataRecursive + n.recurseContents(func(n *node) { + initialwd = append(initialwd, &topo.WatchDataRecursive{ + Path: n.name, + WatchData: topo.WatchData{ + Contents: n.contents, + Version: NodeVersion(n.version), + }, + }) + }) + + notifications := make(chan *topo.WatchDataRecursive, 100) + watchIndex := nextWatchIndex + nextWatchIndex++ + n.watches[watchIndex] = watch{recursive: notifications} + + go func() { + defer close(notifications) + + <-ctx.Done() + + c.factory.mu.Lock() + defer c.factory.mu.Unlock() + + n := c.factory.nodeByPath(c.cell, dirpath) + if n != nil { + delete(n.watches, watchIndex) + } + + notifications <- &topo.WatchDataRecursive{WatchData: topo.WatchData{Err: topo.NewError(topo.Interrupted, "watch")}} + }() + + return initialwd, notifications, nil } diff --git a/go/vt/topo/server.go b/go/vt/topo/server.go index 4d783379443..8eb72aeecc3 100644 --- a/go/vt/topo/server.go +++ b/go/vt/topo/server.go @@ -43,14 +43,13 @@ There are two test sub-packages associated with this code: package topo import ( + "context" "flag" "fmt" "sync" "vitess.io/vitess/go/vt/proto/topodata" - "context" - "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/log" @@ -245,6 +244,9 @@ func Open() *Server { func (ts *Server) ConnForCell(ctx context.Context, cell string) (Conn, error) { // Global cell is the easy case. if cell == GlobalCell { + if ctx.Err() != nil { + return nil, ctx.Err() + } return ts.globalCell, nil } diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index d1da7c94256..e1f5034db9e 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -656,11 +656,14 @@ type WatchShardData struct { // WatchShard will set a watch on the Shard object. // It has the same contract as conn.Watch, but it also unpacks the // contents into a Shard object -func (ts *Server) WatchShard(ctx context.Context, keyspace, shard string) (*WatchShardData, <-chan *WatchShardData, CancelFunc) { +func (ts *Server) WatchShard(ctx context.Context, keyspace, shard string) (*WatchShardData, <-chan *WatchShardData, error) { shardPath := shardFilePath(keyspace, shard) - current, wdChannel, cancel := ts.globalCell.Watch(ctx, shardPath) - if current.Err != nil { - return &WatchShardData{Err: current.Err}, nil, nil + ctx, cancel := context.WithCancel(ctx) + + current, wdChannel, err := ts.globalCell.Watch(ctx, shardPath) + if err != nil { + cancel() + return nil, nil, err } value := &topodatapb.Shard{} if err := proto.Unmarshal(current.Contents, value); err != nil { @@ -668,7 +671,7 @@ func (ts *Server) WatchShard(ctx context.Context, keyspace, shard string) (*Watc cancel() for range wdChannel { } - return &WatchShardData{Err: vterrors.Wrapf(err, "error unpacking initial Shard object")}, nil, nil + return nil, nil, vterrors.Wrapf(err, "error unpacking initial Shard object") } changes := make(chan *WatchShardData, 10) @@ -678,6 +681,7 @@ func (ts *Server) WatchShard(ctx context.Context, keyspace, shard string) (*Watc // send an ErrInterrupted and then close the channel. We'll // just propagate that back to our caller. 
go func() { + defer cancel() defer close(changes) for wd := range wdChannel { @@ -702,5 +706,5 @@ func (ts *Server) WatchShard(ctx context.Context, keyspace, shard string) (*Watc } }() - return &WatchShardData{Value: value}, changes, cancel + return &WatchShardData{Value: value}, changes, nil } diff --git a/go/vt/topo/srv_keyspace.go b/go/vt/topo/srv_keyspace.go index 264b01a3edc..ee1db1100e1 100644 --- a/go/vt/topo/srv_keyspace.go +++ b/go/vt/topo/srv_keyspace.go @@ -17,6 +17,7 @@ limitations under the License. package topo import ( + "context" "encoding/hex" "fmt" "path" @@ -24,8 +25,6 @@ import ( "google.golang.org/protobuf/proto" - "context" - "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/concurrency" @@ -51,16 +50,18 @@ type WatchSrvKeyspaceData struct { // WatchSrvKeyspace will set a watch on the SrvKeyspace object. // It has the same contract as Conn.Watch, but it also unpacks the // contents into a SrvKeyspace object. -func (ts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (*WatchSrvKeyspaceData, <-chan *WatchSrvKeyspaceData, CancelFunc) { +func (ts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (*WatchSrvKeyspaceData, <-chan *WatchSrvKeyspaceData, error) { conn, err := ts.ConnForCell(ctx, cell) if err != nil { return &WatchSrvKeyspaceData{Err: err}, nil, nil } filePath := srvKeyspaceFileName(keyspace) - current, wdChannel, cancel := conn.Watch(ctx, filePath) - if current.Err != nil { - return &WatchSrvKeyspaceData{Err: current.Err}, nil, nil + ctx, cancel := context.WithCancel(ctx) + current, wdChannel, err := conn.Watch(ctx, filePath) + if err != nil { + cancel() + return nil, nil, err } value := &topodatapb.SrvKeyspace{} if err := proto.Unmarshal(current.Contents, value); err != nil { @@ -68,7 +69,7 @@ func (ts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) ( cancel() for range wdChannel { } - return &WatchSrvKeyspaceData{Err: vterrors.Wrapf(err, "error unpacking initial SrvKeyspace object")}, nil, nil + return nil, nil, vterrors.Wrapf(err, "error unpacking initial SrvKeyspace object") } changes := make(chan *WatchSrvKeyspaceData, 10) @@ -79,6 +80,7 @@ func (ts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) ( // send an ErrInterrupted and then close the channel. We'll // just propagate that back to our caller. go func() { + defer cancel() defer close(changes) for wd := range wdChannel { @@ -103,7 +105,7 @@ func (ts *Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) ( } }() - return &WatchSrvKeyspaceData{Value: value}, changes, cancel + return &WatchSrvKeyspaceData{Value: value}, changes, nil } // GetSrvKeyspaceNames returns the SrvKeyspace objects for a cell. 
@@ -689,18 +691,6 @@ func OrderAndCheckPartitions(cell string, srvKeyspace *topodatapb.SrvKeyspace) e return nil } -// ShardIsServing returns true if this shard is found in any of the partitions in the srvKeyspace -func ShardIsServing(srvKeyspace *topodatapb.SrvKeyspace, shard *topodatapb.Shard) bool { - for _, partition := range srvKeyspace.GetPartitions() { - for _, shardReference := range partition.GetShardReferences() { - if key.KeyRangeEqual(shardReference.GetKeyRange(), shard.GetKeyRange()) { - return true - } - } - } - return false -} - // ValidateSrvKeyspace validates that the SrvKeyspace for given keyspace in the provided cells is not corrupted func (ts *Server) ValidateSrvKeyspace(ctx context.Context, keyspace, cells string) error { cellsToValidate, err := ts.ExpandCells(ctx, cells) diff --git a/go/vt/topo/srv_vschema.go b/go/vt/topo/srv_vschema.go index db04379f7a1..25bff7dcf73 100644 --- a/go/vt/topo/srv_vschema.go +++ b/go/vt/topo/srv_vschema.go @@ -17,13 +17,12 @@ limitations under the License. package topo import ( + "context" "fmt" "sync" "google.golang.org/protobuf/proto" - "context" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vterrors" @@ -42,15 +41,17 @@ type WatchSrvVSchemaData struct { // WatchSrvVSchema will set a watch on the SrvVSchema object. // It has the same contract as Conn.Watch, but it also unpacks the // contents into a SrvVSchema object. -func (ts *Server) WatchSrvVSchema(ctx context.Context, cell string) (*WatchSrvVSchemaData, <-chan *WatchSrvVSchemaData, CancelFunc) { +func (ts *Server) WatchSrvVSchema(ctx context.Context, cell string) (*WatchSrvVSchemaData, <-chan *WatchSrvVSchemaData, error) { conn, err := ts.ConnForCell(ctx, cell) if err != nil { - return &WatchSrvVSchemaData{Err: err}, nil, nil + return nil, nil, err } - current, wdChannel, cancel := conn.Watch(ctx, SrvVSchemaFile) - if current.Err != nil { - return &WatchSrvVSchemaData{Err: current.Err}, nil, nil + ctx, cancel := context.WithCancel(ctx) + current, wdChannel, err := conn.Watch(ctx, SrvVSchemaFile) + if err != nil { + cancel() + return nil, nil, err } value := &vschemapb.SrvVSchema{} if err := proto.Unmarshal(current.Contents, value); err != nil { @@ -58,7 +59,7 @@ func (ts *Server) WatchSrvVSchema(ctx context.Context, cell string) (*WatchSrvVS cancel() for range wdChannel { } - return &WatchSrvVSchemaData{Err: vterrors.Wrapf(err, "error unpacking initial SrvVSchema object")}, nil, nil + return nil, nil, vterrors.Wrapf(err, "error unpacking initial SrvVSchema object") } changes := make(chan *WatchSrvVSchemaData, 10) @@ -69,6 +70,7 @@ func (ts *Server) WatchSrvVSchema(ctx context.Context, cell string) (*WatchSrvVS // send an ErrInterrupted and then close the channel. We'll // just propagate that back to our caller. go func() { + defer cancel() defer close(changes) for wd := range wdChannel { @@ -92,7 +94,7 @@ func (ts *Server) WatchSrvVSchema(ctx context.Context, cell string) (*WatchSrvVS } }() - return &WatchSrvVSchemaData{Value: value}, changes, cancel + return &WatchSrvVSchemaData{Value: value}, changes, nil } // UpdateSrvVSchema updates the SrvVSchema file for a cell. diff --git a/go/vt/topo/stats_conn.go b/go/vt/topo/stats_conn.go index 2d30503f645..c1959bf5d3d 100644 --- a/go/vt/topo/stats_conn.go +++ b/go/vt/topo/stats_conn.go @@ -17,9 +17,8 @@ limitations under the License. 
package topo import ( - "time" - "context" + "time" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/proto/vtrpc" @@ -162,13 +161,20 @@ func (st *StatsConn) Lock(ctx context.Context, dirPath, contents string) (LockDe } // Watch is part of the Conn interface -func (st *StatsConn) Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, cancel CancelFunc) { +func (st *StatsConn) Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, err error) { startTime := time.Now() statsKey := []string{"Watch", st.cell} defer topoStatsConnTimings.Record(statsKey, startTime) return st.conn.Watch(ctx, filePath) } +func (st *StatsConn) WatchRecursive(ctx context.Context, path string) ([]*WatchDataRecursive, <-chan *WatchDataRecursive, error) { + startTime := time.Now() + statsKey := []string{"WatchRecursive", st.cell} + defer topoStatsConnTimings.Record(statsKey, startTime) + return st.conn.WatchRecursive(ctx, path) +} + // NewLeaderParticipation is part of the Conn interface func (st *StatsConn) NewLeaderParticipation(name, id string) (LeaderParticipation, error) { startTime := time.Now() diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go index bffc47f81df..8c6d9a94026 100644 --- a/go/vt/topo/stats_conn_test.go +++ b/go/vt/topo/stats_conn_test.go @@ -17,11 +17,10 @@ limitations under the License. package topo import ( + "context" "fmt" "testing" - "context" - "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" ) @@ -106,8 +105,13 @@ func (st *fakeConn) Lock(ctx context.Context, dirPath, contents string) (lock Lo } // Watch is part of the Conn interface -func (st *fakeConn) Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, cancel CancelFunc) { - return current, changes, cancel +func (st *fakeConn) Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, err error) { + return current, changes, err +} + +// WatchRecursive is part of the Conn interface +func (st *fakeConn) WatchRecursive(ctx context.Context, path string) (current []*WatchDataRecursive, changes <-chan *WatchDataRecursive, err error) { + return current, changes, err } // NewLeaderParticipation is part of the Conn interface diff --git a/go/vt/topo/test/election.go b/go/vt/topo/test/election.go index bd92a452c4e..594e6562eb2 100644 --- a/go/vt/topo/test/election.go +++ b/go/vt/topo/test/election.go @@ -17,11 +17,10 @@ limitations under the License. 
 package test
 
 import (
+	"context"
 	"testing"
 	"time"
 
-	"context"
-
 	"vitess.io/vitess/go/vt/topo"
 )
 
@@ -147,3 +146,74 @@ func checkElection(t *testing.T, ts *topo.Server) {
 		t.Errorf("wrong error returned by WaitForLeadership, got %v expected %v", err, topo.NewError(topo.Interrupted, ""))
 	}
 }
+
+// checkWaitForNewLeader runs the WaitForNewLeader test on the LeaderParticipation
+func checkWaitForNewLeader(t *testing.T, ts *topo.Server) {
+	conn, err := ts.ConnForCell(context.Background(), topo.GlobalCell)
+	if err != nil {
+		t.Fatalf("ConnForCell(global) failed: %v", err)
+	}
+	name := "testmp"
+
+	// create a new LeaderParticipation
+	id1 := "id1"
+	mp1, err := conn.NewLeaderParticipation(name, id1)
+	if err != nil {
+		t.Fatalf("cannot create mp1: %v", err)
+	}
+
+	// no primary yet, check name
+	waitForLeaderID(t, mp1, "")
+
+	// wait for id1 to be the primary
+	_, err = mp1.WaitForLeadership()
+	if err != nil {
+		t.Fatalf("mp1 cannot become Leader: %v", err)
+	}
+
+	// A lot of implementations use a toplevel directory for their elections.
+	// Make sure it is marked as 'Ephemeral'.
+	entries, err := conn.ListDir(context.Background(), "/", true /*full*/)
+	if err != nil {
+		t.Fatalf("ListDir(/) failed: %v", err)
+	}
+	for _, e := range entries {
+		if e.Name != topo.CellsPath {
+			if !e.Ephemeral {
+				t.Errorf("toplevel directory that is not ephemeral: %v", e)
+			}
+		}
+	}
+
+	// get the current primary name, better be id1
+	waitForLeaderID(t, mp1, id1)
+
+	// create a second LeaderParticipation on same name
+	id2 := "id2"
+	mp2, err := conn.NewLeaderParticipation(name, id2)
+	if err != nil {
+		t.Fatalf("cannot create mp2: %v", err)
+	}
+
+	leaders, err := mp2.WaitForNewLeader(context.Background())
+	if topo.IsErrType(err, topo.NoImplementation) {
+		t.Logf("%T does not support WaitForNewLeader()", mp2)
+		return
+	}
+	if err != nil {
+		t.Fatalf("cannot wait for leadership: %v", err)
+		return
+	}
+
+	// ask mp2 for primary name, should get id1
+	waitForLeaderID(t, mp2, id1)
+
+	// stop mp1
+	mp1.Stop()
+
+	leader := <-leaders
+
+	if leader != id1 {
+		t.Fatalf("wrong node elected: %v", leader)
+	}
+}
diff --git a/go/vt/topo/test/file.go b/go/vt/topo/test/file.go
index f32c3775c0a..4a1fe2a51a9 100644
--- a/go/vt/topo/test/file.go
+++ b/go/vt/topo/test/file.go
@@ -17,11 +17,11 @@ limitations under the License.
 package test
 
 import (
+	"context"
 	"reflect"
+	"strings"
 	"testing"
 
-	"context"
-
 	"vitess.io/vitess/go/vt/topo"
 )
 
@@ -201,3 +201,46 @@ func checkFileInCell(t *testing.T, conn topo.Conn, hasCells bool) {
 	expected = expected[:len(expected)-1]
 	checkListDir(ctx, t, conn, "/", expected)
 }
+
+// checkList tests the List part of the Conn API.
+func checkList(t *testing.T, ts *topo.Server) { + ctx := context.Background() + // global cell + conn, err := ts.ConnForCell(ctx, LocalCellName) + if err != nil { + t.Fatalf("ConnForCell(LocalCellName) failed: %v", err) + } + _, err = conn.List(ctx, "/") + if topo.IsErrType(err, topo.NoImplementation) { + // If this is not supported, skip the test + t.Skipf("%T does not support List()", conn) + return + } + if err != nil { + t.Fatalf("List(test) failed: %v", err) + } + + _, err = conn.Create(ctx, "/toplevel/nested/myfile", []byte{'a'}) + if err != nil { + t.Fatalf("Create('/myfile') failed: %v", err) + } + + for _, path := range []string{"/top", "/toplevel", "/toplevel/", "/toplevel/nes", "/toplevel/nested/myfile"} { + entries, err := conn.List(ctx, path) + if err != nil { + t.Fatalf("List failed(path: %q): %v", path, err) + } + + if len(entries) != 1 { + t.Fatalf("List(test) returned incorrect number of elements for path %q. Expected 1, got %d: %v", path, len(entries), entries) + } + + if !strings.HasSuffix(string(entries[0].Key), "/toplevel/nested/myfile") { + t.Fatalf("found entry doesn't end with /toplevel/nested/myfile for path %q: %s", path, string(entries[0].Key)) + } + + if string(entries[0].Value) != "a" { + t.Fatalf("found entry doesn't have value \"a\" for path %q: %s", path, string(entries[0].Value)) + } + } +} diff --git a/go/vt/topo/test/testing.go b/go/vt/topo/test/testing.go index 66faf77bcbf..fbddc65e98d 100644 --- a/go/vt/topo/test/testing.go +++ b/go/vt/topo/test/testing.go @@ -96,6 +96,11 @@ func TopoServerTestSuite(t *testing.T, factory func() *topo.Server) { checkElection(t, ts) ts.Close() + t.Log("=== checkWaitForNewLeader") + ts = factory() + checkWaitForNewLeader(t, ts) + ts.Close() + t.Log("=== checkDirectory") ts = factory() checkDirectory(t, ts) @@ -111,5 +116,12 @@ func TopoServerTestSuite(t *testing.T, factory func() *topo.Server) { checkWatch(t, ts) t.Log("=== checkWatchInterrupt") checkWatchInterrupt(t, ts) + + t.Log("=== checkList") + checkList(t, ts) + + t.Log("=== checkWatchRecursive") + checkWatchRecursive(t, ts) + ts.Close() } diff --git a/go/vt/topo/test/watch.go b/go/vt/topo/test/watch.go index d89e8a85aff..24e831718d1 100644 --- a/go/vt/topo/test/watch.go +++ b/go/vt/topo/test/watch.go @@ -17,6 +17,7 @@ limitations under the License. package test import ( + "context" "testing" "time" @@ -24,8 +25,6 @@ import ( "google.golang.org/protobuf/proto" - "context" - "vitess.io/vitess/go/vt/topo" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -34,49 +33,98 @@ import ( // waitForInitialValue waits for the initial value of // keyspaces/test_keyspace/SrvKeyspace to appear, and match the // provided srvKeyspace. 
-func waitForInitialValue(t *testing.T, conn topo.Conn, srvKeyspace *topodatapb.SrvKeyspace) (changes <-chan *topo.WatchData, cancel func()) {
+func waitForInitialValue(t *testing.T, conn topo.Conn, srvKeyspace *topodatapb.SrvKeyspace) (changes <-chan *topo.WatchData, cancel context.CancelFunc) {
 	var current *topo.WatchData
-	ctx := context.Background()
+	ctx, cancel := context.WithCancel(context.Background())
 	start := time.Now()
+	var err error
 	for {
-		current, changes, cancel = conn.Watch(ctx, "keyspaces/test_keyspace/SrvKeyspace")
-		if topo.IsErrType(current.Err, topo.NoNode) {
+		current, changes, err = conn.Watch(ctx, "keyspaces/test_keyspace/SrvKeyspace")
+		if topo.IsErrType(err, topo.NoNode) {
 			// hasn't appeared yet
 			if time.Since(start) > 10*time.Second {
+				cancel()
 				t.Fatalf("time out waiting for file to appear")
 			}
 			time.Sleep(10 * time.Millisecond)
 			continue
 		}
-		if current.Err != nil {
-			t.Fatalf("watch failed: %v", current.Err)
+		if err != nil {
+			cancel()
+			t.Fatalf("watch failed: %v", err)
 		}
 		// we got a valid result
 		break
 	}
 	got := &topodatapb.SrvKeyspace{}
 	if err := proto.Unmarshal(current.Contents, got); err != nil {
+		cancel()
 		t.Fatalf("cannot proto-unmarshal data: %v", err)
 	}
 	if !proto.Equal(got, srvKeyspace) {
+		cancel()
 		t.Fatalf("got bad data: %v expected: %v", got, srvKeyspace)
 	}
 
 	return changes, cancel
 }
 
+// waitForInitialValueRecursive waits for the initial value of
+// keyspaces/test_keyspace/SrvKeyspace to appear, and match the
+// provided srvKeyspace.
+func waitForInitialValueRecursive(t *testing.T, conn topo.Conn, srvKeyspace *topodatapb.SrvKeyspace) (changes <-chan *topo.WatchDataRecursive, cancel context.CancelFunc, err error) {
+	var current []*topo.WatchDataRecursive
+	ctx, cancel := context.WithCancel(context.Background())
+	start := time.Now()
+	for {
+		current, changes, err = conn.WatchRecursive(ctx, "keyspaces/test_keyspace")
+		if topo.IsErrType(err, topo.NoNode) {
+			// hasn't appeared yet
+			if time.Since(start) > 10*time.Second {
+				cancel()
+				t.Fatalf("time out waiting for file to appear")
+			}
+			time.Sleep(10 * time.Millisecond)
+			continue
+		}
+		if topo.IsErrType(err, topo.NoImplementation) {
+			// If this is not supported, skip the test
+			cancel()
+			return nil, nil, err
+		}
+		if err != nil {
+			cancel()
+			t.Fatalf("watch failed: %v", err)
+		}
+		// we got a valid result
+		break
+	}
+	got := &topodatapb.SrvKeyspace{}
+	if err := proto.Unmarshal(current[0].Contents, got); err != nil {
+		cancel()
+		t.Fatalf("cannot proto-unmarshal data: %v", err)
+	}
+	if !proto.Equal(got, srvKeyspace) {
+		cancel()
+		t.Fatalf("got bad data: %v expected: %v", got, srvKeyspace)
+	}
+
+	return changes, cancel, nil
+}
+
 // checkWatch runs the tests on the Watch part of the Conn API.
 // We use a SrvKeyspace object.
func checkWatch(t *testing.T, ts *topo.Server) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() conn, err := ts.ConnForCell(ctx, LocalCellName) if err != nil { t.Fatalf("ConnForCell(test) failed: %v", err) } // start watching something that doesn't exist -> error - current, changes, _ := conn.Watch(ctx, "keyspaces/test_keyspace/SrvKeyspace") - if !topo.IsErrType(current.Err, topo.NoNode) { + current, changes, err := conn.Watch(ctx, "keyspaces/test_keyspace/SrvKeyspace") + if !topo.IsErrType(err, topo.NoNode) { t.Errorf("watch on missing node didn't return ErrNoNode: %v %v", current, changes) } @@ -104,8 +152,8 @@ func checkWatch(t *testing.T, ts *topo.Server) { } // start watching again, it should work - changes, cancel := waitForInitialValue(t, conn, srvKeyspace) - defer cancel() + changes, secondCancel := waitForInitialValue(t, conn, srvKeyspace) + defer secondCancel() // change the data srvKeyspace.Partitions[0].ShardReferences[0].Name = "new_name" @@ -248,3 +296,149 @@ func checkWatchInterrupt(t *testing.T, ts *topo.Server) { // And calling cancel() again should just work. cancel() } + +// checkWatchRecursive tests we can setup a recursive watch +func checkWatchRecursive(t *testing.T, ts *topo.Server) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + conn, err := ts.ConnForCell(ctx, LocalCellName) + if err != nil { + t.Fatalf("ConnForCell(test) failed: %v", err) + } + + // create some data + keyRange, err := key.ParseShardingSpec("-") + if err != nil || len(keyRange) != 1 { + t.Fatalf("ParseShardingSpec failed. Expected non error and only one element. Got err: %v, len(%v)", err, len(keyRange)) + } + + srvKeyspace := &topodatapb.SrvKeyspace{ + Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{ + { + ServedType: topodatapb.TabletType_PRIMARY, + ShardReferences: []*topodatapb.ShardReference{ + { + Name: "name", + KeyRange: keyRange[0], + }, + }, + }, + }, + } + if err := ts.UpdateSrvKeyspace(ctx, LocalCellName, "test_keyspace", srvKeyspace); err != nil { + t.Fatalf("UpdateSrvKeyspace(1): %v", err) + } + + // start watching again, it should work + changes, secondCancel, err := waitForInitialValueRecursive(t, conn, srvKeyspace) + if topo.IsErrType(err, topo.NoImplementation) { + // Skip the rest if there's no implementation + t.Logf("%T does not support WatchRecursive()", conn) + return + } + defer secondCancel() + + // change the data + srvKeyspace.Partitions[0].ShardReferences[0].Name = "new_name" + if err := ts.UpdateSrvKeyspace(ctx, LocalCellName, "test_keyspace", srvKeyspace); err != nil { + t.Fatalf("UpdateSrvKeyspace(2): %v", err) + } + + // Make sure we get the watch data, maybe not as first notice, + // but eventually. The API specifies it is possible to get duplicate + // notifications. 
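+	// For instance, the first event may still carry the original "name"
+	// value before the "new_name" update is observed.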
+ for { + wd, ok := <-changes + if !ok { + t.Fatalf("watch channel unexpectedly closed") + } + if wd.Err != nil { + t.Fatalf("watch interrupted: %v", wd.Err) + } + got := &topodatapb.SrvKeyspace{} + if err := proto.Unmarshal(wd.Contents, got); err != nil { + t.Fatalf("cannot proto-unmarshal data: %v", err) + } + + if got.Partitions[0].ShardReferences[0].Name == "name" { + // extra first value, still good + continue + } + if got.Partitions[0].ShardReferences[0].Name == "new_name" { + // watch worked, good + break + } + t.Fatalf("got unknown SrvKeyspace: %v", got) + } + + // remove the SrvKeyspace + if err := ts.DeleteSrvKeyspace(ctx, LocalCellName, "test_keyspace"); err != nil { + t.Fatalf("DeleteSrvKeyspace: %v", err) + } + + // Make sure we get the ErrNoNode notification eventually. + // The API specifies it is possible to get duplicate + // notifications. + for { + wd, ok := <-changes + if !ok { + t.Fatalf("watch channel unexpectedly closed") + } + + if topo.IsErrType(wd.Err, topo.NoNode) { + // good + break + } + if wd.Err != nil { + t.Fatalf("bad error returned for deletion: %v", wd.Err) + } + // we got something, better be the right value + got := &topodatapb.SrvKeyspace{} + if err := proto.Unmarshal(wd.Contents, got); err != nil { + t.Fatalf("cannot proto-unmarshal data: %v", err) + } + if got.Partitions[0].ShardReferences[0].Name == "new_name" { + // good value + continue + } + t.Fatalf("got unknown SrvKeyspace waiting for deletion: %v", got) + } + + // We now have to stop watching. This doesn't automatically + // happen for recursive watches on a single file since others + // can still be seen. + secondCancel() + + // Make sure we get the topo.ErrInterrupted notification eventually. + for { + wd, ok := <-changes + if !ok { + t.Fatalf("watch channel unexpectedly closed") + } + if topo.IsErrType(wd.Err, topo.Interrupted) { + // good + break + } + if wd.Err != nil { + t.Fatalf("bad error returned for cancellation: %v", wd.Err) + } + // we got something, better be the right value + got := &topodatapb.SrvKeyspace{} + if err := proto.Unmarshal(wd.Contents, got); err != nil { + t.Fatalf("cannot proto-unmarshal data: %v", err) + } + if got.Partitions[0].ShardReferences[0].Name == "name" { + // good value + continue + } + t.Fatalf("got unknown SrvKeyspace waiting for deletion: %v", got) + } + + // Now the channel should be closed. + if wd, ok := <-changes; ok { + t.Fatalf("got unexpected event after error: %v", wd) + } + + // And calling cancel() again should just work. + secondCancel() +} diff --git a/go/vt/topo/topotests/shard_watch_test.go b/go/vt/topo/topotests/shard_watch_test.go index 7b04224a8f5..89be0064f31 100644 --- a/go/vt/topo/topotests/shard_watch_test.go +++ b/go/vt/topo/topotests/shard_watch_test.go @@ -31,23 +31,24 @@ import ( ) // waitForInitialShard waits for the initial Shard to appear. 
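+// The watch is stopped by calling the returned context.CancelFunc.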
-func waitForInitialShard(t *testing.T, ts *topo.Server, keyspace, shard string) (current *topo.WatchShardData, changes <-chan *topo.WatchShardData, cancel topo.CancelFunc) { - ctx := context.Background() +func waitForInitialShard(t *testing.T, ts *topo.Server, keyspace, shard string) (current *topo.WatchShardData, changes <-chan *topo.WatchShardData, cancel context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) start := time.Now() + var err error for { - current, changes, cancel = ts.WatchShard(ctx, keyspace, shard) + current, changes, err = ts.WatchShard(ctx, keyspace, shard) switch { - case topo.IsErrType(current.Err, topo.NoNode): + case topo.IsErrType(err, topo.NoNode): // hasn't appeared yet if time.Since(start) > 10*time.Second { t.Fatalf("time out waiting for file to appear") } time.Sleep(10 * time.Millisecond) continue - case current.Err == nil: + case err == nil: return default: - t.Fatalf("watch failed: %v", current.Err) + t.Fatalf("watch failed: %v", err) } } } @@ -59,9 +60,9 @@ func TestWatchShardNoNode(t *testing.T) { ts := memorytopo.NewServer("cell1") // No Shard -> ErrNoNode - current, _, _ := ts.WatchShard(ctx, keyspace, shard) - if !topo.IsErrType(current.Err, topo.NoNode) { - t.Errorf("Got invalid result from WatchShard(not there): %v", current.Err) + _, _, err := ts.WatchShard(ctx, keyspace, shard) + if !topo.IsErrType(err, topo.NoNode) { + t.Errorf("Got invalid result from WatchShard(not there): %v", err) } } @@ -145,9 +146,9 @@ func TestWatchShard(t *testing.T) { cancel() // Bad data in topo, setting the watch should now fail. - current, _, _ = ts.WatchShard(ctx, keyspace, shard) - if current.Err == nil || !strings.Contains(current.Err.Error(), "error unpacking initial Shard object") { - t.Fatalf("expected an initial error setting watch on bad content, but got: %v", current.Err) + _, _, err = ts.WatchShard(ctx, keyspace, shard) + if err == nil || !strings.Contains(err.Error(), "error unpacking initial Shard object") { + t.Fatalf("expected an initial error setting watch on bad content, but got: %v", err) } data, err := proto.Marshal(wanted) @@ -160,9 +161,9 @@ func TestWatchShard(t *testing.T) { } start := time.Now() for { - current, changes, _ = ts.WatchShard(ctx, keyspace, shard) - if current.Err != nil { - if strings.Contains(current.Err.Error(), "error unpacking initial Shard object") { + current, changes, err = ts.WatchShard(ctx, keyspace, shard) + if err != nil { + if strings.Contains(err.Error(), "error unpacking initial Shard object") { // hasn't changed yet if time.Since(start) > 10*time.Second { t.Fatalf("time out waiting for file to appear") @@ -208,9 +209,9 @@ func TestWatchShardCancel(t *testing.T) { ts := memorytopo.NewServer(cell) // No Shard -> ErrNoNode - current, _, _ := ts.WatchShard(ctx, keyspace, shard) - if !topo.IsErrType(current.Err, topo.NoNode) { - t.Errorf("Got invalid result from WatchShard(not there): %v", current.Err) + _, _, err := ts.WatchShard(ctx, keyspace, shard) + if !topo.IsErrType(err, topo.NoNode) { + t.Errorf("Got invalid result from WatchShard(not there): %v", err) } // Create keyspace diff --git a/go/vt/topo/topotests/srv_keyspace_test.go b/go/vt/topo/topotests/srv_keyspace_test.go index 54da260ebf8..8eeaf3f07ac 100644 --- a/go/vt/topo/topotests/srv_keyspace_test.go +++ b/go/vt/topo/topotests/srv_keyspace_test.go @@ -37,23 +37,24 @@ import ( // waitForInitialSrvKeyspace waits for the initial SrvKeyspace to // appear, and match the provided srvKeyspace. 
-func waitForInitialSrvKeyspace(t *testing.T, ts *topo.Server, cell, keyspace string) (current *topo.WatchSrvKeyspaceData, changes <-chan *topo.WatchSrvKeyspaceData, cancel topo.CancelFunc) { - ctx := context.Background() +func waitForInitialSrvKeyspace(t *testing.T, ts *topo.Server, cell, keyspace string) (current *topo.WatchSrvKeyspaceData, changes <-chan *topo.WatchSrvKeyspaceData, cancel context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) start := time.Now() + var err error for { - current, changes, cancel = ts.WatchSrvKeyspace(ctx, cell, keyspace) + current, changes, err = ts.WatchSrvKeyspace(ctx, cell, keyspace) switch { - case topo.IsErrType(current.Err, topo.NoNode): + case topo.IsErrType(err, topo.NoNode): // hasn't appeared yet if time.Since(start) > 10*time.Second { t.Fatalf("time out waiting for file to appear") } time.Sleep(10 * time.Millisecond) continue - case current.Err == nil: + case err == nil: return default: - t.Fatalf("watch failed: %v", current.Err) + t.Fatalf("watch failed: %v", err) } } } @@ -65,9 +66,9 @@ func TestWatchSrvKeyspaceNoNode(t *testing.T) { ts := memorytopo.NewServer(cell) // No SrvKeyspace -> ErrNoNode - current, _, _ := ts.WatchSrvKeyspace(ctx, cell, keyspace) - if !topo.IsErrType(current.Err, topo.NoNode) { - t.Errorf("Got invalid result from WatchSrvKeyspace(not there): %v", current.Err) + _, _, err := ts.WatchSrvKeyspace(ctx, cell, keyspace) + if !topo.IsErrType(err, topo.NoNode) { + t.Errorf("Got invalid result from WatchSrvKeyspace(not there): %v", err) } } @@ -120,9 +121,9 @@ func TestWatchSrvKeyspace(t *testing.T) { cancel() // Bad data in topo, setting the watch should now fail. - current, _, _ = ts.WatchSrvKeyspace(ctx, cell, keyspace) - if current.Err == nil || !strings.Contains(current.Err.Error(), "error unpacking initial SrvKeyspace object") { - t.Fatalf("expected an initial error setting watch on bad content, but got: %v", current.Err) + _, _, err = ts.WatchSrvKeyspace(ctx, cell, keyspace) + if err == nil || !strings.Contains(err.Error(), "error unpacking initial SrvKeyspace object") { + t.Fatalf("expected an initial error setting watch on bad content, but got: %v", err) } // Update content, wait until Watch works again @@ -131,9 +132,9 @@ func TestWatchSrvKeyspace(t *testing.T) { } start := time.Now() for { - current, changes, _ = ts.WatchSrvKeyspace(ctx, cell, keyspace) - if current.Err != nil { - if strings.Contains(current.Err.Error(), "error unpacking initial SrvKeyspace object") { + current, changes, err = ts.WatchSrvKeyspace(ctx, cell, keyspace) + if err != nil { + if strings.Contains(err.Error(), "error unpacking initial SrvKeyspace object") { // hasn't changed yet if time.Since(start) > 10*time.Second { t.Fatalf("time out waiting for file to appear") @@ -178,9 +179,9 @@ func TestWatchSrvKeyspaceCancel(t *testing.T) { ts := memorytopo.NewServer(cell) // No SrvKeyspace -> ErrNoNode - current, _, _ := ts.WatchSrvKeyspace(ctx, cell, keyspace) - if !topo.IsErrType(current.Err, topo.NoNode) { - t.Errorf("Got invalid result from WatchSrvKeyspace(not there): %v", current.Err) + _, _, err := ts.WatchSrvKeyspace(ctx, cell, keyspace) + if !topo.IsErrType(err, topo.NoNode) { + t.Errorf("Got invalid result from WatchSrvKeyspace(not there): %v", err) } // Create initial value diff --git a/go/vt/topo/zk2topo/election.go b/go/vt/topo/zk2topo/election.go index 1ff2f0a9577..a7f066db70c 100644 --- a/go/vt/topo/zk2topo/election.go +++ b/go/vt/topo/zk2topo/election.go @@ -17,11 +17,10 @@ limitations under the License. 
package zk2topo import ( + "context" "path" "sort" - "context" - "github.com/z-division/go-zookeeper/zk" "vitess.io/vitess/go/vt/vterrors" @@ -198,3 +197,10 @@ func (mp *zkLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (string return string(data), nil } } + +// WaitForNewLeader is part of the topo.LeaderParticipation interface +func (mp *zkLeaderParticipation) WaitForNewLeader(context.Context) (<-chan string, error) { + // This isn't implemented yet, but likely can be implemented in the same way + // as how WatchRecursive could be implemented as well. + return nil, topo.NewError(topo.NoImplementation, "wait for leader not supported in ZK2 topo") +} diff --git a/go/vt/topo/zk2topo/watch.go b/go/vt/topo/zk2topo/watch.go index 0bdedc7d931..06d466d8cef 100644 --- a/go/vt/topo/zk2topo/watch.go +++ b/go/vt/topo/zk2topo/watch.go @@ -17,11 +17,9 @@ limitations under the License. package zk2topo import ( + "context" "fmt" "path" - "sync" - - "context" "vitess.io/vitess/go/vt/vterrors" @@ -29,39 +27,28 @@ import ( ) // Watch is part of the topo.Conn interface. -func (zs *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) { +func (zs *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { zkPath := path.Join(zs.root, filePath) // Get the initial value, set the initial watch - data, stats, watch, err := zs.conn.GetW(ctx, zkPath) + initialCtx, initialCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer initialCancel() + + data, stats, watch, err := zs.conn.GetW(initialCtx, zkPath) if err != nil { - return &topo.WatchData{Err: convertError(err, zkPath)}, nil, nil + return nil, nil, convertError(err, zkPath) } if stats == nil { // No stats --> node doesn't exist. - return &topo.WatchData{Err: topo.NewError(topo.NoNode, zkPath)}, nil, nil + return nil, nil, topo.NewError(topo.NoNode, zkPath) } wd := &topo.WatchData{ Contents: data, Version: ZKVersion(stats.Version), } - // mu protects the stop channel. We need to make sure the 'cancel' - // func can be called multiple times, and that we don't close 'stop' - // too many times. - mu := sync.Mutex{} - stop := make(chan struct{}) - cancel := func() { - mu.Lock() - defer mu.Unlock() - if stop != nil { - close(stop) - stop = nil - } - } - c := make(chan *topo.WatchData, 10) - go func(stop chan struct{}) { + go func() { defer close(c) for { @@ -78,7 +65,7 @@ func (zs *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, return } - case <-stop: + case <-ctx.Done(): // user is not interested any more c <- &topo.WatchData{Err: topo.NewError(topo.Interrupted, "watch")} return @@ -104,7 +91,15 @@ func (zs *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, return } } - }(stop) + }() + + return wd, c, nil +} - return wd, c, cancel +// WatchRecursive is part of the topo.Conn interface. +func (zs *Server) WatchRecursive(_ context.Context, path string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { + // This isn't implemented yet, but potentially can be implemented if we want + // to update the minimum ZooKeeper requirement to 3.6.0 and use recursive watches. 
+ // Also see https://zookeeper.apache.org/doc/r3.6.3/zookeeperProgrammers.html#sc_WatchPersistentRecursive + return nil, nil, topo.NewError(topo.NoImplementation, path) } diff --git a/go/vt/vttablet/customrule/topocustomrule/topocustomrule.go b/go/vt/vttablet/customrule/topocustomrule/topocustomrule.go index 513a2b4f9c2..4ad57bd6a93 100644 --- a/go/vt/vttablet/customrule/topocustomrule/topocustomrule.go +++ b/go/vt/vttablet/customrule/topocustomrule/topocustomrule.go @@ -138,10 +138,11 @@ func (cr *topoCustomRule) oneWatch() error { cr.mu.Unlock() }() - ctx := context.Background() - current, wdChannel, cancel := cr.conn.Watch(ctx, cr.filePath) - if current.Err != nil { - return current.Err + ctx, cancel := context.WithCancel(context.Background()) + current, wdChannel, err := cr.conn.Watch(ctx, cr.filePath) + if err != nil { + cancel() + return err } cr.mu.Lock() diff --git a/go/vt/vttablet/tabletmanager/shard_sync.go b/go/vt/vttablet/tabletmanager/shard_sync.go index 0b257537b4b..41d2c608e0d 100644 --- a/go/vt/vttablet/tabletmanager/shard_sync.go +++ b/go/vt/vttablet/tabletmanager/shard_sync.go @@ -17,11 +17,10 @@ limitations under the License. package tabletmanager import ( + "context" "flag" "time" - "context" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl" @@ -133,7 +132,7 @@ func (tm *TabletManager) shardSyncLoop(ctx context.Context, notifyChan <-chan st // We already have an active watch. Nothing to do. continue } - if err := shardWatch.start(ctx, tm.TopoServer, tablet.Keyspace, tablet.Shard); err != nil { + if err := shardWatch.start(tm.TopoServer, tablet.Keyspace, tablet.Shard); err != nil { log.Errorf("Failed to start shard watch: %v", err) // Start retry timer and go back to sleep. retryChan = time.After(*shardSyncRetryDelay) diff --git a/go/vt/vttablet/tabletmanager/shard_watcher.go b/go/vt/vttablet/tabletmanager/shard_watcher.go index 5a98f8a83fd..d8a6dd2c2dc 100644 --- a/go/vt/vttablet/tabletmanager/shard_watcher.go +++ b/go/vt/vttablet/tabletmanager/shard_watcher.go @@ -25,26 +25,25 @@ import ( type shardWatcher struct { watchChan <-chan *topo.WatchShardData - watchCancel topo.CancelFunc + watchCancel context.CancelFunc } func (sw *shardWatcher) active() bool { return sw.watchChan != nil } -func (sw *shardWatcher) start(ctx context.Context, ts *topo.Server, keyspace, shard string) error { - ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) - defer cancel() - +func (sw *shardWatcher) start(ts *topo.Server, keyspace, shard string) error { log.Infof("Starting shard watch of %v/%v", keyspace, shard) - event, c, watchCancel := ts.WatchShard(ctx, keyspace, shard) - if event.Err != nil { - return event.Err + ctx, cancel := context.WithCancel(context.Background()) + _, c, err := ts.WatchShard(ctx, keyspace, shard) + if err != nil { + cancel() + return err } sw.watchChan = c - sw.watchCancel = watchCancel + sw.watchCancel = cancel return nil }
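To make the new contract concrete, below is a minimal caller-side sketch (not part of this change) of how a consumer drives WatchRecursive and stops it through context cancellation rather than a returned CancelFunc. The package name, the watchKeyspacePrefix helper, and the "keyspaces/" prefix are assumptions chosen purely for illustration.

// Package example is hypothetical; it exists only to illustrate the new API.
package example

import (
	"context"

	"vitess.io/vitess/go/vt/log"
	"vitess.io/vitess/go/vt/topo"
)

// watchKeyspacePrefix (a made-up helper) consumes a recursive watch until the
// passed-in context is canceled or the watch reports an error.
func watchKeyspacePrefix(ctx context.Context, conn topo.Conn) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // stopping the watch is now done by canceling the context

	initial, changes, err := conn.WatchRecursive(ctx, "keyspaces/")
	if err != nil {
		// Backends without recursive watch support (like zk2topo above)
		// return a topo.NoImplementation error here.
		return err
	}
	for _, wd := range initial {
		log.Infof("existing entry %v at version %v", wd.Path, wd.Version)
	}
	for wd := range changes {
		if wd.Err != nil {
			// After cancellation this is topo.Interrupted, and the channel
			// is closed right after the error is delivered.
			return wd.Err
		}
		log.Infof("entry %v changed (%d bytes)", wd.Path, len(wd.Contents))
	}
	return nil
}

The same pattern applies to Watch(): the only difference is that it returns a single initial *WatchData instead of a slice of existing entries.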