From 4fe496a830d111968ccbcdef3bf0ba73785df679 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Wed, 29 Jun 2022 17:25:00 -0400 Subject: [PATCH 01/13] Add store client for querying --- store/client/client.go | 123 ++++++++++++++++++++++++++++ store/client/options.go | 27 ++++++ store/store.go | 176 ++++++++++++++-------------------------- 3 files changed, 213 insertions(+), 113 deletions(-) create mode 100644 store/client/client.go create mode 100644 store/client/options.go diff --git a/store/client/client.go b/store/client/client.go new file mode 100644 index 00000000..e0ce524b --- /dev/null +++ b/store/client/client.go @@ -0,0 +1,123 @@ +package client + +import ( + "context" + "encoding/hex" + "math" + "sync" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-msgio/protoio" + "github.com/pkg/errors" + "github.com/status-im/go-waku/waku/v2/protocol" + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/protocol/store" + "github.com/xmtp/xmtp-node-go/logging" + "github.com/xmtp/xmtp-node-go/metrics" + "go.uber.org/zap" +) + +type Client struct { + log *zap.Logger + host host.Host + peer *peer.ID +} + +func New(opts ...ClientOption) (*Client, error) { + c := &Client{} + for _, opt := range opts { + opt(c) + } + + // Required logger option. + if c.log == nil { + return nil, errors.New("missing logger option") + } + c.log = c.log.Named("client") + + // Required host option. + if c.host == nil { + return nil, errors.New("missing host option") + } + c.log = c.log.With(zap.String("host", c.host.ID().Pretty())) + + // Required peer option. + if c.peer == nil { + return nil, errors.New("missing peer option") + } + c.log = c.log.With(zap.String("peer", c.peer.Pretty())) + + return c, nil +} + +// Query executes query requests against a peer, calling the given pageFn with +// every page response, traversing every page until the end or until pageFn +// returns false. 
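For context, here is a minimal caller-side sketch of this paged Query API. It is not part of the patch: it assumes a libp2p host `h`, a resolved store peer ID `peerID`, a zap `logger`, and a `ctx` are already in scope, and it accumulates every message for one content topic across all pages. The callback reports how many messages it consumed and whether to fetch the next page.

    c, err := client.New(
        client.WithLog(logger),
        client.WithHost(h),
        client.WithPeer(peerID),
    )
    if err != nil {
        return err
    }
    var msgs []*pb.WakuMessage
    msgCount, err := c.Query(ctx, &pb.HistoryQuery{
        ContentFilters: []*pb.ContentFilter{{ContentTopic: "my-topic"}},
    }, func(res *pb.HistoryResponse) (int, bool) {
        msgs = append(msgs, res.Messages...) // keep every message on this page
        return len(res.Messages), true       // true: continue to the next page
    })
    if err != nil {
        return err
    }
    logger.Info("fetched history", zap.Int("count", msgCount), zap.Int("buffered", len(msgs)))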
+func (c *Client) Query(ctx context.Context, query *pb.HistoryQuery, pageFn func(res *pb.HistoryResponse) (int, bool)) (int, error) { + var msgCount int + var msgCountLock sync.RWMutex + var res *pb.HistoryResponse + for { + if res != nil { + if res.PagingInfo == nil || res.PagingInfo.Cursor == nil || len(res.Messages) == 0 { + break + } + query.PagingInfo = res.PagingInfo + } + var err error + res, err = c.queryFrom(ctx, query, protocol.GenerateRequestId()) + if err != nil { + return 0, err + } + count, ok := pageFn(res) + if !ok { + break + } + msgCountLock.Lock() + msgCount += count + msgCountLock.Unlock() + } + return msgCount, nil +} + +func (c *Client) queryFrom(ctx context.Context, q *pb.HistoryQuery, requestId []byte) (*pb.HistoryResponse, error) { + peer := *c.peer + c.log.Info("querying from peer", logging.HostID("peer", peer)) + + // We connect first so dns4 addresses are resolved (NewStream does not do it) + err := c.host.Connect(ctx, c.host.Peerstore().PeerInfo(peer)) + if err != nil { + return nil, err + } + + stream, err := c.host.NewStream(ctx, peer, store.StoreID_v20beta4) + if err != nil { + c.log.Error("connecting to peer", zap.Error(err)) + return nil, err + } + defer stream.Close() + defer func() { + _ = stream.Reset() + }() + + historyRequest := &pb.HistoryRPC{Query: q, RequestId: hex.EncodeToString(requestId)} + writer := protoio.NewDelimitedWriter(stream) + err = writer.WriteMsg(historyRequest) + if err != nil { + c.log.Error("writing request", zap.Error(err)) + return nil, err + } + + historyResponseRPC := &pb.HistoryRPC{} + reader := protoio.NewDelimitedReader(stream, math.MaxInt32) + err = reader.ReadMsg(historyResponseRPC) + if err != nil { + c.log.Error("reading response", zap.Error(err)) + metrics.RecordStoreError(ctx, "decodeRPCFailure") + return nil, err + } + + metrics.RecordMessage(ctx, "retrieved", 1) + return historyResponseRPC.Response, nil +} diff --git a/store/client/options.go b/store/client/options.go new file mode 100644 index 00000000..7d1a3491 --- /dev/null +++ b/store/client/options.go @@ -0,0 +1,27 @@ +package client + +import ( + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "go.uber.org/zap" +) + +type ClientOption func(c *Client) + +func WithLog(log *zap.Logger) ClientOption { + return func(c *Client) { + c.log = log + } +} + +func WithHost(host host.Host) ClientOption { + return func(c *Client) { + c.host = host + } +} + +func WithPeer(id peer.ID) ClientOption { + return func(c *Client) { + c.peer = &id + } +} diff --git a/store/store.go b/store/store.go index e491300e..e22ed11b 100644 --- a/store/store.go +++ b/store/store.go @@ -4,8 +4,6 @@ import ( "context" "crypto/sha256" "database/sql" - "encoding/hex" - "errors" "math" "sync" "time" @@ -14,12 +12,14 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-msgio/protoio" + "github.com/pkg/errors" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/store" "github.com/status-im/go-waku/waku/v2/utils" "github.com/xmtp/xmtp-node-go/logging" "github.com/xmtp/xmtp-node-go/metrics" + "github.com/xmtp/xmtp-node-go/store/client" "github.com/xmtp/xmtp-node-go/tracing" "go.uber.org/zap" ) @@ -39,6 +39,7 @@ type XmtpStore struct { started bool statsPeriod time.Duration resumePageSize int + queryPageSize int msgProvider store.MessageProvider h host.Host @@ -102,10 +103,10 @@ func (s *XmtpStore) 
FindMessages(query *pb.HistoryQuery) (res *pb.HistoryRespons return FindMessages(s.db, query) } -func (s *XmtpStore) Query(ctx context.Context, q store.Query, opts ...store.HistoryRequestOption) (*store.Result, error) { - s.log.Error("Query not implemented") +func (s *XmtpStore) Query(ctx context.Context, query store.Query, opts ...store.HistoryRequestOption) (*store.Result, error) { + s.log.Error("Query is not implemented") - return nil, errors.New("XmtpStore.Query not implemented!") + return nil, errors.New("Not implemented") } // Next is used to retrieve the next page of rows from a query response. @@ -126,7 +127,7 @@ func (s *XmtpStore) Next(ctx context.Context, r *store.Result) (*store.Result, e // peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed). // if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. The history gets fetched successfully if the dialed peer has been online during the queried time window. // the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string -func (s *XmtpStore) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) { +func (s *XmtpStore) Resume(ctx context.Context, pubsubTopic string, peers []peer.ID) (int, error) { if !s.started { return 0, errors.New("can't resume: store has not started") } @@ -136,14 +137,21 @@ func (s *XmtpStore) Resume(ctx context.Context, pubsubTopic string, peerList []p currentTime := utils.GetUnixEpoch() lastSeenTime, err := s.findLastSeen() if err != nil { - return 0, err + return 0, errors.Wrap(err, "finding latest timestamp") } - var offset int64 = int64(20 * time.Nanosecond) + var offset int64 = int64(20 * time.Second) currentTime = currentTime + offset lastSeenTime = max(lastSeenTime-offset, 0) - rpc := &pb.HistoryQuery{ + if len(peers) == 0 { + peers, err = selectPeers(s.h, string(store.StoreID_v20beta4), maxPeersToResume, s.log) + if err != nil { + return 0, errors.Wrap(err, "selecting peer") + } + } + + req := &pb.HistoryQuery{ PubsubTopic: pubsubTopic, StartTime: lastSeenTime, EndTime: currentTime, @@ -152,30 +160,60 @@ func (s *XmtpStore) Resume(ctx context.Context, pubsubTopic string, peerList []p Direction: pb.PagingInfo_BACKWARD, }, } + var wg sync.WaitGroup + var msgCount int + var msgCountLock sync.RWMutex + for _, p := range peers { + wg.Add(1) + go func(p peer.ID) { + defer wg.Done() + count, err := s.queryPeer(ctx, req, p, func(msg *pb.WakuMessage) bool { + err := s.storeMessage(protocol.NewEnvelope(msg, utils.GetUnixEpoch(), req.PubsubTopic)) + if err != nil { + s.log.Error("storing message", zap.Error(err)) + } + return true + }) + if err != nil { + s.log.Error("querying peer", zap.Error(err), zap.String("peer", p.Pretty())) + return + } + msgCountLock.Lock() + defer msgCountLock.Unlock() + msgCount += count + }(p) + } + wg.Wait() + s.log.Info("resume complete", zap.Int("count", msgCount)) - if len(peerList) == 0 { - peerList, err = selectPeers(s.h, string(store.StoreID_v20beta4), maxPeersToResume, s.log) - if err != nil { - s.log.Error("selecting peer", zap.Error(err)) - return -1, store.ErrNoPeersAvailable - } + return msgCount, nil +} + +func (s *XmtpStore) queryPeer(ctx context.Context, req *pb.HistoryQuery, peerID peer.ID, msgFn func(*pb.WakuMessage) bool) (int, error) { + c, err := client.New( + 
client.WithLog(s.log), + client.WithHost(s.h), + client.WithPeer(peerID), + ) + if err != nil { + return 0, err } - msgCount, err := s.queryLoop(ctx, rpc, peerList, func(msg *pb.WakuMessage) error { - err := s.storeMessage(protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic)) - if err != nil { - s.log.Error("storing message during resume", zap.Error(err)) - return err + msgCount, err := c.Query(ctx, req, func(res *pb.HistoryResponse) (int, bool) { + var count int + for _, msg := range res.Messages { + ok := msgFn(msg) + if !ok { + continue + } + count++ } - return nil + return count, true }) if err != nil { - s.log.Error("resuming history", zap.Error(err)) - return -1, store.ErrFailedToResumeHistory + return 0, err } - s.log.Info("Retrieved messages since the last online time", zap.Int("count", msgCount)) - return msgCount, nil } @@ -193,94 +231,6 @@ func (s *XmtpStore) findLastSeen() (int64, error) { return res.Messages[0].Timestamp, nil } -func (s *XmtpStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) { - s.log.Info("querying message history", logging.HostID("peer", selectedPeer)) - - // We connect first so dns4 addresses are resolved (NewStream does not do it) - err := s.h.Connect(ctx, s.h.Peerstore().PeerInfo(selectedPeer)) - if err != nil { - return nil, err - } - - connOpt, err := s.h.NewStream(ctx, selectedPeer, store.StoreID_v20beta4) - if err != nil { - s.log.Error("connecting to peer", zap.Error(err)) - return nil, err - } - - defer connOpt.Close() - defer func() { - _ = connOpt.Reset() - }() - - historyRequest := &pb.HistoryRPC{Query: q, RequestId: hex.EncodeToString(requestId)} - - writer := protoio.NewDelimitedWriter(connOpt) - reader := protoio.NewDelimitedReader(connOpt, math.MaxInt32) - - err = writer.WriteMsg(historyRequest) - if err != nil { - s.log.Error("writing request", zap.Error(err)) - return nil, err - } - - historyResponseRPC := &pb.HistoryRPC{} - err = reader.ReadMsg(historyResponseRPC) - if err != nil { - s.log.Error("reading response", zap.Error(err)) - metrics.RecordStoreError(s.ctx, "decodeRPCFailure") - return nil, err - } - - metrics.RecordMessage(ctx, "retrieved", 1) - - return historyResponseRPC.Response, nil -} - -// queryLoop loops through the candidates list of peers, or finds a peer -// if necessary, and calls the msgFn on each message, iterating through pages -// until there are no more. 
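For reference, a small sketch of how the concurrent resume above might be triggered; this is not part of the patch and assumes a started *XmtpStore `s`, a `ctx`, and the go-waku relay package in scope. Passing a nil peer list lets the store select up to maxPeersToResume candidates itself, and each selected peer is then queried in its own goroutine as shown above.

    // count is the number of messages fetched and stored across all peers.
    count, err := s.Resume(ctx, relay.DefaultWakuTopic, nil)
    if err != nil {
        return err
    }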
-func (s *XmtpStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, candidateList []peer.ID, msgFn func(msg *pb.WakuMessage) error) (int, error) { - var count int - var countLock sync.RWMutex - var wg sync.WaitGroup - for _, candidate := range candidateList { - wg.Add(1) - candidateQ := *query - go func(peer peer.ID) { - defer wg.Done() - tracing.Do("resume-from-peer", func() { - var res *pb.HistoryResponse - for { - if res != nil { - if res.PagingInfo == nil || res.PagingInfo.Cursor == nil || len(res.Messages) == 0 { - break - } - candidateQ.PagingInfo = res.PagingInfo - } - var err error - res, err = s.queryFrom(ctx, &candidateQ, peer, protocol.GenerateRequestId()) - if err != nil { - s.log.Error("resuming history", logging.HostID("peer", peer), zap.Error(err)) - return - } - for _, msg := range res.Messages { - err := msgFn(msg) - if err != nil { - continue - } - countLock.Lock() - count++ - countLock.Unlock() - } - } - }) - }(candidate) - } - wg.Wait() - return count, nil -} - func (s *XmtpStore) onRequest(stream network.Stream) { defer stream.Close() log := s.log.With(logging.HostID("peer", stream.Conn().RemotePeer())) From 5f08daa4107f3f25d7bd4e897b8df158da5b5619 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Thu, 16 Jun 2022 09:27:18 -0400 Subject: [PATCH 02/13] Add e2e test that checks the core workflow across network nodes --- e2e/e2e_test.go | 185 +++++++++++++++++++++++++++++++++++++++++++ go.mod | 6 +- go.sum | 9 ++- server/node_test.go | 168 +++++++++++++++++++-------------------- store/resume_test.go | 84 ++++++++++---------- store/store.go | 11 ++- testing/message.go | 9 ++- testing/node.go | 19 ++++- testing/relay.go | 18 ++++- 9 files changed, 361 insertions(+), 148 deletions(-) create mode 100644 e2e/e2e_test.go diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go new file mode 100644 index 00000000..6ec47fc4 --- /dev/null +++ b/e2e/e2e_test.go @@ -0,0 +1,185 @@ +package e2e + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "strings" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/status-im/go-waku/waku/v2/node" + wakunode "github.com/status-im/go-waku/waku/v2/node" + wakuprotocol "github.com/status-im/go-waku/waku/v2/protocol" + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/protocol/relay" + "github.com/stretchr/testify/require" + storeclient "github.com/xmtp/xmtp-node-go/store/client" + test "github.com/xmtp/xmtp-node-go/testing" + "go.uber.org/zap" +) + +var ( + envShouldRunE2ETests = envVarBool("E2E") + envNetworkEnv = envVar("XMTP_E2E_ENV", "dev") + envBootstrapAddrs = envVarStrings("XMTP_E2E_BOOTSTRAP_ADDRS") + envNodesURL = envVar("XMTP_E2E_NODES_URL", "https://nodes.xmtp.com") +) + +func TestE2E(t *testing.T) { + t.Run("publish subscribe query across nodes", func(t *testing.T) { + if !envShouldRunE2ETests { + t.SkipNow() + } + + // Fetch bootstrap node addresses. + var bootstrapAddrs []string + if len(envBootstrapAddrs) == 0 { + var err error + bootstrapAddrs, err = fetchBootstrapAddrs(envNetworkEnv) + require.NoError(t, err) + require.NotEmpty(t, bootstrapAddrs) + require.Len(t, bootstrapAddrs, 3) + } else { + bootstrapAddrs = envBootstrapAddrs + } + + // Create a client node for each bootstrap node, and connect to it. 
+ clients := make([]*wakunode.WakuNode, len(bootstrapAddrs)) + for i, addr := range bootstrapAddrs { + c, cleanup := test.NewNode(t, nil) + defer cleanup() + test.ConnectWithAddr(t, c, addr) + clients[i] = c + } + time.Sleep(500 * time.Millisecond) + + // Subscribe to a topic on each client, connected to each node. + contentTopic := "test-" + test.RandomStringLower(5) + envCs := make([]chan *wakuprotocol.Envelope, len(clients)) + for i, c := range clients { + envCs[i] = test.SubscribeTo(t, c, []string{contentTopic}) + } + time.Sleep(500 * time.Millisecond) + + // Send a message to every node. + msgs := make([]*pb.WakuMessage, len(clients)) + for i := range clients { + msgs[i] = test.NewMessage(contentTopic, int64(i+1), fmt.Sprintf("msg%d", i+1)) + } + for i, sender := range clients { + test.Publish(t, sender, msgs[i]) + } + + // Expect them to be relayed to all nodes. + for _, envC := range envCs { + test.SubscribeExpect(t, envC, msgs) + } + + // Expect that they've all been stored on each node. + for i, c := range clients { + expectQueryMessagesEventually(t, c, bootstrapAddrs[i], []string{contentTopic}, msgs) + } + }) +} + +func fetchBootstrapAddrs(env string) ([]string, error) { + client := &http.Client{} + r, err := client.Get(envNodesURL) + if err != nil { + return nil, err + } + defer r.Body.Close() + + var manifest map[string]interface{} + err = json.NewDecoder(r.Body).Decode(&manifest) + if err != nil { + return nil, err + } + + envManifest := manifest[env].(map[string]interface{}) + addrs := make([]string, len(envManifest)) + i := 0 + for _, addr := range envManifest { + addrs[i] = addr.(string) + i++ + } + + return addrs, nil +} + +func envVar(name, defaultVal string) string { + val := os.Getenv(name) + if val == "" { + return defaultVal + } + return val +} + +func envVarStrings(name string) []string { + val := os.Getenv(name) + vals := strings.Split(val, ",") + retVals := make([]string, 0, len(vals)) + for _, v := range vals { + if v == "" { + continue + } + retVals = append(retVals, v) + } + return retVals +} + +func envVarBool(name string) bool { + valStr := os.Getenv(name) + return valStr != "" +} + +func queryMessages(t *testing.T, c *node.WakuNode, peerAddr string, contentTopics []string) []*pb.WakuMessage { + log, err := zap.NewDevelopment() + require.NoError(t, err) + + pi, err := peer.AddrInfoFromString(peerAddr) + require.NoError(t, err) + + storeClient, err := storeclient.New( + storeclient.WithLog(log), + storeclient.WithHost(c.Host()), + storeclient.WithPeer(pi.ID), + ) + require.NoError(t, err) + + msgs := []*pb.WakuMessage{} + ctx := context.Background() + contentFilters := make([]*pb.ContentFilter, len(contentTopics)) + for i, contentTopic := range contentTopics { + contentFilters[i] = &pb.ContentFilter{ + ContentTopic: contentTopic, + } + } + msgCount, err := storeClient.Query(ctx, &pb.HistoryQuery{ + PubsubTopic: relay.DefaultWakuTopic, + ContentFilters: contentFilters, + }, func(res *pb.HistoryResponse) (int, bool) { + for _, msg := range res.Messages { + msgs = append(msgs, msg) + } + return len(res.Messages), true + }) + require.NoError(t, err) + require.Equal(t, msgCount, len(msgs)) + + return msgs +} + +func expectQueryMessagesEventually(t *testing.T, n *node.WakuNode, peerAddr string, contentTopics []string, expectedMsgs []*pb.WakuMessage) []*pb.WakuMessage { + var msgs []*pb.WakuMessage + require.Eventually(t, func() bool { + msgs = queryMessages(t, n, peerAddr, contentTopics) + return len(msgs) == len(expectedMsgs) + }, 3*time.Second, 
500*time.Millisecond) + require.ElementsMatch(t, expectedMsgs, msgs) + return msgs +} diff --git a/go.mod b/go.mod index 05337690..df88325f 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/ipfs/go-log v1.0.5 github.com/jarcoal/httpmock v1.2.0 github.com/jessevdk/go-flags v1.4.0 - github.com/libp2p/go-libp2p v0.20.0 + github.com/libp2p/go-libp2p v0.20.2 github.com/libp2p/go-libp2p-core v0.16.1 github.com/libp2p/go-libp2p-peerstore v0.6.0 github.com/libp2p/go-libp2p-pubsub v0.6.1 @@ -105,7 +105,7 @@ require ( github.com/libp2p/go-openssl v0.0.7 // indirect github.com/libp2p/go-reuseport v0.2.0 // indirect github.com/libp2p/go-stream-muxer-multistream v0.4.0 // indirect - github.com/libp2p/go-yamux/v3 v3.1.1 // indirect + github.com/libp2p/go-yamux/v3 v3.1.2 // indirect github.com/lucas-clemente/quic-go v0.27.1 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect @@ -127,7 +127,7 @@ require ( github.com/multiformats/go-multibase v0.0.3 // indirect github.com/multiformats/go-multicodec v0.4.1 // indirect github.com/multiformats/go-multihash v0.1.0 // indirect - github.com/multiformats/go-multistream v0.3.1 // indirect + github.com/multiformats/go-multistream v0.3.2 // indirect github.com/multiformats/go-varint v0.0.6 // indirect github.com/nxadm/tail v1.4.8 // indirect github.com/onsi/ginkgo v1.16.5 // indirect diff --git a/go.sum b/go.sum index 88b9e488..291a0341 100644 --- a/go.sum +++ b/go.sum @@ -1232,8 +1232,9 @@ github.com/libp2p/go-eventbus v0.2.1/go.mod h1:jc2S4SoEVPP48H9Wpzm5aiGwUCBMfGhVh github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8= github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM= github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= -github.com/libp2p/go-libp2p v0.20.0 h1:FpwrR9l3ZVsL9ArwgENHYn1I32OogiCAFS6abxdUVH4= github.com/libp2p/go-libp2p v0.20.0/go.mod h1:g0C5Fu+aXXbCXkusCzLycuBowEih3ElmDqtbo61Em7k= +github.com/libp2p/go-libp2p v0.20.2 h1:uPCbLjx1VIGt4noOoGsSQKsoUqd+WwOq0IeFbrAThXM= +github.com/libp2p/go-libp2p v0.20.2/go.mod h1:heAEqZPMOagd26sado6/P4ifArxkUe9uV8PGrTn9K2k= github.com/libp2p/go-libp2p-asn-util v0.2.0 h1:rg3+Os8jbnO5DxkC7K/Utdi+DkY3q/d1/1q+8WeNAsw= github.com/libp2p/go-libp2p-asn-util v0.2.0/go.mod h1:WoaWxbHKBymSN41hWSq/lGKJEca7TNm58+gGJi2WsLI= github.com/libp2p/go-libp2p-blankhost v0.2.0/go.mod h1:eduNKXGTioTuQAUcZ5epXi9vMl+t4d8ugUBRQ4SqaNQ= @@ -1338,8 +1339,9 @@ github.com/libp2p/go-yamux v1.4.1 h1:P1Fe9vF4th5JOxxgQvfbOHkrGqIZniTLf+ddhZp8YTI github.com/libp2p/go-yamux v1.4.1/go.mod h1:fr7aVgmdNGJK+N1g+b6DW6VxzbRCjCOejR/hkmpooHE= github.com/libp2p/go-yamux/v3 v3.0.1/go.mod h1:s2LsDhHbh+RfCsQoICSYt58U2f8ijtPANFD8BmE74Bo= github.com/libp2p/go-yamux/v3 v3.0.2/go.mod h1:s2LsDhHbh+RfCsQoICSYt58U2f8ijtPANFD8BmE74Bo= -github.com/libp2p/go-yamux/v3 v3.1.1 h1:X0qSVodCZciOu/f4KTp9V+O0LAqcqP2tdaUGB0+0lng= github.com/libp2p/go-yamux/v3 v3.1.1/go.mod h1:jeLEQgLXqE2YqX1ilAClIfCMDY+0uXQUKmmb/qp0gT4= +github.com/libp2p/go-yamux/v3 v3.1.2 h1:lNEy28MBk1HavUAlzKgShp+F6mn/ea1nDYWftZhFW9Q= +github.com/libp2p/go-yamux/v3 v3.1.2/go.mod h1:jeLEQgLXqE2YqX1ilAClIfCMDY+0uXQUKmmb/qp0gT4= github.com/libp2p/zeroconf/v2 v2.1.1/go.mod h1:fuJqLnUwZTshS3U/bMRJ3+ow/v9oid1n0DmyYyNO1Xs= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go 
v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= @@ -1532,8 +1534,9 @@ github.com/multiformats/go-multihash v0.1.0 h1:CgAgwqk3//SVEw3T+6DqI4mWMyRuDwZtO github.com/multiformats/go-multihash v0.1.0/go.mod h1:RJlXsxt6vHGaia+S8We0ErjhojtKzPP2AH4+kYM7k84= github.com/multiformats/go-multistream v0.1.1/go.mod h1:KmHZ40hzVxiaiwlj3MEbYgK9JFk2/9UktWZAF54Du38= github.com/multiformats/go-multistream v0.2.1/go.mod h1:5GZPQZbkWOLOn3J2y4Y99vVW7vOfsAflxARk3x14o6k= -github.com/multiformats/go-multistream v0.3.1 h1:GQM84yyQ5EZB9l0p5+5eDwFoQgwHI2tLmYGpaWlLF/U= github.com/multiformats/go-multistream v0.3.1/go.mod h1:ODRoqamLUsETKS9BNcII4gcRsJBU5VAwRIv7O39cEXg= +github.com/multiformats/go-multistream v0.3.2 h1:YRJzBzM8BdZuOn3FjIns1ceKEyEQrT+8JJ581PNyGyI= +github.com/multiformats/go-multistream v0.3.2/go.mod h1:ODRoqamLUsETKS9BNcII4gcRsJBU5VAwRIv7O39cEXg= github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/multiformats/go-varint v0.0.2/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= diff --git a/server/node_test.go b/server/node_test.go index 9ff86e74..cb7b1206 100644 --- a/server/node_test.go +++ b/server/node_test.go @@ -27,15 +27,15 @@ func TestNode_Resume_OnStart_StoreNodesConnectedBefore(t *testing.T) { topic1 := test.NewTopic() topic2 := test.NewTopic() - test.Publish(t, n1, topic1, 1) - test.Publish(t, n1, topic2, 2) + test.Publish(t, n1, test.NewMessage(topic1, 1, "msg1")) + test.Publish(t, n1, test.NewMessage(topic2, 2, "msg2")) n2, cleanup := newTestNode(t, []*node.WakuNode{n1}, true) defer cleanup() expectStoreMessagesEventually(t, n2, []string{topic1, topic2}, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), }) } @@ -48,16 +48,16 @@ func TestNode_Resume_OnStart_StoreNodesConnectedAfter(t *testing.T) { topic1 := test.NewTopic() topic2 := test.NewTopic() - test.Publish(t, n1, topic1, 1) - test.Publish(t, n1, topic2, 2) + test.Publish(t, n1, test.NewMessage(topic1, 1, "msg1")) + test.Publish(t, n1, test.NewMessage(topic2, 2, "msg2")) n2, cleanup := newTestNode(t, nil, true) defer cleanup() test.ConnectStoreNode(t, n2, n1) expectStoreMessagesEventually(t, n2, []string{topic1, topic2}, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), }) } func TestNode_DataPartition_WithoutResume(t *testing.T) { @@ -73,66 +73,66 @@ func TestNode_DataPartition_WithoutResume(t *testing.T) { // are relayed to the other nodes. 
test.Connect(t, n1, n2) - n1EnvC := test.Subscribe(t, n1) - n2EnvC := test.Subscribe(t, n2) - topic1 := test.NewTopic() topic2 := test.NewTopic() - test.Publish(t, n1, topic1, 1) - test.Publish(t, n2, topic2, 2) + n1EnvC := test.Subscribe(t, n1) + n2EnvC := test.Subscribe(t, n2) + + test.Publish(t, n1, test.NewMessage(topic1, 1, "msg1")) + test.Publish(t, n2, test.NewMessage(topic2, 2, "msg2")) test.SubscribeExpect(t, n1EnvC, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), }) test.SubscribeExpect(t, n2EnvC, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), }) test.SubscribeExpectNone(t, n1EnvC) test.SubscribeExpectNone(t, n2EnvC) expectStoreMessages(t, n1, []string{topic1, topic2}, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), }) expectStoreMessages(t, n2, []string{topic1, topic2}, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), }) // Disconnect and send a message to each node, expecting that the messages // are not relayed to the other node. test.Disconnect(t, n1, n2.Host().ID()) - test.Publish(t, n1, topic1, 4) - test.Publish(t, n2, topic2, 5) - test.Publish(t, n1, topic1, 6) - test.Publish(t, n2, topic2, 7) + test.Publish(t, n1, test.NewMessage(topic1, 4, "msg4")) + test.Publish(t, n2, test.NewMessage(topic2, 5, "msg5")) + test.Publish(t, n1, test.NewMessage(topic1, 6, "msg6")) + test.Publish(t, n2, test.NewMessage(topic2, 7, "msg7")) test.SubscribeExpect(t, n1EnvC, []*pb.WakuMessage{ - test.NewMessage(topic1, 4), - test.NewMessage(topic1, 6), + test.NewMessage(topic1, 4, "msg4"), + test.NewMessage(topic1, 6, "msg6"), }) test.SubscribeExpect(t, n2EnvC, []*pb.WakuMessage{ - test.NewMessage(topic2, 5), - test.NewMessage(topic2, 7), + test.NewMessage(topic2, 5, "msg5"), + test.NewMessage(topic2, 7, "msg7"), }) test.SubscribeExpectNone(t, n1EnvC) test.SubscribeExpectNone(t, n2EnvC) expectStoreMessages(t, n1, []string{topic1, topic2}, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), - test.NewMessage(topic1, 4), - test.NewMessage(topic1, 6), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), + test.NewMessage(topic1, 4, "msg4"), + test.NewMessage(topic1, 6, "msg6"), }) expectStoreMessages(t, n2, []string{topic1, topic2}, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), - test.NewMessage(topic2, 5), - test.NewMessage(topic2, 7), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), + test.NewMessage(topic2, 5, "msg5"), + test.NewMessage(topic2, 7, "msg7"), }) // Reconnect and expect that no new messages are relayed. 
@@ -142,16 +142,16 @@ func TestNode_DataPartition_WithoutResume(t *testing.T) { test.SubscribeExpectNone(t, n2EnvC) expectStoreMessages(t, n1, []string{topic1, topic2}, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), - test.NewMessage(topic1, 4), - test.NewMessage(topic1, 6), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), + test.NewMessage(topic1, 4, "msg4"), + test.NewMessage(topic1, 6, "msg6"), }) expectStoreMessages(t, n2, []string{topic1, topic2}, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), - test.NewMessage(topic2, 5), - test.NewMessage(topic2, 7), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), + test.NewMessage(topic2, 5, "msg5"), + test.NewMessage(topic2, 7, "msg7"), }) } @@ -168,66 +168,66 @@ func TestNode_DataPartition_WithResume(t *testing.T) { // are relayed to the other nodes. test.Connect(t, n1, n2) - n1EnvC := test.Subscribe(t, n1) - n2EnvC := test.Subscribe(t, n2) - topic1 := test.NewTopic() topic2 := test.NewTopic() - test.Publish(t, n1, topic1, 1) - test.Publish(t, n2, topic2, 2) + n1EnvC := test.Subscribe(t, n1) + n2EnvC := test.Subscribe(t, n2) + + test.Publish(t, n1, test.NewMessage(topic1, 1, "msg1")) + test.Publish(t, n2, test.NewMessage(topic2, 2, "msg2")) test.SubscribeExpect(t, n1EnvC, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), }) test.SubscribeExpect(t, n2EnvC, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), }) test.SubscribeExpectNone(t, n1EnvC) test.SubscribeExpectNone(t, n2EnvC) expectStoreMessages(t, n1, []string{topic1, topic2}, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), }) expectStoreMessages(t, n2, []string{topic1, topic2}, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), }) // Disconnect and send a message to each node, expecting that the messages // are not relayed to the other node. 
test.Disconnect(t, n1, n2.Host().ID()) - test.Publish(t, n1, topic1, 4) - test.Publish(t, n2, topic2, 5) - test.Publish(t, n1, topic1, 6) - test.Publish(t, n2, topic2, 7) + test.Publish(t, n1, test.NewMessage(topic1, 4, "msg4")) + test.Publish(t, n2, test.NewMessage(topic2, 5, "msg5")) + test.Publish(t, n1, test.NewMessage(topic1, 6, "msg6")) + test.Publish(t, n2, test.NewMessage(topic2, 7, "msg7")) test.SubscribeExpect(t, n1EnvC, []*pb.WakuMessage{ - test.NewMessage(topic1, 4), - test.NewMessage(topic1, 6), + test.NewMessage(topic1, 4, "msg4"), + test.NewMessage(topic1, 6, "msg6"), }) test.SubscribeExpect(t, n2EnvC, []*pb.WakuMessage{ - test.NewMessage(topic2, 5), - test.NewMessage(topic2, 7), + test.NewMessage(topic2, 5, "msg5"), + test.NewMessage(topic2, 7, "msg7"), }) test.SubscribeExpectNone(t, n1EnvC) test.SubscribeExpectNone(t, n2EnvC) expectStoreMessages(t, n1, []string{topic1, topic2}, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), - test.NewMessage(topic1, 4), - test.NewMessage(topic1, 6), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), + test.NewMessage(topic1, 4, "msg4"), + test.NewMessage(topic1, 6, "msg6"), }) expectStoreMessages(t, n2, []string{topic1, topic2}, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), - test.NewMessage(topic2, 5), - test.NewMessage(topic2, 7), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), + test.NewMessage(topic2, 5, "msg5"), + test.NewMessage(topic2, 7, "msg7"), }) // Reconnect, trigger a resume from node 2, and expect new messages . @@ -240,18 +240,18 @@ func TestNode_DataPartition_WithResume(t *testing.T) { test.SubscribeExpectNone(t, n2EnvC) expectStoreMessages(t, n1, []string{topic1, topic2}, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), - test.NewMessage(topic1, 4), - test.NewMessage(topic2, 5), - test.NewMessage(topic1, 6), - test.NewMessage(topic2, 7), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), + test.NewMessage(topic1, 4, "msg4"), + test.NewMessage(topic2, 5, "msg5"), + test.NewMessage(topic1, 6, "msg6"), + test.NewMessage(topic2, 7, "msg7"), }) expectStoreMessages(t, n2, []string{topic1, topic2}, []*pb.WakuMessage{ - test.NewMessage(topic1, 1), - test.NewMessage(topic2, 2), - test.NewMessage(topic2, 5), - test.NewMessage(topic2, 7), + test.NewMessage(topic1, 1, "msg1"), + test.NewMessage(topic2, 2, "msg2"), + test.NewMessage(topic2, 5, "msg5"), + test.NewMessage(topic2, 7, "msg7"), }) } diff --git a/store/resume_test.go b/store/resume_test.go index 26d955a3..9a45b660 100644 --- a/store/resume_test.go +++ b/store/resume_test.go @@ -13,11 +13,11 @@ import ( func TestStore_FindLastSeen(t *testing.T) { pubSubTopic := "test" - msg1 := test.NewMessage("topic1", 1) - msg2 := test.NewMessage("topic2", 2) - msg3 := test.NewMessage("topic3", 3) - msg4 := test.NewMessage("topic4", 4) - msg5 := test.NewMessage("topic5", 5) + msg1 := test.NewMessage("topic1", 1, "msg") + msg2 := test.NewMessage("topic2", 2, "msg") + msg3 := test.NewMessage("topic3", 3, "msg") + msg4 := test.NewMessage("topic4", 4, "msg") + msg5 := test.NewMessage("topic5", 5, "msg") s, cleanup := newTestStore(t) defer cleanup() @@ -43,16 +43,16 @@ func TestStore_Resume_FromPeer(t *testing.T) { pubSubTopic := "test" msgs := []*pb.WakuMessage{ - test.NewMessage("topic1", 1), - test.NewMessage("topic1", 2), - test.NewMessage("topic1", 3), - test.NewMessage("topic1", 4), - test.NewMessage("topic1", 5), - 
test.NewMessage("topic2", 6), - test.NewMessage("topic2", 7), - test.NewMessage("topic2", 8), - test.NewMessage("topic2", 9), - test.NewMessage("topic2", 10), + test.NewMessage("topic1", 1, "msg"), + test.NewMessage("topic1", 2, "msg"), + test.NewMessage("topic1", 3, "msg"), + test.NewMessage("topic1", 4, "msg"), + test.NewMessage("topic1", 5, "msg"), + test.NewMessage("topic2", 6, "msg"), + test.NewMessage("topic2", 7, "msg"), + test.NewMessage("topic2", 8, "msg"), + test.NewMessage("topic2", 9, "msg"), + test.NewMessage("topic2", 10, "msg"), } for _, msg := range msgs { @@ -142,22 +142,22 @@ func TestStore_Resume_MultiplePeersDifferentData(t *testing.T) { addStoreProtocol(t, s1.h, s3.h) msgsS2 := []*pb.WakuMessage{ - test.NewMessage("topic1", 1), - test.NewMessage("topic1", 2), - test.NewMessage("topic2", 3), - test.NewMessage("topic2", 4), - test.NewMessage("topic3", 5), + test.NewMessage("topic1", 1, "msg1"), + test.NewMessage("topic1", 2, "msg2"), + test.NewMessage("topic2", 3, "msg3"), + test.NewMessage("topic2", 4, "msg4"), + test.NewMessage("topic3", 5, "msg5"), } for _, msg := range msgsS2 { storeMessage(t, s2, msg, pubSubTopic) } msgsS3 := []*pb.WakuMessage{ - test.NewMessage("topic1", 1), - test.NewMessage("topic1", 2), - test.NewMessage("topic2", 3), - test.NewMessage("topic3", 4), - test.NewMessage("topic4", 6), + test.NewMessage("topic1", 1, "msg1"), + test.NewMessage("topic1", 2, "msg2"), + test.NewMessage("topic2", 3, "msg3"), + test.NewMessage("topic3", 4, "msg4"), + test.NewMessage("topic4", 6, "msg6"), } for _, msg := range msgsS3 { storeMessage(t, s3, msg, pubSubTopic) @@ -168,13 +168,13 @@ func TestStore_Resume_MultiplePeersDifferentData(t *testing.T) { require.Equal(t, 7, msgCount) expectMessages(t, s1, pubSubTopic, []*pb.WakuMessage{ - test.NewMessage("topic1", 1), - test.NewMessage("topic1", 2), - test.NewMessage("topic2", 3), - test.NewMessage("topic2", 4), - test.NewMessage("topic3", 4), - test.NewMessage("topic3", 5), - test.NewMessage("topic4", 6), + test.NewMessage("topic1", 1, "msg1"), + test.NewMessage("topic1", 2, "msg2"), + test.NewMessage("topic2", 3, "msg3"), + test.NewMessage("topic2", 4, "msg4"), + test.NewMessage("topic3", 4, "msg4"), + test.NewMessage("topic3", 5, "msg5"), + test.NewMessage("topic4", 6, "msg6"), }) } @@ -188,16 +188,16 @@ func TestStore_Resume_Paginated(t *testing.T) { pubSubTopic := "test" msgs := []*pb.WakuMessage{ - test.NewMessage("topic1", 1), - test.NewMessage("topic1", 2), - test.NewMessage("topic1", 3), - test.NewMessage("topic1", 4), - test.NewMessage("topic1", 5), - test.NewMessage("topic2", 6), - test.NewMessage("topic2", 7), - test.NewMessage("topic2", 8), - test.NewMessage("topic2", 9), - test.NewMessage("topic2", 10), + test.NewMessage("topic1", 1, "msg1"), + test.NewMessage("topic1", 2, "msg2"), + test.NewMessage("topic1", 3, "msg3"), + test.NewMessage("topic1", 4, "msg4"), + test.NewMessage("topic1", 5, "msg5"), + test.NewMessage("topic2", 6, "msg6"), + test.NewMessage("topic2", 7, "msg7"), + test.NewMessage("topic2", 8, "msg8"), + test.NewMessage("topic2", 9, "msg9"), + test.NewMessage("topic2", 10, "msg10"), } for _, msg := range msgs { diff --git a/store/store.go b/store/store.go index e22ed11b..d079646e 100644 --- a/store/store.go +++ b/store/store.go @@ -104,9 +104,8 @@ func (s *XmtpStore) FindMessages(query *pb.HistoryQuery) (res *pb.HistoryRespons } func (s *XmtpStore) Query(ctx context.Context, query store.Query, opts ...store.HistoryRequestOption) (*store.Result, error) { - s.log.Error("Query is not 
implemented") - - return nil, errors.New("Not implemented") + s.log.Named("query").Error("not implemented") + return nil, errors.New("not implemented") } // Next is used to retrieve the next page of rows from a query response. @@ -114,9 +113,8 @@ func (s *XmtpStore) Query(ctx context.Context, query store.Query, opts ...store. // This function is useful for iterating over results without having to manually // specify the cursor and pagination order and max number of results func (s *XmtpStore) Next(ctx context.Context, r *store.Result) (*store.Result, error) { - s.log.Error("Next is not implemented") - - return nil, errors.New("Not implemented") + s.log.Named("next").Error("not implemented") + return nil, errors.New("not implemented") } // Resume retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online @@ -171,6 +169,7 @@ func (s *XmtpStore) Resume(ctx context.Context, pubsubTopic string, peers []peer err := s.storeMessage(protocol.NewEnvelope(msg, utils.GetUnixEpoch(), req.PubsubTopic)) if err != nil { s.log.Error("storing message", zap.Error(err)) + return false } return true }) diff --git a/testing/message.go b/testing/message.go index 9e33422c..465a2c53 100644 --- a/testing/message.go +++ b/testing/message.go @@ -3,14 +3,17 @@ package testing import ( "testing" - "github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/utils" ) -func NewMessage(contentTopic string, timestamp int64) *pb.WakuMessage { - return tests.CreateWakuMessage(contentTopic, timestamp) +func NewMessage(contentTopic string, timestamp int64, content string) *pb.WakuMessage { + return &pb.WakuMessage{ + Payload: []byte(content), + ContentTopic: contentTopic, + Timestamp: timestamp, + } } func NewEnvelope(t *testing.T, msg *pb.WakuMessage, pubSubTopic string) *protocol.Envelope { diff --git a/testing/node.go b/testing/node.go index 35bac64a..6d9a5e01 100644 --- a/testing/node.go +++ b/testing/node.go @@ -12,13 +12,12 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/status-im/go-waku/tests" - "github.com/status-im/go-waku/waku/v2/node" wakunode "github.com/status-im/go-waku/waku/v2/node" wakustore "github.com/status-im/go-waku/waku/v2/protocol/store" "github.com/stretchr/testify/require" ) -func Connect(t *testing.T, n1 *wakunode.WakuNode, n2 *wakunode.WakuNode, protocols ...string) { +func Connect(t *testing.T, n1 *wakunode.WakuNode, n2 *wakunode.WakuNode) { ctx := context.Background() err := n1.DialPeer(ctx, n2.ListenAddresses()[0].String()) require.NoError(t, err) @@ -28,6 +27,16 @@ func Connect(t *testing.T, n1 *wakunode.WakuNode, n2 *wakunode.WakuNode, protoco time.Sleep(100 * time.Millisecond) } +func ConnectWithAddr(t *testing.T, n *wakunode.WakuNode, addr string) { + ctx := context.Background() + err := n.DialPeer(ctx, addr) + require.NoError(t, err) + + // This delay is necessary, but it's unclear why at this point. We see + // similar delays throughout the waku codebase as well for this reason. 
+ time.Sleep(100 * time.Millisecond) +} + func Disconnect(t *testing.T, n1 *wakunode.WakuNode, peerID peer.ID) { err := n1.ClosePeerById(peerID) require.NoError(t, err) @@ -37,16 +46,18 @@ func NewTopic() string { return "test-" + RandomStringLower(5) } -func NewNode(t *testing.T, storeNodes []*wakunode.WakuNode, opts ...node.WakuNodeOption) (*wakunode.WakuNode, func()) { +func NewNode(t *testing.T, storeNodes []*wakunode.WakuNode, opts ...wakunode.WakuNodeOption) (*wakunode.WakuNode, func()) { hostAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0") prvKey := NewPrivateKey(t) ctx := context.Background() - opts = append([]node.WakuNodeOption{ + opts = append([]wakunode.WakuNodeOption{ wakunode.WithPrivateKey(prvKey), wakunode.WithHostAddress(hostAddr), wakunode.WithWakuRelay(), + wakunode.WithWakuFilter(true), + wakunode.WithWebsockets("0.0.0.0", 0), }, opts...) node, err := wakunode.New(ctx, opts...) require.NoError(t, err) diff --git a/testing/relay.go b/testing/relay.go index bd4048d2..78d68e98 100644 --- a/testing/relay.go +++ b/testing/relay.go @@ -5,13 +5,22 @@ import ( "testing" "time" - "github.com/status-im/go-waku/tests" wakunode "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol" + "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/stretchr/testify/require" ) +func SubscribeTo(t *testing.T, n *wakunode.WakuNode, contentTopics []string) chan *protocol.Envelope { + ctx := context.Background() + _, f, err := n.Filter().Subscribe(ctx, filter.ContentFilter{ + ContentTopics: contentTopics, + }) + require.NoError(t, err) + return f.Chan +} + func Subscribe(t *testing.T, n *wakunode.WakuNode) chan *protocol.Envelope { ctx := context.Background() sub, err := n.Relay().Subscribe(ctx) @@ -19,9 +28,9 @@ func Subscribe(t *testing.T, n *wakunode.WakuNode) chan *protocol.Envelope { return sub.C } -func Publish(t *testing.T, n *wakunode.WakuNode, contentTopic string, timestamp int64) { +func Publish(t *testing.T, n *wakunode.WakuNode, msg *pb.WakuMessage) { ctx := context.Background() - _, err := n.Relay().Publish(ctx, tests.CreateWakuMessage(contentTopic, timestamp)) + _, err := n.Relay().Publish(ctx, msg) require.NoError(t, err) } @@ -32,6 +41,9 @@ func SubscribeExpect(t *testing.T, envC chan *protocol.Envelope, msgs []*pb.Waku select { case env := <-envC: receivedMsgs = append(receivedMsgs, env.Message()) + if len(receivedMsgs) == len(msgs) { + done = true + } case <-time.After(500 * time.Millisecond): done = true } From 8bb7c3fe33a55ee6c15cf9d498503b87fd623ff5 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Wed, 29 Jun 2022 21:17:52 -0400 Subject: [PATCH 03/13] Fill out some store/client test coverage --- e2e/e2e_test.go | 12 +- store/{client => }/client.go | 84 ++++--- store/client/options.go | 27 --- store/client_test.go | 426 +++++++++++++++++++++++++++++++++++ store/store.go | 9 +- store/store_test.go | 4 - testing/log.go | 14 ++ 7 files changed, 507 insertions(+), 69 deletions(-) rename store/{client => }/client.go (54%) delete mode 100644 store/client/options.go create mode 100644 store/client_test.go create mode 100644 testing/log.go diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index 6ec47fc4..97703cb2 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -17,7 +17,7 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/stretchr/testify/require" - storeclient "github.com/xmtp/xmtp-node-go/store/client" 
+ "github.com/xmtp/xmtp-node-go/store" test "github.com/xmtp/xmtp-node-go/testing" "go.uber.org/zap" ) @@ -144,10 +144,10 @@ func queryMessages(t *testing.T, c *node.WakuNode, peerAddr string, contentTopic pi, err := peer.AddrInfoFromString(peerAddr) require.NoError(t, err) - storeClient, err := storeclient.New( - storeclient.WithLog(log), - storeclient.WithHost(c.Host()), - storeclient.WithPeer(pi.ID), + client, err := store.New( + store.WithLog(log), + store.WithHost(c.Host()), + store.WithPeer(pi.ID), ) require.NoError(t, err) @@ -159,7 +159,7 @@ func queryMessages(t *testing.T, c *node.WakuNode, peerAddr string, contentTopic ContentTopic: contentTopic, } } - msgCount, err := storeClient.Query(ctx, &pb.HistoryQuery{ + msgCount, err := client.Query(ctx, &pb.HistoryQuery{ PubsubTopic: relay.DefaultWakuTopic, ContentFilters: contentFilters, }, func(res *pb.HistoryResponse) (int, bool) { diff --git a/store/client/client.go b/store/client.go similarity index 54% rename from store/client/client.go rename to store/client.go index e0ce524b..fbb543b0 100644 --- a/store/client/client.go +++ b/store/client.go @@ -1,4 +1,4 @@ -package client +package store import ( "context" @@ -24,6 +24,32 @@ type Client struct { peer *peer.ID } +var ( + ErrMissingLogOption = errors.New("missing log option") + ErrMissingHostOption = errors.New("missing host option") + ErrMissingPeerOption = errors.New("missing peer option") +) + +type ClientOption func(c *Client) + +func WithLog(log *zap.Logger) ClientOption { + return func(c *Client) { + c.log = log + } +} + +func WithHost(host host.Host) ClientOption { + return func(c *Client) { + c.host = host + } +} + +func WithPeer(id peer.ID) ClientOption { + return func(c *Client) { + c.peer = &id + } +} + func New(opts ...ClientOption) (*Client, error) { c := &Client{} for _, opt := range opts { @@ -32,19 +58,19 @@ func New(opts ...ClientOption) (*Client, error) { // Required logger option. if c.log == nil { - return nil, errors.New("missing logger option") + return nil, ErrMissingLogOption } c.log = c.log.Named("client") // Required host option. if c.host == nil { - return nil, errors.New("missing host option") + return nil, ErrMissingHostOption } c.log = c.log.With(zap.String("host", c.host.ID().Pretty())) // Required peer option. if c.peer == nil { - return nil, errors.New("missing peer option") + return nil, ErrMissingPeerOption } c.log = c.log.With(zap.String("peer", c.peer.Pretty())) @@ -55,25 +81,30 @@ func New(opts ...ClientOption) (*Client, error) { // every page response, traversing every page until the end or until pageFn // returns false. 
func (c *Client) Query(ctx context.Context, query *pb.HistoryQuery, pageFn func(res *pb.HistoryResponse) (int, bool)) (int, error) { + c.log.Info("querying", logging.HostID("peer", *c.peer)) + var msgCount int var msgCountLock sync.RWMutex var res *pb.HistoryResponse for { if res != nil { - if res.PagingInfo == nil || res.PagingInfo.Cursor == nil || len(res.Messages) == 0 { + if isLastPage(query, res) { break } query.PagingInfo = res.PagingInfo } + var err error - res, err = c.queryFrom(ctx, query, protocol.GenerateRequestId()) + res, err = c.queryFrom(ctx, query) if err != nil { return 0, err } + count, ok := pageFn(res) if !ok { break } + msgCountLock.Lock() msgCount += count msgCountLock.Unlock() @@ -81,43 +112,42 @@ func (c *Client) Query(ctx context.Context, query *pb.HistoryQuery, pageFn func( return msgCount, nil } -func (c *Client) queryFrom(ctx context.Context, q *pb.HistoryQuery, requestId []byte) (*pb.HistoryResponse, error) { - peer := *c.peer - c.log.Info("querying from peer", logging.HostID("peer", peer)) - - // We connect first so dns4 addresses are resolved (NewStream does not do it) - err := c.host.Connect(ctx, c.host.Peerstore().PeerInfo(peer)) +func (c *Client) queryFrom(ctx context.Context, query *pb.HistoryQuery) (*pb.HistoryResponse, error) { + err := c.host.Connect(ctx, c.host.Peerstore().PeerInfo(*c.peer)) if err != nil { - return nil, err + return nil, errors.Wrap(err, "connecting to peer") } - stream, err := c.host.NewStream(ctx, peer, store.StoreID_v20beta4) + stream, err := c.host.NewStream(ctx, *c.peer, store.StoreID_v20beta4) if err != nil { - c.log.Error("connecting to peer", zap.Error(err)) - return nil, err + return nil, errors.Wrap(err, "opening query stream") } - defer stream.Close() defer func() { - _ = stream.Reset() + stream.Reset() + stream.Close() }() - historyRequest := &pb.HistoryRPC{Query: q, RequestId: hex.EncodeToString(requestId)} writer := protoio.NewDelimitedWriter(stream) - err = writer.WriteMsg(historyRequest) + err = writer.WriteMsg(&pb.HistoryRPC{ + Query: query, + RequestId: hex.EncodeToString(protocol.GenerateRequestId()), + }) if err != nil { - c.log.Error("writing request", zap.Error(err)) - return nil, err + return nil, errors.Wrap(err, "writing query request") } - historyResponseRPC := &pb.HistoryRPC{} + res := &pb.HistoryRPC{} reader := protoio.NewDelimitedReader(stream, math.MaxInt32) - err = reader.ReadMsg(historyResponseRPC) + err = reader.ReadMsg(res) if err != nil { - c.log.Error("reading response", zap.Error(err)) metrics.RecordStoreError(ctx, "decodeRPCFailure") - return nil, err + return nil, errors.Wrap(err, "reading query response") } metrics.RecordMessage(ctx, "retrieved", 1) - return historyResponseRPC.Response, nil + return res.Response, nil +} + +func isLastPage(query *pb.HistoryQuery, res *pb.HistoryResponse) bool { + return res.PagingInfo == nil || res.PagingInfo.Cursor == nil || len(res.Messages) == 0 } diff --git a/store/client/options.go b/store/client/options.go deleted file mode 100644 index 7d1a3491..00000000 --- a/store/client/options.go +++ /dev/null @@ -1,27 +0,0 @@ -package client - -import ( - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/peer" - "go.uber.org/zap" -) - -type ClientOption func(c *Client) - -func WithLog(log *zap.Logger) ClientOption { - return func(c *Client) { - c.log = log - } -} - -func WithHost(host host.Host) ClientOption { - return func(c *Client) { - c.host = host - } -} - -func WithPeer(id peer.ID) ClientOption { - return func(c *Client) { - c.peer = 
&id - } -} diff --git a/store/client_test.go b/store/client_test.go new file mode 100644 index 00000000..5012b79c --- /dev/null +++ b/store/client_test.go @@ -0,0 +1,426 @@ +package store + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/peer" + libp2ptest "github.com/libp2p/go-libp2p-core/test" + "github.com/status-im/go-waku/waku/v2/protocol" + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/stretchr/testify/require" + test "github.com/xmtp/xmtp-node-go/testing" +) + +func TestStoreClient_New(t *testing.T) { + host := test.NewPeer(t) + log := test.NewLog(t) + peerID := libp2ptest.RandPeerIDFatal(t) + tcs := []struct { + name string + opts []ClientOption + expect func(t *testing.T, c *Client, err error) + }{ + { + name: "missing log", + opts: []ClientOption{}, + expect: func(t *testing.T, c *Client, err error) { + require.Equal(t, ErrMissingLogOption, err) + require.Nil(t, c) + }, + }, + { + name: "missing host", + opts: []ClientOption{ + WithLog(log), + }, + expect: func(t *testing.T, c *Client, err error) { + require.Equal(t, ErrMissingHostOption, err) + require.Nil(t, c) + }, + }, + { + name: "missing peer", + opts: []ClientOption{ + WithLog(log), + WithHost(host), + }, + expect: func(t *testing.T, c *Client, err error) { + require.Equal(t, ErrMissingPeerOption, err) + require.Nil(t, c) + }, + }, + { + name: "success", + opts: []ClientOption{ + WithLog(log), + WithHost(host), + WithPeer(peerID), + }, + expect: func(t *testing.T, c *Client, err error) { + require.NoError(t, err) + require.NotNil(t, c) + }, + }, + } + for _, tc := range tcs { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + c, err := New(tc.opts...) + tc.expect(t, c, err) + }) + } +} + +func TestStoreClient_Query(t *testing.T) { + tcs := []struct { + name string + query *pb.HistoryQuery + stored []*protocol.Envelope + expected []*pb.WakuMessage + }{ + { + name: "empty no messages", + query: &pb.HistoryQuery{}, + stored: []*protocol.Envelope{}, + expected: []*pb.WakuMessage{}, + }, + { + name: "empty with messages", + query: &pb.HistoryQuery{}, + stored: []*protocol.Envelope{ + test.NewEnvelope(t, test.NewMessage("topic1", 1, "msg1"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 2, "msg2"), "pubsub2"), + }, + expected: []*pb.WakuMessage{ + test.NewMessage("topic1", 1, "msg1"), + test.NewMessage("topic2", 2, "msg2"), + }, + }, + { + name: "with content topic", + query: &pb.HistoryQuery{ + ContentFilters: buildContentFilters("topic1"), + }, + stored: []*protocol.Envelope{ + test.NewEnvelope(t, test.NewMessage("topic1", 1, "msg1"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 2, "msg2"), "pubsub2"), + }, + expected: []*pb.WakuMessage{ + test.NewMessage("topic1", 1, "msg1"), + }, + }, + { + name: "with multiple content topics", + query: &pb.HistoryQuery{ + ContentFilters: buildContentFilters("topic1", "topic3"), + }, + stored: []*protocol.Envelope{ + test.NewEnvelope(t, test.NewMessage("topic1", 1, "msg1"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 2, "msg2"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic3", 3, "msg3"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic4", 4, "msg4"), "pubsub1"), + }, + expected: []*pb.WakuMessage{ + test.NewMessage("topic1", 1, "msg1"), + test.NewMessage("topic3", 3, "msg3"), + }, + }, + { + name: "with pubsub topic", + query: &pb.HistoryQuery{ + PubsubTopic: "pubsub2", + }, + stored: []*protocol.Envelope{ + test.NewEnvelope(t, test.NewMessage("topic1", 
1, "msg1"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 2, "msg2"), "pubsub2"), + }, + expected: []*pb.WakuMessage{ + test.NewMessage("topic2", 2, "msg2"), + }, + }, + { + name: "with start time", + query: &pb.HistoryQuery{ + StartTime: 2, + }, + stored: []*protocol.Envelope{ + test.NewEnvelope(t, test.NewMessage("topic1", 1, "msg1"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 2, "msg2"), "pubsub2"), + test.NewEnvelope(t, test.NewMessage("topic1", 3, "msg3"), "pubsub3"), + test.NewEnvelope(t, test.NewMessage("topic2", 4, "msg4"), "pubsub4"), + }, + expected: []*pb.WakuMessage{ + test.NewMessage("topic2", 2, "msg2"), + test.NewMessage("topic1", 3, "msg3"), + test.NewMessage("topic2", 4, "msg4"), + }, + }, + { + name: "with end time", + query: &pb.HistoryQuery{ + EndTime: 3, + }, + stored: []*protocol.Envelope{ + test.NewEnvelope(t, test.NewMessage("topic1", 1, "msg1"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 2, "msg2"), "pubsub2"), + test.NewEnvelope(t, test.NewMessage("topic1", 3, "msg3"), "pubsub3"), + test.NewEnvelope(t, test.NewMessage("topic2", 4, "msg4"), "pubsub4"), + }, + expected: []*pb.WakuMessage{ + test.NewMessage("topic1", 1, "msg1"), + test.NewMessage("topic2", 2, "msg2"), + test.NewMessage("topic1", 3, "msg3"), + }, + }, + { + name: "with start and end time", + query: &pb.HistoryQuery{ + StartTime: 2, + EndTime: 3, + }, + stored: []*protocol.Envelope{ + test.NewEnvelope(t, test.NewMessage("topic1", 1, "msg1"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 2, "msg2"), "pubsub2"), + test.NewEnvelope(t, test.NewMessage("topic1", 3, "msg3"), "pubsub3"), + test.NewEnvelope(t, test.NewMessage("topic2", 4, "msg4"), "pubsub4"), + }, + expected: []*pb.WakuMessage{ + test.NewMessage("topic2", 2, "msg2"), + test.NewMessage("topic1", 3, "msg3"), + }, + }, + { + name: "with content topic start and end time", + query: &pb.HistoryQuery{ + ContentFilters: buildContentFilters("topic1"), + StartTime: 2, + EndTime: 3, + }, + stored: []*protocol.Envelope{ + test.NewEnvelope(t, test.NewMessage("topic1", 1, "msg1"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 2, "msg2"), "pubsub2"), + test.NewEnvelope(t, test.NewMessage("topic1", 3, "msg3"), "pubsub3"), + test.NewEnvelope(t, test.NewMessage("topic2", 4, "msg4"), "pubsub4"), + }, + expected: []*pb.WakuMessage{ + test.NewMessage("topic1", 3, "msg3"), + }, + }, + { + name: "with pubsub topic content topic start and end time", + query: &pb.HistoryQuery{ + PubsubTopic: "pubsub2", + ContentFilters: buildContentFilters("topic2"), + StartTime: 2, + EndTime: 5, + }, + stored: []*protocol.Envelope{ + test.NewEnvelope(t, test.NewMessage("topic1", 1, "msg1"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 2, "msg2"), "pubsub2"), + test.NewEnvelope(t, test.NewMessage("topic1", 3, "msg3"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 4, "msg4"), "pubsub2"), + test.NewEnvelope(t, test.NewMessage("topic2", 5, "msg5"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 6, "msg4"), "pubsub2"), + }, + expected: []*pb.WakuMessage{ + test.NewMessage("topic2", 2, "msg2"), + test.NewMessage("topic2", 4, "msg4"), + }, + }, + { + name: "with pubsub topic and content topic", + query: &pb.HistoryQuery{ + PubsubTopic: "pubsub2", + ContentFilters: buildContentFilters("topic2"), + }, + stored: []*protocol.Envelope{ + test.NewEnvelope(t, test.NewMessage("topic1", 1, "msg1"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 2, "msg2"), "pubsub2"), + 
test.NewEnvelope(t, test.NewMessage("topic1", 3, "msg3"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 4, "msg4"), "pubsub2"), + test.NewEnvelope(t, test.NewMessage("topic2", 5, "msg5"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 6, "msg6"), "pubsub2"), + }, + expected: []*pb.WakuMessage{ + test.NewMessage("topic2", 2, "msg2"), + test.NewMessage("topic2", 4, "msg4"), + test.NewMessage("topic2", 6, "msg6"), + }, + }, + { + name: "with paging page size", + query: &pb.HistoryQuery{ + PubsubTopic: "pubsub1", + ContentFilters: buildContentFilters("topic1", "topic2"), + PagingInfo: &pb.PagingInfo{ + PageSize: 2, + }, + }, + stored: []*protocol.Envelope{ + test.NewEnvelope(t, test.NewMessage("topic1", 1, "msg1"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 2, "msg2"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic1", 3, "msg3"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 4, "msg4"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic3", 5, "msg5"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 6, "msg6"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic1", 7, "msg7"), "pubsub2"), + }, + expected: []*pb.WakuMessage{ + test.NewMessage("topic1", 1, "msg1"), + test.NewMessage("topic2", 2, "msg2"), + test.NewMessage("topic1", 3, "msg3"), + test.NewMessage("topic2", 4, "msg4"), + test.NewMessage("topic2", 6, "msg6"), + }, + }, + { + name: "with paging cursor forward", + query: &pb.HistoryQuery{ + PubsubTopic: "pubsub1", + ContentFilters: buildContentFilters("topic1", "topic2"), + PagingInfo: &pb.PagingInfo{ + PageSize: 2, + Cursor: test.NewEnvelope(t, test.NewMessage("topic1", 3, "msg3"), "pubsub1").Index(), + Direction: pb.PagingInfo_FORWARD, + }, + }, + stored: []*protocol.Envelope{ + test.NewEnvelope(t, test.NewMessage("topic1", 1, "msg1"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 2, "msg2"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic1", 3, "msg3"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 4, "msg4"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic3", 5, "msg5"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 6, "msg6"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic1", 7, "msg7"), "pubsub2"), + }, + expected: []*pb.WakuMessage{ + test.NewMessage("topic2", 4, "msg4"), + test.NewMessage("topic2", 6, "msg6"), + }, + }, + { + name: "with paging cursor backward", + query: &pb.HistoryQuery{ + PubsubTopic: "pubsub1", + ContentFilters: buildContentFilters("topic1", "topic2"), + PagingInfo: &pb.PagingInfo{ + PageSize: 1, + Cursor: test.NewEnvelope(t, test.NewMessage("topic1", 3, "msg3"), "pubsub1").Index(), + Direction: pb.PagingInfo_BACKWARD, + }, + }, + stored: []*protocol.Envelope{ + test.NewEnvelope(t, test.NewMessage("topic1", 1, "msg1"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 2, "msg2"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic1", 3, "msg3"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 4, "msg4"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic3", 5, "msg5"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic2", 6, "msg6"), "pubsub1"), + test.NewEnvelope(t, test.NewMessage("topic1", 7, "msg7"), "pubsub2"), + }, + expected: []*pb.WakuMessage{ + test.NewMessage("topic1", 1, "msg1"), + test.NewMessage("topic2", 2, "msg2"), + }, + }, + } + for _, tc := range tcs { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + s, cleanup := 
newTestStore(t) + defer cleanup() + c := newTestClient(t, s.h.ID()) + addStoreProtocol(t, c.host, s.h) + for _, env := range tc.stored { + storeMessage(t, s, env.Message(), env.PubsubTopic()) + } + expectQueryMessagesEventually(t, c, tc.query, tc.expected) + }) + } +} + +func TestStoreClient_Query_PagingShouldStopOnReturnFalse(t *testing.T) { + s, cleanup := newTestStore(t) + defer cleanup() + c := newTestClient(t, s.h.ID()) + addStoreProtocol(t, c.host, s.h) + storeMessage(t, s, test.NewMessage("topic1", 1, "msg1"), "pubsub1") + storeMessage(t, s, test.NewMessage("topic1", 2, "msg2"), "pubsub1") + storeMessage(t, s, test.NewMessage("topic1", 3, "msg3"), "pubsub1") + storeMessage(t, s, test.NewMessage("topic1", 4, "msg4"), "pubsub1") + var page int + ctx := context.Background() + msgCount, err := c.Query(ctx, &pb.HistoryQuery{ + PagingInfo: &pb.PagingInfo{ + PageSize: 1, + }, + }, func(res *pb.HistoryResponse) (int, bool) { + page++ + if page == 3 { + return 0, false + } + return len(res.Messages), true + }) + require.NoError(t, err) + require.Equal(t, 2, msgCount) +} + +func newTestClient(t *testing.T, peerID peer.ID) *Client { + host := test.NewPeer(t) + log := test.NewLog(t) + c, err := New( + WithLog(log), + WithHost(host), + WithPeer(peerID), + ) + require.NoError(t, err) + require.NotNil(t, c) + return c +} + +func expectQueryMessagesEventually(t *testing.T, c *Client, query *pb.HistoryQuery, expectedMsgs []*pb.WakuMessage) []*pb.WakuMessage { + var msgs []*pb.WakuMessage + require.Eventually(t, func() bool { + msgs = queryMessages(t, c, query) + // fmt.Println(len(msgs)) + // for _, msg := range msgs { + // fmt.Println(msg) + // } + // fmt.Println("") + return len(msgs) == len(expectedMsgs) + }, 3*time.Second, 500*time.Millisecond, "expected %d == %d", len(msgs), len(expectedMsgs)) + require.ElementsMatch(t, expectedMsgs, msgs) + return msgs +} + +func queryMessages(t *testing.T, client *Client, query *pb.HistoryQuery) []*pb.WakuMessage { + var msgs []*pb.WakuMessage + var msgsLock sync.RWMutex + ctx := context.Background() + msgCount, err := client.Query(ctx, query, func(res *pb.HistoryResponse) (int, bool) { + msgsLock.Lock() + defer msgsLock.Unlock() + for _, msg := range res.Messages { + msgs = append(msgs, msg) + } + return len(res.Messages), true + }) + require.NoError(t, err) + require.Equal(t, msgCount, len(msgs)) + return msgs +} + +func buildContentFilters(contentTopics ...string) []*pb.ContentFilter { + contentFilters := make([]*pb.ContentFilter, len(contentTopics)) + for i, contentTopic := range contentTopics { + contentFilters[i] = &pb.ContentFilter{ + ContentTopic: contentTopic, + } + } + return contentFilters +} diff --git a/store/store.go b/store/store.go index d079646e..8e9c1a96 100644 --- a/store/store.go +++ b/store/store.go @@ -19,7 +19,6 @@ import ( "github.com/status-im/go-waku/waku/v2/utils" "github.com/xmtp/xmtp-node-go/logging" "github.com/xmtp/xmtp-node-go/metrics" - "github.com/xmtp/xmtp-node-go/store/client" "github.com/xmtp/xmtp-node-go/tracing" "go.uber.org/zap" ) @@ -189,10 +188,10 @@ func (s *XmtpStore) Resume(ctx context.Context, pubsubTopic string, peers []peer } func (s *XmtpStore) queryPeer(ctx context.Context, req *pb.HistoryQuery, peerID peer.ID, msgFn func(*pb.WakuMessage) bool) (int, error) { - c, err := client.New( - client.WithLog(s.log), - client.WithHost(s.h), - client.WithPeer(peerID), + c, err := New( + WithLog(s.log), + WithHost(s.h), + WithPeer(peerID), ) if err != nil { return 0, err diff --git a/store/store_test.go 
b/store/store_test.go index 9259a50d..7c33610d 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -20,10 +20,6 @@ func newTestStore(t *testing.T) (*XmtpStore, func()) { require.NoError(t, err) host := test.NewPeer(t) - host.Peerstore().AddAddr(host.ID(), tests.GetHostAddress(host), peerstore.PermanentAddrTTL) - err = host.Peerstore().AddProtocols(host.ID(), string(store.StoreID_v20beta4)) - require.NoError(t, err) - store := NewXmtpStore(host, db, dbStore, 0, utils.Logger()) store.Start(context.Background()) diff --git a/testing/log.go b/testing/log.go new file mode 100644 index 00000000..d2527a0a --- /dev/null +++ b/testing/log.go @@ -0,0 +1,14 @@ +package testing + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func NewLog(t *testing.T) *zap.Logger { + log, err := zap.NewDevelopment() + require.NoError(t, err) + return log +} From 1e8bf980b121fed22b5f2c64a5831ad86caea4c2 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Thu, 30 Jun 2022 10:08:39 -0400 Subject: [PATCH 04/13] Fix lint errors :cop: --- e2e/e2e_test.go | 4 +--- store/client.go | 2 +- store/client_test.go | 13 +------------ store/store.go | 1 - 4 files changed, 3 insertions(+), 17 deletions(-) diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index 97703cb2..a64f8832 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -163,9 +163,7 @@ func queryMessages(t *testing.T, c *node.WakuNode, peerAddr string, contentTopic PubsubTopic: relay.DefaultWakuTopic, ContentFilters: contentFilters, }, func(res *pb.HistoryResponse) (int, bool) { - for _, msg := range res.Messages { - msgs = append(msgs, msg) - } + msgs = append(msgs, res.Messages...) return len(res.Messages), true }) require.NoError(t, err) diff --git a/store/client.go b/store/client.go index fbb543b0..791863e0 100644 --- a/store/client.go +++ b/store/client.go @@ -123,7 +123,7 @@ func (c *Client) queryFrom(ctx context.Context, query *pb.HistoryQuery) (*pb.His return nil, errors.Wrap(err, "opening query stream") } defer func() { - stream.Reset() + _ = stream.Reset() stream.Close() }() diff --git a/store/client_test.go b/store/client_test.go index 5012b79c..a0454948 100644 --- a/store/client_test.go +++ b/store/client_test.go @@ -2,7 +2,6 @@ package store import ( "context" - "sync" "testing" "time" @@ -387,11 +386,6 @@ func expectQueryMessagesEventually(t *testing.T, c *Client, query *pb.HistoryQue var msgs []*pb.WakuMessage require.Eventually(t, func() bool { msgs = queryMessages(t, c, query) - // fmt.Println(len(msgs)) - // for _, msg := range msgs { - // fmt.Println(msg) - // } - // fmt.Println("") return len(msgs) == len(expectedMsgs) }, 3*time.Second, 500*time.Millisecond, "expected %d == %d", len(msgs), len(expectedMsgs)) require.ElementsMatch(t, expectedMsgs, msgs) @@ -400,14 +394,9 @@ func expectQueryMessagesEventually(t *testing.T, c *Client, query *pb.HistoryQue func queryMessages(t *testing.T, client *Client, query *pb.HistoryQuery) []*pb.WakuMessage { var msgs []*pb.WakuMessage - var msgsLock sync.RWMutex ctx := context.Background() msgCount, err := client.Query(ctx, query, func(res *pb.HistoryResponse) (int, bool) { - msgsLock.Lock() - defer msgsLock.Unlock() - for _, msg := range res.Messages { - msgs = append(msgs, msg) - } + msgs = append(msgs, res.Messages...) 
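// Each page reports its own message count; returning true asks Query to fetch the next page.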
return len(res.Messages), true }) require.NoError(t, err) diff --git a/store/store.go b/store/store.go index 8e9c1a96..c3c93ce1 100644 --- a/store/store.go +++ b/store/store.go @@ -38,7 +38,6 @@ type XmtpStore struct { started bool statsPeriod time.Duration resumePageSize int - queryPageSize int msgProvider store.MessageProvider h host.Host From c4f588b64573c4b5e005c0818195d711c0fde101 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Tue, 5 Jul 2022 14:56:14 -0400 Subject: [PATCH 05/13] Remove unecessary arg from isLastPage --- store/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/client.go b/store/client.go index 791863e0..b03d5ecf 100644 --- a/store/client.go +++ b/store/client.go @@ -88,7 +88,7 @@ func (c *Client) Query(ctx context.Context, query *pb.HistoryQuery, pageFn func( var res *pb.HistoryResponse for { if res != nil { - if isLastPage(query, res) { + if isLastPage(res) { break } query.PagingInfo = res.PagingInfo @@ -148,6 +148,6 @@ func (c *Client) queryFrom(ctx context.Context, query *pb.HistoryQuery) (*pb.His return res.Response, nil } -func isLastPage(query *pb.HistoryQuery, res *pb.HistoryResponse) bool { +func isLastPage(res *pb.HistoryResponse) bool { return res.PagingInfo == nil || res.PagingInfo.Cursor == nil || len(res.Messages) == 0 } From c95194aa32c4ca3449acfa55363556671febd54e Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Tue, 5 Jul 2022 14:57:04 -0400 Subject: [PATCH 06/13] Remove redundant log --- store/client.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/store/client.go b/store/client.go index b03d5ecf..7ba1dc6d 100644 --- a/store/client.go +++ b/store/client.go @@ -13,7 +13,6 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/store" - "github.com/xmtp/xmtp-node-go/logging" "github.com/xmtp/xmtp-node-go/metrics" "go.uber.org/zap" ) @@ -81,8 +80,6 @@ func New(opts ...ClientOption) (*Client, error) { // every page response, traversing every page until the end or until pageFn // returns false. 
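// Paging stops when a response carries no PagingInfo, no Cursor, or an empty
// Messages slice (the isLastPage check), or as soon as pageFn returns false.
//
// A minimal caller sketch, assuming a client already built via New with the
// WithLog, WithHost, and WithPeer options, and a hypothetical "topic1" filter:
//
//	total, err := c.Query(ctx, &pb.HistoryQuery{
//		ContentFilters: []*pb.ContentFilter{{ContentTopic: "topic1"}},
//	}, func(res *pb.HistoryResponse) (int, bool) {
//		return len(res.Messages), true // count this page and keep going
//	})
//	// total is the number of messages seen across all pages.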
func (c *Client) Query(ctx context.Context, query *pb.HistoryQuery, pageFn func(res *pb.HistoryResponse) (int, bool)) (int, error) { - c.log.Info("querying", logging.HostID("peer", *c.peer)) - var msgCount int var msgCountLock sync.RWMutex var res *pb.HistoryResponse From 457da995f033592b84186fb4103ce342be7f838d Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Wed, 6 Jul 2022 15:56:30 -0400 Subject: [PATCH 07/13] s/queryFrom/queryPage --- store/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/client.go b/store/client.go index 7ba1dc6d..b1d996c8 100644 --- a/store/client.go +++ b/store/client.go @@ -92,7 +92,7 @@ func (c *Client) Query(ctx context.Context, query *pb.HistoryQuery, pageFn func( } var err error - res, err = c.queryFrom(ctx, query) + res, err = c.queryPage(ctx, query) if err != nil { return 0, err } @@ -109,7 +109,7 @@ func (c *Client) Query(ctx context.Context, query *pb.HistoryQuery, pageFn func( return msgCount, nil } -func (c *Client) queryFrom(ctx context.Context, query *pb.HistoryQuery) (*pb.HistoryResponse, error) { +func (c *Client) queryPage(ctx context.Context, query *pb.HistoryQuery) (*pb.HistoryResponse, error) { err := c.host.Connect(ctx, c.host.Peerstore().PeerInfo(*c.peer)) if err != nil { return nil, errors.Wrap(err, "connecting to peer") From c4f14d5c9cb18e63f922b0d6b4c93a56440efd3b Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Mon, 11 Jul 2022 06:53:34 -0400 Subject: [PATCH 08/13] s/New/NewClient and namespace the client option funcs --- e2e/e2e_test.go | 8 ++++---- store/client.go | 8 ++++---- store/client_test.go | 22 +++++++++++----------- store/store.go | 8 ++++---- 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index a64f8832..2bf9ac32 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -144,10 +144,10 @@ func queryMessages(t *testing.T, c *node.WakuNode, peerAddr string, contentTopic pi, err := peer.AddrInfoFromString(peerAddr) require.NoError(t, err) - client, err := store.New( - store.WithLog(log), - store.WithHost(c.Host()), - store.WithPeer(pi.ID), + client, err := store.NewClient( + store.WithClientLog(log), + store.WithClientHost(c.Host()), + store.WithClientPeer(pi.ID), ) require.NoError(t, err) diff --git a/store/client.go b/store/client.go index b1d996c8..0de3334d 100644 --- a/store/client.go +++ b/store/client.go @@ -31,25 +31,25 @@ var ( type ClientOption func(c *Client) -func WithLog(log *zap.Logger) ClientOption { +func WithClientLog(log *zap.Logger) ClientOption { return func(c *Client) { c.log = log } } -func WithHost(host host.Host) ClientOption { +func WithClientHost(host host.Host) ClientOption { return func(c *Client) { c.host = host } } -func WithPeer(id peer.ID) ClientOption { +func WithClientPeer(id peer.ID) ClientOption { return func(c *Client) { c.peer = &id } } -func New(opts ...ClientOption) (*Client, error) { +func NewClient(opts ...ClientOption) (*Client, error) { c := &Client{} for _, opt := range opts { opt(c) diff --git a/store/client_test.go b/store/client_test.go index a0454948..b1824155 100644 --- a/store/client_test.go +++ b/store/client_test.go @@ -33,7 +33,7 @@ func TestStoreClient_New(t *testing.T) { { name: "missing host", opts: []ClientOption{ - WithLog(log), + WithClientLog(log), }, expect: func(t *testing.T, c *Client, err error) { require.Equal(t, ErrMissingHostOption, err) @@ -43,8 +43,8 @@ func TestStoreClient_New(t *testing.T) { { name: "missing peer", opts: []ClientOption{ - WithLog(log), - WithHost(host), + 
WithClientLog(log), + WithClientHost(host), }, expect: func(t *testing.T, c *Client, err error) { require.Equal(t, ErrMissingPeerOption, err) @@ -54,9 +54,9 @@ func TestStoreClient_New(t *testing.T) { { name: "success", opts: []ClientOption{ - WithLog(log), - WithHost(host), - WithPeer(peerID), + WithClientLog(log), + WithClientHost(host), + WithClientPeer(peerID), }, expect: func(t *testing.T, c *Client, err error) { require.NoError(t, err) @@ -68,7 +68,7 @@ func TestStoreClient_New(t *testing.T) { tc := tc t.Run(tc.name, func(t *testing.T) { t.Parallel() - c, err := New(tc.opts...) + c, err := NewClient(tc.opts...) tc.expect(t, c, err) }) } @@ -372,10 +372,10 @@ func TestStoreClient_Query_PagingShouldStopOnReturnFalse(t *testing.T) { func newTestClient(t *testing.T, peerID peer.ID) *Client { host := test.NewPeer(t) log := test.NewLog(t) - c, err := New( - WithLog(log), - WithHost(host), - WithPeer(peerID), + c, err := NewClient( + WithClientLog(log), + WithClientHost(host), + WithClientPeer(peerID), ) require.NoError(t, err) require.NotNil(t, c) diff --git a/store/store.go b/store/store.go index c3c93ce1..5100737a 100644 --- a/store/store.go +++ b/store/store.go @@ -187,10 +187,10 @@ func (s *XmtpStore) Resume(ctx context.Context, pubsubTopic string, peers []peer } func (s *XmtpStore) queryPeer(ctx context.Context, req *pb.HistoryQuery, peerID peer.ID, msgFn func(*pb.WakuMessage) bool) (int, error) { - c, err := New( - WithLog(s.log), - WithHost(s.h), - WithPeer(peerID), + c, err := NewClient( + WithClientLog(s.log), + WithClientHost(s.h), + WithClientPeer(peerID), ) if err != nil { return 0, err From c482dd5b41698b90d993305d1b896a3d133a3c56 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Mon, 11 Jul 2022 06:59:03 -0400 Subject: [PATCH 09/13] Remove unused mutex on msg count --- store/client.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/store/client.go b/store/client.go index 0de3334d..bbfa010f 100644 --- a/store/client.go +++ b/store/client.go @@ -4,7 +4,6 @@ import ( "context" "encoding/hex" "math" - "sync" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" @@ -81,7 +80,6 @@ func NewClient(opts ...ClientOption) (*Client, error) { // returns false. 
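// A sketch of draining a peer's full history into a slice with NewClient and
// the WithClient* options; ctx, log, host, peerID, and the "pubsub1" topic are
// assumed to be defined by the caller:
//
//	c, err := NewClient(
//		WithClientLog(log),
//		WithClientHost(host),
//		WithClientPeer(peerID),
//	)
//	if err != nil {
//		return err
//	}
//	var msgs []*pb.WakuMessage
//	_, err = c.Query(ctx, &pb.HistoryQuery{PubsubTopic: "pubsub1"}, func(res *pb.HistoryResponse) (int, bool) {
//		msgs = append(msgs, res.Messages...)
//		return len(res.Messages), true
//	})
//	// err reports the first failed page request, if any.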
func (c *Client) Query(ctx context.Context, query *pb.HistoryQuery, pageFn func(res *pb.HistoryResponse) (int, bool)) (int, error) { var msgCount int - var msgCountLock sync.RWMutex var res *pb.HistoryResponse for { if res != nil { @@ -102,9 +100,7 @@ func (c *Client) Query(ctx context.Context, query *pb.HistoryQuery, pageFn func( break } - msgCountLock.Lock() msgCount += count - msgCountLock.Unlock() } return msgCount, nil } From a6016833fd30ab3f66280711bc267d39546a6039 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Mon, 11 Jul 2022 08:10:14 -0400 Subject: [PATCH 10/13] go mod tidy --- go.mod | 7 ++----- go.sum | 8 -------- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index df88325f..db2bfcac 100644 --- a/go.mod +++ b/go.mod @@ -8,17 +8,16 @@ require ( github.com/hashicorp/go-tfe v1.2.0 github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d github.com/huandu/go-sqlbuilder v1.13.0 - github.com/ipfs/go-ds-sql v0.3.0 github.com/ipfs/go-log v1.0.5 github.com/jarcoal/httpmock v1.2.0 github.com/jessevdk/go-flags v1.4.0 github.com/libp2p/go-libp2p v0.20.2 github.com/libp2p/go-libp2p-core v0.16.1 - github.com/libp2p/go-libp2p-peerstore v0.6.0 github.com/libp2p/go-libp2p-pubsub v0.6.1 github.com/libp2p/go-msgio v0.2.0 github.com/mattn/go-sqlite3 v1.14.13 github.com/multiformats/go-multiaddr v0.5.0 + github.com/pkg/errors v0.9.1 github.com/status-im/go-waku v0.0.0-20220310221450-e7098efcff73 github.com/stretchr/testify v1.7.1 github.com/uptrace/bun v1.1.3 @@ -81,12 +80,10 @@ require ( github.com/huandu/xstrings v1.3.2 // indirect github.com/huin/goupnp v1.0.3 // indirect github.com/ipfs/go-cid v0.1.0 // indirect - github.com/ipfs/go-datastore v0.5.1 // indirect github.com/ipfs/go-ipfs-util v0.0.2 // indirect github.com/ipfs/go-log/v2 v2.5.1 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect - github.com/jbenet/goprocess v0.1.4 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/compress v1.15.1 // indirect @@ -98,6 +95,7 @@ require ( github.com/libp2p/go-flow-metrics v0.0.3 // indirect github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect github.com/libp2p/go-libp2p-discovery v0.6.0 // indirect + github.com/libp2p/go-libp2p-peerstore v0.6.0 // indirect github.com/libp2p/go-libp2p-resource-manager v0.3.0 // indirect github.com/libp2p/go-mplex v0.7.0 // indirect github.com/libp2p/go-nat v0.1.0 // indirect @@ -135,7 +133,6 @@ require ( github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/philhofer/fwd v1.1.1 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.12.1 // indirect github.com/prometheus/client_model v0.2.0 // indirect diff --git a/go.sum b/go.sum index 291a0341..6884dfb7 100644 --- a/go.sum +++ b/go.sum @@ -65,7 +65,6 @@ dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8= git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= github.com/AdaLogics/go-fuzz-headers v0.0.0-20210715213245-6c3934b029d8/go.mod h1:CzsSbkDixRphAF5hS6wbMKq0eI6ccJRb7/A0M6JBnwg= -github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 
h1:cTp8I5+VIoKjsnZuH8vjyaysT/ses3EvZeaV/1UkF2M= github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/Azure/azure-pipeline-go v0.2.1/go.mod h1:UGSo8XybXnIGZ3epmeBw7Jdz+HiUVpqIlpz/HKHylF4= github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc= @@ -504,7 +503,6 @@ github.com/denisenkom/go-mssqldb v0.0.0-20200428022330-06a60b6afbbc/go.mod h1:xb github.com/denisenkom/go-mssqldb v0.10.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= github.com/denisenkom/go-mssqldb v0.11.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba/go.mod h1:dV8lFg6daOBZbT6/BDGIz6Y3WFGn8juu6G+CQ6LHtl0= -github.com/dgraph-io/badger v1.6.2 h1:mNw0qs90GVgGGWylh0umH5iag1j6n/PeJtNvL6KY/x8= github.com/dgraph-io/badger v1.6.2/go.mod h1:JW2yswe3V058sS0kZ2h/AXeDSqFjxnZcRrVH//y2UQE= github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI= @@ -1010,15 +1008,11 @@ github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqg github.com/ipfs/go-cid v0.1.0 h1:YN33LQulcRHjfom/i25yoOZR4Telp1Hr/2RU3d0PnC0= github.com/ipfs/go-cid v0.1.0/go.mod h1:rH5/Xv83Rfy8Rw6xG+id3DYAMUVmem1MowoKwdXmN2o= github.com/ipfs/go-datastore v0.5.0/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk= -github.com/ipfs/go-datastore v0.5.1 h1:WkRhLuISI+XPD0uk3OskB0fYFSyqK8Ob5ZYew9Qa1nQ= github.com/ipfs/go-datastore v0.5.1/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= -github.com/ipfs/go-ds-badger v0.3.0 h1:xREL3V0EH9S219kFFueOYJJTcjgNSZ2HY1iSvN7U1Ro= github.com/ipfs/go-ds-badger v0.3.0/go.mod h1:1ke6mXNqeV8K3y5Ak2bAA0osoTfmxUdupVCGm4QUIek= -github.com/ipfs/go-ds-leveldb v0.5.0 h1:s++MEBbD3ZKc9/8/njrn4flZLnCuY9I79v94gBUNumo= github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q= -github.com/ipfs/go-ds-sql v0.3.0 h1:PLBbl0Rt0tBwWhQ0b3GCQbH+Bgd6aj2srKG6vJ7nYl4= github.com/ipfs/go-ds-sql v0.3.0/go.mod h1:jE3bhmuUnMPXFftc4NEAiPUfgiwiv7fIdjozuX+m1/E= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8= @@ -1104,7 +1098,6 @@ github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5D github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk= github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= -github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= @@ -1217,7 +1210,6 @@ github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.3.0/go.mod 
h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8= github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/libp2p/go-addr-util v0.1.0/go.mod h1:6I3ZYuFr2O/9D+SoyM0zEw0EF3YkldtTX406BpdQMqw= github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ= From b63548eee6603fc62360bc354cf04acfced73ee6 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Mon, 11 Jul 2022 08:11:08 -0400 Subject: [PATCH 11/13] Remove redundant peer connect in store client --- store/client.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/store/client.go b/store/client.go index bbfa010f..a8ec0bae 100644 --- a/store/client.go +++ b/store/client.go @@ -106,11 +106,6 @@ func (c *Client) Query(ctx context.Context, query *pb.HistoryQuery, pageFn func( } func (c *Client) queryPage(ctx context.Context, query *pb.HistoryQuery) (*pb.HistoryResponse, error) { - err := c.host.Connect(ctx, c.host.Peerstore().PeerInfo(*c.peer)) - if err != nil { - return nil, errors.Wrap(err, "connecting to peer") - } - stream, err := c.host.NewStream(ctx, *c.peer, store.StoreID_v20beta4) if err != nil { return nil, errors.Wrap(err, "opening query stream") From 4806a212d89b155af45508beaa77e8d3f5ff5bf5 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Mon, 11 Jul 2022 08:31:31 -0400 Subject: [PATCH 12/13] Fix flakey test --- testing/relay.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/relay.go b/testing/relay.go index 78d68e98..3116b87c 100644 --- a/testing/relay.go +++ b/testing/relay.go @@ -44,7 +44,7 @@ func SubscribeExpect(t *testing.T, envC chan *protocol.Envelope, msgs []*pb.Waku if len(receivedMsgs) == len(msgs) { done = true } - case <-time.After(500 * time.Millisecond): + case <-time.After(2 * time.Second): done = true } } From 8620204f76dba253d30e17809b13e9a0b12f90de Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Mon, 11 Jul 2022 09:17:47 -0400 Subject: [PATCH 13/13] Fix flakey test --- server/server.go | 2 +- server/server_test.go | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/server/server.go b/server/server.go index dd0cfb90..d3a0033c 100644 --- a/server/server.go +++ b/server/server.go @@ -200,7 +200,7 @@ func (server *Server) staticNodesConnectLoop(staticNodes []string) { dialPeer := func(peerAddr string) { err := server.wakuNode.DialPeer(server.ctx, peerAddr) if err != nil { - server.logger.Error("dialing static node", zap.Error(err)) + server.logger.Error("dialing static node", zap.Error(err), zap.String("peer_addr", peerAddr)) } } diff --git a/server/server_test.go b/server/server_test.go index e50d1aea..ab0b73a8 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -59,6 +59,9 @@ func newTestServer(t *testing.T, staticNodes []string) (*Server, func()) { DbConnectionString: dbDSN, }, StaticNodes: staticNodes, + WSAddress: "0.0.0.0", + WSPort: 0, + EnableWS: true, }) require.NotNil(t, s) return s, func() {