Skip to content

Commit

Permalink
Merge pull request #987 from limebell/bugfix/find-peer-depth
Browse files Browse the repository at this point in the history
Fix FindSpecificPeerAsync's depth parameter did not worked properly
  • Loading branch information
longfin authored Sep 10, 2020
2 parents 5e0ab5c + 6b2e7dd commit 5eecf71
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 5 deletions.
44 changes: 44 additions & 0 deletions Libplanet.Tests/Net/SwarmTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2025,6 +2025,50 @@ public async Task FindSpecificPeerAsyncFail()
}
}

[Fact(Timeout = Timeout)]
public async Task FindSpecificPeerAsyncDepthFail()
{
Swarm<DumbAction> swarmA = _swarms[0];
Swarm<DumbAction> swarmB = _swarms[1];
Swarm<DumbAction> swarmC = _swarms[2];
Swarm<DumbAction> swarmD = _swarms[3];
try
{
await StartAsync(swarmA);
await StartAsync(swarmB);
await StartAsync(swarmC);
await StartAsync(swarmD);

await swarmA.AddPeersAsync(new Peer[] { swarmB.AsPeer }, null);
await swarmB.AddPeersAsync(new Peer[] { swarmC.AsPeer }, null);
await swarmC.AddPeersAsync(new Peer[] { swarmD.AsPeer }, null);

BoundPeer foundPeer = await swarmA.FindSpecificPeerAsync(
swarmC.AsPeer.Address,
1,
TimeSpan.FromMilliseconds(3000));

Assert.Equal(swarmC.AsPeer.Address, foundPeer.Address);
((KademliaProtocol)swarmA.Protocol).ClearTable();
Assert.Empty(swarmA.Peers);
await swarmA.AddPeersAsync(new Peer[] { swarmB.AsPeer }, null);

foundPeer = await swarmA.FindSpecificPeerAsync(
swarmD.AsPeer.Address,
1,
TimeSpan.FromMilliseconds(3000));

Assert.Null(foundPeer);
}
finally
{
await StopAsync(swarmA);
await StopAsync(swarmB);
await StopAsync(swarmC);
await StopAsync(swarmD);
}
}

[Fact(Timeout = Timeout)]
public async Task DoNotFillMultipleTimes()
{
Expand Down
17 changes: 12 additions & 5 deletions Libplanet/Net/Protocols/KademliaProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -341,17 +341,24 @@ public async Task<BoundPeer> FindSpecificPeerAsync(
}

var history = new ConcurrentBag<BoundPeer>();
var peersToFind = new ConcurrentQueue<BoundPeer>();
var peersToFind = new ConcurrentQueue<Tuple<BoundPeer, int>>();
foreach (BoundPeer peer in _routing.Neighbors(target, _findConcurrency, false))
{
peersToFind.Enqueue(peer);
peersToFind.Enqueue(new Tuple<BoundPeer, int>(peer, 0));
}

while (peersToFind.Any())
{
cancellationToken.ThrowIfCancellationRequested();

if (!peersToFind.TryDequeue(out BoundPeer viaPeer))
if (!peersToFind.TryDequeue(out Tuple<BoundPeer, int> tuple))
{
continue;
}

tuple.Deconstruct(out BoundPeer viaPeer, out int curDepth);
_logger.Debug("ViaPeer: {Peer}, curDepth: {curDepth}", viaPeer, curDepth);
if (depth != -1 && curDepth >= depth)
{
continue;
}
Expand All @@ -362,7 +369,7 @@ public async Task<BoundPeer> FindSpecificPeerAsync(
IEnumerable<BoundPeer> filteredPeers = foundPeers
.Where(peer =>
!history.Contains(peer) &&
!peersToFind.Contains(peer) &&
!peersToFind.Any(t => t.Item1.Equals(peer)) &&
!peer.Address.Equals(_address))
.Take(_findConcurrency);
int count = 0;
Expand All @@ -376,7 +383,7 @@ public async Task<BoundPeer> FindSpecificPeerAsync(
return found;
}

peersToFind.Enqueue(found);
peersToFind.Enqueue(new Tuple<BoundPeer, int>(found, curDepth + 1));

if (count++ >= _findConcurrency)
{
Expand Down

0 comments on commit 5eecf71

Please sign in to comment.