Skip to content

Commit

Permalink
Merge pull request #3717 from telepresenceio/thallgren/local-list-bw-…
Browse files Browse the repository at this point in the history
…compat

TM versions <= 2.20.x have a broken WorkloadEventsWatcher
  • Loading branch information
thallgren authored Oct 31, 2024
2 parents d7d3ab2 + ce35b17 commit 7513ffd
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 8 deletions.
18 changes: 11 additions & 7 deletions pkg/client/userd/trafficmgr/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,9 @@ func (s *session) WatchWorkloads(c context.Context, wr *rpc.WatchWorkloadsReques
func (s *session) ensureWatchers(ctx context.Context,
namespaces []string,
) {
v := s.managerVersion
managerHasWatcherSupport := v.Major > 2 || v.Major == 2 && v.Minor > 20

dlog.Debugf(ctx, "Ensure watchers %v", namespaces)
wg := sync.WaitGroup{}
wg.Add(len(namespaces))
Expand All @@ -667,7 +670,13 @@ func (s *session) ensureWatchers(ctx context.Context,
wg.Done()
} else {
go func() {
if err := s.workloadsWatcher(ctx, ns, &wg); err != nil {
var err error
if managerHasWatcherSupport {
err = s.workloadsWatcher(ctx, ns, &wg)
} else {
err = s.localWorkloadsWatcher(ctx, ns, &wg)
}
if err != nil {
dlog.Errorf(ctx, "error ensuring watcher for namespace %s: %v", ns, err)
return
}
Expand Down Expand Up @@ -1178,12 +1187,7 @@ func (s *session) workloadsWatcher(ctx context.Context, namespace string, synced
for ctx.Err() == nil {
wls, err := wlc.Recv()
if err != nil {
if status.Code(err) != codes.Unimplemented {
return err
}
localSynced := synced
synced = nil
return s.localWorkloadsWatcher(ctx, namespace, localSynced)
return err
}

s.workloadsLock.Lock()
Expand Down
12 changes: 11 additions & 1 deletion pkg/workload/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/datawire/dlib/dlog"
"github.com/datawire/k8sapi/pkg/k8sapi"
"github.com/telepresenceio/telepresence/v2/pkg/agentmap"
"github.com/telepresenceio/telepresence/v2/pkg/informer"
)

Expand Down Expand Up @@ -107,6 +108,11 @@ func hasValidReplicasetOwner(wl k8sapi.Workload, rolloutsEnabled bool) bool {
return false
}

var trafficManagerSelector = labels.SelectorFromSet(map[string]string{ //nolint:gochecknoglobals // constant
"app": agentmap.ManagerAppName,
"telepresence": "manager",
})

func (w *watcher) Subscribe(ctx context.Context) <-chan []WorkloadEvent {
ch := make(chan []WorkloadEvent, 1)
initialEvents := make([]WorkloadEvent, 0, 100)
Expand All @@ -116,7 +122,7 @@ func (w *watcher) Subscribe(ctx context.Context) <-chan []WorkloadEvent {
dlog.Debugf(ctx, "workload.Watcher producing initial events for namespace %s", w.namespace)
if dps, err := ai.Deployments().Lister().Deployments(w.namespace).List(labels.Everything()); err == nil {
for _, obj := range dps {
if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) {
if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) && !trafficManagerSelector.Matches(labels.Set(obj.Labels)) {
initialEvents = append(initialEvents, WorkloadEvent{
Type: EventTypeAdd,
Workload: wl,
Expand Down Expand Up @@ -263,6 +269,10 @@ func (w *watcher) addEventHandler(ctx context.Context, ns string) error {
}

func (w *watcher) handleEvent(we WorkloadEvent) {
// Always exclude the traffic-manager
if we.Workload.GetKind() == "Deployment" && trafficManagerSelector.Matches(labels.Set(we.Workload.GetLabels())) {
return
}
w.Lock()
w.events = append(w.events, we)
w.Unlock()
Expand Down

0 comments on commit 7513ffd

Please sign in to comment.