From 871efab571321eeba71507b435aabc1ec003515e Mon Sep 17 00:00:00 2001 From: shashankshampi Date: Thu, 26 Sep 2024 11:28:07 +0530 Subject: [PATCH 01/12] added test wrt subscribe and unsubscribe --- .gitignore | 1 + tests/pubsub/testgossipsub2.nim | 12 ++++++------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index 7b5308ac18..dbf1d97a50 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ build/ *.exe *.dll .vscode/ +.idea/ .DS_Store tests/pubsub/testgossipsub examples/*.md diff --git a/tests/pubsub/testgossipsub2.nim b/tests/pubsub/testgossipsub2.nim index ffc5c24e40..18582028aa 100644 --- a/tests/pubsub/testgossipsub2.nim +++ b/tests/pubsub/testgossipsub2.nim @@ -7,6 +7,12 @@ # This file may not be copied, modified, or distributed except according to # those terms. + ) + await GossipSub(nodes[1]).addDirectPeer( + nodes[2].switch.peerInfo.peerId, nodes[2].switch.peerInfo.addrs + ) + await GossipSub(nodes[2]).addDirectPeer( + nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs {.used.} import sequtils, options, tables, sets @@ -178,12 +184,6 @@ suite "GossipSub": await GossipSub(nodes[1]).addDirectPeer( nodes[0].switch.peerInfo.peerId, nodes[0].switch.peerInfo.addrs ) - await GossipSub(nodes[1]).addDirectPeer( - nodes[2].switch.peerInfo.peerId, nodes[2].switch.peerInfo.addrs - ) - await GossipSub(nodes[2]).addDirectPeer( - nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs - ) var handlerFut = newFuture[void]() proc handler(topic: string, data: seq[byte]) {.async.} = From dc7f8d4317efa3372943ef84b0f328211b50ab3c Mon Sep 17 00:00:00 2001 From: shashankshampi Date: Thu, 26 Sep 2024 11:33:25 +0530 Subject: [PATCH 02/12] added tests/pubsub/testgossipinternal2 file --- tests/pubsub/testgossipinternal2.nim | 161 +++++++++++++++++++++++++++ tests/pubsub/testgossipsub2.nim | 12 +- 2 files changed, 167 insertions(+), 6 deletions(-) create mode 100644 tests/pubsub/testgossipinternal2.nim diff --git a/tests/pubsub/testgossipinternal2.nim b/tests/pubsub/testgossipinternal2.nim new file mode 100644 index 0000000000..11b0459fc9 --- /dev/null +++ b/tests/pubsub/testgossipinternal2.nim @@ -0,0 +1,161 @@ +# Nim-LibP2P +# Copyright (c) 2023-2024 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +{.used.} + +import std/[options, deques, sequtils, enumerate, algorithm, sets] +import stew/byteutils +import ../../libp2p/builders +import ../../libp2p/errors +import ../../libp2p/crypto/crypto +import ../../libp2p/stream/bufferstream +import ../../libp2p/protocols/pubsub/[pubsub, gossipsub, mcache, mcache, peertable] +import ../../libp2p/protocols/pubsub/rpc/[message, messages] +import ../../libp2p/switch +import ../../libp2p/muxers/muxer +import ../../libp2p/protocols/pubsub/rpc/protobuf +import utils +import chronos + +import ../helpers + +proc noop(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = + discard + +const MsgIdSuccess = "msg id gen success" + +suite "GossipSub internal2": + teardown: + checkTrackers() + + # Addition of Designed Test cases for 6. Topic Membership Tests: https://www.notion.so/Gossipsub-651e02d4d7894bb2ac1e4edb55f3192d + + # Simulate the `SUBSCRIBE` event and check proper handling in the mesh and gossipsub structures + asyncTest "handle SUBSCRIBE event": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + # Ensure topic is correctly initialized + let topic = "test-topic" + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() # Initialize gossipsub for the topic + + var conns = newSeq[Connection]() + for i in 0 ..< 5: + let conn = TestBufferStream.new(noop) + conns &= conn + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) + peer.sendConn = conn + gossipSub.gossipsub[topic].incl(peer) # Ensure the topic is added to gossipsub + + # Subscribe to the topic + gossipSub.PubSub.subscribe(topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = discard) + + check gossipSub.topics.contains(topic) # Check if the topic is in topics + check gossipSub.gossipsub[topic].len() > 0 # Check if topic added to gossipsub + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + # This test will simulate an UNSUBSCRIBE event and check if the topic is removed from the relevant data structures but remains in gossipsub + asyncTest "handle UNSUBSCRIBE event": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + # Ensure topic is initialized properly in all relevant data structures + let topic = "test-topic" + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() # Initialize gossipsub for the topic + + var conns = newSeq[Connection]() + for i in 0 ..< 5: + let conn = TestBufferStream.new(noop) + conns &= conn + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) + peer.sendConn = conn + gossipSub.gossipsub[topic].incl(peer) # Ensure peers are added to gossipsub for the topic + + # Subscribe to the topic first + gossipSub.PubSub.subscribe(topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = discard) + + # Now unsubscribe from the topic + gossipSub.PubSub.unsubscribeAll(topic) + + # Verify the topic is removed from relevant structures + check topic notin gossipSub.topics # The topic should not be in topics + check topic notin gossipSub.mesh # The topic should be removed from the mesh + check topic in gossipSub.gossipsub # The topic should remain in gossipsub (for fanout) + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + # This test ensures that multiple topics can be subscribed to and unsubscribed from, with proper initialization of the topic structures. + asyncTest "handle multiple SUBSCRIBE and UNSUBSCRIBE events": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + let topics = ["topic1", "topic2", "topic3"] + + var conns = newSeq[Connection]() + for topic in topics: + # Initialize all relevant structures before subscribing + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() # Initialize gossipsub for each topic + + gossipSub.PubSub.subscribe(topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = discard) + + # Verify that all topics are added to the topics and gossipsub + check gossipSub.topics.len == 3 + for topic in topics: + check gossipSub.gossipsub[topic].len() >= 0 + + # Now unsubscribe from all topics + for topic in topics: + gossipSub.PubSub.unsubscribeAll(topic) + + # Ensure topics are removed from topics and mesh, but still present in gossipsub + for topic in topics: + check topic notin gossipSub.topics + check topic notin gossipSub.mesh + check topic in gossipSub.gossipsub + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + # This test ensures that the number of subscriptions does not exceed the limit set in the GossipSub parameters + asyncTest "subscription limit test": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + gossipSub.topicsHigh = 10 # Set a limit for the number of subscriptions + + var conns = newSeq[Connection]() + for i in 0 .. gossipSub.topicsHigh + 5: + let topic = "topic" & $i + # Ensure all topics are properly initialized before subscribing + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + + if gossipSub.topics.len < gossipSub.topicsHigh: + gossipSub.PubSub.subscribe(topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = discard) + else: + # Prevent subscription beyond the limit and log the error + echo "Subscription limit reached for topic: ", topic + + # Ensure that the number of subscribed topics does not exceed the limit + check gossipSub.topics.len <= gossipSub.topicsHigh + check gossipSub.topics.len == gossipSub.topicsHigh + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + diff --git a/tests/pubsub/testgossipsub2.nim b/tests/pubsub/testgossipsub2.nim index 18582028aa..ffc5c24e40 100644 --- a/tests/pubsub/testgossipsub2.nim +++ b/tests/pubsub/testgossipsub2.nim @@ -7,12 +7,6 @@ # This file may not be copied, modified, or distributed except according to # those terms. - ) - await GossipSub(nodes[1]).addDirectPeer( - nodes[2].switch.peerInfo.peerId, nodes[2].switch.peerInfo.addrs - ) - await GossipSub(nodes[2]).addDirectPeer( - nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs {.used.} import sequtils, options, tables, sets @@ -184,6 +178,12 @@ suite "GossipSub": await GossipSub(nodes[1]).addDirectPeer( nodes[0].switch.peerInfo.peerId, nodes[0].switch.peerInfo.addrs ) + await GossipSub(nodes[1]).addDirectPeer( + nodes[2].switch.peerInfo.peerId, nodes[2].switch.peerInfo.addrs + ) + await GossipSub(nodes[2]).addDirectPeer( + nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs + ) var handlerFut = newFuture[void]() proc handler(topic: string, data: seq[byte]) {.async.} = From 5790b6f428b68a0f2f9b9e78a1d2896a61196651 Mon Sep 17 00:00:00 2001 From: shashankshampi Date: Thu, 26 Sep 2024 13:26:17 +0530 Subject: [PATCH 03/12] linters --- tests/pubsub/testgossipinternal2.nim | 53 +++++++++++++++++++--------- 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/tests/pubsub/testgossipinternal2.nim b/tests/pubsub/testgossipinternal2.nim index 11b0459fc9..6c961fddc0 100644 --- a/tests/pubsub/testgossipinternal2.nim +++ b/tests/pubsub/testgossipinternal2.nim @@ -44,7 +44,8 @@ suite "GossipSub internal2": let topic = "test-topic" gossipSub.mesh[topic] = initHashSet[PubSubPeer]() gossipSub.topicParams[topic] = TopicParams.init() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() # Initialize gossipsub for the topic + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + # Initialize gossipsub for the topic var conns = newSeq[Connection]() for i in 0 ..< 5: @@ -54,13 +55,17 @@ suite "GossipSub internal2": conn.peerId = peerId let peer = gossipSub.getPubSubPeer(peerId) peer.sendConn = conn - gossipSub.gossipsub[topic].incl(peer) # Ensure the topic is added to gossipsub + gossipSub.gossipsub[topic].incl(peer) # Ensure the topic is added to gossipsub # Subscribe to the topic - gossipSub.PubSub.subscribe(topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = discard) + gossipSub.PubSub.subscribe( + topic, + proc(topic: string, data: seq[byte]): Future[void] {.async.} = + discard, + ) - check gossipSub.topics.contains(topic) # Check if the topic is in topics - check gossipSub.gossipsub[topic].len() > 0 # Check if topic added to gossipsub + check gossipSub.topics.contains(topic) # Check if the topic is in topics + check gossipSub.gossipsub[topic].len() > 0 # Check if topic added to gossipsub await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() @@ -73,7 +78,8 @@ suite "GossipSub internal2": let topic = "test-topic" gossipSub.mesh[topic] = initHashSet[PubSubPeer]() gossipSub.topicParams[topic] = TopicParams.init() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() # Initialize gossipsub for the topic + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + # Initialize gossipsub for the topic var conns = newSeq[Connection]() for i in 0 ..< 5: @@ -83,18 +89,24 @@ suite "GossipSub internal2": conn.peerId = peerId let peer = gossipSub.getPubSubPeer(peerId) peer.sendConn = conn - gossipSub.gossipsub[topic].incl(peer) # Ensure peers are added to gossipsub for the topic + gossipSub.gossipsub[topic].incl(peer) + # Ensure peers are added to gossipsub for the topic # Subscribe to the topic first - gossipSub.PubSub.subscribe(topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = discard) + gossipSub.PubSub.subscribe( + topic, + proc(topic: string, data: seq[byte]): Future[void] {.async.} = + discard, + ) # Now unsubscribe from the topic gossipSub.PubSub.unsubscribeAll(topic) # Verify the topic is removed from relevant structures - check topic notin gossipSub.topics # The topic should not be in topics - check topic notin gossipSub.mesh # The topic should be removed from the mesh - check topic in gossipSub.gossipsub # The topic should remain in gossipsub (for fanout) + check topic notin gossipSub.topics # The topic should not be in topics + check topic notin gossipSub.mesh # The topic should be removed from the mesh + check topic in gossipSub.gossipsub + # The topic should remain in gossipsub (for fanout) await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() @@ -110,9 +122,14 @@ suite "GossipSub internal2": # Initialize all relevant structures before subscribing gossipSub.mesh[topic] = initHashSet[PubSubPeer]() gossipSub.topicParams[topic] = TopicParams.init() - gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() # Initialize gossipsub for each topic + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + # Initialize gossipsub for each topic - gossipSub.PubSub.subscribe(topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = discard) + gossipSub.PubSub.subscribe( + topic, + proc(topic: string, data: seq[byte]): Future[void] {.async.} = + discard, + ) # Verify that all topics are added to the topics and gossipsub check gossipSub.topics.len == 3 @@ -135,7 +152,7 @@ suite "GossipSub internal2": # This test ensures that the number of subscriptions does not exceed the limit set in the GossipSub parameters asyncTest "subscription limit test": let gossipSub = TestGossipSub.init(newStandardSwitch()) - gossipSub.topicsHigh = 10 # Set a limit for the number of subscriptions + gossipSub.topicsHigh = 10 # Set a limit for the number of subscriptions var conns = newSeq[Connection]() for i in 0 .. gossipSub.topicsHigh + 5: @@ -146,7 +163,11 @@ suite "GossipSub internal2": gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() if gossipSub.topics.len < gossipSub.topicsHigh: - gossipSub.PubSub.subscribe(topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = discard) + gossipSub.PubSub.subscribe( + topic, + proc(topic: string, data: seq[byte]): Future[void] {.async.} = + discard, + ) else: # Prevent subscription beyond the limit and log the error echo "Subscription limit reached for topic: ", topic @@ -157,5 +178,3 @@ suite "GossipSub internal2": await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() - - From 1c2e221d75da95dda88702ed642217191feb2fb1 Mon Sep 17 00:00:00 2001 From: shashankshampi Date: Fri, 27 Sep 2024 10:16:49 +0530 Subject: [PATCH 04/12] refactor and suite name refactor --- ...ossipinternal2.nim => testgossipmembership.nim} | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) rename tests/pubsub/{testgossipinternal2.nim => testgossipmembership.nim} (97%) diff --git a/tests/pubsub/testgossipinternal2.nim b/tests/pubsub/testgossipmembership.nim similarity index 97% rename from tests/pubsub/testgossipinternal2.nim rename to tests/pubsub/testgossipmembership.nim index 6c961fddc0..82157f9db4 100644 --- a/tests/pubsub/testgossipinternal2.nim +++ b/tests/pubsub/testgossipmembership.nim @@ -30,7 +30,7 @@ proc noop(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} const MsgIdSuccess = "msg id gen success" -suite "GossipSub internal2": +suite "GossipSub Topic Membership Tests": teardown: checkTrackers() @@ -61,7 +61,8 @@ suite "GossipSub internal2": gossipSub.PubSub.subscribe( topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = - discard, + discard + , ) check gossipSub.topics.contains(topic) # Check if the topic is in topics @@ -96,7 +97,8 @@ suite "GossipSub internal2": gossipSub.PubSub.subscribe( topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = - discard, + discard + , ) # Now unsubscribe from the topic @@ -128,7 +130,8 @@ suite "GossipSub internal2": gossipSub.PubSub.subscribe( topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = - discard, + discard + , ) # Verify that all topics are added to the topics and gossipsub @@ -166,7 +169,8 @@ suite "GossipSub internal2": gossipSub.PubSub.subscribe( topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = - discard, + discard + , ) else: # Prevent subscription beyond the limit and log the error From 2923a2d2805892b33021bba6120319e97011dfa7 Mon Sep 17 00:00:00 2001 From: shashankshampi Date: Mon, 30 Sep 2024 15:22:42 +0530 Subject: [PATCH 05/12] test(gossipsub): added test for membership for join and leave topic --- tests/pubsub/testgossipmembership.nim | 84 +++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/tests/pubsub/testgossipmembership.nim b/tests/pubsub/testgossipmembership.nim index 82157f9db4..108e79e25c 100644 --- a/tests/pubsub/testgossipmembership.nim +++ b/tests/pubsub/testgossipmembership.nim @@ -22,7 +22,16 @@ import ../../libp2p/muxers/muxer import ../../libp2p/protocols/pubsub/rpc/protobuf import utils import chronos +import unittest2, chronos, stew/byteutils, ../../libp2p/protocols/pubsub/gossipsub +import ../helpers + +import sequtils, options, tables, sets, sugar +import chronos, chronicles # Added chronicles for logging (trace) +import stew/byteutils +import chronos/ratelimit +import metrics +import ../../libp2p/protocols/pubsub/errors as pubsub_errors import ../helpers proc noop(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = @@ -182,3 +191,78 @@ suite "GossipSub Topic Membership Tests": await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() + + # Test for verifying peers joining a topic using `JOIN(topic)` + asyncTest "handle JOIN event": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + let topic = "test-join-topic" + + # Initialize relevant data structures + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + + var conns = newSeq[Connection]() + + for i in 0 ..< 5: + let conn = TestBufferStream.new(noop) + conns &= conn + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) + peer.sendConn = conn + gossipSub.gossipsub[topic].incl(peer) + + # Simulate the peer joining the topic + gossipSub.PubSub.subscribe( + topic, + proc(topic: string, data: seq[byte]): Future[void] {.async.} = + discard + , + ) + + check gossipSub.mesh[topic].len > 0 # Ensure the peer is added to the mesh + check gossipSub.topics.contains(topic) # Ensure the topic is in `topics` + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + # Test for verifying peers leaving a topic using `LEAVE(topic)` + asyncTest "handle LEAVE event": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + let topic = "test-leave-topic" + + # Initialize relevant data structures + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + + var conns = newSeq[Connection]() + + for i in 0 ..< 5: + let conn = TestBufferStream.new(noop) + conns &= conn + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) + peer.sendConn = conn + gossipSub.gossipsub[topic].incl(peer) + + # Simulate peer joining the topic first + gossipSub.PubSub.subscribe( + topic, + proc(topic: string, data: seq[byte]): Future[void] {.async.} = + discard + , + ) + + # Now simulate peer leaving the topic + gossipSub.PubSub.unsubscribeAll(topic) + + check topic notin gossipSub.mesh # Ensure the peer is removed from the mesh + check topic in gossipSub.gossipsub # Ensure the topic remains in `gossipsub` + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() From fda0d2b6e38a976350ea4356ec772fac5b845da3 Mon Sep 17 00:00:00 2001 From: shashankshampi Date: Tue, 1 Oct 2024 17:22:47 +0530 Subject: [PATCH 06/12] test(gossipsub): import optimization --- tests/pubsub/testgossipmembership.nim | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/tests/pubsub/testgossipmembership.nim b/tests/pubsub/testgossipmembership.nim index 108e79e25c..8835bf4162 100644 --- a/tests/pubsub/testgossipmembership.nim +++ b/tests/pubsub/testgossipmembership.nim @@ -15,21 +15,17 @@ import ../../libp2p/builders import ../../libp2p/errors import ../../libp2p/crypto/crypto import ../../libp2p/stream/bufferstream -import ../../libp2p/protocols/pubsub/[pubsub, gossipsub, mcache, mcache, peertable] -import ../../libp2p/protocols/pubsub/rpc/[message, messages] +import ../../libp2p/protocols/pubsub/[pubsub, gossipsub, mcache, peertable] +import ../../libp2p/protocols/pubsub/rpc/[message, messages, protobuf] import ../../libp2p/switch import ../../libp2p/muxers/muxer -import ../../libp2p/protocols/pubsub/rpc/protobuf import utils import chronos -import unittest2, chronos, stew/byteutils, ../../libp2p/protocols/pubsub/gossipsub +import unittest2 import ../helpers - -import sequtils, options, tables, sets, sugar -import chronos, chronicles # Added chronicles for logging (trace) -import stew/byteutils import chronos/ratelimit import metrics +import chronicles import ../../libp2p/protocols/pubsub/errors as pubsub_errors import ../helpers From f42a763708a08ed416ee5d31dd4ef0b53c2c873d Mon Sep 17 00:00:00 2001 From: shashankshampi Date: Thu, 3 Oct 2024 16:39:04 +0530 Subject: [PATCH 07/12] added assertion for handle SUBSCRIBE to the topic --- tests/pubsub/testgossipmembership.nim | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/tests/pubsub/testgossipmembership.nim b/tests/pubsub/testgossipmembership.nim index e9100448eb..3409425fea 100644 --- a/tests/pubsub/testgossipmembership.nim +++ b/tests/pubsub/testgossipmembership.nim @@ -83,21 +83,28 @@ suite "GossipSub Topic Membership Tests": let topic = "test-topic" let (gossipSub, conns) = setupGossipSub(topic, 5) + # Check if the topic is added to gossipsub and the peers list is not empty + check gossipSub.gossipsub[topic].len() > 0 + # Subscribe to the topic subscribeToTopics(gossipSub, @[topic]) # Check if the topic is present in the list of subscribed topics check gossipSub.topics.contains(topic) - # Check if the topic is added to gossipsub and the peers list is not empty - check gossipSub.gossipsub[topic].len() > 0 - # Close all peer connections and verify that they are properly cleaned up await allFuturesThrowing(conns.mapIt(it.close())) # Stop the gossipSub switch and wait for it to stop completely await gossipSub.switch.stop() + # Verify that connections have been closed and cleaned up after shutdown + for peer in gossipSub.peers.values: + check peer.sendConn == nil or peer.sendConn.closed() + + # Ensure that the topic is removed from the mesh after stopping + check gossipSub.mesh[topic].len() == 0 + # Simulate an UNSUBSCRIBE to the topic and check if the topic is removed from the relevant data structures but remains in gossipsub asyncTest "handle UNSUBSCRIBE to the topic": let topic = "test-topic" From 89473da8bc8e3a9687206c3ea15986a031dad841 Mon Sep 17 00:00:00 2001 From: shashankshampi Date: Sun, 6 Oct 2024 01:45:13 +0530 Subject: [PATCH 08/12] TC fix --- tests/pubsub/testgossipmembership.nim | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/tests/pubsub/testgossipmembership.nim b/tests/pubsub/testgossipmembership.nim index eeb7169ccb..cc8be49a2c 100644 --- a/tests/pubsub/testgossipmembership.nim +++ b/tests/pubsub/testgossipmembership.nim @@ -15,18 +15,14 @@ import ../../libp2p/builders import ../../libp2p/errors import ../../libp2p/crypto/crypto import ../../libp2p/stream/bufferstream +import ../../libp2p/protocols/pubsub/[pubsub, gossipsub, mcache, mcache, peertable] +import ../../libp2p/protocols/pubsub/rpc/[message, messages] import ../../libp2p/switch import ../../libp2p/muxers/muxer import ../../libp2p/protocols/pubsub/rpc/protobuf import utils import chronos -import unittest2 -import ../helpers -import chronos/ratelimit -import metrics -import chronicles -import ../../libp2p/protocols/pubsub/errors as pubsub_errors import ../helpers proc noop(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = @@ -87,28 +83,21 @@ suite "GossipSub Topic Membership Tests": let topic = "test-topic" let (gossipSub, conns) = setupGossipSub(topic, 5) - # Check if the topic is added to gossipsub and the peers list is not empty - check gossipSub.gossipsub[topic].len() > 0 - # Subscribe to the topic subscribeToTopics(gossipSub, @[topic]) # Check if the topic is present in the list of subscribed topics check gossipSub.topics.contains(topic) + # Check if the topic is added to gossipsub and the peers list is not empty + check gossipSub.gossipsub[topic].len() > 0 + # Close all peer connections and verify that they are properly cleaned up await allFuturesThrowing(conns.mapIt(it.close())) # Stop the gossipSub switch and wait for it to stop completely await gossipSub.switch.stop() - # Verify that connections have been closed and cleaned up after shutdown - for peer in gossipSub.peers.values: - check peer.sendConn == nil or peer.sendConn.closed() - - # Ensure that the topic is removed from the mesh after stopping - check gossipSub.mesh[topic].len() == 0 - # Simulate an UNSUBSCRIBE to the topic and check if the topic is removed from the relevant data structures but remains in gossipsub asyncTest "handle UNSUBSCRIBE to the topic": let topic = "test-topic" From 806592ddbb63b91d40615afa63412a99383ff323 Mon Sep 17 00:00:00 2001 From: shashankshampi Date: Mon, 7 Oct 2024 09:07:54 +0530 Subject: [PATCH 09/12] PR update --- tests/pubsub/testgossipmembership.nim | 37 +++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/tests/pubsub/testgossipmembership.nim b/tests/pubsub/testgossipmembership.nim index cc8be49a2c..9550d95088 100644 --- a/tests/pubsub/testgossipmembership.nim +++ b/tests/pubsub/testgossipmembership.nim @@ -66,7 +66,7 @@ suite "GossipSub Topic Membership Tests": # Helper function to subscribe to topics proc subscribeToTopics(gossipSub: TestGossipSub, topics: seq[string]) = for topic in topics: - gossipSub.PubSub.subscribe( + gossipSub.subscribe( topic, proc(topic: string, data: seq[byte]): Future[void] {.async.} = discard @@ -114,7 +114,7 @@ suite "GossipSub Topic Membership Tests": check topic notin gossipSub.mesh check topic in gossipSub.gossipsub - # The topic should remain in gossipsub (for fanout) + # The topic should remain in gossipsub await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() @@ -213,3 +213,36 @@ suite "GossipSub Topic Membership Tests": # Clean up by closing connections and stopping the gossipSub switch await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() + + # Test the behavior when multiple peers join and leave a topic simultaneously. + asyncTest "multiple peers join and leave topic simultaneously": + let topic = "test-multi-join-leave" + + # Initialize the GossipSub system and simulate peer connections for 6 peers + let (gossipSub, conns) = setupGossipSub(@[topic], 6) + + # Ensure the topic is correctly initialized in mesh and gossipsub + doAssert gossipSub.mesh.contains(topic), "Topic not found in mesh" + doAssert gossipSub.gossipsub.contains(topic), "Topic not found in gossipsub" + + # Simulate 6 peers joining the topic + subscribeToTopics(gossipSub, @[topic]) + + # Check that 6 peers have joined the mesh + check gossipSub.mesh[topic].len == 6 # Mesh should have 6 peers + + # Simulate 3 peers leaving the topic by unsubscribing them + var peersToUnsubscribe = gossipSub.mesh[topic].toSeq()[0 .. 2] + # Select the first 3 peers to unsubscribe + for peer in peersToUnsubscribe: + gossipSub.PubSub.unsubscribeAll(topic) # Unsubscribing from the topic + + # Validate that 3 peers are still subscribed and 3 peers have been unsubscribed + check gossipSub.mesh[topic].len == 3 # Ensure 3 peers are still subscribed + for peer in peersToUnsubscribe: + check not gossipSub.mesh[topic].contains(peer) + # Ensure the first 3 peers are unsubscribed + + # Clean up by closing connections and stopping the gossipSub switch + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() From 2d38e8abfcd5632afb2d4e2ce7432adc3bf9b131 Mon Sep 17 00:00:00 2001 From: shashankshampi Date: Tue, 8 Oct 2024 16:17:42 +0530 Subject: [PATCH 10/12] fix in test logic for multiple peers join and leave topic simultaneously --- tests/pubsub/testgossipmembership.nim | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/tests/pubsub/testgossipmembership.nim b/tests/pubsub/testgossipmembership.nim index 9550d95088..37bff6bb26 100644 --- a/tests/pubsub/testgossipmembership.nim +++ b/tests/pubsub/testgossipmembership.nim @@ -228,21 +228,20 @@ suite "GossipSub Topic Membership Tests": # Simulate 6 peers joining the topic subscribeToTopics(gossipSub, @[topic]) - # Check that 6 peers have joined the mesh - check gossipSub.mesh[topic].len == 6 # Mesh should have 6 peers + check gossipSub.mesh[topic].len == 6 - # Simulate 3 peers leaving the topic by unsubscribing them + # Simulate 3 peers leaving the topic by unsubscribing them individually var peersToUnsubscribe = gossipSub.mesh[topic].toSeq()[0 .. 2] - # Select the first 3 peers to unsubscribe for peer in peersToUnsubscribe: - gossipSub.PubSub.unsubscribeAll(topic) # Unsubscribing from the topic + echo "Unsubscribing peer: ", peer.peerId + gossipSub.mesh[topic].excl(peer) + + # Now validate the state of the mesh and gossipsub. Ensure 3 peers are still subscribed + check gossipSub.mesh[topic].len == 3 - # Validate that 3 peers are still subscribed and 3 peers have been unsubscribed - check gossipSub.mesh[topic].len == 3 # Ensure 3 peers are still subscribed for peer in peersToUnsubscribe: + # Ensure the first 3 peers are unsubscribed by checking if they are not in the mesh check not gossipSub.mesh[topic].contains(peer) - # Ensure the first 3 peers are unsubscribed - # Clean up by closing connections and stopping the gossipSub switch await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() From cb7ccaee31ab2928b88f8c9cf88f7f4584102517 Mon Sep 17 00:00:00 2001 From: shashankshampi Date: Thu, 10 Oct 2024 11:30:03 +0530 Subject: [PATCH 11/12] updated as per review comment --- tests/pubsub/testgossipmembership.nim | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/tests/pubsub/testgossipmembership.nim b/tests/pubsub/testgossipmembership.nim index a59bec530f..dd91d96086 100644 --- a/tests/pubsub/testgossipmembership.nim +++ b/tests/pubsub/testgossipmembership.nim @@ -187,10 +187,9 @@ suite "GossipSub Topic Membership Tests": subscribeToTopics(gossipSub, @[topic]) # Check that peers are added to the mesh and the topic is tracked - check gossipSub.mesh[topic].len > 0 + check gossipSub.mesh[topic].len == 5 check gossipSub.topics.contains(topic) - # Clean up by closing connections and stopping the gossipSub switch await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() @@ -211,7 +210,6 @@ suite "GossipSub Topic Membership Tests": check topic notin gossipSub.mesh check topic in gossipSub.gossipsub - # Clean up by closing connections and stopping the gossipSub switch await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() @@ -229,20 +227,27 @@ suite "GossipSub Topic Membership Tests": # Simulate 6 peers joining the topic subscribeToTopics(gossipSub, @[topic]) - check gossipSub.mesh[topic].len == 6 + # Assert that 6 peers have joined the mesh + doAssert gossipSub.mesh[topic].len == 6, "Expected 6 peers to join the mesh" - # Simulate 3 peers leaving the topic by unsubscribing them individually + # Define a simple handler for unsubscribing the peers + proc dummyHandler(topic: string, data: seq[byte]): Future[void] {.async.} = + discard + + # Simulate 3 peers leaving the topic by unsubscribing them var peersToUnsubscribe = gossipSub.mesh[topic].toSeq()[0 .. 2] for peer in peersToUnsubscribe: echo "Unsubscribing peer: ", peer.peerId - gossipSub.mesh[topic].excl(peer) + gossipSub.PubSub.unsubscribe(topic, dummyHandler) - # Now validate the state of the mesh and gossipsub. Ensure 3 peers are still subscribed - check gossipSub.mesh[topic].len == 3 + # Now assert that 6 peers still remain in the mesh because the mesh retains peers + doAssert gossipSub.mesh[topic].len == 6, + "Expected 6 peers to still be in mesh after unsubscription" + # Assert that unsubscribed peers should remain in the mesh but should no longer receive messages for peer in peersToUnsubscribe: - # Ensure the first 3 peers are unsubscribed by checking if they are not in the mesh - check not gossipSub.mesh[topic].contains(peer) + doAssert gossipSub.mesh[topic].contains(peer), + "Peer should still be in mesh even after unsubscription" await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() From cc8e97636f4206c5b72678772a5e664cab58a8ab Mon Sep 17 00:00:00 2001 From: shashankshampi Date: Thu, 10 Oct 2024 19:19:18 +0530 Subject: [PATCH 12/12] removed PubSub in sunscribe --- tests/pubsub/testgossipmembership.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pubsub/testgossipmembership.nim b/tests/pubsub/testgossipmembership.nim index 58c7dd3ec3..315f0ea146 100644 --- a/tests/pubsub/testgossipmembership.nim +++ b/tests/pubsub/testgossipmembership.nim @@ -237,7 +237,7 @@ suite "GossipSub Topic Membership Tests": var peersToUnsubscribe = gossipSub.mesh[topic].toSeq()[0 .. 2] for peer in peersToUnsubscribe: echo "Unsubscribing peer: ", peer.peerId - gossipSub.PubSub.unsubscribe(topic, dummyHandler) + gossipSub.unsubscribe(topic, dummyHandler) # Now assert that 6 peers still remain in the mesh because the mesh retains peers doAssert gossipSub.mesh[topic].len == 6,