Skip to content

Commit

Permalink
feat: adding onValidated observer (#1128)
Browse files Browse the repository at this point in the history
### Description

Adding an `onValidated` observer which will run every time a message is
received and validated. This comes from the necessity of precisely track
message deliveries and network activity.

`onRecv` observers run before any check is performed on the received
message, which means that it runs every time a duplicate or invalid
message arrives, which is inefficient and inaccurate for our purpose of
tracking only received, unique and valid messages. Therefore, adding
this extra option of running an observer for every message after all
validation checks pass.
  • Loading branch information
gabrielmer authored Aug 1, 2024
1 parent 62f2d85 commit a60f0c5
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 2 deletions.
3 changes: 3 additions & 0 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ proc validateAndRelay(

g.rewardDelivered(peer, topic, true)

# trigger hooks
peer.validatedObservers(msg, msgId)

# The send list typically matches the idontwant list from above, but
# might differ if validation takes time
var toSendPeers = HashSet[PubSubPeer]()
Expand Down
15 changes: 13 additions & 2 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type
PubSubObserver* = ref object
onRecv*: proc(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].}
onSend*: proc(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].}
onValidated*:
proc(peer: PubSubPeer, msg: Message, msgId: MessageId) {.gcsafe, raises: [].}

PubSubPeerEventKind* {.pure.} = enum
StreamOpened
Expand Down Expand Up @@ -170,14 +172,23 @@ proc recvObservers*(p: PubSubPeer, msg: var RPCMsg) =
if not (isNil(p.observers)) and p.observers[].len > 0:
for obs in p.observers[]:
if not (isNil(obs)): # TODO: should never be nil, but...
obs.onRecv(p, msg)
if not (isNil(obs.onRecv)):
obs.onRecv(p, msg)

proc validatedObservers*(p: PubSubPeer, msg: Message, msgId: MessageId) =
# trigger hooks
if not (isNil(p.observers)) and p.observers[].len > 0:
for obs in p.observers[]:
if not (isNil(obs.onValidated)):
obs.onValidated(p, msg, msgId)

proc sendObservers(p: PubSubPeer, msg: var RPCMsg) =
# trigger hooks
if not (isNil(p.observers)) and p.observers[].len > 0:
for obs in p.observers[]:
if not (isNil(obs)): # TODO: should never be nil, but...
obs.onSend(p, msg)
if not (isNil(obs.onSend)):
obs.onSend(p, msg)

proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
debug "starting pubsub read loop", conn, peer = p, closed = conn.closed
Expand Down
57 changes: 57 additions & 0 deletions tests/pubsub/testgossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,63 @@ suite "GossipSub":

await allFuturesThrowing(nodesFut.concat())

asyncTest "GossipSub's observers should run after message is sent, received and validated":
var
recvCounter = 0
sendCounter = 0
validatedCounter = 0

proc handler(topic: string, data: seq[byte]) {.async.} =
discard

proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
inc recvCounter

proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
inc sendCounter

proc onValidated(peer: PubSubPeer, msg: Message, msgId: MessageId) =
inc validatedCounter

let obs0 = PubSubObserver(onSend: onSend)
let obs1 = PubSubObserver(onRecv: onRecv, onValidated: onValidated)

let nodes = generateNodes(2, gossip = true)
# start switches
discard await allFinished(nodes[0].switch.start(), nodes[1].switch.start())

await subscribeNodes(nodes)

nodes[0].addObserver(obs0)
nodes[1].addObserver(obs1)
nodes[1].subscribe("foo", handler)
nodes[1].subscribe("bar", handler)

proc validator(
topic: string, message: Message
): Future[ValidationResult] {.async.} =
result = if topic == "foo": ValidationResult.Accept else: ValidationResult.Reject

nodes[1].addValidator("foo", "bar", validator)

# Send message that will be accepted by the receiver's validator
tryPublish await nodes[0].publish("foo", "Hello!".toBytes()), 1

check:
recvCounter == 1
validatedCounter == 1
sendCounter == 1

# Send message that will be rejected by the receiver's validator
tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1

check:
recvCounter == 2
validatedCounter == 1
sendCounter == 2

await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())

asyncTest "GossipSub unsub - resub faster than backoff":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async.} =
Expand Down

0 comments on commit a60f0c5

Please sign in to comment.