[Java] Compute ops/s for each parallel workstream (#5)
* Add comment

* Add .vscode to python .gitignore

* Compute ops/s for each parallel workstream

* Improve status printing

* Fix bug in opsPerSec calculation
mikeharder authored Nov 15, 2019
1 parent b915a8c commit f80c973
Showing 2 changed files with 80 additions and 28 deletions.
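For context on the headline change: the old code divided total operations by the average per-workstream elapsed time, while the new code sums each workstream's individual rate. A minimal standalone sketch, with a hypothetical class name and made-up counts and timings (none of this is from the commit), shows why the two formulas disagree once workstreams run at different speeds:

// Illustrative only: two hypothetical workstreams with made-up numbers.
public class OpsPerSecSketch {
    public static void main(String[] args) {
        int[] completedOperations = { 100, 10 };     // ops finished by each workstream
        double[] elapsedSeconds = { 5.0, 10.0 };     // each workstream's own elapsed time

        // Old formula: total ops divided by the average elapsed time.
        int totalOps = 100 + 10;
        double avgElapsed = (5.0 + 10.0) / 2;        // 7.5 s
        double oldOpsPerSec = totalOps / avgElapsed; // ~14.67 ops/s

        // New formula: sum each workstream's individual rate.
        double newOpsPerSec = 0;
        for (int i = 0; i < completedOperations.length; i++) {
            newOpsPerSec += completedOperations[i] / elapsedSeconds[i];
        }
        // 100/5 + 10/10 = 21 ops/s: the fast workstream is no longer
        // dragged down by averaging its time with the slow one's.
        System.out.printf("old=%.2f ops/s, new=%.2f ops/s%n", oldOpsPerSec, newOpsPerSec);
    }
}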
PerfStressProgram.java

@@ -10,6 +10,7 @@
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
+import java.util.stream.IntStream;

 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.JCommander.Builder;
@@ -22,14 +23,15 @@
 import reactor.core.publisher.Mono;

 public class PerfStressProgram {
-    private static AtomicInteger _completedOperations = new AtomicInteger();
+    private static int[] _completedOperations;
+    private static long[] _lastCompletionNanoTimes;

     public static void Run(Class<?>[] classes, String[] args) {
         List<Class<?>> classList = new ArrayList<>(Arrays.asList(classes));

         try {
             classList.add(Class.forName("com.azure.perfstress.NoOpTest"));
+            classList.add(Class.forName("com.azure.perfstress.SleepTest"));
         } catch (ClassNotFoundException e) {
             throw new RuntimeException(e);
         }
@@ -83,7 +85,7 @@ public static void Run(Class<?> testClass, PerfStressOptions options) {

         System.out.println();
         System.out.println();
-        Disposable setupStatus = PrintStatus("=== Setup ===", () -> ".", false);
+        Disposable setupStatus = PrintStatus("=== Setup ===", () -> ".", false, false);
         Disposable cleanupStatus = null;

         PerfStressTest<?>[] tests = new PerfStressTest<?>[options.Parallel];
@@ -117,7 +119,7 @@ public static void Run(Class<?> testClass, PerfStressOptions options) {
         } finally {
             if (!options.NoCleanup) {
                 if (cleanupStatus == null) {
-                    cleanupStatus = PrintStatus("=== Cleanup ===", () -> ".", false);
+                    cleanupStatus = PrintStatus("=== Cleanup ===", () -> ".", false, false);
                 }

                 Flux.just(tests).flatMap(t -> t.CleanupAsync()).blockLast();
@@ -126,7 +128,7 @@ public static void Run(Class<?> testClass, PerfStressOptions options) {
         } finally {
             if (!options.NoCleanup) {
                 if (cleanupStatus == null) {
-                    cleanupStatus = PrintStatus("=== Cleanup ===", () -> ".", false);
+                    cleanupStatus = PrintStatus("=== Cleanup ===", () -> ".", false, false);
                 }

                 tests[0].GlobalCleanupAsync().block();
@@ -139,89 +141,99 @@ public static void Run(Class<?> testClass, PerfStressOptions options) {
     }

     public static void RunTests(PerfStressTest<?>[] tests, boolean sync, int parallel, int durationSeconds, String title) {
-        _completedOperations.set(0);
+        _completedOperations = new int[parallel];
+        _lastCompletionNanoTimes = new long[parallel];

         long endNanoTime = System.nanoTime() + ((long) durationSeconds * 1000000000);

         int[] lastCompleted = new int[] { 0 };
         Disposable progressStatus = PrintStatus(
             "=== " + title + " ===" + System.lineSeparator() + "Current\t\tTotal", () -> {
-                int totalCompleted = _completedOperations.get();
+                int totalCompleted = IntStream.of(_completedOperations).sum();
                 int currentCompleted = totalCompleted - lastCompleted[0];
                 lastCompleted[0] = totalCompleted;
                 return currentCompleted + "\t\t" + totalCompleted;
-            }, true);
+            }, true, true);

         if (sync) {
             ForkJoinPool forkJoinPool = new ForkJoinPool(parallel);
             try {
                 forkJoinPool.submit(() -> {
-                    Arrays.stream(tests).parallel().forEach(t -> RunLoop(t, endNanoTime));
+                    IntStream.range(0, parallel).parallel().forEach(i -> RunLoop(tests[i], i, endNanoTime));
                 }).get();
             } catch (InterruptedException | ExecutionException e) {
                 throw new RuntimeException(e);
             }
         }
         else {
-            Flux.just(tests).flatMap(t -> RunLoopAsync(t, endNanoTime)).blockLast();
+            Flux.range(0, parallel).flatMap(i -> RunLoopAsync(tests[i], i, endNanoTime)).blockLast();
        }

         progressStatus.dispose();

         System.out.println("=== Results ===");

-        double averageElapsedSeconds = (Arrays.stream(_lastCompletionNanoTimes).average().orElse(Double.NaN))
-            / 1000000000;
-        double operationsPerSecond = _completedOperations.get() / averageElapsedSeconds;
+        int totalOperations = IntStream.of(_completedOperations).sum();
+        double operationsPerSecond = IntStream.range(0, parallel)
+            .mapToDouble(i -> ((double)_completedOperations[i]) / (_lastCompletionNanoTimes[i] / 1000000000))
+            .sum();
         double secondsPerOperation = 1 / operationsPerSecond;
+        double weightedAverageSeconds = totalOperations / operationsPerSecond;

-        System.out.printf("Completed %d operations in an average of %.2fs (%.2f ops/s, %.3f s/op)%n",
-            _completedOperations.get(), averageElapsedSeconds, operationsPerSecond, secondsPerOperation);
+        System.out.printf("Completed %d operations in a weighted-average of %.2fs (%.2f ops/s, %.3f s/op)%n",
+            totalOperations, weightedAverageSeconds, operationsPerSecond, secondsPerOperation);
         System.out.println();
     }

-    private static void RunLoop(PerfStressTest<?> test, long endNanoTime) {
+    private static void RunLoop(PerfStressTest<?> test, int index, long endNanoTime) {
         long startNanoTime = System.nanoTime();
         while (System.nanoTime() < endNanoTime) {
             test.Run();
-            int count = _completedOperations.incrementAndGet();
-            _lastCompletionNanoTimes[count % _lastCompletionNanoTimes.length] = System.nanoTime() - startNanoTime;
+            _completedOperations[index]++;
+            _lastCompletionNanoTimes[index] = System.nanoTime() - startNanoTime;
         }
     }

-    private static Mono<Void> RunLoopAsync(PerfStressTest<?> test, long endNanoTime) {
+    private static Mono<Void> RunLoopAsync(PerfStressTest<?> test, int index, long endNanoTime) {
         long startNanoTime = System.nanoTime();

         return Flux.just(1)
             .repeat()
             .flatMap(i -> test.RunAsync().then(Mono.just(1)), 1)
             .doOnNext(v -> {
-                int count = _completedOperations.incrementAndGet();
-                _lastCompletionNanoTimes[count % _lastCompletionNanoTimes.length] = System.nanoTime() - startNanoTime;
+                _completedOperations[index]++;
+                _lastCompletionNanoTimes[index] = System.nanoTime() - startNanoTime;
             })
             .take(Duration.ofNanos(endNanoTime - startNanoTime))
             .then();
     }

-    private static Disposable PrintStatus(String header, Supplier<Object> status, boolean newLine) {
+    private static Disposable PrintStatus(String header, Supplier<Object> status, boolean newLine, boolean printFinalStatus) {
         System.out.println(header);

         boolean[] needsExtraNewline = new boolean[] { false };

         return Flux.interval(Duration.ofSeconds(1)).doFinally(s -> {
+            if (printFinalStatus) {
+                PrintStatusHelper(status, newLine, needsExtraNewline);
+            }
+
             if (needsExtraNewline[0]) {
                 System.out.println();
             }
             System.out.println();
         }).subscribe(i -> {
-            Object obj = status.get();
-            if (newLine) {
-                System.out.println(obj);
-            } else {
-                System.out.print(obj);
-                needsExtraNewline[0] = true;
-            }
+            PrintStatusHelper(status, newLine, needsExtraNewline);
         });
     }
+
+    private static void PrintStatusHelper(Supplier<Object> status, boolean newLine, boolean[] needsExtraNewline) {
+        Object obj = status.get();
+        if (newLine) {
+            System.out.println(obj);
+        } else {
+            System.out.print(obj);
+            needsExtraNewline[0] = true;
+        }
+    }
 }
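The bookkeeping that makes the per-workstream rates possible is the other half of the diff: RunLoop and RunLoopAsync now take an index, and each workstream owns one slot in plain arrays rather than sharing an AtomicInteger and scattering completion times via count % length. A minimal standalone reduction of that pattern (hypothetical class name and 2-second duration, not the committed code) might look like:

import java.util.stream.IntStream;

// Hypothetical reduction of the per-workstream bookkeeping pattern.
public class PerWorkstreamSketch {
    public static void main(String[] args) {
        int parallel = 4;
        int[] completed = new int[parallel];    // one slot per workstream: no shared counter
        long[] lastNanos = new long[parallel];
        long endNanoTime = System.nanoTime() + 2_000_000_000L; // run for ~2 seconds

        IntStream.range(0, parallel).parallel().forEach(index -> {
            long start = System.nanoTime();
            while (System.nanoTime() < endNanoTime) {
                // a real test would do work here; the sketch just counts iterations
                completed[index]++;             // only workstream `index` writes this slot
                lastNanos[index] = System.nanoTime() - start;
            }
        });

        // Same shape as the committed calculation, using floating-point seconds.
        double opsPerSec = IntStream.range(0, parallel)
            .mapToDouble(i -> completed[i] / (lastNanos[i] / 1e9))
            .sum();
        System.out.printf("%.0f ops/s across %d workstreams%n", opsPerSec, parallel);
    }
}

Because no array element is written by more than one thread, the increments need no synchronization, and the terminal forEach of the parallel stream guarantees the writes are visible before the sum is computed.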
SleepTest.java

@@ -0,0 +1,40 @@
+package com.azure.perfstress;
+
+import reactor.core.publisher.Mono;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SleepTest extends PerfStressTest<PerfStressOptions> {
+    private static final AtomicInteger _instanceCount = new AtomicInteger();
+    private final int _secondsPerOperation;
+
+    public SleepTest(PerfStressOptions options) {
+        super(options);
+
+        int instanceCount = _instanceCount.incrementAndGet();
+        _secondsPerOperation = Pow(2, instanceCount);
+    }
+
+    private static int Pow(int value, int exponent) {
+        int power = 1;
+        for (int i=0; i < exponent; i++) {
+            power *= value;
+        }
+        return power;
+    }
+
+    @Override
+    public void Run() {
+        try {
+            Thread.sleep(_secondsPerOperation * 1000);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Mono<Void> RunAsync() {
+        return Mono.delay(Duration.ofSeconds(_secondsPerOperation)).then();
+    }
+}
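SleepTest exists to make the new math checkable by hand: the nth instance sleeps 2^n seconds per operation, so the workstreams run at rates that differ by powers of two. With three parallel workstreams the aggregate should converge on 1/2 + 1/4 + 1/8 = 0.875 ops/s. A quick sketch of that expected-rate arithmetic (illustrative class name, not part of the commit):

// Expected aggregate throughput for SleepTest: instance n sleeps 2^n seconds
// per operation, contributing 1 / 2^n ops/s to the total.
public class SleepTestExpectedRate {
    public static void main(String[] args) {
        int parallel = 3;
        double expectedOpsPerSec = 0;
        for (int n = 1; n <= parallel; n++) {
            expectedOpsPerSec += 1.0 / (1 << n);    // 0.5 + 0.25 + 0.125
        }
        System.out.printf("expected: %.3f ops/s%n", expectedOpsPerSec); // 0.875
    }
}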
