Skip to content

Commit

Permalink
Add -es-remote-cluster-seed Service automatically in remote cluster c…
Browse files Browse the repository at this point in the history
…ontroller.
  • Loading branch information
nkvoll committed Jun 5, 2019
1 parent 3ca9ede commit a3934d6
Show file tree
Hide file tree
Showing 7 changed files with 384 additions and 11 deletions.
78 changes: 73 additions & 5 deletions operators/pkg/controller/remotecluster/driver_incluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,29 @@ import (

commonv1alpha1 "github.com/elastic/cloud-on-k8s/operators/pkg/apis/common/v1alpha1"
"github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/common"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/finalizer"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/services"
esname "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/name"
"github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
)

const (
LocalTrustRelationshipPrefix = "rc"
RemoteTrustRelationshipPrefix = "rcr"

// remoteClusterSeedServiceSuffix is the suffix used for the remote cluster seed service
remoteClusterSeedServiceSuffix = "remote-cluster-seed"
)

func doReconcile(
r *ReconcileRemoteCluster,
remoteCluster v1alpha1.RemoteCluster,
) (v1alpha1.RemoteClusterStatus, error) {

// Get the previous remote associated cluster, if the remote namespace has been updated by the user we must
// delete the remote relationship from the old namespace and recreate it in the new namespace.
if len(remoteCluster.Status.K8SLocalStatus.RemoteSelector.Namespace) > 0 &&
Expand Down Expand Up @@ -71,8 +78,12 @@ func doReconcile(
remoteCluster,
localClusterSelector,
remoteCluster.Spec.Remote.K8sLocalRef,
r.watches)
err := h.Handle(&remoteCluster, watchFinalizer)
r.watches,
)

seedServiceFinalizer := seedServiceFinalizer(r.Client, remoteCluster)

err := h.Handle(&remoteCluster, watchFinalizer, seedServiceFinalizer)
if err != nil {
return updateStatusWithPhase(&remoteCluster, v1alpha1.RemoteClusterFailed), err
}
Expand Down Expand Up @@ -140,12 +151,18 @@ func doReconcile(
return updateStatusWithPhase(&remoteCluster, v1alpha1.RemoteClusterFailed), err
}

// Create remote service for seeding
svc, err := reconcileRemoteClusterSeedService(r.Client, r.scheme, remoteCluster)
if err != nil {
return updateStatusWithPhase(&remoteCluster, v1alpha1.RemoteClusterFailed), err
}

// Build status
status := v1alpha1.RemoteClusterStatus{
Phase: v1alpha1.RemoteClusterPropagated,
ClusterName: localClusterSelector.Name,
LocalTrustRelationship: localRelationshipName,
SeedHosts: []string{services.ExternalDiscoveryServiceHostname(remote.Selector.NamespacedName())},
SeedHosts: seedHostsFromService(svc),
K8SLocalStatus: v1alpha1.LocalRefStatus{
RemoteSelector: remote.Selector,
RemoteTrustRelationship: remoteRelationshipName,
Expand All @@ -154,6 +171,47 @@ func doReconcile(
return status, nil
}

// reconcileRemoteClusterSeedService reconciles a Service that we can use as the remote cluster seed hosts.
//
// This service is shared between all remote clusters configured this way, and is deleted whenever any of them are
// deleted in a finalizer. There's a watch that re-creates it if it's still in use.
func reconcileRemoteClusterSeedService(
c k8s.Client,
scheme *runtime.Scheme,
remoteCluster v1alpha1.RemoteCluster,
) (*v1.Service, error) {
ns := remoteCluster.Spec.Remote.K8sLocalRef.Namespace
// if the remote has no namespace, assume it's in the same namespace as the RemoteCluster resource
if ns == "" {
ns = remoteCluster.Namespace
}
service := v1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns,
Name: remoteClusterSeedServiceName(remoteCluster.Spec.Remote.K8sLocalRef.Name),
Labels: map[string]string{
label.ClusterNameLabelName: remoteCluster.Spec.Remote.K8sLocalRef.Name,
},
},
Spec: v1.ServiceSpec{
PublishNotReadyAddresses: true,
Ports: []v1.ServicePort{
{Protocol: v1.ProtocolTCP, Port: 9300, TargetPort: intstr.FromInt(9300)},
},
Selector: map[string]string{
common.TypeLabelName: label.Type,
label.ClusterNameLabelName: remoteCluster.Spec.Remote.K8sLocalRef.Name,
},
},
}

if _, err := common.ReconcileService(c, scheme, &service, nil); err != nil {
return nil, err
}

return &service, nil
}

func caCertMissingError(location string, selector commonv1alpha1.ObjectSelector) string {
return fmt.Sprintf(
CaCertMissingError,
Expand All @@ -171,3 +229,13 @@ func updateStatusWithPhase(
status.Phase = phase
return *status
}

// remoteClusterSeedServiceName returns the name of the remote cluster seed service.
func remoteClusterSeedServiceName(esName string) string {
return esname.ESNamer.Suffix(esName, remoteClusterSeedServiceSuffix)
}

// seedHostsFromService returns the seed hosts to use for a given service.
func seedHostsFromService(svc *v1.Service) []string {
return []string{fmt.Sprintf("%s.%s.svc:9300", svc.Name, svc.Namespace)}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,18 @@ func Test_apply(t *testing.T) {
},
RemoteTrustRelationship: "rcr-remotecluster-sample-1-2-default",
},
SeedHosts: []string{"trust-two-es-es-discovery.default.svc.cluster.local:9300"},
SeedHosts: []string{"trust-two-es-es-remote-cluster-seed.default.svc:9300"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.NoError(t, tt.args.rca.watches.InjectScheme(sc))
if tt.args.rca.scheme == nil {
tt.args.rca.scheme = sc
}

assert.NoError(t, tt.args.rca.watches.InjectScheme(tt.args.rca.scheme))

got, err := doReconcile(tt.args.rca, tt.args.remoteCluster)
if (err != nil) != tt.wantErr {
t.Errorf("apply() error = %v, wantErr %v", err, tt.wantErr)
Expand Down
27 changes: 27 additions & 0 deletions operators/pkg/controller/remotecluster/finalizers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/finalizer"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/watches"
"github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// watchFinalizer ensure that we remove watches for Secrets we are no longer interested in
Expand All @@ -26,3 +30,26 @@ func watchFinalizer(
},
}
}

// seedServiceFinalizer ensures that we remove the seed service if it's no longer required
func seedServiceFinalizer(c k8s.Client, remoteCluster v1alpha1.RemoteCluster) finalizer.Finalizer {
return finalizer.Finalizer{
Name: RemoteClusterSeedServiceFinalizer,
Execute: func() error {
svc := v1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: remoteCluster.Spec.Remote.K8sLocalRef.Namespace,
Name: remoteClusterSeedServiceName(remoteCluster.Spec.Remote.K8sLocalRef.Name),
},
}
if svc.Namespace == "" {
svc.Namespace = remoteCluster.Namespace
}

if err := c.Delete(&svc); err != nil && errors.IsNotFound(err) {
return err
}
return nil
},
}
}
114 changes: 114 additions & 0 deletions operators/pkg/controller/remotecluster/finalizers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package remotecluster

import (
"testing"

"github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1"
)

import (
commonv1alpha1 "github.com/elastic/cloud-on-k8s/operators/pkg/apis/common/v1alpha1"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/finalizer"
"github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func Test_seedServiceFinalizer(t *testing.T) {
esName := "foo"
esNamespace := "bar"
rc := &v1alpha1.RemoteCluster{
Spec: v1alpha1.RemoteClusterSpec{
Remote: v1alpha1.RemoteClusterRef{
K8sLocalRef: commonv1alpha1.ObjectSelector{
Name: esName,
Namespace: esNamespace,
},
},
},
}

// remote cluster whose remote cluster has no namespace defined
rcNoRemoteClusterNamespace := rc.DeepCopy()
rcNoRemoteClusterNamespace.Namespace = esNamespace
rcNoRemoteClusterNamespace.Spec.Remote.K8sLocalRef.Namespace = ""

rcSvc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: remoteClusterSeedServiceName(esName),
Namespace: esNamespace,
},
}

otherSvc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: remoteClusterSeedServiceName(esName) + "-other",
Namespace: esNamespace,
},
}

type args struct {
c k8s.Client
remoteCluster v1alpha1.RemoteCluster
}
tests := []struct {
name string
args args
want func(t *testing.T, c k8s.Client, f finalizer.Finalizer)
}{
{
name: "should delete remote the related remote cluster seed service",
args: args{
c: k8s.WrapClient(fake.NewFakeClient(rc, rcSvc, otherSvc)),
remoteCluster: *rc,
},
want: func(t *testing.T, c k8s.Client, f finalizer.Finalizer) {
require.NoError(t, f.Execute())

// the service should be deleted
var svc v1.Service
err := c.Get(k8s.ExtractNamespacedName(rcSvc), &svc)
assert.Error(t, err)
assert.True(t, errors.IsNotFound(err))

// the other service should not be deleted
err = c.Get(k8s.ExtractNamespacedName(otherSvc), &svc)
assert.NoError(t, err)
},
},
{
name: "should default the namespace of the remote cluster to the namespace of the remote cluster resource",
args: args{
c: k8s.WrapClient(fake.NewFakeClient(rc, rcSvc, otherSvc)),
remoteCluster: *rc,
},
want: func(t *testing.T, c k8s.Client, f finalizer.Finalizer) {
require.NoError(t, f.Execute())

// the service should be deleted
var svc v1.Service
err := c.Get(k8s.ExtractNamespacedName(rcSvc), &svc)
assert.Error(t, err)
assert.True(t, errors.IsNotFound(err))

// the other service should not be deleted
err = c.Get(k8s.ExtractNamespacedName(otherSvc), &svc)
assert.NoError(t, err)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := seedServiceFinalizer(tt.args.c, tt.args.remoteCluster)
tt.want(t, tt.args.c, got)
})
}
}
2 changes: 2 additions & 0 deletions operators/pkg/controller/remotecluster/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
const (
// RemoteClusterDynamicWatchesFinalizer designates a finalizer to clean up unused watches.
RemoteClusterDynamicWatchesFinalizer = "dynamic-watches.remotecluster.k8s.elastic.co"
// RemoteClusterSeedServiceFinalizer designates a finalizer to clean up a seed Service.
RemoteClusterSeedServiceFinalizer = "seed-service.remotecluster.k8s.elastic.co"
// RemoteClusterNamespaceLabelName used to represent the namespace of the RemoteCluster in a TrustRelationship.
RemoteClusterNamespaceLabelName = "remotecluster.k8s.elastic.co/namespace"
// RemoteClusterNameLabelName used to represent the name of the RemoteCluster in a TrustRelationship.
Expand Down
Loading

0 comments on commit a3934d6

Please sign in to comment.