Skip to content
This repository has been archived by the owner on Aug 17, 2020. It is now read-only.

Commit

Permalink
Include code review feedback
Browse files Browse the repository at this point in the history
 * add missing region in SQS data
 * allow execution by time rather than quantity of requirements
 * add test for stresstestTimeout
 * fix remainingRequestCount negative termination check
 * add test execution to Makefile
 * update golang versions to 1.8 in Dockerfile and Godeps
  • Loading branch information
cwaltken-edrans committed Apr 17, 2017
1 parent 5c7009c commit 024801e
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 52 deletions.
12 changes: 6 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
FROM golang:1.5
FROM golang:1.8

RUN apt-get update
RUN apt-get install -y zip
ADD . /go/src/github.com/gophergala2016/goad
WORKDIR /go/src/github.com/gophergala2016/goad
RUN go get -u github.com/jteeuwen/go-bindata/...
RUN make bindata
RUN go build -o /go/bin/goad-api webapi/webapi.go
ADD . /go/src/github.com/goadapp/goad
WORKDIR /go/src/github.com/goadapp/goad
RUN go get -u github.com/jteeuwen/go-bindata/...
RUN make linux
# RUN go build -o /go/bin/goad-api webapi/webapi.go

CMD ["/go/bin/goad-api", "-addr", ":8080"]
EXPOSE 8080
4 changes: 2 additions & 2 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
all: osx linux windows

test:
go test ./...

lambda:
GOOS=linux GOARCH=amd64 go build -o data/lambda/goad-lambda ./lambda
zip -jr data/lambda data/lambda
Expand All @@ -21,6 +24,7 @@ windows: bindata

clean:
rm -rf data/lambda/goad-lambda
rm -rf data/lambda.zip
rm -rf build

all-zip: all
Expand Down
20 changes: 10 additions & 10 deletions goad.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var supportedRegions = []string{
type Test struct {
Config *TestConfig
infra *infrastructure.Infrastructure
lambdas int
}

// NewTest returns a configured Test
Expand Down Expand Up @@ -85,12 +86,13 @@ func (t *Test) Start() <-chan queue.RegionsAggData {
}

t.infra = infra
t.lambdas = numberOfLambdas(t.Config.Concurrency, len(t.Config.Regions))
t.invokeLambdas(awsConfig, infra.QueueURL())

results := make(chan queue.RegionsAggData)

go func() {
for result := range queue.Aggregate(awsConfig, infra.QueueURL(), t.Config.TotalRequests) {
for result := range queue.Aggregate(awsConfig, infra.QueueURL(), t.Config.TotalRequests, t.lambdas) {
results <- result
}
close(results)
Expand All @@ -100,15 +102,13 @@ func (t *Test) Start() <-chan queue.RegionsAggData {
}

func (t *Test) invokeLambdas(awsConfig *aws.Config, sqsURL string) {
lambdas := numberOfLambdas(t.Config.Concurrency, len(t.Config.Regions))

for i := 0; i < lambdas; i++ {
for i := 0; i < t.lambdas; i++ {
region := t.Config.Regions[i%len(t.Config.Regions)]
requests, requestsRemainder := divide(t.Config.TotalRequests, lambdas)
concurrency, _ := divide(t.Config.Concurrency, lambdas)
requests, requestsRemainder := divide(t.Config.TotalRequests, t.lambdas)
concurrency, _ := divide(t.Config.Concurrency, t.lambdas)
execTimeout := t.Config.ExecTimeout

if requestsRemainder > 0 && i == lambdas-1 {
if requestsRemainder > 0 && i == t.lambdas-1 {
requests += requestsRemainder
}

Expand All @@ -129,7 +129,7 @@ func (t *Test) invokeLambdas(awsConfig *aws.Config, sqsURL string) {
"-t",
fmt.Sprintf("%s", c.RequestTimeout.String()),
"-f",
fmt.Sprintf("%s", reportingFrequency(lambdas).String()),
fmt.Sprintf("%s", reportingFrequency(t.lambdas).String()),
"-r",
fmt.Sprintf("%s", region),
"-m",
Expand Down Expand Up @@ -196,8 +196,8 @@ func (c TestConfig) check() error {
if (c.TotalRequests < 1 && c.ExecTimeout <= 0) || c.TotalRequests > 2000000 {
return errors.New("Invalid total requests (use 1 - 2000000)")
}
if c.ExecTimeout > 300 {
return errors.New("Invalid maximum execution time in seconds (use 0 - 300)")
if c.ExecTimeout > 3600 {
return errors.New("Invalid maximum execution time in seconds (use 0 - 3600)")
}
if c.RequestTimeout.Nanoseconds() < nano || c.RequestTimeout.Nanoseconds() > nano*100 {
return errors.New("Invalid timeout (1s - 100s)")
Expand Down
24 changes: 16 additions & 8 deletions lambda/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,16 @@ func NewLambda(s LambdaSettings) *goadLambda {
l := &goadLambda{}
l.Settings = s

l.Metrics = NewRequestMetric()
l.Metrics = NewRequestMetric(s.LambdaRegion)
remainingRequestCount := s.MaxRequestCount - s.CompletedRequestCount
if remainingRequestCount < 0 {
remainingRequestCount = 0
}
l.setupHTTPClientForSelfsignedTLS()
awsSqsConfig := l.setupAwsConfig()
l.setupAwsSqsAdapter(awsSqsConfig)
l.setupJobQueue(remainingRequestCount)
l.results = make(chan requestResult, remainingRequestCount)
l.results = make(chan requestResult)
return l
}

Expand All @@ -225,8 +228,10 @@ func setDefaultConcurrencyCount(s *LambdaSettings) {
}

func setLambdaExecTimeout(s *LambdaSettings) {
if s.LambdaExecTimeoutSeconds <= 0 || s.LambdaExecTimeoutSeconds > AWS_MAX_TIMEOUT {
if s.StresstestTimeout <= 0 || s.StresstestTimeout > AWS_MAX_TIMEOUT {
s.LambdaExecTimeoutSeconds = AWS_MAX_TIMEOUT
} else {
s.LambdaExecTimeoutSeconds = s.StresstestTimeout
}
}

Expand Down Expand Up @@ -281,9 +286,11 @@ func (l *goadLambda) spawnWorker() {

func work(l *goadLambda) {
for {
_, ok := <-l.jobs
if !ok {
break
if l.Settings.MaxRequestCount > 0 {
_, ok := <-l.jobs
if !ok {
break
}
}
l.results <- fetch(l.HTTPClient, l.Settings.RequestParameters, l.StartTime)
}
Expand Down Expand Up @@ -401,9 +408,9 @@ type resultSender interface {
SendResult(queue.AggData)
}

func NewRequestMetric() *requestMetric {
func NewRequestMetric(region string) *requestMetric {
metric := &requestMetric{
aggregatedResults: &queue.AggData{},
aggregatedResults: &queue.AggData{Region: region},
}
metric.resetAndKeepTotalReqs()
return metric
Expand Down Expand Up @@ -472,6 +479,7 @@ func (m *requestMetric) resetAndKeepTotalReqs() {
m.requestTimeTotal = 0
m.timeToFirstTotal = 0
m.aggregatedResults = &queue.AggData{
Region: m.aggregatedResults.Region,
Statuses: make(map[string]int),
Fastest: math.MaxInt64,
Finished: false,
Expand Down
46 changes: 29 additions & 17 deletions lambda/lambda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestMain(m *testing.M) {
}

func TestRequestMetric(t *testing.T) {
metric := NewRequestMetric()
metric := NewRequestMetric("us-east-1")
agg := metric.aggregatedResults
if agg.TotalReqs != 0 {
t.Error("totalRequestsFinished should be initialized with 0")
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestRequestMetric(t *testing.T) {
func TestAddRequestStatus(t *testing.T) {
success := 200
successStr := strconv.Itoa(success)
metric := NewRequestMetric()
metric := NewRequestMetric("us-east-1")
result := &requestResult{
Status: success,
}
Expand All @@ -86,7 +86,7 @@ func TestAddRequest(t *testing.T) {
elapsedFirst := int64(100)
elapsedLast := int64(300)

metric := NewRequestMetric()
metric := NewRequestMetric("us-east-1")
result := &requestResult{
Time: 400,
ElapsedFirstByte: elapsedFirst,
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestAddRequest(t *testing.T) {
}

func TestResetAndKeepTotalReqs(t *testing.T) {
metric := NewRequestMetric()
metric := NewRequestMetric("us-east-1")
agg := metric.aggregatedResults
agg.TotalReqs = 7
metric.firstRequestTime = 123
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestMetricsAggregate(t *testing.T) {
elapsedFirst := int64(100)
elapsedLast := int64(300)

metric := NewRequestMetric()
metric := NewRequestMetric("us-east-1")
result := &requestResult{
Time: 10000000,
Elapsed: 10000000,
Expand Down Expand Up @@ -337,12 +337,11 @@ func TestQuitOnLambdaTimeout(t *testing.T) {

reportingFrequency := time.Duration(5) * time.Second
settings := LambdaSettings{
MaxRequestCount: 3,
ConcurrencyCount: 1,
ReportingFrequency: reportingFrequency,
StresstestTimeout: 10,
LambdaExecTimeoutSeconds: 1,
LambdaRegion: "us-east-1",
MaxRequestCount: 3,
ConcurrencyCount: 1,
ReportingFrequency: reportingFrequency,
StresstestTimeout: 10,
LambdaRegion: "us-east-1",
}
settings.RequestParameters.URL = urlStr
sender := &TestResultSender{}
Expand All @@ -353,6 +352,7 @@ func TestQuitOnLambdaTimeout(t *testing.T) {
function := &lambdaTestFunction{
lambda: lambda,
}
lambda.Settings.LambdaExecTimeoutSeconds = 1
RunOrFailAfterTimout(t, function, 1400)
resLength := len(sender.sentResults)
timeoutRemaining := lambda.Settings.StresstestTimeout
Expand Down Expand Up @@ -389,7 +389,7 @@ func TestMetricSendResults(t *testing.T) {
ConnectionError: false,
}

metric := NewRequestMetric()
metric := NewRequestMetric("us-east-1")
sender := &TestResultSender{}

metric.addRequest(result)
Expand All @@ -404,24 +404,35 @@ func TestRunLoadTestWithHighConcurrency(t *testing.T) {
server := createAndStartTestServer()
defer server.Stop()

runLoadTestWith(t, 500, 100, 500)
runLoadTestWithDefaultExecTimeout(t, 500, 100, 500)
}

func TestRunLoadTestWithOneRequest(t *testing.T) {
server := createAndStartTestServer()
defer server.Stop()

runLoadTestWith(t, 1, -1, 50)
runLoadTestWithDefaultExecTimeout(t, 1, -1, 50)
}

func TestRunLoadTestWithZeroRequests(t *testing.T) {
server := createAndStartTestServer()
defer server.Stop()

runLoadTestWith(t, 0, 1, 50)
runLoadTestWithDefaultExecTimeout(t, 1, 1, 120)
}

func TestRunWithExecTimeout(t *testing.T) {
server := createAndStartTestServer()
defer server.Stop()

runLoadTestWith(t, 0, 1, 1, 1050)
}

func runLoadTestWithDefaultExecTimeout(t *testing.T, requestCount int, concurrency int, milliseconds int) {
runLoadTestWith(t, requestCount, 0, concurrency, milliseconds)
}

func runLoadTestWith(t *testing.T, requestCount int, concurrency int, milliseconds int) {
func runLoadTestWith(t *testing.T, requestCount int, stresstestTimeout int, concurrency int, milliseconds int) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
Expand All @@ -430,6 +441,7 @@ func runLoadTestWith(t *testing.T, requestCount int, concurrency int, millisecon
MaxRequestCount: requestCount,
ConcurrencyCount: concurrency,
ReportingFrequency: reportingFrequency,
StresstestTimeout: stresstestTimeout,
}
settings.RequestParameters.URL = urlStr
sender := &TestResultSender{}
Expand All @@ -447,7 +459,7 @@ func runLoadTestWith(t *testing.T, requestCount int, concurrency int, millisecon
if results.Finished != true {
t.Error("the lambda should have finished it's results")
}
if results.TotalReqs != requestCount {
if results.TotalReqs != requestCount && requestCount > 0 {
t.Errorf("the lambda generated results for %d request, expected %d", results.TotalReqs, requestCount)
}
}
Expand Down
20 changes: 11 additions & 9 deletions queue/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,28 @@ type AggData struct {
Region string `json:"region"`
FatalError string `json:"fatal-error"`
Finished bool `json:"finished"`
FinishedLambdas int `json:"finished-lambdas"`
}

// RegionsAggData type
type RegionsAggData struct {
Regions map[string]AggData
TotalExpectedRequests uint
lambdasByRegion int
}

func (d *RegionsAggData) allRequestsReceived() bool {
var requests uint
var finishedCount int
var finishedRegions int

for _, region := range d.Regions {
requests += uint(region.TotalReqs)
if region.Finished {
finishedCount += 1
if region.FinishedLambdas == d.lambdasByRegion {
finishedRegions += 1
}
}

return requests == d.TotalExpectedRequests || finishedCount == len(d.Regions)
return d.TotalExpectedRequests > 0 && requests == d.TotalExpectedRequests || finishedRegions == len(d.Regions)

This comment has been minimized.

Copy link
@cwaltken-edrans

cwaltken-edrans Apr 25, 2017

Author Collaborator

@zeph: this fixes #113.

}

func addResult(data *AggData, result *AggData, isFinalSum bool) {
Expand Down Expand Up @@ -77,7 +79,7 @@ func addResult(data *AggData, result *AggData, isFinalSum bool) {
data.Fastest = result.Fastest
}
if result.Finished {
data.Finished = true
data.FinishedLambdas += 1
}
}

Expand All @@ -92,15 +94,15 @@ func SumRegionResults(regionData *RegionsAggData) *AggData {
}

// Aggregate listens for results and sends totals, closing the channel when done
func Aggregate(awsConfig *aws.Config, queueURL string, totalExpectedRequests uint) chan RegionsAggData {
func Aggregate(awsConfig *aws.Config, queueURL string, totalExpectedRequests uint, lambdasByRegion int) chan RegionsAggData {
results := make(chan RegionsAggData)
go aggregate(results, awsConfig, queueURL, totalExpectedRequests)
go aggregate(results, awsConfig, queueURL, totalExpectedRequests, lambdasByRegion)
return results
}

func aggregate(results chan RegionsAggData, awsConfig *aws.Config, queueURL string, totalExpectedRequests uint) {
func aggregate(results chan RegionsAggData, awsConfig *aws.Config, queueURL string, totalExpectedRequests uint, lambdasByRegion int) {
defer close(results)
data := RegionsAggData{make(map[string]AggData), totalExpectedRequests}
data := RegionsAggData{make(map[string]AggData), totalExpectedRequests, lambdasByRegion}

adaptor := NewSQSAdaptor(awsConfig, queueURL)
timeoutStart := time.Now()
Expand Down

0 comments on commit 024801e

Please sign in to comment.