From fdf3c5390cabbe2c2f251f3152755d7cbb4dbb9b Mon Sep 17 00:00:00 2001 From: noot <36753753+noot@users.noreply.github.com> Date: Wed, 14 Apr 2021 10:31:14 -0400 Subject: [PATCH] fix(dot/network): fix receiving notifications messages from substrate peers (#1517) --- dot/network/block_announce.go | 2 +- dot/network/connmgr.go | 23 +++++++++++---- dot/network/connmgr_test.go | 2 +- dot/network/message.go | 4 +++ dot/network/notifications.go | 25 ++++++++-------- dot/network/service.go | 55 +++++++---------------------------- dot/network/service_test.go | 12 -------- dot/network/sync.go | 18 +++++++----- dot/network/sync_test.go | 6 ++-- dot/network/utils.go | 2 +- dot/sync/syncer.go | 23 ++++++++------- 11 files changed, 74 insertions(+), 98 deletions(-) diff --git a/dot/network/block_announce.go b/dot/network/block_announce.go index 05e1c9b71f..6d22b42b59 100644 --- a/dot/network/block_announce.go +++ b/dot/network/block_announce.go @@ -56,7 +56,7 @@ func (bm *BlockAnnounceMessage) Type() byte { // string formats a BlockAnnounceMessage as a string func (bm *BlockAnnounceMessage) String() string { - return fmt.Sprintf("BlockAnnounceMessage ParentHash=%s Number=%d StateRoot=%sx ExtrinsicsRoot=%s Digest=%v", + return fmt.Sprintf("BlockAnnounceMessage ParentHash=%s Number=%d StateRoot=%s ExtrinsicsRoot=%s Digest=%v", bm.ParentHash, bm.Number, bm.StateRoot, diff --git a/dot/network/connmgr.go b/dot/network/connmgr.go index f10b2fc585..276ad0e6e6 100644 --- a/dot/network/connmgr.go +++ b/dot/network/connmgr.go @@ -20,6 +20,7 @@ import ( "context" "math/rand" "sync" + "time" "github.com/libp2p/go-libp2p-core/connmgr" "github.com/libp2p/go-libp2p-core/network" @@ -29,6 +30,10 @@ import ( ma "github.com/multiformats/go-multiaddr" ) +var ( + maxRetries = 12 +) + // ConnManager implements connmgr.ConnManager type ConnManager struct { sync.Mutex @@ -191,10 +196,18 @@ func (cm *ConnManager) Disconnected(n network.Network, c network.Conn) { Addrs: addrs, } - err := cm.host.connect(info) - if err != nil { - logger.Warn("failed to reconnect to persistent peer", "peer", c.RemotePeer(), "error", err) - } + go func() { + for i := 0; i < maxRetries; i++ { + err := cm.host.connect(info) + if err != nil { + logger.Warn("failed to reconnect to persistent peer", "peer", c.RemotePeer(), "error", err) + time.Sleep(time.Minute) + continue + } + + return + } + }() // TODO: if number of peers falls below the min desired peer count, we should try to connect to previously discovered peers } @@ -207,7 +220,6 @@ func (cm *ConnManager) registerDisconnectHandler(cb func(peer.ID)) { func (cm *ConnManager) OpenedStream(n network.Network, s network.Stream) { logger.Trace( "Opened stream", - "host", s.Conn().LocalPeer(), "peer", s.Conn().RemotePeer(), "protocol", s.Protocol(), ) @@ -221,7 +233,6 @@ func (cm *ConnManager) registerCloseHandler(protocolID protocol.ID, cb func(id p func (cm *ConnManager) ClosedStream(n network.Network, s network.Stream) { logger.Trace( "Closed stream", - "host", s.Conn().LocalPeer(), "peer", s.Conn().RemotePeer(), "protocol", s.Protocol(), ) diff --git a/dot/network/connmgr_test.go b/dot/network/connmgr_test.go index e760704013..5f6b22bbcb 100644 --- a/dot/network/connmgr_test.go +++ b/dot/network/connmgr_test.go @@ -112,7 +112,7 @@ func TestPersistentPeers(t *testing.T) { require.NotEqual(t, 0, len(conns)) // if A disconnects from B, B should reconnect - nodeA.host.h.Network().ClosePeer(nodeA.host.id()) + nodeA.host.h.Network().ClosePeer(nodeB.host.id()) time.Sleep(time.Millisecond * 500) conns = nodeB.host.h.Network().ConnsToPeer(nodeA.host.id()) require.NotEqual(t, 0, len(conns)) diff --git a/dot/network/message.go b/dot/network/message.go index 87a60a976f..801d3a1296 100644 --- a/dot/network/message.go +++ b/dot/network/message.go @@ -148,6 +148,10 @@ func (bm *BlockRequestMessage) Decode(in []byte) error { case *pb.BlockRequest_Hash: startingBlock, err = variadic.NewUint64OrHash(common.BytesToHash(from.Hash)) case *pb.BlockRequest_Number: + // TODO: we are receiving block requests w/ 4-byte From field; did the format change? + if len(from.Number) != 8 { + return errors.New("invalid BlockResponseMessage.From; uint64 is not 8 bytes") + } startingBlock, err = variadic.NewUint64OrHash(binary.LittleEndian.Uint64(from.Number)) default: err = errors.New("invalid StartingBlock") diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 9d36695379..44ff1eddcb 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -131,7 +131,6 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, err := handshakeValidator(peer, hs) if err != nil { logger.Trace("failed to validate handshake", "protocol", info.protocolID, "peer", peer, "error", err) - _ = stream.Conn().Close() return errCannotValidateHandshake } @@ -141,17 +140,17 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, // once validated, send back a handshake resp, err := info.getHandshake() if err != nil { - logger.Debug("failed to get handshake", "protocol", info.protocolID, "error", err) + logger.Warn("failed to get handshake", "protocol", info.protocolID, "error", err) return err } - err = s.host.send(peer, info.protocolID, resp) + err = s.host.writeToStream(stream, resp) if err != nil { logger.Trace("failed to send handshake", "protocol", info.protocolID, "peer", peer, "error", err) - _ = stream.Conn().Close() return err } logger.Trace("receiver: sent handshake", "protocol", info.protocolID, "peer", peer) + return nil } // if we are the initiator and haven't received the handshake already, validate it @@ -161,7 +160,6 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, if err != nil { logger.Trace("failed to validate handshake", "protocol", info.protocolID, "peer", peer, "error", err) hsData.validated = false - _ = stream.Conn().Close() return errCannotValidateHandshake } @@ -175,7 +173,7 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, // if we are the initiator, send the message if hsData, has := info.getHandshakeData(peer); has && hsData.validated && hsData.received && hsData.outboundMsg != nil { logger.Trace("sender: sending message", "protocol", info.protocolID) - err := s.host.send(peer, info.protocolID, hsData.outboundMsg) + err := s.host.writeToStream(stream, hsData.outboundMsg) if err != nil { logger.Debug("failed to send message", "protocol", info.protocolID, "peer", peer, "error", err) return err @@ -197,11 +195,14 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, } // TODO: improve this by keeping track of who you've received/sent messages from - if !s.noGossip { - seen := s.gossip.hasSeen(msg) - if !seen { - s.broadcastExcluding(info, peer, msg) - } + if s.noGossip { + return nil + } + + seen := s.gossip.hasSeen(msg) + if !seen { + // TODO: update this to write to stream w/ handshake established + s.broadcastExcluding(info, peer, msg) } return nil @@ -261,7 +262,7 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer } if err != nil { - logger.Error("failed to send message to peer", "peer", peer, "error", err) + logger.Debug("failed to send message to peer", "peer", peer, "error", err) } } } diff --git a/dot/network/service.go b/dot/network/service.go index 6c454d611a..cbcb4623de 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -145,7 +145,7 @@ func NewService(cfg *Config) (*Service, error) { } network.syncQueue = newSyncQueue(network) - + network.noGossip = true // TODO: remove once duplicate message sending is merged return network, err } @@ -281,40 +281,8 @@ func (s *Service) logPeerCount() { func (s *Service) handleConn(conn libp2pnetwork.Conn) { // give new peers a slight weight + // TODO: do this once handshake is received s.syncQueue.updatePeerScore(conn.RemotePeer(), 1) - - s.notificationsMu.Lock() - defer s.notificationsMu.Unlock() - - info, has := s.notificationsProtocols[BlockAnnounceMsgType] - if !has { - // this shouldn't happen - logger.Warn("block announce protocol is not yet registered!") - return - } - - // open block announce substream - hs, err := info.getHandshake() - if err != nil { - logger.Warn("failed to get handshake", "protocol", blockAnnounceID, "error", err) - return - } - - info.mapMu.RLock() - defer info.mapMu.RUnlock() - - peer := conn.RemotePeer() - if hsData, has := info.getHandshakeData(peer); !has || !hsData.received { //nolint - info.handshakeData.Store(peer, &handshakeData{ - validated: false, - }) - - logger.Trace("sending handshake", "protocol", info.protocolID, "peer", peer, "message", hs) - err = s.host.send(peer, info.protocolID, hs) - if err != nil { - logger.Trace("failed to send block announce handshake to peer", "peer", peer, "error", err) - } - } } func (s *Service) beginDiscovery() error { @@ -528,7 +496,7 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder if err == io.EOF { continue } else if err != nil { - logger.Trace("failed to read from stream", "protocol", stream.Protocol(), "error", err) + logger.Trace("failed to read from stream", "peer", stream.Conn().RemotePeer(), "protocol", stream.Protocol(), "error", err) _ = stream.Close() return } @@ -541,21 +509,18 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder } logger.Trace( - "Received message from peer", + "received message from peer", "host", s.host.id(), "peer", peer, "msg", msg.String(), ) - go func() { - // handle message based on peer status and message type - err = handler(stream, msg) - if err != nil { - logger.Trace("Failed to handle message from stream", "message", msg, "error", err) - _ = stream.Close() - return - } - }() + err = handler(stream, msg) + if err != nil { + logger.Debug("failed to handle message from stream", "message", msg, "error", err) + _ = stream.Close() + return + } } } diff --git a/dot/network/service_test.go b/dot/network/service_test.go index a63b446398..2d099b0334 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -438,16 +438,4 @@ func TestHandleConn(t *testing.T) { aScore, ok := nodeB.syncQueue.peerScore.Load(nodeA.host.id()) require.True(t, ok) require.Equal(t, 1, aScore) - - infoA := nodeA.notificationsProtocols[BlockAnnounceMsgType] - hsDataB, has := infoA.getHandshakeData(nodeB.host.id()) - require.True(t, has) - require.True(t, hsDataB.received) - require.True(t, hsDataB.validated) - - infoB := nodeB.notificationsProtocols[BlockAnnounceMsgType] - hsDataA, has := infoB.getHandshakeData(nodeA.host.id()) - require.True(t, has) - require.True(t, hsDataA.received) - require.True(t, hsDataA.validated) } diff --git a/dot/network/sync.go b/dot/network/sync.go index 5eb3b87d7d..125c2dcdae 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -18,6 +18,7 @@ package network import ( "context" + "errors" "fmt" "reflect" "sort" @@ -29,6 +30,7 @@ import ( "github.com/ChainSafe/gossamer/lib/common/optional" "github.com/ChainSafe/gossamer/lib/common/variadic" + "github.com/ChainSafe/chaindb" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" ) @@ -182,7 +184,7 @@ func (q *syncQueue) syncAtHead() { for { select { // sleep for average block time TODO: make this configurable from slot duration - case <-time.After(q.slotDuration): + case <-time.After(q.slotDuration * 2): case <-q.ctx.Done(): return } @@ -689,14 +691,14 @@ func (q *syncQueue) handleBlockJustification(data []*types.BlockData) { } func (q *syncQueue) handleBlockData(data []*types.BlockData) { - bestNum, err := q.s.blockState.BestBlockNumber() + finalized, err := q.s.blockState.GetFinalizedHeader(0, 0) if err != nil { panic(err) // TODO: don't panic but try again. seems blockState needs better concurrency handling } end := data[len(data)-1].Number().Int64() - if end <= bestNum.Int64() { - logger.Debug("ignoring block data that is below our head", "got", end, "head", bestNum.Int64()) + if end <= finalized.Number.Int64() { + logger.Debug("ignoring block data that is below our head", "got", end, "head", finalized.Number.Int64()) q.pushRequest(uint64(end+1), blockRequestBufferSize, "") return } @@ -736,7 +738,7 @@ func (q *syncQueue) handleBlockData(data []*types.BlockData) { func (q *syncQueue) handleBlockDataFailure(idx int, err error, data []*types.BlockData) { logger.Warn("failed to handle block data", "failed on block", q.currStart+int64(idx), "error", err) - if err.Error() == "failed to get parent hash: Key not found" { // TODO: unwrap err + if errors.Is(err, chaindb.ErrKeyNotFound) { header, err := types.NewHeaderFromOptional(data[idx].Header) if err != nil { logger.Debug("failed to get header from BlockData", "idx", idx, "error", err) @@ -787,7 +789,6 @@ func (q *syncQueue) handleBlockAnnounce(msg *BlockAnnounceMessage, from peer.ID) return } - logger.Debug("received BlockAnnounce!", "number", msg.Number, "hash", header.Hash(), "from", from) has, _ := q.s.blockState.HasBlockBody(header.Hash()) if has { return @@ -797,13 +798,16 @@ func (q *syncQueue) handleBlockAnnounce(msg *BlockAnnounceMessage, from peer.ID) return } + q.goal = header.Number.Int64() + bestNum, err := q.s.blockState.BestBlockNumber() if err != nil { logger.Error("failed to get best block number", "error", err) return } - q.goal = header.Number.Int64() + // TODO: if we're at the head, this should request by hash instead of number, since there will + // certainly be blocks with the same number. q.pushRequest(uint64(bestNum.Int64()+1), blockRequestBufferSize, from) } diff --git a/dot/network/sync_test.go b/dot/network/sync_test.go index c65d700952..fba9b11e05 100644 --- a/dot/network/sync_test.go +++ b/dot/network/sync_test.go @@ -28,6 +28,7 @@ import ( "github.com/ChainSafe/gossamer/lib/common/optional" "github.com/ChainSafe/gossamer/lib/utils" + "github.com/ChainSafe/chaindb" "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" ) @@ -425,9 +426,10 @@ func TestSyncQueue_SyncAtHead(t *testing.T) { q.stop() time.Sleep(time.Second) q.ctx = context.Background() + q.slotDuration = time.Millisecond * 100 go q.syncAtHead() - time.Sleep(time.Millisecond * 6100) + time.Sleep(q.slotDuration * 3) select { case req := <-q.requestCh: require.Equal(t, uint64(2), req.req.StartingBlock.Uint64()) @@ -500,7 +502,7 @@ func TestSyncQueue_handleBlockDataFailure_MissingParent(t *testing.T) { q.ctx = context.Background() data := testBlockResponseMessage().BlockData - q.handleBlockDataFailure(0, fmt.Errorf("failed to get parent hash: Key not found"), data) + q.handleBlockDataFailure(0, fmt.Errorf("some error: %w", chaindb.ErrKeyNotFound), data) select { case req := <-q.requestCh: require.True(t, req.req.StartingBlock.IsHash()) diff --git a/dot/network/utils.go b/dot/network/utils.go index 62e9c53c00..f3bf385e7c 100644 --- a/dot/network/utils.go +++ b/dot/network/utils.go @@ -189,7 +189,7 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) { } if length == 0 { - return 0, err // TODO: return bytes read from readLEB128ToUint64 + return 0, nil // msg length of 0 is allowed, for example transactions handshake } // TODO: check if length > len(buf), if so probably log.Crit diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index 2348aeeaeb..5b4bd4d302 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -133,18 +133,19 @@ func (s *Service) HandleBlockAnnounce(msg *network.BlockAnnounceMessage) error { } // save block header if we don't have it already - if !has { - err = s.blockState.SetHeader(header) - if err != nil { - return err - } - logger.Debug( - "saved block header to block state", - "number", header.Number, - "hash", header.Hash(), - ) + if has { + return nil } + err = s.blockState.SetHeader(header) + if err != nil { + return err + } + logger.Debug( + "saved block header to block state", + "number", header.Number, + "hash", header.Hash(), + ) return nil } @@ -319,7 +320,7 @@ func (s *Service) handleBlock(block *types.Block) error { if err != nil { return err } - logger.Trace("stored resulting state", "state root", ts.MustRoot()) + logger.Trace("executed block and stored resulting state", "state root", ts.MustRoot()) // TODO: batch writes in AddBlock err = s.blockState.AddBlock(block)