Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logstash - add ability to reload pipeline(s) without triggering full pod restart #6674

Merged
merged 13 commits into from
Apr 21, 2023
12 changes: 8 additions & 4 deletions config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1944,7 +1944,8 @@ spec:
defined in spec.resourceClaims, that are used
by this container. \n This is an alpha field and
requires enabling the DynamicResourceAllocation
feature gate. \n This field is immutable."
feature gate. \n This field is immutable. It can
only be set for containers."
items:
description: ResourceClaim references one entry
in PodSpec.ResourceClaims.
Expand Down Expand Up @@ -3314,7 +3315,8 @@ spec:
defined in spec.resourceClaims, that are used
by this container. \n This is an alpha field and
requires enabling the DynamicResourceAllocation
feature gate. \n This field is immutable."
feature gate. \n This field is immutable. It can
only be set for containers."
items:
description: ResourceClaim references one entry
in PodSpec.ResourceClaims.
Expand Down Expand Up @@ -4717,7 +4719,8 @@ spec:
defined in spec.resourceClaims, that are used
by this container. \n This is an alpha field and
requires enabling the DynamicResourceAllocation
feature gate. \n This field is immutable."
feature gate. \n This field is immutable. It can
only be set for containers."
items:
description: ResourceClaim references one entry
in PodSpec.ResourceClaims.
Expand Down Expand Up @@ -6523,7 +6526,8 @@ spec:
that are used by this container. \n
This is an alpha field and requires
enabling the DynamicResourceAllocation
feature gate. \n This field is immutable."
feature gate. \n This field is immutable.
It can only be set for containers."
items:
description: ResourceClaim references
one entry in PodSpec.ResourceClaims.
Expand Down
4 changes: 4 additions & 0 deletions config/samples/logstash/logstash_svc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ spec:
api.http.host: "0.0.0.0"
api.http.port: 9601
queue.type: memory
pipelines:
- pipeline.id: one
pipeline.workers: 2
config.string: "input { beats { port => 5044 }} output { stdout {}}"
services:
- name: api
service:
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/logstash/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func defaultConfig() *settings.CanonicalConfig {
settingsMap := map[string]interface{}{
// Set 'api.http.host' by default to `0.0.0.0` for readiness probe to work.
"api.http.host": "0.0.0.0",
// Set `config.reload.automatic` to `true` to enable pipeline reloads by default
"config.reload.automatic": true,
}

return settings.MustCanonicalConfig(settingsMap)
Expand Down
12 changes: 12 additions & 0 deletions pkg/controller/logstash/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func Test_newConfig(t *testing.T) {
want: `api:
http:
host: 0.0.0.0
config:
reload:
automatic: true
`,
wantErr: false,
},
Expand All @@ -56,6 +59,9 @@ func Test_newConfig(t *testing.T) {
want: `api:
http:
host: 0.0.0.0
config:
reload:
automatic: true
log:
level: debug
`,
Expand All @@ -70,6 +76,9 @@ log:
want: `api:
http:
host: 0.0.0.0
config:
reload:
automatic: true
log:
level: debug
`,
Expand All @@ -86,6 +95,9 @@ log:
want: `api:
http:
host: 0.0.0.0
config:
reload:
automatic: true
log:
level: warn
`,
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/logstash/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ func internalReconcile(params Params) (*reconciler.Results, logstashv1alpha1.Log
return results.WithError(err), params.Status
}

if err := reconcilePipeline(params, configHash); err != nil {
// We intentionally DO NOT pass the configHash here. We don't want to consider the pipeline definitions in the
// hash of the config to ensure that a pipeline change does not automatically trigger a restart
// of the pod, but allows Logstash's automatic reload of pipelines to take place
if err := reconcilePipeline(params); err != nil {
Comment on lines +91 to +94
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we pass the configHash when config.reload.automaticequals false?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good question. I'm erring on the side of 'no' at the moment, but I think this is something that could change after the technical preview depending on feedback.

My reasoning on this is that the false (default) value of non-k8s logstash doesn't react to pipeline changes at all, and to change this semantic to restart logstash completely on pipeline changes feels like very different behaviour.

Thinking about how we could add flexibility, I wonder if we might want to introduce something for ECK here, along the lines of:

config.reload.restart_policy: detected_only|all|none, which would either set config.reload.automatic: true for detected_only, and false for all or none, passing the configHash if the value is all, and not if it is none.

cc @flexitrev, @roaksoax, @jsvd

return results.WithError(err), params.Status
}

Expand Down
80 changes: 80 additions & 0 deletions pkg/controller/logstash/initcontainer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package logstash

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1"
)

const (
InitConfigContainerName = "logstash-internal-init-config"

// InitConfigScript is a small bash script to prepare the logstash configuration directory
InitConfigScript = `#!/usr/bin/env bash
set -eu

init_config_initialized_flag=` + InitContainerConfigVolumeMountPath + `/elastic-internal-init-config.ok

if [[ -f "${init_config_initialized_flag}" ]]; then
echo "Logstash configuration already initialized."
exit 0
robbavey marked this conversation as resolved.
Show resolved Hide resolved
fi

echo "Setup Logstash configuration"

mount_path=` + InitContainerConfigVolumeMountPath + `

for f in /usr/share/logstash/config/*.*; do
filename=$(basename $f)
if [[ ! -f "$mount_path/$filename" ]]; then
cp $f $mount_path
fi
done
robbavey marked this conversation as resolved.
Show resolved Hide resolved

ln -sf ` + InternalConfigVolumeMountPath + `/logstash.yml $mount_path
ln -sf ` + InternalPipelineVolumeMountPath + `/pipelines.yml $mount_path

touch "${init_config_initialized_flag}"
echo "Logstash configuration successfully prepared."
`
)

// initConfigContainer returns an init container that executes a bash script to prepare the logstash config directory.
// This copies files from the `config` folder of the docker image, and creates symlinks for the operator created
// `logstash.yml` and `pipelines.yml` file into a shared config folder to use by the main logstash container. This
// enables dynamic reloads for `pipelines.yml`
robbavey marked this conversation as resolved.
Show resolved Hide resolved
func initConfigContainer(ls logstashv1alpha1.Logstash) corev1.Container {
privileged := false

return corev1.Container{
// Image will be inherited from pod template defaults
ImagePullPolicy: corev1.PullIfNotPresent,
Name: InitConfigContainerName,
SecurityContext: &corev1.SecurityContext{
Privileged: &privileged,
},
Command: []string{"/usr/bin/env", "bash", "-c", InitConfigScript},
VolumeMounts: []corev1.VolumeMount{
ConfigSharedVolume.InitContainerVolumeMount(),
ConfigVolume(ls).VolumeMount(),
PipelineVolume(ls).VolumeMount(),
},

Resources: corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceMemory: resource.MustParse("50Mi"),
corev1.ResourceCPU: resource.MustParse("0.1"),
},
Limits: map[corev1.ResourceName]resource.Quantity{
// Memory limit should be at least 12582912 when running with CRI-O
corev1.ResourceMemory: resource.MustParse("50Mi"),
corev1.ResourceCPU: resource.MustParse("0.1"),
},
},
}
}
7 changes: 7 additions & 0 deletions pkg/controller/logstash/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package logstash

import (
"sigs.k8s.io/controller-runtime/pkg/client"

commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1"
logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1"
)
Expand All @@ -27,3 +29,8 @@ func NewLabels(logstash logstashv1alpha1.Logstash) map[string]string {
NameLabelName: logstash.Name,
}
}

// NewLabelSelectorForLogstash returns a labels.Selector that matches the labels as constructed by NewLabels
func NewLabelSelectorForLogstash(ls logstashv1alpha1.Logstash) client.MatchingLabels {
return client.MatchingLabels(map[string]string{commonv1.TypeLabelName: TypeLabelValue, NameLabelName: ls.Name})
}
48 changes: 42 additions & 6 deletions pkg/controller/logstash/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,24 @@
package logstash

import (
"hash"
"reflect"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"sigs.k8s.io/controller-runtime/pkg/client"

logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/annotation"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/labels"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/pipelines"

"github.com/elastic/cloud-on-k8s/v2/pkg/utils/maps"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: group imports (max 3 groups: stdlib / external deps / internal deps).

)

func reconcilePipeline(params Params, configHash hash.Hash) error {
func reconcilePipeline(params Params) error {
defer tracing.Span(&params.Context)()

cfgBytes, err := buildPipeline(params)
Expand All @@ -36,15 +41,46 @@ func reconcilePipeline(params Params, configHash hash.Hash) error {
},
}

if _, err = reconciler.ReconcileSecret(params.Context, params.Client, expected, &params.Logstash); err != nil {
if err := reconcileSecretWithFastUpdate(params, expected); err != nil {
return err
}

_, _ = configHash.Write(cfgBytes)

return nil
}

// This function reconciles the secret, but then adds a postUpdate step to mark the pods as updated
// to trigger a quicker reload of the updated secret than waiting for the kubelet sync interval to kick in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// This function reconciles the secret, but then adds a postUpdate step to mark the pods as updated
// to trigger a quicker reload of the updated secret than waiting for the kubelet sync interval to kick in
// This function reconciles the secret, but then adds a postUpdate step to mark the pods as updated
// to trigger a quicker reload of the updated secret rather than waiting for the kubelet sync to kick in.

func reconcileSecretWithFastUpdate(params Params, expected corev1.Secret) error {
var reconciled corev1.Secret

return reconciler.ReconcileResource(reconciler.Params{
Context: params.Context,
Client: params.Client,
Owner: &params.Logstash,
Expected: &expected,
Reconciled: &reconciled,
NeedsUpdate: func() bool {
// update if expected labels and annotations are not there
return !maps.IsSubset(expected.Labels, reconciled.Labels) ||
!maps.IsSubset(expected.Annotations, reconciled.Annotations) ||
// or if secret data is not strictly equal
!reflect.DeepEqual(expected.Data, reconciled.Data)
},
UpdateReconciled: func() {
// set expected annotations and labels, but don't remove existing ones
// that may have been defaulted or set by the user on the existing resource
reconciled.Labels = maps.Merge(reconciled.Labels, expected.Labels)
reconciled.Annotations = maps.Merge(reconciled.Annotations, expected.Annotations)
reconciled.Data = expected.Data
},
PostUpdate: func() {
annotation.MarkPodsAsUpdated(params.Context, params.Client,
client.InNamespace(params.Logstash.Namespace),
NewLabelSelectorForLogstash(params.Logstash),
)
},
})
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

diff --git a/pkg/controller/common/reconciler/secret.go b/pkg/controller/common/reconciler/secret.go
index 0b6026f87..50004fd80 100644
--- a/pkg/controller/common/reconciler/secret.go
+++ b/pkg/controller/common/reconciler/secret.go
@@ -30,11 +30,17 @@ const (
 	SoftOwnerKindLabel      = "eck.k8s.elastic.co/owner-kind"
 )
 
+func WithPostUpdate(f func()) func(p *Params) {
+	return func(p *Params) {
+		p.PostUpdate = f
+	}
+}
+
 // ReconcileSecret creates or updates the actual secret to match the expected one.
 // Existing annotations or labels that are not expected are preserved.
-func ReconcileSecret(ctx context.Context, c k8s.Client, expected corev1.Secret, owner client.Object) (corev1.Secret, error) {
+func ReconcileSecret(ctx context.Context, c k8s.Client, expected corev1.Secret, owner client.Object, opts ...func(*Params)) (corev1.Secret, error) {
 	var reconciled corev1.Secret
-	if err := ReconcileResource(Params{
+	params := Params{
 		Context:    ctx,
 		Client:     c,
 		Owner:      owner,
@@ -54,7 +60,11 @@ func ReconcileSecret(ctx context.Context, c k8s.Client, expected corev1.Secret,
 			reconciled.Annotations = maps.Merge(reconciled.Annotations, expected.Annotations)
 			reconciled.Data = expected.Data
 		},
-	}); err != nil {
+	}
+	for _, opt := range opts {
+		opt(&params)
+	}
+	if err := ReconcileResource(params); err != nil {
 		return corev1.Secret{}, err
 	}
 	return reconciled, nil
diff --git a/pkg/controller/logstash/pipeline.go b/pkg/controller/logstash/pipeline.go
index 6cbfee388..447ed7b8b 100644
--- a/pkg/controller/logstash/pipeline.go
+++ b/pkg/controller/logstash/pipeline.go
@@ -41,7 +41,13 @@ func reconcilePipeline(params Params) error {
 		},
 	}
 
-	if err := reconcileSecretWithFastUpdate(params, expected); err != nil {
+	if _, err := reconciler.ReconcileSecret(params.Context, params.Client, expected, &params.Logstash,
+		reconciler.WithPostUpdate(func() {
+			annotation.MarkPodsAsUpdated(params.Context, params.Client,
+				client.InNamespace(params.Logstash.Namespace),
+				NewLabelSelectorForLogstash(params.Logstash),
+			)
+		})); err != nil {
 		return err
 	}
 	return nil

If we want to reuse the existing secret reconciliation we could add a slice of option functions at the end


func buildPipeline(params Params) ([]byte, error) {
userProvidedCfg, err := getUserPipeline(params)
if err != nil {
Expand Down
54 changes: 37 additions & 17 deletions pkg/controller/logstash/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package logstash
import (
"fmt"
"hash"
"path"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -39,6 +38,14 @@ const (

// VersionLabelName is a label used to track the version of a Logstash Pod.
VersionLabelName = "logstash.k8s.elastic.co/version"

InitContainerConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config-local"

// InternalConfigVolumeName is a volume which contains the generated configuration.
InternalConfigVolumeName = "elastic-internal-logstash-config"
InternalConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config"
InternalPipelineVolumeName = "elastic-internal-logstash-pipeline"
InternalPipelineVolumeMountPath = "/mnt/elastic-internal/logstash-pipeline"
)

var (
Expand All @@ -54,26 +61,38 @@ var (
}
)

var (
// ConfigSharedVolume contains the Logstash config/ directory, it contains the contents of config from the docker container
ConfigSharedVolume = volume.SharedVolume{
VolumeName: ConfigVolumeName,
InitContainerMountPath: InitContainerConfigVolumeMountPath,
ContainerMountPath: ConfigMountPath,
}
)

// ConfigVolume returns a SecretVolume to hold the Logstash config of the given Logstash resource.
func ConfigVolume(ls logstashv1alpha1.Logstash) volume.SecretVolume {
return volume.NewSecretVolumeWithMountPath(
logstashv1alpha1.ConfigSecretName(ls.Name),
InternalConfigVolumeName,
InternalConfigVolumeMountPath,
)
}

// PipelineVolume returns a SecretVolume to hold the Logstash config of the given Logstash resource.
func PipelineVolume(ls logstashv1alpha1.Logstash) volume.SecretVolume {
return volume.NewSecretVolumeWithMountPath(
logstashv1alpha1.PipelineSecretName(ls.Name),
InternalPipelineVolumeName,
InternalPipelineVolumeMountPath,
)
}

func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateSpec {
defer tracing.Span(&params.Context)()
spec := &params.Logstash.Spec
builder := defaults.NewPodTemplateBuilder(params.GetPodTemplate(), logstashv1alpha1.LogstashContainerName)
vols := []volume.VolumeLike{
// volume with logstash configuration file
volume.NewSecretVolume(
logstashv1alpha1.ConfigSecretName(params.Logstash.Name),
LogstashConfigVolumeName,
path.Join(ConfigMountPath, LogstashConfigFileName),
LogstashConfigFileName,
0644),
// volume with logstash pipeline file
volume.NewSecretVolume(
logstashv1alpha1.PipelineSecretName(params.Logstash.Name),
PipelineVolumeName,
path.Join(ConfigMountPath, PipelineFileName),
PipelineFileName,
0644),
}
vols := []volume.VolumeLike{ConfigSharedVolume, ConfigVolume(params.Logstash), PipelineVolume(params.Logstash)}

labels := maps.Merge(params.Logstash.GetIdentityLabels(), map[string]string{
VersionLabelName: spec.Version})
Expand All @@ -93,6 +112,7 @@ func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateS
WithPorts(ports).
WithReadinessProbe(readinessProbe(params.Logstash)).
WithVolumeLikes(vols...).
WithInitContainers(initConfigContainer(params.Logstash)).
WithInitContainerDefaults()

builder, err := stackmon.WithMonitoring(params.Context, params.Client, builder, params.Logstash)
Expand Down
Loading