Skip to content

Commit

Permalink
feat_: extract storenode cycle to go-waku api
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Oct 15, 2024
1 parent fadce93 commit 65d7a85
Show file tree
Hide file tree
Showing 35 changed files with 1,506 additions and 1,385 deletions.
62 changes: 53 additions & 9 deletions eth-node/bridge/geth/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"

"github.com/waku-org/go-waku/waku/v2/api/history"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/enode"
gocommon "github.com/status-im/status-go/common"
Expand Down Expand Up @@ -274,10 +276,6 @@ func (w *GethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) {
w.waku.MarkP2PMessageAsProcessed(hash)
}

func (w *GethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID peer.ID, r types.MessagesRequest, processEnvelopes bool) (types.StoreRequestCursor, int, error) {
return nil, 0, errors.New("not implemented")
}

func (w *GethWakuWrapper) ConnectionChanged(_ connection.State) {}

func (w *GethWakuWrapper) ClearEnvelopesCache() {
Expand Down Expand Up @@ -314,13 +312,59 @@ func (w *wakuFilterWrapper) ID() string {
func (w *GethWakuWrapper) ConfirmMessageDelivered(hashes []common.Hash) {
}

func (w *GethWakuWrapper) SetStorePeerID(peerID peer.ID) {
func (w *GethWakuWrapper) PeerID() peer.ID {
panic("not available in WakuV1")

Check warning on line 316 in eth-node/bridge/geth/waku.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/waku.go#L315-L316

Added lines #L315 - L316 were not covered by tests
}

func (w *GethWakuWrapper) PeerID() peer.ID {
panic("not implemented")
func (w *GethWakuWrapper) GetActiveStorenode() peer.ID {
panic("not available in WakuV1")

Check warning on line 320 in eth-node/bridge/geth/waku.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/waku.go#L319-L320

Added lines #L319 - L320 were not covered by tests
}

func (w *GethWakuWrapper) OnStorenodeAvailableOneShot() <-chan struct{} {
panic("not available in WakuV1")

Check warning on line 324 in eth-node/bridge/geth/waku.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/waku.go#L323-L324

Added lines #L323 - L324 were not covered by tests
}

func (w *GethWakuWrapper) OnStorenodeChanged() <-chan peer.ID {
panic("not available in WakuV1")

Check warning on line 328 in eth-node/bridge/geth/waku.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/waku.go#L327-L328

Added lines #L327 - L328 were not covered by tests
}

func (w *GethWakuWrapper) OnStorenodeNotWorking() <-chan struct{} {
panic("not available in WakuV1")

Check warning on line 332 in eth-node/bridge/geth/waku.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/waku.go#L331-L332

Added lines #L331 - L332 were not covered by tests
}

func (w *GethWakuWrapper) OnStorenodeAvailable() <-chan peer.ID {
panic("not available in WakuV1")

Check warning on line 336 in eth-node/bridge/geth/waku.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/waku.go#L335-L336

Added lines #L335 - L336 were not covered by tests
}

func (w *GethWakuWrapper) WaitForAvailableStoreNode(timeout time.Duration) bool {
panic("not available in WakuV1")

Check warning on line 340 in eth-node/bridge/geth/waku.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/waku.go#L339-L340

Added lines #L339 - L340 were not covered by tests
}

func (w *GethWakuWrapper) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) {
panic("not available in WakuV1")

Check warning on line 344 in eth-node/bridge/geth/waku.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/waku.go#L343-L344

Added lines #L343 - L344 were not covered by tests
}

func (w *GethWakuWrapper) ProcessMailserverBatch(
ctx context.Context,
batch types.MailserverBatch,
storenodeID peer.ID,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
) error {
return errors.New("not available in WakuV1")

Check warning on line 355 in eth-node/bridge/geth/waku.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/waku.go#L354-L355

Added lines #L354 - L355 were not covered by tests
}

func (w *GethWakuWrapper) IsStorenodeAvailable(peerID peer.ID) bool {
panic("not available in WakuV1")

Check warning on line 359 in eth-node/bridge/geth/waku.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/waku.go#L358-L359

Added lines #L358 - L359 were not covered by tests

}

func (w *GethWakuWrapper) PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error {
panic("not available in WakuV1")

Check warning on line 364 in eth-node/bridge/geth/waku.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/waku.go#L363-L364

Added lines #L363 - L364 were not covered by tests

}

func (w *GethWakuWrapper) PingPeer(context.Context, peer.ID) (time.Duration, error) {
return 0, errors.New("not available in WakuV1")
func (w *GethWakuWrapper) DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool) {
panic("not available in WakuV1")

Check warning on line 369 in eth-node/bridge/geth/waku.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/waku.go#L368-L369

Added lines #L368 - L369 were not covered by tests
}
109 changes: 69 additions & 40 deletions eth-node/bridge/geth/wakuv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/multiformats/go-multiaddr"
"google.golang.org/protobuf/proto"

"github.com/waku-org/go-waku/waku/v2/api/history"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/store"

Expand Down Expand Up @@ -176,39 +177,6 @@ func (w *gethWakuV2Wrapper) createFilterWrapper(id string, keyAsym *ecdsa.Privat
}, id), nil
}

func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID peer.ID, r types.MessagesRequest, processEnvelopes bool) (types.StoreRequestCursor, int, error) {
options := []store.RequestOption{
store.WithPaging(false, uint64(r.Limit)),
}

var cursor []byte
if r.StoreCursor != nil {
cursor = r.StoreCursor
}

contentTopics := []string{}
for _, topic := range r.ContentTopics {
contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic).ContentTopic())
}

query := store.FilterCriteria{
TimeStart: proto.Int64(int64(r.From) * int64(time.Second)),
TimeEnd: proto.Int64(int64(r.To) * int64(time.Second)),
ContentFilter: protocol.NewContentFilter(w.waku.GetPubsubTopic(r.PubsubTopic), contentTopics...),
}

pbCursor, envelopesCount, err := w.waku.Query(ctx, peerID, query, cursor, options, processEnvelopes)
if err != nil {
return nil, 0, err
}

if pbCursor != nil {
return pbCursor, envelopesCount, nil
}

return nil, envelopesCount, nil
}

func (w *gethWakuV2Wrapper) StartDiscV5() error {
return w.waku.StartDiscV5()
}
Expand Down Expand Up @@ -289,7 +257,7 @@ func (w *gethWakuV2Wrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSub
func (w *gethWakuV2Wrapper) SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []types.TopicType) error {
var cTopics []string
for _, ct := range contentTopics {
cTopics = append(cTopics, wakucommon.TopicType(ct).ContentTopic())
cTopics = append(cTopics, wakucommon.BytesToTopic(ct.Bytes()).ContentTopic())
}
pubsubTopic = w.waku.GetPubsubTopic(pubsubTopic)
w.waku.SetTopicsToVerifyForMissingMessages(peerID, pubsubTopic, cTopics)
Expand Down Expand Up @@ -338,14 +306,75 @@ func (w *gethWakuV2Wrapper) ConfirmMessageDelivered(hashes []common.Hash) {
w.waku.ConfirmMessageDelivered(hashes)
}

func (w *gethWakuV2Wrapper) SetStorePeerID(peerID peer.ID) {
w.waku.SetStorePeerID(peerID)
}

func (w *gethWakuV2Wrapper) PeerID() peer.ID {
return w.waku.PeerID()
}

func (w *gethWakuV2Wrapper) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
return w.waku.PingPeer(ctx, peerID)
func (w *gethWakuV2Wrapper) GetActiveStorenode() peer.ID {
return w.waku.StorenodeCycle.GetActiveStorenode()
}

func (w *gethWakuV2Wrapper) OnStorenodeAvailableOneShot() <-chan struct{} {
return w.waku.StorenodeCycle.StorenodeAvailableOneshotEmitter.Subscribe()

Check warning on line 318 in eth-node/bridge/geth/wakuv2.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/wakuv2.go#L317-L318

Added lines #L317 - L318 were not covered by tests
}

func (w *gethWakuV2Wrapper) OnStorenodeChanged() <-chan peer.ID {
return w.waku.StorenodeCycle.StorenodeChangedEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) OnStorenodeNotWorking() <-chan struct{} {
return w.waku.StorenodeCycle.StorenodeNotWorkingEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) OnStorenodeAvailable() <-chan peer.ID {
return w.waku.StorenodeCycle.StorenodeAvailableEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) WaitForAvailableStoreNode(timeout time.Duration) bool {
return w.waku.StorenodeCycle.WaitForAvailableStoreNode(context.TODO(), timeout)

Check warning on line 334 in eth-node/bridge/geth/wakuv2.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/wakuv2.go#L333-L334

Added lines #L333 - L334 were not covered by tests
}

func (w *gethWakuV2Wrapper) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) {
w.waku.StorenodeCycle.SetStorenodeConfigProvider(c)
}

func (w *gethWakuV2Wrapper) ProcessMailserverBatch(
ctx context.Context,
batch types.MailserverBatch,
storenodeID peer.ID,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
) error {
pubsubTopic := w.waku.GetPubsubTopic(batch.PubsubTopic)
contentTopics := []string{}
for _, topic := range batch.Topics {
contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic.Bytes()).ContentTopic())

Check warning on line 352 in eth-node/bridge/geth/wakuv2.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/wakuv2.go#L348-L352

Added lines #L348 - L352 were not covered by tests
}

criteria := store.FilterCriteria{
TimeStart: proto.Int64(int64(batch.From) * int64(time.Second)),
TimeEnd: proto.Int64(int64(batch.To) * int64(time.Second)),
ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...),

Check warning on line 358 in eth-node/bridge/geth/wakuv2.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/wakuv2.go#L355-L358

Added lines #L355 - L358 were not covered by tests
}

return w.waku.HistoryRetriever.Query(ctx, criteria, storenodeID, pageLimit, shouldProcessNextPage, processEnvelopes)

Check warning on line 361 in eth-node/bridge/geth/wakuv2.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/wakuv2.go#L361

Added line #L361 was not covered by tests
}

func (w *gethWakuV2Wrapper) IsStorenodeAvailable(peerID peer.ID) bool {
return w.waku.StorenodeCycle.IsStorenodeAvailable(peerID)

Check warning on line 365 in eth-node/bridge/geth/wakuv2.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/wakuv2.go#L364-L365

Added lines #L364 - L365 were not covered by tests
}

func (w *gethWakuV2Wrapper) PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error {
return w.waku.StorenodeCycle.PerformStorenodeTask(fn, opts...)

Check warning on line 369 in eth-node/bridge/geth/wakuv2.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/wakuv2.go#L368-L369

Added lines #L368 - L369 were not covered by tests
}

func (w *gethWakuV2Wrapper) DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool) {
w.waku.StorenodeCycle.Lock()
defer w.waku.StorenodeCycle.Unlock()

Check warning on line 374 in eth-node/bridge/geth/wakuv2.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/wakuv2.go#L372-L374

Added lines #L372 - L374 were not covered by tests

w.waku.StorenodeCycle.DisconnectActiveStorenode(backoff)
if shouldCycle {
w.waku.StorenodeCycle.Cycle(ctx)

Check warning on line 378 in eth-node/bridge/geth/wakuv2.go

View check run for this annotation

Codecov / codecov/patch

eth-node/bridge/geth/wakuv2.go#L376-L378

Added lines #L376 - L378 were not covered by tests
}
}
54 changes: 0 additions & 54 deletions eth-node/types/mailserver.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,5 @@
package types

import (
"time"
)

const (
// MaxLimitInMessagesRequest represents the maximum number of messages
// that can be requested from the mailserver
MaxLimitInMessagesRequest = 1000
)

// MessagesRequest contains details of a request of historic messages.
type MessagesRequest struct {
// ID of the request. The current implementation requires ID to be 32-byte array,
// however, it's not enforced for future implementation.
ID []byte `json:"id"`
// From is a lower bound of time range.
From uint32 `json:"from"`
// To is a upper bound of time range.
To uint32 `json:"to"`
// Limit determines the number of messages sent by the mail server
// for the current paginated request.
Limit uint32 `json:"limit"`
// Cursor is used as starting point for paginated requests.
Cursor []byte `json:"cursor"`
// StoreCursor is used as starting point for WAKUV2 paginatedRequests
StoreCursor StoreRequestCursor `json:"storeCursor"`
// Bloom is a filter to match requested messages.
Bloom []byte `json:"bloom"`
// PubsubTopic is the gossipsub topic on which the message was broadcasted
PubsubTopic string `json:"pubsubTopic"`
// ContentTopics is a list of topics. A returned message should
// belong to one of the topics from the list.
ContentTopics [][]byte `json:"contentTopics"`
}

type StoreRequestCursor []byte

// SetDefaults sets the From and To defaults
func (r *MessagesRequest) SetDefaults(now time.Time) {
// set From and To defaults
if r.To == 0 {
r.To = uint32(now.UTC().Unix())
}

if r.From == 0 {
oneDay := uint32(86400) // -24 hours
if r.To < oneDay {
r.From = 0
} else {
r.From = r.To - oneDay
}
}
}

// MailServerResponse is the response payload sent by the mailserver.
type MailServerResponse struct {
LastEnvelopeHash Hash
Expand Down
4 changes: 4 additions & 0 deletions eth-node/types/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func (t TopicType) String() string {
return EncodeHex(t[:])
}

func (t TopicType) Bytes() []byte {
return TopicTypeToByteArray(t)
}

// MarshalText returns the hex representation of t.
func (t TopicType) MarshalText() ([]byte, error) {
return HexBytes(t[:]).MarshalText()
Expand Down
66 changes: 58 additions & 8 deletions eth-node/types/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package types
import (
"context"
"crypto/ecdsa"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"sync"
"time"

Expand All @@ -12,6 +15,8 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/pborman/uuid"

"github.com/waku-org/go-waku/waku/v2/api/history"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/connection"
Expand Down Expand Up @@ -176,9 +181,6 @@ type Waku interface {
Unsubscribe(ctx context.Context, id string) error
UnsubscribeMany(ids []string) error

// RequestStoreMessages uses the WAKU2-STORE protocol to request historic messages
RequestStoreMessages(ctx context.Context, peerID peer.ID, request MessagesRequest, processEnvelopes bool) (StoreRequestCursor, int, error)

// ProcessingP2PMessages indicates whether there are in-flight p2p messages
ProcessingP2PMessages() bool

Expand All @@ -194,12 +196,60 @@ type Waku interface {
// ConfirmMessageDelivered updates a message has been delivered in waku
ConfirmMessageDelivered(hash []common.Hash)

// SetStorePeerID updates the peer id of store node
SetStorePeerID(peerID peer.ID)

// PeerID returns node's PeerID
PeerID() peer.ID

// PingPeer returns the reply time
PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error)
// GetActiveStorenode returns the peer ID of the currently active storenode. It will be empty if no storenode is active
GetActiveStorenode() peer.ID

// OnStorenodeAvailableOneShot returns a channel that will be triggered only once when a storenode becomes available
OnStorenodeAvailableOneShot() <-chan struct{}

// OnStorenodeChanged is triggered when a new storenode is promoted to become the active storenode or when the active storenode is removed
OnStorenodeChanged() <-chan peer.ID

// OnStorenodeNotWorking is triggered when the last active storenode fails to return results consistently
OnStorenodeNotWorking() <-chan struct{}

// OnStorenodeAvailable is triggered when there is a new active storenode selected
OnStorenodeAvailable() <-chan peer.ID

// WaitForAvailableStoreNode will wait for a storenode to be available until `timeout` happens
WaitForAvailableStoreNode(timeout time.Duration) bool

// SetStorenodeConfigProvider will set the configuration provider for the storenode cycle
SetStorenodeConfigProvider(c history.StorenodeConfigProvider)

// ProcessMailserverBatch will receive a criteria and storenode and execute a query
ProcessMailserverBatch(
ctx context.Context,
batch MailserverBatch,
storenodeID peer.ID,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
) error

// IsStorenodeAvailable is used to determine whether a storenode is available or not
IsStorenodeAvailable(peerID peer.ID) bool

PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error

// DisconnectActiveStorenode will trigger a disconnection of the active storenode, and potentially execute a cycling so a new storenode is promoted
DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool)
}

type MailserverBatch struct {
From uint32
To uint32
Cursor string
PubsubTopic string
Topics []TopicType
ChatIDs []string
}

func (mb *MailserverBatch) Hash() string {
data := fmt.Sprintf("%d%d%s%s%v%v", mb.From, mb.To, mb.Cursor, mb.PubsubTopic, mb.Topics, mb.ChatIDs)
hash := sha256.Sum256([]byte(data))
return hex.EncodeToString(hash[:4])

Check warning on line 254 in eth-node/types/waku.go

View check run for this annotation

Codecov / codecov/patch

eth-node/types/waku.go#L251-L254

Added lines #L251 - L254 were not covered by tests
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20241004054019-0ed94ce0b1cb
github.com/waku-org/go-waku v0.8.1-0.20241014185851-76275f6fb835
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1
Expand Down
Loading

0 comments on commit 65d7a85

Please sign in to comment.