Skip to content

Commit

Permalink
Merge pull request #4805 from ipfs/feat/coreapi/pubsub
Browse files Browse the repository at this point in the history
coreapi: PubSub API
  • Loading branch information
Stebalien authored Oct 5, 2018
2 parents 0fb2020 + 5618fed commit 41a7388
Show file tree
Hide file tree
Showing 9 changed files with 419 additions and 98 deletions.
133 changes: 36 additions & 97 deletions core/commands/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,16 @@ package commands
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net/http"
"sort"
"sync"
"time"

core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"

cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
blocks "gx/ipfs/QmRcHuYzAyswytBuMF78rj3LTChYszomRFXNg4685ZN1WM/go-block-format"
pstore "gx/ipfs/QmSJ36wcYQyEViJUWUEhJU81tw1KdakTKqLLHbvYbA9zDv/go-libp2p-peerstore"
cmdkit "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit"
floodsub "gx/ipfs/QmUK4h113Hh7bR2gPpsMcbUEbbzc7hspocmPi91Bmi69nH/go-libp2p-floodsub"
cmds "gx/ipfs/QmXTmUCBtDUrzDYVzASogLiNph7EBuYqEgPL7QoHNMzUnz/go-ipfs-cmds"
)

Expand Down Expand Up @@ -48,6 +41,13 @@ const (
pubsubDiscoverOptionName = "discover"
)

type pubsubMessage struct {
From []byte `json:"from,omitempty"`
Data []byte `json:"data,omitempty"`
Seqno []byte `json:"seqno,omitempty"`
TopicIDs []string `json:"topicIDs,omitempty"`
}

var PubsubSubCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Subscribe to messages on a given topic.",
Expand Down Expand Up @@ -79,40 +79,19 @@ This command outputs data in the following encodings:
cmdkit.BoolOption(pubsubDiscoverOptionName, "try to discover other peers subscribed to the same topic"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

// Must be online!
if !n.OnlineMode() {
return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
}

if n.Floodsub == nil {
return fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use")
}

topic := req.Arguments[0]
sub, err := n.Floodsub.Subscribe(topic)
discover, _ := req.Options[pubsubDiscoverOptionName].(bool)

sub, err := api.PubSub().Subscribe(req.Context, topic, options.PubSub.Discover(discover))
if err != nil {
return err
}
defer sub.Cancel()

discover, _ := req.Options[pubsubDiscoverOptionName].(bool)
if discover {
go func() {
blk := blocks.NewBlock([]byte("floodsub:" + topic))
err := n.Blocks.AddBlock(blk)
if err != nil {
log.Error("pubsub discovery: ", err)
return
}

connectToPubSubPeers(req.Context, n, blk.Cid())
}()
}
defer sub.Close()

if f, ok := res.(http.Flusher); ok {
f.Flush()
Expand All @@ -126,15 +105,17 @@ This command outputs data in the following encodings:
return err
}

err = res.Emit(msg)
if err != nil {
return err
}
res.Emit(&pubsubMessage{
Data: msg.Data(),
From: []byte(msg.From()),
Seqno: msg.Seq(),
TopicIDs: msg.Topics(),
})
}
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
m, ok := v.(*floodsub.Message)
m, ok := v.(*pubsubMessage)
if !ok {
return fmt.Errorf("unexpected type: %T", v)
}
Expand All @@ -143,7 +124,7 @@ This command outputs data in the following encodings:
return err
}),
"ndpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
m, ok := v.(*floodsub.Message)
m, ok := v.(*pubsubMessage)
if !ok {
return fmt.Errorf("unexpected type: %T", v)
}
Expand All @@ -153,7 +134,7 @@ This command outputs data in the following encodings:
return err
}),
"lenpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
m, ok := v.(*floodsub.Message)
m, ok := v.(*pubsubMessage)
if !ok {
return fmt.Errorf("unexpected type: %T", v)
}
Expand All @@ -166,31 +147,7 @@ This command outputs data in the following encodings:
return err
}),
},
Type: floodsub.Message{},
}

func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
wg := &sync.WaitGroup{}
for p := range provs {
wg.Add(1)
go func(pi pstore.PeerInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
err := n.PeerHost.Connect(ctx, pi)
if err != nil {
log.Info("pubsub discover: ", err)
return
}
log.Info("connected to pubsub peer:", pi.ID)
}(p)
}

wg.Wait()
Type: pubsubMessage{},
}

var PubsubPubCmd = &cmds.Command{
Expand All @@ -210,20 +167,11 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
cmdkit.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

// Must be online!
if !n.OnlineMode() {
return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
}

if n.Floodsub == nil {
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
}

topic := req.Arguments[0]

err = req.ParseBodyArgs()
Expand All @@ -232,7 +180,7 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
}

for _, data := range req.Arguments[1:] {
if err := n.Floodsub.Publish(topic, []byte(data)); err != nil {
if err := api.PubSub().Publish(req.Context, topic, []byte(data)); err != nil {
return err
}
}
Expand All @@ -254,21 +202,17 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

// Must be online!
if !n.OnlineMode() {
return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
}

if n.Floodsub == nil {
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
l, err := api.PubSub().Ls(req.Context)
if err != nil {
return err
}

return cmds.EmitOnce(res, stringList{n.Floodsub.GetTopics()})
return cmds.EmitOnce(res, stringList{l})
},
Type: stringList{},
Encoders: cmds.EncoderMap{
Expand Down Expand Up @@ -308,26 +252,21 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
cmdkit.StringArg("topic", false, false, "topic to list connected peers of"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

// Must be online!
if !n.OnlineMode() {
return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
}

if n.Floodsub == nil {
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use")
}

var topic string
if len(req.Arguments) == 1 {
topic = req.Arguments[0]
}

peers := n.Floodsub.ListPeers(topic)
peers, err := api.PubSub().Peers(req.Context, options.PubSub.Topic(topic))
if err != nil {
return err
}

list := &stringList{make([]string, 0, len(peers))}

for _, peer := range peers {
Expand Down
9 changes: 9 additions & 0 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ package coreapi
import (
core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"

logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log"
)

var log = logging.Logger("core/coreapi")

type CoreAPI struct {
node *core.IpfsNode
}
Expand Down Expand Up @@ -72,3 +76,8 @@ func (api *CoreAPI) Dht() coreiface.DhtAPI {
func (api *CoreAPI) Swarm() coreiface.SwarmAPI {
return (*SwarmAPI)(api)
}

// PubSub returns the PubSubAPI interface implementation backed by the go-ipfs node
func (api *CoreAPI) PubSub() coreiface.PubSubAPI {
return (*PubSubAPI)(api)
}
3 changes: 3 additions & 0 deletions core/coreapi/interface/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type CoreAPI interface {
// Swarm returns an implementation of Swarm API
Swarm() SwarmAPI

// PubSub returns an implementation of PubSub API
PubSub() PubSubAPI

// ResolvePath resolves the path using Unixfs resolver
ResolvePath(context.Context, Path) (ResolvedPath, error)

Expand Down
2 changes: 1 addition & 1 deletion core/coreapi/interface/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ import "errors"

var (
ErrIsDir = errors.New("object is a directory")
ErrOffline = errors.New("can't resolve, ipfs node is offline")
ErrOffline = errors.New("this action must be run in online mode, try running 'ipfs daemon' first")
)
58 changes: 58 additions & 0 deletions core/coreapi/interface/options/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package options

type PubSubPeersSettings struct {
Topic string
}

type PubSubSubscribeSettings struct {
Discover bool
}

type PubSubPeersOption func(*PubSubPeersSettings) error
type PubSubSubscribeOption func(*PubSubSubscribeSettings) error

func PubSubPeersOptions(opts ...PubSubPeersOption) (*PubSubPeersSettings, error) {
options := &PubSubPeersSettings{
Topic: "",
}

for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}
return options, nil
}

func PubSubSubscribeOptions(opts ...PubSubSubscribeOption) (*PubSubSubscribeSettings, error) {
options := &PubSubSubscribeSettings{
Discover: false,
}

for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}
return options, nil
}

type pubsubOpts struct{}

var PubSub pubsubOpts

func (pubsubOpts) Topic(topic string) PubSubPeersOption {
return func(settings *PubSubPeersSettings) error {
settings.Topic = topic
return nil
}
}

func (pubsubOpts) Discover(discover bool) PubSubSubscribeOption {
return func(settings *PubSubSubscribeSettings) error {
settings.Discover = discover
return nil
}
}
48 changes: 48 additions & 0 deletions core/coreapi/interface/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package iface

import (
"context"
"io"

options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"

peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer"
)

// PubSubSubscription is an active PubSub subscription
type PubSubSubscription interface {
io.Closer

// Next return the next incoming message
Next(context.Context) (PubSubMessage, error)
}

// PubSubMessage is a single PubSub message
type PubSubMessage interface {
// From returns id of a peer from which the message has arrived
From() peer.ID

// Data returns the message body
Data() []byte

// Seq returns message identifier
Seq() []byte

// Topics returns list of topics this message was set to
Topics() []string
}

// PubSubAPI specifies the interface to PubSub
type PubSubAPI interface {
// Ls lists subscribed topics by name
Ls(context.Context) ([]string, error)

// Peers list peers we are currently pubsubbing with
Peers(context.Context, ...options.PubSubPeersOption) ([]peer.ID, error)

// Publish a message to a given pubsub topic
Publish(context.Context, string, []byte) error

// Subscribe to messages on a given topic
Subscribe(context.Context, string, ...options.PubSubSubscribeOption) (PubSubSubscription, error)
}
Loading

0 comments on commit 41a7388

Please sign in to comment.