Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow users to suspend Elasticsearch Pods for debugging purposes #4946

Merged
merged 20 commits into from
Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions pkg/apis/elasticsearch/v1/elasticsearch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package v1

import (
"fmt"
"github.com/elastic/cloud-on-k8s/pkg/utils/set"
"strings"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -18,6 +20,7 @@ import (

const (
ElasticsearchContainerName = "elasticsearch"
SuspendAnnotation = "eck.k8s.elastic.co/suspend"
pebrc marked this conversation as resolved.
Show resolved Hide resolved
// Kind is inferred from the struct name using reflection in SchemeBuilder.Register()
// we duplicate it as a constant here for practical purposes.
Kind = "Elasticsearch"
Expand Down Expand Up @@ -471,6 +474,20 @@ func (es Elasticsearch) SecureSettings() []commonv1.SecretSource {
return es.Spec.SecureSettings
}

// SuspendedPodNames returns the names of the Pods listed in the SuspendAnnotation of this
// Elasticsearch resource. The annotation value is read as a comma-separated list: whitespace
// around each entry is trimmed and blank entries are skipped.
// It returns nil if the annotation is not set.
func (es Elasticsearch) SuspendedPodNames() set.StringSet {
	suspended, exists := es.Annotations[SuspendAnnotation]
	if !exists {
		return nil
	}

	suspendedPods := set.Make()
	for _, p := range strings.Split(suspended, ",") {
		name := strings.TrimSpace(p)
		if name == "" {
			// strings.Split returns empty elements for an empty value and for doubled or
			// trailing commas; an empty string is not a Pod name and must not end up in the set.
			continue
		}
		suspendedPods.Add(name)
	}
	return suspendedPods
}

// -- associations

var _ commonv1.Associated = &Elasticsearch{}
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/elasticsearch/configmap/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func ReconcileScriptsConfigMap(ctx context.Context, c k8s.Client, es esv1.Elasti
nodespec.ReadinessProbeScriptConfigKey: nodespec.ReadinessProbeScript,
nodespec.PreStopHookScriptConfigKey: nodespec.PreStopHookScript,
initcontainer.PrepareFsScriptConfigKey: fsScript,
initcontainer.SuspendScriptConfigKey: initcontainer.SuspendScript,
initcontainer.SuspendedHostsFile: es.Annotations[esv1.SuspendAnnotation],
},
)

Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/elasticsearch/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@ func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results {
results.WithResult(defaultRequeue)
}

// we want to reconcile suspended Pods before we start reconciling node specs as this is considered a debugging and
// troubleshooting tool that does not follow the change budget restrictions
// TODO: does this need to happen after all expectations are met?
if err := reconcileSuspendedPods(d.Client, d.ES); err != nil {
return results.WithError(err)
}

// reconcile StatefulSets and nodes configuration
res = d.reconcileNodeSpecs(ctx, esReachable, esClient, d.ReconcileState, observedState, *resourcesState, keystoreResources)
results = results.WithResults(res)
Expand Down
9 changes: 7 additions & 2 deletions pkg/controller/elasticsearch/driver/esstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package driver

import (
"context"
"github.com/elastic/cloud-on-k8s/pkg/utils/set"
"sync"

esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
Expand All @@ -32,10 +33,10 @@ type MemoizingESState struct {
}

// NewMemoizingESState returns an initialized MemoizingESState.
func NewMemoizingESState(ctx context.Context, esClient esclient.Client) ESState {
func NewMemoizingESState(ctx context.Context, esClient esclient.Client, ignoredNodes set.StringSet) ESState {
return &MemoizingESState{
esClient: esClient,
memoizingNodes: &memoizingNodes{esClient: esClient, ctx: ctx},
memoizingNodes: &memoizingNodes{esClient: esClient, ctx: ctx, ignored: ignoredNodes},
memoizingShardsAllocationEnabled: &memoizingShardsAllocationEnabled{esClient: esClient, ctx: ctx},
memoizingHealth: &memoizingHealth{esClient: esClient, ctx: ctx},
}
Expand All @@ -57,6 +58,7 @@ type memoizingNodes struct {
once sync.Once
esClient esclient.Client
ctx context.Context
ignored set.StringSet
nodes []string
}

Expand All @@ -75,6 +77,9 @@ func (n *memoizingNodes) NodesInCluster(nodeNames []string) (bool, error) {
if err := initOnce(&n.once, n.initialize); err != nil {
return false, err
}
for _, ignore := range n.ignored.AsSlice() {
nodeNames = stringsutil.RemoveStringInSlice(ignore, nodeNames)
}
return stringsutil.StringsInSlice(nodeNames, n.nodes), nil
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/controller/elasticsearch/driver/esstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package driver

import (
"context"
"github.com/elastic/cloud-on-k8s/pkg/utils/set"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -134,6 +135,13 @@ func Test_memoizingNodes_NodesInCluster(t *testing.T) {
inCluster, err = memoizingNodes.NodesInCluster([]string{"a", "b", "c", "e"})
require.NoError(t, err)
require.False(t, inCluster)

// unless we explicitly chose to ignore a node (because it has been suspended on the infrastructure level for example)
memoizingNodes.ignored = set.Make("e")
inCluster, err = memoizingNodes.NodesInCluster([]string{"a", "b", "c", "e"})
require.NoError(t, err)
require.True(t, inCluster)

}

func Test_memoizingShardsAllocationEnabled_ShardAllocationsEnabled(t *testing.T) {
Expand Down Expand Up @@ -190,7 +198,7 @@ func Test_memoizingGreenHealth_GreenHealth(t *testing.T) {
func TestNewMemoizingESState(t *testing.T) {
esClient := &fakeESClient{}
// just make sure everything is initialized correctly (no panic for nil pointers)
s := NewMemoizingESState(context.Background(), esClient)
s := NewMemoizingESState(context.Background(), esClient, set.Make())
_, err := s.Health()
require.NoError(t, err)
_, err = s.ShardAllocationsEnabled()
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/elasticsearch/driver/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (d *defaultDriver) reconcileNodeSpecs(
return results.WithError(err)
}

esState := NewMemoizingESState(ctx, esClient)
esState := NewMemoizingESState(ctx, esClient, d.ES.SuspendedPodNames())

// Phase 1: apply expected StatefulSets resources and scale up.
upscaleCtx := upscaleCtx{
Expand Down
41 changes: 41 additions & 0 deletions pkg/controller/elasticsearch/driver/suspend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package driver

import (
"context"
esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

pebrc marked this conversation as resolved.
Show resolved Hide resolved
func reconcileSuspendedPods(c k8s.Client, es esv1.Elasticsearch) error {
suspendedPodNames := es.SuspendedPodNames()
pebrc marked this conversation as resolved.
Show resolved Hide resolved

statefulSets, err := sset.RetrieveActualStatefulSets(c, k8s.ExtractNamespacedName(&es))
if err != nil {
return err
}
knownPodNames := statefulSets.PodNames()

for _, podName := range knownPodNames {
sebgl marked this conversation as resolved.
Show resolved Hide resolved
if suspendedPodNames.Has(podName) {
var pod corev1.Pod
if err := c.Get(context.Background(), types.NamespacedName{Namespace: es.Namespace, Name: podName}, &pod); err != nil {
return err
}
for _, s := range pod.Status.ContainerStatuses {
// delete the Pod without grace period if the main container is running
if s.Name == esv1.ElasticsearchContainerName && s.State.Running != nil {
log.Info("Deleting suspended pod", "pod_name", pod.Name, "pod_uid", pod.UID,
"namespace", es.Namespace, "es_name", es.Name)
if err := c.Delete(context.Background(), &pod, client.GracePeriodSeconds(0)); err != nil {
return err
}
}
}
}
}
return nil
}
17 changes: 16 additions & 1 deletion pkg/controller/elasticsearch/driver/upgrade_predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package driver

import (
"context"
"github.com/elastic/cloud-on-k8s/pkg/utils/set"

esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
Expand Down Expand Up @@ -134,6 +135,17 @@ var predicates = [...]Predicate{
return true, nil
},
},
{
name: "skip_suspended_pods",
fn: func(
context PredicateContext,
candidate corev1.Pod,
_ []corev1.Pod,
_ bool,
) (bool, error) {
return !context.es.SuspendedPodNames().Has(candidate.Name), nil
},
},
{
name: "skip_already_terminating_pods",
fn: func(
Expand Down Expand Up @@ -327,7 +339,10 @@ var predicates = [...]Predicate{
}

// Get the expected masters
expectedMasters := len(context.masterNodesNames)
masterNodeNames := set.Make(context.masterNodesNames...)
// but exclude any suspended Pods
expectedMasters := len(masterNodeNames.Diff(context.es.SuspendedPodNames()))
pebrc marked this conversation as resolved.
Show resolved Hide resolved

// Get the healthy masters
healthyMasters := 0
for _, pod := range context.healthyPods {
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/elasticsearch/driver/upscale_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func buildOnce(s *upscaleState) error {
return
}
for _, masterNodePod := range masters {
// ignore suspended Pods here
if s.ctx.es.SuspendedPodNames().Has(masterNodePod.Name) {
continue
}
var isJoining bool
isJoining, err = isMasterNodeJoining(masterNodePod, s.ctx.esState)
if err != nil {
Expand Down
9 changes: 7 additions & 2 deletions pkg/controller/elasticsearch/initcontainer/initcontainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import (
corev1 "k8s.io/api/core/v1"
)

// PrepareFilesystemContainerName is the name of the container that prepares the filesystem
const PrepareFilesystemContainerName = "elastic-internal-init-filesystem"
const (
// PrepareFilesystemContainerName is the name of the container that prepares the filesystem
PrepareFilesystemContainerName = "elastic-internal-init-filesystem"
sebgl marked this conversation as resolved.
Show resolved Hide resolved
// SuspendContainerName is the name of the init container that runs the suspend script
SuspendContainerName = "elastic-internal-suspend"
)

// NewInitContainers creates init containers according to the given parameters
func NewInitContainers(
Expand All @@ -29,5 +32,7 @@ func NewInitContainers(
containers = append(containers, keystoreResources.InitContainer)
}

containers = append(containers, NewSuspendInitContainer())

return containers, nil
}
11 changes: 7 additions & 4 deletions pkg/controller/elasticsearch/initcontainer/initcontainer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (

func TestNewInitContainers(t *testing.T) {
type args struct {
elasticsearchImage string
operatorImage string
keystoreResources *keystore.Resources
}
tests := []struct {
Expand All @@ -26,10 +24,15 @@ func TestNewInitContainers(t *testing.T) {
{
name: "with keystore resources",
args: args{
elasticsearchImage: "es-image",
operatorImage: "op-image",
keystoreResources: &keystore.Resources{},
},
expectedNumberOfContainers: 3,
},
{
name: "without keystore resources",
args: args{
keystoreResources: nil,
},
expectedNumberOfContainers: 2,
},
}
Expand Down
46 changes: 46 additions & 0 deletions pkg/controller/elasticsearch/initcontainer/suspend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package initcontainer

import (
"fmt"
esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/defaults"
esvolume "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/volume"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"path"
)

const (
// SuspendScriptConfigKey is the key of the suspend script in the scripts ConfigMap and
// the file name of the script below the scripts volume mount path.
SuspendScriptConfigKey = "suspend.sh"
// SuspendedHostsFile is the key in the scripts ConfigMap that holds the value of the
// suspend annotation; the suspend script reads it as a file.
SuspendedHostsFile = "suspended_hosts.txt"
pebrc marked this conversation as resolved.
Show resolved Hide resolved
)

// SuspendScript is the script run by the suspend init container. It loops, sleeping 10 seconds
// per iteration, for as long as the Pod's hostname is listed in the suspended hosts file, and
// exits as soon as the hostname is no longer listed.
var SuspendScript = fmt.Sprintf(`#!/usr/bin/env bash
set -eu

# The hosts file holds the raw, comma-separated annotation value. Split it into one name per
# line, strip blanks and match the hostname literally against whole lines, so that a Pod is
# not suspended merely because its name is a substring of a listed one.
while tr ',' '\n' < /mnt/elastic-internal/scripts/%s | tr -d '[:blank:]' | grep -Fxq -- "$HOSTNAME"; do
	echo Pod suspended via %s annotation
	sleep 10
done
`, SuspendedHostsFile, esv1.SuspendAnnotation)

var suspendContainerResources = corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceMemory: resource.MustParse("100Mi"),
},
Limits: map[corev1.ResourceName]resource.Quantity{
// Memory limit should be at least 12582912 when running with CRI-O
// Less than 100Mi and Elasticsearch tools like elasticsearch-node run into OOM
corev1.ResourceMemory: resource.MustParse("100Mi"),
pebrc marked this conversation as resolved.
Show resolved Hide resolved
},
}

// NewSuspendInitContainer returns the init container that executes the suspend script from
// the scripts volume through bash. No image is set here.
// NOTE(review): presumably the image is filled in later by the pod template builder — confirm.
func NewSuspendInitContainer() corev1.Container {
	scriptPath := path.Join(esvolume.ScriptsVolumeMountPath, SuspendScriptConfigKey)

	suspendContainer := corev1.Container{
		Name:            SuspendContainerName,
		ImagePullPolicy: corev1.PullIfNotPresent,
		Command:         []string{"bash", "-c", scriptPath},
		Env:             defaults.PodDownwardEnvVars(),
		Resources:       suspendContainerResources,
	}
	return suspendContainer
}
13 changes: 8 additions & 5 deletions pkg/controller/elasticsearch/nodespec/podspec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,12 @@ func TestBuildPodTemplateSpec(t *testing.T) {
nil,
)
require.NoError(t, err)
// should be patched with volume and env
// init containers should be patched with volume and inherited env vars and image
headlessSvcEnvVar := corev1.EnvVar{Name: "HEADLESS_SERVICE_NAME", Value: "name-es-nodeset-1"}
esDockerImage := "docker.elastic.co/elasticsearch/elasticsearch:7.2.0"
for i := range initContainers {
initContainers[i].Env = append(initContainers[i].Env, defaults.PodDownwardEnvVars()...)
initContainers[i].Image = esDockerImage
initContainers[i].Env = append(initContainers[i].Env, headlessSvcEnvVar)
initContainers[i].VolumeMounts = append(initContainers[i].VolumeMounts, volumeMounts...)
}

Expand Down Expand Up @@ -243,8 +246,8 @@ func TestBuildPodTemplateSpec(t *testing.T) {
Volumes: volumes,
InitContainers: append(initContainers, corev1.Container{
Name: "additional-init-container",
Image: "docker.elastic.co/elasticsearch/elasticsearch:7.2.0",
Env: defaults.ExtendPodDownwardEnvVars(corev1.EnvVar{Name: "HEADLESS_SERVICE_NAME", Value: "name-es-nodeset-1"}),
Image: esDockerImage,
Env: defaults.ExtendPodDownwardEnvVars(headlessSvcEnvVar),
VolumeMounts: volumeMounts,
}),
Containers: []corev1.Container{
Expand All @@ -253,7 +256,7 @@ func TestBuildPodTemplateSpec(t *testing.T) {
},
{
Name: "elasticsearch",
Image: "docker.elastic.co/elasticsearch/elasticsearch:7.2.0",
Image: esDockerImage,
Ports: []corev1.ContainerPort{
{Name: "https", HostPort: 0, ContainerPort: 9200, Protocol: "TCP", HostIP: ""},
{Name: "transport", HostPort: 0, ContainerPort: 9300, Protocol: "TCP", HostIP: ""},
Expand Down
9 changes: 9 additions & 0 deletions pkg/utils/set/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ func (set StringSet) MergeWith(other StringSet) {
}
}

// Diff returns a new StringSet containing the elements of set that are not in other.
// Neither set nor other is modified.
func (set StringSet) Diff(other StringSet) StringSet {
	remaining := Make()
	for item := range set {
		if !other.Has(item) {
			remaining.Add(item)
		}
	}
	return remaining
}

func (set StringSet) Has(s string) (exists bool) {
if set != nil {
_, exists = set[s]
Expand Down
Loading