diff --git a/src/core/Akka.Persistence.TCK/Akka.Persistence.TCK.csproj b/src/core/Akka.Persistence.TCK/Akka.Persistence.TCK.csproj
index b8dd14c05cb..95c6d001e5e 100644
--- a/src/core/Akka.Persistence.TCK/Akka.Persistence.TCK.csproj
+++ b/src/core/Akka.Persistence.TCK/Akka.Persistence.TCK.csproj
@@ -16,6 +16,7 @@
+
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TCK/Performance/JournalPerfSpec.cs b/src/core/Akka.Persistence.TCK/Performance/JournalPerfSpec.cs
index e3285c4d6b2..f3c4d92fcd1 100644
--- a/src/core/Akka.Persistence.TCK/Performance/JournalPerfSpec.cs
+++ b/src/core/Akka.Persistence.TCK/Performance/JournalPerfSpec.cs
@@ -17,6 +17,7 @@
using Akka.TestKit;
using Akka.Util;
using Akka.Util.Internal;
+using MathNet.Numerics.Statistics;
using Xunit;
using Xunit.Abstractions;
@@ -33,7 +34,7 @@ namespace Akka.Persistence.TestKit.Performance
///
public abstract class JournalPerfSpec : Akka.TestKit.Xunit2.TestKit
{
- private readonly TestProbe testProbe;
+ private readonly TestProbe _testProbe;
///
/// Number of messages sent to the PersistentActor under test for each test iteration
@@ -44,6 +45,11 @@ public abstract class JournalPerfSpec : Akka.TestKit.Xunit2.TestKit
/// Number of measurement iterations each test will be run.
///
protected int MeasurementIterations = 10;
+
+ ///
+ /// Sigma value for the z-score outlier rejection algorithm
+ ///
+ protected double OutlierRejectionSigma = 2.0;
///
/// Override in order to customize timeouts used for ExpectMsg, in order to tune the awaits to your journal's perf
@@ -55,12 +61,12 @@ public abstract class JournalPerfSpec : Akka.TestKit.Xunit2.TestKit
protected JournalPerfSpec(Config config, string actorSystem, ITestOutputHelper output)
: base(config ?? Config.Empty, actorSystem, output)
{
- testProbe = CreateTestProbe();
+ _testProbe = CreateTestProbe();
}
internal IActorRef BenchActor(string pid, int replyAfter)
{
- return Sys.ActorOf(Props.Create(() => new BenchActor(pid, testProbe, EventsCount, false)));;
+ return Sys.ActorOf(Props.Create(() => new BenchActor(pid, _testProbe, EventsCount, false)));;
}
internal (IActorRef aut,TestProbe probe) BenchActorNewProbe(string pid, int replyAfter)
@@ -81,27 +87,39 @@ internal IActorRef BenchActor(string pid, int replyAfter)
internal void FeedAndExpectLast(IActorRef actor, string mode, IReadOnlyList commands)
{
- commands.ForEach(c => actor.Tell(new Cmd(mode, c)));
- testProbe.ExpectMsg(commands.Last(), ExpectDuration);
+ for (var i = 0; i < commands.Count; i++)
+ {
+ actor.Tell(new Cmd(mode, commands[i]));
+ }
+
+ _testProbe.ExpectMsg(commands[commands.Count - 1], ExpectDuration);
}
internal void FeedAndExpectLastSpecific((IActorRef actor, TestProbe probe) aut, string mode, IReadOnlyList commands)
{
- commands.ForEach(c => aut.actor.Tell(new Cmd(mode, c)));
+ var (actor, probe) = aut;
+ for (var i = 0; i < commands.Count; i++)
+ {
+ actor.Tell(new Cmd(mode, commands[i]));
+ }
- aut.probe.ExpectMsg(commands.Last(), ExpectDuration);
+ probe.ExpectMsg(commands[commands.Count - 1], ExpectDuration);
}
internal void FeedAndExpectLastRouterSet(
(IActorRef actor, TestProbe probe) autSet, string mode,
IReadOnlyList commands, int numExpect)
{
-
- commands.ForEach(c => autSet.actor.Tell(new Broadcast(new Cmd(mode, c))));
+ var (actor, probe) = autSet;
+ for (var i = 0; i < commands.Count; i++)
+ {
+ actor.Tell(new Broadcast(new Cmd(mode, commands[i])));
+ }
- for (int i = 0; i < numExpect; i++)
+ var expected = commands[commands.Count - 1];
+ for (var i = 0; i < numExpect; i++)
{
- autSet.probe.ExpectMsg(commands.Last(), ExpectDuration);
+ probe.ExpectMsg(expected, ExpectDuration);
}
}
@@ -126,10 +144,25 @@ internal void Measure(Func msg, Action block)
i++;
}
- double avgTime = measurements.Select(c => c.TotalMilliseconds).Sum() / MeasurementIterations;
- double msgPerSec = (EventsCount / avgTime) * 1000;
+ var (rejected, times) = RejectOutliers(measurements.Select(c => c.TotalMilliseconds).ToArray(), OutlierRejectionSigma);
+
+ var mean = times.Average();
+ var stdDev = times.PopulationStandardDeviation();
+ var min = times.Minimum();
+ var q1 = times.LowerQuartile();
+ var median = times.Median();
+ var q3 = times.UpperQuartile();
+ var max = times.Maximum();
+
+ Output.WriteLine($"Mean: {mean:F2} ms, Standard Deviation: {stdDev:F2} ms, Min: {min:F2} ms, Q1: {q1:F2} ms, Median: {median:F2} ms, Q3: {q3:F2} ms, Max: {max:F2} ms");
- Output.WriteLine($"Average time: {avgTime} ms, {msgPerSec} msg/sec");
+ var msgPerSec = EventsCount / mean * 1000;
+ Output.WriteLine($"Mean throughput: {msgPerSec:F2} msg/s");
+
+ var medianMsgPerSec = EventsCount / median * 1000;
+ Output.WriteLine($"Median throughput: {medianMsgPerSec:F2} msg/s");
+
+ Output.WriteLine($"Rejected outlier (sigma: {OutlierRejectionSigma}): {string.Join(", ", rejected)}");
}
///
@@ -156,10 +189,28 @@ internal void MeasureGroup(Func msg, Action block, int numMsg,
i++;
}
- double avgTime = measurements.Select(c => c.TotalMilliseconds).Sum() / MeasurementIterations;
- double msgPerSec = (numMsg / avgTime) * 1000;
- double msgPerSecTotal = (numMsg*numGroup / avgTime) * 1000;
- Output.WriteLine($"Workers: {numGroup} , Average time: {avgTime} ms, {msgPerSec} msg/sec/actor, {msgPerSecTotal} total msg/sec.");
+ var (rejected, times) = RejectOutliers(measurements.Select(c => c.TotalMilliseconds).ToArray(), OutlierRejectionSigma);
+
+ var mean = times.Average();
+ var stdDev = times.PopulationStandardDeviation();
+ var min = times.Minimum();
+ var q1 = times.LowerQuartile();
+ var median = times.Median();
+ var q3 = times.UpperQuartile();
+ var max = times.Maximum();
+
+ Output.WriteLine($"Workers: {numGroup}, Mean: {mean:F2} ms, Standard Deviation: {stdDev:F2} ms, Min: {min:F2} ms, Q1: {q1:F2} ms, Median: {median:F2} ms, Q3: {q3:F2} ms, Max: {max:F2} ms");
+
+ var msgPerSec = numMsg / mean * 1000;
+ var msgPerSecTotal = numMsg * numGroup / mean * 1000;
+
+ Output.WriteLine($"Mean throughput: {msgPerSec:F2} msg/s/actor, Mean total throughput: {msgPerSecTotal:F2} msg/s");
+
+ var medianMsgPerSec = numMsg / median * 1000;
+ var medianMsgPerSecTotal = numMsg * numGroup / median * 1000;
+ Output.WriteLine($"Median throughput: {medianMsgPerSec:F2} msg/s/actor, Median total throughput: {medianMsgPerSecTotal:F2} msg/s");
+
+ Output.WriteLine($"Rejected outlier (sigma: {OutlierRejectionSigma}): {string.Join(", ", rejected)}");
}
private void RunPersistGroupBenchmark(int numGroup, int numCommands)
@@ -179,6 +230,18 @@ private void RunPersistGroupBenchmark(int numGroup, int numCommands)
);
}
+ private static (IReadOnlyList Rejected, IReadOnlyList Measurements) RejectOutliers(IReadOnlyList measurements, double sigma)
+ {
+ var mean = measurements.Average();
+ var stdDev = measurements.PopulationStandardDeviation();
+ var threshold = sigma * stdDev;
+ var minThreshold = mean - threshold;
+ var maxThreshold = mean + threshold;
+ var rejected = measurements.Where(m => m < minThreshold || m > maxThreshold);
+ var accepted = measurements.Where(m => m >= minThreshold && m <= maxThreshold);
+ return (rejected.ToArray(), accepted.ToArray());
+ }
+
[Fact]
public void PersistenceActor_performance_must_measure_Persist()
{
@@ -272,7 +335,7 @@ public void PersistenceActor_performance_must_measure_Recovering()
Measure(d => $"Recovering {EventsCount} took {d.TotalMilliseconds} ms", () =>
{
BenchActor("PersistRecoverPid", EventsCount);
- testProbe.ExpectMsg(Commands.Last(), ExpectDuration);
+ _testProbe.ExpectMsg(Commands.Last(), ExpectDuration);
});
}