diff --git a/operators/pkg/controller/remotecluster/driver_incluster.go b/operators/pkg/controller/remotecluster/driver_incluster.go index bf0d200f606..878e6b1a9ff 100644 --- a/operators/pkg/controller/remotecluster/driver_incluster.go +++ b/operators/pkg/controller/remotecluster/driver_incluster.go @@ -9,22 +9,29 @@ import ( commonv1alpha1 "github.com/elastic/cloud-on-k8s/operators/pkg/apis/common/v1alpha1" "github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1" + "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common" "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/finalizer" "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/label" - "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/services" + esname "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/name" + "github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" ) const ( LocalTrustRelationshipPrefix = "rc" RemoteTrustRelationshipPrefix = "rcr" + + // remoteClusterSeedServiceSuffix is the suffix used for the remote cluster seed service + remoteClusterSeedServiceSuffix = "remote-cluster-seed" ) func doReconcile( r *ReconcileRemoteCluster, remoteCluster v1alpha1.RemoteCluster, ) (v1alpha1.RemoteClusterStatus, error) { - // Get the previous remote associated cluster, if the remote namespace has been updated by the user we must // delete the remote relationship from the old namespace and recreate it in the new namespace. if len(remoteCluster.Status.K8SLocalStatus.RemoteSelector.Namespace) > 0 && @@ -71,8 +78,12 @@ func doReconcile( remoteCluster, localClusterSelector, remoteCluster.Spec.Remote.K8sLocalRef, - r.watches) - err := h.Handle(&remoteCluster, watchFinalizer) + r.watches, + ) + + seedServiceFinalizer := seedServiceFinalizer(r.Client, remoteCluster) + + err := h.Handle(&remoteCluster, watchFinalizer, seedServiceFinalizer) if err != nil { return updateStatusWithPhase(&remoteCluster, v1alpha1.RemoteClusterFailed), err } @@ -140,12 +151,18 @@ func doReconcile( return updateStatusWithPhase(&remoteCluster, v1alpha1.RemoteClusterFailed), err } + // Create remote service for seeding + svc, err := reconcileRemoteClusterSeedService(r.Client, r.scheme, remoteCluster) + if err != nil { + return updateStatusWithPhase(&remoteCluster, v1alpha1.RemoteClusterFailed), err + } + // Build status status := v1alpha1.RemoteClusterStatus{ Phase: v1alpha1.RemoteClusterPropagated, ClusterName: localClusterSelector.Name, LocalTrustRelationship: localRelationshipName, - SeedHosts: []string{services.ExternalDiscoveryServiceHostname(remote.Selector.NamespacedName())}, + SeedHosts: seedHostsFromService(svc), K8SLocalStatus: v1alpha1.LocalRefStatus{ RemoteSelector: remote.Selector, RemoteTrustRelationship: remoteRelationshipName, @@ -154,6 +171,47 @@ func doReconcile( return status, nil } +// reconcileRemoteClusterSeedService reconciles a Service that we can use as the remote cluster seed hosts. +// +// This service is shared between all remote clusters configured this way, and is deleted whenever any of them are +// deleted in a finalizer. There's a watch that re-creates it if it's still in use. +func reconcileRemoteClusterSeedService( + c k8s.Client, + scheme *runtime.Scheme, + remoteCluster v1alpha1.RemoteCluster, +) (*v1.Service, error) { + ns := remoteCluster.Spec.Remote.K8sLocalRef.Namespace + // if the remote has no namespace, assume it's in the same namespace as the RemoteCluster resource + if ns == "" { + ns = remoteCluster.Namespace + } + service := v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: remoteClusterSeedServiceName(remoteCluster.Spec.Remote.K8sLocalRef.Name), + Labels: map[string]string{ + label.ClusterNameLabelName: remoteCluster.Spec.Remote.K8sLocalRef.Name, + }, + }, + Spec: v1.ServiceSpec{ + PublishNotReadyAddresses: true, + Ports: []v1.ServicePort{ + {Protocol: v1.ProtocolTCP, Port: 9300, TargetPort: intstr.FromInt(9300)}, + }, + Selector: map[string]string{ + common.TypeLabelName: label.Type, + label.ClusterNameLabelName: remoteCluster.Spec.Remote.K8sLocalRef.Name, + }, + }, + } + + if _, err := common.ReconcileService(c, scheme, &service, nil); err != nil { + return nil, err + } + + return &service, nil +} + func caCertMissingError(location string, selector commonv1alpha1.ObjectSelector) string { return fmt.Sprintf( CaCertMissingError, @@ -171,3 +229,13 @@ func updateStatusWithPhase( status.Phase = phase return *status } + +// remoteClusterSeedServiceName returns the name of the remote cluster seed service. +func remoteClusterSeedServiceName(esName string) string { + return esname.ESNamer.Suffix(esName, remoteClusterSeedServiceSuffix) +} + +// seedHostsFromService returns the seed hosts to use for a given service. +func seedHostsFromService(svc *v1.Service) []string { + return []string{fmt.Sprintf("%s.%s.svc:9300", svc.Name, svc.Namespace)} +} diff --git a/operators/pkg/controller/remotecluster/driver_incluster_test.go b/operators/pkg/controller/remotecluster/driver_incluster_test.go index 404e51f4ca0..9db6c5489b5 100644 --- a/operators/pkg/controller/remotecluster/driver_incluster_test.go +++ b/operators/pkg/controller/remotecluster/driver_incluster_test.go @@ -153,13 +153,18 @@ func Test_apply(t *testing.T) { }, RemoteTrustRelationship: "rcr-remotecluster-sample-1-2-default", }, - SeedHosts: []string{"trust-two-es-es-discovery.default.svc.cluster.local:9300"}, + SeedHosts: []string{"trust-two-es-es-remote-cluster-seed.default.svc:9300"}, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.NoError(t, tt.args.rca.watches.InjectScheme(sc)) + if tt.args.rca.scheme == nil { + tt.args.rca.scheme = sc + } + + assert.NoError(t, tt.args.rca.watches.InjectScheme(tt.args.rca.scheme)) + got, err := doReconcile(tt.args.rca, tt.args.remoteCluster) if (err != nil) != tt.wantErr { t.Errorf("apply() error = %v, wantErr %v", err, tt.wantErr) diff --git a/operators/pkg/controller/remotecluster/finalizers.go b/operators/pkg/controller/remotecluster/finalizers.go index 7ceebc00a37..cb716b4d12b 100644 --- a/operators/pkg/controller/remotecluster/finalizers.go +++ b/operators/pkg/controller/remotecluster/finalizers.go @@ -9,6 +9,10 @@ import ( "github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1" "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/finalizer" "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/watches" + "github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // watchFinalizer ensure that we remove watches for Secrets we are no longer interested in @@ -26,3 +30,26 @@ func watchFinalizer( }, } } + +// seedServiceFinalizer ensures that we remove the seed service if it's no longer required +func seedServiceFinalizer(c k8s.Client, remoteCluster v1alpha1.RemoteCluster) finalizer.Finalizer { + return finalizer.Finalizer{ + Name: RemoteClusterSeedServiceFinalizer, + Execute: func() error { + svc := v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: remoteCluster.Spec.Remote.K8sLocalRef.Namespace, + Name: remoteClusterSeedServiceName(remoteCluster.Spec.Remote.K8sLocalRef.Name), + }, + } + if svc.Namespace == "" { + svc.Namespace = remoteCluster.Namespace + } + + if err := c.Delete(&svc); err != nil && errors.IsNotFound(err) { + return err + } + return nil + }, + } +} diff --git a/operators/pkg/controller/remotecluster/finalizers_test.go b/operators/pkg/controller/remotecluster/finalizers_test.go new file mode 100644 index 00000000000..12df57b28ce --- /dev/null +++ b/operators/pkg/controller/remotecluster/finalizers_test.go @@ -0,0 +1,114 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remotecluster + +import ( + "testing" + + "github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1" +) + +import ( + commonv1alpha1 "github.com/elastic/cloud-on-k8s/operators/pkg/apis/common/v1alpha1" + "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/finalizer" + "github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func Test_seedServiceFinalizer(t *testing.T) { + esName := "foo" + esNamespace := "bar" + rc := &v1alpha1.RemoteCluster{ + Spec: v1alpha1.RemoteClusterSpec{ + Remote: v1alpha1.RemoteClusterRef{ + K8sLocalRef: commonv1alpha1.ObjectSelector{ + Name: esName, + Namespace: esNamespace, + }, + }, + }, + } + + // remote cluster whose remote cluster has no namespace defined + rcNoRemoteClusterNamespace := rc.DeepCopy() + rcNoRemoteClusterNamespace.Namespace = esNamespace + rcNoRemoteClusterNamespace.Spec.Remote.K8sLocalRef.Namespace = "" + + rcSvc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: remoteClusterSeedServiceName(esName), + Namespace: esNamespace, + }, + } + + otherSvc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: remoteClusterSeedServiceName(esName) + "-other", + Namespace: esNamespace, + }, + } + + type args struct { + c k8s.Client + remoteCluster v1alpha1.RemoteCluster + } + tests := []struct { + name string + args args + want func(t *testing.T, c k8s.Client, f finalizer.Finalizer) + }{ + { + name: "should delete remote the related remote cluster seed service", + args: args{ + c: k8s.WrapClient(fake.NewFakeClient(rc, rcSvc, otherSvc)), + remoteCluster: *rc, + }, + want: func(t *testing.T, c k8s.Client, f finalizer.Finalizer) { + require.NoError(t, f.Execute()) + + // the service should be deleted + var svc v1.Service + err := c.Get(k8s.ExtractNamespacedName(rcSvc), &svc) + assert.Error(t, err) + assert.True(t, errors.IsNotFound(err)) + + // the other service should not be deleted + err = c.Get(k8s.ExtractNamespacedName(otherSvc), &svc) + assert.NoError(t, err) + }, + }, + { + name: "should default the namespace of the remote cluster to the namespace of the remote cluster resource", + args: args{ + c: k8s.WrapClient(fake.NewFakeClient(rc, rcSvc, otherSvc)), + remoteCluster: *rc, + }, + want: func(t *testing.T, c k8s.Client, f finalizer.Finalizer) { + require.NoError(t, f.Execute()) + + // the service should be deleted + var svc v1.Service + err := c.Get(k8s.ExtractNamespacedName(rcSvc), &svc) + assert.Error(t, err) + assert.True(t, errors.IsNotFound(err)) + + // the other service should not be deleted + err = c.Get(k8s.ExtractNamespacedName(otherSvc), &svc) + assert.NoError(t, err) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := seedServiceFinalizer(tt.args.c, tt.args.remoteCluster) + tt.want(t, tt.args.c, got) + }) + } +} diff --git a/operators/pkg/controller/remotecluster/labels.go b/operators/pkg/controller/remotecluster/labels.go index cbcf3e06de5..36ca71d90c0 100644 --- a/operators/pkg/controller/remotecluster/labels.go +++ b/operators/pkg/controller/remotecluster/labels.go @@ -14,6 +14,8 @@ import ( const ( // RemoteClusterDynamicWatchesFinalizer designates a finalizer to clean up unused watches. RemoteClusterDynamicWatchesFinalizer = "dynamic-watches.remotecluster.k8s.elastic.co" + // RemoteClusterSeedServiceFinalizer designates a finalizer to clean up a seed Service. + RemoteClusterSeedServiceFinalizer = "seed-service.remotecluster.k8s.elastic.co" // RemoteClusterNamespaceLabelName used to represent the namespace of the RemoteCluster in a TrustRelationship. RemoteClusterNamespaceLabelName = "remotecluster.k8s.elastic.co/namespace" // RemoteClusterNameLabelName used to represent the name of the RemoteCluster in a TrustRelationship. diff --git a/operators/pkg/controller/remotecluster/watches.go b/operators/pkg/controller/remotecluster/watches.go index b20119648c2..f7eee04ccd2 100644 --- a/operators/pkg/controller/remotecluster/watches.go +++ b/operators/pkg/controller/remotecluster/watches.go @@ -6,6 +6,7 @@ package remotecluster import ( "fmt" + "strings" commonv1alpha1 "github.com/elastic/cloud-on-k8s/operators/pkg/apis/common/v1alpha1" "github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1" @@ -42,7 +43,21 @@ func addWatches(c controller.Controller, r *ReconcileRemoteCluster) error { // Watch licenses in order to enable functionality if license status changes if err := c.Watch(&source.Kind{Type: &v1alpha1.EnterpriseLicense{}}, &handler.EnqueueRequestsFromMapFunc{ - ToRequests: reconcileAllRemoteClusters(r.Client), + ToRequests: allRemoteClustersMapper(r.Client), + }); err != nil { + return err + } + + // Watch licenses in order to enable functionality if license status changes + if err := c.Watch(&source.Kind{Type: &v1alpha1.EnterpriseLicense{}}, &handler.EnqueueRequestsFromMapFunc{ + ToRequests: allRemoteClustersMapper(r.Client), + }); err != nil { + return err + } + + // Watch Service resources in order to reconcile related remote clusters if it changes. + if err := c.Watch(&source.Kind{Type: &v1.Service{}}, &handler.EnqueueRequestsFromMapFunc{ + ToRequests: allRemoteClustersWithMatchingSeedServiceMapper(r.Client), }); err != nil { return err } @@ -50,12 +65,57 @@ func addWatches(c controller.Controller, r *ReconcileRemoteCluster) error { return nil } -// reconcileAllRemoteClusters creates a reconcile request for each currently existing remote cluster resource. -func reconcileAllRemoteClusters(c k8s.Client) handler.ToRequestsFunc { +func allRemoteClustersWithMatchingSeedServiceMapper(c k8s.Client) handler.Mapper { return handler.ToRequestsFunc(func(object handler.MapObject) []reconcile.Request { + // if it's a remote cluster seed service, we need to enqueue requests for any relevant remote clusters + + if !strings.HasSuffix(object.Meta.GetName(), remoteClusterSeedServiceSuffix) { + // not a remote cluster seed service, safe to ignore + return nil + } + + svcNamespace := object.Meta.GetNamespace() + svcName := object.Meta.GetName() + + var list v1alpha1.RemoteClusterList + if err := c.List(&client.ListOptions{}, &list); err != nil { + log.Error(err, "failed to list remote clusters in watch handler") + // dropping any errors on the floor here + return nil + } + + var reqs []reconcile.Request + for _, rc := range list.Items { + // compare service name + if remoteClusterSeedServiceName(rc.Spec.Remote.K8sLocalRef.Name) != svcName { + continue + } + + // compare service namespace, defaulting the remote service namespace to the remote cluster resource ns + ns := rc.Spec.Remote.K8sLocalRef.Namespace + if ns == "" { + ns = rc.Namespace + } + if ns != svcNamespace { + // service and remote namespace in different namespaces, so not relevant + continue + } + + log.Info("Synthesizing reconcile for ", "resource", k8s.ExtractNamespacedName(&rc)) + reqs = append(reqs, reconcile.Request{ + NamespacedName: k8s.ExtractNamespacedName(&rc), + }) + } + return reqs + }) +} + +// allRemoteClustersMapper creates a reconcile request for each currently existing remote cluster resource. +func allRemoteClustersMapper(c k8s.Client) handler.Mapper { + return handler.ToRequestsFunc(func(_ handler.MapObject) []reconcile.Request { var list v1alpha1.RemoteClusterList if err := c.List(&client.ListOptions{}, &list); err != nil { - log.Error(err, "failed to list remote clusters in watch handler for enterprise licenses") + log.Error(err, "failed to list remote clusters in watch handler") // dropping any errors on the floor here return nil } diff --git a/operators/pkg/controller/remotecluster/watches_test.go b/operators/pkg/controller/remotecluster/watches_test.go new file mode 100644 index 00000000000..66066f3a4ec --- /dev/null +++ b/operators/pkg/controller/remotecluster/watches_test.go @@ -0,0 +1,97 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remotecluster + +import ( + "testing" + + v1alpha12 "github.com/elastic/cloud-on-k8s/operators/pkg/apis/common/v1alpha1" + "github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1" + "github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +func Test_allRemoteClustersWithMatchingSeedServiceMapper(t *testing.T) { + esName := "foo" + esNamespace := "bar" + + rc := &v1alpha1.RemoteCluster{ + ObjectMeta: v12.ObjectMeta{ + Namespace: esNamespace, + Name: "my-rc", + }, + Spec: v1alpha1.RemoteClusterSpec{ + Remote: v1alpha1.RemoteClusterRef{ + K8sLocalRef: v1alpha12.ObjectSelector{ + Name: esName, + Namespace: esNamespace, + }, + }, + }, + } + + // another RC that uses the same remote as rc + rc2 := rc.DeepCopy() + rc2.ObjectMeta.Name = "my-rc-2" + + otherRc := &v1alpha1.RemoteCluster{ + ObjectMeta: v12.ObjectMeta{ + Namespace: esNamespace, + Name: "my-rc-3", + }, + Spec: v1alpha1.RemoteClusterSpec{ + Remote: v1alpha1.RemoteClusterRef{ + K8sLocalRef: v1alpha12.ObjectSelector{ + Name: "not-" + esName, + Namespace: esNamespace, + }, + }, + }, + } + + svc := v1.Service{ + ObjectMeta: v12.ObjectMeta{ + Namespace: esNamespace, + Name: remoteClusterSeedServiceName(esName), + }, + } + + type args struct { + c k8s.Client + } + tests := []struct { + name string + args args + want func(t *testing.T, c k8s.Client, mapper handler.Mapper) + }{ + { + name: "should return requests for only relevant remote clusters", + args: args{ + c: k8s.WrapClient(fake.NewFakeClient(rc, rc2, otherRc)), + }, + want: func(t *testing.T, c k8s.Client, mapper handler.Mapper) { + requests := mapper.Map(handler.MapObject{Meta: &svc, Object: &svc}) + + assert.Len(t, requests, 2) + assert.Contains(t, requests, reconcile.Request{NamespacedName: k8s.ExtractNamespacedName(rc)}) + assert.Contains(t, requests, reconcile.Request{NamespacedName: k8s.ExtractNamespacedName(rc2)}) + + // otherRc should not show up in the reconciled requests. + assert.NotContains(t, requests, reconcile.Request{NamespacedName: k8s.ExtractNamespacedName(otherRc)}) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := allRemoteClustersWithMatchingSeedServiceMapper(tt.args.c) + tt.want(t, tt.args.c, got) + }) + } +}