From eefc8afae3cd10cad5cfd49b74a4eed456b00cf6 Mon Sep 17 00:00:00 2001 From: Michael Morello Date: Mon, 2 Mar 2020 14:35:11 +0100 Subject: [PATCH] Add local K8S remote cluster support (#2543) * Add local K8S remote cluster support --- cmd/manager/main.go | 5 + config/crds/all-crds.yaml | 31 ++ ...search.k8s.elastic.co_elasticsearches.yaml | 31 ++ docs/reference/api-docs.asciidoc | 20 + pkg/apis/common/v1/common.go | 21 +- .../elasticsearch/v1/elasticsearch_types.go | 24 + pkg/apis/elasticsearch/v1/name.go | 13 + .../elasticsearch/v1/zz_generated.deepcopy.go | 21 + .../certificates/ca_reconcile.go | 6 + .../certificates/remoteca/reconcile.go | 78 +++ .../certificates/remoteca/reconcile_test.go | 121 +++++ pkg/controller/elasticsearch/client/client.go | 2 + pkg/controller/elasticsearch/client/model.go | 13 +- .../elasticsearch/client/model_test.go | 10 +- pkg/controller/elasticsearch/client/v6.go | 4 + pkg/controller/elasticsearch/driver/driver.go | 29 +- .../elasticsearch/elasticsearch_controller.go | 16 +- .../elasticsearch/nodespec/podspec_test.go | 2 +- .../elasticsearch/nodespec/volumes.go | 7 + .../remotecluster/annotations.go | 62 +++ .../remotecluster/elasticsearch.go | 125 +++++ .../remotecluster/elasticsearch_test.go | 323 ++++++++++++ .../remotecluster/remoteca/controller.go | 289 +++++++++++ .../remotecluster/remoteca/controller_test.go | 473 ++++++++++++++++++ .../remotecluster/remoteca/labels.go | 62 +++ .../remotecluster/remoteca/rbac.go | 58 +++ .../remotecluster/remoteca/secret.go | 185 +++++++ .../remotecluster/remoteca/watches.go | 110 ++++ .../elasticsearch/services/services.go | 45 +- .../elasticsearch/settings/merged_config.go | 1 + pkg/controller/elasticsearch/volume/names.go | 3 + .../remoteca/remoteca_controller.go | 38 ++ 32 files changed, 2196 insertions(+), 32 deletions(-) create mode 100644 pkg/controller/elasticsearch/certificates/remoteca/reconcile.go create mode 100644 pkg/controller/elasticsearch/certificates/remoteca/reconcile_test.go create mode 100644 pkg/controller/elasticsearch/remotecluster/annotations.go create mode 100644 pkg/controller/elasticsearch/remotecluster/elasticsearch.go create mode 100644 pkg/controller/elasticsearch/remotecluster/elasticsearch_test.go create mode 100644 pkg/controller/elasticsearch/remotecluster/remoteca/controller.go create mode 100644 pkg/controller/elasticsearch/remotecluster/remoteca/controller_test.go create mode 100644 pkg/controller/elasticsearch/remotecluster/remoteca/labels.go create mode 100644 pkg/controller/elasticsearch/remotecluster/remoteca/rbac.go create mode 100644 pkg/controller/elasticsearch/remotecluster/remoteca/secret.go create mode 100644 pkg/controller/elasticsearch/remotecluster/remoteca/watches.go create mode 100644 pkg/controller/remoteca/remoteca_controller.go diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 4f05e9b7c0..557a1b915f 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -46,6 +46,7 @@ import ( kbassn "github.com/elastic/cloud-on-k8s/pkg/controller/kibanaassociation" "github.com/elastic/cloud-on-k8s/pkg/controller/license" licensetrial "github.com/elastic/cloud-on-k8s/pkg/controller/license/trial" + "github.com/elastic/cloud-on-k8s/pkg/controller/remoteca" "github.com/elastic/cloud-on-k8s/pkg/controller/webhook" "github.com/elastic/cloud-on-k8s/pkg/dev" "github.com/elastic/cloud-on-k8s/pkg/dev/portforward" @@ -336,6 +337,10 @@ func execute() { log.Error(err, "unable to create controller", "controller", "KibanaAssociation") os.Exit(1) } + if err = remoteca.Add(mgr, accessReviewer, params); err != nil { + log.Error(err, "unable to create controller", "controller", "RemoteClusterCertificateAuthorites") + os.Exit(1) + } if err = license.Add(mgr, params); err != nil { log.Error(err, "unable to create controller", "controller", "License") diff --git a/config/crds/all-crds.yaml b/config/crds/all-crds.yaml index a19325ea73..380195e11f 100644 --- a/config/crds/all-crds.yaml +++ b/config/crds/all-crds.yaml @@ -1043,6 +1043,37 @@ spec: type: object type: object type: object + remoteClusters: + description: RemoteClusters enables you to establish uni-directional + connections to a remote Elasticsearch cluster. + items: + description: RemoteCluster declares a remote Elasticsearch cluster + connection. + properties: + elasticsearchRef: + description: ElasticsearchRef is a reference to an Elasticsearch + cluster running within the same k8s cluster. + properties: + name: + description: Name of the Kubernetes object. + type: string + namespace: + description: Namespace of the Kubernetes object. If empty, + defaults to the current namespace. + type: string + required: + - name + type: object + name: + description: Name is the name of the remote cluster as it is set + in the Elasticsearch settings. The name is expected to be unique + for each remote clusters. + minLength: 1 + type: string + required: + - name + type: object + type: array secureSettings: description: 'SecureSettings is a list of references to Kubernetes secrets containing sensitive configuration options for Elasticsearch. See: diff --git a/config/crds/bases/elasticsearch.k8s.elastic.co_elasticsearches.yaml b/config/crds/bases/elasticsearch.k8s.elastic.co_elasticsearches.yaml index e4a30f0b62..63ce332226 100644 --- a/config/crds/bases/elasticsearch.k8s.elastic.co_elasticsearches.yaml +++ b/config/crds/bases/elasticsearch.k8s.elastic.co_elasticsearches.yaml @@ -6859,6 +6859,37 @@ spec: type: object type: object type: object + remoteClusters: + description: RemoteClusters enables you to establish uni-directional + connections to a remote Elasticsearch cluster. + items: + description: RemoteCluster declares a remote Elasticsearch cluster + connection. + properties: + elasticsearchRef: + description: ElasticsearchRef is a reference to an Elasticsearch + cluster running within the same k8s cluster. + properties: + name: + description: Name of the Kubernetes object. + type: string + namespace: + description: Namespace of the Kubernetes object. If empty, + defaults to the current namespace. + type: string + required: + - name + type: object + name: + description: Name is the name of the remote cluster as it is + set in the Elasticsearch settings. The name is expected to + be unique for each remote clusters. + minLength: 1 + type: string + required: + - name + type: object + type: array secureSettings: description: 'SecureSettings is a list of references to Kubernetes secrets containing sensitive configuration options for Elasticsearch. diff --git a/docs/reference/api-docs.asciidoc b/docs/reference/api-docs.asciidoc index 86c99a8686..3586600171 100644 --- a/docs/reference/api-docs.asciidoc +++ b/docs/reference/api-docs.asciidoc @@ -197,6 +197,7 @@ ObjectSelector defines a reference to a Kubernetes object. **** - xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-pkg-apis-apm-v1-apmserverspec[$$ApmServerSpec$$] - xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-pkg-apis-kibana-v1-kibanaspec[$$KibanaSpec$$] +- xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-pkg-apis-elasticsearch-v1-remotecluster[$$RemoteCluster$$] **** [cols="25a,75a", options="header"] @@ -616,6 +617,7 @@ ElasticsearchSpec holds the specification of an Elasticsearch cluster. | *`podDisruptionBudget`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-pkg-apis-common-v1-poddisruptionbudgettemplate[$$PodDisruptionBudgetTemplate$$]__ | PodDisruptionBudget provides access to the default pod disruption budget for the Elasticsearch cluster. The default budget selects all cluster pods and sets `maxUnavailable` to 1. To disable, set `PodDisruptionBudget` to the empty value (`{}` in YAML). | *`secureSettings`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-pkg-apis-common-v1-secretsource[$$SecretSource$$]__ | SecureSettings is a list of references to Kubernetes secrets containing sensitive configuration options for Elasticsearch. See: https://www.elastic.co/guide/en/cloud-on-k8s/current/k8s-es-secure-settings.html | *`serviceAccountName`* __string__ | ServiceAccountName is used to check access from the current resource to a resource (eg. a remote Elasticsearch cluster) in a different namespace. Can only be used if ECK is enforcing RBAC on references. +| *`remoteClusters`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-pkg-apis-elasticsearch-v1-remotecluster[$$RemoteCluster$$] array__ | RemoteClusters enables you to establish uni-directional connections to a remote Elasticsearch cluster. |=== @@ -642,6 +644,24 @@ ElasticsearchSpec holds the specification of an Elasticsearch cluster. |=== +[id="{anchor_prefix}-github-com-elastic-cloud-on-k8s-pkg-apis-elasticsearch-v1-remotecluster"] +=== RemoteCluster + + + +.Appears In: +**** +- xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-pkg-apis-elasticsearch-v1-elasticsearchspec[$$ElasticsearchSpec$$] +**** + +[cols="25a,75a", options="header"] +|=== +| Field | Description +| *`name`* __string__ | Name is the name of the remote cluster as it is set in the Elasticsearch settings. The name is expected to be unique for each remote clusters. +| *`elasticsearchRef`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-pkg-apis-common-v1-objectselector[$$ObjectSelector$$]__ | ElasticsearchRef is a reference to an Elasticsearch cluster running within the same k8s cluster. +|=== + + [id="{anchor_prefix}-github-com-elastic-cloud-on-k8s-pkg-apis-elasticsearch-v1-updatestrategy"] === UpdateStrategy diff --git a/pkg/apis/common/v1/common.go b/pkg/apis/common/v1/common.go index 0b4412e61c..25ed6c36be 100644 --- a/pkg/apis/common/v1/common.go +++ b/pkg/apis/common/v1/common.go @@ -33,18 +33,29 @@ type ObjectSelector struct { Namespace string `json:"namespace,omitempty"` } +// WithDefaultNamespace adds a default namespace to a given ObjectSelector if none is set. +func (o ObjectSelector) WithDefaultNamespace(defaultNamespace string) ObjectSelector { + if len(o.Namespace) > 0 { + return o + } + return ObjectSelector{ + Namespace: defaultNamespace, + Name: o.Name, + } +} + // NamespacedName is a convenience method to turn an ObjectSelector into a NamespacedName. -func (s ObjectSelector) NamespacedName() types.NamespacedName { +func (o ObjectSelector) NamespacedName() types.NamespacedName { return types.NamespacedName{ - Name: s.Name, - Namespace: s.Namespace, + Name: o.Name, + Namespace: o.Namespace, } } // IsDefined checks if the object selector is not nil and has a name. // Namespace is not mandatory as it may be inherited by the parent object. -func (s *ObjectSelector) IsDefined() bool { - return s != nil && s.Name != "" +func (o *ObjectSelector) IsDefined() bool { + return o != nil && o.Name != "" } // HTTPConfig holds the HTTP layer configuration for resources. diff --git a/pkg/apis/elasticsearch/v1/elasticsearch_types.go b/pkg/apis/elasticsearch/v1/elasticsearch_types.go index 5cb8462e45..5f29573085 100644 --- a/pkg/apis/elasticsearch/v1/elasticsearch_types.go +++ b/pkg/apis/elasticsearch/v1/elasticsearch_types.go @@ -5,6 +5,7 @@ package v1 import ( + "github.com/elastic/cloud-on-k8s/pkg/controller/common/hash" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -50,6 +51,29 @@ type ElasticsearchSpec struct { // Can only be used if ECK is enforcing RBAC on references. // +optional ServiceAccountName string `json:"serviceAccountName,omitempty"` + + // RemoteClusters enables you to establish uni-directional connections to a remote Elasticsearch cluster. + // +optional + RemoteClusters []RemoteCluster `json:"remoteClusters,omitempty"` +} + +// RemoteCluster declares a remote Elasticsearch cluster connection. +type RemoteCluster struct { + // Name is the name of the remote cluster as it is set in the Elasticsearch settings. + // The name is expected to be unique for each remote clusters. + // +kubebuilder:validation:Required + // +kubebuilder:validation:MinLength=1 + Name string `json:"name"` + + // ElasticsearchRef is a reference to an Elasticsearch cluster running within the same k8s cluster. + ElasticsearchRef commonv1.ObjectSelector `json:"elasticsearchRef,omitempty"` + + // TODO: Allow the user to specify some options (transport.compress, transport.ping_schedule) + +} + +func (r RemoteCluster) ConfigHash() string { + return hash.HashObject(r) } // NodeCount returns the total number of nodes of the Elasticsearch cluster diff --git a/pkg/apis/elasticsearch/v1/name.go b/pkg/apis/elasticsearch/v1/name.go index 4d2dd1c036..a57d197c50 100644 --- a/pkg/apis/elasticsearch/v1/name.go +++ b/pkg/apis/elasticsearch/v1/name.go @@ -18,6 +18,7 @@ const ( configSecretSuffix = "config" secureSettingsSecretSuffix = "secure-settings" httpServiceSuffix = "http" + transportServiceSuffix = "transport" elasticUserSecretSuffix = "elastic-user" xpackFileRealmSecretSuffix = "xpack-file-realm" internalUsersSecretSuffix = "internal-users" @@ -27,6 +28,9 @@ const ( scriptsConfigMapSuffix = "scripts" transportCertificatesSecretSuffix = "transport-certificates" + // remoteCaNameSuffix is a suffix for the secret that contains the concatenation of all the remote CAs + remoteCaNameSuffix = "remote-ca" + controllerRevisionHashLen = 10 ) @@ -46,6 +50,7 @@ var ( defaultPodDisruptionBudget, scriptsConfigMapSuffix, transportCertificatesSecretSuffix, + remoteCaNameSuffix, } ) @@ -108,6 +113,10 @@ func TransportCertificatesSecret(esName string) string { return ESNamer.Suffix(esName, transportCertificatesSecretSuffix) } +func TransportService(esName string) string { + return ESNamer.Suffix(esName, transportServiceSuffix) +} + func HTTPService(esName string) string { return ESNamer.Suffix(esName, httpServiceSuffix) } @@ -140,3 +149,7 @@ func LicenseSecretName(esName string) string { func DefaultPodDisruptionBudget(esName string) string { return ESNamer.Suffix(esName, defaultPodDisruptionBudget) } + +func RemoteCaSecretName(esName string) string { + return ESNamer.Suffix(esName, remoteCaNameSuffix) +} diff --git a/pkg/apis/elasticsearch/v1/zz_generated.deepcopy.go b/pkg/apis/elasticsearch/v1/zz_generated.deepcopy.go index c157ac82d2..07cf00c910 100644 --- a/pkg/apis/elasticsearch/v1/zz_generated.deepcopy.go +++ b/pkg/apis/elasticsearch/v1/zz_generated.deepcopy.go @@ -159,6 +159,11 @@ func (in *ElasticsearchSpec) DeepCopyInto(out *ElasticsearchSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.RemoteClusters != nil { + in, out := &in.RemoteClusters, &out.RemoteClusters + *out = make([]RemoteCluster, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ElasticsearchSpec. @@ -229,6 +234,22 @@ func (in *NodeSet) DeepCopy() *NodeSet { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RemoteCluster) DeepCopyInto(out *RemoteCluster) { + *out = *in + out.ElasticsearchRef = in.ElasticsearchRef +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RemoteCluster. +func (in *RemoteCluster) DeepCopy() *RemoteCluster { + if in == nil { + return nil + } + out := new(RemoteCluster) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *UpdateStrategy) DeepCopyInto(out *UpdateStrategy) { *out = *in diff --git a/pkg/controller/elasticsearch/certificates/ca_reconcile.go b/pkg/controller/elasticsearch/certificates/ca_reconcile.go index 9d92d2f516..1b0a9042a0 100644 --- a/pkg/controller/elasticsearch/certificates/ca_reconcile.go +++ b/pkg/controller/elasticsearch/certificates/ca_reconcile.go @@ -15,6 +15,7 @@ import ( "github.com/elastic/cloud-on-k8s/pkg/controller/common/driver" "github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler" "github.com/elastic/cloud-on-k8s/pkg/controller/common/tracing" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/certificates/remoteca" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/certificates/transport" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label" "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" @@ -48,6 +49,11 @@ func Reconcile( results := &reconciler.Results{} + // reconcile remote clusters certificate authorities + if err := remoteca.Reconcile(driver.K8sClient(), es); err != nil { + results.WithError(err) + } + labels := label.NewLabels(k8s.ExtractNamespacedName(&es)) httpCA, err := certificates.ReconcileCAForOwner( diff --git a/pkg/controller/elasticsearch/certificates/remoteca/reconcile.go b/pkg/controller/elasticsearch/certificates/remoteca/reconcile.go new file mode 100644 index 0000000000..63686c4bff --- /dev/null +++ b/pkg/controller/elasticsearch/certificates/remoteca/reconcile.go @@ -0,0 +1,78 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remoteca + +import ( + "bytes" + "reflect" + "sort" + + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/certificates" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/remotecluster/remoteca" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + "github.com/elastic/cloud-on-k8s/pkg/utils/maps" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// Reconcile fetches the list of remote certificate authorities and concatenates them into a single Secret +func Reconcile( + c k8s.Client, + es esv1.Elasticsearch, +) error { + // Get all the remote certificate authorities + var remoteCAList v1.SecretList + if err := c.List( + &remoteCAList, + client.InNamespace(es.Namespace), + remoteca.LabelSelector(es.Name), + ); err != nil { + return err + } + // We sort the remote certificate authorities to have a stable comparison with the reconciled data + sort.SliceStable(remoteCAList.Items, func(i, j int) bool { + // We don't need to compare the namespace because they are all in the same one + return remoteCAList.Items[i].Name < remoteCAList.Items[j].Name + }) + + remoteCertificateAuthorities := make([][]byte, len(remoteCAList.Items)) + for i, remoteCA := range remoteCAList.Items { + remoteCertificateAuthorities[i] = remoteCA.Data[certificates.CAFileName] + } + + expected := v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: esv1.RemoteCaSecretName(es.Name), + Namespace: es.Namespace, + Labels: map[string]string{ + label.ClusterNameLabelName: es.Name, + }, + }, + Data: map[string][]byte{ + certificates.CAFileName: bytes.Join(remoteCertificateAuthorities, nil), + }, + } + + var reconciled v1.Secret + return reconciler.ReconcileResource(reconciler.Params{ + Client: c, + Scheme: scheme.Scheme, + Owner: &es, + Expected: &expected, + Reconciled: &reconciled, + NeedsUpdate: func() bool { + return !maps.IsSubset(expected.Labels, reconciled.Labels) || !reflect.DeepEqual(expected.Data, reconciled.Data) + }, + UpdateReconciled: func() { + reconciled.Labels = maps.Merge(reconciled.Labels, expected.Labels) + reconciled.Data = expected.Data + }, + }) +} diff --git a/pkg/controller/elasticsearch/certificates/remoteca/reconcile_test.go b/pkg/controller/elasticsearch/certificates/remoteca/reconcile_test.go new file mode 100644 index 0000000000..cb77a6b3e3 --- /dev/null +++ b/pkg/controller/elasticsearch/certificates/remoteca/reconcile_test.go @@ -0,0 +1,121 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remoteca + +import ( + "testing" + + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/pkg/controller/common" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/certificates" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/remotecluster/remoteca" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" +) + +func TestReconcile(t *testing.T) { + type args struct { + es esv1.Elasticsearch + secrets []runtime.Object + } + tests := []struct { + name string + args args + want []byte + wantErr bool + }{ + { + name: "Certificates should be sorted", + args: args{ + es: esv1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Name: "es1", Namespace: "ns1"}}, + secrets: []runtime.Object{ + &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "b", + Namespace: "ns1", + Labels: map[string]string{ + label.ClusterNameLabelName: "es1", + common.TypeLabelName: remoteca.TypeLabelValue, + }, + }, + Data: map[string][]byte{certificates.CAFileName: []byte("cert1\n")}, + }, + &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "a", + Namespace: "ns1", + Labels: map[string]string{ + label.ClusterNameLabelName: "es1", + common.TypeLabelName: remoteca.TypeLabelValue, + }, + }, + Data: map[string][]byte{certificates.CAFileName: []byte("cert2\n")}, + }, + }, + }, + want: []byte("cert2\ncert1\n"), + }, + { + name: "Only include Secrets with the right label", + args: args{ + es: esv1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Name: "es1", Namespace: "ns1"}}, + secrets: []runtime.Object{ + &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "b", + Namespace: "ns1", + Labels: map[string]string{ + label.ClusterNameLabelName: "es1", + common.TypeLabelName: remoteca.TypeLabelValue, + }, + }, + Data: map[string][]byte{certificates.CAFileName: []byte("cert1\n")}, + }, + &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "c", + Namespace: "ns1", + Labels: map[string]string{ + label.ClusterNameLabelName: "es1", + common.TypeLabelName: "foo", + }, + }, + Data: map[string][]byte{certificates.CAFileName: []byte("cert3\n")}, + }, + &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "a", + Namespace: "ns1", + Labels: map[string]string{ + label.ClusterNameLabelName: "es1", + common.TypeLabelName: remoteca.TypeLabelValue, + }, + }, + Data: map[string][]byte{certificates.CAFileName: []byte("cert2\n")}, + }, + }, + }, + want: []byte("cert2\ncert1\n"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + k8sClient := k8s.WrappedFakeClient(tt.args.secrets...) + if err := Reconcile(k8sClient, tt.args.es); (err != nil) != tt.wantErr { + t.Errorf("Reconcile() error = %v, wantErr %v", err, tt.wantErr) + } + remoteCaList := v1.Secret{} + assert.NoError(t, k8sClient.Get(types.NamespacedName{Namespace: "ns1", Name: "es1-es-remote-ca"}, &remoteCaList)) + content, ok := remoteCaList.Data[certificates.CAFileName] + assert.True(t, ok) + assert.Equal(t, tt.want, content) + }) + } +} diff --git a/pkg/controller/elasticsearch/client/client.go b/pkg/controller/elasticsearch/client/client.go index 5bed857f70..6ed21c9e01 100644 --- a/pkg/controller/elasticsearch/client/client.go +++ b/pkg/controller/elasticsearch/client/client.go @@ -87,6 +87,8 @@ type Client interface { GetNodesStats(ctx context.Context) (NodesStats, error) // ClusterBootstrappedForZen2 returns true if the cluster is relying on zen2 orchestration. ClusterBootstrappedForZen2(ctx context.Context) (bool, error) + // UpdateRemoteClusterSettings updates the remote clusters of a cluster. + UpdateRemoteClusterSettings(ctx context.Context, settings RemoteClustersSettings) error // AddVotingConfigExclusions sets the transient and persistent setting of the same name in cluster settings. // // If timeout is the empty string, the default is used. diff --git a/pkg/controller/elasticsearch/client/model.go b/pkg/controller/elasticsearch/client/model.go index 3c835121fa..561aa99b28 100644 --- a/pkg/controller/elasticsearch/client/model.go +++ b/pkg/controller/elasticsearch/client/model.go @@ -364,19 +364,18 @@ type StartBasicResponse struct { ErrorMessage string `json:"error_message"` } -// Settings is the root element of settings. -type Settings struct { +// RemoteClustersSettings is used to build a request to update remote clusters. +type RemoteClustersSettings struct { PersistentSettings *SettingsGroup `json:"persistent,omitempty"` - TransientSettings *SettingsGroup `json:"transient,omitempty"` } -// SettingsGroup is a group of settings, either to transient or persistent. +// SettingsGroup is a group of persistent settings. type SettingsGroup struct { - Cluster Cluster `json:"cluster,omitempty"` + Cluster RemoteClusters `json:"cluster,omitempty"` } -// Cluster models the configuration of the cluster. -type Cluster struct { +// RemoteClusters models the configuration of the remote clusters. +type RemoteClusters struct { RemoteClusters map[string]RemoteCluster `json:"remote,omitempty"` } diff --git a/pkg/controller/elasticsearch/client/model_test.go b/pkg/controller/elasticsearch/client/model_test.go index 305f8171f2..f19860d723 100644 --- a/pkg/controller/elasticsearch/client/model_test.go +++ b/pkg/controller/elasticsearch/client/model_test.go @@ -15,14 +15,14 @@ import ( func TestModel_RemoteCluster(t *testing.T) { tests := []struct { name string - arg Settings + arg RemoteClustersSettings want string }{ { name: "Simple remote cluster", - arg: Settings{ + arg: RemoteClustersSettings{ PersistentSettings: &SettingsGroup{ - Cluster: Cluster{ + Cluster: RemoteClusters{ RemoteClusters: map[string]RemoteCluster{ "leader": { Seeds: []string{"127.0.0.1:9300"}, @@ -35,9 +35,9 @@ func TestModel_RemoteCluster(t *testing.T) { }, { name: "Deleted remote cluster", - arg: Settings{ + arg: RemoteClustersSettings{ PersistentSettings: &SettingsGroup{ - Cluster: Cluster{ + Cluster: RemoteClusters{ RemoteClusters: map[string]RemoteCluster{ "leader": { Seeds: nil, diff --git a/pkg/controller/elasticsearch/client/v6.go b/pkg/controller/elasticsearch/client/v6.go index 54397ccc74..cec67acabd 100644 --- a/pkg/controller/elasticsearch/client/v6.go +++ b/pkg/controller/elasticsearch/client/v6.go @@ -84,6 +84,10 @@ func (c *clientV6) GetNodesStats(ctx context.Context) (NodesStats, error) { return nodesStats, c.get(ctx, "/_nodes/_all/stats/os", &nodesStats) } +func (c *clientV6) UpdateRemoteClusterSettings(ctx context.Context, settings RemoteClustersSettings) error { + return c.put(ctx, "/_cluster/settings", &settings, nil) +} + func (c *clientV6) GetLicense(ctx context.Context) (License, error) { var license LicenseResponse return license.License, c.get(ctx, "/_xpack/license", &license) diff --git a/pkg/controller/elasticsearch/driver/driver.go b/pkg/controller/elasticsearch/driver/driver.go index 64c5d87c1d..5bbc9afdae 100644 --- a/pkg/controller/elasticsearch/driver/driver.go +++ b/pkg/controller/elasticsearch/driver/driver.go @@ -10,17 +10,13 @@ import ( "fmt" "time" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" - controller "sigs.k8s.io/controller-runtime/pkg/reconcile" - esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/pkg/controller/common" commondriver "github.com/elastic/cloud-on-k8s/pkg/controller/common/driver" "github.com/elastic/cloud-on-k8s/pkg/controller/common/events" "github.com/elastic/cloud-on-k8s/pkg/controller/common/expectations" "github.com/elastic/cloud-on-k8s/pkg/controller/common/keystore" + commonlicense "github.com/elastic/cloud-on-k8s/pkg/controller/common/license" "github.com/elastic/cloud-on-k8s/pkg/controller/common/operator" "github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler" "github.com/elastic/cloud-on-k8s/pkg/controller/common/version" @@ -35,11 +31,16 @@ import ( "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/license" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/reconcile" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/remotecluster" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/services" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/settings" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/user" esversion "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/version" "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + controller "sigs.k8s.io/controller-runtime/pkg/reconcile" ) var ( @@ -75,6 +76,9 @@ type DefaultDriverParameters struct { Scheme *runtime.Scheme Recorder record.EventRecorder + // LicenseChecker is used for some features to check if an appropriate license is setup + LicenseChecker commonlicense.Checker + // State holds the accumulated state during the reconcile loop ReconcileState *reconcile.State // Observers that observe es clusters state. @@ -122,6 +126,11 @@ func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results { return results.WithError(err) } + _, err := common.ReconcileService(ctx, d.Client, d.Scheme(), services.NewTransportService(d.ES), &d.ES) + if err != nil { + return results.WithError(err) + } + externalService, err := common.ReconcileService(ctx, d.Client, d.Scheme(), services.NewExternalService(d.ES), &d.ES) if err != nil { return results.WithError(err) @@ -216,6 +225,16 @@ func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results { }, ) + if esReachable { + err = remotecluster.UpdateSettings(ctx, d.Client, esClient, d.Recorder(), d.LicenseChecker, d.ES) + if err != nil { + msg := "Could not update remote clusters in Elasticsearch settings" + d.ReconcileState.AddEvent(corev1.EventTypeWarning, events.EventReasonUnexpected, msg) + log.Error(err, msg, "namespace", d.ES.Namespace, "es_name", d.ES.Name) + results.WithResult(defaultRequeue) + } + } + // Compute seed hosts based on current masters with a podIP if err := settings.UpdateSeedHostsConfigMap(ctx, d.Client, d.Scheme(), d.ES, resourcesState.AllPods); err != nil { return results.WithError(err) diff --git a/pkg/controller/elasticsearch/elasticsearch_controller.go b/pkg/controller/elasticsearch/elasticsearch_controller.go index c4c5afb156..a6da672425 100644 --- a/pkg/controller/elasticsearch/elasticsearch_controller.go +++ b/pkg/controller/elasticsearch/elasticsearch_controller.go @@ -16,6 +16,7 @@ import ( "github.com/elastic/cloud-on-k8s/pkg/controller/common/expectations" "github.com/elastic/cloud-on-k8s/pkg/controller/common/finalizer" "github.com/elastic/cloud-on-k8s/pkg/controller/common/keystore" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/license" "github.com/elastic/cloud-on-k8s/pkg/controller/common/operator" "github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler" "github.com/elastic/cloud-on-k8s/pkg/controller/common/tracing" @@ -65,10 +66,11 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileEl observerSettings := observer.DefaultSettings observerSettings.Tracer = params.Tracer return &ReconcileElasticsearch{ - Client: client, - scheme: mgr.GetScheme(), - recorder: mgr.GetEventRecorderFor(name), - esObservers: observer.NewManager(observerSettings), + Client: client, + scheme: mgr.GetScheme(), + recorder: mgr.GetEventRecorderFor(name), + licenseChecker: license.NewLicenseChecker(client, params.OperatorNamespace), + esObservers: observer.NewManager(observerSettings), dynamicWatches: watches.NewDynamicWatches(), expectations: expectations.NewClustersExpectations(client), @@ -159,8 +161,9 @@ var _ reconcile.Reconciler = &ReconcileElasticsearch{} type ReconcileElasticsearch struct { k8s.Client operator.Parameters - scheme *runtime.Scheme - recorder record.EventRecorder + scheme *runtime.Scheme + recorder record.EventRecorder + licenseChecker license.Checker esObservers *observer.Manager @@ -309,6 +312,7 @@ func (r *ReconcileElasticsearch) internalReconcile( Observers: r.esObservers, DynamicWatches: r.dynamicWatches, SupportedVersions: *supported, + LicenseChecker: r.licenseChecker, }).Reconcile(ctx) } diff --git a/pkg/controller/elasticsearch/nodespec/podspec_test.go b/pkg/controller/elasticsearch/nodespec/podspec_test.go index 7137321329..8fe3ef6f11 100644 --- a/pkg/controller/elasticsearch/nodespec/podspec_test.go +++ b/pkg/controller/elasticsearch/nodespec/podspec_test.go @@ -140,7 +140,7 @@ func TestBuildPodTemplateSpec(t *testing.T) { Labels: map[string]string{ "common.k8s.elastic.co/type": "elasticsearch", "elasticsearch.k8s.elastic.co/cluster-name": "name", - "elasticsearch.k8s.elastic.co/config-hash": "2019720671", + "elasticsearch.k8s.elastic.co/config-hash": "4065866170", "elasticsearch.k8s.elastic.co/http-scheme": "https", "elasticsearch.k8s.elastic.co/node-data": "false", "elasticsearch.k8s.elastic.co/node-ingest": "true", diff --git a/pkg/controller/elasticsearch/nodespec/volumes.go b/pkg/controller/elasticsearch/nodespec/volumes.go index 535344269b..a843e685a1 100644 --- a/pkg/controller/elasticsearch/nodespec/volumes.go +++ b/pkg/controller/elasticsearch/nodespec/volumes.go @@ -31,6 +31,11 @@ func buildVolumes(esName string, nodeSpec esv1.NodeSet, keystoreResources *keyst esvolume.HTTPCertificatesSecretVolumeMountPath, ) transportCertificatesVolume := transportCertificatesVolume(esName) + remoteCertificateAuthoritiesVolume := volume.NewSecretVolumeWithMountPath( + esv1.RemoteCaSecretName(esName), + esvolume.RemoteCertificateAuthoritiesSecretVolumeName, + esvolume.RemoteCertificateAuthoritiesSecretVolumeMountPath, + ) unicastHostsVolume := volume.NewConfigMapVolume( esv1.UnicastHostsConfigMap(esName), esvolume.UnicastHostsVolumeName, esvolume.UnicastHostsVolumeMountPath, ) @@ -68,6 +73,7 @@ func buildVolumes(esName string, nodeSpec esv1.NodeSet, keystoreResources *keyst unicastHostsVolume.Volume(), probeSecret.Volume(), transportCertificatesVolume.Volume(), + remoteCertificateAuthoritiesVolume.Volume(), httpCertificatesVolume.Volume(), scriptsVolume.Volume(), configVolume.Volume(), @@ -85,6 +91,7 @@ func buildVolumes(esName string, nodeSpec esv1.NodeSet, keystoreResources *keyst unicastHostsVolume.VolumeMount(), probeSecret.VolumeMount(), transportCertificatesVolume.VolumeMount(), + remoteCertificateAuthoritiesVolume.VolumeMount(), httpCertificatesVolume.VolumeMount(), scriptsVolume.VolumeMount(), configVolume.VolumeMount(), diff --git a/pkg/controller/elasticsearch/remotecluster/annotations.go b/pkg/controller/elasticsearch/remotecluster/annotations.go new file mode 100644 index 0000000000..6b427e77ac --- /dev/null +++ b/pkg/controller/elasticsearch/remotecluster/annotations.go @@ -0,0 +1,62 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remotecluster + +import ( + "encoding/json" + + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + "github.com/pkg/errors" +) + +const ( + RemoteClustersAnnotationName = "elasticsearch.k8s.elastic.co/remote-clusters" +) + +type expectedRemoteClusterConfiguration struct { + esv1.RemoteCluster + + // ConfigHash is the hash of the remote cluster configuration. It is used to detect when the settings must be updated. + ConfigHash string `json:"configHash"` +} + +// getCurrentRemoteClusters returns a map with the current configuration hash of the remote clusters declared in Elasticsearch. +// A map is returned here to quickly compare with the ones that are new or missing. +// If there's no remote clusters the map is empty but not nil. +func getCurrentRemoteClusters(es esv1.Elasticsearch) (map[string]string, error) { + serializedRemoteClusters, ok := es.Annotations[RemoteClustersAnnotationName] + remoteClusters := make(map[string]string) + if !ok { + return remoteClusters, nil + } + if err := json.Unmarshal([]byte(serializedRemoteClusters), &remoteClusters); err != nil { + return nil, err + } + + return remoteClusters, nil +} + +func annotateWithRemoteClusters(c k8s.Client, es esv1.Elasticsearch, remoteClusters map[string]expectedRemoteClusterConfiguration) error { + // Store a map with the configuration hash for every remote cluster + remoteClusterConfigurations := make(map[string]string, len(remoteClusters)) + for _, remoteCluster := range remoteClusters { + // remoteCluster.Name is set by the user, it is supposed to be unique + remoteClusterConfigurations[remoteCluster.Name] = remoteCluster.RemoteCluster.ConfigHash() + } + + // serialize the remote clusters list and update the object + serializedRemoteClusters, err := json.Marshal(remoteClusterConfigurations) + if err != nil { + return errors.Wrapf(err, "failed to serialize configuration") + } + + if es.Annotations == nil { + es.Annotations = make(map[string]string) + } + + es.Annotations[RemoteClustersAnnotationName] = string(serializedRemoteClusters) + return c.Update(&es) +} diff --git a/pkg/controller/elasticsearch/remotecluster/elasticsearch.go b/pkg/controller/elasticsearch/remotecluster/elasticsearch.go new file mode 100644 index 0000000000..92f72539f6 --- /dev/null +++ b/pkg/controller/elasticsearch/remotecluster/elasticsearch.go @@ -0,0 +1,125 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remotecluster + +import ( + "context" + + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/events" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/license" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/tracing" + esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/services" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + "go.elastic.co/apm" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +var log = logf.Log.WithName("remotecluster") + +const enterpriseFeaturesDisabledMsg = "Remote cluster is an enterprise feature. Enterprise features are disabled" + +// UpdateSettings updates the remote clusters in the persistent settings by calling the Elasticsearch API. +func UpdateSettings( + ctx context.Context, + c k8s.Client, + esClient esclient.Client, + eventRecorder record.EventRecorder, + licenseChecker license.Checker, + es esv1.Elasticsearch, +) error { + span, _ := apm.StartSpan(ctx, "update_remote_clusters", tracing.SpanTypeApp) + defer span.End() + + enabled, err := licenseChecker.EnterpriseFeaturesEnabled() + if err != nil { + return err + } + if !enabled { + log.Info( + enterpriseFeaturesDisabledMsg, + "namespace", es.Namespace, "es_name", es.Name, + ) + eventRecorder.Eventf(&es, corev1.EventTypeWarning, events.EventAssociationError, enterpriseFeaturesDisabledMsg) + return nil + } + + currentRemoteClusters, err := getCurrentRemoteClusters(es) + if err != nil { + return err + } + expectedRemoteClusters := getExpectedRemoteClusters(es) + + remoteClusters := make(map[string]esclient.RemoteCluster) + // RemoteClusters to add or update + for name, remoteCluster := range expectedRemoteClusters { + if currentConfigHash, ok := currentRemoteClusters[name]; !ok || currentConfigHash != remoteCluster.ConfigHash { + // Declare remote cluster in ES + seedHosts := []string{services.ExternalTransportServiceHost(remoteCluster.ElasticsearchRef.NamespacedName())} + log.Info("Adding or updating remote cluster", + "namespace", es.Namespace, + "es_name", es.Name, + "remote_cluster", remoteCluster.Name, + "seeds", seedHosts, + ) + remoteClusters[name] = esclient.RemoteCluster{Seeds: seedHosts} + } + } + + // RemoteClusters to remove + for name := range currentRemoteClusters { + if _, ok := expectedRemoteClusters[name]; !ok { + log.Info("Removing remote cluster", + "namespace", es.Namespace, + "es_name", es.Name, + "remote_cluster", name, + ) + remoteClusters[name] = esclient.RemoteCluster{Seeds: nil} + } + } + + if len(remoteClusters) > 0 { + // Apply the settings + if err := updateSettings(esClient, remoteClusters); err != nil { + return err + } + // Update the annotation + return annotateWithRemoteClusters(c, es, expectedRemoteClusters) + } + return nil +} + +// getExpectedRemoteClusters returns a map with the expected remote clusters +// A map is returned here because it will be used to quickly compare with the ones that are new or missing. +func getExpectedRemoteClusters(es esv1.Elasticsearch) map[string]expectedRemoteClusterConfiguration { + remoteClusters := make(map[string]expectedRemoteClusterConfiguration) + for _, remoteCluster := range es.Spec.RemoteClusters { + if !remoteCluster.ElasticsearchRef.IsDefined() { + continue + } + remoteCluster.ElasticsearchRef = remoteCluster.ElasticsearchRef.WithDefaultNamespace(es.Namespace) + remoteClusters[remoteCluster.Name] = expectedRemoteClusterConfiguration{ + RemoteCluster: remoteCluster, + ConfigHash: remoteCluster.ConfigHash(), + } + } + return remoteClusters +} + +// updateSettings makes a call to an Elasticsearch cluster to apply a persistent setting. +func updateSettings(esClient esclient.Client, remoteClusters map[string]esclient.RemoteCluster) error { + ctx, cancel := context.WithTimeout(context.Background(), esclient.DefaultReqTimeout) + defer cancel() + return esClient.UpdateRemoteClusterSettings(ctx, esclient.RemoteClustersSettings{ + PersistentSettings: &esclient.SettingsGroup{ + Cluster: esclient.RemoteClusters{ + RemoteClusters: remoteClusters, + }, + }, + }) +} diff --git a/pkg/controller/elasticsearch/remotecluster/elasticsearch_test.go b/pkg/controller/elasticsearch/remotecluster/elasticsearch_test.go new file mode 100644 index 0000000000..67b4d0e5cb --- /dev/null +++ b/pkg/controller/elasticsearch/remotecluster/elasticsearch_test.go @@ -0,0 +1,323 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. +package remotecluster + +import ( + "context" + "reflect" + "testing" + + "k8s.io/client-go/tools/record" + + commonv1 "github.com/elastic/cloud-on-k8s/pkg/apis/common/v1" + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/license" + esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func Test_getCurrentRemoteClusters(t *testing.T) { + type args struct { + es esv1.Elasticsearch + } + tests := []struct { + name string + args args + want map[string]string + wantErr bool + }{ + { + name: "Read from a nil annotation should be ok", + args: args{es: esv1.Elasticsearch{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ns1", + Namespace: "es1", + Annotations: map[string]string{}, + }, + }}, + want: map[string]string{}, + }, + { + name: "Decode annotation into a list of remote cluster", + args: args{es: esv1.Elasticsearch{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ns1", + Namespace: "es1", + Annotations: map[string]string{"elasticsearch.k8s.elastic.co/remote-clusters": `{"ns2-cluster-2":"3795207740","ns5-cluster-8":"XXXXXXXXXX"}`}, + }, + }}, + want: map[string]string{ + "ns2-cluster-2": "3795207740", + "ns5-cluster-8": "XXXXXXXXXX", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := getCurrentRemoteClusters(tt.args.es) + if (err != nil) != tt.wantErr { + t.Errorf("getCurrentRemoteClusters() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("getCurrentRemoteClusters() = %v, want %v", got, tt.want) + } + }) + } +} + +type fakeESClient struct { + esclient.Client + settings esclient.RemoteClustersSettings + called bool +} + +func (f *fakeESClient) UpdateRemoteClusterSettings(_ context.Context, settings esclient.RemoteClustersSettings) error { + f.settings = settings + f.called = true + return nil +} +func newEsWithRemoteClusters( + esNamespace, esName string, + annotations map[string]string, + remoteClusters ...esv1.RemoteCluster, +) *esv1.Elasticsearch { + return &esv1.Elasticsearch{ + ObjectMeta: metav1.ObjectMeta{ + Name: esName, + Namespace: esNamespace, + Annotations: annotations, + }, + Spec: esv1.ElasticsearchSpec{ + RemoteClusters: remoteClusters, + }, + } +} + +type fakeLicenseChecker struct { + enterpriseFeaturesEnabled bool +} + +func (fakeLicenseChecker) CurrentEnterpriseLicense() (*license.EnterpriseLicense, error) { + return nil, nil +} + +func (f *fakeLicenseChecker) EnterpriseFeaturesEnabled() (bool, error) { + return f.enterpriseFeaturesEnabled, nil +} + +func (fakeLicenseChecker) Valid(_ license.EnterpriseLicense) (bool, error) { + return true, nil +} + +func TestUpdateSettings(t *testing.T) { + type args struct { + esClient *fakeESClient + es *esv1.Elasticsearch + licenseChecker license.Checker + } + tests := []struct { + name string + args args + wantErr bool + wantEsCalled bool + wantSettings esclient.RemoteClustersSettings + }{ + { + name: "Create a new remote cluster", + args: args{ + esClient: &fakeESClient{}, + licenseChecker: &fakeLicenseChecker{true}, + es: newEsWithRemoteClusters( + "ns1", + "es1", + nil, + esv1.RemoteCluster{ + Name: "ns2-es2", + ElasticsearchRef: commonv1.ObjectSelector{Name: "es2", Namespace: "ns2"}, + }, + ), + }, + wantEsCalled: true, + wantSettings: esclient.RemoteClustersSettings{ + PersistentSettings: &esclient.SettingsGroup{ + Cluster: esclient.RemoteClusters{ + RemoteClusters: map[string]esclient.RemoteCluster{ + "ns2-es2": {Seeds: []string{"es2-es-transport.ns2.svc:9300"}}, + }, + }, + }, + }, + }, + { + name: "Create a new remote cluster with no namespace", + args: args{ + esClient: &fakeESClient{}, + licenseChecker: &fakeLicenseChecker{true}, + es: newEsWithRemoteClusters( + "ns1", + "es1", + nil, + esv1.RemoteCluster{ + Name: "ns1-es2", + ElasticsearchRef: commonv1.ObjectSelector{Name: "es2"}, + }), + }, + wantEsCalled: true, + wantSettings: esclient.RemoteClustersSettings{ + PersistentSettings: &esclient.SettingsGroup{ + Cluster: esclient.RemoteClusters{ + RemoteClusters: map[string]esclient.RemoteCluster{ + "ns1-es2": {Seeds: []string{"es2-es-transport.ns1.svc:9300"}}, + }, + }, + }, + }, + }, + { + name: "Remote cluster already exists, do not make an API call", + args: args{ + esClient: &fakeESClient{}, + licenseChecker: &fakeLicenseChecker{true}, + es: newEsWithRemoteClusters( + "ns1", + "es1", + map[string]string{ + "elasticsearch.k8s.elastic.co/remote-clusters": `{"ns1-es2":"2221154215"}`, + }, + esv1.RemoteCluster{ + Name: "ns1-es2", + ElasticsearchRef: commonv1.ObjectSelector{Name: "es2"}, + }), + }, + wantEsCalled: false, + }, + { + name: "Remote cluster already exists but has been updated, we should make an API call", + args: args{ + esClient: &fakeESClient{}, + licenseChecker: &fakeLicenseChecker{true}, + es: newEsWithRemoteClusters( + "ns1", + "es1", + map[string]string{ + "elasticsearch.k8s.elastic.co/remote-clusters": `{"ns1-es2":"8851644973"}`, + }, + esv1.RemoteCluster{ + Name: "ns1-es2", + ElasticsearchRef: commonv1.ObjectSelector{Name: "es2"}, + }), + }, + wantEsCalled: true, + wantSettings: esclient.RemoteClustersSettings{ + PersistentSettings: &esclient.SettingsGroup{ + Cluster: esclient.RemoteClusters{ + RemoteClusters: map[string]esclient.RemoteCluster{ + "ns1-es2": {Seeds: []string{"es2-es-transport.ns1.svc:9300"}}, + }, + }, + }, + }, + }, + { + name: "Remove existing cluster", + args: args{ + esClient: &fakeESClient{}, + licenseChecker: &fakeLicenseChecker{true}, + es: newEsWithRemoteClusters( + "ns1", + "es1", + map[string]string{ + "elasticsearch.k8s.elastic.co/remote-clusters": `{"to-be-deleted":"8538658922","ns1-es2":"2221154215"}`, + }, + esv1.RemoteCluster{ + Name: "ns1-es2", + ElasticsearchRef: commonv1.ObjectSelector{Name: "es2"}, + }), + }, + wantEsCalled: true, + wantSettings: esclient.RemoteClustersSettings{ + PersistentSettings: &esclient.SettingsGroup{ + Cluster: esclient.RemoteClusters{ + RemoteClusters: map[string]esclient.RemoteCluster{ + "to-be-deleted": {Seeds: nil}, + }, + }, + }, + }, + }, + { + name: "No valid license to create a new remote cluster", + args: args{ + esClient: &fakeESClient{}, + licenseChecker: &fakeLicenseChecker{false}, + es: newEsWithRemoteClusters( + "ns1", + "es1", + nil, + esv1.RemoteCluster{ + Name: "es2-ns2", + ElasticsearchRef: commonv1.ObjectSelector{Namespace: "ns2", Name: "es2"}, + }), + }, + wantEsCalled: false, + }, + { + name: "Multiple changes in one call: remote cluster already exists but has been updated, one is added and a last one is removed.", + args: args{ + esClient: &fakeESClient{}, + licenseChecker: &fakeLicenseChecker{true}, + es: newEsWithRemoteClusters( + "ns1", + "es1", + map[string]string{ + "elasticsearch.k8s.elastic.co/remote-clusters": `{"ns1-es2":"8851644973","ns1-es5":"8851644973"}`, + }, + esv1.RemoteCluster{ + Name: "ns1-es2", + ElasticsearchRef: commonv1.ObjectSelector{Name: "es2"}, + }, + esv1.RemoteCluster{ + Name: "ns1-es4", + ElasticsearchRef: commonv1.ObjectSelector{Name: "es4"}, + }, + ), + }, + wantEsCalled: true, + wantSettings: esclient.RemoteClustersSettings{ + PersistentSettings: &esclient.SettingsGroup{ + Cluster: esclient.RemoteClusters{ + RemoteClusters: map[string]esclient.RemoteCluster{ + "ns1-es2": {Seeds: []string{"es2-es-transport.ns1.svc:9300"}}, + "ns1-es5": {Seeds: nil}, + "ns1-es4": {Seeds: []string{"es4-es-transport.ns1.svc:9300"}}, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client := k8s.WrappedFakeClient(tt.args.es) + if err := UpdateSettings( + context.Background(), + client, + tt.args.esClient, + record.NewFakeRecorder(100), + tt.args.licenseChecker, + *tt.args.es, + ); (err != nil) != tt.wantErr { + t.Errorf("UpdateRemoteClusterSettings() error = %v, wantErr %v", err, tt.wantErr) + } + // Check the settings + assert.Equal(t, tt.wantEsCalled, tt.args.esClient.called) + if tt.wantEsCalled { + assert.Equal(t, tt.wantSettings, tt.args.esClient.settings) + } + }) + } +} diff --git a/pkg/controller/elasticsearch/remotecluster/remoteca/controller.go b/pkg/controller/elasticsearch/remotecluster/remoteca/controller.go new file mode 100644 index 0000000000..7e7e45b8ec --- /dev/null +++ b/pkg/controller/elasticsearch/remotecluster/remoteca/controller.go @@ -0,0 +1,289 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remoteca + +import ( + "context" + "fmt" + "time" + + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/pkg/controller/common" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/association" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/license" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/operator" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/tracing" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/watches" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + "github.com/elastic/cloud-on-k8s/pkg/utils/rbac" + "go.elastic.co/apm" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +const ( + name = "remoteca-controller" + + EventReasonClusterCaCertNotFound = "ClusterCaCertNotFound" +) + +var ( + defaultRequeue = reconcile.Result{Requeue: true, RequeueAfter: 20 * time.Second} +) + +// NewReconciler returns a new reconcile.Reconciler +func NewReconciler(mgr manager.Manager, accessReviewer rbac.AccessReviewer, params operator.Parameters) *ReconcileRemoteCa { + c := k8s.WrapClient(mgr.GetClient()) + return &ReconcileRemoteCa{ + Client: c, + accessReviewer: accessReviewer, + scheme: mgr.GetScheme(), + watches: watches.NewDynamicWatches(), + recorder: mgr.GetEventRecorderFor(name), + licenseChecker: license.NewLicenseChecker(c, params.OperatorNamespace), + Parameters: params, + } +} + +var _ reconcile.Reconciler = &ReconcileRemoteCa{} + +// ReconcileRemoteCa reconciles remote CA Secrets. +type ReconcileRemoteCa struct { + k8s.Client + operator.Parameters + accessReviewer rbac.AccessReviewer + scheme *runtime.Scheme + recorder record.EventRecorder + watches watches.DynamicWatches + licenseChecker license.Checker + + // iteration is the number of times this controller has run its Reconcile method + iteration uint64 +} + +// Reconcile reads that state of the cluster for the expected remote clusters in this Kubernetes cluster. +// It copies the remote CA Secrets so they can be trusted by every peer Elasticsearch clusters. +func (r *ReconcileRemoteCa) Reconcile(request reconcile.Request) (reconcile.Result, error) { + defer common.LogReconciliationRun(log, request, "es_name", &r.iteration)() + tx, ctx := tracing.NewTransaction(r.Tracer, request.NamespacedName, "remoteca") + defer tracing.EndTransaction(tx) + + // Fetch the local Elasticsearch spec + es := esv1.Elasticsearch{} + err := r.Get(request.NamespacedName, &es) + if err != nil { + if errors.IsNotFound(err) { + return deleteAllRemoteCa(ctx, r, request.NamespacedName) + } + return reconcile.Result{}, err + } + + if common.IsPaused(es.ObjectMeta) { + log.Info("Object is paused. Skipping reconciliation", "namespace", es.Namespace, "es_name", es.Name) + return common.PauseRequeue, nil + } + + enabled, err := r.licenseChecker.EnterpriseFeaturesEnabled() + if err != nil { + return defaultRequeue, err + } + if !enabled { + log.Info( + "Remote cluster controller is an enterprise feature. Enterprise features are disabled", + "namespace", es.Namespace, "es_name", es.Name, + ) + return reconcile.Result{}, nil + } + + return doReconcile(ctx, r, &es) +} + +// deleteAllRemoteCa deletes all associated remote certificate authorities +func deleteAllRemoteCa(ctx context.Context, r *ReconcileRemoteCa, es types.NamespacedName) (reconcile.Result, error) { + span, _ := apm.StartSpan(ctx, "delete_all_remote_ca", tracing.SpanTypeApp) + defer span.End() + + remoteClusters, err := remoteClustersInvolvedWith(ctx, r.Client, es) + if err != nil { + return reconcile.Result{}, err + } + results := &reconciler.Results{} + for remoteCluster := range remoteClusters { + if err := deleteCertificateAuthorities(ctx, r, es, remoteCluster); err != nil { + results.WithError(err) + } + } + return results.Aggregate() +} + +func doReconcile( + ctx context.Context, + r *ReconcileRemoteCa, + localEs *esv1.Elasticsearch, +) (reconcile.Result, error) { + localClusterKey := k8s.ExtractNamespacedName(localEs) + + expectedRemoteClusters, err := getExpectedRemoteClusters(ctx, r.Client, localEs) + if err != nil { + return reconcile.Result{}, err + } + + // Get all the clusters to which this reconciled cluster is connected to according to the existing remote CAs. + // remoteClustersInvolved is used to delete the CA certificates and cancel any trust relationships + // that may have existed in the past but should not exist anymore. + remoteClustersInvolved, err := remoteClustersInvolvedWith(ctx, r.Client, localClusterKey) + if err != nil { + return reconcile.Result{}, err + } + + results := &reconciler.Results{} + // Create or update expected remote CA + for remoteEsKey := range expectedRemoteClusters { + // Get the remote Elasticsearch cluster associated with this remote CA + remoteEs := &esv1.Elasticsearch{} + if err := r.Client.Get(remoteEsKey, remoteEs); err != nil { + if errors.IsNotFound(err) { + // Remote cluster does not exist, skip it + continue + } + return reconcile.Result{}, err + } + accessAllowed, err := isRemoteClusterAssociationAllowed(r.accessReviewer, localEs, remoteEs, r.recorder) + if err != nil { + return reconcile.Result{}, err + } + // if the remote CA exists but isn't allowed anymore, it will be deleted next + if !accessAllowed { + continue + } + delete(remoteClustersInvolved, remoteEsKey) + results.WithResults(createOrUpdateCertificateAuthorities(ctx, r, localEs, remoteEs)) + if results.HasError() { + return results.Aggregate() + } + } + + // Delete existing but not expected remote CA + for toDelete := range remoteClustersInvolved { + log.V(1).Info("Deleting remote CA", + "local_namespace", localEs.Namespace, + "local_name", localEs.Name, + "remote_namespace", toDelete.Namespace, + "remote_name", toDelete.Name, + ) + results.WithError(deleteCertificateAuthorities(ctx, r, localClusterKey, toDelete)) + } + return results.WithResult(association.RequeueRbacCheck(r.accessReviewer)).Aggregate() +} + +func caCertMissingError(cluster types.NamespacedName) string { + return fmt.Sprintf("Cannot find CA certificate cluster %s/%s", cluster.Namespace, cluster.Name) +} + +// getExpectedRemoteClusters returns all the remote cluster keys for which a remote ca should created +// The CA certificates must be copied from the remote cluster to the local one and vice versa +func getExpectedRemoteClusters( + ctx context.Context, + c k8s.Client, + associatedEs *esv1.Elasticsearch, +) (map[types.NamespacedName]struct{}, error) { + span, _ := apm.StartSpan(ctx, "get_expected_remote_ca", tracing.SpanTypeApp) + defer span.End() + expectedRemoteClusters := make(map[types.NamespacedName]struct{}) + + // Add remote clusters declared in the Spec + for _, remoteCluster := range associatedEs.Spec.RemoteClusters { + if !remoteCluster.ElasticsearchRef.IsDefined() { + continue + } + esRef := remoteCluster.ElasticsearchRef.WithDefaultNamespace(associatedEs.Namespace) + expectedRemoteClusters[esRef.NamespacedName()] = struct{}{} + } + + var list esv1.ElasticsearchList + if err := c.List(&list, &client.ListOptions{}); err != nil { + return nil, err + } + + // Seek for Elasticsearch resources where this cluster is declared as a remote cluster + for _, es := range list.Items { + for _, remoteCluster := range es.Spec.RemoteClusters { + if !remoteCluster.ElasticsearchRef.IsDefined() { + continue + } + esRef := remoteCluster.ElasticsearchRef.WithDefaultNamespace(es.Namespace) + if esRef.Namespace == associatedEs.Namespace && + esRef.Name == associatedEs.Name { + expectedRemoteClusters[k8s.ExtractNamespacedName(&es)] = struct{}{} + } + } + } + + return expectedRemoteClusters, nil +} + +// remoteClustersInvolvedWith returns for a given Elasticsearch cluster all the Elasticsearch keys for which +// the remote certificate authorities have been copied, i.e. all the other Elasticsearch clusters for which this cluster +// has been involved in a remote cluster association. +// In order to get all of them we: +// 1. List all the remote CA copied locally. +// 2. List all the other Elasticsearch clusters for which the CA of the given cluster has been copied. +func remoteClustersInvolvedWith( + ctx context.Context, + c k8s.Client, + es types.NamespacedName, +) (map[types.NamespacedName]struct{}, error) { + span, _ := apm.StartSpan(ctx, "get_current_remote_ca", tracing.SpanTypeApp) + defer span.End() + + currentRemoteClusters := make(map[types.NamespacedName]struct{}) + + // 1. Get clusters whose CA has been copied into the local namespace. + var remoteCAList corev1.SecretList + if err := c.List( + &remoteCAList, + client.InNamespace(es.Namespace), + LabelSelector(es.Name), + ); err != nil { + return nil, err + } + for _, remoteCA := range remoteCAList.Items { + remoteNs := remoteCA.Labels[RemoteClusterNamespaceLabelName] + remoteEs := remoteCA.Labels[RemoteClusterNameLabelName] + currentRemoteClusters[types.NamespacedName{ + Namespace: remoteNs, + Name: remoteEs, + }] = struct{}{} + } + + // 2. Get clusters for which the CA of the local cluster has been copied. + if err := c.List( + &remoteCAList, + client.MatchingLabels(map[string]string{ + common.TypeLabelName: TypeLabelValue, + RemoteClusterNamespaceLabelName: es.Namespace, + RemoteClusterNameLabelName: es.Name, + }), + ); err != nil { + return nil, err + } + for _, remoteCA := range remoteCAList.Items { + remoteEs := remoteCA.Labels[label.ClusterNameLabelName] + currentRemoteClusters[types.NamespacedName{ + Namespace: remoteCA.Namespace, + Name: remoteEs, + }] = struct{}{} + } + + return currentRemoteClusters, nil +} diff --git a/pkg/controller/elasticsearch/remotecluster/remoteca/controller_test.go b/pkg/controller/elasticsearch/remotecluster/remoteca/controller_test.go new file mode 100644 index 0000000000..1f9f1f1750 --- /dev/null +++ b/pkg/controller/elasticsearch/remotecluster/remoteca/controller_test.go @@ -0,0 +1,473 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remoteca + +import ( + "reflect" + "testing" + + commonv1 "github.com/elastic/cloud-on-k8s/pkg/apis/common/v1" + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/certificates" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/license" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/watches" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/certificates/transport" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + "github.com/elastic/cloud-on-k8s/pkg/utils/rbac" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +type clusterBuilder struct { + name, namespace string + remoteClusters []commonv1.ObjectSelector +} + +func newClusteBuilder(namespace, name string) *clusterBuilder { + return &clusterBuilder{ + name: name, + namespace: namespace, + } +} + +func (cb *clusterBuilder) withRemoteCluster(namespace, name string) *clusterBuilder { + cb.remoteClusters = append(cb.remoteClusters, commonv1.ObjectSelector{ + Name: name, + Namespace: namespace, + }) + return cb +} + +func (cb *clusterBuilder) build() *esv1.Elasticsearch { + remoteClusters := make([]esv1.RemoteCluster, len(cb.remoteClusters)) + i := 0 + for _, remoteCluster := range cb.remoteClusters { + remoteClusters[i] = esv1.RemoteCluster{ + ElasticsearchRef: commonv1.ObjectSelector{ + Name: remoteCluster.Name, + Namespace: remoteCluster.Namespace, + }} + i++ + } + + return &esv1.Elasticsearch{ + ObjectMeta: v1.ObjectMeta{ + Namespace: cb.namespace, + Name: cb.name, + }, + Spec: esv1.ElasticsearchSpec{ + RemoteClusters: remoteClusters, + }, + } +} + +type fakeAccessReviewer struct { + allowed bool + err error +} + +func (f *fakeAccessReviewer) AccessAllowed(_ string, _ string, _ runtime.Object) (bool, error) { + return f.allowed, f.err +} + +type fakeLicenseChecker struct { + enterpriseFeaturesEnabled bool +} + +func (f fakeLicenseChecker) CurrentEnterpriseLicense() (*license.EnterpriseLicense, error) { + return nil, nil +} +func (f fakeLicenseChecker) EnterpriseFeaturesEnabled() (bool, error) { + return f.enterpriseFeaturesEnabled, nil +} +func (f fakeLicenseChecker) Valid(l license.EnterpriseLicense) (bool, error) { + return f.enterpriseFeaturesEnabled, nil +} + +func fakePublicCa(namespace, name string) *corev1.Secret { + namespacedName := types.NamespacedName{ + Name: name, + Namespace: namespace, + } + transportPublicCertKey := transport.PublicCertsSecretRef(namespacedName) + return &corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Namespace: transportPublicCertKey.Namespace, + Name: transportPublicCertKey.Name, + }, + Data: map[string][]byte{ + certificates.CAFileName: []byte(namespacedName.String()), + }, + } +} + +// remoteCa builds an expected remote Ca +func remoteCa(localNamespace, localName, remoteNamespace, remoteName string) *corev1.Secret { + remoteNamespacedName := types.NamespacedName{ + Name: remoteName, + Namespace: remoteNamespace, + } + return &corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Namespace: localNamespace, + Name: remoteCASecretName(localName, remoteNamespacedName), + Labels: map[string]string{ + "common.k8s.elastic.co/type": "remote-ca", + "elasticsearch.k8s.elastic.co/cluster-name": localName, + "elasticsearch.k8s.elastic.co/remote-cluster-name": remoteName, + "elasticsearch.k8s.elastic.co/remote-cluster-namespace": remoteNamespace, + }, + }, + Data: map[string][]byte{ + certificates.CAFileName: []byte(remoteNamespacedName.String()), + }, + } +} + +func withDataCert(caSecret *corev1.Secret, newCa []byte) *corev1.Secret { + caSecret.Data[certificates.CAFileName] = newCa + return caSecret +} + +func TestReconcileRemoteCa_Reconcile(t *testing.T) { + type fields struct { + clusters []runtime.Object + accessReviewer rbac.AccessReviewer + licenseChecker license.Checker + } + type args struct { + request reconcile.Request + } + tests := []struct { + name string + fields fields + args args + + expectedSecrets []*corev1.Secret + unexpectedSecrets []types.NamespacedName + want reconcile.Result + wantErr bool + }{ + { + name: "Simple remote cluster ns1/es1 -> ns2/es2", + fields: fields{ + clusters: []runtime.Object{ + newClusteBuilder("ns1", "es1").withRemoteCluster("ns2", "es2").build(), + fakePublicCa("ns1", "es1"), + newClusteBuilder("ns2", "es2").build(), + fakePublicCa("ns2", "es2"), + }, + accessReviewer: &fakeAccessReviewer{allowed: true}, + licenseChecker: &fakeLicenseChecker{enterpriseFeaturesEnabled: true}, + }, + args: args{ + request: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "es1", + Namespace: "ns1", + }, + }, + }, + expectedSecrets: []*corev1.Secret{ + remoteCa("ns1", "es1", "ns2", "es2"), + remoteCa("ns2", "es2", "ns1", "es1"), + }, + want: reconcile.Result{}, + wantErr: false, + }, + { + name: "Bi-directional remote cluster ns1/es1 <-> ns2/es2", + fields: fields{ + clusters: []runtime.Object{ + newClusteBuilder("ns1", "es1").withRemoteCluster("ns2", "es2").build(), + fakePublicCa("ns1", "es1"), + newClusteBuilder("ns2", "es2").withRemoteCluster("ns1", "es1").build(), + fakePublicCa("ns2", "es2"), + }, + accessReviewer: &fakeAccessReviewer{allowed: true}, + licenseChecker: &fakeLicenseChecker{enterpriseFeaturesEnabled: true}, + }, + args: args{ + request: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "es1", + Namespace: "ns1", + }, + }, + }, + expectedSecrets: []*corev1.Secret{ + remoteCa("ns1", "es1", "ns2", "es2"), + remoteCa("ns2", "es2", "ns1", "es1"), + }, + want: reconcile.Result{}, + wantErr: false, + }, + { + name: "Deleted remote cluster", + fields: fields{ + clusters: []runtime.Object{ + newClusteBuilder("ns1", "es1").build(), + fakePublicCa("ns1", "es1"), + newClusteBuilder("ns2", "es2").build(), + fakePublicCa("ns2", "es2"), + remoteCa("ns1", "es1", "ns2", "es2"), + remoteCa("ns2", "es2", "ns1", "es1"), + }, + accessReviewer: &fakeAccessReviewer{allowed: true}, + licenseChecker: &fakeLicenseChecker{enterpriseFeaturesEnabled: true}, + }, + args: args{ + request: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "es1", + Namespace: "ns1", + }, + }, + }, + unexpectedSecrets: []types.NamespacedName{ + { + Namespace: "ns1", + Name: remoteCASecretName("es1", types.NamespacedName{ + Namespace: "ns2", + Name: "es2", + }), + }, + { + Namespace: "ns2", + Name: remoteCASecretName("es2", types.NamespacedName{ + Namespace: "ns1", + Name: "es1", + }), + }, + }, + want: reconcile.Result{}, + wantErr: false, + }, + { + name: "CA content has been updated, remote ca must be reconciled", + fields: fields{ + clusters: []runtime.Object{ + newClusteBuilder("ns1", "es1").withRemoteCluster("ns2", "es2").build(), + fakePublicCa("ns1", "es1"), + newClusteBuilder("ns2", "es2").build(), + fakePublicCa("ns2", "es2"), + withDataCert(remoteCa("ns1", "es1", "ns2", "es2"), []byte("foo")), + withDataCert(remoteCa("ns2", "es2", "ns1", "es1"), []byte("bar")), + }, + accessReviewer: &fakeAccessReviewer{allowed: true}, + licenseChecker: &fakeLicenseChecker{enterpriseFeaturesEnabled: true}, + }, + args: args{ + request: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "es1", + Namespace: "ns1", + }, + }, + }, + expectedSecrets: []*corev1.Secret{ + remoteCa("ns1", "es1", "ns2", "es2"), + remoteCa("ns2", "es2", "ns1", "es1"), + }, + want: reconcile.Result{}, + wantErr: false, + }, + { + // ns1/es1 has been deleted - all related secrets in other namespaces must be deleted + name: "Deleted cluster", + fields: fields{ + clusters: []runtime.Object{ + // ns2/es2 + newClusteBuilder("ns2", "es2").withRemoteCluster("ns1", "es1").build(), + fakePublicCa("ns2", "es2"), + remoteCa("ns2", "es2", "ns1", "es1"), + // ns3/es3 + newClusteBuilder("ns3", "es3").withRemoteCluster("ns1", "es1").build(), + fakePublicCa("ns3", "es3"), + remoteCa("ns3", "es3", "ns1", "es1"), + }, + accessReviewer: &fakeAccessReviewer{allowed: true}, + licenseChecker: &fakeLicenseChecker{enterpriseFeaturesEnabled: true}, + }, + args: args{ + request: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "es1", + Namespace: "ns1", + }, + }, + }, + unexpectedSecrets: []types.NamespacedName{ + { + Namespace: "ns3", + Name: remoteCASecretName("es3", types.NamespacedName{ + Namespace: "ns1", + Name: "es1", + }), + }, + { + Namespace: "ns2", + Name: remoteCASecretName("es2", types.NamespacedName{ + Namespace: "ns1", + Name: "es1", + }), + }, + }, + want: reconcile.Result{}, + wantErr: false, + }, + { + name: "No enterprise license, remote ca are not created", + fields: fields{ + clusters: []runtime.Object{ + newClusteBuilder("ns1", "es1").withRemoteCluster("ns2", "es2").build(), + fakePublicCa("ns1", "es1"), + newClusteBuilder("ns2", "es2").build(), + fakePublicCa("ns2", "es2"), + }, + accessReviewer: &fakeAccessReviewer{allowed: true}, + licenseChecker: &fakeLicenseChecker{enterpriseFeaturesEnabled: false}, + }, + args: args{ + request: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "es1", + Namespace: "ns1", + }, + }, + }, + unexpectedSecrets: []types.NamespacedName{ + { + Namespace: "ns1", + Name: remoteCASecretName("es1", types.NamespacedName{ + Namespace: "ns2", + Name: "es2", + }), + }, + { + Namespace: "ns2", + Name: remoteCASecretName("es2", types.NamespacedName{ + Namespace: "ns1", + Name: "es1", + }), + }, + }, + want: reconcile.Result{}, + wantErr: false, + }, + { + name: "No enterprise license, existing remote ca are left untouched", + fields: fields{ + clusters: []runtime.Object{ + newClusteBuilder("ns1", "es1").withRemoteCluster("ns2", "es2").build(), + fakePublicCa("ns1", "es1"), + newClusteBuilder("ns2", "es2").build(), + fakePublicCa("ns2", "es2"), + remoteCa("ns1", "es1", "ns2", "es2"), + remoteCa("ns2", "es2", "ns1", "es1"), + }, + accessReviewer: &fakeAccessReviewer{allowed: true}, + licenseChecker: &fakeLicenseChecker{enterpriseFeaturesEnabled: false}, + }, + args: args{ + request: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "es1", + Namespace: "ns1", + }, + }, + }, + expectedSecrets: []*corev1.Secret{ + remoteCa("ns1", "es1", "ns2", "es2"), + remoteCa("ns2", "es2", "ns1", "es1"), + }, + want: reconcile.Result{}, + wantErr: false, + }, + { + name: "Association is not allowed, existing remote ca are removed", + fields: fields{ + clusters: []runtime.Object{ + newClusteBuilder("ns1", "es1").withRemoteCluster("ns2", "es2").build(), + fakePublicCa("ns1", "es1"), + newClusteBuilder("ns2", "es2").build(), + fakePublicCa("ns2", "es2"), + remoteCa("ns1", "es1", "ns2", "es2"), + remoteCa("ns2", "es2", "ns1", "es1"), + }, + accessReviewer: &fakeAccessReviewer{allowed: false}, + licenseChecker: &fakeLicenseChecker{enterpriseFeaturesEnabled: true}, + }, + args: args{ + request: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "es1", + Namespace: "ns1", + }, + }, + }, + unexpectedSecrets: []types.NamespacedName{ + { + Namespace: "ns1", + Name: remoteCASecretName("es1", types.NamespacedName{Namespace: "ns2", Name: "es2"}), + }, + { + Namespace: "ns2", + Name: remoteCASecretName("es2", types.NamespacedName{Namespace: "ns1", Name: "es1"}), + }, + }, + want: reconcile.Result{}, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := watches.NewDynamicWatches() + require.NoError(t, w.Secrets.InjectScheme(scheme.Scheme)) + r := &ReconcileRemoteCa{ + Client: k8s.WrappedFakeClient(tt.fields.clusters...), + accessReviewer: tt.fields.accessReviewer, + scheme: k8s.Scheme(), + watches: w, + licenseChecker: tt.fields.licenseChecker, + recorder: record.NewFakeRecorder(10), + } + got, err := r.Reconcile(tt.args.request) + if (err != nil) != tt.wantErr { + t.Errorf("ReconcileRemoteCa.Reconcile() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ReconcileRemoteCa.Reconcile() = %v, want %v", got, tt.want) + } + // Check that expected secrets are here + for _, expectedSecret := range tt.expectedSecrets { + var actualSecret corev1.Secret + assert.NoError(t, r.Client.Get(types.NamespacedName{Namespace: expectedSecret.Namespace, Name: expectedSecret.Name}, &actualSecret)) + // Compare content + actualCa, ok := actualSecret.Data[certificates.CAFileName] + assert.True(t, ok) + assert.Equal(t, expectedSecret.Data[certificates.CAFileName], actualCa) + // Compare labels + assert.NotNil(t, actualSecret.Labels) + assert.Equal(t, expectedSecret.Labels, actualSecret.Labels) + } + // Check that unexpected secrets does not exist + for _, unexpectedSecret := range tt.unexpectedSecrets { + var actualSecret corev1.Secret + err := r.Client.Get(types.NamespacedName{Namespace: unexpectedSecret.Namespace, Name: unexpectedSecret.Name}, &actualSecret) + assert.True(t, apierrors.IsNotFound(err)) + } + }) + } +} diff --git a/pkg/controller/elasticsearch/remotecluster/remoteca/labels.go b/pkg/controller/elasticsearch/remotecluster/remoteca/labels.go new file mode 100644 index 0000000000..818aef6020 --- /dev/null +++ b/pkg/controller/elasticsearch/remotecluster/remoteca/labels.go @@ -0,0 +1,62 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remoteca + +import ( + "fmt" + + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/pkg/controller/common" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + // RemoteClusterNamespaceLabelName used to represent the namespace of the RemoteCluster in a TrustRelationship. + RemoteClusterNamespaceLabelName = "elasticsearch.k8s.elastic.co/remote-cluster-namespace" + // RemoteClusterNameLabelName used to represent the name of the RemoteCluster in a TrustRelationship. + RemoteClusterNameLabelName = "elasticsearch.k8s.elastic.co/remote-cluster-name" + // TypeLabelValue is a type used to identify a Secret which contains the CA of a remote cluster. + TypeLabelValue = "remote-ca" + // remoteCASecretSuffix is the suffix added to the aforementioned Secret. + remoteCASecretSuffix = "remote-ca" +) + +func remoteCAObjectMeta( + name string, + owner *esv1.Elasticsearch, + remote types.NamespacedName, +) metav1.ObjectMeta { + return metav1.ObjectMeta{ + Name: name, + Namespace: owner.Namespace, + Labels: map[string]string{ + RemoteClusterNamespaceLabelName: remote.Namespace, + RemoteClusterNameLabelName: remote.Name, + label.ClusterNameLabelName: owner.Name, + common.TypeLabelName: TypeLabelValue, + }, + } +} + +// RemoteCASecretName returns the name of the Secret that contains the transport CA of a remote cluster +func remoteCASecretName( + localClusterName string, + remoteCluster types.NamespacedName, +) string { + return esv1.ESNamer.Suffix( + fmt.Sprintf("%s-%s-%s", localClusterName, remoteCluster.Namespace, remoteCluster.Name), + remoteCASecretSuffix, + ) +} + +func LabelSelector(esName string) client.MatchingLabels { + return map[string]string{ + label.ClusterNameLabelName: esName, + common.TypeLabelName: TypeLabelValue, + } +} diff --git a/pkg/controller/elasticsearch/remotecluster/remoteca/rbac.go b/pkg/controller/elasticsearch/remotecluster/remoteca/rbac.go new file mode 100644 index 0000000000..5a6675ac1c --- /dev/null +++ b/pkg/controller/elasticsearch/remotecluster/remoteca/rbac.go @@ -0,0 +1,58 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remoteca + +import ( + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/events" + "github.com/elastic/cloud-on-k8s/pkg/utils/rbac" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +var log = logf.Log.WithName("remotecluster-remoteca") + +// isRemoteClusterAssociationAllowed checks if a bi-directional association is allowed between 2 clusters. +func isRemoteClusterAssociationAllowed( + accessReviewer rbac.AccessReviewer, + localEs, remoteEs *esv1.Elasticsearch, + eventRecorder record.EventRecorder, +) (bool, error) { + accessAllowed, err := accessReviewer.AccessAllowed(localEs.Spec.ServiceAccountName, localEs.Namespace, remoteEs) + if err != nil { + return false, err + } + if !accessAllowed { + logNotAllowedAssociation(localEs, remoteEs, eventRecorder) + return false, nil + } + accessAllowed, err = accessReviewer.AccessAllowed(remoteEs.Spec.ServiceAccountName, remoteEs.Namespace, localEs) + if err != nil { + return false, err + } + if !accessAllowed { + logNotAllowedAssociation(remoteEs, localEs, eventRecorder) + return false, nil + } + return true, nil +} + +func logNotAllowedAssociation(localEs, remoteEs *esv1.Elasticsearch, eventRecorder record.EventRecorder) { + log.Info("Remote cluster association not allowed", + "local_name", localEs.Name, + "local_namespace", localEs.GetNamespace(), + "service_account", localEs.Spec.ServiceAccountName, + "remote_namespace", remoteEs.GetNamespace(), + "remote_name", remoteEs.GetName(), + ) + eventRecorder.Eventf( + localEs, + corev1.EventTypeWarning, + events.EventAssociationError, + "Remote cluster association not allowed: %s/%s to %s/%s", + localEs.Namespace, localEs.Name, remoteEs.Namespace, remoteEs.Name, + ) +} diff --git a/pkg/controller/elasticsearch/remotecluster/remoteca/secret.go b/pkg/controller/elasticsearch/remotecluster/remoteca/secret.go new file mode 100644 index 0000000000..0a3a9137a4 --- /dev/null +++ b/pkg/controller/elasticsearch/remotecluster/remoteca/secret.go @@ -0,0 +1,185 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remoteca + +import ( + "context" + "reflect" + + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/certificates" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/tracing" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/certificates/transport" + "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + "github.com/elastic/cloud-on-k8s/pkg/utils/maps" + "go.elastic.co/apm" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" +) + +// createOrUpdateCertificateAuthorities creates the two Secrets that are needed to establish a trust relationship between +// two clusters. This is a bidirectional, symmetrical, action. In order to establish the trust relationship between +// a local and a remote cluster we must: +// * Copy the CA of the local cluster to the remote one. +// * Copy the CA of the remote cluster to the local one. +func createOrUpdateCertificateAuthorities( + ctx context.Context, + r *ReconcileRemoteCa, + local, remote *esv1.Elasticsearch, +) *reconciler.Results { + span, _ := apm.StartSpan(ctx, "create_or_update_remote_ca", tracing.SpanTypeApp) + defer span.End() + results := &reconciler.Results{} + + localClusterKey := k8s.ExtractNamespacedName(local) + remoteClusterKey := k8s.ExtractNamespacedName(remote) + + // Add watches on the CA secret of the local cluster. + if err := addCertificatesAuthorityWatches(r, localClusterKey, remoteClusterKey); err != nil { + return results.WithError(err) + } + + // Add watches on the CA secret of the remote cluster. + if err := addCertificatesAuthorityWatches(r, remoteClusterKey, localClusterKey); err != nil { + return results.WithError(err) + } + + log.V(1).Info( + "Setting up remote CA", + "local_namespace", localClusterKey.Namespace, + "local_name", localClusterKey.Namespace, + "remote_namespace", remote.Namespace, + "remote_name", remote.Name, + ) + + // Copy CA from remote (source) to local (target) cluster + if err := copyCertificateAuthority(ctx, r, remote, local); err != nil { + if !errors.IsNotFound(err) { + return results.WithError(err) + } + results.WithResult(defaultRequeue) + } + + // Reciprocally, copy CA from local (source) to remote (target) cluster + if err := copyCertificateAuthority(ctx, r, local, remote); err != nil { + if !errors.IsNotFound(err) { + return results.WithError(err) + } + results.WithResult(defaultRequeue) + } + + return nil +} + +// copyCertificateAuthority creates a copy of the CA from a source cluster to a target cluster +func copyCertificateAuthority( + ctx context.Context, + r *ReconcileRemoteCa, + source, target *esv1.Elasticsearch, +) error { + sourceKey := k8s.ExtractNamespacedName(source) + // Check if CA of the source cluster exists + sourceCA := &corev1.Secret{} + if err := r.Client.Get(transport.PublicCertsSecretRef(sourceKey), sourceCA); err != nil { + return err + } + + if len(sourceCA.Data[certificates.CAFileName]) == 0 { + log.Info( + "Cannot find CA cert", + "local_namespace", source.Namespace, + "local_name", source.Namespace, + ) + r.recorder.Event(source, v1.EventTypeWarning, EventReasonClusterCaCertNotFound, caCertMissingError(sourceKey)) + // CA secrets are watched, we don't need to requeue. + // If CA is created later it will trigger a new reconciliation. + return nil + } + + // Reconcile the copy to the target cluster + if err := reconcileRemoteCA(ctx, r.Client, target, sourceKey, sourceCA.Data[certificates.CAFileName]); err != nil { + return err + } + + return nil +} + +// deleteCertificateAuthorities deletes all the Secrets needed to establish a trust relationship between two clusters. +// This means that the CA of the local cluster is deleted from the remote one and reciprocally the CA from the +// remote cluster must be deleted from the local one. +func deleteCertificateAuthorities( + ctx context.Context, + r *ReconcileRemoteCa, + local, remote types.NamespacedName, +) error { + span, _ := apm.StartSpan(ctx, "delete_certificate_authorities", tracing.SpanTypeApp) + defer span.End() + + // Delete local secret + if err := r.Client.Delete(&corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: local.Namespace, + Name: remoteCASecretName(local.Name, remote), + }, + }); err != nil && !errors.IsNotFound(err) { + return err + } + // Delete remote secret + if err := r.Client.Delete(&corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: remote.Namespace, + Name: remoteCASecretName(remote.Name, local), + }, + }); err != nil && !errors.IsNotFound(err) { + return err + } + + // Remove watches + r.watches.Secrets.RemoveHandlerForKey(watchName(local, remote)) + r.watches.Secrets.RemoveHandlerForKey(watchName(remote, local)) + + return nil +} + +// reconcileRemoteCA does the reconciliation of the Secret that contains certificate authority from a source cluster. +func reconcileRemoteCA( + ctx context.Context, + c k8s.Client, + target *esv1.Elasticsearch, + source types.NamespacedName, + sourceCA []byte, +) error { + span, _ := apm.StartSpan(ctx, "reconcile_remote_ca", tracing.SpanTypeApp) + defer span.End() + + // Define the expected source CA object, it lives in the target namespace with the content of the source cluster CA + expected := corev1.Secret{ + ObjectMeta: remoteCAObjectMeta(remoteCASecretName(target.Name, source), target, source), + Data: map[string][]byte{ + certificates.CAFileName: sourceCA, + }, + } + + var reconciled corev1.Secret + return reconciler.ReconcileResource(reconciler.Params{ + Client: c, + Scheme: scheme.Scheme, + Owner: target, + Expected: &expected, + Reconciled: &reconciled, + NeedsUpdate: func() bool { + return !maps.IsSubset(expected.Labels, reconciled.Labels) || !reflect.DeepEqual(expected.Data, reconciled.Data) + }, + UpdateReconciled: func() { + reconciled.Labels = maps.Merge(reconciled.Labels, expected.Labels) + reconciled.Data = expected.Data + }, + }) +} diff --git a/pkg/controller/elasticsearch/remotecluster/remoteca/watches.go b/pkg/controller/elasticsearch/remotecluster/remoteca/watches.go new file mode 100644 index 0000000000..08503c46cb --- /dev/null +++ b/pkg/controller/elasticsearch/remotecluster/remoteca/watches.go @@ -0,0 +1,110 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remoteca + +import ( + "fmt" + + "github.com/elastic/cloud-on-k8s/pkg/controller/common" + + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/pkg/controller/common/watches" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/certificates/transport" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +// AddWatches set watches on objects needed to manage the association between a local and a remote cluster. +func AddWatches(c controller.Controller, r *ReconcileRemoteCa) error { + // Watch for changes to RemoteCluster + if err := c.Watch(&source.Kind{Type: &esv1.Elasticsearch{}}, &handler.EnqueueRequestForObject{}); err != nil { + return err + } + + // Watch Secrets that contain remote certificate authorities managed by this controller + if err := c.Watch(&source.Kind{Type: &v1.Secret{}}, &handler.EnqueueRequestsFromMapFunc{ + ToRequests: newToRequestsFuncFromSecret(), + }); err != nil { + return err + } + + // Dynamically watches the certificate authorities involved in a cluster relationship + if err := c.Watch(&source.Kind{Type: &v1.Secret{}}, r.watches.Secrets); err != nil { + return err + } + if err := r.watches.Secrets.AddHandlers( + &watches.OwnerWatch{ + EnqueueRequestForOwner: handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &esv1.Elasticsearch{}, + }, + }, + ); err != nil { + return err + } + + return nil +} + +// newToRequestsFuncFromSecret creates a watch handler function that creates reconcile requests based on the +// labels set on a Secret which contains the remote CA. +func newToRequestsFuncFromSecret() handler.ToRequestsFunc { + return func(obj handler.MapObject) []reconcile.Request { + labels := obj.Meta.GetLabels() + if secretType, ok := labels[common.TypeLabelName]; !ok || secretType != TypeLabelValue { + return nil + } + clusterAssociationName, ok := labels[RemoteClusterNameLabelName] + if !ok { + return nil + } + clusterAssociationNamespace, ok := labels[RemoteClusterNamespaceLabelName] + if !ok { + return nil + } + return []reconcile.Request{ + {NamespacedName: types.NamespacedName{ + Namespace: clusterAssociationNamespace, + Name: clusterAssociationName}, + }, + } + } +} + +func watchName(local types.NamespacedName, remote types.NamespacedName) string { + return fmt.Sprintf( + "%s-%s-%s-%s", + local.Namespace, + local.Name, + remote.Namespace, + remote.Name, + ) +} + +// addCertificatesAuthorityWatches sets some watches on all secrets containing the certificate of a CA involved in a association. +// The local CA is watched to update the trusted certificates in the remote clusters. +// The remote CAs are watched to update the trusted certificates of the local cluster. +func addCertificatesAuthorityWatches( + reconcileClusterAssociation *ReconcileRemoteCa, + local, remote types.NamespacedName) error { + // Watch the CA secret of Elasticsearch clusters which are involved in a association. + err := reconcileClusterAssociation.watches.Secrets.AddHandler(watches.NamedWatch{ + Name: watchName(local, remote), + Watched: []types.NamespacedName{transport.PublicCertsSecretRef(remote)}, + Watcher: types.NamespacedName{ + Namespace: local.Namespace, + Name: local.Name, + }, + }) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/controller/elasticsearch/services/services.go b/pkg/controller/elasticsearch/services/services.go index 7051a77f99..63970501d4 100644 --- a/pkg/controller/elasticsearch/services/services.go +++ b/pkg/controller/elasticsearch/services/services.go @@ -9,27 +9,66 @@ import ( "math/rand" "strconv" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/pkg/controller/common/defaults" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/label" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/network" "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" "github.com/elastic/cloud-on-k8s/pkg/utils/stringsutil" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) const ( globalServiceSuffix = ".svc" ) +// TransportServiceName returns the name for the transport service associated to this cluster +func TransportServiceName(esName string) string { + return esv1.TransportService(esName) +} + +// NewTransportService returns the transport service associated with the given cluster. +// It is used by Elasticsearch nodes to talk to remote cluster nodes. +func NewTransportService(es esv1.Elasticsearch) *corev1.Service { + nsn := k8s.ExtractNamespacedName(&es) + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: es.Namespace, + Name: TransportServiceName(es.Name), + Labels: label.NewLabels(nsn), + }, + Spec: corev1.ServiceSpec{ + Selector: label.NewLabels(nsn), + Ports: []corev1.ServicePort{ + { + Protocol: corev1.ProtocolTCP, + Port: network.TransportPort, + }, + }, + // We set ClusterIP to None in order to let the ES nodes discover all other node IPs at once. + ClusterIP: "None", + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, + // Nodes need to discover themselves before the pod is considered ready, + // otherwise minimum master nodes would never be reached + PublishNotReadyAddresses: true, + }, + } +} + // ExternalServiceName returns the name for the external service // associated to this cluster func ExternalServiceName(esName string) string { return esv1.HTTPService(esName) } +// ExternalTransportServiceHost returns the hostname and the port used to reach Elasticsearch's transport endpoint. +func ExternalTransportServiceHost(es types.NamespacedName) string { + return stringsutil.Concat(TransportServiceName(es.Name), ".", es.Namespace, globalServiceSuffix, ":", strconv.Itoa(network.TransportPort)) +} + // ExternalServiceURL returns the URL used to reach Elasticsearch's external endpoint func ExternalServiceURL(es esv1.Elasticsearch) string { return stringsutil.Concat(es.Spec.HTTP.Protocol(), "://", ExternalServiceName(es.Name), ".", es.Namespace, globalServiceSuffix, ":", strconv.Itoa(network.HTTPPort)) diff --git a/pkg/controller/elasticsearch/settings/merged_config.go b/pkg/controller/elasticsearch/settings/merged_config.go index 520006dc3e..bc1b26204f 100644 --- a/pkg/controller/elasticsearch/settings/merged_config.go +++ b/pkg/controller/elasticsearch/settings/merged_config.go @@ -94,6 +94,7 @@ func xpackConfig(ver version.Version, httpCfg commonv1.HTTPConfig, certResources ), esv1.XPackSecurityTransportSslCertificateAuthorities: []string{ path.Join(volume.TransportCertificatesSecretVolumeMountPath, certificates.CAFileName), + path.Join(volume.RemoteCertificateAuthoritiesSecretVolumeMountPath, certificates.CAFileName), }, } diff --git a/pkg/controller/elasticsearch/volume/names.go b/pkg/controller/elasticsearch/volume/names.go index 1e193f41f7..f36981045f 100644 --- a/pkg/controller/elasticsearch/volume/names.go +++ b/pkg/controller/elasticsearch/volume/names.go @@ -17,6 +17,9 @@ const ( TransportCertificatesSecretVolumeName = "elastic-internal-transport-certificates" TransportCertificatesSecretVolumeMountPath = "/usr/share/elasticsearch/config/transport-certs" + RemoteCertificateAuthoritiesSecretVolumeName = "elastic-internal-remote-certificate-authorities" + RemoteCertificateAuthoritiesSecretVolumeMountPath = "/usr/share/elasticsearch/config/transport-remote-certs/" + HTTPCertificatesSecretVolumeName = "elastic-internal-http-certificates" HTTPCertificatesSecretVolumeMountPath = "/usr/share/elasticsearch/config/http-certs" diff --git a/pkg/controller/remoteca/remoteca_controller.go b/pkg/controller/remoteca/remoteca_controller.go new file mode 100644 index 0000000000..26038c0f3e --- /dev/null +++ b/pkg/controller/remoteca/remoteca_controller.go @@ -0,0 +1,38 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package remoteca + +import ( + "github.com/elastic/cloud-on-k8s/pkg/controller/common/operator" + "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/remotecluster/remoteca" + "github.com/elastic/cloud-on-k8s/pkg/utils/rbac" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +var ( + name = "remoteca-controller" +) + +// Add creates a new RemoteCa Controller and adds it to the manager with default RBAC. +func Add(mgr manager.Manager, accessReviewer rbac.AccessReviewer, params operator.Parameters) error { + r := remoteca.NewReconciler(mgr, accessReviewer, params) + c, err := add(mgr, r) + if err != nil { + return err + } + return remoteca.AddWatches(c, r) +} + +// add adds a new Controller to mgr with r as the reconcile.Reconciler +func add(mgr manager.Manager, r reconcile.Reconciler) (controller.Controller, error) { + // Create a new controller + c, err := controller.New(name, mgr, controller.Options{Reconciler: r}) + if err != nil { + return c, err + } + return c, nil +}