Skip to content

Commit

Permalink
fix(libwaku): fix compilation issues for go-waku usage
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Dec 18, 2024
1 parent a3bff47 commit c2beb83
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 62 deletions.
6 changes: 3 additions & 3 deletions eth-node/bridge/geth/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (w *GethWakuWrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSubsc
return nil, errors.New("not available in WakuV1")
}

func (w *GethWakuWrapper) SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []types.TopicType) error {
func (w *GethWakuWrapper) SetCriteriaForMissingMessageVerification(peerInfo peer.AddrInfo, pubsubTopic string, contentTopics []types.TopicType) error {
return errors.New("not available in WakuV1")
}

Expand Down Expand Up @@ -311,7 +311,7 @@ func (w *GethWakuWrapper) PeerID() peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) GetActiveStorenode() peer.ID {
func (w *GethWakuWrapper) GetActiveStorenode() peer.AddrInfo {
panic("not available in WakuV1")
}

Expand All @@ -338,7 +338,7 @@ func (w *GethWakuWrapper) SetStorenodeConfigProvider(c history.StorenodeConfigPr
func (w *GethWakuWrapper) ProcessMailserverBatch(
ctx context.Context,
batch types.MailserverBatch,
storenodeID peer.ID,
storenode peer.AddrInfo,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
Expand Down
12 changes: 6 additions & 6 deletions eth-node/bridge/geth/wakuv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,13 @@ func (w *gethWakuV2Wrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSub
return w.waku.SubscribeToConnStatusChanges(), nil
}

func (w *gethWakuV2Wrapper) SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []types.TopicType) error {
func (w *gethWakuV2Wrapper) SetCriteriaForMissingMessageVerification(peerInfo peer.AddrInfo, pubsubTopic string, contentTopics []types.TopicType) error {
var cTopics []string
for _, ct := range contentTopics {
cTopics = append(cTopics, wakucommon.BytesToTopic(ct.Bytes()).ContentTopic())
}
pubsubTopic = w.waku.GetPubsubTopic(pubsubTopic)
w.waku.SetTopicsToVerifyForMissingMessages(peerID, pubsubTopic, cTopics)
w.waku.SetTopicsToVerifyForMissingMessages(peerInfo, pubsubTopic, cTopics)

// No err can be be generated by this function. The function returns an error
// Just so there's compatibility with GethWakuWrapper from V1
Expand Down Expand Up @@ -306,8 +306,8 @@ func (w *gethWakuV2Wrapper) PeerID() peer.ID {
return w.waku.PeerID()
}

func (w *gethWakuV2Wrapper) GetActiveStorenode() peer.ID {
return w.waku.StorenodeCycle.GetActiveStorenode()
func (w *gethWakuV2Wrapper) GetActiveStorenode() peer.AddrInfo {
return w.waku.StorenodeCycle.GetActiveStorenodePeerInfo()
}

func (w *gethWakuV2Wrapper) OnStorenodeChanged() <-chan peer.ID {
Expand All @@ -333,7 +333,7 @@ func (w *gethWakuV2Wrapper) SetStorenodeConfigProvider(c history.StorenodeConfig
func (w *gethWakuV2Wrapper) ProcessMailserverBatch(
ctx context.Context,
batch types.MailserverBatch,
storenodeID peer.ID,
storenode peer.AddrInfo,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
Expand All @@ -350,7 +350,7 @@ func (w *gethWakuV2Wrapper) ProcessMailserverBatch(
ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...),
}

return w.waku.HistoryRetriever.Query(ctx, criteria, storenodeID, pageLimit, shouldProcessNextPage, processEnvelopes)
return w.waku.HistoryRetriever.Query(ctx, criteria, storenode, pageLimit, shouldProcessNextPage, processEnvelopes)
}

func (w *gethWakuV2Wrapper) IsStorenodeAvailable(peerID peer.ID) bool {
Expand Down
8 changes: 4 additions & 4 deletions eth-node/types/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ type Waku interface {

SubscribeToConnStatusChanges() (*ConnStatusSubscription, error)

SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []TopicType) error
SetCriteriaForMissingMessageVerification(peerInfo peer.AddrInfo, pubsubTopic string, contentTopics []TopicType) error

// MinPow returns the PoW value required by this node.
MinPow() float64
Expand Down Expand Up @@ -197,8 +197,8 @@ type Waku interface {
// PeerID returns node's PeerID
PeerID() peer.ID

// GetActiveStorenode returns the peer ID of the currently active storenode. It will be empty if no storenode is active
GetActiveStorenode() peer.ID
// GetActiveStorenode returns the AddrInfo of the currently active storenode. It will be empty if no storenode is active
GetActiveStorenode() peer.AddrInfo

// OnStorenodeChanged is triggered when a new storenode is promoted to become the active storenode or when the active storenode is removed
OnStorenodeChanged() <-chan peer.ID
Expand All @@ -219,7 +219,7 @@ type Waku interface {
ProcessMailserverBatch(
ctx context.Context,
batch MailserverBatch,
storenodeID peer.ID,
storenode peer.AddrInfo,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
Expand Down
8 changes: 4 additions & 4 deletions protocol/messenger_communities.go
Original file line number Diff line number Diff line change
Expand Up @@ -5160,9 +5160,9 @@ func (m *Messenger) startRequestMissingCommunityChannelsHRKeysLoop() {

// getCommunityStorenode returns the active mailserver if a communityID is present then it'll return the mailserver
// for that community if it has a mailserver setup otherwise it'll return the global mailserver
func (m *Messenger) getCommunityStorenode(communityID ...string) peer.ID {
func (m *Messenger) getCommunityStorenode(communityID ...string) peer.AddrInfo {
if m.transport.WakuVersion() != 2 {
return ""
return peer.AddrInfo{}
}

if len(communityID) == 0 || communityID[0] == "" {
Expand All @@ -5178,11 +5178,11 @@ func (m *Messenger) getCommunityStorenode(communityID ...string) peer.ID {
return m.transport.GetActiveStorenode()
}

peerID, err := ms.PeerID()
peerInfo, err := ms.PeerInfo()
if err != nil {
m.logger.Error("getting storenode for community, using global", zap.String("communityID", communityID[0]), zap.Error(err))
return m.transport.GetActiveStorenode()
}

return peerID
return peerInfo
}
62 changes: 31 additions & 31 deletions protocol/messenger_mailserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (m *Messenger) shouldSync() (bool, error) {
}

// TODO (pablo) support community store node as well
if m.transport.GetActiveStorenode() == "" || !m.Online() {
if m.transport.GetActiveStorenode().ID == "" || !m.Online() {
return false, nil
}

Expand All @@ -69,9 +69,9 @@ func (m *Messenger) scheduleSyncChat(chat *Chat) (bool, error) {

go func() {
defer gocommon.LogOnPanic()
peerID := m.getCommunityStorenode(chat.CommunityID)
peerInfo := m.getCommunityStorenode(chat.CommunityID)
_, err = m.performStorenodeTask(func() (*MessengerResponse, error) {
response, err := m.syncChatWithFilters(peerID, chat.ID)
response, err := m.syncChatWithFilters(peerInfo, chat.ID)

if err != nil {
m.logger.Error("failed to sync chat", zap.Error(err))
Expand All @@ -82,7 +82,7 @@ func (m *Messenger) scheduleSyncChat(chat *Chat) (bool, error) {
m.config.messengerSignalsHandler.MessengerResponse(response)
}
return response, nil
}, history.WithPeerID(peerID))
}, history.WithPeerID(peerInfo.ID))
if err != nil {
m.logger.Error("failed to perform mailserver request", zap.Error(err))
}
Expand Down Expand Up @@ -133,9 +133,9 @@ func (m *Messenger) scheduleSyncFilters(filters []*transport.Filter) (bool, erro
// split filters by community store node so we can request the filters to the correct mailserver
filtersByMs := m.SplitFiltersByStoreNode(filters)
for communityID, filtersForMs := range filtersByMs {
peerID := m.getCommunityStorenode(communityID)
peerInfo := m.getCommunityStorenode(communityID)
_, err := m.performStorenodeTask(func() (*MessengerResponse, error) {
response, err := m.syncFilters(peerID, filtersForMs)
response, err := m.syncFilters(peerInfo, filtersForMs)

if err != nil {
m.logger.Error("failed to sync filter", zap.Error(err))
Expand All @@ -146,7 +146,7 @@ func (m *Messenger) scheduleSyncFilters(filters []*transport.Filter) (bool, erro
m.config.messengerSignalsHandler.MessengerResponse(response)
}
return response, nil
}, history.WithPeerID(peerID))
}, history.WithPeerID(peerInfo.ID))
if err != nil {
m.logger.Error("failed to perform mailserver request", zap.Error(err))
}
Expand Down Expand Up @@ -214,13 +214,13 @@ func (m *Messenger) topicsForChat(chatID string) (string, []types.TopicType, err
return filters[0].PubsubTopic, contentTopics, nil
}

func (m *Messenger) syncChatWithFilters(peerID peer.ID, chatID string) (*MessengerResponse, error) {
func (m *Messenger) syncChatWithFilters(peerInfo peer.AddrInfo, chatID string) (*MessengerResponse, error) {
filters, err := m.filtersForChat(chatID)
if err != nil {
return nil, err
}

return m.syncFilters(peerID, filters)
return m.syncFilters(peerInfo, filters)
}

func (m *Messenger) syncBackup() error {
Expand Down Expand Up @@ -336,11 +336,11 @@ func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries
filtersByMs := m.SplitFiltersByStoreNode(filters)
allResponses := &MessengerResponse{}
for communityID, filtersForMs := range filtersByMs {
peerID := m.getCommunityStorenode(communityID)
peerInfo := m.getCommunityStorenode(communityID)
if withRetries {
response, err := m.performStorenodeTask(func() (*MessengerResponse, error) {
return m.syncFilters(peerID, filtersForMs)
}, history.WithPeerID(peerID))
return m.syncFilters(peerInfo, filtersForMs)
}, history.WithPeerID(peerInfo.ID))
if err != nil {
return nil, err
}
Expand All @@ -350,7 +350,7 @@ func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries
}
continue
}
response, err := m.syncFilters(peerID, filtersForMs)
response, err := m.syncFilters(peerInfo, filtersForMs)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -391,12 +391,12 @@ func (m *Messenger) checkForMissingMessagesLoop() {
filters := m.transport.Filters()
filtersByMs := m.SplitFiltersByStoreNode(filters)
for communityID, filtersForMs := range filtersByMs {
peerID := m.getCommunityStorenode(communityID)
if peerID == "" {
peerInfo := m.getCommunityStorenode(communityID)
if peerInfo.ID == "" {
continue
}

m.transport.SetCriteriaForMissingMessageVerification(peerID, filtersForMs)
m.transport.SetCriteriaForMissingMessageVerification(peerInfo, filtersForMs)
}
}
}
Expand All @@ -405,7 +405,7 @@ func getPrioritizedBatches() []int {
return []int{1, 5, 10}
}

func (m *Messenger) syncFiltersFrom(peerID peer.ID, filters []*transport.Filter, lastRequest uint32) (*MessengerResponse, error) {
func (m *Messenger) syncFiltersFrom(peerInfo peer.AddrInfo, filters []*transport.Filter, lastRequest uint32) (*MessengerResponse, error) {
canSync, err := m.canSyncWithStoreNodes()
if err != nil {
return nil, err
Expand Down Expand Up @@ -544,7 +544,7 @@ func (m *Messenger) syncFiltersFrom(peerID peer.ID, filters []*transport.Filter,
batchKeys := maps.Keys(batches[pubsubTopic])
sort.Ints(batchKeys)
for _, k := range batchKeys {
err := m.processMailserverBatch(peerID, batches[pubsubTopic][k])
err := m.processMailserverBatch(peerInfo, batches[pubsubTopic][k])
if err != nil {
m.logger.Error("error syncing topics", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -603,8 +603,8 @@ func (m *Messenger) syncFiltersFrom(peerID peer.ID, filters []*transport.Filter,
return response, nil
}

func (m *Messenger) syncFilters(peerID peer.ID, filters []*transport.Filter) (*MessengerResponse, error) {
return m.syncFiltersFrom(peerID, filters, 0)
func (m *Messenger) syncFilters(peerInfo peer.AddrInfo, filters []*transport.Filter) (*MessengerResponse, error) {
return m.syncFiltersFrom(peerInfo, filters, 0)
}

func (m *Messenger) calculateGapForChat(chat *Chat, from uint32) (*common.Message, error) {
Expand Down Expand Up @@ -658,7 +658,7 @@ func (m *Messenger) DisableStoreNodes() {
m.featureFlags.StoreNodesDisabled = true
}

func (m *Messenger) processMailserverBatch(peerID peer.ID, batch types.MailserverBatch) error {
func (m *Messenger) processMailserverBatch(peerInfo peer.AddrInfo, batch types.MailserverBatch) error {
canSync, err := m.canSyncWithStoreNodes()
if err != nil {
return err
Expand All @@ -667,10 +667,10 @@ func (m *Messenger) processMailserverBatch(peerID peer.ID, batch types.Mailserve
return nil
}

return m.transport.ProcessMailserverBatch(m.ctx, batch, peerID, defaultStoreNodeRequestPageSize, nil, false)
return m.transport.ProcessMailserverBatch(m.ctx, batch, peerInfo, defaultStoreNodeRequestPageSize, nil, false)
}

func (m *Messenger) processMailserverBatchWithOptions(peerID peer.ID, batch types.MailserverBatch, pageLimit uint64, shouldProcessNextPage func(int) (bool, uint64), processEnvelopes bool) error {
func (m *Messenger) processMailserverBatchWithOptions(peerInfo peer.AddrInfo, batch types.MailserverBatch, pageLimit uint64, shouldProcessNextPage func(int) (bool, uint64), processEnvelopes bool) error {
canSync, err := m.canSyncWithStoreNodes()
if err != nil {
return err
Expand All @@ -679,7 +679,7 @@ func (m *Messenger) processMailserverBatchWithOptions(peerID peer.ID, batch type
return nil
}

return m.transport.ProcessMailserverBatch(m.ctx, batch, peerID, pageLimit, shouldProcessNextPage, processEnvelopes)
return m.transport.ProcessMailserverBatch(m.ctx, batch, peerInfo, pageLimit, shouldProcessNextPage, processEnvelopes)
}

func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) {
Expand All @@ -688,7 +688,7 @@ func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) {
return 0, ErrChatNotFound
}

peerID := m.getCommunityStorenode(chat.CommunityID)
peerInfo := m.getCommunityStorenode(chat.CommunityID)
var from uint32
_, err := m.performStorenodeTask(func() (*MessengerResponse, error) {
canSync, err := m.canSyncWithStoreNodes()
Expand Down Expand Up @@ -720,7 +720,7 @@ func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) {
m.config.messengerSignalsHandler.HistoryRequestStarted(1)
}

err = m.processMailserverBatch(peerID, batch)
err = m.processMailserverBatch(peerInfo, batch)
if err != nil {
return nil, err
}
Expand All @@ -737,7 +737,7 @@ func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) {
err = m.persistence.SetSyncTimestamps(uint32(batch.From.Unix()), chat.SyncedTo, chat.ID)
from = uint32(batch.From.Unix())
return nil, err
}, history.WithPeerID(peerID))
}, history.WithPeerID(peerInfo.ID))
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -856,7 +856,7 @@ func (m *Messenger) fetchMessages(chatID string, duration time.Duration) (uint32
return 0, ErrChatNotFound
}

peerID := m.getCommunityStorenode(chat.CommunityID)
peerInfo := m.getCommunityStorenode(chat.CommunityID)
_, err := m.performStorenodeTask(func() (*MessengerResponse, error) {
canSync, err := m.canSyncWithStoreNodes()
if err != nil {
Expand All @@ -866,7 +866,7 @@ func (m *Messenger) fetchMessages(chatID string, duration time.Duration) (uint32
return nil, nil
}

m.logger.Debug("fetching messages", zap.String("chatID", chatID), zap.Stringer("peerID", peerID))
m.logger.Debug("fetching messages", zap.String("chatID", chatID), zap.Stringer("peerID", peerInfo.ID))
pubsubTopic, topics, err := m.topicsForChat(chatID)
if err != nil {
return nil, nil
Expand All @@ -883,7 +883,7 @@ func (m *Messenger) fetchMessages(chatID string, duration time.Duration) (uint32
m.config.messengerSignalsHandler.HistoryRequestStarted(1)
}

err = m.processMailserverBatch(peerID, batch)
err = m.processMailserverBatch(peerInfo, batch)
if err != nil {
return nil, err
}
Expand All @@ -900,7 +900,7 @@ func (m *Messenger) fetchMessages(chatID string, duration time.Duration) (uint32
err = m.persistence.SetSyncTimestamps(uint32(batch.From.Unix()), chat.SyncedTo, chat.ID)
from = batch.From
return nil, err
}, history.WithPeerID(peerID))
}, history.WithPeerID(peerInfo.ID))
if err != nil {
return 0, err
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/messenger_store_node_request_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func (r *storeNodeRequest) routine() {
}

return nil, r.manager.messenger.processMailserverBatchWithOptions(storeNode, batch, r.config.InitialPageSize, r.shouldFetchNextPage, true)
}, history.WithPeerID(storeNode))
}, history.WithPeerID(storeNode.ID))

r.result.err = err
}
Expand Down
Loading

0 comments on commit c2beb83

Please sign in to comment.