Skip to content

Commit

Permalink
Allow users to suspend Elasticsearch Pods for debugging purposes (ela…
Browse files Browse the repository at this point in the history
…stic#4946)

Adds support for an new annotation, eck.k8s.elastic.co/suspend, that allows users to suspend the Pods listed in the annotation.

Implementation does not treat suspended Pods in any special way. That means most cluster operations like upscale or downscale of master nodes and rolling upgrades of all nodes are not able to make progress while a node in the cluster is suspended.
  • Loading branch information
pebrc authored and fantapsody committed Jan 3, 2023
1 parent ef7109d commit b524a61
Show file tree
Hide file tree
Showing 13 changed files with 382 additions and 28 deletions.
26 changes: 22 additions & 4 deletions pkg/apis/elasticsearch/v1/elasticsearch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,22 @@ package v1

import (
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"strings"

commonv1 "github.com/elastic/cloud-on-k8s/pkg/apis/common/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/hash"
"github.com/elastic/cloud-on-k8s/pkg/utils/pointer"
"github.com/elastic/cloud-on-k8s/pkg/utils/set"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

const (
ElasticsearchContainerName = "elasticsearch"
// SuspendAnnotation allows users to annotate the Elasticsearch resource with the names of Pods they want to suspend
// for debugging purposes.
SuspendAnnotation = "eck.k8s.elastic.co/suspend"
// Kind is inferred from the struct name using reflection in SchemeBuilder.Register()
// we duplicate it as a constant here for practical purposes.
Kind = "Elasticsearch"
Expand Down Expand Up @@ -471,6 +475,20 @@ func (es Elasticsearch) SecureSettings() []commonv1.SecretSource {
return es.Spec.SecureSettings
}

func (es Elasticsearch) SuspendedPodNames() set.StringSet {
suspended, exists := es.Annotations[SuspendAnnotation]
if !exists {
return nil
}

podNames := strings.Split(suspended, ",")
suspendedPods := set.Make()
for _, p := range podNames {
suspendedPods.Add(strings.TrimSpace(p))
}
return suspendedPods
}

// -- associations

var _ commonv1.Associated = &Elasticsearch{}
Expand Down
47 changes: 46 additions & 1 deletion pkg/apis/elasticsearch/v1/elasticsearch_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"time"

"github.com/elastic/cloud-on-k8s/pkg/utils/pointer"

"github.com/elastic/cloud-on-k8s/pkg/utils/set"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -211,3 +211,48 @@ func Test_GetMaxUnavailableOrDefault(t *testing.T) {
})
}
}

func TestElasticsearch_SuspendedPodNames(t *testing.T) {
tests := []struct {
name string
ObjectMeta metav1.ObjectMeta
want set.StringSet
}{
{
name: "no annotation",
ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{}},
want: nil,
},
{
name: "single value",
ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{
SuspendAnnotation: "a",
}},
want: set.Make("a"),
},
{
name: "multi value",
ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{
SuspendAnnotation: "a,b,c",
}},
want: set.Make("a", "b", "c"),
},
{
name: "multi value with whitespace",
ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{
SuspendAnnotation: "a , b , c",
}},
want: set.Make("a", "b", "c"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
es := Elasticsearch{
ObjectMeta: tt.ObjectMeta,
}
if got := es.SuspendedPodNames(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("SuspendedPodNames() = %v, want %v", got, tt.want)
}
})
}
}
2 changes: 1 addition & 1 deletion pkg/controller/apmserver/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (tp testParams) withInitContainer() testParams {
Name: "",
Image: "docker.elastic.co/apm/apm-server:1.0",
Env: defaults.PodDownwardEnvVars(),
Resources: DefaultResources,
Resources: DefaultResources, // inherited from main container
},
}
return tp
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/elasticsearch/configmap/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewConfigMapWithData(es types.NamespacedName, data map[string]string) corev
}
}

// ReconcileScriptsConfigMap reconciles a configmap containing scripts used by
// ReconcileScriptsConfigMap reconciles a configmap containing scripts and related configuration used by
// init containers and readiness probe.
func ReconcileScriptsConfigMap(ctx context.Context, c k8s.Client, es esv1.Elasticsearch) error {
span, _ := apm.StartSpan(ctx, "reconcile_scripts", tracing.SpanTypeApp)
Expand All @@ -49,6 +49,8 @@ func ReconcileScriptsConfigMap(ctx context.Context, c k8s.Client, es esv1.Elasti
nodespec.ReadinessProbeScriptConfigKey: nodespec.ReadinessProbeScript,
nodespec.PreStopHookScriptConfigKey: nodespec.PreStopHookScript,
initcontainer.PrepareFsScriptConfigKey: fsScript,
initcontainer.SuspendScriptConfigKey: initcontainer.SuspendScript,
initcontainer.SuspendedHostsFile: initcontainer.RenderSuspendConfiguration(es),
},
)

Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/elasticsearch/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,12 @@ func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results {
results.WithResult(defaultRequeue)
}

// we want to reconcile suspended Pods before we start reconciling node specs as this is considered a debugging and
// troubleshooting tool that does not follow the change budget restrictions
if err := reconcileSuspendedPods(d.Client, d.ES, d.Expectations); err != nil {
return results.WithError(err)
}

// reconcile StatefulSets and nodes configuration
res = d.reconcileNodeSpecs(ctx, esReachable, esClient, d.ReconcileState, observedState, *resourcesState, keystoreResources)
results = results.WithResults(res)
Expand Down
91 changes: 91 additions & 0 deletions pkg/controller/elasticsearch/driver/suspend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package driver

import (
"context"

esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/annotation"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/expectations"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/initcontainer"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// reconcileSuspendedPods implements the operator side of activating the Pod suspension mechanism:
// - Users annotate the Elasticsearch resource with names of Pods they want to suspend for debugging purposes.
// - Each Pod has an initContainer that runs a shell script to check a file backed by a configMap for its own Pod name.
// - If the name of the Pod is found in the file the initContainer enters a loop preventing termination until the name
// of the Pod is removed from the file again. The Pod is now "suspended".
// - This function handles the case where the Pod is either already running the main container or it is currently suspended.
// - If the Pod is already running but should be suspended we want to delete the Pod so that the recreated Pod can run
// the initContainer again.
// - If the Pod is suspended in the initContainer but should be running we update the Pods metadata to accelerate the
// propagation of the configMap values. This is just an optimisation and not essential for the correct operation of
// the feature.
func reconcileSuspendedPods(c k8s.Client, es esv1.Elasticsearch, e *expectations.Expectations) error {
// let's make sure we observe any deletions in the cache to avoid redundant deletion
deletionsSatisfied, err := e.DeletionsSatisfied()
if err != nil {
return err
}

// suspendedPodNames as indicated by the user on the Elasticsearch resource via an annotation
// the corresponding configMap has already been reconciled prior to that function
suspendedPodNames := es.SuspendedPodNames()

// all known Pods, this is mostly to fine tune the reconciliation to the current state of the Pods, see below
statefulSets, err := sset.RetrieveActualStatefulSets(c, k8s.ExtractNamespacedName(&es))
if err != nil {
return err
}
knownPods, err := statefulSets.GetActualPods(c)
if err != nil {
return err
}

for i, pod := range knownPods {
// Pod should be suspended
if suspendedPodNames.Has(pod.Name) {
for _, s := range pod.Status.ContainerStatuses {
// delete the Pod without grace period if the main Elasticsearch container is running
// and we have seen all expected deletions in the cache
if deletionsSatisfied && s.Name == esv1.ElasticsearchContainerName && s.State.Running != nil {
log.Info("Deleting suspended pod", "pod_name", pod.Name, "pod_uid", pod.UID,
"namespace", es.Namespace, "es_name", es.Name)
// the precondition serves as an additional barrier in addition to the expectation mechanism to
// not accidentally deleting Pods we do not intent to delete (because our view of the world is out of sync)
preconditions := client.Preconditions{
UID: &pod.UID,
ResourceVersion: &pod.ResourceVersion,
}
if err := c.Delete(context.Background(), &knownPods[i], preconditions, client.GracePeriodSeconds(0)); err != nil {
return err
}
// record the expected deletion
e.ExpectDeletion(pod)
}
}
} else if isSuspended(pod) {
// Pod is suspended. But it should not be. Try to speed up propagation of config map entries so that it can
// start up again. Without this it can take minutes until the config map file in the Pod's filesystem is
// updated with the current state.
annotation.MarkPodAsUpdated(c, pod)
}
}
return nil
}

func isSuspended(pod corev1.Pod) bool {
for _, s := range pod.Status.InitContainerStatuses {
if s.Name == initcontainer.SuspendContainerName && s.State.Running != nil {
return true
}
}
return false
}
10 changes: 8 additions & 2 deletions pkg/controller/elasticsearch/initcontainer/initcontainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ import (
corev1 "k8s.io/api/core/v1"
)

// PrepareFilesystemContainerName is the name of the container that prepares the filesystem
const PrepareFilesystemContainerName = "elastic-internal-init-filesystem"
const (
// PrepareFilesystemContainerName is the name of the container that prepares the filesystem
PrepareFilesystemContainerName = "elastic-internal-init-filesystem"
// SuspendContainerName is the name of the container that is used to suspend Elasticsearch if requested by the user.
SuspendContainerName = "elastic-internal-suspend"
)

// NewInitContainers creates init containers according to the given parameters
func NewInitContainers(
Expand All @@ -29,5 +33,7 @@ func NewInitContainers(
containers = append(containers, keystoreResources.InitContainer)
}

containers = append(containers, NewSuspendInitContainer())

return containers, nil
}
20 changes: 10 additions & 10 deletions pkg/controller/elasticsearch/initcontainer/initcontainer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ import (

func TestNewInitContainers(t *testing.T) {
type args struct {
elasticsearchImage string
operatorImage string
keystoreResources *keystore.Resources
keystoreResources *keystore.Resources
}
tests := []struct {
name string
Expand All @@ -26,19 +24,21 @@ func TestNewInitContainers(t *testing.T) {
{
name: "with keystore resources",
args: args{
elasticsearchImage: "es-image",
operatorImage: "op-image",
keystoreResources: &keystore.Resources{},
keystoreResources: &keystore.Resources{},
},
expectedNumberOfContainers: 3,
},
{
name: "without keystore resources",
args: args{
keystoreResources: nil,
},
expectedNumberOfContainers: 2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
containers, err := NewInitContainers(
volume.SecretVolume{},
tt.args.keystoreResources,
)
containers, err := NewInitContainers(volume.SecretVolume{}, tt.args.keystoreResources)
assert.NoError(t, err)
assert.Equal(t, tt.expectedNumberOfContainers, len(containers))
})
Expand Down
47 changes: 47 additions & 0 deletions pkg/controller/elasticsearch/initcontainer/suspend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package initcontainer

import (
"fmt"
"path"
"strings"

esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/defaults"
esvolume "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/volume"
corev1 "k8s.io/api/core/v1"
)

const (
SuspendScriptConfigKey = "suspend.sh"
SuspendedHostsFile = "suspended_pods.txt"
)

var SuspendScript = fmt.Sprintf(`#!/usr/bin/env bash
set -eu
while [[ $(grep -Exc $HOSTNAME /mnt/elastic-internal/scripts/%s) -eq 1 ]]; do
echo Pod suspended via %s annotation
sleep 10
done
`, SuspendedHostsFile, esv1.SuspendAnnotation)

// RenderSuspendConfiguration renders the configuration used by the SuspendScript.
func RenderSuspendConfiguration(es esv1.Elasticsearch) string {
names := es.SuspendedPodNames().AsSlice()
names.Sort()
return strings.Join(names, "\n")
}

// NewSuspendInitContainer creates an init container to run the script to check for suspended Pods.
func NewSuspendInitContainer() corev1.Container {
return corev1.Container{
ImagePullPolicy: corev1.PullIfNotPresent,
Name: SuspendContainerName,
Env: defaults.PodDownwardEnvVars(),
Command: []string{"bash", "-c", path.Join(esvolume.ScriptsVolumeMountPath, SuspendScriptConfigKey)},
}
}
Loading

0 comments on commit b524a61

Please sign in to comment.