diff --git a/pkg/apis/elasticsearch/v1/elasticsearch_types.go b/pkg/apis/elasticsearch/v1/elasticsearch_types.go index 8acf4e552b..ef4aacf62c 100644 --- a/pkg/apis/elasticsearch/v1/elasticsearch_types.go +++ b/pkg/apis/elasticsearch/v1/elasticsearch_types.go @@ -6,6 +6,8 @@ package v1 import ( "fmt" + "github.com/elastic/cloud-on-k8s/pkg/utils/set" + "strings" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -18,6 +20,7 @@ import ( const ( ElasticsearchContainerName = "elasticsearch" + SuspendAnnotation = "eck.elastic.co/suspend" // Kind is inferred from the struct name using reflection in SchemeBuilder.Register() // we duplicate it as a constant here for practical purposes. Kind = "Elasticsearch" @@ -471,6 +474,20 @@ func (es Elasticsearch) SecureSettings() []commonv1.SecretSource { return es.Spec.SecureSettings } +func (es Elasticsearch) SuspendedPodNames() set.StringSet { + suspended, exists := es.Annotations[SuspendAnnotation] + if !exists { + return nil + } + + podNames := strings.Split(suspended, ",") + suspendedPods := set.Make() + for _, p := range podNames { + suspendedPods.Add(strings.TrimSpace(p)) + } + return suspendedPods +} + // -- associations var _ commonv1.Associated = &Elasticsearch{} diff --git a/pkg/controller/elasticsearch/configmap/configmap.go b/pkg/controller/elasticsearch/configmap/configmap.go index 32ce94bfec..dc4a2f0e0a 100644 --- a/pkg/controller/elasticsearch/configmap/configmap.go +++ b/pkg/controller/elasticsearch/configmap/configmap.go @@ -50,7 +50,7 @@ func ReconcileScriptsConfigMap(ctx context.Context, c k8s.Client, es esv1.Elasti nodespec.PreStopHookScriptConfigKey: nodespec.PreStopHookScript, initcontainer.PrepareFsScriptConfigKey: fsScript, initcontainer.SuspendScriptConfigKey: initcontainer.SuspendScript, - initcontainer.SuspendedHostsFile: es.Annotations[initcontainer.SuspendAnnotation], + initcontainer.SuspendedHostsFile: es.Annotations[esv1.SuspendAnnotation], }, ) diff --git a/pkg/controller/elasticsearch/driver/esstate.go b/pkg/controller/elasticsearch/driver/esstate.go index dd86f3f241..f28ff1ff8b 100644 --- a/pkg/controller/elasticsearch/driver/esstate.go +++ b/pkg/controller/elasticsearch/driver/esstate.go @@ -6,6 +6,7 @@ package driver import ( "context" + "github.com/elastic/cloud-on-k8s/pkg/utils/set" "sync" esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client" @@ -32,10 +33,10 @@ type MemoizingESState struct { } // NewMemoizingESState returns an initialized MemoizingESState. -func NewMemoizingESState(ctx context.Context, esClient esclient.Client) ESState { +func NewMemoizingESState(ctx context.Context, esClient esclient.Client, ignoredNodes set.StringSet) ESState { return &MemoizingESState{ esClient: esClient, - memoizingNodes: &memoizingNodes{esClient: esClient, ctx: ctx}, + memoizingNodes: &memoizingNodes{esClient: esClient, ctx: ctx, ignored: ignoredNodes}, memoizingShardsAllocationEnabled: &memoizingShardsAllocationEnabled{esClient: esClient, ctx: ctx}, memoizingHealth: &memoizingHealth{esClient: esClient, ctx: ctx}, } @@ -57,6 +58,7 @@ type memoizingNodes struct { once sync.Once esClient esclient.Client ctx context.Context + ignored set.StringSet nodes []string } @@ -75,6 +77,9 @@ func (n *memoizingNodes) NodesInCluster(nodeNames []string) (bool, error) { if err := initOnce(&n.once, n.initialize); err != nil { return false, err } + for _, ignore := range n.ignored.AsSlice() { + nodeNames = stringsutil.RemoveStringInSlice(ignore, nodeNames) + } return stringsutil.StringsInSlice(nodeNames, n.nodes), nil } diff --git a/pkg/controller/elasticsearch/driver/nodes.go b/pkg/controller/elasticsearch/driver/nodes.go index cdacfa7517..4f24769adc 100644 --- a/pkg/controller/elasticsearch/driver/nodes.go +++ b/pkg/controller/elasticsearch/driver/nodes.go @@ -83,7 +83,7 @@ func (d *defaultDriver) reconcileNodeSpecs( return results.WithError(err) } - esState := NewMemoizingESState(ctx, esClient) + esState := NewMemoizingESState(ctx, esClient, d.ES.SuspendedPodNames()) // Phase 1: apply expected StatefulSets resources and scale up. upscaleCtx := upscaleCtx{ diff --git a/pkg/controller/elasticsearch/driver/suspend.go b/pkg/controller/elasticsearch/driver/suspend.go index e778ccbdc2..30ff9f0ff6 100644 --- a/pkg/controller/elasticsearch/driver/suspend.go +++ b/pkg/controller/elasticsearch/driver/suspend.go @@ -3,26 +3,16 @@ package driver import ( "context" esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" - "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/initcontainer" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset" "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" - "strings" ) func reconcileSuspendedPods(c k8s.Client, es esv1.Elasticsearch) error { - suspended, exists := es.Annotations[initcontainer.SuspendAnnotation] - if !exists { - return nil - } + suspendedPodNames := es.SuspendedPodNames() - podNames := strings.Split(suspended, ",") - suspendedPods := make(map[string]struct{}) - for _, p := range podNames { - suspendedPods[strings.TrimSpace(p)] = struct{}{} - } statefulSets, err := sset.RetrieveActualStatefulSets(c, k8s.ExtractNamespacedName(&es)) if err != nil { return err @@ -30,7 +20,7 @@ func reconcileSuspendedPods(c k8s.Client, es esv1.Elasticsearch) error { knownPodNames := statefulSets.PodNames() for _, podName := range knownPodNames { - if _, suspended := suspendedPods[podName]; suspended { + if suspendedPodNames.Has(podName) { var pod corev1.Pod if err := c.Get(context.Background(), types.NamespacedName{Namespace: es.Namespace, Name: podName}, &pod); err != nil { return err diff --git a/pkg/controller/elasticsearch/driver/upgrade_predicates.go b/pkg/controller/elasticsearch/driver/upgrade_predicates.go index d6983cbfb5..93ad694572 100644 --- a/pkg/controller/elasticsearch/driver/upgrade_predicates.go +++ b/pkg/controller/elasticsearch/driver/upgrade_predicates.go @@ -6,6 +6,7 @@ package driver import ( "context" + "github.com/elastic/cloud-on-k8s/pkg/utils/set" esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client" @@ -134,6 +135,17 @@ var predicates = [...]Predicate{ return true, nil }, }, + { + name: "skip_suspended_pods", + fn: func( + context PredicateContext, + candidate corev1.Pod, + _ []corev1.Pod, + _ bool, + ) (bool, error) { + return !context.es.SuspendedPodNames().Has(candidate.Name), nil + }, + }, { name: "skip_already_terminating_pods", fn: func( @@ -327,7 +339,10 @@ var predicates = [...]Predicate{ } // Get the expected masters - expectedMasters := len(context.masterNodesNames) + masterNodeNames := set.Make(context.masterNodesNames...) + // but exclude and suspended Pods + expectedMasters := len(masterNodeNames.Diff(context.es.SuspendedPodNames())) + // Get the healthy masters healthyMasters := 0 for _, pod := range context.healthyPods { diff --git a/pkg/controller/elasticsearch/driver/upscale_state.go b/pkg/controller/elasticsearch/driver/upscale_state.go index f2fdde7136..e88e592e52 100644 --- a/pkg/controller/elasticsearch/driver/upscale_state.go +++ b/pkg/controller/elasticsearch/driver/upscale_state.go @@ -62,6 +62,10 @@ func buildOnce(s *upscaleState) error { return } for _, masterNodePod := range masters { + // ignore suspended Pods here + if s.ctx.es.SuspendedPodNames().Has(masterNodePod.Name) { + continue + } var isJoining bool isJoining, err = isMasterNodeJoining(masterNodePod, s.ctx.esState) if err != nil { diff --git a/pkg/controller/elasticsearch/initcontainer/suspend.go b/pkg/controller/elasticsearch/initcontainer/suspend.go index 9d567d7885..9de65cd65b 100644 --- a/pkg/controller/elasticsearch/initcontainer/suspend.go +++ b/pkg/controller/elasticsearch/initcontainer/suspend.go @@ -2,6 +2,7 @@ package initcontainer import ( "fmt" + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/pkg/controller/common/defaults" esvolume "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/volume" corev1 "k8s.io/api/core/v1" @@ -12,7 +13,6 @@ import ( const ( SuspendScriptConfigKey = "suspend.sh" SuspendedHostsFile = "suspended_hosts.txt" - SuspendAnnotation = "eck.elastic.co/suspend" ) var SuspendScript = fmt.Sprintf(`#!/usr/bin/env bash @@ -22,7 +22,7 @@ while [[ $(grep -Ec $HOSTNAME /mnt/elastic-internal/scripts/%s) -eq 1 ]]; do echo Pod suspended via %s annotation sleep 10 done -`, SuspendedHostsFile, SuspendAnnotation) +`, SuspendedHostsFile, esv1.SuspendAnnotation) var suspendContainerResources = corev1.ResourceRequirements{ Requests: map[corev1.ResourceName]resource.Quantity{ diff --git a/pkg/utils/set/set.go b/pkg/utils/set/set.go index fcc12beae4..79444e6301 100644 --- a/pkg/utils/set/set.go +++ b/pkg/utils/set/set.go @@ -34,6 +34,15 @@ func (set StringSet) MergeWith(other StringSet) { } } +func (set StringSet) Diff( other StringSet) StringSet { + res := Make() + res.MergeWith(set) + for str := range other { + res.Del(str) + } + return res +} + func (set StringSet) Has(s string) (exists bool) { if set != nil { _, exists = set[s]