Skip to content

Commit

Permalink
fix!: code review
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Oct 23, 2024
1 parent 9f8e748 commit 7ad58d8
Show file tree
Hide file tree
Showing 24 changed files with 323 additions and 155 deletions.
6 changes: 1 addition & 5 deletions eth-node/bridge/geth/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,6 @@ func (w *GethWakuWrapper) GetActiveStorenode() peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) OnStorenodeAvailableOneShot() <-chan struct{} {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) OnStorenodeChanged() <-chan peer.ID {
panic("not available in WakuV1")
}
Expand All @@ -336,7 +332,7 @@ func (w *GethWakuWrapper) OnStorenodeAvailable() <-chan peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) WaitForAvailableStoreNode(timeout time.Duration) bool {
func (w *GethWakuWrapper) WaitForAvailableStoreNode(ctx context.Context) bool {
return false
}

Expand Down
12 changes: 4 additions & 8 deletions eth-node/bridge/geth/wakuv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,6 @@ func (w *gethWakuV2Wrapper) GetActiveStorenode() peer.ID {
return w.waku.StorenodeCycle.GetActiveStorenode()
}

func (w *gethWakuV2Wrapper) OnStorenodeAvailableOneShot() <-chan struct{} {
return w.waku.StorenodeCycle.StorenodeAvailableOneshotEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) OnStorenodeChanged() <-chan peer.ID {
return w.waku.StorenodeCycle.StorenodeChangedEmitter.Subscribe()
}
Expand All @@ -330,8 +326,8 @@ func (w *gethWakuV2Wrapper) OnStorenodeAvailable() <-chan peer.ID {
return w.waku.StorenodeCycle.StorenodeAvailableEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) WaitForAvailableStoreNode(timeout time.Duration) bool {
return w.waku.StorenodeCycle.WaitForAvailableStoreNode(context.TODO(), timeout)
func (w *gethWakuV2Wrapper) WaitForAvailableStoreNode(ctx context.Context) bool {
return w.waku.StorenodeCycle.WaitForAvailableStoreNode(ctx)
}

func (w *gethWakuV2Wrapper) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) {
Expand All @@ -353,8 +349,8 @@ func (w *gethWakuV2Wrapper) ProcessMailserverBatch(
}

criteria := store.FilterCriteria{
TimeStart: proto.Int64(int64(batch.From) * int64(time.Second)),
TimeEnd: proto.Int64(int64(batch.To) * int64(time.Second)),
TimeStart: proto.Int64(batch.From.UnixNano()),
TimeEnd: proto.Int64(batch.From.UnixNano()),
ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...),
}

Expand Down
13 changes: 5 additions & 8 deletions eth-node/types/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,6 @@ type Waku interface {
// GetActiveStorenode returns the peer ID of the currently active storenode. It will be empty if no storenode is active
GetActiveStorenode() peer.ID

// OnStorenodeAvailableOneShot returns a channel that will be triggered only once when a storenode becomes available
OnStorenodeAvailableOneShot() <-chan struct{}

// OnStorenodeChanged is triggered when a new storenode is promoted to become the active storenode or when the active storenode is removed
OnStorenodeChanged() <-chan peer.ID

Expand All @@ -214,8 +211,8 @@ type Waku interface {
// OnStorenodeAvailable is triggered when there is a new active storenode selected
OnStorenodeAvailable() <-chan peer.ID

// WaitForAvailableStoreNode will wait for a storenode to be available until `timeout` happens
WaitForAvailableStoreNode(timeout time.Duration) bool
// WaitForAvailableStoreNode will wait for a storenode to be available depending on the context
WaitForAvailableStoreNode(ctx context.Context) bool

// SetStorenodeConfigProvider will set the configuration provider for the storenode cycle
SetStorenodeConfigProvider(c history.StorenodeConfigProvider)
Expand All @@ -240,16 +237,16 @@ type Waku interface {
}

type MailserverBatch struct {
From uint32
To uint32
From time.Time
To time.Time
Cursor string
PubsubTopic string
Topics []TopicType
ChatIDs []string
}

func (mb *MailserverBatch) Hash() string {
data := fmt.Sprintf("%d%d%s%s%v%v", mb.From, mb.To, mb.Cursor, mb.PubsubTopic, mb.Topics, mb.ChatIDs)
data := fmt.Sprintf("%d%d%s%s%v%v", mb.From.UnixNano(), mb.To.UnixNano(), mb.Cursor, mb.PubsubTopic, mb.Topics, mb.ChatIDs)
hash := sha256.Sum256([]byte(data))
return hex.EncodeToString(hash[:4])
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20241014185851-76275f6fb835
github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2140,8 +2140,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27
github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20241014185851-76275f6fb835 h1:Vp6BhXiDEilmchHy8OLMZVhugudsnvveNkAKD5nhAGk=
github.com/waku-org/go-waku v0.8.1-0.20241014185851-76275f6fb835/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0 h1:PNKcOPMn0yoC2NQaJPPB8FvHT/YtaU8hZAoovSl42KM=
github.com/waku-org/go-waku v0.8.1-0.20241021202955-3c4e40c729a0/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
Expand Down
8 changes: 7 additions & 1 deletion protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,13 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
if m.archiveManager.IsReady() {
go func() {
defer gocommon.LogOnPanic()
<-m.transport.OnStorenodeAvailableOneShot()

select {
case <-m.ctx.Done():
return
case <-m.transport.OnStorenodeAvailable():
}

m.InitHistoryArchiveTasks(controlledCommunities)
}()
}
Expand Down
16 changes: 10 additions & 6 deletions protocol/messenger_communities.go
Original file line number Diff line number Diff line change
Expand Up @@ -3293,15 +3293,15 @@ func (m *Messenger) FetchCommunity(request *FetchCommunityRequest) (*communities
WithWaitForResponseOption(request.WaitForResponse),
}

community, _, err := m.storeNodeRequestsManager.FetchCommunity(communityAddress, options)
community, _, err := m.storeNodeRequestsManager.FetchCommunity(m.ctx, communityAddress, options)

return community, err
}

// fetchCommunities installs filter for community and requests its details from store node.
// When response received it will be passed through signals handler.
func (m *Messenger) fetchCommunities(communities []communities.CommunityShard) error {
return m.storeNodeRequestsManager.FetchCommunities(communities, []StoreNodeRequestOption{})
return m.storeNodeRequestsManager.FetchCommunities(m.ctx, communities, []StoreNodeRequestOption{})
}

// passStoredCommunityInfoToSignalHandler calls signal handler with community info
Expand Down Expand Up @@ -3972,7 +3972,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
}

// Request possibly missed waku messages for community
ms := m.getCommunityMailserver(c.ID().String())
ms := m.getCommunityStorenode(c.ID().String())
_, err = m.syncFiltersFrom(ms, filters, uint32(latestWakuMessageTimestamp))
if err != nil {
m.logger.Error("failed to request missing messages", zap.Error(err))
Expand Down Expand Up @@ -5158,9 +5158,9 @@ func (m *Messenger) startRequestMissingCommunityChannelsHRKeysLoop() {
}()
}

// getCommunityMailserver returns the active mailserver if a communityID is present then it'll return the mailserver
// getCommunityStorenode returns the active mailserver if a communityID is present then it'll return the mailserver
// for that community if it has a mailserver setup otherwise it'll return the global mailserver
func (m *Messenger) getCommunityMailserver(communityID ...string) peer.ID {
func (m *Messenger) getCommunityStorenode(communityID ...string) peer.ID {
if m.transport.WakuVersion() != 2 {
return ""
}
Expand All @@ -5178,7 +5178,11 @@ func (m *Messenger) getCommunityMailserver(communityID ...string) peer.ID {
return m.transport.GetActiveStorenode()
}

peerID, _ := ms.PeerID()
peerID, err := ms.PeerID()
if err != nil {
m.logger.Error("getting storenode for community, using global", zap.String("communityID", communityID[0]), zap.Error(err))
return m.transport.GetActiveStorenode()
}

return peerID
}
2 changes: 1 addition & 1 deletion protocol/messenger_contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -1322,7 +1322,7 @@ func (m *Messenger) FetchContact(contactID string, waitForResponse bool) (*Conta
options := []StoreNodeRequestOption{
WithWaitForResponseOption(waitForResponse),
}
contact, _, err := m.storeNodeRequestsManager.FetchContact(contactID, options)
contact, _, err := m.storeNodeRequestsManager.FetchContact(m.ctx, contactID, options)
return contact, err
}

Expand Down
Loading

0 comments on commit 7ad58d8

Please sign in to comment.