diff --git a/chart/templates/_rbac.tpl b/chart/templates/_rbac.tpl index 9eac3ff94..248730e8e 100644 --- a/chart/templates/_rbac.tpl +++ b/chart/templates/_rbac.tpl @@ -32,7 +32,7 @@ (eq (toString .Values.sync.fromHost.csiDrivers.enabled) "true") (eq (toString .Values.sync.fromHost.csiStorageCapacities.enabled) "true") .Values.sync.fromHost.nodes.enabled - .Values.observability.metrics.proxy.nodes + (and .Values.integrations.metricsServer.enabled .Values.integrations.metricsServer.nodes) .Values.experimental.multiNamespaceMode.enabled -}} {{- true -}} {{- end -}} diff --git a/chart/templates/clusterrole.yaml b/chart/templates/clusterrole.yaml index 217c73a71..049ac8be3 100644 --- a/chart/templates/clusterrole.yaml +++ b/chart/templates/clusterrole.yaml @@ -102,7 +102,7 @@ rules: resources: ["namespaces", "serviceaccounts"] verbs: ["create", "delete", "patch", "update", "get", "watch", "list"] {{- end }} - {{- if .Values.observability.metrics.proxy.nodes }} + {{- if (and .Values.integrations.metricsServer.enabled .Values.integrations.metricsServer.nodes) }} - apiGroups: ["metrics.k8s.io"] resources: ["nodes"] verbs: ["get", "list"] diff --git a/chart/templates/role.yaml b/chart/templates/role.yaml index 37ed162be..b1cf71ceb 100644 --- a/chart/templates/role.yaml +++ b/chart/templates/role.yaml @@ -47,7 +47,7 @@ rules: resources: ["leases"] verbs: ["create", "delete", "patch", "update", "get", "list", "watch"] {{- end }} - {{- if .Values.observability.metrics.proxy.pods }} + {{- if (and .Values.integrations.metricsServer.enabled .Values.integrations.metricsServer.pods) }} - apiGroups: ["metrics.k8s.io"] resources: ["pods"] verbs: ["get", "list"] diff --git a/chart/tests/clusterrole_test.yaml b/chart/tests/clusterrole_test.yaml index ecbd899c3..a0115f1f9 100644 --- a/chart/tests/clusterrole_test.yaml +++ b/chart/tests/clusterrole_test.yaml @@ -280,10 +280,10 @@ tests: - it: metrics proxy set: - observability: - metrics: - proxy: - nodes: true + integrations: + metricsServer: + enabled: true + nodes: true release: name: my-release namespace: my-namespace diff --git a/chart/tests/role_test.yaml b/chart/tests/role_test.yaml index 615075fd3..a5903e5c1 100644 --- a/chart/tests/role_test.yaml +++ b/chart/tests/role_test.yaml @@ -155,10 +155,10 @@ tests: - it: metrics proxy set: - observability: - metrics: - proxy: - pods: true + integrations: + metricsServer: + enabled: true + pods: true release: name: my-release namespace: my-namespace diff --git a/chart/values.schema.json b/chart/values.schema.json index 9c77b86ba..1f5e55324 100755 --- a/chart/values.schema.json +++ b/chart/values.schema.json @@ -1673,6 +1673,17 @@ "additionalProperties": false, "type": "object" }, + "Integrations": { + "properties": { + "metricsServer": { + "$ref": "#/$defs/MetricsServer", + "description": "MetricsServer reuses the metrics server from the host cluster within the vCluster." + } + }, + "additionalProperties": false, + "type": "object", + "description": "Integrations holds config for vCluster integrations with other operators or tools running on the host cluster" + }, "LabelsAndAnnotations": { "properties": { "annotations": { @@ -1725,8 +1736,12 @@ "additionalProperties": false, "type": "object" }, - "MetricsProxy": { + "MetricsServer": { "properties": { + "enabled": { + "type": "boolean", + "description": "Enabled signals the metrics server integration should be enabled." + }, "nodes": { "type": "boolean", "description": "Nodes defines if metrics-server nodes api should get proxied from host to virtual cluster." @@ -1737,7 +1752,8 @@ } }, "additionalProperties": false, - "type": "object" + "type": "object", + "description": "MetricsServer reuses the metrics server from the host cluster within the vCluster." }, "MutatingWebhook": { "properties": { @@ -1929,26 +1945,6 @@ "additionalProperties": false, "type": "object" }, - "Observability": { - "properties": { - "metrics": { - "$ref": "#/$defs/ObservabilityMetrics", - "description": "Metrics allows to proxy metrics server apis from host to virtual cluster." - } - }, - "additionalProperties": false, - "type": "object" - }, - "ObservabilityMetrics": { - "properties": { - "proxy": { - "$ref": "#/$defs/MetricsProxy", - "description": "Proxy holds the configuration what metrics-server apis should get proxied." - } - }, - "additionalProperties": false, - "type": "object" - }, "OutgoingConnections": { "properties": { "ipBlock": { @@ -3042,6 +3038,10 @@ "$ref": "#/$defs/Sync", "description": "Sync describes how to sync resources from the virtual cluster to host cluster and back." }, + "integrations": { + "$ref": "#/$defs/Integrations", + "description": "Integrations holds config for vCluster integrations with other operators or tools running on the host cluster" + }, "networking": { "$ref": "#/$defs/Networking", "description": "Networking options related to the virtual cluster." @@ -3050,10 +3050,6 @@ "$ref": "#/$defs/Policies", "description": "Policies to enforce for the virtual cluster deployment as well as within the virtual cluster." }, - "observability": { - "$ref": "#/$defs/Observability", - "description": "Observability holds options to proxy metrics from the host cluster into the virtual cluster." - }, "controlPlane": { "$ref": "#/$defs/ControlPlane", "description": "Configure vCluster's control plane components and deployment." diff --git a/chart/values.yaml b/chart/values.yaml index b4973e13e..b48a9f6bb 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -116,6 +116,17 @@ sync: all: false labels: {} +# Integrations holds config for vCluster integrations with other operators or tools running on the host cluster +integrations: + # MetricsServer reuses the metrics server from the host cluster within the vCluster. + metricsServer: + # Enabled signals the metrics server integration should be enabled. + enabled: false + # Nodes defines if metrics-server nodes api should get proxied from host to virtual cluster. + nodes: true + # Pods defines if metrics-server pods api should get proxied from host to virtual cluster. + pods: true + # Configure vCluster's control plane components and deployment. controlPlane: # Distro holds virtual cluster related distro options. A distro cannot be changed after vCluster is deployed. @@ -709,17 +720,6 @@ rbac: # ExtraRules will add rules to the cluster role. extraRules: [] -# Observability holds options to proxy metrics from the host cluster into the virtual cluster. -observability: - # Metrics allows to proxy metrics server apis from host to virtual cluster. - metrics: - # Proxy holds the configuration what metrics-server apis should get proxied. - proxy: - # Nodes defines if metrics-server nodes api should get proxied from host to virtual cluster. - nodes: false - # Pods defines if metrics-server pods api should get proxied from host to virtual cluster. - pods: false - # Networking options related to the virtual cluster. networking: # ReplicateServices allows replicating services from the host within the virtual cluster or the other way around. diff --git a/cmd/vcluster/cmd/start.go b/cmd/vcluster/cmd/start.go index d0fc650db..2ea33b1da 100644 --- a/cmd/vcluster/cmd/start.go +++ b/cmd/vcluster/cmd/start.go @@ -7,6 +7,7 @@ import ( "runtime/debug" "github.com/loft-sh/vcluster/pkg/config" + "github.com/loft-sh/vcluster/pkg/integrations" "github.com/loft-sh/vcluster/pkg/leaderelection" "github.com/loft-sh/vcluster/pkg/plugin" "github.com/loft-sh/vcluster/pkg/pro" @@ -98,6 +99,12 @@ func ExecuteStart(ctx context.Context, options *StartOptions) error { return fmt.Errorf("create controller context: %w", err) } + // start integrations + err = integrations.StartIntegrations(controllerCtx) + if err != nil { + return fmt.Errorf("start integrations: %w", err) + } + // start proxy err = setup.StartProxy(controllerCtx) if err != nil { diff --git a/config/config.go b/config/config.go index 8285d3fb9..41eb1ef80 100644 --- a/config/config.go +++ b/config/config.go @@ -39,15 +39,15 @@ type Config struct { // Sync describes how to sync resources from the virtual cluster to host cluster and back. Sync Sync `json:"sync,omitempty"` + // Integrations holds config for vCluster integrations with other operators or tools running on the host cluster + Integrations Integrations `json:"integrations,omitempty"` + // Networking options related to the virtual cluster. Networking Networking `json:"networking,omitempty"` // Policies to enforce for the virtual cluster deployment as well as within the virtual cluster. Policies Policies `json:"policies,omitempty"` - // Observability holds options to proxy metrics from the host cluster into the virtual cluster. - Observability Observability `json:"observability,omitempty"` - // Configure vCluster's control plane components and deployment. ControlPlane ControlPlane `json:"controlPlane,omitempty"` @@ -76,6 +76,24 @@ type Config struct { Plugin map[string]Plugin `json:"plugin,omitempty"` } +// Integrations holds config for vCluster integrations with other operators or tools running on the host cluster +type Integrations struct { + // MetricsServer reuses the metrics server from the host cluster within the vCluster. + MetricsServer MetricsServer `json:"metricsServer,omitempty"` +} + +// MetricsServer reuses the metrics server from the host cluster within the vCluster. +type MetricsServer struct { + // Enabled signals the metrics server integration should be enabled. + Enabled bool `json:"enabled,omitempty"` + + // Nodes defines if metrics-server nodes api should get proxied from host to virtual cluster. + Nodes bool `json:"nodes,omitempty"` + + // Pods defines if metrics-server pods api should get proxied from host to virtual cluster. + Pods bool `json:"pods,omitempty"` +} + // ExternalConfig holds external tool configuration type ExternalConfig map[string]interface{} @@ -415,11 +433,6 @@ type SyncNodeSelector struct { Labels map[string]string `json:"labels,omitempty"` } -type Observability struct { - // Metrics allows to proxy metrics server apis from host to virtual cluster. - Metrics ObservabilityMetrics `json:"metrics,omitempty"` -} - type ServiceMonitor struct { // Enabled configures if Helm should create the service monitor. Enabled bool `json:"enabled,omitempty"` @@ -431,19 +444,6 @@ type ServiceMonitor struct { Annotations map[string]string `json:"annotations,omitempty"` } -type ObservabilityMetrics struct { - // Proxy holds the configuration what metrics-server apis should get proxied. - Proxy MetricsProxy `json:"proxy,omitempty"` -} - -type MetricsProxy struct { - // Nodes defines if metrics-server nodes api should get proxied from host to virtual cluster. - Nodes bool `json:"nodes,omitempty"` - - // Pods defines if metrics-server pods api should get proxied from host to virtual cluster. - Pods bool `json:"pods,omitempty"` -} - type Networking struct { // ReplicateServices allows replicating services from the host within the virtual cluster or the other way around. ReplicateServices ReplicateServices `json:"replicateServices,omitempty"` diff --git a/config/legacyconfig/migrate.go b/config/legacyconfig/migrate.go index 639d342c8..9a22f9479 100644 --- a/config/legacyconfig/migrate.go +++ b/config/legacyconfig/migrate.go @@ -282,10 +282,12 @@ func convertBaseValues(oldConfig BaseHelm, newConfig *config.Config) error { newConfig.Networking.ReplicateServices.ToHost = oldConfig.MapServices.FromVirtual if oldConfig.Proxy.MetricsServer.Pods.Enabled != nil { - newConfig.Observability.Metrics.Proxy.Pods = *oldConfig.Proxy.MetricsServer.Pods.Enabled + newConfig.Integrations.MetricsServer.Enabled = true + newConfig.Integrations.MetricsServer.Pods = *oldConfig.Proxy.MetricsServer.Pods.Enabled } if oldConfig.Proxy.MetricsServer.Nodes.Enabled != nil { - newConfig.Observability.Metrics.Proxy.Nodes = *oldConfig.Proxy.MetricsServer.Nodes.Enabled + newConfig.Integrations.MetricsServer.Enabled = true + newConfig.Integrations.MetricsServer.Nodes = *oldConfig.Proxy.MetricsServer.Nodes.Enabled } if len(oldConfig.Volumes) > 0 { @@ -1026,8 +1028,9 @@ func migrateFlag(key, value string, newConfig *config.Config) error { } case "proxy-metrics-server": if value == "" || value == "true" { - newConfig.Observability.Metrics.Proxy.Pods = true - newConfig.Observability.Metrics.Proxy.Nodes = true + newConfig.Integrations.MetricsServer.Enabled = true + newConfig.Integrations.MetricsServer.Pods = true + newConfig.Integrations.MetricsServer.Nodes = true } case "service-account-token-secrets": if value == "" || value == "true" { diff --git a/config/values.yaml b/config/values.yaml index f5c460dde..29986edd3 100644 --- a/config/values.yaml +++ b/config/values.yaml @@ -69,6 +69,12 @@ sync: all: false labels: {} +integrations: + metricsServer: + enabled: false + nodes: true + pods: true + controlPlane: distro: k8s: @@ -423,12 +429,6 @@ rbac: overwriteRules: [] extraRules: [] -observability: - metrics: - proxy: - nodes: false - pods: false - networking: replicateServices: toHost: [] diff --git a/pkg/apiservice/generic.go b/pkg/apiservice/generic.go new file mode 100644 index 000000000..bc28df6ae --- /dev/null +++ b/pkg/apiservice/generic.go @@ -0,0 +1,231 @@ +package apiservice + +import ( + "context" + "fmt" + "math" + "net/http" + "os" + "strconv" + "strings" + "time" + + "github.com/loft-sh/vcluster/pkg/config" + "github.com/loft-sh/vcluster/pkg/scheme" + "github.com/loft-sh/vcluster/pkg/server/handler" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" + apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + kerrors "k8s.io/apimachinery/pkg/api/errors" +) + +const proxyPort = 9000 + +func checkExistingAPIService(ctx context.Context, client client.Client, groupVersion schema.GroupVersion) bool { + var exists bool + _ = applyOperation(ctx, func(ctx context.Context) (bool, error) { + err := client.Get(ctx, types.NamespacedName{Name: groupVersion.Version + "." + groupVersion.Group}, &apiregistrationv1.APIService{}) + if err != nil { + if kerrors.IsNotFound(err) { + return true, nil + } + + return false, err + } + + exists = true + return true, nil + }) + + return exists +} + +func applyOperation(ctx context.Context, operationFunc wait.ConditionWithContextFunc) error { + return wait.ExponentialBackoffWithContext(ctx, wait.Backoff{ + Duration: time.Second, + Factor: 1.5, + Cap: time.Minute, + Steps: math.MaxInt32, + }, operationFunc) +} + +func deleteOperation(ctrlCtx *config.ControllerContext, groupVersion schema.GroupVersion) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { + err := ctrlCtx.VirtualManager.GetClient().Delete(ctx, &apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: groupVersion.Version + "." + groupVersion.Group, + }, + }) + if err != nil { + if kerrors.IsNotFound(err) { + return true, nil + } + + klog.Errorf("error deleting api service %v", err) + return false, nil + } + + return true, nil + } +} + +func createOperation(ctrlCtx *config.ControllerContext, serviceName string, groupVersion schema.GroupVersion) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: serviceName, + Namespace: "kube-system", + }, + } + _, err := controllerutil.CreateOrUpdate(ctx, ctrlCtx.VirtualManager.GetClient(), service, func() error { + service.Spec.Type = corev1.ServiceTypeExternalName + service.Spec.ExternalName = "localhost" + service.Spec.Ports = []corev1.ServicePort{ + { + Port: int32(proxyPort), + }, + } + return nil + }) + if err != nil { + if kerrors.IsAlreadyExists(err) { + return true, nil + } + + klog.Errorf("error creating api service %v", err) + return false, nil + } + + apiServiceSpec := apiregistrationv1.APIServiceSpec{ + Service: &apiregistrationv1.ServiceReference{ + Name: serviceName, + Namespace: "kube-system", + Port: ptr.To(int32(proxyPort)), + }, + InsecureSkipTLSVerify: true, + Group: groupVersion.Group, + GroupPriorityMinimum: 100, + Version: groupVersion.Version, + VersionPriority: 100, + } + apiService := &apiregistrationv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: groupVersion.Version + "." + groupVersion.Group, + }, + } + _, err = controllerutil.CreateOrUpdate(ctx, ctrlCtx.VirtualManager.GetClient(), apiService, func() error { + apiService.Spec = apiServiceSpec + return nil + }) + if err != nil { + if kerrors.IsAlreadyExists(err) { + return true, nil + } + + klog.Errorf("error creating api service %v", err) + return false, nil + } + + return true, nil + } +} + +func StartAPIServiceProxy(ctx context.Context, hostConfig *rest.Config, tlsCertFile, tlsKeyFile string) error { + proxyHandler, err := handler.Handler("", hostConfig, nil) + if err != nil { + return fmt.Errorf("create host proxy handler: %w", err) + } + + s := serializer.NewCodecFactory(scheme.Scheme) + server := &http.Server{ + Addr: "localhost:" + strconv.Itoa(proxyPort), + Handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + // we only allow traffic to discovery paths + if !isAPIServiceProxyPathAllowed(request.Method, request.URL.Path) { + klog.FromContext(ctx).Info("Denied access to api service proxy at path", "path", request.URL.Path, "method", request.Method) + responsewriters.ErrorNegotiated( + kerrors.NewForbidden(metav1.SchemeGroupVersion.WithResource("proxy").GroupResource(), "proxy", fmt.Errorf("paths other than discovery paths are not allowed")), + s, + corev1.SchemeGroupVersion, + writer, + request, + ) + return + } + + proxyHandler.ServeHTTP(writer, request) + }), + } + + go func() { + klog.Infof("Listening apiservice proxy on localhost:%d...", proxyPort) + err = server.ListenAndServeTLS(tlsCertFile, tlsKeyFile) + if err != nil { + klog.FromContext(ctx).Error(err, "error listening for apiservice proxy and serve tls") + os.Exit(1) + } + }() + + return nil +} + +func isAPIServiceProxyPathAllowed(method, path string) bool { + if strings.ToUpper(method) != http.MethodGet { + return false + } + + path = strings.TrimPrefix(strings.TrimSuffix(path, "/"), "/") + if strings.HasPrefix(path, "openapi") { + return true + } + + if path == "" { + return true + } + if path == "version" { + return true + } + if path == "api" || path == "apis" { + return true + } + + splitPath := strings.Split(path, "/") + if splitPath[0] == "apis" && len(splitPath) <= 3 { + return true + } else if splitPath[0] == "api" && len(splitPath) <= 2 { + return true + } else if splitPath[0] == ".well-known" { + return true + } else if splitPath[0] == "readyz" { + return true + } else if splitPath[0] == "livez" { + return true + } + + return false +} + +func RegisterAPIService(ctx *config.ControllerContext, serviceName string, groupVersion schema.GroupVersion) error { + return applyOperation(ctx.Context, createOperation(ctx, serviceName, groupVersion)) +} + +func DeregisterAPIService(ctx *config.ControllerContext, groupVersion schema.GroupVersion) error { + // check if the api service should get created + exists := checkExistingAPIService(ctx.Context, ctx.VirtualManager.GetClient(), groupVersion) + if exists { + return applyOperation(ctx.Context, deleteOperation(ctx, groupVersion)) + } + + return nil +} diff --git a/pkg/cli/connect_platform.go b/pkg/cli/connect_platform.go index e1f80feb7..016213eb4 100644 --- a/pkg/cli/connect_platform.go +++ b/pkg/cli/connect_platform.go @@ -79,9 +79,6 @@ func (cmd *connectPlatform) validateProFlags() error { if cmd.Server != "" { return fmt.Errorf("cannot use --server with a pro vCluster") } - if cmd.BackgroundProxy { - return fmt.Errorf("cannot use --background-proxy with a pro vCluster") - } if cmd.LocalPort != 0 { return fmt.Errorf("cannot use --local-port with a pro vCluster") } diff --git a/pkg/config/config.go b/pkg/config/config.go index 5eb037590..c103e2785 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -182,7 +182,7 @@ func (v VirtualClusterConfig) LegacyOptions() (*legacyconfig.LegacyVirtualCluste MultiNamespaceMode: v.Experimental.MultiNamespaceMode.Enabled, SyncAllSecrets: v.Sync.ToHost.Secrets.All, SyncAllConfigMaps: v.Sync.ToHost.ConfigMaps.All, - ProxyMetricsServer: v.Observability.Metrics.Proxy.Nodes || v.Observability.Metrics.Proxy.Pods, + ProxyMetricsServer: v.Integrations.MetricsServer.Enabled, DeprecatedSyncNodeChanges: v.Sync.FromHost.Nodes.SyncBackChanges, }, nil diff --git a/pkg/config/controller_context.go b/pkg/config/controller_context.go index e23ccc3d4..3c73d4cd2 100644 --- a/pkg/config/controller_context.go +++ b/pkg/config/controller_context.go @@ -5,6 +5,7 @@ import ( "net/http" "k8s.io/apimachinery/pkg/version" + "k8s.io/client-go/rest" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -23,19 +24,30 @@ type ControllerContext struct { Config *VirtualClusterConfig StopChan <-chan struct{} - // PreHooks are extra filters to inject into the server before everything else - PreHooks []Filter + // PreServerHooks are extra filters to inject into the server before everything else + PreServerHooks []Filter - // PostHooks are extra filters to inject into the server after everything else - PostHooks []Filter + // PostServerHooks are extra filters to inject into the server after everything else + PostServerHooks []Filter + + // AcquiredLeaderHooks are hooks to start after vCluster acquired leader + AcquiredLeaderHooks []Hook + + // StartAPIServiceProxy will start the api service proxy if needed + StartAPIServiceProxy bool } type Filter func(http.Handler, Clients) http.Handler +type Hook func(ctx *ControllerContext) error + type Clients struct { UncachedVirtualClient client.Client CachedVirtualClient client.Client UncachedHostClient client.Client CachedHostClient client.Client + + HostConfig *rest.Config + VirtualConfig *rest.Config } diff --git a/pkg/controllers/generic/export_patcher.go b/pkg/controllers/generic/export_patcher.go new file mode 100644 index 000000000..533dec800 --- /dev/null +++ b/pkg/controllers/generic/export_patcher.go @@ -0,0 +1,127 @@ +package generic + +import ( + "context" + "fmt" + "regexp" + + vclusterconfig "github.com/loft-sh/vcluster/config" + "github.com/loft-sh/vcluster/pkg/patches" + patchesregex "github.com/loft-sh/vcluster/pkg/patches/regex" + "github.com/loft-sh/vcluster/pkg/util/translate" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type exportPatcher struct { + config *vclusterconfig.Export + gvk schema.GroupVersionKind +} + +var _ ObjectPatcher = &exportPatcher{} + +func (e *exportPatcher) ServerSideApply(_ context.Context, fromObj, destObj, sourceObj client.Object) error { + return patches.ApplyPatches(destObj, sourceObj, e.config.Patches, e.config.ReversePatches, &virtualToHostNameResolver{ + namespace: fromObj.GetNamespace(), + targetNamespace: translate.Default.PhysicalNamespace(fromObj.GetNamespace()), + }) +} + +func (e *exportPatcher) ReverseUpdate(_ context.Context, destObj, sourceObj client.Object) error { + return patches.ApplyPatches(destObj, sourceObj, e.config.ReversePatches, nil, &hostToVirtualNameResolver{ + gvk: e.gvk, + pObj: sourceObj, + }) +} + +type virtualToHostNameResolver struct { + namespace string + targetNamespace string +} + +func (r *virtualToHostNameResolver) TranslateName(name string, regex *regexp.Regexp, _ string) (string, error) { + return r.TranslateNameWithNamespace(name, r.namespace, regex, "") +} + +func (r *virtualToHostNameResolver) TranslateNameWithNamespace(name string, namespace string, regex *regexp.Regexp, _ string) (string, error) { + if regex != nil { + return patchesregex.ProcessRegex(regex, name, func(name, ns string) types.NamespacedName { + // if the regex match doesn't contain namespace - use the namespace set in this resolver + if ns == "" { + ns = namespace + } + + return types.NamespacedName{ + Namespace: translate.Default.PhysicalNamespace(namespace), + Name: translate.Default.PhysicalName(name, ns), + } + }), nil + } + + return translate.Default.PhysicalName(name, namespace), nil +} + +func (r *virtualToHostNameResolver) TranslateLabelExpressionsSelector(selector *metav1.LabelSelector) (*metav1.LabelSelector, error) { + return translate.Default.TranslateLabelSelectorCluster(selector), nil +} + +func (r *virtualToHostNameResolver) TranslateLabelKey(key string) (string, error) { + return translate.Default.ConvertLabelKey(key), nil +} + +func (r *virtualToHostNameResolver) TranslateLabelSelector(selector map[string]string) (map[string]string, error) { + labelSelector := &metav1.LabelSelector{ + MatchLabels: selector, + } + + return metav1.LabelSelectorAsMap( + translate.Default.TranslateLabelSelector(labelSelector)) +} + +func (r *virtualToHostNameResolver) TranslateNamespaceRef(namespace string) (string, error) { + return translate.Default.PhysicalNamespace(namespace), nil +} + +func validateExportConfig(config *vclusterconfig.Export) error { + for _, p := range append(config.Patches, config.ReversePatches...) { + if p.Regex != "" { + parsed, err := patchesregex.PrepareRegex(p.Regex) + if err != nil { + return fmt.Errorf("invalid Regex: %w", err) + } + p.ParsedRegex = parsed + } + } + return nil +} + +type hostToVirtualNameResolver struct { + pObj client.Object + gvk schema.GroupVersionKind +} + +func (r *hostToVirtualNameResolver) TranslateName(string, *regexp.Regexp, string) (string, error) { + return "", fmt.Errorf("translation not supported from host to virtual object") +} + +func (r *hostToVirtualNameResolver) TranslateNameWithNamespace(string, string, *regexp.Regexp, string) (string, error) { + return "", fmt.Errorf("translation not supported from host to virtual object") +} + +func (r *hostToVirtualNameResolver) TranslateLabelKey(string) (string, error) { + return "", fmt.Errorf("translation not supported from host to virtual object") +} + +func (r *hostToVirtualNameResolver) TranslateLabelExpressionsSelector(*metav1.LabelSelector) (*metav1.LabelSelector, error) { + return nil, fmt.Errorf("translation not supported from host to virtual object") +} + +func (r *hostToVirtualNameResolver) TranslateLabelSelector(map[string]string) (map[string]string, error) { + return nil, fmt.Errorf("translation not supported from host to virtual object") +} + +func (r *hostToVirtualNameResolver) TranslateNamespaceRef(string) (string, error) { + return "", fmt.Errorf("translation not supported from host to virtual object") +} diff --git a/pkg/controllers/generic/export_syncer.go b/pkg/controllers/generic/export_syncer.go index d7cc17ba4..c00bc844c 100644 --- a/pkg/controllers/generic/export_syncer.go +++ b/pkg/controllers/generic/export_syncer.go @@ -3,7 +3,6 @@ package generic import ( "context" "fmt" - "regexp" "strings" "time" @@ -17,7 +16,6 @@ import ( "github.com/loft-sh/vcluster/pkg/controllers/syncer" synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context" "github.com/loft-sh/vcluster/pkg/controllers/syncer/translator" - patchesregex "github.com/loft-sh/vcluster/pkg/patches/regex" syncertypes "github.com/loft-sh/vcluster/pkg/types" util "github.com/loft-sh/vcluster/pkg/util/context" "github.com/loft-sh/vcluster/pkg/util/translate" @@ -37,26 +35,21 @@ func CreateExporters(ctx *config.ControllerContext) error { if len(exporterConfig.Exports) == 0 { return nil } - - scheme := ctx.LocalManager.GetScheme() registerCtx := util.ToRegisterContext(ctx) for _, exportConfig := range exporterConfig.Exports { - gvk := schema.FromAPIVersionAndKind(exportConfig.APIVersion, exportConfig.Kind) - if !scheme.Recognizes(gvk) { - _, _, err := translate.EnsureCRDFromPhysicalCluster( - registerCtx.Context, - registerCtx.PhysicalManager.GetConfig(), - registerCtx.VirtualManager.GetConfig(), - gvk) - if err != nil { - if exportConfig.Optional { - klog.Infof("error ensuring CRD %s(%s) from host cluster: %v. Skipping exportSyncer as resource is optional", exportConfig.Kind, exportConfig.APIVersion, err) - continue - } - - return fmt.Errorf("error creating %s(%s) syncer: %w", exportConfig.Kind, exportConfig.APIVersion, err) + _, hasStatusSubresource, err := translate.EnsureCRDFromPhysicalCluster( + registerCtx.Context, + registerCtx.PhysicalManager.GetConfig(), + registerCtx.VirtualManager.GetConfig(), + schema.FromAPIVersionAndKind(exportConfig.APIVersion, exportConfig.Kind)) + if err != nil { + if exportConfig.Optional { + klog.Infof("error ensuring CRD %s(%s) from host cluster: %v. Skipping exportSyncer as resource is optional", exportConfig.Kind, exportConfig.APIVersion, err) + continue } + + return fmt.Errorf("error creating %s(%s) syncer: %w", exportConfig.Kind, exportConfig.APIVersion, err) } reversePatches := []*vclusterconfig.Patch{ @@ -69,7 +62,7 @@ func CreateExporters(ctx *config.ControllerContext) error { reversePatches = append(reversePatches, exportConfig.ReversePatches...) exportConfig.ReversePatches = reversePatches - s, err := createExporter(registerCtx, exportConfig) + s, err := createExporterFromConfig(registerCtx, exportConfig, hasStatusSubresource) klog.Infof("creating exporter for %s/%s", exportConfig.APIVersion, exportConfig.Kind) if err != nil { return fmt.Errorf("error creating %s(%s) syncer: %w", exportConfig.Kind, exportConfig.APIVersion, err) @@ -85,7 +78,7 @@ func CreateExporters(ctx *config.ControllerContext) error { return nil } -func createExporter(ctx *synccontext.RegisterContext, config *vclusterconfig.Export) (syncertypes.Syncer, error) { +func createExporterFromConfig(ctx *synccontext.RegisterContext, config *vclusterconfig.Export, hasStatusSubresource bool) (syncertypes.Syncer, error) { obj := &unstructured.Unstructured{} obj.SetKind(config.Kind) obj.SetAPIVersion(config.APIVersion) @@ -103,34 +96,65 @@ func createExporter(ctx *synccontext.RegisterContext, config *vclusterconfig.Exp } } - statusIsSubresource := true - // TODO: [low priority] check if config.Kind + config.APIVersion has status subresource - gvk := schema.FromAPIVersionAndKind(config.APIVersion, config.Kind) controllerID := fmt.Sprintf("%s/%s/GenericExport", strings.ToLower(gvk.Kind), strings.ToLower(gvk.Group)) return &exporter{ - NamespacedTranslator: translator.NewNamespacedTranslator(ctx, controllerID, obj), - patcher: &patcher{ - fromClient: ctx.VirtualManager.GetClient(), - toClient: ctx.PhysicalManager.GetClient(), - statusIsSubresource: statusIsSubresource, - log: log.New(controllerID), + ObjectPatcher: &exportPatcher{ + config: config, + gvk: gvk, }, + NamespacedTranslator: translator.NewNamespacedTranslator(ctx, controllerID, obj), + + patcher: NewPatcher(ctx.VirtualManager.GetClient(), ctx.PhysicalManager.GetClient(), hasStatusSubresource, log.New(controllerID)), gvk: gvk, - config: config, selector: selector, name: controllerID, + + replaceWhenInvalid: config.ReplaceWhenInvalid, }, nil } +func BuildCustomExporter( + registerCtx *synccontext.RegisterContext, + controllerID string, + objectPatcher ObjectPatcher, + gvk schema.GroupVersionKind, + namespacedTranslator translator.NamespacedTranslator, + replaceWhenInvalid bool, +) (syncertypes.Object, error) { + _, hasStatusSubresource, err := translate.EnsureCRDFromPhysicalCluster( + registerCtx.Context, + registerCtx.PhysicalManager.GetConfig(), + registerCtx.VirtualManager.GetConfig(), + gvk) + if err != nil { + return nil, fmt.Errorf("error creating %s(%s) syncer: %w", gvk.Kind, gvk.GroupVersion().String(), err) + } + + return &exporter{ + ObjectPatcher: objectPatcher, + NamespacedTranslator: namespacedTranslator, + + patcher: NewPatcher(registerCtx.VirtualManager.GetClient(), registerCtx.PhysicalManager.GetClient(), hasStatusSubresource, log.New(controllerID)), + gvk: gvk, + name: controllerID, + + replaceWhenInvalid: replaceWhenInvalid, + }, nil +} + +var _ syncertypes.Syncer = &exporter{} + type exporter struct { translator.NamespacedTranslator + ObjectPatcher - patcher *patcher - gvk schema.GroupVersionKind - config *vclusterconfig.Export - selector labels.Selector - name string + patcher *Patcher + gvk schema.GroupVersionKind + selector labels.Selector + replaceWhenInvalid bool + + name string } func (f *exporter) SyncToHost(ctx *synccontext.SyncContext, vObj client.Object) (ctrl.Result, error) { @@ -140,10 +164,8 @@ func (f *exporter) SyncToHost(ctx *synccontext.SyncContext, vObj client.Object) } // apply object to physical cluster - ctx.Log.Infof("Create physical %s %s/%s, since it is missing, but virtual object exists", f.config.Kind, vObj.GetNamespace(), vObj.GetName()) - pObj, err := f.patcher.ApplyPatches(ctx.Context, vObj, nil, f.config.Patches, f.config.ReversePatches, func(vObj client.Object) (client.Object, error) { - return f.TranslateMetadata(ctx.Context, vObj), nil - }, &virtualToHostNameResolver{namespace: vObj.GetNamespace(), targetNamespace: translate.Default.PhysicalNamespace(vObj.GetNamespace())}) + ctx.Log.Infof("Create physical %s %s/%s, since it is missing, but virtual object exists", f.gvk.Kind, vObj.GetNamespace(), vObj.GetName()) + pObj, err := f.patcher.ApplyPatches(ctx.Context, vObj, nil, f) if kerrors.IsConflict(err) { return ctrl.Result{Requeue: true}, nil } @@ -178,12 +200,12 @@ func (f *exporter) SyncToHost(ctx *synccontext.SyncContext, vObj client.Object) func (f *exporter) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj client.Object) (ctrl.Result, error) { // check if virtual object is not matching anymore if !f.objectMatches(vObj) { - ctx.Log.Infof("delete physical %s %s/%s, because it is not used anymore", f.config.Kind, pObj.GetNamespace(), pObj.GetName()) + ctx.Log.Infof("delete physical %s %s/%s, because it is not used anymore", f.gvk.Kind, pObj.GetNamespace(), pObj.GetName()) err := ctx.PhysicalClient.Delete(ctx.Context, pObj, &client.DeleteOptions{ GracePeriodSeconds: &[]int64{0}[0], }) if err != nil { - ctx.Log.Infof("error deleting physical %s %s/%s in physical cluster: %v", f.config.Kind, pObj.GetNamespace(), pObj.GetName(), err) + ctx.Log.Infof("error deleting physical %s %s/%s in physical cluster: %v", f.gvk.Kind, pObj.GetNamespace(), pObj.GetName(), err) return ctrl.Result{}, err } @@ -198,7 +220,7 @@ func (f *exporter) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c return ctrl.Result{}, err } } else if vObj.GetDeletionTimestamp() == nil { - ctx.Log.Infof("delete virtual object %s/%s, because physical object is being deleted", vObj.GetNamespace(), vObj.GetName()) + ctx.Log.Infof("delete virtual object %s/%s, because physical object %s/%s is being deleted", vObj.GetNamespace(), vObj.GetName(), pObj.GetNamespace(), pObj.GetName()) if err := ctx.VirtualClient.Delete(ctx.Context, vObj); err != nil { return ctrl.Result{}, nil } @@ -208,23 +230,20 @@ func (f *exporter) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c } // apply reverse patches - result, err := f.patcher.ApplyReversePatches(ctx.Context, vObj, pObj, f.config.ReversePatches, &hostToVirtualNameResolver{ - gvk: f.gvk, - pObj: pObj, - }) + result, err := f.patcher.ApplyReversePatches(ctx.Context, vObj, pObj, f) if err != nil { if kerrors.IsConflict(err) { return ctrl.Result{Requeue: true}, nil } if kerrors.IsInvalid(err) { - ctx.Log.Infof("Warning: this message could indicate a timing issue with no significant impact, or a bug. Please report this if your resource never reaches the expected state. Error message: failed to patch virtual %s %s/%s: %v", f.config.Kind, vObj.GetNamespace(), vObj.GetName(), err) + ctx.Log.Infof("Warning: this message could indicate a timing issue with no significant impact, or a bug. Please report this if your resource never reaches the expected state. Error message: failed to patch virtual %s %s/%s: %v", f.gvk.Kind, vObj.GetNamespace(), vObj.GetName(), err) // this happens when some field is being removed shortly after being added, which suggest it's a timing issue // it doesn't seem to have any negative consequence besides the logged error message return ctrl.Result{Requeue: true}, nil } f.EventRecorder().Eventf(vObj, "Warning", "SyncError", "Error syncing to virtual cluster: %v", err) - return ctrl.Result{}, fmt.Errorf("failed to patch virtual %s %s/%s: %w", f.config.Kind, vObj.GetNamespace(), vObj.GetName(), err) + return ctrl.Result{}, fmt.Errorf("failed to patch virtual %s %s/%s: %w", f.gvk.Kind, vObj.GetNamespace(), vObj.GetName(), err) } else if result == controllerutil.OperationResultUpdated || result == controllerutil.OperationResultUpdatedStatus || result == controllerutil.OperationResultUpdatedStatusOnly { // a change will trigger reconciliation anyway, and at that point we can make // a more accurate updates(reverse patches) to the virtual resource @@ -232,15 +251,10 @@ func (f *exporter) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c } // apply patches - _, err = f.patcher.ApplyPatches(ctx.Context, vObj, pObj, f.config.Patches, f.config.ReversePatches, func(vObj client.Object) (client.Object, error) { - return f.TranslateMetadata(ctx.Context, vObj), nil - }, &virtualToHostNameResolver{ - namespace: vObj.GetNamespace(), - targetNamespace: translate.Default.PhysicalNamespace(vObj.GetNamespace()), - }) + _, err = f.patcher.ApplyPatches(ctx.Context, vObj, pObj, f) if err != nil { // when invalid, auto delete and recreate to recover - if kerrors.IsInvalid(err) && f.config.ReplaceWhenInvalid { + if kerrors.IsInvalid(err) && f.replaceWhenInvalid { // Replace the object ctx.Log.Infof("Replace physical object, because apply failed: %v", err) err = ctx.PhysicalClient.Delete(ctx.Context, pObj, &client.DeleteOptions{ @@ -266,7 +280,10 @@ func (f *exporter) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c var _ syncertypes.ToVirtualSyncer = &exporter{} func (f *exporter) SyncToVirtual(ctx *synccontext.SyncContext, pObj client.Object) (ctrl.Result, error) { - if !translate.Default.IsManaged(pObj) { + isManaged, err := f.NamespacedTranslator.IsManaged(ctx.Context, pObj) + if err != nil { + return ctrl.Result{}, err + } else if !isManaged { return ctrl.Result{}, nil } @@ -291,100 +308,6 @@ func (f *exporter) TranslateMetadata(ctx context.Context, vObj client.Object) cl return pObj } -func (f *exporter) IsManaged(_ context.Context, pObj client.Object) (bool, error) { - return translate.Default.IsManaged(pObj), nil -} - func (f *exporter) objectMatches(obj client.Object) bool { return f.selector == nil || f.selector.Matches(labels.Set(obj.GetLabels())) } - -type virtualToHostNameResolver struct { - namespace string - targetNamespace string -} - -func (r *virtualToHostNameResolver) TranslateName(name string, regex *regexp.Regexp, _ string) (string, error) { - return r.TranslateNameWithNamespace(name, r.namespace, regex, "") -} - -func (r *virtualToHostNameResolver) TranslateNameWithNamespace(name string, namespace string, regex *regexp.Regexp, _ string) (string, error) { - if regex != nil { - return patchesregex.ProcessRegex(regex, name, func(name, ns string) types.NamespacedName { - // if the regex match doesn't contain namespace - use the namespace set in this resolver - if ns == "" { - ns = namespace - } - - return types.NamespacedName{ - Namespace: translate.Default.PhysicalNamespace(namespace), - Name: translate.Default.PhysicalName(name, ns), - } - }), nil - } - - return translate.Default.PhysicalName(name, namespace), nil -} - -func (r *virtualToHostNameResolver) TranslateLabelExpressionsSelector(selector *metav1.LabelSelector) (*metav1.LabelSelector, error) { - return translate.Default.TranslateLabelSelectorCluster(selector), nil -} - -func (r *virtualToHostNameResolver) TranslateLabelKey(key string) (string, error) { - return translate.Default.ConvertLabelKey(key), nil -} - -func (r *virtualToHostNameResolver) TranslateLabelSelector(selector map[string]string) (map[string]string, error) { - labelSelector := &metav1.LabelSelector{ - MatchLabels: selector, - } - - return metav1.LabelSelectorAsMap( - translate.Default.TranslateLabelSelector(labelSelector)) -} - -func (r *virtualToHostNameResolver) TranslateNamespaceRef(namespace string) (string, error) { - return translate.Default.PhysicalNamespace(namespace), nil -} - -func validateExportConfig(config *vclusterconfig.Export) error { - for _, p := range append(config.Patches, config.ReversePatches...) { - if p.Regex != "" { - parsed, err := patchesregex.PrepareRegex(p.Regex) - if err != nil { - return fmt.Errorf("invalid Regex: %w", err) - } - p.ParsedRegex = parsed - } - } - return nil -} - -type hostToVirtualNameResolver struct { - pObj client.Object - gvk schema.GroupVersionKind -} - -func (r *hostToVirtualNameResolver) TranslateName(string, *regexp.Regexp, string) (string, error) { - return "", fmt.Errorf("translation not supported from host to virtual object") -} - -func (r *hostToVirtualNameResolver) TranslateNameWithNamespace(string, string, *regexp.Regexp, string) (string, error) { - return "", fmt.Errorf("translation not supported from host to virtual object") -} - -func (r *hostToVirtualNameResolver) TranslateLabelKey(string) (string, error) { - return "", fmt.Errorf("translation not supported from host to virtual object") -} - -func (r *hostToVirtualNameResolver) TranslateLabelExpressionsSelector(*metav1.LabelSelector) (*metav1.LabelSelector, error) { - return nil, fmt.Errorf("translation not supported from host to virtual object") -} - -func (r *hostToVirtualNameResolver) TranslateLabelSelector(map[string]string) (map[string]string, error) { - return nil, fmt.Errorf("translation not supported from host to virtual object") -} - -func (r *hostToVirtualNameResolver) TranslateNamespaceRef(string) (string, error) { - return "", fmt.Errorf("translation not supported from host to virtual object") -} diff --git a/pkg/controllers/generic/import_patcher.go b/pkg/controllers/generic/import_patcher.go new file mode 100644 index 000000000..0ddc28457 --- /dev/null +++ b/pkg/controllers/generic/import_patcher.go @@ -0,0 +1,63 @@ +package generic + +import ( + "context" + "regexp" + + vclusterconfig "github.com/loft-sh/vcluster/config" + "github.com/loft-sh/vcluster/pkg/constants" + "github.com/loft-sh/vcluster/pkg/patches" + "github.com/loft-sh/vcluster/pkg/util/clienthelper" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type importPatcher struct { + config *vclusterconfig.Import + virtualClient client.Client +} + +var _ ObjectPatcher = &importPatcher{} + +func (s *importPatcher) ServerSideApply(ctx context.Context, _, destObj, sourceObj client.Object) error { + return patches.ApplyPatches(destObj, sourceObj, s.config.Patches, s.config.ReversePatches, &hostToVirtualImportNameResolver{virtualClient: s.virtualClient, ctx: ctx}) +} + +func (s *importPatcher) ReverseUpdate(_ context.Context, destObj, sourceObj client.Object) error { + return patches.ApplyPatches(destObj, sourceObj, s.config.ReversePatches, nil, &virtualToHostNameResolver{namespace: sourceObj.GetNamespace()}) +} + +type hostToVirtualImportNameResolver struct { + virtualClient client.Client + ctx context.Context +} + +func (r *hostToVirtualImportNameResolver) TranslateName(name string, _ *regexp.Regexp, _ string) (string, error) { + return name, nil +} + +func (r *hostToVirtualImportNameResolver) TranslateNameWithNamespace(name string, _ string, _ *regexp.Regexp, _ string) (string, error) { + return name, nil +} + +func (r *hostToVirtualImportNameResolver) TranslateLabelKey(key string) (string, error) { + return key, nil +} + +func (r *hostToVirtualImportNameResolver) TranslateLabelExpressionsSelector(selector *metav1.LabelSelector) (*metav1.LabelSelector, error) { + return selector, nil +} + +func (r *hostToVirtualImportNameResolver) TranslateLabelSelector(selector map[string]string) (map[string]string, error) { + return selector, nil +} + +func (r *hostToVirtualImportNameResolver) TranslateNamespaceRef(namespace string) (string, error) { + vNamespace := (&corev1.Namespace{}).DeepCopyObject().(client.Object) + err := clienthelper.GetByIndex(r.ctx, r.virtualClient, vNamespace, constants.IndexByPhysicalName, namespace) + if err != nil { + return "", err + } + return vNamespace.GetName(), nil +} diff --git a/pkg/controllers/generic/import_syncer.go b/pkg/controllers/generic/import_syncer.go index 418f6a528..5970ae43f 100644 --- a/pkg/controllers/generic/import_syncer.go +++ b/pkg/controllers/generic/import_syncer.go @@ -3,7 +3,6 @@ package generic import ( "context" "fmt" - "regexp" "strings" "time" @@ -15,7 +14,6 @@ import ( "github.com/loft-sh/vcluster/pkg/constants" "github.com/loft-sh/vcluster/pkg/controllers/syncer" synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context" - "github.com/loft-sh/vcluster/pkg/controllers/syncer/translator" "github.com/loft-sh/vcluster/pkg/log" syncertypes "github.com/loft-sh/vcluster/pkg/types" "github.com/loft-sh/vcluster/pkg/util/clienthelper" @@ -24,7 +22,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" kerrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -33,6 +30,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) +type HostToVirtual func(ctx context.Context, req types.NamespacedName, pObj client.Object) types.NamespacedName + +type VirtualToHost func(ctx context.Context, req types.NamespacedName, vObj client.Object) types.NamespacedName + func CreateImporters(ctx *config.ControllerContext) error { cfg := ctx.Config.Experimental.GenericSync if len(cfg.Imports) == 0 { @@ -44,7 +45,6 @@ func CreateImporters(ctx *config.ControllerContext) error { return fmt.Errorf("invalid configuration, 'import' type sync of the generic CRDs is allowed only in the multi-namespace mode") } - gvkRegister := make(GVKRegister) for _, importConfig := range cfg.Imports { gvk := schema.FromAPIVersionAndKind(importConfig.APIVersion, importConfig.Kind) @@ -64,12 +64,7 @@ func CreateImporters(ctx *config.ControllerContext) error { return fmt.Errorf("error syncronizing CRD %s(%s) from the host cluster into vcluster: %w", importConfig.Kind, importConfig.APIVersion, err) } - gvkRegister[gvk] = &GVKScopeAndSubresource{ - IsClusterScoped: isClusterScoped, - HasStatusSubresource: hasStatusSubresource, - } - - s, err := createImporter(registerCtx, importConfig, gvkRegister) + s, err := createImporter(registerCtx, importConfig, isClusterScoped, hasStatusSubresource) klog.Infof("creating importer for %s/%s", importConfig.APIVersion, importConfig.Kind) if err != nil { return fmt.Errorf("error creating %s(%s) syncer: %w", importConfig.Kind, importConfig.APIVersion, err) @@ -85,39 +80,81 @@ func CreateImporters(ctx *config.ControllerContext) error { return nil } -func createImporter(ctx *synccontext.RegisterContext, config *vclusterconfig.Import, gvkRegister GVKRegister) (syncertypes.Syncer, error) { +func createImporter(ctx *synccontext.RegisterContext, config *vclusterconfig.Import, isClusterScoped, hasStatusSubresource bool) (syncertypes.Syncer, error) { gvk := schema.FromAPIVersionAndKind(config.APIVersion, config.Kind) controllerID := fmt.Sprintf("%s/%s/GenericImport", strings.ToLower(gvk.Kind), strings.ToLower(gvk.GroupVersion().String())) + return &importer{ + ObjectPatcher: &importPatcher{ + config: config, + virtualClient: ctx.VirtualManager.GetClient(), + }, - syncerOptions := &syncertypes.Options{ - DisableUIDDeletion: true, - } + patcher: NewPatcher(ctx.PhysicalManager.GetClient(), ctx.VirtualManager.GetClient(), hasStatusSubresource, log.New(controllerID)), + gvk: gvk, + + replaceWhenInvalid: config.ReplaceWhenInvalid, - if scopeAndSubresource, ok := gvkRegister[gvk]; ok { - syncerOptions.IsClusterScopedCRD = scopeAndSubresource.IsClusterScoped - syncerOptions.HasStatusSubresource = scopeAndSubresource.HasStatusSubresource + virtualClient: ctx.VirtualManager.GetClient(), + + name: controllerID, + syncerOptions: &syncertypes.Options{ + DisableUIDDeletion: true, + IsClusterScopedCRD: isClusterScoped, + HasStatusSubresource: hasStatusSubresource, + }, + }, nil +} + +func BuildCustomImporter( + registerCtx *synccontext.RegisterContext, + controllerID string, + objectPatcher ObjectPatcher, + hostToVirtual HostToVirtual, + virtualToHost VirtualToHost, + gvk schema.GroupVersionKind, + replaceWhenInvalid bool, +) (syncertypes.Object, error) { + isClusterScoped, hasStatusSubresource, err := translate.EnsureCRDFromPhysicalCluster( + registerCtx.Context, + registerCtx.PhysicalManager.GetConfig(), + registerCtx.VirtualManager.GetConfig(), + gvk) + if err != nil { + return nil, fmt.Errorf("error creating %s(%s) syncer: %w", gvk.Kind, gvk.GroupVersion().String(), err) } return &importer{ - patcher: &patcher{ - fromClient: ctx.PhysicalManager.GetClient(), - toClient: ctx.VirtualManager.GetClient(), - statusIsSubresource: syncerOptions.HasStatusSubresource, - log: log.New(controllerID), + ObjectPatcher: objectPatcher, + + hostToVirtual: hostToVirtual, + virtualToHost: virtualToHost, + + patcher: NewPatcher(registerCtx.PhysicalManager.GetClient(), registerCtx.VirtualManager.GetClient(), hasStatusSubresource, log.New(controllerID)), + gvk: gvk, + name: controllerID, + syncerOptions: &syncertypes.Options{ + DisableUIDDeletion: true, + IsClusterScopedCRD: isClusterScoped, + HasStatusSubresource: hasStatusSubresource, }, - gvk: gvk, - config: config, - virtualClient: ctx.VirtualManager.GetClient(), - name: controllerID, - syncerOptions: syncerOptions, + + virtualClient: registerCtx.VirtualManager.GetClient(), + + replaceWhenInvalid: replaceWhenInvalid, }, nil } type importer struct { - translator.Translator + ObjectPatcher + + hostToVirtual HostToVirtual + virtualToHost VirtualToHost + + patcher *Patcher + replaceWhenInvalid bool + virtualClient client.Client - patcher *patcher - config *vclusterconfig.Import + syncerOptions *syncertypes.Options gvk schema.GroupVersionKind name string @@ -125,8 +162,8 @@ type importer struct { func (s *importer) Resource() client.Object { obj := &unstructured.Unstructured{} - obj.SetKind(s.config.Kind) - obj.SetAPIVersion(s.config.APIVersion) + obj.SetKind(s.gvk.Kind) + obj.SetAPIVersion(s.gvk.GroupVersion().String()) return obj } @@ -140,75 +177,36 @@ func (s *importer) WithOptions() *syncertypes.Options { return s.syncerOptions } -var _ syncertypes.ObjectExcluder = &importer{} - -func (s *importer) ExcludeVirtual(vObj client.Object) bool { - return s.excludeObject(vObj) -} - -func (s *importer) ExcludePhysical(pObj client.Object) bool { - return s.excludeObject(pObj) -} - -func (s *importer) excludeObject(obj client.Object) bool { - // check if back sync is disabled eg. for service account token secrets - if obj.GetAnnotations() != nil && - obj.GetAnnotations()[translate.SkipBackSyncInMultiNamespaceMode] == "true" { - return true - } - - if obj.GetLabels() != nil && - obj.GetLabels()[translate.ControllerLabel] != "" { - return true - } - if obj.GetAnnotations() != nil && - obj.GetAnnotations()[translate.ControllerLabel] != "" && - obj.GetAnnotations()[translate.ControllerLabel] != s.Name() { - // make sure kind matches - splitted := strings.Split(obj.GetAnnotations()[translate.ControllerLabel], "/") - if len(splitted) != 3 { - return true - } else if splitted[0] != strings.ToLower(obj.GetObjectKind().GroupVersionKind().Kind) || splitted[1] != strings.ToLower(obj.GetObjectKind().GroupVersionKind().Group) { - return false - } - - return true - } - return false -} - var _ syncertypes.ToVirtualSyncer = &importer{} func (s *importer) SyncToVirtual(ctx *synccontext.SyncContext, pObj client.Object) (ctrl.Result, error) { // check if annotation is already present - if pObj.GetAnnotations() != nil { - if pObj.GetAnnotations()[translate.ControllerLabel] == s.Name() && - !s.syncerOptions.IsClusterScopedCRD { // only delete pObj if its not cluster scoped - err := ctx.PhysicalClient.Delete(ctx.Context, pObj) - if err != nil && !kerrors.IsNotFound(err) { - return ctrl.Result{}, err - } - return ctrl.Result{}, nil + pAnnotations := pObj.GetAnnotations() + if pAnnotations != nil && pAnnotations[translate.ControllerLabel] == s.Name() && !s.syncerOptions.IsClusterScopedCRD { // only delete pObj if its not cluster scoped + ctx.Log.Infof("Delete physical %s %s/%s, since virtual is missing, but physical object was already synced", s.gvk.Kind, pObj.GetNamespace(), pObj.GetName()) + err := ctx.PhysicalClient.Delete(ctx.Context, pObj) + if err != nil && !kerrors.IsNotFound(err) { + return ctrl.Result{}, err } - } - // add annotation to physical resource to mark it as controlled by this syncer - err := s.addAnnotationsToPhysicalObject(ctx, pObj) - if err != nil { - return ctrl.Result{}, err + return ctrl.Result{}, nil } // apply object to virtual cluster - ctx.Log.Infof("Create virtual %s %s/%s, since it is missing, but physical object exists", s.config.Kind, pObj.GetNamespace(), pObj.GetName()) - vObj, err := s.patcher.ApplyPatches(ctx.Context, pObj, nil, s.config.Patches, s.config.ReversePatches, func(vObj client.Object) (client.Object, error) { - return s.TranslateMetadata(ctx.Context, vObj), nil - }, &hostToVirtualImportNameResolver{virtualClient: s.virtualClient, ctx: ctx.Context}) + ctx.Log.Infof("Create virtual %s, since it is missing, but physical object %s/%s exists", s.gvk.Kind, pObj.GetNamespace(), pObj.GetName()) + vObj, err := s.patcher.ApplyPatches(ctx.Context, pObj, nil, s) if err != nil { // TODO: add eventRecorder? // s.EventRecorder().Eventf(vObj, "Warning", "SyncError", "Error syncing to virtual cluster: %v", err) return ctrl.Result{}, fmt.Errorf("error applying patches: %w", err) } + // add annotation to physical resource to mark it as controlled by this syncer + err = s.addAnnotationsToPhysicalObject(ctx, pObj, vObj) + if err != nil { + return ctrl.Result{}, err + } + // wait here for vObj to be created err = wait.PollUntilContextTimeout(ctx.Context, time.Millisecond*10, time.Second, true, func(syncContext context.Context) (done bool, err error) { err = ctx.VirtualClient.Get(syncContext, types.NamespacedName{ @@ -236,13 +234,13 @@ var _ syncertypes.Syncer = &importer{} func (s *importer) SyncToHost(ctx *synccontext.SyncContext, vObj client.Object) (ctrl.Result, error) { // ignore all virtual resources that were not created by this controller - if !s.IsVirtualManaged(vObj) { + if !s.isVirtualManaged(vObj) { return ctrl.Result{}, nil } // should we delete the object? if vObj.GetDeletionTimestamp() == nil { - ctx.Log.Infof("remove virtual %s %s/%s, because object should get deleted", s.config.Kind, vObj.GetNamespace(), vObj.GetName()) + ctx.Log.Infof("remove virtual %s %s/%s, because object should get deleted", s.gvk.Kind, vObj.GetNamespace(), vObj.GetName()) return ctrl.Result{}, ctx.VirtualClient.Delete(ctx.Context, vObj) } @@ -250,7 +248,7 @@ func (s *importer) SyncToHost(ctx *synccontext.SyncContext, vObj client.Object) if len(vObj.GetFinalizers()) > 0 { // delete the finalizer here so that the object can be deleted vObj.SetFinalizers([]string{}) - ctx.Log.Infof("remove virtual %s %s/%s finalizers, because object should get deleted", s.config.Kind, vObj.GetNamespace(), vObj.GetName()) + ctx.Log.Infof("remove virtual %s %s/%s finalizers, because object should get deleted", s.gvk.Kind, vObj.GetNamespace(), vObj.GetName()) return ctrl.Result{}, ctx.VirtualClient.Update(ctx.Context, vObj) } @@ -291,16 +289,16 @@ func (s *importer) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c } // execute reverse patches - result, err := s.patcher.ApplyReversePatches(ctx.Context, pObj, vObj, s.config.ReversePatches, &virtualToHostNameResolver{namespace: vObj.GetNamespace()}) + result, err := s.patcher.ApplyReversePatches(ctx.Context, pObj, vObj, s) if err != nil { if kerrors.IsInvalid(err) { - ctx.Log.Infof("Warning: this message could indicate a timing issue with no significant impact, or a bug. Please report this if your resource never reaches the expected state. Error message: failed to patch virtual %s %s/%s: %v", s.config.Kind, vObj.GetNamespace(), vObj.GetName(), err) + ctx.Log.Infof("Warning: this message could indicate a timing issue with no significant impact, or a bug. Please report this if your resource never reaches the expected state. Error message: failed to patch virtual %s %s/%s: %v", s.gvk.Kind, vObj.GetNamespace(), vObj.GetName(), err) // this happens when some field is being removed shortly after being added, which suggest it's a timing issue // it doesn't seem to have any negative consequence besides the logged error message return ctrl.Result{Requeue: true}, nil } - return ctrl.Result{}, fmt.Errorf("failed to apply reverse patch on physical %s %s/%s: %w", s.config.Kind, vObj.GetNamespace(), vObj.GetName(), err) + return ctrl.Result{}, fmt.Errorf("failed to apply reverse patch on physical %s %s/%s: %w", s.gvk.Kind, vObj.GetNamespace(), vObj.GetName(), err) } else if result == controllerutil.OperationResultUpdated || result == controllerutil.OperationResultUpdatedStatus || result == controllerutil.OperationResultUpdatedStatusOnly { // a change will trigger reconciliation anyway, and at that point we can make // a more accurate updates(reverse patches) to the virtual resource @@ -308,12 +306,10 @@ func (s *importer) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c } // apply patches - _, err = s.patcher.ApplyPatches(ctx.Context, pObj, vObj, s.config.Patches, s.config.ReversePatches, func(vObj client.Object) (client.Object, error) { - return s.TranslateMetadata(ctx.Context, vObj), nil - }, &hostToVirtualImportNameResolver{virtualClient: s.virtualClient, ctx: ctx.Context}) + _, err = s.patcher.ApplyPatches(ctx.Context, pObj, vObj, s) if err != nil { // when invalid, auto delete and recreate to recover - if kerrors.IsInvalid(err) && s.config.ReplaceWhenInvalid { + if kerrors.IsInvalid(err) && s.replaceWhenInvalid { // Replace the object ctx.Log.Infof("Replace virtual object, because of apply failed: %v", err) err = ctx.VirtualClient.Delete(ctx.Context, vObj, &client.DeleteOptions{ @@ -333,45 +329,101 @@ func (s *importer) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c } // ensure that annotation on physical resource to mark it as controlled by this syncer is present - return ctrl.Result{}, s.addAnnotationsToPhysicalObject(ctx, pObj) + return ctrl.Result{}, s.addAnnotationsToPhysicalObject(ctx, pObj, vObj) +} + +var _ syncertypes.ObjectExcluder = &importer{} + +func (s *importer) ExcludeVirtual(vObj client.Object) bool { + return s.excludeObject(vObj) +} + +func (s *importer) ExcludePhysical(pObj client.Object) bool { + return s.excludeObject(pObj) +} + +func (s *importer) excludeObject(obj client.Object) bool { + // check if back sync is disabled eg. for service account token secrets + if obj.GetAnnotations() != nil && obj.GetAnnotations()[translate.SkipBackSyncInMultiNamespaceMode] == "true" { + return true + } + if obj.GetLabels() != nil && obj.GetLabels()[translate.ControllerLabel] != "" { + return true + } + if obj.GetAnnotations() != nil && obj.GetAnnotations()[translate.ControllerLabel] != "" && obj.GetAnnotations()[translate.ControllerLabel] != s.Name() { + // make sure kind matches + splitted := strings.Split(obj.GetAnnotations()[translate.ControllerLabel], "/") + if len(splitted) != 3 { + return true + } else if splitted[0] != strings.ToLower(obj.GetObjectKind().GroupVersionKind().Kind) || splitted[1] != strings.ToLower(obj.GetObjectKind().GroupVersionKind().Group) { + return false + } + + return true + } + + return false +} + +func (s *importer) isVirtualManaged(vObj client.Object) bool { + return vObj.GetAnnotations() != nil && vObj.GetAnnotations()[translate.ControllerLabel] != "" && vObj.GetAnnotations()[translate.ControllerLabel] == s.Name() } func (s *importer) IsManaged(_ context.Context, pObj client.Object) (bool, error) { if s.syncerOptions.IsClusterScopedCRD { return true, nil } - if s.excludeObject(pObj) { return false, nil } // check if the pObj belong to a namespace managed by this vcluster - // and that it is not managed by a non-generic syncer - return translate.Default.IsTargetedNamespace(pObj.GetNamespace()) && !translate.Default.IsManaged(pObj), nil -} + if !translate.Default.IsTargetedNamespace(pObj.GetNamespace()) { + return false, nil + } -func (s *importer) IsVirtualManaged(vObj client.Object) bool { - return vObj.GetAnnotations() != nil && vObj.GetAnnotations()[translate.ControllerLabel] != "" && vObj.GetAnnotations()[translate.ControllerLabel] == s.Name() + // check that it is not managed by a non-generic syncer + annotations := pObj.GetAnnotations() + if annotations != nil && annotations[translate.ControllerLabel] == "" && annotations[translate.NameAnnotation] != "" { + return false, nil + } + + return true, nil } -func (s *importer) VirtualToHost(_ context.Context, req types.NamespacedName, _ client.Object) types.NamespacedName { +func (s *importer) VirtualToHost(ctx context.Context, req types.NamespacedName, vObj client.Object) types.NamespacedName { + if s.virtualToHost != nil { + return s.virtualToHost(ctx, req, vObj) + } + return types.NamespacedName{Name: translate.Default.PhysicalName(req.Name, req.Namespace), Namespace: translate.Default.PhysicalNamespace(req.Namespace)} } -func (s *importer) HostToVirtual(ctx context.Context, req types.NamespacedName, _ client.Object) types.NamespacedName { +func (s *importer) HostToVirtual(ctx context.Context, req types.NamespacedName, pObj client.Object) types.NamespacedName { if s.syncerOptions.IsClusterScopedCRD { return types.NamespacedName{ Name: req.Name, } } - vNamespace := (&corev1.Namespace{}).DeepCopyObject().(client.Object) - err := clienthelper.GetByIndex(ctx, s.virtualClient, vNamespace, constants.IndexByPhysicalName, req.Namespace) - if err != nil { - return types.NamespacedName{} + // in multi-namespace mode we just query the target namespace + if !translate.Default.SingleNamespaceTarget() { + vNamespace := &corev1.Namespace{} + err := clienthelper.GetByIndex(ctx, s.virtualClient, vNamespace, constants.IndexByPhysicalName, req.Namespace) + if err != nil { + return types.NamespacedName{} + } + + return types.NamespacedName{Name: req.Name, Namespace: vNamespace.GetName()} + } + + // this is a little bit more tricky + // check if we made annotations already + if pObj != nil && pObj.GetAnnotations() != nil && pObj.GetAnnotations()[translate.NameAnnotation] != "" && pObj.GetAnnotations()[translate.NamespaceAnnotation] != "" { + return types.NamespacedName{Name: pObj.GetAnnotations()[translate.NameAnnotation], Namespace: pObj.GetAnnotations()[translate.NamespaceAnnotation]} } - return types.NamespacedName{Name: req.Name, Namespace: vNamespace.GetName()} + return s.hostToVirtual(ctx, req, pObj) } func (s *importer) TranslateMetadata(ctx context.Context, pObj client.Object) client.Object { @@ -385,13 +437,12 @@ func (s *importer) TranslateMetadata(ctx context.Context, pObj client.Object) cl nn := s.HostToVirtual(ctx, types.NamespacedName{Name: pObj.GetName(), Namespace: pObj.GetNamespace()}, pObj) vObj.SetName(nn.Name) vObj.SetNamespace(nn.Namespace) - return vObj } // TranslateMetadataUpdate translates the object's metadata annotations and labels and determines // if they have changed between the physical and virtual object -func (s *importer) TranslateMetadataUpdate(vObj client.Object, pObj client.Object) (changed bool, annotations map[string]string, labels map[string]string) { +func (s *importer) TranslateMetadataUpdate(_ context.Context, vObj client.Object, pObj client.Object) (changed bool, annotations map[string]string, labels map[string]string) { updatedAnnotations := s.updateVirtualAnnotations(pObj.GetAnnotations()) updatedLabels := pObj.GetLabels() return !equality.Semantic.DeepEqual(updatedAnnotations, vObj.GetAnnotations()) || !equality.Semantic.DeepEqual(updatedLabels, vObj.GetLabels()), updatedAnnotations, updatedLabels @@ -404,12 +455,13 @@ func (s *importer) updateVirtualAnnotations(a map[string]string) map[string]stri a[translate.ControllerLabel] = s.Name() delete(a, translate.NameAnnotation) + delete(a, translate.NamespaceAnnotation) delete(a, translate.UIDAnnotation) delete(a, corev1.LastAppliedConfigAnnotation) return a } -func (s *importer) addAnnotationsToPhysicalObject(ctx *synccontext.SyncContext, pObj client.Object) error { +func (s *importer) addAnnotationsToPhysicalObject(ctx *synccontext.SyncContext, pObj client.Object, vObj client.Object) error { if s.syncerOptions.IsClusterScopedCRD { // do not add annotations to physical object return nil @@ -420,6 +472,9 @@ func (s *importer) addAnnotationsToPhysicalObject(ctx *synccontext.SyncContext, if annotations == nil { annotations = map[string]string{} } + annotations[translate.NameAnnotation] = vObj.GetName() + annotations[translate.NamespaceAnnotation] = vObj.GetNamespace() + annotations[translate.UIDAnnotation] = string(vObj.GetUID()) annotations[translate.ControllerLabel] = s.Name() pObj.SetAnnotations(annotations) @@ -431,40 +486,6 @@ func (s *importer) addAnnotationsToPhysicalObject(ctx *synccontext.SyncContext, return nil } - ctx.Log.Infof("Patch controlled-by annotation on %s %s/%s", s.config.Kind, pObj.GetNamespace(), pObj.GetName()) + ctx.Log.Infof("Patch controlled-by annotation on %s %s/%s", s.gvk.Kind, pObj.GetNamespace(), pObj.GetName()) return ctx.PhysicalClient.Patch(ctx.Context, pObj, patch) } - -type hostToVirtualImportNameResolver struct { - virtualClient client.Client - ctx context.Context -} - -func (r *hostToVirtualImportNameResolver) TranslateName(name string, _ *regexp.Regexp, _ string) (string, error) { - return name, nil -} - -func (r *hostToVirtualImportNameResolver) TranslateNameWithNamespace(name string, _ string, _ *regexp.Regexp, _ string) (string, error) { - return name, nil -} - -func (r *hostToVirtualImportNameResolver) TranslateLabelKey(key string) (string, error) { - return key, nil -} - -func (r *hostToVirtualImportNameResolver) TranslateLabelExpressionsSelector(selector *metav1.LabelSelector) (*metav1.LabelSelector, error) { - return selector, nil -} - -func (r *hostToVirtualImportNameResolver) TranslateLabelSelector(selector map[string]string) (map[string]string, error) { - return selector, nil -} - -func (r *hostToVirtualImportNameResolver) TranslateNamespaceRef(namespace string) (string, error) { - vNamespace := (&corev1.Namespace{}).DeepCopyObject().(client.Object) - err := clienthelper.GetByIndex(r.ctx, r.virtualClient, vNamespace, constants.IndexByPhysicalName, namespace) - if err != nil { - return "", err - } - return vNamespace.GetName(), nil -} diff --git a/pkg/controllers/generic/patcher.go b/pkg/controllers/generic/patcher.go index 07521f910..dc284bbbc 100644 --- a/pkg/controllers/generic/patcher.go +++ b/pkg/controllers/generic/patcher.go @@ -4,9 +4,8 @@ import ( "context" "fmt" - "github.com/loft-sh/vcluster/config" + "github.com/loft-sh/vcluster/pkg/controllers/syncer/translator" "github.com/loft-sh/vcluster/pkg/log" - "github.com/loft-sh/vcluster/pkg/patches" "github.com/pkg/errors" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -18,19 +17,68 @@ import ( var fieldManager = "vcluster-syncer" -type patcher struct { +type ObjectPatcherAndMetadataTranslator interface { + translator.MetadataTranslator + ObjectPatcher +} + +var ErrNoUpdateNeeded = errors.New("no update needed") + +// ObjectPatcher is the heart of the export and import syncers. The following functions are executed based on the lifecycle: +// During Creation: +// * ServerSideApply with nil existingOtherObj +// During Update: +// * ReverseUpdate +// * ServerSideApply +type ObjectPatcher interface { + // ServerSideApply applies the translated object into the target cluster (either host or virtual), which + // was built from originalObj. There might be also an existingOtherObj which was server side applied before, which + // is not guaranteed to exist as this function is called during creation as well. + // + // For export syncers: + // * originalObj is the virtual object + // * translatedObj is the translated virtual object to host (rewritten metadata) + // * existingOtherObj is the existing host object (can be nil if there is none yet) + // + // For import syncers: + // * originalObj is the host object + // * translatedObj is the translated host object to virtual (rewritten metadata) + // * existingOtherObj is the existing virtual object (can be nil if there is none yet) + ServerSideApply(ctx context.Context, originalObj, translatedObj, existingOtherObj client.Object) error + + // ReverseUpdate updates the destObj before running ServerSideApply. This can be useful to sync back + // certain fields. Be careful that everything synced through this function **needs** to be excluded in + // the ServerSideApply function. Both objects are guaranteed to exist for this function. Users can use + // ErrNoUpdateNeeded to skip reverse update. + // + // For export syncers: + // * destObj is the virtual object + // * sourceObj is the host object + // + // For import syncers: + // * destObj is the host object + // * sourceObj is the virtual object + ReverseUpdate(ctx context.Context, destObj, sourceObj client.Object) error +} + +func NewPatcher(fromClient, toClient client.Client, statusIsSubresource bool, log log.Logger) *Patcher { + return &Patcher{ + fromClient: fromClient, + toClient: toClient, + log: log, + statusIsSubresource: statusIsSubresource, + } +} + +type Patcher struct { fromClient client.Client toClient client.Client log log.Logger statusIsSubresource bool } -func (s *patcher) ApplyPatches(ctx context.Context, fromObj, toObj client.Object, patchesConfig, reversePatchesConfig []*config.Patch, translateMetadata func(vObj client.Object) (client.Object, error), nameResolver patches.NameResolver) (client.Object, error) { - translatedObject, err := translateMetadata(fromObj) - if err != nil { - return nil, errors.Wrap(err, "translate object") - } - +func (s *Patcher) ApplyPatches(ctx context.Context, fromObj, toObj client.Object, modifier ObjectPatcherAndMetadataTranslator) (client.Object, error) { + translatedObject := modifier.TranslateMetadata(ctx, fromObj) toObjBase, err := toUnstructured(translatedObject) if err != nil { return nil, err @@ -38,8 +86,12 @@ func (s *patcher) ApplyPatches(ctx context.Context, fromObj, toObj client.Object toObjCopied := toObjBase.DeepCopy() // apply patches on from object - err = patches.ApplyPatches(toObjCopied, toObj, patchesConfig, reversePatchesConfig, nameResolver) + err = modifier.ServerSideApply(ctx, fromObj, toObjCopied, toObj) if err != nil { + if toObj != nil && errors.Is(err, ErrNoUpdateNeeded) { + return nil, nil + } + return nil, fmt.Errorf("error applying patches: %w", err) } @@ -52,7 +104,7 @@ func (s *patcher) ApplyPatches(ctx context.Context, fromObj, toObj client.Object // always apply status if it's there if hasAfterStatus { - s.log.Infof("Apply status of %s during patching", toObjCopied.GetName()) + s.log.Infof("Server side apply status of %s", toObjCopied.GetName()) o := &client.SubResourcePatchOptions{PatchOptions: client.PatchOptions{FieldManager: fieldManager, Force: ptr.To(true)}} err = s.toClient.Status().Patch(ctx, toObjCopied.DeepCopy(), client.Apply, o) if err != nil { @@ -66,7 +118,7 @@ func (s *patcher) ApplyPatches(ctx context.Context, fromObj, toObj client.Object } // always apply object - s.log.Infof("Apply %s during patching", toObjCopied.GetName()) + s.log.Infof("Server side apply %s", toObjCopied.GetName()) outObject := toObjCopied.DeepCopy() err = s.toClient.Patch(ctx, outObject, client.Apply, client.ForceOwnership, client.FieldOwner(fieldManager)) if err != nil { @@ -76,7 +128,7 @@ func (s *patcher) ApplyPatches(ctx context.Context, fromObj, toObj client.Object return outObject, nil } -func (s *patcher) ApplyReversePatches(ctx context.Context, fromObj, otherObj client.Object, reversePatchConfig []*config.Patch, nameResolver patches.NameResolver) (controllerutil.OperationResult, error) { +func (s *Patcher) ApplyReversePatches(ctx context.Context, fromObj, otherObj client.Object, modifier ObjectPatcherAndMetadataTranslator) (controllerutil.OperationResult, error) { originalUnstructured, err := toUnstructured(fromObj) if err != nil { return controllerutil.OperationResultNone, err @@ -84,8 +136,12 @@ func (s *patcher) ApplyReversePatches(ctx context.Context, fromObj, otherObj cli fromCopied := originalUnstructured.DeepCopy() // apply patches on from object - err = patches.ApplyPatches(fromCopied, otherObj, reversePatchConfig, nil, nameResolver) + err = modifier.ReverseUpdate(ctx, fromCopied, otherObj) if err != nil { + if errors.Is(err, ErrNoUpdateNeeded) { + return controllerutil.OperationResultNone, nil + } + return controllerutil.OperationResultNone, fmt.Errorf("error applying reverse patches: %w", err) } @@ -102,7 +158,7 @@ func (s *patcher) ApplyReversePatches(ctx context.Context, fromObj, otherObj cli // update status if (hasBeforeStatus || hasAfterStatus) && !equality.Semantic.DeepEqual(beforeStatus, afterStatus) { - s.log.Infof("Update status of %s during reverse patching", fromCopied.GetName()) + s.log.Infof("Reverse update status of %s", fromCopied.GetName()) err = s.fromClient.Status().Update(ctx, fromCopied) if err != nil { return controllerutil.OperationResultNone, errors.Wrap(err, "update reverse status") @@ -121,7 +177,7 @@ func (s *patcher) ApplyReversePatches(ctx context.Context, fromObj, otherObj cli // compare rest of the object if !equality.Semantic.DeepEqual(originalUnstructured, fromCopied) { - s.log.Infof("Update %s during reverse patching", fromCopied.GetName()) + s.log.Infof("Reverse update %s", fromCopied.GetName()) err = s.fromClient.Update(ctx, fromCopied) if err != nil { return controllerutil.OperationResultNone, errors.Wrap(err, "update reverse") diff --git a/pkg/controllers/generic/types.go b/pkg/controllers/generic/types.go deleted file mode 100644 index 568ceca3c..000000000 --- a/pkg/controllers/generic/types.go +++ /dev/null @@ -1,10 +0,0 @@ -package generic - -import "k8s.io/apimachinery/pkg/runtime/schema" - -type GVKRegister map[schema.GroupVersionKind]*GVKScopeAndSubresource - -type GVKScopeAndSubresource struct { - IsClusterScoped bool - HasStatusSubresource bool -} diff --git a/pkg/controllers/register.go b/pkg/controllers/register.go index 0cd042c4d..b79c2137d 100644 --- a/pkg/controllers/register.go +++ b/pkg/controllers/register.go @@ -33,6 +33,7 @@ import ( "github.com/loft-sh/vcluster/pkg/controllers/resources/volumesnapshots/volumesnapshots" "github.com/loft-sh/vcluster/pkg/controllers/servicesync" "github.com/loft-sh/vcluster/pkg/controllers/syncer" + synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context" "github.com/loft-sh/vcluster/pkg/util/blockingcacheclient" util "github.com/loft-sh/vcluster/pkg/util/context" "k8s.io/apimachinery/pkg/api/meta" @@ -45,17 +46,20 @@ import ( "github.com/loft-sh/vcluster/pkg/controllers/k8sdefaultendpoint" "github.com/loft-sh/vcluster/pkg/controllers/podsecurity" "github.com/loft-sh/vcluster/pkg/controllers/resources/services" - synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context" syncertypes "github.com/loft-sh/vcluster/pkg/types" "github.com/loft-sh/vcluster/pkg/util/loghelper" "github.com/pkg/errors" "golang.org/x/sync/errgroup" ) -type initFunction func(*synccontext.RegisterContext) (syncertypes.Object, error) +// ExtraControllers that will be started as well +var ExtraControllers []BuildController -func getSyncers(ctx *config.ControllerContext) []initFunction { - return []initFunction{ +// BuildController is a function to build a new syncer +type BuildController func(ctx *synccontext.RegisterContext) (syncertypes.Object, error) + +func getSyncers(ctx *config.ControllerContext) []BuildController { + return append([]BuildController{ isEnabled(ctx.Config.Sync.ToHost.Services.Enabled, services.New), isEnabled(ctx.Config.Sync.ToHost.ConfigMaps.Enabled, configmaps.New), isEnabled(ctx.Config.Sync.ToHost.Secrets.Enabled, secrets.New), @@ -80,10 +84,10 @@ func getSyncers(ctx *config.ControllerContext) []initFunction { isEnabled(ctx.Config.Experimental.MultiNamespaceMode.Enabled, namespaces.New), persistentvolumes.New, nodes.New, - } + }, ExtraControllers...) } -func isEnabled(enabled bool, fn initFunction) initFunction { +func isEnabled(enabled bool, fn BuildController) BuildController { if enabled { return fn } @@ -102,7 +106,7 @@ func Create(ctx *config.ControllerContext) ([]syncertypes.Object, error) { createdController, err := newSyncer(registerContext) if err != nil { - return nil, errors.Wrapf(err, "register %s controller", createdController.Name()) + return nil, fmt.Errorf("register controller: %w", err) } loghelper.Infof("Start %s sync controller", createdController.Name()) diff --git a/pkg/controllers/resources/nodes/syncer.go b/pkg/controllers/resources/nodes/syncer.go index 24fb8774b..1110b3780 100644 --- a/pkg/controllers/resources/nodes/syncer.go +++ b/pkg/controllers/resources/nodes/syncer.go @@ -188,7 +188,7 @@ func modifyController(ctx *synccontext.RegisterContext, nodeServiceProvider node }() bld = bld.WatchesRawSource(source.Kind(ctx.PhysicalManager.GetCache(), &corev1.Pod{}, handler.TypedEnqueueRequestsFromMapFunc(func(_ context.Context, pod *corev1.Pod) []reconcile.Request { - if pod == nil || !translate.Default.IsManaged(pod) || pod.Spec.NodeName == "" { + if pod == nil || !translate.Default.IsManaged(pod, translate.Default.PhysicalName) || pod.Spec.NodeName == "" { return []reconcile.Request{} } @@ -226,7 +226,7 @@ func (s *nodeSyncer) RegisterIndices(ctx *synccontext.RegisterContext) error { func registerIndices(ctx *synccontext.RegisterContext) error { err := ctx.PhysicalManager.GetFieldIndexer().IndexField(ctx.Context, &corev1.Pod{}, constants.IndexByAssigned, func(rawObj client.Object) []string { pod := rawObj.(*corev1.Pod) - if !translate.Default.IsManaged(pod) || pod.Spec.NodeName == "" { + if !translate.Default.IsManaged(pod, translate.Default.PhysicalName) || pod.Spec.NodeName == "" { return nil } return []string{pod.Spec.NodeName} diff --git a/pkg/controllers/resources/nodes/translate.go b/pkg/controllers/resources/nodes/translate.go index 20acfcd55..6debccabf 100644 --- a/pkg/controllers/resources/nodes/translate.go +++ b/pkg/controllers/resources/nodes/translate.go @@ -188,7 +188,7 @@ func (s *nodeSyncer) translateUpdateStatus(ctx *synccontext.SyncContext, pNode * klog.Errorf("Error listing pods: %v", err) } else { for _, pod := range podList.Items { - if !translate.Default.IsManaged(&pod) { + if !translate.Default.IsManaged(&pod, translate.Default.PhysicalName) { // count pods that are not synced by this vcluster nonVClusterPods++ } diff --git a/pkg/controllers/resources/services/syncer.go b/pkg/controllers/resources/services/syncer.go index 0ceefebac..3cc9a4740 100644 --- a/pkg/controllers/resources/services/syncer.go +++ b/pkg/controllers/resources/services/syncer.go @@ -10,7 +10,6 @@ import ( "github.com/loft-sh/vcluster/pkg/specialservices" syncertypes "github.com/loft-sh/vcluster/pkg/types" - "github.com/loft-sh/vcluster/pkg/util/translate" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" kerrors "k8s.io/apimachinery/pkg/api/errors" @@ -113,7 +112,10 @@ func isSwitchingFromExternalName(pService *corev1.Service, vService *corev1.Serv var _ syncertypes.ToVirtualSyncer = &serviceSyncer{} func (s *serviceSyncer) SyncToVirtual(ctx *synccontext.SyncContext, pObj client.Object) (ctrl.Result, error) { - if !translate.Default.IsManaged(pObj) { + isManaged, err := s.NamespacedTranslator.IsManaged(ctx.Context, pObj) + if err != nil { + return ctrl.Result{}, err + } else if !isManaged { return ctrl.Result{}, nil } diff --git a/pkg/controllers/syncer/syncer.go b/pkg/controllers/syncer/syncer.go index d10023cfa..68c853333 100644 --- a/pkg/controllers/syncer/syncer.go +++ b/pkg/controllers/syncer/syncer.go @@ -120,7 +120,7 @@ func (r *SyncController) Reconcile(ctx context.Context, origReq ctrl.Request) (_ // check what function we should call if vObj != nil && pObj == nil { return r.syncer.SyncToHost(syncContext, vObj) - } else if vObj != nil && pObj != nil { + } else if vObj != nil { // make sure the object uid matches pAnnotations := pObj.GetAnnotations() if !r.options.DisableUIDDeletion && pAnnotations != nil && pAnnotations[translate.UIDAnnotation] != "" && pAnnotations[translate.UIDAnnotation] != string(vObj.GetUID()) { @@ -134,7 +134,7 @@ func (r *SyncController) Reconcile(ctx context.Context, origReq ctrl.Request) (_ } return r.syncer.Sync(syncContext, pObj, vObj) - } else if vObj == nil && pObj != nil { + } else if pObj != nil { if pObj.GetAnnotations() != nil { if shouldSkip, ok := pObj.GetAnnotations()[translate.SkipBackSyncInMultiNamespaceMode]; ok && shouldSkip == "true" { // do not delete @@ -262,11 +262,12 @@ func (r *SyncController) getPhysicalObject(ctx context.Context, req types.Namesp } func (r *SyncController) excludePhysical(ctx context.Context, pObj, vObj client.Object) (bool, error) { + excluder, excluderOk := r.syncer.(syncertypes.ObjectExcluder) isManaged, err := r.syncer.IsManaged(ctx, pObj) if err != nil { return false, fmt.Errorf("failed to check if physical object is managed: %w", err) } else if !isManaged { - if vObj != nil { + if !excluderOk && vObj != nil { msg := fmt.Sprintf("conflict: cannot sync virtual object %s/%s as unmanaged physical object %s/%s exists with desired name", vObj.GetNamespace(), vObj.GetName(), pObj.GetNamespace(), pObj.GetName()) r.vEventRecorder.Eventf(vObj, "Warning", "SyncError", msg) return false, fmt.Errorf(msg) @@ -275,8 +276,7 @@ func (r *SyncController) excludePhysical(ctx context.Context, pObj, vObj client. return true, nil } - excluder, ok := r.syncer.(syncertypes.ObjectExcluder) - if ok { + if excluderOk { return excluder.ExcludePhysical(pObj), nil } diff --git a/pkg/controllers/syncer/translator/namespaced_translator.go b/pkg/controllers/syncer/translator/namespaced_translator.go index 49d9cf356..582dc52a6 100644 --- a/pkg/controllers/syncer/translator/namespaced_translator.go +++ b/pkg/controllers/syncer/translator/namespaced_translator.go @@ -18,8 +18,17 @@ import ( ) func NewNamespacedTranslator(ctx *context.RegisterContext, name string, obj client.Object, excludedAnnotations ...string) NamespacedTranslator { + return newNamespacedTranslator(ctx, name, obj, translate.Default.PhysicalName, excludedAnnotations...) +} + +func NewShortNamespacedTranslator(ctx *context.RegisterContext, name string, obj client.Object, excludedAnnotations ...string) NamespacedTranslator { + return newNamespacedTranslator(ctx, name, obj, translate.Default.PhysicalNameShort, excludedAnnotations...) +} + +func newNamespacedTranslator(ctx *context.RegisterContext, name string, obj client.Object, translateName translate.PhysicalNameFunc, excludedAnnotations ...string) NamespacedTranslator { return &namespacedTranslator{ - name: name, + name: name, + translateName: translateName, syncedLabels: ctx.Config.Experimental.SyncSettings.SyncLabels, excludedAnnotations: excludedAnnotations, @@ -40,6 +49,8 @@ type namespacedTranslator struct { virtualClient client.Client obj client.Object + translateName translate.PhysicalNameFunc + eventRecorder record.EventRecorder } @@ -57,7 +68,7 @@ func (n *namespacedTranslator) Resource() client.Object { func (n *namespacedTranslator) RegisterIndices(ctx *context.RegisterContext) error { return ctx.VirtualManager.GetFieldIndexer().IndexField(ctx.Context, n.obj.DeepCopyObject().(client.Object), constants.IndexByPhysicalName, func(rawObj client.Object) []string { - return []string{translate.Default.PhysicalNamespace(rawObj.GetNamespace()) + "/" + translate.Default.PhysicalName(rawObj.GetName(), rawObj.GetNamespace())} + return []string{translate.Default.PhysicalNamespace(rawObj.GetNamespace()) + "/" + n.translateName(rawObj.GetName(), rawObj.GetNamespace())} }) } @@ -95,15 +106,13 @@ func (n *namespacedTranslator) SyncToHostUpdate(ctx *context.SyncContext, vObj, } func (n *namespacedTranslator) IsManaged(_ context2.Context, pObj client.Object) (bool, error) { - return translate.Default.IsManaged(pObj), nil + return translate.Default.IsManaged(pObj, n.translateName), nil } func (n *namespacedTranslator) VirtualToHost(_ context2.Context, req types.NamespacedName, _ client.Object) types.NamespacedName { - name := translate.Default.PhysicalName(req.Name, req.Namespace) - return types.NamespacedName{ Namespace: translate.Default.PhysicalNamespace(req.Namespace), - Name: name, + Name: n.translateName(req.Name, req.Namespace), } } diff --git a/pkg/integrations/integrations.go b/pkg/integrations/integrations.go new file mode 100644 index 000000000..0a0e15da3 --- /dev/null +++ b/pkg/integrations/integrations.go @@ -0,0 +1,23 @@ +package integrations + +import ( + "github.com/loft-sh/vcluster/pkg/config" + "github.com/loft-sh/vcluster/pkg/integrations/metricsserver" +) + +type Integration func(ctx *config.ControllerContext) error + +var Integrations = []Integration{ + metricsserver.Register, +} + +func StartIntegrations(ctx *config.ControllerContext) error { + for _, integration := range Integrations { + err := integration(ctx) + if err != nil { + return err + } + } + + return nil +} diff --git a/pkg/server/filters/metrics_server.go b/pkg/integrations/metricsserver/metricsserver.go similarity index 57% rename from pkg/server/filters/metrics_server.go rename to pkg/integrations/metricsserver/metricsserver.go index 0e16d3578..6bd68f50f 100644 --- a/pkg/server/filters/metrics_server.go +++ b/pkg/integrations/metricsserver/metricsserver.go @@ -1,4 +1,4 @@ -package filters +package metricsserver import ( "context" @@ -6,28 +6,24 @@ import ( "net/http" "strings" + "github.com/loft-sh/vcluster/pkg/apiservice" + "github.com/loft-sh/vcluster/pkg/config" + "github.com/loft-sh/vcluster/pkg/server/filters" "github.com/loft-sh/vcluster/pkg/server/handler" requestpkg "github.com/loft-sh/vcluster/pkg/util/request" "github.com/loft-sh/vcluster/pkg/util/translate" - apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/types" - "k8s.io/apiserver/pkg/audit" - "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" - "k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/apiserver/pkg/endpoints/request" - apirest "k8s.io/apiserver/pkg/registry/rest" "k8s.io/client-go/rest" "k8s.io/klog/v2" metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -41,13 +37,44 @@ const ( LabelSelectorQueryParam = "labelSelector" ) +var GroupVersion = schema.GroupVersion{ + Group: "metrics.k8s.io", + Version: "v1beta1", +} + +func Register(ctx *config.ControllerContext) error { + ctx.AcquiredLeaderHooks = append(ctx.AcquiredLeaderHooks, RegisterOrDeregisterAPIService) + if ctx.Config.Integrations.MetricsServer.Enabled { + ctx.StartAPIServiceProxy = true + ctx.PostServerHooks = append(ctx.PostServerHooks, func(h http.Handler, clients config.Clients) http.Handler { + return WithMetricsServerProxy( + h, + ctx.Config.WorkloadTargetNamespace, + clients.CachedHostClient, + clients.CachedVirtualClient, + clients.HostConfig, + ctx.Config.Experimental.MultiNamespaceMode.Enabled, + ) + }) + } + + return nil +} + +func RegisterOrDeregisterAPIService(ctx *config.ControllerContext) error { + if ctx.Config.Integrations.MetricsServer.Enabled { + return apiservice.RegisterAPIService(ctx, "metrics-server", GroupVersion) + } + + return apiservice.DeregisterAPIService(ctx, GroupVersion) +} + func WithMetricsServerProxy( h http.Handler, targetNamespace string, cachedHostClient, cachedVirtualClient client.Client, - hostConfig, - virtualConfig *rest.Config, + hostConfig *rest.Config, multiNamespaceMode bool, ) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -73,60 +100,10 @@ func WithMetricsServerProxy( return } - // is list request? - if isAPIResourceListRequest(info) { - proxyHandler, err := handler.Handler("", hostConfig, nil) - if err != nil { - requestpkg.FailWithStatus(w, req, http.StatusInternalServerError, err) - return - } - - handleAPIResourceListRequest(w, req, proxyHandler, cachedVirtualClient.Scheme()) - return - } - - // is version request? - if isAPIResourceVersionListRequest(info) { - proxyHandler, err := handler.Handler("", hostConfig, nil) - if err != nil { - requestpkg.FailWithStatus(w, req, http.StatusInternalServerError, err) - return - } - - handleAPIResourceVersionListRequest(w, req, proxyHandler, cachedVirtualClient.Scheme()) - return - } - - // is new aggregated list request? - if isNewAPIResourceListRequest(info) { - proxyHandler, err := handler.Handler("", virtualConfig, nil) - if err != nil { - requestpkg.FailWithStatus(w, req, http.StatusInternalServerError, err) - return - } - - // check if we handled the request - if handleNewAPIResourceListRequest(w, req, proxyHandler, cachedVirtualClient.Scheme()) { - return - } - } - h.ServeHTTP(w, req) }) } -func isNewAPIResourceListRequest(r *request.RequestInfo) bool { - return r.Path == "/apis" -} - -func isAPIResourceListRequest(r *request.RequestInfo) bool { - return r.Path == "/apis/metrics.k8s.io/v1beta1" -} - -func isAPIResourceVersionListRequest(r *request.RequestInfo) bool { - return r.Path == "/apis/metrics.k8s.io" -} - func isMetricsServerProxyRequest(r *request.RequestInfo) bool { if !r.IsResourceRequest { return false @@ -137,176 +114,7 @@ func isMetricsServerProxyRequest(r *request.RequestInfo) bool { (r.Resource == NodeResource || r.Resource == PodResource) } -func handleNewAPIResourceListRequest( - responseWriter http.ResponseWriter, - request *http.Request, - handler http.Handler, - scheme *runtime.Scheme, -) bool { - // try parsing data into api group discovery list - if strings.Contains(request.Header.Get("Accept"), "application/json;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList") { - // execute the request - code, _, data, err := executeRequest(request, handler) - if err != nil { - klog.Infof("error executing request %v", err) - return false - } else if code != http.StatusOK { - klog.Infof("error status not ok %v", err) - return false - } - - if handleAPIGroupDiscoveryList(responseWriter, request, data, scheme) { - return true - } - } - - return false -} - -func handleAPIGroupDiscoveryList( - responseWriter http.ResponseWriter, - request *http.Request, - data []byte, - scheme *runtime.Scheme, -) bool { - response := &apidiscoveryv2beta1.APIGroupDiscoveryList{} - codecFactory := serializer.NewCodecFactory(scheme) - _, _, err := codecFactory.UniversalDeserializer().Decode(data, nil, response) - if err != nil { - klog.Infof("error unmarshalling discovery list %v", err) - return false - } else if response.Kind != "APIGroupDiscoveryList" || response.APIVersion != apidiscoveryv2beta1.SchemeGroupVersion.String() { - klog.Infof("error retrieving discovery list: unexpected kind or apiversion %s %s %s", response.Kind, response.APIVersion, string(data)) - return false - } - - // inject metrics api - response.Items = append(response.Items, apidiscoveryv2beta1.APIGroupDiscovery{ - ObjectMeta: metav1.ObjectMeta{ - Name: "metrics.k8s.io", - }, - Versions: []apidiscoveryv2beta1.APIVersionDiscovery{ - { - Version: "v1beta1", - Resources: []apidiscoveryv2beta1.APIResourceDiscovery{ - { - Resource: NodeResource, - ResponseKind: &metav1.GroupVersionKind{ - Kind: "NodeMetrics", - }, - Scope: apidiscoveryv2beta1.ScopeCluster, - Verbs: []string{"get", "list"}, - }, - { - Resource: PodResource, - ResponseKind: &metav1.GroupVersionKind{ - Kind: "PodMetrics", - }, - Scope: apidiscoveryv2beta1.ScopeNamespace, - Verbs: []string{"get", "list"}, - }, - }, - Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent, - }, - }, - }) - - // return new data - WriteObjectNegotiatedWithMediaType( - responseWriter, - request, - response, - scheme, - "application/json;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList", - ) - return true -} - -func handleAPIResourceVersionListRequest( - responseWriter http.ResponseWriter, - request *http.Request, - handler http.Handler, - scheme *runtime.Scheme, -) { - codecFactory := serializer.NewCodecFactory(scheme) - code, header, data, err := executeRequest(request, handler) - if err != nil { - klog.Infof("error executing request %v", err) - responsewriters.ErrorNegotiated(err, codecFactory, corev1.SchemeGroupVersion, responseWriter, request) - return - } else if code != http.StatusOK { - klog.Infof("error status not ok %v", err) - writeWithHeader(responseWriter, code, header, data) - return - } - - response := &metav1.APIGroup{} - _, _, err = codecFactory.UniversalDeserializer().Decode(data, nil, response) - if err != nil { - klog.Infof("error unmarshalling resource list %v", err) - responsewriters.ErrorNegotiated(err, codecFactory, corev1.SchemeGroupVersion, responseWriter, request) - return - } else if response.Kind != "APIGroup" { - err = fmt.Errorf("error retrieving resource version list: unexpected kind or apiversion %s %s %s", response.Kind, response.APIVersion, string(data)) - klog.Info(err.Error()) - responsewriters.ErrorNegotiated(err, codecFactory, corev1.SchemeGroupVersion, responseWriter, request) - return - } - - // return new data - WriteObjectNegotiatedWithGVK( - responseWriter, - request, - response, - scheme, - corev1.SchemeGroupVersion, - "", - ) -} - -func handleAPIResourceListRequest( - responseWriter http.ResponseWriter, - request *http.Request, - handler http.Handler, - scheme *runtime.Scheme, -) { - codecFactory := serializer.NewCodecFactory(scheme) - code, header, data, err := executeRequest(request, handler) - if err != nil { - klog.Infof("error executing request %v", err) - responsewriters.ErrorNegotiated(err, codecFactory, corev1.SchemeGroupVersion, responseWriter, request) - return - } else if code != http.StatusOK { - klog.Infof("error status not ok %v", err) - writeWithHeader(responseWriter, code, header, data) - return - } - - response := &metav1.APIResourceList{} - _, _, err = codecFactory.UniversalDeserializer().Decode(data, nil, response) - if err != nil { - klog.Infof("error unmarshalling resource list %v", err) - responsewriters.ErrorNegotiated(err, codecFactory, corev1.SchemeGroupVersion, responseWriter, request) - return - } else if response.Kind != "APIResourceList" { - err = fmt.Errorf("error retrieving resource list: unexpected kind or apiversion %s %s %s", response.Kind, response.APIVersion, string(data)) - klog.Info(err.Error()) - responsewriters.ErrorNegotiated(err, codecFactory, corev1.SchemeGroupVersion, responseWriter, request) - return - } - - // return new data - WriteObjectNegotiatedWithGVK( - responseWriter, - request, - response, - scheme, - corev1.SchemeGroupVersion, - "", - ) -} - -type MetricsServerProxy struct { +type serverProxy struct { handler http.Handler request *http.Request requestInfo *request.RequestInfo @@ -339,7 +147,7 @@ func handleMetricsServerProxyRequest( return } - metricsServerProxy := &MetricsServerProxy{ + metricsServerProxy := &serverProxy{ request: req, requestInfo: info, responseWriter: w, @@ -420,7 +228,7 @@ type RowData struct { Pom metav1.PartialObjectMetadata } -func (p *MetricsServerProxy) HandleRequest() { +func (p *serverProxy) HandleRequest() { if p.resourceType == PodResource && p.verb == RequestVerbList { acceptHeader := p.request.Header.Get("Accept") if strings.Contains(acceptHeader, "as=Table;") { @@ -430,12 +238,12 @@ func (p *MetricsServerProxy) HandleRequest() { } // execute request in host cluster - code, header, data, err := executeRequest(p.request, p.handler) + code, header, data, err := filters.ExecuteRequest(p.request, p.handler) if err != nil { responsewriters.ErrorNegotiated(err, serializer.NewCodecFactory(p.client.Scheme()), corev1.SchemeGroupVersion, p.responseWriter, p.request) return } else if code != http.StatusOK { - writeWithHeader(p.responseWriter, code, header, data) + filters.WriteWithHeader(p.responseWriter, code, header, data) return } @@ -460,7 +268,7 @@ func (p *MetricsServerProxy) HandleRequest() { requestpkg.FailWithStatus(p.responseWriter, p.request, http.StatusInternalServerError, fmt.Errorf("unrecognized resource type: %s", p.resourceType)) } -func (p *MetricsServerProxy) rewriteNodeMetricsList(data []byte) { +func (p *serverProxy) rewriteNodeMetricsList(data []byte) { virtualNodeMap := make(map[string]corev1.Node) for _, node := range p.nodesInVCluster { virtualNodeMap[node.Name] = node @@ -486,7 +294,7 @@ func (p *MetricsServerProxy) rewriteNodeMetricsList(data []byte) { nodeMetricsList.Items = filteredNodeMetricsList // return new data - WriteObjectNegotiated( + filters.WriteObjectNegotiated( p.responseWriter, p.request, nodeMetricsList, @@ -512,7 +320,7 @@ func (p *MetricsServerProxy) rewriteNodeMetricsList(data []byte) { nodeMetric.Labels = vNode.Labels // return new data - WriteObjectNegotiated( + filters.WriteObjectNegotiated( p.responseWriter, p.request, nodeMetric, @@ -521,7 +329,7 @@ func (p *MetricsServerProxy) rewriteNodeMetricsList(data []byte) { } } -func (p *MetricsServerProxy) rewritePodMetricsGetData(data []byte) { +func (p *serverProxy) rewritePodMetricsGetData(data []byte) { codecFactory := serializer.NewCodecFactory(p.client.Scheme()) podMetrics := &metricsv1beta1.PodMetrics{} _, _, err := codecFactory.UniversalDeserializer().Decode(data, nil, podMetrics) @@ -534,7 +342,7 @@ func (p *MetricsServerProxy) rewritePodMetricsGetData(data []byte) { podMetrics.Namespace = p.requestInfo.Namespace // return new data - WriteObjectNegotiated( + filters.WriteObjectNegotiated( p.responseWriter, p.request, podMetrics, @@ -542,7 +350,7 @@ func (p *MetricsServerProxy) rewritePodMetricsGetData(data []byte) { ) } -func (p *MetricsServerProxy) rewritePodMetricsTableData(data []byte) { +func (p *serverProxy) rewritePodMetricsTableData(data []byte) { codecFactory := serializer.NewCodecFactory(p.client.Scheme()) table := &metav1.Table{} _, _, err := codecFactory.UniversalDeserializer().Decode(data, nil, table) @@ -599,7 +407,7 @@ func (p *MetricsServerProxy) rewritePodMetricsTableData(data []byte) { table.Rows = filteredTableRows // return new data - WriteObjectNegotiated( + filters.WriteObjectNegotiated( p.responseWriter, p.request, table, @@ -607,7 +415,7 @@ func (p *MetricsServerProxy) rewritePodMetricsTableData(data []byte) { ) } -func (p *MetricsServerProxy) rewritePodMetricsListData(data []byte) { +func (p *serverProxy) rewritePodMetricsListData(data []byte) { codecFactory := serializer.NewCodecFactory(p.client.Scheme()) podMetricsList := &metricsv1beta1.PodMetricsList{} _, _, err := codecFactory.UniversalDeserializer().Decode(data, nil, podMetricsList) @@ -651,7 +459,7 @@ func (p *MetricsServerProxy) rewritePodMetricsListData(data []byte) { } // write object back - WriteObjectNegotiated( + filters.WriteObjectNegotiated( p.responseWriter, p.request, filteredBackTranslatedList, @@ -704,49 +512,3 @@ func translateLabelSelectors(req *http.Request) error { req.URL.RawQuery = query.Encode() return nil } - -func WriteObjectNegotiated(w http.ResponseWriter, req *http.Request, object runtime.Object, scheme *runtime.Scheme) { - WriteObjectNegotiatedWithMediaType(w, req, object, scheme, "") -} - -func WriteObjectNegotiatedWithGVK(w http.ResponseWriter, req *http.Request, object runtime.Object, scheme *runtime.Scheme, groupVersion schema.GroupVersion, overrideMediaType string) { - s := serializer.NewCodecFactory(scheme) - statusCode := http.StatusOK - stream, ok := object.(apirest.ResourceStreamer) - if ok { - requestInfo, _ := request.RequestInfoFrom(req.Context()) - metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() { - responsewriters.StreamObject(statusCode, groupVersion, s, stream, w, req) - }) - return - } - - _, serializer, err := negotiation.NegotiateOutputMediaType(req, s, negotiation.DefaultEndpointRestrictions) - if err != nil { - status := responsewriters.ErrorToAPIStatus(err) - responsewriters.WriteRawJSON(int(status.Code), status, w) - return - } - - audit.LogResponseObject(req.Context(), object, groupVersion, s) - - encoder := s.EncoderForVersion(serializer.Serializer, groupVersion) - request.TrackSerializeResponseObjectLatency(req.Context(), func() { - if overrideMediaType != "" { - responsewriters.SerializeObject(overrideMediaType, encoder, w, req, statusCode, object) - } else { - responsewriters.SerializeObject(serializer.MediaType, encoder, w, req, statusCode, object) - } - }) -} - -func WriteObjectNegotiatedWithMediaType(w http.ResponseWriter, req *http.Request, object runtime.Object, scheme *runtime.Scheme, overrideMediaType string) { - s := serializer.NewCodecFactory(scheme) - gvk, err := apiutil.GVKForObject(object, scheme) - if err != nil { - responsewriters.ErrorNegotiated(err, s, corev1.SchemeGroupVersion, w, req) - return - } - - WriteObjectNegotiatedWithGVK(w, req, object, scheme, gvk.GroupVersion(), overrideMediaType) -} diff --git a/pkg/metrics/helper.go b/pkg/metrics/helper.go deleted file mode 100644 index bb2593d09..000000000 --- a/pkg/metrics/helper.go +++ /dev/null @@ -1,144 +0,0 @@ -package metrics - -import ( - "bytes" - "context" - "fmt" - "sort" - "strings" - - "github.com/loft-sh/vcluster/pkg/constants" - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" - corev1 "k8s.io/api/core/v1" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -func Decode(data []byte) ([]*dto.MetricFamily, error) { - var parser expfmt.TextParser - metricFamilies, err := parser.TextToMetricFamilies(strings.NewReader(string(data))) - if err != nil { - return nil, fmt.Errorf("reading text format failed: %w", err) - } - - // sort metrics alphabetically - metricFamiliesArr := []*dto.MetricFamily{} - for k, fam := range metricFamilies { - name := k - if fam.Name == nil { - fam.Name = &name - } - - metricFamiliesArr = append(metricFamiliesArr, fam) - } - sort.Slice(metricFamiliesArr, func(i int, j int) bool { - return *metricFamiliesArr[i].Name < *metricFamiliesArr[j].Name - }) - - return metricFamiliesArr, nil -} - -func Encode(metricsFamilies []*dto.MetricFamily, format expfmt.Format) ([]byte, error) { - buffer := &bytes.Buffer{} - encoder := expfmt.NewEncoder(buffer, format) - for _, fam := range metricsFamilies { - if len(fam.Metric) > 0 { - err := encoder.Encode(fam) - if err != nil { - return nil, err - } - } - } - - return buffer.Bytes(), nil -} - -func Rewrite(ctx context.Context, metricsFamilies []*dto.MetricFamily, vClient client.Client) ([]*dto.MetricFamily, error) { - resultMetricsFamily := []*dto.MetricFamily{} - - // rewrite metrics - for _, fam := range metricsFamilies { - newMetrics := []*dto.Metric{} - for _, m := range fam.Metric { - var ( - pod string - persistentvolumeclaim string - namespace string - ) - for _, l := range m.Label { - if l.GetName() == "pod" { - pod = l.GetValue() - } else if l.GetName() == "namespace" { - namespace = l.GetValue() - } else if l.GetName() == "persistentvolumeclaim" { - persistentvolumeclaim = l.GetValue() - } - } - - // Add metrics that are pod and namespace independent - if persistentvolumeclaim == "" && pod == "" { - newMetrics = append(newMetrics, m) - continue - } - - // rewrite pod - if pod != "" { - // search if we can find the pod by name in the virtual cluster - podList := &corev1.PodList{} - err := vClient.List(ctx, podList, client.MatchingFields{constants.IndexByPhysicalName: namespace + "/" + pod}) - if err != nil { - return nil, err - } - - // skip the metric if the pod couldn't be found in the virtual cluster - if len(podList.Items) == 0 { - continue - } - - pod = podList.Items[0].Name - namespace = podList.Items[0].Namespace - } - - // rewrite persistentvolumeclaim - if persistentvolumeclaim != "" { - // search if we can find the pvc by name in the virtual cluster - pvcList := &corev1.PersistentVolumeClaimList{} - err := vClient.List(ctx, pvcList, client.MatchingFields{constants.IndexByPhysicalName: namespace + "/" + persistentvolumeclaim}) - if err != nil { - return nil, err - } - - // skip the metric if the pvc couldn't be found in the virtual cluster - if len(pvcList.Items) == 0 { - continue - } - - persistentvolumeclaim = pvcList.Items[0].Name - namespace = pvcList.Items[0].Namespace - } - - // exchange label values - for _, l := range m.Label { - if l.GetName() == "pod" { - l.Value = &pod - } - if l.GetName() == "namespace" { - l.Value = &namespace - } - if l.GetName() == "persistentvolumeclaim" { - l.Value = &persistentvolumeclaim - } - } - - // add the rewritten metric - newMetrics = append(newMetrics, m) - } - - fam.Metric = newMetrics - if len(fam.Metric) > 0 { - resultMetricsFamily = append(resultMetricsFamily, fam) - } - } - - return resultMetricsFamily, nil -} diff --git a/pkg/metricsapiservice/register.go b/pkg/metricsapiservice/register.go deleted file mode 100644 index cc50e1a5f..000000000 --- a/pkg/metricsapiservice/register.go +++ /dev/null @@ -1,147 +0,0 @@ -package metricsapiservice - -import ( - "context" - "math" - "time" - - "github.com/loft-sh/vcluster/pkg/config" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/klog/v2" - apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" - "k8s.io/metrics/pkg/apis/metrics" - "k8s.io/utils/ptr" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - - kerrors "k8s.io/apimachinery/pkg/api/errors" -) - -const ( - MetricsVersion = "v1beta1" - MetricsAPIServiceName = MetricsVersion + "." + metrics.GroupName // "v1beta1.metrics.k8s.io" -) - -func checkExistingAPIService(ctx context.Context, client client.Client) bool { - var exists bool - _ = applyOperation(ctx, func(ctx context.Context) (bool, error) { - err := client.Get(ctx, types.NamespacedName{Name: MetricsAPIServiceName}, &apiregistrationv1.APIService{}) - if err != nil { - if kerrors.IsNotFound(err) { - return true, nil - } - - return false, err - } - - exists = true - return true, nil - }) - - return exists -} - -func applyOperation(ctx context.Context, operationFunc wait.ConditionWithContextFunc) error { - return wait.ExponentialBackoffWithContext(ctx, wait.Backoff{ - Duration: time.Second, - Factor: 1.5, - Cap: time.Minute, - Steps: math.MaxInt32, - }, operationFunc) -} - -func deleteOperation(ctrlCtx *config.ControllerContext) wait.ConditionWithContextFunc { - return func(ctx context.Context) (bool, error) { - err := ctrlCtx.VirtualManager.GetClient().Delete(ctx, &apiregistrationv1.APIService{ - ObjectMeta: metav1.ObjectMeta{ - Name: MetricsAPIServiceName, - }, - }) - if err != nil { - if kerrors.IsNotFound(err) { - return true, nil - } - - klog.Errorf("error deleting api service %v", err) - return false, nil - } - - return true, nil - } -} - -func createOperation(ctrlCtx *config.ControllerContext) wait.ConditionWithContextFunc { - return func(ctx context.Context) (bool, error) { - service := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "metrics-service", - Namespace: "kube-system", - }, - } - _, err := controllerutil.CreateOrUpdate(ctx, ctrlCtx.VirtualManager.GetClient(), service, func() error { - service.Spec.Type = corev1.ServiceTypeExternalName - service.Spec.ExternalName = "localhost" - service.Spec.Ports = []corev1.ServicePort{ - { - Port: 8443, - }, - } - return nil - }) - if err != nil { - if kerrors.IsAlreadyExists(err) { - return true, nil - } - - klog.Errorf("error creating api service %v", err) - return false, nil - } - - apiServiceSpec := apiregistrationv1.APIServiceSpec{ - Service: &apiregistrationv1.ServiceReference{ - Namespace: "kube-system", - Name: "metrics-service", - Port: ptr.To(int32(8443)), - }, - InsecureSkipTLSVerify: true, - Group: metrics.GroupName, - GroupPriorityMinimum: 100, - Version: MetricsVersion, - VersionPriority: 100, - } - apiService := &apiregistrationv1.APIService{ - ObjectMeta: metav1.ObjectMeta{ - Name: MetricsAPIServiceName, - }, - } - _, err = controllerutil.CreateOrUpdate(ctx, ctrlCtx.VirtualManager.GetClient(), apiService, func() error { - apiService.Spec = apiServiceSpec - return nil - }) - if err != nil { - if kerrors.IsAlreadyExists(err) { - return true, nil - } - - klog.Errorf("error creating api service %v", err) - return false, nil - } - - return true, nil - } -} - -func RegisterOrDeregisterAPIService(ctx *config.ControllerContext) error { - // check if the api service should get created - exists := checkExistingAPIService(ctx.Context, ctx.VirtualManager.GetClient()) - if ctx.Config.Observability.Metrics.Proxy.Nodes || ctx.Config.Observability.Metrics.Proxy.Pods { - return applyOperation(ctx.Context, createOperation(ctx)) - } else if exists { - return applyOperation(ctx.Context, deleteOperation(ctx)) - } - - return nil -} diff --git a/pkg/scheme/scheme.go b/pkg/scheme/scheme.go index 9aa5be139..6781d2b66 100644 --- a/pkg/scheme/scheme.go +++ b/pkg/scheme/scheme.go @@ -6,6 +6,7 @@ import ( agentstoragev1 "github.com/loft-sh/agentapi/v4/pkg/apis/loft/storage/v1" managementv1 "github.com/loft-sh/api/v4/pkg/apis/management/v1" "github.com/loft-sh/vcluster/pkg/apis" + apidiscoveryv2 "k8s.io/api/apidiscovery/v2" apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" @@ -25,6 +26,7 @@ func init() { _ = apiextensionsv1.AddToScheme(Scheme) _ = apiregistrationv1.AddToScheme(Scheme) _ = apidiscoveryv2beta1.AddToScheme(Scheme) + _ = apidiscoveryv2.AddToScheme(Scheme) _ = metricsv1beta1.AddToScheme(Scheme) // Register the fake conversions diff --git a/pkg/server/filters/metrics.go b/pkg/server/filters/metrics.go index dd121f323..6e0ab1af2 100644 --- a/pkg/server/filters/metrics.go +++ b/pkg/server/filters/metrics.go @@ -1,6 +1,7 @@ package filters import ( + "bytes" "compress/gzip" "context" "encoding/json" @@ -8,27 +9,31 @@ import ( "io" "net/http" "net/http/httptest" + "sort" "strconv" "strings" "github.com/loft-sh/vcluster/pkg/constants" - "github.com/loft-sh/vcluster/pkg/metrics" "github.com/loft-sh/vcluster/pkg/server/handler" "github.com/loft-sh/vcluster/pkg/util/clienthelper" requestpkg "github.com/loft-sh/vcluster/pkg/util/request" + dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" corev1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apiserver/pkg/audit" + "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + "k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/apiserver/pkg/endpoints/request" + apirest "k8s.io/apiserver/pkg/registry/rest" "k8s.io/client-go/rest" statsv1alpha1 "k8s.io/kubelet/pkg/apis/stats/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/client" -) - -const ( - KubectlCommandHeader = "Kubectl-Command" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) func WithMetricsProxy(h http.Handler, localConfig *rest.Config, cachedVirtualClient client.Client) http.Handler { @@ -84,33 +89,18 @@ func WithMetricsProxy(h http.Handler, localConfig *rest.Config, cachedVirtualCli }) } -func writeWithHeader(w http.ResponseWriter, code int, header http.Header, body []byte) { - // delete old header - for k := range w.Header() { - w.Header().Del(k) - } - for k, v := range header { - for _, s := range v { - w.Header().Add(k, s) - } - } - - w.WriteHeader(code) - _, _ = w.Write(body) -} - func rewritePrometheusMetrics(req *http.Request, data []byte, vClient client.Client) ([]byte, error) { - metricsFamilies, err := metrics.Decode(data) + metricsFamilies, err := MetricsDecode(data) if err != nil { return nil, err } - metricsFamilies, err = metrics.Rewrite(req.Context(), metricsFamilies, vClient) + metricsFamilies, err = MetricsRewrite(req.Context(), metricsFamilies, vClient) if err != nil { return nil, err } - return metrics.Encode(metricsFamilies, expfmt.Negotiate(req.Header)) + return MetricsEncode(metricsFamilies, expfmt.Negotiate(req.Header)) } func handleNodeRequest(localConfig *rest.Config, vClient client.Client, w http.ResponseWriter, req *http.Request) (bool, error) { @@ -121,11 +111,11 @@ func handleNodeRequest(localConfig *rest.Config, vClient client.Client, w http.R return false, err } - code, header, data, err := executeRequest(req, h) + code, header, data, err := ExecuteRequest(req, h) if err != nil { return false, err } else if code != http.StatusOK { - writeWithHeader(w, code, header, data) + WriteWithHeader(w, code, header, data) return false, nil } @@ -203,8 +193,201 @@ func rewriteStats(ctx context.Context, data []byte, vClient client.Client) ([]by return out, nil } +func isNodesProxy(r *request.RequestInfo) bool { + if !r.IsResourceRequest { + return false + } + + return r.APIGroup == corev1.SchemeGroupVersion.Group && + r.APIVersion == corev1.SchemeGroupVersion.Version && + r.Resource == "nodes" && + r.Subresource == "proxy" +} + +func IsKubeletStats(path string) bool { + return strings.HasSuffix(path, "/stats/summary") +} + +func IsKubeletMetrics(path string) bool { + return strings.HasSuffix(path, "/metrics") || strings.HasSuffix(path, "/metrics/cadvisor") || strings.HasSuffix(path, "/metrics/probes") || strings.HasSuffix(path, "/metrics/resource") || strings.HasSuffix(path, "/metrics/resource/v1alpha1") || strings.HasSuffix(path, "/metrics/resource/v1beta1") +} + +func MetricsDecode(data []byte) ([]*dto.MetricFamily, error) { + var parser expfmt.TextParser + metricFamilies, err := parser.TextToMetricFamilies(strings.NewReader(string(data))) + if err != nil { + return nil, fmt.Errorf("reading text format failed: %w", err) + } + + // sort metrics alphabetically + metricFamiliesArr := []*dto.MetricFamily{} + for k, fam := range metricFamilies { + name := k + if fam.Name == nil { + fam.Name = &name + } + + metricFamiliesArr = append(metricFamiliesArr, fam) + } + sort.Slice(metricFamiliesArr, func(i int, j int) bool { + return *metricFamiliesArr[i].Name < *metricFamiliesArr[j].Name + }) + + return metricFamiliesArr, nil +} + +func MetricsEncode(metricsFamilies []*dto.MetricFamily, format expfmt.Format) ([]byte, error) { + buffer := &bytes.Buffer{} + encoder := expfmt.NewEncoder(buffer, format) + for _, fam := range metricsFamilies { + if len(fam.Metric) > 0 { + err := encoder.Encode(fam) + if err != nil { + return nil, err + } + } + } + + return buffer.Bytes(), nil +} + +func MetricsRewrite(ctx context.Context, metricsFamilies []*dto.MetricFamily, vClient client.Client) ([]*dto.MetricFamily, error) { + resultMetricsFamily := []*dto.MetricFamily{} + + // rewrite metrics + for _, fam := range metricsFamilies { + newMetrics := []*dto.Metric{} + for _, m := range fam.Metric { + var ( + pod string + persistentvolumeclaim string + namespace string + ) + for _, l := range m.Label { + if l.GetName() == "pod" { + pod = l.GetValue() + } else if l.GetName() == "namespace" { + namespace = l.GetValue() + } else if l.GetName() == "persistentvolumeclaim" { + persistentvolumeclaim = l.GetValue() + } + } + + // Add metrics that are pod and namespace independent + if persistentvolumeclaim == "" && pod == "" { + newMetrics = append(newMetrics, m) + continue + } + + // rewrite pod + if pod != "" { + // search if we can find the pod by name in the virtual cluster + podList := &corev1.PodList{} + err := vClient.List(ctx, podList, client.MatchingFields{constants.IndexByPhysicalName: namespace + "/" + pod}) + if err != nil { + return nil, err + } + + // skip the metric if the pod couldn't be found in the virtual cluster + if len(podList.Items) == 0 { + continue + } + + pod = podList.Items[0].Name + namespace = podList.Items[0].Namespace + } + + // rewrite persistentvolumeclaim + if persistentvolumeclaim != "" { + // search if we can find the pvc by name in the virtual cluster + pvcList := &corev1.PersistentVolumeClaimList{} + err := vClient.List(ctx, pvcList, client.MatchingFields{constants.IndexByPhysicalName: namespace + "/" + persistentvolumeclaim}) + if err != nil { + return nil, err + } + + // skip the metric if the pvc couldn't be found in the virtual cluster + if len(pvcList.Items) == 0 { + continue + } + + persistentvolumeclaim = pvcList.Items[0].Name + namespace = pvcList.Items[0].Namespace + } + + // exchange label values + for _, l := range m.Label { + if l.GetName() == "pod" { + l.Value = &pod + } + if l.GetName() == "namespace" { + l.Value = &namespace + } + if l.GetName() == "persistentvolumeclaim" { + l.Value = &persistentvolumeclaim + } + } + + // add the rewritten metric + newMetrics = append(newMetrics, m) + } + + fam.Metric = newMetrics + if len(fam.Metric) > 0 { + resultMetricsFamily = append(resultMetricsFamily, fam) + } + } + + return resultMetricsFamily, nil +} + +func WriteObjectNegotiatedWithMediaType(w http.ResponseWriter, req *http.Request, object runtime.Object, scheme *runtime.Scheme, overrideMediaType string) { + s := serializer.NewCodecFactory(scheme) + gvk, err := apiutil.GVKForObject(object, scheme) + if err != nil { + responsewriters.ErrorNegotiated(err, s, corev1.SchemeGroupVersion, w, req) + return + } + + WriteObjectNegotiatedWithGVK(w, req, object, scheme, gvk.GroupVersion(), overrideMediaType) +} + +func WriteObjectNegotiated(w http.ResponseWriter, req *http.Request, object runtime.Object, scheme *runtime.Scheme) { + WriteObjectNegotiatedWithMediaType(w, req, object, scheme, "") +} -func executeRequest(req *http.Request, h http.Handler) (int, http.Header, []byte, error) { +func WriteObjectNegotiatedWithGVK(w http.ResponseWriter, req *http.Request, object runtime.Object, scheme *runtime.Scheme, groupVersion schema.GroupVersion, overrideMediaType string) { + s := serializer.NewCodecFactory(scheme) + statusCode := http.StatusOK + stream, ok := object.(apirest.ResourceStreamer) + if ok { + requestInfo, _ := request.RequestInfoFrom(req.Context()) + metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() { + responsewriters.StreamObject(statusCode, groupVersion, s, stream, w, req) + }) + return + } + + _, serializer, err := negotiation.NegotiateOutputMediaType(req, s, negotiation.DefaultEndpointRestrictions) + if err != nil { + status := responsewriters.ErrorToAPIStatus(err) + responsewriters.WriteRawJSON(int(status.Code), status, w) + return + } + + audit.LogResponseObject(req.Context(), object, groupVersion, s) + + encoder := s.EncoderForVersion(serializer.Serializer, groupVersion) + request.TrackSerializeResponseObjectLatency(req.Context(), func() { + if overrideMediaType != "" { + responsewriters.SerializeObject(overrideMediaType, encoder, w, req, statusCode, object) + } else { + responsewriters.SerializeObject(serializer.MediaType, encoder, w, req, statusCode, object) + } + }) +} + +func ExecuteRequest(req *http.Request, h http.Handler) (int, http.Header, []byte, error) { clonedRequest := req.Clone(req.Context()) fakeWriter := httptest.NewRecorder() h.ServeHTTP(fakeWriter, clonedRequest) @@ -231,21 +414,17 @@ func executeRequest(req *http.Request, h http.Handler) (int, http.Header, []byte return fakeWriter.Code, fakeWriter.Header(), responseBytes, nil } -func isNodesProxy(r *request.RequestInfo) bool { - if !r.IsResourceRequest { - return false +func WriteWithHeader(w http.ResponseWriter, code int, header http.Header, body []byte) { + // delete old header + for k := range w.Header() { + w.Header().Del(k) + } + for k, v := range header { + for _, s := range v { + w.Header().Add(k, s) + } } - return r.APIGroup == corev1.SchemeGroupVersion.Group && - r.APIVersion == corev1.SchemeGroupVersion.Version && - r.Resource == "nodes" && - r.Subresource == "proxy" -} - -func IsKubeletStats(path string) bool { - return strings.HasSuffix(path, "/stats/summary") -} - -func IsKubeletMetrics(path string) bool { - return strings.HasSuffix(path, "/metrics") || strings.HasSuffix(path, "/metrics/cadvisor") || strings.HasSuffix(path, "/metrics/probes") || strings.HasSuffix(path, "/metrics/resource") || strings.HasSuffix(path, "/metrics/resource/v1alpha1") || strings.HasSuffix(path, "/metrics/resource/v1beta1") + w.WriteHeader(code) + _, _ = w.Write(body) } diff --git a/pkg/server/filters/nodechanges.go b/pkg/server/filters/nodechanges.go index 2b0ac1b71..0ba64d171 100644 --- a/pkg/server/filters/nodechanges.go +++ b/pkg/server/filters/nodechanges.go @@ -88,12 +88,12 @@ func patchNode(ctx context.Context, w http.ResponseWriter, req *http.Request, s q := req.URL.Query() q.Add("dryRun", "All") req.URL.RawQuery = q.Encode() - code, header, data, err := executeRequest(req, h) + code, header, data, err := ExecuteRequest(req, h) if err != nil { responsewriters.ErrorNegotiated(err, s, corev1.SchemeGroupVersion, w, req) return } else if code != http.StatusOK { - writeWithHeader(w, code, header, data) + WriteWithHeader(w, code, header, data) return } diff --git a/pkg/server/server.go b/pkg/server/server.go index 1b124717e..8302c0f32 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -2,6 +2,7 @@ package server import ( "context" + "fmt" "io" "net" "net/http" @@ -9,6 +10,7 @@ import ( "strconv" "time" + "github.com/loft-sh/vcluster/pkg/apiservice" "github.com/loft-sh/vcluster/pkg/authentication/delegatingauthenticator" "github.com/loft-sh/vcluster/pkg/authorization/allowall" "github.com/loft-sh/vcluster/pkg/authorization/delegatingauthorizer" @@ -202,32 +204,29 @@ func NewServer(ctx *config.ControllerContext, requestHeaderCaFile, clientCaFile h := handler.ImpersonatingHandler("", virtualConfig) // pre hooks - for _, f := range ctx.PreHooks { - h = f(h, config.Clients{ - UncachedVirtualClient: uncachedVirtualClient, - CachedVirtualClient: cachedVirtualClient, - UncachedHostClient: uncachedLocalClient, - CachedHostClient: cachedLocalClient, - }) + clients := config.Clients{ + UncachedVirtualClient: uncachedVirtualClient, + CachedVirtualClient: cachedVirtualClient, + UncachedHostClient: uncachedLocalClient, + CachedHostClient: cachedLocalClient, + HostConfig: localConfig, + VirtualConfig: virtualConfig, + } + for _, f := range ctx.PreServerHooks { + h = f(h, clients) } h = filters.WithServiceCreateRedirect(h, uncachedLocalClient, uncachedVirtualClient, virtualConfig, ctx.Config.Experimental.SyncSettings.SyncLabels) h = filters.WithRedirect(h, localConfig, uncachedLocalClient.Scheme(), uncachedVirtualClient, admissionHandler, s.redirectResources) h = filters.WithMetricsProxy(h, localConfig, cachedVirtualClient) - // is metrics proxy enabled? - if ctx.Config.Observability.Metrics.Proxy.Nodes || ctx.Config.Observability.Metrics.Proxy.Pods { - h = filters.WithMetricsServerProxy( - h, - ctx.Config.WorkloadTargetNamespace, - cachedLocalClient, - cachedVirtualClient, - localConfig, - virtualConfig, - ctx.Config.Experimental.MultiNamespaceMode.Enabled, - ) + // inject apis + if ctx.StartAPIServiceProxy { + err = apiservice.StartAPIServiceProxy(ctx.Context, ctx.LocalManager.GetConfig(), ctx.Config.VirtualClusterKubeConfig().ServerCACert, ctx.Config.VirtualClusterKubeConfig().ServerCAKey) + if err != nil { + return nil, fmt.Errorf("start api service proxy: %w", err) + } } - if ctx.Config.Sync.FromHost.Nodes.Enabled && ctx.Config.Sync.FromHost.Nodes.SyncBackChanges { h = filters.WithNodeChanges(ctx.Context, h, uncachedLocalClient, uncachedVirtualClient, virtualConfig) } @@ -239,17 +238,11 @@ func NewServer(ctx *config.ControllerContext, requestHeaderCaFile, clientCaFile } // post hooks - for _, f := range ctx.PostHooks { - h = f(h, config.Clients{ - UncachedVirtualClient: uncachedVirtualClient, - CachedVirtualClient: cachedVirtualClient, - UncachedHostClient: uncachedLocalClient, - CachedHostClient: cachedLocalClient, - }) + for _, f := range ctx.PostServerHooks { + h = f(h, clients) } serverhelper.HandleRoute(s.handler, "/", h) - return s, nil } diff --git a/pkg/setup/config.go b/pkg/setup/config.go index e391efbf0..5c63b6b1c 100644 --- a/pkg/setup/config.go +++ b/pkg/setup/config.go @@ -6,15 +6,11 @@ import ( "os" vclusterconfig "github.com/loft-sh/vcluster/config" - "github.com/loft-sh/vcluster/config/legacyconfig" "github.com/loft-sh/vcluster/pkg/config" - "github.com/loft-sh/vcluster/pkg/helm" "github.com/loft-sh/vcluster/pkg/k3s" "github.com/loft-sh/vcluster/pkg/util/translate" "github.com/pkg/errors" - "gopkg.in/yaml.v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kblabels "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" @@ -87,12 +83,6 @@ func InitAndValidateConfig(ctx context.Context, vConfig *config.VirtualClusterCo // EnsureBackingStoreChanges ensures that only a certain set of allowed changes to the backing store and distro occur. func EnsureBackingStoreChanges(ctx context.Context, client kubernetes.Interface, name, namespace, distro string, backingStoreType vclusterconfig.StoreType) error { - if ok, err := CheckUsingHelm(ctx, client, name, namespace, distro, backingStoreType); err != nil { - return err - } else if ok { - return nil - } - if ok, err := CheckUsingSecretAnnotation(ctx, client, name, namespace, distro, backingStoreType); err != nil { return fmt.Errorf("using secret annotations: %w", err) } else if ok { @@ -116,109 +106,6 @@ func EnsureBackingStoreChanges(ctx context.Context, client kubernetes.Interface, return nil } -// CheckUsingHelm fetches the previous release revision and its computed values, and then reconstructs the distro and storage settings. -func CheckUsingHelm(ctx context.Context, client kubernetes.Interface, name, namespace, distro string, backingStoreType vclusterconfig.StoreType) (bool, error) { - ls := kblabels.Set{} - ls["name"] = name - releases, err := helm.NewSecrets(client).ListUnfiltered(ctx, ls.AsSelector(), namespace) - if err != nil || len(releases) == 0 { - return false, nil - } - - // (ThomasK33): if there is only one revision, we're dealing with an initial installation - // at which point we can just exit - if len(releases) == 1 { - return true, nil - } - - // We need to check if we can deserialize the existing values into multiple kind of config structs (legacy and current ones) - previousRelease := releases[len(releases)-2] - if previousRelease.Config == nil { - return false, nil - } - - // marshal previous release config - previousConfigRaw, err := yaml.Marshal(previousRelease.Config) - if err != nil { - return false, nil - } - - // Try parsing as 0.20 values - if success, err := func() (bool, error) { - previousConfig := vclusterconfig.Config{} - if err := previousConfig.UnmarshalYAMLStrict(previousConfigRaw); err != nil { - return false, nil - } - - if err := vclusterconfig.ValidateStoreAndDistroChanges( - backingStoreType, - previousConfig.BackingStoreType(), - distro, - previousConfig.Distro(), - ); err != nil { - return false, err - } - - return true, nil - }(); err != nil { - return false, err - } else if success { - return true, nil - } - - // Try parsing as < 0.20 values - var previousStoreType vclusterconfig.StoreType - previousDistro := "" - - switch previousRelease.Chart.Metadata.Name { - case "vcluster-k8s": - previousDistro = vclusterconfig.K8SDistro - case "vcluster-eks": - previousDistro = vclusterconfig.EKSDistro - case "vcluster-k0s": - previousDistro = vclusterconfig.K0SDistro - case "vcluster": - previousDistro = vclusterconfig.K3SDistro - default: - // unknown chart, we should exit here - return true, nil - } - - switch previousDistro { - // handles k8s and eks values - case vclusterconfig.K8SDistro, vclusterconfig.EKSDistro: - previousConfig := legacyconfig.LegacyK8s{} - if err := yaml.Unmarshal(previousConfigRaw, &previousConfig); err != nil { - return false, err - } - - if previousConfig.EmbeddedEtcd.Enabled { - previousStoreType = vclusterconfig.StoreTypeEmbeddedEtcd - } else { - previousStoreType = vclusterconfig.StoreTypeExternalEtcd - } - - // handles k0s and k3s values - default: - previousConfig := legacyconfig.LegacyK0sAndK3s{} - if err := yaml.Unmarshal(previousConfigRaw, &previousConfig); err != nil { - return false, err - } - - if previousConfig.EmbeddedEtcd.Enabled { - previousStoreType = vclusterconfig.StoreTypeEmbeddedEtcd - } else { - previousStoreType = vclusterconfig.StoreTypeEmbeddedDatabase - } - } - - if err := vclusterconfig.ValidateStoreAndDistroChanges(backingStoreType, previousStoreType, distro, previousDistro); err != nil { - return false, err - } - - return true, nil -} - // CheckUsingHeuristic checks for known file path indicating the existence of a previous distro. // // It checks for the existence of the default K3s token path or the K0s data directory. diff --git a/pkg/setup/controllers.go b/pkg/setup/controllers.go index c55302c75..4e9116288 100644 --- a/pkg/setup/controllers.go +++ b/pkg/setup/controllers.go @@ -11,7 +11,6 @@ import ( "github.com/loft-sh/vcluster/pkg/controllers/resources/services" synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context" "github.com/loft-sh/vcluster/pkg/coredns" - "github.com/loft-sh/vcluster/pkg/metricsapiservice" "github.com/loft-sh/vcluster/pkg/plugin" "github.com/loft-sh/vcluster/pkg/pro" "github.com/loft-sh/vcluster/pkg/specialservices" @@ -134,6 +133,14 @@ func StartControllers(controllerContext *config.ControllerContext) error { return fmt.Errorf("register pro controllers: %w", err) } + // run leader hooks + for _, hook := range controllerContext.AcquiredLeaderHooks { + err = hook(controllerContext) + if err != nil { + return fmt.Errorf("execute controller hook: %w", err) + } + } + // write the kube config to secret go func() { wait.Until(func() { @@ -233,19 +240,9 @@ func StartManagers(controllerContext *config.ControllerContext, syncers []syncer controllerContext.VirtualManager.GetCache().WaitForCacheSync(controllerContext.Context) klog.Infof("Successfully started local & virtual manager") - // register APIService - go RegisterOrDeregisterAPIService(controllerContext) - return nil } -func RegisterOrDeregisterAPIService(ctx *config.ControllerContext) { - err := metricsapiservice.RegisterOrDeregisterAPIService(ctx) - if err != nil { - klog.Errorf("Error registering metrics apiservice: %v", err) - } -} - func WriteKubeConfigToSecret(ctx context.Context, currentNamespace string, currentNamespaceClient client.Client, options *config.VirtualClusterConfig, syncerConfig *clientcmdapi.Config) error { syncerConfig, err := CreateVClusterKubeConfig(syncerConfig, options) if err != nil { diff --git a/pkg/specialservices/proxy_service_syncer.go b/pkg/specialservices/proxy_service_syncer.go deleted file mode 100644 index 5e26ccaec..000000000 --- a/pkg/specialservices/proxy_service_syncer.go +++ /dev/null @@ -1,99 +0,0 @@ -package specialservices - -import ( - "slices" - - synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context" - "github.com/loft-sh/vcluster/pkg/util/translate" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" - kerrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" -) - -var ( - VclusterProxyMetricsSvcKey = types.NamespacedName{ - Name: "metrics-server", - Namespace: "kube-system", - } -) - -const ( - PhysicalSvcSelectorKeyApp = "app" - PhysicalSvcSelectorKeyRelease = "release" - PhysicalMetricsServerServiceNameSuffix = "-metrics-proxy" -) - -func SyncVclusterProxyService(ctx *synccontext.SyncContext, - _, - svcName string, - vSvcToSync types.NamespacedName, - _ ServicePortTranslator, -) error { - pClient := ctx.PhysicalClient - // get physical service - pObj := &corev1.Service{} - err := pClient.Get(ctx.Context, types.NamespacedName{ - Namespace: translate.Default.PhysicalNamespace(vSvcToSync.Namespace), - Name: svcName + PhysicalMetricsServerServiceNameSuffix, - }, pObj) - if err != nil { - if kerrors.IsNotFound(err) { - return nil - } - - return err - } - - // check if pobject has the expected selectors, if not update - // and make it point to the syncer pod - expectedPhysicalSvcSelectors := map[string]string{ - PhysicalSvcSelectorKeyApp: "vcluster", - PhysicalSvcSelectorKeyRelease: svcName, - } - - if !equality.Semantic.DeepEqual(pObj.Spec.Selector, expectedPhysicalSvcSelectors) { - pObj.Spec.Selector = expectedPhysicalSvcSelectors - err = pClient.Update(ctx.Context, pObj) - if err != nil { - klog.Errorf("error updating physical metrics server service object %v", err) - return err - } - } - - vClient := ctx.VirtualClient - vObj := &corev1.Service{} - err = vClient.Get(ctx.Context, vSvcToSync, vObj) - if err != nil { - if kerrors.IsNotFound(err) { - return nil - } - - return err - } - - if vObj.Spec.ClusterIP != pObj.Spec.ClusterIP || !slices.Equal(vObj.Spec.ClusterIPs, pObj.Spec.ClusterIPs) { - newService := vObj.DeepCopy() - newService.Spec.ClusterIP = pObj.Spec.ClusterIP - newService.Spec.ClusterIPs = pObj.Spec.ClusterIPs - newService.Spec.IPFamilies = pObj.Spec.IPFamilies - newService.Spec.IPFamilyPolicy = pObj.Spec.IPFamilyPolicy - - // delete & create with correct ClusterIP - err = vClient.Delete(ctx.Context, vObj) - if err != nil { - return err - } - - newService.ResourceVersion = "" - - // create the new service with the correct cluster ip - err = vClient.Create(ctx.Context, newService) - if err != nil { - return err - } - } - - return nil -} diff --git a/pkg/util/clienthelper/helper.go b/pkg/util/clienthelper/helper.go index 6344f65f4..b0904cb30 100644 --- a/pkg/util/clienthelper/helper.go +++ b/pkg/util/clienthelper/helper.go @@ -6,6 +6,7 @@ import ( "os" "reflect" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/client-go/rest" @@ -74,8 +75,14 @@ func GetByIndex(ctx context.Context, c client.Client, obj runtime.Object, index, list, err := c.Scheme().New(gvk.GroupVersion().WithKind(gvk.Kind + "List")) if err != nil { - // TODO: handle runtime.IsNotRegisteredError(err) - return err + if !runtime.IsNotRegisteredError(err) { + return err + } + + unstructuredList := &unstructured.UnstructuredList{} + unstructuredList.SetKind(gvk.Kind + "List") + unstructuredList.SetAPIVersion(gvk.GroupVersion().String()) + list = unstructuredList } err = c.List(ctx, list.(client.ObjectList), client.MatchingFields{index: value}) diff --git a/pkg/util/translate/multi_namespace.go b/pkg/util/translate/multi_namespace.go index e36d19596..4501e15ac 100644 --- a/pkg/util/translate/multi_namespace.go +++ b/pkg/util/translate/multi_namespace.go @@ -36,6 +36,11 @@ func (s *multiNamespace) PhysicalName(name, _ string) string { return name } +// PhysicalNameShort returns the short physical name of the name / namespace resource +func (s *multiNamespace) PhysicalNameShort(name, _ string) string { + return name +} + func (s *multiNamespace) objectPhysicalName(obj runtime.Object) string { if obj == nil { return "" @@ -56,7 +61,7 @@ func (s *multiNamespace) PhysicalNameClusterScoped(name string) string { return SafeConcatName("vcluster", name, "x", s.currentNamespace, "x", VClusterName) } -func (s *multiNamespace) IsManaged(obj runtime.Object) bool { +func (s *multiNamespace) IsManaged(obj runtime.Object, _ PhysicalNameFunc) bool { metaAccessor, err := meta.Accessor(obj) if err != nil { return false diff --git a/pkg/util/translate/single_namespace.go b/pkg/util/translate/single_namespace.go index 17a8fc2d0..a99d29413 100644 --- a/pkg/util/translate/single_namespace.go +++ b/pkg/util/translate/single_namespace.go @@ -34,6 +34,16 @@ func (s *singleNamespace) PhysicalName(name, namespace string) string { return SingleNamespacePhysicalName(name, namespace, VClusterName) } +// PhysicalNameShort returns the short physical name of the name / namespace resource +func (s *singleNamespace) PhysicalNameShort(name, namespace string) string { + if name == "" { + return "" + } + + digest := sha256.Sum256([]byte(strings.Join([]string{name, "x", namespace, "x", VClusterName}, "-"))) + return hex.EncodeToString(digest[0:])[0:8] +} + func SingleNamespacePhysicalName(name, namespace, suffix string) string { if name == "" { return "" @@ -61,7 +71,7 @@ func (s *singleNamespace) PhysicalNameClusterScoped(name string) string { return SafeConcatName("vcluster", name, "x", s.targetNamespace, "x", VClusterName) } -func (s *singleNamespace) IsManaged(obj runtime.Object) bool { +func (s *singleNamespace) IsManaged(obj runtime.Object, physicalName PhysicalNameFunc) bool { metaAccessor, err := meta.Accessor(obj) if err != nil { return false @@ -74,7 +84,9 @@ func (s *singleNamespace) IsManaged(obj runtime.Object) bool { // vcluster has not synced the object IF: // If object-name annotation is not set OR // If object-name annotation is different from actual name - if metaAccessor.GetAnnotations() == nil || metaAccessor.GetAnnotations()[NameAnnotation] == "" || metaAccessor.GetName() != s.PhysicalName(metaAccessor.GetAnnotations()[NameAnnotation], metaAccessor.GetAnnotations()[NamespaceAnnotation]) { + if metaAccessor.GetAnnotations() == nil || + metaAccessor.GetAnnotations()[NameAnnotation] == "" || + metaAccessor.GetName() != physicalName(metaAccessor.GetAnnotations()[NameAnnotation], metaAccessor.GetAnnotations()[NamespaceAnnotation]) { return false } diff --git a/pkg/util/translate/translate.go b/pkg/util/translate/translate.go index 76859abb0..122767700 100644 --- a/pkg/util/translate/translate.go +++ b/pkg/util/translate/translate.go @@ -300,7 +300,6 @@ func EnsureCRDFromPhysicalCluster(ctx context.Context, pConfig *rest.Config, vCo crdDefinition.Spec.Versions = newVersions // apply the crd - log.NewWithoutName().Infof("Create crd %s in virtual cluster", groupVersionKind.String()) _, err = vClient.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, crdDefinition, metav1.CreateOptions{}) if err != nil { diff --git a/pkg/util/translate/types.go b/pkg/util/translate/types.go index fd428e814..3cbaa6fc6 100644 --- a/pkg/util/translate/types.go +++ b/pkg/util/translate/types.go @@ -15,12 +15,15 @@ var ( var Default Translator = &singleNamespace{} +// PhysicalNameFunc is a definition to translate a name +type PhysicalNameFunc func(vName, vNamespace string) string + type Translator interface { // SingleNamespaceTarget signals if we sync all objects into a single namespace SingleNamespaceTarget() bool // IsManaged checks if the object is managed by vcluster - IsManaged(obj runtime.Object) bool + IsManaged(obj runtime.Object, physicalName PhysicalNameFunc) bool // IsManagedCluster checks if the cluster scoped object is managed by vcluster IsManagedCluster(obj runtime.Object) bool @@ -35,6 +38,9 @@ type Translator interface { // PhysicalName returns the physical name for a virtual cluster object PhysicalName(vName, vNamespace string) string + // PhysicalNameShort returns the short physical name for a virtual cluster object + PhysicalNameShort(vName, vNamespace string) string + // PhysicalNamespace returns the physical namespace for a virtual cluster object PhysicalNamespace(vNamespace string) string diff --git a/pkg/util/unstructuredhelper/read.go b/pkg/util/unstructuredhelper/read.go new file mode 100644 index 000000000..4981263b4 --- /dev/null +++ b/pkg/util/unstructuredhelper/read.go @@ -0,0 +1,155 @@ +package unstructuredhelper + +import ( + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func ReadFrom(obj client.Object) ReadMap { + return obj.(*unstructured.Unstructured).Object +} + +type ReadArray []interface{} + +func (a ReadArray) Exists() bool { + return a != nil +} + +func (a ReadArray) String(index int) string { + if index >= len(a) { + return "" + } + + retObj, ok := a[index].(string) + if !ok { + return "" + } + + return retObj +} + +func (a ReadArray) Bool(index int) bool { + if index >= len(a) { + return false + } + + retObj, ok := a[index].(bool) + if !ok { + return false + } + + return retObj +} + +func (a ReadArray) Map(index int) ReadMap { + if index >= len(a) { + return nil + } + + retObj, ok := a[index].(map[string]interface{}) + if !ok { + return nil + } + + return retObj +} + +func (a ReadArray) Array(index int) ReadArray { + if index >= len(a) { + return nil + } + + retObj, ok := a[index].([]interface{}) + if !ok { + return nil + } + + return retObj +} + +type ReadMap map[string]interface{} + +func (m ReadMap) Exists() bool { + return m != nil +} + +func (m ReadMap) Has(key string) bool { + if m == nil { + return false + } + + _, ok := m[key] + return ok +} + +func (m ReadMap) Bool(key string) bool { + if m == nil { + return false + } + + obj, ok := m[key] + if !ok { + return false + } + + retObj, ok := obj.(bool) + if !ok { + return false + } + + return retObj +} + +func (m ReadMap) Array(key string) ReadArray { + if m == nil { + return nil + } + + obj, ok := m[key] + if !ok { + return ReadArray{} + } + + retObj, ok := obj.([]interface{}) + if !ok { + return nil + } + + return retObj +} + +func (m ReadMap) Map(key string) ReadMap { + if m == nil { + return nil + } + + obj, ok := m[key] + if !ok { + return ReadMap{} + } + + retObj, ok := obj.(map[string]interface{}) + if !ok { + return nil + } + + return retObj +} + +func (m ReadMap) String(key string) string { + if m == nil { + return "" + } + + str, ok := m[key] + if !ok { + return "" + } + + strVal, ok := str.(string) + if !ok { + return "" + } + + return strVal +} diff --git a/pkg/util/unstructuredhelper/write.go b/pkg/util/unstructuredhelper/write.go new file mode 100644 index 000000000..211130a28 --- /dev/null +++ b/pkg/util/unstructuredhelper/write.go @@ -0,0 +1,314 @@ +package unstructuredhelper + +import ( + "strings" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/yaml" +) + +func ValueFrom[T any](path string, from ReadMap) (T, bool) { + pathSplitted := strings.Split(path, ".") + + // traverse to target + targetMap := from + for i := 0; i < len(pathSplitted)-1; i++ { + targetMap = targetMap.Map(pathSplitted[i]) + } + + // check if map has latest path + lastKey := pathSplitted[len(pathSplitted)-1] + if !targetMap.Has(lastKey) { + var ret T + return ret, false + } + + // try to convert target value + targetValue, ok := targetMap[lastKey].(T) + if !ok { + var ret T + return ret, false + } + + return targetValue, true +} + +type TranslateFn[T any] func(in T) T + +func Set[T any](path string, from ReadMap, to WriteMap, translate TranslateFn[T]) { + pathSplitted := strings.Split(path, ".") + + // traverse to target + targetMap := from + toMap := to + for i := 0; i < len(pathSplitted)-1; i++ { + targetMap = targetMap.Map(pathSplitted[i]) + toMap = toMap.Map(pathSplitted[i]) + } + + // check if map has latest path + lastKey := pathSplitted[len(pathSplitted)-1] + + // try to convert target value + var targetValue T + if targetMap.Has(lastKey) { + var ok bool + targetValue, ok = targetMap[lastKey].(T) + if !ok { + return + } + } + + toMap[lastKey] = translate(targetValue) +} + +func TranslateArray[T any](path string, from ReadMap, to WriteMap, translate TranslateFn[T]) { + pathSplitted := strings.Split(path, ".") + + // traverse to target + targetMap := from + for i := 0; i < len(pathSplitted)-1; i++ { + targetMap = targetMap.Map(pathSplitted[i]) + } + + // check if map has latest path + lastKey := pathSplitted[len(pathSplitted)-1] + if !targetMap.Has(lastKey) { + return + } + + // to map + toMap := to + for i := 0; i < len(pathSplitted)-1; i++ { + toMap = toMap.Map(pathSplitted[i]) + } + + // try to get source value + sourceArray, ok := toMap[lastKey].([]T) + if !ok { + return + } + + for i := range sourceArray { + sourceArray[i] = translate(sourceArray[i]) + } +} + +func Translate[T any](path string, from ReadMap, to WriteMap, translate TranslateFn[T]) { + pathSplitted := strings.Split(path, ".") + + // traverse to target + targetMap := from + for i := 0; i < len(pathSplitted)-1; i++ { + targetMap = targetMap.Map(pathSplitted[i]) + } + + // check if map has latest path + lastKey := pathSplitted[len(pathSplitted)-1] + if !targetMap.Has(lastKey) { + return + } + + // to map + toMap := to + for i := 0; i < len(pathSplitted)-1; i++ { + toMap = toMap.Map(pathSplitted[i]) + } + + // try to convert target value + targetValue, ok := targetMap[lastKey].(T) + if !ok { + return + } + + toMap[lastKey] = translate(targetValue) +} + +func WriteInto(obj client.Object) WriteMap { + return obj.(*unstructured.Unstructured).Object +} + +type WriteArray []interface{} + +func (a WriteArray) Exists() bool { + return a != nil +} + +func (a WriteArray) String(index int) string { + if index >= len(a) { + return "" + } + + retObj, ok := a[index].(string) + if !ok { + return "" + } + + return retObj +} + +func (a WriteArray) Bool(index int) bool { + if index >= len(a) { + return false + } + + retObj, ok := a[index].(bool) + if !ok { + return false + } + + return retObj +} + +func (a WriteArray) Map(index int) WriteMap { + if index >= len(a) { + return nil + } + + retObj, ok := a[index].(map[string]interface{}) + if !ok { + return nil + } + + return retObj +} + +func (a WriteArray) Array(index int) WriteArray { + if index >= len(a) { + return nil + } + + retObj, ok := a[index].([]interface{}) + if !ok { + return nil + } + + return retObj +} + +type WriteMap map[string]interface{} + +func (m WriteMap) Exists() bool { + return m != nil +} + +func (m WriteMap) Has(key string) bool { + if m == nil { + return false + } + + _, ok := m[key] + return ok +} + +func (m WriteMap) Bool(key string) bool { + if m == nil { + return false + } + + obj, ok := m[key] + if !ok { + return false + } + + retObj, ok := obj.(bool) + if !ok { + return false + } + + return retObj +} + +func (m WriteMap) Array(key string) WriteArray { + if m == nil { + return nil + } + + obj, ok := m[key] + if !ok { + m[key] = []interface{}{} + return m[key].([]interface{}) + } + + retObj, ok := obj.([]interface{}) + if !ok { + return nil + } + + return retObj +} + +func (m WriteMap) Set(key string, val interface{}) { + readMap, ok := val.(ReadMap) + if ok { + m[key] = map[string]interface{}(readMap) + return + } + + writeMap, ok := val.(WriteMap) + if ok { + m[key] = map[string]interface{}(writeMap) + return + } + + readArray, ok := val.(ReadArray) + if ok { + m[key] = []interface{}(readArray) + return + } + + writeArray, ok := val.(WriteArray) + if ok { + m[key] = []interface{}(writeArray) + return + } + + m[key] = val +} + +func (m WriteMap) M(key string) WriteMap { + return m.Map(key) +} + +func (m WriteMap) Map(key string) WriteMap { + if m == nil { + return nil + } + + obj, ok := m[key] + if !ok { + m[key] = map[string]interface{}{} + return m[key].(map[string]interface{}) + } + + retObj, ok := obj.(map[string]interface{}) + if !ok { + return nil + } + + return retObj +} + +func (m WriteMap) String(key string) string { + if m == nil { + return "" + } + + str, ok := m[key] + if !ok { + return "" + } + + strVal, ok := str.(string) + if !ok { + return "" + } + + return strVal +} + +func (m WriteMap) ToString() string { + out, _ := yaml.Marshal(m) + return string(out) +} diff --git a/test/e2e_metrics_proxy/e2e_metrics_proxy_test.go b/test/e2e_metrics_proxy/e2e_metrics_proxy_test.go index 8011434d7..99cfdcb55 100644 --- a/test/e2e_metrics_proxy/e2e_metrics_proxy_test.go +++ b/test/e2e_metrics_proxy/e2e_metrics_proxy_test.go @@ -54,5 +54,5 @@ func TestRunE2ETargetNamespaceTests(t *testing.T) { } }) - ginkgo.RunSpecs(t, "Vcluster e2eProxyMetricsServer suite") + ginkgo.RunSpecs(t, "vcluster e2eProxyMetricsServer suite") } diff --git a/test/e2e_metrics_proxy/metricsproxy/metrics_proxy.go b/test/e2e_metrics_proxy/metricsproxy/metrics_proxy.go index 0dff7bda4..88478af9d 100644 --- a/test/e2e_metrics_proxy/metricsproxy/metrics_proxy.go +++ b/test/e2e_metrics_proxy/metricsproxy/metrics_proxy.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - "github.com/loft-sh/vcluster/pkg/metricsapiservice" "github.com/loft-sh/vcluster/test/framework" "github.com/onsi/ginkgo/v2" @@ -21,9 +20,9 @@ var _ = ginkgo.Describe("Target Namespace", func() { f := framework.DefaultFramework ginkgo.It("Make sure the metrics api service is registered and available", func() { - err := wait.PollUntilContextTimeout(f.Context, time.Second, time.Minute*1, false, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextTimeout(f.Context, time.Second, time.Minute*2, false, func(ctx context.Context) (bool, error) { apiRegistrationClient := apiregistrationv1clientset.NewForConfigOrDie(f.VclusterConfig) - apiService, err := apiRegistrationClient.APIServices().Get(ctx, metricsapiservice.MetricsAPIServiceName, metav1.GetOptions{}) + apiService, err := apiRegistrationClient.APIServices().Get(ctx, "v1beta1.metrics.k8s.io", metav1.GetOptions{}) if err != nil { return false, nil } @@ -38,7 +37,7 @@ var _ = ginkgo.Describe("Target Namespace", func() { }) ginkgo.It("Make sure get nodeMetrics and podMetrics succeed", func() { - err := wait.PollUntilContextTimeout(f.Context, time.Second, time.Minute*1, false, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextTimeout(f.Context, time.Second, time.Minute*2, false, func(ctx context.Context) (bool, error) { metricsClient := metricsv1beta1client.NewForConfigOrDie(f.VclusterConfig) nodeMetricsList, err := metricsClient.NodeMetricses().List(ctx, metav1.ListOptions{}) diff --git a/test/e2e_metrics_proxy/values.yaml b/test/e2e_metrics_proxy/values.yaml index 811f44fa5..d84bbaae6 100644 --- a/test/e2e_metrics_proxy/values.yaml +++ b/test/e2e_metrics_proxy/values.yaml @@ -1,5 +1,3 @@ -observability: - metrics: - proxy: - nodes: true - pods: true +integrations: + metricsServer: + enabled: true