Skip to content

Commit

Permalink
Add tests for eks algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
mateimicu committed May 7, 2020
1 parent c6f705b commit 46d6304
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 124 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/sirupsen/logrus v1.2.0
github.com/spf13/cobra v0.0.6
github.com/stretchr/testify v1.5.1
google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7
gopkg.in/yaml.v2 v2.2.8
k8s.io/client-go v0.18.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down
129 changes: 129 additions & 0 deletions internal/aws/eks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Package internal provides wrapper for creating aws sessions
package aws

import (
"encoding/base64"
"errors"
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/eks"
"github.com/mateimicu/kdiscover/internal/cluster"
log "github.com/sirupsen/logrus"
)

//type ClusterListDescriber struct {
type EKSClient struct {
EKS *eks.EKS
Region string
}

func (c *EKSClient) String() string {
return fmt.Sprintf("EKS Client for region %v", c.Region)
}

func (c *EKSClient) GetClusters(ch chan<- *cluster.Cluster) {
input := &eks.ListClustersInput{}

err := c.EKS.ListClustersPages(input,
func(page *eks.ListClustersOutput, lastPage bool) bool {
log.WithFields(log.Fields{
"svc": c.String(),
"page": page.GoString(),
}).Debug("Parse page")
for _, cluster := range page.Clusters {
log.WithFields(log.Fields{
"svc": c.String(),
"cluster": *cluster,
}).Debug("Found cluster")
if cls, err := c.detailCluster(*cluster); err == nil {
ch <- cls
} else {
log.WithFields(log.Fields{
"svc": c.String(),
"cluster": *cluster,
"err": err,
}).Warn("Can't get details on the cluster")
}
}

if lastPage {
log.WithFields(log.Fields{
"svc": c.String(),
}).Debug("hit last page")
return false
}
return true
})

if err != nil {
log.WithFields(log.Fields{
"err": err,
"svc": c.String(),
}).Warn("Can't list clusters")
}

close(ch)
}

func (c *EKSClient) detailCluster(cName string) (*cluster.Cluster, error) {
input := &eks.DescribeClusterInput{
Name: aws.String(cName),
}

result, err := c.EKS.DescribeCluster(input)
if err != nil {
// TODO(mmicu): handle errors better here
if aerr, ok := err.(awserr.Error); ok {
log.Warn(aerr.Error())
} else {
log.Warn(err.Error())
}
msg := fmt.Sprintf("Can't fetch more details for the cluster %v", cName)
log.WithFields(log.Fields{
"cluster-name": cName,
"svc": c.String(),
}).Warn(msg)
return nil, errors.New(msg)
}

certificatAuthorityData, err := base64.StdEncoding.DecodeString(*result.Cluster.CertificateAuthority.Data)
if err != nil {
log.WithFields(log.Fields{
"cluster-name": *result.Cluster.Name,
"arn": *result.Cluster.Arn,
"certificate-authority-data": *result.Cluster.CertificateAuthority.Data,
"svc": c.String(),
}).Error("Can't decode the Certificate Authority Data")
return nil, err
}

cls := cluster.NewCluster()
cls.Name = *result.Cluster.Name
cls.ID = *result.Cluster.Arn
cls.Endpoint = *result.Cluster.Endpoint
cls.CertificateAuthorityData = string(certificatAuthorityData)
cls.Status = *result.Cluster.Status
cls.Region = c.Region

return cls, nil
}

func NewEKS(region string) (*EKSClient, error) {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
})
if err != nil {
log.WithFields(log.Fields{
"region": region,
"error": err.Error(),
}).Error("Failed to create AWS SDK session")
return nil, err
}
return &EKSClient{
EKS: eks.New(sess),
Region: region,
}, nil
}
140 changes: 42 additions & 98 deletions internal/aws/eks_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,8 @@
package aws

import (
"encoding/base64"
"errors"
"fmt"
"sync"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/eks"
"github.com/mateimicu/kdiscover/internal/cluster"
log "github.com/sirupsen/logrus"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
Expand All @@ -24,8 +18,7 @@ func getConfigAuthInfo(cls *cluster.Cluster) *clientcmdapi.AuthInfo {
authInfo := clientcmdapi.NewAuthInfo()
args := make([]string, len(options[authType]))
copy(args, options[authType])
args = append(args, cls.Name)
args = append(args, "--region", cls.Region)
args = append(args, cls.Name, "--region", cls.Region)

authInfo.Exec = &clientcmdapi.ExecConfig{
Command: commands[authType],
Expand All @@ -34,114 +27,65 @@ func getConfigAuthInfo(cls *cluster.Cluster) *clientcmdapi.AuthInfo {
return authInfo
}

func getNewCluster(clsName string, svc *eks.EKS) (*cluster.Cluster, error) {
input := &eks.DescribeClusterInput{
Name: aws.String(clsName),
}

result, err := svc.DescribeCluster(input)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
log.Warn(aerr.Error())
} else {
log.Warn(err.Error())
}
msg := fmt.Sprintf("Can't fetch more details for the cluster %v", clsName)
log.Warn(msg)
return &cluster.Cluster{}, errors.New(msg)
}
certificatAuthorityData, err := base64.StdEncoding.DecodeString(*result.Cluster.CertificateAuthority.Data)
if err != nil {
log.WithFields(log.Fields{
"cluster-name": *result.Cluster.Name,
"arn": *result.Cluster.Arn,
"certificate-authority-data": *result.Cluster.CertificateAuthority.Data,
}).Error("Can't decode the Certificate Authority Data")
}

cls := cluster.NewCluster()
cls.Name = *result.Cluster.Name
cls.ID = *result.Cluster.Arn
cls.Endpoint = *result.Cluster.Endpoint
cls.CertificateAuthorityData = string(certificatAuthorityData)
cls.Status = *result.Cluster.Status
cls.GenerateAuthInfo = getConfigAuthInfo

return cls, nil
type ClusterGetter interface {
GetClusters(ch chan<- *cluster.Cluster)
}

func getEKSClustersPerRegion(region string, ch chan<- *cluster.Cluster, wg *sync.WaitGroup) {
sess, err := getAWSSession(region)
if err != nil {
func GetEKSClusters(regions []string) []*cluster.Cluster {
clients := make([]ClusterGetter, 0, len(regions))

for _, region := range regions {
log.WithFields(log.Fields{
"region": region,
"error": err.Error(),
}).Error("Failed to create AWS SDK session")
}
svc := getEKSClient(sess)

input := &eks.ListClustersInput{}

err = svc.ListClustersPages(input,
func(page *eks.ListClustersOutput, lastPage bool) bool {
for _, cluster := range page.Clusters {
log.WithFields(log.Fields{
"region": region,
"cluster": cluster,
"page": page,
}).Debug("Found cluster")
if cls, ok := getNewCluster(*cluster, svc); ok == nil {
cls.Region = region
ch <- cls
}
}
if lastPage {
log.Debug("hit last page")
return false
}
return true
})
}).Info("Initialize client")
eks, err := NewEKS(region)
if err != nil {
log.WithFields(log.Fields{
"region": region,
"error": err.Error(),
}).Error("Failed to create AWS SDK session")
continue
}

if err != nil {
log.WithFields(log.Fields{
"err": err,
}).Warn("Can't list clusters")
clients = append(clients, ClusterGetter(eks))
}
wg.Done()
return getEKSClusters(clients)
}

// GetEKSClusters will query the given regions and return a list of
// clusters accesable. It will use the default credential chain for AWS
// in order to figure out the context for the API calls
func GetEKSClusters(regions []string) []*cluster.Cluster {
clusters := make([]*cluster.Cluster, 0, len(regions))
func getEKSClusters(clients []ClusterGetter) []*cluster.Cluster {
clusters := make([]*cluster.Cluster, 0, len(clients))
ch := make(chan *cluster.Cluster)

var wg sync.WaitGroup
ch := make(chan *cluster.Cluster)
wg.Add(len(clients))

for _, region := range regions {
log.WithFields(log.Fields{
"region": region,
}).Info("Query clusters")
for _, c := range clients {
regionCh := make(chan *cluster.Cluster)
go c.GetClusters(regionCh)

wg.Add(1)
go getEKSClustersPerRegion(region, ch, &wg)
// fan-in from all the regions to one output channel
go func(out chan<- *cluster.Cluster, wg *sync.WaitGroup) {
for cls := range regionCh {
out <- cls
}
wg.Done()
}(ch, &wg)
}

done := make(chan struct{})
go func(done chan<- struct{}, wg *sync.WaitGroup) {
// close the channel when all regions have finished the querys
go func(wg *sync.WaitGroup, out chan<- *cluster.Cluster) {
defer close(out)
wg.Wait()
done <- struct{}{}
}(done, &wg)

loop:
for {
select {
case cluster := <-ch:
clusters = append(clusters, cluster)
case <-done:
break loop
}
}(&wg, ch)

for cluster := range ch {
// add EKS specific auth config
cluster.GenerateAuthInfo = getConfigAuthInfo
clusters = append(clusters, cluster)
}

return clusters
}
Loading

0 comments on commit 46d6304

Please sign in to comment.