diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index 54a3e68ce5..a970e46b5b 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -20,6 +20,8 @@ import ( "context" "errors" "fmt" + "math/rand" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -80,19 +82,30 @@ var ( ErrAlreadyExists = errors.New("Resource already exists") ) +func init() { + rand.Seed(time.Now().UnixNano()) +} + +// Disk represents a EBS volume type Disk struct { - VolumeID string - CapacityGiB int64 + VolumeID string + CapacityGiB int64 + AvailabilityZone string } +// DiskOptions represents parameters to create an EBS volume type DiskOptions struct { CapacityBytes int64 Tags map[string]string VolumeType string IOPSPerGB int64 + // the availability zone to create volume in + // if empty a random zone will be used + AvailabilityZone string } // EC2 abstracts aws.EC2 to facilitate its mocking. +// See https://docs.aws.amazon.com/sdk-for-go/api/service/ec2/ for details type EC2 interface { DescribeVolumesWithContext(ctx aws.Context, input *ec2.DescribeVolumesInput, opts ...request.Option) (*ec2.DescribeVolumesOutput, error) CreateVolumeWithContext(ctx aws.Context, input *ec2.CreateVolumeInput, opts ...request.Option) (*ec2.Volume, error) @@ -100,6 +113,7 @@ type EC2 interface { DetachVolumeWithContext(ctx aws.Context, input *ec2.DetachVolumeInput, opts ...request.Option) (*ec2.VolumeAttachment, error) AttachVolumeWithContext(ctx aws.Context, input *ec2.AttachVolumeInput, opts ...request.Option) (*ec2.VolumeAttachment, error) DescribeInstancesWithContext(ctx aws.Context, input *ec2.DescribeInstancesInput, opts ...request.Option) (*ec2.DescribeInstancesOutput, error) + DescribeAvailabilityZonesWithContext(ctx aws.Context, input *ec2.DescribeAvailabilityZonesInput, opts ...request.Option) (*ec2.DescribeAvailabilityZonesOutput, error) } type Cloud interface { @@ -158,8 +172,10 @@ func (c *cloud) GetMetadata() MetadataService { } func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *DiskOptions) (*Disk, error) { - var createType string - var iops int64 + var ( + createType string + iops int64 + ) capacityGiB := util.BytesToGiB(diskOptions.CapacityBytes) switch diskOptions.VolumeType { @@ -189,9 +205,22 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions * Tags: tags, } - m := c.GetMetadata() + var ( + zone string + err error + ) + if diskOptions.AvailabilityZone == "" { + zone, err = c.pickRandomAvailabilityZone(ctx) + if err != nil { + return nil, err + } + glog.V(5).Infof("AZ is not provided. Choose random AZ [%s]", zone) + } else { + zone = diskOptions.AvailabilityZone + } + request := &ec2.CreateVolumeInput{ - AvailabilityZone: aws.String(m.GetAvailabilityZone()), + AvailabilityZone: aws.String(zone), Size: aws.Int64(capacityGiB), VolumeType: aws.String(createType), TagSpecifications: []*ec2.TagSpecification{&tagSpec}, @@ -215,7 +244,7 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions * return nil, fmt.Errorf("disk size was not returned by CreateVolume") } - return &Disk{CapacityGiB: size, VolumeID: volumeID}, nil + return &Disk{CapacityGiB: size, VolumeID: volumeID, AvailabilityZone: zone}, nil } func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) { @@ -435,3 +464,17 @@ func (c *cloud) getInstance(ctx context.Context, nodeID string) (*ec2.Instance, return instances[0], nil } + +func (c *cloud) pickRandomAvailabilityZone(ctx context.Context) (string, error) { + output, err := c.ec2.DescribeAvailabilityZonesWithContext(ctx, &ec2.DescribeAvailabilityZonesInput{}) + if err != nil { + return "", err + } + + var zones []string + for _, zone := range output.AvailabilityZones { + zones = append(zones, *zone.ZoneName) + } + + return zones[rand.Int()%len(zones)], nil +} diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go index 346fcc9db5..0eefe4851c 100644 --- a/pkg/cloud/cloud_test.go +++ b/pkg/cloud/cloud_test.go @@ -43,8 +43,23 @@ func TestCreateDisk(t *testing.T) { name: "success: normal", volumeName: "vol-test-name", diskOptions: &DiskOptions{ - CapacityBytes: util.GiBToBytes(1), - Tags: map[string]string{VolumeNameTagKey: "vol-test"}, + CapacityBytes: util.GiBToBytes(1), + Tags: map[string]string{VolumeNameTagKey: "vol-test"}, + AvailabilityZone: "", + }, + expDisk: &Disk{ + VolumeID: "vol-test", + CapacityGiB: 1, + }, + expErr: nil, + }, + { + name: "success: normal with provided zone", + volumeName: "vol-test-name", + diskOptions: &DiskOptions{ + CapacityBytes: util.GiBToBytes(1), + Tags: map[string]string{VolumeNameTagKey: "vol-test"}, + AvailabilityZone: "us-west-2", }, expDisk: &Disk{ VolumeID: "vol-test", @@ -56,8 +71,9 @@ func TestCreateDisk(t *testing.T) { name: "fail: CreateVolume returned an error", volumeName: "vol-test-name-error", diskOptions: &DiskOptions{ - CapacityBytes: util.GiBToBytes(1), - Tags: map[string]string{VolumeNameTagKey: "vol-test"}, + CapacityBytes: util.GiBToBytes(1), + Tags: map[string]string{VolumeNameTagKey: "vol-test"}, + AvailabilityZone: "", }, expErr: fmt.Errorf("CreateVolume generic error"), }, @@ -80,6 +96,24 @@ func TestCreateDisk(t *testing.T) { ctx := context.Background() mockEC2.EXPECT().CreateVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(vol, tc.expErr) + if tc.diskOptions.AvailabilityZone == "" { + describeAvailabilityZonesResp := &ec2.DescribeAvailabilityZonesOutput{ + AvailabilityZones: []*ec2.AvailabilityZone{ + &ec2.AvailabilityZone{ + ZoneName: aws.String("us-west-2a"), + }, + &ec2.AvailabilityZone{ + ZoneName: aws.String("us-west-2b"), + }, + &ec2.AvailabilityZone{ + ZoneName: aws.String("us-west-2c"), + }, + }, + } + + mockEC2.EXPECT().DescribeAvailabilityZonesWithContext(gomock.Eq(ctx), gomock.Any()).Return(describeAvailabilityZonesResp, nil) + } + disk, err := c.CreateDisk(ctx, tc.volumeName, tc.diskOptions) if err != nil { if tc.expErr == nil { diff --git a/pkg/cloud/mocks/mock_ec2.go b/pkg/cloud/mocks/mock_ec2.go index 3816ead486..65637bde4a 100644 --- a/pkg/cloud/mocks/mock_ec2.go +++ b/pkg/cloud/mocks/mock_ec2.go @@ -89,6 +89,24 @@ func (mr *MockEC2MockRecorder) DeleteVolumeWithContext(arg0, arg1 interface{}, a return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteVolumeWithContext", reflect.TypeOf((*MockEC2)(nil).DeleteVolumeWithContext), varargs...) } +// DescribeAvailabilityZonesWithContext mocks base method +func (m *MockEC2) DescribeAvailabilityZonesWithContext(arg0 aws.Context, arg1 *ec2.DescribeAvailabilityZonesInput, arg2 ...request.Option) (*ec2.DescribeAvailabilityZonesOutput, error) { + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DescribeAvailabilityZonesWithContext", varargs...) + ret0, _ := ret[0].(*ec2.DescribeAvailabilityZonesOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DescribeAvailabilityZonesWithContext indicates an expected call of DescribeAvailabilityZonesWithContext +func (mr *MockEC2MockRecorder) DescribeAvailabilityZonesWithContext(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeAvailabilityZonesWithContext", reflect.TypeOf((*MockEC2)(nil).DescribeAvailabilityZonesWithContext), varargs...) +} + // DescribeInstancesWithContext mocks base method func (m *MockEC2) DescribeInstancesWithContext(arg0 aws.Context, arg1 *ec2.DescribeInstancesInput, arg2 ...request.Option) (*ec2.DescribeInstancesOutput, error) { varargs := []interface{}{arg0, arg1} diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index fab890b807..22c059aff8 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -68,24 +68,23 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) } } - if disk == nil { - opts := &cloud.DiskOptions{ - CapacityBytes: volSizeBytes, - Tags: map[string]string{cloud.VolumeNameTagKey: volName}, - } - newDisk, err := d.cloud.CreateDisk(ctx, volName, opts) - if err != nil { - return nil, status.Errorf(codes.Internal, "Could not create volume %q: %v", volName, err) - } - disk = newDisk + // volume exists already + if disk != nil { + return newCreateVolumeResponse(disk), nil } - return &csi.CreateVolumeResponse{ - Volume: &csi.Volume{ - Id: disk.VolumeID, - CapacityBytes: util.GiBToBytes(disk.CapacityGiB), - }, - }, nil + // create a new volume + zone := pickAvailabilityZone(req.GetAccessibilityRequirements()) + opts := &cloud.DiskOptions{ + CapacityBytes: volSizeBytes, + AvailabilityZone: zone, + Tags: map[string]string{cloud.VolumeNameTagKey: volName}, + } + disk, err = d.cloud.CreateDisk(ctx, volName, opts) + if err != nil { + return nil, status.Errorf(codes.Internal, "Could not create volume %q: %v", volName, err) + } + return newCreateVolumeResponse(disk), nil } func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { @@ -253,3 +252,38 @@ func (d *Driver) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequ func (d *Driver) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) { return nil, status.Error(codes.Unimplemented, "") } + +// pickAvailabilityZone selects 1 zone given topology requirement. +// if not found, empty string is returned. +func pickAvailabilityZone(requirement *csi.TopologyRequirement) string { + if requirement == nil { + return "" + } + for _, topology := range requirement.GetPreferred() { + zone, exists := topology.GetSegments()[topologyKey] + if exists { + return zone + } + } + for _, topology := range requirement.GetRequisite() { + zone, exists := topology.GetSegments()[topologyKey] + if exists { + return zone + } + } + return "" +} + +func newCreateVolumeResponse(disk *cloud.Disk) *csi.CreateVolumeResponse { + return &csi.CreateVolumeResponse{ + Volume: &csi.Volume{ + Id: disk.VolumeID, + CapacityBytes: util.GiBToBytes(disk.CapacityGiB), + AccessibleTopology: []*csi.Topology{ + &csi.Topology{ + Segments: map[string]string{topologyKey: disk.AvailabilityZone}, + }, + }, + }, + } +} diff --git a/pkg/driver/controller_test.go b/pkg/driver/controller_test.go index 0fd887a486..28480dedae 100644 --- a/pkg/driver/controller_test.go +++ b/pkg/driver/controller_test.go @@ -148,7 +148,7 @@ func TestCreateVolume(t *testing.T) { t.Fatalf("Could not get error status code from error: %v", srvErr) } if srvErr.Code() != tc.expErrCode { - t.Fatalf("Expected error code %d, got %d", tc.expErrCode, srvErr.Code()) + t.Fatalf("Expected error code %d, got %d message %s", tc.expErrCode, srvErr.Code(), srvErr.Message()) } continue } @@ -235,3 +235,63 @@ func TestDeleteVolume(t *testing.T) { } } } + +func TestPickAvailabilityZone(t *testing.T) { + expZone := "us-west-2b" + testCases := []struct { + name string + requirement *csi.TopologyRequirement + expZone string + }{ + { + name: "Pick from preferred", + requirement: &csi.TopologyRequirement{ + Requisite: []*csi.Topology{ + &csi.Topology{ + Segments: map[string]string{topologyKey: expZone}, + }, + }, + Preferred: []*csi.Topology{ + &csi.Topology{ + Segments: map[string]string{topologyKey: expZone}, + }, + }, + }, + expZone: expZone, + }, + { + name: "Pick from requisite", + requirement: &csi.TopologyRequirement{ + Requisite: []*csi.Topology{ + &csi.Topology{ + Segments: map[string]string{topologyKey: expZone}, + }, + }, + }, + expZone: expZone, + }, + { + name: "Pick from empty topology", + requirement: &csi.TopologyRequirement{ + Preferred: []*csi.Topology{&csi.Topology{}}, + Requisite: []*csi.Topology{&csi.Topology{}}, + }, + expZone: "", + }, + { + name: "Topology Requirement is nil", + requirement: nil, + expZone: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actual := pickAvailabilityZone(tc.requirement) + if actual != tc.expZone { + t.Fatalf("Expected zone %v, got zone: %v", tc.expZone, actual) + } + }) + } + +} diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index 4098283b83..2c99b6ae56 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -31,6 +31,7 @@ import ( const ( driverName = "com.amazon.aws.csi.ebs" vendorVersion = "0.0.1" // FIXME + topologyKey = driverName + "/zone" ) type Driver struct { diff --git a/pkg/driver/node.go b/pkg/driver/node.go index 0ef23b8961..c02eef3920 100644 --- a/pkg/driver/node.go +++ b/pkg/driver/node.go @@ -190,8 +190,14 @@ func (d *Driver) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabi func (d *Driver) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) { glog.V(4).Infof("NodeGetInfo: called with args %#v", req) m := d.cloud.GetMetadata() + + topology := &csi.Topology{ + Segments: map[string]string{topologyKey: m.GetAvailabilityZone()}, + } + return &csi.NodeGetInfoResponse{ - NodeId: m.GetInstanceID(), + NodeId: m.GetInstanceID(), + AccessibleTopology: topology, }, nil }