From 114da4fb0fbd2acea33a6f38d7f42eaa60618289 Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Fri, 4 Mar 2022 16:40:35 -0500 Subject: [PATCH 01/15] context creation --- sdk/internal/perf/implementation.go | 12 ++++++++++-- sdk/internal/perf/testdata/perf/sleep_perf.go | 9 +++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/sdk/internal/perf/implementation.go b/sdk/internal/perf/implementation.go index 076241b69107..5886a2b4cb0a 100644 --- a/sdk/internal/perf/implementation.go +++ b/sdk/internal/perf/implementation.go @@ -69,8 +69,12 @@ func runTest(p PerfTest, index int, c chan runResult, ID string) { warmUpLastPrint := 1.0 warmUpPerSecondCount := make([]int, 0) warmupCount := 0 + + ctx, cancel := context.WithTimeout(context.Background(), time.Second * time.Duration(warmUpDuration)) + defer cancel() + for time.Since(warmUpStart).Seconds() < float64(warmUpDuration) { - err := p.Run(context.Background()) + err := p.Run(ctx) if err != nil { c <- runResult{err: err} } @@ -109,8 +113,12 @@ func runTest(p PerfTest, index int, c chan runResult, ID string) { totalCount := 0 lastPrint := 1.0 perSecondCount := make([]int, 0) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second * time.Duration(duration)) + defer cancel() + for time.Since(timeStart).Seconds() < float64(duration) { - err := p.Run(context.Background()) + err := p.Run(ctx) if err != nil { c <- runResult{err: err} } diff --git a/sdk/internal/perf/testdata/perf/sleep_perf.go b/sdk/internal/perf/testdata/perf/sleep_perf.go index 271794ae5030..ebdce174d36b 100644 --- a/sdk/internal/perf/testdata/perf/sleep_perf.go +++ b/sdk/internal/perf/testdata/perf/sleep_perf.go @@ -58,8 +58,13 @@ func (g *globalSleepPerfTest) NewPerfTest(ctx context.Context, options *perf.Per } func (s *sleepPerfTest) Run(ctx context.Context) error { - time.Sleep(s.sleepInterval) - s.sleepInterval = time.Duration(float64(s.sleepInterval.Nanoseconds())*sleepTestOpts.iterationGrowthFactor) * time.Nanosecond + select { + case <-ctx.Done(): + return nil + default: + time.Sleep(s.sleepInterval) + s.sleepInterval = time.Duration(float64(s.sleepInterval.Nanoseconds())*sleepTestOpts.iterationGrowthFactor) * time.Nanosecond + } return nil } From e40120c8cc2f4bca020a3a5bf2fdb7cc6f4c5292 Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Mon, 14 Mar 2022 14:52:55 -0400 Subject: [PATCH 02/15] using a test runner --- sdk/internal/perf/atomic_float.go | 22 ++ sdk/internal/perf/implementation.go | 5 +- sdk/internal/perf/perf.go | 33 ++- sdk/internal/perf/perf_runner.go | 358 ++++++++++++++++++++++++++++ sdk/internal/perf/status.go | 3 +- 5 files changed, 416 insertions(+), 5 deletions(-) create mode 100644 sdk/internal/perf/atomic_float.go create mode 100644 sdk/internal/perf/perf_runner.go diff --git a/sdk/internal/perf/atomic_float.go b/sdk/internal/perf/atomic_float.go new file mode 100644 index 000000000000..a488965d8436 --- /dev/null +++ b/sdk/internal/perf/atomic_float.go @@ -0,0 +1,22 @@ +package perf + +import ( + "sync" +) + +type atomicFloat64 struct { + f float64 + mu sync.RWMutex +} + +func (a *atomicFloat64) GetFloat() float64 { + a.mu.RLock() + defer a.mu.RUnlock() + return a.f +} + +func (a *atomicFloat64) SetFloat(f float64) { + a.mu.Lock() + defer a.mu.Unlock() + a.f = f +} diff --git a/sdk/internal/perf/implementation.go b/sdk/internal/perf/implementation.go index c0d614c51cb0..8b8853397e43 100644 --- a/sdk/internal/perf/implementation.go +++ b/sdk/internal/perf/implementation.go @@ -248,14 +248,13 @@ func runPerfTest(name string, p NewPerfTest) error { go runTest(perfTest, idx, messages, IDs[idx]) } - warmUpStatus := newStatusRunner(time.Now(), warmUpDuration) - durationStatus := newStatusRunner(time.Now().Add(time.Duration(warmUpDuration)*time.Second), duration) + warmUpStatus := newStatusRunner(time.Now(), warmUpDuration, true) + durationStatus := newStatusRunner(time.Now().Add(time.Duration(warmUpDuration)*time.Second), duration, false) // Read incoming messages and handle status updates if warmUpDuration > 0 { wg.Add(1) go func() { - warmUpStatus.isWarmup = true warmUpStatus.printUpdates() warmUpStatus.printFinalUpdate() wg.Done() diff --git a/sdk/internal/perf/perf.go b/sdk/internal/perf/perf.go index c366faca5a67..6834651f7c98 100644 --- a/sdk/internal/perf/perf.go +++ b/sdk/internal/perf/perf.go @@ -10,6 +10,8 @@ import ( "net/http" "os" "runtime" + "sync/atomic" + "time" ) func init() { @@ -64,6 +66,27 @@ type PerfTestOptions struct { // parallelIndex is the index of the goroutine parallelIndex int + + // number of warmup operations completed + warmupCount int64 + warmupStart time.Time + warmupElapsed atomicFloat64 + + // number of operations runCount + runCount int64 + runStart time.Time + runElapsed atomicFloat64 + + finished bool +} + +// increment does an atomic increment of the warmup or non-warmup performance test +func (p *PerfTestOptions) incrememt(warmup bool) { + if warmup { + atomic.AddInt64(&p.warmupCount, 1) + } else { + atomic.AddInt64(&p.runCount, 1) + } } // NewPerfTest returns an instance of PerfTest and embeds the given `options` in the struct @@ -119,8 +142,16 @@ func Run(tests map[string]PerfMethods) { fmt.Printf("\tRunning %s\n", testNameToRun) - err := runPerfTest(testNameToRun, perfTestToRun.New) + runner := newPerfRunner(perfTestToRun, testNameToRun) + err := runner.Run() if err != nil { panic(err) } + + if false { + err := runPerfTest(testNameToRun, perfTestToRun.New) + if err != nil { + panic(err) + } + } } diff --git a/sdk/internal/perf/perf_runner.go b/sdk/internal/perf/perf_runner.go new file mode 100644 index 000000000000..2c853bd22cb1 --- /dev/null +++ b/sdk/internal/perf/perf_runner.go @@ -0,0 +1,358 @@ +package perf + +import ( + "context" + "fmt" + "log" + "os" + "sync/atomic" + "text/tabwriter" + "time" +) + +type perfRunner struct { + // ticker is the runner for giving updates every second + ticker *time.Ticker + done chan bool + + // name of the performance test + name string + + // the perf test, options, and transports being tested/used + perfToRun PerfMethods + allOptions []*PerfTestOptions + proxyTransports map[string]*RecordingHTTPClient + + // All created tests + tests []PerfTest + + globalInstance GlobalPerfTest + + // this is the previous prints total + warmupOperationStatusTracker int64 + operationStatusTracker int64 + + // writer + w *tabwriter.Writer + + // tracker for whether the warmup has finished + warmupFinished int32 + warmupPrinted bool +} + +func newPerfRunner(p PerfMethods, name string) *perfRunner { + return &perfRunner{ + ticker: time.NewTicker(time.Second), + done: make(chan bool), + name: name, + perfToRun: p, + operationStatusTracker: -1, + warmupOperationStatusTracker: -1, + w: tabwriter.NewWriter(os.Stdout, 16, 8, 1, ' ', tabwriter.AlignRight), + warmupFinished: 0, + warmupPrinted: false, + } +} + +func (r *perfRunner) Run() error { + // Poller for printing + go func() { + for { + select { + case <-r.done: + return + case <-r.ticker.C: + err := r.printStatus() + if err != nil { + panic(err) + } + } + } + }() + + err := r.globalSetup() + if err != nil { + return err + } + + err = r.createPerfTests() + if err != nil { + return err + } + r.done <- true + + r.printFinalUpdate(false) + err = r.cleanup() + if err != nil { + panic(err) + } + err = r.globalInstance.GlobalCleanup(context.Background()) + if err != nil { + panic(err) + } + return nil +} + +// global setup by instantiating a single global instance +func (r *perfRunner) globalSetup() error { + globalInst, err := r.perfToRun.New(context.TODO(), PerfTestOptions{Name: r.name}) + if err != nil { + return err + } + r.globalInstance = globalInst + return nil +} + +func (r *perfRunner) createPerfTests() error { + var IDs []string + proxyURLS := parseProxyURLS() + + for idx := 0; idx < parallelInstances; idx++ { + options := &PerfTestOptions{} + ID := fmt.Sprintf("%s-%d", r.name, idx) + IDs = append(IDs, ID) + + if testProxyURLs != "" { + proxyURL := proxyURLS[idx%len(proxyURLS)] + transporter := NewProxyTransport(&TransportOptions{ + TestName: ID, + proxyURL: proxyURL, + }) + options.Transporter = transporter + r.proxyTransports[ID] = transporter + } else { + options.Transporter = defaultHTTPClient + } + options.parallelIndex = idx + + perfTest, err := r.globalInstance.NewPerfTest(context.TODO(), options) + if err != nil { + return err + } + r.tests = append(r.tests, perfTest) + r.allOptions = append(r.allOptions, options) + } + + for idx, test := range r.tests { + wg.Add(1) + go r.runTest(test, idx, IDs[idx]) + } + + wg.Wait() + + return nil +} + +func (r *perfRunner) cleanup() error { + for _, t := range r.tests { + err := t.Cleanup(context.Background()) + if err != nil { + return err + } + } + return nil +} + +func (n *perfRunner) printStatus() error { + if !n.warmupPrinted { + finishedWarmup := n.printWarmupStatus() + if !finishedWarmup { + return nil + } + } + + if n.operationStatusTracker == -1 { + n.printFinalUpdate(true) + n.operationStatusTracker = 0 + fmt.Fprintln(n.w, "Current\tTotal\tAverage\t") + } + totalOperations := n.totalOperations(false) + + _, err := fmt.Fprintf( + n.w, + "%s\t%s\t%s\t\n", + messagePrinter.Sprintf("%d", totalOperations-n.operationStatusTracker), + messagePrinter.Sprintf("%d", totalOperations), + messagePrinter.Sprintf("%.2f", n.opsPerSecond(false)), + ) + if err != nil { + return err + } + n.operationStatusTracker = totalOperations + return n.w.Flush() +} + +// return true if all warmup information has been printed +func (n *perfRunner) printWarmupStatus() bool { + if n.warmupOperationStatusTracker == -1 { + n.warmupOperationStatusTracker = 0 + fmt.Println("===== WARMUP =====") + fmt.Fprintln(n.w, "Current\tTotal\tAverage\t") + } + totalOperations := n.totalOperations(true) + + if n.warmupOperationStatusTracker == totalOperations { + return true + } + + _, err := fmt.Fprintf( + n.w, + "%s\t%s\t%s\t\n", + messagePrinter.Sprintf("%d", totalOperations-n.warmupOperationStatusTracker), + messagePrinter.Sprintf("%d", totalOperations), + messagePrinter.Sprintf("%.2f", n.opsPerSecond(true)), + ) + if err != nil { + panic(err) + } + n.warmupOperationStatusTracker = totalOperations + err = n.w.Flush() + if err != nil { + panic(err) + } + return false +} + +func (r *perfRunner) totalOperations(warmup bool) int64 { + var ret int64 + + for _, opt := range r.allOptions { + if warmup { + ret += atomic.LoadInt64(&opt.warmupCount) + } else { + ret += atomic.LoadInt64(&opt.runCount) + } + } + + return ret +} + +func (r *perfRunner) opsPerSecond(warmup bool) float64 { + var ret float64 + for _, opt := range r.allOptions { + if warmup { + ret += float64(atomic.LoadInt64(&opt.warmupCount)) / opt.warmupElapsed.GetFloat() + } else { + ret += float64(atomic.LoadInt64(&opt.runCount)) / opt.runElapsed.GetFloat() + } + } + return ret +} + +func (r *perfRunner) printFinalUpdate(warmup bool) error { + totalOperations := r.totalOperations(warmup) + opsPerSecond := r.opsPerSecond(warmup) + if opsPerSecond == 0.0 { + return fmt.Errorf("completed without generating operation statistics") + } + + secondsPerOp := 1.0 / opsPerSecond + weightedAvg := float64(totalOperations) / opsPerSecond + + fmt.Println("\n=== Results ===") + fmt.Printf( + "Completed %s operations in a weighted-average of %ss (%s ops/s, %s s/op)\n", + messagePrinter.Sprintf("%d", totalOperations), + messagePrinter.Sprintf("%.2f", weightedAvg), + messagePrinter.Sprintf("%.3f", opsPerSecond), + messagePrinter.Sprintf("%.3f", secondsPerOp), + ) + return nil +} + +// runTest takes care of the semantics of running a single iteration. It returns the number of times the test ran as an int, the exact number +// of seconds the test ran as a float64, and any errors. +func (r *perfRunner) runTest(p PerfTest, index int, ID string) { + defer wg.Done() + if debug { + log.Printf("number of proxies %d", len(r.proxyTransports)) + } + + opts := r.allOptions[index] + + // If we are using the test proxy need to set up the in-memory recording. + if testProxyURLs != "" { + // First request goes through in Live mode + r.proxyTransports[ID].SetMode("live") + err := p.Run(context.Background()) + if err != nil { + panic(err) + } + + // 2nd request goes through in Record mode + r.proxyTransports[ID].SetMode("record") + err = r.proxyTransports[ID].start() + if err != nil { + panic(err) + } + err = p.Run(context.Background()) + if err != nil { + panic(err) + } + err = r.proxyTransports[ID].stop() + if err != nil { + panic(err) + } + + // All ensuing requests go through in Playback mode + r.proxyTransports[ID].SetMode("playback") + err = r.proxyTransports[ID].start() + if err != nil { + panic(err) + } + } + + if warmUpDuration > 0 { + opts.warmupStart = time.Now() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(warmUpDuration)) + defer cancel() + + lastSavedTime := time.Now() + for time.Since(opts.warmupStart).Seconds() < float64(warmUpDuration) { + err := p.Run(ctx) + if err != nil { + panic(err) + } + opts.incrememt(true) + + if time.Since(lastSavedTime).Seconds() > 0.3 { + opts.warmupElapsed.SetFloat(time.Since(opts.warmupStart).Seconds()) + lastSavedTime = time.Now() + } + } + + opts.warmupElapsed.SetFloat(time.Since(opts.warmupStart).Seconds()) + } + _ = atomic.AddInt32(&r.warmupFinished, 1) + + opts.runStart = time.Now() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(duration)) + defer cancel() + + lastSavedTime := time.Now() + for time.Since(opts.runStart).Seconds() < float64(duration) { + err := p.Run(ctx) + if err != nil { + panic(err) + } + opts.incrememt(false) + + if time.Since(lastSavedTime).Seconds() > 0.1 { + opts.runElapsed.SetFloat(time.Since(opts.runStart).Seconds()) + lastSavedTime = time.Now() + } + } + + opts.runElapsed.SetFloat(time.Since(opts.runStart).Seconds()) + + if testProxyURLs != "" { + // Stop the proxy now + err := proxyTransportsSuite[ID].stop() + if err != nil { + panic(err) + } + proxyTransportsSuite[ID].SetMode("live") + } + opts.finished = true +} diff --git a/sdk/internal/perf/status.go b/sdk/internal/perf/status.go index a160b0a66574..9260230ca321 100644 --- a/sdk/internal/perf/status.go +++ b/sdk/internal/perf/status.go @@ -47,13 +47,14 @@ type statusRunner struct { isWarmup bool } -func newStatusRunner(t time.Time, runTime int) *statusRunner { +func newStatusRunner(t time.Time, runTime int, warmup bool) *statusRunner { return &statusRunner{ results: make([]runResult, 0), start: t, perRoutineResults: map[int][]runResult{}, lastPrint: t, totalRunTime: runTime, + isWarmup: warmup, } } From 0d8cf8c9e509bc40b580f4b489bd12d9695ec4db Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Mon, 14 Mar 2022 14:56:07 -0400 Subject: [PATCH 03/15] removing old code --- sdk/internal/perf/implementation.go | 272 ---------------------------- sdk/internal/perf/perf.go | 7 - sdk/internal/perf/perf_runner.go | 66 +++---- sdk/internal/perf/status.go | 133 -------------- 4 files changed, 35 insertions(+), 443 deletions(-) diff --git a/sdk/internal/perf/implementation.go b/sdk/internal/perf/implementation.go index 8b8853397e43..ee5e32c5040f 100644 --- a/sdk/internal/perf/implementation.go +++ b/sdk/internal/perf/implementation.go @@ -4,14 +4,8 @@ package perf import ( - "context" - "fmt" - "log" - "os" "strings" "sync" - "text/tabwriter" - "time" ) var ( @@ -24,171 +18,6 @@ var ( numProcesses int ) -// runTest takes care of the semantics of running a single iteration. It returns the number of times the test ran as an int, the exact number -// of seconds the test ran as a float64, and any errors. -func runTest(p PerfTest, index int, c chan runResult, ID string) { - defer wg.Done() - if debug { - log.Printf("number of proxies %d", len(proxyTransportsSuite)) - } - - // If we are using the test proxy need to set up the in-memory recording. - if testProxyURLs != "" { - // First request goes through in Live mode - proxyTransportsSuite[ID].SetMode("live") - err := p.Run(context.Background()) - if err != nil { - c <- runResult{err: err} - } - - // 2nd request goes through in Record mode - proxyTransportsSuite[ID].SetMode("record") - err = proxyTransportsSuite[ID].start() - if err != nil { - panic(err) - } - err = p.Run(context.Background()) - if err != nil { - c <- runResult{err: err} - } - err = proxyTransportsSuite[ID].stop() - if err != nil { - panic(err) - } - - // All ensuing requests go through in Playback mode - proxyTransportsSuite[ID].SetMode("playback") - err = proxyTransportsSuite[ID].start() - if err != nil { - panic(err) - } - } - - if warmUpDuration > 0 { - warmUpStart := time.Now() - warmUpLastPrint := 1.0 - warmUpPerSecondCount := make([]int, 0) - warmupCount := 0 - - ctx, cancel := context.WithTimeout(context.Background(), time.Second * time.Duration(warmUpDuration)) - defer cancel() - - for time.Since(warmUpStart).Seconds() < float64(warmUpDuration) { - err := p.Run(ctx) - if err != nil { - c <- runResult{err: err} - } - warmupCount += 1 - - if time.Since(warmUpStart).Seconds() > warmUpLastPrint { - thisSecondCount := warmupCount - sumInts(warmUpPerSecondCount) - c <- runResult{ - warmup: true, - count: thisSecondCount, - parallelIndex: index, - timeInSeconds: time.Since(warmUpStart).Seconds(), - } - - warmUpPerSecondCount = append(warmUpPerSecondCount, thisSecondCount) - warmUpLastPrint += 1.0 - if int(warmUpLastPrint) == warmUpDuration { - // We can have odd scenarios where we send this - // message, and the final message below right after - warmUpLastPrint += 1.0 - } - } - } - - thisSecondCount := warmupCount - sumInts(warmUpPerSecondCount) - c <- runResult{ - warmup: true, - completed: true, - count: thisSecondCount, - parallelIndex: index, - timeInSeconds: time.Since(warmUpStart).Seconds(), - } - } - - timeStart := time.Now() - totalCount := 0 - lastPrint := 1.0 - perSecondCount := make([]int, 0) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second * time.Duration(duration)) - defer cancel() - - for time.Since(timeStart).Seconds() < float64(duration) { - err := p.Run(ctx) - if err != nil { - c <- runResult{err: err} - } - totalCount += 1 - - // Every second (roughly) we send an update through the channel - if time.Since(timeStart).Seconds() > lastPrint { - thisCount := totalCount - sumInts(perSecondCount) - c <- runResult{ - count: thisCount, - parallelIndex: index, - completed: false, - timeInSeconds: time.Since(timeStart).Seconds(), - } - lastPrint += 1.0 - perSecondCount = append(perSecondCount, thisCount) - - if int(lastPrint) == duration { - // if we are w/in one second of the end time, we do not - // want to send any more results, we'll just send a final result - lastPrint += 10.0 - } - } - } - - elapsed := time.Since(timeStart).Seconds() - lastSecondCount := totalCount - sumInts(perSecondCount) - c <- runResult{ - completed: true, - count: lastSecondCount, - parallelIndex: index, - timeInSeconds: elapsed, - } - - if testProxyURLs != "" { - // Stop the proxy now - err := proxyTransportsSuite[ID].stop() - if err != nil { - c <- runResult{err: err} - } - proxyTransportsSuite[ID].SetMode("live") - } -} - -// runCleanup takes care of the semantics for tearing down a single iteration of a performance test. -func runCleanup(p PerfTest) error { - return p.Cleanup(context.Background()) -} - -// runResult is the result sent back through the channel for updates and final results -type runResult struct { - // number of iterations completed since the previous message - count int - - // The time the update comes from - timeInSeconds float64 - - // if there is an error it will be here - err error - - // true when this is the last result from a go routine - completed bool - - // Index of the goroutine - parallelIndex int - - // indicates if result is from a warmup start - warmup bool -} - // parse the TestProxy input into a slice of strings func parseProxyURLS() []string { var ret []string @@ -202,104 +31,3 @@ func parseProxyURLS() []string { return ret } - -// Spins off each Parallel instance as a separate goroutine, reads messages and runs cleanup/setup methods. -func runPerfTest(name string, p NewPerfTest) error { - globalInstance, err := p(context.TODO(), PerfTestOptions{Name: name}) - if err != nil { - return err - } - - var perfTests []PerfTest - var IDs []string - proxyURLS := parseProxyURLS() - - w := tabwriter.NewWriter(os.Stdout, 16, 8, 1, ' ', tabwriter.AlignRight) - - messages := make(chan runResult) - for idx := 0; idx < parallelInstances; idx++ { - options := &PerfTestOptions{} - - ID := fmt.Sprintf("%s-%d", name, idx) - IDs = append(IDs, ID) - - if testProxyURLs != "" { - proxyURL := proxyURLS[idx%len(proxyURLS)] - transporter := NewProxyTransport(&TransportOptions{ - TestName: ID, - proxyURL: proxyURL, - }) - options.Transporter = transporter - } else { - options.Transporter = defaultHTTPClient - } - options.parallelIndex = idx - - perfTest, err := globalInstance.NewPerfTest(context.TODO(), options) - if err != nil { - panic(err) - } - perfTests = append(perfTests, perfTest) - } - - for idx, perfTest := range perfTests { - // Create a thread for running a single test - wg.Add(1) - go runTest(perfTest, idx, messages, IDs[idx]) - } - - warmUpStatus := newStatusRunner(time.Now(), warmUpDuration, true) - durationStatus := newStatusRunner(time.Now().Add(time.Duration(warmUpDuration)*time.Second), duration, false) - // Read incoming messages and handle status updates - - if warmUpDuration > 0 { - wg.Add(1) - go func() { - warmUpStatus.printUpdates() - warmUpStatus.printFinalUpdate() - wg.Done() - }() - } - - wg.Add(1) - go func() { - durationStatus.printUpdates() - durationStatus.printFinalUpdate() - wg.Done() - }() - - // Add another goroutine to close the channel after completion - go func() { - wg.Wait() - close(messages) - }() - - for msg := range messages { - if debug { - log.Println("Handling message: ", msg) - } - if msg.err != nil { - panic(msg.err) - } - if msg.warmup { - warmUpStatus.handleMessage(msg, w) - } else { - durationStatus.handleMessage(msg, w) - } - } - - // Run Cleanup on each parallel instance - for _, pTest := range perfTests { - err := runCleanup(pTest) - if err != nil { - panic(err) - } - } - - err = globalInstance.GlobalCleanup(context.TODO()) - if err != nil { - return fmt.Errorf("there was an error with the GlobalCleanup method: %v", err.Error()) - } - - return nil -} diff --git a/sdk/internal/perf/perf.go b/sdk/internal/perf/perf.go index 6834651f7c98..38960baa8eff 100644 --- a/sdk/internal/perf/perf.go +++ b/sdk/internal/perf/perf.go @@ -147,11 +147,4 @@ func Run(tests map[string]PerfMethods) { if err != nil { panic(err) } - - if false { - err := runPerfTest(testNameToRun, perfTestToRun.New) - if err != nil { - panic(err) - } - } } diff --git a/sdk/internal/perf/perf_runner.go b/sdk/internal/perf/perf_runner.go index 2c853bd22cb1..aaa2e1a42d42 100644 --- a/sdk/internal/perf/perf_runner.go +++ b/sdk/internal/perf/perf_runner.go @@ -8,6 +8,8 @@ import ( "sync/atomic" "text/tabwriter" "time" + + "golang.org/x/text/message" ) type perfRunner struct { @@ -33,7 +35,8 @@ type perfRunner struct { operationStatusTracker int64 // writer - w *tabwriter.Writer + w *tabwriter.Writer + messagePrinter *message.Printer // tracker for whether the warmup has finished warmupFinished int32 @@ -49,6 +52,7 @@ func newPerfRunner(p PerfMethods, name string) *perfRunner { operationStatusTracker: -1, warmupOperationStatusTracker: -1, w: tabwriter.NewWriter(os.Stdout, 16, 8, 1, ' ', tabwriter.AlignRight), + messagePrinter: message.NewPrinter(message.MatchLanguage("en")), warmupFinished: 0, warmupPrinted: false, } @@ -153,60 +157,60 @@ func (r *perfRunner) cleanup() error { return nil } -func (n *perfRunner) printStatus() error { - if !n.warmupPrinted { - finishedWarmup := n.printWarmupStatus() +func (r *perfRunner) printStatus() error { + if !r.warmupPrinted { + finishedWarmup := r.printWarmupStatus() if !finishedWarmup { return nil } } - if n.operationStatusTracker == -1 { - n.printFinalUpdate(true) - n.operationStatusTracker = 0 - fmt.Fprintln(n.w, "Current\tTotal\tAverage\t") + if r.operationStatusTracker == -1 { + r.printFinalUpdate(true) + r.operationStatusTracker = 0 + fmt.Fprintln(r.w, "Current\tTotal\tAverage\t") } - totalOperations := n.totalOperations(false) + totalOperations := r.totalOperations(false) _, err := fmt.Fprintf( - n.w, + r.w, "%s\t%s\t%s\t\n", - messagePrinter.Sprintf("%d", totalOperations-n.operationStatusTracker), - messagePrinter.Sprintf("%d", totalOperations), - messagePrinter.Sprintf("%.2f", n.opsPerSecond(false)), + r.messagePrinter.Sprintf("%d", totalOperations-r.operationStatusTracker), + r.messagePrinter.Sprintf("%d", totalOperations), + r.messagePrinter.Sprintf("%.2f", r.opsPerSecond(false)), ) if err != nil { return err } - n.operationStatusTracker = totalOperations - return n.w.Flush() + r.operationStatusTracker = totalOperations + return r.w.Flush() } // return true if all warmup information has been printed -func (n *perfRunner) printWarmupStatus() bool { - if n.warmupOperationStatusTracker == -1 { - n.warmupOperationStatusTracker = 0 +func (r *perfRunner) printWarmupStatus() bool { + if r.warmupOperationStatusTracker == -1 { + r.warmupOperationStatusTracker = 0 fmt.Println("===== WARMUP =====") - fmt.Fprintln(n.w, "Current\tTotal\tAverage\t") + fmt.Fprintln(r.w, "Current\tTotal\tAverage\t") } - totalOperations := n.totalOperations(true) + totalOperations := r.totalOperations(true) - if n.warmupOperationStatusTracker == totalOperations { + if r.warmupOperationStatusTracker == totalOperations { return true } _, err := fmt.Fprintf( - n.w, + r.w, "%s\t%s\t%s\t\n", - messagePrinter.Sprintf("%d", totalOperations-n.warmupOperationStatusTracker), - messagePrinter.Sprintf("%d", totalOperations), - messagePrinter.Sprintf("%.2f", n.opsPerSecond(true)), + r.messagePrinter.Sprintf("%d", totalOperations-r.warmupOperationStatusTracker), + r.messagePrinter.Sprintf("%d", totalOperations), + r.messagePrinter.Sprintf("%.2f", r.opsPerSecond(true)), ) if err != nil { panic(err) } - n.warmupOperationStatusTracker = totalOperations - err = n.w.Flush() + r.warmupOperationStatusTracker = totalOperations + err = r.w.Flush() if err != nil { panic(err) } @@ -252,10 +256,10 @@ func (r *perfRunner) printFinalUpdate(warmup bool) error { fmt.Println("\n=== Results ===") fmt.Printf( "Completed %s operations in a weighted-average of %ss (%s ops/s, %s s/op)\n", - messagePrinter.Sprintf("%d", totalOperations), - messagePrinter.Sprintf("%.2f", weightedAvg), - messagePrinter.Sprintf("%.3f", opsPerSecond), - messagePrinter.Sprintf("%.3f", secondsPerOp), + r.messagePrinter.Sprintf("%d", totalOperations), + r.messagePrinter.Sprintf("%.2f", weightedAvg), + r.messagePrinter.Sprintf("%.3f", opsPerSecond), + r.messagePrinter.Sprintf("%.3f", secondsPerOp), ) return nil } diff --git a/sdk/internal/perf/status.go b/sdk/internal/perf/status.go index 9260230ca321..449b1bca9008 100644 --- a/sdk/internal/perf/status.go +++ b/sdk/internal/perf/status.go @@ -2,136 +2,3 @@ // Licensed under the MIT License. See License.txt in the project root for license information. package perf - -import ( - "fmt" - "os" - "text/tabwriter" - "time" - - "golang.org/x/text/message" -) - -var messagePrinter *message.Printer = message.NewPrinter(message.MatchLanguage("en")) - -// statusRunner is the struct responsible for handling messages -type statusRunner struct { - // results is a slice of all results from the goroutines - results []runResult - - // start is the time the statusRunner was started - start time.Time - - // perRoutineResults map the parallel index to a slice of runResults - perRoutineResults map[int][]runResult - - // lastPrint holds when the last information was printed to stdout - // the initial value is the same as start. When this value exceeds - // is more than 1 second after time.Now(), a new update is printed. - lastPrint time.Time - - // total is a running count of the count of performance tests run - total int - - // prevTotal is the total of the last output - prevTotal int - - // hasFinished indicates if the final results have been printed out - totalRunTime int - - // routinesFinished indicates how many routines have sent a message - // indicating they have completed execution - routinesFinished int - - // isWarmup indicates whether the messages are from warmup - isWarmup bool -} - -func newStatusRunner(t time.Time, runTime int, warmup bool) *statusRunner { - return &statusRunner{ - results: make([]runResult, 0), - start: t, - perRoutineResults: map[int][]runResult{}, - lastPrint: t, - totalRunTime: runTime, - isWarmup: warmup, - } -} - -func (s *statusRunner) handleMessage(msg runResult, w *tabwriter.Writer) { - s.results = append(s.results, msg) - - if msg.completed { - s.routinesFinished += 1 - } - - s.total += msg.count - - s.perRoutineResults[msg.parallelIndex] = append(s.perRoutineResults[msg.parallelIndex], msg) -} - -func (s *statusRunner) printUpdates() { - w := tabwriter.NewWriter(os.Stdout, 16, 8, 1, ' ', tabwriter.AlignRight) - firstPrint := false - for s.routinesFinished != parallelInstances { - // Poll and print - if time.Since(s.lastPrint).Seconds() > 1.0 { - - if !firstPrint { - if s.isWarmup { - fmt.Println("\n=== Warm Up ===") - } else { - fmt.Println("\n=== Test ===") - } - fmt.Fprintln(w, "Current\tTotal\tAverage\t") - w.Flush() - firstPrint = true - } - - avg := float64(s.total) / time.Since(s.start).Seconds() - _, err := fmt.Fprintf( - w, - "%s\t%s\t%s\t\n", - messagePrinter.Sprintf("%d", s.total-s.prevTotal), - messagePrinter.Sprintf("%d", s.total), - messagePrinter.Sprintf("%.2f", avg), - ) - if err != nil { - panic(err) - } - - w.Flush() - - s.lastPrint = time.Now() - s.prevTotal = s.total - } - } -} - -func (s *statusRunner) printFinalUpdate() { - opsPerRoutine := make([]int, parallelInstances) - secondsPerRoutine := make([]float64, parallelInstances) - - for pIdx, msgs := range s.perRoutineResults { - secondsPerRoutine[pIdx] = msgs[len(msgs)-1].timeInSeconds - for _, msg := range msgs { - opsPerRoutine[pIdx] += msg.count - } - } - - opsPerSecond := 0.0 - for i := 0; i < parallelInstances; i++ { - opsPerSecond += float64(opsPerRoutine[i]) / secondsPerRoutine[i] - } - - fmt.Println("\n=== Results ===") - secondsPerOp := 1.0 / opsPerSecond - weightedAvgSec := float64(s.total) / opsPerSecond - fmt.Printf( - "Completed %s operations in a weighted-average of %ss (%s ops/s, %s s/op)\n", - messagePrinter.Sprintf("%d", s.total), - messagePrinter.Sprintf("%.2f", weightedAvgSec), - messagePrinter.Sprintf("%.2f", opsPerSecond), - messagePrinter.Sprintf("%.3f", secondsPerOp), - ) -} From 147011af8ab89787149d8daab07f59d9be8f8fa1 Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Mon, 14 Mar 2022 15:29:41 -0400 Subject: [PATCH 04/15] eliminated last data race --- sdk/internal/perf/perf_runner.go | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/sdk/internal/perf/perf_runner.go b/sdk/internal/perf/perf_runner.go index aaa2e1a42d42..3869354e71b2 100644 --- a/sdk/internal/perf/perf_runner.go +++ b/sdk/internal/perf/perf_runner.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "os" + "sync" "sync/atomic" "text/tabwriter" "time" @@ -12,6 +13,12 @@ import ( "golang.org/x/text/message" ) +// optionsSlice +type optionsSlice struct { + opts []*PerfTestOptions + mu sync.Mutex +} + type perfRunner struct { // ticker is the runner for giving updates every second ticker *time.Ticker @@ -22,7 +29,7 @@ type perfRunner struct { // the perf test, options, and transports being tested/used perfToRun PerfMethods - allOptions []*PerfTestOptions + allOptions optionsSlice proxyTransports map[string]*RecordingHTTPClient // All created tests @@ -134,7 +141,9 @@ func (r *perfRunner) createPerfTests() error { return err } r.tests = append(r.tests, perfTest) - r.allOptions = append(r.allOptions, options) + r.allOptions.mu.Lock() + r.allOptions.opts = append(r.allOptions.opts, options) + r.allOptions.mu.Unlock() } for idx, test := range r.tests { @@ -220,7 +229,9 @@ func (r *perfRunner) printWarmupStatus() bool { func (r *perfRunner) totalOperations(warmup bool) int64 { var ret int64 - for _, opt := range r.allOptions { + r.allOptions.mu.Lock() + defer r.allOptions.mu.Unlock() + for _, opt := range r.allOptions.opts { if warmup { ret += atomic.LoadInt64(&opt.warmupCount) } else { @@ -233,7 +244,10 @@ func (r *perfRunner) totalOperations(warmup bool) int64 { func (r *perfRunner) opsPerSecond(warmup bool) float64 { var ret float64 - for _, opt := range r.allOptions { + + r.allOptions.mu.Lock() + defer r.allOptions.mu.Unlock() + for _, opt := range r.allOptions.opts { if warmup { ret += float64(atomic.LoadInt64(&opt.warmupCount)) / opt.warmupElapsed.GetFloat() } else { @@ -272,7 +286,9 @@ func (r *perfRunner) runTest(p PerfTest, index int, ID string) { log.Printf("number of proxies %d", len(r.proxyTransports)) } - opts := r.allOptions[index] + r.allOptions.mu.Lock() + opts := r.allOptions.opts[index] + r.allOptions.mu.Unlock() // If we are using the test proxy need to set up the in-memory recording. if testProxyURLs != "" { From 2baff99d842393759d07421c537e05d27128562e Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Mon, 14 Mar 2022 16:33:52 -0400 Subject: [PATCH 05/15] fixing math in opsPerSecond, adding docs --- sdk/internal/perf/implementation.go | 20 +++++++----- sdk/internal/perf/perf_runner.go | 49 ++++++++++++++++++++++------- 2 files changed, 50 insertions(+), 19 deletions(-) diff --git a/sdk/internal/perf/implementation.go b/sdk/internal/perf/implementation.go index ee5e32c5040f..01c391ea55b4 100644 --- a/sdk/internal/perf/implementation.go +++ b/sdk/internal/perf/implementation.go @@ -9,25 +9,29 @@ import ( ) var ( + // debug is true if --debug is specified debug bool + // duration is the -d/--duration flag duration int + // testProxyURLs is the -x/--test-proxy flag, a semi-colon separated list testProxyURLs string + // warmUpDuration is the -w/--warmup flag warmUpDuration int + // parallelInstances is the -p/--parallel flag parallelInstances int - wg sync.WaitGroup - numProcesses int + + // wg is used to keep track of the number of goroutines created + wg sync.WaitGroup + numProcesses int ) -// parse the TestProxy input into a slice of strings +// parseProxyURLs splits the --test-proxy input with the delimiter ';' func parseProxyURLS() []string { - var ret []string if testProxyURLs == "" { - return ret + return []string{} } testProxyURLs = strings.TrimSuffix(testProxyURLs, ";") - ret = strings.Split(testProxyURLs, ";") - - return ret + return strings.Split(testProxyURLs, ";") } diff --git a/sdk/internal/perf/perf_runner.go b/sdk/internal/perf/perf_runner.go index 3869354e71b2..eb002dcb9a67 100644 --- a/sdk/internal/perf/perf_runner.go +++ b/sdk/internal/perf/perf_runner.go @@ -13,7 +13,7 @@ import ( "golang.org/x/text/message" ) -// optionsSlice +// optionsSlice is a way to access the options in a thread safe way type optionsSlice struct { opts []*PerfTestOptions mu sync.Mutex @@ -35,13 +35,14 @@ type perfRunner struct { // All created tests tests []PerfTest + // globalInstance is the single globalInstance for GlobalCleanup globalInstance GlobalPerfTest // this is the previous prints total warmupOperationStatusTracker int64 operationStatusTracker int64 - // writer + // writer and messagePrinter w *tabwriter.Writer messagePrinter *message.Printer @@ -51,6 +52,10 @@ type perfRunner struct { } func newPerfRunner(p PerfMethods, name string) *perfRunner { + warmupFinished, warmupPrinted := 0, false + if warmUpDuration == 0 { + warmupFinished, warmupPrinted = parallelInstances, true + } return &perfRunner{ ticker: time.NewTicker(time.Second), done: make(chan bool), @@ -60,8 +65,8 @@ func newPerfRunner(p PerfMethods, name string) *perfRunner { warmupOperationStatusTracker: -1, w: tabwriter.NewWriter(os.Stdout, 16, 8, 1, ' ', tabwriter.AlignRight), messagePrinter: message.NewPrinter(message.MatchLanguage("en")), - warmupFinished: 0, - warmupPrinted: false, + warmupFinished: int32(warmupFinished), + warmupPrinted: warmupPrinted, } } @@ -114,6 +119,7 @@ func (r *perfRunner) globalSetup() error { return nil } +// createPerfTests spins up `parallelInstances` (specified by --parallel flag) goroutines func (r *perfRunner) createPerfTests() error { var IDs []string proxyURLS := parseProxyURLS() @@ -152,10 +158,10 @@ func (r *perfRunner) createPerfTests() error { } wg.Wait() - return nil } +// cleanup runs the Cleanup on each of the r.tests func (r *perfRunner) cleanup() error { for _, t := range r.tests { err := t.Cleanup(context.Background()) @@ -166,6 +172,7 @@ func (r *perfRunner) cleanup() error { return nil } +// print an update for the last second func (r *perfRunner) printStatus() error { if !r.warmupPrinted { finishedWarmup := r.printWarmupStatus() @@ -226,6 +233,7 @@ func (r *perfRunner) printWarmupStatus() bool { return false } +// totalOperations iterates over all options structs to get the number of operations completed func (r *perfRunner) totalOperations(warmup bool) int64 { var ret int64 @@ -242,6 +250,7 @@ func (r *perfRunner) totalOperations(warmup bool) int64 { return ret } +// opsPerSecond calculates the average number of operations per second func (r *perfRunner) opsPerSecond(warmup bool) float64 { var ret float64 @@ -249,15 +258,25 @@ func (r *perfRunner) opsPerSecond(warmup bool) float64 { defer r.allOptions.mu.Unlock() for _, opt := range r.allOptions.opts { if warmup { - ret += float64(atomic.LoadInt64(&opt.warmupCount)) / opt.warmupElapsed.GetFloat() + e := opt.warmupElapsed.GetFloat() + if e != 0 { + ret += float64(atomic.LoadInt64(&opt.warmupCount)) / e + } } else { - ret += float64(atomic.LoadInt64(&opt.runCount)) / opt.runElapsed.GetFloat() + e := opt.runElapsed.GetFloat() + if e != 0 { + ret += float64(atomic.LoadInt64(&opt.runCount)) / e + } } } return ret } +// printFinalUpdate prints the final update for the warmup/test run func (r *perfRunner) printFinalUpdate(warmup bool) error { + if r.warmupPrinted && warmup { + return nil + } totalOperations := r.totalOperations(warmup) opsPerSecond := r.opsPerSecond(warmup) if opsPerSecond == 0.0 { @@ -267,7 +286,11 @@ func (r *perfRunner) printFinalUpdate(warmup bool) error { secondsPerOp := 1.0 / opsPerSecond weightedAvg := float64(totalOperations) / opsPerSecond - fmt.Println("\n=== Results ===") + if warmup { + fmt.Println("\n=== Warm Up Results ===") + } else { + fmt.Println("\n=== Results ===") + } fmt.Printf( "Completed %s operations in a weighted-average of %ss (%s ops/s, %s s/op)\n", r.messagePrinter.Sprintf("%d", totalOperations), @@ -278,8 +301,9 @@ func (r *perfRunner) printFinalUpdate(warmup bool) error { return nil } -// runTest takes care of the semantics of running a single iteration. It returns the number of times the test ran as an int, the exact number -// of seconds the test ran as a float64, and any errors. +// runTest takes care of the semantics of running a single iteration. +// It changes configuration on the proxy, increments counters, and +// updates the running-time. func (r *perfRunner) runTest(p PerfTest, index int, ID string) { defer wg.Done() if debug { @@ -344,7 +368,10 @@ func (r *perfRunner) runTest(p PerfTest, index int, ID string) { opts.warmupElapsed.SetFloat(time.Since(opts.warmupStart).Seconds()) } - _ = atomic.AddInt32(&r.warmupFinished, 1) + val := atomic.AddInt32(&r.warmupFinished, 1) + if debug { + fmt.Printf("finished %d warmups\n", val) + } opts.runStart = time.Now() ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(duration)) From bec296a84113e03714f9f080a6d8d2ce09543f99 Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Mon, 14 Mar 2022 17:56:27 -0400 Subject: [PATCH 06/15] adding tables perf test --- sdk/data/aztables/testdata/perf/go.mod | 20 +++ sdk/data/aztables/testdata/perf/go.sum | 60 +++++++ .../aztables/testdata/perf/insert_entity.go | 151 ++++++++++++++++++ sdk/data/aztables/testdata/perf/main.go | 14 ++ 4 files changed, 245 insertions(+) create mode 100644 sdk/data/aztables/testdata/perf/go.mod create mode 100644 sdk/data/aztables/testdata/perf/go.sum create mode 100644 sdk/data/aztables/testdata/perf/insert_entity.go create mode 100644 sdk/data/aztables/testdata/perf/main.go diff --git a/sdk/data/aztables/testdata/perf/go.mod b/sdk/data/aztables/testdata/perf/go.mod new file mode 100644 index 000000000000..c9867fb8e9ad --- /dev/null +++ b/sdk/data/aztables/testdata/perf/go.mod @@ -0,0 +1,20 @@ +module github.com/Azure/azure-sdk-for-go/sdk/data/aztables/testdata/perf + +go 1.17 + +replace github.com/Azure/azure-sdk-for-go/sdk/internal => ../../../../internal + +require ( + github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.0 + github.com/Azure/azure-sdk-for-go/sdk/data/aztables v0.6.0 + github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/testify v1.7.0 // indirect + golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f // indirect + golang.org/x/text v0.3.7 // indirect + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect +) diff --git a/sdk/data/aztables/testdata/perf/go.sum b/sdk/data/aztables/testdata/perf/go.sum new file mode 100644 index 000000000000..b9d57f12bc59 --- /dev/null +++ b/sdk/data/aztables/testdata/perf/go.sum @@ -0,0 +1,60 @@ +github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.0 h1:8wVJL0HUP5yDFXvotdewORTw7Yu88JbreWN/mobSvsQ= +github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.0/go.mod h1:fBF9PQNqB8scdgpZ3ufzaLntG0AG7C1WjPMsiFOmfHM= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.13.0 h1:bLRntPH25SkY1uZ/YZW+dmxNky9r1fAHvDFrzluo+4Q= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.13.0/go.mod h1:TmXReXZ9yPp5D5TBRMTAtyz+UyOl15Py4hL5E5p6igQ= +github.com/Azure/azure-sdk-for-go/sdk/data/aztables v0.6.0 h1:aSPOq3mqbWTXPSQhXAwgsJas4ZdyapBn+uWA54HZRto= +github.com/Azure/azure-sdk-for-go/sdk/data/aztables v0.6.0/go.mod h1:fRf7GSd+2fcFo7pa3QndmE29N9ndRxJK4TosS72TpdI= +github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c= +github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= +github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko= +github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= +github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8= +github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI= +github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f h1:OfiFi4JbukWwe3lzw+xunroH1mnC1e2Gy5cxNJApiSY= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/sdk/data/aztables/testdata/perf/insert_entity.go b/sdk/data/aztables/testdata/perf/insert_entity.go new file mode 100644 index 000000000000..4a283aca5774 --- /dev/null +++ b/sdk/data/aztables/testdata/perf/insert_entity.go @@ -0,0 +1,151 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package main + +import ( + "context" + "encoding/json" + "fmt" + "os" + "strings" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/data/aztables" + "github.com/Azure/azure-sdk-for-go/sdk/internal/perf" + "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" +) + +var stringEntity = map[string]string{ + "PartitionKey": "", + "RowKey": "", + "StringTypeProperty1": "StringTypeProperty", + "StringTypeProperty2": "1970-10-04T00:00:00+00:00", + "StringTypeProperty3": "c9da6455-213d-42c9-9a79-3e9149a57833", + "StringTypeProperty4": "BinaryTypeProperty", + "StringTypeProperty5": fmt.Sprint(2 ^ 32 + 1), + "StringTypeProperty6": "200.23", + "StringTypeProperty7": "5", +} + +type downloadTestOptions struct{} + +//nolint +var downloadTestOpts downloadTestOptions = downloadTestOptions{} + +// downloadTestRegister is called once per process +func downloadTestRegister() { + +} + +type downloadTestGlobal struct { + perf.PerfTestOptions + tableName string +} + +// NewInsertEntityTest is called once per process +func NewInsertEntityTest(ctx context.Context, options perf.PerfTestOptions) (perf.GlobalPerfTest, error) { + guid, err := uuid.New() + if err != nil { + return nil, err + } + tableName := fmt.Sprintf("table%s",strings.ReplaceAll(guid.String(), "-", "")) + d := &downloadTestGlobal{ + PerfTestOptions: options, + tableName: tableName, + } + + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return nil, fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, nil) + if err != nil { + return nil, err + } + _, err = svcClient.CreateTable(context.Background(), d.tableName, nil) + if err != nil { + return nil, err + } + + return d, nil +} + +func (d *downloadTestGlobal) GlobalCleanup(ctx context.Context) error { + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, nil) + if err != nil { + return err + } + + _, err = svcClient.DeleteTable(context.Background(), d.tableName, nil) + return err +} + +type downloadPerfTest struct { + *downloadTestGlobal + perf.PerfTestOptions + entity []byte + tableClient *aztables.Client +} + +// NewPerfTest is called once per goroutine +func (g *downloadTestGlobal) NewPerfTest(ctx context.Context, options *perf.PerfTestOptions) (perf.PerfTest, error) { + d := &downloadPerfTest{ + downloadTestGlobal: g, + PerfTestOptions: *options, + } + + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return nil, fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, &aztables.ClientOptions{ + ClientOptions: azcore.ClientOptions{ + Transport: d.PerfTestOptions.Transporter, + }, + }) + if err != nil { + return nil, err + } + + d.tableClient = svcClient.NewClient(g.tableName) + + rk, err := uuid.New() + if err != nil { + return nil, err + } + pk, err := uuid.New() + if err != nil { + return nil, err + } + + stringEntity["PartitionKey"] = pk.String() + stringEntity["RowKey"] = rk.String() + + bytes, err := json.Marshal(stringEntity) + if err != nil { + return nil, err + } + + d.entity = bytes + + return d, nil +} + +func (d *downloadPerfTest) Run(ctx context.Context) error { + _, err := d.tableClient.InsertEntity(ctx, d.entity, &aztables.InsertEntityOptions{ + UpdateMode: aztables.EntityUpdateModeMerge, + }) + return err +} + +func (*downloadPerfTest) Cleanup(ctx context.Context) error { + return nil +} diff --git a/sdk/data/aztables/testdata/perf/main.go b/sdk/data/aztables/testdata/perf/main.go new file mode 100644 index 000000000000..bb90d13f61c3 --- /dev/null +++ b/sdk/data/aztables/testdata/perf/main.go @@ -0,0 +1,14 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package main + +import ( + "github.com/Azure/azure-sdk-for-go/sdk/internal/perf" +) + +func main() { + perf.Run(map[string]perf.PerfMethods{ + "InsertEntityTest": {Register: downloadTestRegister, New: NewInsertEntityTest}, + }) +} \ No newline at end of file From 3130741a758eafa83e3fe15340e1fc05dcfdb86d Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Mon, 14 Mar 2022 17:57:02 -0400 Subject: [PATCH 07/15] differentiating from context.DeadlineExceeded error --- sdk/internal/perf/perf_runner.go | 38 +++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/sdk/internal/perf/perf_runner.go b/sdk/internal/perf/perf_runner.go index eb002dcb9a67..006e08798cc5 100644 --- a/sdk/internal/perf/perf_runner.go +++ b/sdk/internal/perf/perf_runner.go @@ -2,6 +2,7 @@ package perf import ( "context" + "errors" "fmt" "log" "os" @@ -60,6 +61,7 @@ func newPerfRunner(p PerfMethods, name string) *perfRunner { ticker: time.NewTicker(time.Second), done: make(chan bool), name: name, + proxyTransports: map[string]*RecordingHTTPClient{}, perfToRun: p, operationStatusTracker: -1, warmupOperationStatusTracker: -1, @@ -90,6 +92,12 @@ func (r *perfRunner) Run() error { if err != nil { return err } + defer func() { + err = r.globalInstance.GlobalCleanup(context.Background()) + if err != nil { + panic(err) + } + }() err = r.createPerfTests() if err != nil { @@ -102,10 +110,6 @@ func (r *perfRunner) Run() error { if err != nil { panic(err) } - err = r.globalInstance.GlobalCleanup(context.Background()) - if err != nil { - panic(err) - } return nil } @@ -320,7 +324,11 @@ func (r *perfRunner) runTest(p PerfTest, index int, ID string) { r.proxyTransports[ID].SetMode("live") err := p.Run(context.Background()) if err != nil { - panic(err) + if errors.Is(err, context.DeadlineExceeded) { + return + } else { + panic(err) + } } // 2nd request goes through in Record mode @@ -328,10 +336,16 @@ func (r *perfRunner) runTest(p PerfTest, index int, ID string) { err = r.proxyTransports[ID].start() if err != nil { panic(err) + } + err = p.Run(context.Background()) if err != nil { - panic(err) + if errors.Is(err, context.DeadlineExceeded) { + return + } else { + panic(err) + } } err = r.proxyTransports[ID].stop() if err != nil { @@ -356,7 +370,11 @@ func (r *perfRunner) runTest(p PerfTest, index int, ID string) { for time.Since(opts.warmupStart).Seconds() < float64(warmUpDuration) { err := p.Run(ctx) if err != nil { - panic(err) + if errors.Is(err, context.DeadlineExceeded) { + return + } else { + panic(err) + } } opts.incrememt(true) @@ -381,7 +399,11 @@ func (r *perfRunner) runTest(p PerfTest, index int, ID string) { for time.Since(opts.runStart).Seconds() < float64(duration) { err := p.Run(ctx) if err != nil { - panic(err) + if errors.Is(err, context.DeadlineExceeded) { + return + } else { + panic(err) + } } opts.incrememt(false) From 726deb0b6c180ffeca221ad35841e0d65534d4a6 Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Tue, 15 Mar 2022 10:19:53 -0400 Subject: [PATCH 08/15] insert entity test --- .../aztables/testdata/perf/insert_entity.go | 33 ++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/sdk/data/aztables/testdata/perf/insert_entity.go b/sdk/data/aztables/testdata/perf/insert_entity.go index 4a283aca5774..391c3ba6a599 100644 --- a/sdk/data/aztables/testdata/perf/insert_entity.go +++ b/sdk/data/aztables/testdata/perf/insert_entity.go @@ -6,9 +6,11 @@ package main import ( "context" "encoding/json" + "flag" "fmt" "os" "strings" + "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/data/aztables" @@ -28,14 +30,37 @@ var stringEntity = map[string]string{ "StringTypeProperty7": "5", } -type downloadTestOptions struct{} +var fullEdm = aztables.EDMEntity{ + Entity: aztables.Entity{ + PartitionKey: "", + RowKey: "", + }, + Properties: map[string]interface{}{ + "StringTypeProperty": "StringTypeProperty", + "DatetimeTypeProperty": aztables.EDMDateTime(time.Now()), + "GuidTypeProperty": aztables.EDMGUID("c9da6455-213d-42c9-9a79-3e9149a57833"), + "BinaryTypeProperty": aztables.EDMBinary([]byte("BinaryTypeProperty")), + "Int64TypeProperty": aztables.EDMInt64(2 ^ 32 + 1), + "DoubleTypeProperty": 200.23, + "IntTypeProperty": 5, + }, +} + +type downloadTestOptions struct { + fullEDM bool + clientSharing bool +} //nolint -var downloadTestOpts downloadTestOptions = downloadTestOptions{} +var downloadTestOpts downloadTestOptions = downloadTestOptions{ + fullEDM: false, + clientSharing: false, +} // downloadTestRegister is called once per process func downloadTestRegister() { - + flag.BoolVar(&downloadTestOpts.fullEDM, "full-edm", false, "whether to use entities that utiliza all EDM types for serialization/deserialization, or only strings. Default is only strings") + flag.BoolVar(&downloadTestOpts.clientSharing, "no-client-share", false, "create one ServiceClient per test instance. Default is to share a single ServiceClient") } type downloadTestGlobal struct { @@ -49,7 +74,7 @@ func NewInsertEntityTest(ctx context.Context, options perf.PerfTestOptions) (per if err != nil { return nil, err } - tableName := fmt.Sprintf("table%s",strings.ReplaceAll(guid.String(), "-", "")) + tableName := fmt.Sprintf("table%s", strings.ReplaceAll(guid.String(), "-", "")) d := &downloadTestGlobal{ PerfTestOptions: options, tableName: tableName, From b5586a6ef47d2b40fce12cd2087de6c305539cb7 Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Tue, 15 Mar 2022 13:10:22 -0400 Subject: [PATCH 09/15] adding list entities test and fixing up some smaller consistency issues --- .../aztables/testdata/perf/insert_entity.go | 35 ++-- .../aztables/testdata/perf/list_entities.go | 186 ++++++++++++++++++ sdk/data/aztables/testdata/perf/main.go | 5 +- sdk/internal/perf/atomic_float.go | 9 +- sdk/internal/perf/implementation.go | 12 +- sdk/internal/perf/perf.go | 17 +- sdk/internal/perf/perf_runner.go | 46 +++-- sdk/internal/perf/perf_test.go | 9 +- sdk/internal/perf/random_stream.go | 7 + sdk/internal/perf/status.go | 4 - sdk/internal/perf/utils.go | 20 -- 11 files changed, 267 insertions(+), 83 deletions(-) create mode 100644 sdk/data/aztables/testdata/perf/list_entities.go delete mode 100644 sdk/internal/perf/status.go delete mode 100644 sdk/internal/perf/utils.go diff --git a/sdk/data/aztables/testdata/perf/insert_entity.go b/sdk/data/aztables/testdata/perf/insert_entity.go index 391c3ba6a599..86594990b047 100644 --- a/sdk/data/aztables/testdata/perf/insert_entity.go +++ b/sdk/data/aztables/testdata/perf/insert_entity.go @@ -46,24 +46,23 @@ var fullEdm = aztables.EDMEntity{ }, } -type downloadTestOptions struct { +type insertEntityTestOptions struct { fullEDM bool clientSharing bool } -//nolint -var downloadTestOpts downloadTestOptions = downloadTestOptions{ +var insertTestOpts insertEntityTestOptions = insertEntityTestOptions{ fullEDM: false, clientSharing: false, } -// downloadTestRegister is called once per process -func downloadTestRegister() { - flag.BoolVar(&downloadTestOpts.fullEDM, "full-edm", false, "whether to use entities that utiliza all EDM types for serialization/deserialization, or only strings. Default is only strings") - flag.BoolVar(&downloadTestOpts.clientSharing, "no-client-share", false, "create one ServiceClient per test instance. Default is to share a single ServiceClient") +// insertTestRegister is called once per process +func insertTestRegister() { + flag.BoolVar(&insertTestOpts.fullEDM, "full-edm", false, "whether to use entities that utiliza all EDM types for serialization/deserialization, or only strings. Default is only strings") + flag.BoolVar(&insertTestOpts.clientSharing, "no-client-share", false, "create one ServiceClient per test instance. Default is to share a single ServiceClient") } -type downloadTestGlobal struct { +type insertEntityTestGlobal struct { perf.PerfTestOptions tableName string } @@ -75,7 +74,7 @@ func NewInsertEntityTest(ctx context.Context, options perf.PerfTestOptions) (per return nil, err } tableName := fmt.Sprintf("table%s", strings.ReplaceAll(guid.String(), "-", "")) - d := &downloadTestGlobal{ + d := &insertEntityTestGlobal{ PerfTestOptions: options, tableName: tableName, } @@ -97,7 +96,7 @@ func NewInsertEntityTest(ctx context.Context, options perf.PerfTestOptions) (per return d, nil } -func (d *downloadTestGlobal) GlobalCleanup(ctx context.Context) error { +func (d *insertEntityTestGlobal) GlobalCleanup(ctx context.Context) error { connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") if !ok { return fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") @@ -112,18 +111,18 @@ func (d *downloadTestGlobal) GlobalCleanup(ctx context.Context) error { return err } -type downloadPerfTest struct { - *downloadTestGlobal +type insertEntityPerfTest struct { + *insertEntityTestGlobal perf.PerfTestOptions entity []byte tableClient *aztables.Client } // NewPerfTest is called once per goroutine -func (g *downloadTestGlobal) NewPerfTest(ctx context.Context, options *perf.PerfTestOptions) (perf.PerfTest, error) { - d := &downloadPerfTest{ - downloadTestGlobal: g, - PerfTestOptions: *options, +func (g *insertEntityTestGlobal) NewPerfTest(ctx context.Context, options *perf.PerfTestOptions) (perf.PerfTest, error) { + d := &insertEntityPerfTest{ + insertEntityTestGlobal: g, + PerfTestOptions: *options, } connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") @@ -164,13 +163,13 @@ func (g *downloadTestGlobal) NewPerfTest(ctx context.Context, options *perf.Perf return d, nil } -func (d *downloadPerfTest) Run(ctx context.Context) error { +func (d *insertEntityPerfTest) Run(ctx context.Context) error { _, err := d.tableClient.InsertEntity(ctx, d.entity, &aztables.InsertEntityOptions{ UpdateMode: aztables.EntityUpdateModeMerge, }) return err } -func (*downloadPerfTest) Cleanup(ctx context.Context) error { +func (*insertEntityPerfTest) Cleanup(ctx context.Context) error { return nil } diff --git a/sdk/data/aztables/testdata/perf/list_entities.go b/sdk/data/aztables/testdata/perf/list_entities.go new file mode 100644 index 000000000000..cdd1eeeedd9a --- /dev/null +++ b/sdk/data/aztables/testdata/perf/list_entities.go @@ -0,0 +1,186 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "os" + "strings" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/data/aztables" + "github.com/Azure/azure-sdk-for-go/sdk/internal/perf" + "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" +) + +type listEntitiesTestOptions struct { + fullEDM bool + clientSharing bool + count int +} + +var listTestOpts listEntitiesTestOptions = listEntitiesTestOptions{ + fullEDM: false, + clientSharing: false, + count: 0, +} + +// listTestRegister is called once per process +func listTestRegister() { + flag.IntVar(&listTestOpts.count, "count", 100, "Number of entities to list") + flag.IntVar(&listTestOpts.count, "c", 100, "Number of entities to list") + flag.BoolVar(&listTestOpts.fullEDM, "full-edm", false, "whether to use entities that utiliza all EDM types for serialization/deserialization, or only strings. Default is only strings") + flag.BoolVar(&listTestOpts.clientSharing, "no-client-share", false, "create one ServiceClient per test instance. Default is to share a single ServiceClient") +} + +type listEntityTestGlobal struct { + perf.PerfTestOptions + tableName string + svcClient *aztables.ServiceClient +} + +// NewListEntitiesTest is called once per process +func NewListEntitiesTest(ctx context.Context, options perf.PerfTestOptions) (perf.GlobalPerfTest, error) { + guid, err := uuid.New() + if err != nil { + return nil, err + } + tableName := fmt.Sprintf("table%s", strings.ReplaceAll(guid.String(), "-", "")) + d := &listEntityTestGlobal{ + PerfTestOptions: options, + tableName: tableName, + } + + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return nil, fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, nil) + if err != nil { + return nil, err + } + _, err = svcClient.CreateTable(context.Background(), d.tableName, nil) + if err != nil { + return nil, err + } + d.svcClient = svcClient + + client := d.svcClient.NewClient(d.tableName) + + baseEntityEDM := fullEdm + baseEntityString := stringEntity + + u, err := uuid.New() + if err != nil { + return nil, err + } + + baseEntityEDM.PartitionKey = u.String() + baseEntityString["PartitionKey"] = u.String() + + for i := 0; i < listTestOpts.count; i++ { + if listTestOpts.fullEDM { + u, err := uuid.New() + if err != nil { + return nil, err + } + baseEntityEDM.RowKey = u.String() + + marshalled, err := json.Marshal(baseEntityEDM) + if err != nil { + return nil, err + } + + _, err = client.InsertEntity(ctx, marshalled, nil) + if err != nil { + return nil, err + } + } else { + u, err := uuid.New() + if err != nil { + return nil, err + } + baseEntityString["RowKey"] = u.String() + + marshalled, err := json.Marshal(baseEntityString) + if err != nil { + return nil, err + } + + _, err = client.InsertEntity(ctx, marshalled, nil) + if err != nil { + return nil, err + } + } + } + + return d, nil +} + +func (d *listEntityTestGlobal) GlobalCleanup(ctx context.Context) error { + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, nil) + if err != nil { + return err + } + + _, err = svcClient.DeleteTable(context.Background(), d.tableName, nil) + return err +} + +type listEntitiesPerfTest struct { + *listEntityTestGlobal + perf.PerfTestOptions + client *aztables.Client +} + +// NewPerfTest is called once per goroutine +func (g *listEntityTestGlobal) NewPerfTest(ctx context.Context, options *perf.PerfTestOptions) (perf.PerfTest, error) { + d := &listEntitiesPerfTest{ + listEntityTestGlobal: g, + PerfTestOptions: *options, + } + + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return nil, fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, &aztables.ClientOptions{ + ClientOptions: azcore.ClientOptions{ + Transport: d.PerfTestOptions.Transporter, + }, + }) + if err != nil { + return nil, err + } + + d.client = svcClient.NewClient(g.tableName) + + return d, nil +} + +func (d *listEntitiesPerfTest) Run(ctx context.Context) error { + pager := d.client.List(nil) + for pager.More() { + resp, err := pager.NextPage(ctx) + if err != nil { + return err + } + _ = resp + } + return nil +} + +func (*listEntitiesPerfTest) Cleanup(ctx context.Context) error { + return nil +} diff --git a/sdk/data/aztables/testdata/perf/main.go b/sdk/data/aztables/testdata/perf/main.go index bb90d13f61c3..956367d7f3e5 100644 --- a/sdk/data/aztables/testdata/perf/main.go +++ b/sdk/data/aztables/testdata/perf/main.go @@ -9,6 +9,7 @@ import ( func main() { perf.Run(map[string]perf.PerfMethods{ - "InsertEntityTest": {Register: downloadTestRegister, New: NewInsertEntityTest}, + "InsertEntity": {Register: insertTestRegister, New: NewInsertEntityTest}, + "ListEntities": {Register: listTestRegister, New: NewListEntitiesTest}, }) -} \ No newline at end of file +} diff --git a/sdk/internal/perf/atomic_float.go b/sdk/internal/perf/atomic_float.go index a488965d8436..ac2f0e01b0c4 100644 --- a/sdk/internal/perf/atomic_float.go +++ b/sdk/internal/perf/atomic_float.go @@ -6,7 +6,14 @@ import ( type atomicFloat64 struct { f float64 - mu sync.RWMutex + mu *sync.RWMutex +} + +func newAtomicFloat64(f float64) *atomicFloat64 { + return &atomicFloat64{ + f: f, + mu: &sync.RWMutex{}, + } } func (a *atomicFloat64) GetFloat() float64 { diff --git a/sdk/internal/perf/implementation.go b/sdk/internal/perf/implementation.go index 01c391ea55b4..9baa4c34020a 100644 --- a/sdk/internal/perf/implementation.go +++ b/sdk/internal/perf/implementation.go @@ -10,18 +10,20 @@ import ( var ( // debug is true if --debug is specified - debug bool + debug bool // duration is the -d/--duration flag - duration int + duration int // testProxyURLs is the -x/--test-proxy flag, a semi-colon separated list - testProxyURLs string + testProxyURLs string // warmUpDuration is the -w/--warmup flag - warmUpDuration int + warmUpDuration int // parallelInstances is the -p/--parallel flag parallelInstances int // wg is used to keep track of the number of goroutines created - wg sync.WaitGroup + wg sync.WaitGroup + + // number of processes to use, the --maxprocs flag numProcesses int ) diff --git a/sdk/internal/perf/perf.go b/sdk/internal/perf/perf.go index 38960baa8eff..5ecbfa112232 100644 --- a/sdk/internal/perf/perf.go +++ b/sdk/internal/perf/perf.go @@ -68,18 +68,26 @@ type PerfTestOptions struct { parallelIndex int // number of warmup operations completed - warmupCount int64 - warmupStart time.Time - warmupElapsed atomicFloat64 + warmupCount int64 + warmupStart time.Time + warmupElapsed *atomicFloat64 // number of operations runCount runCount int64 runStart time.Time - runElapsed atomicFloat64 + runElapsed *atomicFloat64 finished bool } +func newPerfTestOptions(name string) PerfTestOptions { + return PerfTestOptions{ + Name: name, + runElapsed: newAtomicFloat64(0), + warmupElapsed: newAtomicFloat64(0), + } +} + // increment does an atomic increment of the warmup or non-warmup performance test func (p *PerfTestOptions) incrememt(warmup bool) { if warmup { @@ -100,7 +108,6 @@ type PerfMethods struct { // Run runs an individual test, registers, and parses command line flags func Run(tests map[string]PerfMethods) { - if len(os.Args) < 2 { // Error out and show available perf tests fmt.Println("Available performance tests:") diff --git a/sdk/internal/perf/perf_runner.go b/sdk/internal/perf/perf_runner.go index 006e08798cc5..79b8134c28e3 100644 --- a/sdk/internal/perf/perf_runner.go +++ b/sdk/internal/perf/perf_runner.go @@ -73,21 +73,6 @@ func newPerfRunner(p PerfMethods, name string) *perfRunner { } func (r *perfRunner) Run() error { - // Poller for printing - go func() { - for { - select { - case <-r.done: - return - case <-r.ticker.C: - err := r.printStatus() - if err != nil { - panic(err) - } - } - } - }() - err := r.globalSetup() if err != nil { return err @@ -103,6 +88,27 @@ func (r *perfRunner) Run() error { if err != nil { return err } + + r.ticker = time.NewTicker(time.Second) + + // Poller for printing + go func() { + for { + if r.ticker != nil { + select { + case <-r.done: + return + case <-r.ticker.C: + err := r.printStatus() + if err != nil { + panic(err) + } + } + } + } + }() + wg.Wait() + r.done <- true r.printFinalUpdate(false) @@ -115,7 +121,7 @@ func (r *perfRunner) Run() error { // global setup by instantiating a single global instance func (r *perfRunner) globalSetup() error { - globalInst, err := r.perfToRun.New(context.TODO(), PerfTestOptions{Name: r.name}) + globalInst, err := r.perfToRun.New(context.TODO(), newPerfTestOptions(r.name)) if err != nil { return err } @@ -129,8 +135,8 @@ func (r *perfRunner) createPerfTests() error { proxyURLS := parseProxyURLS() for idx := 0; idx < parallelInstances; idx++ { - options := &PerfTestOptions{} ID := fmt.Sprintf("%s-%d", r.name, idx) + options := newPerfTestOptions(ID) IDs = append(IDs, ID) if testProxyURLs != "" { @@ -146,13 +152,13 @@ func (r *perfRunner) createPerfTests() error { } options.parallelIndex = idx - perfTest, err := r.globalInstance.NewPerfTest(context.TODO(), options) + perfTest, err := r.globalInstance.NewPerfTest(context.TODO(), &options) if err != nil { return err } r.tests = append(r.tests, perfTest) r.allOptions.mu.Lock() - r.allOptions.opts = append(r.allOptions.opts, options) + r.allOptions.opts = append(r.allOptions.opts, &options) r.allOptions.mu.Unlock() } @@ -160,8 +166,6 @@ func (r *perfRunner) createPerfTests() error { wg.Add(1) go r.runTest(test, idx, IDs[idx]) } - - wg.Wait() return nil } diff --git a/sdk/internal/perf/perf_test.go b/sdk/internal/perf/perf_test.go index c52d77fb4ccd..009a37d1b619 100644 --- a/sdk/internal/perf/perf_test.go +++ b/sdk/internal/perf/perf_test.go @@ -58,14 +58,9 @@ func TestRun(t *testing.T) { warmUpDuration = 0 parallelInstances = 1 - err := runPerfTest("Sleep", NewNoOpTest) + runner := newPerfRunner(PerfMethods{New: NewNoOpTest, Register: nil}, "NoOpTest") + err := runner.Run() require.NoError(t, err) - - require.True(t, globalCleanup) - require.True(t, newnooptest) - require.True(t, newperftest) - require.True(t, run) - require.True(t, cleanup) } func TestParseProxyURLs(t *testing.T) { diff --git a/sdk/internal/perf/random_stream.go b/sdk/internal/perf/random_stream.go index 571ee8dce068..f1385ec3da34 100644 --- a/sdk/internal/perf/random_stream.go +++ b/sdk/internal/perf/random_stream.go @@ -22,6 +22,13 @@ type randomStream struct { remaining int } +func min(a, b int) int { + if a < b { + return a + } + return b +} + func (r *randomStream) Read(p []byte) (int, error) { if r.remaining == 0 { return 0, io.EOF diff --git a/sdk/internal/perf/status.go b/sdk/internal/perf/status.go deleted file mode 100644 index 449b1bca9008..000000000000 --- a/sdk/internal/perf/status.go +++ /dev/null @@ -1,4 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -package perf diff --git a/sdk/internal/perf/utils.go b/sdk/internal/perf/utils.go deleted file mode 100644 index 9905cd4b1037..000000000000 --- a/sdk/internal/perf/utils.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -package perf - -// Helper function to sum a slice of integers -func sumInts(ints []int) int { - var ret int - for _, i := range ints { - ret += i - } - return ret -} - -func min(a, b int) int { - if a < b { - return a - } - return b -} From 0a2e4c06e989b7f74e087b987260ae58b978ef6b Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Tue, 15 Mar 2022 13:46:15 -0400 Subject: [PATCH 10/15] finished tables performance tests --- sdk/data/aztables/testdata/perf/batch.go | 179 ++++++++++++++++++ .../aztables/testdata/perf/list_entities.go | 2 +- sdk/data/aztables/testdata/perf/main.go | 5 +- sdk/internal/perf/perf_runner.go | 23 +-- 4 files changed, 192 insertions(+), 17 deletions(-) create mode 100644 sdk/data/aztables/testdata/perf/batch.go diff --git a/sdk/data/aztables/testdata/perf/batch.go b/sdk/data/aztables/testdata/perf/batch.go new file mode 100644 index 000000000000..7e0bff84c252 --- /dev/null +++ b/sdk/data/aztables/testdata/perf/batch.go @@ -0,0 +1,179 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "os" + "strings" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/data/aztables" + "github.com/Azure/azure-sdk-for-go/sdk/internal/perf" + "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" +) + +type batchTestOptions struct { + fullEDM bool + clientSharing bool + count int +} + +var batchTestOpts batchTestOptions = batchTestOptions{ + fullEDM: false, + clientSharing: false, + count: 100, +} + +// batchTestRegister is called once per process +func batchTestRegister() { + flag.IntVar(&listTestOpts.count, "count", 100, "Number of entities to batch create") + flag.IntVar(&listTestOpts.count, "c", 100, "Number of entities to batch create") + flag.BoolVar(&batchTestOpts.fullEDM, "full-edm", false, "whether to use entities that utiliza all EDM types for serialization/deserialization, or only strings. Default is only strings") + flag.BoolVar(&batchTestOpts.clientSharing, "no-client-share", false, "create one ServiceClient per test instance. Default is to share a single ServiceClient") +} + +type batchTestGlobal struct { + perf.PerfTestOptions + tableName string +} + +// NewBatchTest is called once per process +func NewBatchTest(ctx context.Context, options perf.PerfTestOptions) (perf.GlobalPerfTest, error) { + guid, err := uuid.New() + if err != nil { + return nil, err + } + tableName := fmt.Sprintf("table%s", strings.ReplaceAll(guid.String(), "-", "")) + d := &batchTestGlobal{ + PerfTestOptions: options, + tableName: tableName, + } + + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return nil, fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, nil) + if err != nil { + return nil, err + } + _, err = svcClient.CreateTable(context.Background(), d.tableName, nil) + if err != nil { + return nil, err + } + + return d, nil +} + +func (d *batchTestGlobal) GlobalCleanup(ctx context.Context) error { + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, nil) + if err != nil { + return err + } + + _, err = svcClient.DeleteTable(context.Background(), d.tableName, nil) + return err +} + +type batchEntityPerfTest struct { + *batchTestGlobal + perf.PerfTestOptions + baseEDMEntity aztables.EDMEntity + baseStringEntity map[string]string + tableClient *aztables.Client +} + +// NewPerfTest is called once per goroutine +func (g *batchTestGlobal) NewPerfTest(ctx context.Context, options *perf.PerfTestOptions) (perf.PerfTest, error) { + d := &batchEntityPerfTest{ + batchTestGlobal: g, + PerfTestOptions: *options, + } + + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return nil, fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, &aztables.ClientOptions{ + ClientOptions: azcore.ClientOptions{ + Transport: d.PerfTestOptions.Transporter, + }, + }) + if err != nil { + return nil, err + } + + d.tableClient = svcClient.NewClient(g.tableName) + + pk, err := uuid.New() + if err != nil { + return nil, err + } + + stringEntity["PartitionKey"] = pk.String() + + d.baseStringEntity = stringEntity + + edmEntity := fullEdm + edmEntity.PartitionKey = pk.String() + d.baseEDMEntity = edmEntity + + return d, nil +} + +func (d *batchEntityPerfTest) Run(ctx context.Context) error { + batch := make([]aztables.TransactionAction, batchTestOpts.count) + + for i := 0; i < batchTestOpts.count; i++ { + + if batchTestOpts.fullEDM { + + d.baseEDMEntity.RowKey = fmt.Sprint(i) + + marshalled, err := json.Marshal(d.baseEDMEntity) + if err != nil { + return err + } + + batch[i] = aztables.TransactionAction{ + Entity: marshalled, + ActionType: aztables.TransactionTypeUpdateMerge, + } + + } else { + + d.baseStringEntity["RowKey"] = fmt.Sprint(i) + + marshalled, err := json.Marshal(d.baseStringEntity) + if err != nil { + return err + } + + batch[i] = aztables.TransactionAction{ + Entity: marshalled, + ActionType: aztables.TransactionTypeUpdateMerge, + } + + } + + } + + _, err := d.tableClient.SubmitTransaction(ctx, batch, nil) + return err +} + +func (*batchEntityPerfTest) Cleanup(ctx context.Context) error { + return nil +} diff --git a/sdk/data/aztables/testdata/perf/list_entities.go b/sdk/data/aztables/testdata/perf/list_entities.go index cdd1eeeedd9a..1344d601912b 100644 --- a/sdk/data/aztables/testdata/perf/list_entities.go +++ b/sdk/data/aztables/testdata/perf/list_entities.go @@ -26,7 +26,7 @@ type listEntitiesTestOptions struct { var listTestOpts listEntitiesTestOptions = listEntitiesTestOptions{ fullEDM: false, clientSharing: false, - count: 0, + count: 100, } // listTestRegister is called once per process diff --git a/sdk/data/aztables/testdata/perf/main.go b/sdk/data/aztables/testdata/perf/main.go index 956367d7f3e5..72eea986c93b 100644 --- a/sdk/data/aztables/testdata/perf/main.go +++ b/sdk/data/aztables/testdata/perf/main.go @@ -9,7 +9,8 @@ import ( func main() { perf.Run(map[string]perf.PerfMethods{ - "InsertEntity": {Register: insertTestRegister, New: NewInsertEntityTest}, - "ListEntities": {Register: listTestRegister, New: NewListEntitiesTest}, + "CreateEntityTest": {Register: insertTestRegister, New: NewInsertEntityTest}, + "ListEntitiesTest": {Register: listTestRegister, New: NewListEntitiesTest}, + "CreateEntityBatchTest": {Register: batchTestRegister, New: NewBatchTest}, }) } diff --git a/sdk/internal/perf/perf_runner.go b/sdk/internal/perf/perf_runner.go index 79b8134c28e3..00d9345d1f22 100644 --- a/sdk/internal/perf/perf_runner.go +++ b/sdk/internal/perf/perf_runner.go @@ -111,12 +111,11 @@ func (r *perfRunner) Run() error { r.done <- true - r.printFinalUpdate(false) - err = r.cleanup() + err = r.printFinalUpdate(false) if err != nil { - panic(err) + return err } - return nil + return r.cleanup() } // global setup by instantiating a single global instance @@ -192,7 +191,7 @@ func (r *perfRunner) printStatus() error { if r.operationStatusTracker == -1 { r.printFinalUpdate(true) r.operationStatusTracker = 0 - fmt.Fprintln(r.w, "Current\tTotal\tAverage\t") + fmt.Fprintln(r.w, "\nCurrent\tTotal\tAverage\t") } totalOperations := r.totalOperations(false) @@ -215,7 +214,7 @@ func (r *perfRunner) printWarmupStatus() bool { if r.warmupOperationStatusTracker == -1 { r.warmupOperationStatusTracker = 0 fmt.Println("===== WARMUP =====") - fmt.Fprintln(r.w, "Current\tTotal\tAverage\t") + fmt.Fprintln(r.w, "\nCurrent\tTotal\tAverage\t") } totalOperations := r.totalOperations(true) @@ -328,9 +327,7 @@ func (r *perfRunner) runTest(p PerfTest, index int, ID string) { r.proxyTransports[ID].SetMode("live") err := p.Run(context.Background()) if err != nil { - if errors.Is(err, context.DeadlineExceeded) { - return - } else { + if err != nil { panic(err) } } @@ -345,9 +342,7 @@ func (r *perfRunner) runTest(p PerfTest, index int, ID string) { err = p.Run(context.Background()) if err != nil { - if errors.Is(err, context.DeadlineExceeded) { - return - } else { + if err != nil { panic(err) } } @@ -375,7 +370,7 @@ func (r *perfRunner) runTest(p PerfTest, index int, ID string) { err := p.Run(ctx) if err != nil { if errors.Is(err, context.DeadlineExceeded) { - return + break } else { panic(err) } @@ -404,7 +399,7 @@ func (r *perfRunner) runTest(p PerfTest, index int, ID string) { err := p.Run(ctx) if err != nil { if errors.Is(err, context.DeadlineExceeded) { - return + break } else { panic(err) } From 840c39b328431b70e35b6f780003edd7ae664c38 Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Tue, 15 Mar 2022 14:30:41 -0400 Subject: [PATCH 11/15] returning initialize string slice --- sdk/internal/perf/perf_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/internal/perf/perf_test.go b/sdk/internal/perf/perf_test.go index 009a37d1b619..bcdf4f366ea2 100644 --- a/sdk/internal/perf/perf_test.go +++ b/sdk/internal/perf/perf_test.go @@ -66,7 +66,7 @@ func TestRun(t *testing.T) { func TestParseProxyURLs(t *testing.T) { testProxyURLs = "" result := parseProxyURLS() - require.Nil(t, result) + require.Equal(t, result, []string{}) testProxyURLs = "https://localhost:5001" result = parseProxyURLS() From 4212c4341a0ead3467bae9639f05094767adef65 Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Tue, 15 Mar 2022 14:32:51 -0400 Subject: [PATCH 12/15] was ignoring an error --- sdk/internal/perf/perf_runner.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdk/internal/perf/perf_runner.go b/sdk/internal/perf/perf_runner.go index 00d9345d1f22..347c1da07220 100644 --- a/sdk/internal/perf/perf_runner.go +++ b/sdk/internal/perf/perf_runner.go @@ -189,7 +189,10 @@ func (r *perfRunner) printStatus() error { } if r.operationStatusTracker == -1 { - r.printFinalUpdate(true) + err := r.printFinalUpdate(true) + if err != nil { + return err + } r.operationStatusTracker = 0 fmt.Fprintln(r.w, "\nCurrent\tTotal\tAverage\t") } From abac91de44ef8f53c8e1c86c7732f929e0c5e112 Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Tue, 15 Mar 2022 15:14:41 -0400 Subject: [PATCH 13/15] adding test for atomic float --- sdk/internal/perf/atomic_float.go | 3 +++ sdk/internal/perf/atomic_float_test.go | 31 ++++++++++++++++++++++++++ sdk/internal/perf/perf_runner.go | 3 +++ 3 files changed, 37 insertions(+) create mode 100644 sdk/internal/perf/atomic_float_test.go diff --git a/sdk/internal/perf/atomic_float.go b/sdk/internal/perf/atomic_float.go index ac2f0e01b0c4..31af5a0e3019 100644 --- a/sdk/internal/perf/atomic_float.go +++ b/sdk/internal/perf/atomic_float.go @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + package perf import ( diff --git a/sdk/internal/perf/atomic_float_test.go b/sdk/internal/perf/atomic_float_test.go new file mode 100644 index 000000000000..29bc62e7cd1c --- /dev/null +++ b/sdk/internal/perf/atomic_float_test.go @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package perf + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestAtomicFloat(t *testing.T) { + a := newAtomicFloat64(0) + + wg := &sync.WaitGroup{} + for i := 0; i < 2; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + for j := 0; j < 100*i; j++ { + a.SetFloat(float64(j)) + } + }(i) + time.Sleep(100 * time.Millisecond) + } + wg.Wait() + + require.Equal(t, float64(99), a.GetFloat()) +} diff --git a/sdk/internal/perf/perf_runner.go b/sdk/internal/perf/perf_runner.go index 347c1da07220..a95ec87270c6 100644 --- a/sdk/internal/perf/perf_runner.go +++ b/sdk/internal/perf/perf_runner.go @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + package perf import ( From 066094979eebc32218d7e5c159149364dada0a75 Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Wed, 16 Mar 2022 11:46:37 -0400 Subject: [PATCH 14/15] removing atomic float, incrememt -> increment, loop for warmup/testrun --- sdk/data/aztables/testdata/perf/batch.go | 6 - sdk/internal/perf/atomic_float.go | 32 ----- sdk/internal/perf/atomic_float_test.go | 31 ----- sdk/internal/perf/implementation.go | 2 +- sdk/internal/perf/perf.go | 16 +-- sdk/internal/perf/perf_runner.go | 109 +++++++++++------- sdk/internal/perf/perf_test.go | 2 +- sdk/internal/perf/testdata/perf/sleep_perf.go | 2 +- 8 files changed, 78 insertions(+), 122 deletions(-) delete mode 100644 sdk/internal/perf/atomic_float.go delete mode 100644 sdk/internal/perf/atomic_float_test.go diff --git a/sdk/data/aztables/testdata/perf/batch.go b/sdk/data/aztables/testdata/perf/batch.go index 7e0bff84c252..09522e9452b8 100644 --- a/sdk/data/aztables/testdata/perf/batch.go +++ b/sdk/data/aztables/testdata/perf/batch.go @@ -139,9 +139,7 @@ func (d *batchEntityPerfTest) Run(ctx context.Context) error { for i := 0; i < batchTestOpts.count; i++ { if batchTestOpts.fullEDM { - d.baseEDMEntity.RowKey = fmt.Sprint(i) - marshalled, err := json.Marshal(d.baseEDMEntity) if err != nil { return err @@ -151,11 +149,8 @@ func (d *batchEntityPerfTest) Run(ctx context.Context) error { Entity: marshalled, ActionType: aztables.TransactionTypeUpdateMerge, } - } else { - d.baseStringEntity["RowKey"] = fmt.Sprint(i) - marshalled, err := json.Marshal(d.baseStringEntity) if err != nil { return err @@ -165,7 +160,6 @@ func (d *batchEntityPerfTest) Run(ctx context.Context) error { Entity: marshalled, ActionType: aztables.TransactionTypeUpdateMerge, } - } } diff --git a/sdk/internal/perf/atomic_float.go b/sdk/internal/perf/atomic_float.go deleted file mode 100644 index 31af5a0e3019..000000000000 --- a/sdk/internal/perf/atomic_float.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -package perf - -import ( - "sync" -) - -type atomicFloat64 struct { - f float64 - mu *sync.RWMutex -} - -func newAtomicFloat64(f float64) *atomicFloat64 { - return &atomicFloat64{ - f: f, - mu: &sync.RWMutex{}, - } -} - -func (a *atomicFloat64) GetFloat() float64 { - a.mu.RLock() - defer a.mu.RUnlock() - return a.f -} - -func (a *atomicFloat64) SetFloat(f float64) { - a.mu.Lock() - defer a.mu.Unlock() - a.f = f -} diff --git a/sdk/internal/perf/atomic_float_test.go b/sdk/internal/perf/atomic_float_test.go deleted file mode 100644 index 29bc62e7cd1c..000000000000 --- a/sdk/internal/perf/atomic_float_test.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -package perf - -import ( - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestAtomicFloat(t *testing.T) { - a := newAtomicFloat64(0) - - wg := &sync.WaitGroup{} - for i := 0; i < 2; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - for j := 0; j < 100*i; j++ { - a.SetFloat(float64(j)) - } - }(i) - time.Sleep(100 * time.Millisecond) - } - wg.Wait() - - require.Equal(t, float64(99), a.GetFloat()) -} diff --git a/sdk/internal/perf/implementation.go b/sdk/internal/perf/implementation.go index 9baa4c34020a..225b5d207f6c 100644 --- a/sdk/internal/perf/implementation.go +++ b/sdk/internal/perf/implementation.go @@ -30,7 +30,7 @@ var ( // parseProxyURLs splits the --test-proxy input with the delimiter ';' func parseProxyURLS() []string { if testProxyURLs == "" { - return []string{} + return nil } testProxyURLs = strings.TrimSuffix(testProxyURLs, ";") diff --git a/sdk/internal/perf/perf.go b/sdk/internal/perf/perf.go index 5ecbfa112232..50f9ff97b74f 100644 --- a/sdk/internal/perf/perf.go +++ b/sdk/internal/perf/perf.go @@ -69,27 +69,27 @@ type PerfTestOptions struct { // number of warmup operations completed warmupCount int64 - warmupStart time.Time - warmupElapsed *atomicFloat64 + warmupStart *time.Time + warmupElapsed time.Duration // number of operations runCount runCount int64 - runStart time.Time - runElapsed *atomicFloat64 + runStart *time.Time + runElapsed time.Duration finished bool } func newPerfTestOptions(name string) PerfTestOptions { return PerfTestOptions{ - Name: name, - runElapsed: newAtomicFloat64(0), - warmupElapsed: newAtomicFloat64(0), + Name: name, + warmupStart: &time.Time{}, + runStart: &time.Time{}, } } // increment does an atomic increment of the warmup or non-warmup performance test -func (p *PerfTestOptions) incrememt(warmup bool) { +func (p *PerfTestOptions) increment(warmup bool) { if warmup { atomic.AddInt64(&p.warmupCount, 1) } else { diff --git a/sdk/internal/perf/perf_runner.go b/sdk/internal/perf/perf_runner.go index a95ec87270c6..01cef9bc40d4 100644 --- a/sdk/internal/perf/perf_runner.go +++ b/sdk/internal/perf/perf_runner.go @@ -271,12 +271,12 @@ func (r *perfRunner) opsPerSecond(warmup bool) float64 { defer r.allOptions.mu.Unlock() for _, opt := range r.allOptions.opts { if warmup { - e := opt.warmupElapsed.GetFloat() + e := float64(atomic.LoadInt64((*int64)(&opt.warmupElapsed))) / float64(time.Second) if e != 0 { ret += float64(atomic.LoadInt64(&opt.warmupCount)) / e } } else { - e := opt.runElapsed.GetFloat() + e := float64(atomic.LoadInt64((*int64)(&opt.runElapsed))) / float64(time.Second) if e != 0 { ret += float64(atomic.LoadInt64(&opt.runCount)) / e } @@ -365,68 +365,93 @@ func (r *perfRunner) runTest(p PerfTest, index int, ID string) { } } - if warmUpDuration > 0 { - opts.warmupStart = time.Now() + // true parameter indicates were running the warmup here + err := r.runTestForDuration(p, opts, true) + if err != nil { + panic(err) + } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(warmUpDuration)) - defer cancel() + // increment the warmupFinished counter that one goroutine has finished warmup + val := atomic.AddInt32(&r.warmupFinished, 1) + if debug { + fmt.Printf("finished %d warmups\n", val) + } - lastSavedTime := time.Now() - for time.Since(opts.warmupStart).Seconds() < float64(warmUpDuration) { - err := p.Run(ctx) - if err != nil { - if errors.Is(err, context.DeadlineExceeded) { - break - } else { - panic(err) - } - } - opts.incrememt(true) + // run the actual test + err = r.runTestForDuration(p, opts, false) + if err != nil { + panic(err) + } - if time.Since(lastSavedTime).Seconds() > 0.3 { - opts.warmupElapsed.SetFloat(time.Since(opts.warmupStart).Seconds()) - lastSavedTime = time.Now() - } + if testProxyURLs != "" { + // Stop the proxy now + err := proxyTransportsSuite[ID].stop() + if err != nil { + panic(err) } + proxyTransportsSuite[ID].SetMode("live") + } + opts.finished = true +} - opts.warmupElapsed.SetFloat(time.Since(opts.warmupStart).Seconds()) +func (r *perfRunner) runTestForDuration(p PerfTest, opts *PerfTestOptions, warmup bool) error { + if warmup && warmUpDuration <= 0 { + return nil } - val := atomic.AddInt32(&r.warmupFinished, 1) - if debug { - fmt.Printf("finished %d warmups\n", val) + + // startPtr is our base time for keeping track of how long a test has run + var startPtr *time.Time + if warmup { + t := time.Now() + opts.warmupStart = &t + startPtr = opts.warmupStart + } else { + t := time.Now() + opts.runStart = &t + startPtr = opts.runStart } - opts.runStart = time.Now() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(duration)) + var runDuration int + if warmup { + runDuration = warmUpDuration + } else { + runDuration = duration + } + + var ctx context.Context + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), time.Second*time.Duration(runDuration)) defer cancel() lastSavedTime := time.Now() - for time.Since(opts.runStart).Seconds() < float64(duration) { + for time.Since(*startPtr).Seconds() < float64(runDuration) { err := p.Run(ctx) if err != nil { if errors.Is(err, context.DeadlineExceeded) { break } else { - panic(err) + return err } } - opts.incrememt(false) + opts.increment(warmup) - if time.Since(lastSavedTime).Seconds() > 0.1 { - opts.runElapsed.SetFloat(time.Since(opts.runStart).Seconds()) + if time.Since(lastSavedTime).Seconds() > 0.3 { + duration := time.Since(*startPtr) + if warmup { + atomic.StoreInt64((*int64)(&opts.warmupElapsed), int64(duration)) + } else { + atomic.StoreInt64((*int64)(&opts.runElapsed), int64(duration)) + } lastSavedTime = time.Now() } } - opts.runElapsed.SetFloat(time.Since(opts.runStart).Seconds()) - - if testProxyURLs != "" { - // Stop the proxy now - err := proxyTransportsSuite[ID].stop() - if err != nil { - panic(err) - } - proxyTransportsSuite[ID].SetMode("live") + duration := time.Since(*startPtr) + if warmup { + atomic.StoreInt64((*int64)(&opts.warmupElapsed), int64(duration)) + } else { + atomic.StoreInt64((*int64)(&opts.runElapsed), int64(duration)) } - opts.finished = true + + return nil } diff --git a/sdk/internal/perf/perf_test.go b/sdk/internal/perf/perf_test.go index bcdf4f366ea2..e23fc5b03652 100644 --- a/sdk/internal/perf/perf_test.go +++ b/sdk/internal/perf/perf_test.go @@ -66,7 +66,7 @@ func TestRun(t *testing.T) { func TestParseProxyURLs(t *testing.T) { testProxyURLs = "" result := parseProxyURLS() - require.Equal(t, result, []string{}) + require.Equal(t, result, nil) testProxyURLs = "https://localhost:5001" result = parseProxyURLS() diff --git a/sdk/internal/perf/testdata/perf/sleep_perf.go b/sdk/internal/perf/testdata/perf/sleep_perf.go index ebdce174d36b..ca23921da515 100644 --- a/sdk/internal/perf/testdata/perf/sleep_perf.go +++ b/sdk/internal/perf/testdata/perf/sleep_perf.go @@ -63,7 +63,7 @@ func (s *sleepPerfTest) Run(ctx context.Context) error { return nil default: time.Sleep(s.sleepInterval) - s.sleepInterval = time.Duration(float64(s.sleepInterval.Nanoseconds())*sleepTestOpts.iterationGrowthFactor) * time.Nanosecond + s.sleepInterval = time.Duration(float64(s.sleepInterval.Nanoseconds())*sleepTestOpts.iterationGrowthFactor) } return nil } From dd8e03b29d4dadf6e54be7d7f8253864a071c424 Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Wed, 16 Mar 2022 12:34:27 -0400 Subject: [PATCH 15/15] format check and test fix --- sdk/internal/perf/README.md | 10 +++++----- sdk/internal/perf/perf_test.go | 2 +- sdk/internal/perf/testdata/perf/sleep_perf.go | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sdk/internal/perf/README.md b/sdk/internal/perf/README.md index 08cbcab992af..cb319517f25e 100644 --- a/sdk/internal/perf/README.md +++ b/sdk/internal/perf/README.md @@ -3,11 +3,11 @@ The `perf` sub-module provides a singular framework for writing performance test ## Default Command Options -| Flag | Short Flag | Default Value | Variable Name | Description | -| -----| ---------- | ------------- | ------------- | ----------- | -| `--duration` | `-d` | 10 seconds | internal.Duration (`int`) | How long to run an individual performance test | -| `--test-proxies` | `-x` | N/A | internal.TestProxy (`string`) | Whether to run a test against a test proxy. If you want to run against `https` specify with `--test-proxies https`, likewise for `http`. If you want to run normally omit this flag | -| `--warmup` | `-w` | 3 seconds| internal.WarmUp (`int`) | How long to allow the connection to warm up. | +| Flag | Short Flag | Default Value | Description | +| -----| ---------- | ------------- | ----------- | +| `--duration` | `-d` | 10 seconds | How long to run an individual performance test | +| `--test-proxies` | `-x` | N/A | A semicolon separated list of proxy urls. If you want to run normally omit this flag | +| `--warmup` | `-w` | 3 seconds| How long to allow the connection to warm up. | ## Adding Performance Tests to an SDK diff --git a/sdk/internal/perf/perf_test.go b/sdk/internal/perf/perf_test.go index e23fc5b03652..f10239e30c5d 100644 --- a/sdk/internal/perf/perf_test.go +++ b/sdk/internal/perf/perf_test.go @@ -66,7 +66,7 @@ func TestRun(t *testing.T) { func TestParseProxyURLs(t *testing.T) { testProxyURLs = "" result := parseProxyURLS() - require.Equal(t, result, nil) + require.Equal(t, 0, len(result)) testProxyURLs = "https://localhost:5001" result = parseProxyURLS() diff --git a/sdk/internal/perf/testdata/perf/sleep_perf.go b/sdk/internal/perf/testdata/perf/sleep_perf.go index ca23921da515..7056d0083fbd 100644 --- a/sdk/internal/perf/testdata/perf/sleep_perf.go +++ b/sdk/internal/perf/testdata/perf/sleep_perf.go @@ -63,7 +63,7 @@ func (s *sleepPerfTest) Run(ctx context.Context) error { return nil default: time.Sleep(s.sleepInterval) - s.sleepInterval = time.Duration(float64(s.sleepInterval.Nanoseconds())*sleepTestOpts.iterationGrowthFactor) + s.sleepInterval = time.Duration(float64(s.sleepInterval.Nanoseconds()) * sleepTestOpts.iterationGrowthFactor) } return nil }