Skip to content
This repository has been archived by the owner on Sep 4, 2021. It is now read-only.

Discrete etcd cluster #544

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions multi-node/aws/cmd/kube-aws/command_render.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func runCmdRender(cmd *cobra.Command, args []string) error {
{"credentials/.gitignore", []byte("*"), 0644},
{"userdata/cloud-config-controller", config.CloudConfigController, 0644},
{"userdata/cloud-config-worker", config.CloudConfigWorker, 0644},
{"userdata/cloud-config-etcd", config.CloudConfigEtcd, 0644},
{"stack-template.json", config.StackTemplateTemplate, 0644},
{"kubeconfig", kubeconfig.Bytes(), 0600},
}
Expand Down
1 change: 1 addition & 0 deletions multi-node/aws/cmd/kube-aws/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var stackTemplateOptions = config.StackTemplateOptions{
TLSAssetsDir: "credentials",
ControllerTmplFile: "userdata/cloud-config-controller",
WorkerTmplFile: "userdata/cloud-config-worker",
EtcdTmplFile: "userdata/cloud-config-etcd",
StackTemplateTmplFile: "stack-template.json",
}

Expand Down
76 changes: 75 additions & 1 deletion multi-node/aws/pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package cluster

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"regexp"
"strings"
"text/tabwriter"
"time"
Expand Down Expand Up @@ -232,12 +234,84 @@ func (c *Cluster) createStack(cfSvc cloudformationService, stackBody string) (*c
return cfSvc.CreateStack(creq)
}

/*
Makes sure that etcd resource definitions are not upgraded by a cloudformation stack update.
Fetches resource definitions from the existing stack and splices them into the updated resource definitions.

TODO(chom): etcd controller + dynamic cluster management will obviate need for this function
*/
// cfStackResources is the subset of a CloudFormation template JSON document
// that lockEtcdResources decodes: the top-level "Resources" and "Mappings"
// sections. Each section is keyed by name and maps to an untyped definition
// body, so entries can be copied between templates without interpreting them.
type cfStackResources struct {
Resources map[string]map[string]interface{} `json:"Resources"`
Mappings map[string]map[string]interface{} `json:"Mappings"`
}

// lockEtcdResources returns a copy of stackBody in which every etcd instance
// resource (logical name matching ^InstanceEtcd[0-9]+$) and the
// "EtcdInstanceParams" mapping are taken from the template of the stack that
// is currently deployed under c.ClusterName, rather than from stackBody.
//
// Why: per the author's explanation in review, a CloudFormation update may
// replace EC2 instances by destroying and recreating them, which can corrupt
// the etcd cluster. Pinning the etcd definitions to their deployed state keeps
// an update from touching those instances.
//
// Steps:
//  1. decode Resources/Mappings from stackBody and drop its etcd instances;
//  2. fetch the deployed template via cfSvc.GetTemplate and decode it;
//  3. copy the deployed etcd instances and EtcdInstanceParams into the new
//     definitions;
//  4. re-encode the full document with the spliced sections.
//
// Sections missing from either template are tolerated: nothing is written to
// a nil map, and an absent section or mapping is never emitted as JSON null.
//
// It returns the re-encoded template, or an error if either template cannot
// be decoded, the deployed template cannot be fetched or has no body, or the
// result cannot be encoded.
func (c *Cluster) lockEtcdResources(cfSvc *cloudformation.CloudFormation, stackBody string) (string, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a bit more documentation explaining why this is necessary / how the functionality will be replaced (for those of us unfamiliar with cloudformation).

Copy link
Contributor Author

@colhom colhom Sep 9, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Problem: We must ensure that a cloudformation update never affects the etcd cluster instances.

Reason: When updating ec2 instances, cloudformation may choose a destroy and replace strategy, which has the potential to corrupt the etcd cluster.

Solution: lockEtcdResources is a 50-line function which only runs on cluster update- it queries the current state of the etcd cluster and ensures the definition does not change across CloudFormation updates.

Concerns over fragility: This is a simple, mechanical operation- but even those can go wrong.

Assuming:

  • lockEtcdResources() errors out: the update will fail
  • lockEtcdResources() munges the etcd cluster definition: In that case, Cloudformation will attempt to update the etcd cluster instances and fail because they are marked as "non-updatable" when first created. The CloudFormation update will then roll-back, and the user will be left with a non-updatable cluster

In either case, the failure mode is "cluster non-updatable"- which is exactly where every user is today anyways.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^^ Something to that effect?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm


	// Unmarshal incoming stack resource definitions.
	var newStack cfStackResources
	if err := json.Unmarshal([]byte(stackBody), &newStack); err != nil {
		return "", fmt.Errorf("error unmarshaling new stack json: %v", err)
	}

	instanceEtcdExpr := regexp.MustCompile("^InstanceEtcd[0-9]+$")

	// Remove all etcd instance resource definitions from the incoming stack.
	// Deleting the current key while ranging over a map is safe in Go.
	for name := range newStack.Resources {
		if instanceEtcdExpr.MatchString(name) {
			fmt.Printf("[lockEtcdResources: REMOVE %s\n", name)
			delete(newStack.Resources, name)
		}
	}

	// Fetch and unmarshal existing stack resource definitions.
	res, err := cfSvc.GetTemplate(&cloudformation.GetTemplateInput{
		StackName: aws.String(c.ClusterName),
	})
	if err != nil {
		return "", fmt.Errorf("error getting stack template: %v", err)
	}
	// TemplateBody is a *string; guard it instead of dereferencing blindly.
	if res.TemplateBody == nil {
		return "", fmt.Errorf("error getting stack template: empty template body for stack %s", c.ClusterName)
	}
	var existingStack cfStackResources
	if err := json.Unmarshal([]byte(*res.TemplateBody), &existingStack); err != nil {
		return "", fmt.Errorf("error unmarshaling existing stack json: %v", err)
	}

	// Splice the existing etcd resource definitions into the new stack.
	for name, definition := range existingStack.Resources {
		if !instanceEtcdExpr.MatchString(name) {
			continue
		}
		fmt.Printf("[lockEtcdResources: ADD %s\n", name)
		// json.Unmarshal leaves the map nil when the incoming template has no
		// "Resources" section; assigning into a nil map would panic.
		if newStack.Resources == nil {
			newStack.Resources = make(map[string]map[string]interface{})
		}
		newStack.Resources[name] = definition
	}

	// Mirror the deployed EtcdInstanceParams mapping. If the deployed stack
	// has none, remove it from the new definitions instead of storing a nil
	// entry, which would be encoded as JSON null.
	if params, ok := existingStack.Mappings["EtcdInstanceParams"]; ok {
		if newStack.Mappings == nil {
			newStack.Mappings = make(map[string]map[string]interface{})
		}
		newStack.Mappings["EtcdInstanceParams"] = params
	} else {
		// delete on a nil map is a no-op.
		delete(newStack.Mappings, "EtcdInstanceParams")
	}

	// Create a generic json document of the outgoing stack so that every
	// section other than Resources/Mappings passes through untouched.
	var outgoingStack map[string]interface{}
	if err := json.Unmarshal([]byte(stackBody), &outgoingStack); err != nil {
		return "", fmt.Errorf("error unmarshaling outgoing stack json: %v", err)
	}

	// Splice in the copies of resources and mappings with etcd-related fields
	// pinned. A section that was absent from both templates stays absent.
	if newStack.Resources != nil {
		outgoingStack["Resources"] = newStack.Resources
	}
	if newStack.Mappings != nil {
		outgoingStack["Mappings"] = newStack.Mappings
	}

	// Encode the new stack for the cloudformation update call.
	out, err := json.MarshalIndent(&outgoingStack, "", "")
	if err != nil {
		return "", fmt.Errorf("error marshaling stack json: %v", err)
	}

	return string(out), nil
}

func (c *Cluster) Update(stackBody string) (string, error) {

cfSvc := cloudformation.New(c.session)
var err error
if stackBody, err = c.lockEtcdResources(cfSvc, stackBody); err != nil {
return "", err
}
input := &cloudformation.UpdateStackInput{
Capabilities: []*string{aws.String(cloudformation.CapabilityCapabilityIam)},
StackName: aws.String(c.ClusterName),
TemplateBody: &stackBody,
TemplateBody: aws.String(stackBody),
}

updateOutput, err := cfSvc.UpdateStack(input)
Expand Down
113 changes: 106 additions & 7 deletions multi-node/aws/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,13 @@ func newDefaultCluster() *Cluster {
WorkerRootVolumeType: "gp2",
WorkerRootVolumeIOPS: 0,
WorkerRootVolumeSize: 30,
EtcdCount: 1,
EtcdInstanceType: "m3.medium",
EtcdRootVolumeSize: 30,
EtcdDataVolumeSize: 30,
CreateRecordSet: false,
RecordSetTTL: 300,
Subnets: []Subnet{},
Subnets: []*Subnet{},
}
}

Expand Down Expand Up @@ -94,7 +98,7 @@ func ClusterFromBytes(data []byte) (*Cluster, error) {

// For backward-compatibility
if len(c.Subnets) == 0 {
c.Subnets = []Subnet{
c.Subnets = []*Subnet{
{
AvailabilityZone: c.AvailabilityZone,
InstanceCIDR: c.InstanceCIDR,
Expand Down Expand Up @@ -122,6 +126,11 @@ type Cluster struct {
WorkerRootVolumeIOPS int `yaml:"workerRootVolumeIOPS,omitempty"`
WorkerRootVolumeSize int `yaml:"workerRootVolumeSize,omitempty"`
WorkerSpotPrice string `yaml:"workerSpotPrice,omitempty"`
EtcdCount int `yaml:"etcdCount"`
EtcdInstanceType string `yaml:"etcdInstanceType,omitempty"`
EtcdRootVolumeSize int `yaml:"etcdRootVolumeSize,omitempty"`
EtcdDataVolumeSize int `yaml:"etcdDataVolumeSize,omitempty"`
EtcdDataVolumeEphemeral bool `yaml:"etcdDataVolumEphemeral,omitempty"`
VPCID string `yaml:"vpcId,omitempty"`
RouteTableID string `yaml:"routeTableId,omitempty"`
VPCCIDR string `yaml:"vpcCIDR,omitempty"`
Expand All @@ -140,12 +149,13 @@ type Cluster struct {
HostedZoneID string `yaml:"hostedZoneId,omitempty"`
StackTags map[string]string `yaml:"stackTags,omitempty"`
UseCalico bool `yaml:"useCalico,omitempty"`
Subnets []Subnet `yaml:"subnets,omitempty"`
Subnets []*Subnet `yaml:"subnets,omitempty"`
}

type Subnet struct {
AvailabilityZone string `yaml:"availabilityZone,omitempty"`
InstanceCIDR string `yaml:"instanceCIDR,omitempty"`
AvailabilityZone string `yaml:"availabilityZone,omitempty"`
InstanceCIDR string `yaml:"instanceCIDR,omitempty"`
lastAllocatedAddr *net.IP
}

const (
Expand All @@ -160,7 +170,7 @@ var supportedReleaseChannels = map[string]bool{

func (c Cluster) Config() (*Config, error) {
config := Config{Cluster: c}
config.ETCDEndpoints = fmt.Sprintf("http://%s:2379", c.ControllerIP)

config.APIServers = fmt.Sprintf("http://%s:8080", c.ControllerIP)
config.SecureAPIServers = fmt.Sprintf("https://%s:443", c.ControllerIP)
config.APIServerEndpoint = fmt.Sprintf("https://%s", c.ExternalDNSName)
Expand Down Expand Up @@ -199,6 +209,77 @@ func (c Cluster) Config() (*Config, error) {
config.VPCRef = fmt.Sprintf("%q", config.VPCID)
}

config.EtcdInstances = make([]etcdInstance, config.EtcdCount)
var etcdEndpoints, etcdInitialCluster bytes.Buffer
for etcdIndex := 0; etcdIndex < config.EtcdCount; etcdIndex++ {

//Round-robbin etcd instances across all available subnets
subnetIndex := etcdIndex % len(config.Subnets)
subnet := config.Subnets[subnetIndex]

_, subnetCIDR, err := net.ParseCIDR(subnet.InstanceCIDR)
if err != nil {
return nil, fmt.Errorf("error parsing subnet instance cidr %s: %v", subnet.InstanceCIDR, err)
}

if subnet.lastAllocatedAddr == nil {
ip := subnetCIDR.IP
//TODO:(chom) this is sloppy, but "soon-ish" etcd with be self-hosted so we'll leave this be
for i := 0; i < 3; i++ {
ip = incrementIP(ip)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming a 10.0.0.0/24, we put the first etcd instance at 10.0.0.3. That's saying start 3 addresses from the lowest address in the CIDR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really- needed to pick a small offset, so 3 seemed like a good one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason for the small offset? What are you trying to accomplish here and why? I'm trying to understand why we need this code - right now it seems completely arbitrary.

}
subnet.lastAllocatedAddr = &ip
}

nextAddr := incrementIP(*subnet.lastAllocatedAddr)
subnet.lastAllocatedAddr = &nextAddr
instance := etcdInstance{
IPAddress: *subnet.lastAllocatedAddr,
SubnetIndex: subnetIndex,
}

//TODO(chom): validate we're not overflowing the address space
//This is complicated, must also factor in DHCP addresses
//for ASG components

//Punt on this- we're going to have an answer for dynamic etcd clusters at some point. Then we can either throw
//the instances in an ASG and use DHCP like all other instances, or simply self-host on cluster

config.EtcdInstances[etcdIndex] = instance

//TODO: ipv6 support
if len(instance.IPAddress) != 4 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if instance.IPAddress.To4() == nil?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep

return nil, fmt.Errorf("Non ipv4 address for etcd node: %v", instance.IPAddress)
}

//http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-instance-addressing.html#concepts-private-addresses

var dnsSuffix string
if config.Region == "us-east-1" {
// a special DNS suffix for the original AWS region!
dnsSuffix = "ec2.internal"
} else {
dnsSuffix = fmt.Sprintf("%s.compute.internal", config.Region)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Encountered this yesterday and have patched exactly the same as you did 😄 Great.


hostname := fmt.Sprintf("ip-%d-%d-%d-%d.%s",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the hostname? If we know the IP can't that just be used directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If given the option, it's generally considered preferable to list a DNS name (either CN or SUBJECT_ALT) rather than using IP SANS.

Copy link
Contributor

@aaronlevy aaronlevy Sep 9, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to use IP sans all over the place -- why is this particular use-case a concern? Seems to add more complexity - want to make sure it's useful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we move towards using cross-zone load balancers for the various interfaces (etcd, k8s api mainly), DNS names will need to supplant IP addresses. The only places where I see IP SANS sticking around is maybe the kubelet's certificate (validated by api server when exec'ing), but won't even that go away with bootkube-flavor self-hosted?

instance.IPAddress[0],
instance.IPAddress[1],
instance.IPAddress[2],
instance.IPAddress[3],
dnsSuffix,
)

fmt.Fprintf(&etcdEndpoints, "https://%s:2379", hostname)
fmt.Fprintf(&etcdInitialCluster, "%s=https://%s:2380", hostname, hostname)
if etcdIndex < config.EtcdCount-1 {
fmt.Fprintf(&etcdEndpoints, ",")
fmt.Fprintf(&etcdInitialCluster, ",")
}
}
config.EtcdEndpoints = etcdEndpoints.String()
config.EtcdInitialCluster = etcdInitialCluster.String()

return &config, nil
}

Expand Down Expand Up @@ -232,13 +313,15 @@ type StackTemplateOptions struct {
TLSAssetsDir string
ControllerTmplFile string
WorkerTmplFile string
EtcdTmplFile string
StackTemplateTmplFile string
}

// stackConfig bundles the cluster Config with the values the stack template
// needs in addition to it: the rendered userdata documents and the
// controller's subnet index.
type stackConfig struct {
// Config is embedded, so its fields are accessible directly on stackConfig.
*Config
// UserDataWorker is presumably the rendered worker cloud-config; its
// assignment is not visible in this hunk — TODO confirm.
UserDataWorker string
// UserDataController is the controller cloud-config rendered from
// opts.ControllerTmplFile by Cluster.stackConfig.
UserDataController string
// UserDataEtcd is the etcd cloud-config rendered from opts.EtcdTmplFile by
// Cluster.stackConfig.
UserDataEtcd string
// ControllerSubnetIndex is presumably the index into Subnets of the subnet
// whose CIDR contains ControllerIP; the assignment is outside the visible
// hunk — TODO confirm.
ControllerSubnetIndex int
}

Expand Down Expand Up @@ -289,6 +372,7 @@ func (c Cluster) stackConfig(opts StackTemplateOptions, compressUserData bool) (
if controllerIPAddr == nil {
return nil, fmt.Errorf("invalid controllerIP: %s", stackConfig.ControllerIP)
}

controllerSubnetFound := false
for i, subnet := range stackConfig.Subnets {
_, instanceCIDR, err := net.ParseCIDR(subnet.InstanceCIDR)
Expand All @@ -310,6 +394,9 @@ func (c Cluster) stackConfig(opts StackTemplateOptions, compressUserData bool) (
if stackConfig.UserDataController, err = execute(opts.ControllerTmplFile, stackConfig.Config, compressUserData); err != nil {
return nil, fmt.Errorf("failed to render controller cloud config: %v", err)
}
if stackConfig.UserDataEtcd, err = execute(opts.EtcdTmplFile, stackConfig.Config, compressUserData); err != nil {
return nil, fmt.Errorf("failed to render etcd cloud config: %v", err)
}

return &stackConfig, nil
}
Expand All @@ -334,6 +421,10 @@ func (c Cluster) ValidateUserData(opts StackTemplateOptions) error {
Content: stackConfig.UserDataController,
Name: "UserDataController",
},
{
Content: stackConfig.UserDataEtcd,
Name: "UserDataEtcd",
},
} {
report, err := validate.Validate([]byte(userData.Content))

Expand Down Expand Up @@ -413,10 +504,18 @@ func getContextString(buf []byte, offset, lineCount int) string {
return string(buf[leftLimit:rightLimit])
}

// etcdInstance describes one statically addressed etcd node, as computed by
// Cluster.Config.
type etcdInstance struct {
// IPAddress is the address assigned to this node. Config allocates it by
// incrementing the last address handed out in the node's subnet.
IPAddress net.IP
// SubnetIndex is the position of the node's subnet within the cluster's
// Subnets slice; Config derives it as etcdIndex % len(Subnets).
SubnetIndex int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible for the subnets to change externally to the etcd configuration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cluster upgrades can never support updating the Subnet or VPC parameters, unfortunately. I've had users ask about it- there are complex manual migration steps which cannot currently be automated :(

Now that you've asked, I'll make sure to make that explicit rather than just letting cloudformation roll back the update. #608 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this indexed? The config file? There needs to be an explicit "order cannot ever change in this config file" if that's the case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use explicit IDs instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that would be a more robust way of going about it- I'll change that. The re-ordering of subnets thing would be kind of a stupid constraint.

}

type Config struct {
Cluster

ETCDEndpoints string
EtcdEndpoints string
EtcdInitialCluster string
EtcdInstances []etcdInstance

APIServers string
SecureAPIServers string
APIServerEndpoint string
Expand Down
12 changes: 6 additions & 6 deletions multi-node/aws/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func TestMultipleSubnets(t *testing.T) {

validConfigs := []struct {
conf string
subnets []Subnet
subnets []*Subnet
}{
{
conf: `
Expand All @@ -320,7 +320,7 @@ subnets:
- availabilityZone: ap-northeast-1c
instanceCIDR: 10.4.4.0/24
`,
subnets: []Subnet{
subnets: []*Subnet{
{
InstanceCIDR: "10.4.3.0/24",
AvailabilityZone: "ap-northeast-1a",
Expand All @@ -339,7 +339,7 @@ controllerIP: 10.4.3.50
availabilityZone: ap-northeast-1a
instanceCIDR: 10.4.3.0/24
`,
subnets: []Subnet{
subnets: []*Subnet{
{
AvailabilityZone: "ap-northeast-1a",
InstanceCIDR: "10.4.3.0/24",
Expand All @@ -355,7 +355,7 @@ availabilityZone: ap-northeast-1a
instanceCIDR: 10.4.3.0/24
subnets: []
`,
subnets: []Subnet{
subnets: []*Subnet{
{
AvailabilityZone: "ap-northeast-1a",
InstanceCIDR: "10.4.3.0/24",
Expand All @@ -368,7 +368,7 @@ subnets: []
availabilityZone: "ap-northeast-1a"
subnets: []
`,
subnets: []Subnet{
subnets: []*Subnet{
{
AvailabilityZone: "ap-northeast-1a",
InstanceCIDR: "10.0.0.0/24",
Expand All @@ -380,7 +380,7 @@ subnets: []
# Missing subnets field fall-backs to the single subnet with the default az/cidr.
availabilityZone: "ap-northeast-1a"
`,
subnets: []Subnet{
subnets: []*Subnet{
{
AvailabilityZone: "ap-northeast-1a",
InstanceCIDR: "10.0.0.0/24",
Expand Down
Loading