Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node: clear new p2p net handlers on fast catchup #6127

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,7 @@ func (handler *TxHandler) Start() {

// libp2p pubsub validator and handler abstracted as TaggedMessageProcessor
handler.net.RegisterValidatorHandlers([]network.TaggedMessageValidatorHandler{
{
Tag: protocol.TxnTag,
// create anonymous struct to hold the two functions and satisfy the network.MessageProcessor interface
MessageHandler: struct {
network.ValidateHandleFunc
}{
network.ValidateHandleFunc(handler.validateIncomingTxMessage),
},
},
{Tag: protocol.TxnTag, MessageHandler: network.ValidateHandleFunc(handler.validateIncomingTxMessage)},
})

handler.backlogWg.Add(2)
Expand Down
4 changes: 2 additions & 2 deletions network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ type GossipNode interface {
// Currently used as p2p pubsub topic validators.
RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler)

// ClearProcessors deregisters all the existing message processors.
ClearProcessors()
// ClearValidatorHandlers deregisters all the existing message processors.
ClearValidatorHandlers()

// GetHTTPClient returns a http.Client with a suitable for the network Transport
// that would also limit the number of outgoing connections.
Expand Down
8 changes: 4 additions & 4 deletions network/hybridNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@
n.wsNetwork.RegisterValidatorHandlers(dispatch)
}

// ClearProcessors deregisters all the existing message processors.
func (n *HybridP2PNetwork) ClearProcessors() {
n.p2pNetwork.ClearProcessors()
n.wsNetwork.ClearProcessors()
// ClearValidatorHandlers deregisters all the existing message processors.
func (n *HybridP2PNetwork) ClearValidatorHandlers() {
n.p2pNetwork.ClearValidatorHandlers()
n.wsNetwork.ClearValidatorHandlers()

Check warning on line 205 in network/hybridNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/hybridNetwork.go#L203-L205

Added lines #L203 - L205 were not covered by tests
}

// GetHTTPClient returns a http.Client with a suitable for the network Transport
Expand Down
9 changes: 6 additions & 3 deletions network/p2p/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@
p2phttpMux: mux.NewRouter(),
}
// libp2phttp server requires either explicit ListenAddrs or streamHost.Addrs() to be non-empty.
// If streamHost.Addrs() is empty, we will listen on all interfaces
// If streamHost.Addrs() is empty (that happens when NetAddress is set to ":0" and private address filtering is automatically enabled),
// we will listen on localhost to satisfy libp2phttp.Host.Serve() requirements.
// A side effect is it actually starts listening on interfaces listed in ListenAddrs and as go-libp2p v0.33.2
// there is no other way to have libp2phttp server running AND to have streamHost.Addrs() filtered.
if len(streamHost.Addrs()) == 0 {
logging.Base().Debugf("MakeHTTPServer: no addresses for %s, asking to listen all interfaces", streamHost.ID())
logging.Base().Debugf("MakeHTTPServer: no addresses for %s, asking to listen localhost interface to satisfy libp2phttp.Host.Serve ", streamHost.ID())

Check warning on line 56 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L56

Added line #L56 was not covered by tests
httpServer.ListenAddrs = []multiaddr.Multiaddr{
multiaddr.StringCast("/ip4/0.0.0.0/tcp/0/http"),
multiaddr.StringCast("/ip4/127.0.0.1/tcp/0/http"),

Check warning on line 58 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L58

Added line #L58 was not covered by tests
}
httpServer.InsecureAllowHTTP = true
}
Expand Down
4 changes: 2 additions & 2 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,8 @@
n.handler.RegisterValidatorHandlers(dispatch)
}

// ClearProcessors deregisters all the existing message handlers.
func (n *P2PNetwork) ClearProcessors() {
// ClearValidatorHandlers deregisters all the existing message handlers.
func (n *P2PNetwork) ClearValidatorHandlers() {

Check warning on line 710 in network/p2pNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/p2pNetwork.go#L710

Added line #L710 was not covered by tests
n.handler.ClearValidatorHandlers([]Tag{})
}

Expand Down
2 changes: 1 addition & 1 deletion network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ func TestP2PRelay(t *testing.T) {
counter.Store(0)
var loggedMsgs [][]byte
counterHandler, counterDone = makeCounterHandler(expectedMsgs, &counter, &loggedMsgs)
netA.ClearProcessors()
netA.ClearValidatorHandlers()
netA.RegisterValidatorHandlers(counterHandler)

for i := 0; i < expectedMsgs/2; i++ {
Expand Down
4 changes: 2 additions & 2 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,8 +825,8 @@
func (wn *WebsocketNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler) {
}

// ClearProcessors deregisters all the existing message handlers.
func (wn *WebsocketNetwork) ClearProcessors() {
// ClearValidatorHandlers deregisters all the existing message handlers.
func (wn *WebsocketNetwork) ClearValidatorHandlers() {

Check warning on line 829 in network/wsNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/wsNetwork.go#L829

Added line #L829 was not covered by tests
}

func (wn *WebsocketNetwork) setHeaders(header http.Header) {
Expand Down
2 changes: 2 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ func (node *AlgorandFullNode) Stop() {
}()

node.net.ClearHandlers()
node.net.ClearValidatorHandlers()
if !node.config.DisableNetworking {
node.net.Stop()
}
Expand Down Expand Up @@ -1218,6 +1219,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo
node.waitMonitoringRoutines()
}()
node.net.ClearHandlers()
node.net.ClearValidatorHandlers()
node.stateProofWorker.Stop()
node.txHandler.Stop()
node.agreementService.Shutdown()
Expand Down
47 changes: 45 additions & 2 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,12 +606,11 @@ func TestDefaultResourcePaths(t *testing.T) {
log := logging.Base()

n, err := MakeFull(log, testDirectory, cfg, []string{}, genesis)
require.NoError(t, err)

n.Start()
defer n.Stop()

require.NoError(t, err)

// confirm genesis dir exists in the data dir, and that resources exist in the expected locations
require.DirExists(t, filepath.Join(testDirectory, genesis.ID()))

Expand Down Expand Up @@ -1073,3 +1072,47 @@ func TestNodeP2PRelays(t *testing.T) {
return len(nodes[2].net.GetPeers(network.PeersPhonebookRelays)) == 2
}, 80*time.Second, 1*time.Second)
}

// TestNodeSetCatchpointCatchupMode checks node can handle services restart for fast catchup correctly
func TestNodeSetCatchpointCatchupMode(t *testing.T) {
partitiontest.PartitionTest(t)

testDirectory := t.TempDir()

genesis := bookkeeping.Genesis{
SchemaID: "gen",
Proto: protocol.ConsensusCurrentVersion,
Network: config.Devtestnet,
FeeSink: sinkAddr.String(),
RewardsPool: poolAddr.String(),
}
log := logging.TestingLog(t)
cfg := config.GetDefaultLocal()

tests := []struct {
name string
enableP2P bool
}{
{"WS node", false},
{"P2P node", true},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cfg.EnableP2P = test.enableP2P

n, err := MakeFull(log, testDirectory, cfg, []string{}, genesis)
require.NoError(t, err)
err = n.Start()
require.NoError(t, err)
defer n.Stop()

// "start" catchpoint catchup => close services
outCh := n.SetCatchpointCatchupMode(true)
<-outCh
// "stop" catchpoint catchup => resume services
outCh = n.SetCatchpointCatchupMode(false)
<-outCh
})
}
}
Loading