Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve topo handling and add additional functionality #10906

Merged
merged 7 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions go/vt/srvtopo/watch_srvkeyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,20 @@ func (k *srvKeyspaceKey) String() string {
func NewSrvKeyspaceWatcher(topoServer *topo.Server, counts *stats.CountersWithSingleLabel, cacheRefresh, cacheTTL time.Duration) *SrvKeyspaceWatcher {
watch := func(ctx context.Context, entry *watchEntry) {
key := entry.key.(*srvKeyspaceKey)
current, changes, cancel := topoServer.WatchSrvKeyspace(context.Background(), key.cell, key.keyspace)
watchCtx, watchCancel := context.WithCancel(ctx)
defer watchCancel()

current, changes, err := topoServer.WatchSrvKeyspace(watchCtx, key.cell, key.keyspace)
if err != nil {
entry.update(ctx, nil, err, true)
return
}

entry.update(ctx, current.Value, current.Err, true)
if current.Err != nil {
return
}

defer cancel()
for c := range changes {
entry.update(ctx, c.Value, c.Err, false)
if c.Err != nil {
Expand Down
9 changes: 8 additions & 1 deletion go/vt/srvtopo/watch_srvvschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@ func (k cellName) String() string {
func NewSrvVSchemaWatcher(topoServer *topo.Server, counts *stats.CountersWithSingleLabel, cacheRefresh, cacheTTL time.Duration) *SrvVSchemaWatcher {
watch := func(ctx context.Context, entry *watchEntry) {
key := entry.key.(cellName)
current, changes, cancel := topoServer.WatchSrvVSchema(context.Background(), key.String())
ctx, cancel := context.WithCancel(ctx)
defer cancel()

current, changes, err := topoServer.WatchSrvVSchema(ctx, key.String())
if err != nil {
entry.update(ctx, nil, err, true)
return
}

entry.update(ctx, current.Value, current.Err, true)
if current.Err != nil {
Expand Down
78 changes: 66 additions & 12 deletions go/vt/topo/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ limitations under the License.
package topo

import (
"sort"

"context"
"sort"
)

// Conn defines the interface that must be implemented by topology
Expand Down Expand Up @@ -120,17 +119,13 @@ type Conn interface {

// Watch starts watching a file in the provided cell. It
// returns the current value, a 'changes' channel to read the
// changes from, and a 'cancel' function to call to stop the
// watch. If the initial read fails, or the file doesn't
// exist, current.Err is set, and 'changes'/'cancel' are nil.
// Otherwise current.Err is nil, and current.Contents /
// current.Version are accurate. The provided context is only
// used to setup the current watch, and not after Watch()
// returns.
// changes from, and an error.
// If the initial read fails, or the file doesn't
// exist, an error is returned.
//
// To stop the watch, just call the returned 'cancel' function.
// To stop the watch, cancel the provided context.
// This will eventually result in a final WatchData result with Err =
// ErrInterrupted. It should be safe to call the 'cancel' function
// ErrInterrupted. It should be safe to cancel the context
// multiple times, or after the Watch already errored out.
//
// The 'changes' channel may return a record with Err != nil.
Expand Down Expand Up @@ -158,7 +153,45 @@ type Conn interface {
// being correct quickly, as long as it eventually gets there.
//
// filePath is a path relative to the root directory of the cell.
Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, cancel CancelFunc)
Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, err error)

Comment on lines +156 to +157
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a breaking change requiring release-notes changes. My understanding is that the users can have their own implementation for the topo server and therefore any change to the interface is breaking.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to worry about this. Unlike vindexes, we don't expect custom topo server implementations.
The interface is exported because it has to be, but that doesn't mean that every exported interface is part of the "public" interface for the vitess codebase.

// WatchRecursive starts watching a file prefix in the provided cell. It
// returns all the current values for existing files with the given
// prefix, a 'changes' channel to read the changes from and an error.
//
// The provided context should be canceled when stopping WatchRecursive().
// This API is different from Watch() and Watch() will be changed
// to match this API as well in the future.
//
// Canceling will eventually result in a final WatchDataRecursive result with Err =
// ErrInterrupted.
//
// The 'changes' channel may return a record with Err != nil.
// In that case, the channel will also be closed right after
// that record. In any case, 'changes' has to be drained of
// all events, even when 'stop' is closed.
//
// Note the 'changes' channel can return twice the same
// Version/Contents (for instance, if the watch is interrupted
// and restarted within the Conn implementation).
// Similarly, the 'changes' channel may skip versions / changes
// (that is, if value goes [A, B, C, D, E, F], the watch may only
// receive [A, B, F]). This should only happen for rapidly
// changing values though. Usually, the initial value will come
// back right away. And a stable value (that hasn't changed for
// a while) should be seen shortly.
//
// The WatchRecursive call is not guaranteed to return exactly up to
// date data right away. For instance, if a file is created
// and saved, and then a watch is set on that file, it may
// return ErrNoNode (as the underlying configuration service
// may use asynchronous caches that are not up to date
// yet). The only guarantee is that the watch data will
// eventually converge. Vitess doesn't explicitly depend on the data
// being correct quickly, as long as it eventually gets there.
//
// path is a path relative to the root directory of the cell.
WatchRecursive(ctx context.Context, path string) ([]*WatchDataRecursive, <-chan *WatchDataRecursive, error)

//
// Leader election methods. This is meant to have a small
Expand Down Expand Up @@ -276,6 +309,17 @@ type WatchData struct {
Err error
}

// WatchDataRecursive is the structure returned by the WatchRecursive() API.
// It contains the same data as WatchData, but additionally also the specific
// path of the entry that the recursive watch applies to, since an entire
// file prefix can be watched.
type WatchDataRecursive struct {
// Path is the path that has changed
Path string

WatchData
}

// KVInfo is a structure that contains a generic key/value pair from
// the topo server, along with important metadata about it.
// This should be used to provide multiple entries in List like calls
Expand Down Expand Up @@ -342,4 +386,14 @@ type LeaderParticipation interface {
// GetCurrentLeaderID returns the current primary id.
// This may not work after Stop has been called.
GetCurrentLeaderID(ctx context.Context) (string, error)

// WaitForNewLeader allows for nodes to wait until a leadership
// election cycle completes and to get subsequent updates of
// leadership changes. This way logic that needs to know if leadership
// changes also if we're not the leader ourselves doesn't need to
// poll for leadership status.
//
// For topo implementation that have this, it can be used more
// efficiently than needing a busy wait loop.
WaitForNewLeader(ctx context.Context) (<-chan string, error)
}
11 changes: 9 additions & 2 deletions go/vt/topo/consultopo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ limitations under the License.
package consultopo

import (
"path"

"context"
"path"

"github.com/hashicorp/consul/api"

Expand Down Expand Up @@ -133,3 +132,11 @@ func (mp *consulLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (st
}
return string(pair.Value), nil
}

// WaitForNewLeader is part of the topo.LeaderParticipation interface
func (mp *consulLeaderParticipation) WaitForNewLeader(context.Context) (<-chan string, error) {
// This isn't implemented yet, but likely can be implemented using List
// with blocking logic on election path.
// See also how WatchRecursive could be implemented as well.
return nil, topo.NewError(topo.NoImplementation, "wait for leader not supported in Consul topo")
}
36 changes: 23 additions & 13 deletions go/vt/topo/consultopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ limitations under the License.
package consultopo

import (
"context"
"flag"
"path"
"time"

"context"

"github.com/hashicorp/consul/api"

"vitess.io/vitess/go/vt/topo"
Expand All @@ -33,16 +32,21 @@ var (
)

// Watch is part of the topo.Conn interface.
func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) {
func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) {
// Initial get.
nodePath := path.Join(s.root, filePath)
pair, _, err := s.kv.Get(nodePath, nil)
options := &api.QueryOptions{}

initialCtx, initialCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer initialCancel()

pair, _, err := s.kv.Get(nodePath, options.WithContext(initialCtx))
if err != nil {
return &topo.WatchData{Err: err}, nil, nil
return nil, nil, err
}
if pair == nil {
// Node doesn't exist.
return &topo.WatchData{Err: topo.NewError(topo.NoNode, nodePath)}, nil, nil
return nil, nil, topo.NewError(topo.NoNode, nodePath)
}

// Initial value to return.
Expand All @@ -51,9 +55,6 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
Version: ConsulVersion(pair.ModifyIndex),
}

// Create a context, will be used to cancel the watch.
watchCtx, watchCancel := context.WithCancel(context.Background())

// Create the notifications channel, send updates to it.
notifications := make(chan *topo.WatchData, 10)
go func() {
Expand Down Expand Up @@ -83,7 +84,7 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
// This essentially uses WaitTime as a heartbeat interval to detect
// a dead connection.
cancelGetCtx()
getCtx, cancelGetCtx = context.WithTimeout(watchCtx, 2*opts.WaitTime)
getCtx, cancelGetCtx = context.WithTimeout(ctx, 2*opts.WaitTime)

pair, _, err = s.kv.Get(nodePath, opts.WithContext(getCtx))
if err != nil {
Expand Down Expand Up @@ -114,9 +115,9 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <

// See if the watch was canceled.
select {
case <-watchCtx.Done():
case <-ctx.Done():
notifications <- &topo.WatchData{
Err: convertError(watchCtx.Err(), nodePath),
Err: convertError(ctx.Err(), nodePath),
}
cancelGetCtx()
return
Expand All @@ -125,5 +126,14 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
}
}()

return wd, notifications, topo.CancelFunc(watchCancel)
return wd, notifications, nil
}

// WatchRecursive is part of the topo.Conn interface.
func (s *Server) WatchRecursive(_ context.Context, path string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) {
// This isn't implemented yet, but likely can be implemented using List
// with blocking logic like how we use Get with blocking for regular Watch.
// See also how https://www.consul.io/docs/dynamic-app-config/watches#keyprefix
// works under the hood.
return nil, nil, topo.NewError(topo.NoImplementation, path)
}
73 changes: 70 additions & 3 deletions go/vt/topo/etcd2topo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ limitations under the License.
package etcd2topo

import (
"path"

"context"
"path"

clientv3 "go.etcd.io/etcd/client/v3"

Expand Down Expand Up @@ -76,7 +75,11 @@ func (mp *etcdLeaderParticipation) WaitForLeadership() (context.Context, error)
// we just cancel that context.
lockCtx, lockCancel := context.WithCancel(context.Background())
go func() {
<-mp.stop
select {
case <-mp.s.running:
return
case <-mp.stop:
}
if ld != nil {
if err := ld.Unlock(context.Background()); err != nil {
log.Errorf("failed to unlock electionPath %v: %v", electionPath, err)
Expand Down Expand Up @@ -123,3 +126,67 @@ func (mp *etcdLeaderParticipation) GetCurrentLeaderID(ctx context.Context) (stri
}
return string(resp.Kvs[0].Value), nil
}

func (mp *etcdLeaderParticipation) WaitForNewLeader(ctx context.Context) (<-chan string, error) {
electionPath := path.Join(mp.s.root, electionsPath, mp.name)

notifications := make(chan string, 8)
ctx, cancel := context.WithCancel(ctx)

// Get the current leader
initial, err := mp.s.cli.Get(ctx, electionPath+"/",
clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByModRevision, clientv3.SortAscend),
clientv3.WithLimit(1))
if err != nil {
cancel()
return nil, err
}

if len(initial.Kvs) == 1 {
leader := initial.Kvs[0].Value
notifications <- string(leader)
}

// Create the Watcher. We start watching from the response we
// got, not from the file original version, as the server may
// not have that much history.
watcher := mp.s.cli.Watch(ctx, electionPath, clientv3.WithPrefix(), clientv3.WithRev(initial.Header.Revision))
if watcher == nil {
cancel()
return nil, convertError(err, electionPath)
}

go func() {
defer cancel()
defer close(notifications)
for {
select {
case <-mp.s.running:
return
case <-mp.done:
return
case <-ctx.Done():
return
case wresp, ok := <-watcher:
if !ok || wresp.Canceled {
return
}

currentLeader, err := mp.s.cli.Get(ctx, electionPath+"/",
clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByModRevision, clientv3.SortAscend),
clientv3.WithLimit(1))
if err != nil {
continue
}
if len(currentLeader.Kvs) != 1 {
continue
}
notifications <- string(currentLeader.Kvs[0].Value)
}
}
}()

return notifications, nil
}
8 changes: 6 additions & 2 deletions go/vt/topo/etcd2topo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,15 @@ type Server struct {

// root is the root path for this client.
root string

running chan struct{}
}

// Close implements topo.Server.Close.
// It will nil out the global and cells fields, so any attempt to
// re-use this server will panic.
func (s *Server) Close() {
close(s.running)
s.cli.Close()
s.cli = nil
}
Expand Down Expand Up @@ -140,8 +143,9 @@ func NewServerWithOpts(serverAddr, root, certPath, keyPath, caPath string) (*Ser
}

return &Server{
cli: cli,
root: root,
cli: cli,
root: root,
running: make(chan struct{}),
}, nil
}

Expand Down
Loading