Gossipsub Tracer RPCSent tracker #4543
Conversation
- track iHave messages on each RPC sent
Codecov Report
@@ Coverage Diff @@
## master #4543 +/- ##
==========================================
+ Coverage 56.25% 57.34% +1.08%
==========================================
Files 653 575 -78
Lines 64699 57866 -6833
==========================================
- Hits 36396 33182 -3214
+ Misses 25362 22236 -3126
+ Partials 2941 2448 -493
Flags with carried forward coverage won't be shown.
Overall, looks great. However, please consider incorporating my suggestions to refine your code further. Once these adjustments have been made, please proceed with the merge.
network/p2p/tracer/internal/cache.go
// - bool: true if the record is initialized, false otherwise (i.e.: the record already exists).
// Note that if init is called multiple times for the same messageID, the record is initialized only once, and the
// subsequent calls return false and do not change the record (i.e.: the record is not re-initialized).
func (r *rpcSentCache) init(messageID flow.Identifier, controlMsgType p2pmsg.ControlMessageType) bool {
I propose repositioning the `iHaveRPCSentEntityID` method to this file and consolidating it with the `newRPCSentEntity` function as follows:

func rpcSentEntityID(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) flow.Identifier {
	return flow.MakeIDFromFingerPrint([]byte(fmt.Sprintf("%s%s%s", topic, messageId, controlMsgType)))
}
This modification improves efficiency by saving one hash computation per message; currently we compute two hashes for each message, which is superfluous.
In addition, I recommend a substantial refactor of the `init` method to improve its alignment with common software architecture principles such as cohesion, coupling, and simplicity:

- The method name `init` should be changed to `add`. This aligns with the standard `Add` operations found in our mempools and improves understandability through familiar behavior.
- Its signature should be modified to `add(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType)`. This change enhances cohesion and reduces confusion by clarifying the type of records the cache maintains. The current `init(messageID flow.Identifier, controlMsgType p2pmsg.ControlMessageType) bool` signature displays low cohesion and high coupling: it requires the caller to comprehend and compute the `messageID` externally. Given that the `messageID` is merely used as a key for records, this computation should be internal to the cache; expecting the caller to understand that internal aspect is not aligned with best practices. Furthermore, a `messageID` typed as `flow.Identifier` is a generic representation that leaves its actual meaning ambiguous. Both issues can be resolved by using the `rpcSentEntityID` function defined above to compute the `messageID` internally.

Such changes adhere to the principles of encapsulation, cohesion, and low coupling. Hiding internal details like the computation of the `messageID` gives better encapsulation; the new `add` method is more cohesive because it performs a single, focused responsibility (adding a record based on specific input parameters); and coupling is reduced because the method no longer relies on external computation of the `messageID`.
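For illustration, the refactored method could look like the following sketch. The backing `c` store field and the exact `newRPCSentEntity` signature are assumptions about the cache internals, not the final code:

// add initializes a record for the given control message, returning true if a new
// record was stored and false if one already existed for the same inputs.
// Sketch only: r.c is assumed to be the HeroCache-backed store of this cache.
func (r *rpcSentCache) add(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) bool {
	entityId := rpcSentEntityID(topic, messageId, controlMsgType)
	return r.c.Add(entityId, newRPCSentEntity(entityId, controlMsgType))
}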
network/p2p/tracer/internal/cache.go
// - flow.Identifier: the messageID to store the rpc control message.
// Returns:
// - bool: true if the RPC has been cached, indicating it was sent from the local node.
func (r *rpcSentCache) has(messageId flow.Identifier) bool {
Building upon the previous comment, I suggest refactoring the `has` function to:

has(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) bool
This adjustment ensures a consistent and clear signature across all methods, enhancing code readability and maintainability.
The external call would then be presented as follows:

func (t *RPCSentTracker) WasIHaveRPCSent(topicID, messageID string) bool {
	return t.cache.has(topicID, messageID, p2pmsg.CtrlMsgIHave)
}

This representation aligns with the refactored `add` method, ensuring consistency throughout the interface. It also simplifies the interaction by providing an intuitive method signature, reinforcing the principle of least surprise in software design.
Tests look good, but please refactor them based on the comments on the signatures of the `add` and `has` methods.
@@ -139,6 +151,15 @@ func (t *GossipSubMeshTracer) Prune(p peer.ID, topic string) {
	lg.Info().Hex("flow_id", logging.ID(id.NodeID)).Str("role", id.Role.String()).Msg("pruned peer")
}

// SendRPC is called when an RPC is sent. Currently, the GossipSubMeshTracer tracks iHave RPC messages that have been sent.
// This function can be updated to track other control messages in the future as required.
func (t *GossipSubMeshTracer) SendRPC(rpc *pubsub.RPC, _ peer.ID) {
I'd like to propose a refinement for consideration in a separate PR, and you're welcome to open an issue and address it independently:
The current setup involves the `SendRPC` function being invoked by GossipSub in a blocking manner. This implies that any processing performed here temporarily pauses outgoing RPCs. This pause can negatively impact the performance and correctness of GossipSub message dissemination, potentially leading to dropped outgoing RPCs due to a 'context deadline exceeded' error caused by blocking RPC processing. In addition, it can create lock contention between tracking sent RPCs and evaluating incoming RPCs when checking whether an `iWant` corresponds to a previously advertised `iHave`.
To mitigate these issues, I suggest implementing a non-blocking approach for processing RPCs. This can be accomplished by utilizing a worker queue and dispatching a copy of the RPC to the queue for further processing while promptly returning from the function. This strategy also eliminates the aforementioned lock contention.
You might draw inspiration from the following example when implementing this solution: https://github.com/onflow/flow-go/blob/master/network/alsp/manager/manager.go#L228-L258.
By reducing contention, we can minimize the risk of deadlocks and improve the overall response time of the system.
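A minimal sketch of the non-blocking pattern follows; a buffered channel stands in for the worker queue, and the field names, capacity handling, and lifecycle wiring are illustrative, not the actual implementation:

// workQueue is a hypothetical buffered channel acting as the worker queue.
type RPCSentTracker struct {
	workQueue chan *pubsub.RPC
}

// SendRPC enqueues the RPC (in practice, a copy of it, as suggested above, since
// GossipSub may reuse the original) and returns immediately, so the outgoing
// path is never blocked on tracking work.
func (t *RPCSentTracker) SendRPC(rpc *pubsub.RPC, _ peer.ID) {
	select {
	case t.workQueue <- rpc:
	default:
		// queue full: drop the tracking event rather than block the caller
	}
}

// processLoop runs on a dedicated worker goroutine and performs the actual tracking.
func (t *RPCSentTracker) processLoop() {
	for rpc := range t.workQueue {
		t.OnIHaveRPCSent(rpc.GetControl().GetIhave())
	}
}

Dropping on a full queue (rather than blocking) keeps the tracer strictly best-effort from GossipSub's perspective, which also eliminates the lock contention described above.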
Follow-up issue: https://github.com/dapperlabs/flow-internal/issues/1899
t.Run("WasIHaveRPCSent should return true for iHave message after it is tracked with OnIHaveRPCSent", func(t *testing.T) {
	topicID := channels.PushBlocks.String()
	messageID1 := unittest.IdentifierFixture().String()
	iHaves := []*pb.ControlIHave{{
		TopicID:    &topicID,
		MessageIDs: []string{messageID1},
	}}
	rpc := rpcFixture(withIhaves(iHaves))
	tracker.OnIHaveRPCSent(rpc.GetControl().GetIhave())
	require.True(t, tracker.WasIHaveRPCSent(topicID, messageID1))

	// manipulate the last byte of the message ID to ensure a false positive is not returned
	messageID2 := []byte(messageID1)
	messageID2[len(messageID2)-1] = 'X'
	require.False(t, tracker.WasIHaveRPCSent(topicID, string(messageID2)))
})
}
Please also add a test case tracking multiple messages on multiple topics.
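Such a case might look like the following sketch, reusing the fixtures from the test above; the choice of a second topic (`channels.ReceiveApprovals`) is illustrative:

t.Run("WasIHaveRPCSent should track multiple messages on multiple topics", func(t *testing.T) {
	topicA := channels.PushBlocks.String()
	topicB := channels.ReceiveApprovals.String()
	messageIDs := []string{unittest.IdentifierFixture().String(), unittest.IdentifierFixture().String()}
	iHaves := []*pb.ControlIHave{
		{TopicID: &topicA, MessageIDs: []string{messageIDs[0]}},
		{TopicID: &topicB, MessageIDs: []string{messageIDs[1]}},
	}
	rpc := rpcFixture(withIhaves(iHaves))
	tracker.OnIHaveRPCSent(rpc.GetControl().GetIhave())
	require.True(t, tracker.WasIHaveRPCSent(topicA, messageIDs[0]))
	require.True(t, tracker.WasIHaveRPCSent(topicB, messageIDs[1]))
	// a message tracked on one topic must not match a different topic
	require.False(t, tracker.WasIHaveRPCSent(topicA, messageIDs[1]))
})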
Co-authored-by: Yahya Hassanzadeh, Ph.D. <yhassanzadeh@ieee.org>
…w-go into khalil/1897-ihave-rpc-tracker
This PR adds a tracker backed by herocache that tracks RPC messages sent from the local node. It is the first part of the iWant flooding work (https://github.com/dapperlabs/flow-go/issues/6472). The tracker is intended to be used during RPC inspection for iWant validation, ensuring that received iWant control messages are in response to iHave messages that were actually sent.
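For context, a hypothetical sketch of that validation step; the `validateIWant` helper, its error handling, and the source of `topicID` are illustrative and not part of this PR:

// validateIWant checks that every message ID requested in an iWant was previously
// advertised by this node in an iHave on the given topic.
func validateIWant(tracker *RPCSentTracker, topicID string, iWant *pb.ControlIWant) error {
	for _, messageID := range iWant.GetMessageIDs() {
		if !tracker.WasIHaveRPCSent(topicID, messageID) {
			return fmt.Errorf("iWant requests message %s that was never advertised via iHave on topic %s", messageID, topicID)
		}
	}
	return nil
}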