Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move 'BindingProxies' from 'NetMQTransport' into 'TurnClient' #868

Merged
merged 9 commits into from
May 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ To be released.

### Added APIs

- Added `TurnClient.BindProxies()` method. [[#756], [#868]]

### Behavioral changes

- Improved performance of `Swarm<T>` by multiplexing response and
Expand All @@ -23,8 +25,10 @@ To be released.

### CLI tools

[#756]: https://github.com/planetarium/libplanet/issues/756
[#858]: https://github.com/planetarium/libplanet/issues/858
[#859]: https://github.com/planetarium/libplanet/pull/859
[#868]: https://github.com/planetarium/libplanet/pull/868


Version 0.9.1
Expand Down
26 changes: 26 additions & 0 deletions Libplanet.Stun/Stun/TurnClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Libplanet.Net;
using Libplanet.Stun.Messages;
using Nito.AsyncEx;
using Serilog;
Expand Down Expand Up @@ -218,6 +219,31 @@ public void Dispose()
}
}

public async Task BindProxies(
int listenPort,
CancellationToken cancellationToken = default(CancellationToken))
{
while (!cancellationToken.IsCancellationRequested)
{
#pragma warning disable IDE0067 // We'll dispose of `stream` in proxy task.
NetworkStream stream = await AcceptRelayedStreamAsync(cancellationToken);
#pragma warning restore IDE0067

// TODO We should expose the interface so that library users
// can limit / manage the task.
Func<Task> startAsync = async () =>
{
using var proxy = new NetworkStreamProxy(stream);
await proxy.StartAsync(IPAddress.Loopback, listenPort);
};

#pragma warning disable CS4014
Task.Run(startAsync, cancellationToken)
.ContinueWith(_ => stream.Dispose(), cancellationToken);
#pragma warning restore CS4014
}
}

private async Task SendMessageAsync(
NetworkStream stream,
StunMessage message,
Expand Down
50 changes: 12 additions & 38 deletions Libplanet/Net/NetMQTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,11 @@ public async Task StartAsync(CancellationToken cancellationToken)
_logger.Information($"Listen on {_listenPort}");

_cancellationToken = cancellationToken;
var tasks = new List<Task>();

if (!(_turnClient is null))
{
_publicIPAddress = (await _turnClient.GetMappedAddressAsync()).Address;

if (await _turnClient.IsBehindNAT())
{
_behindNAT = true;
}
_behindNAT = await _turnClient.IsBehindNAT();
}

if (_behindNAT)
Expand All @@ -246,10 +241,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
);
_endPoint = new DnsEndPoint(turnEp.Address.ToString(), turnEp.Port);

// FIXME should be parameterized
tasks.Add(BindingProxies(_cancellationToken));
tasks.Add(BindingProxies(_cancellationToken));
tasks.Add(BindingProxies(_cancellationToken));
List<Task> tasks = BindMultipleProxies(_listenPort.Value, 3, _cancellationToken);
tasks.Add(RefreshAllocate(_cancellationToken));
tasks.Add(RefreshPermissions(_cancellationToken));
}
Expand Down Expand Up @@ -850,34 +842,6 @@ await dealer.SendMultipartMessageAsync(
DateTimeOffset.UtcNow - startedTime);
}

// FIXME: Separate turn related features outside of Transport if possible.
private async Task BindingProxies(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
#pragma warning disable IDE0067 // We'll dispose of `stream` in proxy task.
NetworkStream stream = await _turnClient.AcceptRelayedStreamAsync();
#pragma warning restore IDE0067

// TODO We should expose the interface so that library users
// can limit / manage the task.
#pragma warning disable CS4014
Task.Run(async () =>
{
using var proxy = new NetworkStreamProxy(stream);
await proxy.StartAsync(IPAddress.Loopback, _listenPort.Value);
}).ContinueWith(_ => stream.Dispose());
#pragma warning restore CS4014
}
catch (Exception e)
{
_logger.Error(e, "An unexpected exception occurred. try again...");
}
}
}

private void CheckStarted()
{
if (!Running)
Expand Down Expand Up @@ -1011,6 +975,16 @@ private Task RunPoller(NetMQPoller poller) =>
TaskScheduler.Default
);

private List<Task> BindMultipleProxies(
int listenPort,
int count,
CancellationToken cancellationToken = default(CancellationToken))
{
return Enumerable.Range(1, count)
.Select(x => _turnClient.BindProxies(listenPort, cancellationToken))
.ToList();
}

private readonly struct MessageRequest
{
private readonly int _retried;
Expand Down