diff --git a/CHANGES.md b/CHANGES.md index 8e15c799fd..f1abc36d65 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -106,6 +106,8 @@ To be released. index of a block that does not exist locally. [[#208], [#317]] - Fixed a bug that `Swarm` had not dial to other peer after `Swarm.PreloadAsync()`. [[#311]] + - Fixed an issue where unknown exceptions occurred when `Swarm` receiving + a message. [[#321], [#327]] [LiteDB]: https://www.litedb.org/ @@ -135,8 +137,10 @@ To be released. [#310]: https://github.com/planetarium/libplanet/pull/310 [#311]: https://github.com/planetarium/libplanet/pull/311 [#317]: https://github.com/planetarium/libplanet/pull/317 +[#321]: https://github.com/planetarium/libplanet/pull/321 [#324]: https://github.com/planetarium/libplanet/pull/324 [#326]: https://github.com/planetarium/libplanet/pull/326 +[#327]: https://github.com/planetarium/libplanet/pull/327 [#329]: https://github.com/planetarium/libplanet/pull/329 Version 0.3.0 diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index b3965f296a..62754aac9e 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -58,7 +58,7 @@ public class Swarm private readonly NetMQQueue _replyQueue; private readonly NetMQQueue _broadcastQueue; - private readonly NetMQPoller _queuePoller; + private readonly NetMQPoller _poller; private readonly ILogger _logger; @@ -141,7 +141,7 @@ public Swarm( _router.Options.RouterHandover = true; _replyQueue = new NetMQQueue(); _broadcastQueue = new NetMQQueue(); - _queuePoller = new NetMQPoller { _replyQueue, _broadcastQueue }; + _poller = new NetMQPoller { _router, _replyQueue, _broadcastQueue }; _receiveMutex = new AsyncLock(); _blockSyncMutex = new AsyncLock(); @@ -170,6 +170,7 @@ public Swarm( _logger = Log.ForContext>() .ForContext("SwarmId", loggerId); + _router.ReceiveReady += ReceiveMessage; _replyQueue.ReceiveReady += DoReply; _broadcastQueue.ReceiveReady += DoBroadcast; } @@ -339,7 +340,6 @@ public async Task StopAsync( { if (Running) { - _router.Dispose(); _removedPeers[AsPeer] = DateTimeOffset.UtcNow; DistributeDelta(false); @@ -350,14 +350,16 @@ public async Task StopAsync( _broadcastQueue.ReceiveReady -= DoBroadcast; _replyQueue.ReceiveReady -= DoReply; + _router.ReceiveReady -= ReceiveMessage; - if (_queuePoller.IsRunning) + if (_poller.IsRunning) { - _queuePoller.Dispose(); + _poller.Dispose(); } _broadcastQueue.Dispose(); _replyQueue.Dispose(); + _router.Dispose(); foreach (DealerSocket s in _dealers.Values) { @@ -449,16 +451,14 @@ public async Task StartAsync( using (await _runningMutex.LockAsync()) { Running = true; - await PreloadAsync( - cancellationToken: _cancellationToken); + await PreloadAsync(cancellationToken: _cancellationToken); } var tasks = new List { RepeatDeltaDistributionAsync(distributeInterval, _cancellationToken), - ReceiveMessageAsync(_cancellationToken), BroadcastTxAsync(broadcastTxInterval, _cancellationToken), - Task.Run(() => _queuePoller.Run(), _cancellationToken), + Task.Run(() => _poller.Run(), _cancellationToken), }; if (behindNAT) @@ -814,68 +814,6 @@ await Task.WhenAll( } } - private async Task ReceiveMessageAsync( - CancellationToken cancellationToken) - { - while (!cancellationToken.IsCancellationRequested) - { - try - { - NetMQMessage raw; - try - { - raw = await _router.ReceiveMultipartMessageAsync( - timeout: TimeSpan.FromMilliseconds(100), - cancellationToken: cancellationToken); - } - catch (TimeoutException) - { - // Ignore this exception because it's expected - // when there is no received message in duration. - continue; - } - - _logger.Verbose($"The raw message[{raw}] has received."); - Message message = Message.Parse(raw, reply: false); - _logger.Debug($"The message[{message}] has parsed."); - - // Queue a task per message to avoid blocking. - #pragma warning disable CS4014 - Task.Run( - async () => - { - // it's still async because some method it relies - // are async yet. - await ProcessMessageAsync( - message, - cancellationToken - ); - }, - cancellationToken); - #pragma warning restore CS4014 - } - catch (InvalidMessageException e) - { - _logger.Error( - e, - "Could not parse NetMQMessage properly; ignore." - ); - } - catch (TaskCanceledException e) - { - _logger.Information(e, "Task was canceled."); - } - catch (Exception e) - { - _logger.Error( - e, - "An unexpected exception occured during ReceiveMessageAsync()" - ); - throw; - } - } - } - private async Task BroadcastTxAsync( TimeSpan broadcastTxInterval, CancellationToken cancellationToken) @@ -1693,6 +1631,32 @@ private async Task RepeatDeltaDistributionAsync( } } + private void ReceiveMessage(object sender, NetMQSocketEventArgs e) + { + try + { + NetMQMessage raw = e.Socket.ReceiveMultipartMessage(); + + _logger.Verbose($"The raw message[{raw}] has received."); + Message message = Message.Parse(raw, reply: false); + _logger.Debug($"The message[{message}] has parsed."); + + // it's still async because some method it relies are async yet. + Task.Run( + async () => { await ProcessMessageAsync(message, _cancellationToken); }, + _cancellationToken); + } + catch (InvalidMessageException ex) + { + _logger.Error(ex, "Could not parse NetMQMessage properly; ignore."); + } + catch (Exception ex) + { + _logger.Error(ex, "An unexpected exception occured during ReceiveMessage()."); + throw; + } + } + private void DoBroadcast(object sender, NetMQQueueEventArgs e) { Message msg = e.Queue.Dequeue();