diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go index fd9830edb35..868a2c53287 100644 --- a/go/vt/topo/memorytopo/election.go +++ b/go/vt/topo/memorytopo/election.go @@ -153,9 +153,7 @@ func (mp *cLeaderParticipation) WaitForNewLeader(ctx context.Context) (<-chan st } notifications := make(chan string, 8) - watchIndex := nextWatchIndex - nextWatchIndex++ - n.watches[watchIndex] = watch{lock: notifications} + watchIndex := n.addWatch(watch{lock: notifications}) if n.lock != nil { notifications <- n.lockContents diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index cdad2ddbcdd..504f1d4bd39 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -49,10 +49,6 @@ const ( UnreachableServerAddr = "unreachable" ) -var ( - nextWatchIndex = 0 -) - // Factory is a memory-based implementation of topo.Factory. It // takes a file-system like approach, with directories at each level // being an actual directory node. This is meant to be closer to @@ -206,6 +202,20 @@ func (n *node) propagateRecursiveWatch(ev *topo.WatchDataRecursive) { } } +var ( + nextWatchIndex = 0 + nextWatchIndexMu sync.Mutex +) + +func (n *node) addWatch(w watch) int { + nextWatchIndexMu.Lock() + defer nextWatchIndexMu.Unlock() + watchIndex := nextWatchIndex + nextWatchIndex++ + n.watches[watchIndex] = w + return watchIndex +} + // PropagateWatchError propagates the given error to all watches on this node // and recursively applies to all children func (n *node) PropagateWatchError(err error) { diff --git a/go/vt/topo/memorytopo/watch.go b/go/vt/topo/memorytopo/watch.go index 14cb20bc09d..73b2d248434 100644 --- a/go/vt/topo/memorytopo/watch.go +++ b/go/vt/topo/memorytopo/watch.go @@ -50,9 +50,7 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c } notifications := make(chan *topo.WatchData, 100) - watchIndex := nextWatchIndex - nextWatchIndex++ - n.watches[watchIndex] = watch{contents: notifications} + watchIndex := n.addWatch(watch{contents: notifications}) go func() { <-ctx.Done() @@ -105,9 +103,7 @@ func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Watc }) notifications := make(chan *topo.WatchDataRecursive, 100) - watchIndex := nextWatchIndex - nextWatchIndex++ - n.watches[watchIndex] = watch{recursive: notifications} + watchIndex := n.addWatch(watch{recursive: notifications}) go func() { defer close(notifications)