Skip to content

Commit

Permalink
Refactor with NetMQPoller
Browse files Browse the repository at this point in the history
  • Loading branch information
longfin committed Apr 26, 2019
1 parent ce36224 commit bdd775b
Showing 1 changed file with 34 additions and 48 deletions.
82 changes: 34 additions & 48 deletions Libplanet/Net/Swarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class Swarm : ICollection<Peer>

private NetMQQueue<Message> _replyQueue;
private NetMQQueue<Message> _broadcastQueue;
private NetMQPoller _queuePoller;

public Swarm(
PrivateKey privateKey,
Expand Down Expand Up @@ -122,6 +123,7 @@ public Swarm(
_router = new RouterSocket();
_replyQueue = new NetMQQueue<Message>();
_broadcastQueue = new NetMQQueue<Message>();
_queuePoller = new NetMQPoller { _replyQueue, _broadcastQueue };

_distributeMutex = new AsyncLock();
_receiveMutex = new AsyncLock();
Expand Down Expand Up @@ -150,6 +152,9 @@ public Swarm(
string loggerId = _privateKey.PublicKey.ToAddress().ToHex();
_logger = Log.ForContext<Swarm>()
.ForContext("SwarmId", loggerId);

_replyQueue.ReceiveReady += DoReply;
_broadcastQueue.ReceiveReady += DoBroadcast;
}

~Swarm()
Expand Down Expand Up @@ -388,6 +393,7 @@ public async Task StopAsync(
await Task.Delay(_linger);
}

_queuePoller.StopAsync();
_router.Dispose();

foreach (DealerSocket s in _dealers.Values)
Expand Down Expand Up @@ -490,8 +496,6 @@ await PreloadAsync(
RepeatDeltaDistributionAsync(
distributeInterval, cancellationToken),
ReceiveMessageAsync(blockChain, cancellationToken),
RepeatReplyAsync(cancellationToken),
RepeatBroadcastAsync(cancellationToken),
};

if (behindNAT)
Expand All @@ -501,6 +505,7 @@ await PreloadAsync(
tasks.Add(RefreshPermissions());
}

_queuePoller.RunAsync();
await Task.WhenAll(tasks);
}
catch (Exception e)
Expand Down Expand Up @@ -1636,58 +1641,39 @@ private async Task RepeatDeltaDistributionAsync(
}
}

private async Task RepeatReplyAsync(CancellationToken token)
private void DoBroadcast(object sender, NetMQQueueEventArgs<Message> e)
{
TimeSpan interval = TimeSpan.FromMilliseconds(100);
while (Running)
{
if (_replyQueue.TryDequeue(out Message reply, interval))
{
_logger.Debug(
$"Reply {reply} to {ByteUtil.Hex(reply.Identity)}...");
NetMQMessage netMQMessage =
reply.ToNetMQMessage(_privateKey);
await _router.SendMultipartMessageAsync(
netMQMessage, cancellationToken: token);
_logger.Debug($"Replied.");
}
Message msg = e.Queue.Dequeue();
NetMQMessage netMQMessage = msg.ToNetMQMessage(_privateKey);

await Task.Delay(interval, token);
// FIXME Should replace with PUB/SUB model.
try
{
Task.WhenAll(
_dealers.Values.Select(s =>
Task.Run(() => s.SendMultipartMessage(netMQMessage))
)
).Wait();
}
}

private async Task RepeatBroadcastAsync(CancellationToken token)
{
TimeSpan interval = TimeSpan.FromMilliseconds(100);
while (Running)
catch (TimeoutException ex)
{
if (_broadcastQueue.TryDequeue(out Message raw, interval))
{
NetMQMessage message = raw.ToNetMQMessage(_privateKey);

// FIXME Should replace with PUB/SUB model.
try
{
await Task.WhenAll(
_dealers.Values.Select(s =>
Task.Run(() => s.SendMultipartMessage(message))
)
);
}
catch (TimeoutException e)
{
_logger.Error(e, "TimeoutException occured.");
}
catch (Exception e)
{
_logger.Error(e, "An unexpected exception occured.");
}
_logger.Error(ex, "TimeoutException occured.");
}
catch (Exception ex)
{
_logger.Error(ex, "An unexpected exception occured.");
}

_logger.Debug($"broadcasted: {raw}");
}
_logger.Debug($"broadcasted: {msg}");
}

await Task.Delay(interval, token);
}
private void DoReply(object sender, NetMQQueueEventArgs<Message> e)
{
Message msg = e.Queue.Dequeue();
_logger.Debug($"Reply {msg} to {ByteUtil.Hex(msg.Identity)}...");
NetMQMessage netMQMessage = msg.ToNetMQMessage(_privateKey);
_router.SendMultipartMessage(netMQMessage);
_logger.Debug($"Replied.");
}

private void CheckStarted()
Expand Down

0 comments on commit bdd775b

Please sign in to comment.