Skip to content

Commit

Permalink
Separate peer discovery from transport
Browse files Browse the repository at this point in the history
  • Loading branch information
limebell committed Dec 3, 2020
1 parent ae68331 commit 72ca5e0
Show file tree
Hide file tree
Showing 10 changed files with 300 additions and 400 deletions.
64 changes: 31 additions & 33 deletions Libplanet.Tests/Net/Protocols/ProtocolTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public async Task Ping()

Assert.Single(transportA.ReceivedMessages);
Assert.Single(transportB.ReceivedMessages);
Assert.Contains(transportA.AsPeer, transportB.Protocol.Peers);
Assert.Contains(transportA.AsPeer, transportB.Peers);
}
finally
{
Expand Down Expand Up @@ -113,8 +113,8 @@ public async Task PingTwice()

Assert.Equal(2, transportA.ReceivedMessages.Count);
Assert.Equal(2, transportB.ReceivedMessages.Count);
Assert.Contains(transportA.AsPeer, transportB.Protocol.Peers);
Assert.Contains(transportB.AsPeer, transportA.Protocol.Peers);
Assert.Contains(transportA.AsPeer, transportB.Peers);
Assert.Contains(transportB.AsPeer, transportA.Peers);
}
finally
{
Expand All @@ -136,8 +136,8 @@ public async Task PingToClosedPeer()

await transportA.AddPeersAsync(new[] { transportB.AsPeer, transportC.AsPeer }, null);

Assert.Contains(transportB.AsPeer, transportA.Protocol.Peers);
Assert.Contains(transportC.AsPeer, transportA.Protocol.Peers);
Assert.Contains(transportB.AsPeer, transportA.Peers);
Assert.Contains(transportC.AsPeer, transportA.Peers);

await transportC.StopAsync(TimeSpan.Zero);
await Assert.ThrowsAsync<TimeoutException>(
Expand All @@ -146,7 +146,7 @@ await Assert.ThrowsAsync<TimeoutException>(
TimeSpan.FromSeconds(3)));
await transportA.AddPeersAsync(new[] { transportB.AsPeer }, null);

Assert.Contains(transportB.AsPeer, transportA.Protocol.Peers);
Assert.Contains(transportB.AsPeer, transportA.Peers);

await transportA.StopAsync(TimeSpan.Zero);
await transportB.StopAsync(TimeSpan.Zero);
Expand Down Expand Up @@ -183,12 +183,12 @@ public async Task BootstrapAsyncTest()
await transportB.BootstrapAsync(new[] { transportA.AsPeer });
await transportC.BootstrapAsync(new[] { transportA.AsPeer });

Assert.Contains(transportB.AsPeer, transportC.Protocol.Peers);
Assert.Contains(transportC.AsPeer, transportB.Protocol.Peers);
Assert.Contains(transportB.AsPeer, transportC.Peers);
Assert.Contains(transportC.AsPeer, transportB.Peers);

((KademliaProtocol)transportA.Protocol).ClearTable();
((KademliaProtocol)transportB.Protocol).ClearTable();
((KademliaProtocol)transportC.Protocol).ClearTable();
transportA.Table.Clear();
transportB.Table.Clear();
transportC.Table.Clear();

await transportB.AddPeersAsync(new[] { transportC.AsPeer }, null);
await transportC.StopAsync(TimeSpan.Zero);
Expand All @@ -214,12 +214,12 @@ public async Task RemoveStalePeers()
await StartTestTransportAsync(transportB);

await transportA.AddPeersAsync(new[] { transportB.AsPeer }, null);
Assert.Single(transportA.Protocol.Peers);
Assert.Single(transportA.Peers);

await transportB.StopAsync(TimeSpan.Zero);
await Task.Delay(100);
await transportA.Protocol.RefreshTableAsync(TimeSpan.Zero, default(CancellationToken));
Assert.Empty(transportA.Protocol.Peers);
Assert.Empty(transportA.Peers);
await transportA.StopAsync(TimeSpan.Zero);
}

Expand All @@ -240,10 +240,10 @@ public async Task RoutingTableFull()
await transportB.AddPeersAsync(new[] { transport.AsPeer }, null);
await transportC.AddPeersAsync(new[] { transport.AsPeer }, null);

Assert.Single(transportA.Protocol.Peers);
Assert.Contains(transportA.AsPeer, transport.Protocol.Peers);
Assert.DoesNotContain(transportB.AsPeer, transport.Protocol.Peers);
Assert.DoesNotContain(transportC.AsPeer, transport.Protocol.Peers);
Assert.Single(transportA.Peers);
Assert.Contains(transportA.AsPeer, transport.Peers);
Assert.DoesNotContain(transportB.AsPeer, transport.Peers);
Assert.DoesNotContain(transportC.AsPeer, transport.Peers);

await transport.StopAsync(TimeSpan.Zero);
await transportA.StopAsync(TimeSpan.Zero);
Expand All @@ -269,18 +269,18 @@ public async Task ReplacementCache()
await Task.Delay(100);
await transportC.AddPeersAsync(new[] { transport.AsPeer }, null);

Assert.Single(transportA.Protocol.Peers);
Assert.Contains(transportA.AsPeer, transport.Protocol.Peers);
Assert.DoesNotContain(transportB.AsPeer, transport.Protocol.Peers);
Assert.Single(transportA.Peers);
Assert.Contains(transportA.AsPeer, transport.Peers);
Assert.DoesNotContain(transportB.AsPeer, transport.Peers);

await transportA.StopAsync(TimeSpan.Zero);
await transport.Protocol.RefreshTableAsync(TimeSpan.Zero, default(CancellationToken));
await transport.Protocol.CheckReplacementCacheAsync(default(CancellationToken));

Assert.Single(transport.Protocol.Peers);
Assert.DoesNotContain(transportA.AsPeer, transport.Protocol.Peers);
Assert.Contains(transportB.AsPeer, transport.Protocol.Peers);
Assert.DoesNotContain(transportC.AsPeer, transport.Protocol.Peers);
Assert.Single(transport.Peers);
Assert.DoesNotContain(transportA.AsPeer, transport.Peers);
Assert.Contains(transportB.AsPeer, transport.Peers);
Assert.DoesNotContain(transportC.AsPeer, transport.Peers);

await transport.StopAsync(TimeSpan.Zero);
await transportB.StopAsync(TimeSpan.Zero);
Expand All @@ -303,9 +303,9 @@ public async Task RemoveDeadReplacementCache()
await transportA.AddPeersAsync(new[] { transport.AsPeer }, null);
await transportB.AddPeersAsync(new[] { transport.AsPeer }, null);

Assert.Single(transport.Protocol.Peers);
Assert.Contains(transportA.AsPeer, transport.Protocol.Peers);
Assert.DoesNotContain(transportB.AsPeer, transport.Protocol.Peers);
Assert.Single(transport.Peers);
Assert.Contains(transportA.AsPeer, transport.Peers);
Assert.DoesNotContain(transportB.AsPeer, transport.Peers);

await transportA.StopAsync(TimeSpan.Zero);
await transportB.StopAsync(TimeSpan.Zero);
Expand All @@ -314,10 +314,10 @@ public async Task RemoveDeadReplacementCache()
await transport.Protocol.RefreshTableAsync(TimeSpan.Zero, default(CancellationToken));
await transport.Protocol.CheckReplacementCacheAsync(default(CancellationToken));

Assert.Single(transport.Protocol.Peers);
Assert.DoesNotContain(transportA.AsPeer, transport.Protocol.Peers);
Assert.DoesNotContain(transportB.AsPeer, transport.Protocol.Peers);
Assert.Contains(transportC.AsPeer, transport.Protocol.Peers);
Assert.Single(transport.Peers);
Assert.DoesNotContain(transportA.AsPeer, transport.Peers);
Assert.DoesNotContain(transportB.AsPeer, transport.Peers);
Assert.Contains(transportC.AsPeer, transport.Peers);

await transport.StopAsync(TimeSpan.Zero);
await transportC.StopAsync(TimeSpan.Zero);
Expand Down Expand Up @@ -401,8 +401,6 @@ public async Task BroadcastGuarantee()
await t1.BootstrapAsync(new[] { seed.AsPeer });
await t2.BootstrapAsync(new[] { seed.AsPeer });

Log.Debug(seed.Protocol.Trace());

Log.Debug("Bootstrap completed.");

var tcs = new CancellationTokenSource();
Expand Down
4 changes: 1 addition & 3 deletions Libplanet.Tests/Net/Protocols/RoutingTableTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public void AddPeer()
pubKey0.ToAddress(),
1,
2,
new System.Random(),
Logger.None);
var peer1 = new BoundPeer(pubKey1, new DnsEndPoint("0.0.0.0", 1234));
var peer2 = new BoundPeer(pubKey2, new DnsEndPoint("0.0.0.0", 1234));
Expand All @@ -147,7 +146,6 @@ public void RemovePeer()
pubKey1.ToAddress(),
1,
2,
new System.Random(),
Logger.None);
var peer1 = new BoundPeer(pubKey1, new DnsEndPoint("0.0.0.0", 1234));
var peer2 = new BoundPeer(pubKey2, new DnsEndPoint("0.0.0.0", 1234));
Expand All @@ -164,7 +162,7 @@ public void RemovePeer()

private RoutingTable CreateTable(Address addr)
{
return new RoutingTable(addr, TableSize, BucketSize, new System.Random(), Logger.None);
return new RoutingTable(addr, TableSize, BucketSize, Logger.None);
}
}
}
12 changes: 7 additions & 5 deletions Libplanet.Tests/Net/Protocols/TestTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ public TestTransport(
_requests = new AsyncCollection<Request>();
_ignoreTestMessageWithData = new List<string>();
_random = new Random();
Table = new RoutingTable(Address, tableSize, bucketSize, _logger);
Protocol = new KademliaProtocol(
Table,
this,
Address,
_logger,
tableSize,
bucketSize
_logger
);
}

Expand All @@ -77,12 +77,14 @@ public TestTransport(
_privateKey.PublicKey,
new DnsEndPoint("localhost", 1234));

public IEnumerable<BoundPeer> Peers => Protocol.Peers;
public IEnumerable<BoundPeer> Peers => Table.Peers;

public DateTimeOffset? LastMessageTimestamp { get; private set; }

internal ConcurrentBag<Message> ReceivedMessages { get; }

internal RoutingTable Table { get; }

internal IProtocol Protocol { get; }

internal bool Running => !(_swarmCancellationTokenSource is null);
Expand Down Expand Up @@ -274,7 +276,7 @@ public void BroadcastTestMessage(Address? except, string data)

public void BroadcastMessage(Address? except, Message message)
{
var peers = Protocol.PeersToBroadcast(except).ToList();
var peers = Table.PeersToBroadcast(except).ToList();
var peersString = string.Join(", ", peers.Select(peer => peer.Address));
_logger.Debug(
"Broadcasting test message {Data} to {Count} peers which are: {Peers}",
Expand Down
8 changes: 5 additions & 3 deletions Libplanet.Tests/Net/SwarmTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ public async Task BroadcastBlockToReconnectedPeer()

await swarmA.AddPeersAsync(new[] { seed.AsPeer }, null);
await StopAsync(swarmA);
await seed.Protocol.RefreshTableAsync(TimeSpan.Zero, default(CancellationToken));
await seed.PeerDiscovery.RefreshTableAsync(
TimeSpan.Zero,
default(CancellationToken));

Assert.DoesNotContain(swarmA.AsPeer, seed.Peers);

Expand Down Expand Up @@ -1266,7 +1268,7 @@ async Task RefreshTableAsync(CancellationToken cancellationToken)
await Task.Delay(1000, cancellationToken);
try
{
await swarmA.Protocol.RefreshTableAsync(
await swarmA.PeerDiscovery.RefreshTableAsync(
TimeSpan.FromSeconds(10), cancellationToken);
}
catch (TurnClientException)
Expand Down Expand Up @@ -1980,7 +1982,7 @@ public async Task FindSpecificPeerAsyncDepthFail()
TimeSpan.FromMilliseconds(3000));

Assert.Equal(swarmC.AsPeer.Address, foundPeer.Address);
((KademliaProtocol)swarmA.Protocol).ClearTable();
swarmA.RoutingTable.Clear();
Assert.Empty(swarmA.Peers);
await swarmA.AddPeersAsync(new Peer[] { swarmB.AsPeer }, null);

Expand Down
31 changes: 0 additions & 31 deletions Libplanet/Net/ITransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@ public interface ITransport : IDisposable
[Pure]
Peer AsPeer { get; }

/// <summary>
/// List of all <see cref="Peer"/>s in the routing table.
/// </summary>
[Pure]
IEnumerable<BoundPeer> Peers { get; }

/// <summary>
/// The <see cref="DateTimeOffset"/> of the last message was received.
/// </summary>
Expand Down Expand Up @@ -70,31 +64,6 @@ Task StopAsync(
TimeSpan waitFor,
CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Conducts peer discovery for given <paramref name="bootstrapPeers"/>.
/// </summary>
/// <param name="bootstrapPeers">A <see cref="IEnumerable{T}"/> of <see cref="Peer"/>s
/// to bootstrap.</param>
/// <param name="pingSeedTimeout">A timeout of waiting for the reply of <see cref="Ping"/>
/// message sent to seed <see cref="Peer"/>.
/// If <c>null</c> is given, the task never halts by itself
/// even no any response was given from the the target seed.</param>
/// <param name="findNeighborsTimeout">A timeout of waiting for the reply of
/// <see cref="FindNeighbors"/> message sent to seed <see cref="Peer"/>.
/// If <c>null</c> is given, task never halts by itself
/// even the target seed gives no any response.</param>
/// <param name="depth">Recursive operation depth to search peers from network.</param>
/// <param name="cancellationToken">
/// A cancellation token used to propagate notification that this
/// operation should be canceled.</param>
/// <returns>An awaitable task without value.</returns>
Task BootstrapAsync(
IEnumerable<BoundPeer> bootstrapPeers,
TimeSpan? pingSeedTimeout,
TimeSpan? findNeighborsTimeout,
int depth,
CancellationToken cancellationToken);

/// <summary>
/// Sends the <paramref name="message"/> to given <paramref name="peer"/>.
/// </summary>
Expand Down
Loading

0 comments on commit 72ca5e0

Please sign in to comment.