From 4b0393fb9e0f8c93ff9b53f61c7facd8a32734ce Mon Sep 17 00:00:00 2001 From: Guilhem Fanton Date: Thu, 25 Jun 2020 15:24:03 +0200 Subject: [PATCH] feat: use custom odb pubsub Signed-off-by: Guilhem Fanton --- go/internal/ipfsutil/api.go | 63 ++++++++- go/internal/ipfsutil/pubsub_adaptater.go | 39 ++++++ go/internal/ipfsutil/pubsub_api.go | 171 +++++++++++++++++++++++ go/internal/ipfsutil/repo.go | 6 +- go/internal/ipfsutil/testing.go | 63 ++++++--- go/internal/tinder/driver_multi.go | 10 +- go/internal/tinder/service.go | 7 +- go/pkg/bertyprotocol/scenario_test.go | 6 +- go/pkg/bertyprotocol/service.go | 1 + go/pkg/bertyprotocol/testing.go | 1 + 10 files changed, 329 insertions(+), 38 deletions(-) create mode 100644 go/internal/ipfsutil/pubsub_adaptater.go create mode 100644 go/internal/ipfsutil/pubsub_api.go diff --git a/go/internal/ipfsutil/api.go b/go/internal/ipfsutil/api.go index 9d15f220..0d52c26e 100644 --- a/go/internal/ipfsutil/api.go +++ b/go/internal/ipfsutil/api.go @@ -4,6 +4,7 @@ import ( "context" "berty.tech/berty/v2/go/pkg/errcode" + datastore "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore" ipfs_ds "github.com/ipfs/go-datastore" dsync "github.com/ipfs/go-datastore/sync" @@ -14,11 +15,17 @@ import ( ipfs_libp2p "github.com/ipfs/go-ipfs/core/node/libp2p" ipfs_repo "github.com/ipfs/go-ipfs/repo" ipfs_interface "github.com/ipfs/interface-go-ipfs-core" + "github.com/pkg/errors" p2p "github.com/libp2p/go-libp2p" // nolint:staticcheck + host "github.com/libp2p/go-libp2p-core/host" p2p_host "github.com/libp2p/go-libp2p-core/host" p2p_peer "github.com/libp2p/go-libp2p-core/peer" // nolint:staticcheck p2p_ps "github.com/libp2p/go-libp2p-core/peerstore" + p2p_routing "github.com/libp2p/go-libp2p-core/routing" + p2p_dht "github.com/libp2p/go-libp2p-kad-dht" + p2p_dualdht "github.com/libp2p/go-libp2p-kad-dht/dual" + p2p_record "github.com/libp2p/go-libp2p-record" // nolint:staticcheck ) @@ -30,7 +37,9 @@ type CoreAPIConfig struct { APIAddrs []string APIConfig config.API + HostConfig func(host p2p_host.Host) error ExtraLibp2pOption p2p.Option + DHTOption []p2p_dht.Option Routing ipfs_libp2p.RoutingOption Options []CoreAPIOption @@ -96,9 +105,13 @@ func CreateBuildConfig(repo ipfs_repo.Repo, opts *CoreAPIConfig) (*ipfs_node.Bui opts = &CoreAPIConfig{} } - routingOpt := ipfs_libp2p.DHTOption + routingOpt := ipfs_libp2p.DHTClientOption if opts.Routing != nil { - routingOpt = opts.Routing + routingOpt = configureRouting(p2p_dht.ModeClient, + p2p_dht.Concurrency(5), + p2p_dht.DisableAutoRefresh(), + p2p_dht.V1CompatibleMode(false), + ) } hostOpt := ipfs_libp2p.DefaultHostOption @@ -106,6 +119,10 @@ func CreateBuildConfig(repo ipfs_repo.Repo, opts *CoreAPIConfig) (*ipfs_node.Bui hostOpt = wrapP2POptionsToHost(hostOpt, opts.ExtraLibp2pOption) } + if opts.HostConfig != nil { + hostOpt = wrapHostConfig(hostOpt, opts.HostConfig) + } + return &ipfs_node.BuildCfg{ Online: true, Permanent: true, @@ -115,7 +132,7 @@ func CreateBuildConfig(repo ipfs_repo.Repo, opts *CoreAPIConfig) (*ipfs_node.Bui Host: hostOpt, Repo: repo, ExtraOpts: map[string]bool{ - "pubsub": true, + "pubsub": false, }, }, nil } @@ -145,6 +162,46 @@ func updateRepoConfig(repo ipfs_repo.Repo, cfg *CoreAPIConfig) error { return repo.SetConfig(rcfg) } +func wrapHostConfig(hf ipfs_libp2p.HostOption, hc func(h host.Host) error) ipfs_libp2p.HostOption { + return func(ctx context.Context, id p2p_peer.ID, ps p2p_ps.Peerstore, options ...p2p.Option) (p2p_host.Host, error) { + h, err := hf(ctx, id, ps, options...) + if err != nil { + return nil, err + } + + if err = hc(h); err != nil { + _ = h.Close() + return nil, errors.Wrap(err, "failed to config host") + } + + return h, nil + } +} + +func configureRouting(mode p2p_dht.ModeOpt, opts ...p2p_dht.Option) func( + ctx context.Context, + host p2p_host.Host, + dstore datastore.Batching, + validator p2p_record.Validator, + bootstrapPeers ...p2p_peer.AddrInfo, +) (p2p_routing.Routing, error) { + return func( + ctx context.Context, + host p2p_host.Host, + dstore datastore.Batching, + validator p2p_record.Validator, + bootstrapPeers ...p2p_peer.AddrInfo, + ) (p2p_routing.Routing, error) { + return p2p_dualdht.New(ctx, host, + append(opts, + p2p_dht.Mode(mode), + p2p_dht.Datastore(dstore), + p2p_dht.Validator(validator), + p2p_dht.BootstrapPeers(bootstrapPeers...), + )...) + } +} + func wrapP2POptionsToHost(hf ipfs_libp2p.HostOption, opt ...p2p.Option) ipfs_libp2p.HostOption { return func(ctx context.Context, id p2p_peer.ID, ps p2p_ps.Peerstore, options ...p2p.Option) (p2p_host.Host, error) { return hf(ctx, id, ps, append(options, opt...)...) diff --git a/go/internal/ipfsutil/pubsub_adaptater.go b/go/internal/ipfsutil/pubsub_adaptater.go new file mode 100644 index 00000000..147c7cec --- /dev/null +++ b/go/internal/ipfsutil/pubsub_adaptater.go @@ -0,0 +1,39 @@ +package ipfsutil + +import ( + ipfs_interface "github.com/ipfs/interface-go-ipfs-core" +) + +type pubsubCoreAPIAdaptater struct { + ipfs_interface.PubSubAPI + + ipfs_interface.CoreAPI +} + +func (ps *pubsubCoreAPIAdaptater) PubSub() ipfs_interface.PubSubAPI { + return ps.PubSubAPI +} + +func InjectPubSubAPI(api ipfs_interface.CoreAPI, ps ipfs_interface.PubSubAPI) ipfs_interface.CoreAPI { + return &pubsubCoreAPIAdaptater{ + PubSubAPI: ps, + CoreAPI: api, + } +} + +type pubsubExtendedCoreAPIAdaptater struct { + ipfs_interface.PubSubAPI + + ExtendedCoreAPI +} + +func (ps *pubsubExtendedCoreAPIAdaptater) PubSub() ipfs_interface.PubSubAPI { + return ps.PubSubAPI +} + +func InjectPubSubCoreAPIExtendedAdaptater(exapi ExtendedCoreAPI, ps ipfs_interface.PubSubAPI) ExtendedCoreAPI { + return &pubsubExtendedCoreAPIAdaptater{ + PubSubAPI: ps, + ExtendedCoreAPI: exapi, + } +} diff --git a/go/internal/ipfsutil/pubsub_api.go b/go/internal/ipfsutil/pubsub_api.go new file mode 100644 index 00000000..1c67ab76 --- /dev/null +++ b/go/internal/ipfsutil/pubsub_api.go @@ -0,0 +1,171 @@ +package ipfsutil + +import ( + "context" + "sync" + + ipfs_interface "github.com/ipfs/interface-go-ipfs-core" + ipfs_iopts "github.com/ipfs/interface-go-ipfs-core/options" + p2p_disc "github.com/libp2p/go-libp2p-core/discovery" + p2p_host "github.com/libp2p/go-libp2p-core/host" + p2p_peer "github.com/libp2p/go-libp2p-core/peer" + p2p_pubsub "github.com/libp2p/go-libp2p-pubsub" + + "go.uber.org/zap" +) + +type PubSubAPI struct { + *p2p_pubsub.PubSub + + host p2p_host.Host + disc p2p_disc.Discovery + logger *zap.Logger + + muTopics sync.RWMutex + topics map[string]*p2p_pubsub.Topic +} + +func NewPubSubAPI(ctx context.Context, logger *zap.Logger, disc p2p_disc.Discovery, h p2p_host.Host) (ipfs_interface.PubSubAPI, error) { + ps, err := p2p_pubsub.NewGossipSub(ctx, h, + p2p_pubsub.WithMessageSigning(true), + p2p_pubsub.WithFloodPublish(true), + p2p_pubsub.WithDiscovery(disc), + ) + + if err != nil { + return nil, err + } + + return &PubSubAPI{ + host: h, + disc: disc, + PubSub: ps, + logger: logger, + topics: make(map[string]*p2p_pubsub.Topic), + }, nil +} + +func (ps *PubSubAPI) getTopic(topic string) (*p2p_pubsub.Topic, error) { + ps.muTopics.Lock() + defer ps.muTopics.Unlock() + + t, ok := ps.topics[topic] + if !ok { + var err error + t, err = ps.PubSub.Join(topic) + if err != nil { + return nil, err + } + + ps.topics[topic] = t + } + + return t, nil +} + +// Ls lists subscribed topics by name +func (ps *PubSubAPI) Ls(ctx context.Context) ([]string, error) { + return ps.PubSub.GetTopics(), nil +} + +// Peers list peers we are currently pubsubbing with +func (ps *PubSubAPI) Peers(ctx context.Context, opts ...ipfs_iopts.PubSubPeersOption) ([]p2p_peer.ID, error) { + s, err := ipfs_iopts.PubSubPeersOptions(opts...) + if err != nil { + return nil, err + } + + return ps.PubSub.ListPeers(s.Topic), nil +} + +var minTopicSize = p2p_pubsub.WithReadiness(p2p_pubsub.MinTopicSize(1)) + +// Publish a message to a given pubsub topic +func (ps *PubSubAPI) Publish(ctx context.Context, topic string, msg []byte) error { + ps.logger.Debug("publishing", zap.String("topic", topic), zap.Int("msglen", len(msg))) + + t, err := ps.getTopic(topic) + if err != nil { + ps.logger.Warn("unable to get topic", zap.Error(err)) + return err + } + + peers := t.ListPeers() + if len(peers) == 0 { + ps.logger.Warn("no peers connected to this topic...", zap.String("topic", topic)) + } + + return t.Publish(context.Background(), msg, minTopicSize) +} + +// Subscribe to messages on a given topic +func (ps *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...ipfs_iopts.PubSubSubscribeOption) (ipfs_interface.PubSubSubscription, error) { + t, err := ps.getTopic(topic) + if err != nil { + ps.logger.Warn("unable to join topic", zap.Error(err)) + return nil, err + } + + ps.logger.Debug("subscribing:", zap.String("topic", topic)) + sub, err := t.Subscribe() + if err != nil { + ps.logger.Warn("unable to subscribe to topic", zap.String("topic", topic), zap.Error(err)) + return nil, err + } + + return &pubsubSubscriptionAPI{ps.logger, sub}, nil +} + +// PubSubSubscription is an active PubSub subscription +type pubsubSubscriptionAPI struct { + logger *zap.Logger + *p2p_pubsub.Subscription +} + +// io.Closer +func (pss *pubsubSubscriptionAPI) Close() (_ error) { + pss.Subscription.Cancel() + return +} + +// Next return the next incoming message +func (pss *pubsubSubscriptionAPI) Next(ctx context.Context) (ipfs_interface.PubSubMessage, error) { + m, err := pss.Subscription.Next(ctx) + if err != nil { + pss.logger.Warn("unable to get next message", zap.Error(err)) + return nil, err + } + + pss.logger.Debug("got a message from pubsub", + zap.Int("size", len(m.Message.Data)), + zap.String("from", m.ReceivedFrom.String()), + zap.Strings("topic", m.TopicIDs), + ) + + return &pubsubMessageAPI{m}, nil +} + +// // PubSubMessage is a single PubSub message +type pubsubMessageAPI struct { + *p2p_pubsub.Message +} + +// // From returns id of a peer from which the message has arrived +func (psm *pubsubMessageAPI) From() p2p_peer.ID { + return psm.Message.GetFrom() +} + +// Data returns the message body +func (psm *pubsubMessageAPI) Data() []byte { + return psm.Message.GetData() +} + +// Seq returns message identifier +func (psm *pubsubMessageAPI) Seq() []byte { + return psm.Message.GetSeqno() +} + +// // Topics returns list of topics this message was set to +func (psm *pubsubMessageAPI) Topics() []string { + return psm.Message.GetTopicIDs() +} diff --git a/go/internal/ipfsutil/repo.go b/go/internal/ipfsutil/repo.go index b00031f7..3a9c7897 100644 --- a/go/internal/ipfsutil/repo.go +++ b/go/internal/ipfsutil/repo.go @@ -19,11 +19,11 @@ import ( // defaultConnMgrHighWater is the default value for the connection managers // 'high water' mark -const defaultConnMgrHighWater = 100 +const defaultConnMgrHighWater = 50 // defaultConnMgrLowWater is the default value for the connection managers 'low // water' mark -const defaultConnMgrLowWater = 50 +const defaultConnMgrLowWater = 10 // defaultConnMgrGracePeriod is the default value for the connection managers // grace period @@ -95,7 +95,7 @@ func createBaseConfig() (*ipfs_cfg.Config, error) { // Discovery c.Discovery.MDNS.Enabled = true - c.Discovery.MDNS.Interval = 1 + c.Discovery.MDNS.Interval = 5 // swarm listeners c.Addresses.Swarm = []string{ diff --git a/go/internal/ipfsutil/testing.go b/go/internal/ipfsutil/testing.go index bdcf5edb..857bed9b 100644 --- a/go/internal/ipfsutil/testing.go +++ b/go/internal/ipfsutil/testing.go @@ -4,9 +4,11 @@ import ( "context" crand "crypto/rand" "encoding/base64" + "math/rand" "os" "strconv" "testing" + "time" tinder "berty.tech/berty/v2/go/internal/tinder" ds "github.com/ipfs/go-datastore" @@ -26,7 +28,9 @@ import ( p2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" libp2p_peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/protocol" + discovery "github.com/libp2p/go-libp2p-discovery" rendezvous "github.com/libp2p/go-libp2p-rendezvous" p2p_rpdb "github.com/libp2p/go-libp2p-rendezvous/db/sqlite" libp2p_mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" @@ -80,7 +84,7 @@ type TestingAPIOpts struct { } // TestingCoreAPIUsingMockNet returns a fully initialized mocked Core API with the given mocknet -func TestingCoreAPIUsingMockNet(ctx context.Context, t testing.TB, opts *TestingAPIOpts) (api CoreAPIMock, cleanup func()) { +func TestingCoreAPIUsingMockNet(ctx context.Context, t testing.TB, opts *TestingAPIOpts) (CoreAPIMock, func()) { t.Helper() if opts.Logger == nil { @@ -88,44 +92,65 @@ func TestingCoreAPIUsingMockNet(ctx context.Context, t testing.TB, opts *Testing } r := TestingRepo(t) - routingopt, crout := NewTinderRouting(opts.Logger, &opts.RDVPeer, false, true) - node, err := ipfs_core.NewNode(ctx, &ipfs_core.BuildCfg{ - Repo: r, - Online: true, - Routing: routingopt, - Host: ipfs_mock.MockHostOption(opts.Mocknet), + Repo: r, + Online: true, + Host: ipfs_mock.MockHostOption(opts.Mocknet), ExtraOpts: map[string]bool{ - "pubsub": true, + "pubsub": false, }, }) + require.NoError(t, err, "failed to initialize IPFS node mock") if ok, _ := strconv.ParseBool(os.Getenv("POI_DEBUG")); ok { EnableConnLogger(opts.Logger, node.PeerHost) } - // get back tinder service - routing := <-crout - coreapi, err := ipfs_coreapi.NewCoreAPI(node) require.NoError(t, err, "failed to initialize IPFS Core API mock") - api = &coreAPIMock{ + var disc tinder.Driver + if opts.RDVPeer.ID != "" { + // opts.Mocknet.ConnectPeers(node.Identity, opts.RDVPeer.ID) + _, err = opts.Mocknet.LinkPeers(node.Identity, opts.RDVPeer.ID) + require.NoError(t, err, "failed to link peers with rdvp") + + node.Peerstore.AddAddrs(opts.RDVPeer.ID, opts.RDVPeer.Addrs, peerstore.PermanentAddrTTL) + err = node.PeerHost.Connect(ctx, opts.RDVPeer) + require.NoError(t, err, "failed to connect to rdvPeer") + + // @FIXME(gfanton): use rand as argument + disc = tinder.NewRendezvousDiscovery(opts.Logger, node.PeerHost, opts.RDVPeer.ID, rand.New(rand.NewSource(rand.Int63()))) + } else { + disc = tinder.NewDHTDriver(node.DHT.LAN) + } + + // drivers := []tinder.Driver{} + + minBackoff, maxBackoff := time.Second*60, time.Hour + rng := rand.New(rand.NewSource(rand.Int63())) + disc, err = tinder.NewService( + opts.Logger, + disc, + discovery.NewExponentialBackoff(minBackoff, maxBackoff, discovery.FullJitter, time.Second, 5.0, 0, rng), + ) + + psapi, err := NewPubSubAPI(ctx, opts.Logger, disc, node.PeerHost) + require.NoError(t, err, "failed to initialize PubSub") + + coreapi = InjectPubSubAPI(coreapi, psapi) + + api := &coreAPIMock{ CoreAPI: coreapi, mocknet: opts.Mocknet, node: node, - tinder: routing.Routing, + tinder: disc, } - cleanup = func() { + return api, func() { node.Close() - - // manually close dht since ipfs will not be able to do that - routing.IpfsDHT.Close() } - - return } // TestingCoreAPI returns a fully initialized mocked Core API. diff --git a/go/internal/tinder/driver_multi.go b/go/internal/tinder/driver_multi.go index 43e49c4c..e482dee8 100644 --- a/go/internal/tinder/driver_multi.go +++ b/go/internal/tinder/driver_multi.go @@ -37,6 +37,8 @@ func (md *MultiDriver) Advertise(ctx context.Context, ns string, opts ...p2p_dis return 0, err } + md.logger.Debug("Advertising", zap.String("key", ns)) + md.muc.Lock() if cf, ok := md.mapc[ns]; ok { cf() @@ -79,13 +81,9 @@ func (md *MultiDriver) advertise(ctx context.Context, d Driver, ns string, opts md.logger.Debug("advertise success", zap.String("driver", d.Name()), zap.String("key", ns), - zap.Duration("ttl", ttl)) - - if ttl < 1 { - return - } + zap.Int("ttl", int(ttl))) - wait := 7 * ttl / 8 + wait := (7 * ttl / 8) select { case <-time.After(wait): case <-ctx.Done(): diff --git a/go/internal/tinder/service.go b/go/internal/tinder/service.go index 699d64b3..d347d856 100644 --- a/go/internal/tinder/service.go +++ b/go/internal/tinder/service.go @@ -10,12 +10,11 @@ type Service interface { Driver } -func NewService(logger *zap.Logger, drivers []Driver, stratFactory p2p_discovery.BackoffFactory, opts ...p2p_discovery.BackoffDiscoveryOption) (Service, error) { - mdriver := NewMultiDriver(logger, drivers...) - disc, err := p2p_discovery.NewBackoffDiscovery(mdriver, stratFactory, opts...) +func NewService(logger *zap.Logger, driver Driver, stratFactory p2p_discovery.BackoffFactory, opts ...p2p_discovery.BackoffDiscoveryOption) (Service, error) { + disc, err := p2p_discovery.NewBackoffDiscovery(driver, stratFactory, opts...) if err != nil { return nil, err } - return ComposeDriver("tinder", disc, disc, mdriver), nil + return ComposeDriver("tinder", disc, disc, driver), nil } diff --git a/go/pkg/bertyprotocol/scenario_test.go b/go/pkg/bertyprotocol/scenario_test.go index d890ff86..041c721d 100644 --- a/go/pkg/bertyprotocol/scenario_test.go +++ b/go/pkg/bertyprotocol/scenario_test.go @@ -35,10 +35,10 @@ func TestScenario_JoinGroup(t *testing.T) { {"2 clients/connectAll", 2, ConnectAll, true, false}, // {"3 clients/connectAll", 3, ConnectAll, false, true}, - //{"5 clients/connectAll", 5, ConnectAll, true, true}, - //{"8 clients/connectAll", 8, ConnectAll, false, true}, + // {"5 clients/connectAll", 5, ConnectAll, true, false}, + // {"8 clients/connectAll", 8, ConnectAll, false, false}, //@FIXME(gfanton): those tests doesn't works - //{"10 clients/connectAll", 10, ConnectAll, true, true}, + // {"10 clients/connectAll", 10, ConnectAll, true, false}, // {"10 clients/connectInLine", 10, ConnectInLine, true, true}, } diff --git a/go/pkg/bertyprotocol/service.go b/go/pkg/bertyprotocol/service.go index cb320350..9a3e5e55 100644 --- a/go/pkg/bertyprotocol/service.go +++ b/go/pkg/bertyprotocol/service.go @@ -15,6 +15,7 @@ import ( "berty.tech/go-orbit-db/pubsub/directchannel" "github.com/ipfs/go-datastore" ds_sync "github.com/ipfs/go-datastore/sync" + ipfs_core "github.com/ipfs/go-ipfs/core" ipfs_interface "github.com/ipfs/interface-go-ipfs-core" "github.com/libp2p/go-libp2p-core/host" diff --git a/go/pkg/bertyprotocol/testing.go b/go/pkg/bertyprotocol/testing.go index 39f344c4..52c88f51 100644 --- a/go/pkg/bertyprotocol/testing.go +++ b/go/pkg/bertyprotocol/testing.go @@ -47,6 +47,7 @@ func NewTestingProtocol(ctx context.Context, t *testing.T, opts *TestingOpts) (* opts.applyDefaults(ctx) ipfsopts := &ipfsutil.TestingAPIOpts{ + Logger: opts.Logger, Mocknet: opts.Mocknet, RDVPeer: opts.RDVPeer, }