diff --git a/data/txHandler.go b/data/txHandler.go index 7ee5764137..ec3a84cc1f 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -257,15 +257,7 @@ func (handler *TxHandler) Start() { // libp2p pubsub validator and handler abstracted as TaggedMessageProcessor handler.net.RegisterValidatorHandlers([]network.TaggedMessageValidatorHandler{ - { - Tag: protocol.TxnTag, - // create anonymous struct to hold the two functions and satisfy the network.MessageProcessor interface - MessageHandler: struct { - network.ValidateHandleFunc - }{ - network.ValidateHandleFunc(handler.validateIncomingTxMessage), - }, - }, + {Tag: protocol.TxnTag, MessageHandler: network.ValidateHandleFunc(handler.validateIncomingTxMessage)}, }) handler.backlogWg.Add(2) diff --git a/network/gossipNode.go b/network/gossipNode.go index 86a7b42c55..91fb3506bc 100644 --- a/network/gossipNode.go +++ b/network/gossipNode.go @@ -86,8 +86,8 @@ type GossipNode interface { // Currently used as p2p pubsub topic validators. RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler) - // ClearProcessors deregisters all the existing message processors. - ClearProcessors() + // ClearValidatorHandlers deregisters all the existing message processors. + ClearValidatorHandlers() // GetHTTPClient returns a http.Client with a suitable for the network Transport // that would also limit the number of outgoing connections. diff --git a/network/hybridNetwork.go b/network/hybridNetwork.go index 85621260a9..20022898a6 100644 --- a/network/hybridNetwork.go +++ b/network/hybridNetwork.go @@ -199,10 +199,10 @@ func (n *HybridP2PNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageVal n.wsNetwork.RegisterValidatorHandlers(dispatch) } -// ClearProcessors deregisters all the existing message processors. -func (n *HybridP2PNetwork) ClearProcessors() { - n.p2pNetwork.ClearProcessors() - n.wsNetwork.ClearProcessors() +// ClearValidatorHandlers deregisters all the existing message processors. +func (n *HybridP2PNetwork) ClearValidatorHandlers() { + n.p2pNetwork.ClearValidatorHandlers() + n.wsNetwork.ClearValidatorHandlers() } // GetHTTPClient returns a http.Client with a suitable for the network Transport diff --git a/network/p2p/http.go b/network/p2p/http.go index f11b5375ab..18497f403e 100644 --- a/network/p2p/http.go +++ b/network/p2p/http.go @@ -48,11 +48,14 @@ func MakeHTTPServer(streamHost host.Host) *HTTPServer { p2phttpMux: mux.NewRouter(), } // libp2phttp server requires either explicit ListenAddrs or streamHost.Addrs() to be non-empty. - // If streamHost.Addrs() is empty, we will listen on all interfaces + // If streamHost.Addrs() is empty (that happens when NetAddress is set to ":0" and private address filtering is automatically enabled), + // we will listen on localhost to satisfy libp2phttp.Host.Serve() requirements. + // A side effect is it actually starts listening on interfaces listed in ListenAddrs and as go-libp2p v0.33.2 + // there is no other way to have libp2phttp server running AND to have streamHost.Addrs() filtered. if len(streamHost.Addrs()) == 0 { - logging.Base().Debugf("MakeHTTPServer: no addresses for %s, asking to listen all interfaces", streamHost.ID()) + logging.Base().Debugf("MakeHTTPServer: no addresses for %s, asking to listen localhost interface to satisfy libp2phttp.Host.Serve ", streamHost.ID()) httpServer.ListenAddrs = []multiaddr.Multiaddr{ - multiaddr.StringCast("/ip4/0.0.0.0/tcp/0/http"), + multiaddr.StringCast("/ip4/127.0.0.1/tcp/0/http"), } httpServer.InsecureAllowHTTP = true } diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index aa991d0429..0968db7b77 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -706,8 +706,8 @@ func (n *P2PNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageValidator n.handler.RegisterValidatorHandlers(dispatch) } -// ClearProcessors deregisters all the existing message handlers. -func (n *P2PNetwork) ClearProcessors() { +// ClearValidatorHandlers deregisters all the existing message handlers. +func (n *P2PNetwork) ClearValidatorHandlers() { n.handler.ClearValidatorHandlers([]Tag{}) } diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index d18196c4c4..e5e4400d46 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -963,7 +963,7 @@ func TestP2PRelay(t *testing.T) { counter.Store(0) var loggedMsgs [][]byte counterHandler, counterDone = makeCounterHandler(expectedMsgs, &counter, &loggedMsgs) - netA.ClearProcessors() + netA.ClearValidatorHandlers() netA.RegisterValidatorHandlers(counterHandler) for i := 0; i < expectedMsgs/2; i++ { diff --git a/network/wsNetwork.go b/network/wsNetwork.go index c67200f01b..5ab45e0406 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -825,8 +825,8 @@ func (wn *WebsocketNetwork) ClearHandlers() { func (wn *WebsocketNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler) { } -// ClearProcessors deregisters all the existing message handlers. -func (wn *WebsocketNetwork) ClearProcessors() { +// ClearValidatorHandlers deregisters all the existing message handlers. +func (wn *WebsocketNetwork) ClearValidatorHandlers() { } func (wn *WebsocketNetwork) setHeaders(header http.Header) { diff --git a/node/node.go b/node/node.go index 04d2ced84c..dddb3203e3 100644 --- a/node/node.go +++ b/node/node.go @@ -452,6 +452,7 @@ func (node *AlgorandFullNode) Stop() { }() node.net.ClearHandlers() + node.net.ClearValidatorHandlers() if !node.config.DisableNetworking { node.net.Stop() } @@ -1218,6 +1219,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo node.waitMonitoringRoutines() }() node.net.ClearHandlers() + node.net.ClearValidatorHandlers() node.stateProofWorker.Stop() node.txHandler.Stop() node.agreementService.Shutdown() diff --git a/node/node_test.go b/node/node_test.go index 19463177df..d35ac43d98 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -606,12 +606,11 @@ func TestDefaultResourcePaths(t *testing.T) { log := logging.Base() n, err := MakeFull(log, testDirectory, cfg, []string{}, genesis) + require.NoError(t, err) n.Start() defer n.Stop() - require.NoError(t, err) - // confirm genesis dir exists in the data dir, and that resources exist in the expected locations require.DirExists(t, filepath.Join(testDirectory, genesis.ID())) @@ -1073,3 +1072,47 @@ func TestNodeP2PRelays(t *testing.T) { return len(nodes[2].net.GetPeers(network.PeersPhonebookRelays)) == 2 }, 80*time.Second, 1*time.Second) } + +// TestNodeSetCatchpointCatchupMode checks node can handle services restart for fast catchup correctly +func TestNodeSetCatchpointCatchupMode(t *testing.T) { + partitiontest.PartitionTest(t) + + testDirectory := t.TempDir() + + genesis := bookkeeping.Genesis{ + SchemaID: "gen", + Proto: protocol.ConsensusCurrentVersion, + Network: config.Devtestnet, + FeeSink: sinkAddr.String(), + RewardsPool: poolAddr.String(), + } + log := logging.TestingLog(t) + cfg := config.GetDefaultLocal() + + tests := []struct { + name string + enableP2P bool + }{ + {"WS node", false}, + {"P2P node", true}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + cfg.EnableP2P = test.enableP2P + + n, err := MakeFull(log, testDirectory, cfg, []string{}, genesis) + require.NoError(t, err) + err = n.Start() + require.NoError(t, err) + defer n.Stop() + + // "start" catchpoint catchup => close services + outCh := n.SetCatchpointCatchupMode(true) + <-outCh + // "stop" catchpoint catchup => resume services + outCh = n.SetCatchpointCatchupMode(false) + <-outCh + }) + } +}