Skip to content

Commit

Permalink
Improve block propagation performance (#879)
Browse files Browse the repository at this point in the history
* Strip task

* Reduce wating time

* Skip reply listening when we don't need

* Adopt fire-and-shot for broadcasting

* Remove unnecessary delaying

* Update changelogs

* Apply suggestions from code review

Co-authored-by: Hong Minhee <hong.minhee@gmail.com>

Co-authored-by: Hong Minhee <hong.minhee@gmail.com>
  • Loading branch information
longfin and dahlia authored May 27, 2020
1 parent 1c35f89 commit 488dffd
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 38 deletions.
7 changes: 5 additions & 2 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ To be released.

### Behavioral changes

- Improved performance of `Swarm<T>` by multiplexing response and
broadcast. [[#858], [#859]]
- Improved performance of `Swarm<T>`.
- Multiplexed response and broadcast. [[#858], [#859]]
- Reduced internal delays. [[#871], [#879]]
- `Transaction<T>.Create()`, `Transaction<T>.EvaluateActions()` and
`Transaction<T>.EvaluateActionsGradually()` no longer throw
`UnexpectedlyTerminatedActionException` directly. Instead, it records
Expand All @@ -48,8 +49,10 @@ To be released.
[#859]: https://github.com/planetarium/libplanet/pull/859
[#860]: https://github.com/planetarium/libplanet/issues/860
[#868]: https://github.com/planetarium/libplanet/pull/868
[#871]: https://github.com/planetarium/libplanet/issues/871
[#875]: https://github.com/planetarium/libplanet/pull/875
[#878]: https://github.com/planetarium/libplanet/pull/878
[#879]: https://github.com/planetarium/libplanet/pull/879


Version 0.9.2
Expand Down
75 changes: 42 additions & 33 deletions Libplanet/Net/NetMQTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,8 @@ public void Dispose()

public Task WaitForRunningAsync() => _runningEvent.Task;

public async Task SendMessageAsync(BoundPeer peer, Message message)
{
await SendMessageWithReplyAsync(peer, message, TimeSpan.FromSeconds(3), 0);
}
public Task SendMessageAsync(BoundPeer peer, Message message)
=> SendMessageWithReplyAsync(peer, message, TimeSpan.FromSeconds(3), 0);

public async Task<Message> SendMessageWithReplyAsync(
BoundPeer peer,
Expand Down Expand Up @@ -490,18 +488,25 @@ await _requests.AddAsync(
Interlocked.Read(ref _requestCount)
);

var reply = (await tcs.Task).ToList();
foreach (var msg in reply)
if (expectedResponses > 0)
{
MessageHistory.Enqueue(msg);
}
var reply = (await tcs.Task).ToList();
foreach (var msg in reply)
{
MessageHistory.Enqueue(msg);
}

const string logMsg =
"Received {ReplyMessageCount} reply messages to {RequestId} " +
"from {PeerAddress}: {ReplyMessages}.";
_logger.Debug(logMsg, reply.Count, reqId, peer.Address, reply);
const string logMsg =
"Received {ReplyMessageCount} reply messages to {RequestId} " +
"from {PeerAddress}: {ReplyMessages}.";
_logger.Debug(logMsg, reply.Count, reqId, peer.Address, reply);

return reply;
return reply;
}
else
{
return new Message[0];
}
}
catch (DifferentAppProtocolVersionException e)
{
Expand Down Expand Up @@ -621,28 +626,32 @@ private void DoBroadcast(object sender, NetMQQueueEventArgs<(Address?, Message)>
(Address? except, Message msg) = e.Queue.Dequeue();

// FIXME Should replace with PUB/SUB model.
try
{
var peers = Protocol.PeersToBroadcast(except).ToList();
_logger.Debug($"Broadcasting message [{msg}]");
_logger.Debug($"Peers to broadcast : {peers.Count}");
Parallel.ForEach(peers, async peer =>
{
await SendMessageAsync(peer, msg);
});
List<BoundPeer> peers = Protocol.PeersToBroadcast(except).ToList();
_logger.Debug("Broadcasting message: {Message}", msg);
_logger.Debug("Peers to broadcast: {PeersCount}", peers.Count);

_logger.Debug($"[{msg}] broadcasting completed.");
}
catch (TimeoutException ex)
{
_logger.Error(ex, $"TimeoutException occurred during {nameof(DoBroadcast)}().");
}
catch (Exception ex)
foreach (BoundPeer peer in peers)
{
_logger.Error(
ex,
$"An unexpected exception occurred during {nameof(DoBroadcast)}()."
);
_ = SendMessageAsync(peer, msg)
.ContinueWith(t =>
{
const string fname = nameof(DoBroadcast);
switch (t.Exception?.InnerException)
{
case TimeoutException te:
_logger.Error(
te,
$"TimeoutException occurred during {fname}()."
);
break;
case Exception ex:
_logger.Error(
ex,
$"An unexpected exception occurred during {fname}()."
);
break;
}
});
}
}

Expand Down
5 changes: 2 additions & 3 deletions Libplanet/Net/Swarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2098,7 +2098,7 @@ private async Task ProcessFillblock(CancellationToken cancellationToken)
if (_demandBlockHash is null ||
_demandBlockHash.Value.Item1 <= BlockChain.Tip.Index)
{
await Task.Delay(100, cancellationToken);
await Task.Delay(1, cancellationToken);
continue;
}

Expand All @@ -2123,7 +2123,6 @@ await SyncPreviousBlocksAsync(
catch (TimeoutException)
{
_logger.Debug($"Timeout occurred during {nameof(ProcessFillblock)}");
await Task.Delay(100, cancellationToken);
}
catch (Exception e)
{
Expand All @@ -2141,7 +2140,7 @@ private async Task ProcessFillTxs(CancellationToken cancellationToken)
{
if (_demandTxIds.IsEmpty)
{
await Task.Delay(100, cancellationToken);
await Task.Delay(1, cancellationToken);
continue;
}

Expand Down

0 comments on commit 488dffd

Please sign in to comment.