e2etest: adding and cleaning aws disk #188

Merged (1 commit) on May 27, 2022
5 changes: 4 additions & 1 deletion Makefile
@@ -317,8 +317,11 @@ LVM_OPERATOR_INSTALL ?= true
LVM_OPERATOR_UNINSTALL ?= true
SUBSCRIPTION_CHANNEL ?= alpha

# Disk installation currently supports only AWS.
DISK_INSTALL ?= false

# Build and run lvm tests.
e2e-test: ginkgo
@echo "build and run e2e tests"
cd e2e/lvm && $(GINKGO) build
cd e2e/lvm && ./lvm.test --lvm-catalog-image=$(CATALOG_IMG) --lvm-subscription-channel=$(SUBSCRIPTION_CHANNEL) --lvm-operator-install=$(LVM_OPERATOR_INSTALL) --lvm-operator-uninstall=$(LVM_OPERATOR_UNINSTALL) -ginkgo.v
cd e2e/lvm && ./lvm.test --lvm-catalog-image=$(CATALOG_IMG) --lvm-subscription-channel=$(SUBSCRIPTION_CHANNEL) --lvm-operator-install=$(LVM_OPERATOR_INSTALL) --lvm-operator-uninstall=$(LVM_OPERATOR_UNINSTALL) --disk-install=$(DISK_INSTALL) -ginkgo.v
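
For reference, an e2e run that also provisions the AWS disks can be started roughly like this (a sketch; the catalog image value is environment-specific, and the remaining variables fall back to the defaults above):

make e2e-test DISK_INSTALL=true CATALOG_IMG=<catalog-image>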
236 changes: 236 additions & 0 deletions e2e/aws_disk.go
@@ -0,0 +1,236 @@
package e2e

import (
"context"
"fmt"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
)

type nodeDisks struct {
disks []disk
node corev1.Node
}

type disk struct {
size int
}

const (
awsPurposeTag = "odf-lvmo"
labelNodeRoleWorker = "node-role.kubernetes.io/worker"
)

// getAWSNodeInfo returns instanceID, region, zone, error
func getAWSNodeInfo(node corev1.Node) (string, string, string, error) {
var instanceID, region, zone string
// providerID looks like: aws:///us-east-2a/i-02d314dea14ed4efb
if !strings.HasPrefix(node.Spec.ProviderID, "aws://") {
return "", "", "", fmt.Errorf("not an aws based node")
}
split := strings.Split(node.Spec.ProviderID, "/")
instanceID = split[len(split)-1]
zone = split[len(split)-2]
region = zone[:len(zone)-1]
return instanceID, region, zone, nil
}
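
// For illustration (a walkthrough of the parsing above, not additional logic): for the
// example providerID "aws:///us-east-2a/i-02d314dea14ed4efb", splitting on "/" yields
// ["aws:", "", "", "us-east-2a", "i-02d314dea14ed4efb"], so instanceID becomes
// "i-02d314dea14ed4efb", zone becomes "us-east-2a", and region becomes "us-east-2"
// (the zone with its trailing letter dropped).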

// This assumes that the device paths /dev/sd[h-z] are available on the node.
// Do not request more than 20 disks per node, and do not call this more than once per node.
// The AWS attach operation is asynchronous; the per-node helper polls until the volumes are in use.
func createAndAttachAWSVolumes(ec2Client *ec2.EC2, nodeEnv []nodeDisks) error {
for _, nodeEntry := range nodeEnv {
err := createAndAttachAWSVolumesForNode(nodeEntry, ec2Client)
if err != nil {
return err
}
}
return nil
}

func createAndAttachAWSVolumesForNode(nodeEntry nodeDisks, ec2Client *ec2.EC2) error {
node := nodeEntry.node
volumes := make([]*ec2.Volume, len(nodeEntry.disks))
volumeLetters := []string{"g", "h"}
volumeIDs := make([]*string, 0)
instanceID, _, zone, err := getAWSNodeInfo(node)
if err != nil {
fmt.Printf("failed to create and attach aws disks for node %q\n", nodeEntry.node.Name)
return err
}

// create ec2 volumes
for i, disk := range nodeEntry.disks {
diskSize := disk.size
diskName := fmt.Sprintf("sd%s", volumeLetters[i])
createInput := &ec2.CreateVolumeInput{
AvailabilityZone: aws.String(zone),
Size: aws.Int64(int64(diskSize)),
VolumeType: aws.String("gp2"),
TagSpecifications: []*ec2.TagSpecification{
{
ResourceType: aws.String("volume"),
Tags: []*ec2.Tag{
{
Key: aws.String("Name"),
Value: aws.String(diskName),
},
{
Key: aws.String("purpose"),
Value: aws.String(awsPurposeTag),
},
{
Key: aws.String("chosen-instanceID"),
Value: aws.String(instanceID),
},
},
},
},
}
volume, err := ec2Client.CreateVolume(createInput)
if err != nil {
err := fmt.Errorf("expect to create AWS volume with input %v: %w", createInput, err)
fmt.Printf("failed to create and attach aws disks for node %q\n", nodeEntry.node.Name)
return err
}
fmt.Printf("creating volume: %q (%dGi)\n", *volume.VolumeId, *volume.Size)
volumes[i] = volume
volumeIDs = append(volumeIDs, volume.VolumeId)
}
// attach and poll for attachment to complete
err = wait.Poll(time.Second*5, time.Minute*4, func() (bool, error) {
describeVolumeInput := &ec2.DescribeVolumesInput{
VolumeIds: volumeIDs,
}
describedVolumes, err := ec2Client.DescribeVolumes(describeVolumeInput)
if err != nil {
return false, err
}
allAttached := true
for i, volume := range describedVolumes.Volumes {
if *volume.State == ec2.VolumeStateInUse {
fmt.Printf("volume attachment complete: %q (%dGi)\n", *volume.VolumeId, *volume.Size)
continue
}
allAttached = false
if *volume.State == ec2.VolumeStateAvailable {

fmt.Printf("volume attachment starting: %q (%dGi)\n", *volume.VolumeId, *volume.Size)
attachInput := &ec2.AttachVolumeInput{
VolumeId: volume.VolumeId,
InstanceId: aws.String(instanceID),
Device: aws.String(fmt.Sprintf("/dev/sd%s", volumeLetters[i])),
}
_, err = ec2Client.AttachVolume(attachInput)
if err != nil {
return false, err
}
}
}
return allAttached, nil

})
if err != nil {
fmt.Printf("failed to create and attach aws disks for node %q\n", nodeEntry.node.Name)
return err
}
return nil
}

func getEC2Client(region string) (*ec2.EC2, error) {
// get AWS credentials
awsCreds := &corev1.Secret{}
secretName := types.NamespacedName{Name: "aws-creds", Namespace: "kube-system"}
err := DeployManagerObj.GetCrClient().Get(context.TODO(), secretName, awsCreds)
if err != nil {
return nil, err
}
// extract the credentials from the secret (the client returns Secret data already base64-decoded)
id, found := awsCreds.Data["aws_access_key_id"]
if !found {
return nil, fmt.Errorf("cloud credential id not found")
}
key, found := awsCreds.Data["aws_secret_access_key"]
if !found {
return nil, fmt.Errorf("cloud credential key not found")
}

sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
Credentials: credentials.NewStaticCredentials(string(id), string(key), ""),
})
if err != nil {
return nil, err
}

// initialize client
return ec2.New(sess), nil
}
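
// For reference, getEC2Client expects cloud credentials in a Secret shaped roughly like
// the following (a sketch; the exact manifest is cluster-specific, but the name, namespace,
// and data keys match the lookups above):
//
//   apiVersion: v1
//   kind: Secret
//   metadata:
//     name: aws-creds
//     namespace: kube-system
//   data:
//     aws_access_key_id: <base64-encoded access key id>
//     aws_secret_access_key: <base64-encoded secret access key>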

func cleanupAWSDisks(ec2Client *ec2.EC2) error {
volumes, err := getAWSTestVolumes(ec2Client)
if err != nil {
fmt.Printf("failed to list AWS volumes")
return err
}
for _, volume := range volumes {
fmt.Printf("detaching AWS disks with volumeId: %q (%dGi)\n", *volume.VolumeId, *volume.Size)
input := &ec2.DetachVolumeInput{VolumeId: volume.VolumeId}
_, err := ec2Client.DetachVolume(input)
if err != nil {
fmt.Printf("detaching disk failed")
return err
}
}
err = wait.Poll(time.Second*2, time.Minute*5, func() (bool, error) {
volumes, err := getAWSTestVolumes(ec2Client)
if err != nil {
return false, fmt.Errorf("failed to list AWS volumes: %+v", err)
}
allDeleted := true
for _, volume := range volumes {
if *volume.State != ec2.VolumeStateAvailable {
fmt.Printf("volume %q is in state %q, waiting for state %q\n", *volume.VolumeId, *volume.State, ec2.VolumeStateAvailable)
allDeleted = false
continue
}
fmt.Printf("deleting AWS disks with volumeId: %q (%dGi)\n", *volume.VolumeId, *volume.Size)
input := &ec2.DeleteVolumeInput{VolumeId: volume.VolumeId}
_, err := ec2Client.DeleteVolume(input)
if err != nil {
fmt.Printf("deleting disk failed: %+v\n", err)
allDeleted = false
}
}
return allDeleted, nil
})
if err != nil {
fmt.Printf("Failed AWS cleanup of disks")
return err
}
return nil
}
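
// Cleanup flow, for reference: cleanupAWSDisks looks up every volume tagged
// purpose=odf-lvmo via getAWSTestVolumes, issues a DetachVolume call for each, and then
// polls every 2 seconds (for up to 5 minutes), deleting each volume once it reports the
// "available" state.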

func getAWSTestVolumes(ec2Client *ec2.EC2) ([]*ec2.Volume, error) {
output, err := ec2Client.DescribeVolumes(&ec2.DescribeVolumesInput{
Filters: []*ec2.Filter{
{
Name: aws.String("tag:purpose"),
Values: []*string{aws.String(awsPurposeTag)},
},
},
})
if err != nil {
return nil, err
}

return output.Volumes, nil
}
4 changes: 4 additions & 0 deletions e2e/config.go
@@ -20,6 +23,9 @@ var LvmCatalogSourceImage string
// LvmSubscriptionChannel is the name of the lvm subscription channel
var LvmSubscriptionChannel string

// DiskInstall indicates whether disks need to be created and attached to the nodes.
var DiskInstall bool

// DeployManager is the suite global DeployManager
var DeployManagerObj *deploymanager.DeployManager

@@ -33,6 +36,7 @@ func init() {
flag.StringVar(&LvmSubscriptionChannel, "lvm-subscription-channel", "", "The subscription channel to revise updates from")
flag.BoolVar(&lvmOperatorInstall, "lvm-operator-install", true, "Install the LVM operator before starting tests")
flag.BoolVar(&lvmOperatorUninstall, "lvm-operator-uninstall", true, "Uninstall the LVM cluster and operator after test completion")
flag.BoolVar(&DiskInstall, "disk-install", false, "Create and attach disks to the nodes. This currently only works with AWS")

d, err := deploymanager.NewDeployManager()
if err != nil {
76 changes: 76 additions & 0 deletions e2e/disk_setup.go
@@ -0,0 +1,76 @@
package e2e

import (
"context"
"fmt"

"github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func diskSetup() error {

// get nodes
nodeList := &corev1.NodeList{}
err := DeployManagerObj.GetCrClient().List(context.TODO(), nodeList, client.HasLabels{labelNodeRoleWorker})
if err != nil {
fmt.Printf("failed to list nodes\n")
return err
}

fmt.Printf("getting AWS region info from node spec\n")
_, region, _, err := getAWSNodeInfo(nodeList.Items[0])
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "getAWSNodeInfo")

// initialize client
fmt.Printf("initialize ec2 creds\n")
ec2Client, err := getEC2Client(region)
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "getEC2Client")

// nodeEnv represents the disk layout to set up on the nodes.
var nodeEnv []nodeDisks
for _, node := range nodeList.Items {
nodeEnv = append(nodeEnv, nodeDisks{

disks: []disk{
{size: 10},
{size: 20},
},
node: node,
})
}

// create and attach volumes
fmt.Printf("creating and attaching disks\n")
err = createAndAttachAWSVolumes(ec2Client, nodeEnv)
gomega.Expect(err).To(gomega.BeNil())

return nil
}
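
// Net effect, for reference (assuming the two-entry volumeLetters slice in aws_disk.go):
// every worker node gets two gp2 volumes of 10Gi and 20Gi, created in the node's
// availability zone and attached at /dev/sdg and /dev/sdh respectively.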

func diskRemoval() error {
// get nodes
nodeList := &corev1.NodeList{}
err := DeployManagerObj.GetCrClient().List(context.TODO(), nodeList, client.HasLabels{labelNodeRoleWorker})
if err != nil {
fmt.Printf("failed to list nodes\n")
return err
}
fmt.Printf("getting AWS region info from node spec\n")
_, region, _, err := getAWSNodeInfo(nodeList.Items[0])
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "getAWSNodeInfo")

// initialize client
fmt.Printf("initialize ec2 creds\n")
ec2Client, err := getEC2Client(region)
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "getEC2Client")

// cleaning disk
fmt.Printf("cleaning up disks\n")
err = cleanupAWSDisks(ec2Client)
gomega.Expect(err).To(gomega.BeNil())

return err
}
18 changes: 16 additions & 2 deletions e2e/setup_teardown.go
@@ -1,10 +1,18 @@
package e2e

import "github.com/onsi/gomega"
import (
"github.com/onsi/gomega"
)

// BeforeTestSuiteSetup is the function called to initialize the test environment.
func BeforeTestSuiteSetup() {

if DiskInstall {
debug("Creating disk for e2e tests\n")
err := diskSetup()
gomega.Expect(err).To(gomega.BeNil())
}

SuiteFailed = true
if lvmOperatorInstall {
debug("BeforeTestSuite: deploying LVM Operator\n")
@@ -42,6 +50,12 @@ func AfterTestSuiteCleanup() {

debug("AfterTestSuite: uninstalling LVM Operator\n")
err = DeployManagerObj.UninstallLVM(LvmCatalogSourceImage, LvmSubscriptionChannel)
gomega.Expect(err).To(gomega.BeNil(), "error uninstalling LVM: %v", err)
gomega.Expect(err).To(gomega.BeNil(), "error uninstalling the LVM Operator: %v", err)
}

if DiskInstall {
debug("Cleaning up disk\n")
err = diskRemoval()
gomega.Expect(err).To(gomega.BeNil())
}
}