diff --git a/charts/telepresence/templates/deployment.yaml b/charts/telepresence/templates/deployment.yaml index 3b2c299559..676a587efe 100644 --- a/charts/telepresence/templates/deployment.yaml +++ b/charts/telepresence/templates/deployment.yaml @@ -242,6 +242,12 @@ spec: {{- end }} {{- end }} {{- end }} + {{- with .compatibility }} + {{- if .version }} + - name: COMPATIBILITY_VERSION + value: {{ .version }} + {{- end }} + {{- end }} {{- if and .trafficManager .trafficManager.envTemplate }} {{- template "traffic-manager-env" . }} {{- end }} diff --git a/charts/telepresence/values.yaml b/charts/telepresence/values.yaml index 0fbd2b8701..dd85245a98 100644 --- a/charts/telepresence/values.yaml +++ b/charts/telepresence/values.yaml @@ -349,3 +349,12 @@ client: workloads: argoRollouts: enabled: false + +# Use for testing only. +compatibility: + # Controls the enablement of features more recent than the given version. Only applicable + # for versions 2.18.0 and up, and only recognized by versions 2.21.0 and up. In other words, + # you can make a 2.21.0 version behave as far back as a 2.18.0 version, but you cannot + # alter the behavior of versions earlier than 2.21.0. + # + # version: 2.19.0 \ No newline at end of file diff --git a/cmd/traffic/cmd/manager/managerutil/envconfig.go b/cmd/traffic/cmd/manager/managerutil/envconfig.go index c261f5d08c..77cc0906e3 100644 --- a/cmd/traffic/cmd/manager/managerutil/envconfig.go +++ b/cmd/traffic/cmd/manager/managerutil/envconfig.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/blang/semver/v4" "github.com/go-json-experiment/json" core "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -70,6 +71,9 @@ type Env struct { ClientConnectionTTL time.Duration `env:"CLIENT_CONNECTION_TTL, parser=time.ParseDuration"` ArgoRolloutsEnabled bool `env:"ARGO_ROLLOUTS_ENABLED, parser=bool, default=false"` + + // For testing only + CompatibilityVersion *semver.Version `env:"COMPATIBILITY_VERSION, parser=version, default="` } func (e *Env) GeneratorConfig(qualifiedAgentImage string) (agentmap.GeneratorConfig, error) { @@ -116,7 +120,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return uint16(pn), err }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.SetUint(uint64(src.(uint16))) }, + Setter: func(dst reflect.Value, src any) { dst.SetUint(uint64(src.(uint16))) }, } fhs[reflect.TypeOf(k8sapi.AppProtocolStrategy(0))] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -124,7 +128,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return k8sapi.NewAppProtocolStrategy(str) }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.SetInt(int64(src.(k8sapi.AppProtocolStrategy))) }, + Setter: func(dst reflect.Value, src any) { dst.SetInt(int64(src.(k8sapi.AppProtocolStrategy))) }, } fhs[reflect.TypeOf(agentconfig.InjectPolicy(0))] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -132,7 +136,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return agentconfig.NewEnablePolicy(str) }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.SetInt(int64(src.(agentconfig.InjectPolicy))) }, + Setter: func(dst reflect.Value, src any) { dst.SetInt(int64(src.(agentconfig.InjectPolicy))) }, } fhs[reflect.TypeOf(resource.Quantity{})] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -140,7 +144,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return resource.ParseQuantity(str) }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.Set(reflect.ValueOf(src.(resource.Quantity))) }, + Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.(resource.Quantity))) }, } fhs[reflect.TypeOf(netip.Addr{})] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -148,7 +152,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return netip.ParseAddr(str) }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.Set(reflect.ValueOf(src.(netip.Addr))) }, + Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.(netip.Addr))) }, } fhs[reflect.TypeOf([]string{})] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -163,7 +167,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return ss, nil }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.Set(reflect.ValueOf(src.([]string))) }, + Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.([]string))) }, } fhs[reflect.TypeOf([]netip.Prefix{})] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -182,7 +186,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return ns, nil }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.Set(reflect.ValueOf(src.([]netip.Prefix))) }, + Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.([]netip.Prefix))) }, } fhs[reflect.TypeOf([]core.LocalObjectReference{})] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -197,7 +201,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return rr, nil }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.Set(reflect.ValueOf(src.([]core.LocalObjectReference))) }, + Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.([]core.LocalObjectReference))) }, } fhs[reflect.TypeOf(&core.ResourceRequirements{})] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -212,7 +216,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return rr, nil }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.Set(reflect.ValueOf(src.(*core.ResourceRequirements))) }, + Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.(*core.ResourceRequirements))) }, } fhs[reflect.TypeOf(&core.SecurityContext{})] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -227,7 +231,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return rr, nil }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.Set(reflect.ValueOf(src.(*core.SecurityContext))) }, + Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.(*core.SecurityContext))) }, } fhs[reflect.TypeOf(true)] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -235,7 +239,22 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return strconv.ParseBool(str) }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.SetBool(src.(bool)) }, + Setter: func(dst reflect.Value, src any) { dst.SetBool(src.(bool)) }, + } + fhs[reflect.TypeOf(&semver.Version{})] = envconfig.FieldTypeHandler{ + Parsers: map[string]func(string) (any, error){ + "version": func(str string) (any, error) { + if str == "" { + return nil, nil + } + v, err := semver.Parse(str) + if err != nil { + return nil, err + } + return &v, nil + }, + }, + Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.(*semver.Version))) }, } return fhs } diff --git a/cmd/traffic/cmd/manager/mutator/constants.go b/cmd/traffic/cmd/manager/mutator/constants.go index 616a15284f..7ba9fbf1ff 100644 --- a/cmd/traffic/cmd/manager/mutator/constants.go +++ b/cmd/traffic/cmd/manager/mutator/constants.go @@ -1,10 +1,12 @@ package mutator -import "github.com/telepresenceio/telepresence/v2/pkg/agentconfig" +import ( + "github.com/telepresenceio/telepresence/v2/pkg/agentconfig" + "github.com/telepresenceio/telepresence/v2/pkg/workload" +) const ( - DomainPrefix = "telepresence.getambassador.io/" - InjectAnnotation = DomainPrefix + "inject-" + agentconfig.ContainerName - ServiceNameAnnotation = DomainPrefix + "inject-service-name" - ManualInjectAnnotation = DomainPrefix + "manually-injected" + InjectAnnotation = workload.DomainPrefix + "inject-" + agentconfig.ContainerName + ServiceNameAnnotation = workload.DomainPrefix + "inject-service-name" + ManualInjectAnnotation = workload.DomainPrefix + "manually-injected" ) diff --git a/cmd/traffic/cmd/manager/mutator/watcher.go b/cmd/traffic/cmd/manager/mutator/watcher.go index 315b259229..77b94a1f03 100644 --- a/cmd/traffic/cmd/manager/mutator/watcher.go +++ b/cmd/traffic/cmd/manager/mutator/watcher.go @@ -33,6 +33,7 @@ import ( "github.com/telepresenceio/telepresence/v2/pkg/agentmap" "github.com/telepresenceio/telepresence/v2/pkg/informer" "github.com/telepresenceio/telepresence/v2/pkg/tracing" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) type Map interface { @@ -163,6 +164,14 @@ func (c *configWatcher) isRolloutNeeded(ctx context.Context, wl k8sapi.Workload, } return true } + if ac == nil { + if okPods < runningPods { + dlog.Debugf(ctx, "Rollout of %s.%s is necessary. At least one pod still has an agent", + wl.GetName(), wl.GetNamespace()) + return true + } + return false + } dlog.Debugf(ctx, "Rollout of %s.%s is not necessary. At least one pod have the desired agent state", wl.GetName(), wl.GetNamespace()) return false @@ -229,8 +238,6 @@ func isRolloutNeededForPod(ctx context.Context, ac *agentconfig.Sidecar, name, n return "" } -const AnnRestartedAt = DomainPrefix + "restartedAt" - func (c *configWatcher) triggerRollout(ctx context.Context, wl k8sapi.Workload, ac *agentconfig.Sidecar) { lck := c.getRolloutLock(wl) if !lck.TryLock() { @@ -269,10 +276,10 @@ func generateRestartAnnotationPatch(podTemplate *core.PodTemplateSpec) string { basePointer := "/spec/template/metadata/annotations" pointer := fmt.Sprintf( basePointer+"/%s", - strings.ReplaceAll(AnnRestartedAt, "/", "~1"), + strings.ReplaceAll(workload.AnnRestartedAt, "/", "~1"), ) - if _, ok := podTemplate.Annotations[AnnRestartedAt]; ok { + if _, ok := podTemplate.Annotations[workload.AnnRestartedAt]; ok { return fmt.Sprintf( `[{"op": "replace", "path": "%s", "value": "%s"}]`, pointer, time.Now().Format(time.RFC3339), ) @@ -833,9 +840,9 @@ func (c *configWatcher) Start(ctx context.Context) { for i, ns := range nss { c.cms[i] = c.startConfigMap(ctx, ns) c.svs[i] = c.startServices(ctx, ns) - c.dps[i] = c.startDeployments(ctx, ns) - c.rss[i] = c.startReplicaSets(ctx, ns) - c.sss[i] = c.startStatefulSets(ctx, ns) + c.dps[i] = workload.StartDeployments(ctx, ns) + c.rss[i] = workload.StartReplicaSets(ctx, ns) + c.sss[i] = workload.StartStatefulSets(ctx, ns) c.startPods(ctx, ns) kf := informer.GetK8sFactory(ctx, ns) kf.Start(ctx.Done()) @@ -844,7 +851,7 @@ func (c *configWatcher) Start(ctx context.Context) { if managerutil.ArgoRolloutsEnabled(ctx) { c.rls = make([]cache.SharedIndexInformer, len(nss)) for i, ns := range nss { - c.rls[i] = c.startRollouts(ctx, ns) + c.rls[i] = workload.StartRollouts(ctx, ns) rf := informer.GetArgoRolloutsFactory(ctx, ns) rf.Start(ctx.Done()) rf.WaitForCacheSync(ctx.Done()) diff --git a/cmd/traffic/cmd/manager/mutator/workload_state.go b/cmd/traffic/cmd/manager/mutator/workload_state.go deleted file mode 100644 index 88a1157fe1..0000000000 --- a/cmd/traffic/cmd/manager/mutator/workload_state.go +++ /dev/null @@ -1,87 +0,0 @@ -package mutator - -import ( - appsv1 "k8s.io/api/apps/v1" - core "k8s.io/api/core/v1" - - argorollouts "github.com/datawire/argo-rollouts-go-client/pkg/apis/rollouts/v1alpha1" - "github.com/datawire/k8sapi/pkg/k8sapi" -) - -type WorkloadState int - -const ( - WorkloadStateUnknown WorkloadState = iota - WorkloadStateProgressing - WorkloadStateAvailable - WorkloadStateFailure -) - -func deploymentState(d *appsv1.Deployment) WorkloadState { - for _, c := range d.Status.Conditions { - switch c.Type { - case appsv1.DeploymentProgressing: - if c.Status == core.ConditionTrue { - return WorkloadStateProgressing - } - case appsv1.DeploymentAvailable: - if c.Status == core.ConditionTrue { - return WorkloadStateAvailable - } - case appsv1.DeploymentReplicaFailure: - if c.Status == core.ConditionTrue { - return WorkloadStateFailure - } - } - } - return WorkloadStateUnknown -} - -func replicaSetState(d *appsv1.ReplicaSet) WorkloadState { - for _, c := range d.Status.Conditions { - if c.Type == appsv1.ReplicaSetReplicaFailure && c.Status == core.ConditionTrue { - return WorkloadStateFailure - } - } - return WorkloadStateAvailable -} - -func statefulSetState(d *appsv1.StatefulSet) WorkloadState { - return WorkloadStateAvailable -} - -func rolloutSetState(r *argorollouts.Rollout) WorkloadState { - for _, c := range r.Status.Conditions { - switch c.Type { - case argorollouts.RolloutProgressing: - if c.Status == core.ConditionTrue { - return WorkloadStateProgressing - } - case argorollouts.RolloutHealthy: - if c.Status == core.ConditionTrue { - return WorkloadStateAvailable - } - case argorollouts.RolloutReplicaFailure: - if c.Status == core.ConditionTrue { - return WorkloadStateFailure - } - } - } - return WorkloadStateUnknown -} - -func GetWorkloadState(wl k8sapi.Workload) WorkloadState { - if d, ok := k8sapi.DeploymentImpl(wl); ok { - return deploymentState(d) - } - if r, ok := k8sapi.ReplicaSetImpl(wl); ok { - return replicaSetState(r) - } - if s, ok := k8sapi.StatefulSetImpl(wl); ok { - return statefulSetState(s) - } - if rt, ok := k8sapi.RolloutImpl(wl); ok { - return rolloutSetState(rt) - } - return WorkloadStateUnknown -} diff --git a/cmd/traffic/cmd/manager/mutator/workload_watcher.go b/cmd/traffic/cmd/manager/mutator/workload_watcher.go index 56512286fd..cfabe69253 100644 --- a/cmd/traffic/cmd/manager/mutator/workload_watcher.go +++ b/cmd/traffic/cmd/manager/mutator/workload_watcher.go @@ -6,139 +6,40 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - apps "k8s.io/api/apps/v1" - core "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" - argorollouts "github.com/datawire/argo-rollouts-go-client/pkg/apis/rollouts/v1alpha1" "github.com/datawire/dlib/dlog" "github.com/datawire/k8sapi/pkg/k8sapi" "github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/managerutil" "github.com/telepresenceio/telepresence/v2/pkg/agentconfig" "github.com/telepresenceio/telepresence/v2/pkg/agentmap" - "github.com/telepresenceio/telepresence/v2/pkg/informer" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) -func (c *configWatcher) startDeployments(ctx context.Context, ns string) cache.SharedIndexInformer { - f := informer.GetK8sFactory(ctx, ns) - ix := f.Apps().V1().Deployments().Informer() - _ = ix.SetTransform(func(o any) (any, error) { - // Strip the parts of the deployment that we don't care about to save memory - if dep, ok := o.(*apps.Deployment); ok { - om := &dep.ObjectMeta - if an := om.Annotations; an != nil { - delete(an, core.LastAppliedConfigAnnotation) - } - dep.ManagedFields = nil - dep.Finalizers = nil - dep.OwnerReferences = nil - } - return o, nil - }) - _ = ix.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { - dlog.Errorf(ctx, "watcher for Deployments %s: %v", whereWeWatch(ns), err) - }) - return ix -} - -func (c *configWatcher) startReplicaSets(ctx context.Context, ns string) cache.SharedIndexInformer { - f := informer.GetK8sFactory(ctx, ns) - ix := f.Apps().V1().ReplicaSets().Informer() - _ = ix.SetTransform(func(o any) (any, error) { - // Strip the parts of the replicaset that we don't care about. Saves memory - if dep, ok := o.(*apps.ReplicaSet); ok { - om := &dep.ObjectMeta - if an := om.Annotations; an != nil { - delete(an, core.LastAppliedConfigAnnotation) - } - dep.ManagedFields = nil - dep.Finalizers = nil - } - return o, nil - }) - _ = ix.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { - dlog.Errorf(ctx, "watcher for ReplicaSets %s: %v", whereWeWatch(ns), err) - }) - return ix -} - -func (c *configWatcher) startStatefulSets(ctx context.Context, ns string) cache.SharedIndexInformer { - f := informer.GetK8sFactory(ctx, ns) - ix := f.Apps().V1().StatefulSets().Informer() - _ = ix.SetTransform(func(o any) (any, error) { - // Strip the parts of the stateful that we don't care about. Saves memory - if dep, ok := o.(*apps.StatefulSet); ok { - om := &dep.ObjectMeta - if an := om.Annotations; an != nil { - delete(an, core.LastAppliedConfigAnnotation) - } - dep.ManagedFields = nil - dep.Finalizers = nil - } - return o, nil - }) - _ = ix.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { - dlog.Errorf(ctx, "watcher for StatefulSet %s: %v", whereWeWatch(ns), err) - }) - return ix -} - -func (c *configWatcher) startRollouts(ctx context.Context, ns string) cache.SharedIndexInformer { - f := informer.GetArgoRolloutsFactory(ctx, ns) - dlog.Infof(ctx, "Watching Rollouts in %s", ns) - ix := f.Argoproj().V1alpha1().Rollouts().Informer() - _ = ix.SetTransform(func(o any) (any, error) { - // Strip the parts of the rollout that we don't care about. Saves memory - if dep, ok := o.(*argorollouts.Rollout); ok { - om := &dep.ObjectMeta - if an := om.Annotations; an != nil { - delete(an, core.LastAppliedConfigAnnotation) - } - dep.ManagedFields = nil - dep.Finalizers = nil - } - return o, nil - }) - _ = ix.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { - dlog.Errorf(ctx, "watcher for Rollouts %s: %v", whereWeWatch(ns), err) - }) - return ix -} - -func WorkloadFromAny(obj any) (k8sapi.Workload, bool) { - if ro, ok := obj.(runtime.Object); ok { - if wl, err := k8sapi.WrapWorkload(ro); err == nil { - return wl, true - } - } - return nil, false -} - func (c *configWatcher) watchWorkloads(ctx context.Context, ix cache.SharedIndexInformer) error { _, err := ix.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { - if wl, ok := WorkloadFromAny(obj); ok && len(wl.GetOwnerReferences()) == 0 { - c.updateWorkload(ctx, wl, nil, GetWorkloadState(wl)) + if wl, ok := workload.FromAny(obj); ok && len(wl.GetOwnerReferences()) == 0 { + c.updateWorkload(ctx, wl, nil, workload.GetWorkloadState(wl)) } }, DeleteFunc: func(obj any) { - if wl, ok := WorkloadFromAny(obj); ok { + if wl, ok := workload.FromAny(obj); ok { if len(wl.GetOwnerReferences()) == 0 { c.deleteWorkload(ctx, wl) } } else if dfsu, ok := obj.(*cache.DeletedFinalStateUnknown); ok { - if wl, ok = WorkloadFromAny(dfsu.Obj); ok && len(wl.GetOwnerReferences()) == 0 { + if wl, ok = workload.FromAny(dfsu.Obj); ok && len(wl.GetOwnerReferences()) == 0 { c.deleteWorkload(ctx, wl) } } }, UpdateFunc: func(oldObj, newObj any) { - if wl, ok := WorkloadFromAny(newObj); ok && len(wl.GetOwnerReferences()) == 0 { - if oldWl, ok := WorkloadFromAny(oldObj); ok { - c.updateWorkload(ctx, wl, oldWl, GetWorkloadState(wl)) + if wl, ok := workload.FromAny(newObj); ok && len(wl.GetOwnerReferences()) == 0 { + if oldWl, ok := workload.FromAny(oldObj); ok { + c.updateWorkload(ctx, wl, oldWl, workload.GetWorkloadState(wl)) } } }, @@ -158,19 +59,19 @@ func (c *configWatcher) deleteWorkload(ctx context.Context, wl k8sapi.Workload) } } -func (c *configWatcher) updateWorkload(ctx context.Context, wl, oldWl k8sapi.Workload, state WorkloadState) { - if state == WorkloadStateFailure { +func (c *configWatcher) updateWorkload(ctx context.Context, wl, oldWl k8sapi.Workload, state workload.State) { + if state == workload.StateFailure { return } tpl := wl.GetPodTemplate() - ia, ok := tpl.Annotations[InjectAnnotation] + ia, ok := tpl.Annotations[workload.InjectAnnotation] if !ok { return } if oldWl != nil && cmp.Equal(oldWl.GetPodTemplate(), tpl, cmpopts.IgnoreFields(meta.ObjectMeta{}, "Namespace", "UID", "ResourceVersion", "CreationTimestamp", "DeletionTimestamp"), cmpopts.IgnoreMapEntries(func(k, _ string) bool { - return k == AnnRestartedAt + return k == workload.AnnRestartedAt })) { return } diff --git a/cmd/traffic/cmd/manager/service.go b/cmd/traffic/cmd/manager/service.go index 2f7715c0b7..f2b09f1d72 100644 --- a/cmd/traffic/cmd/manager/service.go +++ b/cmd/traffic/cmd/manager/service.go @@ -2,10 +2,12 @@ package manager import ( "context" + "fmt" "sort" "strings" "time" + "github.com/blang/semver/v4" "github.com/google/uuid" dns2 "github.com/miekg/dns" "go.opentelemetry.io/otel/trace" @@ -78,6 +80,15 @@ func (wall) Now() time.Time { return time.Now() } +// checkCompat checks if a CompatibilityVersion has been set for this traffic-manager, and if so, errors with +// an Unimplemented error mentioning the given name if it is less than the required version. +func checkCompat(ctx context.Context, name, requiredVersion string) error { + if cv := managerutil.GetEnv(ctx).CompatibilityVersion; cv != nil && cv.Compare(semver.MustParse(requiredVersion)) < 0 { + return status.Error(codes.Unimplemented, fmt.Sprintf("traffic manager of version %s does not implement %s", cv, name)) + } + return nil +} + func NewService(ctx context.Context) (Service, *dgroup.Group, error) { ret := &service{ clock: wall{}, @@ -567,6 +578,9 @@ func (s *service) PrepareIntercept(ctx context.Context, request *rpc.CreateInter } func (s *service) GetKnownWorkloadKinds(ctx context.Context, request *rpc.SessionInfo) (*rpc.KnownWorkloadKinds, error) { + if err := checkCompat(ctx, "GetKnownWorkloadKinds", "2.20.0"); err != nil { + return nil, err + } ctx = managerutil.WithSessionInfo(ctx, request) dlog.Debugf(ctx, "GetKnownWorkloadKinds called") kinds := []rpc.WorkloadInfo_Kind{rpc.WorkloadInfo_DEPLOYMENT, rpc.WorkloadInfo_REPLICASET, rpc.WorkloadInfo_STATEFULSET} @@ -939,7 +953,12 @@ func (s *service) WatchClusterInfo(session *rpc.SessionInfo, stream rpc.Manager_ } func (s *service) WatchWorkloads(request *rpc.WorkloadEventsRequest, stream rpc.Manager_WatchWorkloadsServer) (err error) { - ctx := managerutil.WithSessionInfo(stream.Context(), request.SessionInfo) + ctx := stream.Context() + // Dysfunctional prior to 2.21.0 because no initial snapshot was sent. + if err := checkCompat(ctx, "WatchWorkloads", "2.21.0-alpha.4"); err != nil { + return err + } + ctx = managerutil.WithSessionInfo(ctx, request.SessionInfo) defer func() { if r := recover(); r != nil { err = derror.PanicToError(r) @@ -954,11 +973,15 @@ func (s *service) WatchWorkloads(request *rpc.WorkloadEventsRequest, stream rpc. return status.Error(codes.InvalidArgument, "SessionInfo is required") } clientSession := request.SessionInfo.SessionId - clientInfo := s.state.GetClient(clientSession) - if clientInfo == nil { - return status.Errorf(codes.NotFound, "Client session %q not found", clientSession) + namespace := request.Namespace + if namespace == "" { + clientInfo := s.state.GetClient(clientSession) + if clientInfo == nil { + return status.Errorf(codes.NotFound, "Client session %q not found", clientSession) + } + namespace = clientInfo.Namespace } - ww := s.state.NewWorkloadInfoWatcher(clientSession, clientInfo.Namespace) + ww := s.state.NewWorkloadInfoWatcher(clientSession, namespace) return ww.Watch(ctx, stream) } diff --git a/cmd/traffic/cmd/manager/state/state.go b/cmd/traffic/cmd/manager/state/state.go index 1377081fa8..e245401118 100644 --- a/cmd/traffic/cmd/manager/state/state.go +++ b/cmd/traffic/cmd/manager/state/state.go @@ -32,6 +32,7 @@ import ( "github.com/telepresenceio/telepresence/v2/pkg/log" "github.com/telepresenceio/telepresence/v2/pkg/tracing" "github.com/telepresenceio/telepresence/v2/pkg/tunnel" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) type State interface { @@ -90,7 +91,7 @@ type State interface { WatchAgents(context.Context, func(sessionID string, agent *rpc.AgentInfo) bool) <-chan watchable.Snapshot[*rpc.AgentInfo] WatchDial(sessionID string) <-chan *rpc.DialRequest WatchIntercepts(context.Context, func(sessionID string, intercept *rpc.InterceptInfo) bool) <-chan watchable.Snapshot[*rpc.InterceptInfo] - WatchWorkloads(ctx context.Context, sessionID string) (ch <-chan []WorkloadEvent, err error) + WatchWorkloads(ctx context.Context, sessionID string) (ch <-chan []workload.WorkloadEvent, err error) WatchLookupDNS(string) <-chan *rpc.DNSRequest ValidateCreateAgent(context.Context, k8sapi.Workload, agentconfig.SidecarExt) error NewWorkloadInfoWatcher(clientSession, namespace string) WorkloadInfoWatcher @@ -135,7 +136,7 @@ type state struct { interceptStates *xsync.MapOf[string, *interceptState] timedLogLevel log.TimedLevel llSubs *loglevelSubscribers - workloadWatchers *xsync.MapOf[string, WorkloadWatcher] // workload watchers, created on demand and keyed by namespace + workloadWatchers *xsync.MapOf[string, workload.Watcher] // workload watchers, created on demand and keyed by namespace tunnelCounter int32 tunnelIngressCounter uint64 tunnelEgressCounter uint64 @@ -157,7 +158,7 @@ func NewState(ctx context.Context) State { sessions: xsync.NewMapOf[string, SessionState](), agentsByName: xsync.NewMapOf[string, *xsync.MapOf[string, *rpc.AgentInfo]](), interceptStates: xsync.NewMapOf[string, *interceptState](), - workloadWatchers: xsync.NewMapOf[string, WorkloadWatcher](), + workloadWatchers: xsync.NewMapOf[string, workload.Watcher](), timedLogLevel: log.NewTimedLevel(loglevel, log.SetLevel), llSubs: newLoglevelSubscribers(), } @@ -487,14 +488,14 @@ func (s *state) WatchAgents( } } -func (s *state) WatchWorkloads(ctx context.Context, sessionID string) (ch <-chan []WorkloadEvent, err error) { +func (s *state) WatchWorkloads(ctx context.Context, sessionID string) (ch <-chan []workload.WorkloadEvent, err error) { client := s.GetClient(sessionID) if client == nil { return nil, status.Errorf(codes.NotFound, "session %q not found", sessionID) } ns := client.Namespace - ww, _ := s.workloadWatchers.LoadOrCompute(ns, func() (ww WorkloadWatcher) { - ww, err = NewWorkloadWatcher(s.backgroundCtx, ns) + ww, _ := s.workloadWatchers.LoadOrCompute(ns, func() (ww workload.Watcher) { + ww, err = workload.NewWatcher(s.backgroundCtx, ns, managerutil.ArgoRolloutsEnabled(ctx)) return ww }) if err != nil { diff --git a/cmd/traffic/cmd/manager/state/workload_info_watcher.go b/cmd/traffic/cmd/manager/state/workload_info_watcher.go index e16629f29d..3db126e817 100644 --- a/cmd/traffic/cmd/manager/state/workload_info_watcher.go +++ b/cmd/traffic/cmd/manager/state/workload_info_watcher.go @@ -13,8 +13,8 @@ import ( "github.com/datawire/dlib/dlog" "github.com/datawire/k8sapi/pkg/k8sapi" rpc "github.com/telepresenceio/telepresence/rpc/v2/manager" - "github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/mutator" "github.com/telepresenceio/telepresence/v2/pkg/agentmap" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) type WorkloadInfoWatcher interface { @@ -77,6 +77,7 @@ func (wf *workloadInfoWatcher) Watch(ctx context.Context, stream rpc.Manager_Wat // Everything in this loop happens in sequence, even the firing of the timer. This means // that there's no concurrency and no need for mutexes. + initial := true for { select { case <-ctx.Done(): @@ -84,13 +85,14 @@ func (wf *workloadInfoWatcher) Watch(ctx context.Context, stream rpc.Manager_Wat case <-sessionDone: return nil case <-wf.ticker.C: - wf.sendEvents(ctx) + wf.sendEvents(ctx, false) case wes, ok := <-workloadsCh: if !ok { dlog.Debug(ctx, "Workloads channel closed") return nil } - wf.handleWorkloadsSnapshot(ctx, wes) + wf.handleWorkloadsSnapshot(ctx, wes, initial) + initial = false // Events that arrive at the agent channel should be counted as modifications. case ais, ok := <-agentsCh: if !ok { @@ -120,7 +122,7 @@ func (wf *workloadInfoWatcher) getIntercepts(name, namespace string) (iis []*rpc return iis } -func (wf *workloadInfoWatcher) sendEvents(ctx context.Context) { +func (wf *workloadInfoWatcher) sendEvents(ctx context.Context, sendEmpty bool) { // Time to send what we have wf.ticker.Reset(time.Duration(math.MaxInt64)) evs := make([]*rpc.WorkloadEvent, 0, len(wf.workloadEvents)) @@ -132,7 +134,7 @@ func (wf *workloadInfoWatcher) sendEvents(ctx context.Context) { } evs = append(evs, rew) } - if len(evs) == 0 { + if !sendEmpty && len(evs) == 0 { return } dlog.Debugf(ctx, "Sending %d WorkloadEvents", len(evs)) @@ -168,13 +170,13 @@ func rpcKind(s string) rpc.WorkloadInfo_Kind { } } -func rpcWorkloadState(s mutator.WorkloadState) (state rpc.WorkloadInfo_State) { +func rpcWorkloadState(s workload.State) (state rpc.WorkloadInfo_State) { switch s { - case mutator.WorkloadStateFailure: + case workload.StateFailure: state = rpc.WorkloadInfo_FAILURE - case mutator.WorkloadStateAvailable: + case workload.StateAvailable: state = rpc.WorkloadInfo_AVAILABLE - case mutator.WorkloadStateProgressing: + case workload.StateProgressing: state = rpc.WorkloadInfo_PROGRESSING default: state = rpc.WorkloadInfo_UNKNOWN_UNSPECIFIED @@ -187,27 +189,40 @@ func rpcWorkload(wl k8sapi.Workload, as rpc.WorkloadInfo_AgentState, iClients [] Kind: rpcKind(wl.GetKind()), Name: wl.GetName(), Namespace: wl.GetNamespace(), - State: rpcWorkloadState(mutator.GetWorkloadState(wl)), + Uid: string(wl.GetUID()), + State: rpcWorkloadState(workload.GetWorkloadState(wl)), AgentState: as, InterceptClients: iClients, } } -func (wf *workloadInfoWatcher) addEvent(ctx context.Context, eventType EventType, wl k8sapi.Workload, as rpc.WorkloadInfo_AgentState, iClients []*rpc.WorkloadInfo_Intercept) { +func (wf *workloadInfoWatcher) addEvent( + eventType workload.EventType, + wl k8sapi.Workload, + as rpc.WorkloadInfo_AgentState, + iClients []*rpc.WorkloadInfo_Intercept, +) { wf.workloadEvents[wl.GetName()] = &rpc.WorkloadEvent{ Type: rpc.WorkloadEvent_Type(eventType), Workload: rpcWorkload(wl, as, iClients), } - wf.sendEvents(ctx) + wf.resetTicker() } -func (wf *workloadInfoWatcher) handleWorkloadsSnapshot(ctx context.Context, wes []WorkloadEvent) { +func (wf *workloadInfoWatcher) handleWorkloadsSnapshot(ctx context.Context, wes []workload.WorkloadEvent, initial bool) { + if len(wes) == 0 { + if initial { + // The initial snapshot may be empty, but must be sent anyway. + wf.sendEvents(ctx, true) + } + return + } for _, we := range wes { wl := we.Workload if w, ok := wf.workloadEvents[wl.GetName()]; ok { - if we.Type == EventTypeDelete && w.Type != rpc.WorkloadEvent_DELETED { + if we.Type == workload.EventTypeDelete && w.Type != rpc.WorkloadEvent_DELETED { w.Type = rpc.WorkloadEvent_DELETED - dlog.Debugf(ctx, "WorkloadInfoEvent: Workload %s %s %s.%s", we.Type, wl.GetKind(), wl.GetName(), wl.GetNamespace()) + dlog.Debugf(ctx, "WorkloadInfoEvent: Workload %s %s %s.%s %s", we.Type, wl.GetKind(), wl.GetName(), wl.GetNamespace(), workload.GetWorkloadState(wl)) wf.resetTicker() } } else { @@ -224,15 +239,15 @@ func (wf *workloadInfoWatcher) handleWorkloadsSnapshot(ctx context.Context, wes // If we've sent an ADDED event for this workload, and this is a MODIFIED event without any changes that // we care about, then just skip it. - if we.Type == EventTypeUpdate { + if we.Type == workload.EventTypeUpdate { lew, ok := wf.lastEvents[wl.GetName()] if ok && (lew.Type == rpc.WorkloadEvent_ADDED_UNSPECIFIED || lew.Type == rpc.WorkloadEvent_MODIFIED) && proto.Equal(lew.Workload, rpcWorkload(we.Workload, as, iClients)) { break } } - dlog.Debugf(ctx, "WorkloadInfoEvent: Workload %s %s %s.%s %s", we.Type, wl.GetKind(), wl.GetName(), wl.GetNamespace(), as) - wf.addEvent(ctx, we.Type, wl, as, iClients) + dlog.Debugf(ctx, "WorkloadInfoEvent: Workload %s %s %s.%s %s %s", we.Type, wl.GetKind(), wl.GetName(), wl.GetNamespace(), as, workload.GetWorkloadState(wl)) + wf.addEvent(we.Type, wl, as, iClients) } } } @@ -244,15 +259,16 @@ func (wf *workloadInfoWatcher) handleAgentSnapshot(ctx context.Context, ais map[ if _, ok := ais[k]; !ok { name := a.Name as := rpc.WorkloadInfo_NO_AGENT_UNSPECIFIED - dlog.Debugf(ctx, "WorkloadInfoEvent: AgentInfo %s.%s %s", a.Name, a.Namespace, as) if w, ok := wf.workloadEvents[name]; ok && w.Type != rpc.WorkloadEvent_DELETED { wl := w.Workload if wl.AgentState != as { wl.AgentState = as + dlog.Debugf(ctx, "WorkloadInfoEvent: AgentInfo %s.%s %s %s", a.Name, a.Namespace, as, wl.State) wf.resetTicker() } } else if wl, err := agentmap.GetWorkload(ctx, name, a.Namespace, ""); err == nil { - wf.addEvent(ctx, EventTypeUpdate, wl, as, nil) + dlog.Debugf(ctx, "WorkloadInfoEvent: AgentInfo %s.%s %s %s", a.Name, a.Namespace, as, workload.GetWorkloadState(wl)) + wf.addEvent(workload.EventTypeUpdate, wl, as, nil) } else { dlog.Debugf(ctx, "Unable to get workload %s.%s: %v", name, a.Namespace, err) if errors.IsNotFound(err) { @@ -264,7 +280,7 @@ func (wf *workloadInfoWatcher) handleAgentSnapshot(ctx context.Context, ais map[ AgentState: as, }, } - wf.sendEvents(ctx) + wf.sendEvents(ctx, false) } } } @@ -277,16 +293,17 @@ func (wf *workloadInfoWatcher) handleAgentSnapshot(ctx context.Context, ais map[ as = rpc.WorkloadInfo_INTERCEPTED iClients = iis } - dlog.Debugf(ctx, "WorkloadInfoEvent: AgentInfo %s.%s %s", a.Name, a.Namespace, as) if w, ok := wf.workloadEvents[name]; ok && w.Type != rpc.WorkloadEvent_DELETED { wl := w.Workload + dlog.Debugf(ctx, "WorkloadInfoEvent: AgentInfo %s.%s %s %s", a.Name, a.Namespace, as, w.Workload.State) if wl.AgentState != as { wl.AgentState = as wl.InterceptClients = iClients wf.resetTicker() } } else if wl, err := agentmap.GetWorkload(ctx, name, a.Namespace, ""); err == nil { - wf.addEvent(ctx, EventTypeUpdate, wl, as, iClients) + dlog.Debugf(ctx, "WorkloadInfoEvent: AgentInfo %s.%s %s %s", a.Name, a.Namespace, as, workload.GetWorkloadState(wl)) + wf.addEvent(workload.EventTypeUpdate, wl, as, iClients) } else { dlog.Debugf(ctx, "Unable to get workload %s.%s: %v", name, a.Namespace, err) } @@ -300,15 +317,16 @@ func (wf *workloadInfoWatcher) handleInterceptSnapshot(ctx context.Context, iis if _, ok := wf.interceptInfos[k]; !ok { name := ii.Spec.Agent as := rpc.WorkloadInfo_INSTALLED - dlog.Debugf(ctx, "InterceptInfo %s.%s %s", name, ii.Spec.Namespace, as) if w, ok := wf.workloadEvents[name]; ok && w.Type != rpc.WorkloadEvent_DELETED { if w.Workload.AgentState != as { w.Workload.AgentState = as w.Workload.InterceptClients = nil + dlog.Debugf(ctx, "WorkloadInfoEvent: InterceptInfo %s.%s %s %s", w.Workload.Name, w.Workload.Namespace, as, w.Workload.State) wf.resetTicker() } } else if wl, err := agentmap.GetWorkload(ctx, name, wf.namespace, ""); err == nil { - wf.addEvent(ctx, EventTypeUpdate, wl, as, nil) + dlog.Debugf(ctx, "WorkloadInfoEvent: InterceptInfo %s.%s %s %s", wl.GetName(), wl.GetNamespace(), as, workload.GetWorkloadState(wl)) + wf.addEvent(workload.EventTypeUpdate, wl, as, nil) } } } @@ -329,10 +347,12 @@ func (wf *workloadInfoWatcher) handleInterceptSnapshot(ctx context.Context, iis if w.Workload.AgentState != as { w.Workload.AgentState = as w.Workload.InterceptClients = iClients + dlog.Debugf(ctx, "WorkloadInfoEvent: InterceptInfo %s.%s %s %s", w.Workload.Name, w.Workload.Namespace, as, w.Workload.State) wf.resetTicker() } } else if wl, err := agentmap.GetWorkload(ctx, name, wf.namespace, ""); err == nil { - wf.addEvent(ctx, EventTypeUpdate, wl, as, iClients) + dlog.Debugf(ctx, "WorkloadInfoEvent: InterceptInfo %s.%s %s %s", wl.GetName(), wl.GetNamespace(), as, workload.GetWorkloadState(wl)) + wf.addEvent(workload.EventTypeUpdate, wl, as, iClients) } } } diff --git a/cmd/traffic/cmd/manager/state/workloads.go b/cmd/traffic/cmd/manager/state/workloads.go deleted file mode 100644 index fee584e474..0000000000 --- a/cmd/traffic/cmd/manager/state/workloads.go +++ /dev/null @@ -1,197 +0,0 @@ -package state - -import ( - "context" - "math" - "sync" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/google/uuid" - apps "k8s.io/api/apps/v1" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/kubectl/pkg/util/deployment" - - "github.com/datawire/dlib/dlog" - "github.com/datawire/k8sapi/pkg/k8sapi" - "github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/managerutil" - "github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/mutator" - "github.com/telepresenceio/telepresence/v2/pkg/informer" -) - -type EventType int - -const ( - EventTypeAdd = iota - EventTypeUpdate - EventTypeDelete -) - -type WorkloadEvent struct { - Type EventType - Workload k8sapi.Workload -} - -func (e EventType) String() string { - switch e { - case EventTypeAdd: - return "add" - case EventTypeUpdate: - return "update" - case EventTypeDelete: - return "delete" - default: - return "unknown" - } -} - -type WorkloadWatcher interface { - Subscribe(ctx context.Context) <-chan []WorkloadEvent -} - -type wlWatcher struct { - sync.Mutex - subscriptions map[uuid.UUID]chan<- []WorkloadEvent - timer *time.Timer - events []WorkloadEvent -} - -func NewWorkloadWatcher(ctx context.Context, ns string) (WorkloadWatcher, error) { - w := new(wlWatcher) - w.subscriptions = make(map[uuid.UUID]chan<- []WorkloadEvent) - w.timer = time.AfterFunc(time.Duration(math.MaxInt64), func() { - w.Lock() - ss := make([]chan<- []WorkloadEvent, len(w.subscriptions)) - i := 0 - for _, sub := range w.subscriptions { - ss[i] = sub - i++ - } - events := w.events - w.events = nil - w.Unlock() - for _, s := range ss { - select { - case <-ctx.Done(): - return - case s <- events: - } - } - }) - - err := w.addEventHandler(ctx, ns) - if err != nil { - return nil, err - } - return w, nil -} - -func (w *wlWatcher) Subscribe(ctx context.Context) <-chan []WorkloadEvent { - ch := make(chan []WorkloadEvent) - id := uuid.New() - w.Lock() - w.subscriptions[id] = ch - w.Unlock() - go func() { - <-ctx.Done() - close(ch) - w.Lock() - delete(w.subscriptions, id) - w.Unlock() - }() - return ch -} - -func compareOptions() []cmp.Option { - return []cmp.Option{ - // Ignore frequently changing fields of no interest - cmpopts.IgnoreFields(meta.ObjectMeta{}, "Namespace", "ResourceVersion", "Generation", "ManagedFields"), - - // Only the Conditions are of interest in the DeploymentStatus. - cmp.Comparer(func(a, b apps.DeploymentStatus) bool { - // Only compare the DeploymentCondition's type and status - return cmp.Equal(a.Conditions, b.Conditions, cmp.Comparer(func(c1, c2 apps.DeploymentCondition) bool { - return c1.Type == c2.Type && c1.Status == c2.Status - })) - }), - - // Treat a nil map or slice as empty. - cmpopts.EquateEmpty(), - - // Ignore frequently changing annotations of no interest. - cmpopts.IgnoreMapEntries(func(k, _ string) bool { - return k == mutator.AnnRestartedAt || k == deployment.RevisionAnnotation - }), - } -} - -func (w *wlWatcher) watchWorkloads(ix cache.SharedIndexInformer, ns string) error { - _, err := ix.AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj any) { - if wl, ok := mutator.WorkloadFromAny(obj); ok && ns == wl.GetNamespace() && len(wl.GetOwnerReferences()) == 0 { - w.handleEvent(WorkloadEvent{Type: EventTypeAdd, Workload: wl}) - } - }, - DeleteFunc: func(obj any) { - if wl, ok := mutator.WorkloadFromAny(obj); ok { - if ns == wl.GetNamespace() && len(wl.GetOwnerReferences()) == 0 { - w.handleEvent(WorkloadEvent{Type: EventTypeDelete, Workload: wl}) - } - } else if dfsu, ok := obj.(*cache.DeletedFinalStateUnknown); ok { - if wl, ok = mutator.WorkloadFromAny(dfsu.Obj); ok && ns == wl.GetNamespace() && len(wl.GetOwnerReferences()) == 0 { - w.handleEvent(WorkloadEvent{Type: EventTypeDelete, Workload: wl}) - } - } - }, - UpdateFunc: func(oldObj, newObj any) { - if wl, ok := mutator.WorkloadFromAny(newObj); ok && ns == wl.GetNamespace() && len(wl.GetOwnerReferences()) == 0 { - if oldWl, ok := mutator.WorkloadFromAny(oldObj); ok { - if cmp.Equal(wl, oldWl, compareOptions()...) { - return - } - // Replace the cmp.Equal above with this to view the changes that trigger an update: - // - // diff := cmp.Diff(wl, oldWl, compareOptions()...) - // if diff == "" { - // return - // } - // dlog.Debugf(ctx, "DIFF:\n%s", diff) - w.handleEvent(WorkloadEvent{Type: EventTypeUpdate, Workload: wl}) - } - } - }, - }) - return err -} - -func (w *wlWatcher) addEventHandler(ctx context.Context, ns string) error { - kf := informer.GetFactory(ctx, ns) - ai, ri := kf.GetK8sInformerFactory().Apps().V1(), kf.GetArgoRolloutsInformerFactory().Argoproj().V1alpha1() - if err := w.watchWorkloads(ai.Deployments().Informer(), ns); err != nil { - return err - } - if err := w.watchWorkloads(ai.ReplicaSets().Informer(), ns); err != nil { - return err - } - if err := w.watchWorkloads(ai.StatefulSets().Informer(), ns); err != nil { - return err - } - if !managerutil.ArgoRolloutsEnabled(ctx) { - dlog.Infof(ctx, "Argo Rollouts is disabled, Argo Rollouts will not be watched") - } else if err := w.watchWorkloads(ri.Rollouts().Informer(), ns); err != nil { - return err - } - return nil -} - -func (w *wlWatcher) handleEvent(we WorkloadEvent) { - w.Lock() - w.events = append(w.events, we) - w.Unlock() - - // Defers sending until things been quiet for a while - w.timer.Reset(50 * time.Millisecond) -} diff --git a/integration_test/argo_rollouts_test.go b/integration_test/argo_rollouts_test.go index adaf8428b4..8b64d1f235 100644 --- a/integration_test/argo_rollouts_test.go +++ b/integration_test/argo_rollouts_test.go @@ -97,9 +97,10 @@ func (s *argoRolloutsSuite) Test_SuccessfullyInterceptsArgoRollout() { stdout := itest.TelepresenceOk(ctx, "intercept", "--mount", "false", "--port", port, svc) require.Contains(stdout, "Using "+tp+" "+svc) - stdout = itest.TelepresenceOk(ctx, "list", "--intercepts") - require.Contains(stdout, svc+": intercepted") - require.NotContains(stdout, "Volume Mount Point") + require.Eventually(func() bool { + stdout = itest.TelepresenceOk(ctx, "list", "--intercepts") + return strings.Contains(stdout, svc+": intercepted") && !strings.Contains(stdout, "Volume Mount Point") + }, 14*time.Second, 2*time.Second) s.CapturePodLogs(ctx, svc, "traffic-agent", s.AppNamespace()) itest.TelepresenceOk(ctx, "leave", svc) stdout = itest.TelepresenceOk(ctx, "list", "--intercepts") diff --git a/integration_test/workspace_watch_test.go b/integration_test/workspace_watch_test.go index 9b1da30ab5..70afa7096e 100644 --- a/integration_test/workspace_watch_test.go +++ b/integration_test/workspace_watch_test.go @@ -163,6 +163,7 @@ func (s *notConnectedSuite) Test_WorkspaceListener() { expectations["agent installed"] = true } case manager.WorkloadInfo_INTERCEPTED: + expectations["agent installed"] = true expectations["agent intercepted"] = true if ics := ev.Workload.InterceptClients; len(ics) == 1 { interceptingClient = ics[0].Client diff --git a/pkg/client/cli/cmd/list.go b/pkg/client/cli/cmd/list.go index 4dd6e528c6..2706350ad9 100644 --- a/pkg/client/cli/cmd/list.go +++ b/pkg/client/cli/cmd/list.go @@ -175,6 +175,9 @@ func (s *listCommand) printList(ctx context.Context, workloads []*connector.Work return intercept.DescribeIntercepts(ctx, iis, "", s.debug) } ai := workload.Sidecar + if workload.NotInterceptableReason == "Progressing" { + return "progressing..." + } if ai != nil { return "ready to intercept (traffic-agent already installed)" } diff --git a/pkg/client/userd/trafficmgr/intercept.go b/pkg/client/userd/trafficmgr/intercept.go index 5a280098e2..418cc5ffb6 100644 --- a/pkg/client/userd/trafficmgr/intercept.go +++ b/pkg/client/userd/trafficmgr/intercept.go @@ -503,7 +503,6 @@ func (s *session) ensureNoInterceptConflict(ir *rpc.CreateInterceptRequest) *rpc // only if the returned rpc.InterceptResult is nil. The returned runtime.Object is either nil, indicating a local // intercept, or the workload for the intercept. func (s *session) CanIntercept(c context.Context, ir *rpc.CreateInterceptRequest) (userd.InterceptInfo, *rpc.InterceptResult) { - s.waitForSync(c) spec := ir.Spec if spec.Namespace == "" { spec.Namespace = s.Namespace diff --git a/pkg/client/userd/trafficmgr/session.go b/pkg/client/userd/trafficmgr/session.go index 76bd82ea35..84d64965c8 100644 --- a/pkg/client/userd/trafficmgr/session.go +++ b/pkg/client/userd/trafficmgr/session.go @@ -17,6 +17,7 @@ import ( "github.com/blang/semver/v4" "github.com/go-json-experiment/json" + "github.com/google/uuid" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -42,7 +43,6 @@ import ( rootdRpc "github.com/telepresenceio/telepresence/rpc/v2/daemon" "github.com/telepresenceio/telepresence/rpc/v2/manager" "github.com/telepresenceio/telepresence/v2/pkg/agentconfig" - "github.com/telepresenceio/telepresence/v2/pkg/agentmap" authGrpc "github.com/telepresenceio/telepresence/v2/pkg/authenticator/grpc" "github.com/telepresenceio/telepresence/v2/pkg/authenticator/patcher" "github.com/telepresenceio/telepresence/v2/pkg/client" @@ -55,9 +55,11 @@ import ( "github.com/telepresenceio/telepresence/v2/pkg/client/userd" "github.com/telepresenceio/telepresence/v2/pkg/client/userd/k8s" "github.com/telepresenceio/telepresence/v2/pkg/errcat" + "github.com/telepresenceio/telepresence/v2/pkg/informer" "github.com/telepresenceio/telepresence/v2/pkg/matcher" "github.com/telepresenceio/telepresence/v2/pkg/proc" "github.com/telepresenceio/telepresence/v2/pkg/restapi" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) type apiServer struct { @@ -70,6 +72,18 @@ type apiMatcher struct { metadata map[string]string } +type workloadInfoKey struct { + kind manager.WorkloadInfo_Kind + name string +} + +type workloadInfo struct { + uid types.UID + state workload.State + agentState manager.WorkloadInfo_AgentState + interceptClients []string +} + type session struct { *k8s.Cluster rootDaemon rootdRpc.DaemonClient @@ -96,7 +110,12 @@ type session struct { sessionInfo *manager.SessionInfo // sessionInfo returned by the traffic-manager - wlWatcher *workloadsAndServicesWatcher + workloadsLock sync.Mutex + + // Map of manager.WorkloadInfo split into namespace, key of kind and name, and workloadInfo + workloads map[string]map[workloadInfoKey]workloadInfo + + workloadSubscribers map[uuid.UUID]chan struct{} // currentInterceptsLock ensures that all accesses to currentIntercepts, currentMatchers, // currentAPIServers, interceptWaiters, and ingressInfo are synchronized @@ -415,19 +434,6 @@ func connectMgr( managerName = "Traffic Manager" } - knownWorkloadKinds, err := mClient.GetKnownWorkloadKinds(ctx, si) - if err != nil { - if status.Code(err) != codes.Unimplemented { - return nil, fmt.Errorf("failed to get known workload kinds: %w", err) - } - // Talking to an older traffic-manager, use legacy default types - knownWorkloadKinds = &manager.KnownWorkloadKinds{Kinds: []manager.WorkloadInfo_Kind{ - manager.WorkloadInfo_DEPLOYMENT, - manager.WorkloadInfo_REPLICASET, - manager.WorkloadInfo_STATEFULSET, - }} - } - sess := &session{ Cluster: cluster, installID: installID, @@ -438,8 +444,8 @@ func connectMgr( managerName: managerName, managerVersion: managerVersion, sessionInfo: si, + workloads: make(map[string]map[workloadInfoKey]workloadInfo), interceptWaiters: make(map[string]*awaitIntercept), - wlWatcher: newWASWatcher(knownWorkloadKinds), isPodDaemon: cr.IsPodDaemon, done: make(chan struct{}), subnetViaWorkloads: cr.SubnetViaWorkloads, @@ -507,8 +513,6 @@ func connectError(t rpc.ConnectInfo_ErrType, err error) *rpc.ConnectInfo { func (s *session) updateDaemonNamespaces(c context.Context) { const svcDomain = "svc" - s.wlWatcher.setNamespacesToWatch(c, s.GetCurrentNamespaces(true)) - domains := s.GetCurrentNamespaces(false) if !slices.Contains(domains, svcDomain) { domains = append(domains, svcDomain) @@ -572,37 +576,22 @@ func (s *session) ApplyConfig(ctx context.Context) error { // getInfosForWorkloads returns a list of workloads found in the given namespace that fulfils the given filter criteria. func (s *session) getInfosForWorkloads( - ctx context.Context, namespaces []string, iMap map[string][]*manager.InterceptInfo, sMap map[string]*rpc.WorkloadInfo_Sidecar, filter rpc.ListRequest_Filter, ) []*rpc.WorkloadInfo { - wiMap := make(map[types.UID]*rpc.WorkloadInfo) - s.wlWatcher.eachWorkload(ctx, k8s.GetManagerNamespace(ctx), namespaces, func(workload k8sapi.Workload) { - name := workload.GetName() - dlog.Debugf(ctx, "Getting info for %s %s.%s, matching service", workload.GetKind(), name, workload.GetNamespace()) - + wiMap := make(map[string]*rpc.WorkloadInfo) + s.eachWorkload(namespaces, func(wlKind manager.WorkloadInfo_Kind, name, namespace string, info workloadInfo) { + kind := wlKind.String() wlInfo := &rpc.WorkloadInfo{ Name: name, - Namespace: workload.GetNamespace(), - WorkloadResourceType: workload.GetKind(), - Uid: string(workload.GetUID()), + Namespace: namespace, + WorkloadResourceType: kind, + Uid: string(info.uid), } - - svcs, err := agentmap.FindServicesForPod(ctx, workload.GetPodTemplate(), "") - if err == nil && len(svcs) > 0 { - srm := make(map[string]*rpc.WorkloadInfo_ServiceReference, len(svcs)) - for _, so := range svcs { - if svc, ok := k8sapi.ServiceImpl(so); ok { - srm[string(svc.UID)] = &rpc.WorkloadInfo_ServiceReference{ - Name: svc.Name, - Namespace: svc.Namespace, - Ports: getServicePorts(svc), - } - } - } - wlInfo.Services = srm + if info.state != workload.StateAvailable { + wlInfo.NotInterceptableReason = info.state.String() } var ok bool @@ -612,7 +601,7 @@ func (s *session) getInfosForWorkloads( if wlInfo.Sidecar, ok = sMap[name]; !ok && filter <= rpc.ListRequest_INSTALLED_AGENTS { return } - wiMap[workload.GetUID()] = wlInfo + wiMap[fmt.Sprintf("%s:%s.%s", kind, name, namespace)] = wlInfo }) wiz := make([]*rpc.WorkloadInfo, len(wiMap)) i := 0 @@ -624,58 +613,46 @@ func (s *session) getInfosForWorkloads( return wiz } -func getServicePorts(svc *core.Service) []*rpc.WorkloadInfo_ServiceReference_Port { - ports := make([]*rpc.WorkloadInfo_ServiceReference_Port, len(svc.Spec.Ports)) - for i, p := range svc.Spec.Ports { - ports[i] = &rpc.WorkloadInfo_ServiceReference_Port{ - Name: p.Name, - Port: p.Port, - } +func (s *session) WatchWorkloads(c context.Context, wr *rpc.WatchWorkloadsRequest, stream userd.WatchWorkloadsStream) error { + id := uuid.New() + ch := make(chan struct{}) + s.workloadsLock.Lock() + if s.workloadSubscribers == nil { + s.workloadSubscribers = make(map[uuid.UUID]chan struct{}) } - return ports -} + s.workloadSubscribers[id] = ch + s.workloadsLock.Unlock() -func (s *session) waitForSync(ctx context.Context) { - s.wlWatcher.setNamespacesToWatch(ctx, s.GetCurrentNamespaces(true)) - s.wlWatcher.waitForSync(ctx) -} + defer func() { + s.workloadsLock.Lock() + delete(s.workloadSubscribers, id) + s.workloadsLock.Unlock() + }() -func (s *session) WatchWorkloads(c context.Context, wr *rpc.WatchWorkloadsRequest, stream userd.WatchWorkloadsStream) error { - s.waitForSync(c) - s.ensureWatchers(c, wr.Namespaces) - sCtx, sCancel := context.WithCancel(c) - // We need to make sure the subscription ends when we leave this method, since this is the one consuming the snapshotAvailable channel. - // Otherwise, the goroutine that writes to the channel will leak. - defer sCancel() - snapshotAvailable := s.wlWatcher.subscribe(sCtx) + send := func() error { + ws, err := s.WorkloadInfoSnapshot(c, wr.Namespaces, rpc.ListRequest_EVERYTHING) + if err != nil { + return err + } + return stream.Send(ws) + } + + // Send initial snapshot + if err := send(); err != nil { + return err + } for { select { - case <-c.Done(): // if context is done (usually the session's context). - return nil - case <-stream.Context().Done(): // if stream context is done. + case <-c.Done(): return nil - case <-snapshotAvailable: - snapshot, err := s.workloadInfoSnapshot(c, wr.GetNamespaces(), rpc.ListRequest_INTERCEPTABLE) - if err != nil { - return status.Errorf(codes.Unavailable, "failed to create WorkloadInfoSnapshot: %v", err) - } - if err := stream.Send(snapshot); err != nil { - dlog.Errorf(c, "WatchWorkloads.Send() failed: %v", err) + case <-ch: + if err := send(); err != nil { return err } } } } -func (s *session) WorkloadInfoSnapshot( - ctx context.Context, - namespaces []string, - filter rpc.ListRequest_Filter, -) (*rpc.WorkloadInfoSnapshot, error) { - s.waitForSync(ctx) - return s.workloadInfoSnapshot(ctx, namespaces, filter) -} - func (s *session) ensureWatchers(ctx context.Context, namespaces []string, ) { @@ -683,30 +660,31 @@ func (s *session) ensureWatchers(ctx context.Context, wg := sync.WaitGroup{} wg.Add(len(namespaces)) for _, ns := range namespaces { - if ns == "" { - ns = s.Namespace + s.workloadsLock.Lock() + _, ok := s.workloads[ns] + s.workloadsLock.Unlock() + if ok { + wg.Done() + } else { + go func() { + if err := s.workloadsWatcher(ctx, ns, &wg); err != nil { + dlog.Errorf(ctx, "error ensuring watcher for namespace %s: %v", ns, err) + return + } + }() + dlog.Debugf(ctx, "watcher for namespace %s started", ns) } - wgp := &wg - s.wlWatcher.ensureStarted(ctx, ns, func(started bool) { - if started { - dlog.Debugf(ctx, "watchers for %s started", ns) - } - if wgp != nil { - wgp.Done() - wgp = nil - } - }) } wg.Wait() + dlog.Debugf(ctx, "watchers for %q synced", namespaces) } -func (s *session) workloadInfoSnapshot( +func (s *session) WorkloadInfoSnapshot( ctx context.Context, namespaces []string, filter rpc.ListRequest_Filter, ) (*rpc.WorkloadInfoSnapshot, error) { is := s.getCurrentIntercepts() - s.ensureWatchers(ctx, namespaces) var nss []string if filter == rpc.ListRequest_INTERCEPTS { @@ -723,8 +701,10 @@ func (s *session) workloadInfoSnapshot( } if len(nss) == 0 { // none of the namespaces are currently mapped + dlog.Debug(ctx, "No namespaces are mapped") return &rpc.WorkloadInfoSnapshot{}, nil } + s.ensureWatchers(ctx, nss) iMap := make(map[string][]*manager.InterceptInfo, len(is)) nextIs: @@ -748,7 +728,7 @@ nextIs: } } - workloadInfos := s.getInfosForWorkloads(ctx, nss, iMap, sMap, filter) + workloadInfos := s.getInfosForWorkloads(nss, iMap, sMap, filter) return &rpc.WorkloadInfoSnapshot{Workloads: workloadInfos}, nil } @@ -908,11 +888,9 @@ func (s *session) Uninstall(ctx context.Context, ur *rpc.UninstallRequest) (*com // to prevent the clients from doing it. if ur.UninstallType == rpc.UninstallRequest_NAMED_AGENTS { // must have a valid namespace in order to uninstall named agents - s.waitForSync(ctx) if ur.Namespace == "" { ur.Namespace = s.Namespace } - s.wlWatcher.ensureStarted(ctx, ur.Namespace, nil) namespace := s.ActualNamespace(ur.Namespace) if namespace == "" { // namespace is not mapped @@ -962,11 +940,9 @@ func (s *session) Uninstall(ctx context.Context, ur *rpc.UninstallRequest) (*com } if ur.Namespace != "" { - s.waitForSync(ctx) if ur.Namespace == "" { ur.Namespace = s.Namespace } - s.wlWatcher.ensureStarted(ctx, ur.Namespace, nil) namespace := s.ActualNamespace(ur.Namespace) if namespace == "" { // namespace is not mapped @@ -1071,3 +1047,186 @@ func (s *session) connectRootDaemon(ctx context.Context, nc *rootdRpc.NetworkCon dlog.Debug(ctx, "Connected to root daemon") return rd, nil } + +func (s *session) eachWorkload(namespaces []string, do func(kind manager.WorkloadInfo_Kind, name, namespace string, info workloadInfo)) { + s.workloadsLock.Lock() + for _, ns := range namespaces { + if workloads, ok := s.workloads[ns]; ok { + for key, info := range workloads { + do(key.kind, key.name, ns, info) + } + } + } + s.workloadsLock.Unlock() +} + +func rpcKind(s string) manager.WorkloadInfo_Kind { + switch strings.ToLower(s) { + case "deployment": + return manager.WorkloadInfo_DEPLOYMENT + case "replicaset": + return manager.WorkloadInfo_REPLICASET + case "statefulset": + return manager.WorkloadInfo_STATEFULSET + case "rollout": + return manager.WorkloadInfo_ROLLOUT + default: + return manager.WorkloadInfo_UNSPECIFIED + } +} + +func (s *session) localWorkloadsWatcher(ctx context.Context, namespace string, synced *sync.WaitGroup) error { + defer func() { + if synced != nil { + synced.Done() + } + dlog.Debug(ctx, "client workload watcher ended") + }() + + knownWorkloadKinds, err := s.managerClient.GetKnownWorkloadKinds(ctx, s.sessionInfo) + if err != nil { + if status.Code(err) != codes.Unimplemented { + return fmt.Errorf("failed to get known workload kinds: %w", err) + } + // Talking to an older traffic-manager, use legacy default types + knownWorkloadKinds = &manager.KnownWorkloadKinds{Kinds: []manager.WorkloadInfo_Kind{ + manager.WorkloadInfo_DEPLOYMENT, + manager.WorkloadInfo_REPLICASET, + manager.WorkloadInfo_STATEFULSET, + }} + } + + dlog.Debugf(ctx, "Watching workloads from client due to lack of workload watcher support in traffic-manager %s", s.managerVersion) + fc := informer.GetFactory(ctx, namespace) + if fc == nil { + ctx = informer.WithFactory(ctx, namespace) + fc = informer.GetFactory(ctx, namespace) + } + workload.StartDeployments(ctx, namespace) + workload.StartReplicaSets(ctx, namespace) + workload.StartStatefulSets(ctx, namespace) + kf := fc.GetK8sInformerFactory() + kf.Start(ctx.Done()) + + rolloutsEnabled := slices.Index(knownWorkloadKinds.Kinds, manager.WorkloadInfo_ROLLOUT) >= 0 + if rolloutsEnabled { + workload.StartRollouts(ctx, namespace) + af := fc.GetArgoRolloutsInformerFactory() + af.Start(ctx.Done()) + } + + ww, err := workload.NewWatcher(ctx, namespace, rolloutsEnabled) + if err != nil { + workload.StartRollouts(ctx, namespace) + return err + } + kf.WaitForCacheSync(ctx.Done()) + + wlCh := ww.Subscribe(ctx) + for { + select { + case <-ctx.Done(): + return nil + case wls := <-wlCh: + if wls == nil { + return nil + } + s.workloadsLock.Lock() + workloads, ok := s.workloads[namespace] + if !ok { + workloads = make(map[workloadInfoKey]workloadInfo) + s.workloads[namespace] = workloads + } + for _, we := range wls { + w := we.Workload + key := workloadInfoKey{kind: rpcKind(w.GetKind()), name: w.GetName()} + if we.Type == workload.EventTypeDelete { + delete(workloads, key) + } else { + workloads[key] = workloadInfo{ + state: workload.GetWorkloadState(w), + uid: w.GetUID(), + } + } + } + for _, subscriber := range s.workloadSubscribers { + select { + case subscriber <- struct{}{}: + default: + } + } + s.workloadsLock.Unlock() + if synced != nil { + synced.Done() + synced = nil + } + } + } +} + +func (s *session) workloadsWatcher(ctx context.Context, namespace string, synced *sync.WaitGroup) error { + defer func() { + if synced != nil { + synced.Done() + } + }() + wlc, err := s.managerClient.WatchWorkloads(ctx, &manager.WorkloadEventsRequest{SessionInfo: s.sessionInfo, Namespace: namespace}) + if err != nil { + return err + } + + for ctx.Err() == nil { + wls, err := wlc.Recv() + if err != nil { + if status.Code(err) != codes.Unimplemented { + return err + } + localSynced := synced + synced = nil + return s.localWorkloadsWatcher(ctx, namespace, localSynced) + } + + s.workloadsLock.Lock() + workloads, ok := s.workloads[namespace] + if !ok { + workloads = make(map[workloadInfoKey]workloadInfo) + s.workloads[namespace] = workloads + } + + for _, we := range wls.GetEvents() { + w := we.Workload + key := workloadInfoKey{kind: w.Kind, name: w.Name} + if we.Type == manager.WorkloadEvent_DELETED { + dlog.Debugf(ctx, "Deleting workload %s/%s.%s", key.kind, key.name, namespace) + delete(workloads, key) + } else { + var clients []string + if lc := len(w.InterceptClients); lc > 0 { + clients = make([]string, lc) + for i, ic := range w.InterceptClients { + clients[i] = ic.Client + } + } + dlog.Debugf(ctx, "Adding workload %s/%s.%s", key.kind, key.name, namespace) + workloads[key] = workloadInfo{ + uid: types.UID(w.Uid), + state: workload.StateFromRPC(w.State), + agentState: w.AgentState, + interceptClients: clients, + } + } + } + for _, subscriber := range s.workloadSubscribers { + select { + case subscriber <- struct{}{}: + default: + } + } + s.workloadsLock.Unlock() + if synced != nil { + synced.Done() + synced = nil + } + } + return nil +} diff --git a/pkg/client/userd/trafficmgr/workloads.go b/pkg/client/userd/trafficmgr/workloads.go deleted file mode 100644 index 7c577c8327..0000000000 --- a/pkg/client/userd/trafficmgr/workloads.go +++ /dev/null @@ -1,326 +0,0 @@ -package trafficmgr - -import ( - "context" - "slices" - "sort" - "sync" - "time" - - core "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/cache" - - "github.com/datawire/dlib/dlog" - "github.com/datawire/k8sapi/pkg/k8sapi" - "github.com/telepresenceio/telepresence/rpc/v2/manager" -) - -type workloadsAndServicesWatcher struct { - sync.Mutex - wlKinds []manager.WorkloadInfo_Kind - nsWatchers map[string]*namespacedWASWatcher - nsListeners []func() - cond sync.Cond -} - -const ( - deployments = 0 - replicasets = 1 - statefulsets = 2 - rollouts = 3 -) - -// namespacedWASWatcher is watches Workloads And Services (WAS) for a namespace. -type namespacedWASWatcher struct { - svcWatcher *k8sapi.Watcher[*core.Service] - wlWatchers [4]*k8sapi.Watcher[runtime.Object] -} - -// svcEquals compare only the Service fields that are of interest to Telepresence. They are -// -// - UID -// - Name -// - Namespace -// - Spec.Ports -// - Spec.Type -func svcEquals(a, b *core.Service) bool { - aPorts := a.Spec.Ports - bPorts := b.Spec.Ports - if len(aPorts) != len(bPorts) { - return false - } - if a.UID != b.UID || a.Name != b.Name || a.Namespace != b.Namespace || a.Spec.Type != b.Spec.Type { - return false - } -nextMP: - // order is not significant (nor can it be trusted) when comparing - for _, mp := range aPorts { - for _, op := range bPorts { - if mp == op { - continue nextMP - } - } - return false - } - return true -} - -// workloadEquals compare only the workload (Deployment, ResourceSet, or StatefulSet) fields that are of interest to Telepresence. They are -// -// - UID -// - Name -// - Namespace -// - Spec.Template: -// - Labels -// - Containers (must contain an equal number of equally named containers with equal ports) -func workloadEquals(oa, ob runtime.Object) bool { - a, err := k8sapi.WrapWorkload(oa) - if err != nil { - // This should definitely never happen - panic(err) - } - b, err := k8sapi.WrapWorkload(ob) - if err != nil { - // This should definitely never happen - panic(err) - } - if a.GetUID() != b.GetUID() || a.GetName() != b.GetName() || a.GetNamespace() != b.GetNamespace() { - return false - } - - aSpec := a.GetPodTemplate() - bSpec := b.GetPodTemplate() - if !labels.Equals(aSpec.Labels, bSpec.Labels) { - return false - } - aPod := aSpec.Spec - bPod := bSpec.Spec - if len(aPod.Containers) != len(bPod.Containers) { - return false - } - makeContainerMap := func(cs []core.Container) map[string]*core.Container { - m := make(map[string]*core.Container, len(cs)) - for i := range cs { - c := &cs[i] - m[c.Name] = c - } - return m - } - - portsEqual := func(a, b []core.ContainerPort) bool { - if len(a) != len(b) { - return false - } - nextAP: - for _, ap := range a { - for _, bp := range b { - if ap == bp { - continue nextAP - } - } - return false - } - return true - } - - am := makeContainerMap(aPod.Containers) - bm := makeContainerMap(bPod.Containers) - for n, ac := range am { - bc, ok := bm[n] - if !ok { - return false - } - if !portsEqual(ac.Ports, bc.Ports) { - return false - } - } - return true -} - -func newNamespaceWatcher(c context.Context, namespace string, cond *sync.Cond, wlKinds []manager.WorkloadInfo_Kind) *namespacedWASWatcher { - dlog.Debugf(c, "newNamespaceWatcher %s", namespace) - ki := k8sapi.GetJoinedClientSetInterface(c) - appsGetter, rolloutsGetter := ki.AppsV1().RESTClient(), ki.ArgoprojV1alpha1().RESTClient() - w := &namespacedWASWatcher{ - svcWatcher: k8sapi.NewWatcher("services", ki.CoreV1().RESTClient(), cond, k8sapi.WithEquals(svcEquals), k8sapi.WithNamespace[*core.Service](namespace)), - wlWatchers: [4]*k8sapi.Watcher[runtime.Object]{ - k8sapi.NewWatcher("deployments", appsGetter, cond, k8sapi.WithEquals(workloadEquals), k8sapi.WithNamespace[runtime.Object](namespace)), - k8sapi.NewWatcher("replicasets", appsGetter, cond, k8sapi.WithEquals(workloadEquals), k8sapi.WithNamespace[runtime.Object](namespace)), - k8sapi.NewWatcher("statefulsets", appsGetter, cond, k8sapi.WithEquals(workloadEquals), k8sapi.WithNamespace[runtime.Object](namespace)), - nil, - }, - } - if slices.Contains(wlKinds, manager.WorkloadInfo_ROLLOUT) { - w.wlWatchers[rollouts] = k8sapi.NewWatcher("rollouts", rolloutsGetter, cond, k8sapi.WithEquals(workloadEquals), k8sapi.WithNamespace[runtime.Object](namespace)) - } - return w -} - -func (nw *namespacedWASWatcher) cancel() { - nw.svcWatcher.Cancel() - for _, w := range nw.wlWatchers { - if w != nil { - w.Cancel() - } - } -} - -func (nw *namespacedWASWatcher) hasSynced() bool { - return nw.svcWatcher.HasSynced() && - nw.wlWatchers[deployments].HasSynced() && - nw.wlWatchers[replicasets].HasSynced() && - nw.wlWatchers[statefulsets].HasSynced() && - (nw.wlWatchers[rollouts] == nil || nw.wlWatchers[rollouts].HasSynced()) -} - -func newWASWatcher(knownWorkloadKinds *manager.KnownWorkloadKinds) *workloadsAndServicesWatcher { - w := &workloadsAndServicesWatcher{ - wlKinds: knownWorkloadKinds.Kinds, - nsWatchers: make(map[string]*namespacedWASWatcher), - } - w.cond.L = &w.Mutex - return w -} - -// eachWorkload will iterate over the workloads in the current snapshot. Unless namespace -// is the empty string, the iteration is limited to the workloads matching that namespace. -// The traffic-manager workload is excluded. -func (w *workloadsAndServicesWatcher) eachWorkload(c context.Context, tmns string, namespaces []string, f func(workload k8sapi.Workload)) { - if len(namespaces) != 1 { - // Produce workloads in a predictable order - nss := make([]string, len(namespaces)) - copy(nss, namespaces) - sort.Strings(nss) - for _, n := range nss { - w.eachWorkload(c, tmns, []string{n}, f) - } - } else { - ns := namespaces[0] - w.Lock() - nw, ok := w.nsWatchers[ns] - w.Unlock() - if ok { - for _, wlw := range nw.wlWatchers { - if wlw == nil { - continue - } - wls, err := wlw.List(c) - if err != nil { - dlog.Errorf(c, "error listing workloads: %v", err) - return - } - - nextWorkload: - for _, ro := range wls { - wl, err := k8sapi.WrapWorkload(ro) - if err != nil { - dlog.Errorf(c, "error wrapping runtime object as a workload: %v", err) - return - } - - // Exclude workloads that are owned by a supported workload. - for _, or := range wl.GetOwnerReferences() { - if or.Controller != nil && *or.Controller { - switch or.Kind { - case "Deployment", "ReplicaSet", "StatefulSet": - continue nextWorkload - case "Rollout": - if slices.Contains(w.wlKinds, manager.WorkloadInfo_ROLLOUT) { - continue nextWorkload - } - } - } - } - // If this is our traffic-manager namespace, then exclude the traffic-manager service. - lbs := wl.GetLabels() - if !(ns == tmns && lbs["app"] == "traffic-manager" && lbs["telepresence"] == "manager") { - f(wl) - } - } - } - } - } -} - -func (w *workloadsAndServicesWatcher) waitForSync(c context.Context) { - hss := make([]cache.InformerSynced, len(w.nsWatchers)) - w.Lock() - i := 0 - for _, nw := range w.nsWatchers { - hss[i] = nw.hasSynced - i++ - } - w.Unlock() - - hasSynced := true - for _, hs := range hss { - if !hs() { - hasSynced = false - break - } - } - if !hasSynced { - // Waiting for cache sync will sometimes block, so a timeout is necessary here - c, cancel := context.WithTimeout(c, 5*time.Second) - defer cancel() - cache.WaitForCacheSync(c.Done(), hss...) - } -} - -// subscribe writes to the given channel whenever relevant information has changed -// in the current snapshot. -func (w *workloadsAndServicesWatcher) subscribe(c context.Context) <-chan struct{} { - return k8sapi.Subscribe(c, &w.cond) -} - -// setNamespacesToWatch starts new watchers or kills old ones to make the current -// set of watchers reflect the nss argument. -func (w *workloadsAndServicesWatcher) setNamespacesToWatch(c context.Context, nss []string) { - var adds []string - desired := make(map[string]struct{}) - - w.Lock() - for _, ns := range nss { - desired[ns] = struct{}{} - if _, ok := w.nsWatchers[ns]; !ok { - adds = append(adds, ns) - } - } - for ns, nw := range w.nsWatchers { - if _, ok := desired[ns]; !ok { - delete(w.nsWatchers, ns) - nw.cancel() - } - } - for _, ns := range adds { - w.addNSLocked(c, ns) - } - w.Unlock() -} - -func (w *workloadsAndServicesWatcher) addNSLocked(c context.Context, ns string) *namespacedWASWatcher { - nw := newNamespaceWatcher(c, ns, &w.cond, w.wlKinds) - w.nsWatchers[ns] = nw - for _, l := range w.nsListeners { - nw.svcWatcher.AddStateListener(&k8sapi.StateListener{Cb: l}) - } - return nw -} - -func (w *workloadsAndServicesWatcher) ensureStarted(c context.Context, ns string, cb func(bool)) { - w.Lock() - defer w.Unlock() - nw, ok := w.nsWatchers[ns] - if !ok { - nw = w.addNSLocked(c, ns) - } - // Starting the svcWatcher will set it to active and also trigger its state listener - // which means a) that the set of active namespaces will change, and b) that the - // WatchAgentsNS will restart with that namespace included. - err := nw.svcWatcher.EnsureStarted(c, cb) - if err != nil { - dlog.Errorf(c, "error starting service watchers: %s", err) - } -} diff --git a/pkg/workload/informers.go b/pkg/workload/informers.go new file mode 100644 index 0000000000..9171995214 --- /dev/null +++ b/pkg/workload/informers.go @@ -0,0 +1,106 @@ +package workload + +import ( + "context" + + apps "k8s.io/api/apps/v1" + core "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" + + argorollouts "github.com/datawire/argo-rollouts-go-client/pkg/apis/rollouts/v1alpha1" + "github.com/datawire/dlib/dlog" + "github.com/telepresenceio/telepresence/v2/pkg/informer" +) + +func whereWeWatch(ns string) string { + if ns == "" { + return "cluster wide" + } + return "in namespace " + ns +} + +func StartDeployments(ctx context.Context, ns string) cache.SharedIndexInformer { + f := informer.GetK8sFactory(ctx, ns) + ix := f.Apps().V1().Deployments().Informer() + _ = ix.SetTransform(func(o any) (any, error) { + // Strip the parts of the deployment that we don't care about to save memory + if dep, ok := o.(*apps.Deployment); ok { + om := &dep.ObjectMeta + if an := om.Annotations; an != nil { + delete(an, core.LastAppliedConfigAnnotation) + } + dep.ManagedFields = nil + dep.Finalizers = nil + dep.OwnerReferences = nil + } + return o, nil + }) + _ = ix.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { + dlog.Errorf(ctx, "watcher for Deployments %s: %v", whereWeWatch(ns), err) + }) + return ix +} + +func StartReplicaSets(ctx context.Context, ns string) cache.SharedIndexInformer { + f := informer.GetK8sFactory(ctx, ns) + ix := f.Apps().V1().ReplicaSets().Informer() + _ = ix.SetTransform(func(o any) (any, error) { + // Strip the parts of the replicaset that we don't care about. Saves memory + if dep, ok := o.(*apps.ReplicaSet); ok { + om := &dep.ObjectMeta + if an := om.Annotations; an != nil { + delete(an, core.LastAppliedConfigAnnotation) + } + dep.ManagedFields = nil + dep.Finalizers = nil + } + return o, nil + }) + _ = ix.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { + dlog.Errorf(ctx, "watcher for ReplicaSets %s: %v", whereWeWatch(ns), err) + }) + return ix +} + +func StartStatefulSets(ctx context.Context, ns string) cache.SharedIndexInformer { + f := informer.GetK8sFactory(ctx, ns) + ix := f.Apps().V1().StatefulSets().Informer() + _ = ix.SetTransform(func(o any) (any, error) { + // Strip the parts of the stateful that we don't care about. Saves memory + if dep, ok := o.(*apps.StatefulSet); ok { + om := &dep.ObjectMeta + if an := om.Annotations; an != nil { + delete(an, core.LastAppliedConfigAnnotation) + } + dep.ManagedFields = nil + dep.Finalizers = nil + } + return o, nil + }) + _ = ix.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { + dlog.Errorf(ctx, "watcher for StatefulSet %s: %v", whereWeWatch(ns), err) + }) + return ix +} + +func StartRollouts(ctx context.Context, ns string) cache.SharedIndexInformer { + f := informer.GetArgoRolloutsFactory(ctx, ns) + dlog.Infof(ctx, "Watching Rollouts in %s", ns) + ix := f.Argoproj().V1alpha1().Rollouts().Informer() + _ = ix.SetTransform(func(o any) (any, error) { + // Strip the parts of the rollout that we don't care about. Saves memory + if dep, ok := o.(*argorollouts.Rollout); ok { + om := &dep.ObjectMeta + if an := om.Annotations; an != nil { + delete(an, core.LastAppliedConfigAnnotation) + } + dep.ManagedFields = nil + dep.Finalizers = nil + } + return o, nil + }) + _ = ix.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { + dlog.Errorf(ctx, "watcher for Rollouts %s: %v", whereWeWatch(ns), err) + }) + return ix +} diff --git a/pkg/workload/state.go b/pkg/workload/state.go new file mode 100644 index 0000000000..4a8bc3f51a --- /dev/null +++ b/pkg/workload/state.go @@ -0,0 +1,130 @@ +package workload + +import ( + "sort" + + appsv1 "k8s.io/api/apps/v1" + core "k8s.io/api/core/v1" + + argorollouts "github.com/datawire/argo-rollouts-go-client/pkg/apis/rollouts/v1alpha1" + "github.com/datawire/k8sapi/pkg/k8sapi" + "github.com/telepresenceio/telepresence/rpc/v2/manager" +) + +type State int + +const ( + StateUnknown State = iota + StateProgressing + StateAvailable + StateFailure +) + +func deploymentState(d *appsv1.Deployment) State { + conds := d.Status.Conditions + sort.Slice(conds, func(i, j int) bool { + return conds[i].LastTransitionTime.Compare(conds[j].LastTransitionTime.Time) > 0 + }) + for _, c := range conds { + switch c.Type { + case appsv1.DeploymentProgressing: + if c.Status == core.ConditionTrue { + return StateProgressing + } + case appsv1.DeploymentAvailable: + if c.Status == core.ConditionTrue { + return StateAvailable + } + case appsv1.DeploymentReplicaFailure: + if c.Status == core.ConditionTrue { + return StateFailure + } + } + } + if len(conds) == 0 { + return StateProgressing + } + return StateUnknown +} + +func replicaSetState(d *appsv1.ReplicaSet) State { + for _, c := range d.Status.Conditions { + if c.Type == appsv1.ReplicaSetReplicaFailure && c.Status == core.ConditionTrue { + return StateFailure + } + } + return StateAvailable +} + +func statefulSetState(_ *appsv1.StatefulSet) State { + return StateAvailable +} + +func rolloutSetState(r *argorollouts.Rollout) State { + conds := r.Status.Conditions + sort.Slice(conds, func(i, j int) bool { + return conds[i].LastTransitionTime.Compare(conds[j].LastTransitionTime.Time) > 0 + }) + for _, c := range conds { + switch c.Type { + case argorollouts.RolloutProgressing: + if c.Status == core.ConditionTrue { + return StateProgressing + } + case argorollouts.RolloutAvailable: + if c.Status == core.ConditionTrue { + return StateAvailable + } + case argorollouts.RolloutReplicaFailure: + if c.Status == core.ConditionTrue { + return StateFailure + } + } + } + if len(conds) == 0 { + return StateProgressing + } + return StateUnknown +} + +func (ws State) String() string { + switch ws { + case StateProgressing: + return "Progressing" + case StateAvailable: + return "Available" + case StateFailure: + return "Failure" + default: + return "Unknown" + } +} + +func GetWorkloadState(wl k8sapi.Workload) State { + if d, ok := k8sapi.DeploymentImpl(wl); ok { + return deploymentState(d) + } + if r, ok := k8sapi.ReplicaSetImpl(wl); ok { + return replicaSetState(r) + } + if s, ok := k8sapi.StatefulSetImpl(wl); ok { + return statefulSetState(s) + } + if rt, ok := k8sapi.RolloutImpl(wl); ok { + return rolloutSetState(rt) + } + return StateUnknown +} + +func StateFromRPC(s manager.WorkloadInfo_State) State { + switch s { + case manager.WorkloadInfo_AVAILABLE: + return StateAvailable + case manager.WorkloadInfo_FAILURE: + return StateFailure + case manager.WorkloadInfo_PROGRESSING: + return StateProgressing + default: + return StateUnknown + } +} diff --git a/pkg/workload/util.go b/pkg/workload/util.go new file mode 100644 index 0000000000..fb58969f13 --- /dev/null +++ b/pkg/workload/util.go @@ -0,0 +1,25 @@ +package workload + +import ( + "k8s.io/apimachinery/pkg/runtime" + + "github.com/datawire/k8sapi/pkg/k8sapi" + "github.com/telepresenceio/telepresence/v2/pkg/agentconfig" +) + +const ( + DomainPrefix = "telepresence.getambassador.io/" + InjectAnnotation = DomainPrefix + "inject-" + agentconfig.ContainerName + ServiceNameAnnotation = DomainPrefix + "inject-service-name" + ManualInjectAnnotation = DomainPrefix + "manually-injected" + AnnRestartedAt = DomainPrefix + "restartedAt" +) + +func FromAny(obj any) (k8sapi.Workload, bool) { + if ro, ok := obj.(runtime.Object); ok { + if wl, err := k8sapi.WrapWorkload(ro); err == nil { + return wl, true + } + } + return nil, false +} diff --git a/pkg/workload/watcher.go b/pkg/workload/watcher.go new file mode 100644 index 0000000000..faeb589675 --- /dev/null +++ b/pkg/workload/watcher.go @@ -0,0 +1,272 @@ +package workload + +import ( + "context" + "math" + "sync" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/uuid" + apps "k8s.io/api/apps/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" + "k8s.io/kubectl/pkg/util/deployment" + + "github.com/datawire/dlib/dlog" + "github.com/datawire/k8sapi/pkg/k8sapi" + "github.com/telepresenceio/telepresence/v2/pkg/informer" +) + +type EventType int + +const ( + EventTypeAdd = iota + EventTypeUpdate + EventTypeDelete +) + +type WorkloadEvent struct { + Type EventType + Workload k8sapi.Workload +} + +func (e EventType) String() string { + switch e { + case EventTypeAdd: + return "add" + case EventTypeUpdate: + return "update" + case EventTypeDelete: + return "delete" + default: + return "unknown" + } +} + +type Watcher interface { + Subscribe(ctx context.Context) <-chan []WorkloadEvent +} + +type watcher struct { + sync.Mutex + namespace string + subscriptions map[uuid.UUID]chan<- []WorkloadEvent + timer *time.Timer + events []WorkloadEvent + rolloutsEnabled bool +} + +func NewWatcher(ctx context.Context, ns string, rolloutsEnabled bool) (Watcher, error) { + w := new(watcher) + w.namespace = ns + w.rolloutsEnabled = rolloutsEnabled + w.subscriptions = make(map[uuid.UUID]chan<- []WorkloadEvent) + w.timer = time.AfterFunc(time.Duration(math.MaxInt64), func() { + w.Lock() + ss := make([]chan<- []WorkloadEvent, len(w.subscriptions)) + i := 0 + for _, sub := range w.subscriptions { + ss[i] = sub + i++ + } + events := w.events + w.events = nil + w.Unlock() + for _, s := range ss { + select { + case <-ctx.Done(): + return + case s <- events: + } + } + }) + + err := w.addEventHandler(ctx, ns) + if err != nil { + return nil, err + } + return w, nil +} + +func hasValidReplicasetOwner(wl k8sapi.Workload, rolloutsEnabled bool) bool { + for _, ref := range wl.GetOwnerReferences() { + if ref.Controller != nil && *ref.Controller { + switch ref.Kind { + case "Deployment": + return true + case "Rollout": + if rolloutsEnabled { + return true + } + } + } + } + return false +} + +func (w *watcher) Subscribe(ctx context.Context) <-chan []WorkloadEvent { + ch := make(chan []WorkloadEvent, 1) + initialEvents := make([]WorkloadEvent, 0, 100) + id := uuid.New() + kf := informer.GetFactory(ctx, w.namespace) + ai := kf.GetK8sInformerFactory().Apps().V1() + dlog.Debugf(ctx, "workload.Watcher producing initial events for namespace %s", w.namespace) + if dps, err := ai.Deployments().Lister().Deployments(w.namespace).List(labels.Everything()); err == nil { + for _, obj := range dps { + if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) { + initialEvents = append(initialEvents, WorkloadEvent{ + Type: EventTypeAdd, + Workload: wl, + }) + } + } + } + if rps, err := ai.ReplicaSets().Lister().ReplicaSets(w.namespace).List(labels.Everything()); err == nil { + for _, obj := range rps { + if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) { + initialEvents = append(initialEvents, WorkloadEvent{ + Type: EventTypeAdd, + Workload: wl, + }) + } + } + } + if sps, err := ai.StatefulSets().Lister().StatefulSets(w.namespace).List(labels.Everything()); err == nil { + for _, obj := range sps { + if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) { + initialEvents = append(initialEvents, WorkloadEvent{ + Type: EventTypeAdd, + Workload: wl, + }) + } + } + } + if w.rolloutsEnabled { + ri := kf.GetArgoRolloutsInformerFactory().Argoproj().V1alpha1() + if sps, err := ri.Rollouts().Lister().Rollouts(w.namespace).List(labels.Everything()); err == nil { + for _, obj := range sps { + if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) { + initialEvents = append(initialEvents, WorkloadEvent{ + Type: EventTypeAdd, + Workload: wl, + }) + } + } + } + } + ch <- initialEvents + + w.Lock() + w.subscriptions[id] = ch + w.Unlock() + go func() { + <-ctx.Done() + close(ch) + w.Lock() + delete(w.subscriptions, id) + w.Unlock() + }() + return ch +} + +func compareOptions() []cmp.Option { + return []cmp.Option{ + // Ignore frequently changing fields of no interest + cmpopts.IgnoreFields(meta.ObjectMeta{}, "Namespace", "ResourceVersion", "Generation", "ManagedFields"), + + // Only the Conditions are of interest in the DeploymentStatus. + cmp.Comparer(func(a, b apps.DeploymentStatus) bool { + // Only compare the DeploymentCondition's type and status + return cmp.Equal(a.Conditions, b.Conditions, cmp.Comparer(func(c1, c2 apps.DeploymentCondition) bool { + return c1.Type == c2.Type && c1.Status == c2.Status + })) + }), + + // Treat a nil map or slice as empty. + cmpopts.EquateEmpty(), + + // Ignore frequently changing annotations of no interest. + cmpopts.IgnoreMapEntries(func(k, _ string) bool { + return k == AnnRestartedAt || k == deployment.RevisionAnnotation + }), + } +} + +func (w *watcher) watch(ix cache.SharedIndexInformer, ns string, hasValidController func(k8sapi.Workload) bool) error { + _, err := ix.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + if wl, ok := FromAny(obj); ok && ns == wl.GetNamespace() && !hasValidController(wl) { + w.handleEvent(WorkloadEvent{Type: EventTypeAdd, Workload: wl}) + } + }, + DeleteFunc: func(obj any) { + if wl, ok := FromAny(obj); ok { + if ns == wl.GetNamespace() && !hasValidController(wl) { + w.handleEvent(WorkloadEvent{Type: EventTypeDelete, Workload: wl}) + } + } else if dfsu, ok := obj.(*cache.DeletedFinalStateUnknown); ok { + if wl, ok = FromAny(dfsu.Obj); ok && ns == wl.GetNamespace() && !hasValidController(wl) { + w.handleEvent(WorkloadEvent{Type: EventTypeDelete, Workload: wl}) + } + } + }, + UpdateFunc: func(oldObj, newObj any) { + if wl, ok := FromAny(newObj); ok && ns == wl.GetNamespace() && !hasValidController(wl) { + if oldWl, ok := FromAny(oldObj); ok { + if cmp.Equal(wl, oldWl, compareOptions()...) { + return + } + // Replace the cmp.Equal above with this to view the changes that trigger an update: + // + // diff := cmp.Diff(wl, oldWl, compareOptions()...) + // if diff == "" { + // return + // } + // dlog.Debugf(ctx, "DIFF:\n%s", diff) + w.handleEvent(WorkloadEvent{Type: EventTypeUpdate, Workload: wl}) + } + } + }, + }) + return err +} + +func (w *watcher) addEventHandler(ctx context.Context, ns string) error { + kf := informer.GetFactory(ctx, ns) + hvc := func(wl k8sapi.Workload) bool { + return hasValidReplicasetOwner(wl, w.rolloutsEnabled) + } + + ai := kf.GetK8sInformerFactory().Apps().V1() + if err := w.watch(ai.Deployments().Informer(), ns, hvc); err != nil { + return err + } + if err := w.watch(ai.ReplicaSets().Informer(), ns, hvc); err != nil { + return err + } + if err := w.watch(ai.StatefulSets().Informer(), ns, hvc); err != nil { + return err + } + if !w.rolloutsEnabled { + dlog.Infof(ctx, "Argo Rollouts is disabled, Argo Rollouts will not be watched") + } else { + ri := kf.GetArgoRolloutsInformerFactory().Argoproj().V1alpha1() + if err := w.watch(ri.Rollouts().Informer(), ns, hvc); err != nil { + return err + } + } + return nil +} + +func (w *watcher) handleEvent(we WorkloadEvent) { + w.Lock() + w.events = append(w.events, we) + w.Unlock() + + // Defers sending until things been quiet for a while + w.timer.Reset(5 * time.Millisecond) +} diff --git a/rpc/manager/manager.pb.go b/rpc/manager/manager.pb.go index 3a7eb876f2..aede26124e 100644 --- a/rpc/manager/manager.pb.go +++ b/rpc/manager/manager.pb.go @@ -3522,6 +3522,7 @@ type WorkloadInfo struct { Kind WorkloadInfo_Kind `protobuf:"varint,1,opt,name=kind,proto3,enum=telepresence.manager.WorkloadInfo_Kind" json:"kind,omitempty"` Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` Namespace string `protobuf:"bytes,3,opt,name=namespace,proto3" json:"namespace,omitempty"` + Uid string `protobuf:"bytes,7,opt,name=uid,proto3" json:"uid,omitempty"` AgentState WorkloadInfo_AgentState `protobuf:"varint,4,opt,name=agent_state,json=agentState,proto3,enum=telepresence.manager.WorkloadInfo_AgentState" json:"agent_state,omitempty"` InterceptClients []*WorkloadInfo_Intercept `protobuf:"bytes,5,rep,name=intercept_clients,json=interceptClients,proto3" json:"intercept_clients,omitempty"` State WorkloadInfo_State `protobuf:"varint,6,opt,name=state,proto3,enum=telepresence.manager.WorkloadInfo_State" json:"state,omitempty"` @@ -3580,6 +3581,13 @@ func (x *WorkloadInfo) GetNamespace() string { return "" } +func (x *WorkloadInfo) GetUid() string { + if x != nil { + return x.Uid + } + return "" +} + func (x *WorkloadInfo) GetAgentState() WorkloadInfo_AgentState { if x != nil { return x.AgentState @@ -3727,6 +3735,9 @@ type WorkloadEventsRequest struct { // The timestamp from which the first delta should be computed. Set to // undefined to get a delta that contains everything. Since *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=since,proto3" json:"since,omitempty"` + // The namespace to watch. Must be one of the namespaces that are + // managed by the traffic-manager. Defaults to the connected namespace. + Namespace string `protobuf:"bytes,3,opt,name=namespace,proto3" json:"namespace,omitempty"` } func (x *WorkloadEventsRequest) Reset() { @@ -3775,6 +3786,13 @@ func (x *WorkloadEventsRequest) GetSince() *timestamppb.Timestamp { return nil } +func (x *WorkloadEventsRequest) GetNamespace() string { + if x != nil { + return x.Namespace + } + return "" +} + // "Mechanisms" are the ways that an Agent can decide handle // incoming requests, and decide whether to send them to the // in-cluster service, or whether to intercept them. The "tcp" @@ -4457,7 +4475,7 @@ var file_manager_manager_proto_rawDesc = []byte{ 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0e, 0x32, 0x27, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x4b, 0x69, 0x6e, - 0x64, 0x52, 0x05, 0x6b, 0x69, 0x6e, 0x64, 0x73, 0x22, 0xfb, 0x04, 0x0a, 0x0c, 0x57, 0x6f, 0x72, + 0x64, 0x52, 0x05, 0x6b, 0x69, 0x6e, 0x64, 0x73, 0x22, 0x8d, 0x05, 0x0a, 0x0c, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x3b, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x27, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, @@ -4465,69 +4483,72 @@ var file_manager_manager_proto_rawDesc = []byte{ 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, - 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x4e, 0x0a, 0x0b, 0x61, 0x67, 0x65, 0x6e, - 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2d, 0x2e, - 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, - 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, - 0x6f, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x0a, 0x61, 0x67, - 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x59, 0x0a, 0x11, 0x69, 0x6e, 0x74, 0x65, - 0x72, 0x63, 0x65, 0x70, 0x74, 0x5f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x05, 0x20, - 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, - 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, - 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x63, 0x65, 0x70, - 0x74, 0x52, 0x10, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x63, 0x65, 0x70, 0x74, 0x43, 0x6c, 0x69, 0x65, - 0x6e, 0x74, 0x73, 0x12, 0x3e, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x06, 0x20, 0x01, - 0x28, 0x0e, 0x32, 0x28, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, - 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, - 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, - 0x61, 0x74, 0x65, 0x1a, 0x23, 0x0a, 0x09, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x63, 0x65, 0x70, 0x74, - 0x12, 0x16, 0x0a, 0x06, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x06, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x22, 0x55, 0x0a, 0x04, 0x4b, 0x69, 0x6e, 0x64, - 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, - 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x44, 0x45, 0x50, 0x4c, 0x4f, 0x59, 0x4d, 0x45, 0x4e, 0x54, 0x10, - 0x01, 0x12, 0x0e, 0x0a, 0x0a, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x53, 0x45, 0x54, 0x10, - 0x02, 0x12, 0x0f, 0x0a, 0x0b, 0x53, 0x54, 0x41, 0x54, 0x45, 0x46, 0x55, 0x4c, 0x53, 0x45, 0x54, - 0x10, 0x03, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x4f, 0x4c, 0x4c, 0x4f, 0x55, 0x54, 0x10, 0x04, 0x22, - 0x4d, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x17, 0x0a, 0x13, 0x55, 0x4e, 0x4b, 0x4e, - 0x4f, 0x57, 0x4e, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, - 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x41, 0x56, 0x41, 0x49, 0x4c, 0x41, 0x42, 0x4c, 0x45, 0x10, 0x01, - 0x12, 0x0f, 0x0a, 0x0b, 0x50, 0x52, 0x4f, 0x47, 0x52, 0x45, 0x53, 0x53, 0x49, 0x4e, 0x47, 0x10, - 0x02, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x41, 0x49, 0x4c, 0x55, 0x52, 0x45, 0x10, 0x03, 0x22, 0x46, - 0x0a, 0x0a, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x18, 0x0a, 0x14, - 0x4e, 0x4f, 0x5f, 0x41, 0x47, 0x45, 0x4e, 0x54, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, - 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x49, 0x4e, 0x53, 0x54, 0x41, 0x4c, - 0x4c, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x49, 0x4e, 0x54, 0x45, 0x52, 0x43, 0x45, - 0x50, 0x54, 0x45, 0x44, 0x10, 0x02, 0x22, 0xc7, 0x01, 0x0a, 0x0d, 0x57, 0x6f, 0x72, 0x6b, 0x6c, - 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x3c, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x28, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, - 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, - 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x54, 0x79, 0x70, 0x65, - 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x3e, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, - 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, + 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x4e, 0x0a, 0x0b, 0x61, 0x67, + 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, + 0x2d, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, + 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x49, + 0x6e, 0x66, 0x6f, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x0a, + 0x61, 0x67, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x59, 0x0a, 0x11, 0x69, 0x6e, + 0x74, 0x65, 0x72, 0x63, 0x65, 0x70, 0x74, 0x5f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x73, 0x18, + 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, + 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, 0x72, + 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x63, + 0x65, 0x70, 0x74, 0x52, 0x10, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x63, 0x65, 0x70, 0x74, 0x43, 0x6c, + 0x69, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x3e, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x06, + 0x20, 0x01, 0x28, 0x0e, 0x32, 0x28, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, + 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, + 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, + 0x73, 0x74, 0x61, 0x74, 0x65, 0x1a, 0x23, 0x0a, 0x09, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x63, 0x65, + 0x70, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x06, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x22, 0x55, 0x0a, 0x04, 0x4b, 0x69, + 0x6e, 0x64, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, + 0x44, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x44, 0x45, 0x50, 0x4c, 0x4f, 0x59, 0x4d, 0x45, 0x4e, + 0x54, 0x10, 0x01, 0x12, 0x0e, 0x0a, 0x0a, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x53, 0x45, + 0x54, 0x10, 0x02, 0x12, 0x0f, 0x0a, 0x0b, 0x53, 0x54, 0x41, 0x54, 0x45, 0x46, 0x55, 0x4c, 0x53, + 0x45, 0x54, 0x10, 0x03, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x4f, 0x4c, 0x4c, 0x4f, 0x55, 0x54, 0x10, + 0x04, 0x22, 0x4d, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x17, 0x0a, 0x13, 0x55, 0x4e, + 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, + 0x44, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x41, 0x56, 0x41, 0x49, 0x4c, 0x41, 0x42, 0x4c, 0x45, + 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x50, 0x52, 0x4f, 0x47, 0x52, 0x45, 0x53, 0x53, 0x49, 0x4e, + 0x47, 0x10, 0x02, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x41, 0x49, 0x4c, 0x55, 0x52, 0x45, 0x10, 0x03, + 0x22, 0x46, 0x0a, 0x0a, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x18, + 0x0a, 0x14, 0x4e, 0x4f, 0x5f, 0x41, 0x47, 0x45, 0x4e, 0x54, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, + 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x49, 0x4e, 0x53, 0x54, + 0x41, 0x4c, 0x4c, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x49, 0x4e, 0x54, 0x45, 0x52, + 0x43, 0x45, 0x50, 0x54, 0x45, 0x44, 0x10, 0x02, 0x22, 0xc7, 0x01, 0x0a, 0x0d, 0x57, 0x6f, 0x72, + 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x3c, 0x0a, 0x04, 0x74, 0x79, + 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x28, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, - 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x77, 0x6f, - 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x38, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x15, - 0x0a, 0x11, 0x41, 0x44, 0x44, 0x45, 0x44, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, - 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x08, 0x4d, 0x4f, 0x44, 0x49, 0x46, 0x49, 0x45, - 0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x02, - 0x22, 0x84, 0x01, 0x0a, 0x13, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, - 0x6e, 0x74, 0x73, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x12, 0x30, 0x0a, 0x05, 0x73, 0x69, 0x6e, 0x63, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, - 0x61, 0x6d, 0x70, 0x52, 0x05, 0x73, 0x69, 0x6e, 0x63, 0x65, 0x12, 0x3b, 0x0a, 0x06, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x74, 0x65, 0x6c, + 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x54, 0x79, + 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x3e, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, + 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, - 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, - 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x8f, 0x01, 0x0a, 0x15, 0x57, 0x6f, 0x72, 0x6b, - 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x44, 0x0a, 0x0c, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x6e, 0x66, - 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, - 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x53, - 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0b, 0x73, 0x65, 0x73, 0x73, - 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x30, 0x0a, 0x05, 0x73, 0x69, 0x6e, 0x63, 0x65, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, - 0x6d, 0x70, 0x52, 0x05, 0x73, 0x69, 0x6e, 0x63, 0x65, 0x2a, 0xad, 0x01, 0x0a, 0x18, 0x49, 0x6e, + 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, + 0x77, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x38, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, + 0x12, 0x15, 0x0a, 0x11, 0x41, 0x44, 0x44, 0x45, 0x44, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, + 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x08, 0x4d, 0x4f, 0x44, 0x49, 0x46, + 0x49, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x44, + 0x10, 0x02, 0x22, 0x84, 0x01, 0x0a, 0x13, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x45, + 0x76, 0x65, 0x6e, 0x74, 0x73, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x12, 0x30, 0x0a, 0x05, 0x73, 0x69, + 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x05, 0x73, 0x69, 0x6e, 0x63, 0x65, 0x12, 0x3b, 0x0a, 0x06, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x74, + 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, + 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, + 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0xad, 0x01, 0x0a, 0x15, 0x57, 0x6f, + 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x44, 0x0a, 0x0c, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x69, + 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x74, 0x65, 0x6c, 0x65, + 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, + 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0b, 0x73, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x30, 0x0a, 0x05, 0x73, 0x69, 0x6e, + 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x52, 0x05, 0x73, 0x69, 0x6e, 0x63, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x6e, + 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, + 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x2a, 0xad, 0x01, 0x0a, 0x18, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x63, 0x65, 0x70, 0x74, 0x44, 0x69, 0x73, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x43, 0x54, 0x49, 0x56, diff --git a/rpc/manager/manager.proto b/rpc/manager/manager.proto index 3ab2455eb0..61a382d6a7 100644 --- a/rpc/manager/manager.proto +++ b/rpc/manager/manager.proto @@ -631,6 +631,7 @@ message WorkloadInfo { Kind kind = 1; string name = 2; string namespace = 3; + string uid = 7; AgentState agent_state = 4; repeated Intercept intercept_clients = 5; @@ -667,6 +668,10 @@ message WorkloadEventsRequest { // The timestamp from which the first delta should be computed. Set to // undefined to get a delta that contains everything. google.protobuf.Timestamp since = 2; + + // The namespace to watch. Must be one of the namespaces that are + // managed by the traffic-manager. Defaults to the connected namespace. + string namespace = 3; } service Manager {