This repository has been archived by the owner on Dec 7, 2019. It is now read-only.

do connection setup here #9

Closed
wants to merge 10 commits
6 changes: 5 additions & 1 deletion Makefile
@@ -6,7 +6,11 @@ covertools:
go get github.com/mattn/goveralls
go get golang.org/x/tools/cmd/cover

deps: gx covertools
ginkgo:
go get github.com/onsi/ginkgo/ginkgo
go get github.com/onsi/gomega

deps: gx covertools ginkgo
gx --verbose install --global
gx-go rewrite

136 changes: 86 additions & 50 deletions conn.go
@@ -2,50 +2,89 @@ package conn

import (
"context"
"errors"
"io"
"net"
"time"

logging "github.com/ipfs/go-log"
mpool "github.com/jbenet/go-msgio/mpool"
ci "github.com/libp2p/go-libp2p-crypto"
ic "github.com/libp2p/go-libp2p-crypto"
iconn "github.com/libp2p/go-libp2p-interface-conn"
lgbl "github.com/libp2p/go-libp2p-loggables"
peer "github.com/libp2p/go-libp2p-peer"
secio "github.com/libp2p/go-libp2p-secio"
tpt "github.com/libp2p/go-libp2p-transport"
smux "github.com/libp2p/go-stream-muxer"
ma "github.com/multiformats/go-multiaddr"
)

var log = logging.Logger("conn")

// ReleaseBuffer puts the given byte array back into the buffer pool,
// first verifying that it is the correct size
func ReleaseBuffer(b []byte) {
log.Debugf("Releasing buffer! (cap,size = %d, %d)", cap(b), len(b))
mpool.ByteSlicePool.Put(uint32(cap(b)), b)
}
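
For context, a hypothetical counterpart on the Get side (not part of this diff): it assumes mpool.ByteSlicePool.Get(size) returns an interface{} wrapping a []byte, mirroring the Put call above, and that the returned buffer is long enough for the read.

// readFrame is a hypothetical helper showing the Get/ReleaseBuffer pairing:
// take a pooled buffer, fill part of it from r, and hand it back when done.
func readFrame(r io.Reader) error {
	b := mpool.ByteSlicePool.Get(4096).([]byte)
	defer ReleaseBuffer(b)
	_, err := io.ReadFull(r, b[:16]) // read a fixed-size header into the pooled buffer
	return err
}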

// singleConn represents a single connection to another Peer (IPFS Node).
// singleConn represents a single stream-multiplexed connection to another Peer (IPFS Node).
type singleConn struct {
local peer.ID
remote peer.ID
maconn tpt.Conn
event io.Closer
streamConn smux.Conn
tptConn tpt.Conn

secSession secio.Session

event io.Closer
}

// newConn constructs a new connection
func newSingleConn(ctx context.Context, local, remote peer.ID, maconn tpt.Conn) (iconn.Conn, error) {
ml := lgbl.Dial("conn", local, remote, maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
var _ iconn.Conn = &singleConn{}

conn := &singleConn{
local: local,
remote: remote,
maconn: maconn,
event: log.EventBegin(ctx, "connLifetime", ml),
// newSingleConn constructs a new connection
func newSingleConn(ctx context.Context, local, remote peer.ID, privKey ci.PrivKey, tptConn tpt.Conn, pstpt smux.Transport, isServer bool) (iconn.Conn, error) {
ml := lgbl.Dial("conn", local, remote, tptConn.LocalMultiaddr(), tptConn.RemoteMultiaddr())

var streamConn smux.Conn
var secSession secio.Session

c := tptConn
// 1. secure the connection
if privKey != nil && iconn.EncryptConnections {
var err error
secSession, err = setupSecureSession(ctx, local, privKey, tptConn)
if err != nil {
return nil, err
}
c = &secureConn{
insecure: tptConn,
secure: secSession,
}
Member: If remote is not "", we should check if remote == secSession.RemoteID() here. That way, we do this before doing anything else. Currently, we check in swarm but, IMO, that's too late. Note: this is an improvement, not a bug.

Author: Can we do that in a separate PR?

Member: Fair enough.
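
For illustration only (not part of this diff), a minimal sketch of the suggested check, placed right after the handshake; it reads the remote identity via secSession.RemotePeer(), as elsewhere in this file, and would need the fmt import:

if remote != "" && remote != secSession.RemotePeer() {
	return nil, fmt.Errorf("expected remote peer %s, got %s", remote, secSession.RemotePeer())
}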

} else {
log.Warning("creating INSECURE connection %s at %s", tptConn.LocalMultiaddr(), tptConn.RemoteMultiaddr())
}

// 2. start stream multiplexing
var err error
streamConn, err = pstpt.NewConn(c, isServer)
if err != nil {
return nil, err
}

log.Debugf("newSingleConn %p: %v to %v", conn, local, remote)
return conn, nil
sconn := &singleConn{
streamConn: streamConn,
tptConn: tptConn,
secSession: secSession,
event: log.EventBegin(ctx, "connLifetime", ml),
}

log.Debugf("newSingleConn %p: %v to %v", sconn, local, remote)
return sconn, nil
}
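
Illustration only, not part of this diff: a minimal package-internal sketch of how a caller might use the connection built above — streams come from the muxer, peer identity from the secio session. The ping helper and its payload are hypothetical.

// ping opens a stream over the secured, multiplexed connection and writes a probe.
func ping(c *singleConn) error {
	s, err := c.OpenStream()
	if err != nil {
		return err
	}
	defer s.Close()

	if _, err := s.Write([]byte("ping")); err != nil {
		return err
	}
	log.Debugf("pinged %s", c.RemotePeer())
	return nil
}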

// setupSecureSession runs the secio handshake over ch and returns the authenticated session.
func setupSecureSession(ctx context.Context, local peer.ID, privKey ci.PrivKey, ch io.ReadWriteCloser) (secio.Session, error) {
if local == "" {
return nil, errors.New("local peer is nil")
}
if privKey == nil {
return nil, errors.New("private key is nil")
}
sessgen := secio.SessionGenerator{
LocalID: local,
PrivateKey: privKey,
}
return sessgen.NewSession(ctx, ch)
}

// close is the internal close function, called by ContextCloser.Close
@@ -57,8 +96,8 @@ func (c *singleConn) Close() error {
}
}()

// close underlying connection
return c.maconn.Close()
// closing the stream muxer also closes the underlying net.Conn
return c.streamConn.Close()
}

// ID is an identifier unique to this connection.
@@ -71,62 +110,59 @@ func (c *singleConn) String() string {
}

func (c *singleConn) LocalAddr() net.Addr {
return c.maconn.LocalAddr()
return c.tptConn.LocalAddr()
}

func (c *singleConn) RemoteAddr() net.Addr {
return c.maconn.RemoteAddr()
return c.tptConn.RemoteAddr()
}

func (c *singleConn) LocalPrivateKey() ic.PrivKey {
if c.secSession != nil {
return c.secSession.LocalPrivateKey()
}
return nil
}

func (c *singleConn) RemotePublicKey() ic.PubKey {
if c.secSession != nil {
return c.secSession.RemotePublicKey()
}
return nil
}

func (c *singleConn) SetDeadline(t time.Time) error {
return c.maconn.SetDeadline(t)
}
func (c *singleConn) SetReadDeadline(t time.Time) error {
return c.maconn.SetReadDeadline(t)
}

func (c *singleConn) SetWriteDeadline(t time.Time) error {
return c.maconn.SetWriteDeadline(t)
}

// LocalMultiaddr is the Multiaddr on this side
func (c *singleConn) LocalMultiaddr() ma.Multiaddr {
return c.maconn.LocalMultiaddr()
return c.tptConn.LocalMultiaddr()
}

// RemoteMultiaddr is the Multiaddr on the remote side
func (c *singleConn) RemoteMultiaddr() ma.Multiaddr {
return c.maconn.RemoteMultiaddr()
return c.tptConn.RemoteMultiaddr()
}

func (c *singleConn) Transport() tpt.Transport {
return c.maconn.Transport()
return c.tptConn.Transport()
}

// LocalPeer is the Peer on this side
func (c *singleConn) LocalPeer() peer.ID {
return c.local
return c.secSession.LocalPeer()
}

// RemotePeer is the Peer on the remote side
func (c *singleConn) RemotePeer() peer.ID {
return c.remote
return c.secSession.RemotePeer()
}

func (c *singleConn) AcceptStream() (smux.Stream, error) {
return c.streamConn.AcceptStream()
}

// Read reads data, net.Conn style
func (c *singleConn) Read(buf []byte) (int, error) {
return c.maconn.Read(buf)
func (c *singleConn) OpenStream() (smux.Stream, error) {
return c.streamConn.OpenStream()
}

// Write writes data, net.Conn style
func (c *singleConn) Write(buf []byte) (int, error) {
return c.maconn.Write(buf)
func (c *singleConn) IsClosed() bool {
return c.streamConn.IsClosed()
}
82 changes: 82 additions & 0 deletions conn_suite_test.go
@@ -0,0 +1,82 @@
package conn

import (
"context"
"strings"
"testing"
"time"

ci "github.com/libp2p/go-libp2p-crypto"
iconn "github.com/libp2p/go-libp2p-interface-conn"
peer "github.com/libp2p/go-libp2p-peer"
tpt "github.com/libp2p/go-libp2p-transport"
tcpt "github.com/libp2p/go-tcp-transport"
tu "github.com/libp2p/go-testutil"
ma "github.com/multiformats/go-multiaddr"
yamux "github.com/whyrusleeping/go-smux-yamux"
grc "github.com/whyrusleeping/gorocheck"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestGoLibp2pConn(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "go-libp2p-conn Suite")
}

var _ = AfterEach(func() {
time.Sleep(300 * time.Millisecond)
Expect(grc.CheckForLeaks(func(r *grc.Goroutine) bool {
return strings.Contains(r.Function, "go-log.") ||
strings.Contains(r.Stack[0], "testing.(*T).Run") ||
strings.Contains(r.Function, "specrunner.") ||
strings.Contains(r.Function, "runtime.gopark")
})).To(Succeed())
})

// the stream muxer used for tests using the single stream connection
var streamMuxer = yamux.DefaultTransport

// dialRawConn dials a tpt.Conn and stops there:
// it does no protocol selection and no security handshake
func dialRawConn(laddr, raddr ma.Multiaddr) tpt.Conn {
d, err := tcpt.NewTCPTransport().Dialer(laddr)
Expect(err).ToNot(HaveOccurred())
c, err := d.Dial(raddr)
Expect(err).ToNot(HaveOccurred())
return c
}

// getTransport returns the transport for a multiaddr (always TCP in this suite)
func getTransport(a ma.Multiaddr) tpt.Transport {
return tcpt.NewTCPTransport()
}

// getListener creates a listener based on the PeerNetParams
// it updates the PeerNetParams to reflect the local address that was selected by the kernel
func getListener(ctx context.Context, p *tu.PeerNetParams) iconn.Listener {
tptListener, err := getTransport(p.Addr).Listen(p.Addr)
Expect(err).ToNot(HaveOccurred())
list, err := WrapTransportListener(ctx, tptListener, p.ID, streamMuxer, p.PrivKey)
Expect(err).ToNot(HaveOccurred())
p.Addr = list.Multiaddr()
return list
}

func getDialer(localPeer peer.ID, privKey ci.PrivKey, addr ma.Multiaddr) *Dialer {
d := NewDialer(localPeer, privKey, nil, streamMuxer)
d.fallback = nil // unset the fallback dialer. We want tests to use the configured dialer, and to fail otherwise
tptd, err := getTransport(addr).Dialer(addr)
Expect(err).ToNot(HaveOccurred())
d.AddDialer(tptd)
return d
}

// randPeerNetParams works like testutil.RandPeerNetParams,
// but fails the spec instead of returning an error
func randPeerNetParams() *tu.PeerNetParams {
p, err := tu.RandPeerNetParams()
Expect(err).ToNot(HaveOccurred())
return p
}
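
To show how these helpers fit together, a hypothetical spec sketch (not part of this diff); the Dialer.Dial signature and the Accept()-to-iconn.Conn assertion are assumptions for illustration:

var _ = Describe("single connection setup", func() {
	It("secures and multiplexes a dialed connection", func() {
		ctx := context.Background()
		lp := randPeerNetParams() // listener side
		dp := randPeerNetParams() // dialer side

		list := getListener(ctx, lp)
		defer list.Close()

		done := make(chan struct{})
		go func() {
			defer GinkgoRecover()
			defer close(done)
			c, err := list.Accept()
			Expect(err).ToNot(HaveOccurred())
			Expect(c.(iconn.Conn).RemotePeer()).To(Equal(dp.ID))
			c.Close()
		}()

		d := getDialer(dp.ID, dp.PrivKey, dp.Addr)
		c, err := d.Dial(ctx, lp.Addr, lp.ID)
		Expect(err).ToNot(HaveOccurred())
		Expect(c.RemotePeer()).To(Equal(lp.ID))
		c.Close()
		Eventually(done).Should(BeClosed())
	})
})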