Skip to content

Commit

Permalink
Merge pull request #188 from riya-singhal31/aws-disk
Browse files Browse the repository at this point in the history
e2etest: adding and cleaning aws disk
  • Loading branch information
openshift-merge-robot authored May 27, 2022
2 parents fe6e7bc + 1352c85 commit 192622d
Show file tree
Hide file tree
Showing 8 changed files with 1,316 additions and 98 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,11 @@ LVM_OPERATOR_INSTALL ?= true
LVM_OPERATOR_UNINSTALL ?= true
SUBSCRIPTION_CHANNEL ?= alpha

# Handles only AWS as of now.
DISK_INSTALL ?= false

# Build and run lvm tests.
e2e-test: ginkgo
@echo "build and run e2e tests"
cd e2e/lvm && $(GINKGO) build
cd e2e/lvm && ./lvm.test --lvm-catalog-image=$(CATALOG_IMG) --lvm-subscription-channel=$(SUBSCRIPTION_CHANNEL) --lvm-operator-install=$(LVM_OPERATOR_INSTALL) --lvm-operator-uninstall=$(LVM_OPERATOR_UNINSTALL) -ginkgo.v
cd e2e/lvm && ./lvm.test --lvm-catalog-image=$(CATALOG_IMG) --lvm-subscription-channel=$(SUBSCRIPTION_CHANNEL) --lvm-operator-install=$(LVM_OPERATOR_INSTALL) --lvm-operator-uninstall=$(LVM_OPERATOR_UNINSTALL) --disk-install=$(DISK_INSTALL) -ginkgo.v
236 changes: 236 additions & 0 deletions e2e/aws_disk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package e2e

import (
"context"
"fmt"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
)

type nodeDisks struct {
disks []disk
node corev1.Node
}

type disk struct {
size int
}

const (
awsPurposeTag = "odf-lvmo"
labelNodeRoleWorker = "node-role.kubernetes.io/worker"
)

// getAWSNodeInfo returns instanceID, region, zone, error
func getAWSNodeInfo(node corev1.Node) (string, string, string, error) {
var instanceID, region, zone string
// providerID looks like: aws:///us-east-2a/i-02d314dea14ed4efb
if !strings.HasPrefix(node.Spec.ProviderID, "aws://") {
return "", "", "", fmt.Errorf("not an aws based node")
}
split := strings.Split(node.Spec.ProviderID, "/")
instanceID = split[len(split)-1]
zone = split[len(split)-2]
region = zone[:len(zone)-1]
return instanceID, region, zone, nil
}

// this assumes that the device spaces /dev/sd[h-z] are available on the node
// do not provide more than 20 disksize
// do not use more than once per node
// this function is async
func createAndAttachAWSVolumes(ec2Client *ec2.EC2, nodeEnv []nodeDisks) error {
for _, nodeEntry := range nodeEnv {
err := createAndAttachAWSVolumesForNode(nodeEntry, ec2Client)
if err != nil {
return err
}
}
return nil
}

func createAndAttachAWSVolumesForNode(nodeEntry nodeDisks, ec2Client *ec2.EC2) error {
node := nodeEntry.node
volumes := make([]*ec2.Volume, len(nodeEntry.disks))
volumeLetters := []string{"g", "h"}
volumeIDs := make([]*string, 0)
instanceID, _, zone, err := getAWSNodeInfo(node)
if err != nil {
fmt.Printf("failed to create and attach aws disks for node %q\n", nodeEntry.node.Name)
return err
}

// create ec2 volumes
for i, disk := range nodeEntry.disks {
diskSize := disk.size
diskName := fmt.Sprintf("sd%s", volumeLetters[i])
createInput := &ec2.CreateVolumeInput{
AvailabilityZone: aws.String(zone),
Size: aws.Int64(int64(diskSize)),
VolumeType: aws.String("gp2"),
TagSpecifications: []*ec2.TagSpecification{
{
ResourceType: aws.String("volume"),
Tags: []*ec2.Tag{
{
Key: aws.String("Name"),
Value: aws.String(diskName),
},
{
Key: aws.String("purpose"),
Value: aws.String(awsPurposeTag),
},
{
Key: aws.String("chosen-instanceID"),
Value: aws.String(instanceID),
},
},
},
},
}
volume, err := ec2Client.CreateVolume(createInput)
if err != nil {
err := fmt.Errorf("expect to create AWS volume with input %v: %w", createInput, err)
fmt.Printf("failed to create and attach aws disks for node %q\n", nodeEntry.node.Name)
return err
}
fmt.Printf("creating volume: %q (%dGi)\n", *volume.VolumeId, *volume.Size)
volumes[i] = volume
volumeIDs = append(volumeIDs, volume.VolumeId)
}
// attach and poll for attachment to complete
err = wait.Poll(time.Second*5, time.Minute*4, func() (bool, error) {
describeVolumeInput := &ec2.DescribeVolumesInput{
VolumeIds: volumeIDs,
}
describedVolumes, err := ec2Client.DescribeVolumes(describeVolumeInput)
if err != nil {
return false, err
}
allAttached := true
for i, volume := range describedVolumes.Volumes {
if *volume.State == ec2.VolumeStateInUse {
fmt.Printf("volume attachment complete: %q (%dGi)\n", *volume.VolumeId, *volume.Size)
continue
}
allAttached = false
if *volume.State == ec2.VolumeStateAvailable {

fmt.Printf("volume attachment starting: %q (%dGi)\n", *volume.VolumeId, *volume.Size)
attachInput := &ec2.AttachVolumeInput{
VolumeId: volume.VolumeId,
InstanceId: aws.String(instanceID),
Device: aws.String(fmt.Sprintf("/dev/sd%s", volumeLetters[i])),
}
_, err = ec2Client.AttachVolume(attachInput)
if err != nil {
return false, err
}
}
}
return allAttached, nil

})
if err != nil {
fmt.Printf("failed to create and attach aws disks for node %q\n", nodeEntry.node.Name)
return err
}
return nil
}

func getEC2Client(region string) (*ec2.EC2, error) {
// get AWS credentials
awsCreds := &corev1.Secret{}
secretName := types.NamespacedName{Name: "aws-creds", Namespace: "kube-system"}
err := DeployManagerObj.GetCrClient().Get(context.TODO(), secretName, awsCreds)
if err != nil {
return nil, err
}
// detect region
// base64 decode
id, found := awsCreds.Data["aws_access_key_id"]
if !found {
return nil, fmt.Errorf("cloud credential id not found")
}
key, found := awsCreds.Data["aws_secret_access_key"]
if !found {
return nil, fmt.Errorf("cloud credential key not found")
}

sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
Credentials: credentials.NewStaticCredentials(string(id), string(key), ""),
})
if err != nil {
return nil, err
}

// initialize client
return ec2.New(sess), nil
}

func cleanupAWSDisks(ec2Client *ec2.EC2) error {
volumes, err := getAWSTestVolumes(ec2Client)
if err != nil {
fmt.Printf("failed to list AWS volumes")
return err
}
for _, volume := range volumes {
fmt.Printf("detaching AWS disks with volumeId: %q (%dGi)\n", *volume.VolumeId, *volume.Size)
input := &ec2.DetachVolumeInput{VolumeId: volume.VolumeId}
_, err := ec2Client.DetachVolume(input)
if err != nil {
fmt.Printf("detaching disk failed")
return err
}
}
err = wait.Poll(time.Second*2, time.Minute*5, func() (bool, error) {
volumes, err := getAWSTestVolumes(ec2Client)
if err != nil {
return false, fmt.Errorf("failed to list AWS volumes: %+v", err)
}
allDeleted := true
for _, volume := range volumes {
if *volume.State != ec2.VolumeStateAvailable {
fmt.Printf("volume %q is in state %q, waiting for state %q\n", *volume.VolumeId, *volume.State, ec2.VolumeStateAvailable)
allDeleted = false
continue
}
fmt.Printf("deleting AWS disks with volumeId: %q (%dGi)\n", *volume.VolumeId, *volume.Size)
input := &ec2.DeleteVolumeInput{VolumeId: volume.VolumeId}
_, err := ec2Client.DeleteVolume(input)
if err != nil {
fmt.Printf("deleting disk failed: %+v\n", err)
allDeleted = false
}
}
return allDeleted, nil
})
if err != nil {
fmt.Printf("Failed AWS cleanup of disks")
return err
}
return nil
}

func getAWSTestVolumes(ec2Client *ec2.EC2) ([]*ec2.Volume, error) {
output, err := ec2Client.DescribeVolumes(&ec2.DescribeVolumesInput{
Filters: []*ec2.Filter{
{
Name: aws.String("tag:purpose"),
Values: []*string{aws.String(awsPurposeTag)},
},
},
})

return output.Volumes, err

}
4 changes: 4 additions & 0 deletions e2e/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ var LvmCatalogSourceImage string
// LvmSubscriptionChannel is the name of the lvm subscription channel
var LvmSubscriptionChannel string

// DiskInstall indicates whether disks are needed to be installed.
var DiskInstall bool

// DeployManager is the suite global DeployManager
var DeployManagerObj *deploymanager.DeployManager

Expand All @@ -33,6 +36,7 @@ func init() {
flag.StringVar(&LvmSubscriptionChannel, "lvm-subscription-channel", "", "The subscription channel to revise updates from")
flag.BoolVar(&lvmOperatorInstall, "lvm-operator-install", true, "Install the LVM operator before starting tests")
flag.BoolVar(&lvmOperatorUninstall, "lvm-operator-uninstall", true, "Uninstall the LVM cluster and operator after test completion")
flag.BoolVar(&DiskInstall, "disk-install", false, "Create and attach disks to the nodes. This currently only works with AWS")

d, err := deploymanager.NewDeployManager()
if err != nil {
Expand Down
76 changes: 76 additions & 0 deletions e2e/disk_setup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package e2e

import (
"context"
"fmt"

"github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func diskSetup() error {

// get nodes
nodeList := &corev1.NodeList{}
err := DeployManagerObj.GetCrClient().List(context.TODO(), nodeList, client.HasLabels{labelNodeRoleWorker})
if err != nil {
fmt.Printf("failed to list nodes\n")
return err
}

fmt.Printf("getting AWS region info from node spec\n")
_, region, _, err := getAWSNodeInfo(nodeList.Items[0])
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "getAWSNodeInfo")

// initialize client
fmt.Printf("initialize ec2 creds\n")
ec2Client, err := getEC2Client(region)
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "getEC2Client")

// represents the disk layout to setup on the nodes.
var nodeEnv []nodeDisks
for _, node := range nodeList.Items {
nodeEnv = append(nodeEnv, nodeDisks{

disks: []disk{
{size: 10},
{size: 20},
},
node: node,
})
}

// create and attach volumes
fmt.Printf("creating and attaching disks\n")
err = createAndAttachAWSVolumes(ec2Client, nodeEnv)
gomega.Expect(err).To(gomega.BeNil())

return nil
}

func diskRemoval() error {
// get nodes
nodeList := &corev1.NodeList{}
err := DeployManagerObj.GetCrClient().List(context.TODO(), nodeList, client.HasLabels{labelNodeRoleWorker})
if err != nil {
fmt.Printf("failed to list nodes\n")
return err
}
fmt.Printf("getting AWS region info from node spec\n")
_, region, _, err := getAWSNodeInfo(nodeList.Items[0])
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "getAWSNodeInfo")

// initialize client
fmt.Printf("initialize ec2 creds\n")
ec2Client, err := getEC2Client(region)
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "getEC2Client")

// cleaning disk
fmt.Printf("cleaning up disks\n")
err = cleanupAWSDisks(ec2Client)
gomega.Expect(err).To(gomega.BeNil())

return err
}
18 changes: 16 additions & 2 deletions e2e/setup_teardown.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package e2e

import "github.com/onsi/gomega"
import (
"github.com/onsi/gomega"
)

// BeforeTestSuiteSetup is the function called to initialize the test environment.
func BeforeTestSuiteSetup() {

if DiskInstall {
debug("Creating disk for e2e tests\n")
err := diskSetup()
gomega.Expect(err).To(gomega.BeNil())
}

SuiteFailed = true
if lvmOperatorInstall {
debug("BeforeTestSuite: deploying LVM Operator\n")
Expand Down Expand Up @@ -42,6 +50,12 @@ func AfterTestSuiteCleanup() {

debug("AfterTestSuite: uninstalling LVM Operator\n")
err = DeployManagerObj.UninstallLVM(LvmCatalogSourceImage, LvmSubscriptionChannel)
gomega.Expect(err).To(gomega.BeNil(), "error uninstalling LVM: %v", err)
gomega.Expect(err).To(gomega.BeNil(), "error uninstalling the LVM Operator: %v", err)
}

if DiskInstall {
debug("Cleaning up disk\n")
err = diskRemoval()
gomega.Expect(err).To(gomega.BeNil())
}
}
Loading

0 comments on commit 192622d

Please sign in to comment.