Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Agent and Elastic stack in different namespaces. #7382

Merged
merged 15 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions pkg/apis/common/v1/association.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,13 @@ func FormatNameWithID(template string, id string) string {

// AssociationConf holds the association configuration of a referenced resource in an association.
type AssociationConf struct {
AuthSecretName string `json:"authSecretName"`
AuthSecretKey string `json:"authSecretKey"`
IsServiceAccount bool `json:"isServiceAccount"`
CACertProvided bool `json:"caCertProvided"`
CASecretName string `json:"caSecretName"`
URL string `json:"url"`
AuthSecretName string `json:"authSecretName"`
AuthSecretKey string `json:"authSecretKey"`
IsServiceAccount bool `json:"isServiceAccount"`
CACertProvided bool `json:"caCertProvided"`
CASecretName string `json:"caSecretName"`
AdditionalSecretsHash string `json:"additionalSecretsHash"`
naemono marked this conversation as resolved.
Show resolved Hide resolved
naemono marked this conversation as resolved.
Show resolved Hide resolved
URL string `json:"url"`
// Version of the referenced resource. If a version upgrade is in progress,
// matches the lowest running version. May be empty if unknown.
Version string `json:"version"`
Expand Down
10 changes: 0 additions & 10 deletions pkg/controller/agent/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,16 +333,6 @@ func applyRelatedEsAssoc(agent agentv1alpha1.Agent, esAssociation commonv1.Assoc
return builder, nil
}

esRef := esAssociation.AssociationRef()
if !esRef.IsExternal() && !agent.Spec.FleetServerEnabled && agent.Namespace != esRef.Namespace {
// check agent and ES share the same namespace
return nil, fmt.Errorf(
"agent namespace %s is different than referenced Elasticsearch namespace %s, this is not supported yet",
agent.Namespace,
esAssociation.AssociationRef().Namespace,
)
}

// no ES CA to configure, skip
assocConf, err := esAssociation.AssociationConf()
if err != nil {
Expand Down
62 changes: 45 additions & 17 deletions pkg/controller/agent/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"path"
"testing"

Expand Down Expand Up @@ -845,6 +846,9 @@ func Test_applyRelatedEsAssoc(t *testing.T) {
},
},
}).GetAssociations()[0]
assocToOtherNs.SetAssociationConf(&commonv1.AssociationConf{
CASecretName: "elasticsearch-es-http-certs-public",
})

expectedCAVolume := []corev1.Volume{
{
Expand All @@ -857,26 +861,30 @@ func Test_applyRelatedEsAssoc(t *testing.T) {
},
},
}
expectedCAVolumeMount := []corev1.VolumeMount{
{
Name: "elasticsearch-certs",
ReadOnly: true,
MountPath: "/mnt/elastic-internal/elasticsearch-association/agent-ns/elasticsearch/certs",
},
expectedCAVolumeMountFunc := func(ns string) []corev1.VolumeMount {
return []corev1.VolumeMount{
{
Name: "elasticsearch-certs",
ReadOnly: true,
MountPath: fmt.Sprintf("/mnt/elastic-internal/elasticsearch-association/%s/elasticsearch/certs", ns),
},
}
}
expectedCmd := []string{"/usr/bin/env", "bash", "-c", `#!/usr/bin/env bash
expectedCmdFunc := func(ns string) []string {
return []string{"/usr/bin/env", "bash", "-c", fmt.Sprintf(`#!/usr/bin/env bash
set -e
if [[ -f /mnt/elastic-internal/elasticsearch-association/agent-ns/elasticsearch/certs/ca.crt ]]; then
if [[ -f /mnt/elastic-internal/elasticsearch-association/%[1]s/elasticsearch/certs/ca.crt ]]; then
if [[ -f /usr/bin/update-ca-trust ]]; then
cp /mnt/elastic-internal/elasticsearch-association/agent-ns/elasticsearch/certs/ca.crt /etc/pki/ca-trust/source/anchors/
cp /mnt/elastic-internal/elasticsearch-association/%[1]s/elasticsearch/certs/ca.crt /etc/pki/ca-trust/source/anchors/
/usr/bin/update-ca-trust
elif [[ -f /usr/sbin/update-ca-certificates ]]; then
cp /mnt/elastic-internal/elasticsearch-association/agent-ns/elasticsearch/certs/ca.crt /usr/local/share/ca-certificates/
cp /mnt/elastic-internal/elasticsearch-association/%[1]s/elasticsearch/certs/ca.crt /usr/local/share/ca-certificates/
/usr/sbin/update-ca-certificates
fi
fi
/usr/bin/tini -- /usr/local/bin/docker-entrypoint -e
`}
`, ns)}
}
for _, tt := range []struct {
name string
agent agentv1alpha1.Agent
Expand Down Expand Up @@ -919,8 +927,8 @@ fi
wantErr: false,
wantPodSpec: generatePodSpec(func(ps corev1.PodSpec) corev1.PodSpec {
ps.Volumes = expectedCAVolume
ps.Containers[0].VolumeMounts = expectedCAVolumeMount
ps.Containers[0].Command = expectedCmd
ps.Containers[0].VolumeMounts = expectedCAVolumeMountFunc(agentNs)
ps.Containers[0].Command = expectedCmdFunc(agentNs)
return ps
}),
},
Expand All @@ -940,25 +948,45 @@ fi
wantErr: false,
wantPodSpec: generatePodSpec(func(ps corev1.PodSpec) corev1.PodSpec {
ps.Volumes = expectedCAVolume
ps.Containers[0].VolumeMounts = expectedCAVolumeMount
ps.Containers[0].VolumeMounts = expectedCAVolumeMountFunc(agentNs)
ps.Containers[0].Command = nil
return ps
}),
},
{
name: "fleet server disabled, different namespace",
name: "fleet server disabled, different namespace still has volumes and volumeMount configured",
agent: agentv1alpha1.Agent{
ObjectMeta: metav1.ObjectMeta{
Name: "agent",
Namespace: agentNs,
},
Spec: agentv1alpha1.AgentSpec{
FleetServerEnabled: false,
Version: "7.16.2",
FleetServerEnabled: false,
DaemonSet: &agentv1alpha1.DaemonSetSpec{
PodTemplate: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "agent",
SecurityContext: &corev1.SecurityContext{
RunAsUser: pointer.Int64(0),
},
},
},
},
},
},
},
},
assoc: assocToOtherNs,
wantErr: true,
wantErr: false,
wantPodSpec: generatePodSpec(func(ps corev1.PodSpec) corev1.PodSpec {
ps.Volumes = expectedCAVolume
ps.Containers[0].VolumeMounts = expectedCAVolumeMountFunc("elasticsearch-ns")
ps.Containers[0].Command = expectedCmdFunc("elasticsearch-ns")
return ps
}),
},
} {
t.Run(tt.name, func(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/association/ca.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func (r *Reconciler) ReconcileCASecret(ctx context.Context, association commonv1
}

labels := r.AssociationResourceLabels(k8s.ExtractNamespacedName(association), association.AssociationRef().NamespacedName())

// Certificate data should be copied over a secret in the association namespace
expectedSecret := corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Expand Down
50 changes: 50 additions & 0 deletions pkg/controller/association/controller/agent_fleetserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/association"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/operator"
"github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s"
ulog "github.com/elastic/cloud-on-k8s/v2/pkg/utils/log"
"github.com/elastic/cloud-on-k8s/v2/pkg/utils/rbac"
)

Expand All @@ -30,6 +31,7 @@ func AddAgentFleetServer(mgr manager.Manager, accessReviewer rbac.AccessReviewer
AssociationName: "agent-fleetserver",
AssociatedShortName: "agent",
AssociationType: commonv1.FleetServerAssociationType,
AdditionalSecrets: additionalSecrets,
Labels: func(associated types.NamespacedName) map[string]string {
return map[string]string{
AgentAssociationLabelName: associated.Name,
Expand All @@ -45,6 +47,54 @@ func AddAgentFleetServer(mgr manager.Manager, accessReviewer rbac.AccessReviewer
})
}

func additionalSecrets(ctx context.Context, c k8s.Client, assoc commonv1.Association) ([]types.NamespacedName, error) {
log := ulog.FromContext(ctx)
associated := assoc.Associated()
var agent agentv1alpha1.Agent
nsn := types.NamespacedName{Namespace: associated.GetNamespace(), Name: associated.GetName()}
if err := c.Get(ctx, nsn, &agent); err != nil {
return nil, err
}
fleetServerRef := assoc.AssociationRef()
if !fleetServerRef.IsDefined() {
return nil, nil
}
fleetServer := agentv1alpha1.Agent{}
if err := c.Get(ctx, fleetServerRef.NamespacedName(), &fleetServer); err != nil {
return nil, err
}

// If the Fleet Server Agent is not associated with an Elasticsearch cluster
// (potentially because of a manual setup) we should do nothing.
if len(fleetServer.Spec.ElasticsearchRefs) == 0 {
return nil, nil
}
esAssociation, err := association.SingleAssociationOfType(fleetServer.GetAssociations(), commonv1.ElasticsearchAssociationType)
if err != nil {
return nil, err
}

// if both agent and ES are same namespace no copying needed
if agent.GetNamespace() == esAssociation.GetNamespace() {
log.V(1).Info("no additional secrets because same namespace")
return nil, nil
}

conf, err := esAssociation.AssociationConf()
if err != nil {
log.V(1).Info("no additional secrets because no assoc conf")
return nil, err
}
if conf == nil || !conf.CACertProvided {
log.V(1).Info("no additional secrets because conf nil or no CA provided")
return nil, nil
}
return []types.NamespacedName{{
Namespace: fleetServer.Namespace,
Name: conf.CASecretName,
}}, nil
}

func getFleetServerExternalURL(c k8s.Client, assoc commonv1.Association) (string, error) {
fleetServerRef := assoc.AssociationRef()
if !fleetServerRef.IsDefined() {
Expand Down
70 changes: 55 additions & 15 deletions pkg/controller/association/dynamic_watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package association

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -35,6 +36,12 @@ func serviceWatchName(associated types.NamespacedName) string {
return fmt.Sprintf("%s-%s-svc-watch", associated.Namespace, associated.Name)
}

// additionalSecretWatchName returns the name of the watch setup on any additional secrets that
// are copied during the association reconciliation.
func additionalSecretWatchName(associated types.NamespacedName) string {
return fmt.Sprintf("%s-%s-secrets-watch", associated.Namespace, associated.Name)
}

// reconcileWatches sets up dynamic watches for:
// * the referenced resource(s) managed or not by ECK (e.g. Elasticsearch for Kibana -> Elasticsearch associations)
// * the CA secret of the referenced resource in the referenced resource namespace
Expand All @@ -43,7 +50,7 @@ func serviceWatchName(associated types.NamespacedName) string {
// * if there's an ES user to create, watch the user Secret in ES namespace
// All watches for all given associations are set under the same watch name and replaced with each reconciliation.
// The given associations are expected to be of the same type (e.g. Kibana -> Elasticsearch, not Kibana -> Enterprise Search).
func (r *Reconciler) reconcileWatches(associated types.NamespacedName, associations []commonv1.Association) error {
func (r *Reconciler) reconcileWatches(ctx context.Context, associated types.NamespacedName, associations []commonv1.Association) error {
managedElasticRef := filterManagedElasticRef(associations)
unmanagedElasticRef := filterUnmanagedElasticRef(associations)

Expand Down Expand Up @@ -93,41 +100,72 @@ func (r *Reconciler) reconcileWatches(associated types.NamespacedName, associati
}
}

if r.AdditionalSecrets != nil {
if err := ReconcileGenericWatch(associated, managedElasticRef, r.watches.Secrets, additionalSecretWatchName(associated), func() ([]types.NamespacedName, error) {
var toWatch []types.NamespacedName
for _, association := range associations {
secs, err := r.AdditionalSecrets(ctx, r.Client, association)
if err != nil {
return nil, err
}
toWatch = append(toWatch, secs...)
}
return toWatch, nil
}); err != nil {
return err
}
}

return nil
}

// ReconcileWatch sets or removes `watchName` watch in `dynamicRequest` based on `associated` and `associations` and
// `watchedFunc`. No watch is added if watchedFunc(association) refers to an empty namespaced name.
func ReconcileWatch(
func ReconcileGenericWatch(
naemono marked this conversation as resolved.
Show resolved Hide resolved
associated types.NamespacedName,
associations []commonv1.Association,
dynamicRequest *watches.DynamicEnqueueRequest,
watchName string,
watchedFunc func(association commonv1.Association) types.NamespacedName,
watchedFunc func() ([]types.NamespacedName, error),
) error {
if len(associations) == 0 {
// clean up if there are none
RemoveWatch(dynamicRequest, watchName)
return nil
}

emptyNamespacedName := types.NamespacedName{}

toWatch := make([]types.NamespacedName, 0, len(associations))
for _, association := range associations {
watchedNamespacedName := watchedFunc(association)
if watchedNamespacedName != emptyNamespacedName {
toWatch = append(toWatch, watchedFunc(association))
}
watched, err := watchedFunc()
if err != nil {
return err
}

return dynamicRequest.AddHandler(watches.NamedWatch{
Name: watchName,
Watched: toWatch,
Watched: watched,
Watcher: associated,
})
}

// ReconcileWatch sets or removes `watchName` watch in `dynamicRequest` based on `associated` and `associations` and
// `watchedFunc`. No watch is added if watchedFunc(association) refers to an empty namespaced name.
func ReconcileWatch(
associated types.NamespacedName,
associations []commonv1.Association,
dynamicRequest *watches.DynamicEnqueueRequest,
watchName string,
watchedFunc func(association commonv1.Association) types.NamespacedName,
) error {
return ReconcileGenericWatch(associated, associations, dynamicRequest, watchName, func() ([]types.NamespacedName, error) {
emptyNamespacedName := types.NamespacedName{}

toWatch := make([]types.NamespacedName, 0, len(associations))
for _, association := range associations {
watchedNamespacedName := watchedFunc(association)
if watchedNamespacedName != emptyNamespacedName {
toWatch = append(toWatch, watchedFunc(association))
}
}
return toWatch, nil
})
}

// RemoveWatch removes `watchName` watch from `dynamicRequest`.
func RemoveWatch(dynamicRequest *watches.DynamicEnqueueRequest, watchName string) {
dynamicRequest.RemoveHandlerForKey(watchName)
Expand All @@ -142,4 +180,6 @@ func (r *Reconciler) removeWatches(associated types.NamespacedName) {
RemoveWatch(r.watches.Services, serviceWatchName(associated))
// - ES user secret
RemoveWatch(r.watches.Secrets, esUserWatchName(associated))
// - Additional secrets (typically in the case of Agent -> Fleet Server -> Elasticsearch)
RemoveWatch(r.watches.Secrets, additionalSecretWatchName(associated))
}
Loading