diff --git a/go.mod b/go.mod index ee911ff2233..28b03b900f3 100644 --- a/go.mod +++ b/go.mod @@ -88,7 +88,7 @@ require ( github.com/gorilla/sessions v1.2.1 github.com/gorilla/websocket v1.5.3 github.com/ipfs/go-log/v2 v2.5.1 - github.com/jellydator/ttlcache/v3 v3.2.0 + github.com/jellydator/ttlcache/v3 v3.3.0 github.com/jmoiron/sqlx v1.3.5 github.com/klauspost/reedsolomon v1.12.1 github.com/ladydascalie/currency v1.6.0 @@ -97,7 +97,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90 + github.com/waku-org/go-waku v0.8.1-0.20241204120216-e0a9ae0447ce github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 3917ff3c3fe..c4282b12686 100644 --- a/go.sum +++ b/go.sum @@ -1238,8 +1238,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= -github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE= -github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4= +github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc= +github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= @@ -2152,8 +2152,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27 github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90 h1:p7tehUW7f+D6pvMJYop2yJV03SJU2fFUusmSnKL3uow= -github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74= +github.com/waku-org/go-waku v0.8.1-0.20241204120216-e0a9ae0447ce h1:lZ1CCDihvegbajB/xaUAwGibfA1NmbcpHQKfk+X3fd0= +github.com/waku-org/go-waku v0.8.1-0.20241204120216-e0a9ae0447ce/go.mod h1:zYhLgqwBE3sGP2vP+aNiM5moOKlf/uSoIv36puAj9WI= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/jellydator/ttlcache/v3/README.md b/vendor/github.com/jellydator/ttlcache/v3/README.md index 3a557b030d9..a17cb243718 100644 --- a/vendor/github.com/jellydator/ttlcache/v3/README.md +++ b/vendor/github.com/jellydator/ttlcache/v3/README.md @@ -10,7 +10,8 @@ - Type parameters - Item expiration and automatic deletion - Automatic expiration time extension on each `Get` call -- `Loader` interface that may be used to load/lazily initialize missing cache +- `Loader` interface that may be used to load/lazily initialize missing cache +- Thread Safe items - Event handlers (insertion and eviction) - Metrics diff --git a/vendor/github.com/jellydator/ttlcache/v3/cache.go b/vendor/github.com/jellydator/ttlcache/v3/cache.go index 1ad3afbece4..1b9e72ef015 100644 --- a/vendor/github.com/jellydator/ttlcache/v3/cache.go +++ b/vendor/github.com/jellydator/ttlcache/v3/cache.go @@ -133,7 +133,7 @@ func (c *Cache[K, V]) set(key K, value V, ttl time.Duration) *Item[K, V] { ttl = c.options.ttl } - elem := c.get(key, false) + elem := c.get(key, false, true) if elem != nil { // update/overwrite an existing item item := elem.Value.(*Item[K, V]) @@ -176,14 +176,14 @@ func (c *Cache[K, V]) set(key K, value V, ttl time.Duration) *Item[K, V] { // It returns nil if the item is not found or is expired. // Not safe for concurrent use by multiple goroutines without additional // locking. -func (c *Cache[K, V]) get(key K, touch bool) *list.Element { +func (c *Cache[K, V]) get(key K, touch bool, includeExpired bool) *list.Element { elem := c.items.values[key] if elem == nil { return nil } item := elem.Value.(*Item[K, V]) - if item.isExpiredUnsafe() { + if !includeExpired && item.isExpiredUnsafe() { return nil } @@ -218,7 +218,7 @@ func (c *Cache[K, V]) getWithOpts(key K, lockAndLoad bool, opts ...Option[K, V]) c.items.mu.Lock() } - elem := c.get(key, !getOpts.disableTouchOnHit) + elem := c.get(key, !getOpts.disableTouchOnHit, false) if lockAndLoad { c.items.mu.Unlock() @@ -339,8 +339,8 @@ func (c *Cache[K, V]) Has(key K) bool { c.items.mu.RLock() defer c.items.mu.RUnlock() - _, ok := c.items.values[key] - return ok + elem, ok := c.items.values[key] + return ok && !elem.Value.(*Item[K, V]).isExpiredUnsafe() } // GetOrSet retrieves an item from the cache by the provided key. @@ -436,26 +436,66 @@ func (c *Cache[K, V]) DeleteExpired() { // If the item is not found, the method is no-op. func (c *Cache[K, V]) Touch(key K) { c.items.mu.Lock() - c.get(key, true) + c.get(key, true, false) c.items.mu.Unlock() } -// Len returns the total number of items in the cache. +// Len returns the number of unexpired items in the cache. func (c *Cache[K, V]) Len() int { c.items.mu.RLock() defer c.items.mu.RUnlock() - return len(c.items.values) + total := c.items.expQueue.Len() + if total == 0 { + return 0 + } + + // search the heap-based expQueue by BFS + countExpired := func() int { + var ( + q []int + res int + ) + + item := c.items.expQueue[0].Value.(*Item[K, V]) + if !item.isExpiredUnsafe() { + return res + } + + q = append(q, 0) + for len(q) > 0 { + pop := q[0] + q = q[1:] + res++ + + for i := 1; i <= 2; i++ { + idx := 2*pop + i + if idx >= total { + break + } + + item = c.items.expQueue[idx].Value.(*Item[K, V]) + if item.isExpiredUnsafe() { + q = append(q, idx) + } + } + } + return res + } + + return total - countExpired() } -// Keys returns all keys currently present in the cache. +// Keys returns all unexpired keys in the cache. func (c *Cache[K, V]) Keys() []K { c.items.mu.RLock() defer c.items.mu.RUnlock() - res := make([]K, 0, len(c.items.values)) - for k := range c.items.values { - res = append(res, k) + res := make([]K, 0) + for k, elem := range c.items.values { + if !elem.Value.(*Item[K, V]).isExpiredUnsafe() { + res = append(res, k) + } } return res @@ -467,18 +507,18 @@ func (c *Cache[K, V]) Items() map[K]*Item[K, V] { c.items.mu.RLock() defer c.items.mu.RUnlock() - items := make(map[K]*Item[K, V], len(c.items.values)) - for k := range c.items.values { - item := c.get(k, false) - if item != nil { - items[k] = item.Value.(*Item[K, V]) + items := make(map[K]*Item[K, V]) + for k, elem := range c.items.values { + item := elem.Value.(*Item[K, V]) + if item != nil && !item.isExpiredUnsafe() { + items[k] = item } } return items } -// Range calls fn for each item present in the cache. If fn returns false, +// Range calls fn for each unexpired item in the cache. If fn returns false, // Range stops the iteration. func (c *Cache[K, V]) Range(fn func(item *Item[K, V]) bool) { c.items.mu.RLock() @@ -491,9 +531,10 @@ func (c *Cache[K, V]) Range(fn func(item *Item[K, V]) bool) { for item := c.items.lru.Front(); item != c.items.lru.Back().Next(); item = item.Next() { i := item.Value.(*Item[K, V]) + expired := i.isExpiredUnsafe() c.items.mu.RUnlock() - if !fn(i) { + if !expired && !fn(i) { return } @@ -503,6 +544,32 @@ func (c *Cache[K, V]) Range(fn func(item *Item[K, V]) bool) { } } +// RangeBackwards calls fn for each unexpired item in the cache in reverse order. +// If fn returns false, RangeBackwards stops the iteration. +func (c *Cache[K, V]) RangeBackwards(fn func(item *Item[K, V]) bool) { + c.items.mu.RLock() + + // Check if cache is empty + if c.items.lru.Len() == 0 { + c.items.mu.RUnlock() + return + } + + for item := c.items.lru.Back(); item != c.items.lru.Front().Prev(); item = item.Prev() { + i := item.Value.(*Item[K, V]) + expired := i.isExpiredUnsafe() + c.items.mu.RUnlock() + + if !expired && !fn(i) { + return + } + + if item.Prev() != nil { + c.items.mu.RLock() + } + } +} + // Metrics returns the metrics of the cache. func (c *Cache[K, V]) Metrics() Metrics { c.metricsMu.RLock() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go index a43c3c3963c..665d577bd0f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go @@ -61,7 +61,8 @@ type EnevelopeProcessor interface { OnNewEnvelope(env *protocol.Envelope) error } -func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager { +func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, + envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager { // This fn is being mocked in test mgr := new(FilterManager) mgr.ctx = ctx @@ -162,6 +163,7 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { defer utils.LogOnPanic() ctx, cancel := context.WithCancel(mgr.ctx) config := FilterConfig{MaxPeers: mgr.minPeersPerFilter} + sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params) mgr.Lock() mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub} @@ -188,6 +190,7 @@ func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus), zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs))) if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online + mgr.onlineChecker.SetOnline(newStatus) mgr.NetworkChange() mgr.logger.Debug("switching from offline to online") mgr.Lock() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go index 72ac4f9f355..927ffb9c931 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go @@ -35,6 +35,7 @@ type MessageTracker interface { // MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria type MissingMessageVerifier struct { ctx context.Context + cancel context.CancelFunc params missingMessageVerifierParams storenodeRequestor common.StorenodeRequestor @@ -43,10 +44,12 @@ type MissingMessageVerifier struct { criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages criteriaInterestMu sync.RWMutex - C <-chan *protocol.Envelope + C chan *protocol.Envelope - timesource timesource.Timesource - logger *zap.Logger + timesource timesource.Timesource + logger *zap.Logger + isRunning bool + runningMutex sync.RWMutex } // NewMissingMessageVerifier creates an instance of a MissingMessageVerifier @@ -63,6 +66,8 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes messageTracker: messageTracker, logger: logger.Named("missing-msg-verifier"), params: params, + criteriaInterest: make(map[string]criteriaInterest), + C: make(chan *protocol.Envelope, 1000), } } @@ -97,12 +102,24 @@ func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilt m.criteriaInterest[contentFilter.PubsubTopic] = criteriaInterest } +func (m *MissingMessageVerifier) setRunning(running bool) { + m.runningMutex.Lock() + defer m.runningMutex.Unlock() + m.isRunning = running +} + func (m *MissingMessageVerifier) Start(ctx context.Context) { - m.ctx = ctx - m.criteriaInterest = make(map[string]criteriaInterest) + m.runningMutex.Lock() + if m.isRunning { //make sure verifier only runs once. + m.runningMutex.Unlock() + return + } + m.isRunning = true + m.runningMutex.Unlock() - c := make(chan *protocol.Envelope, 1000) - m.C = c + ctx, cancelFunc := context.WithCancel(ctx) + m.ctx = ctx + m.cancel = cancelFunc go func() { defer utils.LogOnPanic() @@ -123,24 +140,33 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) { for _, interest := range critIntList { select { case <-ctx.Done(): + m.setRunning(false) return default: semaphore <- struct{}{} go func(interest criteriaInterest) { defer utils.LogOnPanic() - m.fetchHistory(c, interest) + m.fetchHistory(m.C, interest) <-semaphore }(interest) } } case <-ctx.Done(): + m.setRunning(false) return } } }() } +func (m *MissingMessageVerifier) Stop() { + m.cancel() + m.runningMutex.Lock() + defer m.runningMutex.Unlock() + m.isRunning = false +} + func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, interest criteriaInterest) { contentTopics := interest.contentFilter.ContentTopics.ToList() for i := 0; i < len(contentTopics); i += maxContentTopicsPerRequest { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go index c457589e76b..62dcb4af7fa 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go @@ -6,6 +6,8 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -53,6 +55,12 @@ type MessageSender struct { messageSentCheck ISentCheck rateLimiter *PublishRateLimiter logger *zap.Logger + evtMessageSent event.Emitter +} + +type MessageSent struct { + Size uint32 // Size of payload in bytes + Timestamp int64 } type Request struct { @@ -96,6 +104,15 @@ func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *Mess return ms } +func (ms *MessageSender) WithMessageSentEmitter(host host.Host) *MessageSender { + evtMessageSent, err := host.EventBus().Emitter(new(MessageSent)) + if err != nil { + ms.logger.Error("failed to create message sent emitter", zap.Error(err)) + } + ms.evtMessageSent = evtMessageSent + return ms +} + func (ms *MessageSender) Send(req *Request) error { logger := ms.logger.With( zap.Stringer("envelopeHash", req.envelope.Hash()), @@ -149,6 +166,16 @@ func (ms *MessageSender) Send(req *Request) error { ) } + if ms.evtMessageSent != nil { + err := ms.evtMessageSent.Emit(MessageSent{ + Size: uint32(len(req.envelope.Message().Payload)), + Timestamp: req.envelope.Message().GetTimestamp(), + }) + if err != nil { + logger.Error("failed to emit message sent event", zap.Error(err)) + } + } + return nil } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go index c543cbe8e30..69a0b23c11f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go @@ -102,7 +102,6 @@ const maxFailedAttempts = 5 const prunePeerStoreInterval = 10 * time.Minute const peerConnectivityLoopSecs = 15 const maxConnsToPeerRatio = 3 -const badPeersCleanupInterval = 1 * time.Minute const maxDialFailures = 2 // 80% relay peers 20% service peers @@ -258,14 +257,13 @@ func (pm *PeerManager) Start(ctx context.Context) { } } -func (pm *PeerManager) removeBadPeers() { - if !pm.RelayEnabled { - for _, peerID := range pm.host.Peerstore().Peers() { - if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures { - //delete peer from peerStore - pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID)) - pm.RemovePeer(peerID) - } +func (pm *PeerManager) CheckAndRemoveBadPeer(peerID peer.ID) { + if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures && + pm.peerConnector.onlineChecker.IsOnline() { + if origin, _ := pm.host.Peerstore().(wps.WakuPeerstore).Origin(peerID); origin != wps.Static { // delete only if a peer is discovered and not configured statically. + //delete peer from peerStore + pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID)) + pm.RemovePeer(peerID) } } } @@ -273,17 +271,13 @@ func (pm *PeerManager) removeBadPeers() { func (pm *PeerManager) peerStoreLoop(ctx context.Context) { defer utils.LogOnPanic() t := time.NewTicker(prunePeerStoreInterval) - t1 := time.NewTicker(badPeersCleanupInterval) defer t.Stop() - defer t1.Stop() for { select { case <-ctx.Done(): return case <-t.C: pm.prunePeerStore() - case <-t1.C: - pm.removeBadPeers() } } } @@ -749,6 +743,7 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) { if err == nil || errors.Is(err, context.Canceled) { return } + if pm.peerConnector != nil { pm.peerConnector.addConnectionBackoff(peerID) } @@ -762,9 +757,4 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) { pm.logger.Error("failed to emit DialError", zap.Error(emitterErr)) } } - if !pm.RelayEnabled && pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) >= maxDialFailures { - //delete peer from peerStore - pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID)) - pm.RemovePeer(peerID) - } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go index 3b56d4700ee..8fbcd91c138 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go @@ -15,6 +15,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" @@ -54,6 +55,7 @@ type WakuFilterLightNode struct { log *zap.Logger subscriptions *subscription.SubscriptionsMap pm *peermanager.PeerManager + limiter *utils.RateLimiter peerPingInterval time.Duration } @@ -89,6 +91,7 @@ func NewWakuFilterLightNode( onlineChecker onlinechecker.OnlineChecker, reg prometheus.Registerer, log *zap.Logger, + opts ...LightNodeOption, ) *WakuFilterLightNode { wf := new(WakuFilterLightNode) wf.log = log.Named("filterv2-lightnode") @@ -99,6 +102,14 @@ func NewWakuFilterLightNode( wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) wf.peerPingInterval = 1 * time.Minute + + params := &LightNodeParameters{} + opts = append(DefaultLightNodeOptions(), opts...) + for _, opt := range opts { + opt(params) + } + wf.limiter = utils.NewRateLimiter(params.limitR, params.limitB) + return wf } @@ -155,6 +166,14 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea logger := wf.log.With(logging.HostID("peerID", peerID)) + if !wf.limiter.Allow(peerID) { + wf.metrics.RecordError(rateLimitFailure) + if err := stream.Reset(); err != nil { + wf.log.Error("resetting connection", zap.Error(err)) + } + return + } + if !wf.subscriptions.IsSubscribedTo(peerID) { logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID)) wf.metrics.RecordError(unknownPeerMessagePush) @@ -249,6 +268,10 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, wf.metrics.RecordError(dialFailure) if wf.pm != nil { wf.pm.HandleDialError(err, peerID) + if errors.Is(err, swarm.ErrAllDialsFailed) || + errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) { + wf.pm.CheckAndRemoveBadPeer(peerID) + } } return err } @@ -287,7 +310,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, } - if filterSubscribeResponse.RequestId != request.RequestId { + if filterSubscribeResponse.RequestId != "N/A" && filterSubscribeResponse.RequestId != request.RequestId { wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId)) wf.metrics.RecordError(requestIDMismatch) err := NewFilterError(300, "request_id_mismatch") @@ -337,7 +360,7 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, if params.pm != nil && reqPeerCount > 0 { wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude))) - params.selectedPeers, err = wf.pm.SelectPeers( + selectedPeers, err := wf.pm.SelectPeers( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterSubscribeID_v20beta1, @@ -350,7 +373,12 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, ) if err != nil { wf.log.Error("peer selection returned err", zap.Error(err)) - return nil, nil, err + if len(params.selectedPeers) == 0 { + return nil, nil, err + } + } + if len(selectedPeers) > 0 { + params.selectedPeers = append(params.selectedPeers, selectedPeers...) } } wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers))) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go index 126090d9939..7bdd1569489 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go @@ -24,7 +24,7 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) { ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout) defer cancel() err := wf.Ping(ctxWithTimeout, peer) - if err != nil { + if err != nil && wf.onlineChecker.IsOnline() { wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err)) //quickly retry ping again before marking subscription as failure //Note that PingTimeout is a fraction of PingInterval so this shouldn't cause parallel pings being sent. diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/metrics.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/metrics.go index 51e3b356d1d..89ac8e4aa97 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/metrics.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/metrics.go @@ -96,6 +96,7 @@ var ( peerNotFoundFailure metricsErrCategory = "peer_not_found_failure" writeResponseFailure metricsErrCategory = "write_response_failure" pushTimeoutFailure metricsErrCategory = "push_timeout_failure" + rateLimitFailure metricsErrCategory = "ratelimit_failure" ) // RecordError increases the counter for different error types diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/options.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/options.go index f19876212ac..bde105e479f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/options.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/options.go @@ -11,6 +11,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "go.uber.org/zap" + "golang.org/x/time/rate" ) func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters { @@ -57,13 +58,35 @@ type ( Timeout time.Duration MaxSubscribers int pm *peermanager.PeerManager + limitR rate.Limit + limitB int } Option func(*FilterParameters) + LightNodeParameters struct { + limitR rate.Limit + limitB int + } + + LightNodeOption func(*LightNodeParameters) + FilterSubscribeOption func(*FilterSubscribeParameters) error ) +func WithLightNodeRateLimiter(r rate.Limit, b int) LightNodeOption { + return func(params *LightNodeParameters) { + params.limitR = r + params.limitB = b + } +} + +func DefaultLightNodeOptions() []LightNodeOption { + return []LightNodeOption{ + WithLightNodeRateLimiter(1, 1), + } +} + func WithTimeout(timeout time.Duration) Option { return func(params *FilterParameters) { params.Timeout = timeout @@ -202,9 +225,17 @@ func WithPeerManager(pm *peermanager.PeerManager) Option { } } +func WithFullNodeRateLimiter(r rate.Limit, b int) Option { + return func(params *FilterParameters) { + params.limitR = r + params.limitB = b + } +} + func DefaultOptions() []Option { return []Option{ WithTimeout(DefaultIdleSubscriptionTimeout), WithMaxSubscribers(DefaultMaxSubscribers), + WithFullNodeRateLimiter(1, 1), } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go index bacfe85cc14..82c4c47da8f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go @@ -37,9 +37,9 @@ type ( metrics Metrics log *zap.Logger *service.CommonService - subscriptions *SubscribersMap - pm *peermanager.PeerManager - + subscriptions *SubscribersMap + pm *peermanager.PeerManager + limiter *utils.RateLimiter maxSubscriptions int } ) @@ -56,6 +56,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi opt(params) } + wf.limiter = utils.NewRateLimiter(params.limitR, params.limitB) wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) wf.subscriptions = NewSubscribersMap(params.Timeout) @@ -93,7 +94,14 @@ func (wf *WakuFilterFullNode) start(sub *relay.Subscription) error { func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(network.Stream) { return func(stream network.Stream) { - logger := wf.log.With(logging.HostID("peer", stream.Conn().RemotePeer())) + peerID := stream.Conn().RemotePeer() + logger := wf.log.With(logging.HostID("peer", peerID)) + + if !wf.limiter.Allow(peerID) { + wf.metrics.RecordError(rateLimitFailure) + wf.reply(ctx, stream, &pb.FilterSubscribeRequest{RequestId: "N/A"}, http.StatusTooManyRequests, "filter request rejected due rate limit exceeded") + return + } reader := pbio.NewDelimitedReader(stream, math.MaxInt32) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go index 015cb352e5b..88b9e04e6f4 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go @@ -22,6 +22,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" + "golang.org/x/time/rate" ) type LightNodeData struct { @@ -133,7 +134,7 @@ func (s *FilterTestSuite) GetWakuFilterFullNode(topic string, withRegisterAll bo nodeData := s.GetWakuRelay(topic) - node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log) + node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log, WithFullNodeRateLimiter(rate.Inf, 0)) node2Filter.SetHost(nodeData.FullNodeHost) var sub *relay.Subscription @@ -166,7 +167,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData { b := relay.NewBroadcaster(10) s.Require().NoError(b.Start(context.Background())) pm := peermanager.NewPeerManager(5, 5, nil, nil, true, s.Log) - filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log) + filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log, WithLightNodeRateLimiter(rate.Inf, 0)) filterPush.SetHost(host) pm.SetHost(host) return LightNodeData{filterPush, host} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index 9d6744315a6..c6bed8c2d49 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -13,6 +13,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" @@ -24,7 +25,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" - "golang.org/x/time/rate" ) // LightPushID_v20beta1 is the current Waku LightPush protocol identifier @@ -40,7 +40,7 @@ var ( type WakuLightPush struct { h host.Host relay *relay.WakuRelay - limiter *rate.Limiter + limiter *utils.RateLimiter cancel context.CancelFunc pm *peermanager.PeerManager metrics Metrics @@ -59,11 +59,12 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p wakuLP.metrics = newMetrics(reg) params := &LightpushParameters{} + opts = append(DefaultLightpushOptions(), opts...) for _, opt := range opts { opt(params) } - wakuLP.limiter = params.limiter + wakuLP.limiter = utils.NewRateLimiter(params.limitR, params.limitB) return wakuLP } @@ -106,7 +107,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream) Response: &pb.PushResponse{}, } - if wakuLP.limiter != nil && !wakuLP.limiter.Allow() { + if !wakuLP.limiter.Allow(stream.Conn().RemotePeer()) { wakuLP.metrics.RecordError(rateLimitFailure) responseMsg := "exceeds the rate limit" responsePushRPC.Response.Info = &responseMsg @@ -198,6 +199,10 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p wakuLP.metrics.RecordError(dialFailure) if wakuLP.pm != nil { wakuLP.pm.HandleDialError(err, peerID) + if errors.Is(err, swarm.ErrAllDialsFailed) || + errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) { + wakuLP.pm.CheckAndRemoveBadPeer(peerID) + } } return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go index 7ed04370544..b9740ab4c76 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -14,7 +14,8 @@ import ( ) type LightpushParameters struct { - limiter *rate.Limiter + limitR rate.Limit + limitB int } type Option func(*LightpushParameters) @@ -22,7 +23,14 @@ type Option func(*LightpushParameters) // WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol func WithRateLimiter(r rate.Limit, b int) Option { return func(params *LightpushParameters) { - params.limiter = rate.NewLimiter(r, b) + params.limitR = r + params.limitB = b + } +} + +func DefaultLightpushOptions() []Option { + return []Option{ + WithRateLimiter(1, 1), } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go index 5a3821b96a1..dc181fb424a 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go @@ -23,7 +23,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" - "golang.org/x/time/rate" ) // PeerExchangeID_v20alpha1 is the current Waku Peer Exchange protocol identifier @@ -51,7 +50,7 @@ type WakuPeerExchange struct { peerConnector PeerConnector enrCache *enrCache - limiter *rate.Limiter + limiter *utils.RateLimiter } // NewWakuPeerExchange returns a new instance of WakuPeerExchange struct @@ -68,11 +67,12 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, clusterID uint16, peerConnect wakuPX.CommonService = service.NewCommonService() params := &PeerExchangeParameters{} + opts = append(DefaultPeerExchangeOptions(), opts...) for _, opt := range opts { opt(params) } - wakuPX.limiter = params.limiter + wakuPX.limiter = utils.NewRateLimiter(params.limiterR, params.limiterB) return wakuPX, nil } @@ -97,9 +97,10 @@ func (wakuPX *WakuPeerExchange) start() error { func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) { return func(stream network.Stream) { - logger := wakuPX.log.With(logging.HostID("peer", stream.Conn().RemotePeer())) + peerID := stream.Conn().RemotePeer() + logger := wakuPX.log.With(logging.HostID("peer", peerID)) - if wakuPX.limiter != nil && !wakuPX.limiter.Allow() { + if wakuPX.limiter != nil && !wakuPX.limiter.Allow(peerID) { wakuPX.metrics.RecordError(rateLimitFailure) wakuPX.log.Info("exceeds the rate limit") // TODO: peer exchange protocol should contain an err field diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go index c0898809102..c25078b73db 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go @@ -12,7 +12,8 @@ import ( ) type PeerExchangeParameters struct { - limiter *rate.Limiter + limiterR rate.Limit + limiterB int } type Option func(*PeerExchangeParameters) @@ -20,7 +21,14 @@ type Option func(*PeerExchangeParameters) // WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol func WithRateLimiter(r rate.Limit, b int) Option { return func(params *PeerExchangeParameters) { - params.limiter = rate.NewLimiter(r, b) + params.limiterR = r + params.limiterB = b + } +} + +func DefaultPeerExchangeOptions() []Option { + return []Option{ + WithRateLimiter(1, 1), } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/utils/limiter.go b/vendor/github.com/waku-org/go-waku/waku/v2/utils/limiter.go new file mode 100644 index 00000000000..a587659f8f7 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/utils/limiter.go @@ -0,0 +1,69 @@ +package utils + +import ( + "context" + "sync" + "time" + + "github.com/jellydator/ttlcache/v3" + "github.com/libp2p/go-libp2p/core/peer" + "golang.org/x/time/rate" +) + +type RateLimiter struct { + sync.Mutex + limiters *ttlcache.Cache[peer.ID, *rate.Limiter] + r rate.Limit + b int +} + +func NewRateLimiter(r rate.Limit, b int) *RateLimiter { + return &RateLimiter{ + r: r, + b: b, + limiters: ttlcache.New[peer.ID, *rate.Limiter]( + ttlcache.WithTTL[peer.ID, *rate.Limiter](30 * time.Minute), + ), + } +} + +func (r *RateLimiter) Start(ctx context.Context) { + go func() { + t := time.NewTicker(time.Hour) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + r.Lock() + r.limiters.DeleteExpired() + r.Unlock() + } + } + }() +} + +func (r *RateLimiter) getOrCreate(peerID peer.ID) *rate.Limiter { + r.Lock() + defer r.Unlock() + + var limiter *rate.Limiter + if !r.limiters.Has(peerID) { + limiter = rate.NewLimiter(r.r, r.b) + r.limiters.Set(peerID, limiter, ttlcache.DefaultTTL) + } else { + v := r.limiters.Get(peerID) + limiter = v.Value() + } + return limiter +} + +func (r *RateLimiter) Allow(peerID peer.ID) bool { + return r.getOrCreate(peerID).Allow() +} + +func (r *RateLimiter) Wait(ctx context.Context, peerID peer.ID) error { + return r.getOrCreate(peerID).Wait(ctx) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index a30c3e7c481..cee9982b445 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -448,7 +448,7 @@ github.com/jackpal/go-nat-pmp # github.com/jbenet/go-temp-err-catcher v0.1.0 ## explicit; go 1.13 github.com/jbenet/go-temp-err-catcher -# github.com/jellydator/ttlcache/v3 v3.2.0 +# github.com/jellydator/ttlcache/v3 v3.3.0 ## explicit; go 1.18 github.com/jellydator/ttlcache/v3 # github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a @@ -1044,7 +1044,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90 +# github.com/waku-org/go-waku v0.8.1-0.20241204120216-e0a9ae0447ce ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 1fc541bd2f7..ed06703a3a8 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -1707,6 +1707,15 @@ func (w *Waku) handleNetworkChangeFromApp(state connection.State) { func (w *Waku) ConnectionChanged(state connection.State) { isOnline := !state.Offline + if isOnline && !w.onlineChecker.IsOnline() { + w.logger.Debug("Starting mising msg verifier as back online") + w.missingMsgVerifier.Start(w.ctx) + } + if !isOnline && w.onlineChecker.IsOnline() { + w.logger.Debug("Stopping mising msg verifier as went offline") + w.missingMsgVerifier.Stop() + } + if w.cfg.LightClient { //TODO: Update this as per https://github.com/waku-org/go-waku/issues/1114 go func() {