Skip to content

Commit

Permalink
p2p: fix sharness tests after refactor
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
  • Loading branch information
magik6k committed May 26, 2018
1 parent a541535 commit 7d97f04
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 60 deletions.
34 changes: 28 additions & 6 deletions core/commands/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ Examples:
cmdkit.StringArg("target-address", true, false, "Target endpoint."),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := getNode(req)
n, err := p2pGetNode(req)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
Expand Down Expand Up @@ -164,7 +164,7 @@ var p2pLsCmd = &cmds.Command{
cmdkit.BoolOption("headers", "v", "Print table headers (Protocol, Listen, Target)."),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := getNode(req)
n, err := p2pGetNode(req)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
Expand Down Expand Up @@ -221,7 +221,7 @@ var p2pCloseCmd = &cmds.Command{
Run: func(req cmds.Request, res cmds.Response) {
res.SetOutput(nil)

n, err := getNode(req)
n, err := p2pGetNode(req)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
Expand All @@ -244,6 +244,10 @@ var p2pCloseCmd = &cmds.Command{

match := func(listener p2p.Listener) bool {
out := true
if p || !strings.HasPrefix(proto, "/p2p/") {
proto = "/p2p/" + proto
}

if p {
out = out && (proto == listener.Protocol())
}
Expand All @@ -258,12 +262,30 @@ var p2pCloseCmd = &cmds.Command{
return out
}

var closed int
for _, listener := range n.P2P.Listeners.Listeners {
if !match(listener) {
continue
}
listener.Close()
closed++
}
res.SetOutput(closed)
},
Type: int(0),
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}

closed := v.(int)
buf := new(bytes.Buffer)
fmt.Fprintf(buf, "Closed %d stream(s)\n", closed)

return buf, nil
},
},
}

Expand Down Expand Up @@ -292,7 +314,7 @@ var p2pStreamLsCmd = &cmds.Command{
cmdkit.BoolOption("headers", "v", "Print table headers (HagndlerID, Protocol, Local, Remote)."),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := getNode(req)
n, err := p2pGetNode(req)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
Expand Down Expand Up @@ -352,7 +374,7 @@ var p2pStreamCloseCmd = &cmds.Command{
Run: func(req cmds.Request, res cmds.Response) {
res.SetOutput(nil)

n, err := getNode(req)
n, err := p2pGetNode(req)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
Expand Down Expand Up @@ -386,7 +408,7 @@ var p2pStreamCloseCmd = &cmds.Command{
},
}

func getNode(req cmds.Request) (*core.IpfsNode, error) {
func p2pGetNode(req cmds.Request) (*core.IpfsNode, error) {
n, err := req.InvocContext().GetNode()
if err != nil {
return nil, err
Expand Down
17 changes: 16 additions & 1 deletion p2p/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package p2p

import (
"sync"

"github.com/pkg/errors"
)

type Listener interface {
Expand All @@ -25,9 +27,22 @@ type ListenerRegistry struct {
lk *sync.Mutex
}

func (r *ListenerRegistry) Lock(l Listener) error {
r.lk.Lock()

if _, ok := r.Listeners[getListenerKey(l)]; ok {
r.lk.Unlock()
return errors.New("listener already registered")
}
return nil
}

func (r *ListenerRegistry) Unlock() {
r.lk.Unlock()
}

// Register registers listenerInfo in this registry
func (r *ListenerRegistry) Register(l Listener) {
r.lk.Lock()
defer r.lk.Unlock()

r.Listeners[getListenerKey(l)] = l
Expand Down
21 changes: 14 additions & 7 deletions p2p/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,37 @@ type localListener struct {
id peer.ID

proto string
laddr ma.Multiaddr
peer peer.ID

listener manet.Listener
}

// ForwardLocal creates new P2P stream to a remote listener
func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto string, bindAddr ma.Multiaddr) (Listener, error) {
maListener, err := manet.Listen(bindAddr)
if err != nil {
return nil, err
}

listener := &localListener{
ctx: ctx,

p2p: p2p,
id: p2p.identity,

proto: proto,
laddr: bindAddr,
peer: peer,
}

listener: maListener,
if err := p2p.Listeners.Lock(listener); err != nil {
return nil, err
}

maListener, err := manet.Listen(bindAddr)
if err != nil {
p2p.Listeners.Unlock()
return nil, err
}

listener.listener = maListener

p2p.Listeners.Register(listener)
go listener.acceptConns()

Expand Down Expand Up @@ -109,7 +116,7 @@ func (l *localListener) Protocol() string {
}

func (l *localListener) ListenAddress() string {
return l.listener.Multiaddr().String()
return l.laddr.String()
}

func (l *localListener) TargetAddress() string {
Expand Down
19 changes: 15 additions & 4 deletions p2p/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ type remoteListener struct {

// ForwardRemote creates new p2p listener
func (p2p *P2P) ForwardRemote(ctx context.Context, proto string, addr ma.Multiaddr) (Listener, error) {
listenerInfo := &remoteListener{
listener := &remoteListener{
p2p: p2p,

proto: proto,
addr: addr,
}

p2p.Listeners.Register(listenerInfo)
if err := p2p.Listeners.Lock(listener); err != nil {
return nil, err
}

p2p.peerHost.SetStreamHandler(protocol.ID(proto), func(remote net.Stream) {
local, err := manet.Dial(addr)
Expand All @@ -38,10 +40,17 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto string, addr ma.Multiad
return
}

//TODO: review: is there a better way to do this?
peerMa, err := ma.NewMultiaddr("/ipfs/" + remote.Conn().RemotePeer().Pretty())
if err != nil {
remote.Reset()
return
}

stream := &Stream{
Protocol: proto,

OriginAddr: remote.Conn().RemoteMultiaddr(),
OriginAddr: peerMa,
TargetAddr: addr,

Local: local,
Expand All @@ -54,7 +63,9 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto string, addr ma.Multiad
stream.startStreaming()
})

return listenerInfo, nil
p2p.Listeners.Register(listener)

return listener, nil
}

func (l *remoteListener) Protocol() string {
Expand Down
Loading

0 comments on commit 7d97f04

Please sign in to comment.