Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clone from k8s services instead of individual hosts #422

Merged
merged 13 commits into from
Jan 9, 2020
2 changes: 1 addition & 1 deletion cmd/mysql-operator-sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func main() {
Run: func(cmd *cobra.Command, args []string) {
if err := sidecar.RunCloneCommand(cfg); err != nil {
log.Error(err, "clone command failed")
os.Exit(1)
os.Exit(8)
}
if err := sidecar.RunConfigCommand(cfg); err != nil {
log.Error(err, "init command failed")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
Copyright 2018 Pressinfra SRL

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mysqlcluster

import (
"github.com/presslabs/controller-util/syncer"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/presslabs/mysql-operator/pkg/internal/mysqlcluster"
)

// NewHealthyReplicasSVCSyncer returns a service syncer for healthy replicas service
func NewHealthyReplicasSVCSyncer(c client.Client, scheme *runtime.Scheme, cluster *mysqlcluster.MysqlCluster) syncer.Interface {
obj := &core.Service{
ObjectMeta: metav1.ObjectMeta{
Name: cluster.GetNameForResource(mysqlcluster.HealthyReplicasService),
Namespace: cluster.Namespace,
},
}

return syncer.NewObjectSyncer("HealthyReplicasSVC", cluster.Unwrap(), obj, c, scheme, func(in runtime.Object) error {
out := in.(*core.Service)

// set service labels
out.Labels = cluster.GetLabels()
out.Labels["mysql.presslabs.org/service-type"] = "ready-replicas"

// set selectors for healthy replica (non-master) mysql pods only
out.Spec.Selector = cluster.GetSelectorLabels()
out.Spec.Selector["role"] = "replica"
out.Spec.Selector["healthy"] = "yes"

if len(out.Spec.Ports) != 2 {
out.Spec.Ports = make([]core.ServicePort, 2)
}

out.Spec.Ports[0].Name = MysqlPortName
out.Spec.Ports[0].Port = MysqlPort
out.Spec.Ports[0].TargetPort = TargetPort
out.Spec.Ports[0].Protocol = core.ProtocolTCP

out.Spec.Ports[1].Name = SidecarServerPortName
out.Spec.Ports[1].Port = SidecarServerPort
out.Spec.Ports[1].Protocol = core.ProtocolTCP

return nil
})

}
8 changes: 6 additions & 2 deletions pkg/controller/mysqlcluster/internal/syncer/master_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,18 @@ func NewMasterSVCSyncer(c client.Client, scheme *runtime.Scheme, cluster *mysqlc
out.Spec.Selector = cluster.GetSelectorLabels()
out.Spec.Selector["role"] = "master"

if len(out.Spec.Ports) != 1 {
out.Spec.Ports = make([]core.ServicePort, 1)
if len(out.Spec.Ports) != 2 {
out.Spec.Ports = make([]core.ServicePort, 2)
}
out.Spec.Ports[0].Name = MysqlPortName
out.Spec.Ports[0].Port = MysqlPort
out.Spec.Ports[0].TargetPort = TargetPort
out.Spec.Ports[0].Protocol = core.ProtocolTCP

out.Spec.Ports[1].Name = SidecarServerPortName
out.Spec.Ports[1].Port = SidecarServerPort
out.Spec.Ports[1].Protocol = core.ProtocolTCP

return nil
})

Expand Down
1 change: 1 addition & 0 deletions pkg/controller/mysqlcluster/mysqlcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (r *ReconcileMysqlCluster) Reconcile(request reconcile.Request) (reconcile.
clustersyncer.NewHeadlessSVCSyncer(r.Client, r.scheme, cluster),
clustersyncer.NewMasterSVCSyncer(r.Client, r.scheme, cluster),
clustersyncer.NewHealthySVCSyncer(r.Client, r.scheme, cluster),
clustersyncer.NewHealthyReplicasSVCSyncer(r.Client, r.scheme, cluster),

clustersyncer.NewStatefulSetSyncer(r.Client, r.scheme, cluster, cmRev, sctRev, r.opt),
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/internal/mysqlcluster/mysqlcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ const (
ConfigMap ResourceName = "config-files"
// MasterService is the name of the service that points to master node
MasterService ResourceName = "master-service"
// HealthyNodesService is the name of a service that continas all healthy nodes
// HealthyReplicasService is the name of a service that points healthy replicas (excludes master)
HealthyReplicasService ResourceName = "healthy-replicas-service"
// HealthyNodesService is the name of a service that contains all healthy nodes
HealthyNodesService ResourceName = "healthy-nodes-service"
// PodDisruptionBudget is the name of pod disruption budget for the stateful set
PodDisruptionBudget ResourceName = "pdb"
Expand All @@ -131,6 +133,8 @@ func GetNameForResource(name ResourceName, clusterName string) string {
return fmt.Sprintf("%s-mysql", clusterName)
case MasterService:
return fmt.Sprintf("%s-mysql-master", clusterName)
case HealthyReplicasService:
return fmt.Sprintf("%s-mysql-replicas", clusterName)
case HeadlessSVC:
return HeadlessSVCName
case OldHeadlessSVC:
Expand Down
124 changes: 105 additions & 19 deletions pkg/sidecar/appclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,44 @@ package sidecar

import (
"fmt"
"io/ioutil"
"net/http"
"os"
"os/exec"
"path"
"strings"
)

// RunCloneCommand clone the data from source.
// RunCloneCommand clones the data from several potential sources.
//
// There are a few possible scenarios that this function tries to handle:
//
// Scenario | Action Taken
// ------------------------------------------------------------------------------------
// Data already exists | Log an informational message and return without error.
// | This permits the pod to continue initializing and mysql
// | will use the data already on the PVC.
// ------------------------------------------------------------------------------------
// Healthy replicas exist | We will attempt to clone from the healthy replicas.
// | If the cloning starts but is interrupted, we will return
// | with an error, not trying to clone from the master. The
// | assumption is that some intermittent error caused the
// | failure and we should let K8S restart the init container
// | to try to clone from the replicas again.
// ------------------------------------------------------------------------------------
// No healthy replicas; only | We attempt to clone from the master, assuming that this
// master exists | is the initialization of the second pod in a multi-pod
// | cluster. If cloning starts and is interrupted, we will
// | return with an error, letting K8S try again.
// ------------------------------------------------------------------------------------
// No healthy replicas; no | If there is a bucket URL to clone from, we will try that.
// master; bucket URL exists | The assumption is that this is the bootstrap case: the
// | very first mysql pod is being initialized.
// ------------------------------------------------------------------------------------
// No healthy replicas; no | If this is the first pod in the cluster, then allow it
// master; no bucket URL | to initialize as an empty instance, otherwise, return an
// | error to allow k8s to kill and restart the pod.
// ------------------------------------------------------------------------------------
func RunCloneCommand(cfg *Config) error {
log.Info("cloning command", "host", cfg.Hostname)

Expand All @@ -36,29 +68,60 @@ func RunCloneCommand(cfg *Config) error {
return fmt.Errorf("removing lost+found: %s", err)
}

if cfg.ServerID() > cfg.MyServerIDOffset {
// cloning from prior node
sourceHost := cfg.FQDNForServer(cfg.ServerID() - 1)
err := cloneFromSource(cfg, sourceHost)
if err != nil {
return fmt.Errorf("failed to clone from %s, err: %s", sourceHost, err)
if isServiceAvailable(cfg.ReplicasFQDN()) {
if err := attemptClone(cfg, cfg.ReplicasFQDN()); err != nil {
return fmt.Errorf("cloning from healthy replicas failed due to unexpected error: %s", err)
}
} else if isServiceAvailable(cfg.MasterFQDN()) {
log.Info("healthy replica service was unavailable for cloning, will attempt to clone from the master")
if err := attemptClone(cfg, cfg.MasterFQDN()); err != nil {
return fmt.Errorf("cloning from master service failed due to unexpected error: %s", err)
}
} else if cfg.ShouldCloneFromBucket() {
// cloning from provided initBucketURL
err := cloneFromBucket(cfg.InitBucketURL)
if err != nil {
log.Info("cloning from bucket")
if err := cloneFromBucket(cfg.InitBucketURL); err != nil {
return fmt.Errorf("failed to clone from bucket, err: %s", err)
}
} else {
log.Info("nothing to clone or init from")
} else if cfg.IsFirstPodInSet() {
log.Info("nothing to clone from: empty cluster initializing")
return nil
} else {
return fmt.Errorf("nothing to clone from: no existing data found, no replicas and no master available, and no clone bucket url found")
}

// prepare backup
if err := xtrabackupPreperData(); err != nil {
return err
return xtrabackupPrepareData()
}

func isServiceAvailable(svc string) bool {
dougfales marked this conversation as resolved.
Show resolved Hide resolved
req, err := http.NewRequest("GET", prepareURL(svc, serverProbeEndpoint), nil)
if err != nil {
log.Info("failed to check available service", "service", svc, "error", err)
return false
}

client := &http.Client{}
client.Transport = transportWithTimeout(serverConnectTimeout)
resp, err := client.Do(req)
if err != nil {
log.Info("service was not available", "service", svc, "error", err)
return false
}

if resp.StatusCode != 200 {
log.Info("service not available", "service", svc, "HTTP status code", resp.StatusCode)
return false
}

return true
}

func attemptClone(cfg *Config, sourceService string) error {
err := cloneFromSource(cfg, sourceService)
if err != nil {
return fmt.Errorf("failed to clone from %s, err: %s", sourceService, err)
}
return nil
}

Expand All @@ -84,7 +147,7 @@ func cloneFromBucket(initBucket string) error {
// extracts files from stdin (-x) and writes them to mysql
// data target dir
// nolint: gosec
xbstream := exec.Command("xbstream", "-x", "-C", dataDir)
xbstream := exec.Command(xbStreamCommand, "-x", "-C", dataDir)

var err error
// rclone | gzip | xbstream
Expand Down Expand Up @@ -140,11 +203,19 @@ func cloneFromSource(cfg *Config, host string) error {
// extracts files from stdin (-x) and writes them to mysql
// data target dir
// nolint: gosec
xbstream := exec.Command("xbstream", "-x", "-C", dataDir)
xbstream := exec.Command(xbStreamCommand, "-x", "-C", dataDir)

xbstream.Stdin = response.Body
xbstream.Stderr = os.Stderr

cloneSucceeded := false
defer func() {
if !cloneSucceeded {
log.Info("clone operation failed, cleaning up dataDir so retries may proceed")
cleanDataDir()
}
}()

if err := xbstream.Start(); err != nil {
return fmt.Errorf("xbstream start error: %s", err)
}
Expand All @@ -157,12 +228,13 @@ func cloneFromSource(cfg *Config, host string) error {
return err
}

cloneSucceeded = true
return nil
}

func xtrabackupPreperData() error {
func xtrabackupPrepareData() error {
// nolint: gosec
xtbkCmd := exec.Command("xtrabackup", "--prepare",
xtbkCmd := exec.Command(xtrabackupCommand, "--prepare",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By replacing these with a variable, I can disable them as needed in unit tests. For example, when we pass a valid xbstream which is not necessarily also a valid xtrabackup directory.

fmt.Sprintf("--target-dir=%s", dataDir))

xtbkCmd.Stderr = os.Stderr
Expand All @@ -171,6 +243,20 @@ func xtrabackupPreperData() error {
}

func deleteLostFound() error {
path := fmt.Sprintf("%s/lost+found", dataDir)
return os.RemoveAll(path)
lfPath := fmt.Sprintf("%s/lost+found", dataDir)
return os.RemoveAll(lfPath)
}

func cleanDataDir() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done to avoid leaving the dataDir in an unknown or partially restored state. The case we ran into was a large backup (hundreds of GBs) being interrupted midstream due to a network blip. On subsequent pod re-starts, mysql would continually fail to start as the PVC had only a partial backup.

files, err := ioutil.ReadDir(dataDir)
if err != nil {
log.Error(err, "failed to clean dataDir")
}

for _, f := range files {
toRemove := path.Join(dataDir, f.Name())
if err := os.RemoveAll(toRemove); err != nil {
log.Error(err, "failed to remove file in dataDir")
}
}
}
Loading