From 050fdf322f9d643f2a01fce69db14579b4151db4 Mon Sep 17 00:00:00 2001
From: Thomas Hallgren
Date: Mon, 28 Oct 2024 15:57:22 +0100
Subject: [PATCH 1/6] Add compatibility.version Helm setting for test and debug

This setting controls the enablement of features more recent than the
given version. In particular, it's intended to make newer gRPC functions
return an `Unimplemented` error when set to a version where the function
is not implemented. This setting is intended for testing and debugging
only.

Signed-off-by: Thomas Hallgren
---
 charts/telepresence/templates/deployment.yaml |  6 +++
 charts/telepresence/values.yaml               |  9 ++++
 .../cmd/manager/managerutil/envconfig.go      | 41 ++++++++++++++-----
 cmd/traffic/cmd/manager/service.go            | 21 +++++++++-
 4 files changed, 65 insertions(+), 12 deletions(-)

diff --git a/charts/telepresence/templates/deployment.yaml b/charts/telepresence/templates/deployment.yaml
index 3b2c299559..676a587efe 100644
--- a/charts/telepresence/templates/deployment.yaml
+++ b/charts/telepresence/templates/deployment.yaml
@@ -242,6 +242,12 @@ spec:
         {{- end }}
         {{- end }}
         {{- end }}
+        {{- with .compatibility }}
+        {{- if .version }}
+        - name: COMPATIBILITY_VERSION
+          value: {{ .version }}
+        {{- end }}
+        {{- end }}
        {{- if and .trafficManager .trafficManager.envTemplate }}
        {{- template "traffic-manager-env" . }}
        {{- end }}
diff --git a/charts/telepresence/values.yaml b/charts/telepresence/values.yaml
index 0fbd2b8701..dd85245a98 100644
--- a/charts/telepresence/values.yaml
+++ b/charts/telepresence/values.yaml
@@ -349,3 +349,12 @@ client:
 workloads:
   argoRollouts:
     enabled: false
+
+# Use for testing only.
+compatibility:
+  # Controls the enablement of features more recent than the given version. Only applicable
+  # for versions 2.18.0 and up, and only recognized by versions 2.21.0 and up. In other words,
+  # you can make a 2.21.0 version behave as far back as a 2.18.0 version, but you cannot
+  # alter the behavior of versions earlier than 2.21.0.
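+  #
+  # Editor's illustration only (the release name, chart reference, and namespace
+  # below are assumptions, not chart defaults); the value can be set on the
+  # Helm command line, e.g.:
+  #
+  #   helm upgrade traffic-manager datawire/telepresence -n ambassador \
+  #     --set compatibility.version=2.19.0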
+ # + # version: 2.19.0 \ No newline at end of file diff --git a/cmd/traffic/cmd/manager/managerutil/envconfig.go b/cmd/traffic/cmd/manager/managerutil/envconfig.go index c261f5d08c..77cc0906e3 100644 --- a/cmd/traffic/cmd/manager/managerutil/envconfig.go +++ b/cmd/traffic/cmd/manager/managerutil/envconfig.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/blang/semver/v4" "github.com/go-json-experiment/json" core "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -70,6 +71,9 @@ type Env struct { ClientConnectionTTL time.Duration `env:"CLIENT_CONNECTION_TTL, parser=time.ParseDuration"` ArgoRolloutsEnabled bool `env:"ARGO_ROLLOUTS_ENABLED, parser=bool, default=false"` + + // For testing only + CompatibilityVersion *semver.Version `env:"COMPATIBILITY_VERSION, parser=version, default="` } func (e *Env) GeneratorConfig(qualifiedAgentImage string) (agentmap.GeneratorConfig, error) { @@ -116,7 +120,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return uint16(pn), err }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.SetUint(uint64(src.(uint16))) }, + Setter: func(dst reflect.Value, src any) { dst.SetUint(uint64(src.(uint16))) }, } fhs[reflect.TypeOf(k8sapi.AppProtocolStrategy(0))] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -124,7 +128,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return k8sapi.NewAppProtocolStrategy(str) }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.SetInt(int64(src.(k8sapi.AppProtocolStrategy))) }, + Setter: func(dst reflect.Value, src any) { dst.SetInt(int64(src.(k8sapi.AppProtocolStrategy))) }, } fhs[reflect.TypeOf(agentconfig.InjectPolicy(0))] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -132,7 +136,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return agentconfig.NewEnablePolicy(str) }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.SetInt(int64(src.(agentconfig.InjectPolicy))) }, + Setter: func(dst reflect.Value, src any) { dst.SetInt(int64(src.(agentconfig.InjectPolicy))) }, } fhs[reflect.TypeOf(resource.Quantity{})] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -140,7 +144,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return resource.ParseQuantity(str) }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.Set(reflect.ValueOf(src.(resource.Quantity))) }, + Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.(resource.Quantity))) }, } fhs[reflect.TypeOf(netip.Addr{})] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -148,7 +152,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return netip.ParseAddr(str) }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.Set(reflect.ValueOf(src.(netip.Addr))) }, + Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.(netip.Addr))) }, } fhs[reflect.TypeOf([]string{})] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -163,7 +167,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return ss, nil }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.Set(reflect.ValueOf(src.([]string))) }, + Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.([]string))) }, } fhs[reflect.TypeOf([]netip.Prefix{})] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, 
error){ @@ -182,7 +186,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return ns, nil }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.Set(reflect.ValueOf(src.([]netip.Prefix))) }, + Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.([]netip.Prefix))) }, } fhs[reflect.TypeOf([]core.LocalObjectReference{})] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -197,7 +201,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return rr, nil }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.Set(reflect.ValueOf(src.([]core.LocalObjectReference))) }, + Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.([]core.LocalObjectReference))) }, } fhs[reflect.TypeOf(&core.ResourceRequirements{})] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -212,7 +216,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return rr, nil }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.Set(reflect.ValueOf(src.(*core.ResourceRequirements))) }, + Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.(*core.ResourceRequirements))) }, } fhs[reflect.TypeOf(&core.SecurityContext{})] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -227,7 +231,7 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return rr, nil }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.Set(reflect.ValueOf(src.(*core.SecurityContext))) }, + Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.(*core.SecurityContext))) }, } fhs[reflect.TypeOf(true)] = envconfig.FieldTypeHandler{ Parsers: map[string]func(string) (any, error){ @@ -235,7 +239,22 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { return strconv.ParseBool(str) }, }, - Setter: func(dst reflect.Value, src interface{}) { dst.SetBool(src.(bool)) }, + Setter: func(dst reflect.Value, src any) { dst.SetBool(src.(bool)) }, + } + fhs[reflect.TypeOf(&semver.Version{})] = envconfig.FieldTypeHandler{ + Parsers: map[string]func(string) (any, error){ + "version": func(str string) (any, error) { + if str == "" { + return nil, nil + } + v, err := semver.Parse(str) + if err != nil { + return nil, err + } + return &v, nil + }, + }, + Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.(*semver.Version))) }, } return fhs } diff --git a/cmd/traffic/cmd/manager/service.go b/cmd/traffic/cmd/manager/service.go index 2f7715c0b7..1a9ae1ae15 100644 --- a/cmd/traffic/cmd/manager/service.go +++ b/cmd/traffic/cmd/manager/service.go @@ -2,10 +2,12 @@ package manager import ( "context" + "fmt" "sort" "strings" "time" + "github.com/blang/semver/v4" "github.com/google/uuid" dns2 "github.com/miekg/dns" "go.opentelemetry.io/otel/trace" @@ -78,6 +80,15 @@ func (wall) Now() time.Time { return time.Now() } +// checkCompat checks if a CompatibilityVersion has been set for this traffic-manager, and if so, errors with +// an Unimplemented error mentioning the given name if it is less than the required version. 
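+//
+// Illustrative use, taken from later in this patch: a gRPC method introduced
+// in 2.20.0 guards itself with
+//
+//	if err := checkCompat(ctx, "GetKnownWorkloadKinds", "2.20.0"); err != nil {
+//		return nil, err
+//	}
+//
+// so that a traffic-manager configured with a lower CompatibilityVersion
+// reports codes.Unimplemented instead of handling the call.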
+func checkCompat(ctx context.Context, name, requiredVersion string) error { + if cv := managerutil.GetEnv(ctx).CompatibilityVersion; cv != nil && cv.Compare(semver.MustParse(requiredVersion)) < 0 { + return status.Error(codes.Unimplemented, fmt.Sprintf("traffic manager of version %s does not implement %s", cv, name)) + } + return nil +} + func NewService(ctx context.Context) (Service, *dgroup.Group, error) { ret := &service{ clock: wall{}, @@ -567,6 +578,9 @@ func (s *service) PrepareIntercept(ctx context.Context, request *rpc.CreateInter } func (s *service) GetKnownWorkloadKinds(ctx context.Context, request *rpc.SessionInfo) (*rpc.KnownWorkloadKinds, error) { + if err := checkCompat(ctx, "GetKnownWorkloadKinds", "2.20.0"); err != nil { + return nil, err + } ctx = managerutil.WithSessionInfo(ctx, request) dlog.Debugf(ctx, "GetKnownWorkloadKinds called") kinds := []rpc.WorkloadInfo_Kind{rpc.WorkloadInfo_DEPLOYMENT, rpc.WorkloadInfo_REPLICASET, rpc.WorkloadInfo_STATEFULSET} @@ -939,7 +953,12 @@ func (s *service) WatchClusterInfo(session *rpc.SessionInfo, stream rpc.Manager_ } func (s *service) WatchWorkloads(request *rpc.WorkloadEventsRequest, stream rpc.Manager_WatchWorkloadsServer) (err error) { - ctx := managerutil.WithSessionInfo(stream.Context(), request.SessionInfo) + ctx := stream.Context() + // Dysfunctional prior to 2.21.0 because no initial snapshot was sent. + if err := checkCompat(ctx, "WatchWorkloads", "2.21.0-alpha.4"); err != nil { + return err + } + ctx = managerutil.WithSessionInfo(ctx, request.SessionInfo) defer func() { if r := recover(); r != nil { err = derror.PanicToError(r) From ccd496c11b77f1aa1a713ffe1903f8bacf333145 Mon Sep 17 00:00:00 2001 From: Thomas Hallgren Date: Mon, 28 Oct 2024 16:04:30 +0100 Subject: [PATCH 2/6] Add namespace to WorkloadEventsRequest and UID to the WorkloadInfo Signed-off-by: Thomas Hallgren --- cmd/traffic/cmd/manager/service.go | 12 ++- rpc/manager/manager.pb.go | 145 +++++++++++++++++------------ rpc/manager/manager.proto | 5 + 3 files changed, 96 insertions(+), 66 deletions(-) diff --git a/cmd/traffic/cmd/manager/service.go b/cmd/traffic/cmd/manager/service.go index 1a9ae1ae15..f2b09f1d72 100644 --- a/cmd/traffic/cmd/manager/service.go +++ b/cmd/traffic/cmd/manager/service.go @@ -973,11 +973,15 @@ func (s *service) WatchWorkloads(request *rpc.WorkloadEventsRequest, stream rpc. 
return status.Error(codes.InvalidArgument, "SessionInfo is required") } clientSession := request.SessionInfo.SessionId - clientInfo := s.state.GetClient(clientSession) - if clientInfo == nil { - return status.Errorf(codes.NotFound, "Client session %q not found", clientSession) + namespace := request.Namespace + if namespace == "" { + clientInfo := s.state.GetClient(clientSession) + if clientInfo == nil { + return status.Errorf(codes.NotFound, "Client session %q not found", clientSession) + } + namespace = clientInfo.Namespace } - ww := s.state.NewWorkloadInfoWatcher(clientSession, clientInfo.Namespace) + ww := s.state.NewWorkloadInfoWatcher(clientSession, namespace) return ww.Watch(ctx, stream) } diff --git a/rpc/manager/manager.pb.go b/rpc/manager/manager.pb.go index 3a7eb876f2..aede26124e 100644 --- a/rpc/manager/manager.pb.go +++ b/rpc/manager/manager.pb.go @@ -3522,6 +3522,7 @@ type WorkloadInfo struct { Kind WorkloadInfo_Kind `protobuf:"varint,1,opt,name=kind,proto3,enum=telepresence.manager.WorkloadInfo_Kind" json:"kind,omitempty"` Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` Namespace string `protobuf:"bytes,3,opt,name=namespace,proto3" json:"namespace,omitempty"` + Uid string `protobuf:"bytes,7,opt,name=uid,proto3" json:"uid,omitempty"` AgentState WorkloadInfo_AgentState `protobuf:"varint,4,opt,name=agent_state,json=agentState,proto3,enum=telepresence.manager.WorkloadInfo_AgentState" json:"agent_state,omitempty"` InterceptClients []*WorkloadInfo_Intercept `protobuf:"bytes,5,rep,name=intercept_clients,json=interceptClients,proto3" json:"intercept_clients,omitempty"` State WorkloadInfo_State `protobuf:"varint,6,opt,name=state,proto3,enum=telepresence.manager.WorkloadInfo_State" json:"state,omitempty"` @@ -3580,6 +3581,13 @@ func (x *WorkloadInfo) GetNamespace() string { return "" } +func (x *WorkloadInfo) GetUid() string { + if x != nil { + return x.Uid + } + return "" +} + func (x *WorkloadInfo) GetAgentState() WorkloadInfo_AgentState { if x != nil { return x.AgentState @@ -3727,6 +3735,9 @@ type WorkloadEventsRequest struct { // The timestamp from which the first delta should be computed. Set to // undefined to get a delta that contains everything. Since *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=since,proto3" json:"since,omitempty"` + // The namespace to watch. Must be one of the namespaces that are + // managed by the traffic-manager. Defaults to the connected namespace. + Namespace string `protobuf:"bytes,3,opt,name=namespace,proto3" json:"namespace,omitempty"` } func (x *WorkloadEventsRequest) Reset() { @@ -3775,6 +3786,13 @@ func (x *WorkloadEventsRequest) GetSince() *timestamppb.Timestamp { return nil } +func (x *WorkloadEventsRequest) GetNamespace() string { + if x != nil { + return x.Namespace + } + return "" +} + // "Mechanisms" are the ways that an Agent can decide handle // incoming requests, and decide whether to send them to the // in-cluster service, or whether to intercept them. 
The "tcp" @@ -4457,7 +4475,7 @@ var file_manager_manager_proto_rawDesc = []byte{ 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0e, 0x32, 0x27, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x4b, 0x69, 0x6e, - 0x64, 0x52, 0x05, 0x6b, 0x69, 0x6e, 0x64, 0x73, 0x22, 0xfb, 0x04, 0x0a, 0x0c, 0x57, 0x6f, 0x72, + 0x64, 0x52, 0x05, 0x6b, 0x69, 0x6e, 0x64, 0x73, 0x22, 0x8d, 0x05, 0x0a, 0x0c, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x3b, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x27, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, @@ -4465,69 +4483,72 @@ var file_manager_manager_proto_rawDesc = []byte{ 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, - 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x4e, 0x0a, 0x0b, 0x61, 0x67, 0x65, 0x6e, - 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2d, 0x2e, - 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, - 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, - 0x6f, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x0a, 0x61, 0x67, - 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x59, 0x0a, 0x11, 0x69, 0x6e, 0x74, 0x65, - 0x72, 0x63, 0x65, 0x70, 0x74, 0x5f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x05, 0x20, - 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, - 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, - 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x63, 0x65, 0x70, - 0x74, 0x52, 0x10, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x63, 0x65, 0x70, 0x74, 0x43, 0x6c, 0x69, 0x65, - 0x6e, 0x74, 0x73, 0x12, 0x3e, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x06, 0x20, 0x01, - 0x28, 0x0e, 0x32, 0x28, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, - 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, - 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, - 0x61, 0x74, 0x65, 0x1a, 0x23, 0x0a, 0x09, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x63, 0x65, 0x70, 0x74, - 0x12, 0x16, 0x0a, 0x06, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x06, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x22, 0x55, 0x0a, 0x04, 0x4b, 0x69, 0x6e, 0x64, - 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, - 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x44, 0x45, 0x50, 0x4c, 0x4f, 0x59, 0x4d, 0x45, 0x4e, 0x54, 0x10, - 0x01, 0x12, 0x0e, 0x0a, 0x0a, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x53, 0x45, 0x54, 0x10, - 0x02, 0x12, 0x0f, 0x0a, 0x0b, 0x53, 0x54, 0x41, 0x54, 0x45, 0x46, 0x55, 0x4c, 0x53, 0x45, 0x54, - 0x10, 0x03, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x4f, 0x4c, 0x4c, 0x4f, 0x55, 0x54, 0x10, 0x04, 0x22, - 0x4d, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x17, 0x0a, 0x13, 0x55, 0x4e, 
0x4b, 0x4e, - 0x4f, 0x57, 0x4e, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, - 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x41, 0x56, 0x41, 0x49, 0x4c, 0x41, 0x42, 0x4c, 0x45, 0x10, 0x01, - 0x12, 0x0f, 0x0a, 0x0b, 0x50, 0x52, 0x4f, 0x47, 0x52, 0x45, 0x53, 0x53, 0x49, 0x4e, 0x47, 0x10, - 0x02, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x41, 0x49, 0x4c, 0x55, 0x52, 0x45, 0x10, 0x03, 0x22, 0x46, - 0x0a, 0x0a, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x18, 0x0a, 0x14, - 0x4e, 0x4f, 0x5f, 0x41, 0x47, 0x45, 0x4e, 0x54, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, - 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x49, 0x4e, 0x53, 0x54, 0x41, 0x4c, - 0x4c, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x49, 0x4e, 0x54, 0x45, 0x52, 0x43, 0x45, - 0x50, 0x54, 0x45, 0x44, 0x10, 0x02, 0x22, 0xc7, 0x01, 0x0a, 0x0d, 0x57, 0x6f, 0x72, 0x6b, 0x6c, - 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x3c, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x28, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, - 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, - 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x54, 0x79, 0x70, 0x65, - 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x3e, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, - 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, + 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x4e, 0x0a, 0x0b, 0x61, 0x67, + 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, + 0x2d, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, + 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x49, + 0x6e, 0x66, 0x6f, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x0a, + 0x61, 0x67, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x59, 0x0a, 0x11, 0x69, 0x6e, + 0x74, 0x65, 0x72, 0x63, 0x65, 0x70, 0x74, 0x5f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x73, 0x18, + 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, + 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, 0x72, + 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x63, + 0x65, 0x70, 0x74, 0x52, 0x10, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x63, 0x65, 0x70, 0x74, 0x43, 0x6c, + 0x69, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x3e, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x06, + 0x20, 0x01, 0x28, 0x0e, 0x32, 0x28, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, + 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, + 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, + 0x73, 0x74, 0x61, 0x74, 0x65, 0x1a, 0x23, 0x0a, 0x09, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x63, 0x65, + 0x70, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x06, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x22, 0x55, 0x0a, 0x04, 0x4b, 0x69, + 0x6e, 0x64, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, + 0x44, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x44, 0x45, 0x50, 0x4c, 0x4f, 0x59, 0x4d, 0x45, 0x4e, + 0x54, 0x10, 
0x01, 0x12, 0x0e, 0x0a, 0x0a, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x53, 0x45, + 0x54, 0x10, 0x02, 0x12, 0x0f, 0x0a, 0x0b, 0x53, 0x54, 0x41, 0x54, 0x45, 0x46, 0x55, 0x4c, 0x53, + 0x45, 0x54, 0x10, 0x03, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x4f, 0x4c, 0x4c, 0x4f, 0x55, 0x54, 0x10, + 0x04, 0x22, 0x4d, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x17, 0x0a, 0x13, 0x55, 0x4e, + 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, + 0x44, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x41, 0x56, 0x41, 0x49, 0x4c, 0x41, 0x42, 0x4c, 0x45, + 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x50, 0x52, 0x4f, 0x47, 0x52, 0x45, 0x53, 0x53, 0x49, 0x4e, + 0x47, 0x10, 0x02, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x41, 0x49, 0x4c, 0x55, 0x52, 0x45, 0x10, 0x03, + 0x22, 0x46, 0x0a, 0x0a, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x18, + 0x0a, 0x14, 0x4e, 0x4f, 0x5f, 0x41, 0x47, 0x45, 0x4e, 0x54, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, + 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x49, 0x4e, 0x53, 0x54, + 0x41, 0x4c, 0x4c, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x49, 0x4e, 0x54, 0x45, 0x52, + 0x43, 0x45, 0x50, 0x54, 0x45, 0x44, 0x10, 0x02, 0x22, 0xc7, 0x01, 0x0a, 0x0d, 0x57, 0x6f, 0x72, + 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x3c, 0x0a, 0x04, 0x74, 0x79, + 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x28, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, - 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x77, 0x6f, - 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x38, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x15, - 0x0a, 0x11, 0x41, 0x44, 0x44, 0x45, 0x44, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, - 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x08, 0x4d, 0x4f, 0x44, 0x49, 0x46, 0x49, 0x45, - 0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x02, - 0x22, 0x84, 0x01, 0x0a, 0x13, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, - 0x6e, 0x74, 0x73, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x12, 0x30, 0x0a, 0x05, 0x73, 0x69, 0x6e, 0x63, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, - 0x61, 0x6d, 0x70, 0x52, 0x05, 0x73, 0x69, 0x6e, 0x63, 0x65, 0x12, 0x3b, 0x0a, 0x06, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x74, 0x65, 0x6c, + 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x54, 0x79, + 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x3e, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, + 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, - 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, - 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x8f, 0x01, 0x0a, 0x15, 0x57, 0x6f, 0x72, 0x6b, - 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x44, 0x0a, 0x0c, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x6e, 0x66, - 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x72, - 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 
0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x53, - 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0b, 0x73, 0x65, 0x73, 0x73, - 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x30, 0x0a, 0x05, 0x73, 0x69, 0x6e, 0x63, 0x65, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, - 0x6d, 0x70, 0x52, 0x05, 0x73, 0x69, 0x6e, 0x63, 0x65, 0x2a, 0xad, 0x01, 0x0a, 0x18, 0x49, 0x6e, + 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, + 0x77, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x38, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, + 0x12, 0x15, 0x0a, 0x11, 0x41, 0x44, 0x44, 0x45, 0x44, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, + 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x08, 0x4d, 0x4f, 0x44, 0x49, 0x46, + 0x49, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x44, + 0x10, 0x02, 0x22, 0x84, 0x01, 0x0a, 0x13, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x45, + 0x76, 0x65, 0x6e, 0x74, 0x73, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x12, 0x30, 0x0a, 0x05, 0x73, 0x69, + 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x05, 0x73, 0x69, 0x6e, 0x63, 0x65, 0x12, 0x3b, 0x0a, 0x06, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x74, + 0x65, 0x6c, 0x65, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, + 0x67, 0x65, 0x72, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, + 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0xad, 0x01, 0x0a, 0x15, 0x57, 0x6f, + 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x44, 0x0a, 0x0c, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x69, + 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x74, 0x65, 0x6c, 0x65, + 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, + 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0b, 0x73, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x30, 0x0a, 0x05, 0x73, 0x69, 0x6e, + 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x52, 0x05, 0x73, 0x69, 0x6e, 0x63, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x6e, + 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, + 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x2a, 0xad, 0x01, 0x0a, 0x18, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x63, 0x65, 0x70, 0x74, 0x44, 0x69, 0x73, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x43, 0x54, 0x49, 0x56, diff --git a/rpc/manager/manager.proto b/rpc/manager/manager.proto index 3ab2455eb0..61a382d6a7 100644 --- a/rpc/manager/manager.proto +++ b/rpc/manager/manager.proto @@ -631,6 +631,7 @@ message WorkloadInfo { Kind kind = 1; string name = 2; string namespace = 3; + string 
uid = 7; AgentState agent_state = 4; repeated Intercept intercept_clients = 5; @@ -667,6 +668,10 @@ message WorkloadEventsRequest { // The timestamp from which the first delta should be computed. Set to // undefined to get a delta that contains everything. google.protobuf.Timestamp since = 2; + + // The namespace to watch. Must be one of the namespaces that are + // managed by the traffic-manager. Defaults to the connected namespace. + string namespace = 3; } service Manager { From 9d1d8598837c7a6c40a6ed5ba7c8237fc667ccd2 Mon Sep 17 00:00:00 2001 From: Thomas Hallgren Date: Mon, 28 Oct 2024 16:06:03 +0100 Subject: [PATCH 3/6] Move the WorkloadWatcher to a location shared by manager and client. Signed-off-by: Thomas Hallgren --- cmd/traffic/cmd/manager/mutator/constants.go | 12 +- cmd/traffic/cmd/manager/mutator/watcher.go | 15 +- .../cmd/manager/mutator/workload_watcher.go | 115 +------- cmd/traffic/cmd/manager/state/state.go | 13 +- .../manager/state/workload_info_watcher.go | 24 +- cmd/traffic/cmd/manager/state/workloads.go | 197 ------------- pkg/workload/informers.go | 106 +++++++ pkg/workload/util.go | 25 ++ pkg/workload/watcher.go | 271 ++++++++++++++++++ 9 files changed, 447 insertions(+), 331 deletions(-) delete mode 100644 cmd/traffic/cmd/manager/state/workloads.go create mode 100644 pkg/workload/informers.go create mode 100644 pkg/workload/util.go create mode 100644 pkg/workload/watcher.go diff --git a/cmd/traffic/cmd/manager/mutator/constants.go b/cmd/traffic/cmd/manager/mutator/constants.go index 616a15284f..7ba9fbf1ff 100644 --- a/cmd/traffic/cmd/manager/mutator/constants.go +++ b/cmd/traffic/cmd/manager/mutator/constants.go @@ -1,10 +1,12 @@ package mutator -import "github.com/telepresenceio/telepresence/v2/pkg/agentconfig" +import ( + "github.com/telepresenceio/telepresence/v2/pkg/agentconfig" + "github.com/telepresenceio/telepresence/v2/pkg/workload" +) const ( - DomainPrefix = "telepresence.getambassador.io/" - InjectAnnotation = DomainPrefix + "inject-" + agentconfig.ContainerName - ServiceNameAnnotation = DomainPrefix + "inject-service-name" - ManualInjectAnnotation = DomainPrefix + "manually-injected" + InjectAnnotation = workload.DomainPrefix + "inject-" + agentconfig.ContainerName + ServiceNameAnnotation = workload.DomainPrefix + "inject-service-name" + ManualInjectAnnotation = workload.DomainPrefix + "manually-injected" ) diff --git a/cmd/traffic/cmd/manager/mutator/watcher.go b/cmd/traffic/cmd/manager/mutator/watcher.go index 315b259229..4bf15c034f 100644 --- a/cmd/traffic/cmd/manager/mutator/watcher.go +++ b/cmd/traffic/cmd/manager/mutator/watcher.go @@ -33,6 +33,7 @@ import ( "github.com/telepresenceio/telepresence/v2/pkg/agentmap" "github.com/telepresenceio/telepresence/v2/pkg/informer" "github.com/telepresenceio/telepresence/v2/pkg/tracing" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) type Map interface { @@ -229,8 +230,6 @@ func isRolloutNeededForPod(ctx context.Context, ac *agentconfig.Sidecar, name, n return "" } -const AnnRestartedAt = DomainPrefix + "restartedAt" - func (c *configWatcher) triggerRollout(ctx context.Context, wl k8sapi.Workload, ac *agentconfig.Sidecar) { lck := c.getRolloutLock(wl) if !lck.TryLock() { @@ -269,10 +268,10 @@ func generateRestartAnnotationPatch(podTemplate *core.PodTemplateSpec) string { basePointer := "/spec/template/metadata/annotations" pointer := fmt.Sprintf( basePointer+"/%s", - strings.ReplaceAll(AnnRestartedAt, "/", "~1"), + strings.ReplaceAll(workload.AnnRestartedAt, "/", "~1"), ) - if _, ok := 
podTemplate.Annotations[AnnRestartedAt]; ok { + if _, ok := podTemplate.Annotations[workload.AnnRestartedAt]; ok { return fmt.Sprintf( `[{"op": "replace", "path": "%s", "value": "%s"}]`, pointer, time.Now().Format(time.RFC3339), ) @@ -833,9 +832,9 @@ func (c *configWatcher) Start(ctx context.Context) { for i, ns := range nss { c.cms[i] = c.startConfigMap(ctx, ns) c.svs[i] = c.startServices(ctx, ns) - c.dps[i] = c.startDeployments(ctx, ns) - c.rss[i] = c.startReplicaSets(ctx, ns) - c.sss[i] = c.startStatefulSets(ctx, ns) + c.dps[i] = workload.StartDeployments(ctx, ns) + c.rss[i] = workload.StartReplicaSets(ctx, ns) + c.sss[i] = workload.StartStatefulSets(ctx, ns) c.startPods(ctx, ns) kf := informer.GetK8sFactory(ctx, ns) kf.Start(ctx.Done()) @@ -844,7 +843,7 @@ func (c *configWatcher) Start(ctx context.Context) { if managerutil.ArgoRolloutsEnabled(ctx) { c.rls = make([]cache.SharedIndexInformer, len(nss)) for i, ns := range nss { - c.rls[i] = c.startRollouts(ctx, ns) + c.rls[i] = workload.StartRollouts(ctx, ns) rf := informer.GetArgoRolloutsFactory(ctx, ns) rf.Start(ctx.Done()) rf.WaitForCacheSync(ctx.Done()) diff --git a/cmd/traffic/cmd/manager/mutator/workload_watcher.go b/cmd/traffic/cmd/manager/mutator/workload_watcher.go index 56512286fd..3f84fa1514 100644 --- a/cmd/traffic/cmd/manager/mutator/workload_watcher.go +++ b/cmd/traffic/cmd/manager/mutator/workload_watcher.go @@ -6,138 +6,39 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - apps "k8s.io/api/apps/v1" - core "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" - argorollouts "github.com/datawire/argo-rollouts-go-client/pkg/apis/rollouts/v1alpha1" "github.com/datawire/dlib/dlog" "github.com/datawire/k8sapi/pkg/k8sapi" "github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/managerutil" "github.com/telepresenceio/telepresence/v2/pkg/agentconfig" "github.com/telepresenceio/telepresence/v2/pkg/agentmap" - "github.com/telepresenceio/telepresence/v2/pkg/informer" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) -func (c *configWatcher) startDeployments(ctx context.Context, ns string) cache.SharedIndexInformer { - f := informer.GetK8sFactory(ctx, ns) - ix := f.Apps().V1().Deployments().Informer() - _ = ix.SetTransform(func(o any) (any, error) { - // Strip the parts of the deployment that we don't care about to save memory - if dep, ok := o.(*apps.Deployment); ok { - om := &dep.ObjectMeta - if an := om.Annotations; an != nil { - delete(an, core.LastAppliedConfigAnnotation) - } - dep.ManagedFields = nil - dep.Finalizers = nil - dep.OwnerReferences = nil - } - return o, nil - }) - _ = ix.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { - dlog.Errorf(ctx, "watcher for Deployments %s: %v", whereWeWatch(ns), err) - }) - return ix -} - -func (c *configWatcher) startReplicaSets(ctx context.Context, ns string) cache.SharedIndexInformer { - f := informer.GetK8sFactory(ctx, ns) - ix := f.Apps().V1().ReplicaSets().Informer() - _ = ix.SetTransform(func(o any) (any, error) { - // Strip the parts of the replicaset that we don't care about. 
Saves memory - if dep, ok := o.(*apps.ReplicaSet); ok { - om := &dep.ObjectMeta - if an := om.Annotations; an != nil { - delete(an, core.LastAppliedConfigAnnotation) - } - dep.ManagedFields = nil - dep.Finalizers = nil - } - return o, nil - }) - _ = ix.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { - dlog.Errorf(ctx, "watcher for ReplicaSets %s: %v", whereWeWatch(ns), err) - }) - return ix -} - -func (c *configWatcher) startStatefulSets(ctx context.Context, ns string) cache.SharedIndexInformer { - f := informer.GetK8sFactory(ctx, ns) - ix := f.Apps().V1().StatefulSets().Informer() - _ = ix.SetTransform(func(o any) (any, error) { - // Strip the parts of the stateful that we don't care about. Saves memory - if dep, ok := o.(*apps.StatefulSet); ok { - om := &dep.ObjectMeta - if an := om.Annotations; an != nil { - delete(an, core.LastAppliedConfigAnnotation) - } - dep.ManagedFields = nil - dep.Finalizers = nil - } - return o, nil - }) - _ = ix.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { - dlog.Errorf(ctx, "watcher for StatefulSet %s: %v", whereWeWatch(ns), err) - }) - return ix -} - -func (c *configWatcher) startRollouts(ctx context.Context, ns string) cache.SharedIndexInformer { - f := informer.GetArgoRolloutsFactory(ctx, ns) - dlog.Infof(ctx, "Watching Rollouts in %s", ns) - ix := f.Argoproj().V1alpha1().Rollouts().Informer() - _ = ix.SetTransform(func(o any) (any, error) { - // Strip the parts of the rollout that we don't care about. Saves memory - if dep, ok := o.(*argorollouts.Rollout); ok { - om := &dep.ObjectMeta - if an := om.Annotations; an != nil { - delete(an, core.LastAppliedConfigAnnotation) - } - dep.ManagedFields = nil - dep.Finalizers = nil - } - return o, nil - }) - _ = ix.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { - dlog.Errorf(ctx, "watcher for Rollouts %s: %v", whereWeWatch(ns), err) - }) - return ix -} - -func WorkloadFromAny(obj any) (k8sapi.Workload, bool) { - if ro, ok := obj.(runtime.Object); ok { - if wl, err := k8sapi.WrapWorkload(ro); err == nil { - return wl, true - } - } - return nil, false -} - func (c *configWatcher) watchWorkloads(ctx context.Context, ix cache.SharedIndexInformer) error { _, err := ix.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { - if wl, ok := WorkloadFromAny(obj); ok && len(wl.GetOwnerReferences()) == 0 { + if wl, ok := workload.FromAny(obj); ok && len(wl.GetOwnerReferences()) == 0 { c.updateWorkload(ctx, wl, nil, GetWorkloadState(wl)) } }, DeleteFunc: func(obj any) { - if wl, ok := WorkloadFromAny(obj); ok { + if wl, ok := workload.FromAny(obj); ok { if len(wl.GetOwnerReferences()) == 0 { c.deleteWorkload(ctx, wl) } } else if dfsu, ok := obj.(*cache.DeletedFinalStateUnknown); ok { - if wl, ok = WorkloadFromAny(dfsu.Obj); ok && len(wl.GetOwnerReferences()) == 0 { + if wl, ok = workload.FromAny(dfsu.Obj); ok && len(wl.GetOwnerReferences()) == 0 { c.deleteWorkload(ctx, wl) } } }, UpdateFunc: func(oldObj, newObj any) { - if wl, ok := WorkloadFromAny(newObj); ok && len(wl.GetOwnerReferences()) == 0 { - if oldWl, ok := WorkloadFromAny(oldObj); ok { + if wl, ok := workload.FromAny(newObj); ok && len(wl.GetOwnerReferences()) == 0 { + if oldWl, ok := workload.FromAny(oldObj); ok { c.updateWorkload(ctx, wl, oldWl, GetWorkloadState(wl)) } } @@ -163,14 +64,14 @@ func (c *configWatcher) updateWorkload(ctx context.Context, wl, oldWl k8sapi.Wor return } tpl := wl.GetPodTemplate() - ia, ok := tpl.Annotations[InjectAnnotation] + ia, ok := tpl.Annotations[workload.InjectAnnotation] if !ok 
{ return } if oldWl != nil && cmp.Equal(oldWl.GetPodTemplate(), tpl, cmpopts.IgnoreFields(meta.ObjectMeta{}, "Namespace", "UID", "ResourceVersion", "CreationTimestamp", "DeletionTimestamp"), cmpopts.IgnoreMapEntries(func(k, _ string) bool { - return k == AnnRestartedAt + return k == workload.AnnRestartedAt })) { return } diff --git a/cmd/traffic/cmd/manager/state/state.go b/cmd/traffic/cmd/manager/state/state.go index 1377081fa8..e245401118 100644 --- a/cmd/traffic/cmd/manager/state/state.go +++ b/cmd/traffic/cmd/manager/state/state.go @@ -32,6 +32,7 @@ import ( "github.com/telepresenceio/telepresence/v2/pkg/log" "github.com/telepresenceio/telepresence/v2/pkg/tracing" "github.com/telepresenceio/telepresence/v2/pkg/tunnel" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) type State interface { @@ -90,7 +91,7 @@ type State interface { WatchAgents(context.Context, func(sessionID string, agent *rpc.AgentInfo) bool) <-chan watchable.Snapshot[*rpc.AgentInfo] WatchDial(sessionID string) <-chan *rpc.DialRequest WatchIntercepts(context.Context, func(sessionID string, intercept *rpc.InterceptInfo) bool) <-chan watchable.Snapshot[*rpc.InterceptInfo] - WatchWorkloads(ctx context.Context, sessionID string) (ch <-chan []WorkloadEvent, err error) + WatchWorkloads(ctx context.Context, sessionID string) (ch <-chan []workload.WorkloadEvent, err error) WatchLookupDNS(string) <-chan *rpc.DNSRequest ValidateCreateAgent(context.Context, k8sapi.Workload, agentconfig.SidecarExt) error NewWorkloadInfoWatcher(clientSession, namespace string) WorkloadInfoWatcher @@ -135,7 +136,7 @@ type state struct { interceptStates *xsync.MapOf[string, *interceptState] timedLogLevel log.TimedLevel llSubs *loglevelSubscribers - workloadWatchers *xsync.MapOf[string, WorkloadWatcher] // workload watchers, created on demand and keyed by namespace + workloadWatchers *xsync.MapOf[string, workload.Watcher] // workload watchers, created on demand and keyed by namespace tunnelCounter int32 tunnelIngressCounter uint64 tunnelEgressCounter uint64 @@ -157,7 +158,7 @@ func NewState(ctx context.Context) State { sessions: xsync.NewMapOf[string, SessionState](), agentsByName: xsync.NewMapOf[string, *xsync.MapOf[string, *rpc.AgentInfo]](), interceptStates: xsync.NewMapOf[string, *interceptState](), - workloadWatchers: xsync.NewMapOf[string, WorkloadWatcher](), + workloadWatchers: xsync.NewMapOf[string, workload.Watcher](), timedLogLevel: log.NewTimedLevel(loglevel, log.SetLevel), llSubs: newLoglevelSubscribers(), } @@ -487,14 +488,14 @@ func (s *state) WatchAgents( } } -func (s *state) WatchWorkloads(ctx context.Context, sessionID string) (ch <-chan []WorkloadEvent, err error) { +func (s *state) WatchWorkloads(ctx context.Context, sessionID string) (ch <-chan []workload.WorkloadEvent, err error) { client := s.GetClient(sessionID) if client == nil { return nil, status.Errorf(codes.NotFound, "session %q not found", sessionID) } ns := client.Namespace - ww, _ := s.workloadWatchers.LoadOrCompute(ns, func() (ww WorkloadWatcher) { - ww, err = NewWorkloadWatcher(s.backgroundCtx, ns) + ww, _ := s.workloadWatchers.LoadOrCompute(ns, func() (ww workload.Watcher) { + ww, err = workload.NewWatcher(s.backgroundCtx, ns, managerutil.ArgoRolloutsEnabled(ctx)) return ww }) if err != nil { diff --git a/cmd/traffic/cmd/manager/state/workload_info_watcher.go b/cmd/traffic/cmd/manager/state/workload_info_watcher.go index e16629f29d..d68d6896b7 100644 --- a/cmd/traffic/cmd/manager/state/workload_info_watcher.go +++ 
b/cmd/traffic/cmd/manager/state/workload_info_watcher.go @@ -15,6 +15,7 @@ import ( rpc "github.com/telepresenceio/telepresence/rpc/v2/manager" "github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/mutator" "github.com/telepresenceio/telepresence/v2/pkg/agentmap" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) type WorkloadInfoWatcher interface { @@ -187,13 +188,20 @@ func rpcWorkload(wl k8sapi.Workload, as rpc.WorkloadInfo_AgentState, iClients [] Kind: rpcKind(wl.GetKind()), Name: wl.GetName(), Namespace: wl.GetNamespace(), + Uid: string(wl.GetUID()), State: rpcWorkloadState(mutator.GetWorkloadState(wl)), AgentState: as, InterceptClients: iClients, } } -func (wf *workloadInfoWatcher) addEvent(ctx context.Context, eventType EventType, wl k8sapi.Workload, as rpc.WorkloadInfo_AgentState, iClients []*rpc.WorkloadInfo_Intercept) { +func (wf *workloadInfoWatcher) addEvent( + ctx context.Context, + eventType workload.EventType, + wl k8sapi.Workload, + as rpc.WorkloadInfo_AgentState, + iClients []*rpc.WorkloadInfo_Intercept, +) { wf.workloadEvents[wl.GetName()] = &rpc.WorkloadEvent{ Type: rpc.WorkloadEvent_Type(eventType), Workload: rpcWorkload(wl, as, iClients), @@ -201,11 +209,11 @@ func (wf *workloadInfoWatcher) addEvent(ctx context.Context, eventType EventType wf.sendEvents(ctx) } -func (wf *workloadInfoWatcher) handleWorkloadsSnapshot(ctx context.Context, wes []WorkloadEvent) { +func (wf *workloadInfoWatcher) handleWorkloadsSnapshot(ctx context.Context, wes []workload.WorkloadEvent) { for _, we := range wes { wl := we.Workload if w, ok := wf.workloadEvents[wl.GetName()]; ok { - if we.Type == EventTypeDelete && w.Type != rpc.WorkloadEvent_DELETED { + if we.Type == workload.EventTypeDelete && w.Type != rpc.WorkloadEvent_DELETED { w.Type = rpc.WorkloadEvent_DELETED dlog.Debugf(ctx, "WorkloadInfoEvent: Workload %s %s %s.%s", we.Type, wl.GetKind(), wl.GetName(), wl.GetNamespace()) wf.resetTicker() @@ -224,7 +232,7 @@ func (wf *workloadInfoWatcher) handleWorkloadsSnapshot(ctx context.Context, wes // If we've sent an ADDED event for this workload, and this is a MODIFIED event without any changes that // we care about, then just skip it. 
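 	// ("Changes that we care about" are those visible in the computed
 	// rpc.WorkloadInfo; proto.Equal against the last event sent for this
 	// workload decides whether the MODIFIED event is redundant.)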
- if we.Type == EventTypeUpdate { + if we.Type == workload.EventTypeUpdate { lew, ok := wf.lastEvents[wl.GetName()] if ok && (lew.Type == rpc.WorkloadEvent_ADDED_UNSPECIFIED || lew.Type == rpc.WorkloadEvent_MODIFIED) && proto.Equal(lew.Workload, rpcWorkload(we.Workload, as, iClients)) { @@ -252,7 +260,7 @@ func (wf *workloadInfoWatcher) handleAgentSnapshot(ctx context.Context, ais map[ wf.resetTicker() } } else if wl, err := agentmap.GetWorkload(ctx, name, a.Namespace, ""); err == nil { - wf.addEvent(ctx, EventTypeUpdate, wl, as, nil) + wf.addEvent(ctx, workload.EventTypeUpdate, wl, as, nil) } else { dlog.Debugf(ctx, "Unable to get workload %s.%s: %v", name, a.Namespace, err) if errors.IsNotFound(err) { @@ -286,7 +294,7 @@ func (wf *workloadInfoWatcher) handleAgentSnapshot(ctx context.Context, ais map[ wf.resetTicker() } } else if wl, err := agentmap.GetWorkload(ctx, name, a.Namespace, ""); err == nil { - wf.addEvent(ctx, EventTypeUpdate, wl, as, iClients) + wf.addEvent(ctx, workload.EventTypeUpdate, wl, as, iClients) } else { dlog.Debugf(ctx, "Unable to get workload %s.%s: %v", name, a.Namespace, err) } @@ -308,7 +316,7 @@ func (wf *workloadInfoWatcher) handleInterceptSnapshot(ctx context.Context, iis wf.resetTicker() } } else if wl, err := agentmap.GetWorkload(ctx, name, wf.namespace, ""); err == nil { - wf.addEvent(ctx, EventTypeUpdate, wl, as, nil) + wf.addEvent(ctx, workload.EventTypeUpdate, wl, as, nil) } } } @@ -332,7 +340,7 @@ func (wf *workloadInfoWatcher) handleInterceptSnapshot(ctx context.Context, iis wf.resetTicker() } } else if wl, err := agentmap.GetWorkload(ctx, name, wf.namespace, ""); err == nil { - wf.addEvent(ctx, EventTypeUpdate, wl, as, iClients) + wf.addEvent(ctx, workload.EventTypeUpdate, wl, as, iClients) } } } diff --git a/cmd/traffic/cmd/manager/state/workloads.go b/cmd/traffic/cmd/manager/state/workloads.go deleted file mode 100644 index fee584e474..0000000000 --- a/cmd/traffic/cmd/manager/state/workloads.go +++ /dev/null @@ -1,197 +0,0 @@ -package state - -import ( - "context" - "math" - "sync" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/google/uuid" - apps "k8s.io/api/apps/v1" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/kubectl/pkg/util/deployment" - - "github.com/datawire/dlib/dlog" - "github.com/datawire/k8sapi/pkg/k8sapi" - "github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/managerutil" - "github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/mutator" - "github.com/telepresenceio/telepresence/v2/pkg/informer" -) - -type EventType int - -const ( - EventTypeAdd = iota - EventTypeUpdate - EventTypeDelete -) - -type WorkloadEvent struct { - Type EventType - Workload k8sapi.Workload -} - -func (e EventType) String() string { - switch e { - case EventTypeAdd: - return "add" - case EventTypeUpdate: - return "update" - case EventTypeDelete: - return "delete" - default: - return "unknown" - } -} - -type WorkloadWatcher interface { - Subscribe(ctx context.Context) <-chan []WorkloadEvent -} - -type wlWatcher struct { - sync.Mutex - subscriptions map[uuid.UUID]chan<- []WorkloadEvent - timer *time.Timer - events []WorkloadEvent -} - -func NewWorkloadWatcher(ctx context.Context, ns string) (WorkloadWatcher, error) { - w := new(wlWatcher) - w.subscriptions = make(map[uuid.UUID]chan<- []WorkloadEvent) - w.timer = time.AfterFunc(time.Duration(math.MaxInt64), func() { - w.Lock() - ss := make([]chan<- []WorkloadEvent, len(w.subscriptions)) - i := 0 - 
for _, sub := range w.subscriptions { - ss[i] = sub - i++ - } - events := w.events - w.events = nil - w.Unlock() - for _, s := range ss { - select { - case <-ctx.Done(): - return - case s <- events: - } - } - }) - - err := w.addEventHandler(ctx, ns) - if err != nil { - return nil, err - } - return w, nil -} - -func (w *wlWatcher) Subscribe(ctx context.Context) <-chan []WorkloadEvent { - ch := make(chan []WorkloadEvent) - id := uuid.New() - w.Lock() - w.subscriptions[id] = ch - w.Unlock() - go func() { - <-ctx.Done() - close(ch) - w.Lock() - delete(w.subscriptions, id) - w.Unlock() - }() - return ch -} - -func compareOptions() []cmp.Option { - return []cmp.Option{ - // Ignore frequently changing fields of no interest - cmpopts.IgnoreFields(meta.ObjectMeta{}, "Namespace", "ResourceVersion", "Generation", "ManagedFields"), - - // Only the Conditions are of interest in the DeploymentStatus. - cmp.Comparer(func(a, b apps.DeploymentStatus) bool { - // Only compare the DeploymentCondition's type and status - return cmp.Equal(a.Conditions, b.Conditions, cmp.Comparer(func(c1, c2 apps.DeploymentCondition) bool { - return c1.Type == c2.Type && c1.Status == c2.Status - })) - }), - - // Treat a nil map or slice as empty. - cmpopts.EquateEmpty(), - - // Ignore frequently changing annotations of no interest. - cmpopts.IgnoreMapEntries(func(k, _ string) bool { - return k == mutator.AnnRestartedAt || k == deployment.RevisionAnnotation - }), - } -} - -func (w *wlWatcher) watchWorkloads(ix cache.SharedIndexInformer, ns string) error { - _, err := ix.AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj any) { - if wl, ok := mutator.WorkloadFromAny(obj); ok && ns == wl.GetNamespace() && len(wl.GetOwnerReferences()) == 0 { - w.handleEvent(WorkloadEvent{Type: EventTypeAdd, Workload: wl}) - } - }, - DeleteFunc: func(obj any) { - if wl, ok := mutator.WorkloadFromAny(obj); ok { - if ns == wl.GetNamespace() && len(wl.GetOwnerReferences()) == 0 { - w.handleEvent(WorkloadEvent{Type: EventTypeDelete, Workload: wl}) - } - } else if dfsu, ok := obj.(*cache.DeletedFinalStateUnknown); ok { - if wl, ok = mutator.WorkloadFromAny(dfsu.Obj); ok && ns == wl.GetNamespace() && len(wl.GetOwnerReferences()) == 0 { - w.handleEvent(WorkloadEvent{Type: EventTypeDelete, Workload: wl}) - } - } - }, - UpdateFunc: func(oldObj, newObj any) { - if wl, ok := mutator.WorkloadFromAny(newObj); ok && ns == wl.GetNamespace() && len(wl.GetOwnerReferences()) == 0 { - if oldWl, ok := mutator.WorkloadFromAny(oldObj); ok { - if cmp.Equal(wl, oldWl, compareOptions()...) { - return - } - // Replace the cmp.Equal above with this to view the changes that trigger an update: - // - // diff := cmp.Diff(wl, oldWl, compareOptions()...) 
- // if diff == "" { - // return - // } - // dlog.Debugf(ctx, "DIFF:\n%s", diff) - w.handleEvent(WorkloadEvent{Type: EventTypeUpdate, Workload: wl}) - } - } - }, - }) - return err -} - -func (w *wlWatcher) addEventHandler(ctx context.Context, ns string) error { - kf := informer.GetFactory(ctx, ns) - ai, ri := kf.GetK8sInformerFactory().Apps().V1(), kf.GetArgoRolloutsInformerFactory().Argoproj().V1alpha1() - if err := w.watchWorkloads(ai.Deployments().Informer(), ns); err != nil { - return err - } - if err := w.watchWorkloads(ai.ReplicaSets().Informer(), ns); err != nil { - return err - } - if err := w.watchWorkloads(ai.StatefulSets().Informer(), ns); err != nil { - return err - } - if !managerutil.ArgoRolloutsEnabled(ctx) { - dlog.Infof(ctx, "Argo Rollouts is disabled, Argo Rollouts will not be watched") - } else if err := w.watchWorkloads(ri.Rollouts().Informer(), ns); err != nil { - return err - } - return nil -} - -func (w *wlWatcher) handleEvent(we WorkloadEvent) { - w.Lock() - w.events = append(w.events, we) - w.Unlock() - - // Defers sending until things been quiet for a while - w.timer.Reset(50 * time.Millisecond) -} diff --git a/pkg/workload/informers.go b/pkg/workload/informers.go new file mode 100644 index 0000000000..9171995214 --- /dev/null +++ b/pkg/workload/informers.go @@ -0,0 +1,106 @@ +package workload + +import ( + "context" + + apps "k8s.io/api/apps/v1" + core "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" + + argorollouts "github.com/datawire/argo-rollouts-go-client/pkg/apis/rollouts/v1alpha1" + "github.com/datawire/dlib/dlog" + "github.com/telepresenceio/telepresence/v2/pkg/informer" +) + +func whereWeWatch(ns string) string { + if ns == "" { + return "cluster wide" + } + return "in namespace " + ns +} + +func StartDeployments(ctx context.Context, ns string) cache.SharedIndexInformer { + f := informer.GetK8sFactory(ctx, ns) + ix := f.Apps().V1().Deployments().Informer() + _ = ix.SetTransform(func(o any) (any, error) { + // Strip the parts of the deployment that we don't care about to save memory + if dep, ok := o.(*apps.Deployment); ok { + om := &dep.ObjectMeta + if an := om.Annotations; an != nil { + delete(an, core.LastAppliedConfigAnnotation) + } + dep.ManagedFields = nil + dep.Finalizers = nil + dep.OwnerReferences = nil + } + return o, nil + }) + _ = ix.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { + dlog.Errorf(ctx, "watcher for Deployments %s: %v", whereWeWatch(ns), err) + }) + return ix +} + +func StartReplicaSets(ctx context.Context, ns string) cache.SharedIndexInformer { + f := informer.GetK8sFactory(ctx, ns) + ix := f.Apps().V1().ReplicaSets().Informer() + _ = ix.SetTransform(func(o any) (any, error) { + // Strip the parts of the replicaset that we don't care about. Saves memory + if dep, ok := o.(*apps.ReplicaSet); ok { + om := &dep.ObjectMeta + if an := om.Annotations; an != nil { + delete(an, core.LastAppliedConfigAnnotation) + } + dep.ManagedFields = nil + dep.Finalizers = nil + } + return o, nil + }) + _ = ix.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { + dlog.Errorf(ctx, "watcher for ReplicaSets %s: %v", whereWeWatch(ns), err) + }) + return ix +} + +func StartStatefulSets(ctx context.Context, ns string) cache.SharedIndexInformer { + f := informer.GetK8sFactory(ctx, ns) + ix := f.Apps().V1().StatefulSets().Informer() + _ = ix.SetTransform(func(o any) (any, error) { + // Strip the parts of the stateful that we don't care about. 
Saves memory + if dep, ok := o.(*apps.StatefulSet); ok { + om := &dep.ObjectMeta + if an := om.Annotations; an != nil { + delete(an, core.LastAppliedConfigAnnotation) + } + dep.ManagedFields = nil + dep.Finalizers = nil + } + return o, nil + }) + _ = ix.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { + dlog.Errorf(ctx, "watcher for StatefulSet %s: %v", whereWeWatch(ns), err) + }) + return ix +} + +func StartRollouts(ctx context.Context, ns string) cache.SharedIndexInformer { + f := informer.GetArgoRolloutsFactory(ctx, ns) + dlog.Infof(ctx, "Watching Rollouts in %s", ns) + ix := f.Argoproj().V1alpha1().Rollouts().Informer() + _ = ix.SetTransform(func(o any) (any, error) { + // Strip the parts of the rollout that we don't care about. Saves memory + if dep, ok := o.(*argorollouts.Rollout); ok { + om := &dep.ObjectMeta + if an := om.Annotations; an != nil { + delete(an, core.LastAppliedConfigAnnotation) + } + dep.ManagedFields = nil + dep.Finalizers = nil + } + return o, nil + }) + _ = ix.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { + dlog.Errorf(ctx, "watcher for Rollouts %s: %v", whereWeWatch(ns), err) + }) + return ix +} diff --git a/pkg/workload/util.go b/pkg/workload/util.go new file mode 100644 index 0000000000..fb58969f13 --- /dev/null +++ b/pkg/workload/util.go @@ -0,0 +1,25 @@ +package workload + +import ( + "k8s.io/apimachinery/pkg/runtime" + + "github.com/datawire/k8sapi/pkg/k8sapi" + "github.com/telepresenceio/telepresence/v2/pkg/agentconfig" +) + +const ( + DomainPrefix = "telepresence.getambassador.io/" + InjectAnnotation = DomainPrefix + "inject-" + agentconfig.ContainerName + ServiceNameAnnotation = DomainPrefix + "inject-service-name" + ManualInjectAnnotation = DomainPrefix + "manually-injected" + AnnRestartedAt = DomainPrefix + "restartedAt" +) + +func FromAny(obj any) (k8sapi.Workload, bool) { + if ro, ok := obj.(runtime.Object); ok { + if wl, err := k8sapi.WrapWorkload(ro); err == nil { + return wl, true + } + } + return nil, false +} diff --git a/pkg/workload/watcher.go b/pkg/workload/watcher.go new file mode 100644 index 0000000000..284ceaaf08 --- /dev/null +++ b/pkg/workload/watcher.go @@ -0,0 +1,271 @@ +package workload + +import ( + "context" + "math" + "sync" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/uuid" + apps "k8s.io/api/apps/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" + "k8s.io/kubectl/pkg/util/deployment" + + "github.com/datawire/dlib/dlog" + "github.com/datawire/k8sapi/pkg/k8sapi" + "github.com/telepresenceio/telepresence/v2/pkg/informer" +) + +type EventType int + +const ( + EventTypeAdd = iota + EventTypeUpdate + EventTypeDelete +) + +type WorkloadEvent struct { + Type EventType + Workload k8sapi.Workload +} + +func (e EventType) String() string { + switch e { + case EventTypeAdd: + return "add" + case EventTypeUpdate: + return "update" + case EventTypeDelete: + return "delete" + default: + return "unknown" + } +} + +type Watcher interface { + Subscribe(ctx context.Context) <-chan []WorkloadEvent +} + +type watcher struct { + sync.Mutex + namespace string + subscriptions map[uuid.UUID]chan<- []WorkloadEvent + timer *time.Timer + events []WorkloadEvent + rolloutsEnabled bool +} + +func NewWatcher(ctx context.Context, ns string, rolloutsEnabled bool) (Watcher, error) { + w := new(watcher) + w.namespace = ns + w.rolloutsEnabled = rolloutsEnabled + w.subscriptions = make(map[uuid.UUID]chan<- 
[]WorkloadEvent) + w.timer = time.AfterFunc(time.Duration(math.MaxInt64), func() { + w.Lock() + ss := make([]chan<- []WorkloadEvent, len(w.subscriptions)) + i := 0 + for _, sub := range w.subscriptions { + ss[i] = sub + i++ + } + events := w.events + w.events = nil + w.Unlock() + for _, s := range ss { + select { + case <-ctx.Done(): + return + case s <- events: + } + } + }) + + err := w.addEventHandler(ctx, ns) + if err != nil { + return nil, err + } + return w, nil +} + +func hasValidReplicasetOwner(wl k8sapi.Workload, rolloutsEnabled bool) bool { + for _, ref := range wl.GetOwnerReferences() { + if ref.Controller != nil && *ref.Controller { + switch ref.Kind { + case "Deployment": + return true + case "Rollout": + if rolloutsEnabled { + return true + } + } + } + } + return false +} + +func (w *watcher) Subscribe(ctx context.Context) <-chan []WorkloadEvent { + ch := make(chan []WorkloadEvent, 1) + initialEvents := make([]WorkloadEvent, 0, 100) + id := uuid.New() + kf := informer.GetFactory(ctx, w.namespace) + ai := kf.GetK8sInformerFactory().Apps().V1() + if dps, err := ai.Deployments().Lister().Deployments(w.namespace).List(labels.Everything()); err == nil { + for _, obj := range dps { + if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) { + initialEvents = append(initialEvents, WorkloadEvent{ + Type: EventTypeAdd, + Workload: wl, + }) + } + } + } + if rps, err := ai.ReplicaSets().Lister().ReplicaSets(w.namespace).List(labels.Everything()); err == nil { + for _, obj := range rps { + if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) { + initialEvents = append(initialEvents, WorkloadEvent{ + Type: EventTypeAdd, + Workload: wl, + }) + } + } + } + if sps, err := ai.StatefulSets().Lister().StatefulSets(w.namespace).List(labels.Everything()); err == nil { + for _, obj := range sps { + if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) { + initialEvents = append(initialEvents, WorkloadEvent{ + Type: EventTypeAdd, + Workload: wl, + }) + } + } + } + if w.rolloutsEnabled { + ri := kf.GetArgoRolloutsInformerFactory().Argoproj().V1alpha1() + if sps, err := ri.Rollouts().Lister().Rollouts(w.namespace).List(labels.Everything()); err == nil { + for _, obj := range sps { + if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) { + initialEvents = append(initialEvents, WorkloadEvent{ + Type: EventTypeAdd, + Workload: wl, + }) + } + } + } + } + ch <- initialEvents + + w.Lock() + w.subscriptions[id] = ch + w.Unlock() + go func() { + <-ctx.Done() + close(ch) + w.Lock() + delete(w.subscriptions, id) + w.Unlock() + }() + return ch +} + +func compareOptions() []cmp.Option { + return []cmp.Option{ + // Ignore frequently changing fields of no interest + cmpopts.IgnoreFields(meta.ObjectMeta{}, "Namespace", "ResourceVersion", "Generation", "ManagedFields"), + + // Only the Conditions are of interest in the DeploymentStatus. + cmp.Comparer(func(a, b apps.DeploymentStatus) bool { + // Only compare the DeploymentCondition's type and status + return cmp.Equal(a.Conditions, b.Conditions, cmp.Comparer(func(c1, c2 apps.DeploymentCondition) bool { + return c1.Type == c2.Type && c1.Status == c2.Status + })) + }), + + // Treat a nil map or slice as empty. + cmpopts.EquateEmpty(), + + // Ignore frequently changing annotations of no interest. 
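Together with the annotation filter that follows, these options let the update handler treat pure bookkeeping changes as no-ops. A hedged sketch of the effect, comparing raw objects directly and with made-up values:

	d1 := &apps.Deployment{ObjectMeta: meta.ObjectMeta{
		Name:            "echo",
		ResourceVersion: "100", // ignored via IgnoreFields above
		Annotations:     map[string]string{deployment.RevisionAnnotation: "2"},
	}}
	d2 := &apps.Deployment{ObjectMeta: meta.ObjectMeta{
		Name:            "echo",
		ResourceVersion: "101",
		Annotations:     map[string]string{deployment.RevisionAnnotation: "3"}, // ignored entry
	}}
	fmt.Println(cmp.Equal(d1, d2, compareOptions()...)) // true: only noise differs

Under these options the UpdateFunc below returns early, and no WorkloadEvent is queued.
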
+ cmpopts.IgnoreMapEntries(func(k, _ string) bool { + return k == AnnRestartedAt || k == deployment.RevisionAnnotation + }), + } +} + +func (w *watcher) watch(ix cache.SharedIndexInformer, ns string, hasValidController func(k8sapi.Workload) bool) error { + _, err := ix.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + if wl, ok := FromAny(obj); ok && ns == wl.GetNamespace() && !hasValidController(wl) { + w.handleEvent(WorkloadEvent{Type: EventTypeAdd, Workload: wl}) + } + }, + DeleteFunc: func(obj any) { + if wl, ok := FromAny(obj); ok { + if ns == wl.GetNamespace() && len(wl.GetOwnerReferences()) == 0 { + w.handleEvent(WorkloadEvent{Type: EventTypeDelete, Workload: wl}) + } + } else if dfsu, ok := obj.(*cache.DeletedFinalStateUnknown); ok { + if wl, ok = FromAny(dfsu.Obj); ok && ns == wl.GetNamespace() && !hasValidController(wl) { + w.handleEvent(WorkloadEvent{Type: EventTypeDelete, Workload: wl}) + } + } + }, + UpdateFunc: func(oldObj, newObj any) { + if wl, ok := FromAny(newObj); ok && ns == wl.GetNamespace() && !hasValidController(wl) { + if oldWl, ok := FromAny(oldObj); ok { + if cmp.Equal(wl, oldWl, compareOptions()...) { + return + } + // Replace the cmp.Equal above with this to view the changes that trigger an update: + // + // diff := cmp.Diff(wl, oldWl, compareOptions()...) + // if diff == "" { + // return + // } + // dlog.Debugf(ctx, "DIFF:\n%s", diff) + w.handleEvent(WorkloadEvent{Type: EventTypeUpdate, Workload: wl}) + } + } + }, + }) + return err +} + +func (w *watcher) addEventHandler(ctx context.Context, ns string) error { + kf := informer.GetFactory(ctx, ns) + hvc := func(wl k8sapi.Workload) bool { + return hasValidReplicasetOwner(wl, w.rolloutsEnabled) + } + + ai := kf.GetK8sInformerFactory().Apps().V1() + if err := w.watch(ai.Deployments().Informer(), ns, hvc); err != nil { + return err + } + if err := w.watch(ai.ReplicaSets().Informer(), ns, hvc); err != nil { + return err + } + if err := w.watch(ai.StatefulSets().Informer(), ns, hvc); err != nil { + return err + } + if !w.rolloutsEnabled { + dlog.Infof(ctx, "Argo Rollouts is disabled, Argo Rollouts will not be watched") + } else { + ri := kf.GetArgoRolloutsInformerFactory().Argoproj().V1alpha1() + if err := w.watch(ri.Rollouts().Informer(), ns, hvc); err != nil { + return err + } + } + return nil +} + +func (w *watcher) handleEvent(we WorkloadEvent) { + w.Lock() + w.events = append(w.events, we) + w.Unlock() + + // Defers sending until things been quiet for a while + w.timer.Reset(50 * time.Millisecond) +} From 6abb4ed63cd78307781f13c14c50a36acc38a7cc Mon Sep 17 00:00:00 2001 From: Thomas Hallgren Date: Mon, 28 Oct 2024 16:10:46 +0100 Subject: [PATCH 4/6] Make client list function use the traffic-managers WatchWorkloads Let the `telepresence list` command use a workload collection that is backed by the traffic-managers `WatchWorkloads` function. The client will use local shared informers to watch the workloads when connecting to an older traffic-manager that doesn't support the `WatchWorkloads` call. Sort conditions by LastTransitionTime so that more recent has priority. When checking the state of an argo-rollout, it's essential to look at the transition timestamp, because the rollout can be both "progressing" and "available" when pausing between steps. We now look at the `RolloutAvailable` rather than `RolloutHealthy`. The latter is not set until all steps of a canary rollout have completed, which might be never. 
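To make the condition sorting concrete: a rollout that pauses between canary steps can carry a true Progressing condition and a true Available condition at the same time, and the order of the Conditions slice is not guaranteed. Sorting by LastTransitionTime, as the new pkg/workload/state.go below does for both deployments and rollouts, makes the most recent transition win. A hedged sketch with made-up times, using the analogous Deployment types:

	t0 := meta.NewTime(time.Now().Add(-time.Hour))
	t1 := meta.NewTime(time.Now())
	conds := []apps.DeploymentCondition{
		{Type: apps.DeploymentProgressing, Status: core.ConditionTrue, LastTransitionTime: t0},
		{Type: apps.DeploymentAvailable, Status: core.ConditionTrue, LastTransitionTime: t1},
	}
	// Newest transition first, mirroring deploymentState and rolloutSetState.
	sort.Slice(conds, func(i, j int) bool {
		return conds[i].LastTransitionTime.Compare(conds[j].LastTransitionTime.Time) > 0
	})
	// conds[0].Type is now DeploymentAvailable, so a first-match scan reports
	// Available instead of the hour-old Progressing.
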
Signed-off-by: Thomas Hallgren --- .../cmd/manager/mutator/workload_state.go | 87 ----- .../cmd/manager/mutator/workload_watcher.go | 8 +- .../manager/state/workload_info_watcher.go | 59 +-- integration_test/argo_rollouts_test.go | 7 +- integration_test/workspace_watch_test.go | 1 + pkg/client/cli/cmd/list.go | 3 + pkg/client/userd/trafficmgr/intercept.go | 1 - pkg/client/userd/trafficmgr/session.go | 361 +++++++++++++----- pkg/client/userd/trafficmgr/workloads.go | 326 ---------------- pkg/workload/state.go | 130 +++++++ pkg/workload/watcher.go | 5 +- 11 files changed, 441 insertions(+), 547 deletions(-) delete mode 100644 cmd/traffic/cmd/manager/mutator/workload_state.go delete mode 100644 pkg/client/userd/trafficmgr/workloads.go create mode 100644 pkg/workload/state.go diff --git a/cmd/traffic/cmd/manager/mutator/workload_state.go b/cmd/traffic/cmd/manager/mutator/workload_state.go deleted file mode 100644 index 88a1157fe1..0000000000 --- a/cmd/traffic/cmd/manager/mutator/workload_state.go +++ /dev/null @@ -1,87 +0,0 @@ -package mutator - -import ( - appsv1 "k8s.io/api/apps/v1" - core "k8s.io/api/core/v1" - - argorollouts "github.com/datawire/argo-rollouts-go-client/pkg/apis/rollouts/v1alpha1" - "github.com/datawire/k8sapi/pkg/k8sapi" -) - -type WorkloadState int - -const ( - WorkloadStateUnknown WorkloadState = iota - WorkloadStateProgressing - WorkloadStateAvailable - WorkloadStateFailure -) - -func deploymentState(d *appsv1.Deployment) WorkloadState { - for _, c := range d.Status.Conditions { - switch c.Type { - case appsv1.DeploymentProgressing: - if c.Status == core.ConditionTrue { - return WorkloadStateProgressing - } - case appsv1.DeploymentAvailable: - if c.Status == core.ConditionTrue { - return WorkloadStateAvailable - } - case appsv1.DeploymentReplicaFailure: - if c.Status == core.ConditionTrue { - return WorkloadStateFailure - } - } - } - return WorkloadStateUnknown -} - -func replicaSetState(d *appsv1.ReplicaSet) WorkloadState { - for _, c := range d.Status.Conditions { - if c.Type == appsv1.ReplicaSetReplicaFailure && c.Status == core.ConditionTrue { - return WorkloadStateFailure - } - } - return WorkloadStateAvailable -} - -func statefulSetState(d *appsv1.StatefulSet) WorkloadState { - return WorkloadStateAvailable -} - -func rolloutSetState(r *argorollouts.Rollout) WorkloadState { - for _, c := range r.Status.Conditions { - switch c.Type { - case argorollouts.RolloutProgressing: - if c.Status == core.ConditionTrue { - return WorkloadStateProgressing - } - case argorollouts.RolloutHealthy: - if c.Status == core.ConditionTrue { - return WorkloadStateAvailable - } - case argorollouts.RolloutReplicaFailure: - if c.Status == core.ConditionTrue { - return WorkloadStateFailure - } - } - } - return WorkloadStateUnknown -} - -func GetWorkloadState(wl k8sapi.Workload) WorkloadState { - if d, ok := k8sapi.DeploymentImpl(wl); ok { - return deploymentState(d) - } - if r, ok := k8sapi.ReplicaSetImpl(wl); ok { - return replicaSetState(r) - } - if s, ok := k8sapi.StatefulSetImpl(wl); ok { - return statefulSetState(s) - } - if rt, ok := k8sapi.RolloutImpl(wl); ok { - return rolloutSetState(rt) - } - return WorkloadStateUnknown -} diff --git a/cmd/traffic/cmd/manager/mutator/workload_watcher.go b/cmd/traffic/cmd/manager/mutator/workload_watcher.go index 3f84fa1514..cfabe69253 100644 --- a/cmd/traffic/cmd/manager/mutator/workload_watcher.go +++ b/cmd/traffic/cmd/manager/mutator/workload_watcher.go @@ -22,7 +22,7 @@ func (c *configWatcher) watchWorkloads(ctx context.Context, ix 
cache.SharedIndex cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { if wl, ok := workload.FromAny(obj); ok && len(wl.GetOwnerReferences()) == 0 { - c.updateWorkload(ctx, wl, nil, GetWorkloadState(wl)) + c.updateWorkload(ctx, wl, nil, workload.GetWorkloadState(wl)) } }, DeleteFunc: func(obj any) { @@ -39,7 +39,7 @@ func (c *configWatcher) watchWorkloads(ctx context.Context, ix cache.SharedIndex UpdateFunc: func(oldObj, newObj any) { if wl, ok := workload.FromAny(newObj); ok && len(wl.GetOwnerReferences()) == 0 { if oldWl, ok := workload.FromAny(oldObj); ok { - c.updateWorkload(ctx, wl, oldWl, GetWorkloadState(wl)) + c.updateWorkload(ctx, wl, oldWl, workload.GetWorkloadState(wl)) } } }, @@ -59,8 +59,8 @@ func (c *configWatcher) deleteWorkload(ctx context.Context, wl k8sapi.Workload) } } -func (c *configWatcher) updateWorkload(ctx context.Context, wl, oldWl k8sapi.Workload, state WorkloadState) { - if state == WorkloadStateFailure { +func (c *configWatcher) updateWorkload(ctx context.Context, wl, oldWl k8sapi.Workload, state workload.State) { + if state == workload.StateFailure { return } tpl := wl.GetPodTemplate() diff --git a/cmd/traffic/cmd/manager/state/workload_info_watcher.go b/cmd/traffic/cmd/manager/state/workload_info_watcher.go index d68d6896b7..beb7b85134 100644 --- a/cmd/traffic/cmd/manager/state/workload_info_watcher.go +++ b/cmd/traffic/cmd/manager/state/workload_info_watcher.go @@ -13,7 +13,6 @@ import ( "github.com/datawire/dlib/dlog" "github.com/datawire/k8sapi/pkg/k8sapi" rpc "github.com/telepresenceio/telepresence/rpc/v2/manager" - "github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/mutator" "github.com/telepresenceio/telepresence/v2/pkg/agentmap" "github.com/telepresenceio/telepresence/v2/pkg/workload" ) @@ -78,6 +77,7 @@ func (wf *workloadInfoWatcher) Watch(ctx context.Context, stream rpc.Manager_Wat // Everything in this loop happens in sequence, even the firing of the timer. This means // that there's no concurrency and no need for mutexes. + initial := true for { select { case <-ctx.Done(): @@ -85,13 +85,14 @@ func (wf *workloadInfoWatcher) Watch(ctx context.Context, stream rpc.Manager_Wat case <-sessionDone: return nil case <-wf.ticker.C: - wf.sendEvents(ctx) + wf.sendEvents(ctx, false) case wes, ok := <-workloadsCh: if !ok { dlog.Debug(ctx, "Workloads channel closed") return nil } - wf.handleWorkloadsSnapshot(ctx, wes) + wf.handleWorkloadsSnapshot(ctx, wes, initial) + initial = false // Events that arrive at the agent channel should be counted as modifications. 
case ais, ok := <-agentsCh: if !ok { @@ -121,7 +122,7 @@ func (wf *workloadInfoWatcher) getIntercepts(name, namespace string) (iis []*rpc return iis } -func (wf *workloadInfoWatcher) sendEvents(ctx context.Context) { +func (wf *workloadInfoWatcher) sendEvents(ctx context.Context, sendEmpty bool) { // Time to send what we have wf.ticker.Reset(time.Duration(math.MaxInt64)) evs := make([]*rpc.WorkloadEvent, 0, len(wf.workloadEvents)) @@ -133,7 +134,7 @@ func (wf *workloadInfoWatcher) sendEvents(ctx context.Context) { } evs = append(evs, rew) } - if len(evs) == 0 { + if !sendEmpty && len(evs) == 0 { return } dlog.Debugf(ctx, "Sending %d WorkloadEvents", len(evs)) @@ -169,13 +170,13 @@ func rpcKind(s string) rpc.WorkloadInfo_Kind { } } -func rpcWorkloadState(s mutator.WorkloadState) (state rpc.WorkloadInfo_State) { +func rpcWorkloadState(s workload.State) (state rpc.WorkloadInfo_State) { switch s { - case mutator.WorkloadStateFailure: + case workload.StateFailure: state = rpc.WorkloadInfo_FAILURE - case mutator.WorkloadStateAvailable: + case workload.StateAvailable: state = rpc.WorkloadInfo_AVAILABLE - case mutator.WorkloadStateProgressing: + case workload.StateProgressing: state = rpc.WorkloadInfo_PROGRESSING default: state = rpc.WorkloadInfo_UNKNOWN_UNSPECIFIED @@ -189,7 +190,7 @@ func rpcWorkload(wl k8sapi.Workload, as rpc.WorkloadInfo_AgentState, iClients [] Name: wl.GetName(), Namespace: wl.GetNamespace(), Uid: string(wl.GetUID()), - State: rpcWorkloadState(mutator.GetWorkloadState(wl)), + State: rpcWorkloadState(workload.GetWorkloadState(wl)), AgentState: as, InterceptClients: iClients, } @@ -206,16 +207,23 @@ func (wf *workloadInfoWatcher) addEvent( Type: rpc.WorkloadEvent_Type(eventType), Workload: rpcWorkload(wl, as, iClients), } - wf.sendEvents(ctx) + wf.sendEvents(ctx, false) } -func (wf *workloadInfoWatcher) handleWorkloadsSnapshot(ctx context.Context, wes []workload.WorkloadEvent) { +func (wf *workloadInfoWatcher) handleWorkloadsSnapshot(ctx context.Context, wes []workload.WorkloadEvent, initial bool) { + if len(wes) == 0 { + if initial { + // The initial snapshot may be empty, but must be sent anyway. 
+ wf.sendEvents(ctx, true) + } + return + } for _, we := range wes { wl := we.Workload if w, ok := wf.workloadEvents[wl.GetName()]; ok { if we.Type == workload.EventTypeDelete && w.Type != rpc.WorkloadEvent_DELETED { w.Type = rpc.WorkloadEvent_DELETED - dlog.Debugf(ctx, "WorkloadInfoEvent: Workload %s %s %s.%s", we.Type, wl.GetKind(), wl.GetName(), wl.GetNamespace()) + dlog.Debugf(ctx, "WorkloadInfoEvent: Workload %s %s %s.%s %s", we.Type, wl.GetKind(), wl.GetName(), wl.GetNamespace(), workload.GetWorkloadState(wl)) wf.resetTicker() } } else { @@ -239,8 +247,8 @@ func (wf *workloadInfoWatcher) handleWorkloadsSnapshot(ctx context.Context, wes break } } - dlog.Debugf(ctx, "WorkloadInfoEvent: Workload %s %s %s.%s %s", we.Type, wl.GetKind(), wl.GetName(), wl.GetNamespace(), as) - wf.addEvent(ctx, we.Type, wl, as, iClients) + dlog.Debugf(ctx, "WorkloadInfoEvent: Workload %s %s %s.%s %s %s", we.Type, wl.GetKind(), wl.GetName(), wl.GetNamespace(), as, workload.GetWorkloadState(wl)) + wf.addEvent(we.Type, wl, as, iClients) } } } @@ -252,15 +260,16 @@ func (wf *workloadInfoWatcher) handleAgentSnapshot(ctx context.Context, ais map[ if _, ok := ais[k]; !ok { name := a.Name as := rpc.WorkloadInfo_NO_AGENT_UNSPECIFIED - dlog.Debugf(ctx, "WorkloadInfoEvent: AgentInfo %s.%s %s", a.Name, a.Namespace, as) if w, ok := wf.workloadEvents[name]; ok && w.Type != rpc.WorkloadEvent_DELETED { wl := w.Workload if wl.AgentState != as { wl.AgentState = as + dlog.Debugf(ctx, "WorkloadInfoEvent: AgentInfo %s.%s %s %s", a.Name, a.Namespace, as, wl.State) wf.resetTicker() } } else if wl, err := agentmap.GetWorkload(ctx, name, a.Namespace, ""); err == nil { - wf.addEvent(ctx, workload.EventTypeUpdate, wl, as, nil) + dlog.Debugf(ctx, "WorkloadInfoEvent: AgentInfo %s.%s %s %s", a.Name, a.Namespace, as, workload.GetWorkloadState(wl)) + wf.addEvent(workload.EventTypeUpdate, wl, as, nil) } else { dlog.Debugf(ctx, "Unable to get workload %s.%s: %v", name, a.Namespace, err) if errors.IsNotFound(err) { @@ -272,7 +281,7 @@ func (wf *workloadInfoWatcher) handleAgentSnapshot(ctx context.Context, ais map[ AgentState: as, }, } - wf.sendEvents(ctx) + wf.sendEvents(ctx, false) } } } @@ -285,16 +294,17 @@ func (wf *workloadInfoWatcher) handleAgentSnapshot(ctx context.Context, ais map[ as = rpc.WorkloadInfo_INTERCEPTED iClients = iis } - dlog.Debugf(ctx, "WorkloadInfoEvent: AgentInfo %s.%s %s", a.Name, a.Namespace, as) if w, ok := wf.workloadEvents[name]; ok && w.Type != rpc.WorkloadEvent_DELETED { wl := w.Workload + dlog.Debugf(ctx, "WorkloadInfoEvent: AgentInfo %s.%s %s %s", a.Name, a.Namespace, as, w.Workload.State) if wl.AgentState != as { wl.AgentState = as wl.InterceptClients = iClients wf.resetTicker() } } else if wl, err := agentmap.GetWorkload(ctx, name, a.Namespace, ""); err == nil { - wf.addEvent(ctx, workload.EventTypeUpdate, wl, as, iClients) + dlog.Debugf(ctx, "WorkloadInfoEvent: AgentInfo %s.%s %s %s", a.Name, a.Namespace, as, workload.GetWorkloadState(wl)) + wf.addEvent(workload.EventTypeUpdate, wl, as, iClients) } else { dlog.Debugf(ctx, "Unable to get workload %s.%s: %v", name, a.Namespace, err) } @@ -308,15 +318,16 @@ func (wf *workloadInfoWatcher) handleInterceptSnapshot(ctx context.Context, iis if _, ok := wf.interceptInfos[k]; !ok { name := ii.Spec.Agent as := rpc.WorkloadInfo_INSTALLED - dlog.Debugf(ctx, "InterceptInfo %s.%s %s", name, ii.Spec.Namespace, as) if w, ok := wf.workloadEvents[name]; ok && w.Type != rpc.WorkloadEvent_DELETED { if w.Workload.AgentState != as { w.Workload.AgentState = as 
w.Workload.InterceptClients = nil + dlog.Debugf(ctx, "WorkloadInfoEvent: InterceptInfo %s.%s %s %s", w.Workload.Name, w.Workload.Namespace, as, w.Workload.State) wf.resetTicker() } } else if wl, err := agentmap.GetWorkload(ctx, name, wf.namespace, ""); err == nil { - wf.addEvent(ctx, workload.EventTypeUpdate, wl, as, nil) + dlog.Debugf(ctx, "WorkloadInfoEvent: InterceptInfo %s.%s %s %s", wl.GetName(), wl.GetNamespace(), as, workload.GetWorkloadState(wl)) + wf.addEvent(workload.EventTypeUpdate, wl, as, nil) } } } @@ -337,10 +348,12 @@ func (wf *workloadInfoWatcher) handleInterceptSnapshot(ctx context.Context, iis if w.Workload.AgentState != as { w.Workload.AgentState = as w.Workload.InterceptClients = iClients + dlog.Debugf(ctx, "WorkloadInfoEvent: InterceptInfo %s.%s %s %s", w.Workload.Name, w.Workload.Namespace, as, w.Workload.State) wf.resetTicker() } } else if wl, err := agentmap.GetWorkload(ctx, name, wf.namespace, ""); err == nil { - wf.addEvent(ctx, workload.EventTypeUpdate, wl, as, iClients) + dlog.Debugf(ctx, "WorkloadInfoEvent: InterceptInfo %s.%s %s %s", wl.GetName(), wl.GetNamespace(), as, workload.GetWorkloadState(wl)) + wf.addEvent(workload.EventTypeUpdate, wl, as, iClients) } } } diff --git a/integration_test/argo_rollouts_test.go b/integration_test/argo_rollouts_test.go index adaf8428b4..8b64d1f235 100644 --- a/integration_test/argo_rollouts_test.go +++ b/integration_test/argo_rollouts_test.go @@ -97,9 +97,10 @@ func (s *argoRolloutsSuite) Test_SuccessfullyInterceptsArgoRollout() { stdout := itest.TelepresenceOk(ctx, "intercept", "--mount", "false", "--port", port, svc) require.Contains(stdout, "Using "+tp+" "+svc) - stdout = itest.TelepresenceOk(ctx, "list", "--intercepts") - require.Contains(stdout, svc+": intercepted") - require.NotContains(stdout, "Volume Mount Point") + require.Eventually(func() bool { + stdout = itest.TelepresenceOk(ctx, "list", "--intercepts") + return strings.Contains(stdout, svc+": intercepted") && !strings.Contains(stdout, "Volume Mount Point") + }, 14*time.Second, 2*time.Second) s.CapturePodLogs(ctx, svc, "traffic-agent", s.AppNamespace()) itest.TelepresenceOk(ctx, "leave", svc) stdout = itest.TelepresenceOk(ctx, "list", "--intercepts") diff --git a/integration_test/workspace_watch_test.go b/integration_test/workspace_watch_test.go index 9b1da30ab5..70afa7096e 100644 --- a/integration_test/workspace_watch_test.go +++ b/integration_test/workspace_watch_test.go @@ -163,6 +163,7 @@ func (s *notConnectedSuite) Test_WorkspaceListener() { expectations["agent installed"] = true } case manager.WorkloadInfo_INTERCEPTED: + expectations["agent installed"] = true expectations["agent intercepted"] = true if ics := ev.Workload.InterceptClients; len(ics) == 1 { interceptingClient = ics[0].Client diff --git a/pkg/client/cli/cmd/list.go b/pkg/client/cli/cmd/list.go index 4dd6e528c6..2706350ad9 100644 --- a/pkg/client/cli/cmd/list.go +++ b/pkg/client/cli/cmd/list.go @@ -175,6 +175,9 @@ func (s *listCommand) printList(ctx context.Context, workloads []*connector.Work return intercept.DescribeIntercepts(ctx, iis, "", s.debug) } ai := workload.Sidecar + if workload.NotInterceptableReason == "Progressing" { + return "progressing..." 
+ } if ai != nil { return "ready to intercept (traffic-agent already installed)" } diff --git a/pkg/client/userd/trafficmgr/intercept.go b/pkg/client/userd/trafficmgr/intercept.go index 5a280098e2..418cc5ffb6 100644 --- a/pkg/client/userd/trafficmgr/intercept.go +++ b/pkg/client/userd/trafficmgr/intercept.go @@ -503,7 +503,6 @@ func (s *session) ensureNoInterceptConflict(ir *rpc.CreateInterceptRequest) *rpc // only if the returned rpc.InterceptResult is nil. The returned runtime.Object is either nil, indicating a local // intercept, or the workload for the intercept. func (s *session) CanIntercept(c context.Context, ir *rpc.CreateInterceptRequest) (userd.InterceptInfo, *rpc.InterceptResult) { - s.waitForSync(c) spec := ir.Spec if spec.Namespace == "" { spec.Namespace = s.Namespace diff --git a/pkg/client/userd/trafficmgr/session.go b/pkg/client/userd/trafficmgr/session.go index 76bd82ea35..84d64965c8 100644 --- a/pkg/client/userd/trafficmgr/session.go +++ b/pkg/client/userd/trafficmgr/session.go @@ -17,6 +17,7 @@ import ( "github.com/blang/semver/v4" "github.com/go-json-experiment/json" + "github.com/google/uuid" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -42,7 +43,6 @@ import ( rootdRpc "github.com/telepresenceio/telepresence/rpc/v2/daemon" "github.com/telepresenceio/telepresence/rpc/v2/manager" "github.com/telepresenceio/telepresence/v2/pkg/agentconfig" - "github.com/telepresenceio/telepresence/v2/pkg/agentmap" authGrpc "github.com/telepresenceio/telepresence/v2/pkg/authenticator/grpc" "github.com/telepresenceio/telepresence/v2/pkg/authenticator/patcher" "github.com/telepresenceio/telepresence/v2/pkg/client" @@ -55,9 +55,11 @@ import ( "github.com/telepresenceio/telepresence/v2/pkg/client/userd" "github.com/telepresenceio/telepresence/v2/pkg/client/userd/k8s" "github.com/telepresenceio/telepresence/v2/pkg/errcat" + "github.com/telepresenceio/telepresence/v2/pkg/informer" "github.com/telepresenceio/telepresence/v2/pkg/matcher" "github.com/telepresenceio/telepresence/v2/pkg/proc" "github.com/telepresenceio/telepresence/v2/pkg/restapi" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) type apiServer struct { @@ -70,6 +72,18 @@ type apiMatcher struct { metadata map[string]string } +type workloadInfoKey struct { + kind manager.WorkloadInfo_Kind + name string +} + +type workloadInfo struct { + uid types.UID + state workload.State + agentState manager.WorkloadInfo_AgentState + interceptClients []string +} + type session struct { *k8s.Cluster rootDaemon rootdRpc.DaemonClient @@ -96,7 +110,12 @@ type session struct { sessionInfo *manager.SessionInfo // sessionInfo returned by the traffic-manager - wlWatcher *workloadsAndServicesWatcher + workloadsLock sync.Mutex + + // Map of manager.WorkloadInfo split into namespace, key of kind and name, and workloadInfo + workloads map[string]map[workloadInfoKey]workloadInfo + + workloadSubscribers map[uuid.UUID]chan struct{} // currentInterceptsLock ensures that all accesses to currentIntercepts, currentMatchers, // currentAPIServers, interceptWaiters, and ingressInfo are synchronized @@ -415,19 +434,6 @@ func connectMgr( managerName = "Traffic Manager" } - knownWorkloadKinds, err := mClient.GetKnownWorkloadKinds(ctx, si) - if err != nil { - if status.Code(err) != codes.Unimplemented { - return nil, fmt.Errorf("failed to get known workload kinds: %w", err) - } - // Talking to an older traffic-manager, use legacy default types - knownWorkloadKinds = 
&manager.KnownWorkloadKinds{Kinds: []manager.WorkloadInfo_Kind{ - manager.WorkloadInfo_DEPLOYMENT, - manager.WorkloadInfo_REPLICASET, - manager.WorkloadInfo_STATEFULSET, - }} - } - sess := &session{ Cluster: cluster, installID: installID, @@ -438,8 +444,8 @@ func connectMgr( managerName: managerName, managerVersion: managerVersion, sessionInfo: si, + workloads: make(map[string]map[workloadInfoKey]workloadInfo), interceptWaiters: make(map[string]*awaitIntercept), - wlWatcher: newWASWatcher(knownWorkloadKinds), isPodDaemon: cr.IsPodDaemon, done: make(chan struct{}), subnetViaWorkloads: cr.SubnetViaWorkloads, @@ -507,8 +513,6 @@ func connectError(t rpc.ConnectInfo_ErrType, err error) *rpc.ConnectInfo { func (s *session) updateDaemonNamespaces(c context.Context) { const svcDomain = "svc" - s.wlWatcher.setNamespacesToWatch(c, s.GetCurrentNamespaces(true)) - domains := s.GetCurrentNamespaces(false) if !slices.Contains(domains, svcDomain) { domains = append(domains, svcDomain) @@ -572,37 +576,22 @@ func (s *session) ApplyConfig(ctx context.Context) error { // getInfosForWorkloads returns a list of workloads found in the given namespace that fulfils the given filter criteria. func (s *session) getInfosForWorkloads( - ctx context.Context, namespaces []string, iMap map[string][]*manager.InterceptInfo, sMap map[string]*rpc.WorkloadInfo_Sidecar, filter rpc.ListRequest_Filter, ) []*rpc.WorkloadInfo { - wiMap := make(map[types.UID]*rpc.WorkloadInfo) - s.wlWatcher.eachWorkload(ctx, k8s.GetManagerNamespace(ctx), namespaces, func(workload k8sapi.Workload) { - name := workload.GetName() - dlog.Debugf(ctx, "Getting info for %s %s.%s, matching service", workload.GetKind(), name, workload.GetNamespace()) - + wiMap := make(map[string]*rpc.WorkloadInfo) + s.eachWorkload(namespaces, func(wlKind manager.WorkloadInfo_Kind, name, namespace string, info workloadInfo) { + kind := wlKind.String() wlInfo := &rpc.WorkloadInfo{ Name: name, - Namespace: workload.GetNamespace(), - WorkloadResourceType: workload.GetKind(), - Uid: string(workload.GetUID()), + Namespace: namespace, + WorkloadResourceType: kind, + Uid: string(info.uid), } - - svcs, err := agentmap.FindServicesForPod(ctx, workload.GetPodTemplate(), "") - if err == nil && len(svcs) > 0 { - srm := make(map[string]*rpc.WorkloadInfo_ServiceReference, len(svcs)) - for _, so := range svcs { - if svc, ok := k8sapi.ServiceImpl(so); ok { - srm[string(svc.UID)] = &rpc.WorkloadInfo_ServiceReference{ - Name: svc.Name, - Namespace: svc.Namespace, - Ports: getServicePorts(svc), - } - } - } - wlInfo.Services = srm + if info.state != workload.StateAvailable { + wlInfo.NotInterceptableReason = info.state.String() } var ok bool @@ -612,7 +601,7 @@ func (s *session) getInfosForWorkloads( if wlInfo.Sidecar, ok = sMap[name]; !ok && filter <= rpc.ListRequest_INSTALLED_AGENTS { return } - wiMap[workload.GetUID()] = wlInfo + wiMap[fmt.Sprintf("%s:%s.%s", kind, name, namespace)] = wlInfo }) wiz := make([]*rpc.WorkloadInfo, len(wiMap)) i := 0 @@ -624,58 +613,46 @@ func (s *session) getInfosForWorkloads( return wiz } -func getServicePorts(svc *core.Service) []*rpc.WorkloadInfo_ServiceReference_Port { - ports := make([]*rpc.WorkloadInfo_ServiceReference_Port, len(svc.Spec.Ports)) - for i, p := range svc.Spec.Ports { - ports[i] = &rpc.WorkloadInfo_ServiceReference_Port{ - Name: p.Name, - Port: p.Port, - } +func (s *session) WatchWorkloads(c context.Context, wr *rpc.WatchWorkloadsRequest, stream userd.WatchWorkloadsStream) error { + id := uuid.New() + ch := make(chan struct{}) + 
s.workloadsLock.Lock() + if s.workloadSubscribers == nil { + s.workloadSubscribers = make(map[uuid.UUID]chan struct{}) } - return ports -} + s.workloadSubscribers[id] = ch + s.workloadsLock.Unlock() -func (s *session) waitForSync(ctx context.Context) { - s.wlWatcher.setNamespacesToWatch(ctx, s.GetCurrentNamespaces(true)) - s.wlWatcher.waitForSync(ctx) -} + defer func() { + s.workloadsLock.Lock() + delete(s.workloadSubscribers, id) + s.workloadsLock.Unlock() + }() -func (s *session) WatchWorkloads(c context.Context, wr *rpc.WatchWorkloadsRequest, stream userd.WatchWorkloadsStream) error { - s.waitForSync(c) - s.ensureWatchers(c, wr.Namespaces) - sCtx, sCancel := context.WithCancel(c) - // We need to make sure the subscription ends when we leave this method, since this is the one consuming the snapshotAvailable channel. - // Otherwise, the goroutine that writes to the channel will leak. - defer sCancel() - snapshotAvailable := s.wlWatcher.subscribe(sCtx) + send := func() error { + ws, err := s.WorkloadInfoSnapshot(c, wr.Namespaces, rpc.ListRequest_EVERYTHING) + if err != nil { + return err + } + return stream.Send(ws) + } + + // Send initial snapshot + if err := send(); err != nil { + return err + } for { select { - case <-c.Done(): // if context is done (usually the session's context). - return nil - case <-stream.Context().Done(): // if stream context is done. + case <-c.Done(): return nil - case <-snapshotAvailable: - snapshot, err := s.workloadInfoSnapshot(c, wr.GetNamespaces(), rpc.ListRequest_INTERCEPTABLE) - if err != nil { - return status.Errorf(codes.Unavailable, "failed to create WorkloadInfoSnapshot: %v", err) - } - if err := stream.Send(snapshot); err != nil { - dlog.Errorf(c, "WatchWorkloads.Send() failed: %v", err) + case <-ch: + if err := send(); err != nil { return err } } } } -func (s *session) WorkloadInfoSnapshot( - ctx context.Context, - namespaces []string, - filter rpc.ListRequest_Filter, -) (*rpc.WorkloadInfoSnapshot, error) { - s.waitForSync(ctx) - return s.workloadInfoSnapshot(ctx, namespaces, filter) -} - func (s *session) ensureWatchers(ctx context.Context, namespaces []string, ) { @@ -683,30 +660,31 @@ func (s *session) ensureWatchers(ctx context.Context, wg := sync.WaitGroup{} wg.Add(len(namespaces)) for _, ns := range namespaces { - if ns == "" { - ns = s.Namespace + s.workloadsLock.Lock() + _, ok := s.workloads[ns] + s.workloadsLock.Unlock() + if ok { + wg.Done() + } else { + go func() { + if err := s.workloadsWatcher(ctx, ns, &wg); err != nil { + dlog.Errorf(ctx, "error ensuring watcher for namespace %s: %v", ns, err) + return + } + }() + dlog.Debugf(ctx, "watcher for namespace %s started", ns) } - wgp := &wg - s.wlWatcher.ensureStarted(ctx, ns, func(started bool) { - if started { - dlog.Debugf(ctx, "watchers for %s started", ns) - } - if wgp != nil { - wgp.Done() - wgp = nil - } - }) } wg.Wait() + dlog.Debugf(ctx, "watchers for %q synced", namespaces) } -func (s *session) workloadInfoSnapshot( +func (s *session) WorkloadInfoSnapshot( ctx context.Context, namespaces []string, filter rpc.ListRequest_Filter, ) (*rpc.WorkloadInfoSnapshot, error) { is := s.getCurrentIntercepts() - s.ensureWatchers(ctx, namespaces) var nss []string if filter == rpc.ListRequest_INTERCEPTS { @@ -723,8 +701,10 @@ func (s *session) workloadInfoSnapshot( } if len(nss) == 0 { // none of the namespaces are currently mapped + dlog.Debug(ctx, "No namespaces are mapped") return &rpc.WorkloadInfoSnapshot{}, nil } + s.ensureWatchers(ctx, nss) iMap := make(map[string][]*manager.InterceptInfo, 
len(is)) nextIs: @@ -748,7 +728,7 @@ nextIs: } } - workloadInfos := s.getInfosForWorkloads(ctx, nss, iMap, sMap, filter) + workloadInfos := s.getInfosForWorkloads(nss, iMap, sMap, filter) return &rpc.WorkloadInfoSnapshot{Workloads: workloadInfos}, nil } @@ -908,11 +888,9 @@ func (s *session) Uninstall(ctx context.Context, ur *rpc.UninstallRequest) (*com // to prevent the clients from doing it. if ur.UninstallType == rpc.UninstallRequest_NAMED_AGENTS { // must have a valid namespace in order to uninstall named agents - s.waitForSync(ctx) if ur.Namespace == "" { ur.Namespace = s.Namespace } - s.wlWatcher.ensureStarted(ctx, ur.Namespace, nil) namespace := s.ActualNamespace(ur.Namespace) if namespace == "" { // namespace is not mapped @@ -962,11 +940,9 @@ func (s *session) Uninstall(ctx context.Context, ur *rpc.UninstallRequest) (*com } if ur.Namespace != "" { - s.waitForSync(ctx) if ur.Namespace == "" { ur.Namespace = s.Namespace } - s.wlWatcher.ensureStarted(ctx, ur.Namespace, nil) namespace := s.ActualNamespace(ur.Namespace) if namespace == "" { // namespace is not mapped @@ -1071,3 +1047,186 @@ func (s *session) connectRootDaemon(ctx context.Context, nc *rootdRpc.NetworkCon dlog.Debug(ctx, "Connected to root daemon") return rd, nil } + +func (s *session) eachWorkload(namespaces []string, do func(kind manager.WorkloadInfo_Kind, name, namespace string, info workloadInfo)) { + s.workloadsLock.Lock() + for _, ns := range namespaces { + if workloads, ok := s.workloads[ns]; ok { + for key, info := range workloads { + do(key.kind, key.name, ns, info) + } + } + } + s.workloadsLock.Unlock() +} + +func rpcKind(s string) manager.WorkloadInfo_Kind { + switch strings.ToLower(s) { + case "deployment": + return manager.WorkloadInfo_DEPLOYMENT + case "replicaset": + return manager.WorkloadInfo_REPLICASET + case "statefulset": + return manager.WorkloadInfo_STATEFULSET + case "rollout": + return manager.WorkloadInfo_ROLLOUT + default: + return manager.WorkloadInfo_UNSPECIFIED + } +} + +func (s *session) localWorkloadsWatcher(ctx context.Context, namespace string, synced *sync.WaitGroup) error { + defer func() { + if synced != nil { + synced.Done() + } + dlog.Debug(ctx, "client workload watcher ended") + }() + + knownWorkloadKinds, err := s.managerClient.GetKnownWorkloadKinds(ctx, s.sessionInfo) + if err != nil { + if status.Code(err) != codes.Unimplemented { + return fmt.Errorf("failed to get known workload kinds: %w", err) + } + // Talking to an older traffic-manager, use legacy default types + knownWorkloadKinds = &manager.KnownWorkloadKinds{Kinds: []manager.WorkloadInfo_Kind{ + manager.WorkloadInfo_DEPLOYMENT, + manager.WorkloadInfo_REPLICASET, + manager.WorkloadInfo_STATEFULSET, + }} + } + + dlog.Debugf(ctx, "Watching workloads from client due to lack of workload watcher support in traffic-manager %s", s.managerVersion) + fc := informer.GetFactory(ctx, namespace) + if fc == nil { + ctx = informer.WithFactory(ctx, namespace) + fc = informer.GetFactory(ctx, namespace) + } + workload.StartDeployments(ctx, namespace) + workload.StartReplicaSets(ctx, namespace) + workload.StartStatefulSets(ctx, namespace) + kf := fc.GetK8sInformerFactory() + kf.Start(ctx.Done()) + + rolloutsEnabled := slices.Index(knownWorkloadKinds.Kinds, manager.WorkloadInfo_ROLLOUT) >= 0 + if rolloutsEnabled { + workload.StartRollouts(ctx, namespace) + af := fc.GetArgoRolloutsInformerFactory() + af.Start(ctx.Done()) + } + + ww, err := workload.NewWatcher(ctx, namespace, rolloutsEnabled) + if err != nil { + workload.StartRollouts(ctx, 
namespace) + return err + } + kf.WaitForCacheSync(ctx.Done()) + + wlCh := ww.Subscribe(ctx) + for { + select { + case <-ctx.Done(): + return nil + case wls := <-wlCh: + if wls == nil { + return nil + } + s.workloadsLock.Lock() + workloads, ok := s.workloads[namespace] + if !ok { + workloads = make(map[workloadInfoKey]workloadInfo) + s.workloads[namespace] = workloads + } + for _, we := range wls { + w := we.Workload + key := workloadInfoKey{kind: rpcKind(w.GetKind()), name: w.GetName()} + if we.Type == workload.EventTypeDelete { + delete(workloads, key) + } else { + workloads[key] = workloadInfo{ + state: workload.GetWorkloadState(w), + uid: w.GetUID(), + } + } + } + for _, subscriber := range s.workloadSubscribers { + select { + case subscriber <- struct{}{}: + default: + } + } + s.workloadsLock.Unlock() + if synced != nil { + synced.Done() + synced = nil + } + } + } +} + +func (s *session) workloadsWatcher(ctx context.Context, namespace string, synced *sync.WaitGroup) error { + defer func() { + if synced != nil { + synced.Done() + } + }() + wlc, err := s.managerClient.WatchWorkloads(ctx, &manager.WorkloadEventsRequest{SessionInfo: s.sessionInfo, Namespace: namespace}) + if err != nil { + return err + } + + for ctx.Err() == nil { + wls, err := wlc.Recv() + if err != nil { + if status.Code(err) != codes.Unimplemented { + return err + } + localSynced := synced + synced = nil + return s.localWorkloadsWatcher(ctx, namespace, localSynced) + } + + s.workloadsLock.Lock() + workloads, ok := s.workloads[namespace] + if !ok { + workloads = make(map[workloadInfoKey]workloadInfo) + s.workloads[namespace] = workloads + } + + for _, we := range wls.GetEvents() { + w := we.Workload + key := workloadInfoKey{kind: w.Kind, name: w.Name} + if we.Type == manager.WorkloadEvent_DELETED { + dlog.Debugf(ctx, "Deleting workload %s/%s.%s", key.kind, key.name, namespace) + delete(workloads, key) + } else { + var clients []string + if lc := len(w.InterceptClients); lc > 0 { + clients = make([]string, lc) + for i, ic := range w.InterceptClients { + clients[i] = ic.Client + } + } + dlog.Debugf(ctx, "Adding workload %s/%s.%s", key.kind, key.name, namespace) + workloads[key] = workloadInfo{ + uid: types.UID(w.Uid), + state: workload.StateFromRPC(w.State), + agentState: w.AgentState, + interceptClients: clients, + } + } + } + for _, subscriber := range s.workloadSubscribers { + select { + case subscriber <- struct{}{}: + default: + } + } + s.workloadsLock.Unlock() + if synced != nil { + synced.Done() + synced = nil + } + } + return nil +} diff --git a/pkg/client/userd/trafficmgr/workloads.go b/pkg/client/userd/trafficmgr/workloads.go deleted file mode 100644 index 7c577c8327..0000000000 --- a/pkg/client/userd/trafficmgr/workloads.go +++ /dev/null @@ -1,326 +0,0 @@ -package trafficmgr - -import ( - "context" - "slices" - "sort" - "sync" - "time" - - core "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/cache" - - "github.com/datawire/dlib/dlog" - "github.com/datawire/k8sapi/pkg/k8sapi" - "github.com/telepresenceio/telepresence/rpc/v2/manager" -) - -type workloadsAndServicesWatcher struct { - sync.Mutex - wlKinds []manager.WorkloadInfo_Kind - nsWatchers map[string]*namespacedWASWatcher - nsListeners []func() - cond sync.Cond -} - -const ( - deployments = 0 - replicasets = 1 - statefulsets = 2 - rollouts = 3 -) - -// namespacedWASWatcher is watches Workloads And Services (WAS) for a namespace. 
-type namespacedWASWatcher struct { - svcWatcher *k8sapi.Watcher[*core.Service] - wlWatchers [4]*k8sapi.Watcher[runtime.Object] -} - -// svcEquals compare only the Service fields that are of interest to Telepresence. They are -// -// - UID -// - Name -// - Namespace -// - Spec.Ports -// - Spec.Type -func svcEquals(a, b *core.Service) bool { - aPorts := a.Spec.Ports - bPorts := b.Spec.Ports - if len(aPorts) != len(bPorts) { - return false - } - if a.UID != b.UID || a.Name != b.Name || a.Namespace != b.Namespace || a.Spec.Type != b.Spec.Type { - return false - } -nextMP: - // order is not significant (nor can it be trusted) when comparing - for _, mp := range aPorts { - for _, op := range bPorts { - if mp == op { - continue nextMP - } - } - return false - } - return true -} - -// workloadEquals compare only the workload (Deployment, ResourceSet, or StatefulSet) fields that are of interest to Telepresence. They are -// -// - UID -// - Name -// - Namespace -// - Spec.Template: -// - Labels -// - Containers (must contain an equal number of equally named containers with equal ports) -func workloadEquals(oa, ob runtime.Object) bool { - a, err := k8sapi.WrapWorkload(oa) - if err != nil { - // This should definitely never happen - panic(err) - } - b, err := k8sapi.WrapWorkload(ob) - if err != nil { - // This should definitely never happen - panic(err) - } - if a.GetUID() != b.GetUID() || a.GetName() != b.GetName() || a.GetNamespace() != b.GetNamespace() { - return false - } - - aSpec := a.GetPodTemplate() - bSpec := b.GetPodTemplate() - if !labels.Equals(aSpec.Labels, bSpec.Labels) { - return false - } - aPod := aSpec.Spec - bPod := bSpec.Spec - if len(aPod.Containers) != len(bPod.Containers) { - return false - } - makeContainerMap := func(cs []core.Container) map[string]*core.Container { - m := make(map[string]*core.Container, len(cs)) - for i := range cs { - c := &cs[i] - m[c.Name] = c - } - return m - } - - portsEqual := func(a, b []core.ContainerPort) bool { - if len(a) != len(b) { - return false - } - nextAP: - for _, ap := range a { - for _, bp := range b { - if ap == bp { - continue nextAP - } - } - return false - } - return true - } - - am := makeContainerMap(aPod.Containers) - bm := makeContainerMap(bPod.Containers) - for n, ac := range am { - bc, ok := bm[n] - if !ok { - return false - } - if !portsEqual(ac.Ports, bc.Ports) { - return false - } - } - return true -} - -func newNamespaceWatcher(c context.Context, namespace string, cond *sync.Cond, wlKinds []manager.WorkloadInfo_Kind) *namespacedWASWatcher { - dlog.Debugf(c, "newNamespaceWatcher %s", namespace) - ki := k8sapi.GetJoinedClientSetInterface(c) - appsGetter, rolloutsGetter := ki.AppsV1().RESTClient(), ki.ArgoprojV1alpha1().RESTClient() - w := &namespacedWASWatcher{ - svcWatcher: k8sapi.NewWatcher("services", ki.CoreV1().RESTClient(), cond, k8sapi.WithEquals(svcEquals), k8sapi.WithNamespace[*core.Service](namespace)), - wlWatchers: [4]*k8sapi.Watcher[runtime.Object]{ - k8sapi.NewWatcher("deployments", appsGetter, cond, k8sapi.WithEquals(workloadEquals), k8sapi.WithNamespace[runtime.Object](namespace)), - k8sapi.NewWatcher("replicasets", appsGetter, cond, k8sapi.WithEquals(workloadEquals), k8sapi.WithNamespace[runtime.Object](namespace)), - k8sapi.NewWatcher("statefulsets", appsGetter, cond, k8sapi.WithEquals(workloadEquals), k8sapi.WithNamespace[runtime.Object](namespace)), - nil, - }, - } - if slices.Contains(wlKinds, manager.WorkloadInfo_ROLLOUT) { - w.wlWatchers[rollouts] = k8sapi.NewWatcher("rollouts", rolloutsGetter, cond, 
k8sapi.WithEquals(workloadEquals), k8sapi.WithNamespace[runtime.Object](namespace)) - } - return w -} - -func (nw *namespacedWASWatcher) cancel() { - nw.svcWatcher.Cancel() - for _, w := range nw.wlWatchers { - if w != nil { - w.Cancel() - } - } -} - -func (nw *namespacedWASWatcher) hasSynced() bool { - return nw.svcWatcher.HasSynced() && - nw.wlWatchers[deployments].HasSynced() && - nw.wlWatchers[replicasets].HasSynced() && - nw.wlWatchers[statefulsets].HasSynced() && - (nw.wlWatchers[rollouts] == nil || nw.wlWatchers[rollouts].HasSynced()) -} - -func newWASWatcher(knownWorkloadKinds *manager.KnownWorkloadKinds) *workloadsAndServicesWatcher { - w := &workloadsAndServicesWatcher{ - wlKinds: knownWorkloadKinds.Kinds, - nsWatchers: make(map[string]*namespacedWASWatcher), - } - w.cond.L = &w.Mutex - return w -} - -// eachWorkload will iterate over the workloads in the current snapshot. Unless namespace -// is the empty string, the iteration is limited to the workloads matching that namespace. -// The traffic-manager workload is excluded. -func (w *workloadsAndServicesWatcher) eachWorkload(c context.Context, tmns string, namespaces []string, f func(workload k8sapi.Workload)) { - if len(namespaces) != 1 { - // Produce workloads in a predictable order - nss := make([]string, len(namespaces)) - copy(nss, namespaces) - sort.Strings(nss) - for _, n := range nss { - w.eachWorkload(c, tmns, []string{n}, f) - } - } else { - ns := namespaces[0] - w.Lock() - nw, ok := w.nsWatchers[ns] - w.Unlock() - if ok { - for _, wlw := range nw.wlWatchers { - if wlw == nil { - continue - } - wls, err := wlw.List(c) - if err != nil { - dlog.Errorf(c, "error listing workloads: %v", err) - return - } - - nextWorkload: - for _, ro := range wls { - wl, err := k8sapi.WrapWorkload(ro) - if err != nil { - dlog.Errorf(c, "error wrapping runtime object as a workload: %v", err) - return - } - - // Exclude workloads that are owned by a supported workload. - for _, or := range wl.GetOwnerReferences() { - if or.Controller != nil && *or.Controller { - switch or.Kind { - case "Deployment", "ReplicaSet", "StatefulSet": - continue nextWorkload - case "Rollout": - if slices.Contains(w.wlKinds, manager.WorkloadInfo_ROLLOUT) { - continue nextWorkload - } - } - } - } - // If this is our traffic-manager namespace, then exclude the traffic-manager service. - lbs := wl.GetLabels() - if !(ns == tmns && lbs["app"] == "traffic-manager" && lbs["telepresence"] == "manager") { - f(wl) - } - } - } - } - } -} - -func (w *workloadsAndServicesWatcher) waitForSync(c context.Context) { - hss := make([]cache.InformerSynced, len(w.nsWatchers)) - w.Lock() - i := 0 - for _, nw := range w.nsWatchers { - hss[i] = nw.hasSynced - i++ - } - w.Unlock() - - hasSynced := true - for _, hs := range hss { - if !hs() { - hasSynced = false - break - } - } - if !hasSynced { - // Waiting for cache sync will sometimes block, so a timeout is necessary here - c, cancel := context.WithTimeout(c, 5*time.Second) - defer cancel() - cache.WaitForCacheSync(c.Done(), hss...) - } -} - -// subscribe writes to the given channel whenever relevant information has changed -// in the current snapshot. -func (w *workloadsAndServicesWatcher) subscribe(c context.Context) <-chan struct{} { - return k8sapi.Subscribe(c, &w.cond) -} - -// setNamespacesToWatch starts new watchers or kills old ones to make the current -// set of watchers reflect the nss argument. 
-func (w *workloadsAndServicesWatcher) setNamespacesToWatch(c context.Context, nss []string) { - var adds []string - desired := make(map[string]struct{}) - - w.Lock() - for _, ns := range nss { - desired[ns] = struct{}{} - if _, ok := w.nsWatchers[ns]; !ok { - adds = append(adds, ns) - } - } - for ns, nw := range w.nsWatchers { - if _, ok := desired[ns]; !ok { - delete(w.nsWatchers, ns) - nw.cancel() - } - } - for _, ns := range adds { - w.addNSLocked(c, ns) - } - w.Unlock() -} - -func (w *workloadsAndServicesWatcher) addNSLocked(c context.Context, ns string) *namespacedWASWatcher { - nw := newNamespaceWatcher(c, ns, &w.cond, w.wlKinds) - w.nsWatchers[ns] = nw - for _, l := range w.nsListeners { - nw.svcWatcher.AddStateListener(&k8sapi.StateListener{Cb: l}) - } - return nw -} - -func (w *workloadsAndServicesWatcher) ensureStarted(c context.Context, ns string, cb func(bool)) { - w.Lock() - defer w.Unlock() - nw, ok := w.nsWatchers[ns] - if !ok { - nw = w.addNSLocked(c, ns) - } - // Starting the svcWatcher will set it to active and also trigger its state listener - // which means a) that the set of active namespaces will change, and b) that the - // WatchAgentsNS will restart with that namespace included. - err := nw.svcWatcher.EnsureStarted(c, cb) - if err != nil { - dlog.Errorf(c, "error starting service watchers: %s", err) - } -} diff --git a/pkg/workload/state.go b/pkg/workload/state.go new file mode 100644 index 0000000000..4a8bc3f51a --- /dev/null +++ b/pkg/workload/state.go @@ -0,0 +1,130 @@ +package workload + +import ( + "sort" + + appsv1 "k8s.io/api/apps/v1" + core "k8s.io/api/core/v1" + + argorollouts "github.com/datawire/argo-rollouts-go-client/pkg/apis/rollouts/v1alpha1" + "github.com/datawire/k8sapi/pkg/k8sapi" + "github.com/telepresenceio/telepresence/rpc/v2/manager" +) + +type State int + +const ( + StateUnknown State = iota + StateProgressing + StateAvailable + StateFailure +) + +func deploymentState(d *appsv1.Deployment) State { + conds := d.Status.Conditions + sort.Slice(conds, func(i, j int) bool { + return conds[i].LastTransitionTime.Compare(conds[j].LastTransitionTime.Time) > 0 + }) + for _, c := range conds { + switch c.Type { + case appsv1.DeploymentProgressing: + if c.Status == core.ConditionTrue { + return StateProgressing + } + case appsv1.DeploymentAvailable: + if c.Status == core.ConditionTrue { + return StateAvailable + } + case appsv1.DeploymentReplicaFailure: + if c.Status == core.ConditionTrue { + return StateFailure + } + } + } + if len(conds) == 0 { + return StateProgressing + } + return StateUnknown +} + +func replicaSetState(d *appsv1.ReplicaSet) State { + for _, c := range d.Status.Conditions { + if c.Type == appsv1.ReplicaSetReplicaFailure && c.Status == core.ConditionTrue { + return StateFailure + } + } + return StateAvailable +} + +func statefulSetState(_ *appsv1.StatefulSet) State { + return StateAvailable +} + +func rolloutSetState(r *argorollouts.Rollout) State { + conds := r.Status.Conditions + sort.Slice(conds, func(i, j int) bool { + return conds[i].LastTransitionTime.Compare(conds[j].LastTransitionTime.Time) > 0 + }) + for _, c := range conds { + switch c.Type { + case argorollouts.RolloutProgressing: + if c.Status == core.ConditionTrue { + return StateProgressing + } + case argorollouts.RolloutAvailable: + if c.Status == core.ConditionTrue { + return StateAvailable + } + case argorollouts.RolloutReplicaFailure: + if c.Status == core.ConditionTrue { + return StateFailure + } + } + } + if len(conds) == 0 { + return StateProgressing + } + return 
StateUnknown +} + +func (ws State) String() string { + switch ws { + case StateProgressing: + return "Progressing" + case StateAvailable: + return "Available" + case StateFailure: + return "Failure" + default: + return "Unknown" + } +} + +func GetWorkloadState(wl k8sapi.Workload) State { + if d, ok := k8sapi.DeploymentImpl(wl); ok { + return deploymentState(d) + } + if r, ok := k8sapi.ReplicaSetImpl(wl); ok { + return replicaSetState(r) + } + if s, ok := k8sapi.StatefulSetImpl(wl); ok { + return statefulSetState(s) + } + if rt, ok := k8sapi.RolloutImpl(wl); ok { + return rolloutSetState(rt) + } + return StateUnknown +} + +func StateFromRPC(s manager.WorkloadInfo_State) State { + switch s { + case manager.WorkloadInfo_AVAILABLE: + return StateAvailable + case manager.WorkloadInfo_FAILURE: + return StateFailure + case manager.WorkloadInfo_PROGRESSING: + return StateProgressing + default: + return StateUnknown + } +} diff --git a/pkg/workload/watcher.go b/pkg/workload/watcher.go index 284ceaaf08..faeb589675 100644 --- a/pkg/workload/watcher.go +++ b/pkg/workload/watcher.go @@ -113,6 +113,7 @@ func (w *watcher) Subscribe(ctx context.Context) <-chan []WorkloadEvent { id := uuid.New() kf := informer.GetFactory(ctx, w.namespace) ai := kf.GetK8sInformerFactory().Apps().V1() + dlog.Debugf(ctx, "workload.Watcher producing initial events for namespace %s", w.namespace) if dps, err := ai.Deployments().Lister().Deployments(w.namespace).List(labels.Everything()); err == nil { for _, obj := range dps { if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) { @@ -204,7 +205,7 @@ func (w *watcher) watch(ix cache.SharedIndexInformer, ns string, hasValidControl }, DeleteFunc: func(obj any) { if wl, ok := FromAny(obj); ok { - if ns == wl.GetNamespace() && len(wl.GetOwnerReferences()) == 0 { + if ns == wl.GetNamespace() && !hasValidController(wl) { w.handleEvent(WorkloadEvent{Type: EventTypeDelete, Workload: wl}) } } else if dfsu, ok := obj.(*cache.DeletedFinalStateUnknown); ok { @@ -267,5 +268,5 @@ func (w *watcher) handleEvent(we WorkloadEvent) { w.Unlock() // Defers sending until things been quiet for a while - w.timer.Reset(50 * time.Millisecond) + w.timer.Reset(5 * time.Millisecond) } From 58727722e3fa1879b70fc05692ace4cec1154897 Mon Sep 17 00:00:00 2001 From: Thomas Hallgren Date: Tue, 29 Oct 2024 17:06:11 +0100 Subject: [PATCH 5/6] Ensure rollout when deleting agent when at least one pod has an agent. The condition "Rollout of n.n is not necessary. At least one pod has the desired agent state" must be reversed when deleting an agent so that we also have "Rollout of n.n is necessary. At least one pod still has an agent". Signed-off-by: Thomas Hallgren --- cmd/traffic/cmd/manager/mutator/watcher.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cmd/traffic/cmd/manager/mutator/watcher.go b/cmd/traffic/cmd/manager/mutator/watcher.go index 4bf15c034f..77b94a1f03 100644 --- a/cmd/traffic/cmd/manager/mutator/watcher.go +++ b/cmd/traffic/cmd/manager/mutator/watcher.go @@ -164,6 +164,14 @@ func (c *configWatcher) isRolloutNeeded(ctx context.Context, wl k8sapi.Workload, } return true } + if ac == nil { + if okPods < runningPods { + dlog.Debugf(ctx, "Rollout of %s.%s is necessary. At least one pod still has an agent", + wl.GetName(), wl.GetNamespace()) + return true + } + return false + } dlog.Debugf(ctx, "Rollout of %s.%s is not necessary. 
At least one pod have the desired agent state", wl.GetName(), wl.GetNamespace()) return false From 5596c914425204c6461c91cc940ca6187274236d Mon Sep 17 00:00:00 2001 From: Thomas Hallgren Date: Tue, 29 Oct 2024 17:10:29 +0100 Subject: [PATCH 6/6] Fix bug causing excessive event sending of workload events. Signed-off-by: Thomas Hallgren --- cmd/traffic/cmd/manager/state/workload_info_watcher.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/traffic/cmd/manager/state/workload_info_watcher.go b/cmd/traffic/cmd/manager/state/workload_info_watcher.go index beb7b85134..3db126e817 100644 --- a/cmd/traffic/cmd/manager/state/workload_info_watcher.go +++ b/cmd/traffic/cmd/manager/state/workload_info_watcher.go @@ -197,7 +197,6 @@ func rpcWorkload(wl k8sapi.Workload, as rpc.WorkloadInfo_AgentState, iClients [] } func (wf *workloadInfoWatcher) addEvent( - ctx context.Context, eventType workload.EventType, wl k8sapi.Workload, as rpc.WorkloadInfo_AgentState, @@ -207,7 +206,7 @@ func (wf *workloadInfoWatcher) addEvent( Type: rpc.WorkloadEvent_Type(eventType), Workload: rpcWorkload(wl, as, iClients), } - wf.sendEvents(ctx, false) + wf.resetTicker() } func (wf *workloadInfoWatcher) handleWorkloadsSnapshot(ctx context.Context, wes []workload.WorkloadEvent, initial bool) {
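The fix works because addEvent no longer flushes synchronously: the old code called sendEvents once per added event, so a snapshot with N workloads produced up to N sends, while the new code merely arms the shared ticker and lets the select loop in Watch deliver everything in a single sendEvents when <-wf.ticker.C fires. The body of resetTicker is not shown in this series; a plausible shape, assuming a short quiet period, would be:

	func (wf *workloadInfoWatcher) resetTicker() {
		// Re-arm the ticker so the Watch loop flushes all events that have
		// accumulated in wf.workloadEvents in one sendEvents call. The 50ms
		// quiet period is an assumption, not taken from this patch.
		wf.ticker.Reset(50 * time.Millisecond)
	}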