From 757c9bbf45cdf80c68bfc8de7494da9344a6cc81 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 28 Nov 2024 14:20:47 -0400 Subject: [PATCH] fix_: ensure storenode requests do not exceed 24h --- go.mod | 2 +- go.sum | 4 +- protocol/messenger_mailserver.go | 53 ++------------ protocol/messenger_mailserver_cycle.go | 25 +++---- services/mailservers/database.go | 10 +-- .../go-waku/waku/v2/api/common/pinger.go | 39 +++++++++++ .../waku/v2/api/common/storenode_requestor.go | 12 ++++ .../waku/v2/api/filter/filter_manager.go | 1 + .../go-waku/waku/v2/api/history/cycle.go | 53 ++++++-------- .../go-waku/waku/v2/api/history/history.go | 69 ++++++++++++++----- .../go-waku/waku/v2/api/history/sort.go | 2 +- .../waku/v2/api/missing/default_requestor.go | 12 ++-- .../waku/v2/api/missing/missing_messages.go | 42 +++++++---- .../go-waku/waku/v2/node/wakunode2.go | 18 +++-- .../waku/v2/peermanager/peer_manager.go | 25 ++++++- .../go-waku/waku/v2/protocol/store/client.go | 29 ++++++++ vendor/modules.txt | 2 +- wakuv2/waku.go | 8 ++- 18 files changed, 256 insertions(+), 150 deletions(-) create mode 100644 vendor/github.com/waku-org/go-waku/waku/v2/api/common/pinger.go create mode 100644 vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenode_requestor.go diff --git a/go.mod b/go.mod index d477c490962..b64b64ec969 100644 --- a/go.mod +++ b/go.mod @@ -97,7 +97,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0 + github.com/waku-org/go-waku v0.8.1-0.20241125190547-f98a17bacfd0 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 4bcf72487f2..3ca3125f8ba 100644 --- a/go.sum +++ b/go.sum @@ -2152,8 +2152,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27 github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0 h1:PNKcOPMn0yoC2NQaJPPB8FvHT/YtaU8hZAoovSl42KM= -github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74= +github.com/waku-org/go-waku v0.8.1-0.20241125190547-f98a17bacfd0 h1:s2vzIQM+qLo/tTkJQl/9308qvLBdhAm35tSvkpt/8n8= +github.com/waku-org/go-waku v0.8.1-0.20241125190547-f98a17bacfd0/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 9cefd90abb4..d1127fd2638 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -8,6 +8,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "go.uber.org/zap" + "golang.org/x/exp/maps" "github.com/waku-org/go-waku/waku/v2/api/history" @@ -542,55 +543,15 @@ func (m *Messenger) syncFiltersFrom(peerID peer.ID, filters []*transport.Filter, m.config.messengerSignalsHandler.HistoryRequestStarted(len(batches)) } - var batches24h []types.MailserverBatch for pubsubTopic := range batches { - batchKeys := make([]int, 0, len(batches[pubsubTopic])) - for k := range batches[pubsubTopic] { - batchKeys = append(batchKeys, k) - } + batchKeys := maps.Keys(batches[pubsubTopic]) sort.Ints(batchKeys) - - keysToIterate := append([]int{}, batchKeys...) - for { - // For all batches - var tmpKeysToIterate []int - for _, k := range keysToIterate { - batch := batches[pubsubTopic][k] - - dayBatch := types.MailserverBatch{ - To: batch.To, - Cursor: batch.Cursor, - PubsubTopic: batch.PubsubTopic, - Topics: batch.Topics, - ChatIDs: batch.ChatIDs, - } - - from := batch.To.Add(-oneDayDuration) - if from.After(batch.From) { - dayBatch.From = from - batches24h = append(batches24h, dayBatch) - - // Replace og batch with new dates - batch.To = from - batches[pubsubTopic][k] = batch - tmpKeysToIterate = append(tmpKeysToIterate, k) - } else { - batches24h = append(batches24h, batch) - } - } - - if len(tmpKeysToIterate) == 0 { - break + for _, k := range batchKeys { + err := m.processMailserverBatch(peerID, batches[pubsubTopic][k]) + if err != nil { + m.logger.Error("error syncing topics", zap.Error(err)) + return nil, err } - keysToIterate = tmpKeysToIterate - } - } - - for _, batch := range batches24h { - err := m.processMailserverBatch(peerID, batch) - if err != nil { - m.logger.Error("error syncing topics", zap.Error(err)) - return nil, err } } diff --git a/protocol/messenger_mailserver_cycle.go b/protocol/messenger_mailserver_cycle.go index f7145ebb579..87f0ccbf323 100644 --- a/protocol/messenger_mailserver_cycle.go +++ b/protocol/messenger_mailserver_cycle.go @@ -93,63 +93,64 @@ func (m *Messenger) asyncRequestAllHistoricMessages() { }() } -func (m *Messenger) GetPinnedStorenode() (peer.ID, error) { +func (m *Messenger) GetPinnedStorenode() (peer.AddrInfo, error) { fleet, err := m.getFleet() if err != nil { - return "", err + return peer.AddrInfo{}, err } pinnedMailservers, err := m.settings.GetPinnedMailservers() if err != nil { - return "", err + return peer.AddrInfo{}, err } pinnedMailserver, ok := pinnedMailservers[fleet] if !ok { - return "", nil + return peer.AddrInfo{}, nil } fleetMailservers := mailservers.DefaultMailservers() for _, c := range fleetMailservers { if c.Fleet == fleet && c.ID == pinnedMailserver { - return c.PeerID() + return c.PeerInfo() } } if m.mailserversDatabase != nil { customMailservers, err := m.mailserversDatabase.Mailservers() if err != nil { - return "", err + return peer.AddrInfo{}, err } for _, c := range customMailservers { if c.Fleet == fleet && c.ID == pinnedMailserver { - return c.PeerID() + return c.PeerInfo() } } } - return "", nil + return peer.AddrInfo{}, nil } func (m *Messenger) UseStorenodes() (bool, error) { return m.settings.CanUseMailservers() } -func (m *Messenger) Storenodes() ([]peer.ID, error) { +func (m *Messenger) Storenodes() ([]peer.AddrInfo, error) { mailservers, err := m.AllMailservers() if err != nil { return nil, err } - var result []peer.ID + var result []peer.AddrInfo for _, m := range mailservers { - peerID, err := m.PeerID() + + peerInfo, err := m.PeerInfo() if err != nil { return nil, err } - result = append(result, peerID) + result = append(result, peerInfo) } return result, nil diff --git a/services/mailservers/database.go b/services/mailservers/database.go index a52646a6229..3d3dba99658 100644 --- a/services/mailservers/database.go +++ b/services/mailservers/database.go @@ -49,13 +49,13 @@ type Mailserver struct { FailedRequests uint `json:"-"` } -func (m Mailserver) PeerInfo() (*peer.AddrInfo, error) { +func (m Mailserver) PeerInfo() (peer.AddrInfo, error) { var maddrs []multiaddr.Multiaddr if m.ENR != nil { addrInfo, err := enr.EnodeToPeerInfo(m.ENR) if err != nil { - return nil, err + return peer.AddrInfo{}, err } addrInfo.Addrs = utils.EncapsulatePeerID(addrInfo.ID, addrInfo.Addrs...) maddrs = append(maddrs, addrInfo.Addrs...) @@ -67,14 +67,14 @@ func (m Mailserver) PeerInfo() (*peer.AddrInfo, error) { p, err := peer.AddrInfosFromP2pAddrs(maddrs...) if err != nil { - return nil, err + return peer.AddrInfo{}, err } if len(p) != 1 { - return nil, errors.New("invalid mailserver setup") + return peer.AddrInfo{}, errors.New("invalid mailserver setup") } - return &p[0], nil + return p[0], nil } func (m Mailserver) PeerID() (peer.ID, error) { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/common/pinger.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/common/pinger.go new file mode 100644 index 00000000000..f99895dd47d --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/common/pinger.go @@ -0,0 +1,39 @@ +package common + +import ( + "context" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" +) + +type Pinger interface { + PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) +} + +type defaultPingImpl struct { + host host.Host +} + +func NewDefaultPinger(host host.Host) Pinger { + return &defaultPingImpl{ + host: host, + } +} + +func (d *defaultPingImpl) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) { + d.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.AddressTTL) + pingResultCh := ping.Ping(ctx, d.host, peerInfo.ID) + select { + case <-ctx.Done(): + return 0, ctx.Err() + case r := <-pingResultCh: + if r.Error != nil { + return 0, r.Error + } + return r.RTT, nil + } +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenode_requestor.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenode_requestor.go new file mode 100644 index 00000000000..8a723c9e6c7 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenode_requestor.go @@ -0,0 +1,12 @@ +package common + +import ( + "context" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" +) + +type StorenodeRequestor interface { + Query(ctx context.Context, peerID peer.ID, query *pb.StoreQueryRequest) (StoreRequestResult, error) +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go index b4933a798aa..a43c3c3963c 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go @@ -230,6 +230,7 @@ func (mgr *FilterManager) UnsubscribeFilter(filterID string) { } if len(af.sub.ContentFilter.ContentTopics) == 0 { af.cancel() + delete(mgr.filterSubscriptions, filterConfig.ID) } else { go af.sub.Unsubscribe(filterConfig.contentFilter) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go index d3453140691..eb51ba040fa 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go @@ -14,9 +14,8 @@ import ( "sync" "time" - "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/waku-org/go-waku/waku/v2/api/common" "github.com/waku-org/go-waku/waku/v2/protocol/store" "go.uber.org/zap" ) @@ -46,8 +45,8 @@ type peerStatus struct { type StorenodeConfigProvider interface { UseStorenodes() (bool, error) - GetPinnedStorenode() (peer.ID, error) - Storenodes() ([]peer.ID, error) + GetPinnedStorenode() (peer.AddrInfo, error) + Storenodes() ([]peer.AddrInfo, error) } type StorenodeCycle struct { @@ -55,9 +54,8 @@ type StorenodeCycle struct { logger *zap.Logger - host host.Host - storenodeConfigProvider StorenodeConfigProvider + pinger common.Pinger StorenodeAvailableOneshotEmitter *OneShotEmitter[struct{}] StorenodeChangedEmitter *Emitter[peer.ID] @@ -71,7 +69,7 @@ type StorenodeCycle struct { peers map[peer.ID]peerStatus } -func NewStorenodeCycle(logger *zap.Logger) *StorenodeCycle { +func NewStorenodeCycle(logger *zap.Logger, pinger common.Pinger) *StorenodeCycle { return &StorenodeCycle{ StorenodeAvailableOneshotEmitter: NewOneshotEmitter[struct{}](), StorenodeChangedEmitter: NewEmitter[peer.ID](), @@ -81,9 +79,8 @@ func NewStorenodeCycle(logger *zap.Logger) *StorenodeCycle { } } -func (m *StorenodeCycle) Start(ctx context.Context, h host.Host) { +func (m *StorenodeCycle) Start(ctx context.Context) { m.logger.Debug("starting storenode cycle") - m.host = h m.failedRequests = make(map[peer.ID]uint) m.peers = make(map[peer.ID]peerStatus) @@ -107,7 +104,7 @@ func (m *StorenodeCycle) connectToNewStorenodeAndWait(ctx context.Context) error } // If no pinned storenode, no need to disconnect and wait for it to be available - if pinnedStorenode == "" { + if pinnedStorenode.ID == "" { m.disconnectActiveStorenode(graylistBackoff) } @@ -183,21 +180,26 @@ func poolSize(fleetSize int) int { return int(math.Ceil(float64(fleetSize) / 4)) } -func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.ID) []peer.ID { +func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.AddrInfo) []peer.AddrInfo { + peerIDToInfo := make(map[peer.ID]peer.AddrInfo) + for _, p := range allStorenodes { + peerIDToInfo[p.ID] = p + } + availableStorenodes := make(map[peer.ID]time.Duration) availableStorenodesMutex := sync.Mutex{} availableStorenodesWg := sync.WaitGroup{} for _, storenode := range allStorenodes { availableStorenodesWg.Add(1) - go func(peerID peer.ID) { + go func(peerInfo peer.AddrInfo) { defer availableStorenodesWg.Done() ctx, cancel := context.WithTimeout(ctx, 4*time.Second) defer cancel() - rtt, err := m.pingPeer(ctx, peerID) + rtt, err := m.pinger.PingPeer(ctx, peerInfo) if err == nil { // pinging storenodes might fail, but we don't care availableStorenodesMutex.Lock() - availableStorenodes[peerID] = rtt + availableStorenodes[peerInfo.ID] = rtt availableStorenodesMutex.Unlock() } }(storenode) @@ -212,7 +214,7 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, var sortedStorenodes []SortedStorenode for storenodeID, rtt := range availableStorenodes { sortedStorenode := SortedStorenode{ - Storenode: storenodeID, + Storenode: peerIDToInfo[storenodeID], RTT: rtt, } m.peersMutex.Lock() @@ -225,7 +227,7 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, } sort.Sort(byRTTMsAndCanConnectBefore(sortedStorenodes)) - result := make([]peer.ID, len(sortedStorenodes)) + result := make([]peer.AddrInfo, len(sortedStorenodes)) for i, s := range sortedStorenodes { result[i] = s.Storenode } @@ -233,19 +235,6 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, return result } -func (m *StorenodeCycle) pingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) { - pingResultCh := ping.Ping(ctx, m.host, peerID) - select { - case <-ctx.Done(): - return 0, ctx.Err() - case r := <-pingResultCh: - if r.Error != nil { - return 0, r.Error - } - return r.RTT, nil - } -} - func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error { // we have to override DNS manually because of https://github.com/status-im/status-mobile/issues/19581 if overrideDNS { @@ -268,8 +257,8 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error { return err } - if pinnedStorenode != "" { - return m.setActiveStorenode(pinnedStorenode) + if pinnedStorenode.ID != "" { + return m.setActiveStorenode(pinnedStorenode.ID) } m.logger.Info("Finding a new storenode..") @@ -303,7 +292,7 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error { } ms := allStorenodes[r.Int64()] - return m.setActiveStorenode(ms) + return m.setActiveStorenode(ms.ID) } func (m *StorenodeCycle) storenodeStatus(peerID peer.ID) connStatus { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/history.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/history.go index e95f01a5763..004cd156711 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/history.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/history.go @@ -2,6 +2,7 @@ package history import ( "context" + "encoding/hex" "errors" "math" "sync" @@ -10,8 +11,12 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/api/common" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/store" + "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" + "google.golang.org/protobuf/proto" + "go.uber.org/zap" ) @@ -25,7 +30,7 @@ type work struct { } type HistoryRetriever struct { - store Store + store common.StorenodeRequestor logger *zap.Logger historyProcessor HistoryProcessor } @@ -35,11 +40,7 @@ type HistoryProcessor interface { OnRequestFailed(requestID []byte, peerID peer.ID, err error) } -type Store interface { - Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error) -} - -func NewHistoryRetriever(store Store, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever { +func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever { return &HistoryRetriever{ store: store, logger: logger.Named("history-retriever"), @@ -158,7 +159,26 @@ loop: }() queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout) - cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenodeID, w.criteria, w.cursor, w.limit, true, processEnvelopes, logger) + + // If time range is greater than 24 hours, limit the range: to - (to-24h) + // TODO: handle cases in which TimeStart/TimeEnd could be nil + // (this type of query does not happen in status-go, though, and + // nwaku might limit query duration to 24h anyway, so perhaps + // it's not worth adding such logic) + timeStart := w.criteria.TimeStart + timeEnd := w.criteria.TimeEnd + exceeds24h := false + if timeStart != nil && timeEnd != nil && *timeEnd-*timeStart > (24*time.Hour).Nanoseconds() { + newTimeStart := *timeEnd - (24 * time.Hour).Nanoseconds() + timeStart = &newTimeStart + exceeds24h = true + } + + newCriteria := w.criteria + newCriteria.TimeStart = timeStart + newCriteria.TimeEnd = timeEnd + + cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenodeID, newCriteria, w.cursor, w.limit, true, processEnvelopes, logger) queryCancel() if err != nil { @@ -179,18 +199,28 @@ loop: // Check the cursor after calling `shouldProcessNextPage`. // The app might use process the fetched envelopes in the callback for own needs. - if cursor == nil { + // If from/to does not exceed 24h and no cursor was returned, we have already + // requested the entire time range + if cursor == nil && !exceeds24h { return } logger.Debug("processBatch producer - creating work (cursor)") - workWg.Add(1) - workCh <- work{ + newWork := work{ criteria: w.criteria, cursor: cursor, limit: nextPageLimit, } + + // If from/to has exceeded the 24h, but there are no more records within the current + // 24h range, then we update the `to` for the new work to not include it. + if cursor == nil && exceeds24h { + newWork.criteria.TimeEnd = timeStart + } + + workWg.Add(1) + workCh <- newWork }(w) case err := <-errCh: logger.Debug("processBatch - received error", zap.Error(err)) @@ -257,12 +287,6 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee requestID := protocol.GenerateRequestID() logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID)) - opts := []store.RequestOption{ - store.WithPaging(false, limit), - store.WithRequestID(requestID), - store.WithPeer(peerID), - store.WithCursor(cursor)} - logger.Debug("store.query", logging.Timep("startTime", criteria.TimeStart), logging.Timep("endTime", criteria.TimeEnd), @@ -271,8 +295,19 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee zap.String("cursor", hexutil.Encode(cursor)), ) + storeQueryRequest := &pb.StoreQueryRequest{ + RequestId: hex.EncodeToString(requestID), + IncludeData: true, + PubsubTopic: &criteria.PubsubTopic, + ContentTopics: criteria.ContentTopicsList(), + TimeStart: criteria.TimeStart, + TimeEnd: criteria.TimeEnd, + PaginationCursor: cursor, + PaginationLimit: proto.Uint64(limit), + } + queryStart := time.Now() - result, err := hr.store.Query(ctx, criteria, opts...) + result, err := hr.store.Query(ctx, peerID, storeQueryRequest) queryDuration := time.Since(queryStart) if err != nil { logger.Error("error querying storenode", zap.Error(err)) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/sort.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/sort.go index 22e94c571f9..4f38941deed 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/sort.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/sort.go @@ -7,7 +7,7 @@ import ( ) type SortedStorenode struct { - Storenode peer.ID + Storenode peer.AddrInfo RTT time.Duration CanConnectAfter time.Time } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/default_requestor.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/default_requestor.go index 248d61c6dc0..3828217350d 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/default_requestor.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/default_requestor.go @@ -5,12 +5,12 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/api/common" - "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" + storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" ) -func NewDefaultStorenodeRequestor(store *store.WakuStore) StorenodeRequestor { +func NewDefaultStorenodeRequestor(store *store.WakuStore) common.StorenodeRequestor { return &defaultStorenodeRequestor{ store: store, } @@ -24,10 +24,6 @@ func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerI return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize)) } -func (d *defaultStorenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error) { - return d.store.Query(ctx, store.FilterCriteria{ - ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...), - TimeStart: from, - TimeEnd: to, - }, store.WithPeer(peerID), store.WithPaging(false, pageSize), store.IncludeData(false)) +func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) { + return d.store.RequestRaw(ctx, peerID, storeQueryRequest) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go index 1af991eb51f..72ac4f9f355 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go @@ -14,6 +14,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/api/common" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -31,17 +32,12 @@ type MessageTracker interface { MessageExists(pb.MessageHash) (bool, error) } -type StorenodeRequestor interface { - GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) - QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error) -} - // MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria type MissingMessageVerifier struct { ctx context.Context params missingMessageVerifierParams - storenodeRequestor StorenodeRequestor + storenodeRequestor common.StorenodeRequestor messageTracker MessageTracker criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages @@ -54,7 +50,7 @@ type MissingMessageVerifier struct { } // NewMissingMessageVerifier creates an instance of a MissingMessageVerifier -func NewMissingMessageVerifier(storenodeRequester StorenodeRequestor, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier { +func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier { options = append(defaultMissingMessagesVerifierOptions, options...) params := missingMessageVerifierParams{} for _, opt := range options { @@ -219,14 +215,19 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, ) result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) { - return m.storenodeRequestor.QueryWithCriteria( + storeQueryRequest := &storepb.StoreQueryRequest{ + RequestId: hex.EncodeToString(protocol.GenerateRequestID()), + PubsubTopic: &interest.contentFilter.PubsubTopic, + ContentTopics: contentTopics[batchFrom:batchTo], + TimeStart: proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()), + TimeEnd: proto.Int64(now.Add(-m.params.delay).UnixNano()), + PaginationLimit: proto.Uint64(messageFetchPageSize), + } + + return m.storenodeRequestor.Query( ctx, interest.peerID, - messageFetchPageSize, - interest.contentFilter.PubsubTopic, - contentTopics[batchFrom:batchTo], - proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()), - proto.Int64(now.Add(-m.params.delay).UnixNano()), + storeQueryRequest, ) }, logger, "retrieving history to check for missing messages") if err != nil { @@ -295,7 +296,20 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) { queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout) defer cancel() - return m.storenodeRequestor.GetMessagesByHash(queryCtx, interest.peerID, maxMsgHashesPerRequest, messageHashes) + + var messageHashesBytes [][]byte + for _, m := range messageHashes { + messageHashesBytes = append(messageHashesBytes, m.Bytes()) + } + + storeQueryRequest := &storepb.StoreQueryRequest{ + RequestId: hex.EncodeToString(protocol.GenerateRequestID()), + IncludeData: true, + MessageHashes: messageHashesBytes, + PaginationLimit: proto.Uint64(maxMsgHashesPerRequest), + } + + return m.storenodeRequestor.Query(queryCtx, interest.peerID, storeQueryRequest) }, logger, "retrieving missing messages") if err != nil { if !errors.Is(err, context.Canceled) { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go index 2de62cc7eff..1ae5b244854 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go @@ -464,13 +464,17 @@ func (w *WakuNode) Start(ctx context.Context) error { if err != nil { return err } - //TODO: setting this up temporarily to improve connectivity success for lightNode in status. - //This will have to be removed or changed with community sharding will be implemented. - if w.opts.shards != nil { - err = w.SetRelayShards(*w.opts.shards) - if err != nil { - return err - } + } + + //TODO: setting this up temporarily to improve connectivity success for lightNode + // in status. Also, when executing go-waku service-node as a lightclient + // (using --pubsub-topic and --relay=false) + // This will have to be removed or changed with community sharding will be + // implemented. + if w.opts.shards != nil { + err = w.SetRelayShards(*w.opts.shards) + if err != nil { + return err } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go index 14cc100dda0..c543cbe8e30 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go @@ -101,7 +101,9 @@ const ( const maxFailedAttempts = 5 const prunePeerStoreInterval = 10 * time.Minute const peerConnectivityLoopSecs = 15 -const maxConnsToPeerRatio = 5 +const maxConnsToPeerRatio = 3 +const badPeersCleanupInterval = 1 * time.Minute +const maxDialFailures = 2 // 80% relay peers 20% service peers func relayAndServicePeers(maxConnections int) (int, int) { @@ -256,16 +258,32 @@ func (pm *PeerManager) Start(ctx context.Context) { } } +func (pm *PeerManager) removeBadPeers() { + if !pm.RelayEnabled { + for _, peerID := range pm.host.Peerstore().Peers() { + if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures { + //delete peer from peerStore + pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID)) + pm.RemovePeer(peerID) + } + } + } +} + func (pm *PeerManager) peerStoreLoop(ctx context.Context) { defer utils.LogOnPanic() t := time.NewTicker(prunePeerStoreInterval) + t1 := time.NewTicker(badPeersCleanupInterval) defer t.Stop() + defer t1.Stop() for { select { case <-ctx.Done(): return case <-t.C: pm.prunePeerStore() + case <-t1.C: + pm.removeBadPeers() } } } @@ -744,4 +762,9 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) { pm.logger.Error("failed to emit DialError", zap.Error(emitterErr)) } } + if !pm.RelayEnabled && pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) >= maxDialFailures { + //delete peer from peerStore + pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID)) + pm.RemovePeer(peerID) + } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go index 6b3c9b2e1f4..febb863e508 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go @@ -194,6 +194,35 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ return result, nil } +func (s *WakuStore) RequestRaw(ctx context.Context, peerID peer.ID, storeRequest *pb.StoreQueryRequest) (Result, error) { + err := storeRequest.Validate() + if err != nil { + return nil, err + } + + var params Parameters + params.selectedPeer = peerID + if params.selectedPeer == "" { + return nil, ErrMustSelectPeer + } + + response, err := s.queryFrom(ctx, storeRequest, ¶ms) + if err != nil { + return nil, err + } + + result := &resultImpl{ + store: s, + messages: response.Messages, + storeRequest: storeRequest, + storeResponse: response, + peerID: params.selectedPeer, + cursor: response.PaginationCursor, + } + + return result, nil +} + // Query retrieves all the messages that match a criteria. Use the options to indicate whether to return the message themselves or not. func (s *WakuStore) Query(ctx context.Context, criteria FilterCriteria, opts ...RequestOption) (Result, error) { return s.Request(ctx, criteria, opts...) diff --git a/vendor/modules.txt b/vendor/modules.txt index 84d80556e8f..8e6b7b3c736 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1044,7 +1044,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0 +# github.com/waku-org/go-waku v0.8.1-0.20241125190547-f98a17bacfd0 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/waku.go b/wakuv2/waku.go index b78cf3a9037..1fc541bd2f7 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -72,6 +72,8 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/utils" + commonapi "github.com/waku-org/go-waku/waku/v2/api/common" + gocommon "github.com/status-im/status-go/common" "github.com/status-im/status-go/connection" "github.com/status-im/status-go/eth-node/types" @@ -1064,10 +1066,10 @@ func (w *Waku) Start() error { return fmt.Errorf("failed to start go-waku node: %v", err) } - w.StorenodeCycle = history.NewStorenodeCycle(w.logger) - w.HistoryRetriever = history.NewHistoryRetriever(w.node.Store(), NewHistoryProcessorWrapper(w), w.logger) + w.StorenodeCycle = history.NewStorenodeCycle(w.logger, commonapi.NewDefaultPinger(w.node.Host())) + w.HistoryRetriever = history.NewHistoryRetriever(missing.NewDefaultStorenodeRequestor(w.node.Store()), NewHistoryProcessorWrapper(w), w.logger) - w.StorenodeCycle.Start(w.ctx, w.node.Host()) + w.StorenodeCycle.Start(w.ctx) w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.node.Host().ID()))