diff --git a/muxtest/mux_test.go b/muxtest/mux_test.go index 5cd2e48..dc67de4 100644 --- a/muxtest/mux_test.go +++ b/muxtest/mux_test.go @@ -3,17 +3,37 @@ package muxtest import ( "testing" + tpt "github.com/libp2p/go-libp2p-transport" + tcpc "github.com/libp2p/go-tcp-transport" + quict "github.com/marten-seemann/libp2p-quic-transport" + ma "github.com/multiformats/go-multiaddr" multistream "github.com/whyrusleeping/go-smux-multistream" spdy "github.com/whyrusleeping/go-smux-spdystream" yamux "github.com/whyrusleeping/go-smux-yamux" ) func TestYamuxTransport(t *testing.T) { - SubtestAll(t, yamux.DefaultTransport) + l := listenerOpts{ + transportCb: func() tpt.Transport { return tcpc.NewTCPTransport() }, + localaddr: ma.StringCast("/ip4/127.0.0.1/tcp/0"), + } + SubtestAll(t, l, yamux.DefaultTransport) } func TestSpdyStreamTransport(t *testing.T) { - SubtestAll(t, spdy.Transport) + l := listenerOpts{ + transportCb: func() tpt.Transport { return tcpc.NewTCPTransport() }, + localaddr: ma.StringCast("/ip4/127.0.0.1/tcp/0"), + } + SubtestAll(t, l, spdy.Transport) +} + +func TestQuicTransport(t *testing.T) { + l := listenerOpts{ + transportCb: func() tpt.Transport { return quict.NewQuicTransport() }, + localaddr: ma.StringCast("/ip4/127.0.0.1/udp/0/quic"), + } + SubtestAll(t, l, nil) } /* @@ -27,8 +47,12 @@ func TestMuxadoTransport(t *testing.T) { */ func TestMultistreamTransport(t *testing.T) { + l := listenerOpts{ + transportCb: func() tpt.Transport { return tcpc.NewTCPTransport() }, + localaddr: ma.StringCast("/ip4/127.0.0.1/tcp/0"), + } tpt := multistream.NewBlankTransport() tpt.AddTransport("/yamux", yamux.DefaultTransport) tpt.AddTransport("/spdy", spdy.Transport) - SubtestAll(t, tpt) + SubtestAll(t, l, tpt) } diff --git a/muxtest/muxt.go b/muxtest/muxt.go index a4d845a..a0c85e8 100644 --- a/muxtest/muxt.go +++ b/muxtest/muxt.go @@ -17,14 +17,13 @@ import ( ps "github.com/libp2p/go-peerstream" conn "github.com/libp2p/go-libp2p-conn" + iconn "github.com/libp2p/go-libp2p-interface-conn" + tpt "github.com/libp2p/go-libp2p-transport" smux "github.com/libp2p/go-stream-muxer" - tcpc "github.com/libp2p/go-tcp-transport" testutil "github.com/libp2p/go-testutil" ma "github.com/multiformats/go-multiaddr" ) -var localaddr = ma.StringCast("/ip4/127.0.0.1/tcp/0") - var randomness []byte var nextPort = 20000 var verbose = false @@ -37,6 +36,11 @@ func init() { } } +type listenerOpts struct { + transportCb func() tpt.Transport + localaddr ma.Multiaddr +} + func randBuf(size int) []byte { n := len(randomness) - size if size < 1 { @@ -49,6 +53,7 @@ func randBuf(size int) []byte { func checkErr(t *testing.T, err error) { if err != nil { + panic(err) t.Fatal(err) } } @@ -64,9 +69,16 @@ type echoSetup struct { conns []*ps.Conn } -func singleConn(t *testing.T, tr smux.Transport) echoSetup { - listenerIdentity := testutil.RandIdentityOrFatal(t) - tcp := tcpc.NewTCPTransport() +func createConnListener(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) (iconn.Listener, peer.ID) { + identity := testutil.RandIdentityOrFatal(t) + l, err := lo.transportCb().Listen(lo.localaddr) + checkErr(t, err) + cl, err := conn.WrapTransportListener(context.Background(), l, identity.ID(), streamMuxer, identity.PrivateKey()) + checkErr(t, err) + return cl, identity.ID() +} + +func singleConn(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) echoSetup { swarm := ps.NewSwarm() swarm.SetStreamHandler(func(s *ps.Stream) { defer s.Close() @@ -76,17 +88,17 @@ func singleConn(t *testing.T, tr smux.Transport) echoSetup { }) log("listening at %s", "localhost:0") - l, err := tcp.Listen(localaddr) - checkErr(t, err) - cl, err := conn.WrapTransportListener(context.Background(), l, listenerIdentity.ID(), tr, listenerIdentity.PrivateKey()) - checkErr(t, err) - _, err = swarm.AddListener(cl) + cl, listenerID := createConnListener(t, lo, streamMuxer) + _, err := swarm.AddListener(cl) checkErr(t, err) - log("dialing to %s", l.Multiaddr()) + log("dialing to %s", cl.Multiaddr()) dialerIdentity := testutil.RandIdentityOrFatal(t) - dialer := conn.NewDialer(dialerIdentity.ID(), dialerIdentity.PrivateKey(), nil, tr) - nc1, err := dialer.Dial(context.Background(), l.Multiaddr(), listenerIdentity.ID()) + cd := conn.NewDialer(dialerIdentity.ID(), dialerIdentity.PrivateKey(), nil, streamMuxer) + dialer, err := lo.transportCb().Dialer(cl.Multiaddr()) + checkErr(t, err) + cd.AddDialer(dialer) + nc1, err := cd.Dial(context.Background(), cl.Multiaddr(), listenerID) checkErr(t, err) c1, err := swarm.AddConn(nc1) @@ -98,8 +110,7 @@ func singleConn(t *testing.T, tr smux.Transport) echoSetup { } } -func makeSwarm(t *testing.T, tr smux.Transport, nListeners int) *ps.Swarm { - tcp := tcpc.NewTCPTransport() +func makeSwarm(t *testing.T, lo listenerOpts, streamMuxer smux.Transport, nListeners int) *ps.Swarm { swarm := ps.NewSwarm() swarm.SetStreamHandler(func(s *ps.Stream) { defer s.Close() @@ -109,59 +120,50 @@ func makeSwarm(t *testing.T, tr smux.Transport, nListeners int) *ps.Swarm { }) for i := 0; i < nListeners; i++ { - listenerIdentity := testutil.RandIdentityOrFatal(t) log("%p listening at %s", swarm, "localhost:0") - l, err := tcp.Listen(localaddr) - checkErr(t, err) - cl, err := conn.WrapTransportListener(context.Background(), l, listenerIdentity.ID(), tr, listenerIdentity.PrivateKey()) - checkErr(t, err) - _, err = swarm.AddListener(cl) + cl, _ := createConnListener(t, lo, streamMuxer) + _, err := swarm.AddListener(cl) checkErr(t, err) } return swarm } -func makeSwarms(t *testing.T, tr smux.Transport, nSwarms, nListeners int) []*ps.Swarm { +func makeSwarms(t *testing.T, lo listenerOpts, streamMuxer smux.Transport, nSwarms, nListeners int) []*ps.Swarm { swarms := make([]*ps.Swarm, nSwarms) for i := 0; i < nSwarms; i++ { - swarms[i] = makeSwarm(t, tr, nListeners) + swarms[i] = makeSwarm(t, lo, streamMuxer, nListeners) } return swarms } -func SubtestConstructSwarm(t *testing.T, tr smux.Transport) { +func SubtestConstructSwarm(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) { ps.NewSwarm() } -func SubtestSimpleWrite(t *testing.T, tr smux.Transport) { - tcp := tcpc.NewTCPTransport() +func SubtestSimpleWrite(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) { swarm := ps.NewSwarm() defer swarm.Close() - piper, pipew := io.Pipe() swarm.SetStreamHandler(func(s *ps.Stream) { defer s.Close() log("accepted stream") - w := io.MultiWriter(s, pipew) - io.Copy(w, s) // echo everything and write it to pipew + io.Copy(s, s) log("closing stream") }) log("listening at %s", "localhost:0") - l, err := tcp.Listen(localaddr) + cl, listenerID := createConnListener(t, lo, streamMuxer) + _, err := swarm.AddListener(cl) checkErr(t, err) - listenerIdentity := testutil.RandIdentityOrFatal(t) - cl, err := conn.WrapTransportListener(context.Background(), l, listenerIdentity.ID(), tr, listenerIdentity.PrivateKey()) - checkErr(t, err) - _, err = swarm.AddListener(cl) - checkErr(t, err) - - log("dialing to %s", l.Addr().String()) + log("dialing to %s", cl.Addr().String()) dialerIdentity := testutil.RandIdentityOrFatal(t) - dialer := conn.NewDialer(dialerIdentity.ID(), dialerIdentity.PrivateKey(), nil, tr) - nc1, err := dialer.Dial(context.Background(), l.Multiaddr(), listenerIdentity.ID()) + cd := conn.NewDialer(dialerIdentity.ID(), dialerIdentity.PrivateKey(), nil, streamMuxer) + dialer, err := lo.transportCb().Dialer(cl.Multiaddr()) + checkErr(t, err) + cd.AddDialer(dialer) + nc1, err := cd.Dial(context.Background(), cl.Multiaddr(), listenerID) checkErr(t, err) c1, err := swarm.AddConn(nc1) checkErr(t, err) @@ -170,7 +172,6 @@ func SubtestSimpleWrite(t *testing.T, tr smux.Transport) { log("creating stream") s1, err := c1.NewStream() checkErr(t, err) - defer s1.Close() buf1 := randBuf(4096) log("writing %d bytes to stream", len(buf1)) @@ -179,26 +180,17 @@ func SubtestSimpleWrite(t *testing.T, tr smux.Transport) { buf2 := make([]byte, len(buf1)) log("reading %d bytes from stream (echoed)", len(buf2)) - _, err = s1.Read(buf2) + _, err = io.ReadFull(s1, buf2) checkErr(t, err) if string(buf2) != string(buf1) { - t.Errorf("buf1 and buf2 not equal: %s != %s", string(buf1), string(buf2)) - } - - buf3 := make([]byte, len(buf1)) - log("reading %d bytes from pipe (tee)", len(buf3)) - _, err = piper.Read(buf3) - checkErr(t, err) - if string(buf3) != string(buf1) { - t.Errorf("buf1 and buf3 not equal: %s != %s", string(buf1), string(buf3)) + t.Errorf("buf1 and buf2 not equal: %#v != %#v", buf1, buf2) } } -func SubtestSimpleWrite100msgs(t *testing.T, tr smux.Transport) { - +func SubtestSimpleWrite100msgs(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) { msgs := 100 msgsize := 1 << 19 - es := singleConn(t, tr) + es := singleConn(t, lo, streamMuxer) defer es.swarm.Close() log("creating stream") @@ -243,12 +235,10 @@ func SubtestSimpleWrite100msgs(t *testing.T, tr smux.Transport) { } } }() - wg.Wait() } -func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr smux.Transport, nSwarm, nConn, nStream, nMsg int) { - +func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, lo listenerOpts, streamMuxer smux.Transport, nSwarm, nConn, nStream, nMsg int) { msgsize := 1 << 11 rateLimitN := 5000 @@ -269,7 +259,7 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr smux.Transport, nSwarm for i := 0; i < nMsg; i++ { buf := randBuf(msgsize) bufs <- buf - log("%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, nMsg, buf[:3]) + // log("%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, nMsg, buf[:3]) if _, err := s.Write(buf); err != nil { t.Error(fmt.Errorf("s.Write(buf): %s", err)) continue @@ -284,10 +274,10 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr smux.Transport, nSwarm i := 0 for buf1 := range bufs { i++ - log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3]) + // log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3]) if _, err := io.ReadFull(s, buf2); err != nil { - log("%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3]) + // log("%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3]) t.Error(fmt.Errorf("io.ReadFull(s, buf2): %s", err)) continue } @@ -323,8 +313,11 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr smux.Transport, nSwarm l := ls[mrand.Intn(len(ls))] dialerIdentity := testutil.RandIdentityOrFatal(t) - dialer := conn.NewDialer(dialerIdentity.ID(), dialerIdentity.PrivateKey(), nil, tr) - nc, err := dialer.Dial(context.Background(), l.Multiaddr(), peer.ID(0)) + cd := conn.NewDialer(dialerIdentity.ID(), dialerIdentity.PrivateKey(), nil, streamMuxer) + dialer, err := lo.transportCb().Dialer(l.Multiaddr()) + checkErr(t, err) + cd.AddDialer(dialer) + nc, err := cd.Dial(context.Background(), l.Multiaddr(), peer.ID(0)) checkErr(t, err) c, err := a.AddConn(nc) @@ -342,6 +335,7 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr smux.Transport, nSwarm }) } wg.Wait() + log("Closing connection") c.Close() } @@ -377,7 +371,7 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr smux.Transport, nSwarm wg.Wait() } - swarms := makeSwarms(t, tr, nSwarm, 3) // 3 listeners per swarm. + swarms := makeSwarms(t, lo, streamMuxer, nSwarm, 3) // 3 listeners per swarm. connectSwarmsAndRW(swarms) for _, s := range swarms { s.Close() @@ -385,32 +379,31 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr smux.Transport, nSwarm } -func SubtestStress1Swarm1Conn1Stream1Msg(t *testing.T, tr smux.Transport) { - SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 1, 1, 1) +func SubtestStress1Swarm1Conn1Stream1Msg(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) { + SubtestStressNSwarmNConnNStreamNMsg(t, lo, streamMuxer, 1, 1, 1, 1) } -func SubtestStress1Swarm1Conn1Stream100Msg(t *testing.T, tr smux.Transport) { - SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 1, 1, 100) +func SubtestStress1Swarm1Conn1Stream100Msg(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) { + SubtestStressNSwarmNConnNStreamNMsg(t, lo, streamMuxer, 1, 1, 1, 100) } -func SubtestStress1Swarm1Conn100Stream100Msg(t *testing.T, tr smux.Transport) { - SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 1, 100, 100) +func SubtestStress1Swarm1Conn100Stream100Msg(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) { + SubtestStressNSwarmNConnNStreamNMsg(t, lo, streamMuxer, 1, 1, 300, 100) } -func SubtestStress1Swarm10Conn50Stream50Msg(t *testing.T, tr smux.Transport) { - SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 10, 50, 50) +func SubtestStress1Swarm10Conn50Stream50Msg(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) { + SubtestStressNSwarmNConnNStreamNMsg(t, lo, streamMuxer, 1, 10, 50, 50) } -func SubtestStress5Swarm2Conn20Stream20Msg(t *testing.T, tr smux.Transport) { - SubtestStressNSwarmNConnNStreamNMsg(t, tr, 5, 2, 20, 20) +func SubtestStress5Swarm2Conn20Stream20Msg(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) { + SubtestStressNSwarmNConnNStreamNMsg(t, lo, streamMuxer, 5, 2, 20, 20) } -func SubtestStress10Swarm2Conn100Stream100Msg(t *testing.T, tr smux.Transport) { - SubtestStressNSwarmNConnNStreamNMsg(t, tr, 10, 2, 100, 100) +func SubtestStress10Swarm2Conn100Stream100Msg(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) { + SubtestStressNSwarmNConnNStreamNMsg(t, lo, streamMuxer, 10, 2, 100, 100) } -func SubtestAll(t *testing.T, tr smux.Transport) { - +func SubtestAll(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) { tests := []TransportTest{ SubtestConstructSwarm, SubtestSimpleWrite, @@ -427,11 +420,11 @@ func SubtestAll(t *testing.T, tr smux.Transport) { if testing.Verbose() { fmt.Fprintf(os.Stderr, "==== RUN %s\n", GetFunctionName(f)) } - f(t, tr) + f(t, lo, streamMuxer) } } -type TransportTest func(t *testing.T, tr smux.Transport) +type TransportTest func(t *testing.T, lo listenerOpts, streamMuxer smux.Transport) func TestNoOp(t *testing.T) {}