Skip to content

Commit

Permalink
Fix message dropping (#890)
Browse files Browse the repository at this point in the history
* Improve performance for ReceiveMessage

* Remove Proxy

* Prevent message dropping

- Separated broadcasting logic from the NetMQ runtime to avoid message dropping caused by DealerSocket.Dispose() before sending.
- Removed linger and timeout in runtime for broadcasting

* Update CHANGES.md

* Apply suggestions from code review

Co-authored-by: Seunghun Lee <waydi1@gmail.com>

* Apply suggestions from code review

Co-authored-by: Hong Minhee <hong.minhee@gmail.com>

Co-authored-by: Seunghun Lee <waydi1@gmail.com>
Co-authored-by: Hong Minhee <hong.minhee@gmail.com>
  • Loading branch information
3 people authored Jun 2, 2020
1 parent 377e454 commit a185dd3
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 140 deletions.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,21 @@ To be released.

### Bug fixes

- Fixed a bug that `Swarm<T>` had failed to receive a request from TURN relay
connections. [[#404], [#871], [#890]]

### CLI tools

[#404]: https://github.com/planetarium/libplanet/issues/404
[#756]: https://github.com/planetarium/libplanet/issues/756
[#858]: https://github.com/planetarium/libplanet/issues/858
[#859]: https://github.com/planetarium/libplanet/pull/859
[#860]: https://github.com/planetarium/libplanet/issues/860
[#868]: https://github.com/planetarium/libplanet/pull/868
[#871]: https://github.com/planetarium/libplanet/issues/871
[#875]: https://github.com/planetarium/libplanet/pull/875
[#883]: https://github.com/planetarium/libplanet/pull/883
[#890]: https://github.com/planetarium/libplanet/pull/890


Version 0.9.3
Expand Down
58 changes: 0 additions & 58 deletions Libplanet.Stun/Stun/NetworkStreamProxy.cs

This file was deleted.

35 changes: 20 additions & 15 deletions Libplanet.Stun/Stun/TurnClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Libplanet.Net;
using Libplanet.Stun.Messages;
using Nito.AsyncEx;
using Serilog;
Expand Down Expand Up @@ -225,21 +224,27 @@ public async Task BindProxies(
{
while (!cancellationToken.IsCancellationRequested)
{
#pragma warning disable IDE0067 // We'll dispose of `stream` in proxy task.
NetworkStream stream = await AcceptRelayedStreamAsync(cancellationToken);
#pragma warning restore IDE0067

// TODO We should expose the interface so that library users
// can limit / manage the task.
Func<Task> startAsync = async () =>
{
using var proxy = new NetworkStreamProxy(stream);
await proxy.StartAsync(IPAddress.Loopback, listenPort);
};

var tcpClient = new TcpClient();
#pragma warning disable PC001 // API not supported on all platforms
tcpClient.Connect(new IPEndPoint(IPAddress.Loopback, listenPort));
#pragma warning restore PC001
NetworkStream localStream = tcpClient.GetStream();
NetworkStream turnStream = await AcceptRelayedStreamAsync(cancellationToken);
#pragma warning disable CS4014
Task.Run(startAsync, cancellationToken)
.ContinueWith(_ => stream.Dispose(), cancellationToken);

const int bufferSize = 8042;
Task.WhenAny(
turnStream.CopyToAsync(localStream, bufferSize, cancellationToken),
localStream.CopyToAsync(turnStream, bufferSize, cancellationToken)
).ContinueWith(
t =>
{
turnStream.Dispose();
localStream.Dispose();
tcpClient.Dispose();
},
cancellationToken
);
#pragma warning restore CS4014
}
}
Expand Down
145 changes: 78 additions & 67 deletions Libplanet/Net/NetMQTransport.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
Expand Down Expand Up @@ -56,6 +57,7 @@ internal class NetMQTransport : ITransport

private TaskCompletionSource<object> _runningEvent;
private CancellationToken _cancellationToken;
private ConcurrentDictionary<Address, DealerSocket> _dealers;

/// <summary>
/// The <see cref="EventHandler" /> triggered when the different version of
Expand Down Expand Up @@ -155,6 +157,7 @@ public NetMQTransport(
_logger,
tableSize,
bucketSize);
_dealers = new ConcurrentDictionary<Address, DealerSocket>();
}

/// <summary>
Expand Down Expand Up @@ -316,6 +319,11 @@ public async Task StopAsync(
_router.Dispose();
_turnClient?.Dispose();

foreach (DealerSocket dealer in _dealers.Values)
{
dealer.Dispose();
}

Running = false;
}
}
Expand Down Expand Up @@ -569,56 +577,60 @@ public async Task CheckAllPeersAsync(CancellationToken cancellationToken, TimeSp

private void ReceiveMessage(object sender, NetMQSocketEventArgs e)
{
try
NetMQMessage raw = new NetMQMessage();
while (e.Socket.TryReceiveMultipartMessage(ref raw))
{
NetMQMessage raw = e.Socket.ReceiveMultipartMessage();

if (_cancellationToken.IsCancellationRequested)
try
{
return;
}
if (_cancellationToken.IsCancellationRequested)
{
return;
}

_logger.Verbose(
"A raw message [frame count: {0}] has received.",
raw.FrameCount
);
Message message = Message.Parse(raw, reply: false);
_logger.Debug("A message has parsed: {0}, from {1}", message, message.Remote);
MessageHistory.Enqueue(message);
if (!(message is Ping))
_logger.Verbose(
"A raw message [frame count: {0}] has received.",
raw.FrameCount
);
Message message = Message.Parse(raw, reply: false);
_logger.Debug("A message has parsed: {0}, from {1}", message, message.Remote);
MessageHistory.Enqueue(message);
if (!(message is Ping))
{
ValidateSender(message.Remote);
}

try
{
Protocol.ReceiveMessage(message);
ProcessMessageHandler?.Invoke(this, message);
}
catch (Exception exc)
{
_logger.Error(
exc,
"Something went wrong during message parsing: {0}",
exc);
throw;
}
}
catch (DifferentAppProtocolVersionException)
{
ValidateSender(message.Remote);
_logger.Debug("Ignore message from peer with different version.");
}

try
catch (InvalidMessageException ex)
{
Protocol.ReceiveMessage(message);
ProcessMessageHandler?.Invoke(this, message);
_logger.Error(ex, $"Could not parse NetMQMessage properly; ignore: {{0}}", ex);
}
catch (Exception exc)
catch (Exception ex)
{
const string mname = nameof(ReceiveMessage);
_logger.Error(
exc,
"Something went wrong during message parsing: {0}",
exc);
throw;
ex,
$"An unexpected exception occurred during {mname}(): {{0}}",
ex
);
}
}
catch (DifferentAppProtocolVersionException)
{
_logger.Debug("Ignore message from peer with different version.");
}
catch (InvalidMessageException ex)
{
_logger.Error(ex, $"Could not parse NetMQMessage properly; ignore: {ex}");
}
catch (Exception ex)
{
_logger.Error(
ex,
$"An unexpected exception occurred during {nameof(ReceiveMessage)}(): {ex}"
);
}
}

private void DoBroadcast(object sender, NetMQQueueEventArgs<(Address?, Message)> e)
Expand All @@ -630,28 +642,24 @@ private void DoBroadcast(object sender, NetMQQueueEventArgs<(Address?, Message)>
_logger.Debug("Broadcasting message: {Message}", msg);
_logger.Debug("Peers to broadcast: {PeersCount}", peers.Count);

NetMQMessage message = msg.ToNetMQMessage(_privateKey, AsPeer);

foreach (BoundPeer peer in peers)
{
_ = SendMessageAsync(peer, msg)
.ContinueWith(t =>
{
const string fname = nameof(DoBroadcast);
switch (t.Exception?.InnerException)
{
case TimeoutException te:
_logger.Error(
te,
$"TimeoutException occurred during {fname}()."
);
break;
case Exception ex:
_logger.Error(
ex,
$"An unexpected exception occurred during {fname}()."
);
break;
}
});
if (!_dealers.TryGetValue(peer.Address, out DealerSocket dealer))
{
dealer = new DealerSocket(ToNetMQAddress(peer));
_dealers[peer.Address] = dealer;
}

if (!dealer.TrySendMultipartMessage(TimeSpan.FromSeconds(3), message))
{
_logger.Warning(
"Broadcasting timed out. [Peer: {Peer}, Message: {Message}]",
peer,
msg
);
}
}
}

Expand Down Expand Up @@ -782,11 +790,6 @@ private async Task ProcessRequest(MessageRequest req, CancellationToken cancella

using var dealer = new DealerSocket(ToNetMQAddress(req.Peer));

// FIXME 1 min is an arbitrary value.
// See also https://github.com/planetarium/libplanet/pull/599 and
// https://github.com/planetarium/libplanet/pull/709
dealer.Options.Linger = TimeSpan.FromMinutes(1);

_logger.Debug(
"Trying to send {Message} to {Peer}...",
req.Message,
Expand Down Expand Up @@ -841,9 +844,6 @@ await dealer.SendMultipartMessageAsync(
tcs.TrySetException(te);
}

// Delaying dealer disposing to avoid ObjectDisposedException on NetMQPoller
await Task.Delay(100, cancellationToken);

_logger.Verbose(
"Request {Message}({RequestId}) processed in {TimeSpan}.",
req.Message,
Expand Down Expand Up @@ -934,6 +934,17 @@ private async Task RefreshTableAsync(
await Task.Delay(period, cancellationToken);
await Protocol.RefreshTableAsync(maxAge, cancellationToken);
await Protocol.CheckReplacementCacheAsync(cancellationToken);

ImmutableHashSet<Address> peerAddresses =
Peers.Select(p => p.Address).ToImmutableHashSet();
foreach (Address address in _dealers.Keys)
{
if (!peerAddresses.Contains(address) &&
_dealers.TryGetValue(address, out DealerSocket removed))
{
removed.Dispose();
}
}
}
catch (OperationCanceledException e)
{
Expand Down

0 comments on commit a185dd3

Please sign in to comment.