diff --git a/CHANGES.md b/CHANGES.md index 1a6083f03f..0ee620fca2 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -28,8 +28,9 @@ To be released. ### Behavioral changes - - Improved performance of `Swarm` by multiplexing response and - broadcast. [[#858], [#859]] + - Improved performance of `Swarm`. + - Multiplexed response and broadcast. [[#858], [#859]] + - Reduced internal delays. [[#871], [#879]] - `Transaction.Create()`, `Transaction.EvaluateActions()` and `Transaction.EvaluateActionsGradually()` no longer throw `UnexpectedlyTerminatedActionException` directly. Instead, it records @@ -48,8 +49,10 @@ To be released. [#859]: https://github.com/planetarium/libplanet/pull/859 [#860]: https://github.com/planetarium/libplanet/issues/860 [#868]: https://github.com/planetarium/libplanet/pull/868 +[#871]: https://github.com/planetarium/libplanet/issues/871 [#875]: https://github.com/planetarium/libplanet/pull/875 [#878]: https://github.com/planetarium/libplanet/pull/878 +[#879]: https://github.com/planetarium/libplanet/pull/879 Version 0.9.2 diff --git a/Libplanet/Net/NetMQTransport.cs b/Libplanet/Net/NetMQTransport.cs index 80d66a0524..2bdc1c3f36 100644 --- a/Libplanet/Net/NetMQTransport.cs +++ b/Libplanet/Net/NetMQTransport.cs @@ -431,10 +431,8 @@ public void Dispose() public Task WaitForRunningAsync() => _runningEvent.Task; - public async Task SendMessageAsync(BoundPeer peer, Message message) - { - await SendMessageWithReplyAsync(peer, message, TimeSpan.FromSeconds(3), 0); - } + public Task SendMessageAsync(BoundPeer peer, Message message) + => SendMessageWithReplyAsync(peer, message, TimeSpan.FromSeconds(3), 0); public async Task SendMessageWithReplyAsync( BoundPeer peer, @@ -490,18 +488,25 @@ await _requests.AddAsync( Interlocked.Read(ref _requestCount) ); - var reply = (await tcs.Task).ToList(); - foreach (var msg in reply) + if (expectedResponses > 0) { - MessageHistory.Enqueue(msg); - } + var reply = (await tcs.Task).ToList(); + foreach (var msg in reply) + { + MessageHistory.Enqueue(msg); + } - const string logMsg = - "Received {ReplyMessageCount} reply messages to {RequestId} " + - "from {PeerAddress}: {ReplyMessages}."; - _logger.Debug(logMsg, reply.Count, reqId, peer.Address, reply); + const string logMsg = + "Received {ReplyMessageCount} reply messages to {RequestId} " + + "from {PeerAddress}: {ReplyMessages}."; + _logger.Debug(logMsg, reply.Count, reqId, peer.Address, reply); - return reply; + return reply; + } + else + { + return new Message[0]; + } } catch (DifferentAppProtocolVersionException e) { @@ -621,28 +626,32 @@ private void DoBroadcast(object sender, NetMQQueueEventArgs<(Address?, Message)> (Address? except, Message msg) = e.Queue.Dequeue(); // FIXME Should replace with PUB/SUB model. - try - { - var peers = Protocol.PeersToBroadcast(except).ToList(); - _logger.Debug($"Broadcasting message [{msg}]"); - _logger.Debug($"Peers to broadcast : {peers.Count}"); - Parallel.ForEach(peers, async peer => - { - await SendMessageAsync(peer, msg); - }); + List peers = Protocol.PeersToBroadcast(except).ToList(); + _logger.Debug("Broadcasting message: {Message}", msg); + _logger.Debug("Peers to broadcast: {PeersCount}", peers.Count); - _logger.Debug($"[{msg}] broadcasting completed."); - } - catch (TimeoutException ex) - { - _logger.Error(ex, $"TimeoutException occurred during {nameof(DoBroadcast)}()."); - } - catch (Exception ex) + foreach (BoundPeer peer in peers) { - _logger.Error( - ex, - $"An unexpected exception occurred during {nameof(DoBroadcast)}()." - ); + _ = SendMessageAsync(peer, msg) + .ContinueWith(t => + { + const string fname = nameof(DoBroadcast); + switch (t.Exception?.InnerException) + { + case TimeoutException te: + _logger.Error( + te, + $"TimeoutException occurred during {fname}()." + ); + break; + case Exception ex: + _logger.Error( + ex, + $"An unexpected exception occurred during {fname}()." + ); + break; + } + }); } } diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index 8f6958a23f..c480f42dfb 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -2098,7 +2098,7 @@ private async Task ProcessFillblock(CancellationToken cancellationToken) if (_demandBlockHash is null || _demandBlockHash.Value.Item1 <= BlockChain.Tip.Index) { - await Task.Delay(100, cancellationToken); + await Task.Delay(1, cancellationToken); continue; } @@ -2123,7 +2123,6 @@ await SyncPreviousBlocksAsync( catch (TimeoutException) { _logger.Debug($"Timeout occurred during {nameof(ProcessFillblock)}"); - await Task.Delay(100, cancellationToken); } catch (Exception e) { @@ -2141,7 +2140,7 @@ private async Task ProcessFillTxs(CancellationToken cancellationToken) { if (_demandTxIds.IsEmpty) { - await Task.Delay(100, cancellationToken); + await Task.Delay(1, cancellationToken); continue; }