Skip to content

Commit

Permalink
Fix how cluster.initial_master_nodes is set (elastic#2315)
Browse files Browse the repository at this point in the history
* Fix how cluster.initial_master_nodes is set

We had a bug where the value of `cluster.initial_master_nodes` would not
be set while upgrading a single v6 cluster to v7. This commit fixes it
by handling `cluster.initial_master_nodes` differently.

:: Annotation logic

Instead of relying on the ClusterUUID annotation to know whether a
cluster is bootstrapped, and manually removing it in the special case of a
single v6 -> v7 upgrade, we now rely on a dedicated annotation:
`elasticsearch.k8s.elastic.co/initial-master-nodes: node-0,node-1,node-2`.

When that annotation is set at the cluster level, it means the cluster
is currently bootstrapping for zen2. Any v7+ master node must have
`cluster.initial_master_nodes` in its configuration set to the value in
the annotation. This value is not supposed to vary over time, we make
sure that is the case here.

Once we detect the cluster has finished its bootstrap (the current
master of the cluster is a v7+ master node), we remove the annotation,
and remove the configuration setting from master nodes.

:: When should cluster.initial_master_nodes be set?

There are 2 cases where this setting must be set:
- when we create a v7+ cluster for the first time
- when we upgrade/restart a single v6 master to v7: that new master must
identify itself as a legit master

We don't want to set this setting when:
- we do a regular rolling-upgrade of multiple master nodes from v6 to v7
- Elasticsearch is not running in v7+

:: Edge cases

The following cases are quite tricky and covered by unit & E2E tests:

* upgrade from a single v6 master to a single v7 master **in a different
NodeSet**. The new master will be created before the old one gets
removed, hence the setting should not be set.
* upgrade from a single v6 master to more v7 masters. The new v7 masters
will be created before the v6 master is upgraded, hence the setting
should not be set.
* upgrade from two v6 masters to two v7 masters. This is a regular
rolling upgrade, hence the setting should not be set. However when the
first master goes down for upgrade, the cluster becomes unavailable
since minimum_master_nodes=2.

* Fix V7 comparison in client

* Remove new lines in imports

* Improve readability by extracting checks

* Don't error out if no master yet

Co-authored-by: Michael Morello <michael.morello@gmail.com>
  • Loading branch information
sebgl and barkbay committed Jan 3, 2020
1 parent 0245bc5 commit 4c46f28
Show file tree
Hide file tree
Showing 22 changed files with 764 additions and 336 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package driver
package bootstrap

import (
esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/version/zen2"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

var log = logf.Log.WithName("elasticsearch-uuid")

const (
// ClusterUUIDAnnotationName used to store the cluster UUID as an annotation when cluster has been bootstrapped.
ClusterUUIDAnnotationName = "elasticsearch.k8s.elastic.co/cluster-uuid"
Expand All @@ -25,55 +26,18 @@ func AnnotatedForBootstrap(cluster esv1.Elasticsearch) bool {
}

func ReconcileClusterUUID(c k8s.Client, cluster *esv1.Elasticsearch, observedState observer.State) error {
reBootstrap, err := clusterNeedsReBootstrap(c, cluster)
if err != nil {
return err
}

if AnnotatedForBootstrap(*cluster) {
if reBootstrap {
log.Info("cluster re-bootstrap necessary",
"version", cluster.Spec.Version,
"namespace", cluster.Namespace,
"name", cluster.Name,
)
return removeUUIDAnnotation(c, cluster)
}
// already annotated, nothing to do.
return nil
}
if clusterIsBootstrapped(observedState) && !reBootstrap {
if clusterIsBootstrapped(observedState) {
// cluster bootstrapped but not annotated yet
return annotateWithUUID(cluster, observedState, c)
}
// cluster not bootstrapped yet
return nil
}

// removeUUIDAnnotation drops the cluster UUID annotation from the given
// Elasticsearch resource and persists the change through the API server.
func removeUUIDAnnotation(client k8s.Client, es *esv1.Elasticsearch) error {
	if es.Annotations == nil {
		// No annotations at all: the UUID annotation cannot be set,
		// skip the useless update call.
		return nil
	}
	delete(es.Annotations, ClusterUUIDAnnotationName)
	return client.Update(es)
}

// clusterNeedsReBootstrap is true if we are updating a single master cluster
// from 6.x to 7.x: the cluster is effectively lost when rolling the single
// master node, so it must bootstrap again.
// Invariant: no grow and shrink.
func clusterNeedsReBootstrap(client k8s.Client, es *esv1.Elasticsearch) (bool, error) {
	zen2Upgrade, err := zen2.IsInitialZen2Upgrade(client, *es)
	if err != nil {
		return false, err
	}
	masters, err := sset.GetActualMastersForCluster(client, *es)
	if err != nil {
		return false, err
	}
	// Re-bootstrap only applies to a single-master cluster doing its
	// first zen2 (v6 -> v7) upgrade.
	return zen2Upgrade && len(masters) == 1, nil
}

// clusterIsBootstrapped returns true if the cluster has formed and has a UUID.
func clusterIsBootstrapped(observedState observer.State) bool {
return observedState.ClusterInfo != nil &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package driver
package bootstrap

import (
"testing"
Expand All @@ -12,7 +12,6 @@ import (
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -28,35 +27,13 @@ func bootstrappedES() *esv1.Elasticsearch {
}
}

// bootstrappedESWithChangeBudget returns a bootstrapped cluster fixture whose
// update strategy carries the given change budget.
func bootstrappedESWithChangeBudget(maxSurge, maxUnavailable *int32) *esv1.Elasticsearch {
	budget := esv1.ChangeBudget{
		MaxSurge:       maxSurge,
		MaxUnavailable: maxUnavailable,
	}
	es := bootstrappedES()
	es.Spec.UpdateStrategy = esv1.UpdateStrategy{ChangeBudget: budget}
	return es
}

// notBootstrappedES returns a v7 cluster fixture carrying no annotations,
// i.e. not yet marked as bootstrapped.
func notBootstrappedES() *esv1.Elasticsearch {
	es := &esv1.Elasticsearch{}
	es.ObjectMeta = metav1.ObjectMeta{Name: "cluster"}
	es.Spec = esv1.ElasticsearchSpec{Version: "7.3.0"}
	return es
}

// reBootstrappingES returns a v7 cluster fixture with an empty (non-nil)
// annotations map, as it looks right after the UUID annotation was removed.
func reBootstrappingES() *esv1.Elasticsearch {
	es := &esv1.Elasticsearch{}
	es.ObjectMeta = metav1.ObjectMeta{
		Name:        "cluster",
		Annotations: map[string]string{},
	}
	es.Spec = esv1.ElasticsearchSpec{Version: "7.3.0"}
	return es
}

func TestAnnotatedForBootstrap(t *testing.T) {
require.True(t, AnnotatedForBootstrap(*bootstrappedES()))
require.False(t, AnnotatedForBootstrap(*notBootstrappedES()))
Expand Down Expand Up @@ -147,33 +124,6 @@ func TestReconcileClusterUUID(t *testing.T) {
observedState: observer.State{ClusterInfo: &client.Info{ClusterUUID: "uuid"}},
wantCluster: bootstrappedES(),
},
{
name: "annotated, bootstrapped, but needs re-bootstrapping due to single node upgrade ",
c: k8s.WrappedFakeClient(
bootstrappedES(),
sset.TestPod{
ClusterName: "cluster",
Version: "6.8.0",
Master: true,
}.BuildPtr()),
cluster: bootstrappedES(),
observedState: observer.State{ClusterInfo: &client.Info{ClusterUUID: "uuid"}},
wantCluster: reBootstrappingES(),
},
{
name: "not annotated, bootstrapped, but still on pre-upgrade version",
c: k8s.WrappedFakeClient(
reBootstrappingES(),
sset.TestPod{
ClusterName: "cluster",
Version: "6.8.0",
Master: true,
}.BuildPtr(),
),
cluster: reBootstrappingES(),
observedState: observer.State{ClusterInfo: &client.Info{ClusterUUID: "uuid"}},
wantCluster: reBootstrappingES(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/elasticsearch/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ type Client interface {
GetNodes(ctx context.Context) (Nodes, error)
// GetNodesStats calls the _nodes/stats api to return a map(nodeName -> NodeStats)
GetNodesStats(ctx context.Context) (NodesStats, error)
// ClusterBootstrappedForZen2 returns true if the cluster is relying on zen2 orchestration.
ClusterBootstrappedForZen2(ctx context.Context) (bool, error)
// AddVotingConfigExclusions sets the transient and persistent setting of the same name in cluster settings.
//
// If timeout is the empty string, the default is used.
Expand Down
50 changes: 50 additions & 0 deletions pkg/controller/elasticsearch/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,3 +602,53 @@ func TestAPIError_Types(t *testing.T) {
})
}
}

// TestClient_ClusterBootstrappedForZen2 checks that the zen2 bootstrap
// detection relies on the version of the current master node returned by
// the /_nodes/_master API.
func TestClient_ClusterBootstrappedForZen2(t *testing.T) {
	type testCase struct {
		name                string
		expectedPath        string
		version             string
		apiResponse         string
		bootstrappedForZen2 bool
		wantErr             bool
	}
	testCases := []testCase{
		{
			// a v6 master means zen2 bootstrap did not happen yet
			name:                "6.x master node",
			expectedPath:        "/_nodes/_master",
			version:             "6.8.0",
			apiResponse:         fixtures.MasterNodeForVersion("6.8.0"),
			bootstrappedForZen2: false,
			wantErr:             false,
		},
		{
			// a v7 master means the cluster is zen2-bootstrapped
			name:                "7.x master node",
			expectedPath:        "/_nodes/_master",
			version:             "7.5.0",
			apiResponse:         fixtures.MasterNodeForVersion("7.5.0"),
			bootstrappedForZen2: true,
			wantErr:             false,
		},
		{
			// no elected master: not bootstrapped, but not an error either
			name:                "no master node",
			expectedPath:        "/_nodes/_master",
			version:             "7.5.0",
			apiResponse:         `{"cluster_name": "elasticsearch-sample", "nodes": {}}`,
			bootstrappedForZen2: false,
			wantErr:             false,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			esClient := NewMockClient(version.MustParse(tc.version), func(req *http.Request) *http.Response {
				require.Equal(t, tc.expectedPath, req.URL.Path)
				return &http.Response{
					StatusCode: 200,
					Body:       ioutil.NopCloser(strings.NewReader(tc.apiResponse)),
				}
			})
			got, err := esClient.ClusterBootstrappedForZen2(context.Background())
			if (err != nil) != tc.wantErr {
				t.Errorf("Client.ClusterBootstrappedForZen2() error = %v, wantErr %v", err, tc.wantErr)
			}
			require.Equal(t, tc.bootstrappedForZen2, got)
		})
	}
}
17 changes: 14 additions & 3 deletions pkg/controller/elasticsearch/client/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ package client

import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/elastic/cloud-on-k8s/pkg/controller/common/version"
"github.com/elastic/cloud-on-k8s/pkg/utils/stringsutil"
"github.com/pkg/errors"
)

// Info represents the response from /
Expand Down Expand Up @@ -66,16 +68,25 @@ func (n Nodes) Names() []string {

// Node partially models an Elasticsearch node retrieved from /_nodes
type Node struct {
Name string `json:"name"`
Roles []string `json:"roles"`
JVM struct {
Name string `json:"name"`
Version string `json:"version"`
Roles []string `json:"roles"`
JVM struct {
StartTimeInMillis int64 `json:"start_time_in_millis"`
Mem struct {
HeapMaxInBytes int `json:"heap_max_in_bytes"`
} `json:"mem"`
} `json:"jvm"`
}

// isV7OrAbove returns true if the node runs Elasticsearch version 7.x or above,
// based on the version string reported by the /_nodes API.
func (n Node) isV7OrAbove() (bool, error) {
	v, err := version.Parse(n.Version)
	if err != nil {
		// errors.Wrapf formats the message directly: no need for fmt.Sprintf.
		return false, errors.Wrapf(err, "unable to parse node version %s", n.Version)
	}
	return v.Major >= 7, nil
}

// NodesStats partially models the response from a request to /_nodes/stats
type NodesStats struct {
Nodes map[string]NodeStats `json:"nodes"`
Expand Down
15 changes: 15 additions & 0 deletions pkg/controller/elasticsearch/client/test_fixtures/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,3 +725,18 @@ const (
}
`
)

// MasterNodeForVersion returns a canned /_nodes/_master API response describing
// a single master-eligible node running the given Elasticsearch version.
// Used by client tests to simulate clusters whose master is v6 or v7.
func MasterNodeForVersion(version string) string {
	return `
{
"cluster_name": "elasticsearch-sample",
"nodes": {
"ICZ6uWfXQ3iQMzz-oLMlNw": {
"name": "elasticsearch-sample-es-default-0",
"version": "` + version + `",
"roles": ["ingest", "master", "data", "ml"]
}
}
}
`
}
19 changes: 19 additions & 0 deletions pkg/controller/elasticsearch/client/v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,25 @@ func (c *clientV6) DeleteVotingConfigExclusions(ctx context.Context, waitForRemo
return errors.New("Not supported in Elasticsearch 6.x")
}

// ClusterBootstrappedForZen2 inspects the current elected master of the
// cluster: if it runs version 7.x or above, the cluster has been bootstrapped
// for zen2. Even though c is a clientV6, it may be targeting a mixed v6/v7
// cluster that has a v7 master.
func (c *clientV6) ClusterBootstrappedForZen2(ctx context.Context) (bool, error) {
	var nodes Nodes
	if err := c.get(ctx, "/_nodes/_master", &nodes); err != nil {
		return false, err
	}
	if len(nodes.Nodes) == 0 {
		// No elected master known (yet): consider the cluster not bootstrapped.
		return false, nil
	}
	// The response holds a single entry: the current master node.
	for _, master := range nodes.Nodes {
		return master.isV7OrAbove()
	}
	// Unreachable: the map was checked non-empty above.
	return false, errors.New("no master found in ClusterBootstrappedForZen2")
}

func (c *clientV6) Request(ctx context.Context, r *http.Request) (*http.Response, error) {
newURL, err := url.Parse(stringsutil.Concat(c.Endpoint, r.URL.String()))
if err != nil {
Expand Down
12 changes: 7 additions & 5 deletions pkg/controller/elasticsearch/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ import (
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
controller "sigs.k8s.io/controller-runtime/pkg/reconcile"

esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common"
commondriver "github.com/elastic/cloud-on-k8s/pkg/controller/common/driver"
Expand All @@ -19,6 +24,7 @@ import (
"github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/version"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/watches"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/bootstrap"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/certificates"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/cleanup"
esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
Expand All @@ -33,10 +39,6 @@ import (
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/user"
esversion "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/version"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
controller "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var (
Expand Down Expand Up @@ -229,7 +231,7 @@ func (d *defaultDriver) Reconcile() *reconciler.Results {
}

// set an annotation with the ClusterUUID, if bootstrapped
if err := ReconcileClusterUUID(d.Client, &d.ES, observedState); err != nil {
if err := bootstrap.ReconcileClusterUUID(d.Client, &d.ES, observedState); err != nil {
return results.WithError(err)
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/controller/elasticsearch/driver/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,22 @@ func (d *defaultDriver) reconcileNodeSpecs(
return results.WithResult(defaultRequeue)
}

// Update Zen1 minimum master nodes through the API, corresponding to the current nodes we have.
// Maybe update Zen1 minimum master nodes through the API, corresponding to the current nodes we have.
requeue, err := zen1.UpdateMinimumMasterNodes(d.Client, d.ES, esClient, actualStatefulSets)
if err != nil {
return results.WithError(err)
}
if requeue {
results.WithResult(defaultRequeue)
}
// Remove the zen2 bootstrap annotation if bootstrap is over.
requeue, err = zen2.RemoveZen2BootstrapAnnotation(d.Client, d.ES, esClient)
if err != nil {
return results.WithError(err)
}
if requeue {
results.WithResult(defaultRequeue)
}
// Maybe clear zen2 voting config exclusions.
requeue, err = zen2.ClearVotingConfigExclusions(d.ES, d.Client, esClient, actualStatefulSets)
if err != nil {
Expand Down
Loading

0 comments on commit 4c46f28

Please sign in to comment.