Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: extract storenode cycle to go-waku api #5857

Merged
merged 5 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 49 additions & 9 deletions eth-node/bridge/geth/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"

"github.com/waku-org/go-waku/waku/v2/api/history"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/enode"
gocommon "github.com/status-im/status-go/common"
Expand Down Expand Up @@ -274,10 +276,6 @@ func (w *GethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) {
w.waku.MarkP2PMessageAsProcessed(hash)
}

func (w *GethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID peer.ID, r types.MessagesRequest, processEnvelopes bool) (types.StoreRequestCursor, int, error) {
return nil, 0, errors.New("not implemented")
}

func (w *GethWakuWrapper) ConnectionChanged(_ connection.State) {}

func (w *GethWakuWrapper) ClearEnvelopesCache() {
Expand Down Expand Up @@ -314,13 +312,55 @@ func (w *wakuFilterWrapper) ID() string {
func (w *GethWakuWrapper) ConfirmMessageDelivered(hashes []common.Hash) {
}

func (w *GethWakuWrapper) SetStorePeerID(peerID peer.ID) {
func (w *GethWakuWrapper) PeerID() peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) PeerID() peer.ID {
panic("not implemented")
func (w *GethWakuWrapper) GetActiveStorenode() peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) OnStorenodeChanged() <-chan peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) OnStorenodeNotWorking() <-chan struct{} {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) OnStorenodeAvailable() <-chan peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) WaitForAvailableStoreNode(ctx context.Context) bool {
return false
}

func (w *GethWakuWrapper) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) ProcessMailserverBatch(
ctx context.Context,
batch types.MailserverBatch,
storenodeID peer.ID,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
) error {
return errors.New("not available in WakuV1")
}

func (w *GethWakuWrapper) IsStorenodeAvailable(peerID peer.ID) bool {
panic("not available in WakuV1")

}

func (w *GethWakuWrapper) PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error {
panic("not available in WakuV1")

}

func (w *GethWakuWrapper) PingPeer(context.Context, peer.ID) (time.Duration, error) {
return 0, errors.New("not available in WakuV1")
func (w *GethWakuWrapper) DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool) {
panic("not available in WakuV1")
}
105 changes: 65 additions & 40 deletions eth-node/bridge/geth/wakuv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/multiformats/go-multiaddr"
"google.golang.org/protobuf/proto"

"github.com/waku-org/go-waku/waku/v2/api/history"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/store"

Expand Down Expand Up @@ -176,39 +177,6 @@ func (w *gethWakuV2Wrapper) createFilterWrapper(id string, keyAsym *ecdsa.Privat
}, id), nil
}

func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID peer.ID, r types.MessagesRequest, processEnvelopes bool) (types.StoreRequestCursor, int, error) {
options := []store.RequestOption{
store.WithPaging(false, uint64(r.Limit)),
}

var cursor []byte
if r.StoreCursor != nil {
cursor = r.StoreCursor
}

contentTopics := []string{}
for _, topic := range r.ContentTopics {
contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic).ContentTopic())
}

query := store.FilterCriteria{
TimeStart: proto.Int64(int64(r.From) * int64(time.Second)),
TimeEnd: proto.Int64(int64(r.To) * int64(time.Second)),
ContentFilter: protocol.NewContentFilter(w.waku.GetPubsubTopic(r.PubsubTopic), contentTopics...),
}

pbCursor, envelopesCount, err := w.waku.Query(ctx, peerID, query, cursor, options, processEnvelopes)
if err != nil {
return nil, 0, err
}

if pbCursor != nil {
return pbCursor, envelopesCount, nil
}

return nil, envelopesCount, nil
}

func (w *gethWakuV2Wrapper) StartDiscV5() error {
return w.waku.StartDiscV5()
}
Expand Down Expand Up @@ -289,7 +257,7 @@ func (w *gethWakuV2Wrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSub
func (w *gethWakuV2Wrapper) SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []types.TopicType) error {
var cTopics []string
for _, ct := range contentTopics {
cTopics = append(cTopics, wakucommon.TopicType(ct).ContentTopic())
cTopics = append(cTopics, wakucommon.BytesToTopic(ct.Bytes()).ContentTopic())
}
pubsubTopic = w.waku.GetPubsubTopic(pubsubTopic)
w.waku.SetTopicsToVerifyForMissingMessages(peerID, pubsubTopic, cTopics)
Expand Down Expand Up @@ -338,14 +306,71 @@ func (w *gethWakuV2Wrapper) ConfirmMessageDelivered(hashes []common.Hash) {
w.waku.ConfirmMessageDelivered(hashes)
}

func (w *gethWakuV2Wrapper) SetStorePeerID(peerID peer.ID) {
w.waku.SetStorePeerID(peerID)
}

func (w *gethWakuV2Wrapper) PeerID() peer.ID {
return w.waku.PeerID()
}

func (w *gethWakuV2Wrapper) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
return w.waku.PingPeer(ctx, peerID)
func (w *gethWakuV2Wrapper) GetActiveStorenode() peer.ID {
return w.waku.StorenodeCycle.GetActiveStorenode()
}

func (w *gethWakuV2Wrapper) OnStorenodeChanged() <-chan peer.ID {
return w.waku.StorenodeCycle.StorenodeChangedEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) OnStorenodeNotWorking() <-chan struct{} {
return w.waku.StorenodeCycle.StorenodeNotWorkingEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) OnStorenodeAvailable() <-chan peer.ID {
return w.waku.StorenodeCycle.StorenodeAvailableEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) WaitForAvailableStoreNode(ctx context.Context) bool {
return w.waku.StorenodeCycle.WaitForAvailableStoreNode(ctx)
}

func (w *gethWakuV2Wrapper) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) {
w.waku.StorenodeCycle.SetStorenodeConfigProvider(c)
}

func (w *gethWakuV2Wrapper) ProcessMailserverBatch(
ctx context.Context,
batch types.MailserverBatch,
storenodeID peer.ID,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
) error {
pubsubTopic := w.waku.GetPubsubTopic(batch.PubsubTopic)
contentTopics := []string{}
for _, topic := range batch.Topics {
contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic.Bytes()).ContentTopic())
}

criteria := store.FilterCriteria{
TimeStart: proto.Int64(batch.From.UnixNano()),
TimeEnd: proto.Int64(batch.To.UnixNano()),
ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...),
}

return w.waku.HistoryRetriever.Query(ctx, criteria, storenodeID, pageLimit, shouldProcessNextPage, processEnvelopes)
}

func (w *gethWakuV2Wrapper) IsStorenodeAvailable(peerID peer.ID) bool {
return w.waku.StorenodeCycle.IsStorenodeAvailable(peerID)
}

func (w *gethWakuV2Wrapper) PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error {
return w.waku.StorenodeCycle.PerformStorenodeTask(fn, opts...)
}

func (w *gethWakuV2Wrapper) DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool) {
igor-sirotin marked this conversation as resolved.
Show resolved Hide resolved
w.waku.StorenodeCycle.Lock()
defer w.waku.StorenodeCycle.Unlock()

w.waku.StorenodeCycle.DisconnectActiveStorenode(backoff)
if shouldCycle {
w.waku.StorenodeCycle.Cycle(ctx)
}
}
54 changes: 0 additions & 54 deletions eth-node/types/mailserver.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,5 @@
package types

import (
"time"
)

const (
// MaxLimitInMessagesRequest represents the maximum number of messages
// that can be requested from the mailserver
MaxLimitInMessagesRequest = 1000
)

// MessagesRequest contains details of a request of historic messages.
type MessagesRequest struct {
// ID of the request. The current implementation requires ID to be 32-byte array,
// however, it's not enforced for future implementation.
ID []byte `json:"id"`
// From is a lower bound of time range.
From uint32 `json:"from"`
// To is a upper bound of time range.
To uint32 `json:"to"`
// Limit determines the number of messages sent by the mail server
// for the current paginated request.
Limit uint32 `json:"limit"`
// Cursor is used as starting point for paginated requests.
Cursor []byte `json:"cursor"`
// StoreCursor is used as starting point for WAKUV2 paginatedRequests
StoreCursor StoreRequestCursor `json:"storeCursor"`
// Bloom is a filter to match requested messages.
Bloom []byte `json:"bloom"`
// PubsubTopic is the gossipsub topic on which the message was broadcasted
PubsubTopic string `json:"pubsubTopic"`
// ContentTopics is a list of topics. A returned message should
// belong to one of the topics from the list.
ContentTopics [][]byte `json:"contentTopics"`
}

type StoreRequestCursor []byte

// SetDefaults sets the From and To defaults
func (r *MessagesRequest) SetDefaults(now time.Time) {
// set From and To defaults
if r.To == 0 {
r.To = uint32(now.UTC().Unix())
}

if r.From == 0 {
oneDay := uint32(86400) // -24 hours
if r.To < oneDay {
r.From = 0
} else {
r.From = r.To - oneDay
}
}
}

// MailServerResponse is the response payload sent by the mailserver.
type MailServerResponse struct {
LastEnvelopeHash Hash
Expand Down
4 changes: 4 additions & 0 deletions eth-node/types/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func (t TopicType) String() string {
return EncodeHex(t[:])
}

func (t TopicType) Bytes() []byte {
return TopicTypeToByteArray(t)
}

// MarshalText returns the hex representation of t.
func (t TopicType) MarshalText() ([]byte, error) {
return HexBytes(t[:]).MarshalText()
Expand Down
63 changes: 55 additions & 8 deletions eth-node/types/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package types
import (
"context"
"crypto/ecdsa"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"sync"
"time"

Expand All @@ -12,6 +15,8 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/pborman/uuid"

"github.com/waku-org/go-waku/waku/v2/api/history"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/connection"
Expand Down Expand Up @@ -176,9 +181,6 @@ type Waku interface {
Unsubscribe(ctx context.Context, id string) error
UnsubscribeMany(ids []string) error

// RequestStoreMessages uses the WAKU2-STORE protocol to request historic messages
RequestStoreMessages(ctx context.Context, peerID peer.ID, request MessagesRequest, processEnvelopes bool) (StoreRequestCursor, int, error)

// ProcessingP2PMessages indicates whether there are in-flight p2p messages
ProcessingP2PMessages() bool

Expand All @@ -194,12 +196,57 @@ type Waku interface {
// ConfirmMessageDelivered updates a message has been delivered in waku
ConfirmMessageDelivered(hash []common.Hash)

// SetStorePeerID updates the peer id of store node
SetStorePeerID(peerID peer.ID)

// PeerID returns node's PeerID
PeerID() peer.ID

// PingPeer returns the reply time
PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error)
// GetActiveStorenode returns the peer ID of the currently active storenode. It will be empty if no storenode is active
GetActiveStorenode() peer.ID

// OnStorenodeChanged is triggered when a new storenode is promoted to become the active storenode or when the active storenode is removed
OnStorenodeChanged() <-chan peer.ID

// OnStorenodeNotWorking is triggered when the last active storenode fails to return results consistently
OnStorenodeNotWorking() <-chan struct{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if it's possible to use a state machine for different status of store node to simplify the channels here, like

type State int

const (
    Offline State = iota
    Online
)

// Define StoreNodeState struct
type StoreNodeState struct {
    status State
    peerID peer.ID
}

// Define Waku struct
type Waku struct {
    storeNodeState StoreNodeState
}

// Function to handle state changes
func handleStoreNodeStateChange(newState chan StoreNodeState) {
    for {
        select {
        case state := <-newState:
            if state.status == Online {
                doA()
            }
            if state.status == Offline {
                doB()
            }
        }
    }
}

// Placeholder for actual functions to be called
func doA() {
    fmt.Println("Doing A: Node is online")
}

func doB() {
    fmt.Println("Doing B: Node is offline")
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a good idea! Will tackle it as part of waku-org/go-waku#1246


// OnStorenodeAvailable is triggered when there is a new active storenode selected
OnStorenodeAvailable() <-chan peer.ID

// WaitForAvailableStoreNode will wait for a storenode to be available depending on the context
WaitForAvailableStoreNode(ctx context.Context) bool

// SetStorenodeConfigProvider will set the configuration provider for the storenode cycle
SetStorenodeConfigProvider(c history.StorenodeConfigProvider)

// ProcessMailserverBatch will receive a criteria and storenode and execute a query
ProcessMailserverBatch(
ctx context.Context,
batch MailserverBatch,
storenodeID peer.ID,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
) error

// IsStorenodeAvailable is used to determine whether a storenode is available or not
IsStorenodeAvailable(peerID peer.ID) bool

PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error

// DisconnectActiveStorenode will trigger a disconnection of the active storenode, and potentially execute a cycling so a new storenode is promoted
DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool)
}

type MailserverBatch struct {
From time.Time
To time.Time
Cursor string
PubsubTopic string
Topics []TopicType
ChatIDs []string
}

func (mb *MailserverBatch) Hash() string {
data := fmt.Sprintf("%d%d%s%s%v%v", mb.From.UnixNano(), mb.To.UnixNano(), mb.Cursor, mb.PubsubTopic, mb.Topics, mb.ChatIDs)
hash := sha256.Sum256([]byte(data))
return hex.EncodeToString(hash[:4])
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20241004054019-0ed94ce0b1cb
github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2152,8 +2152,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27
github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20241004054019-0ed94ce0b1cb h1:E3J49PH9iXpjaOOI/VrEX/VhSk3obKjxVehGEDzZgXI=
github.com/waku-org/go-waku v0.8.1-0.20241004054019-0ed94ce0b1cb/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0 h1:PNKcOPMn0yoC2NQaJPPB8FvHT/YtaU8hZAoovSl42KM=
github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
Expand Down
Loading