Skip to content

Commit

Permalink
Support S3 storage
Browse files Browse the repository at this point in the history
Signed-off-by: Andreas Gerstmayr <agerstmayr@redhat.com>
  • Loading branch information
andreasgerstmayr committed Jan 26, 2024
1 parent 43b5cac commit 96f7538
Show file tree
Hide file tree
Showing 34 changed files with 950 additions and 94 deletions.
41 changes: 21 additions & 20 deletions apis/tempo/v1alpha1/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,31 @@ import (
)

// ValidateStorageSecret validates the object storage secret required for tempo.
func ValidateStorageSecret(tempo TempoStack, storageSecret corev1.Secret) field.ErrorList {
path := field.NewPath("spec").Child("storage").Child("secret")

func ValidateStorageSecret(secretType ObjectStorageSecretType, path *field.Path, storageSecret corev1.Secret) field.ErrorList {
if storageSecret.Data == nil {
return field.ErrorList{field.Invalid(path, tempo.Spec.Storage.Secret, "storage secret is empty")}
return field.ErrorList{field.Invalid(path, storageSecret.Name, "storage secret is empty")}
}

var allErrs field.ErrorList

switch tempo.Spec.Storage.Secret.Type {
switch secretType {
case ObjectStorageSecretAzure:
allErrs = append(allErrs, validateAzureSecret(tempo, path, storageSecret)...)
allErrs = append(allErrs, ValidateAzureSecret(storageSecret.Name, path, storageSecret)...)
case ObjectStorageSecretGCS:
allErrs = append(allErrs, validateGCSSecret(tempo, path, storageSecret)...)
allErrs = append(allErrs, ValidateGCSSecret(storageSecret.Name, path, storageSecret)...)
case ObjectStorageSecretS3:
allErrs = append(allErrs, validateS3Secret(tempo, path, storageSecret)...)
allErrs = append(allErrs, ValidateS3Secret(storageSecret.Name, path, storageSecret)...)
case "":
allErrs = append(allErrs, field.Invalid(
path,
tempo.Spec.Storage.Secret,
secretType,
"storage secret must specify the type",
))
default:
allErrs = append(allErrs, field.Invalid(
path,
tempo.Spec.Storage.Secret,
fmt.Sprintf("%s is not an allowed storage secret type", tempo.Spec.Storage.Secret.Type),
secretType,
fmt.Sprintf("%s is not an allowed storage secret type", secretType),
))
}

Expand All @@ -52,44 +50,47 @@ func ValidateStorageCAConfigMap(caConfigMap corev1.ConfigMap) field.ErrorList {
return nil
}

func ensureNotEmpty(tempo TempoStack, path *field.Path, storageSecret corev1.Secret, fields []string) field.ErrorList {
func ensureNotEmpty(secretName string, path *field.Path, storageSecret corev1.Secret, fields []string) field.ErrorList {
var allErrs field.ErrorList
for _, key := range fields {
if storageSecret.Data[key] == nil || len(storageSecret.Data[key]) == 0 {
allErrs = append(allErrs, field.Invalid(
path,
tempo.Spec.Storage.Secret,
secretName,
fmt.Sprintf("storage secret must contain \"%s\" field", key),
))
}
}
return allErrs
}

func validateAzureSecret(tempo TempoStack, path *field.Path, storageSecret corev1.Secret) field.ErrorList {
// ValidateAzureSecret validates an Azure storage secret.
func ValidateAzureSecret(secretName string, path *field.Path, storageSecret corev1.Secret) field.ErrorList {
var allErrs field.ErrorList
secretFields := []string{
"container",
"account_name",
"account_key",
}

allErrs = append(allErrs, ensureNotEmpty(tempo, path, storageSecret, secretFields)...)
allErrs = append(allErrs, ensureNotEmpty(secretName, path, storageSecret, secretFields)...)
return allErrs
}

func validateGCSSecret(tempo TempoStack, path *field.Path, storageSecret corev1.Secret) field.ErrorList {
// ValidateGCSSecret validates a GCS storage secret.
func ValidateGCSSecret(secretName string, path *field.Path, storageSecret corev1.Secret) field.ErrorList {
var allErrs field.ErrorList
secretFields := []string{
"bucketname",
"key.json",
}

allErrs = append(allErrs, ensureNotEmpty(tempo, path, storageSecret, secretFields)...)
allErrs = append(allErrs, ensureNotEmpty(secretName, path, storageSecret, secretFields)...)
return allErrs
}

func validateS3Secret(tempo TempoStack, path *field.Path, storageSecret corev1.Secret) field.ErrorList {
// ValidateS3Secret validates a S3 storage secret.
func ValidateS3Secret(secretName string, path *field.Path, storageSecret corev1.Secret) field.ErrorList {
var allErrs field.ErrorList
secretFields := []string{
"endpoint",
Expand All @@ -98,7 +99,7 @@ func validateS3Secret(tempo TempoStack, path *field.Path, storageSecret corev1.S
"access_key_secret",
}

allErrs = append(allErrs, ensureNotEmpty(tempo, path, storageSecret, secretFields)...)
allErrs = append(allErrs, ensureNotEmpty(secretName, path, storageSecret, secretFields)...)

if endpoint, ok := storageSecret.Data["endpoint"]; ok {
u, err := url.ParseRequestURI(string(endpoint))
Expand All @@ -107,7 +108,7 @@ func validateS3Secret(tempo TempoStack, path *field.Path, storageSecret corev1.S
if err != nil || u.Scheme == "" {
allErrs = append(allErrs, field.Invalid(
path,
tempo.Spec.Storage.Secret,
secretName,
"\"endpoint\" field of storage secret must be a valid URL",
))
}
Expand Down
27 changes: 26 additions & 1 deletion apis/tempo/v1alpha1/tempomonolithic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,29 @@ type MonolithicTracesStorageSpec struct {
//
// +kubebuilder:validation:Optional
PV *MonolithicTracesStoragePVSpec `json:"pv,omitempty"`

// S3 defines the AWS S3 configuration
//
// +kubebuilder:validation:Optional
S3 *MonolithicTracesStorageS3Spec `json:"s3,omitempty"`
}

// MonolithicTracesStorageBackend defines the backend storage for traces.
//
// +kubebuilder:validation:Enum=memory;pv
// +kubebuilder:validation:Enum=memory;pv;azure;gcs;s3
type MonolithicTracesStorageBackend string

const (
// MonolithicTracesStorageBackendMemory defines storing traces in a tmpfs (in-memory filesystem).
MonolithicTracesStorageBackendMemory MonolithicTracesStorageBackend = "memory"
// MonolithicTracesStorageBackendPV defines storing traces in a Persistent Volume.
MonolithicTracesStorageBackendPV MonolithicTracesStorageBackend = "pv"
// MonolithicTracesStorageBackendAzure defines storing traces in Azure Storage.
MonolithicTracesStorageBackendAzure MonolithicTracesStorageBackend = "azure"
// MonolithicTracesStorageBackendGCS defines storing traces in Google Cloud Storage.
MonolithicTracesStorageBackendGCS MonolithicTracesStorageBackend = "gcs"
// MonolithicTracesStorageBackendS3 defines storing traces in AWS S3.
MonolithicTracesStorageBackendS3 MonolithicTracesStorageBackend = "s3"
)

// MonolithicTracesStorageWALSpec defines the write-ahead logging (WAL) configuration.
Expand All @@ -95,6 +106,20 @@ type MonolithicTracesStoragePVSpec struct {
Size resource.Quantity `json:"size"`
}

// MonolithicTracesStorageS3Spec defines the AWS S3 configuration.
type MonolithicTracesStorageS3Spec struct {
// secret is the name of a Secret containing credentials for accessing AWS S3.
// It needs to be in the same namespace as the Tempo custom resource.
//
// +kubebuilder:validation:Optional
Secret string `json:"secret,omitempty"`

// tls defines the TLS configuration for AWS S3.
//
// +kubebuilder:validation:Optional
TLS *TLSSpec `json:"tls,omitempty"`
}

// MonolithicIngestionSpec defines the ingestion settings.
type MonolithicIngestionSpec struct {
// OTLP defines the ingestion configuration for OTLP
Expand Down
70 changes: 57 additions & 13 deletions apis/tempo/v1alpha1/tempomonolithic_webhook.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package v1alpha1

import (
"context"
"fmt"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

// SetupWebhookWithManager will setup the manager to manage the webhooks.
func (r *TempoMonolithic) SetupWebhookWithManager(mgr ctrl.Manager) error {
return ctrl.NewWebhookManagedBy(mgr).
For(r).
WithValidator(&monolithicValidator{client: mgr.GetClient()}).
Complete()
}

Expand Down Expand Up @@ -60,37 +64,77 @@ func (r *TempoMonolithic) Default() {

//+kubebuilder:webhook:path=/validate-tempo-grafana-com-v1alpha1-tempomonolithic,mutating=false,failurePolicy=fail,sideEffects=None,groups=tempo.grafana.com,resources=tempomonolithics,verbs=create;update,versions=v1alpha1,name=vtempomonolithic.kb.io,admissionReviewVersions=v1

var _ webhook.Validator = &TempoMonolithic{}
type monolithicValidator struct {
client client.Client
}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type.
func (r *TempoMonolithic) ValidateCreate() (admission.Warnings, error) {
return r.validate()
func (v *monolithicValidator) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
return v.validate(ctx, obj)
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type.
func (r *TempoMonolithic) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
return r.validate()
func (v *monolithicValidator) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) {
return v.validate(ctx, newObj)
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type.
func (r *TempoMonolithic) ValidateDelete() (admission.Warnings, error) {
func (v *monolithicValidator) ValidateDelete(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
return r.validate()
return nil, nil
}

func (tempo *TempoMonolithic) validate() (admission.Warnings, error) {
log := ctrl.Log.WithName("tempomonolithic-webhook")
func (v *monolithicValidator) validate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
tempo, ok := obj.(*TempoMonolithic)
if !ok {
return nil, apierrors.NewBadRequest(fmt.Sprintf("expected a TempoMonolithic object but got %T", obj))
}

log := ctrl.LoggerFrom(ctx).WithName("tempomonolithic-webhook")
log.V(1).Info("running validating webhook", "name", tempo.Name)

// We do not modify the Kubernetes object in the defaulter webhook,
// but still apply some default values in-memory.
tempo.Default()

allWarnings := admission.Warnings{}
allErrors := field.ErrorList{}

if tempo.Spec.ExtraConfig != nil && len(tempo.Spec.ExtraConfig.Tempo.Raw) > 0 {
allWarnings = append(allWarnings, "overriding Tempo configuration could potentially break the deployment, use it carefully")
addValidation := func(warnings admission.Warnings, errors field.ErrorList) {
allWarnings = append(allWarnings, warnings...)
allErrors = append(allErrors, errors...)
}

addValidation(v.validateStorage(ctx, *tempo))
addValidation(v.validateExtraConfig(*tempo))

if len(allErrors) == 0 {
return allWarnings, nil
}
return allWarnings, apierrors.NewInvalid(tempo.GroupVersionKind().GroupKind(), tempo.Name, allErrors)
}

func (v *monolithicValidator) validateStorage(ctx context.Context, tempo TempoMonolithic) (admission.Warnings, field.ErrorList) {
storagePath := field.NewPath("spec").Child("storage")
if tempo.Spec.Storage == nil {
return nil, field.ErrorList{field.Invalid(storagePath, tempo.Spec.Storage, "storage must be configured")}
}

switch tempo.Spec.Storage.Traces.Backend {

Check failure on line 122 in apis/tempo/v1alpha1/tempomonolithic_webhook.go

View workflow job for this annotation

GitHub Actions / Code standards (linting)

missing cases in switch of type v1alpha1.MonolithicTracesStorageBackend: v1alpha1.MonolithicTracesStorageBackendAzure, v1alpha1.MonolithicTracesStorageBackendGCS (exhaustive)
case MonolithicTracesStorageBackendMemory, MonolithicTracesStorageBackendPV:
return nil, nil
case MonolithicTracesStorageBackendS3:
if tempo.Spec.Storage.Traces.S3 == nil {
return nil, field.ErrorList{field.Invalid(storagePath.Child("traces").Child("s3"), tempo.Spec.Storage.Traces.S3, "S3 storage must be configured")}
}
return validateStorageSecret(ctx, v.client, tempo.Namespace, tempo.Spec.Storage.Traces.S3.Secret, ObjectStorageSecretType(tempo.Spec.Storage.Traces.Backend), storagePath.Child("traces").Child("s3").Child("secret"))
}

return nil, nil
}

func (v *monolithicValidator) validateExtraConfig(tempo TempoMonolithic) (admission.Warnings, field.ErrorList) {

Check failure on line 135 in apis/tempo/v1alpha1/tempomonolithic_webhook.go

View workflow job for this annotation

GitHub Actions / Code standards (linting)

(*monolithicValidator).validateExtraConfig - result 1 (k8s.io/apimachinery/pkg/util/validation/field.ErrorList) is always nil (unparam)
if tempo.Spec.ExtraConfig != nil && len(tempo.Spec.ExtraConfig.Tempo.Raw) > 0 {
return admission.Warnings{"overriding Tempo configuration could potentially break the deployment, use it carefully"}, nil
}
return nil, nil
}
22 changes: 15 additions & 7 deletions apis/tempo/v1alpha1/tempostack_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,30 +180,38 @@ func (v *validator) validateServiceAccount(ctx context.Context, tempo TempoStack
return allErrs
}

func (v *validator) validateStorageSecret(ctx context.Context, tempo TempoStack) (admission.Warnings, field.ErrorList) {
func validateStorageSecret(ctx context.Context, client client.Client, namespace string, secretName string, secretType ObjectStorageSecretType, path *field.Path) (admission.Warnings, field.ErrorList) {
storageSecret := &corev1.Secret{}
err := v.client.Get(ctx, types.NamespacedName{Namespace: tempo.Namespace, Name: tempo.Spec.Storage.Secret.Name}, storageSecret)
err := client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: secretName}, storageSecret)
if err != nil {
// Do not fail the validation here, the user can create the storage secret later.
// The operator will remain in a ConfigurationError status condition until the storage secret is set.
return admission.Warnings{fmt.Sprintf("Secret '%s' does not exist", tempo.Spec.Storage.Secret.Name)}, field.ErrorList{}
return admission.Warnings{fmt.Sprintf("Secret '%s' does not exist", secretName)}, field.ErrorList{}
}

return admission.Warnings{}, ValidateStorageSecret(tempo, *storageSecret)
return admission.Warnings{}, ValidateStorageSecret(secretType, path, *storageSecret)
}

func (v *validator) validateStorageCA(ctx context.Context, tempo TempoStack) (admission.Warnings, field.ErrorList) {
func (v *validator) validateStorageSecret(ctx context.Context, tempo TempoStack) (admission.Warnings, field.ErrorList) {
return validateStorageSecret(ctx, v.client, tempo.Namespace, tempo.Spec.Storage.Secret.Name, tempo.Spec.Storage.Secret.Type, field.NewPath("spec").Child("storage").Child("secret"))
}

func validateStorageCA(ctx context.Context, client client.Client, namespace string, configMapName string) (admission.Warnings, field.ErrorList) {
caConfigMap := &corev1.ConfigMap{}
err := v.client.Get(ctx, types.NamespacedName{Namespace: tempo.Namespace, Name: tempo.Spec.Storage.TLS.CA}, caConfigMap)
err := client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: configMapName}, caConfigMap)
if err != nil {
// Do not fail the validation here, the user can create the ConfigMap later.
// The operator will remain in a ConfigurationError status condition until the ConfigMap is created.
return admission.Warnings{fmt.Sprintf("ConfigMap '%s' does not exist", tempo.Spec.Storage.TLS.CA)}, field.ErrorList{}
return admission.Warnings{fmt.Sprintf("ConfigMap '%s' does not exist", configMapName)}, field.ErrorList{}
}

return admission.Warnings{}, ValidateStorageCAConfigMap(*caConfigMap)
}

func (v *validator) validateStorageCA(ctx context.Context, tempo TempoStack) (admission.Warnings, field.ErrorList) {
return validateStorageCA(ctx, v.client, tempo.Namespace, tempo.Spec.Storage.TLS.CA)
}

func (v *validator) validateReplicationFactor(tempo TempoStack) field.ErrorList {
// Validate minimum quorum on ingestors according to replicas and replication factor
replicatonFactor := tempo.Spec.ReplicationFactor
Expand Down
Loading

0 comments on commit 96f7538

Please sign in to comment.