test: add and clean up disks for CI
Adds AWS disks in the test namespace so that resources can be installed.

Signed-off-by: riya-singhal31 <riyasinghalji@gmail.com>
riya-singhal31 committed May 19, 2022
1 parent e97bdeb commit d55c34a
Showing 5 changed files with 1,290 additions and 86 deletions.
228 changes: 228 additions & 0 deletions e2e/aws_disk.go
@@ -0,0 +1,228 @@
package e2e

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
)

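// nodeDisks describes the set of disks to create and attach to a single node.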
type nodeDisks struct {
disks []disk
node corev1.Node
}

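// disk describes a single volume to create; size is in GiB.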
type disk struct {
size int
}

const (
awsPurposeTag = "lvm-e2etest"
labelNodeRoleWorker = "node-role.kubernetes.io/worker"
)

// getAWSNodeInfo returns instanceID, region, zone, error
func getAWSNodeInfo(node corev1.Node) (string, string, string, error) {
var instanceID, region, zone string
// providerID looks like: aws:///us-east-2a/i-02d314dea14ed4efb
if !strings.HasPrefix(node.Spec.ProviderID, "aws://") {
return "", "", "", fmt.Errorf("not an aws based node")
}
split := strings.Split(node.Spec.ProviderID, "/")
instanceID = split[len(split)-1]
zone = split[len(split)-2]
region = zone[:len(zone)-1]
return instanceID, region, zone, nil
}

// createAndAttachAWSVolumes assumes that the device slots /dev/sdg and /dev/sdh are free on each node.
// Do not provide more disk sizes per node than there are volume letters below (currently two).
// Do not call this more than once per node.
// This function is asynchronous: volumes for each node are created and attached in a separate goroutine.
func createAndAttachAWSVolumes(t *testing.T, ec2Client *ec2.EC2, namespace string, nodeEnv []nodeDisks) {
for _, nodeEntry := range nodeEnv {
go createAndAttachAWSVolumesForNode(t, nodeEntry, ec2Client)
}
}

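// createAndAttachAWSVolumesForNode creates the EC2 volumes described in nodeEntry and polls until each one is attached to the node's instance.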
func createAndAttachAWSVolumesForNode(t *testing.T, nodeEntry nodeDisks, ec2Client *ec2.EC2) {
node := nodeEntry.node
volumes := make([]*ec2.Volume, len(nodeEntry.disks))
volumeLetters := []string{"g", "h"}
volumeIDs := make([]*string, 0)
instanceID, _, zone, err := getAWSNodeInfo(node)
if err != nil {
t.Errorf("failed to create and attach aws disks for node %q: %+v", nodeEntry.node.Name, err)
return
}

// create ec2 volumes
for i, disk := range nodeEntry.disks {
diskSize := disk.size
diskName := fmt.Sprintf("sd%s", volumeLetters[i])
createInput := &ec2.CreateVolumeInput{
AvailabilityZone: aws.String(zone),
Size: aws.Int64(int64(diskSize)),
VolumeType: aws.String("gp2"),
TagSpecifications: []*ec2.TagSpecification{
{
ResourceType: aws.String("volume"),
Tags: []*ec2.Tag{
{
Key: aws.String("Name"),
Value: aws.String(diskName),
},
{
Key: aws.String("purpose"),
Value: aws.String(awsPurposeTag),
},
{
Key: aws.String("chosen-instanceID"),
Value: aws.String(instanceID),
},
},
},
},
}
volume, err := ec2Client.CreateVolume(createInput)
if err != nil {
err := fmt.Errorf("failed to create AWS volume with input %v: %w", createInput, err)
t.Errorf("failed to create and attach aws disks for node %q: %+v", nodeEntry.node.Name, err)
return
}
t.Logf("creating volume: %q (%dGi)", *volume.VolumeId, *volume.Size)
volumes[i] = volume
volumeIDs = append(volumeIDs, volume.VolumeId)
}
// attach and poll for attachment to complete
err = wait.Poll(time.Second*5, time.Minute*4, func() (bool, error) {
describeVolumeInput := &ec2.DescribeVolumesInput{
VolumeIds: volumeIDs,
}
describedVolumes, err := ec2Client.DescribeVolumes(describeVolumeInput)
if err != nil {
return false, err
}
allAttached := true
for i, volume := range describedVolumes.Volumes {
if *volume.State == ec2.VolumeStateInUse {
t.Logf("volume attachment complete: %q (%dGi)", *volume.VolumeId, *volume.Size)
continue
}
allAttached = false
if *volume.State == ec2.VolumeStateAvailable {
t.Logf("volume attachment starting: %q (%dGi)", *volume.VolumeId, *volume.Size)
attachInput := &ec2.AttachVolumeInput{
VolumeId: volume.VolumeId,
InstanceId: aws.String(instanceID),
Device: aws.String(fmt.Sprintf("/dev/sd%s", volumeLetters[i])),
}
_, err = ec2Client.AttachVolume(attachInput)
if err != nil {
return false, err
}
}
}
return allAttached, nil
})
if err != nil {
t.Errorf("failed to create and attach aws disks for node %q: %+v", nodeEntry.node.Name, err)
}
}

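// getEC2Client reads the AWS credentials from the "aws-creds" secret in the kube-system namespace and returns an EC2 client for the given region.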
func getEC2Client(region string) (*ec2.EC2, error) {
// get AWS credentials
awsCreds := &corev1.Secret{}
secretName := types.NamespacedName{Name: "aws-creds", Namespace: "kube-system"}
err := DeployManagerObj.GetCrClient().Get(context.TODO(), secretName, awsCreds)
if err != nil {
return nil, err
}
// read the static credentials; Secret data is already base64-decoded by the client
id, found := awsCreds.Data["aws_access_key_id"]
if !found {
return nil, fmt.Errorf("cloud credential id not found")
}
key, found := awsCreds.Data["aws_secret_access_key"]
if !found {
return nil, fmt.Errorf("cloud credential key not found")
}

sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
Credentials: credentials.NewStaticCredentials(string(id), string(key), ""),
})
if err != nil {
return nil, err
}

// initialize client
return ec2.New(sess), nil
}

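// cleanupAWSDisks detaches every volume tagged with awsPurposeTag and deletes it once it becomes available again.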
func cleanupAWSDisks(t *testing.T, ec2Client *ec2.EC2) error {
volumes, err := getAWSTestVolumes(ec2Client)
if err != nil {
return fmt.Errorf("failed to list AWS volumes: %+v", err)
}
t.Log("using described volumes")
for _, volume := range volumes {
t.Logf("detaching AWS disks with volumeId: %q (%dGi)", *volume.VolumeId, *volume.Size)
input := &ec2.DetachVolumeInput{VolumeId: volume.VolumeId}
_, err := ec2Client.DetachVolume(input)
if err != nil {
t.Logf("detaching disk failed: %+v", err)
}
}
err = wait.Poll(time.Second*2, time.Minute*5, func() (bool, error) {
volumes, err := getAWSTestVolumes(ec2Client)
if err != nil {
return false, fmt.Errorf("failed to list AWS volumes: %+v", err)
}
allDeleted := true
for _, volume := range volumes {
if *volume.State != ec2.VolumeStateAvailable {
t.Logf("volume %q is in state %q, waiting for state %q", *volume.VolumeId, *volume.State, ec2.VolumeStateAvailable)
allDeleted = false
continue
}
t.Logf("deleting AWS disks with volumeId: %q (%dGi)", *volume.VolumeId, *volume.Size)
input := &ec2.DeleteVolumeInput{VolumeId: volume.VolumeId}
_, err := ec2Client.DeleteVolume(input)
if err != nil {
t.Logf("deleting disk failed: %+v", err)
allDeleted = false
}
}
return allDeleted, nil
})
if err != nil {
return fmt.Errorf("AWS cleanup of disks failed: %w", err)
}
return nil
}

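// getAWSTestVolumes returns all EC2 volumes tagged with awsPurposeTag.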
func getAWSTestVolumes(ec2Client *ec2.EC2) ([]*ec2.Volume, error) {
output, err := ec2Client.DescribeVolumes(&ec2.DescribeVolumesInput{
Filters: []*ec2.Filter{
{
Name: aws.String("tag:purpose"),
Values: []*string{aws.String(awsPurposeTag)},
},
},
})
if err != nil {
return nil, err
}
return output.Volumes, nil
}
80 changes: 80 additions & 0 deletions e2e/disk_setup.go
@@ -0,0 +1,80 @@
package e2e

import (
"context"
"testing"
"time"

"github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

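// diskSetup creates two EBS volumes (10Gi and 20Gi) for every worker node and attaches them so the tests have disks to consume.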
func diskSetup(t *testing.T) {
namespace := TestNamespace

// get nodes
nodeList := &corev1.NodeList{}
err := DeployManagerObj.GetCrClient().List(context.TODO(), nodeList, client.HasLabels{labelNodeRoleWorker})
if err != nil {
t.Fatalf("failed to list nodes: %+v", err)
}

// represents the disk layout to set up on the nodes.
var nodeEnv []nodeDisks
for _, node := range nodeList.Items {
nodeEnv = append(nodeEnv, nodeDisks{
disks: []disk{
{size: 10},
{size: 20},
},
node: node,
})
}
matcher := gomega.NewGomegaWithT(t)
gomega.SetDefaultEventuallyTimeout(time.Minute * 10)
gomega.SetDefaultEventuallyPollingInterval(time.Second * 2)

t.Log("getting AWS region info from node spec")
_, region, _, err := getAWSNodeInfo(nodeList.Items[0])
matcher.Expect(err).NotTo(gomega.HaveOccurred(), "getAWSNodeInfo")

// initialize client
t.Log("initialize ec2 creds")
ec2Client, err := getEC2Client(region)
matcher.Expect(err).NotTo(gomega.HaveOccurred(), "getEC2Client")

// create and attach volumes
t.Log("creating and attaching disks")
createAndAttachAWSVolumes(t, ec2Client, namespace, nodeEnv)
}

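// diskRemoval detaches and deletes the EBS volumes that diskSetup created.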
func diskRemoval(t *testing.T) {
// get nodes
nodeList := &corev1.NodeList{}
err := DeployManagerObj.GetCrClient().List(context.TODO(), nodeList, client.HasLabels{labelNodeRoleWorker})
if err != nil {
t.Fatalf("failed to list nodes: %+v", err)
}

matcher := gomega.NewGomegaWithT(t)
gomega.SetDefaultEventuallyTimeout(time.Minute * 10)
gomega.SetDefaultEventuallyPollingInterval(time.Second * 2)

t.Log("getting AWS region info from node spec")
_, region, _, err := getAWSNodeInfo(nodeList.Items[0])
matcher.Expect(err).NotTo(gomega.HaveOccurred(), "getAWSNodeInfo")

// initialize client
t.Log("initialize ec2 creds")
ec2Client, err := getEC2Client(region)
matcher.Expect(err).NotTo(gomega.HaveOccurred(), "getEC2Client")

// clean up the disks created by diskSetup
t.Log("cleaning up disks")
err = cleanupAWSDisks(t, ec2Client)
matcher.Expect(err).To(gomega.BeNil(), "cleanupAWSDisks")
}
12 changes: 11 additions & 1 deletion e2e/setup_teardown.go
@@ -1,6 +1,10 @@
package e2e

import "github.com/onsi/gomega"
import (
"testing"

"github.com/onsi/gomega"
)

// BeforeTestSuiteSetup is the function called to initialize the test environment.
func BeforeTestSuiteSetup() {
@@ -20,6 +24,9 @@ func BeforeTestSuiteSetup()
err = DeployManagerObj.CreateNamespace(TestNamespace)
gomega.Expect(err).To(gomega.BeNil())

debug("Creating disk for CI\n")
diskSetup(&testing.T{})

SuiteFailed = false

debug("------------------------------\n")
@@ -44,4 +51,7 @@ func AfterTestSuiteCleanup() {
err = DeployManagerObj.UninstallLVM(LvmCatalogSourceImage, LvmSubscriptionChannel)
gomega.Expect(err).To(gomega.BeNil(), "error uninstalling LVM: %v", err)
}

debug("Cleaning up disk\n")
diskRemoval(&testing.T{})
}