Skip to content

Commit

Permalink
Merge pull request #1120 from limebell/refactor/protocol
Browse files Browse the repository at this point in the history
Separate peer discovery from transport
  • Loading branch information
longfin authored Dec 16, 2020
2 parents 11a01b2 + 62dc896 commit 892f99a
Show file tree
Hide file tree
Showing 13 changed files with 505 additions and 510 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ To be released.
- Added `Libplanet.Blockchain.Renderers.Debug.InvalidRenderException<T>`
class. [[#1119]]
- Added `InvalidBlockTxHashException` class. [[#1116]]
- Removed `Swarm<T>.TraceTable()` method. [[#1120]]
- Added `Swarm<T>.PeerStates` property. [[#1120]]
- Added `IProtocol` interface. [[#1120]]
- Added `KademliaProtocol` class which implements `IProtocol`. [[#1120]]

### Behavioral changes

Expand Down Expand Up @@ -107,6 +111,7 @@ To be released.
[#1116]: https://github.com/planetarium/libplanet/pull/1116
[#1117]: https://github.com/planetarium/libplanet/pull/1117
[#1119]: https://github.com/planetarium/libplanet/pull/1119
[#1120]: https://github.com/planetarium/libplanet/pull/1120
[#1124]: https://github.com/planetarium/libplanet/pull/1124
[#1125]: https://github.com/planetarium/libplanet/pull/1125
[#1132]: https://github.com/planetarium/libplanet/pull/1132
Expand Down
68 changes: 33 additions & 35 deletions Libplanet.Tests/Net/Protocols/ProtocolTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public async Task Ping()

Assert.Single(transportA.ReceivedMessages);
Assert.Single(transportB.ReceivedMessages);
Assert.Contains(transportA.AsPeer, transportB.Protocol.Peers);
Assert.Contains(transportA.AsPeer, transportB.Peers);
}
finally
{
Expand Down Expand Up @@ -113,8 +113,8 @@ public async Task PingTwice()

Assert.Equal(2, transportA.ReceivedMessages.Count);
Assert.Equal(2, transportB.ReceivedMessages.Count);
Assert.Contains(transportA.AsPeer, transportB.Protocol.Peers);
Assert.Contains(transportB.AsPeer, transportA.Protocol.Peers);
Assert.Contains(transportA.AsPeer, transportB.Peers);
Assert.Contains(transportB.AsPeer, transportA.Peers);
}
finally
{
Expand All @@ -136,8 +136,8 @@ public async Task PingToClosedPeer()

await transportA.AddPeersAsync(new[] { transportB.AsPeer, transportC.AsPeer }, null);

Assert.Contains(transportB.AsPeer, transportA.Protocol.Peers);
Assert.Contains(transportC.AsPeer, transportA.Protocol.Peers);
Assert.Contains(transportB.AsPeer, transportA.Peers);
Assert.Contains(transportC.AsPeer, transportA.Peers);

await transportC.StopAsync(TimeSpan.Zero);
await Assert.ThrowsAsync<TimeoutException>(
Expand All @@ -146,7 +146,7 @@ await Assert.ThrowsAsync<TimeoutException>(
TimeSpan.FromSeconds(3)));
await transportA.AddPeersAsync(new[] { transportB.AsPeer }, null);

Assert.Contains(transportB.AsPeer, transportA.Protocol.Peers);
Assert.Contains(transportB.AsPeer, transportA.Peers);

await transportA.StopAsync(TimeSpan.Zero);
await transportB.StopAsync(TimeSpan.Zero);
Expand Down Expand Up @@ -183,12 +183,12 @@ public async Task BootstrapAsyncTest()
await transportB.BootstrapAsync(new[] { transportA.AsPeer });
await transportC.BootstrapAsync(new[] { transportA.AsPeer });

Assert.Contains(transportB.AsPeer, transportC.Protocol.Peers);
Assert.Contains(transportC.AsPeer, transportB.Protocol.Peers);
Assert.Contains(transportB.AsPeer, transportC.Peers);
Assert.Contains(transportC.AsPeer, transportB.Peers);

((KademliaProtocol)transportA.Protocol).ClearTable();
((KademliaProtocol)transportB.Protocol).ClearTable();
((KademliaProtocol)transportC.Protocol).ClearTable();
transportA.Table.Clear();
transportB.Table.Clear();
transportC.Table.Clear();

await transportB.AddPeersAsync(new[] { transportC.AsPeer }, null);
await transportC.StopAsync(TimeSpan.Zero);
Expand All @@ -214,12 +214,12 @@ public async Task RemoveStalePeers()
await StartTestTransportAsync(transportB);

await transportA.AddPeersAsync(new[] { transportB.AsPeer }, null);
Assert.Single(transportA.Protocol.Peers);
Assert.Single(transportA.Peers);

await transportB.StopAsync(TimeSpan.Zero);
await Task.Delay(100);
await transportA.Protocol.RefreshTableAsync(TimeSpan.Zero, default(CancellationToken));
Assert.Empty(transportA.Protocol.Peers);
Assert.Empty(transportA.Peers);
await transportA.StopAsync(TimeSpan.Zero);
}

Expand All @@ -240,10 +240,10 @@ public async Task RoutingTableFull()
await transportB.AddPeersAsync(new[] { transport.AsPeer }, null);
await transportC.AddPeersAsync(new[] { transport.AsPeer }, null);

Assert.Single(transportA.Protocol.Peers);
Assert.Contains(transportA.AsPeer, transport.Protocol.Peers);
Assert.DoesNotContain(transportB.AsPeer, transport.Protocol.Peers);
Assert.DoesNotContain(transportC.AsPeer, transport.Protocol.Peers);
Assert.Single(transportA.Peers);
Assert.Contains(transportA.AsPeer, transport.Peers);
Assert.DoesNotContain(transportB.AsPeer, transport.Peers);
Assert.DoesNotContain(transportC.AsPeer, transport.Peers);

await transport.StopAsync(TimeSpan.Zero);
await transportA.StopAsync(TimeSpan.Zero);
Expand All @@ -269,18 +269,18 @@ public async Task ReplacementCache()
await Task.Delay(100);
await transportC.AddPeersAsync(new[] { transport.AsPeer }, null);

Assert.Single(transportA.Protocol.Peers);
Assert.Contains(transportA.AsPeer, transport.Protocol.Peers);
Assert.DoesNotContain(transportB.AsPeer, transport.Protocol.Peers);
Assert.Single(transportA.Peers);
Assert.Contains(transportA.AsPeer, transport.Peers);
Assert.DoesNotContain(transportB.AsPeer, transport.Peers);

await transportA.StopAsync(TimeSpan.Zero);
await transport.Protocol.RefreshTableAsync(TimeSpan.Zero, default(CancellationToken));
await transport.Protocol.CheckReplacementCacheAsync(default(CancellationToken));

Assert.Single(transport.Protocol.Peers);
Assert.DoesNotContain(transportA.AsPeer, transport.Protocol.Peers);
Assert.Contains(transportB.AsPeer, transport.Protocol.Peers);
Assert.DoesNotContain(transportC.AsPeer, transport.Protocol.Peers);
Assert.Single(transport.Peers);
Assert.DoesNotContain(transportA.AsPeer, transport.Peers);
Assert.Contains(transportB.AsPeer, transport.Peers);
Assert.DoesNotContain(transportC.AsPeer, transport.Peers);

await transport.StopAsync(TimeSpan.Zero);
await transportB.StopAsync(TimeSpan.Zero);
Expand All @@ -303,9 +303,9 @@ public async Task RemoveDeadReplacementCache()
await transportA.AddPeersAsync(new[] { transport.AsPeer }, null);
await transportB.AddPeersAsync(new[] { transport.AsPeer }, null);

Assert.Single(transport.Protocol.Peers);
Assert.Contains(transportA.AsPeer, transport.Protocol.Peers);
Assert.DoesNotContain(transportB.AsPeer, transport.Protocol.Peers);
Assert.Single(transport.Peers);
Assert.Contains(transportA.AsPeer, transport.Peers);
Assert.DoesNotContain(transportB.AsPeer, transport.Peers);

await transportA.StopAsync(TimeSpan.Zero);
await transportB.StopAsync(TimeSpan.Zero);
Expand All @@ -314,10 +314,10 @@ public async Task RemoveDeadReplacementCache()
await transport.Protocol.RefreshTableAsync(TimeSpan.Zero, default(CancellationToken));
await transport.Protocol.CheckReplacementCacheAsync(default(CancellationToken));

Assert.Single(transport.Protocol.Peers);
Assert.DoesNotContain(transportA.AsPeer, transport.Protocol.Peers);
Assert.DoesNotContain(transportB.AsPeer, transport.Protocol.Peers);
Assert.Contains(transportC.AsPeer, transport.Protocol.Peers);
Assert.Single(transport.Peers);
Assert.DoesNotContain(transportA.AsPeer, transport.Peers);
Assert.DoesNotContain(transportB.AsPeer, transport.Peers);
Assert.Contains(transportC.AsPeer, transport.Peers);

await transport.StopAsync(TimeSpan.Zero);
await transportC.StopAsync(TimeSpan.Zero);
Expand Down Expand Up @@ -401,8 +401,6 @@ public async Task BroadcastGuarantee()
await t1.BootstrapAsync(new[] { seed.AsPeer });
await t2.BootstrapAsync(new[] { seed.AsPeer });

Log.Debug(seed.Protocol.Trace());

Log.Debug("Bootstrap completed.");

var tcs = new CancellationTokenSource();
Expand Down Expand Up @@ -491,8 +489,8 @@ public async Task DoNotBroadcastToSourcePeer()
private TestTransport CreateTestTransport(
PrivateKey privateKey = null,
bool blockBroadcast = false,
int? tableSize = null,
int? bucketSize = null,
int tableSize = Kademlia.TableSize,
int bucketSize = Kademlia.BucketSize,
TimeSpan? networkDelay = null)
{
return new TestTransport(
Expand Down
16 changes: 3 additions & 13 deletions Libplanet.Tests/Net/Protocols/RoutingTableTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,7 @@ public void AddPeer()
var pubKey1 = new PrivateKey().PublicKey;
var pubKey2 = new PrivateKey().PublicKey;
var pubKey3 = new PrivateKey().PublicKey;
var table = new RoutingTable(
pubKey0.ToAddress(),
1,
2,
new System.Random(),
Logger.None);
var table = new RoutingTable(pubKey0.ToAddress(), 1, 2);
var peer1 = new BoundPeer(pubKey1, new DnsEndPoint("0.0.0.0", 1234));
var peer2 = new BoundPeer(pubKey2, new DnsEndPoint("0.0.0.0", 1234));
var peer3 = new BoundPeer(pubKey3, new DnsEndPoint("0.0.0.0", 1234));
Expand All @@ -143,12 +138,7 @@ public void RemovePeer()
{
var pubKey1 = new PrivateKey().PublicKey;
var pubKey2 = new PrivateKey().PublicKey;
var table = new RoutingTable(
pubKey1.ToAddress(),
1,
2,
new System.Random(),
Logger.None);
var table = new RoutingTable(pubKey1.ToAddress(), 1, 2);
var peer1 = new BoundPeer(pubKey1, new DnsEndPoint("0.0.0.0", 1234));
var peer2 = new BoundPeer(pubKey2, new DnsEndPoint("0.0.0.0", 1234));

Expand All @@ -164,7 +154,7 @@ public void RemovePeer()

private RoutingTable CreateTable(Address addr)
{
return new RoutingTable(addr, TableSize, BucketSize, new System.Random(), Logger.None);
return new RoutingTable(addr);
}
}
}
19 changes: 8 additions & 11 deletions Libplanet.Tests/Net/Protocols/TestTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public TestTransport(
Dictionary<Address, TestTransport> transports,
PrivateKey privateKey,
bool blockBroadcast,
int? tableSize,
int? bucketSize,
int tableSize,
int bucketSize,
TimeSpan? networkDelay)
{
_privateKey = privateKey;
Expand All @@ -58,13 +58,8 @@ public TestTransport(
_requests = new AsyncCollection<Request>();
_ignoreTestMessageWithData = new List<string>();
_random = new Random();
Protocol = new KademliaProtocol(
this,
Address,
_logger,
tableSize,
bucketSize
);
Table = new RoutingTable(Address, tableSize, bucketSize);
Protocol = new KademliaProtocol(Table, this, Address);
}

public event EventHandler<Message> ProcessMessageHandler;
Expand All @@ -77,12 +72,14 @@ public TestTransport(
_privateKey.PublicKey,
new DnsEndPoint("localhost", 1234));

public IEnumerable<BoundPeer> Peers => Protocol.Peers;
public IEnumerable<BoundPeer> Peers => Table.Peers;

public DateTimeOffset? LastMessageTimestamp { get; private set; }

internal ConcurrentBag<Message> ReceivedMessages { get; }

internal RoutingTable Table { get; }

internal IProtocol Protocol { get; }

internal bool Running => !(_swarmCancellationTokenSource is null);
Expand Down Expand Up @@ -274,7 +271,7 @@ public void BroadcastTestMessage(Address? except, string data)

public void BroadcastMessage(Address? except, Message message)
{
var peers = Protocol.PeersToBroadcast(except).ToList();
var peers = Table.PeersToBroadcast(except).ToList();
var peersString = string.Join(", ", peers.Select(peer => peer.Address));
_logger.Debug(
"Broadcasting test message {Data} to {Count} peers which are: {Peers}",
Expand Down
9 changes: 5 additions & 4 deletions Libplanet.Tests/Net/SwarmTest.Fixtures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Libplanet.Blocks;
using Libplanet.Crypto;
using Libplanet.Net;
using Libplanet.Net.Protocols;
using Libplanet.Tests.Common.Action;
using Libplanet.Tests.Store;
using Serilog;
Expand Down Expand Up @@ -69,8 +70,8 @@ public partial class SwarmTest
private Swarm<DumbAction> CreateSwarm(
PrivateKey privateKey = null,
AppProtocolVersion? appProtocolVersion = null,
int? tableSize = null,
int? bucketSize = null,
int tableSize = Kademlia.TableSize,
int bucketSize = Kademlia.BucketSize,
string host = null,
int? listenPort = null,
DateTimeOffset? createdAt = null,
Expand Down Expand Up @@ -101,8 +102,8 @@ private Swarm<T> CreateSwarm<T>(
BlockChain<T> blockChain,
PrivateKey privateKey = null,
AppProtocolVersion? appProtocolVersion = null,
int? tableSize = null,
int? bucketSize = null,
int tableSize = Kademlia.TableSize,
int bucketSize = Kademlia.BucketSize,
string host = null,
int? listenPort = null,
DateTimeOffset? createdAt = null,
Expand Down
8 changes: 5 additions & 3 deletions Libplanet.Tests/Net/SwarmTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ public async Task BroadcastBlockToReconnectedPeer()

await swarmA.AddPeersAsync(new[] { seed.AsPeer }, null);
await StopAsync(swarmA);
await seed.Protocol.RefreshTableAsync(TimeSpan.Zero, default(CancellationToken));
await seed.PeerDiscovery.RefreshTableAsync(
TimeSpan.Zero,
default(CancellationToken));

Assert.DoesNotContain(swarmA.AsPeer, seed.Peers);

Expand Down Expand Up @@ -1267,7 +1269,7 @@ async Task RefreshTableAsync(CancellationToken cancellationToken)
await Task.Delay(1000, cancellationToken);
try
{
await swarmA.Protocol.RefreshTableAsync(
await swarmA.PeerDiscovery.RefreshTableAsync(
TimeSpan.FromSeconds(1), cancellationToken);
}
catch (InvalidOperationException)
Expand Down Expand Up @@ -2015,7 +2017,7 @@ public async Task FindSpecificPeerAsyncDepthFail()
TimeSpan.FromMilliseconds(3000));

Assert.Equal(swarmC.AsPeer.Address, foundPeer.Address);
((KademliaProtocol)swarmA.Protocol).ClearTable();
swarmA.RoutingTable.Clear();
Assert.Empty(swarmA.Peers);
await swarmA.AddPeersAsync(new Peer[] { swarmB.AsPeer }, null);

Expand Down
31 changes: 0 additions & 31 deletions Libplanet/Net/ITransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@ public interface ITransport : IDisposable
[Pure]
Peer AsPeer { get; }

/// <summary>
/// List of all <see cref="Peer"/>s in the routing table.
/// </summary>
[Pure]
IEnumerable<BoundPeer> Peers { get; }

/// <summary>
/// The <see cref="DateTimeOffset"/> of the last message was received.
/// </summary>
Expand Down Expand Up @@ -70,31 +64,6 @@ Task StopAsync(
TimeSpan waitFor,
CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Conducts peer discovery for given <paramref name="bootstrapPeers"/>.
/// </summary>
/// <param name="bootstrapPeers">A <see cref="IEnumerable{T}"/> of <see cref="Peer"/>s
/// to bootstrap.</param>
/// <param name="pingSeedTimeout">A timeout of waiting for the reply of <see cref="Ping"/>
/// message sent to seed <see cref="Peer"/>.
/// If <c>null</c> is given, the task never halts by itself
/// even no any response was given from the the target seed.</param>
/// <param name="findNeighborsTimeout">A timeout of waiting for the reply of
/// <see cref="FindNeighbors"/> message sent to seed <see cref="Peer"/>.
/// If <c>null</c> is given, task never halts by itself
/// even the target seed gives no any response.</param>
/// <param name="depth">Recursive operation depth to search peers from network.</param>
/// <param name="cancellationToken">
/// A cancellation token used to propagate notification that this
/// operation should be canceled.</param>
/// <returns>An awaitable task without value.</returns>
Task BootstrapAsync(
IEnumerable<BoundPeer> bootstrapPeers,
TimeSpan? pingSeedTimeout,
TimeSpan? findNeighborsTimeout,
int depth,
CancellationToken cancellationToken);

/// <summary>
/// Sends the <paramref name="message"/> to given <paramref name="peer"/>.
/// </summary>
Expand Down
Loading

0 comments on commit 892f99a

Please sign in to comment.