-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat/en 777 refactor p2p #58
Conversation
interceptors/resolvers refactor adapted integrationTests, node, spos packages
- minor code refactoring on tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a partial review, new comments will be added
p2p/libp2p/libp2pMessenger.go
Outdated
} | ||
|
||
// BroadcastData tries to send a byte buffer onto a topic | ||
func (p2pMes *libp2pMessenger) BroadcastData(pipe string, topic string, buff []byte) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From what I see in the usage of this function the pipe and topic are (or can be) equivalent. Ca we use the topic as pipe by default and make the signature and the use of this function simpler?
Also, the name can be simply Broadcast
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The first iteration of implementing pipes was made as you recommend. When it was time to apply the new implementation on node.GenerateAndSendBulkTransaction
, I have decided that those bulk transactions will be sent onto another specially created pipe as to not clog the regular transaction pipe on which the interceptors/resolvers are wired. So that is why there are pipe and topic parameters.
BroadcastData became Broadcast.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a suggestion, what if we have:
func (netMes *networkMessenger) Broadcast(topic string, buff []byte) {
netMes.BroadcastWithPipe(topic, topic, buff)
}
func (netMes *networkMessenger) BroadcastWithPipe(pipe string, topic string, buff []byte) {}
I don't have a strong opinion about this but I'm aiming to have the signature as brief as possible for the regular caller. If someone is just interesting in making a call to broadcast maybe he should not care what are those pipes and what he should pass as a parameter there
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
waiting for @AdoAdoAdo's review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I incline towards corcos 2 methods: Broadcast(topic, buffer) and just on the "temporary" GenerateAndSendBulk should the call the one with a different topic. All other broadcasts should not know about the pipe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, will add second method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not yet added
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added now
- some name changes (networkMessengers + constructors), directSender's Send function - minor code optimization (dataThrottle)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Unrelated to this PR necessarily, but I'll leave it here since this changes affect the containing package. In (for ex.) process/block/check.go, we have wrappers over other structs with no added fields. Can't the methods be added directly to the to the original structure so we would avoid initialisation of other types?
Indeed, this is an issue. Will be taken into account after the architecture will be refactored. |
- some name changes (networkMessengers + constructors), directSender's Send function - minor code optimization (dataThrottle, block interceptor), added types for 2 handler functions in netMessenger - minor comments updates in sendDataThrottle, autodiscovery/main.go
…etwork/elrond-go-sandbox into feat/EN-777-refactor-p2p # Conflicts: # node/mock/messengerStub.go # p2p/libp2p/export_test.go # p2p/libp2p/netMessenger.go # p2p/libp2p/netMessenger_test.go # p2p/p2p.go # process/mock/messengerStub.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed until process/block/interceptors.go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed P2P changes
- changed the callback mechanism when receiving a message from pointer function to interface - Messenger.ConnectedPeers() will return an unique peer list, added test process: - refactored interceptors/resolvers as to not use the pointer function mechanism for message callback - moved containers inside factory package, created tests - changed interceptorsResolvers factory's name + code refactoring to comply with the new callback mechanism - added interface for a subset of Messenger's methods node: - changed factory option name + added more tests spos: - modified SposConsensWorker to implement p2p.TopicValidatorHandler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Second review
@@ -334,14 +334,20 @@ func (netMes *networkMessenger) IsConnected(peerID p2p.PeerID) bool { | |||
func (netMes *networkMessenger) ConnectedPeers() []p2p.PeerID { | |||
peerList := make([]p2p.PeerID, 0) | |||
|
|||
connectedPeers := make(map[p2p.PeerID]struct{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was this change really needed? Changing from a list to a map... and then adding the peers to a list?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The map is used for removing p2p.PeerID duplicates. Without it, before every peerList append I should iterate the peerList to check if the peerID needs to be added. This check might prove to be more costly if we have a lot of connections/peers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you be connected 2 times to the same peerID?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically yes, libp2p might have n connections for the same peerID, as far as I could see in their implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you could use the Peerstore().Peers()
method to get the slice of peers and then check for the connection? I think Peers()
should return a unique list of peer ids
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Peers() returns all known peers. I might add a peer to peerstore but not connect to it.
p2p/libp2p/netMessenger_test.go
Outdated
err := mes.SetTopicValidator("test", func(message p2p.MessageP2P) error { | ||
return nil | ||
}) | ||
err := mes.SetTopicValidator("test", &mock.TopicValidatorHandlerStub{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would call the SetTopicValidator AddTopicValidator because there is a list of topic validators.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a possibility to call SetTopicValidator("test", nil) in which case it will remove the topic validator and then the next call to SetTopicValidator with a non nil validator will set the new validator for that topic. So, can be left as it is or split in 2 functions: SetTopicValidator and ResetTopicValidator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also think it should be called AddTopicValidator
or RegisterTopicValidator
. Set implies that the only validator will be the one you add at that point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, so we can follow libp2p's model: RegisterTopicValidator + UnregisterTopicValidator.
Does this sound better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, it sounds better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only couple small changes, the rest looks good
// ReceivedMessage method redirects the received message to the channel which should handle it | ||
func (sposWorker *SPOSConsensusWorker) ReceivedMessage(message p2p.MessageP2P) error { | ||
// Validate method redirects the received message to the channel which should handle it | ||
func (sposWorker *SPOSConsensusWorker) Validate(message p2p.MessageP2P) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why Validate
if it actually does something else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the P2P perspective, it needs to validate the message. If the Validate func returns error, messenger won't broadcast the message to its peers. Now, since we already parse the message (in case of all interceptors) and check different things like nil fields, signature, etc, why not store the produced object? Otherwise we should have another registered object/function just for storing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it's called Validate it should not do anything else. The flow is validate -> do something if the data was correct. If somebody sees a public Validate method and wants to validate that some data is correct it will end up writing in a channel. Now he must look for whoever reads from that channel and sees if it's a desired behaviour or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to ProcessReceivedMessage
@@ -334,14 +334,20 @@ func (netMes *networkMessenger) IsConnected(peerID p2p.PeerID) bool { | |||
func (netMes *networkMessenger) ConnectedPeers() []p2p.PeerID { | |||
peerList := make([]p2p.PeerID, 0) | |||
|
|||
connectedPeers := make(map[p2p.PeerID]struct{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you could use the Peerstore().Peers()
method to get the slice of peers and then check for the connection? I think Peers()
should return a unique list of peer ids
p2p/libp2p/netMessenger_test.go
Outdated
err := mes.SetTopicValidator("test", func(message p2p.MessageP2P) error { | ||
return nil | ||
}) | ||
err := mes.SetTopicValidator("test", &mock.TopicValidatorHandlerStub{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also think it should be called AddTopicValidator
or RegisterTopicValidator
. Set implies that the only validator will be the one you add at that point.
# Conflicts: # cmd/bootnode/main.go # consensus/spos/sposConsensusWorker.go # consensus/spos/sposConsensusWorker_test.go # integrationTests/block/interceptedRequestTxBlockBodyNet_test.go # integrationTests/block/testInitializer.go # integrationTests/transaction/interceptedTxMem_test.go # integrationTests/transaction/testInitializer.go # node/node_test.go # process/block/interceptedBlocks.go # process/block/interceptedBlocks_test.go # process/block/interceptors.go # process/block/interceptors_test.go # process/factory/factory.go # process/factory/factory_test.go # process/transaction/interceptedTransaction.go # process/transaction/interceptor.go # process/transaction/interceptor_test.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed required changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed, we should add a configuration option to be able to use mdns when desired. This would also help in merging this branch without breaking master
PR description stated that bootstrap will be unavailabe |
Added the possibility to chose from "off", "mdns" and "kad-dht" discovery methods. The default is mdns. Added integration test to check if 20 peers connect to each other and a message broadcast by one of the peers should reach all other peers. |
- made the peerDiscovery test if all messenger got the message by checking a flag p2p: - changed some tests as Bootstrap initial test may not pass each time
There are 2 tests that do not pass when running with race detector on:
|
- typos fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mdns is implemented now but not triggered when the option is used
p2p/libp2p/netMessenger.go
Outdated
@@ -70,7 +77,13 @@ func NewMockNetworkMessenger(ctx context.Context, mockNet mocknet.Mocknet) (*net | |||
return nil, err | |||
} | |||
|
|||
mes, err := createMessenger(ctx, h, false, loadBalancer.NewOutgoingPipeLoadBalancer()) | |||
mes, err := createMessenger( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why we have the MockNetworkMessenger inside the networkMessenger struct? I would create a different file for this structure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will be renamed into NewMemoryMessenger and placed in memMessenger.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
# Conflicts: # consensus/spos/errors.go # consensus/spos/sposConsensusWorker.go # consensus/spos/sposConsensusWorker_test.go # integrationTests/transaction/interceptedTxMem_test.go # node/node.go # process/interface.go # process/sync/block.go # process/sync/block_test.go
p2p: - created memMessenger file, NewMemoryMessenger func
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed all required changes
Done P2P package refactoring, syntax should be clearer now, peer discovery mechanism was changed to kad-dht instead of mdns, dropped Creator interface, dropped Topic class, added send direct capabilities, added send data throttling capabilities.
All it remains to do is start-up bootstrap.