Skip to content

Commit

Permalink
[Perf] Add EventPerfTest to framework, add tests for EventHubs and Ev…
Browse files Browse the repository at this point in the history
…entHubs.Processor (Azure#22760)

- Only change to perf framework is adding EventPerfTest
- Adds sample event-based perf tests under Azure.Sample.Perf
- Adds real-world perf tests for EventHubs and EventHubs.Processor
- Builds on Azure#24653
- Fixes Azure#24322
  • Loading branch information
mikeharder authored Nov 1, 2021
1 parent 5baa6d5 commit cc7287c
Show file tree
Hide file tree
Showing 30 changed files with 945 additions and 61 deletions.
19 changes: 19 additions & 0 deletions common/Perf/Azure.Sample.Perf/Event/MockErrorEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;

namespace Azure.Sample.Perf.Event
{
public class MockErrorEventArgs
{
public int Partition { get; }
public Exception Exception { get; }

public MockErrorEventArgs(int partition, Exception exception)
{
Partition = partition;
Exception = exception;
}
}
}
17 changes: 17 additions & 0 deletions common/Perf/Azure.Sample.Perf/Event/MockEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

namespace Azure.Sample.Perf.Event
{
public class MockEventArgs
{
public int Partition { get; }
public string Data { get; }

public MockEventArgs(int partition, string data)
{
Partition = partition;
Data = data;
}
}
}
140 changes: 140 additions & 0 deletions common/Perf/Azure.Sample.Perf/Event/MockEventProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Sample.Perf.Event
{
public class MockEventProcessor
{
public int Partitions { get; }
public int? MaxEventsPerSecond { get; }
private double MaxEventsPerSecondPerPartition => ((double)MaxEventsPerSecond) / Partitions;

public TimeSpan? ErrorAfter { get; }
private bool _errorRaised;
private object _errorLock = new object();

private readonly MockEventArgs[] _eventArgs;
private readonly int[] _eventsRaised;
private readonly Stopwatch _sw = new Stopwatch();

private Func<MockEventArgs, Task> _processEventAsync;
private Func<MockErrorEventArgs, Task> _processErrorAsync;

public MockEventProcessor(int partitions, int? maxEventsPerSecond, TimeSpan? errorAfter)
{
Partitions = partitions;
MaxEventsPerSecond = maxEventsPerSecond;
ErrorAfter = errorAfter;

_eventArgs = new MockEventArgs[partitions];
for (var i=0; i < partitions; i++)
{
_eventArgs[i] = new MockEventArgs(partition: i, data: "hello");
}

_eventsRaised = new int[partitions];
}

public event Func<MockEventArgs, Task> ProcessEventAsync
{
add
{
_processEventAsync = value;
}
remove
{
_processEventAsync = default;
}
}

public event Func<MockErrorEventArgs, Task> ProcessErrorAsync
{
add
{
_processErrorAsync = value;
}
remove
{
_processErrorAsync = default;
}
}

public Task StartProcessingAsync()
{
_sw.Start();

for (var i=0; i < Partitions; i++)
{
var j = i;
Task.Run(() => Process(j));
}

return Task.CompletedTask;
}

private void Process(int partition)
{
var eventArgs = _eventArgs[partition];

if (MaxEventsPerSecond.HasValue)
{
while (true)
{
if (ErrorAfter.HasValue && !_errorRaised && _sw.Elapsed > ErrorAfter)
{
lock (_errorLock)
{
if (!_errorRaised)
{
_processErrorAsync(new MockErrorEventArgs(partition, new InvalidOperationException("test exception")));
_errorRaised = true;
}
}
}
else
{
var eventsRaised = _eventsRaised[partition];
var targetEventsRaised = _sw.Elapsed.TotalSeconds * MaxEventsPerSecondPerPartition;

if (eventsRaised < targetEventsRaised)
{
_processEventAsync(eventArgs).Wait();
_eventsRaised[partition]++;
}
else
{
Thread.Sleep(TimeSpan.FromSeconds(1 / MaxEventsPerSecondPerPartition));
}
}
}
}
else
{
while (true)
{
if (ErrorAfter.HasValue && !_errorRaised && _sw.Elapsed > ErrorAfter)
{
lock (_errorLock)
{
if (!_errorRaised)
{
_processErrorAsync(new MockErrorEventArgs(partition, new InvalidOperationException("test exception")));
_errorRaised = true;
}
}
}
else
{
_processEventAsync(eventArgs).Wait();
_eventsRaised[partition]++;
}
}
}
}
}
}
81 changes: 81 additions & 0 deletions common/Perf/Azure.Sample.Perf/Event/MockEventProcessorBaseTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using Azure.Test.Perf;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Sample.Perf.Event
{
public class MockEventProcessorBaseTest : PerfTestBase<MockEventProcessorOptions>
{
private readonly Stopwatch _stopwatch;
private readonly MockEventProcessor _eventProcessor;

private long[] _eventsProcessed;
public override long CompletedOperations => _eventsProcessed.Sum();

public override IList<TimeSpan> Latencies => throw new NotImplementedException();
public override IList<TimeSpan> CorrectedLatencies => throw new NotImplementedException();

public MockEventProcessorBaseTest(MockEventProcessorOptions options) : base(options)
{
_stopwatch = new Stopwatch();

_eventProcessor = new MockEventProcessor(options.Partitions, options.MaxEventsPerSecond,
options.ErrorAfterSeconds.HasValue ? TimeSpan.FromSeconds(options.ErrorAfterSeconds.Value) : null);

_eventProcessor.ProcessEventAsync += ProcessEventAsync;

_eventsProcessed = new long[options.Partitions];
}

private Task ProcessEventAsync(MockEventArgs arg)
{
Interlocked.Increment(ref _eventsProcessed[arg.Partition]);
LastCompletionTime = _stopwatch.Elapsed;

return Task.CompletedTask;
}

public override async Task SetupAsync()
{
await base.SetupAsync();
await _eventProcessor.StartProcessingAsync();
}

public override void RunAll(CancellationToken cancellationToken)
{
for (var i = 0; i < _eventsProcessed.Length; i++)
{
Interlocked.Exchange(ref _eventsProcessed[i], 0);
}
LastCompletionTime = default;

RunAllAsync(cancellationToken).Wait();
}

public override async Task RunAllAsync(CancellationToken cancellationToken)
{
for (var i = 0; i < _eventsProcessed.Length; i++)
{
Interlocked.Exchange(ref _eventsProcessed[i], 0);
}
LastCompletionTime = default;

_stopwatch.Restart();

try
{
await Task.Delay(Timeout.Infinite, cancellationToken);
}
catch (OperationCanceledException)
{
}
}
}
}
41 changes: 41 additions & 0 deletions common/Perf/Azure.Sample.Perf/Event/MockEventProcessorEventTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using Azure.Test.Perf;
using System;
using System.Threading.Tasks;

namespace Azure.Sample.Perf.Event
{
public class MockEventProcessorEventTest : EventPerfTest<MockEventProcessorOptions>
{
private readonly MockEventProcessor _eventProcessor;

public MockEventProcessorEventTest(MockEventProcessorOptions options) : base(options)
{
_eventProcessor = new MockEventProcessor(options.Partitions, options.MaxEventsPerSecond,
options.ErrorAfterSeconds.HasValue ? TimeSpan.FromSeconds(options.ErrorAfterSeconds.Value) : null);

_eventProcessor.ProcessEventAsync += ProcessEventAsync;
_eventProcessor.ProcessErrorAsync += ProcessErrorAsync;
}

private Task ProcessEventAsync(MockEventArgs arg)
{
EventRaised();
return Task.CompletedTask;
}

private Task ProcessErrorAsync(MockErrorEventArgs arg)
{
ErrorRaised(arg.Exception);
return Task.CompletedTask;
}

public override async Task SetupAsync()
{
await base.SetupAsync();
await _eventProcessor.StartProcessingAsync();
}
}
}
20 changes: 20 additions & 0 deletions common/Perf/Azure.Sample.Perf/Event/MockEventProcessorOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using Azure.Test.Perf;
using CommandLine;

namespace Azure.Sample.Perf.Event
{
public class MockEventProcessorOptions : PerfOptions
{
[Option("error-after-seconds", HelpText = "Raise error after this number of seconds")]
public int? ErrorAfterSeconds { get; set; }

[Option("max-events-per-second", HelpText = "Maximum events per second across all partitions")]
public int? MaxEventsPerSecond { get; set; }

[Option("partitions", Default = 8)]
public int Partitions { get; set; }
}
}
Loading

0 comments on commit cc7287c

Please sign in to comment.