Skip to content

Commit

Permalink
Move ReceiveMessageAsync to poller
Browse files Browse the repository at this point in the history
Co-Authored-By: Swen Mun <longfinfunnel@gmail.com>
  • Loading branch information
earlbread and longfin committed Jul 4, 2019
1 parent 0e1c52c commit fa1ae9b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 71 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ To be released.
index of a block that does not exist locally. [[#208], [#317]]
- Fixed a bug that `Swarm` had not dial to other peer after
`Swarm.PreloadAsync()`. [[#311]]
- Fixed an issue where unknown exceptions occurred when `Swarm<T>` receiving
a message. [[#321], [#327]]


[LiteDB]: https://www.litedb.org/
Expand Down Expand Up @@ -135,8 +137,10 @@ To be released.
[#310]: https://github.com/planetarium/libplanet/pull/310
[#311]: https://github.com/planetarium/libplanet/pull/311
[#317]: https://github.com/planetarium/libplanet/pull/317
[#321]: https://github.com/planetarium/libplanet/pull/321
[#324]: https://github.com/planetarium/libplanet/pull/324
[#326]: https://github.com/planetarium/libplanet/pull/326
[#327]: https://github.com/planetarium/libplanet/pull/327
[#329]: https://github.com/planetarium/libplanet/pull/329

Version 0.3.0
Expand Down
106 changes: 35 additions & 71 deletions Libplanet/Net/Swarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class Swarm<T>

private readonly NetMQQueue<Message> _replyQueue;
private readonly NetMQQueue<Message> _broadcastQueue;
private readonly NetMQPoller _queuePoller;
private readonly NetMQPoller _poller;

private readonly ILogger _logger;

Expand Down Expand Up @@ -141,7 +141,7 @@ public Swarm(
_router.Options.RouterHandover = true;
_replyQueue = new NetMQQueue<Message>();
_broadcastQueue = new NetMQQueue<Message>();
_queuePoller = new NetMQPoller { _replyQueue, _broadcastQueue };
_poller = new NetMQPoller { _router, _replyQueue, _broadcastQueue };

_receiveMutex = new AsyncLock();
_blockSyncMutex = new AsyncLock();
Expand Down Expand Up @@ -170,6 +170,7 @@ public Swarm(
_logger = Log.ForContext<Swarm<T>>()
.ForContext("SwarmId", loggerId);

_router.ReceiveReady += ReceiveMessage;
_replyQueue.ReceiveReady += DoReply;
_broadcastQueue.ReceiveReady += DoBroadcast;
}
Expand Down Expand Up @@ -339,7 +340,6 @@ public async Task StopAsync(
{
if (Running)
{
_router.Dispose();
_removedPeers[AsPeer] = DateTimeOffset.UtcNow;
DistributeDelta(false);

Expand All @@ -350,14 +350,16 @@ public async Task StopAsync(

_broadcastQueue.ReceiveReady -= DoBroadcast;
_replyQueue.ReceiveReady -= DoReply;
_router.ReceiveReady -= ReceiveMessage;

if (_queuePoller.IsRunning)
if (_poller.IsRunning)
{
_queuePoller.Dispose();
_poller.Dispose();
}

_broadcastQueue.Dispose();
_replyQueue.Dispose();
_router.Dispose();

foreach (DealerSocket s in _dealers.Values)
{
Expand Down Expand Up @@ -449,16 +451,14 @@ public async Task StartAsync(
using (await _runningMutex.LockAsync())
{
Running = true;
await PreloadAsync(
cancellationToken: _cancellationToken);
await PreloadAsync(cancellationToken: _cancellationToken);
}

var tasks = new List<Task>
{
RepeatDeltaDistributionAsync(distributeInterval, _cancellationToken),
ReceiveMessageAsync(_cancellationToken),
BroadcastTxAsync(broadcastTxInterval, _cancellationToken),
Task.Run(() => _queuePoller.Run(), _cancellationToken),
Task.Run(() => _poller.Run(), _cancellationToken),
};

if (behindNAT)
Expand Down Expand Up @@ -814,68 +814,6 @@ await Task.WhenAll(
}
}

private async Task ReceiveMessageAsync(
CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
NetMQMessage raw;
try
{
raw = await _router.ReceiveMultipartMessageAsync(
timeout: TimeSpan.FromMilliseconds(100),
cancellationToken: cancellationToken);
}
catch (TimeoutException)
{
// Ignore this exception because it's expected
// when there is no received message in duration.
continue;
}

_logger.Verbose($"The raw message[{raw}] has received.");
Message message = Message.Parse(raw, reply: false);
_logger.Debug($"The message[{message}] has parsed.");

// Queue a task per message to avoid blocking.
#pragma warning disable CS4014
Task.Run(
async () =>
{
// it's still async because some method it relies
// are async yet.
await ProcessMessageAsync(
message,
cancellationToken
);
},
cancellationToken);
#pragma warning restore CS4014
}
catch (InvalidMessageException e)
{
_logger.Error(
e,
"Could not parse NetMQMessage properly; ignore."
);
}
catch (TaskCanceledException e)
{
_logger.Information(e, "Task was canceled.");
}
catch (Exception e)
{
_logger.Error(
e,
"An unexpected exception occured during ReceiveMessageAsync()"
);
throw;
}
}
}

private async Task BroadcastTxAsync(
TimeSpan broadcastTxInterval,
CancellationToken cancellationToken)
Expand Down Expand Up @@ -1693,6 +1631,32 @@ private async Task RepeatDeltaDistributionAsync(
}
}

private void ReceiveMessage(object sender, NetMQSocketEventArgs e)
{
try
{
NetMQMessage raw = e.Socket.ReceiveMultipartMessage();

_logger.Verbose($"The raw message[{raw}] has received.");
Message message = Message.Parse(raw, reply: false);
_logger.Debug($"The message[{message}] has parsed.");

// it's still async because some method it relies are async yet.
Task.Run(
async () => { await ProcessMessageAsync(message, _cancellationToken); },
_cancellationToken);
}
catch (InvalidMessageException ex)
{
_logger.Error(ex, "Could not parse NetMQMessage properly; ignore.");
}
catch (Exception ex)
{
_logger.Error(ex, "An unexpected exception occured during ReceiveMessage().");
throw;
}
}

private void DoBroadcast(object sender, NetMQQueueEventArgs<Message> e)
{
Message msg = e.Queue.Dequeue();
Expand Down

0 comments on commit fa1ae9b

Please sign in to comment.