Skip to content

Commit

Permalink
refactor: use peerInfo instead of peerID
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Dec 19, 2024
1 parent 78b522d commit 1ed0036
Show file tree
Hide file tree
Showing 24 changed files with 96 additions and 69 deletions.
4 changes: 2 additions & 2 deletions cmd/waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func Execute(options NodeOptions) error {
if options.PeerExchange.Enable && options.PeerExchange.Node != nil {
logger.Info("retrieving peer info via peer exchange protocol")

peerID, err := wakuNode.AddPeer(*options.PeerExchange.Node, wakupeerstore.Static,
peerID, err := wakuNode.AddPeer([]multiaddr.Multiaddr{*options.PeerExchange.Node}, wakupeerstore.Static,
pubSubTopicMapKeys, peer_exchange.PeerExchangeID_v20alpha1)
if err != nil {
logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err))
Expand Down Expand Up @@ -481,7 +481,7 @@ func processTopics(options NodeOptions) (map[string][]string, error) {

func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, pubSubTopics []string, protocols ...protocol.ID) error {
for _, addr := range addresses {
_, err := wakuNode.AddPeer(addr, wakupeerstore.Static, pubSubTopics, protocols...)
_, err := wakuNode.AddPeer([]multiaddr.Multiaddr{addr}, wakupeerstore.Static, pubSubTopics, protocols...)
if err != nil {
return fmt.Errorf("could not add static peer: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/waku/server/rest/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/logging"
Expand Down Expand Up @@ -117,7 +118,7 @@ func (a *AdminService) postV1Peer(w http.ResponseWriter, req *http.Request) {
protos = append(protos, protocol.ID(proto))
}

id, err := a.node.AddPeer(addr, peerstore.Static, topics, protos...)
id, err := a.node.AddPeer([]multiaddr.Multiaddr{addr}, peerstore.Static, topics, protos...)
if err != nil {
a.log.Error("failed to add peer", zap.Error(err))
writeErrOrResponse(w, err, nil)
Expand Down
2 changes: 1 addition & 1 deletion library/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func AddPeer(instance *WakuInstance, address string, protocolID string) (string,
return "", err
}

peerID, err := instance.node.AddPeer(ma, peerstore.Static, instance.relayTopics, libp2pProtocol.ID(protocolID))
peerID, err := instance.node.AddPeer([]multiaddr.Multiaddr{ma}, peerstore.Static, instance.relayTopics, libp2pProtocol.ID(protocolID))
if err != nil {
return "", err
}
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/api/common/storenode_requestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import (
)

type StorenodeRequestor interface {
Query(ctx context.Context, peerID peer.ID, query *pb.StoreQueryRequest) (StoreRequestResult, error)
Query(ctx context.Context, peerInfo peer.AddrInfo, query *pb.StoreQueryRequest) (StoreRequestResult, error)
}
18 changes: 18 additions & 0 deletions waku/v2/api/history/cycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,24 @@ func (m *StorenodeCycle) GetActiveStorenode() peer.ID {
return m.activeStorenode
}

func (m *StorenodeCycle) GetActiveStorenodePeerInfo() peer.AddrInfo {
m.RLock()
defer m.RUnlock()

storeNodes, err := m.storenodeConfigProvider.Storenodes()
if err != nil {
return peer.AddrInfo{}
}

for _, p := range storeNodes {
if p.ID == m.activeStorenode {
return p
}
}

return peer.AddrInfo{}
}

func (m *StorenodeCycle) IsStorenodeAvailable(peerID peer.ID) bool {
return m.storenodeStatus(peerID) == connected
}
Expand Down
20 changes: 10 additions & 10 deletions waku/v2/api/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type HistoryRetriever struct {

type HistoryProcessor interface {
OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error
OnRequestFailed(requestID []byte, peerID peer.ID, err error)
OnRequestFailed(requestID []byte, peerInfo peer.AddrInfo, err error)
}

func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
Expand All @@ -51,7 +51,7 @@ func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor Histo
func (hr *HistoryRetriever) Query(
ctx context.Context,
criteria store.FilterCriteria,
storenodeID peer.ID,
storenode peer.AddrInfo,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
Expand Down Expand Up @@ -178,7 +178,7 @@ loop:
newCriteria.TimeStart = timeStart
newCriteria.TimeEnd = timeEnd

cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenodeID, newCriteria, w.cursor, w.limit, true, processEnvelopes, logger)
cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenode, newCriteria, w.cursor, w.limit, true, processEnvelopes, logger)
queryCancel()

if err != nil {
Expand Down Expand Up @@ -241,7 +241,7 @@ loop:

func (hr *HistoryRetriever) createMessagesRequest(
ctx context.Context,
peerID peer.ID,
peerInfo peer.AddrInfo,
criteria store.FilterCriteria,
cursor []byte,
limit uint64,
Expand All @@ -257,7 +257,7 @@ func (hr *HistoryRetriever) createMessagesRequest(
})

go func() {
storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, processEnvelopes)
storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerInfo, criteria, cursor, limit, processEnvelopes)
resultCh <- struct {
storeCursor []byte
envelopesCount int
Expand All @@ -273,7 +273,7 @@ func (hr *HistoryRetriever) createMessagesRequest(
}
} else {
go func() {
_, _, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, false)
_, _, err = hr.requestStoreMessages(ctx, peerInfo, criteria, cursor, limit, false)
if err != nil {
logger.Error("failed to request store messages", zap.Error(err))
}
Expand All @@ -283,9 +283,9 @@ func (hr *HistoryRetriever) createMessagesRequest(
return
}

func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID peer.ID, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) {
func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerInfo peer.AddrInfo, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) {
requestID := protocol.GenerateRequestID()
logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID))
logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerInfo.ID))

logger.Debug("store.query",
logging.Timep("startTime", criteria.TimeStart),
Expand All @@ -307,12 +307,12 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee
}

queryStart := time.Now()
result, err := hr.store.Query(ctx, peerID, storeQueryRequest)
result, err := hr.store.Query(ctx, peerInfo, storeQueryRequest)
queryDuration := time.Since(queryStart)
if err != nil {
logger.Error("error querying storenode", zap.Error(err))

hr.historyProcessor.OnRequestFailed(requestID, peerID, err)
hr.historyProcessor.OnRequestFailed(requestID, peerInfo, err)

return nil, 0, err
}
Expand Down
8 changes: 4 additions & 4 deletions waku/v2/api/history/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (h *mockHistoryProcessor) OnEnvelope(env *protocol.Envelope, processEnvelop
return nil
}

func (h *mockHistoryProcessor) OnRequestFailed(requestID []byte, peerID peer.ID, err error) {
func (h *mockHistoryProcessor) OnRequestFailed(requestID []byte, peerInfo peer.AddrInfo, err error) {
}

func newMockHistoryProcessor() *mockHistoryProcessor {
Expand All @@ -92,7 +92,7 @@ func getInitialResponseKey(contentTopics []string) string {
return hex.EncodeToString(append([]byte("start"), []byte(contentTopics[0])...))
}

func (t *mockStore) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *pb.StoreQueryRequest) (common.StoreRequestResult, error) {
func (t *mockStore) Query(ctx context.Context, peerInfo peer.AddrInfo, storeQueryRequest *pb.StoreQueryRequest) (common.StoreRequestResult, error) {
result := &mockResult{}
if len(storeQueryRequest.GetPaginationCursor()) == 0 {
initialResponse := getInitialResponseKey(storeQueryRequest.GetContentTopics())
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestSuccessBatchExecution(t *testing.T) {
ContentFilter: protocol.NewContentFilter("test", topics...),
}

err = historyRetriever.Query(ctx, criteria, storenodeID, 10, func(i int) (bool, uint64) { return true, 10 }, true)
err = historyRetriever.Query(ctx, criteria, peer.AddrInfo{ID: storenodeID}, 10, func(i int) (bool, uint64) { return true, 10 }, true)
require.NoError(t, err)
}

Expand Down Expand Up @@ -246,6 +246,6 @@ func TestFailedBatchExecution(t *testing.T) {
ContentFilter: protocol.NewContentFilter("test", topics...),
}

err = historyRetriever.Query(ctx, criteria, storenodeID, 10, func(i int) (bool, uint64) { return true, 10 }, true)
err = historyRetriever.Query(ctx, criteria, peer.AddrInfo{ID: storenodeID}, 10, func(i int) (bool, uint64) { return true, 10 }, true)
require.Error(t, err)
}
4 changes: 2 additions & 2 deletions waku/v2/api/missing/criteria_interest.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

type criteriaInterest struct {
peerID peer.ID
peerInfo peer.AddrInfo
contentFilter protocol.ContentFilter
lastChecked time.Time

Expand All @@ -19,7 +19,7 @@ type criteriaInterest struct {
}

func (c criteriaInterest) equals(other criteriaInterest) bool {
if c.peerID != other.peerID {
if c.peerInfo.ID != other.peerInfo.ID {
return false
}

Expand Down
8 changes: 4 additions & 4 deletions waku/v2/api/missing/default_requestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ type defaultStorenodeRequestor struct {
store *store.WakuStore
}

func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) {
return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize))
func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) {
return d.store.QueryByHash(ctx, messageHashes, store.WithPeerAddr(peerInfo.Addrs...), store.WithPaging(false, pageSize))
}

func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) {
return d.store.RequestRaw(ctx, peerID, storeQueryRequest)
func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerInfo peer.AddrInfo, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) {
return d.store.RequestRaw(ctx, peerInfo, storeQueryRequest)
}
12 changes: 6 additions & 6 deletions waku/v2/api/missing/missing_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes
}
}

func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilter protocol.ContentFilter) {
func (m *MissingMessageVerifier) SetCriteriaInterest(peerInfo peer.AddrInfo, contentFilter protocol.ContentFilter) {
m.criteriaInterestMu.Lock()
defer m.criteriaInterestMu.Unlock()

ctx, cancel := context.WithCancel(m.ctx)
criteriaInterest := criteriaInterest{
peerID: peerID,
peerInfo: peerInfo,
contentFilter: contentFilter,
lastChecked: m.timesource.Now().Add(-m.params.delay),
ctx: ctx,
Expand Down Expand Up @@ -190,7 +190,7 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter
}

m.logger.Error("could not fetch history",
zap.Stringer("peerID", interest.peerID),
zap.Stringer("peerID", interest.peerInfo.ID),
zap.String("pubsubTopic", interest.contentFilter.PubsubTopic),
zap.Strings("contentTopics", contentTopics))
continue
Expand Down Expand Up @@ -233,7 +233,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
contentTopics := interest.contentFilter.ContentTopics.ToList()

logger := m.logger.With(
zap.Stringer("peerID", interest.peerID),
zap.Stringer("peerID", interest.peerInfo.ID),
zap.Strings("contentTopics", contentTopics[batchFrom:batchTo]),
zap.String("pubsubTopic", interest.contentFilter.PubsubTopic),
logging.Epoch("from", interest.lastChecked),
Expand All @@ -252,7 +252,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,

return m.storenodeRequestor.Query(
ctx,
interest.peerID,
interest.peerInfo,
storeQueryRequest,
)
}, logger, "retrieving history to check for missing messages")
Expand Down Expand Up @@ -335,7 +335,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
PaginationLimit: proto.Uint64(maxMsgHashesPerRequest),
}

return m.storenodeRequestor.Query(queryCtx, interest.peerID, storeQueryRequest)
return m.storenodeRequestor.Query(queryCtx, interest.peerInfo, storeQueryRequest)
}, logger, "retrieving missing messages")
if err != nil {
if !errors.Is(err, context.Canceled) {
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/api/publish/default_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ type defaultStorenodeMessageVerifier struct {
store *store.WakuStore
}

func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
var opts []store.RequestOption
opts = append(opts, store.WithRequestID(requestID))
opts = append(opts, store.WithPeer(peerID))
opts = append(opts, store.WithPeerAddr(peerID.Addrs...))
opts = append(opts, store.WithPaging(false, pageSize))
opts = append(opts, store.IncludeData(false))

Expand Down
10 changes: 5 additions & 5 deletions waku/v2/api/publish/message_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type ISentCheck interface {

type StorenodeMessageVerifier interface {
// MessagesExist returns a list of the messages it found from a list of message hashes
MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error)
MessageHashesExist(ctx context.Context, requestID []byte, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error)
}

// MessageSentCheck tracks the outgoing messages and check against store node
Expand Down Expand Up @@ -211,8 +211,8 @@ func (m *MessageSentCheck) Start() {
}

func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []common.Hash, relayTime []uint32, pubsubTopic string) []common.Hash {
selectedPeer := m.storenodeCycle.GetActiveStorenode()
if selectedPeer == "" {
selectedPeer := m.storenodeCycle.GetActiveStorenodePeerInfo()
if selectedPeer.ID == "" {
m.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic))
return []common.Hash{}
}
Expand All @@ -224,13 +224,13 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
messageHashes[i] = pb.ToMessageHash(hash.Bytes())
}

m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes))
m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer.ID), zap.Stringers("messageHashes", messageHashes))

queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout)
defer cancel()
result, err := m.messageVerifier.MessageHashesExist(queryCtx, requestID, selectedPeer, m.maxHashQueryLength, messageHashes)
if err != nil {
m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err))
m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer.ID), zap.Error(err))
return []common.Hash{}
}

Expand Down
2 changes: 1 addition & 1 deletion waku/v2/node/keepalive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestPeriodicKeepAlive(t *testing.T) {

node2MAddr, err := multiaddr.NewMultiaddr(host1.Addrs()[0].String() + "/p2p/" + host1.ID().String())
require.NoError(t, err)
_, err = wakuNode.AddPeer(node2MAddr, wps.Static, []string{"waku/rs/1/1"})
_, err = wakuNode.AddPeer([]multiaddr.Multiaddr{node2MAddr}, wps.Static, []string{"waku/rs/1/1"})
require.NoError(t, err)

time.Sleep(time.Second * 2)
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,8 +703,8 @@ func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) erro

// AddPeer is used to add a peer and the protocols it support to the node peerstore
// TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics.
func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
pData, err := w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...)
func (w *WakuNode) AddPeer(addresses []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
pData, err := w.peermanager.AddPeer(addresses, origin, pubSubTopics, protocols...)
if err != nil {
return "", err
}
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/node/wakunode2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
require.NoError(t, err)
defer wakuNode2.Stop()

peerID, err := wakuNode2.AddPeer(wakuNode1.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, filter.FilterSubscribeID_v20beta1)
peerID, err := wakuNode2.AddPeer(wakuNode1.ListenAddresses(), peerstore.Static, []string{relay.DefaultWakuTopic}, filter.FilterSubscribeID_v20beta1)
require.NoError(t, err)

subscription, err := wakuNode2.FilterLightnode().Subscribe(ctx, protocol.ContentFilter{
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
require.NoError(t, err)
defer wakuNode3.Stop()

_, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, legacy_store.StoreID_v20beta4)
_, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses(), peerstore.Static, []string{relay.DefaultWakuTopic}, legacy_store.StoreID_v20beta4)
require.NoError(t, err)
time.Sleep(2 * time.Second)
// NODE2 should have returned the message received via filter
Expand Down
17 changes: 10 additions & 7 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,13 +678,19 @@ func AddrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host, pubsu
}

// AddPeer adds peer to the peerStore and also to service slots
func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) {
func (pm *PeerManager) AddPeer(addresses []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) {
//Assuming all addresses have peerId
info, err := peer.AddrInfoFromP2pAddr(address)
infoArr, err := peer.AddrInfosFromP2pAddrs(addresses...)
if err != nil {
return nil, err
}

if len(infoArr) > 1 {
return nil, errors.New("only a single peerID is expected in AddPeer")
}

info := infoArr[0]

//Add Service peers to serviceSlots.
for _, proto := range protocols {
pm.addPeerToServiceSlot(proto, info.ID)
Expand All @@ -697,11 +703,8 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTo
}

pData := &service.PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
ID: info.ID,
Addrs: info.Addrs,
},
Origin: origin,
AddrInfo: info,
PubsubTopics: pubsubTopics,
}

Expand Down
Loading

0 comments on commit 1ed0036

Please sign in to comment.