From 1ed00364d21a5a504d0e25531c2ba3916c2e933c Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 19 Dec 2024 14:47:13 -0400 Subject: [PATCH] refactor: use peerInfo instead of peerID --- cmd/waku/node.go | 4 ++-- cmd/waku/server/rest/admin.go | 3 ++- library/node.go | 2 +- waku/v2/api/common/storenode_requestor.go | 2 +- waku/v2/api/history/cycle.go | 18 +++++++++++++++++ waku/v2/api/history/history.go | 20 +++++++++---------- waku/v2/api/history/history_test.go | 8 ++++---- waku/v2/api/missing/criteria_interest.go | 4 ++-- waku/v2/api/missing/default_requestor.go | 8 ++++---- waku/v2/api/missing/missing_messages.go | 12 +++++------ waku/v2/api/publish/default_verifier.go | 4 ++-- waku/v2/api/publish/message_check.go | 10 +++++----- waku/v2/node/keepalive_test.go | 2 +- waku/v2/node/wakunode2.go | 4 ++-- waku/v2/node/wakunode2_test.go | 4 ++-- waku/v2/peermanager/peer_manager.go | 17 +++++++++------- waku/v2/peermanager/peer_manager_test.go | 17 ++++++++-------- waku/v2/protocol/filter/client.go | 3 ++- waku/v2/protocol/filter/test_utils.go | 3 ++- .../legacy_store/waku_store_client.go | 2 +- waku/v2/protocol/lightpush/waku_lightpush.go | 3 ++- waku/v2/protocol/peer_exchange/client.go | 3 ++- waku/v2/protocol/store/client.go | 6 +++--- waku/v2/protocol/store/options.go | 6 +++--- 24 files changed, 96 insertions(+), 69 deletions(-) diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 61fdf4fb1..8901c2182 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -386,7 +386,7 @@ func Execute(options NodeOptions) error { if options.PeerExchange.Enable && options.PeerExchange.Node != nil { logger.Info("retrieving peer info via peer exchange protocol") - peerID, err := wakuNode.AddPeer(*options.PeerExchange.Node, wakupeerstore.Static, + peerID, err := wakuNode.AddPeer([]multiaddr.Multiaddr{*options.PeerExchange.Node}, wakupeerstore.Static, pubSubTopicMapKeys, peer_exchange.PeerExchangeID_v20alpha1) if err != nil { logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err)) @@ -481,7 +481,7 @@ func processTopics(options NodeOptions) (map[string][]string, error) { func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, pubSubTopics []string, protocols ...protocol.ID) error { for _, addr := range addresses { - _, err := wakuNode.AddPeer(addr, wakupeerstore.Static, pubSubTopics, protocols...) + _, err := wakuNode.AddPeer([]multiaddr.Multiaddr{addr}, wakupeerstore.Static, pubSubTopics, protocols...) if err != nil { return fmt.Errorf("could not add static peer: %w", err) } diff --git a/cmd/waku/server/rest/admin.go b/cmd/waku/server/rest/admin.go index 7823c46bc..e767ebf5f 100644 --- a/cmd/waku/server/rest/admin.go +++ b/cmd/waku/server/rest/admin.go @@ -7,6 +7,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/cmd/waku/server" "github.com/waku-org/go-waku/logging" @@ -117,7 +118,7 @@ func (a *AdminService) postV1Peer(w http.ResponseWriter, req *http.Request) { protos = append(protos, protocol.ID(proto)) } - id, err := a.node.AddPeer(addr, peerstore.Static, topics, protos...) + id, err := a.node.AddPeer([]multiaddr.Multiaddr{addr}, peerstore.Static, topics, protos...) if err != nil { a.log.Error("failed to add peer", zap.Error(err)) writeErrOrResponse(w, err, nil) diff --git a/library/node.go b/library/node.go index 5232f4d92..47eea0f0d 100644 --- a/library/node.go +++ b/library/node.go @@ -352,7 +352,7 @@ func AddPeer(instance *WakuInstance, address string, protocolID string) (string, return "", err } - peerID, err := instance.node.AddPeer(ma, peerstore.Static, instance.relayTopics, libp2pProtocol.ID(protocolID)) + peerID, err := instance.node.AddPeer([]multiaddr.Multiaddr{ma}, peerstore.Static, instance.relayTopics, libp2pProtocol.ID(protocolID)) if err != nil { return "", err } diff --git a/waku/v2/api/common/storenode_requestor.go b/waku/v2/api/common/storenode_requestor.go index 8a723c9e6..a5076b3f6 100644 --- a/waku/v2/api/common/storenode_requestor.go +++ b/waku/v2/api/common/storenode_requestor.go @@ -8,5 +8,5 @@ import ( ) type StorenodeRequestor interface { - Query(ctx context.Context, peerID peer.ID, query *pb.StoreQueryRequest) (StoreRequestResult, error) + Query(ctx context.Context, peerInfo peer.AddrInfo, query *pb.StoreQueryRequest) (StoreRequestResult, error) } diff --git a/waku/v2/api/history/cycle.go b/waku/v2/api/history/cycle.go index 5da8d0ee4..c4976110a 100644 --- a/waku/v2/api/history/cycle.go +++ b/waku/v2/api/history/cycle.go @@ -338,6 +338,24 @@ func (m *StorenodeCycle) GetActiveStorenode() peer.ID { return m.activeStorenode } +func (m *StorenodeCycle) GetActiveStorenodePeerInfo() peer.AddrInfo { + m.RLock() + defer m.RUnlock() + + storeNodes, err := m.storenodeConfigProvider.Storenodes() + if err != nil { + return peer.AddrInfo{} + } + + for _, p := range storeNodes { + if p.ID == m.activeStorenode { + return p + } + } + + return peer.AddrInfo{} +} + func (m *StorenodeCycle) IsStorenodeAvailable(peerID peer.ID) bool { return m.storenodeStatus(peerID) == connected } diff --git a/waku/v2/api/history/history.go b/waku/v2/api/history/history.go index 004cd1567..c61bb2e14 100644 --- a/waku/v2/api/history/history.go +++ b/waku/v2/api/history/history.go @@ -37,7 +37,7 @@ type HistoryRetriever struct { type HistoryProcessor interface { OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error - OnRequestFailed(requestID []byte, peerID peer.ID, err error) + OnRequestFailed(requestID []byte, peerInfo peer.AddrInfo, err error) } func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever { @@ -51,7 +51,7 @@ func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor Histo func (hr *HistoryRetriever) Query( ctx context.Context, criteria store.FilterCriteria, - storenodeID peer.ID, + storenode peer.AddrInfo, pageLimit uint64, shouldProcessNextPage func(int) (bool, uint64), processEnvelopes bool, @@ -178,7 +178,7 @@ loop: newCriteria.TimeStart = timeStart newCriteria.TimeEnd = timeEnd - cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenodeID, newCriteria, w.cursor, w.limit, true, processEnvelopes, logger) + cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenode, newCriteria, w.cursor, w.limit, true, processEnvelopes, logger) queryCancel() if err != nil { @@ -241,7 +241,7 @@ loop: func (hr *HistoryRetriever) createMessagesRequest( ctx context.Context, - peerID peer.ID, + peerInfo peer.AddrInfo, criteria store.FilterCriteria, cursor []byte, limit uint64, @@ -257,7 +257,7 @@ func (hr *HistoryRetriever) createMessagesRequest( }) go func() { - storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, processEnvelopes) + storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerInfo, criteria, cursor, limit, processEnvelopes) resultCh <- struct { storeCursor []byte envelopesCount int @@ -273,7 +273,7 @@ func (hr *HistoryRetriever) createMessagesRequest( } } else { go func() { - _, _, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, false) + _, _, err = hr.requestStoreMessages(ctx, peerInfo, criteria, cursor, limit, false) if err != nil { logger.Error("failed to request store messages", zap.Error(err)) } @@ -283,9 +283,9 @@ func (hr *HistoryRetriever) createMessagesRequest( return } -func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID peer.ID, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) { +func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerInfo peer.AddrInfo, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) { requestID := protocol.GenerateRequestID() - logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID)) + logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerInfo.ID)) logger.Debug("store.query", logging.Timep("startTime", criteria.TimeStart), @@ -307,12 +307,12 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee } queryStart := time.Now() - result, err := hr.store.Query(ctx, peerID, storeQueryRequest) + result, err := hr.store.Query(ctx, peerInfo, storeQueryRequest) queryDuration := time.Since(queryStart) if err != nil { logger.Error("error querying storenode", zap.Error(err)) - hr.historyProcessor.OnRequestFailed(requestID, peerID, err) + hr.historyProcessor.OnRequestFailed(requestID, peerInfo, err) return nil, 0, err } diff --git a/waku/v2/api/history/history_test.go b/waku/v2/api/history/history_test.go index 460b25d2a..8f4a829ed 100644 --- a/waku/v2/api/history/history_test.go +++ b/waku/v2/api/history/history_test.go @@ -70,7 +70,7 @@ func (h *mockHistoryProcessor) OnEnvelope(env *protocol.Envelope, processEnvelop return nil } -func (h *mockHistoryProcessor) OnRequestFailed(requestID []byte, peerID peer.ID, err error) { +func (h *mockHistoryProcessor) OnRequestFailed(requestID []byte, peerInfo peer.AddrInfo, err error) { } func newMockHistoryProcessor() *mockHistoryProcessor { @@ -92,7 +92,7 @@ func getInitialResponseKey(contentTopics []string) string { return hex.EncodeToString(append([]byte("start"), []byte(contentTopics[0])...)) } -func (t *mockStore) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *pb.StoreQueryRequest) (common.StoreRequestResult, error) { +func (t *mockStore) Query(ctx context.Context, peerInfo peer.AddrInfo, storeQueryRequest *pb.StoreQueryRequest) (common.StoreRequestResult, error) { result := &mockResult{} if len(storeQueryRequest.GetPaginationCursor()) == 0 { initialResponse := getInitialResponseKey(storeQueryRequest.GetContentTopics()) @@ -218,7 +218,7 @@ func TestSuccessBatchExecution(t *testing.T) { ContentFilter: protocol.NewContentFilter("test", topics...), } - err = historyRetriever.Query(ctx, criteria, storenodeID, 10, func(i int) (bool, uint64) { return true, 10 }, true) + err = historyRetriever.Query(ctx, criteria, peer.AddrInfo{ID: storenodeID}, 10, func(i int) (bool, uint64) { return true, 10 }, true) require.NoError(t, err) } @@ -246,6 +246,6 @@ func TestFailedBatchExecution(t *testing.T) { ContentFilter: protocol.NewContentFilter("test", topics...), } - err = historyRetriever.Query(ctx, criteria, storenodeID, 10, func(i int) (bool, uint64) { return true, 10 }, true) + err = historyRetriever.Query(ctx, criteria, peer.AddrInfo{ID: storenodeID}, 10, func(i int) (bool, uint64) { return true, 10 }, true) require.Error(t, err) } diff --git a/waku/v2/api/missing/criteria_interest.go b/waku/v2/api/missing/criteria_interest.go index 919b2fc91..19aa7b84d 100644 --- a/waku/v2/api/missing/criteria_interest.go +++ b/waku/v2/api/missing/criteria_interest.go @@ -10,7 +10,7 @@ import ( ) type criteriaInterest struct { - peerID peer.ID + peerInfo peer.AddrInfo contentFilter protocol.ContentFilter lastChecked time.Time @@ -19,7 +19,7 @@ type criteriaInterest struct { } func (c criteriaInterest) equals(other criteriaInterest) bool { - if c.peerID != other.peerID { + if c.peerInfo.ID != other.peerInfo.ID { return false } diff --git a/waku/v2/api/missing/default_requestor.go b/waku/v2/api/missing/default_requestor.go index 382821735..a72af3c55 100644 --- a/waku/v2/api/missing/default_requestor.go +++ b/waku/v2/api/missing/default_requestor.go @@ -20,10 +20,10 @@ type defaultStorenodeRequestor struct { store *store.WakuStore } -func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) { - return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize)) +func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) { + return d.store.QueryByHash(ctx, messageHashes, store.WithPeerAddr(peerInfo.Addrs...), store.WithPaging(false, pageSize)) } -func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) { - return d.store.RequestRaw(ctx, peerID, storeQueryRequest) +func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerInfo peer.AddrInfo, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) { + return d.store.RequestRaw(ctx, peerInfo, storeQueryRequest) } diff --git a/waku/v2/api/missing/missing_messages.go b/waku/v2/api/missing/missing_messages.go index 927ffb9c9..f36068fd6 100644 --- a/waku/v2/api/missing/missing_messages.go +++ b/waku/v2/api/missing/missing_messages.go @@ -71,13 +71,13 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes } } -func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilter protocol.ContentFilter) { +func (m *MissingMessageVerifier) SetCriteriaInterest(peerInfo peer.AddrInfo, contentFilter protocol.ContentFilter) { m.criteriaInterestMu.Lock() defer m.criteriaInterestMu.Unlock() ctx, cancel := context.WithCancel(m.ctx) criteriaInterest := criteriaInterest{ - peerID: peerID, + peerInfo: peerInfo, contentFilter: contentFilter, lastChecked: m.timesource.Now().Add(-m.params.delay), ctx: ctx, @@ -190,7 +190,7 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter } m.logger.Error("could not fetch history", - zap.Stringer("peerID", interest.peerID), + zap.Stringer("peerID", interest.peerInfo.ID), zap.String("pubsubTopic", interest.contentFilter.PubsubTopic), zap.Strings("contentTopics", contentTopics)) continue @@ -233,7 +233,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, contentTopics := interest.contentFilter.ContentTopics.ToList() logger := m.logger.With( - zap.Stringer("peerID", interest.peerID), + zap.Stringer("peerID", interest.peerInfo.ID), zap.Strings("contentTopics", contentTopics[batchFrom:batchTo]), zap.String("pubsubTopic", interest.contentFilter.PubsubTopic), logging.Epoch("from", interest.lastChecked), @@ -252,7 +252,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, return m.storenodeRequestor.Query( ctx, - interest.peerID, + interest.peerInfo, storeQueryRequest, ) }, logger, "retrieving history to check for missing messages") @@ -335,7 +335,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, PaginationLimit: proto.Uint64(maxMsgHashesPerRequest), } - return m.storenodeRequestor.Query(queryCtx, interest.peerID, storeQueryRequest) + return m.storenodeRequestor.Query(queryCtx, interest.peerInfo, storeQueryRequest) }, logger, "retrieving missing messages") if err != nil { if !errors.Is(err, context.Canceled) { diff --git a/waku/v2/api/publish/default_verifier.go b/waku/v2/api/publish/default_verifier.go index 68eca0304..386728ece 100644 --- a/waku/v2/api/publish/default_verifier.go +++ b/waku/v2/api/publish/default_verifier.go @@ -18,10 +18,10 @@ type defaultStorenodeMessageVerifier struct { store *store.WakuStore } -func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) { +func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) { var opts []store.RequestOption opts = append(opts, store.WithRequestID(requestID)) - opts = append(opts, store.WithPeer(peerID)) + opts = append(opts, store.WithPeerAddr(peerID.Addrs...)) opts = append(opts, store.WithPaging(false, pageSize)) opts = append(opts, store.IncludeData(false)) diff --git a/waku/v2/api/publish/message_check.go b/waku/v2/api/publish/message_check.go index 8a37e20ce..c091e9592 100644 --- a/waku/v2/api/publish/message_check.go +++ b/waku/v2/api/publish/message_check.go @@ -33,7 +33,7 @@ type ISentCheck interface { type StorenodeMessageVerifier interface { // MessagesExist returns a list of the messages it found from a list of message hashes - MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) + MessageHashesExist(ctx context.Context, requestID []byte, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) } // MessageSentCheck tracks the outgoing messages and check against store node @@ -211,8 +211,8 @@ func (m *MessageSentCheck) Start() { } func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []common.Hash, relayTime []uint32, pubsubTopic string) []common.Hash { - selectedPeer := m.storenodeCycle.GetActiveStorenode() - if selectedPeer == "" { + selectedPeer := m.storenodeCycle.GetActiveStorenodePeerInfo() + if selectedPeer.ID == "" { m.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic)) return []common.Hash{} } @@ -224,13 +224,13 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c messageHashes[i] = pb.ToMessageHash(hash.Bytes()) } - m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes)) + m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer.ID), zap.Stringers("messageHashes", messageHashes)) queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout) defer cancel() result, err := m.messageVerifier.MessageHashesExist(queryCtx, requestID, selectedPeer, m.maxHashQueryLength, messageHashes) if err != nil { - m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err)) + m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer.ID), zap.Error(err)) return []common.Hash{} } diff --git a/waku/v2/node/keepalive_test.go b/waku/v2/node/keepalive_test.go index 63c48abb0..7473f9df1 100644 --- a/waku/v2/node/keepalive_test.go +++ b/waku/v2/node/keepalive_test.go @@ -79,7 +79,7 @@ func TestPeriodicKeepAlive(t *testing.T) { node2MAddr, err := multiaddr.NewMultiaddr(host1.Addrs()[0].String() + "/p2p/" + host1.ID().String()) require.NoError(t, err) - _, err = wakuNode.AddPeer(node2MAddr, wps.Static, []string{"waku/rs/1/1"}) + _, err = wakuNode.AddPeer([]multiaddr.Multiaddr{node2MAddr}, wps.Static, []string{"waku/rs/1/1"}) require.NoError(t, err) time.Sleep(time.Second * 2) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 1ae5b2448..abb5ca604 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -703,8 +703,8 @@ func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) erro // AddPeer is used to add a peer and the protocols it support to the node peerstore // TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics. -func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) { - pData, err := w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...) +func (w *WakuNode) AddPeer(addresses []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) { + pData, err := w.peermanager.AddPeer(addresses, origin, pubSubTopics, protocols...) if err != nil { return "", err } diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index 99224dc0c..0d7f5a1e5 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -268,7 +268,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) { require.NoError(t, err) defer wakuNode2.Stop() - peerID, err := wakuNode2.AddPeer(wakuNode1.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, filter.FilterSubscribeID_v20beta1) + peerID, err := wakuNode2.AddPeer(wakuNode1.ListenAddresses(), peerstore.Static, []string{relay.DefaultWakuTopic}, filter.FilterSubscribeID_v20beta1) require.NoError(t, err) subscription, err := wakuNode2.FilterLightnode().Subscribe(ctx, protocol.ContentFilter{ @@ -317,7 +317,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) { require.NoError(t, err) defer wakuNode3.Stop() - _, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, legacy_store.StoreID_v20beta4) + _, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses(), peerstore.Static, []string{relay.DefaultWakuTopic}, legacy_store.StoreID_v20beta4) require.NoError(t, err) time.Sleep(2 * time.Second) // NODE2 should have returned the message received via filter diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 69a0b23c1..9aa8df570 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -678,13 +678,19 @@ func AddrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host, pubsu } // AddPeer adds peer to the peerStore and also to service slots -func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) { +func (pm *PeerManager) AddPeer(addresses []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) { //Assuming all addresses have peerId - info, err := peer.AddrInfoFromP2pAddr(address) + infoArr, err := peer.AddrInfosFromP2pAddrs(addresses...) if err != nil { return nil, err } + if len(infoArr) > 1 { + return nil, errors.New("only a single peerID is expected in AddPeer") + } + + info := infoArr[0] + //Add Service peers to serviceSlots. for _, proto := range protocols { pm.addPeerToServiceSlot(proto, info.ID) @@ -697,11 +703,8 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTo } pData := &service.PeerData{ - Origin: origin, - AddrInfo: peer.AddrInfo{ - ID: info.ID, - Addrs: info.Addrs, - }, + Origin: origin, + AddrInfo: info, PubsubTopics: pubsubTopics, } diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index aad3b639c..0c6f47339 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" + "github.com/multiformats/go-multiaddr" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" @@ -57,7 +58,7 @@ func TestServiceSlots(t *testing.T) { // add h2 peer to peer manager t.Log(h2.ID()) - _, err = pm.AddPeer(tests.GetAddr(h2), wps.Static, []string{""}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h2)}, wps.Static, []string{""}, libp2pProtocol.ID(protocol)) require.NoError(t, err) /////////////// @@ -71,7 +72,7 @@ func TestServiceSlots(t *testing.T) { require.Equal(t, h2.ID(), peers[0]) // add h3 peer to peer manager - _, err = pm.AddPeer(tests.GetAddr(h3), wps.Static, []string{""}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h3)}, wps.Static, []string{""}, libp2pProtocol.ID(protocol)) require.NoError(t, err) // check that returned peer is h2 or h3 peer @@ -96,7 +97,7 @@ func TestServiceSlots(t *testing.T) { require.Error(t, err, utils.ErrNoPeersAvailable) // add h4 peer for protocol1 - _, err = pm.AddPeer(tests.GetAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1)) + _, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h4)}, wps.Static, []string{""}, libp2pProtocol.ID(protocol1)) require.NoError(t, err) //Test peer selection for protocol1 @@ -124,10 +125,10 @@ func TestPeerSelection(t *testing.T) { defer h3.Close() protocol := libp2pProtocol.ID("test/protocol") - _, err = pm.AddPeer(tests.GetAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h2)}, wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol)) require.NoError(t, err) - _, err = pm.AddPeer(tests.GetAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h3)}, wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol)) require.NoError(t, err) _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) @@ -158,7 +159,7 @@ func TestPeerSelection(t *testing.T) { h4, err := tests.MakeHost(ctx, 0, rand.Reader) require.NoError(t, err) defer h4.Close() - _, err = pm.AddPeer(tests.GetAddr(h4), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h4)}, wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol)) require.NoError(t, err) peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3}) @@ -185,7 +186,7 @@ func TestDefaultProtocol(t *testing.T) { defer h5.Close() //Test peer selection for relay protocol from peer store - _, err = pm.AddPeer(tests.GetAddr(h5), wps.Static, []string{""}, relay.WakuRelayID_v200) + _, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h5)}, wps.Static, []string{""}, relay.WakuRelayID_v200) require.NoError(t, err) // since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol. @@ -206,7 +207,7 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) { require.NoError(t, err) defer h6.Close() - _, err = pm.AddPeer(tests.GetAddr(h6), wps.Static, []string{""}, protocol2) + _, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h6)}, wps.Static, []string{""}, protocol2) require.NoError(t, err) peers, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2}) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 8fbcd91c1..df7f8a6a2 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -17,6 +17,7 @@ import ( libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/libp2p/go-msgio/pbio" + "github.com/multiformats/go-multiaddr" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/onlinechecker" @@ -348,7 +349,7 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, //Add Peer to peerstore. if params.pm != nil && params.peerAddr != nil { - pData, err := wf.pm.AddPeer(params.peerAddr, peerstore.Static, maps.Keys(pubSubTopicMap), FilterSubscribeID_v20beta1) + pData, err := wf.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, maps.Keys(pubSubTopicMap), FilterSubscribeID_v20beta1) if err != nil { return nil, nil, err } diff --git a/waku/v2/protocol/filter/test_utils.go b/waku/v2/protocol/filter/test_utils.go index 88b9e04e6..c6f8d220c 100644 --- a/waku/v2/protocol/filter/test_utils.go +++ b/waku/v2/protocol/filter/test_utils.go @@ -10,6 +10,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/suite" "github.com/waku-org/go-waku/tests" @@ -102,7 +103,7 @@ func (s *FilterTestSuite) TearDownTest() { func (s *FilterTestSuite) ConnectToFullNode(h1 *WakuFilterLightNode, h2 *WakuFilterFullNode) { mAddr := tests.GetAddr(h2.h) - _, err := h1.pm.AddPeer(mAddr, wps.Static, []string{s.TestTopic}, FilterSubscribeID_v20beta1) + _, err := h1.pm.AddPeer([]multiaddr.Multiaddr{mAddr}, wps.Static, []string{s.TestTopic}, FilterSubscribeID_v20beta1) s.Log.Info("add peer", zap.Stringer("mAddr", mAddr)) s.Require().NoError(err) } diff --git a/waku/v2/protocol/legacy_store/waku_store_client.go b/waku/v2/protocol/legacy_store/waku_store_client.go index ef971f003..61781e44c 100644 --- a/waku/v2/protocol/legacy_store/waku_store_client.go +++ b/waku/v2/protocol/legacy_store/waku_store_client.go @@ -310,7 +310,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR //Add Peer to peerstore. if store.pm != nil && params.peerAddr != nil { - pData, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreID_v20beta4) + pData, err := store.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, pubsubTopics, StoreID_v20beta4) if err != nil { return nil, err } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index c6bed8c2d..3525829bf 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -15,6 +15,7 @@ import ( libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/libp2p/go-msgio/pbio" + "github.com/multiformats/go-multiaddr" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/peermanager" @@ -278,7 +279,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe } if params.pm != nil && params.peerAddr != nil { - pData, err := wakuLP.pm.AddPeer(params.peerAddr, peerstore.Static, []string{params.pubsubTopic}, LightPushID_v20beta1) + pData, err := wakuLP.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, []string{params.pubsubTopic}, LightPushID_v20beta1) if err != nil { return nil, err } diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index ef1f7bb9a..94d702035 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-msgio/pbio" + "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" @@ -36,7 +37,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts } if params.pm != nil && params.peerAddr != nil { - pData, err := wakuPX.pm.AddPeer(params.peerAddr, peerstore.Static, []string{}, PeerExchangeID_v20alpha1) + pData, err := wakuPX.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, []string{}, PeerExchangeID_v20alpha1) if err != nil { return err } diff --git a/waku/v2/protocol/store/client.go b/waku/v2/protocol/store/client.go index efb6448b9..2cb3d58b7 100644 --- a/waku/v2/protocol/store/client.go +++ b/waku/v2/protocol/store/client.go @@ -196,15 +196,15 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ return result, nil } -func (s *WakuStore) RequestRaw(ctx context.Context, peerID peer.ID, storeRequest *pb.StoreQueryRequest) (Result, error) { +func (s *WakuStore) RequestRaw(ctx context.Context, peerInfo peer.AddrInfo, storeRequest *pb.StoreQueryRequest) (Result, error) { err := storeRequest.Validate() if err != nil { return nil, err } var params Parameters - params.selectedPeer = peerID - if params.selectedPeer == "" { + params.peerAddr = peerInfo.Addrs + if len(params.peerAddr) == 0 { return nil, ErrMustSelectPeer } diff --git a/waku/v2/protocol/store/options.go b/waku/v2/protocol/store/options.go index facb3f54f..e6218cc7c 100644 --- a/waku/v2/protocol/store/options.go +++ b/waku/v2/protocol/store/options.go @@ -11,7 +11,7 @@ import ( type Parameters struct { selectedPeer peer.ID - peerAddr multiaddr.Multiaddr + peerAddr []multiaddr.Multiaddr peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice requestID []byte @@ -33,7 +33,7 @@ type RequestOption func(*Parameters) error func WithPeer(p peer.ID) RequestOption { return func(params *Parameters) error { params.selectedPeer = p - if params.peerAddr != nil { + if len(params.peerAddr) != 0 { return errors.New("WithPeer and WithPeerAddr options are mutually exclusive") } return nil @@ -43,7 +43,7 @@ func WithPeer(p peer.ID) RequestOption { // WithPeerAddr is an option used to specify a peerAddress to request the message history. // This new peer will be added to peerStore. // Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. -func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption { +func WithPeerAddr(pAddr ...multiaddr.Multiaddr) RequestOption { return func(params *Parameters) error { params.peerAddr = pAddr if params.selectedPeer != "" {