Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move 'BindingProxies' from 'NetMQTransport' into 'TurnClient' #868

Merged
merged 9 commits into from
May 14, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions Libplanet.Stun/Stun/TurnClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Libplanet.Net;
using Libplanet.Stun.Messages;
using Nito.AsyncEx;
using Serilog;
Expand Down Expand Up @@ -218,6 +219,30 @@ public void Dispose()
}
}

public async Task BindingProxies(
riemannulus marked this conversation as resolved.
Show resolved Hide resolved
CancellationToken cancellationToken,
riemannulus marked this conversation as resolved.
Show resolved Hide resolved
int? listenPort)
riemannulus marked this conversation as resolved.
Show resolved Hide resolved
{
while (!cancellationToken.IsCancellationRequested)
{
#pragma warning disable IDE0067 // We'll dispose of `stream` in proxy task.
NetworkStream stream = await AcceptRelayedStreamAsync();
riemannulus marked this conversation as resolved.
Show resolved Hide resolved
#pragma warning restore IDE0067

// TODO We should expose the interface so that library users
// can limit / manage the task.
Func<Task> f = async () =>
riemannulus marked this conversation as resolved.
Show resolved Hide resolved
{
using var proxy = new NetworkStreamProxy(stream);
await proxy.StartAsync(IPAddress.Loopback, listenPort.Value);
};

#pragma warning disable CS4014
Task.Run(f).ContinueWith(_ => stream.Dispose());
riemannulus marked this conversation as resolved.
Show resolved Hide resolved
#pragma warning restore CS4014
}
}

private async Task SendMessageAsync(
NetworkStream stream,
StunMessage message,
Expand Down
51 changes: 13 additions & 38 deletions Libplanet/Net/NetMQTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,11 @@ public async Task StartAsync(CancellationToken cancellationToken)
_logger.Information($"Listen on {_listenPort}");

_cancellationToken = cancellationToken;
var tasks = new List<Task>();

if (!(_turnClient is null))
{
_publicIPAddress = (await _turnClient.GetMappedAddressAsync()).Address;

if (await _turnClient.IsBehindNAT())
{
_behindNAT = true;
}
_behindNAT = await _turnClient.IsBehindNAT();
}

if (_behindNAT)
Expand All @@ -246,12 +241,10 @@ public async Task StartAsync(CancellationToken cancellationToken)
);
_endPoint = new DnsEndPoint(turnEp.Address.ToString(), turnEp.Port);

// FIXME should be parameterized
tasks.Add(BindingProxies(_cancellationToken));
tasks.Add(BindingProxies(_cancellationToken));
tasks.Add(BindingProxies(_cancellationToken));
var tasks = BindingMultipleProxies(_cancellationToken, _listenPort, 3);
riemannulus marked this conversation as resolved.
Show resolved Hide resolved
tasks.Add(RefreshAllocate(_cancellationToken));
tasks.Add(RefreshPermissions(_cancellationToken));
await await Task.WhenAny(tasks);
}
else if (_host is null)
{
Expand Down Expand Up @@ -850,34 +843,6 @@ await dealer.SendMultipartMessageAsync(
DateTimeOffset.UtcNow - startedTime);
}

// FIXME: Separate turn related features outside of Transport if possible.
private async Task BindingProxies(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
#pragma warning disable IDE0067 // We'll dispose of `stream` in proxy task.
NetworkStream stream = await _turnClient.AcceptRelayedStreamAsync();
#pragma warning restore IDE0067

// TODO We should expose the interface so that library users
// can limit / manage the task.
#pragma warning disable CS4014
Task.Run(async () =>
{
using var proxy = new NetworkStreamProxy(stream);
await proxy.StartAsync(IPAddress.Loopback, _listenPort.Value);
}).ContinueWith(_ => stream.Dispose());
#pragma warning restore CS4014
}
catch (Exception e)
{
_logger.Error(e, "An unexpected exception occurred. try again...");
}
}
}

private void CheckStarted()
{
if (!Running)
Expand Down Expand Up @@ -1011,6 +976,16 @@ private Task RunPoller(NetMQPoller poller) =>
TaskScheduler.Default
);

private List<Task> BindingMultipleProxies(
riemannulus marked this conversation as resolved.
Show resolved Hide resolved
CancellationToken cancellationToken,
longfin marked this conversation as resolved.
Show resolved Hide resolved
riemannulus marked this conversation as resolved.
Show resolved Hide resolved
int? listenPort,
longfin marked this conversation as resolved.
Show resolved Hide resolved
int count)
{
return Enumerable.Range(1, count)
.Select(x => _turnClient.BindingProxies(_cancellationToken, listenPort.Value))
.ToList();
}

private readonly struct MessageRequest
{
private readonly int _retried;
Expand Down