Skip to content

Commit

Permalink
feat!: extract storenode cycle to go-waku api
Browse files Browse the repository at this point in the history
Extracts the storenode cycle code to go-waku.
  • Loading branch information
richard-ramos authored Nov 25, 2024
1 parent 987a9e8 commit 0c838b0
Show file tree
Hide file tree
Showing 40 changed files with 1,764 additions and 1,481 deletions.
58 changes: 49 additions & 9 deletions eth-node/bridge/geth/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"

"github.com/waku-org/go-waku/waku/v2/api/history"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/enode"
gocommon "github.com/status-im/status-go/common"
Expand Down Expand Up @@ -274,10 +276,6 @@ func (w *GethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) {
w.waku.MarkP2PMessageAsProcessed(hash)
}

func (w *GethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID peer.ID, r types.MessagesRequest, processEnvelopes bool) (types.StoreRequestCursor, int, error) {
return nil, 0, errors.New("not implemented")
}

func (w *GethWakuWrapper) ConnectionChanged(_ connection.State) {}

func (w *GethWakuWrapper) ClearEnvelopesCache() {
Expand Down Expand Up @@ -314,13 +312,55 @@ func (w *wakuFilterWrapper) ID() string {
func (w *GethWakuWrapper) ConfirmMessageDelivered(hashes []common.Hash) {
}

func (w *GethWakuWrapper) SetStorePeerID(peerID peer.ID) {
func (w *GethWakuWrapper) PeerID() peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) PeerID() peer.ID {
panic("not implemented")
func (w *GethWakuWrapper) GetActiveStorenode() peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) OnStorenodeChanged() <-chan peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) OnStorenodeNotWorking() <-chan struct{} {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) OnStorenodeAvailable() <-chan peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) WaitForAvailableStoreNode(ctx context.Context) bool {
return false
}

func (w *GethWakuWrapper) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) ProcessMailserverBatch(
ctx context.Context,
batch types.MailserverBatch,
storenodeID peer.ID,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
) error {
return errors.New("not available in WakuV1")
}

func (w *GethWakuWrapper) IsStorenodeAvailable(peerID peer.ID) bool {
panic("not available in WakuV1")

}

func (w *GethWakuWrapper) PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error {
panic("not available in WakuV1")

}

func (w *GethWakuWrapper) PingPeer(context.Context, peer.ID) (time.Duration, error) {
return 0, errors.New("not available in WakuV1")
func (w *GethWakuWrapper) DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool) {
panic("not available in WakuV1")
}
105 changes: 65 additions & 40 deletions eth-node/bridge/geth/wakuv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/multiformats/go-multiaddr"
"google.golang.org/protobuf/proto"

"github.com/waku-org/go-waku/waku/v2/api/history"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/store"

Expand Down Expand Up @@ -176,39 +177,6 @@ func (w *gethWakuV2Wrapper) createFilterWrapper(id string, keyAsym *ecdsa.Privat
}, id), nil
}

func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID peer.ID, r types.MessagesRequest, processEnvelopes bool) (types.StoreRequestCursor, int, error) {
options := []store.RequestOption{
store.WithPaging(false, uint64(r.Limit)),
}

var cursor []byte
if r.StoreCursor != nil {
cursor = r.StoreCursor
}

contentTopics := []string{}
for _, topic := range r.ContentTopics {
contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic).ContentTopic())
}

query := store.FilterCriteria{
TimeStart: proto.Int64(int64(r.From) * int64(time.Second)),
TimeEnd: proto.Int64(int64(r.To) * int64(time.Second)),
ContentFilter: protocol.NewContentFilter(w.waku.GetPubsubTopic(r.PubsubTopic), contentTopics...),
}

pbCursor, envelopesCount, err := w.waku.Query(ctx, peerID, query, cursor, options, processEnvelopes)
if err != nil {
return nil, 0, err
}

if pbCursor != nil {
return pbCursor, envelopesCount, nil
}

return nil, envelopesCount, nil
}

func (w *gethWakuV2Wrapper) StartDiscV5() error {
return w.waku.StartDiscV5()
}
Expand Down Expand Up @@ -289,7 +257,7 @@ func (w *gethWakuV2Wrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSub
func (w *gethWakuV2Wrapper) SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []types.TopicType) error {
var cTopics []string
for _, ct := range contentTopics {
cTopics = append(cTopics, wakucommon.TopicType(ct).ContentTopic())
cTopics = append(cTopics, wakucommon.BytesToTopic(ct.Bytes()).ContentTopic())
}
pubsubTopic = w.waku.GetPubsubTopic(pubsubTopic)
w.waku.SetTopicsToVerifyForMissingMessages(peerID, pubsubTopic, cTopics)
Expand Down Expand Up @@ -338,14 +306,71 @@ func (w *gethWakuV2Wrapper) ConfirmMessageDelivered(hashes []common.Hash) {
w.waku.ConfirmMessageDelivered(hashes)
}

func (w *gethWakuV2Wrapper) SetStorePeerID(peerID peer.ID) {
w.waku.SetStorePeerID(peerID)
}

func (w *gethWakuV2Wrapper) PeerID() peer.ID {
return w.waku.PeerID()
}

func (w *gethWakuV2Wrapper) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
return w.waku.PingPeer(ctx, peerID)
func (w *gethWakuV2Wrapper) GetActiveStorenode() peer.ID {
return w.waku.StorenodeCycle.GetActiveStorenode()
}

func (w *gethWakuV2Wrapper) OnStorenodeChanged() <-chan peer.ID {
return w.waku.StorenodeCycle.StorenodeChangedEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) OnStorenodeNotWorking() <-chan struct{} {
return w.waku.StorenodeCycle.StorenodeNotWorkingEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) OnStorenodeAvailable() <-chan peer.ID {
return w.waku.StorenodeCycle.StorenodeAvailableEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) WaitForAvailableStoreNode(ctx context.Context) bool {
return w.waku.StorenodeCycle.WaitForAvailableStoreNode(ctx)
}

func (w *gethWakuV2Wrapper) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) {
w.waku.StorenodeCycle.SetStorenodeConfigProvider(c)
}

func (w *gethWakuV2Wrapper) ProcessMailserverBatch(
ctx context.Context,
batch types.MailserverBatch,
storenodeID peer.ID,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
) error {
pubsubTopic := w.waku.GetPubsubTopic(batch.PubsubTopic)
contentTopics := []string{}
for _, topic := range batch.Topics {
contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic.Bytes()).ContentTopic())
}

criteria := store.FilterCriteria{
TimeStart: proto.Int64(batch.From.UnixNano()),
TimeEnd: proto.Int64(batch.To.UnixNano()),
ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...),
}

return w.waku.HistoryRetriever.Query(ctx, criteria, storenodeID, pageLimit, shouldProcessNextPage, processEnvelopes)
}

func (w *gethWakuV2Wrapper) IsStorenodeAvailable(peerID peer.ID) bool {
return w.waku.StorenodeCycle.IsStorenodeAvailable(peerID)
}

func (w *gethWakuV2Wrapper) PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error {
return w.waku.StorenodeCycle.PerformStorenodeTask(fn, opts...)
}

func (w *gethWakuV2Wrapper) DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool) {
w.waku.StorenodeCycle.Lock()
defer w.waku.StorenodeCycle.Unlock()

w.waku.StorenodeCycle.DisconnectActiveStorenode(backoff)
if shouldCycle {
w.waku.StorenodeCycle.Cycle(ctx)
}
}
54 changes: 0 additions & 54 deletions eth-node/types/mailserver.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,5 @@
package types

import (
"time"
)

const (
// MaxLimitInMessagesRequest represents the maximum number of messages
// that can be requested from the mailserver
MaxLimitInMessagesRequest = 1000
)

// MessagesRequest contains details of a request of historic messages.
type MessagesRequest struct {
// ID of the request. The current implementation requires ID to be 32-byte array,
// however, it's not enforced for future implementation.
ID []byte `json:"id"`
// From is a lower bound of time range.
From uint32 `json:"from"`
// To is a upper bound of time range.
To uint32 `json:"to"`
// Limit determines the number of messages sent by the mail server
// for the current paginated request.
Limit uint32 `json:"limit"`
// Cursor is used as starting point for paginated requests.
Cursor []byte `json:"cursor"`
// StoreCursor is used as starting point for WAKUV2 paginatedRequests
StoreCursor StoreRequestCursor `json:"storeCursor"`
// Bloom is a filter to match requested messages.
Bloom []byte `json:"bloom"`
// PubsubTopic is the gossipsub topic on which the message was broadcasted
PubsubTopic string `json:"pubsubTopic"`
// ContentTopics is a list of topics. A returned message should
// belong to one of the topics from the list.
ContentTopics [][]byte `json:"contentTopics"`
}

type StoreRequestCursor []byte

// SetDefaults sets the From and To defaults
func (r *MessagesRequest) SetDefaults(now time.Time) {
// set From and To defaults
if r.To == 0 {
r.To = uint32(now.UTC().Unix())
}

if r.From == 0 {
oneDay := uint32(86400) // -24 hours
if r.To < oneDay {
r.From = 0
} else {
r.From = r.To - oneDay
}
}
}

// MailServerResponse is the response payload sent by the mailserver.
type MailServerResponse struct {
LastEnvelopeHash Hash
Expand Down
4 changes: 4 additions & 0 deletions eth-node/types/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func (t TopicType) String() string {
return EncodeHex(t[:])
}

func (t TopicType) Bytes() []byte {
return TopicTypeToByteArray(t)
}

// MarshalText returns the hex representation of t.
func (t TopicType) MarshalText() ([]byte, error) {
return HexBytes(t[:]).MarshalText()
Expand Down
63 changes: 55 additions & 8 deletions eth-node/types/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package types
import (
"context"
"crypto/ecdsa"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"sync"
"time"

Expand All @@ -12,6 +15,8 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/pborman/uuid"

"github.com/waku-org/go-waku/waku/v2/api/history"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/connection"
Expand Down Expand Up @@ -176,9 +181,6 @@ type Waku interface {
Unsubscribe(ctx context.Context, id string) error
UnsubscribeMany(ids []string) error

// RequestStoreMessages uses the WAKU2-STORE protocol to request historic messages
RequestStoreMessages(ctx context.Context, peerID peer.ID, request MessagesRequest, processEnvelopes bool) (StoreRequestCursor, int, error)

// ProcessingP2PMessages indicates whether there are in-flight p2p messages
ProcessingP2PMessages() bool

Expand All @@ -194,12 +196,57 @@ type Waku interface {
// ConfirmMessageDelivered updates a message has been delivered in waku
ConfirmMessageDelivered(hash []common.Hash)

// SetStorePeerID updates the peer id of store node
SetStorePeerID(peerID peer.ID)

// PeerID returns node's PeerID
PeerID() peer.ID

// PingPeer returns the reply time
PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error)
// GetActiveStorenode returns the peer ID of the currently active storenode. It will be empty if no storenode is active
GetActiveStorenode() peer.ID

// OnStorenodeChanged is triggered when a new storenode is promoted to become the active storenode or when the active storenode is removed
OnStorenodeChanged() <-chan peer.ID

// OnStorenodeNotWorking is triggered when the last active storenode fails to return results consistently
OnStorenodeNotWorking() <-chan struct{}

// OnStorenodeAvailable is triggered when there is a new active storenode selected
OnStorenodeAvailable() <-chan peer.ID

// WaitForAvailableStoreNode will wait for a storenode to be available depending on the context
WaitForAvailableStoreNode(ctx context.Context) bool

// SetStorenodeConfigProvider will set the configuration provider for the storenode cycle
SetStorenodeConfigProvider(c history.StorenodeConfigProvider)

// ProcessMailserverBatch will receive a criteria and storenode and execute a query
ProcessMailserverBatch(
ctx context.Context,
batch MailserverBatch,
storenodeID peer.ID,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
) error

// IsStorenodeAvailable is used to determine whether a storenode is available or not
IsStorenodeAvailable(peerID peer.ID) bool

PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error

// DisconnectActiveStorenode will trigger a disconnection of the active storenode, and potentially execute a cycling so a new storenode is promoted
DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool)
}

type MailserverBatch struct {
From time.Time
To time.Time
Cursor string
PubsubTopic string
Topics []TopicType
ChatIDs []string
}

func (mb *MailserverBatch) Hash() string {
data := fmt.Sprintf("%d%d%s%s%v%v", mb.From.UnixNano(), mb.To.UnixNano(), mb.Cursor, mb.PubsubTopic, mb.Topics, mb.ChatIDs)
hash := sha256.Sum256([]byte(data))
return hex.EncodeToString(hash[:4])
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20241004054019-0ed94ce0b1cb
github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2152,8 +2152,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27
github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20241004054019-0ed94ce0b1cb h1:E3J49PH9iXpjaOOI/VrEX/VhSk3obKjxVehGEDzZgXI=
github.com/waku-org/go-waku v0.8.1-0.20241004054019-0ed94ce0b1cb/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0 h1:PNKcOPMn0yoC2NQaJPPB8FvHT/YtaU8hZAoovSl42KM=
github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
Expand Down
Loading

0 comments on commit 0c838b0

Please sign in to comment.