From 5de70fa288637eee4d0f254031988167e889b2a1 Mon Sep 17 00:00:00 2001
From: Thomas Hallgren
Date: Mon, 28 Oct 2024 16:10:46 +0100
Subject: [PATCH] Make client list function use the traffic-manager's
 WatchWorkloads

Let the `telepresence list` command use a workload collection that is
backed by the traffic-manager's `WatchWorkloads` function. The client
will use local shared informers to watch the workloads when connecting
to an older traffic-manager that doesn't support the `WatchWorkloads`
call.

Signed-off-by: Thomas Hallgren
---
 pkg/client/userd/trafficmgr/intercept.go |   1 -
 pkg/client/userd/trafficmgr/session.go   | 355 ++++++++++++++++-------
 pkg/client/userd/trafficmgr/workloads.go | 326 ---------------------
 3 files changed, 256 insertions(+), 426 deletions(-)
 delete mode 100644 pkg/client/userd/trafficmgr/workloads.go

diff --git a/pkg/client/userd/trafficmgr/intercept.go b/pkg/client/userd/trafficmgr/intercept.go
index 5a280098e2..418cc5ffb6 100644
--- a/pkg/client/userd/trafficmgr/intercept.go
+++ b/pkg/client/userd/trafficmgr/intercept.go
@@ -503,7 +503,6 @@ func (s *session) ensureNoInterceptConflict(ir *rpc.CreateInterceptRequest) *rpc
 // only if the returned rpc.InterceptResult is nil. The returned runtime.Object is either nil, indicating a local
 // intercept, or the workload for the intercept.
 func (s *session) CanIntercept(c context.Context, ir *rpc.CreateInterceptRequest) (userd.InterceptInfo, *rpc.InterceptResult) {
-	s.waitForSync(c)
 	spec := ir.Spec
 	if spec.Namespace == "" {
 		spec.Namespace = s.Namespace
diff --git a/pkg/client/userd/trafficmgr/session.go b/pkg/client/userd/trafficmgr/session.go
index 76bd82ea35..f41b84f984 100644
--- a/pkg/client/userd/trafficmgr/session.go
+++ b/pkg/client/userd/trafficmgr/session.go
@@ -17,6 +17,7 @@ import (
 
 	"github.com/blang/semver/v4"
 	"github.com/go-json-experiment/json"
+	"github.com/google/uuid"
 	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
@@ -42,7 +43,6 @@ import (
 	rootdRpc "github.com/telepresenceio/telepresence/rpc/v2/daemon"
 	"github.com/telepresenceio/telepresence/rpc/v2/manager"
 	"github.com/telepresenceio/telepresence/v2/pkg/agentconfig"
-	"github.com/telepresenceio/telepresence/v2/pkg/agentmap"
 	authGrpc "github.com/telepresenceio/telepresence/v2/pkg/authenticator/grpc"
 	"github.com/telepresenceio/telepresence/v2/pkg/authenticator/patcher"
 	"github.com/telepresenceio/telepresence/v2/pkg/client"
@@ -55,9 +55,11 @@ import (
 	"github.com/telepresenceio/telepresence/v2/pkg/client/userd"
 	"github.com/telepresenceio/telepresence/v2/pkg/client/userd/k8s"
 	"github.com/telepresenceio/telepresence/v2/pkg/errcat"
+	"github.com/telepresenceio/telepresence/v2/pkg/informer"
 	"github.com/telepresenceio/telepresence/v2/pkg/matcher"
 	"github.com/telepresenceio/telepresence/v2/pkg/proc"
 	"github.com/telepresenceio/telepresence/v2/pkg/restapi"
+	"github.com/telepresenceio/telepresence/v2/pkg/workload"
 )
 
 type apiServer struct {
@@ -70,6 +72,17 @@ type apiMatcher struct {
 	metadata map[string]string
 }
 
+type workloadInfoKey struct {
+	kind manager.WorkloadInfo_Kind
+	name string
+}
+
+type workloadInfo struct {
+	uid types.UID
+	agentState manager.WorkloadInfo_AgentState
+	interceptClients []string
+}
+
 type session struct {
 	*k8s.Cluster
 	rootDaemon rootdRpc.DaemonClient
@@ -96,7 +109,12 @@ type session struct {
 
 	sessionInfo *manager.SessionInfo // sessionInfo returned by the traffic-manager
 
-	wlWatcher *workloadsAndServicesWatcher
+	workloadsLock sync.Mutex
+
+	// Map of 
manager.WorkloadInfo split into namespace, key of kind and name, and workloadInfo + workloads map[string]map[workloadInfoKey]workloadInfo + + workloadSubscribers map[uuid.UUID]chan struct{} // currentInterceptsLock ensures that all accesses to currentIntercepts, currentMatchers, // currentAPIServers, interceptWaiters, and ingressInfo are synchronized @@ -415,19 +433,6 @@ func connectMgr( managerName = "Traffic Manager" } - knownWorkloadKinds, err := mClient.GetKnownWorkloadKinds(ctx, si) - if err != nil { - if status.Code(err) != codes.Unimplemented { - return nil, fmt.Errorf("failed to get known workload kinds: %w", err) - } - // Talking to an older traffic-manager, use legacy default types - knownWorkloadKinds = &manager.KnownWorkloadKinds{Kinds: []manager.WorkloadInfo_Kind{ - manager.WorkloadInfo_DEPLOYMENT, - manager.WorkloadInfo_REPLICASET, - manager.WorkloadInfo_STATEFULSET, - }} - } - sess := &session{ Cluster: cluster, installID: installID, @@ -438,8 +443,8 @@ func connectMgr( managerName: managerName, managerVersion: managerVersion, sessionInfo: si, + workloads: make(map[string]map[workloadInfoKey]workloadInfo), interceptWaiters: make(map[string]*awaitIntercept), - wlWatcher: newWASWatcher(knownWorkloadKinds), isPodDaemon: cr.IsPodDaemon, done: make(chan struct{}), subnetViaWorkloads: cr.SubnetViaWorkloads, @@ -507,8 +512,6 @@ func connectError(t rpc.ConnectInfo_ErrType, err error) *rpc.ConnectInfo { func (s *session) updateDaemonNamespaces(c context.Context) { const svcDomain = "svc" - s.wlWatcher.setNamespacesToWatch(c, s.GetCurrentNamespaces(true)) - domains := s.GetCurrentNamespaces(false) if !slices.Contains(domains, svcDomain) { domains = append(domains, svcDomain) @@ -572,37 +575,19 @@ func (s *session) ApplyConfig(ctx context.Context) error { // getInfosForWorkloads returns a list of workloads found in the given namespace that fulfils the given filter criteria. 
func (s *session) getInfosForWorkloads( - ctx context.Context, namespaces []string, iMap map[string][]*manager.InterceptInfo, sMap map[string]*rpc.WorkloadInfo_Sidecar, filter rpc.ListRequest_Filter, ) []*rpc.WorkloadInfo { - wiMap := make(map[types.UID]*rpc.WorkloadInfo) - s.wlWatcher.eachWorkload(ctx, k8s.GetManagerNamespace(ctx), namespaces, func(workload k8sapi.Workload) { - name := workload.GetName() - dlog.Debugf(ctx, "Getting info for %s %s.%s, matching service", workload.GetKind(), name, workload.GetNamespace()) - + wiMap := make(map[string]*rpc.WorkloadInfo) + s.eachWorkload(namespaces, func(wlKind manager.WorkloadInfo_Kind, name, namespace string, info workloadInfo) { + kind := wlKind.String() wlInfo := &rpc.WorkloadInfo{ Name: name, - Namespace: workload.GetNamespace(), - WorkloadResourceType: workload.GetKind(), - Uid: string(workload.GetUID()), - } - - svcs, err := agentmap.FindServicesForPod(ctx, workload.GetPodTemplate(), "") - if err == nil && len(svcs) > 0 { - srm := make(map[string]*rpc.WorkloadInfo_ServiceReference, len(svcs)) - for _, so := range svcs { - if svc, ok := k8sapi.ServiceImpl(so); ok { - srm[string(svc.UID)] = &rpc.WorkloadInfo_ServiceReference{ - Name: svc.Name, - Namespace: svc.Namespace, - Ports: getServicePorts(svc), - } - } - } - wlInfo.Services = srm + Namespace: namespace, + WorkloadResourceType: kind, + Uid: string(info.uid), } var ok bool @@ -612,7 +597,7 @@ func (s *session) getInfosForWorkloads( if wlInfo.Sidecar, ok = sMap[name]; !ok && filter <= rpc.ListRequest_INSTALLED_AGENTS { return } - wiMap[workload.GetUID()] = wlInfo + wiMap[fmt.Sprintf("%s:%s.%s", kind, name, namespace)] = wlInfo }) wiz := make([]*rpc.WorkloadInfo, len(wiMap)) i := 0 @@ -624,58 +609,46 @@ func (s *session) getInfosForWorkloads( return wiz } -func getServicePorts(svc *core.Service) []*rpc.WorkloadInfo_ServiceReference_Port { - ports := make([]*rpc.WorkloadInfo_ServiceReference_Port, len(svc.Spec.Ports)) - for i, p := range svc.Spec.Ports { - ports[i] = &rpc.WorkloadInfo_ServiceReference_Port{ - Name: p.Name, - Port: p.Port, - } +func (s *session) WatchWorkloads(c context.Context, wr *rpc.WatchWorkloadsRequest, stream userd.WatchWorkloadsStream) error { + id := uuid.New() + ch := make(chan struct{}) + s.workloadsLock.Lock() + if s.workloadSubscribers == nil { + s.workloadSubscribers = make(map[uuid.UUID]chan struct{}) } - return ports -} + s.workloadSubscribers[id] = ch + s.workloadsLock.Unlock() -func (s *session) waitForSync(ctx context.Context) { - s.wlWatcher.setNamespacesToWatch(ctx, s.GetCurrentNamespaces(true)) - s.wlWatcher.waitForSync(ctx) -} + defer func() { + s.workloadsLock.Lock() + delete(s.workloadSubscribers, id) + s.workloadsLock.Unlock() + }() -func (s *session) WatchWorkloads(c context.Context, wr *rpc.WatchWorkloadsRequest, stream userd.WatchWorkloadsStream) error { - s.waitForSync(c) - s.ensureWatchers(c, wr.Namespaces) - sCtx, sCancel := context.WithCancel(c) - // We need to make sure the subscription ends when we leave this method, since this is the one consuming the snapshotAvailable channel. - // Otherwise, the goroutine that writes to the channel will leak. 
- defer sCancel() - snapshotAvailable := s.wlWatcher.subscribe(sCtx) + send := func() error { + ws, err := s.WorkloadInfoSnapshot(c, wr.Namespaces, rpc.ListRequest_EVERYTHING) + if err != nil { + return err + } + return stream.Send(ws) + } + + // Send initial snapshot + if err := send(); err != nil { + return err + } for { select { - case <-c.Done(): // if context is done (usually the session's context). - return nil - case <-stream.Context().Done(): // if stream context is done. + case <-c.Done(): return nil - case <-snapshotAvailable: - snapshot, err := s.workloadInfoSnapshot(c, wr.GetNamespaces(), rpc.ListRequest_INTERCEPTABLE) - if err != nil { - return status.Errorf(codes.Unavailable, "failed to create WorkloadInfoSnapshot: %v", err) - } - if err := stream.Send(snapshot); err != nil { - dlog.Errorf(c, "WatchWorkloads.Send() failed: %v", err) + case <-ch: + if err := send(); err != nil { return err } } } } -func (s *session) WorkloadInfoSnapshot( - ctx context.Context, - namespaces []string, - filter rpc.ListRequest_Filter, -) (*rpc.WorkloadInfoSnapshot, error) { - s.waitForSync(ctx) - return s.workloadInfoSnapshot(ctx, namespaces, filter) -} - func (s *session) ensureWatchers(ctx context.Context, namespaces []string, ) { @@ -686,21 +659,26 @@ func (s *session) ensureWatchers(ctx context.Context, if ns == "" { ns = s.Namespace } - wgp := &wg - s.wlWatcher.ensureStarted(ctx, ns, func(started bool) { - if started { - dlog.Debugf(ctx, "watchers for %s started", ns) - } - if wgp != nil { - wgp.Done() - wgp = nil - } - }) + s.workloadsLock.Lock() + _, ok := s.workloads[ns] + s.workloadsLock.Unlock() + if ok { + wg.Done() + } else { + go func() { + if err := s.workloadsWatcher(ctx, ns, &wg); err != nil { + dlog.Errorf(ctx, "error ensuring watcher for namespace %s: %v", ns, err) + return + } + }() + dlog.Debugf(ctx, "watchers for %s started", ns) + } } wg.Wait() + dlog.Debugf(ctx, "watchers for %q synced", namespaces) } -func (s *session) workloadInfoSnapshot( +func (s *session) WorkloadInfoSnapshot( ctx context.Context, namespaces []string, filter rpc.ListRequest_Filter, @@ -748,7 +726,7 @@ nextIs: } } - workloadInfos := s.getInfosForWorkloads(ctx, nss, iMap, sMap, filter) + workloadInfos := s.getInfosForWorkloads(nss, iMap, sMap, filter) return &rpc.WorkloadInfoSnapshot{Workloads: workloadInfos}, nil } @@ -908,11 +886,9 @@ func (s *session) Uninstall(ctx context.Context, ur *rpc.UninstallRequest) (*com // to prevent the clients from doing it. 
if ur.UninstallType == rpc.UninstallRequest_NAMED_AGENTS { // must have a valid namespace in order to uninstall named agents - s.waitForSync(ctx) if ur.Namespace == "" { ur.Namespace = s.Namespace } - s.wlWatcher.ensureStarted(ctx, ur.Namespace, nil) namespace := s.ActualNamespace(ur.Namespace) if namespace == "" { // namespace is not mapped @@ -962,11 +938,9 @@ func (s *session) Uninstall(ctx context.Context, ur *rpc.UninstallRequest) (*com } if ur.Namespace != "" { - s.waitForSync(ctx) if ur.Namespace == "" { ur.Namespace = s.Namespace } - s.wlWatcher.ensureStarted(ctx, ur.Namespace, nil) namespace := s.ActualNamespace(ur.Namespace) if namespace == "" { // namespace is not mapped @@ -1071,3 +1045,186 @@ func (s *session) connectRootDaemon(ctx context.Context, nc *rootdRpc.NetworkCon dlog.Debug(ctx, "Connected to root daemon") return rd, nil } + +func (s *session) eachWorkload(namespaces []string, do func(kind manager.WorkloadInfo_Kind, name, namespace string, info workloadInfo)) { + s.workloadsLock.Lock() + for _, ns := range namespaces { + if workloads, ok := s.workloads[ns]; ok { + for key, info := range workloads { + do(key.kind, key.name, ns, info) + } + } + } + s.workloadsLock.Unlock() +} + +func rpcKind(s string) manager.WorkloadInfo_Kind { + switch strings.ToLower(s) { + case "deployment": + return manager.WorkloadInfo_DEPLOYMENT + case "replicaset": + return manager.WorkloadInfo_REPLICASET + case "statefulset": + return manager.WorkloadInfo_STATEFULSET + case "rollout": + return manager.WorkloadInfo_ROLLOUT + default: + return manager.WorkloadInfo_UNSPECIFIED + } +} + +func (s *session) localWorkloadsWatcher(ctx context.Context, namespace string, synced *sync.WaitGroup) error { + defer func() { + if synced != nil { + synced.Done() + } + dlog.Debug(ctx, "client workload watcher ended") + }() + + knownWorkloadKinds, err := s.managerClient.GetKnownWorkloadKinds(ctx, s.sessionInfo) + if err != nil { + if status.Code(err) != codes.Unimplemented { + return fmt.Errorf("failed to get known workload kinds: %w", err) + } + // Talking to an older traffic-manager, use legacy default types + knownWorkloadKinds = &manager.KnownWorkloadKinds{Kinds: []manager.WorkloadInfo_Kind{ + manager.WorkloadInfo_DEPLOYMENT, + manager.WorkloadInfo_REPLICASET, + manager.WorkloadInfo_STATEFULSET, + }} + } + + dlog.Debugf(ctx, "Watching workloads from client due to lack of workload watcher support in traffic-manager %s", s.managerVersion) + fc := informer.GetFactory(ctx, namespace) + if fc == nil { + ctx = informer.WithFactory(ctx, namespace) + fc = informer.GetFactory(ctx, namespace) + } + workload.StartDeployments(ctx, namespace) + workload.StartReplicaSets(ctx, namespace) + workload.StartStatefulSets(ctx, namespace) + kf := fc.GetK8sInformerFactory() + kf.Start(ctx.Done()) + + rolloutsEnabled := slices.Index(knownWorkloadKinds.Kinds, manager.WorkloadInfo_ROLLOUT) >= 0 + if rolloutsEnabled { + workload.StartRollouts(ctx, namespace) + af := fc.GetArgoRolloutsInformerFactory() + af.Start(ctx.Done()) + } + + ww, err := workload.NewWatcher(ctx, namespace, rolloutsEnabled) + if err != nil { + workload.StartRollouts(ctx, namespace) + return err + } + kf.WaitForCacheSync(ctx.Done()) + + wlCh := ww.Subscribe(ctx) + for { + select { + case <-ctx.Done(): + return nil + case wls := <-wlCh: + if wls == nil { + return nil + } + s.workloadsLock.Lock() + workloads, ok := s.workloads[namespace] + if !ok { + workloads = make(map[workloadInfoKey]workloadInfo) + s.workloads[namespace] = workloads + } + for _, we := range wls { 
+ w := we.Workload + key := workloadInfoKey{kind: rpcKind(w.GetKind()), name: w.GetName()} + if we.Type == workload.EventTypeDelete { + delete(workloads, key) + } else { + workloads[key] = workloadInfo{ + uid: w.GetUID(), + } + } + } + for _, subscriber := range s.workloadSubscribers { + select { + case subscriber <- struct{}{}: + default: + } + } + s.workloadsLock.Unlock() + if synced != nil { + synced.Done() + synced = nil + } + } + } +} + +func (s *session) workloadsWatcher(ctx context.Context, namespace string, synced *sync.WaitGroup) error { + defer func() { + if synced != nil { + synced.Done() + } + }() + wlc, err := s.managerClient.WatchWorkloads(ctx, &manager.WorkloadEventsRequest{SessionInfo: s.sessionInfo, Namespace: namespace}) + if err != nil { + return err + } + + for ctx.Err() == nil { + wls, err := wlc.Recv() + if err != nil { + if status.Code(err) != codes.Unimplemented { + return err + } + localSynced := synced + synced = nil + return s.localWorkloadsWatcher(ctx, namespace, localSynced) + } + + s.workloadsLock.Lock() + workloads, ok := s.workloads[namespace] + if !ok { + workloads = make(map[workloadInfoKey]workloadInfo) + s.workloads[namespace] = workloads + } + + for _, we := range wls.GetEvents() { + w := we.Workload + key := workloadInfoKey{kind: w.Kind, name: w.Name} + if we.Type == manager.WorkloadEvent_DELETED { + delete(workloads, key) + } else { + if w.State == manager.WorkloadInfo_AVAILABLE { + var clients []string + if lc := len(w.InterceptClients); lc > 0 { + clients = make([]string, lc) + for i, ic := range w.InterceptClients { + clients[i] = ic.Client + } + } + workloads[key] = workloadInfo{ + uid: types.UID(w.Uid), + agentState: w.AgentState, + interceptClients: clients, + } + } else { + delete(workloads, key) + } + } + } + for _, subscriber := range s.workloadSubscribers { + select { + case subscriber <- struct{}{}: + default: + } + } + s.workloadsLock.Unlock() + if synced != nil { + synced.Done() + synced = nil + } + } + return nil +} diff --git a/pkg/client/userd/trafficmgr/workloads.go b/pkg/client/userd/trafficmgr/workloads.go deleted file mode 100644 index 7c577c8327..0000000000 --- a/pkg/client/userd/trafficmgr/workloads.go +++ /dev/null @@ -1,326 +0,0 @@ -package trafficmgr - -import ( - "context" - "slices" - "sort" - "sync" - "time" - - core "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/cache" - - "github.com/datawire/dlib/dlog" - "github.com/datawire/k8sapi/pkg/k8sapi" - "github.com/telepresenceio/telepresence/rpc/v2/manager" -) - -type workloadsAndServicesWatcher struct { - sync.Mutex - wlKinds []manager.WorkloadInfo_Kind - nsWatchers map[string]*namespacedWASWatcher - nsListeners []func() - cond sync.Cond -} - -const ( - deployments = 0 - replicasets = 1 - statefulsets = 2 - rollouts = 3 -) - -// namespacedWASWatcher is watches Workloads And Services (WAS) for a namespace. -type namespacedWASWatcher struct { - svcWatcher *k8sapi.Watcher[*core.Service] - wlWatchers [4]*k8sapi.Watcher[runtime.Object] -} - -// svcEquals compare only the Service fields that are of interest to Telepresence. 
They are -// -// - UID -// - Name -// - Namespace -// - Spec.Ports -// - Spec.Type -func svcEquals(a, b *core.Service) bool { - aPorts := a.Spec.Ports - bPorts := b.Spec.Ports - if len(aPorts) != len(bPorts) { - return false - } - if a.UID != b.UID || a.Name != b.Name || a.Namespace != b.Namespace || a.Spec.Type != b.Spec.Type { - return false - } -nextMP: - // order is not significant (nor can it be trusted) when comparing - for _, mp := range aPorts { - for _, op := range bPorts { - if mp == op { - continue nextMP - } - } - return false - } - return true -} - -// workloadEquals compare only the workload (Deployment, ResourceSet, or StatefulSet) fields that are of interest to Telepresence. They are -// -// - UID -// - Name -// - Namespace -// - Spec.Template: -// - Labels -// - Containers (must contain an equal number of equally named containers with equal ports) -func workloadEquals(oa, ob runtime.Object) bool { - a, err := k8sapi.WrapWorkload(oa) - if err != nil { - // This should definitely never happen - panic(err) - } - b, err := k8sapi.WrapWorkload(ob) - if err != nil { - // This should definitely never happen - panic(err) - } - if a.GetUID() != b.GetUID() || a.GetName() != b.GetName() || a.GetNamespace() != b.GetNamespace() { - return false - } - - aSpec := a.GetPodTemplate() - bSpec := b.GetPodTemplate() - if !labels.Equals(aSpec.Labels, bSpec.Labels) { - return false - } - aPod := aSpec.Spec - bPod := bSpec.Spec - if len(aPod.Containers) != len(bPod.Containers) { - return false - } - makeContainerMap := func(cs []core.Container) map[string]*core.Container { - m := make(map[string]*core.Container, len(cs)) - for i := range cs { - c := &cs[i] - m[c.Name] = c - } - return m - } - - portsEqual := func(a, b []core.ContainerPort) bool { - if len(a) != len(b) { - return false - } - nextAP: - for _, ap := range a { - for _, bp := range b { - if ap == bp { - continue nextAP - } - } - return false - } - return true - } - - am := makeContainerMap(aPod.Containers) - bm := makeContainerMap(bPod.Containers) - for n, ac := range am { - bc, ok := bm[n] - if !ok { - return false - } - if !portsEqual(ac.Ports, bc.Ports) { - return false - } - } - return true -} - -func newNamespaceWatcher(c context.Context, namespace string, cond *sync.Cond, wlKinds []manager.WorkloadInfo_Kind) *namespacedWASWatcher { - dlog.Debugf(c, "newNamespaceWatcher %s", namespace) - ki := k8sapi.GetJoinedClientSetInterface(c) - appsGetter, rolloutsGetter := ki.AppsV1().RESTClient(), ki.ArgoprojV1alpha1().RESTClient() - w := &namespacedWASWatcher{ - svcWatcher: k8sapi.NewWatcher("services", ki.CoreV1().RESTClient(), cond, k8sapi.WithEquals(svcEquals), k8sapi.WithNamespace[*core.Service](namespace)), - wlWatchers: [4]*k8sapi.Watcher[runtime.Object]{ - k8sapi.NewWatcher("deployments", appsGetter, cond, k8sapi.WithEquals(workloadEquals), k8sapi.WithNamespace[runtime.Object](namespace)), - k8sapi.NewWatcher("replicasets", appsGetter, cond, k8sapi.WithEquals(workloadEquals), k8sapi.WithNamespace[runtime.Object](namespace)), - k8sapi.NewWatcher("statefulsets", appsGetter, cond, k8sapi.WithEquals(workloadEquals), k8sapi.WithNamespace[runtime.Object](namespace)), - nil, - }, - } - if slices.Contains(wlKinds, manager.WorkloadInfo_ROLLOUT) { - w.wlWatchers[rollouts] = k8sapi.NewWatcher("rollouts", rolloutsGetter, cond, k8sapi.WithEquals(workloadEquals), k8sapi.WithNamespace[runtime.Object](namespace)) - } - return w -} - -func (nw *namespacedWASWatcher) cancel() { - nw.svcWatcher.Cancel() - for _, w := range nw.wlWatchers { - if w != 
nil { - w.Cancel() - } - } -} - -func (nw *namespacedWASWatcher) hasSynced() bool { - return nw.svcWatcher.HasSynced() && - nw.wlWatchers[deployments].HasSynced() && - nw.wlWatchers[replicasets].HasSynced() && - nw.wlWatchers[statefulsets].HasSynced() && - (nw.wlWatchers[rollouts] == nil || nw.wlWatchers[rollouts].HasSynced()) -} - -func newWASWatcher(knownWorkloadKinds *manager.KnownWorkloadKinds) *workloadsAndServicesWatcher { - w := &workloadsAndServicesWatcher{ - wlKinds: knownWorkloadKinds.Kinds, - nsWatchers: make(map[string]*namespacedWASWatcher), - } - w.cond.L = &w.Mutex - return w -} - -// eachWorkload will iterate over the workloads in the current snapshot. Unless namespace -// is the empty string, the iteration is limited to the workloads matching that namespace. -// The traffic-manager workload is excluded. -func (w *workloadsAndServicesWatcher) eachWorkload(c context.Context, tmns string, namespaces []string, f func(workload k8sapi.Workload)) { - if len(namespaces) != 1 { - // Produce workloads in a predictable order - nss := make([]string, len(namespaces)) - copy(nss, namespaces) - sort.Strings(nss) - for _, n := range nss { - w.eachWorkload(c, tmns, []string{n}, f) - } - } else { - ns := namespaces[0] - w.Lock() - nw, ok := w.nsWatchers[ns] - w.Unlock() - if ok { - for _, wlw := range nw.wlWatchers { - if wlw == nil { - continue - } - wls, err := wlw.List(c) - if err != nil { - dlog.Errorf(c, "error listing workloads: %v", err) - return - } - - nextWorkload: - for _, ro := range wls { - wl, err := k8sapi.WrapWorkload(ro) - if err != nil { - dlog.Errorf(c, "error wrapping runtime object as a workload: %v", err) - return - } - - // Exclude workloads that are owned by a supported workload. - for _, or := range wl.GetOwnerReferences() { - if or.Controller != nil && *or.Controller { - switch or.Kind { - case "Deployment", "ReplicaSet", "StatefulSet": - continue nextWorkload - case "Rollout": - if slices.Contains(w.wlKinds, manager.WorkloadInfo_ROLLOUT) { - continue nextWorkload - } - } - } - } - // If this is our traffic-manager namespace, then exclude the traffic-manager service. - lbs := wl.GetLabels() - if !(ns == tmns && lbs["app"] == "traffic-manager" && lbs["telepresence"] == "manager") { - f(wl) - } - } - } - } - } -} - -func (w *workloadsAndServicesWatcher) waitForSync(c context.Context) { - hss := make([]cache.InformerSynced, len(w.nsWatchers)) - w.Lock() - i := 0 - for _, nw := range w.nsWatchers { - hss[i] = nw.hasSynced - i++ - } - w.Unlock() - - hasSynced := true - for _, hs := range hss { - if !hs() { - hasSynced = false - break - } - } - if !hasSynced { - // Waiting for cache sync will sometimes block, so a timeout is necessary here - c, cancel := context.WithTimeout(c, 5*time.Second) - defer cancel() - cache.WaitForCacheSync(c.Done(), hss...) - } -} - -// subscribe writes to the given channel whenever relevant information has changed -// in the current snapshot. -func (w *workloadsAndServicesWatcher) subscribe(c context.Context) <-chan struct{} { - return k8sapi.Subscribe(c, &w.cond) -} - -// setNamespacesToWatch starts new watchers or kills old ones to make the current -// set of watchers reflect the nss argument. 
-func (w *workloadsAndServicesWatcher) setNamespacesToWatch(c context.Context, nss []string) { - var adds []string - desired := make(map[string]struct{}) - - w.Lock() - for _, ns := range nss { - desired[ns] = struct{}{} - if _, ok := w.nsWatchers[ns]; !ok { - adds = append(adds, ns) - } - } - for ns, nw := range w.nsWatchers { - if _, ok := desired[ns]; !ok { - delete(w.nsWatchers, ns) - nw.cancel() - } - } - for _, ns := range adds { - w.addNSLocked(c, ns) - } - w.Unlock() -} - -func (w *workloadsAndServicesWatcher) addNSLocked(c context.Context, ns string) *namespacedWASWatcher { - nw := newNamespaceWatcher(c, ns, &w.cond, w.wlKinds) - w.nsWatchers[ns] = nw - for _, l := range w.nsListeners { - nw.svcWatcher.AddStateListener(&k8sapi.StateListener{Cb: l}) - } - return nw -} - -func (w *workloadsAndServicesWatcher) ensureStarted(c context.Context, ns string, cb func(bool)) { - w.Lock() - defer w.Unlock() - nw, ok := w.nsWatchers[ns] - if !ok { - nw = w.addNSLocked(c, ns) - } - // Starting the svcWatcher will set it to active and also trigger its state listener - // which means a) that the set of active namespaces will change, and b) that the - // WatchAgentsNS will restart with that namespace included. - err := nw.svcWatcher.EnsureStarted(c, cb) - if err != nil { - dlog.Errorf(c, "error starting service watchers: %s", err) - } -}
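
For reviewers who want to see the two mechanisms the new session code relies on in isolation, the sketch below is a small, self-contained Go program. It is illustrative only and not part of the patch; the names notifier, watch, remote and local are hypothetical stand-ins. It shows the fallback to a client-side watcher when the traffic-manager answers codes.Unimplemented for WatchWorkloads, and the non-blocking fan-out to subscriber channels that lets a slow `telepresence list` stream coalesce change ticks without ever blocking the watcher.

package main

import (
	"context"
	"fmt"
	"sync"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// notifier mirrors the role of the session's workloadSubscribers map: each
// subscriber owns a 1-buffered channel, and notify uses a non-blocking send
// so a slow consumer only coalesces ticks instead of stalling the watcher.
type notifier struct {
	mu   sync.Mutex
	subs map[int]chan struct{}
	next int
}

func (n *notifier) subscribe() (<-chan struct{}, func()) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.subs == nil {
		n.subs = make(map[int]chan struct{})
	}
	id := n.next
	n.next++
	ch := make(chan struct{}, 1)
	n.subs[id] = ch
	return ch, func() { // unsubscribe, as the deferred cleanup in WatchWorkloads does
		n.mu.Lock()
		delete(n.subs, id)
		n.mu.Unlock()
	}
}

func (n *notifier) notify() {
	n.mu.Lock()
	defer n.mu.Unlock()
	for _, ch := range n.subs {
		select {
		case ch <- struct{}{}:
		default: // a tick is already pending for this subscriber; nothing to do
		}
	}
}

// watch prefers the remote stream and only falls back to the local watcher
// when the server reports codes.Unimplemented, i.e. an older traffic-manager.
func watch(ctx context.Context, remote, local func(context.Context) error) error {
	if err := remote(ctx); status.Code(err) != codes.Unimplemented {
		return err // nil on a clean shutdown, or a real error
	}
	return local(ctx)
}

func main() {
	n := &notifier{}
	ch, unsubscribe := n.subscribe()
	defer unsubscribe()

	n.notify()
	n.notify() // coalesced with the previous tick
	<-ch
	fmt.Println("got a workload-change tick")

	err := watch(context.Background(),
		func(context.Context) error { return status.Error(codes.Unimplemented, "WatchWorkloads not supported") },
		func(context.Context) error { fmt.Println("falling back to local informers"); return nil },
	)
	fmt.Println("watch finished:", err)
}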