Skip to content

Commit

Permalink
Ignore suspended pods during up/downscale and rolling upgrades
Browse files Browse the repository at this point in the history
  • Loading branch information
pebrc committed Oct 12, 2021
1 parent e5e97b4 commit e1ea60c
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 19 deletions.
17 changes: 17 additions & 0 deletions pkg/apis/elasticsearch/v1/elasticsearch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package v1

import (
"fmt"
"github.com/elastic/cloud-on-k8s/pkg/utils/set"
"strings"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -18,6 +20,7 @@ import (

const (
ElasticsearchContainerName = "elasticsearch"
SuspendAnnotation = "eck.elastic.co/suspend"
// Kind is inferred from the struct name using reflection in SchemeBuilder.Register()
// we duplicate it as a constant here for practical purposes.
Kind = "Elasticsearch"
Expand Down Expand Up @@ -471,6 +474,20 @@ func (es Elasticsearch) SecureSettings() []commonv1.SecretSource {
return es.Spec.SecureSettings
}

func (es Elasticsearch) SuspendedPodNames() set.StringSet {
suspended, exists := es.Annotations[SuspendAnnotation]
if !exists {
return nil
}

podNames := strings.Split(suspended, ",")
suspendedPods := set.Make()
for _, p := range podNames {
suspendedPods.Add(strings.TrimSpace(p))
}
return suspendedPods
}

// -- associations

var _ commonv1.Associated = &Elasticsearch{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/elasticsearch/configmap/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func ReconcileScriptsConfigMap(ctx context.Context, c k8s.Client, es esv1.Elasti
nodespec.PreStopHookScriptConfigKey: nodespec.PreStopHookScript,
initcontainer.PrepareFsScriptConfigKey: fsScript,
initcontainer.SuspendScriptConfigKey: initcontainer.SuspendScript,
initcontainer.SuspendedHostsFile: es.Annotations[initcontainer.SuspendAnnotation],
initcontainer.SuspendedHostsFile: es.Annotations[esv1.SuspendAnnotation],
},
)

Expand Down
9 changes: 7 additions & 2 deletions pkg/controller/elasticsearch/driver/esstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package driver

import (
"context"
"github.com/elastic/cloud-on-k8s/pkg/utils/set"
"sync"

esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
Expand All @@ -32,10 +33,10 @@ type MemoizingESState struct {
}

// NewMemoizingESState returns an initialized MemoizingESState.
func NewMemoizingESState(ctx context.Context, esClient esclient.Client) ESState {
func NewMemoizingESState(ctx context.Context, esClient esclient.Client, ignoredNodes set.StringSet) ESState {
return &MemoizingESState{
esClient: esClient,
memoizingNodes: &memoizingNodes{esClient: esClient, ctx: ctx},
memoizingNodes: &memoizingNodes{esClient: esClient, ctx: ctx, ignored: ignoredNodes},
memoizingShardsAllocationEnabled: &memoizingShardsAllocationEnabled{esClient: esClient, ctx: ctx},
memoizingHealth: &memoizingHealth{esClient: esClient, ctx: ctx},
}
Expand All @@ -57,6 +58,7 @@ type memoizingNodes struct {
once sync.Once
esClient esclient.Client
ctx context.Context
ignored set.StringSet
nodes []string
}

Expand All @@ -75,6 +77,9 @@ func (n *memoizingNodes) NodesInCluster(nodeNames []string) (bool, error) {
if err := initOnce(&n.once, n.initialize); err != nil {
return false, err
}
for _, ignore := range n.ignored.AsSlice() {
nodeNames = stringsutil.RemoveStringInSlice(ignore, nodeNames)
}
return stringsutil.StringsInSlice(nodeNames, n.nodes), nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/elasticsearch/driver/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (d *defaultDriver) reconcileNodeSpecs(
return results.WithError(err)
}

esState := NewMemoizingESState(ctx, esClient)
esState := NewMemoizingESState(ctx, esClient, d.ES.SuspendedPodNames())

// Phase 1: apply expected StatefulSets resources and scale up.
upscaleCtx := upscaleCtx{
Expand Down
14 changes: 2 additions & 12 deletions pkg/controller/elasticsearch/driver/suspend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,24 @@ package driver
import (
"context"
esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/initcontainer"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"strings"
)

func reconcileSuspendedPods(c k8s.Client, es esv1.Elasticsearch) error {
suspended, exists := es.Annotations[initcontainer.SuspendAnnotation]
if !exists {
return nil
}
suspendedPodNames := es.SuspendedPodNames()

podNames := strings.Split(suspended, ",")
suspendedPods := make(map[string]struct{})
for _, p := range podNames {
suspendedPods[strings.TrimSpace(p)] = struct{}{}
}
statefulSets, err := sset.RetrieveActualStatefulSets(c, k8s.ExtractNamespacedName(&es))
if err != nil {
return err
}
knownPodNames := statefulSets.PodNames()

for _, podName := range knownPodNames {
if _, suspended := suspendedPods[podName]; suspended {
if suspendedPodNames.Has(podName) {
var pod corev1.Pod
if err := c.Get(context.Background(), types.NamespacedName{Namespace: es.Namespace, Name: podName}, &pod); err != nil {
return err
Expand Down
17 changes: 16 additions & 1 deletion pkg/controller/elasticsearch/driver/upgrade_predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package driver

import (
"context"
"github.com/elastic/cloud-on-k8s/pkg/utils/set"

esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
Expand Down Expand Up @@ -134,6 +135,17 @@ var predicates = [...]Predicate{
return true, nil
},
},
{
name: "skip_suspended_pods",
fn: func(
context PredicateContext,
candidate corev1.Pod,
_ []corev1.Pod,
_ bool,
) (bool, error) {
return !context.es.SuspendedPodNames().Has(candidate.Name), nil
},
},
{
name: "skip_already_terminating_pods",
fn: func(
Expand Down Expand Up @@ -327,7 +339,10 @@ var predicates = [...]Predicate{
}

// Get the expected masters
expectedMasters := len(context.masterNodesNames)
masterNodeNames := set.Make(context.masterNodesNames...)
// but exclude and suspended Pods
expectedMasters := len(masterNodeNames.Diff(context.es.SuspendedPodNames()))

// Get the healthy masters
healthyMasters := 0
for _, pod := range context.healthyPods {
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/elasticsearch/driver/upscale_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func buildOnce(s *upscaleState) error {
return
}
for _, masterNodePod := range masters {
// ignore suspended Pods here
if s.ctx.es.SuspendedPodNames().Has(masterNodePod.Name) {
continue
}
var isJoining bool
isJoining, err = isMasterNodeJoining(masterNodePod, s.ctx.esState)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/elasticsearch/initcontainer/suspend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package initcontainer

import (
"fmt"
esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/defaults"
esvolume "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/volume"
corev1 "k8s.io/api/core/v1"
Expand All @@ -12,7 +13,6 @@ import (
const (
SuspendScriptConfigKey = "suspend.sh"
SuspendedHostsFile = "suspended_hosts.txt"
SuspendAnnotation = "eck.elastic.co/suspend"
)

var SuspendScript = fmt.Sprintf(`#!/usr/bin/env bash
Expand All @@ -22,7 +22,7 @@ while [[ $(grep -Ec $HOSTNAME /mnt/elastic-internal/scripts/%s) -eq 1 ]]; do
echo Pod suspended via %s annotation
sleep 10
done
`, SuspendedHostsFile, SuspendAnnotation)
`, SuspendedHostsFile, esv1.SuspendAnnotation)

var suspendContainerResources = corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
Expand Down
9 changes: 9 additions & 0 deletions pkg/utils/set/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ func (set StringSet) MergeWith(other StringSet) {
}
}

func (set StringSet) Diff( other StringSet) StringSet {
res := Make()
res.MergeWith(set)
for str := range other {
res.Del(str)
}
return res
}

func (set StringSet) Has(s string) (exists bool) {
if set != nil {
_, exists = set[s]
Expand Down

0 comments on commit e1ea60c

Please sign in to comment.