Skip to content

Commit

Permalink
fix_: simplify performStorenodeTask
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Oct 23, 2024
1 parent 777aefb commit 12a0c7c
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 30 deletions.
40 changes: 15 additions & 25 deletions protocol/messenger_mailserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,35 +90,25 @@ func (m *Messenger) scheduleSyncChat(chat *Chat) (bool, error) {
}

func (m *Messenger) performStorenodeTask(task func() (*MessengerResponse, error), opts ...history.StorenodeTaskOption) (*MessengerResponse, error) {
responseCh := make(chan *MessengerResponse)
errCh := make(chan error)

go func() {
defer gocommon.LogOnPanic()
err := m.transport.PerformStorenodeTask(func() error {
r, err := task()
if err != nil {
return err
}

select {
case <-m.ctx.Done():
return m.ctx.Err()
case responseCh <- r:
default:
//
}
responseCh := make(chan *MessengerResponse, 1)
err := m.transport.PerformStorenodeTask(func() error {
r, err := task()
if err != nil {
return err
}

select {
case <-m.ctx.Done():
return m.ctx.Err()

Check warning on line 102 in protocol/messenger_mailserver.go

View check run for this annotation

Codecov / codecov/patch

protocol/messenger_mailserver.go#L101-L102

Added lines #L101 - L102 were not covered by tests
case responseCh <- r:
return nil
}, opts...)
if err != nil {
errCh <- err
}
}()
}, opts...)
if err != nil {
return nil, err
}

select {
case err := <-errCh:
return nil, err
case r := <-responseCh:
if r != nil {
return r, nil
Expand Down Expand Up @@ -784,7 +774,7 @@ func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) {
chat.SyncedFrom = uint32(batch.From.Unix())

Check warning on line 774 in protocol/messenger_mailserver.go

View check run for this annotation

Codecov / codecov/patch

protocol/messenger_mailserver.go#L773-L774

Added lines #L773 - L774 were not covered by tests
}

m.logger.Debug("setting sync timestamps", zap.Int64("from", int64(batch.From.Unix())), zap.Int64("to", int64(chat.SyncedTo)), zap.String("chatID", chatID))
m.logger.Debug("setting sync timestamps", zap.Int64("from", batch.From.Unix()), zap.Int64("to", int64(chat.SyncedTo)), zap.String("chatID", chatID))

Check warning on line 777 in protocol/messenger_mailserver.go

View check run for this annotation

Codecov / codecov/patch

protocol/messenger_mailserver.go#L777

Added line #L777 was not covered by tests

err = m.persistence.SetSyncTimestamps(uint32(batch.From.Unix()), chat.SyncedTo, chat.ID)
from = uint32(batch.From.Unix())

Check warning on line 780 in protocol/messenger_mailserver.go

View check run for this annotation

Codecov / codecov/patch

protocol/messenger_mailserver.go#L779-L780

Added lines #L779 - L780 were not covered by tests
Expand Down
2 changes: 2 additions & 0 deletions protocol/messenger_mailserver_cycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ func (m *Messenger) Storenodes() ([]peer.ID, error) {
}

func (m *Messenger) checkForStorenodeCycleSignals() {
defer gocommon.LogOnPanic()

if m.transport.WakuVersion() != 2 {
return
}
Expand Down
4 changes: 2 additions & 2 deletions protocol/messenger_storenode_comunity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (s *MessengerStoreNodeCommunitySuite) newMessenger(name string, storenodeAd
}

func (s *MessengerStoreNodeCommunitySuite) createCommunityWithChat(m *Messenger) (*communities.Community, *Chat) {
WaitForAvailableStoreNode(&s.Suite, m, 500*time.Millisecond)
WaitForAvailableStoreNode(&s.Suite, m, context.TODO())

storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(nil)

Expand Down Expand Up @@ -197,7 +197,7 @@ func (s *MessengerStoreNodeCommunitySuite) fetchCommunity(m *Messenger, communit
WithWaitForResponseOption(true),
}

fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(communityShard, options)
fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(context.TODO(), communityShard, options)

s.Require().NoError(err)
s.requireCommunitiesEqual(fetchedCommunity, expectedCommunity)
Expand Down
6 changes: 3 additions & 3 deletions protocol/messenger_storenode_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (s *MessengerStoreNodeRequestSuite) fetchCommunity(m *Messenger, communityS
WithWaitForResponseOption(true),
}

fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(communityShard, options)
fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(context.TODO(), communityShard, options)

s.Require().NoError(err)
s.requireCommunitiesEqual(fetchedCommunity, expectedCommunity)
Expand Down Expand Up @@ -808,7 +808,7 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityEnvelopesOrder() {
}

// Fetch the community
fetchedCommunity, _, err := s.bob.storeNodeRequestsManager.FetchCommunity(community.CommunityShard(), options)
fetchedCommunity, _, err := s.bob.storeNodeRequestsManager.FetchCommunity(context.TODO(), community.CommunityShard(), options)
s.Require().NoError(err)
s.requireCommunitiesEqual(fetchedCommunity, community)

Expand Down Expand Up @@ -1162,7 +1162,7 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() {
}
storeNodeRequestOptions = append(storeNodeRequestOptions, exampleToRun.CustomOptions...)

fetchedCommunity, stats, err := user.storeNodeRequestsManager.FetchCommunity(communityAddress, storeNodeRequestOptions)
fetchedCommunity, stats, err := user.storeNodeRequestsManager.FetchCommunity(context.TODO(), communityAddress, storeNodeRequestOptions)

result.EnvelopesCount = stats.FetchedEnvelopesCount
result.FetchedCommunity = fetchedCommunity
Expand Down
2 changes: 2 additions & 0 deletions protocol/messenger_testing_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ func SetIdentityImagesAndWaitForChange(s *suite.Suite, messenger *Messenger, tim
}

func WaitForAvailableStoreNode(s *suite.Suite, m *Messenger, ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
defer cancel()
available := m.transport.WaitForAvailableStoreNode(ctx)

Check warning on line 370 in protocol/messenger_testing_utils.go

View check run for this annotation

Codecov / codecov/patch

protocol/messenger_testing_utils.go#L367-L370

Added lines #L367 - L370 were not covered by tests
s.Require().True(available)
}
Expand Down

0 comments on commit 12a0c7c

Please sign in to comment.