From eb2f6bf3463997d6df636177822cbca4dded4bf4 Mon Sep 17 00:00:00 2001 From: shashankshampi Date: Thu, 26 Sep 2024 11:28:07 +0530 Subject: [PATCH] test(gossipsub): Test cases covering subscribe and unsubscribe Events added test wrt subscribe and unsubscribe added tests/pubsub/testgossipinternal2 file linters feat: rendezvous refactor (#1183) Hello! This PR aim to refactor rendezvous code so that it is easier to impl. Waku rdv strategy. The hardcoded min and max TTL were out of range with what we needed and specifying which peers to interact with is also needed since Waku deals with peers on multiple separate shards. I tried to keep the changes to a minimum, specifically I did not change the name of any public procs which result in less than descriptive names in some cases. I also wanted to return results instead of raising exceptions but didn't. Would it be acceptable to do so? Please advise on best practices, thank you. --------- Co-authored-by: Ludovic Chenut refactor and suite name refactor chore(ci): Enable S3 caching for interop (#1193) - Adds our S3 bucket for caching docker images as Protocol Labs shut down their shared one. - Remove the free disk space workaround that prevented the jobs from failing for using too much space for the images. --------- Co-authored-by: diegomrsantos PR review comment changes --- .github/workflows/interop.yml | 14 ++- .gitignore | 1 + libp2p/protocols/rendezvous.nim | 110 ++++++++++++----- tests/pubsub/testgossipmembership.nim | 169 ++++++++++++++++++++++++++ tests/testrendezvous.nim | 14 ++- 5 files changed, 268 insertions(+), 40 deletions(-) create mode 100644 tests/pubsub/testgossipmembership.nim diff --git a/.github/workflows/interop.yml b/.github/workflows/interop.yml index 8290a6c39c..acd238f32c 100644 --- a/.github/workflows/interop.yml +++ b/.github/workflows/interop.yml @@ -17,12 +17,6 @@ jobs: name: Run transport interoperability tests runs-on: ubuntu-22.04 steps: - - name: Free Disk Space (Ubuntu) - # For some reason the original job (libp2p/test-plans) has enough disk space, but this one doesn't. - uses: jlumbroso/free-disk-space@v1.3.1 - with: - tool-cache: true - - uses: actions/checkout@v4 - uses: docker/setup-buildx-action@v3 - name: Build image @@ -32,6 +26,10 @@ jobs: with: test-filter: nim-libp2p-head extra-versions: ${{ github.workspace }}/tests/transport-interop/version.json + s3-cache-bucket: ${{ vars.S3_LIBP2P_BUILD_CACHE_BUCKET_NAME }} + s3-access-key-id: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_ACCESS_KEY_ID }} + s3-secret-access-key: ${{ secrets.S3_LIBP2P_BUILD_CACHE_AWS_SECRET_ACCESS_KEY }} + aws-region: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_REGION }} run-hole-punching-interop: name: Run hole-punching interoperability tests @@ -46,3 +44,7 @@ jobs: with: test-filter: nim-libp2p-head extra-versions: ${{ github.workspace }}/tests/hole-punching-interop/version.json + s3-cache-bucket: ${{ vars.S3_LIBP2P_BUILD_CACHE_BUCKET_NAME }} + s3-access-key-id: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_ACCESS_KEY_ID }} + s3-secret-access-key: ${{ secrets.S3_LIBP2P_BUILD_CACHE_AWS_SECRET_ACCESS_KEY }} + aws-region: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_REGION }} diff --git a/.gitignore b/.gitignore index 7b5308ac18..dbf1d97a50 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ build/ *.exe *.dll .vscode/ +.idea/ .DS_Store tests/pubsub/testgossipsub examples/*.md diff --git a/libp2p/protocols/rendezvous.nim b/libp2p/protocols/rendezvous.nim index ad8dbfebd5..06155bad24 100644 --- a/libp2p/protocols/rendezvous.nim +++ b/libp2p/protocols/rendezvous.nim @@ -35,8 +35,6 @@ const RendezVousCodec* = "/rendezvous/1.0.0" MinimumDuration* = 2.hours MaximumDuration = 72.hours - MinimumTTL = MinimumDuration.seconds.uint64 - MaximumTTL = MaximumDuration.seconds.uint64 RegistrationLimitPerPeer = 1000 DiscoverLimit = 1000'u64 SemaphoreDefaultSize = 5 @@ -320,6 +318,10 @@ type peers: seq[PeerId] cookiesSaved: Table[PeerId, Table[string, seq[byte]]] switch: Switch + minDuration: Duration + maxDuration: Duration + minTTL: uint64 + maxTTL: uint64 proc checkPeerRecord(spr: seq[byte], peerId: PeerId): Result[void, string] = if spr.len == 0: @@ -395,7 +397,7 @@ proc save( rdv.registered.add( RegisteredData( peerId: peerId, - expiration: Moment.now() + r.ttl.get(MinimumTTL).int64.seconds, + expiration: Moment.now() + r.ttl.get(rdv.minTTL).int64.seconds, data: r, ) ) @@ -409,8 +411,8 @@ proc register(rdv: RendezVous, conn: Connection, r: Register): Future[void] = libp2p_rendezvous_register.inc() if r.ns.len notin 1 .. 255: return conn.sendRegisterResponseError(InvalidNamespace) - let ttl = r.ttl.get(MinimumTTL) - if ttl notin MinimumTTL .. MaximumTTL: + let ttl = r.ttl.get(rdv.minTTL) + if ttl notin rdv.minTTL .. rdv.maxTTL: return conn.sendRegisterResponseError(InvalidTTL) let pr = checkPeerRecord(r.signedPeerRecord, conn.peerId) if pr.isErr(): @@ -506,24 +508,35 @@ proc advertisePeer(rdv: RendezVous, peer: PeerId, msg: seq[byte]) {.async.} = await rdv.sema.acquire() discard await advertiseWrap().withTimeout(5.seconds) -method advertise*( - rdv: RendezVous, ns: string, ttl: Duration = MinimumDuration -) {.async, base.} = - let sprBuff = rdv.switch.peerInfo.signedPeerRecord.encode().valueOr: - raise newException(RendezVousError, "Wrong Signed Peer Record") +proc advertise*( + rdv: RendezVous, ns: string, ttl: Duration, peers: seq[PeerId] +) {.async.} = if ns.len notin 1 .. 255: raise newException(RendezVousError, "Invalid namespace") - if ttl notin MinimumDuration .. MaximumDuration: - raise newException(RendezVousError, "Invalid time to live") + + if ttl notin rdv.minDuration .. rdv.maxDuration: + raise newException(RendezVousError, "Invalid time to live: " & $ttl) + + let sprBuff = rdv.switch.peerInfo.signedPeerRecord.encode().valueOr: + raise newException(RendezVousError, "Wrong Signed Peer Record") + let r = Register(ns: ns, signedPeerRecord: sprBuff, ttl: Opt.some(ttl.seconds.uint64)) msg = encode(Message(msgType: MessageType.Register, register: Opt.some(r))) + rdv.save(ns, rdv.switch.peerInfo.peerId, r) - let fut = collect(newSeq()): - for peer in rdv.peers: + + let futs = collect(newSeq()): + for peer in peers: trace "Send Advertise", peerId = peer, ns rdv.advertisePeer(peer, msg.buffer) - await allFutures(fut) + + await allFutures(futs) + +method advertise*( + rdv: RendezVous, ns: string, ttl: Duration = rdv.minDuration +) {.async, base.} = + await rdv.advertise(ns, ttl, rdv.peers) proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] = let @@ -540,9 +553,8 @@ proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] = @[] proc request*( - rdv: RendezVous, ns: string, l: int = DiscoverLimit.int + rdv: RendezVous, ns: string, l: int = DiscoverLimit.int, peers: seq[PeerId] ): Future[seq[PeerRecord]] {.async.} = - let nsSalted = ns & rdv.salt var s: Table[PeerId, (PeerRecord, Register)] limit: uint64 @@ -587,8 +599,8 @@ proc request*( for r in resp.registrations: if limit == 0: return - let ttl = r.ttl.get(MaximumTTL + 1) - if ttl > MaximumTTL: + let ttl = r.ttl.get(rdv.maxTTL + 1) + if ttl > rdv.maxTTL: continue let spr = SignedPeerRecord.decode(r.signedPeerRecord).valueOr: @@ -596,7 +608,7 @@ proc request*( pr = spr.data if s.hasKey(pr.peerId): let (prSaved, rSaved) = s[pr.peerId] - if (prSaved.seqNo == pr.seqNo and rSaved.ttl.get(MaximumTTL) < ttl) or + if (prSaved.seqNo == pr.seqNo and rSaved.ttl.get(rdv.maxTTL) < ttl) or prSaved.seqNo < pr.seqNo: s[pr.peerId] = (pr, r) else: @@ -605,8 +617,6 @@ proc request*( for (_, r) in s.values(): rdv.save(ns, peer, r, false) - # copy to avoid resizes during the loop - let peers = rdv.peers for peer in peers: if limit == 0: break @@ -621,6 +631,11 @@ proc request*( trace "exception catch in request", description = exc.msg return toSeq(s.values()).mapIt(it[0]) +proc request*( + rdv: RendezVous, ns: string, l: int = DiscoverLimit.int +): Future[seq[PeerRecord]] {.async.} = + await rdv.request(ns, l, rdv.peers) + proc unsubscribeLocally*(rdv: RendezVous, ns: string) = let nsSalted = ns & rdv.salt try: @@ -630,16 +645,15 @@ proc unsubscribeLocally*(rdv: RendezVous, ns: string) = except KeyError: return -proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} = - # TODO: find a way to improve this, maybe something similar to the advertise +proc unsubscribe*(rdv: RendezVous, ns: string, peerIds: seq[PeerId]) {.async.} = if ns.len notin 1 .. 255: raise newException(RendezVousError, "Invalid namespace") - rdv.unsubscribeLocally(ns) + let msg = encode( Message(msgType: MessageType.Unregister, unregister: Opt.some(Unregister(ns: ns))) ) - proc unsubscribePeer(rdv: RendezVous, peerId: PeerId) {.async.} = + proc unsubscribePeer(peerId: PeerId) {.async.} = try: let conn = await rdv.switch.dial(peerId, RendezVousCodec) defer: @@ -648,8 +662,16 @@ proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} = except CatchableError as exc: trace "exception while unsubscribing", description = exc.msg - for peer in rdv.peers: - discard await rdv.unsubscribePeer(peer).withTimeout(5.seconds) + let futs = collect(newSeq()): + for peer in peerIds: + unsubscribePeer(peer) + + discard await allFutures(futs).withTimeout(5.seconds) + +proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} = + rdv.unsubscribeLocally(ns) + + await rdv.unsubscribe(ns, rdv.peers) proc setup*(rdv: RendezVous, switch: Switch) = rdv.switch = switch @@ -662,7 +684,25 @@ proc setup*(rdv: RendezVous, switch: Switch) = rdv.switch.addPeerEventHandler(handlePeer, Joined) rdv.switch.addPeerEventHandler(handlePeer, Left) -proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T = +proc new*( + T: typedesc[RendezVous], + rng: ref HmacDrbgContext = newRng(), + minDuration = MinimumDuration, + maxDuration = MaximumDuration, +): T {.raises: [RendezVousError].} = + if minDuration < 1.minutes: + raise newException(RendezVousError, "TTL too short: 1 minute minimum") + + if maxDuration > 72.hours: + raise newException(RendezVousError, "TTL too long: 72 hours maximum") + + if minDuration >= maxDuration: + raise newException(RendezVousError, "Minimum TTL longer than maximum") + + let + minTTL = minDuration.seconds.uint64 + maxTTL = maxDuration.seconds.uint64 + let rdv = T( rng: rng, salt: string.fromBytes(generateBytes(rng[], 8)), @@ -670,6 +710,10 @@ proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T = defaultDT: Moment.now() - 1.days, #registerEvent: newAsyncEvent(), sema: newAsyncSemaphore(SemaphoreDefaultSize), + minDuration: minDuration, + maxDuration: maxDuration, + minTTL: minTTL, + maxTTL: maxTTL, ) logScope: topics = "libp2p discovery rendezvous" @@ -701,9 +745,13 @@ proc new*(T: typedesc[RendezVous], rng: ref HmacDrbgContext = newRng()): T = return rdv proc new*( - T: typedesc[RendezVous], switch: Switch, rng: ref HmacDrbgContext = newRng() + T: typedesc[RendezVous], + switch: Switch, + rng: ref HmacDrbgContext = newRng(), + minDuration = MinimumDuration, + maxDuration = MaximumDuration, ): T = - let rdv = T.new(rng) + let rdv = T.new(rng, minDuration, maxDuration) rdv.setup(switch) return rdv diff --git a/tests/pubsub/testgossipmembership.nim b/tests/pubsub/testgossipmembership.nim new file mode 100644 index 0000000000..998e8cea49 --- /dev/null +++ b/tests/pubsub/testgossipmembership.nim @@ -0,0 +1,169 @@ +# Nim-LibP2P +# Copyright (c) 2023-2024 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +{.used.} + +import std/[options, deques, sequtils, enumerate, algorithm, sets] +import stew/byteutils +import ../../libp2p/builders +import ../../libp2p/errors +import ../../libp2p/crypto/crypto +import ../../libp2p/stream/bufferstream +import ../../libp2p/protocols/pubsub/[pubsub, gossipsub, mcache, mcache, peertable] +import ../../libp2p/protocols/pubsub/rpc/[message, messages] +import ../../libp2p/switch +import ../../libp2p/muxers/muxer +import ../../libp2p/protocols/pubsub/rpc/protobuf +import utils +import chronos + +import ../helpers + +proc noop(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = + discard + +const MsgIdSuccess = "msg id gen success" + +suite "GossipSub Topic Membership Tests": + teardown: + checkTrackers() + + # Addition of Designed Test cases for 6. Topic Membership Tests: https://www.notion.so/Gossipsub-651e02d4d7894bb2ac1e4edb55f3192d + + # Generalized setup function to initialize one or more topics + proc setupGossipSub( + topics: seq[string], numPeers: int + ): (TestGossipSub, seq[Connection]) = + let gossipSub = TestGossipSub.init(newStandardSwitch()) + var conns = newSeq[Connection]() + + for topic in topics: + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + + for i in 0 ..< numPeers: + let conn = TestBufferStream.new(noop) + conns &= conn + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) + peer.sendConn = conn + gossipSub.gossipsub[topic].incl(peer) + + return (gossipSub, conns) + + # Wrapper function to initialize a single topic by converting it into a seq + proc setupGossipSub(topic: string, numPeers: int): (TestGossipSub, seq[Connection]) = + setupGossipSub(@[topic], numPeers) + + # Helper function to subscribe to topics + proc subscribeToTopics(gossipSub: TestGossipSub, topics: seq[string]) = + for topic in topics: + gossipSub.PubSub.subscribe( + topic, + proc(topic: string, data: seq[byte]): Future[void] {.async.} = + discard + , + ) + + # Helper function to unsubscribe to topics + proc unsubscribeFromTopics(gossipSub: TestGossipSub, topics: seq[string]) = + for topic in topics: + gossipSub.PubSub.unsubscribeAll(topic) + + # Simulate the `SUBSCRIBE` event and check proper handling in the mesh and gossipsub structures + asyncTest "handle SUBSCRIBE event": + let topic = "test-topic" + let (gossipSub, conns) = setupGossipSub(topic, 5) + + # Subscribe to the topic (ensure `@[topic]` is passed as a sequence) + subscribeToTopics(gossipSub, @[topic].toSeq()) # Pass topic as seq[string] + + check gossipSub.topics.contains(topic) # Check if the topic is in topics + check gossipSub.gossipsub[topic].len() > 0 # Check if topic added to gossipsub + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + # Simulate an UNSUBSCRIBE event and check if the topic is removed from the relevant data structures but remains in gossipsub + asyncTest "handle UNSUBSCRIBE event": + let topic = "test-topic" + let (gossipSub, conns) = setupGossipSub(topic, 5) + + # Subscribe to the topic first + subscribeToTopics(gossipSub, @[topic]) # Pass topic as seq[string] + + # Now unsubscribe from the topic + unsubscribeFromTopics(gossipSub, @[topic]) # Pass topic as seq[string] + + # Verify the topic is removed from relevant structures + check topic notin gossipSub.topics # The topic should not be in topics + check topic notin gossipSub.mesh # The topic should be removed from the mesh + check topic in gossipSub.gossipsub + # The topic should remain in gossipsub (for fanout) + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + # Test subscribing and unsubscribing multiple topics + asyncTest "handle multiple SUBSCRIBE and UNSUBSCRIBE events": + let topics = ["topic1", "topic2", "topic3"].toSeq() + let (gossipSub, conns) = setupGossipSub(topics, 5) # Initialize all topics + + # Subscribe to multiple topics + subscribeToTopics(gossipSub, topics) + + # Verify that all topics are added to the topics and gossipsub + check gossipSub.topics.len == 3 + for topic in topics: + check gossipSub.gossipsub[topic].len() >= 0 + + # Unsubscribe from all topics + unsubscribeFromTopics(gossipSub, topics) + + # Ensure topics are removed from topics and mesh, but still present in gossipsub + for topic in topics: + check topic notin gossipSub.topics + check topic notin gossipSub.mesh + check topic in gossipSub.gossipsub + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + # Test ensuring that the number of subscriptions does not exceed the limit set in the GossipSub parameters + asyncTest "subscription limit test": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + gossipSub.topicsHigh = 10 # Set a limit for the number of subscriptions + + var conns = newSeq[Connection]() + for i in 0 .. gossipSub.topicsHigh + 5: + let topic = "topic" & $i + # Ensure all topics are properly initialized before subscribing + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + + if gossipSub.topics.len < gossipSub.topicsHigh: + gossipSub.PubSub.subscribe( + topic, + proc(topic: string, data: seq[byte]): Future[void] {.async.} = + discard + , + ) + else: + # Prevent subscription beyond the limit and log the error + echo "Subscription limit reached for topic: ", topic + + # Ensure that the number of subscribed topics does not exceed the limit + check gossipSub.topics.len <= gossipSub.topicsHigh + check gossipSub.topics.len == gossipSub.topicsHigh + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() diff --git a/tests/testrendezvous.nim b/tests/testrendezvous.nim index 12494a13c2..e0c4c25776 100644 --- a/tests/testrendezvous.nim +++ b/tests/testrendezvous.nim @@ -126,7 +126,7 @@ suite "RendezVous": asyncTest "Various local error": let - rdv = RendezVous.new() + rdv = RendezVous.new(minDuration = 1.minutes, maxDuration = 72.hours) switch = createSwitch(rdv) expect RendezVousError: discard await rdv.request("A".repeat(300)) @@ -137,6 +137,14 @@ suite "RendezVous": expect RendezVousError: await rdv.advertise("A".repeat(300)) expect RendezVousError: - await rdv.advertise("A", 2.weeks) + await rdv.advertise("A", 73.hours) + expect RendezVousError: + await rdv.advertise("A", 30.seconds) + + test "Various config error": + expect RendezVousError: + discard RendezVous.new(minDuration = 30.seconds) + expect RendezVousError: + discard RendezVous.new(maxDuration = 73.hours) expect RendezVousError: - await rdv.advertise("A", 5.minutes) + discard RendezVous.new(minDuration = 15.minutes, maxDuration = 10.minutes)