From 3ca9ede9a0e9671a8f0048944dfd34fca21546f5 Mon Sep 17 00:00:00 2001 From: Njal Karevoll Date: Tue, 4 Jun 2019 19:45:34 +0200 Subject: [PATCH 1/2] Remove -es-discovery service. It is no longer required for discovery, which is now file based. --- .../elasticsearch/driver/default.go | 2 +- .../elasticsearch/driver/generic_resources.go | 13 +----- .../pkg/controller/elasticsearch/name/name.go | 5 --- .../pkg/controller/elasticsearch/pod/pod.go | 2 - .../elasticsearch/services/services.go | 40 ------------------- .../elasticsearch/version/common.go | 10 ++--- operators/test/e2e/failure_test.go | 9 ----- operators/test/e2e/stack/checks_k8s.go | 2 - 8 files changed, 7 insertions(+), 76 deletions(-) diff --git a/operators/pkg/controller/elasticsearch/driver/default.go b/operators/pkg/controller/elasticsearch/driver/default.go index 7063bef5bc..fa8a0c3e6b 100644 --- a/operators/pkg/controller/elasticsearch/driver/default.go +++ b/operators/pkg/controller/elasticsearch/driver/default.go @@ -142,7 +142,7 @@ func (d *defaultDriver) Reconcile( d.Scheme, d.CSRClient, es, - []corev1.Service{genericResources.DiscoveryService, genericResources.ExternalService}, + []corev1.Service{genericResources.ExternalService}, d.Parameters.CACertValidity, d.Parameters.CACertRotateBefore, d.Parameters.CertValidity, diff --git a/operators/pkg/controller/elasticsearch/driver/generic_resources.go b/operators/pkg/controller/elasticsearch/driver/generic_resources.go index 12346bcbb4..4959ab8f9a 100644 --- a/operators/pkg/controller/elasticsearch/driver/generic_resources.go +++ b/operators/pkg/controller/elasticsearch/driver/generic_resources.go @@ -18,8 +18,6 @@ import ( type GenericResources struct { // ExternalService is the user-facing service ExternalService corev1.Service - // DiscoveryService is the service used by ES for discovery purposes - DiscoveryService corev1.Service } // reconcileGenericResources reconciles the expected generic resources of a cluster. @@ -32,20 +30,13 @@ func reconcileGenericResources( // TODO: consider removing the "res" bits of the ReconcileService signature? results := &reconciler.Results{} - discoveryService := services.NewDiscoveryService(es) - _, err := common.ReconcileService(c, scheme, discoveryService, &es) - if err != nil { - return nil, results.WithError(err) - } - externalService := services.NewExternalService(es) - _, err = common.ReconcileService(c, scheme, externalService, &es) + _, err := common.ReconcileService(c, scheme, externalService, &es) if err != nil { return nil, results.WithError(err) } return &GenericResources{ - DiscoveryService: *discoveryService, - ExternalService: *externalService, + ExternalService: *externalService, }, results } diff --git a/operators/pkg/controller/elasticsearch/name/name.go b/operators/pkg/controller/elasticsearch/name/name.go index 081e07855c..bf23fd68a7 100644 --- a/operators/pkg/controller/elasticsearch/name/name.go +++ b/operators/pkg/controller/elasticsearch/name/name.go @@ -28,7 +28,6 @@ const ( secureSettingsSecretSuffix = "secure-settings" certsSecretSuffix = "certs" httpServiceSuffix = "http" - discoveryServiceSuffix = "discovery" elasticUserSecretSuffix = "elastic-user" esRolesUsersSecretSuffix = "roles-users" clusterSecretsSecretSuffix = "secrets" @@ -102,10 +101,6 @@ func HTTPService(esName string) string { return ESNamer.Suffix(esName, httpServiceSuffix) } -func DiscoveryService(esName string) string { - return ESNamer.Suffix(esName, discoveryServiceSuffix) -} - func ElasticUserSecret(esName string) string { return ESNamer.Suffix(esName, elasticUserSecretSuffix) } diff --git a/operators/pkg/controller/elasticsearch/pod/pod.go b/operators/pkg/controller/elasticsearch/pod/pod.go index 57b8258673..7b7b2fb475 100644 --- a/operators/pkg/controller/elasticsearch/pod/pod.go +++ b/operators/pkg/controller/elasticsearch/pod/pod.go @@ -58,8 +58,6 @@ type NewPodSpecParams struct { CustomImageName string // ClusterName is the name of the Elasticsearch cluster ClusterName string - // DiscoveryServiceName is the name of the Service that should be used for discovery. - DiscoveryServiceName string // DiscoveryZenMinimumMasterNodes is the setting for minimum master node in Zen Discovery DiscoveryZenMinimumMasterNodes int diff --git a/operators/pkg/controller/elasticsearch/services/services.go b/operators/pkg/controller/elasticsearch/services/services.go index 5df2082e5a..16ec7c930e 100644 --- a/operators/pkg/controller/elasticsearch/services/services.go +++ b/operators/pkg/controller/elasticsearch/services/services.go @@ -23,41 +23,6 @@ const ( globalServiceSuffix = ".svc.cluster.local" ) -// DiscoveryServiceName returns the name for the discovery service -// associated to this cluster -func DiscoveryServiceName(esName string) string { - return name.DiscoveryService(esName) -} - -// NewDiscoveryService returns the discovery service associated to the given cluster -// It is used by nodes to talk to each other. -func NewDiscoveryService(es v1alpha1.Elasticsearch) *corev1.Service { - nsn := k8s.ExtractNamespacedName(&es) - return &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: es.Namespace, - Name: DiscoveryServiceName(es.Name), - Labels: label.NewLabels(nsn), - }, - Spec: corev1.ServiceSpec{ - Selector: label.NewLabels(nsn), - Ports: []corev1.ServicePort{ - corev1.ServicePort{ - Protocol: corev1.ProtocolTCP, - Port: network.TransportPort, - }, - }, - // We set ClusterIP to None in order to let the ES nodes discover all other node IPs at once. - ClusterIP: "None", - SessionAffinity: corev1.ServiceAffinityNone, - Type: corev1.ServiceTypeClusterIP, - // Nodes need to discover themselves before the pod is considered ready, - // otherwise minimum master nodes would never be reached - PublishNotReadyAddresses: true, - }, - } -} - // ExternalServiceName returns the name for the external service // associated to this cluster func ExternalServiceName(esName string) string { @@ -69,11 +34,6 @@ func ExternalServiceURL(es v1alpha1.Elasticsearch) string { return stringsutil.Concat("https://", ExternalServiceName(es.Name), ".", es.Namespace, globalServiceSuffix, ":", strconv.Itoa(network.HTTPPort)) } -// ExternalDiscoveryServiceHostname returns the hostname used to reach Elasticsearch's discovery endpoint. -func ExternalDiscoveryServiceHostname(es types.NamespacedName) string { - return stringsutil.Concat(DiscoveryServiceName(es.Name), ".", es.Namespace, globalServiceSuffix, ":", strconv.Itoa(network.TransportPort)) -} - // NewExternalService returns the external service associated to the given cluster // It is used by users to perform requests against one of the cluster nodes. func NewExternalService(es v1alpha1.Elasticsearch) *corev1.Service { diff --git a/operators/pkg/controller/elasticsearch/version/common.go b/operators/pkg/controller/elasticsearch/version/common.go index a38c420081..8b988f4026 100644 --- a/operators/pkg/controller/elasticsearch/version/common.go +++ b/operators/pkg/controller/elasticsearch/version/common.go @@ -15,7 +15,6 @@ import ( "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/name" "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/pod" "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/processmanager" - "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/services" "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/settings" "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/user" "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/volume" @@ -45,11 +44,10 @@ func NewExpectedPodSpecs( for i := int32(0); i < node.NodeCount; i++ { params := pod.NewPodSpecParams{ // cluster-wide params - Version: es.Spec.Version, - CustomImageName: es.Spec.Image, - ClusterName: es.Name, - DiscoveryServiceName: services.DiscoveryServiceName(es.Name), - SetVMMaxMapCount: es.Spec.SetVMMaxMapCount, + Version: es.Spec.Version, + CustomImageName: es.Spec.Image, + ClusterName: es.Name, + SetVMMaxMapCount: es.Spec.SetVMMaxMapCount, // volumes UsersSecretVolume: paramsTmpl.UsersSecretVolume, ConfigMapVolume: paramsTmpl.ConfigMapVolume, diff --git a/operators/test/e2e/failure_test.go b/operators/test/e2e/failure_test.go index 8c56a94af6..ae141d3e93 100644 --- a/operators/test/e2e/failure_test.go +++ b/operators/test/e2e/failure_test.go @@ -151,15 +151,6 @@ func TestDeleteServices(t *testing.T) { WithESMasterDataNodes(1, stack.DefaultResources) RunFailureTest(t, s, func(k *helpers.K8sHelper) helpers.TestStepList { return helpers.TestStepList{ - { - Name: "Delete discovery service", - Test: func(t *testing.T) { - s, err := k.GetService(s.Elasticsearch.Name + "-es-discovery") - require.NoError(t, err) - err = k.Client.Delete(s) - require.NoError(t, err) - }, - }, { Name: "Delete external service", Test: func(t *testing.T) { diff --git a/operators/test/e2e/stack/checks_k8s.go b/operators/test/e2e/stack/checks_k8s.go index 4024b2a8d7..54d61e155d 100644 --- a/operators/test/e2e/stack/checks_k8s.go +++ b/operators/test/e2e/stack/checks_k8s.go @@ -282,7 +282,6 @@ func CheckServices(stack Builder, k *helpers.K8sHelper) helpers.TestStep { Name: "Services should be created", Test: helpers.Eventually(func() error { for _, s := range []string{ - stack.Elasticsearch.Name + "-es-discovery", esname.HTTPService(stack.Elasticsearch.Name), kbname.HTTPService(stack.Kibana.Name), } { @@ -301,7 +300,6 @@ func CheckServicesEndpoints(stack Builder, k *helpers.K8sHelper) helpers.TestSte Name: "Services should have endpoints", Test: helpers.Eventually(func() error { for endpointName, addrCount := range map[string]int{ - stack.Elasticsearch.Name + "-es-discovery": int(stack.Elasticsearch.Spec.NodeCount()), kbname.HTTPService(stack.Kibana.Name): int(stack.Kibana.Spec.NodeCount), esname.HTTPService(stack.Elasticsearch.Name): int(stack.Elasticsearch.Spec.NodeCount()), } { From 33123a9d03a6c5e33036cfd8fb5e4c1d88767c1c Mon Sep 17 00:00:00 2001 From: Njal Karevoll Date: Tue, 4 Jun 2019 19:54:13 +0200 Subject: [PATCH 2/2] Add -es-remote-cluster-seed Service automatically in remote cluster controller. --- .../remotecluster/driver_incluster.go | 78 +++++++++++- .../remotecluster/driver_incluster_test.go | 9 +- .../controller/remotecluster/finalizers.go | 27 +++++ .../remotecluster/finalizers_test.go | 114 ++++++++++++++++++ .../pkg/controller/remotecluster/labels.go | 4 + .../pkg/controller/remotecluster/watches.go | 73 ++++++++++- .../controller/remotecluster/watches_test.go | 100 +++++++++++++++ 7 files changed, 394 insertions(+), 11 deletions(-) create mode 100644 operators/pkg/controller/remotecluster/finalizers_test.go create mode 100644 operators/pkg/controller/remotecluster/watches_test.go diff --git a/operators/pkg/controller/remotecluster/driver_incluster.go b/operators/pkg/controller/remotecluster/driver_incluster.go index bf0d200f60..6f2b19625f 100644 --- a/operators/pkg/controller/remotecluster/driver_incluster.go +++ b/operators/pkg/controller/remotecluster/driver_incluster.go @@ -9,22 +9,29 @@ import ( commonv1alpha1 "github.com/elastic/cloud-on-k8s/operators/pkg/apis/common/v1alpha1" "github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1" + "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common" "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/finalizer" "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/label" - "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/services" + esname "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/name" + "github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" ) const ( LocalTrustRelationshipPrefix = "rc" RemoteTrustRelationshipPrefix = "rcr" + + // remoteClusterSeedServiceSuffix is the suffix used for the remote cluster seed service + remoteClusterSeedServiceSuffix = "remote-cluster-seed" ) func doReconcile( r *ReconcileRemoteCluster, remoteCluster v1alpha1.RemoteCluster, ) (v1alpha1.RemoteClusterStatus, error) { - // Get the previous remote associated cluster, if the remote namespace has been updated by the user we must // delete the remote relationship from the old namespace and recreate it in the new namespace. if len(remoteCluster.Status.K8SLocalStatus.RemoteSelector.Namespace) > 0 && @@ -71,8 +78,12 @@ func doReconcile( remoteCluster, localClusterSelector, remoteCluster.Spec.Remote.K8sLocalRef, - r.watches) - err := h.Handle(&remoteCluster, watchFinalizer) + r.watches, + ) + + seedServiceFinalizer := seedServiceFinalizer(r.Client, remoteCluster) + + err := h.Handle(&remoteCluster, watchFinalizer, seedServiceFinalizer) if err != nil { return updateStatusWithPhase(&remoteCluster, v1alpha1.RemoteClusterFailed), err } @@ -140,12 +151,18 @@ func doReconcile( return updateStatusWithPhase(&remoteCluster, v1alpha1.RemoteClusterFailed), err } + // Create remote service for seeding + svc, err := reconcileRemoteClusterSeedService(r.Client, r.scheme, remoteCluster) + if err != nil { + return updateStatusWithPhase(&remoteCluster, v1alpha1.RemoteClusterFailed), err + } + // Build status status := v1alpha1.RemoteClusterStatus{ Phase: v1alpha1.RemoteClusterPropagated, ClusterName: localClusterSelector.Name, LocalTrustRelationship: localRelationshipName, - SeedHosts: []string{services.ExternalDiscoveryServiceHostname(remote.Selector.NamespacedName())}, + SeedHosts: seedHostsFromService(svc), K8SLocalStatus: v1alpha1.LocalRefStatus{ RemoteSelector: remote.Selector, RemoteTrustRelationship: remoteRelationshipName, @@ -154,6 +171,47 @@ func doReconcile( return status, nil } +// reconcileRemoteClusterSeedService reconciles a Service that we can use as the remote cluster seed hosts. +// +// This service is shared between all remote clusters configured this way, and is deleted whenever any of them are +// deleted in a finalizer. There's a watch that re-creates it if it's still in use. +func reconcileRemoteClusterSeedService( + c k8s.Client, + scheme *runtime.Scheme, + remoteCluster v1alpha1.RemoteCluster, +) (*v1.Service, error) { + ns := remoteCluster.Spec.Remote.K8sLocalRef.Namespace + // if the remote has no namespace, assume it's in the same namespace as the RemoteCluster resource + if ns == "" { + ns = remoteCluster.Namespace + } + service := v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: remoteClusterSeedServiceName(remoteCluster.Spec.Remote.K8sLocalRef.Name), + Labels: map[string]string{ + RemoteClusterSeedServiceForLabelName: remoteCluster.Spec.Remote.K8sLocalRef.Name, + }, + }, + Spec: v1.ServiceSpec{ + PublishNotReadyAddresses: true, + Ports: []v1.ServicePort{ + {Protocol: v1.ProtocolTCP, Port: 9300, TargetPort: intstr.FromInt(9300)}, + }, + Selector: map[string]string{ + common.TypeLabelName: label.Type, + label.ClusterNameLabelName: remoteCluster.Spec.Remote.K8sLocalRef.Name, + }, + }, + } + + if _, err := common.ReconcileService(c, scheme, &service, nil); err != nil { + return nil, err + } + + return &service, nil +} + func caCertMissingError(location string, selector commonv1alpha1.ObjectSelector) string { return fmt.Sprintf( CaCertMissingError, @@ -171,3 +229,13 @@ func updateStatusWithPhase( status.Phase = phase return *status } + +// remoteClusterSeedServiceName returns the name of the remote cluster seed service. +func remoteClusterSeedServiceName(esName string) string { + return esname.ESNamer.Suffix(esName, remoteClusterSeedServiceSuffix) +} + +// seedHostsFromService returns the seed hosts to use for a given service. +func seedHostsFromService(svc *v1.Service) []string { + return []string{fmt.Sprintf("%s.%s.svc:9300", svc.Name, svc.Namespace)} +} diff --git a/operators/pkg/controller/remotecluster/driver_incluster_test.go b/operators/pkg/controller/remotecluster/driver_incluster_test.go index 404e51f4ca..9db6c5489b 100644 --- a/operators/pkg/controller/remotecluster/driver_incluster_test.go +++ b/operators/pkg/controller/remotecluster/driver_incluster_test.go @@ -153,13 +153,18 @@ func Test_apply(t *testing.T) { }, RemoteTrustRelationship: "rcr-remotecluster-sample-1-2-default", }, - SeedHosts: []string{"trust-two-es-es-discovery.default.svc.cluster.local:9300"}, + SeedHosts: []string{"trust-two-es-es-remote-cluster-seed.default.svc:9300"}, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.NoError(t, tt.args.rca.watches.InjectScheme(sc)) + if tt.args.rca.scheme == nil { + tt.args.rca.scheme = sc + } + + assert.NoError(t, tt.args.rca.watches.InjectScheme(tt.args.rca.scheme)) + got, err := doReconcile(tt.args.rca, tt.args.remoteCluster) if (err != nil) != tt.wantErr { t.Errorf("apply() error = %v, wantErr %v", err, tt.wantErr) diff --git a/operators/pkg/controller/remotecluster/finalizers.go b/operators/pkg/controller/remotecluster/finalizers.go index 7ceebc00a3..cb716b4d12 100644 --- a/operators/pkg/controller/remotecluster/finalizers.go +++ b/operators/pkg/controller/remotecluster/finalizers.go @@ -9,6 +9,10 @@ import ( "github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1" "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/finalizer" "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/watches" + "github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // watchFinalizer ensure that we remove watches for Secrets we are no longer interested in @@ -26,3 +30,26 @@ func watchFinalizer( }, } } + +// seedServiceFinalizer ensures that we remove the seed service if it's no longer required +func seedServiceFinalizer(c k8s.Client, remoteCluster v1alpha1.RemoteCluster) finalizer.Finalizer { + return finalizer.Finalizer{ + Name: RemoteClusterSeedServiceFinalizer, + Execute: func() error { + svc := v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: remoteCluster.Spec.Remote.K8sLocalRef.Namespace, + Name: remoteClusterSeedServiceName(remoteCluster.Spec.Remote.K8sLocalRef.Name), + }, + } + if svc.Namespace == "" { + svc.Namespace = remoteCluster.Namespace + } + + if err := c.Delete(&svc); err != nil && errors.IsNotFound(err) { + return err + } + return nil + }, + } +} diff --git a/operators/pkg/controller/remotecluster/finalizers_test.go b/operators/pkg/controller/remotecluster/finalizers_test.go new file mode 100644 index 0000000000..8037555fbf --- /dev/null +++ b/operators/pkg/controller/remotecluster/finalizers_test.go @@ -0,0 +1,114 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remotecluster + +import ( + "testing" + + "github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1" +) + +import ( + commonv1alpha1 "github.com/elastic/cloud-on-k8s/operators/pkg/apis/common/v1alpha1" + "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/finalizer" + "github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func Test_seedServiceFinalizer(t *testing.T) { + esName := "foo" + esNamespace := "bar" + rc := &v1alpha1.RemoteCluster{ + Spec: v1alpha1.RemoteClusterSpec{ + Remote: v1alpha1.RemoteClusterRef{ + K8sLocalRef: commonv1alpha1.ObjectSelector{ + Name: esName, + Namespace: esNamespace, + }, + }, + }, + } + + // remote cluster whose remote cluster has no namespace defined + rcNoRemoteClusterNamespace := rc.DeepCopy() + rcNoRemoteClusterNamespace.Namespace = esNamespace + rcNoRemoteClusterNamespace.Spec.Remote.K8sLocalRef.Namespace = "" + + rcSvc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: remoteClusterSeedServiceName(esName), + Namespace: esNamespace, + }, + } + + otherSvc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: remoteClusterSeedServiceName(esName) + "-other", + Namespace: esNamespace, + }, + } + + type args struct { + c k8s.Client + remoteCluster v1alpha1.RemoteCluster + } + tests := []struct { + name string + args args + want func(t *testing.T, c k8s.Client, f finalizer.Finalizer) + }{ + { + name: "should delete the related remote cluster seed service", + args: args{ + c: k8s.WrapClient(fake.NewFakeClient(rc, rcSvc, otherSvc)), + remoteCluster: *rc, + }, + want: func(t *testing.T, c k8s.Client, f finalizer.Finalizer) { + require.NoError(t, f.Execute()) + + // the service should be deleted + var svc v1.Service + err := c.Get(k8s.ExtractNamespacedName(rcSvc), &svc) + assert.Error(t, err) + assert.True(t, errors.IsNotFound(err)) + + // the other service should not be deleted + err = c.Get(k8s.ExtractNamespacedName(otherSvc), &svc) + assert.NoError(t, err) + }, + }, + { + name: "should default the namespace of the remote cluster to the namespace of the remote cluster resource", + args: args{ + c: k8s.WrapClient(fake.NewFakeClient(rc, rcSvc, otherSvc)), + remoteCluster: *rc, + }, + want: func(t *testing.T, c k8s.Client, f finalizer.Finalizer) { + require.NoError(t, f.Execute()) + + // the service should be deleted + var svc v1.Service + err := c.Get(k8s.ExtractNamespacedName(rcSvc), &svc) + assert.Error(t, err) + assert.True(t, errors.IsNotFound(err)) + + // the other service should not be deleted + err = c.Get(k8s.ExtractNamespacedName(otherSvc), &svc) + assert.NoError(t, err) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := seedServiceFinalizer(tt.args.c, tt.args.remoteCluster) + tt.want(t, tt.args.c, got) + }) + } +} diff --git a/operators/pkg/controller/remotecluster/labels.go b/operators/pkg/controller/remotecluster/labels.go index cbcf3e06de..7b31bee474 100644 --- a/operators/pkg/controller/remotecluster/labels.go +++ b/operators/pkg/controller/remotecluster/labels.go @@ -14,10 +14,14 @@ import ( const ( // RemoteClusterDynamicWatchesFinalizer designates a finalizer to clean up unused watches. RemoteClusterDynamicWatchesFinalizer = "dynamic-watches.remotecluster.k8s.elastic.co" + // RemoteClusterSeedServiceFinalizer designates a finalizer to clean up a seed Service. + RemoteClusterSeedServiceFinalizer = "seed-service.remotecluster.k8s.elastic.co" // RemoteClusterNamespaceLabelName used to represent the namespace of the RemoteCluster in a TrustRelationship. RemoteClusterNamespaceLabelName = "remotecluster.k8s.elastic.co/namespace" // RemoteClusterNameLabelName used to represent the name of the RemoteCluster in a TrustRelationship. RemoteClusterNameLabelName = "remotecluster.k8s.elastic.co/name" + // RemoteClusterSeedServiceForLabelName is used to mark a service as used as a seed service for remote clusters. + RemoteClusterSeedServiceForLabelName = "remotecluster.k8s.elastic.co/seed-service-for" ) func trustRelationshipObjectMeta( diff --git a/operators/pkg/controller/remotecluster/watches.go b/operators/pkg/controller/remotecluster/watches.go index b20119648c..272f6be763 100644 --- a/operators/pkg/controller/remotecluster/watches.go +++ b/operators/pkg/controller/remotecluster/watches.go @@ -42,7 +42,21 @@ func addWatches(c controller.Controller, r *ReconcileRemoteCluster) error { // Watch licenses in order to enable functionality if license status changes if err := c.Watch(&source.Kind{Type: &v1alpha1.EnterpriseLicense{}}, &handler.EnqueueRequestsFromMapFunc{ - ToRequests: reconcileAllRemoteClusters(r.Client), + ToRequests: allRemoteClustersMapper(r.Client), + }); err != nil { + return err + } + + // Watch licenses in order to enable functionality if license status changes + if err := c.Watch(&source.Kind{Type: &v1alpha1.EnterpriseLicense{}}, &handler.EnqueueRequestsFromMapFunc{ + ToRequests: allRemoteClustersMapper(r.Client), + }); err != nil { + return err + } + + // Watch Service resources in order to reconcile related remote clusters if it changes. + if err := c.Watch(&source.Kind{Type: &v1.Service{}}, &handler.EnqueueRequestsFromMapFunc{ + ToRequests: allRemoteClustersWithMatchingSeedServiceMapper(r.Client), }); err != nil { return err } @@ -50,12 +64,63 @@ func addWatches(c controller.Controller, r *ReconcileRemoteCluster) error { return nil } -// reconcileAllRemoteClusters creates a reconcile request for each currently existing remote cluster resource. -func reconcileAllRemoteClusters(c k8s.Client) handler.ToRequestsFunc { +func allRemoteClustersWithMatchingSeedServiceMapper(c k8s.Client) handler.Mapper { return handler.ToRequestsFunc(func(object handler.MapObject) []reconcile.Request { + // if it's a remote cluster seed service, we need to enqueue requests for any relevant remote clusters + + labels := object.Meta.GetLabels() + if labels == nil { + // not a remote cluster seed service, safe to ignore + return nil + } + + if _, ok := labels[RemoteClusterSeedServiceForLabelName]; !ok { + // not a remote cluster seed service, safe to ignore + return nil + } + + svcNamespace := object.Meta.GetNamespace() + svcName := object.Meta.GetName() + + var list v1alpha1.RemoteClusterList + if err := c.List(&client.ListOptions{}, &list); err != nil { + log.Error(err, "failed to list remote clusters in watch handler") + // dropping any errors on the floor here + return nil + } + + var reqs []reconcile.Request + for _, rc := range list.Items { + // compare service name + if remoteClusterSeedServiceName(rc.Spec.Remote.K8sLocalRef.Name) != svcName { + continue + } + + // compare service namespace, defaulting the remote service namespace to the remote cluster resource ns + ns := rc.Spec.Remote.K8sLocalRef.Namespace + if ns == "" { + ns = rc.Namespace + } + if ns != svcNamespace { + // service and remote namespace in different namespaces, so not relevant + continue + } + + log.Info("Synthesizing reconcile for ", "resource", k8s.ExtractNamespacedName(&rc)) + reqs = append(reqs, reconcile.Request{ + NamespacedName: k8s.ExtractNamespacedName(&rc), + }) + } + return reqs + }) +} + +// allRemoteClustersMapper creates a reconcile request for each currently existing remote cluster resource. +func allRemoteClustersMapper(c k8s.Client) handler.Mapper { + return handler.ToRequestsFunc(func(_ handler.MapObject) []reconcile.Request { var list v1alpha1.RemoteClusterList if err := c.List(&client.ListOptions{}, &list); err != nil { - log.Error(err, "failed to list remote clusters in watch handler for enterprise licenses") + log.Error(err, "failed to list remote clusters in watch handler") // dropping any errors on the floor here return nil } diff --git a/operators/pkg/controller/remotecluster/watches_test.go b/operators/pkg/controller/remotecluster/watches_test.go new file mode 100644 index 0000000000..a02b8074ad --- /dev/null +++ b/operators/pkg/controller/remotecluster/watches_test.go @@ -0,0 +1,100 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remotecluster + +import ( + "testing" + + v1alpha12 "github.com/elastic/cloud-on-k8s/operators/pkg/apis/common/v1alpha1" + "github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1" + "github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +func Test_allRemoteClustersWithMatchingSeedServiceMapper(t *testing.T) { + esName := "foo" + esNamespace := "bar" + + rc := &v1alpha1.RemoteCluster{ + ObjectMeta: v12.ObjectMeta{ + Namespace: esNamespace, + Name: "my-rc", + }, + Spec: v1alpha1.RemoteClusterSpec{ + Remote: v1alpha1.RemoteClusterRef{ + K8sLocalRef: v1alpha12.ObjectSelector{ + Name: esName, + Namespace: esNamespace, + }, + }, + }, + } + + // another RC that uses the same remote as rc + rc2 := rc.DeepCopy() + rc2.ObjectMeta.Name = "my-rc-2" + + otherRc := &v1alpha1.RemoteCluster{ + ObjectMeta: v12.ObjectMeta{ + Namespace: esNamespace, + Name: "my-rc-3", + }, + Spec: v1alpha1.RemoteClusterSpec{ + Remote: v1alpha1.RemoteClusterRef{ + K8sLocalRef: v1alpha12.ObjectSelector{ + Name: "not-" + esName, + Namespace: esNamespace, + }, + }, + }, + } + + svc := v1.Service{ + ObjectMeta: v12.ObjectMeta{ + Namespace: esNamespace, + Name: remoteClusterSeedServiceName(esName), + Labels: map[string]string{ + RemoteClusterSeedServiceForLabelName: esName, + }, + }, + } + + type args struct { + c k8s.Client + } + tests := []struct { + name string + args args + want func(t *testing.T, c k8s.Client, mapper handler.Mapper) + }{ + { + name: "should return requests for only relevant remote clusters", + args: args{ + c: k8s.WrapClient(fake.NewFakeClient(rc, rc2, otherRc)), + }, + want: func(t *testing.T, c k8s.Client, mapper handler.Mapper) { + requests := mapper.Map(handler.MapObject{Meta: &svc, Object: &svc}) + + assert.Len(t, requests, 2) + assert.Contains(t, requests, reconcile.Request{NamespacedName: k8s.ExtractNamespacedName(rc)}) + assert.Contains(t, requests, reconcile.Request{NamespacedName: k8s.ExtractNamespacedName(rc2)}) + + // otherRc should not show up in the reconciled requests. + assert.NotContains(t, requests, reconcile.Request{NamespacedName: k8s.ExtractNamespacedName(otherRc)}) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := allRemoteClustersWithMatchingSeedServiceMapper(tt.args.c) + tt.want(t, tt.args.c, got) + }) + } +}