Skip to content

Commit

Permalink
Add peerpool.joinPeer() and tests.
Browse files Browse the repository at this point in the history
notifyAndWait() now waits PeerPool and disconnection.
  • Loading branch information
cheatfate committed Jan 7, 2021
1 parent 3c116da commit ee0a025
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 12 deletions.
22 changes: 12 additions & 10 deletions beacon_chain/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -359,17 +359,16 @@ proc updateScore*(peer: Peer, score: int) {.inline.} =
proc join*(peer: Peer): Future[void] =
var retFuture = newFuture[void]("peer.lifetime.join")
let peerFut = peer.getFuture()
let alreadyFinished = peerFut.finished()

proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
retFuture.complete()

proc cancellation(udata: pointer) {.gcsafe.} =
if not(alreadyFinished):
if not(isNil(peerFut)):
peerFut.removeCallback(continuation)

if alreadyFinished:
if peerFut.finished():
# All the `peer.disconnectedFut` callbacks are already scheduled in current
# `poll()` call, to avoid race we going to finish only in next `poll()`
# call.
Expand All @@ -378,18 +377,21 @@ proc join*(peer: Peer): Future[void] =
# `peer.disconnectedFut` is not yet finished, but we want to be scheduled
# after all callbacks.
peerFut.addCallback(continuation)
retFuture.cancelCallback = cancellation

return retFuture
retFuture

proc notifyAndWait*(peer: Peer): Future[void] =
proc notifyAndWait*(network: ETh2Node, peer: Peer): Future[void] =
## Notify all the waiters that peer life is finished and wait until all
## callbacks will be processed.
let joinFut = peer.join()
let fut = peer.disconnectedFut
let
joinFut = peer.join()
poolFut = network.peerPool.joinPeer(peer)
discFut = peer.disconnectedFut
peer.connectionState = Disconnecting
fut.complete()
discFut.complete()
peer.disconnectedFut = nil
joinFut
allFutures(joinFut, poolFut)

proc calcThroughput(dur: Duration, value: uint64): float =
let secs = float(chronos.seconds(1).nanoseconds)
Expand Down Expand Up @@ -1087,7 +1089,7 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
node.addSeen(peerId, SeenTableTimeReconnect)

if not(isNil(peer.disconnectedFut)):
await peer.notifyAndWait()
await node.notifyAndWait(peer)
else:
# TODO (cheatfate): This could be removed when bug will be fixed inside
# `nim-libp2p`.
Expand Down
42 changes: 41 additions & 1 deletion beacon_chain/peer_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type
peerType: PeerType
flags: set[PeerFlags]
index: int
future: Future[void]

PeerIndex = object
data: int
Expand Down Expand Up @@ -301,13 +302,16 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
dec(pool.curOutPeersCount)
dec(pool.acqOutPeersCount)

let fut = item[].future
# Indicate that we have an empty space
pool.fireNotFullEvent(item[])
# Cleanup storage with default item, and removing key from hashtable.
pool.storage[pindex] = PeerItem[A]()
pool.registry.del(key)
pool.peerDeleted(peer)
pool.peerCountChanged()
# Indicate that peer was deleted
fut.complete()
else:
if item[].peerType == PeerType.Incoming:
# If peer is available, then its copy present in heapqueue, so we need
Expand All @@ -326,25 +330,61 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
break
dec(pool.curOutPeersCount)

let fut = item[].future
# Indicate that we have an empty space
pool.fireNotFullEvent(item[])
# Cleanup storage with default item, and removing key from hashtable.
pool.storage[pindex] = PeerItem[A]()
pool.registry.del(key)
pool.peerDeleted(peer)
pool.peerCountChanged()
# Indicate that peer was deleted
fut.complete()
true
else:
false

proc joinPeer*[A, B](pool: PeerPool[A, B], peer: A): Future[void] =
## This procedure will only when peer ``peer`` finally leaves PeerPool
## ``pool``.
mixin getKey
var retFuture = newFuture[void]("peerpool.joinPeer")
var future: Future[void]

proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
retFuture.complete()

proc cancellation(udata: pointer) {.gcsafe.} =
if not(isNil(future)):
future.removeCallback(continuation)

let key = getKey(peer)
if pool.registry.hasKey(key):
let pindex = pool.registry[key].data
var item = addr(pool.storage[pindex])
future = item[].future
# If peer is still in PeerPool, then item[].future should not be finished.
doAssert(not(future.finished()))
future.addCallback(continuation)
retFuture.cancelCallback = cancellation
else:
# If there no such peer in PeerPool anymore, its possible that
# PeerItem.future's callbacks is not yet processed, so we going to complete
# retFuture only in next `poll()` call.
callSoon(continuation, cast[pointer](retFuture))

retFuture

proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B,
peerType: PeerType) =
mixin getFuture
proc onPeerClosed(udata: pointer) {.gcsafe.} =
discard pool.deletePeer(peer)

let item = PeerItem[A](data: peer, peerType: peerType,
index: len(pool.storage))
index: len(pool.storage),
future: newFuture[void]("peerpool.peer"))
pool.storage.add(item)
var pitem = addr(pool.storage[^1])
let pindex = PeerIndex(data: item.index, cmp: pool.cmp)
Expand Down
102 changes: 101 additions & 1 deletion tests/test_peer_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ suiteReport "PeerPool testing suite":
lenAcquired(pool) == 0
len(pool) == 0

timedTest "Delete peer on release text":
timedTest "Delete peer on release test":
proc testDeleteOnRelease(): Future[bool] {.async.} =
proc scoreCheck(peer: PeerTest): bool =
if peer.weight >= 0:
Expand Down Expand Up @@ -686,6 +686,106 @@ suiteReport "PeerPool testing suite":

check waitFor(testDeleteOnRelease()) == true

timedTest "Notify when peer leaves pool test":
proc testNotifyOnLeave(): Future[bool] {.async.} =

var pool = newPeerPool[PeerTest, PeerTestID]()
var peer0 = PeerTest.init("idInc0", 100)
var peer1 = PeerTest.init("idOut0", 100)
var peer2 = PeerTest.init("idInc1", 100)
var peer3 = PeerTest.init("idOut1", 100)

# Case 1. Deleting peer which is not part of PeerPool.
block:
var fut0 = pool.joinPeer(peer0)
doAssert(fut0.finished == false)
await sleepAsync(20.milliseconds)
doAssert(fut0.finished == true and fut0.failed == false)

# Case 2. Deleting peer which is not acquired.
discard pool.addPeerNoWait(peer0, PeerType.Incoming)
block:
var fut0 = pool.joinPeer(peer0)
discard pool.deletePeer(peer0)
var fut1 = pool.joinPeer(peer0)
await sleepAsync(20.milliseconds)
var fut2 = pool.joinPeer(peer0)
doAssert(fut0.finished == true and fut0.failed == false)
doAssert(fut1.finished == true and fut1.failed == false)
doAssert(fut2.finished == false)
await sleepAsync(20.milliseconds)
doAssert(fut2.finished == true and fut2.failed == false)

# Case 3. Peer disconnected while it wasn't acquired.
discard pool.addPeerNoWait(peer1, PeerType.Outgoing)
block:
var fut0 = pool.joinPeer(peer1)
# Peer disconnecting
peer1.future.complete()
var fut1 = pool.joinPeer(peer1)
await sleepAsync(20.milliseconds)
var fut2 = pool.joinPeer(peer1)
doAssert(fut0.finished == true and fut0.failed == false)
doAssert(fut1.finished == true and fut1.failed == false)
doAssert(fut2.finished == false)
await sleepAsync(20.milliseconds)
doAssert(fut2.finished == true and fut2.failed == false)

# Case 4. Peer deleted when it was acquired.
discard pool.addPeerNoWait(peer2, PeerType.Incoming)
block:
var fut0 = pool.joinPeer(peer2)
var p = await pool.acquire()
doAssert(p.id == "idInc1")
var fut1 = pool.joinPeer(peer2)
discard pool.deletePeer(peer2)
await sleepAsync(20.milliseconds)
var fut2 = pool.joinPeer(peer2)
doAssert(fut0.finished == false)
doAssert(fut1.finished == false)
doAssert(fut2.finished == false)
pool.release(peer2)
var fut3 = pool.joinPeer(peer2)
await sleepAsync(20.milliseconds)
var fut4 = pool.joinPeer(peer2)
doAssert(fut0.finished == true and fut0.failed == false)
doAssert(fut1.finished == true and fut1.failed == false)
doAssert(fut2.finished == true and fut2.failed == false)
doAssert(fut3.finished == true and fut3.failed == false)
doAssert(fut4.finished == false)
await sleepAsync(20.milliseconds)
doAssert(fut4.finished == true and fut4.failed == false)

# Case 5. Peer disconnected while it was acquired.
discard pool.addPeerNoWait(peer3, PeerType.Outgoing)
block:
var fut0 = pool.joinPeer(peer3)
var p = await pool.acquire()
doAssert(p.id == "idOut1")
var fut1 = pool.joinPeer(peer3)
# Peer disconnecting
peer3.future.complete()
await sleepAsync(20.milliseconds)
var fut2 = pool.joinPeer(peer3)
doAssert(fut0.finished == false)
doAssert(fut1.finished == false)
doAssert(fut2.finished == false)
pool.release(peer3)
var fut3 = pool.joinPeer(peer3)
await sleepAsync(20.milliseconds)
var fut4 = pool.joinPeer(peer3)
doAssert(fut0.finished == true and fut0.failed == false)
doAssert(fut1.finished == true and fut1.failed == false)
doAssert(fut2.finished == true and fut2.failed == false)
doAssert(fut3.finished == true and fut3.failed == false)
doAssert(fut4.finished == false)
await sleepAsync(20.milliseconds)
doAssert(fut4.finished == true and fut4.failed == false)

result = true

check waitFor(testNotifyOnLeave()) == true

timedTest "Space tests":
var pool1 = newPeerPool[PeerTest, PeerTestID](maxPeers = 79)
var pool2 = newPeerPool[PeerTest, PeerTestID](maxPeers = 79,
Expand Down

0 comments on commit ee0a025

Please sign in to comment.