Skip to content
This repository has been archived by the owner on Dec 7, 2019. It is now read-only.

Commit

Permalink
run mux tests with QUIC
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann authored and Stebalien committed Oct 31, 2017
1 parent 0de943e commit 425caea
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 82 deletions.
30 changes: 27 additions & 3 deletions muxtest/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,37 @@ package muxtest
import (
"testing"

tpt "github.com/libp2p/go-libp2p-transport"
tcpc "github.com/libp2p/go-tcp-transport"
quict "github.com/marten-seemann/libp2p-quic-transport"
ma "github.com/multiformats/go-multiaddr"
multistream "github.com/whyrusleeping/go-smux-multistream"
spdy "github.com/whyrusleeping/go-smux-spdystream"
yamux "github.com/whyrusleeping/go-smux-yamux"
)

func TestYamuxTransport(t *testing.T) {
SubtestAll(t, yamux.DefaultTransport)
l := listenerOpts{
transportCb: func() tpt.Transport { return tcpc.NewTCPTransport() },
localaddr: ma.StringCast("/ip4/127.0.0.1/tcp/0"),
}
SubtestAll(t, l, yamux.DefaultTransport)
}

func TestSpdyStreamTransport(t *testing.T) {
SubtestAll(t, spdy.Transport)
l := listenerOpts{
transportCb: func() tpt.Transport { return tcpc.NewTCPTransport() },
localaddr: ma.StringCast("/ip4/127.0.0.1/tcp/0"),
}
SubtestAll(t, l, spdy.Transport)
}

func TestQuicTransport(t *testing.T) {
l := listenerOpts{
transportCb: func() tpt.Transport { return quict.NewQuicTransport() },
localaddr: ma.StringCast("/ip4/127.0.0.1/udp/0/quic"),
}
SubtestAll(t, l, nil)
}

/*
Expand All @@ -27,8 +47,12 @@ func TestMuxadoTransport(t *testing.T) {
*/

func TestMultistreamTransport(t *testing.T) {
l := listenerOpts{
transportCb: func() tpt.Transport { return tcpc.NewTCPTransport() },
localaddr: ma.StringCast("/ip4/127.0.0.1/tcp/0"),
}
tpt := multistream.NewBlankTransport()
tpt.AddTransport("/yamux", yamux.DefaultTransport)
tpt.AddTransport("/spdy", spdy.Transport)
SubtestAll(t, tpt)
SubtestAll(t, l, tpt)
}
151 changes: 72 additions & 79 deletions muxtest/muxt.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ import (
ps "github.com/libp2p/go-peerstream"

conn "github.com/libp2p/go-libp2p-conn"
iconn "github.com/libp2p/go-libp2p-interface-conn"
tpt "github.com/libp2p/go-libp2p-transport"
smux "github.com/libp2p/go-stream-muxer"
tcpc "github.com/libp2p/go-tcp-transport"
testutil "github.com/libp2p/go-testutil"
ma "github.com/multiformats/go-multiaddr"
)

var localaddr = ma.StringCast("/ip4/127.0.0.1/tcp/0")

var randomness []byte
var nextPort = 20000
var verbose = false
Expand All @@ -37,6 +36,11 @@ func init() {
}
}

type listenerOpts struct {
transportCb func() tpt.Transport
localaddr ma.Multiaddr
}

func randBuf(size int) []byte {
n := len(randomness) - size
if size < 1 {
Expand All @@ -49,6 +53,7 @@ func randBuf(size int) []byte {

func checkErr(t *testing.T, err error) {
if err != nil {
panic(err)
t.Fatal(err)
}
}
Expand All @@ -64,9 +69,16 @@ type echoSetup struct {
conns []*ps.Conn
}

func singleConn(t *testing.T, tr smux.Transport) echoSetup {
listenerIdentity := testutil.RandIdentityOrFatal(t)
tcp := tcpc.NewTCPTransport()
func createConnListener(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) (iconn.Listener, peer.ID) {
identity := testutil.RandIdentityOrFatal(t)
l, err := lo.transportCb().Listen(lo.localaddr)
checkErr(t, err)
cl, err := conn.WrapTransportListener(context.Background(), l, identity.ID(), streamMuxer, identity.PrivateKey())
checkErr(t, err)
return cl, identity.ID()
}

func singleConn(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) echoSetup {
swarm := ps.NewSwarm()
swarm.SetStreamHandler(func(s *ps.Stream) {
defer s.Close()
Expand All @@ -76,17 +88,17 @@ func singleConn(t *testing.T, tr smux.Transport) echoSetup {
})

log("listening at %s", "localhost:0")
l, err := tcp.Listen(localaddr)
checkErr(t, err)
cl, err := conn.WrapTransportListener(context.Background(), l, listenerIdentity.ID(), tr, listenerIdentity.PrivateKey())
checkErr(t, err)
_, err = swarm.AddListener(cl)
cl, listenerID := createConnListener(t, lo, streamMuxer)
_, err := swarm.AddListener(cl)
checkErr(t, err)

log("dialing to %s", l.Multiaddr())
log("dialing to %s", cl.Multiaddr())
dialerIdentity := testutil.RandIdentityOrFatal(t)
dialer := conn.NewDialer(dialerIdentity.ID(), dialerIdentity.PrivateKey(), nil, tr)
nc1, err := dialer.Dial(context.Background(), l.Multiaddr(), listenerIdentity.ID())
cd := conn.NewDialer(dialerIdentity.ID(), dialerIdentity.PrivateKey(), nil, streamMuxer)
dialer, err := lo.transportCb().Dialer(cl.Multiaddr())
checkErr(t, err)
cd.AddDialer(dialer)
nc1, err := cd.Dial(context.Background(), cl.Multiaddr(), listenerID)
checkErr(t, err)

c1, err := swarm.AddConn(nc1)
Expand All @@ -98,8 +110,7 @@ func singleConn(t *testing.T, tr smux.Transport) echoSetup {
}
}

func makeSwarm(t *testing.T, tr smux.Transport, nListeners int) *ps.Swarm {
tcp := tcpc.NewTCPTransport()
func makeSwarm(t *testing.T, lo listenerOpts, streamMuxer smux.Transport, nListeners int) *ps.Swarm {
swarm := ps.NewSwarm()
swarm.SetStreamHandler(func(s *ps.Stream) {
defer s.Close()
Expand All @@ -109,59 +120,50 @@ func makeSwarm(t *testing.T, tr smux.Transport, nListeners int) *ps.Swarm {
})

for i := 0; i < nListeners; i++ {
listenerIdentity := testutil.RandIdentityOrFatal(t)
log("%p listening at %s", swarm, "localhost:0")
l, err := tcp.Listen(localaddr)
checkErr(t, err)
cl, err := conn.WrapTransportListener(context.Background(), l, listenerIdentity.ID(), tr, listenerIdentity.PrivateKey())
checkErr(t, err)
_, err = swarm.AddListener(cl)
cl, _ := createConnListener(t, lo, streamMuxer)
_, err := swarm.AddListener(cl)
checkErr(t, err)
}

return swarm
}

func makeSwarms(t *testing.T, tr smux.Transport, nSwarms, nListeners int) []*ps.Swarm {
func makeSwarms(t *testing.T, lo listenerOpts, streamMuxer smux.Transport, nSwarms, nListeners int) []*ps.Swarm {
swarms := make([]*ps.Swarm, nSwarms)
for i := 0; i < nSwarms; i++ {
swarms[i] = makeSwarm(t, tr, nListeners)
swarms[i] = makeSwarm(t, lo, streamMuxer, nListeners)
}
return swarms
}

func SubtestConstructSwarm(t *testing.T, tr smux.Transport) {
func SubtestConstructSwarm(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) {
ps.NewSwarm()
}

func SubtestSimpleWrite(t *testing.T, tr smux.Transport) {
tcp := tcpc.NewTCPTransport()
func SubtestSimpleWrite(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) {
swarm := ps.NewSwarm()
defer swarm.Close()

piper, pipew := io.Pipe()
swarm.SetStreamHandler(func(s *ps.Stream) {
defer s.Close()
log("accepted stream")
w := io.MultiWriter(s, pipew)
io.Copy(w, s) // echo everything and write it to pipew
io.Copy(s, s)
log("closing stream")
})

log("listening at %s", "localhost:0")
l, err := tcp.Listen(localaddr)
cl, listenerID := createConnListener(t, lo, streamMuxer)
_, err := swarm.AddListener(cl)
checkErr(t, err)

listenerIdentity := testutil.RandIdentityOrFatal(t)
cl, err := conn.WrapTransportListener(context.Background(), l, listenerIdentity.ID(), tr, listenerIdentity.PrivateKey())
checkErr(t, err)
_, err = swarm.AddListener(cl)
checkErr(t, err)

log("dialing to %s", l.Addr().String())
log("dialing to %s", cl.Addr().String())
dialerIdentity := testutil.RandIdentityOrFatal(t)
dialer := conn.NewDialer(dialerIdentity.ID(), dialerIdentity.PrivateKey(), nil, tr)
nc1, err := dialer.Dial(context.Background(), l.Multiaddr(), listenerIdentity.ID())
cd := conn.NewDialer(dialerIdentity.ID(), dialerIdentity.PrivateKey(), nil, streamMuxer)
dialer, err := lo.transportCb().Dialer(cl.Multiaddr())
checkErr(t, err)
cd.AddDialer(dialer)
nc1, err := cd.Dial(context.Background(), cl.Multiaddr(), listenerID)
checkErr(t, err)
c1, err := swarm.AddConn(nc1)
checkErr(t, err)
Expand All @@ -170,7 +172,6 @@ func SubtestSimpleWrite(t *testing.T, tr smux.Transport) {
log("creating stream")
s1, err := c1.NewStream()
checkErr(t, err)
defer s1.Close()

buf1 := randBuf(4096)
log("writing %d bytes to stream", len(buf1))
Expand All @@ -179,26 +180,17 @@ func SubtestSimpleWrite(t *testing.T, tr smux.Transport) {

buf2 := make([]byte, len(buf1))
log("reading %d bytes from stream (echoed)", len(buf2))
_, err = s1.Read(buf2)
_, err = io.ReadFull(s1, buf2)
checkErr(t, err)
if string(buf2) != string(buf1) {
t.Errorf("buf1 and buf2 not equal: %s != %s", string(buf1), string(buf2))
}

buf3 := make([]byte, len(buf1))
log("reading %d bytes from pipe (tee)", len(buf3))
_, err = piper.Read(buf3)
checkErr(t, err)
if string(buf3) != string(buf1) {
t.Errorf("buf1 and buf3 not equal: %s != %s", string(buf1), string(buf3))
t.Errorf("buf1 and buf2 not equal: %#v != %#v", buf1, buf2)
}
}

func SubtestSimpleWrite100msgs(t *testing.T, tr smux.Transport) {

func SubtestSimpleWrite100msgs(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) {
msgs := 100
msgsize := 1 << 19
es := singleConn(t, tr)
es := singleConn(t, lo, streamMuxer)
defer es.swarm.Close()

log("creating stream")
Expand Down Expand Up @@ -243,12 +235,10 @@ func SubtestSimpleWrite100msgs(t *testing.T, tr smux.Transport) {
}
}
}()

wg.Wait()
}

func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr smux.Transport, nSwarm, nConn, nStream, nMsg int) {

func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, lo listenerOpts, streamMuxer smux.Transport, nSwarm, nConn, nStream, nMsg int) {
msgsize := 1 << 11

rateLimitN := 5000
Expand All @@ -269,7 +259,7 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr smux.Transport, nSwarm
for i := 0; i < nMsg; i++ {
buf := randBuf(msgsize)
bufs <- buf
log("%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, nMsg, buf[:3])
// log("%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, nMsg, buf[:3])
if _, err := s.Write(buf); err != nil {
t.Error(fmt.Errorf("s.Write(buf): %s", err))
continue
Expand All @@ -284,10 +274,10 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr smux.Transport, nSwarm
i := 0
for buf1 := range bufs {
i++
log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3])
// log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3])

if _, err := io.ReadFull(s, buf2); err != nil {
log("%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3])
// log("%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3])
t.Error(fmt.Errorf("io.ReadFull(s, buf2): %s", err))
continue
}
Expand Down Expand Up @@ -323,8 +313,11 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr smux.Transport, nSwarm
l := ls[mrand.Intn(len(ls))]

dialerIdentity := testutil.RandIdentityOrFatal(t)
dialer := conn.NewDialer(dialerIdentity.ID(), dialerIdentity.PrivateKey(), nil, tr)
nc, err := dialer.Dial(context.Background(), l.Multiaddr(), peer.ID(0))
cd := conn.NewDialer(dialerIdentity.ID(), dialerIdentity.PrivateKey(), nil, streamMuxer)
dialer, err := lo.transportCb().Dialer(l.Multiaddr())
checkErr(t, err)
cd.AddDialer(dialer)
nc, err := cd.Dial(context.Background(), l.Multiaddr(), peer.ID(0))
checkErr(t, err)

c, err := a.AddConn(nc)
Expand All @@ -342,6 +335,7 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr smux.Transport, nSwarm
})
}
wg.Wait()
log("Closing connection")
c.Close()
}

Expand Down Expand Up @@ -377,40 +371,39 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr smux.Transport, nSwarm
wg.Wait()
}

swarms := makeSwarms(t, tr, nSwarm, 3) // 3 listeners per swarm.
swarms := makeSwarms(t, lo, streamMuxer, nSwarm, 3) // 3 listeners per swarm.
connectSwarmsAndRW(swarms)
for _, s := range swarms {
s.Close()
}

}

func SubtestStress1Swarm1Conn1Stream1Msg(t *testing.T, tr smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 1, 1, 1)
func SubtestStress1Swarm1Conn1Stream1Msg(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, lo, streamMuxer, 1, 1, 1, 1)
}

func SubtestStress1Swarm1Conn1Stream100Msg(t *testing.T, tr smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 1, 1, 100)
func SubtestStress1Swarm1Conn1Stream100Msg(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, lo, streamMuxer, 1, 1, 1, 100)
}

func SubtestStress1Swarm1Conn100Stream100Msg(t *testing.T, tr smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 1, 100, 100)
func SubtestStress1Swarm1Conn100Stream100Msg(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, lo, streamMuxer, 1, 1, 300, 100)
}

func SubtestStress1Swarm10Conn50Stream50Msg(t *testing.T, tr smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 10, 50, 50)
func SubtestStress1Swarm10Conn50Stream50Msg(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, lo, streamMuxer, 1, 10, 50, 50)
}

func SubtestStress5Swarm2Conn20Stream20Msg(t *testing.T, tr smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, tr, 5, 2, 20, 20)
func SubtestStress5Swarm2Conn20Stream20Msg(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, lo, streamMuxer, 5, 2, 20, 20)
}

func SubtestStress10Swarm2Conn100Stream100Msg(t *testing.T, tr smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, tr, 10, 2, 100, 100)
func SubtestStress10Swarm2Conn100Stream100Msg(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, lo, streamMuxer, 10, 2, 100, 100)
}

func SubtestAll(t *testing.T, tr smux.Transport) {

func SubtestAll(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) {
tests := []TransportTest{
SubtestConstructSwarm,
SubtestSimpleWrite,
Expand All @@ -427,11 +420,11 @@ func SubtestAll(t *testing.T, tr smux.Transport) {
if testing.Verbose() {
fmt.Fprintf(os.Stderr, "==== RUN %s\n", GetFunctionName(f))
}
f(t, tr)
f(t, lo, streamMuxer)
}
}

type TransportTest func(t *testing.T, tr smux.Transport)
type TransportTest func(t *testing.T, lo listenerOpts, streamMuxer smux.Transport)

func TestNoOp(t *testing.T) {}

Expand Down

0 comments on commit 425caea

Please sign in to comment.