Skip to content

Commit

Permalink
feat: use custom odb pubsub
Browse files Browse the repository at this point in the history
Signed-off-by: Guilhem Fanton <guilhem.fanton@gmail.com>
  • Loading branch information
gfanton committed Jul 16, 2020
1 parent 1693d1d commit 4b0393f
Show file tree
Hide file tree
Showing 10 changed files with 329 additions and 38 deletions.
63 changes: 60 additions & 3 deletions go/internal/ipfsutil/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"berty.tech/berty/v2/go/pkg/errcode"
datastore "github.com/ipfs/go-datastore"
ds "github.com/ipfs/go-datastore"
ipfs_ds "github.com/ipfs/go-datastore"
dsync "github.com/ipfs/go-datastore/sync"
Expand All @@ -14,11 +15,17 @@ import (
ipfs_libp2p "github.com/ipfs/go-ipfs/core/node/libp2p"
ipfs_repo "github.com/ipfs/go-ipfs/repo"
ipfs_interface "github.com/ipfs/interface-go-ipfs-core"
"github.com/pkg/errors"

p2p "github.com/libp2p/go-libp2p" // nolint:staticcheck
host "github.com/libp2p/go-libp2p-core/host"
p2p_host "github.com/libp2p/go-libp2p-core/host"
p2p_peer "github.com/libp2p/go-libp2p-core/peer" // nolint:staticcheck
p2p_ps "github.com/libp2p/go-libp2p-core/peerstore"
p2p_routing "github.com/libp2p/go-libp2p-core/routing"
p2p_dht "github.com/libp2p/go-libp2p-kad-dht"
p2p_dualdht "github.com/libp2p/go-libp2p-kad-dht/dual"
p2p_record "github.com/libp2p/go-libp2p-record"
// nolint:staticcheck
)

Expand All @@ -30,7 +37,9 @@ type CoreAPIConfig struct {
APIAddrs []string
APIConfig config.API

HostConfig func(host p2p_host.Host) error
ExtraLibp2pOption p2p.Option
DHTOption []p2p_dht.Option
Routing ipfs_libp2p.RoutingOption

Options []CoreAPIOption
Expand Down Expand Up @@ -96,16 +105,24 @@ func CreateBuildConfig(repo ipfs_repo.Repo, opts *CoreAPIConfig) (*ipfs_node.Bui
opts = &CoreAPIConfig{}
}

routingOpt := ipfs_libp2p.DHTOption
routingOpt := ipfs_libp2p.DHTClientOption
if opts.Routing != nil {
routingOpt = opts.Routing
routingOpt = configureRouting(p2p_dht.ModeClient,
p2p_dht.Concurrency(5),
p2p_dht.DisableAutoRefresh(),
p2p_dht.V1CompatibleMode(false),
)
}

hostOpt := ipfs_libp2p.DefaultHostOption
if opts.ExtraLibp2pOption != nil {
hostOpt = wrapP2POptionsToHost(hostOpt, opts.ExtraLibp2pOption)
}

if opts.HostConfig != nil {
hostOpt = wrapHostConfig(hostOpt, opts.HostConfig)
}

return &ipfs_node.BuildCfg{
Online: true,
Permanent: true,
Expand All @@ -115,7 +132,7 @@ func CreateBuildConfig(repo ipfs_repo.Repo, opts *CoreAPIConfig) (*ipfs_node.Bui
Host: hostOpt,
Repo: repo,
ExtraOpts: map[string]bool{
"pubsub": true,
"pubsub": false,
},
}, nil
}
Expand Down Expand Up @@ -145,6 +162,46 @@ func updateRepoConfig(repo ipfs_repo.Repo, cfg *CoreAPIConfig) error {
return repo.SetConfig(rcfg)
}

func wrapHostConfig(hf ipfs_libp2p.HostOption, hc func(h host.Host) error) ipfs_libp2p.HostOption {
return func(ctx context.Context, id p2p_peer.ID, ps p2p_ps.Peerstore, options ...p2p.Option) (p2p_host.Host, error) {
h, err := hf(ctx, id, ps, options...)
if err != nil {
return nil, err
}

if err = hc(h); err != nil {
_ = h.Close()
return nil, errors.Wrap(err, "failed to config host")
}

return h, nil
}
}

func configureRouting(mode p2p_dht.ModeOpt, opts ...p2p_dht.Option) func(
ctx context.Context,
host p2p_host.Host,
dstore datastore.Batching,
validator p2p_record.Validator,
bootstrapPeers ...p2p_peer.AddrInfo,
) (p2p_routing.Routing, error) {
return func(
ctx context.Context,
host p2p_host.Host,
dstore datastore.Batching,
validator p2p_record.Validator,
bootstrapPeers ...p2p_peer.AddrInfo,
) (p2p_routing.Routing, error) {
return p2p_dualdht.New(ctx, host,
append(opts,
p2p_dht.Mode(mode),
p2p_dht.Datastore(dstore),
p2p_dht.Validator(validator),
p2p_dht.BootstrapPeers(bootstrapPeers...),
)...)
}
}

func wrapP2POptionsToHost(hf ipfs_libp2p.HostOption, opt ...p2p.Option) ipfs_libp2p.HostOption {
return func(ctx context.Context, id p2p_peer.ID, ps p2p_ps.Peerstore, options ...p2p.Option) (p2p_host.Host, error) {
return hf(ctx, id, ps, append(options, opt...)...)
Expand Down
39 changes: 39 additions & 0 deletions go/internal/ipfsutil/pubsub_adaptater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package ipfsutil

import (
ipfs_interface "github.com/ipfs/interface-go-ipfs-core"
)

type pubsubCoreAPIAdaptater struct {
ipfs_interface.PubSubAPI

ipfs_interface.CoreAPI
}

func (ps *pubsubCoreAPIAdaptater) PubSub() ipfs_interface.PubSubAPI {
return ps.PubSubAPI
}

func InjectPubSubAPI(api ipfs_interface.CoreAPI, ps ipfs_interface.PubSubAPI) ipfs_interface.CoreAPI {
return &pubsubCoreAPIAdaptater{
PubSubAPI: ps,
CoreAPI: api,
}
}

type pubsubExtendedCoreAPIAdaptater struct {
ipfs_interface.PubSubAPI

ExtendedCoreAPI
}

func (ps *pubsubExtendedCoreAPIAdaptater) PubSub() ipfs_interface.PubSubAPI {
return ps.PubSubAPI
}

func InjectPubSubCoreAPIExtendedAdaptater(exapi ExtendedCoreAPI, ps ipfs_interface.PubSubAPI) ExtendedCoreAPI {
return &pubsubExtendedCoreAPIAdaptater{
PubSubAPI: ps,
ExtendedCoreAPI: exapi,
}
}
171 changes: 171 additions & 0 deletions go/internal/ipfsutil/pubsub_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package ipfsutil

import (
"context"
"sync"

ipfs_interface "github.com/ipfs/interface-go-ipfs-core"
ipfs_iopts "github.com/ipfs/interface-go-ipfs-core/options"
p2p_disc "github.com/libp2p/go-libp2p-core/discovery"
p2p_host "github.com/libp2p/go-libp2p-core/host"
p2p_peer "github.com/libp2p/go-libp2p-core/peer"
p2p_pubsub "github.com/libp2p/go-libp2p-pubsub"

"go.uber.org/zap"
)

type PubSubAPI struct {
*p2p_pubsub.PubSub

host p2p_host.Host
disc p2p_disc.Discovery
logger *zap.Logger

muTopics sync.RWMutex
topics map[string]*p2p_pubsub.Topic
}

func NewPubSubAPI(ctx context.Context, logger *zap.Logger, disc p2p_disc.Discovery, h p2p_host.Host) (ipfs_interface.PubSubAPI, error) {
ps, err := p2p_pubsub.NewGossipSub(ctx, h,
p2p_pubsub.WithMessageSigning(true),
p2p_pubsub.WithFloodPublish(true),
p2p_pubsub.WithDiscovery(disc),
)

if err != nil {
return nil, err
}

return &PubSubAPI{
host: h,
disc: disc,
PubSub: ps,
logger: logger,
topics: make(map[string]*p2p_pubsub.Topic),
}, nil
}

func (ps *PubSubAPI) getTopic(topic string) (*p2p_pubsub.Topic, error) {
ps.muTopics.Lock()
defer ps.muTopics.Unlock()

t, ok := ps.topics[topic]
if !ok {
var err error
t, err = ps.PubSub.Join(topic)
if err != nil {
return nil, err
}

ps.topics[topic] = t
}

return t, nil
}

// Ls lists subscribed topics by name
func (ps *PubSubAPI) Ls(ctx context.Context) ([]string, error) {
return ps.PubSub.GetTopics(), nil
}

// Peers list peers we are currently pubsubbing with
func (ps *PubSubAPI) Peers(ctx context.Context, opts ...ipfs_iopts.PubSubPeersOption) ([]p2p_peer.ID, error) {
s, err := ipfs_iopts.PubSubPeersOptions(opts...)
if err != nil {
return nil, err
}

return ps.PubSub.ListPeers(s.Topic), nil
}

var minTopicSize = p2p_pubsub.WithReadiness(p2p_pubsub.MinTopicSize(1))

// Publish a message to a given pubsub topic
func (ps *PubSubAPI) Publish(ctx context.Context, topic string, msg []byte) error {
ps.logger.Debug("publishing", zap.String("topic", topic), zap.Int("msglen", len(msg)))

t, err := ps.getTopic(topic)
if err != nil {
ps.logger.Warn("unable to get topic", zap.Error(err))
return err
}

peers := t.ListPeers()
if len(peers) == 0 {
ps.logger.Warn("no peers connected to this topic...", zap.String("topic", topic))
}

return t.Publish(context.Background(), msg, minTopicSize)
}

// Subscribe to messages on a given topic
func (ps *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...ipfs_iopts.PubSubSubscribeOption) (ipfs_interface.PubSubSubscription, error) {
t, err := ps.getTopic(topic)
if err != nil {
ps.logger.Warn("unable to join topic", zap.Error(err))
return nil, err
}

ps.logger.Debug("subscribing:", zap.String("topic", topic))
sub, err := t.Subscribe()
if err != nil {
ps.logger.Warn("unable to subscribe to topic", zap.String("topic", topic), zap.Error(err))
return nil, err
}

return &pubsubSubscriptionAPI{ps.logger, sub}, nil
}

// PubSubSubscription is an active PubSub subscription
type pubsubSubscriptionAPI struct {
logger *zap.Logger
*p2p_pubsub.Subscription
}

// io.Closer
func (pss *pubsubSubscriptionAPI) Close() (_ error) {
pss.Subscription.Cancel()
return
}

// Next return the next incoming message
func (pss *pubsubSubscriptionAPI) Next(ctx context.Context) (ipfs_interface.PubSubMessage, error) {
m, err := pss.Subscription.Next(ctx)
if err != nil {
pss.logger.Warn("unable to get next message", zap.Error(err))
return nil, err
}

pss.logger.Debug("got a message from pubsub",
zap.Int("size", len(m.Message.Data)),
zap.String("from", m.ReceivedFrom.String()),
zap.Strings("topic", m.TopicIDs),
)

return &pubsubMessageAPI{m}, nil
}

// // PubSubMessage is a single PubSub message
type pubsubMessageAPI struct {
*p2p_pubsub.Message
}

// // From returns id of a peer from which the message has arrived
func (psm *pubsubMessageAPI) From() p2p_peer.ID {
return psm.Message.GetFrom()
}

// Data returns the message body
func (psm *pubsubMessageAPI) Data() []byte {
return psm.Message.GetData()
}

// Seq returns message identifier
func (psm *pubsubMessageAPI) Seq() []byte {
return psm.Message.GetSeqno()
}

// // Topics returns list of topics this message was set to
func (psm *pubsubMessageAPI) Topics() []string {
return psm.Message.GetTopicIDs()
}
6 changes: 3 additions & 3 deletions go/internal/ipfsutil/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (

// defaultConnMgrHighWater is the default value for the connection managers
// 'high water' mark
const defaultConnMgrHighWater = 100
const defaultConnMgrHighWater = 50

// defaultConnMgrLowWater is the default value for the connection managers 'low
// water' mark
const defaultConnMgrLowWater = 50
const defaultConnMgrLowWater = 10

// defaultConnMgrGracePeriod is the default value for the connection managers
// grace period
Expand Down Expand Up @@ -95,7 +95,7 @@ func createBaseConfig() (*ipfs_cfg.Config, error) {

// Discovery
c.Discovery.MDNS.Enabled = true
c.Discovery.MDNS.Interval = 1
c.Discovery.MDNS.Interval = 5

// swarm listeners
c.Addresses.Swarm = []string{
Expand Down
Loading

0 comments on commit 4b0393f

Please sign in to comment.