Skip to content

Commit

Permalink
use a single packet conn for all outgoing connections
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jun 24, 2018
1 parent c0e1cce commit 308b97b
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 12 deletions.
73 changes: 62 additions & 11 deletions p2p/transport/quic/conn_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package libp2pquic

import (
"bytes"
"context"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"time"

ic "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
Expand All @@ -23,12 +25,14 @@ var _ = Describe("Connection", func() {
serverID, clientID peer.ID
)

createPeer := func() ic.PrivKey {
createPeer := func() (peer.ID, ic.PrivKey) {
key, err := rsa.GenerateKey(rand.Reader, 1024)
Expect(err).ToNot(HaveOccurred())
priv, err := ic.UnmarshalRsaPrivateKey(x509.MarshalPKCS1PrivateKey(key))
Expect(err).ToNot(HaveOccurred())
return priv
id, err := peer.IDFromPrivateKey(priv)
Expect(err).ToNot(HaveOccurred())
return id, priv
}

runServer := func(tr tpt.Transport) (ma.Multiaddr, <-chan tpt.Conn) {
Expand All @@ -54,13 +58,8 @@ var _ = Describe("Connection", func() {
}

BeforeEach(func() {
var err error
serverKey = createPeer()
serverID, err = peer.IDFromPrivateKey(serverKey)
Expect(err).ToNot(HaveOccurred())
clientKey = createPeer()
clientID, err = peer.IDFromPrivateKey(clientKey)
Expect(err).ToNot(HaveOccurred())
serverID, serverKey = createPeer()
clientID, clientKey = createPeer()
})

It("handshakes", func() {
Expand Down Expand Up @@ -107,8 +106,7 @@ var _ = Describe("Connection", func() {
})

It("fails if the peer ID doesn't match", func() {
thirdPartyID, err := peer.IDFromPrivateKey(createPeer())
Expect(err).ToNot(HaveOccurred())
thirdPartyID, _ := createPeer()

serverTransport, err := NewTransport(serverKey)
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -171,4 +169,57 @@ var _ = Describe("Connection", func() {
Expect(err).ToNot(HaveOccurred())
Eventually(serverConnChan).Should(Receive())
})

It("dials to two servers at the same time", func() {
serverID2, serverKey2 := createPeer()

serverTransport, err := NewTransport(serverKey)
Expect(err).ToNot(HaveOccurred())
serverAddr, serverConnChan := runServer(serverTransport)
serverTransport2, err := NewTransport(serverKey2)
Expect(err).ToNot(HaveOccurred())
serverAddr2, serverConnChan2 := runServer(serverTransport2)

data := bytes.Repeat([]byte{'a'}, 5*1<<20) // 5 MB
// wait for both servers to accept a connection
// then send some data
go func() {
for _, c := range []tpt.Conn{<-serverConnChan, <-serverConnChan2} {
go func(conn tpt.Conn) {
defer GinkgoRecover()
str, err := conn.OpenStream()
Expect(err).ToNot(HaveOccurred())
defer str.Close()
_, err = str.Write(data)
Expect(err).ToNot(HaveOccurred())
}(c)
}
}()

clientTransport, err := NewTransport(clientKey)
Expect(err).ToNot(HaveOccurred())
c1, err := clientTransport.Dial(context.Background(), serverAddr, serverID)
Expect(err).ToNot(HaveOccurred())
c2, err := clientTransport.Dial(context.Background(), serverAddr2, serverID2)
Expect(err).ToNot(HaveOccurred())

done := make(chan struct{}, 2)
// receive the data on both connections at the same time
for _, c := range []tpt.Conn{c1, c2} {
go func(conn tpt.Conn) {
defer GinkgoRecover()
str, err := conn.AcceptStream()
Expect(err).ToNot(HaveOccurred())
str.Close()
d, err := ioutil.ReadAll(str)
Expect(err).ToNot(HaveOccurred())
Expect(d).To(Equal(data))
conn.Close()
done <- struct{}{}
}(c)
}

Eventually(done, 5*time.Second).Should(Receive())
Eventually(done, 5*time.Second).Should(Receive())
})
})
19 changes: 18 additions & 1 deletion p2p/transport/quic/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type transport struct {
privKey ic.PrivKey
localPeer peer.ID
tlsConf *tls.Config
pconn net.PacketConn
}

var _ tpt.Transport = &transport{}
Expand All @@ -45,10 +46,22 @@ func NewTransport(key ic.PrivKey) (tpt.Transport, error) {
if err != nil {
return nil, err
}

// create a packet conn for outgoing connections
addr, err := net.ResolveUDPAddr("udp", "localhost:0")
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}

return &transport{
privKey: key,
localPeer: localPeer,
tlsConf: tlsConf,
pconn: conn,
}, nil
}

Expand All @@ -58,6 +71,10 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp
if err != nil {
return nil, err
}
addr, err := fromQuicMultiaddr(raddr)
if err != nil {
return nil, err
}
var remotePubKey ic.PubKey
tlsConf := t.tlsConf.Clone()
// We need to check the peer ID in the VerifyPeerCertificate callback.
Expand All @@ -82,7 +99,7 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp
}
return nil
}
sess, err := quic.DialAddrContext(ctx, host, tlsConf, quicConfig)
sess, err := quic.DialContext(ctx, t.pconn, addr, host, tlsConf, quicConfig)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 308b97b

Please sign in to comment.