From 024801ec5374b7968303e1a2e6a8b249ebe47a27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Clemens=20W=C3=A4ltken?= Date: Tue, 11 Apr 2017 17:06:18 -0300 Subject: [PATCH] Include code review feedback * add missing region in SQS data * allow execution by time rather than quantity of requirements * add test for stresstestTimeout * fix remainingRequestCount negative termination check * add test execution to Makefile * update golang versions to 1.8 in Dockerfile and Godeps --- Dockerfile | 12 +++++------ Godeps/Godeps.json | 4 ++-- Makefile | 4 ++++ goad.go | 20 +++++++++---------- lambda/lambda.go | 24 ++++++++++++++-------- lambda/lambda_test.go | 46 +++++++++++++++++++++++++++---------------- queue/aggregation.go | 20 ++++++++++--------- 7 files changed, 78 insertions(+), 52 deletions(-) diff --git a/Dockerfile b/Dockerfile index 8f753081..f9d3df49 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,12 +1,12 @@ -FROM golang:1.5 +FROM golang:1.8 RUN apt-get update RUN apt-get install -y zip -ADD . /go/src/github.com/gophergala2016/goad -WORKDIR /go/src/github.com/gophergala2016/goad -RUN go get -u github.com/jteeuwen/go-bindata/... -RUN make bindata -RUN go build -o /go/bin/goad-api webapi/webapi.go +ADD . /go/src/github.com/goadapp/goad +WORKDIR /go/src/github.com/goadapp/goad +RUN go get -u github.com/jteeuwen/go-bindata/... +RUN make linux +# RUN go build -o /go/bin/goad-api webapi/webapi.go CMD ["/go/bin/goad-api", "-addr", ":8080"] EXPOSE 8080 diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 9fb3bbe0..6706d447 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -1,7 +1,7 @@ { "ImportPath": "github.com/goadapp/goad", - "GoVersion": "go1.6", - "GodepVersion": "v60", + "GoVersion": "go1.8", + "GodepVersion": "v79", "Packages": [ "./..." ], diff --git a/Makefile b/Makefile index a1bcdd69..497c9bed 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,8 @@ all: osx linux windows +test: + go test ./... + lambda: GOOS=linux GOARCH=amd64 go build -o data/lambda/goad-lambda ./lambda zip -jr data/lambda data/lambda @@ -21,6 +24,7 @@ windows: bindata clean: rm -rf data/lambda/goad-lambda + rm -rf data/lambda.zip rm -rf build all-zip: all diff --git a/goad.go b/goad.go index 9523655c..7d215c46 100644 --- a/goad.go +++ b/goad.go @@ -56,6 +56,7 @@ var supportedRegions = []string{ type Test struct { Config *TestConfig infra *infrastructure.Infrastructure + lambdas int } // NewTest returns a configured Test @@ -85,12 +86,13 @@ func (t *Test) Start() <-chan queue.RegionsAggData { } t.infra = infra + t.lambdas = numberOfLambdas(t.Config.Concurrency, len(t.Config.Regions)) t.invokeLambdas(awsConfig, infra.QueueURL()) results := make(chan queue.RegionsAggData) go func() { - for result := range queue.Aggregate(awsConfig, infra.QueueURL(), t.Config.TotalRequests) { + for result := range queue.Aggregate(awsConfig, infra.QueueURL(), t.Config.TotalRequests, t.lambdas) { results <- result } close(results) @@ -100,15 +102,13 @@ func (t *Test) Start() <-chan queue.RegionsAggData { } func (t *Test) invokeLambdas(awsConfig *aws.Config, sqsURL string) { - lambdas := numberOfLambdas(t.Config.Concurrency, len(t.Config.Regions)) - - for i := 0; i < lambdas; i++ { + for i := 0; i < t.lambdas; i++ { region := t.Config.Regions[i%len(t.Config.Regions)] - requests, requestsRemainder := divide(t.Config.TotalRequests, lambdas) - concurrency, _ := divide(t.Config.Concurrency, lambdas) + requests, requestsRemainder := divide(t.Config.TotalRequests, t.lambdas) + concurrency, _ := divide(t.Config.Concurrency, t.lambdas) execTimeout := t.Config.ExecTimeout - if requestsRemainder > 0 && i == lambdas-1 { + if requestsRemainder > 0 && i == t.lambdas-1 { requests += requestsRemainder } @@ -129,7 +129,7 @@ func (t *Test) invokeLambdas(awsConfig *aws.Config, sqsURL string) { "-t", fmt.Sprintf("%s", c.RequestTimeout.String()), "-f", - fmt.Sprintf("%s", reportingFrequency(lambdas).String()), + fmt.Sprintf("%s", reportingFrequency(t.lambdas).String()), "-r", fmt.Sprintf("%s", region), "-m", @@ -196,8 +196,8 @@ func (c TestConfig) check() error { if (c.TotalRequests < 1 && c.ExecTimeout <= 0) || c.TotalRequests > 2000000 { return errors.New("Invalid total requests (use 1 - 2000000)") } - if c.ExecTimeout > 300 { - return errors.New("Invalid maximum execution time in seconds (use 0 - 300)") + if c.ExecTimeout > 3600 { + return errors.New("Invalid maximum execution time in seconds (use 0 - 3600)") } if c.RequestTimeout.Nanoseconds() < nano || c.RequestTimeout.Nanoseconds() > nano*100 { return errors.New("Invalid timeout (1s - 100s)") diff --git a/lambda/lambda.go b/lambda/lambda.go index 1da2b019..1b68b54e 100644 --- a/lambda/lambda.go +++ b/lambda/lambda.go @@ -208,13 +208,16 @@ func NewLambda(s LambdaSettings) *goadLambda { l := &goadLambda{} l.Settings = s - l.Metrics = NewRequestMetric() + l.Metrics = NewRequestMetric(s.LambdaRegion) remainingRequestCount := s.MaxRequestCount - s.CompletedRequestCount + if remainingRequestCount < 0 { + remainingRequestCount = 0 + } l.setupHTTPClientForSelfsignedTLS() awsSqsConfig := l.setupAwsConfig() l.setupAwsSqsAdapter(awsSqsConfig) l.setupJobQueue(remainingRequestCount) - l.results = make(chan requestResult, remainingRequestCount) + l.results = make(chan requestResult) return l } @@ -225,8 +228,10 @@ func setDefaultConcurrencyCount(s *LambdaSettings) { } func setLambdaExecTimeout(s *LambdaSettings) { - if s.LambdaExecTimeoutSeconds <= 0 || s.LambdaExecTimeoutSeconds > AWS_MAX_TIMEOUT { + if s.StresstestTimeout <= 0 || s.StresstestTimeout > AWS_MAX_TIMEOUT { s.LambdaExecTimeoutSeconds = AWS_MAX_TIMEOUT + } else { + s.LambdaExecTimeoutSeconds = s.StresstestTimeout } } @@ -281,9 +286,11 @@ func (l *goadLambda) spawnWorker() { func work(l *goadLambda) { for { - _, ok := <-l.jobs - if !ok { - break + if l.Settings.MaxRequestCount > 0 { + _, ok := <-l.jobs + if !ok { + break + } } l.results <- fetch(l.HTTPClient, l.Settings.RequestParameters, l.StartTime) } @@ -401,9 +408,9 @@ type resultSender interface { SendResult(queue.AggData) } -func NewRequestMetric() *requestMetric { +func NewRequestMetric(region string) *requestMetric { metric := &requestMetric{ - aggregatedResults: &queue.AggData{}, + aggregatedResults: &queue.AggData{Region: region}, } metric.resetAndKeepTotalReqs() return metric @@ -472,6 +479,7 @@ func (m *requestMetric) resetAndKeepTotalReqs() { m.requestTimeTotal = 0 m.timeToFirstTotal = 0 m.aggregatedResults = &queue.AggData{ + Region: m.aggregatedResults.Region, Statuses: make(map[string]int), Fastest: math.MaxInt64, Finished: false, diff --git a/lambda/lambda_test.go b/lambda/lambda_test.go index 6fb7b8de..fbef5c3d 100644 --- a/lambda/lambda_test.go +++ b/lambda/lambda_test.go @@ -33,7 +33,7 @@ func TestMain(m *testing.M) { } func TestRequestMetric(t *testing.T) { - metric := NewRequestMetric() + metric := NewRequestMetric("us-east-1") agg := metric.aggregatedResults if agg.TotalReqs != 0 { t.Error("totalRequestsFinished should be initialized with 0") @@ -67,7 +67,7 @@ func TestRequestMetric(t *testing.T) { func TestAddRequestStatus(t *testing.T) { success := 200 successStr := strconv.Itoa(success) - metric := NewRequestMetric() + metric := NewRequestMetric("us-east-1") result := &requestResult{ Status: success, } @@ -86,7 +86,7 @@ func TestAddRequest(t *testing.T) { elapsedFirst := int64(100) elapsedLast := int64(300) - metric := NewRequestMetric() + metric := NewRequestMetric("us-east-1") result := &requestResult{ Time: 400, ElapsedFirstByte: elapsedFirst, @@ -193,7 +193,7 @@ func TestAddRequest(t *testing.T) { } func TestResetAndKeepTotalReqs(t *testing.T) { - metric := NewRequestMetric() + metric := NewRequestMetric("us-east-1") agg := metric.aggregatedResults agg.TotalReqs = 7 metric.firstRequestTime = 123 @@ -242,7 +242,7 @@ func TestMetricsAggregate(t *testing.T) { elapsedFirst := int64(100) elapsedLast := int64(300) - metric := NewRequestMetric() + metric := NewRequestMetric("us-east-1") result := &requestResult{ Time: 10000000, Elapsed: 10000000, @@ -337,12 +337,11 @@ func TestQuitOnLambdaTimeout(t *testing.T) { reportingFrequency := time.Duration(5) * time.Second settings := LambdaSettings{ - MaxRequestCount: 3, - ConcurrencyCount: 1, - ReportingFrequency: reportingFrequency, - StresstestTimeout: 10, - LambdaExecTimeoutSeconds: 1, - LambdaRegion: "us-east-1", + MaxRequestCount: 3, + ConcurrencyCount: 1, + ReportingFrequency: reportingFrequency, + StresstestTimeout: 10, + LambdaRegion: "us-east-1", } settings.RequestParameters.URL = urlStr sender := &TestResultSender{} @@ -353,6 +352,7 @@ func TestQuitOnLambdaTimeout(t *testing.T) { function := &lambdaTestFunction{ lambda: lambda, } + lambda.Settings.LambdaExecTimeoutSeconds = 1 RunOrFailAfterTimout(t, function, 1400) resLength := len(sender.sentResults) timeoutRemaining := lambda.Settings.StresstestTimeout @@ -389,7 +389,7 @@ func TestMetricSendResults(t *testing.T) { ConnectionError: false, } - metric := NewRequestMetric() + metric := NewRequestMetric("us-east-1") sender := &TestResultSender{} metric.addRequest(result) @@ -404,24 +404,35 @@ func TestRunLoadTestWithHighConcurrency(t *testing.T) { server := createAndStartTestServer() defer server.Stop() - runLoadTestWith(t, 500, 100, 500) + runLoadTestWithDefaultExecTimeout(t, 500, 100, 500) } func TestRunLoadTestWithOneRequest(t *testing.T) { server := createAndStartTestServer() defer server.Stop() - runLoadTestWith(t, 1, -1, 50) + runLoadTestWithDefaultExecTimeout(t, 1, -1, 50) } func TestRunLoadTestWithZeroRequests(t *testing.T) { server := createAndStartTestServer() defer server.Stop() - runLoadTestWith(t, 0, 1, 50) + runLoadTestWithDefaultExecTimeout(t, 1, 1, 120) +} + +func TestRunWithExecTimeout(t *testing.T) { + server := createAndStartTestServer() + defer server.Stop() + + runLoadTestWith(t, 0, 1, 1, 1050) +} + +func runLoadTestWithDefaultExecTimeout(t *testing.T, requestCount int, concurrency int, milliseconds int) { + runLoadTestWith(t, requestCount, 0, concurrency, milliseconds) } -func runLoadTestWith(t *testing.T, requestCount int, concurrency int, milliseconds int) { +func runLoadTestWith(t *testing.T, requestCount int, stresstestTimeout int, concurrency int, milliseconds int) { if testing.Short() { t.Skip("skipping test in short mode.") } @@ -430,6 +441,7 @@ func runLoadTestWith(t *testing.T, requestCount int, concurrency int, millisecon MaxRequestCount: requestCount, ConcurrencyCount: concurrency, ReportingFrequency: reportingFrequency, + StresstestTimeout: stresstestTimeout, } settings.RequestParameters.URL = urlStr sender := &TestResultSender{} @@ -447,7 +459,7 @@ func runLoadTestWith(t *testing.T, requestCount int, concurrency int, millisecon if results.Finished != true { t.Error("the lambda should have finished it's results") } - if results.TotalReqs != requestCount { + if results.TotalReqs != requestCount && requestCount > 0 { t.Errorf("the lambda generated results for %d request, expected %d", results.TotalReqs, requestCount) } } diff --git a/queue/aggregation.go b/queue/aggregation.go index 9cf58905..9789387a 100644 --- a/queue/aggregation.go +++ b/queue/aggregation.go @@ -22,26 +22,28 @@ type AggData struct { Region string `json:"region"` FatalError string `json:"fatal-error"` Finished bool `json:"finished"` + FinishedLambdas int `json:"finished-lambdas"` } // RegionsAggData type type RegionsAggData struct { Regions map[string]AggData TotalExpectedRequests uint + lambdasByRegion int } func (d *RegionsAggData) allRequestsReceived() bool { var requests uint - var finishedCount int + var finishedRegions int for _, region := range d.Regions { requests += uint(region.TotalReqs) - if region.Finished { - finishedCount += 1 + if region.FinishedLambdas == d.lambdasByRegion { + finishedRegions += 1 } } - return requests == d.TotalExpectedRequests || finishedCount == len(d.Regions) + return d.TotalExpectedRequests > 0 && requests == d.TotalExpectedRequests || finishedRegions == len(d.Regions) } func addResult(data *AggData, result *AggData, isFinalSum bool) { @@ -77,7 +79,7 @@ func addResult(data *AggData, result *AggData, isFinalSum bool) { data.Fastest = result.Fastest } if result.Finished { - data.Finished = true + data.FinishedLambdas += 1 } } @@ -92,15 +94,15 @@ func SumRegionResults(regionData *RegionsAggData) *AggData { } // Aggregate listens for results and sends totals, closing the channel when done -func Aggregate(awsConfig *aws.Config, queueURL string, totalExpectedRequests uint) chan RegionsAggData { +func Aggregate(awsConfig *aws.Config, queueURL string, totalExpectedRequests uint, lambdasByRegion int) chan RegionsAggData { results := make(chan RegionsAggData) - go aggregate(results, awsConfig, queueURL, totalExpectedRequests) + go aggregate(results, awsConfig, queueURL, totalExpectedRequests, lambdasByRegion) return results } -func aggregate(results chan RegionsAggData, awsConfig *aws.Config, queueURL string, totalExpectedRequests uint) { +func aggregate(results chan RegionsAggData, awsConfig *aws.Config, queueURL string, totalExpectedRequests uint, lambdasByRegion int) { defer close(results) - data := RegionsAggData{make(map[string]AggData), totalExpectedRequests} + data := RegionsAggData{make(map[string]AggData), totalExpectedRequests, lambdasByRegion} adaptor := NewSQSAdaptor(awsConfig, queueURL) timeoutStart := time.Now()