Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: 282-Create InFlightNatsMsg to improve channel buffer usage. #290

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 273 additions & 0 deletions sandbox/MicroBenchmark/ChannelPassingBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
using System.Threading.Channels;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using NATS.Client.Core;
using NATS.Client.Core.Commands;
using NATS.Client.Core.Internal;

namespace MicroBenchmark;

[MemoryDiagnoser]
[SimpleJob(RuntimeMoniker.Net80)]
public class ChannelPassingBenchmarks
{
private NatsConnection Connection;
private SubWrappedChannel<string> _inFlightNatsMsgChannel;
private Channel<NatsMsg<string>> _natsMsgChannel;
private BoundedChannelOptions ChannelOpts;
private CancellationTokenSource _cts;

[GlobalCleanup]
public void TearDown()
{
_cts.Dispose();
_natsMsgChannel = null;
_inFlightNatsMsgChannel = null;
}

[GlobalSetup]
public void Setup()
{
Connection = new NatsConnection();
ChannelOpts = Connection.GetChannelOpts(Connection.Opts, default);
_inFlightNatsMsgChannel = new SubWrappedChannel<string>(
Channel.CreateBounded<InFlightNatsMsg<string>>(
ChannelOpts),
Connection);
_natsMsgChannel = Channel.CreateBounded<NatsMsg<string>>(
ChannelOpts);
_cts = new CancellationTokenSource();
}

[Benchmark]
public void RunNatsMsgChannel_Sync()
{
var maxCount = ChannelOpts.Capacity;


for (int i = 0; i < maxCount; i++)
{
_natsMsgChannel.Writer.TryWrite(new NatsMsg<string>("t", default, 3, default, "foo", default));
}

var reader = _natsMsgChannel.Reader;
for (int i = 0; i < maxCount; i++)
{
reader.TryRead(out _);
}
}

[Benchmark]
public void RunInFlightNatsMsgChannel_Sync()
{
var maxCount = ChannelOpts.Capacity;

var writer = _inFlightNatsMsgChannel.Writer;
for (int i = 0; i < maxCount; i++)
{
writer.TryWrite(new InFlightNatsMsg<string>("t", default, 3, default, "foo"));
}

var reader = _inFlightNatsMsgChannel.Reader;
for (int i = 0; i < maxCount; i++)
{
reader.TryRead(out _);
}
}

[Benchmark]
public void RunNatsMsgChannel_Sync_Pulse()
{
var maxCount = ChannelOpts.Capacity;


var reader = _natsMsgChannel.Reader;
for (int i = 0; i < maxCount; i++)
{
_natsMsgChannel.Writer.TryWrite(new NatsMsg<string>("t", default, 3, default, "foo", default));
reader.TryRead(out _);
}
}

[Benchmark]
public void RunInFlightNatsMsgChannel_Sync_Pulse()
{
var maxCount = ChannelOpts.Capacity;

var writer = _inFlightNatsMsgChannel.Writer;
var reader = _inFlightNatsMsgChannel.Reader;
for (int i = 0; i < maxCount; i++)
{
writer.TryWrite(new InFlightNatsMsg<string>("t", default, 3, default, "foo"));
reader.TryRead(out _);
}
}

[Benchmark]
public async Task RunNatsMsgChannel_Async_Pulse()
{
var maxCount = ChannelOpts.Capacity;


var reader = _natsMsgChannel.Reader;
for (int i = 0; i < maxCount; i++)
{
_natsMsgChannel.Writer.TryWrite(new NatsMsg<string>("t", default, 3, default, "foo", default));
await reader.ReadAsync(_cts.Token);
}
}

[Benchmark]
public async Task RunInFlightNatsMsgChannel_Async_Pulse()
{
var maxCount = ChannelOpts.Capacity;

var writer = _inFlightNatsMsgChannel.Writer;
var reader = _inFlightNatsMsgChannel.Reader;
for (int i = 0; i < maxCount; i++)
{
writer.TryWrite(new InFlightNatsMsg<string>("t", default, 3, default, "foo"));
await reader.ReadAsync(_cts.Token);
}
}

[Benchmark]
public async Task RunNatsMsgChannel_Async_Pulse_Unhappy()
{
var maxCount = ChannelOpts.Capacity;

var writer = _natsMsgChannel.Writer;
var reader = _natsMsgChannel.Reader;
for (int i = 0; i < maxCount; i++)
{
var r = reader.ReadAsync(_cts.Token);
writer.TryWrite(new NatsMsg<string>("t", default, 3, default, "foo", default));
var v = await r;
if (v.Subject == null)
{
Console.WriteLine("wat");
}
}
}

[Benchmark]
public async Task RunInFlightNatsMsgChannel_Async_Pulse_Unhappy()
{
var maxCount = ChannelOpts.Capacity;

var writer = _inFlightNatsMsgChannel.Writer;
var reader = _inFlightNatsMsgChannel.Reader;
for (int i = 0; i < maxCount; i++)
{
var r = reader.ReadAsync(_cts.Token);
writer.TryWrite(new InFlightNatsMsg<string>("t", default, 3, default, "foo"));
var v = await r;
if (v.Subject == null)
{
Console.WriteLine("wat");
}
}
}

[Benchmark]
public async Task RunNatsMsgChannel_Async_Unhappy()
{
var maxCount = ChannelOpts.Capacity;

var readTask = Task.Run(
async () =>
{
var reader = _natsMsgChannel.Reader;
for (int i = 0; i < maxCount; i++)
{
await reader.ReadAsync(_cts.Token);
}
});
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
() =>
{
for (int i = 0; i < maxCount; i++)
{
_natsMsgChannel.Writer.TryWrite(new NatsMsg<string>("t", default, 3, default, "foo", default));
}
});
await readTask;
}

[Benchmark]
public async Task RunInFlightNatsMsgChannel_Async_Unhappy()
{
var maxCount = ChannelOpts.Capacity;

var readTask = Task.Run(
async () =>
{
var reader = _inFlightNatsMsgChannel.Reader;
for (int i = 0; i < maxCount; i++)
{
await reader.ReadAsync(_cts.Token);
}
});

#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
() =>
{
var writer = _inFlightNatsMsgChannel.Writer;
for (int i = 0; i < maxCount; i++)
{
writer.TryWrite(new InFlightNatsMsg<string>("t", default, 3, default, "foo"));
}
});
await readTask;
}

[Benchmark]
public async Task RunNatsMsgChannel_Async_Happy()
{
var maxCount = ChannelOpts.Capacity;

for (int i = 0; i < maxCount; i++)
{
_natsMsgChannel.Writer.TryWrite(new NatsMsg<string>("t", default, 3, default, "foo", default));
}

var readTask = Task.Run(
async () =>
{
var reader = _natsMsgChannel.Reader;
for (int i = 0; i < maxCount; i++)
{
await reader.ReadAsync(_cts.Token);
}
});

await readTask;
}

[Benchmark]
public async Task RunInFlightNatsMsgChannel_Async_Happy()
{
var maxCount = ChannelOpts.Capacity;

var writer = _inFlightNatsMsgChannel.Writer;
for (int i = 0; i < maxCount; i++)
{
writer.TryWrite(new InFlightNatsMsg<string>("t", default, 3, default, "foo"));
}

var readTask = Task.Run(
async () =>
{
var reader = _inFlightNatsMsgChannel.Reader;
for (int i = 0; i < maxCount; i++)
{
await reader.ReadAsync(_cts.Token);
}
});
await readTask;
}
}
Loading
Loading